sql
Runs an SQL prepared query against a target database for each message.
Introduced in version 3.33.0.
```yaml
# Common config fields, showing default values
output:
  label: ""
  sql:
    driver: mysql
    data_source_name: ""
    query: ""
    args_mapping: ""
    max_in_flight: 1
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
```
```yaml
# All config fields, showing default values
output:
  label: ""
  sql:
    driver: mysql
    data_source_name: ""
    query: ""
    args_mapping: ""
    max_in_flight: 1
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: []
```
Alternatives

For basic inserts use the sql_insert output instead. For more complex queries use the sql_raw output.
Performance

This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the maximum number of in-flight messages with the field max_in_flight.

This output also benefits from sending messages as a batch for improved performance. Batches can be formed at both the input and output level. You can find out more in the batching documentation.
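Combining the two, a minimal tuning sketch might look like the following (the values here are illustrative, not defaults):

```yaml
# Illustrative only: raise in-flight parallelism and batch inserts together.
output:
  sql:
    driver: mysql
    data_source_name: foouser:foopassword@tcp(localhost:3306)/foodb
    query: "INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?);"
    args_mapping: '[ this.document.foo, this.document.bar, meta("kafka_topic") ]'
    max_in_flight: 64   # illustrative; the default is 1
    batching:
      count: 500        # flush every 500 messages
      period: 1s        # or after one second, whichever comes first
```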
Examples
The following example inserts rows into a MySQL table footable, with the columns foo, bar and baz populated with values extracted from messages:

```yaml
output:
  sql:
    driver: mysql
    data_source_name: foouser:foopassword@tcp(localhost:3306)/foodb
    query: "INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?);"
    args_mapping: '[ this.document.foo, this.document.bar, meta("kafka_topic") ]'
    batching:
      count: 500
```
The following example inserts rows into a PostgreSQL table footable, with the columns foo, bar and baz populated with values extracted from messages:

```yaml
output:
  sql:
    driver: postgres
    data_source_name: postgres://foouser:foopassword@localhost:5432/foodb?sslmode=disable
    query: "INSERT INTO footable (foo, bar, baz) VALUES ($1, $2, $3);"
    args_mapping: '[ this.document.foo, this.document.bar, meta("kafka_topic") ]'
    batching:
      count: 500
```
Fields
driver

A database driver to use.

Type: string
Default: "mysql"
Options: mysql, postgres, clickhouse, mssql.
data_source_name

A Data Source Name to identify the target database.

Type: string
Default: ""

```yaml
# Examples

data_source_name: tcp://host1:9000?username=user&password=qwerty&database=clicks&read_timeout=10&write_timeout=20&alt_hosts=host2:9000,host3:9000

data_source_name: foouser:foopassword@tcp(localhost:3306)/foodb

data_source_name: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable
```
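The DSN format depends on the chosen driver. As a sketch, pairing the example DSNs above with the drivers they most likely correspond to (the pairing is inferred from common DSN conventions, not stated in this page):

```yaml
# Inferred driver/DSN pairings, shown side by side for clarity.
driver: clickhouse
data_source_name: tcp://host1:9000?username=user&password=qwerty&database=clicks

driver: mysql
data_source_name: foouser:foopassword@tcp(localhost:3306)/foodb

driver: postgres
data_source_name: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable
```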
query

The query to run against the database.

Type: string
Default: ""

```yaml
# Examples

query: INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?);
```
args_mapping

A Bloblang mapping that produces the arguments for the query. The mapping must return an array with the same number of elements as there are arguments in the query.

Type: string
Default: ""
Requires version 3.47.0 or newer

```yaml
# Examples

args_mapping: '[ this.foo, this.bar.not_empty().catch(null), meta("baz") ]'

args_mapping: root = [ uuid_v4() ].merge(this.document.args)
```
max_in_flight
The maximum number of messages to have in flight at a given time. Increase this to improve throughput.
Type: int
Default: 1
batching

Allows you to configure a batching policy.

Type: object

```yaml
# Examples

batching:
  byte_size: 5000
  count: 0
  period: 1s

batching:
  count: 10
  period: 1s

batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m
```
batching.count

A number of messages at which the batch should be flushed. A value of 0 disables count-based batching.
Type: int
Default: 0
batching.byte_size

An amount of bytes at which the batch should be flushed. A value of 0 disables size-based batching.
Type: int
Default: 0
batching.period

A period in which an incomplete batch should be flushed regardless of its size.

Type: string
Default: ""

```yaml
# Examples

period: 1s

period: 1m

period: 500ms
```
batching.check

A Bloblang query that should return a boolean value indicating whether a message should end a batch.

Type: string
Default: ""

```yaml
# Examples

check: this.type == "end_of_transaction"
```
batching.processors

A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op.

Type: array
Default: []

```yaml
# Examples

processors:
  - archive:
      format: lines

processors:
  - archive:
      format: json_array

processors:
  - merge_json: {}
```