dedupe
Deduplicates message batches by caching selected (and optionally hashed) messages, dropping batches that are already cached.
```yaml
# Common config fields, showing default values
label: ""
dedupe:
  cache: ""
  hash: none
  key: ""
  drop_on_err: true
```
```yaml
# All config fields, showing default values
label: ""
dedupe:
  cache: ""
  hash: none
  key: ""
  drop_on_err: true
  parts:
    - 0
```
This processor acts across an entire batch. To deduplicate individual messages within a batch, use this processor with the `for_each` processor.
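The following is a rough sketch of wrapping the processor in `for_each`. The cache name `foocache` and the `id` JSON field are placeholders. The cache is assumed to be declared as a resource elsewhere in the config.

```yaml
pipeline:
  processors:
    # for_each runs its child processors against each message as though it
    # were a batch of one, so each message is deduplicated individually.
    - for_each:
        - dedupe:
            cache: foocache       # placeholder: must match a declared cache resource
            key: ${! json("id") } # placeholder: deduplicate on a hypothetical "id" field
```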
Optionally, you can populate the `key` field with a function interpolated string. The processor then hashes on that string rather than on the full contents of messages. This lets you deduplicate based on dynamic fields within a message, such as its metadata or JSON fields. A full list of interpolation functions can be found in the interpolation documentation.
For example, the following config would deduplicate based on the concatenated values of the metadata field `kafka_key` and the value of the JSON path `id` within the message contents:
```yaml
pipeline:
  processors:
    - dedupe:
        cache: foocache
        key: ${! meta("kafka_key") }-${! json("id") }
```
Caches should be configured as a resource. For more information, check out the cache resources documentation.
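Here is a minimal sketch of declaring the `foocache` resource referenced above, backed by a Redis cache that several instances can share. It assumes your Benthos version accepts the list-style `cache_resources` field. Older configs declare caches under `resources.caches` instead. The Redis URL is a placeholder.

```yaml
cache_resources:
  - label: foocache             # the name referenced by the dedupe processor's cache field
    redis:
      url: tcp://localhost:6379 # placeholder: address of your Redis server

pipeline:
  processors:
    - dedupe:
        cache: foocache
        key: ${! meta("kafka_key") }-${! json("id") }
```

Redis is a distributed cache, so a setup like this is subject to the delivery guarantee caveats described below.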
When using this processor with an output target that might fail, you should always wrap the output within a `retry` block. During outages, this keeps failed messages retrying at the output instead of being reprocessed from the input. A reprocessed message would pass through this processor again, find its own signature already in the cache, and be dropped.
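The sketch below shows such a wrapper, assuming a Kafka output. The broker address and topic name are placeholders. Other output types can be nested under `retry` in the same way.

```yaml
output:
  # retry keeps re-sending failed messages to the wrapped output, rather than
  # letting them be reprocessed and then dropped by dedupe as duplicates.
  retry:
    output:
      kafka:
        addresses:
          - localhost:9092     # placeholder broker address
        topic: deduped_events  # placeholder topic name
```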
Delivery Guarantees
Performing deduplication on a stream using a distributed cache voids any at-least-once guarantees that it previously had. This is because the cache will preserve message signatures even if the message fails to leave the Benthos pipeline, which would cause message loss in the event of an outage at the output sink followed by a restart of the Benthos instance.
If you intend to preserve at-least-once delivery guarantees, you can avoid this problem by using a memory-based cache. This compromise can still achieve effective deduplication. However, parallel deployments of the pipeline and service restarts increase the chances of duplicates passing undetected.
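For example, you could declare a memory-backed cache resource as shown below. This assumes the list-style `cache_resources` field is available in your Benthos version and leaves the `memory` cache at its default settings. The label `foocache` is a placeholder.

```yaml
cache_resources:
  - label: foocache
    # Signatures are held only in this process's memory. They are lost on
    # restart and are not shared with other instances of the pipeline.
    memory: {}
```

Because a restart clears this cache, messages redelivered after an outage are no longer recognised as duplicates and can pass. The same property lets genuine duplicates slip through across restarts or parallel instances.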
Fields
cache
The `cache` resource to target with this processor.
Type: string
Default: ""
hash
The hash type to use.
Type: string
Default: "none"
Options: `none`, `xxhash`.
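The sketch below hashes the selected message contents with xxhash before they are cached. This presumably keeps stored keys short when deduplicating on full message contents. The cache name is a placeholder.

```yaml
pipeline:
  processors:
    - dedupe:
        cache: foocache # placeholder cache resource
        hash: xxhash    # hash the selected message contents before caching them
```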
key
An optional key to use for deduplication (instead of the entire message contents). This field supports interpolation functions.
Type: string
Default: ""
drop_on_err
Whether messages should be dropped when the cache returns an error.
Type: bool
Default: true
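You can disable this field if you would rather keep messages when the cache returns an error. Presumably they then pass through without being deduplicated, so duplicates may slip by during cache errors. A minimal sketch, with a placeholder cache name:

```yaml
pipeline:
  processors:
    - dedupe:
        cache: foocache    # placeholder cache resource
        drop_on_err: false # keep messages instead of dropping them when the cache errors
```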
parts
An array of message indexes within the batch to deduplicate on. If left empty, all messages are included. This field only applies when batching messages at the input level.
Type: array
Default: [0]
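As the description above states, leaving the list empty deduplicates on the contents of every message in a batch instead of only the first. A sketch with a placeholder cache name:

```yaml
pipeline:
  processors:
    - dedupe:
        cache: foocache # placeholder cache resource
        parts: []       # empty list: all messages in the batch are included
```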