sql
Runs an SQL prepared query against a target database for each message and, for queries that return rows, replaces the message contents with the result according to a codec.
# Common config fields, showing default values
label: ""
sql:
  driver: mysql
  data_source_name: ""
  query: ""
  args_mapping: ""
  result_codec: none

# All config fields, showing default values
label: ""
sql:
  driver: mysql
  data_source_name: ""
  query: ""
  unsafe_dynamic_query: false
  args_mapping: ""
  result_codec: none
Alternatives
For basic inserts or select queries, use either the sql_insert or the sql_select processor.
For more complex queries, use the sql_raw processor.
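As a rough illustration of the first alternative, a simple select might be expressed with sql_select along the following lines. This is a sketch only: the field names (dsn, table, columns, where) are assumed from the sql_select processor and should be checked against its own reference page, and the table and column names are hypothetical.

```yaml
pipeline:
  processors:
    - sql_select:
        driver: mysql
        # Field names below are assumptions; verify against the sql_select docs.
        dsn: foouser:foopassword@tcp(localhost:3306)/foodb
        table: footable
        columns: [ foo, bar, baz ]
        where: user_id = ?
        args_mapping: '[ this.user.id ]'
```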
Examples
Table Insert (MySQL)
The following example inserts rows into the table footable with the columns foo, bar and baz populated with values extracted from messages:
pipeline:
  processors:
    - sql:
        driver: mysql
        data_source_name: foouser:foopassword@tcp(localhost:3306)/foodb
        query: "INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?);"
        args_mapping: '[ document.foo, document.bar, meta("kafka_topic") ]'

Table Query (PostgreSQL)
Here we query a database for rows of footable whose user_id matches the message field user.id. The result_codec is set to json_array and a branch processor is used in order to insert the resulting array into the original message at the path foo_rows:
pipeline:
  processors:
    - branch:
        processors:
          - sql:
              driver: postgres
              result_codec: json_array
              data_source_name: postgres://foouser:foopass@localhost:5432/testdb?sslmode=disable
              query: "SELECT * FROM footable WHERE user_id = $1;"
              args_mapping: '[ this.user.id ]'
        result_map: 'root.foo_rows = this'
Fields
driver
A database driver to use.
Type: string
Default: "mysql"
Options: mysql, postgres, clickhouse, mssql.
data_source_name
A Data Source Name to identify the target database.
Type: string
Default: ""
# Examples
data_source_name: tcp://host1:9000?username=user&password=qwerty&database=clicks&read_timeout=10&write_timeout=20&alt_hosts=host2:9000,host3:9000
data_source_name: foouser:foopassword@tcp(localhost:3306)/foodb
data_source_name: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable
query
The query to run against the database.
Type: string
Default: ""
# Examples
query: INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?);
unsafe_dynamic_query
Whether to enable dynamic queries that support interpolation functions. WARNING: This feature opens up the possibility of SQL injection attacks and is considered unsafe.
Type: bool
Default: false
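A minimal sketch of what a dynamic query might look like, assuming the table name arrives in a hypothetical metadata key named table. Only the table name is interpolated; row values are still passed as arguments through args_mapping. This is only reasonable when the interpolated value comes from a trusted source.

```yaml
pipeline:
  processors:
    - sql:
        driver: mysql
        data_source_name: foouser:foopassword@tcp(localhost:3306)/foodb
        # "table" is a hypothetical metadata key; its value is interpolated
        # into the query text per message, which is why this is unsafe.
        query: 'INSERT INTO ${! meta("table") } (foo, bar) VALUES (?, ?);'
        unsafe_dynamic_query: true
        # Values are still bound as arguments rather than interpolated.
        args_mapping: '[ this.foo, this.bar ]'
```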
args_mapping
A Bloblang mapping that produces the arguments for the query. The mapping must return an array with exactly one element for each argument placeholder in the query.
Type: string
Default: ""
Requires version 3.47.0 or newer
# Examples
args_mapping: '[ this.foo, this.bar.not_empty().catch(null), meta("baz") ]'
args_mapping: root = [ uuid_v4() ].merge(this.document.args)
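To make the pairing between placeholders and mapped arguments concrete, here is a small sketch with two placeholders and a two-element mapping. The table and field names are hypothetical and reuse the footable naming from the examples.

```yaml
pipeline:
  processors:
    - sql:
        driver: postgres
        data_source_name: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable
        # The query has two placeholders, $1 and $2 ...
        query: "UPDATE footable SET bar = $2 WHERE foo = $1;"
        # ... so the mapping returns an array of two elements, in that order.
        args_mapping: '[ this.foo, this.bar ]'
```

If the array length does not match the number of placeholders, the database driver would be expected to reject the query.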
result_codec
A codec to determine how resulting rows are converted into messages.
Type: string
Default: "none"
Options: none, json_array.
Result Codecs
When a query returns rows they are serialised according to a chosen codec, and the message contents are replaced with the serialised result.
none
The result of the query is ignored and the message remains unchanged. If your query does not return rows then this is the appropriate codec.
json_array
The resulting rows are serialised into an array of JSON objects, where each object represents a row: each key is a column name and each value is that column's value in the row.
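As an illustration of the shape of the output, the sketch below assumes a hypothetical footable in which two rows match the query. The exact value types and key order in the result depend on the driver and column types, so the commented output is indicative only.

```yaml
pipeline:
  processors:
    - sql:
        driver: mysql
        data_source_name: foouser:foopassword@tcp(localhost:3306)/foodb
        query: "SELECT foo, bar FROM footable WHERE baz = ?;"
        args_mapping: '[ this.baz ]'
        result_codec: json_array

# Assuming two matching rows with text columns foo and bar, the message
# contents would be replaced with something of this shape:
# [{"foo":"a1","bar":"b1"},{"foo":"a2","bar":"b2"}]
```

Because the message contents are replaced outright, wrapping the processor in a branch is one way to keep the original message and attach the rows at a chosen path.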