mqtt
Pushes messages to an MQTT broker.
# Common config fields, showing default values
output:
  label: ""
  mqtt:
    urls:
      - tcp://localhost:1883
    topic: benthos_topic
    client_id: benthos_output
    qos: 1
    connect_timeout: 30s
    write_timeout: 3s
    retained: false
    max_in_flight: 1
# All config fields, showing default values
output:
  label: ""
  mqtt:
    urls:
      - tcp://localhost:1883
    topic: benthos_topic
    client_id: benthos_output
    dynamic_client_id_suffix: ""
    qos: 1
    connect_timeout: 30s
    write_timeout: 3s
    retained: false
    retained_interpolated: ""
    will:
      enabled: false
      qos: 0
      retained: false
      topic: ""
      payload: ""
    user: ""
    password: ""
    keepalive: 30
    tls:
      enabled: false
      skip_cert_verify: false
      enable_renegotiation: false
      root_cas: ""
      root_cas_file: ""
      client_certs: []
    max_in_flight: 1
The topic field can be dynamically set using function interpolations. When sending batched messages, these interpolations are performed per message part.
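As a minimal sketch of a dynamic topic, the fragment below derives the topic from each message's contents. The device_id field and the sensors/.../readings topic layout are hypothetical, and the sketch assumes JSON messages that carry such a field.

```yaml
output:
  mqtt:
    urls:
      - tcp://localhost:1883
    # device_id is an assumed field of the message payload, not an mqtt config field
    topic: 'sensors/${! json("device_id") }/readings'
```

In a batch, each message part would resolve its own topic, so parts with different device_id values would be published to different topics.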
Performance
This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the maximum number of in-flight messages with the max_in_flight field.
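For illustration, the fragment below raises max_in_flight above its default of 1. The value 64 is an arbitrary assumption; a suitable number depends on the broker and the workload.

```yaml
output:
  mqtt:
    urls:
      - tcp://localhost:1883
    topic: benthos_topic
    # 64 is an illustrative value only, not a recommendation
    max_in_flight: 64
```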
Fields
urls
A list of URLs to connect to. If an item of the list contains commas it will be expanded into multiple URLs.
Type: array
Default: ["tcp://localhost:1883"]
# Examples
urls:
  - tcp://localhost:1883
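To illustrate the comma expansion described for this field, the two documents below should configure the same pair of brokers. The host names broker-a and broker-b are hypothetical.

```yaml
# A single list item containing a comma
urls:
  - tcp://broker-a:1883,tcp://broker-b:1883
---
# The equivalent list after expansion
urls:
  - tcp://broker-a:1883
  - tcp://broker-b:1883
```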
topic
The topic to publish messages to.
Type: string
Default: "benthos_topic"
client_id
An identifier for the client connection.
Type: string
Default: "benthos_output"
dynamic_client_id_suffix
Append a dynamically generated suffix to the specified client_id on each run of the pipeline. This can be useful when clustering Benthos producers.
Type: string
Default: ""
| Option | Summary |
|---|---|
| nanoid | Append a nanoid of length 21 characters |
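A sketch of enabling the suffix, so that several instances started from the same config would connect with distinct client IDs. The surrounding values are the defaults shown earlier.

```yaml
output:
  mqtt:
    urls:
      - tcp://localhost:1883
    topic: benthos_topic
    client_id: benthos_output
    # Appends a generated 21-character nanoid to client_id on each run
    dynamic_client_id_suffix: nanoid
```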
qos
The QoS value to set for each message.
Type: int
Default: 1
Options: 0, 1, 2.
connect_timeout
The maximum amount of time to wait in order to establish a connection before the attempt is abandoned.
Type: string
Default: "30s"
Requires version 3.58.0 or newer
# Examples
connect_timeout: 1s

connect_timeout: 500ms
write_timeout
The maximum amount of time to wait to write data before the attempt is abandoned.
Type: string
Default: "3s"
Requires version 3.58.0 or newer
# Examples
write_timeout: 1s

write_timeout: 500ms
retained
Set message as retained on the topic.
Type: bool
Default: false
retained_interpolated
Override the value of retained with an interpolable value. This allows it to be set dynamically based on message contents. The value must resolve to either true or false.
This field supports interpolation functions.
Type: string
Default: ""
Requires version 3.59.0 or newer
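As a hedged sketch, the fragment below sets the retained flag per message from the payload. The retain field is hypothetical and is assumed to be a boolean present on every message, since the interpolation must resolve to true or false.

```yaml
output:
  mqtt:
    urls:
      - tcp://localhost:1883
    topic: benthos_topic
    # retain is an assumed boolean field of the message payload
    retained_interpolated: '${! json("retain") }'
```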
will
Set the last will message to be used in case of Benthos failure.
Type: object
will.enabled
Whether to enable last will messages.
Type: bool
Default: false
will.qos
Set QoS for last will message.
Type: int
Default: 0
Options: 0, 1, 2.
will.retained
Set retained for last will message.
Type: bool
Default: false
will.topic
Set topic for last will message.
Type: string
Default: ""
will.payload
Set payload for last will message.
Type: string
Default: ""
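The will fields above might be combined as in the following sketch. The topic benthos/status and the payload offline are hypothetical values chosen for illustration.

```yaml
output:
  mqtt:
    urls:
      - tcp://localhost:1883
    topic: benthos_topic
    will:
      enabled: true
      qos: 1
      retained: true
      # Topic and payload are illustrative, not defaults
      topic: benthos/status
      payload: offline
```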
user
A username to connect with.
Type: string
Default: ""
password
A password to connect with.
Type: string
Default: ""
keepalive
Max seconds of inactivity before a keepalive message is sent.
Type: int
Default: 30
tls
Custom TLS settings can be used to override system defaults.
Type: object
Requires version 3.45.0 or newer
tls.enabled
Whether custom TLS settings are enabled.
Type: bool
Default: false
tls.skip_cert_verify
Whether to skip server side certificate verification.
Type: bool
Default: false
tls.enable_renegotiation
Whether to allow the remote server to repeatedly request renegotiation. Enable this option if you're seeing the error message "local error: tls: no renegotiation".
Type: bool
Default: false
Requires version 3.45.0 or newer
tls.root_cas
An optional root certificate authority to use. This is a string, representing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate.
Type: string
Default: ""
# Examples
root_cas: |-
  -----BEGIN CERTIFICATE-----
  ...
  -----END CERTIFICATE-----
tls.root_cas_file
An optional path of a root certificate authority file to use. This is a file, often with a .pem extension, containing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate.
Type: string
Default: ""
# Examples
root_cas_file: ./root_cas.pem
tls.client_certs
A list of client certificates to use. For each certificate, either the fields cert and key, or cert_file and key_file, should be specified, but not both.
Type: array
Default: []
# Examples
client_certs:
  - cert: foo
    key: bar

client_certs:
  - cert_file: ./example.pem
    key_file: ./example.key
tls.client_certs[].cert
A plain text certificate to use.
Type: string
Default: ""
tls.client_certs[].key
A plain text certificate key to use.
Type: string
Default: ""
tls.client_certs[].cert_file
The path to a certificate to use.
Type: string
Default: ""
tls.client_certs[].key_file
The path of a certificate key to use.
Type: string
Default: ""
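Putting the tls fields together, a file-based setup might look like the sketch below. The ssl:// scheme and port 8883 are assumptions about how the broker exposes its TLS listener, and the file paths reuse the placeholder names from the examples above.

```yaml
output:
  mqtt:
    urls:
      # Scheme and port are assumptions about the broker's TLS listener
      - ssl://localhost:8883
    topic: benthos_topic
    tls:
      enabled: true
      root_cas_file: ./root_cas.pem
      client_certs:
        # Use either cert_file/key_file or cert/key per entry, not both
        - cert_file: ./example.pem
          key_file: ./example.key
```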
max_in_flight
The maximum number of messages to have in flight at a given time. Increase this to improve throughput.
Type: int
Default: 1