cassandra
BETA
This component is mostly stable but breaking changes could still be made outside of major version releases if a fundamental problem with the component is found.
Runs a query against a Cassandra database for each message in order to insert data.
```yaml
# Common config fields, showing default values
output:
  label: ""
  cassandra:
    addresses: []
    query: ""
    args_mapping: ""
    timeout: 600ms
    max_in_flight: 1
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
```
```yaml
# All config fields, showing default values
output:
  label: ""
  cassandra:
    addresses: []
    tls:
      enabled: false
      skip_cert_verify: false
      enable_renegotiation: false
      root_cas: ""
      root_cas_file: ""
      client_certs: []
    password_authenticator:
      enabled: false
      username: ""
      password: ""
    disable_initial_host_lookup: false
    query: ""
    args_mapping: ""
    consistency: QUORUM
    max_retries: 3
    backoff:
      initial_interval: 1s
      max_interval: 5s
    timeout: 600ms
    max_in_flight: 1
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: []
```
Query arguments can be set using interpolation functions in the args field, or by creating a Bloblang array for the fields using the args_mapping field.
When populating timestamp columns, the value must be either a string in ISO 8601 format (2006-01-02T15:04:05Z07:00) or an integer representing Unix time in seconds.
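A minimal sketch of the integer form, assuming a table created with CREATE TABLE foo.bar (id int primary key, content text, created_at timestamp); and assuming your Benthos version provides the Bloblang function timestamp_unix(), which returns the current time as Unix seconds. This stamps each row with the time it is written rather than a time taken from the message:

```yaml
# Sketch: fill the created_at timestamp column with an integer (Unix seconds).
# Assumes the Bloblang function timestamp_unix() exists in your Benthos version.
output:
  cassandra:
    addresses:
      - localhost:9042
    query: 'INSERT INTO foo.bar (id, content, created_at) VALUES (?, ?, ?)'
    args_mapping: 'root = [ this.id, this.content, timestamp_unix() ]'
```

If the message already carries an ISO 8601 string such as "2020-11-12T22:16:46Z", that field can be passed through unchanged instead (for example this.created_at, a hypothetical field name).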
Performance
This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the maximum number of in-flight messages with the max_in_flight field.
This output benefits from sending messages as a batch for improved performance. Batches can be formed at both the input and output level. You can find out more in the Benthos batching documentation.
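A sketch combining both settings for a foo.bar table like the one in the examples. The value max_in_flight: 16 and the batch thresholds are illustrative assumptions, not tuned recommendations, so measure against your own cluster before settling on them:

```yaml
# Sketch: parallel writes plus output-level batching (illustrative values).
output:
  cassandra:
    addresses:
      - localhost:9042
    query: 'INSERT INTO foo.bar (id, content, created_at) VALUES (?, ?, ?)'
    args_mapping: 'root = [ this.id, this.content, this.timestamp ]'
    max_in_flight: 16   # maximum number of messages in flight at a given time
    batching:
      count: 500        # flush once 500 messages have accumulated
      period: 1s        # or flush an incomplete batch after 1s regardless of size
```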
Examples
Basic Inserts
If we were to create a table with some basic columns with CREATE TABLE foo.bar (id int primary key, content text, created_at timestamp); and were processing JSON documents of the form {"id":"342354354","content":"hello world","timestamp":1605219406}, we could populate our table with the following config:

```yaml
output:
  cassandra:
    addresses:
      - localhost:9042
    query: 'INSERT INTO foo.bar (id, content, created_at) VALUES (?, ?, ?)'
    args_mapping: |
      root = [
        this.id,
        this.content,
        this.timestamp
      ]
    batching:
      count: 500
```

Insert JSON Documents
The following example inserts JSON documents into the table footable of the keyspace foospace using INSERT JSON (https://cassandra.apache.org/doc/latest/cql/json.html#insert-json).

```yaml
output:
  cassandra:
    addresses:
      - localhost:9042
    query: 'INSERT INTO foospace.footable JSON ?'
    args_mapping: 'root = [ this ]'
    batching:
      count: 500
```
Fields
addresses
A list of Cassandra nodes to connect to. Multiple comma-separated addresses can be specified on a single line.
Type: array
Default: []
```yaml
# Examples
addresses:
  - localhost:9042

addresses:
  - foo:9042
  - bar:9042

addresses:
  - foo:9042,bar:9042
```
tls
Custom TLS settings can be used to override system defaults.
Type: object
tls.enabled
Whether custom TLS settings are enabled.
Type: bool
Default: false
tls.skip_cert_verify
Whether to skip server side certificate verification.
Type: bool
Default: false
tls.enable_renegotiation
Whether to allow the remote server to repeatedly request renegotiation. Enable this option if you're seeing the error message "local error: tls: no renegotiation".
Type: bool
Default: false
Requires version 3.45.0 or newer
tls.root_cas
An optional root certificate authority to use. This is a string representing a certificate chain from the parent trusted root certificate, through possible intermediate signing certificates, to the host certificate.
Type: string
Default: ""
```yaml
# Examples
root_cas: |-
  -----BEGIN CERTIFICATE-----
  ...
  -----END CERTIFICATE-----
```
tls.root_cas_file
An optional path to a root certificate authority file. This is a file, often with a .pem extension, containing a certificate chain from the parent trusted root certificate, through possible intermediate signing certificates, to the host certificate.
Type: string
Default: ""
```yaml
# Examples
root_cas_file: ./root_cas.pem
```
tls.client_certs
A list of client certificates to use. For each certificate, either the fields cert and key, or cert_file and key_file should be specified, but not both.
Type: array
Default: []
```yaml
# Examples
client_certs:
  - cert: foo
    key: bar

client_certs:
  - cert_file: ./example.pem
    key_file: ./example.key
```
tls.client_certs[].cert
A plain text certificate to use.
Type: string
Default: ""
tls.client_certs[].key
A plain text certificate key to use.
Type: string
Default: ""
tls.client_certs[].cert_file
The path to a certificate to use.
Type: string
Default: ""
tls.client_certs[].key_file
The path of a certificate key to use.
Type: string
Default: ""
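A sketch of a TLS connection that trusts a private certificate authority and presents a client certificate. The host name and file paths are hypothetical placeholders; as the client_certs description requires, each entry uses either cert/key or cert_file/key_file, not both:

```yaml
# Sketch: TLS with a private CA and a client certificate.
# Host name and paths are hypothetical placeholders.
output:
  cassandra:
    addresses:
      - cassandra.example.internal:9042
    tls:
      enabled: true
      root_cas_file: ./certs/ca.pem
      client_certs:
        - cert_file: ./certs/client.pem
          key_file: ./certs/client.key
    query: 'INSERT INTO foo.bar (id, content, created_at) VALUES (?, ?, ?)'
    args_mapping: 'root = [ this.id, this.content, this.timestamp ]'
```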
password_authenticator
An object containing the username and password.
Type: object
password_authenticator.enabled
Whether to use password authentication.
Type: bool
Default: false
password_authenticator.username
A username.
Type: string
Default: ""
password_authenticator.password
A password.
Type: string
Default: ""
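A sketch enabling password authentication. The username is a hypothetical placeholder, and the password is read from an assumed CASSANDRA_PASSWORD environment variable through Benthos config interpolation (${...}), so the secret does not have to be written into the file:

```yaml
# Sketch: password authentication with the secret taken from the environment.
output:
  cassandra:
    addresses:
      - localhost:9042
    password_authenticator:
      enabled: true
      username: benthos_writer         # hypothetical role name
      password: ${CASSANDRA_PASSWORD}  # assumed environment variable
    query: 'INSERT INTO foo.bar (id, content, created_at) VALUES (?, ?, ?)'
    args_mapping: 'root = [ this.id, this.content, this.timestamp ]'
```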
disable_initial_host_lookup
If enabled, the driver will not attempt to get host info from the system.peers table. This can speed up queries, but data_centre, rack and token information will not be available.
Type: bool
Default: false
query
A query to execute for each message.
Type: string
Default: ""
args_mapping
A Bloblang mapping that can be used to provide arguments to Cassandra queries. The result of the mapping must be an array with the same number of elements as there are arguments in the query.
Type: string
Default: ""
Requires version 3.55.0 or newer
consistency
The consistency level to use.
Type: string
Default: "QUORUM"
Options: ANY, ONE, TWO, THREE, QUORUM, ALL, LOCAL_QUORUM, EACH_QUORUM, LOCAL_ONE.
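In Cassandra, LOCAL_QUORUM requires acknowledgement from a quorum of replicas in the coordinator's local datacenter only, which avoids a cross-datacenter round trip on every write. A sketch selecting it, with hypothetical node names:

```yaml
# Sketch: write at LOCAL_QUORUM against nodes in one datacenter (names are hypothetical).
output:
  cassandra:
    addresses:
      - dc1-node-a:9042
      - dc1-node-b:9042
    consistency: LOCAL_QUORUM
    query: 'INSERT INTO foo.bar (id, content, created_at) VALUES (?, ?, ?)'
    args_mapping: 'root = [ this.id, this.content, this.timestamp ]'
```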
max_retries
The maximum number of retries before giving up on a request.
Type: int
Default: 3
backoff
Controls the time intervals between retry attempts.
Type: object
backoff.initial_interval
The initial period to wait between retry attempts.
Type: string
Default: "1s"
backoff.max_interval
The maximum period to wait between retry attempts.
Type: string
Default: "5s"
timeout
The client connection timeout.
Type: string
Default: "600ms"
Requires version 3.63.0 or newer
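A sketch for a cluster that is slow or occasionally unavailable: more retries, a wider window between attempts (starting at initial_interval and capped at max_interval), and a longer connection timeout. All values are illustrative assumptions rather than recommendations:

```yaml
# Sketch: more tolerant retry and timeout settings (illustrative values).
output:
  cassandra:
    addresses:
      - localhost:9042
    query: 'INSERT INTO foo.bar (id, content, created_at) VALUES (?, ?, ?)'
    args_mapping: 'root = [ this.id, this.content, this.timestamp ]'
    max_retries: 5            # give up on a request after 5 retries
    backoff:
      initial_interval: 500ms # first wait between retry attempts
      max_interval: 10s       # upper bound on the wait between attempts
    timeout: 2s               # client connection timeout
```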
max_in_flight
The maximum number of messages to have in flight at a given time. Increase this to improve throughput.
Type: int
Default: 1
batching
Allows you to configure a batching policy.
Type: object
```yaml
# Examples
batching:
  byte_size: 5000
  count: 0
  period: 1s

batching:
  count: 10
  period: 1s

batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m
```
batching.count
The number of messages at which the batch should be flushed. A value of 0 disables count-based batching.
Type: int
Default: 0
batching.byte_size
The number of bytes at which the batch should be flushed. A value of 0 disables size-based batching.
Type: int
Default: 0
batching.period
A period in which an incomplete batch should be flushed regardless of its size.
Type: string
Default: ""
```yaml
# Examples
period: 1s

period: 1m

period: 500ms
```
batching.check
A Bloblang query that should return a boolean value indicating whether a message should end a batch.
Type: string
Default: ""
```yaml
# Examples
check: this.type == "end_of_transaction"
```
batching.processors
A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Note that all resulting messages are flushed as a single batch; therefore, splitting the batch into smaller batches using these processors is a no-op.
Type: array
Default: []
```yaml
# Examples
processors:
  - archive:
      format: lines

processors:
  - archive:
      format: json_array

processors:
  - merge_json: {}
```