sql_insert

Inserts a row into an SQL database for each message.

Introduced in version 3.59.0.

# Common config fields, showing default values
output:
  label: ""
  sql_insert:
    driver: ""
    dsn: ""
    table: ""
    columns: []
    args_mapping: ""
    max_in_flight: 64
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""

Examples​

Here we insert rows into a database by populating the columns id, name and topic with values extracted from messages and metadata:

output:
  sql_insert:
    driver: mysql
    dsn: foouser:foopassword@tcp(localhost:3306)/foodb
    table: footable
    columns: [ id, name, topic ]
    args_mapping: |
      root = [
        this.user.id,
        this.user.name,
        meta("kafka_topic"),
      ]

Fields​

driver​

A database driver to use.

Type: string
Options: mysql, postgres, clickhouse, mssql.

dsn​

A Data Source Name to identify the target database.

Drivers​

The following is a list of supported drivers and their respective DSN formats:

Driver      Data Source Name Format
clickhouse  tcp://[netloc][:port][?param1=value1&...&paramN=valueN]
mysql       [username[:password]@][protocol[(address)]]/dbname[?param1=value1&...&paramN=valueN]
postgres    postgres://[user[:password]@][netloc][:port][/dbname][?param1=value1&...]
mssql       sqlserver://[user[:password]@][netloc][:port][?database=dbname&param1=value1&...]

Note that the postgres driver enforces SSL by default. You can override this with the parameter sslmode=disable if required.

Type: string

# Examples
dsn: tcp://host1:9000?username=user&password=qwerty&database=clicks&read_timeout=10&write_timeout=20&alt_hosts=host2:9000,host3:9000
dsn: foouser:foopassword@tcp(localhost:3306)/foodb
dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable
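
The examples above cover three of the four supported drivers. As a sketch only, an mssql DSN following the format listed in the drivers table might look like the line below. The credentials, host, port 1433 and database name are placeholder assumptions, not values taken from this documentation.

```yaml
# Hypothetical mssql DSN built from the sqlserver:// format in the drivers table.
# foouser, foopass, localhost:1433 and foodb are placeholders.
dsn: sqlserver://foouser:foopass@localhost:1433?database=foodb
```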

table

The table to insert into.

Type: string

# Examples
table: foo

columns

A list of columns to insert values into.

Type: array

# Examples
columns:
- foo
- bar
- baz

args_mapping

A Bloblang mapping that should evaluate to an array of values whose length matches the number of columns specified.

Type: string

# Examples
args_mapping: root = [ this.cat.meow, this.doc.woofs[0] ]
args_mapping: root = [ meta("user.id") ]
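
To illustrate the size requirement, here is a minimal sketch that pairs three columns with a three-element mapping. The column names and message fields are hypothetical, and the `.or()` fallback is included on the assumption that `this.user.name` may be missing from some messages.

```yaml
# One array element per column, in the same order as `columns`:
#   this.user.id                 -> id
#   this.user.name.or("unknown") -> name (falls back when the field is null or missing)
#   meta("kafka_topic")          -> topic
columns: [ id, name, topic ]
args_mapping: |
  root = [
    this.user.id,
    this.user.name.or("unknown"),
    meta("kafka_topic"),
  ]
```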

prefix​

An optional prefix to prepend to the insert query (before INSERT).

Type: string

suffix​

An optional suffix to append to the insert query.

Type: string

# Examples
suffix: ON CONFLICT (name) DO NOTHING
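
As a fuller sketch of where suffix sits in a config, the following assumes a PostgreSQL table named footable with a unique constraint on id, and uses the suffix to turn the insert into an upsert. The DSN, table and columns are placeholders in the style of the other examples on this page. The ON CONFLICT ... DO UPDATE clause is standard PostgreSQL syntax rather than anything specific to this output.

```yaml
output:
  sql_insert:
    driver: postgres
    dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable
    table: footable
    columns: [ id, name ]
    args_mapping: root = [ this.user.id, this.user.name ]
    # Assumes a unique constraint on id; updates name when the row already exists.
    suffix: ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name
```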

max_in_flight​

The maximum number of inserts to run in parallel.

Type: int
Default: 64

batching​

Allows you to configure a batching policy.

Type: object

# Examples
batching:
  byte_size: 5000
  count: 0
  period: 1s

batching:
  count: 10
  period: 1s

batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m
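
For context, here is a sketch of a batching policy placed inside a complete sql_insert output. It reuses the MySQL placeholders from the example near the top of this page, and the values 100 and 1s are arbitrary assumptions. With both count and period set, a batch should be flushed by whichever condition is met first.

```yaml
output:
  sql_insert:
    driver: mysql
    dsn: foouser:foopassword@tcp(localhost:3306)/foodb
    table: footable
    columns: [ id, name ]
    args_mapping: root = [ this.user.id, this.user.name ]
    batching:
      count: 100   # flush once 100 messages have accumulated
      period: 1s   # or flush an incomplete batch after one second
```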

batching.count

The number of messages at which the batch should be flushed. A value of 0 disables count-based batching.

Type: int
Default: 0

batching.byte_size

The number of bytes at which the batch should be flushed. A value of 0 disables size-based batching.

Type: int
Default: 0

batching.period

A period after which an incomplete batch should be flushed regardless of its size.

Type: string
Default: ""

# Examples
period: 1s
period: 1m
period: 500ms

batching.check​

A Bloblang query that should return a boolean value indicating whether a message should end a batch.

Type: string
Default: ""

# Examples
check: this.type == "end_of_transaction"

batching.processors​

A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Note that all resulting messages are flushed as a single batch, so splitting the batch into smaller batches using these processors is a no-op.

Type: array

# Examples
processors:
  - archive:
      format: lines

processors:
  - archive:
      format: json_array

processors:
  - merge_json: {}