elasticsearch
Publishes messages into an Elasticsearch index. If the index does not exist then it is created with a dynamic mapping.
- Common
- Advanced
# Common config fields, showing default valuesoutput:label: ""elasticsearch:urls:- http://localhost:9200index: benthos_indexid: ${!count("elastic_ids")}-${!timestamp_unix()}type: docmax_in_flight: 1batching:count: 0byte_size: 0period: ""check: ""
# All config fields, showing default valuesoutput:label: ""elasticsearch:urls:- http://localhost:9200index: benthos_indexaction: indexpipeline: ""id: ${!count("elastic_ids")}-${!timestamp_unix()}type: docrouting: ""sniff: truehealthcheck: truetimeout: 5stls:enabled: falseskip_cert_verify: falseenable_renegotiation: falseroot_cas: ""root_cas_file: ""client_certs: []max_in_flight: 1max_retries: 0backoff:initial_interval: 1smax_interval: 5smax_elapsed_time: 30sbasic_auth:enabled: falseusername: ""password: ""batching:count: 0byte_size: 0period: ""check: ""processors: []aws:enabled: falseregion: eu-west-1endpoint: ""credentials:profile: ""id: ""secret: ""token: ""role: ""role_external_id: ""gzip_compression: false
Both the id
and index
fields can be dynamically set using function
interpolations described here. When
sending batched messages these interpolations are performed per message part.
AWS​
It's possible to enable AWS connectivity with this output using the aws
fields. However, you may need to set sniff
and healthcheck
to
false for connections to succeed.
Performance​
This output benefits from sending multiple messages in flight in parallel for
improved performance. You can tune the max number of in flight messages with the
field max_in_flight
.
This output benefits from sending messages as a batch for improved performance. Batches can be formed at both the input and output level. You can find out more in this doc.
Fields​
urls
​
A list of URLs to connect to. If an item of the list contains commas it will be expanded into multiple URLs.
Type: array
Default: ["http://localhost:9200"]
# Examplesurls:- http://localhost:9200
index
​
The index to place messages. This field supports interpolation functions.
Type: string
Default: "benthos_index"
action
​
The action to take on the document. This field supports interpolation functions.
Type: string
Default: "index"
Options: index
, update
, delete
.
pipeline
​
An optional pipeline id to preprocess incoming documents. This field supports interpolation functions.
Type: string
Default: ""
id
​
The ID for indexed messages. Interpolation should be used in order to create a unique ID for each message. This field supports interpolation functions.
Type: string
Default: "${!count(\"elastic_ids\")}-${!timestamp_unix()}"
type
​
The document type.
Type: string
Default: "doc"
routing
​
The routing key to use for the document. This field supports interpolation functions.
Type: string
Default: ""
sniff
​
Prompts Benthos to sniff for brokers to connect to when establishing a connection.
Type: bool
Default: true
healthcheck
​
Whether to enable healthchecks.
Type: bool
Default: true
timeout
​
The maximum time to wait before abandoning a request (and trying again).
Type: string
Default: "5s"
tls
​
Custom TLS settings can be used to override system defaults.
Type: object
tls.enabled
​
Whether custom TLS settings are enabled.
Type: bool
Default: false
tls.skip_cert_verify
​
Whether to skip server side certificate verification.
Type: bool
Default: false
tls.enable_renegotiation
​
Whether to allow the remote server to repeatedly request renegotiation. Enable this option if you're seeing the error message local error: tls: no renegotiation
.
Type: bool
Default: false
Requires version 3.45.0 or newer
tls.root_cas
​
An optional root certificate authority to use. This is a string, representing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate.
Type: string
Default: ""
# Examplesroot_cas: |------BEGIN CERTIFICATE-----...-----END CERTIFICATE-----
tls.root_cas_file
​
An optional path of a root certificate authority file to use. This is a file, often with a .pem extension, containing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate.
Type: string
Default: ""
# Examplesroot_cas_file: ./root_cas.pem
tls.client_certs
​
A list of client certificates to use. For each certificate either the fields cert
and key
, or cert_file
and key_file
should be specified, but not both.
Type: array
Default: []
# Examplesclient_certs:- cert: fookey: barclient_certs:- cert_file: ./example.pemkey_file: ./example.key
tls.client_certs[].cert
​
A plain text certificate to use.
Type: string
Default: ""
tls.client_certs[].key
​
A plain text certificate key to use.
Type: string
Default: ""
tls.client_certs[].cert_file
​
The path to a certificate to use.
Type: string
Default: ""
tls.client_certs[].key_file
​
The path of a certificate key to use.
Type: string
Default: ""
max_in_flight
​
The maximum number of messages to have in flight at a given time. Increase this to improve throughput.
Type: int
Default: 1
max_retries
​
The maximum number of retries before giving up on the request. If set to zero there is no discrete limit.
Type: int
Default: 0
backoff
​
Control time intervals between retry attempts.
Type: object
backoff.initial_interval
​
The initial period to wait between retry attempts.
Type: string
Default: "1s"
backoff.max_interval
​
The maximum period to wait between retry attempts.
Type: string
Default: "5s"
backoff.max_elapsed_time
​
The maximum period to wait before retry attempts are abandoned. If zero then no limit is used.
Type: string
Default: "30s"
basic_auth
​
Allows you to specify basic authentication.
Type: object
basic_auth.enabled
​
Whether to use basic authentication in requests.
Type: bool
Default: false
basic_auth.username
​
A username to authenticate as.
Type: string
Default: ""
basic_auth.password
​
A password to authenticate with.
Type: string
Default: ""
batching
​
Allows you to configure a batching policy.
Type: object
# Examplesbatching:byte_size: 5000count: 0period: 1sbatching:count: 10period: 1sbatching:check: this.contains("END BATCH")count: 0period: 1m
batching.count
​
A number of messages at which the batch should be flushed. If 0
disables count based batching.
Type: int
Default: 0
batching.byte_size
​
An amount of bytes at which the batch should be flushed. If 0
disables size based batching.
Type: int
Default: 0
batching.period
​
A period in which an incomplete batch should be flushed regardless of its size.
Type: string
Default: ""
# Examplesperiod: 1speriod: 1mperiod: 500ms
batching.check
​
A Bloblang query that should return a boolean value indicating whether a message should end a batch.
Type: string
Default: ""
# Examplescheck: this.type == "end_of_transaction"
batching.processors
​
A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op.
Type: array
Default: []
# Examplesprocessors:- archive:format: linesprocessors:- archive:format: json_arrayprocessors:- merge_json: {}
aws
​
Enables and customises connectivity to Amazon Elastic Service.
Type: object
aws.enabled
​
Whether to connect to Amazon Elastic Service.
Type: bool
Default: false
aws.region
​
The AWS region to target.
Type: string
Default: "eu-west-1"
aws.endpoint
​
Allows you to specify a custom endpoint for the AWS API.
Type: string
Default: ""
aws.credentials
​
Optional manual configuration of AWS credentials to use. More information can be found in this document.
Type: object
aws.credentials.profile
​
A profile from ~/.aws/credentials
to use.
Type: string
Default: ""
aws.credentials.id
​
The ID of credentials to use.
Type: string
Default: ""
aws.credentials.secret
​
The secret for the credentials being used.
Type: string
Default: ""
aws.credentials.token
​
The token for the credentials being used, required when using short term credentials.
Type: string
Default: ""
aws.credentials.role
​
A role ARN to assume.
Type: string
Default: ""
aws.credentials.role_external_id
​
An external ID to provide when assuming a role.
Type: string
Default: ""
gzip_compression
​
Enable gzip compression on the request side.
Type: bool
Default: false