Skip to main content

sql

Runs an SQL prepared query against a target database for each message.

Introduced in version 3.33.0.

# Common config fields, showing default values
output:
label: ""
sql:
driver: mysql
data_source_name: ""
query: ""
args_mapping: ""
max_in_flight: 1
batching:
count: 0
byte_size: 0
period: ""
check: ""

Alternatives

For basic inserts use the sql_insert output instead. For more complex queries use the sql_raw output.

Performance

This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the max number of in flight messages with the field max_in_flight.

This output benefits from sending messages as a batch for improved performance. Batches can be formed at both the input and output level. You can find out more in this doc.

Examples

The following example inserts rows into the table footable with the columns foo, bar and baz populated with values extracted from messages:

output:
sql:
driver: mysql
data_source_name: foouser:foopassword@tcp(localhost:3306)/foodb
query: "INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?);"
args_mapping: '[ this.document.foo, this.document.bar, meta("kafka_topic") ]'
batching:
count: 500

Fields

driver

A database driver to use.

Type: string
Default: "mysql"
Options: mysql, postgres, clickhouse, mssql.

data_source_name

A Data Source Name to identify the target database.

Type: string
Default: ""

# Examples
data_source_name: tcp://host1:9000?username=user&password=qwerty&database=clicks&read_timeout=10&write_timeout=20&alt_hosts=host2:9000,host3:9000
data_source_name: foouser:foopassword@tcp(localhost:3306)/foodb
data_source_name: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable

query

The query to run against the database.

Type: string
Default: ""

# Examples
query: INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?);

args_mapping

A Bloblang mapping that produces the arguments for the query. The mapping must return an array containing the number of arguments in the query.

Type: string
Default: ""
Requires version 3.47.0 or newer

# Examples
args_mapping: '[ this.foo, this.bar.not_empty().catch(null), meta("baz") ]'
args_mapping: root = [ uuid_v4() ].merge(this.document.args)

max_in_flight

The maximum number of messages to have in flight at a given time. Increase this to improve throughput.

Type: int
Default: 1

batching

Allows you to configure a batching policy.

Type: object

# Examples
batching:
byte_size: 5000
count: 0
period: 1s
batching:
count: 10
period: 1s
batching:
check: this.contains("END BATCH")
count: 0
period: 1m

batching.count

A number of messages at which the batch should be flushed. If 0 disables count based batching.

Type: int
Default: 0

batching.byte_size

An amount of bytes at which the batch should be flushed. If 0 disables size based batching.

Type: int
Default: 0

batching.period

A period in which an incomplete batch should be flushed regardless of its size.

Type: string
Default: ""

# Examples
period: 1s
period: 1m
period: 500ms

batching.check

A Bloblang query that should return a boolean value indicating whether a message should end a batch.

Type: string
Default: ""

# Examples
check: this.type == "end_of_transaction"

batching.processors

A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op.

Type: array
Default: []

# Examples
processors:
- archive:
format: lines
processors:
- archive:
format: json_array
processors:
- merge_json: {}