Splits a batch of messages into N batches, where each resulting batch contains a group of messages determined by a Bloblang query.
    # Config fields, showing default values
    label: ""
    group_by:
Once the groups are established, a list of processors is applied to each grouped batch, which can be used to label the batch according to its grouping. Messages that do not pass the check of any specified group are placed in their own group.
The functionality of this processor depends on being applied across messages that are batched. You can find out more about batching in the batching documentation.
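As a rough sketch of what "batched" means in practice, the fragment below sets a batching policy on an input so that the processor receives more than one message at a time. The choice of a `kafka` input, the address, topic, consumer group, and the batch sizes are all placeholder assumptions for illustration and do not come from this page.

```yaml
input:
  kafka:
    addresses: [ "localhost:9092" ]   # hypothetical broker address
    topics: [ "example_topic" ]       # hypothetical topic name
    consumer_group: example_group     # hypothetical consumer group
    batching:
      count: 50    # flush a batch once 50 messages have been collected
      period: 1s   # or once one second has passed, whichever comes first
```

With a policy like this in place, each flushed batch reaches the pipeline as a single unit that can then be split into groups.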
A Bloblang query that should return a boolean value indicating whether a message belongs to a given group.
    # Examples

    check: this.type == "foo"

    check: this.contents.urls.contains("https://benthos.dev/")

    check: "true"
A list of processors to execute on the newly formed group.
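To illustrate how the two fields fit together, here is a minimal sketch with two groups, each labelled through a metadata key by its own processor list. The `bar` type and the `grouping` values are hypothetical; only the field names `check` and `processors` come from the descriptions above.

```yaml
pipeline:
  processors:
    - group_by:
        # Group 1: messages whose type field equals "foo"
        - check: this.type == "foo"
          processors:
            - bloblang: 'meta grouping = "foo"'
        # Group 2: messages whose type field equals "bar" (hypothetical type)
        - check: this.type == "bar"
          processors:
            - bloblang: 'meta grouping = "bar"'
```

Messages that satisfy neither check end up in their own group, and since no processors run on that group it carries no `grouping` metadata.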
- Grouped Processing
Imagine we have a batch of messages that we wish to split into a group of foos and everything else, with each group sent to a different output destination. We also need to send the foos as a tar gzip archive. For this purpose we can use the group_by processor with a switch output:
    pipeline:
      processors:
        - group_by:
          - check: content().contains("this is a foo")
            processors:
              - archive:
                  format: tar
              - compress:
                  algorithm: gzip
              - bloblang: 'meta grouping = "foo"'

    output:
      switch:
        cases:
          - check: meta("grouping") == "foo"
            output:
              gcp_pubsub:
                project: foo_prod
                topic: only_the_foos
          - output:
              gcp_pubsub:
                project: somewhere_else
                topic: no_foos_here