
dedupe

Deduplicates message batches by caching selected (and optionally hashed) messages, dropping batches that are already cached.

# Common config fields, showing default values
label: ""
dedupe:
  cache: ""
  hash: none
  key: ""
  drop_on_err: true

This processor acts across an entire batch. In order to deduplicate individual messages within a batch, use this processor with the for_each processor.
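
For example, a minimal sketch of that pattern, assuming a cache resource named foocache has already been defined, nests dedupe under for_each so that each message of the batch is checked individually:

pipeline:
  processors:
    - for_each:
        - dedupe:
            cache: foocache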

Optionally, the key field can be populated in order to hash on a function interpolated string rather than the full contents of messages. This allows you to deduplicate based on dynamic fields within a message, such as its metadata, JSON fields, etc. A full list of interpolation functions can be found here.

For example, the following config would deduplicate based on the concatenated values of the metadata field kafka_key and the value of the JSON path id within the message contents:

pipeline:
  processors:
    - dedupe:
        cache: foocache
        key: ${! meta("kafka_key") }-${! json("id") }

Caches should be configured as a resource. For more information, check out the documentation here.
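
As a sketch only, the foocache resource referenced above could be declared as an in-memory cache along these lines (field names and layout vary between Benthos versions, so check the cache documentation for your release):

resources:
  caches:
    foocache:
      memory:
        # Drop cached signatures after five minutes (value in seconds)
        ttl: 300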

When using this processor with an output target that might fail, you should always wrap the output within a retry block. This ensures that during outages your messages aren't reprocessed after failures, as reprocessed messages would already have their signatures cached and would therefore be dropped.
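
A minimal sketch of such wrapping, in which the http_client target and its URL are placeholders:

output:
  retry:
    # A max_retries of 0 means the wrapped output is retried indefinitely
    max_retries: 0
    output:
      http_client:
        url: https://example.com/ingest
        verb: POST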

Delivery Guarantees

Performing deduplication on a stream using a distributed cache voids any at-least-once guarantees that it previously had. This is because the cache will preserve message signatures even if the message fails to leave the Benthos pipeline, which would cause message loss in the event of an outage at the output sink followed by a restart of the Benthos instance.

If you intend to preserve at-least-once delivery guarantees, you can avoid this problem by using a memory-based cache. This is a compromise that can achieve effective deduplication, but parallel deployments of the pipeline as well as service restarts increase the chances of duplicates passing undetected.

Fields

cache

The cache resource to target with this processor.

Type: string
Default: ""

hash

The hash type to use.

Type: string
Default: "none"
Options: none, xxhash.

key

An optional key to use for deduplication (instead of the entire message contents). This field supports interpolation functions.

Type: string
Default: ""

drop_on_err

Whether messages should be dropped when the cache returns an error.

Type: bool
Default: true

parts

An array of message indexes within the batch to deduplicate based on. If left empty, all messages are included. This field is only applicable when batching messages at the input level.

Type: array
Default: [0]
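
As an illustrative sketch (the chosen indexes are arbitrary), deduplicating based only on the first and third messages of each input-level batch might look like:

pipeline:
  processors:
    - dedupe:
        cache: foocache
        parts: [0, 2]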