Skip to main content

sql

Runs an SQL prepared query against a target database for each message and, for queries that return rows, replaces it with the result according to a codec.

# Common config fields, showing default values
label: ""
sql:
driver: mysql
data_source_name: ""
query: ""
args_mapping: ""
result_codec: none

Alternatives​

For basic inserts or select queries use use either the sql_insert or the sql_select processor.

For more complex queries use the sql_raw processor.

Examples​

The following example inserts rows into the table footable with the columns foo, bar and baz populated with values extracted from messages:

pipeline:
processors:
- sql:
driver: mysql
data_source_name: foouser:foopassword@tcp(localhost:3306)/foodb
query: "INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?);"
args_mapping: '[ document.foo, document.bar, meta("kafka_topic") ]'

Fields​

driver​

A database driver to use.

Type: string
Default: "mysql"
Options: mysql, postgres, clickhouse, mssql.

data_source_name​

A Data Source Name to identify the target database.

Type: string
Default: ""

# Examples
data_source_name: tcp://host1:9000?username=user&password=qwerty&database=clicks&read_timeout=10&write_timeout=20&alt_hosts=host2:9000,host3:9000
data_source_name: foouser:foopassword@tcp(localhost:3306)/foodb
data_source_name: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable

query​

The query to run against the database.

Type: string
Default: ""

# Examples
query: INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?);

unsafe_dynamic_query​

Whether to enable dynamic queries that support interpolation functions. WARNING: This feature opens up the possibility of SQL injection attacks and is considered unsafe.

Type: bool
Default: false

args_mapping​

A Bloblang mapping that produces the arguments for the query. The mapping must return an array containing the number of arguments in the query.

Type: string
Default: ""
Requires version 3.47.0 or newer

# Examples
args_mapping: '[ this.foo, this.bar.not_empty().catch(null), meta("baz") ]'
args_mapping: root = [ uuid_v4() ].merge(this.document.args)

result_codec​

A codec to determine how resulting rows are converted into messages.

Type: string
Default: "none"
Options: none, json_array.

Result Codecs​

When a query returns rows they are serialised according to a chosen codec, and the message contents are replaced with the serialised result.

none​

The result of the query is ignored and the message remains unchanged. If your query does not return rows then this is the appropriate codec.

json_array​

The resulting rows are serialised into an array of JSON objects, where each object represents a row, where the key is the column name and the value is that columns value in the row.