Skip to main content

elasticsearch

Publishes messages into an Elasticsearch index. If the index does not exist then it is created with a dynamic mapping.

# Common config fields, showing default values
output:
label: ""
elasticsearch:
urls:
- http://localhost:9200
index: benthos_index
id: ${!count("elastic_ids")}-${!timestamp_unix()}
type: doc
max_in_flight: 1
batching:
count: 0
byte_size: 0
period: ""
check: ""

Both the id and index fields can be dynamically set using function interpolations described here. When sending batched messages these interpolations are performed per message part.

AWS​

It's possible to enable AWS connectivity with this output using the aws fields. However, you may need to set sniff and healthcheck to false for connections to succeed.

Performance​

This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the max number of in flight messages with the field max_in_flight.

This output benefits from sending messages as a batch for improved performance. Batches can be formed at both the input and output level. You can find out more in this doc.

Fields​

urls​

A list of URLs to connect to. If an item of the list contains commas it will be expanded into multiple URLs.

Type: array
Default: ["http://localhost:9200"]

# Examples
urls:
- http://localhost:9200

index​

The index to place messages. This field supports interpolation functions.

Type: string
Default: "benthos_index"

action​

The action to take on the document. This field supports interpolation functions.

Type: string
Default: "index"
Options: index, update, delete.

pipeline​

An optional pipeline id to preprocess incoming documents. This field supports interpolation functions.

Type: string
Default: ""

id​

The ID for indexed messages. Interpolation should be used in order to create a unique ID for each message. This field supports interpolation functions.

Type: string
Default: "${!count(\"elastic_ids\")}-${!timestamp_unix()}"

type​

The document type.

Type: string
Default: "doc"

routing​

The routing key to use for the document. This field supports interpolation functions.

Type: string
Default: ""

sniff​

Prompts Benthos to sniff for brokers to connect to when establishing a connection.

Type: bool
Default: true

healthcheck​

Whether to enable healthchecks.

Type: bool
Default: true

timeout​

The maximum time to wait before abandoning a request (and trying again).

Type: string
Default: "5s"

tls​

Custom TLS settings can be used to override system defaults.

Type: object

tls.enabled​

Whether custom TLS settings are enabled.

Type: bool
Default: false

tls.skip_cert_verify​

Whether to skip server side certificate verification.

Type: bool
Default: false

tls.enable_renegotiation​

Whether to allow the remote server to repeatedly request renegotiation. Enable this option if you're seeing the error message local error: tls: no renegotiation.

Type: bool
Default: false
Requires version 3.45.0 or newer

tls.root_cas​

An optional root certificate authority to use. This is a string, representing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate.

Type: string
Default: ""

# Examples
root_cas: |-
-----BEGIN CERTIFICATE-----
...
-----END CERTIFICATE-----

tls.root_cas_file​

An optional path of a root certificate authority file to use. This is a file, often with a .pem extension, containing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate.

Type: string
Default: ""

# Examples
root_cas_file: ./root_cas.pem

tls.client_certs​

A list of client certificates to use. For each certificate either the fields cert and key, or cert_file and key_file should be specified, but not both.

Type: array
Default: []

# Examples
client_certs:
- cert: foo
key: bar
client_certs:
- cert_file: ./example.pem
key_file: ./example.key

tls.client_certs[].cert​

A plain text certificate to use.

Type: string
Default: ""

tls.client_certs[].key​

A plain text certificate key to use.

Type: string
Default: ""

tls.client_certs[].cert_file​

The path to a certificate to use.

Type: string
Default: ""

tls.client_certs[].key_file​

The path of a certificate key to use.

Type: string
Default: ""

max_in_flight​

The maximum number of messages to have in flight at a given time. Increase this to improve throughput.

Type: int
Default: 1

max_retries​

The maximum number of retries before giving up on the request. If set to zero there is no discrete limit.

Type: int
Default: 0

backoff​

Control time intervals between retry attempts.

Type: object

backoff.initial_interval​

The initial period to wait between retry attempts.

Type: string
Default: "1s"

backoff.max_interval​

The maximum period to wait between retry attempts.

Type: string
Default: "5s"

backoff.max_elapsed_time​

The maximum period to wait before retry attempts are abandoned. If zero then no limit is used.

Type: string
Default: "30s"

basic_auth​

Allows you to specify basic authentication.

Type: object

basic_auth.enabled​

Whether to use basic authentication in requests.

Type: bool
Default: false

basic_auth.username​

A username to authenticate as.

Type: string
Default: ""

basic_auth.password​

A password to authenticate with.

Type: string
Default: ""

batching​

Allows you to configure a batching policy.

Type: object

# Examples
batching:
byte_size: 5000
count: 0
period: 1s
batching:
count: 10
period: 1s
batching:
check: this.contains("END BATCH")
count: 0
period: 1m

batching.count​

A number of messages at which the batch should be flushed. If 0 disables count based batching.

Type: int
Default: 0

batching.byte_size​

An amount of bytes at which the batch should be flushed. If 0 disables size based batching.

Type: int
Default: 0

batching.period​

A period in which an incomplete batch should be flushed regardless of its size.

Type: string
Default: ""

# Examples
period: 1s
period: 1m
period: 500ms

batching.check​

A Bloblang query that should return a boolean value indicating whether a message should end a batch.

Type: string
Default: ""

# Examples
check: this.type == "end_of_transaction"

batching.processors​

A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op.

Type: array
Default: []

# Examples
processors:
- archive:
format: lines
processors:
- archive:
format: json_array
processors:
- merge_json: {}

aws​

Enables and customises connectivity to Amazon Elastic Service.

Type: object

aws.enabled​

Whether to connect to Amazon Elastic Service.

Type: bool
Default: false

aws.region​

The AWS region to target.

Type: string
Default: "eu-west-1"

aws.endpoint​

Allows you to specify a custom endpoint for the AWS API.

Type: string
Default: ""

aws.credentials​

Optional manual configuration of AWS credentials to use. More information can be found in this document.

Type: object

aws.credentials.profile​

A profile from ~/.aws/credentials to use.

Type: string
Default: ""

aws.credentials.id​

The ID of credentials to use.

Type: string
Default: ""

aws.credentials.secret​

The secret for the credentials being used.

Type: string
Default: ""

aws.credentials.token​

The token for the credentials being used, required when using short term credentials.

Type: string
Default: ""

aws.credentials.role​

A role ARN to assume.

Type: string
Default: ""

aws.credentials.role_external_id​

An external ID to provide when assuming a role.

Type: string
Default: ""

gzip_compression​

Enable gzip compression on the request side.

Type: bool
Default: false