broker

Allows you to route messages to multiple child outputs using a range of brokering patterns.

```yaml
# Common config fields, showing default values
output:
  label: ""
  broker:
    pattern: fan_out
    outputs: []
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
```

Processors can be listed to apply across individual outputs or all outputs:

```yaml
output:
  broker:
    pattern: fan_out
    outputs:
      - resource: foo
      - resource: bar
        # Processors only applied to messages sent to bar.
        processors:
          - resource: bar_processor

  # Processors applied to messages sent to all brokered outputs.
  processors:
    - resource: general_processor
```

Fields

copies

The number of copies of each configured output to spawn.

Type: int
Default: 1

pattern

The brokering pattern to use.

Type: string
Default: "fan_out"
Options: fan_out, fan_out_sequential, round_robin, greedy.

max_in_flight

The maximum number of parallel message batches to have in flight at any given time. Note that if a child output has a higher max_in_flight then the broker output will automatically match it; this value is therefore the minimum max_in_flight, useful in cases where the child values can't be inferred (such as when using resource outputs as children). Only relevant for the fan_out and fan_out_sequential patterns.

Type: int
Default: 1
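
When the children are resource outputs their max_in_flight values can't be inferred, so a floor can be set on the broker itself. A minimal sketch (the resource names foo and bar are placeholders):

```yaml
# Hypothetical config: resource children hide their own max_in_flight,
# so we set an explicit minimum on the broker.
output:
  broker:
    pattern: fan_out
    max_in_flight: 64
    outputs:
      - resource: foo
      - resource: bar
```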

outputs

A list of child outputs to broker.

Type: array
Default: []

batching

Allows you to configure a batching policy.

Type: object

```yaml
# Examples
batching:
  byte_size: 5000
  count: 0
  period: 1s

batching:
  count: 10
  period: 1s

batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m
```

batching.count

The number of messages at which the batch should be flushed. Set to 0 to disable count-based batching.

Type: int
Default: 0

batching.byte_size

The number of bytes at which the batch should be flushed. Set to 0 to disable size-based batching.

Type: int
Default: 0

batching.period

A period in which an incomplete batch should be flushed regardless of its size.

Type: string
Default: ""

```yaml
# Examples
period: 1s

period: 1m

period: 500ms
```

batching.check

A Bloblang query that should return a boolean value indicating whether a message should end a batch.

Type: string
Default: ""

```yaml
# Examples
check: this.type == "end_of_transaction"
```

batching.processors

A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op.

Type: array
Default: []

# Examples
processors:
- archive:
format: lines
processors:
- archive:
format: json_array
processors:
- merge_json: {}

Patterns

The broker pattern determines the way in which messages are allocated and can be chosen from the following:

fan_out

With the fan out pattern every message that passes through Benthos is sent to all outputs in parallel.

If an output applies back pressure it will block all subsequent messages, and if an output fails to send a message it will be retried continuously until completion or service shut down.

Sometimes it is useful to disable the back pressure or retries of certain fan out outputs and instead drop messages that have failed or were blocked. In this case you can wrap outputs with a drop_on output.
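
For example, a fan out broker might retry a primary output indefinitely while dropping messages destined for a secondary output whenever it errors or blocks for too long. A sketch of this, assuming resource outputs named primary and secondary exist:

```yaml
# Hypothetical config: the secondary output drops messages on error or
# after 30s of back pressure instead of blocking the whole broker.
output:
  broker:
    pattern: fan_out
    outputs:
      - resource: primary
      - drop_on:
          error: true
          back_pressure: 30s
          output:
            resource: secondary
```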

fan_out_sequential

Similar to the fan out pattern except outputs are written to sequentially, meaning an output is only written to once the preceding output has confirmed receipt of the same message.

round_robin

With the round robin pattern each message is assigned to a single output, following the order in which the outputs are listed. If an output applies back pressure it blocks all subsequent messages. If an output fails to send a message then the message is re-attempted with the next output, and so on.
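
A minimal round robin configuration might look like the following (the resource names out_a and out_b are placeholders):

```yaml
# Sketch: messages alternate between the two outputs in listed order.
output:
  broker:
    pattern: round_robin
    outputs:
      - resource: out_a
      - resource: out_b
```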

greedy

The greedy pattern results in higher output throughput at the cost of potentially disproportionate message allocations to those outputs. Each message is sent to a single output, which is determined by allowing outputs to claim messages as soon as they are able to process them. This results in certain faster outputs potentially processing more messages at the cost of slower outputs.
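
A common use of the greedy pattern is to combine it with the copies field, so that several instances of one output claim messages as they become free. A sketch, assuming a resource output named slow_output exists:

```yaml
# Sketch: four copies of a single output claim messages greedily,
# so faster copies naturally take on more of the load.
output:
  broker:
    pattern: greedy
    copies: 4
    outputs:
      - resource: slow_output
```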