read_until
Reads messages from a child input until a consumed message passes a Bloblang query, at which point the input closes.
# Config fields, showing default valuesinput:label: ""read_until:input: {}check: ""restart_input: false
Messages are read continuously while the query check returns false, when the query returns true the message that triggered the check is sent out and the input is closed. Use this to define inputs where the stream should end once a certain message appears.
Sometimes inputs close themselves. For example, when the file
input type reaches the end of a file it will shut down. By default this type will also shut down. If you wish for the input type to be restarted every time it shuts down until the query check is met then set restart_input
to true
.
Metadata​
A metadata key benthos_read_until
containing the value final
is added to the first part of the message that triggers the input to stop.
Fields​
input
​
The child input to consume from.
Type: input
Default: {}
check
​
A Bloblang query that should return a boolean value indicating whether the input should now be closed.
Type: string
Default: ""
# Examplescheck: this.type == "foo"check: count("messages") >= 100
restart_input
​
Whether the input should be reopened if it closes itself before the condition has resolved to true.
Type: bool
Default: false
Examples​
- Consume N Messages
A common reason to use this input is to consume only N messages from an input and then stop. This can easily be done with the count
function:
# Only read 100 messages, and then exit.input:read_until:check: count("messages") >= 100input:kafka:addresses: [ TODO ]topics: [ foo, bar ]consumer_group: foogroup