
sql

Runs an SQL prepared query against a target database for each message and, for queries that return rows, replaces the message contents with the result according to a codec.

# Common config fields, showing default values
label: ""
sql:
  driver: mysql
  data_source_name: ""
  query: ""
  args_mapping: ""
  result_codec: none

Alternatives

For basic inserts or select queries, use either the sql_insert or the sql_select processor.

For more complex queries use the sql_raw processor.

Examples

The following example inserts rows into the table footable with the columns foo, bar and baz populated with values extracted from messages:

pipeline:
  processors:
    - sql:
        driver: mysql
        data_source_name: foouser:foopassword@tcp(localhost:3306)/foodb
        query: "INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?);"
        args_mapping: '[ document.foo, document.bar, meta("kafka_topic") ]'

Fields

driver

A database driver to use.

Type: string
Default: "mysql"
Options: mysql, postgres, clickhouse, mssql.

data_source_name

A Data Source Name to identify the target database.

Type: string
Default: ""

# Examples
data_source_name: tcp://host1:9000?username=user&password=qwerty&database=clicks&read_timeout=10&write_timeout=20&alt_hosts=host2:9000,host3:9000
data_source_name: foouser:foopassword@tcp(localhost:3306)/foodb
data_source_name: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable

query

The query to run against the database.

Type: string
Default: ""

# Examples
query: INSERT INTO footable (foo, bar, baz) VALUES (?, ?, ?);

unsafe_dynamic_query

Whether to enable dynamic queries that support interpolation functions. WARNING: This feature opens up the possibility of SQL injection attacks and is considered unsafe.

Type: bool
Default: false
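
As a rough sketch of what a dynamic query might look like, the config below interpolates a table name into the query for each message. It assumes the standard `${! ... }` interpolation syntax applies to this field, and the metadata key `table_name` is a hypothetical name chosen for illustration. Values are still passed as placeholders through args_mapping, so only the identifier is interpolated.

```yaml
sql:
  driver: mysql
  data_source_name: foouser:foopassword@tcp(localhost:3306)/foodb
  # Must be enabled before interpolation functions in the query are resolved.
  unsafe_dynamic_query: true
  # "table_name" is a hypothetical metadata key; only the identifier is
  # interpolated, the value itself is still bound as a query argument.
  query: 'INSERT INTO ${! meta("table_name") } (foo) VALUES (?);'
  args_mapping: '[ this.foo ]'
```

Because the interpolated text becomes part of the SQL statement, a setup like this is only reasonable when the metadata value comes from a trusted source.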

args_mapping

A Bloblang mapping that produces the arguments for the query. The mapping must return an array containing one element for each argument placeholder in the query.

Type: string
Default: ""
Requires version 3.47.0 or newer

# Examples
args_mapping: '[ this.foo, this.bar.not_empty().catch(null), meta("baz") ]'
args_mapping: root = [ uuid_v4() ].merge(this.document.args)

result_codec

A codec to determine how resulting rows are converted into messages.

Type: string
Default: "none"
Options: none, json_array.

Result Codecs

When a query returns rows they are serialised according to a chosen codec, and the message contents are replaced with the serialised result.

none

The result of the query is ignored and the message remains unchanged. If your query does not return rows then this is the appropriate codec.

json_array

The resulting rows are serialised into an array of JSON objects, where each object represents a row: each key is a column name and each value is that column's value in the row.
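
To illustrate the json_array codec, here is a minimal sketch of a select query against the footable table from the Examples section. The WHERE clause and the message field `document.foo` used as its argument are assumptions made for this example.

```yaml
pipeline:
  processors:
    - sql:
        driver: mysql
        data_source_name: foouser:foopassword@tcp(localhost:3306)/foodb
        # This query returns rows, so a codec other than none is needed
        # for the result to replace the message contents.
        query: "SELECT foo, bar, baz FROM footable WHERE foo = ?;"
        # One array element for the single ? placeholder above.
        args_mapping: '[ document.foo ]'
        result_codec: json_array
```

With a config along these lines, each message would be replaced by a JSON array holding one object per matching row, with the keys foo, bar and baz.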