sql_insert
Inserts rows into an SQL database for each message, and leaves the message unchanged.
Introduced in version 3.59.0.
Common:

    # Common config fields, showing default values
    label: ""
    sql_insert:
      driver: ""
      dsn: ""
      table: ""
      columns: []
      args_mapping: ""

Advanced:

    # All config fields, showing default values
    label: ""
    sql_insert:
      driver: ""
      dsn: ""
      table: ""
      columns: []
      args_mapping: ""
      prefix: ""
      suffix: ""
If the insert fails to execute, the message still remains unchanged and the error can be caught using the usual processor error handling methods.
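As a minimal sketch of catching a failed insert, the config below follows the insert with a `catch` block that logs the error. It assumes the `catch` and `log` processors and the Bloblang `error()` function are available in your version; the connection details are the placeholder values used elsewhere on this page.

```yaml
pipeline:
  processors:
    - sql_insert:
        driver: mysql
        dsn: foouser:foopassword@tcp(localhost:3306)/foodb
        table: footable
        columns: [ id, name, topic ]
        args_mapping: |
          root = [
            this.user.id,
            this.user.name,
            meta("kafka_topic"),
          ]
    # Assumption: processors inside catch only run for messages
    # whose previous processing step failed.
    - catch:
        - log:
            level: ERROR
            message: "sql_insert failed: ${! error() }"
```

Because the message itself is left unchanged on failure, the rest of the pipeline can still forward it or route it elsewhere after the error has been logged.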
Examples
- Table Insert (MySQL)
Here we insert rows into a database by populating the columns id, name and topic with values extracted from messages and metadata:
    pipeline:
      processors:
        - sql_insert:
            driver: mysql
            dsn: foouser:foopassword@tcp(localhost:3306)/foodb
            table: footable
            columns: [ id, name, topic ]
            args_mapping: |
              root = [
                this.user.id,
                this.user.name,
                meta("kafka_topic"),
              ]
Fields
driver
A database driver to use.
Type: string
Options: mysql, postgres, clickhouse, mssql.
dsn
A Data Source Name to identify the target database.
Drivers
The following is a list of supported drivers and their respective DSN formats:
Driver | Data Source Name Format |
---|---|
clickhouse | tcp://[netloc][:port][?param1=value1&...&paramN=valueN] |
mysql | [username[:password]@][protocol[(address)]]/dbname[?param1=value1&...&paramN=valueN] |
postgres | postgres://[user[:password]@][netloc][:port][/dbname][?param1=value1&...] |
mssql | sqlserver://[user[:password]@][netloc][:port][?database=dbname&param1=value1&...] |
Note that the postgres driver enforces SSL by default; you can override this with the parameter sslmode=disable if required.
Type: string
    # Examples

    dsn: tcp://host1:9000?username=user&password=qwerty&database=clicks&read_timeout=10&write_timeout=20&alt_hosts=host2:9000,host3:9000

    dsn: foouser:foopassword@tcp(localhost:3306)/foodb

    dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable
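The examples above do not include an mssql DSN. Following the format given in the drivers table, one might look like the sketch below. The user, password, host and database name are hypothetical placeholders, and 1433 is assumed only because it is the conventional SQL Server port.

```yaml
# Hypothetical mssql DSN built from the documented format:
# sqlserver://[user[:password]@][netloc][:port][?database=dbname&param1=value1&...]
dsn: sqlserver://foouser:foopass@localhost:1433?database=foodb
```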
table
The table to insert into.
Type: string

    # Examples

    table: foo
columns
A list of columns to insert.
Type: array

    # Examples

    columns:
      - foo
      - bar
      - baz
args_mapping
A Bloblang mapping that should evaluate to an array of values whose length matches the number of columns specified.
Type: string

    # Examples

    args_mapping: root = [ this.cat.meow, this.doc.woofs[0] ]

    args_mapping: root = [ meta("user.id") ]
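To illustrate how the mapping lines up with columns, here is a small sketch in which three columns are paired with a three-element array, in the same order. The field paths and the fallback value are hypothetical; the `.or("unknown")` call is an assumption about how you might avoid inserting a null when a field is missing.

```yaml
sql_insert:
  driver: mysql
  dsn: foouser:foopassword@tcp(localhost:3306)/foodb
  table: footable
  # Three columns, so the mapping below yields exactly three values,
  # one per column and in the same order.
  columns: [ id, name, topic ]
  # Assumption: .or() supplies a fallback when user.name is null or missing.
  args_mapping: |
    root = [
      this.user.id,
      this.user.name.or("unknown"),
      meta("kafka_topic"),
    ]
```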
prefix
An optional prefix to prepend to the insert query (before INSERT).
Type: string
suffix
An optional suffix to append to the insert query.
Type: string

    # Examples

    suffix: ON CONFLICT (name) DO NOTHING
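As a sketch of where a suffix fits in a full config, the example below uses the postgres driver and appends an ON CONFLICT clause so that rows with a duplicate name are skipped rather than causing the insert to fail. The table layout, and the unique constraint on name that ON CONFLICT relies on, are assumptions made for illustration.

```yaml
pipeline:
  processors:
    - sql_insert:
        driver: postgres
        dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable
        table: footable
        columns: [ name, topic ]
        args_mapping: |
          root = [
            this.user.name,
            meta("kafka_topic"),
          ]
        # Assumption: footable has a unique constraint on name.
        suffix: ON CONFLICT (name) DO NOTHING
```

The suffix text is database-specific SQL, so a clause written for one driver will not necessarily be valid for another.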