Skip to main content

sql_insert

Inserts rows into an SQL database for each message, and leaves the message unchanged.

Introduced in version 3.59.0.

# Common config fields, showing default values
label: ""
sql_insert:
driver: ""
dsn: ""
table: ""
columns: []
args_mapping: ""

If the insert fails to execute then the message will still remain unchanged and the error can be caught using error handling methods outlined here.

Examples​

Here we insert rows into a database by populating the columns id, name and topic with values extracted from messages and metadata:

pipeline:
processors:
- sql_insert:
driver: mysql
dsn: foouser:foopassword@tcp(localhost:3306)/foodb
table: footable
columns: [ id, name, topic ]
args_mapping: |
root = [
this.user.id,
this.user.name,
meta("kafka_topic"),
]

Fields​

driver​

A database driver to use.

Type: string
Options: mysql, postgres, clickhouse, mssql.

dsn​

A Data Source Name to identify the target database.

Drivers​

The following is a list of supported drivers and their respective DSN formats:

DriverData Source Name Format
clickhousetcp://[netloc][:port][?param1=value1&...&paramN=valueN]
mysql[username[:password]@][protocol[(address)]]/dbname[?param1=value1&...&paramN=valueN]
postgrespostgres://[user[:password]@][netloc][:port][/dbname][?param1=value1&...]
mssqlsqlserver://[user[:password]@][netloc][:port][?database=dbname&param1=value1&...]

Please note that the postgres driver enforces SSL by default, you can override this with the parameter sslmode=disable if required.

Type: string

# Examples
dsn: tcp://host1:9000?username=user&password=qwerty&database=clicks&read_timeout=10&write_timeout=20&alt_hosts=host2:9000,host3:9000
dsn: foouser:foopassword@tcp(localhost:3306)/foodb
dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable

table​

The table to insert to.

Type: string

# Examples
table: foo

columns​

A list of columns to insert.

Type: array

# Examples
columns:
- foo
- bar
- baz

args_mapping​

A Bloblang mapping which should evaluate to an array of values matching in size to the number of columns specified.

Type: string

# Examples
args_mapping: root = [ this.cat.meow, this.doc.woofs[0] ]
args_mapping: root = [ meta("user.id") ]

prefix​

An optional prefix to prepend to the insert query (before INSERT).

Type: string

suffix​

An optional suffix to append to the insert query.

Type: string

# Examples
suffix: ON CONFLICT (name) DO NOTHING