Skip to main content

Cross Post: We're Bringing Simple Back (to Streaming)

ยท 4 min read

(Cross-posted with https://vectorized.io/blog/benthos/)

Combining the power of Redpanda and Benthos for your streaming needs is so simple that this blog post is almost over already.

Benthos is an open source stream processor that provides data mapping, filtering, hydration and enrichment capabilities across a wide range of connectors. It is driven by a minimal, declarative configuration spec, and with a transaction based architecture it eliminates the development effort of building resilient stream processing pipelines.

Likewise, with its simplicity and high performance, Redpanda eliminates the operational effort of data persistence and availability by providing a Kafka-compatible streaming platform without the moving parts.

With so much taken care of you're well in for a boring, uneventful time when you combine the two. Make sure you've grabbed a copy of both services, full instructions can be found in the getting started guide for Benthos and the Redpanda docs. In this post we'll be running them with Docker so we'll start by pulling both images:

docker pull vectorized/redpanda:latest
docker pull jeffail/benthos:latest

We can then create a new network for the services to connect with:

docker network create -d bridge redpandanet

Next, run Redpanda in the background, we'll go with a single node for now:

docker run -d \
--network redpandanet \
--name redpanda \
-p 9092:9092 \
vectorized/redpanda redpanda start \
--reserve-memory 0M \
--overprovisioned \
--smp 1 \
--memory 1G \
--advertise-kafka-addr redpanda:9092

In order to send data to Redpanda with Benthos we'll need to create a config, starting off with a simple Stdin to Kafka pipeline, copy the following config into a file producer.yaml:

input:
stdin: {}
output:
kafka:
addresses: [ redpanda:9092 ]
topic: topic_A

Pro tip: You can also use Benthos itself to generate a config like this with docker run --rm jeffail/benthos create stdin//kafka > ./producer.yaml.

And now run Benthos by adding the config as a Docker volume, along with a pseudo-TTY for writing our messages:

docker run --rm -it \
--network redpandanet \
-v $(pwd)/producer.yaml:/benthos.yaml \
jeffail/benthos

This will open an interactive shell where you can write in some data to send. Benthos will gobble up anything you throw at it, try mixing structured and unstructured messages, ending each message with a newline:

{"id":"1","data":"a structured message"}
but this here ain't structured at all!
[{"id":"2"},"also structured in a different (but totally valid) way"]

When you're finished hit CTRL+C and it'll exit.

Next, let's try reading that data back out from Redpanda, this time let's also add a processor in order to mutate our data, copy the following into a file consumer.yaml:

input:
kafka:
addresses: [ redpanda:9092 ]
topics: [ topic_A ]
consumer_group: example_group
pipeline:
processors:
- bloblang: |
root.doc = this | content().string()
root.length = content().length()
root.topic = meta("kafka_topic")
output:
stdout: {}

And run it with our new config, and without the pseudo-TTY this time:

docker run --rm \
--network redpandanet \
-v $(pwd)/consumer.yaml:/benthos.yaml \
jeffail/benthos

Now you should see it print mutated versions of the messages you sent to Stdout:

{"doc":{"data":"a structured message","id":"1"},"length":40,"topic":"topic_A"}
{"doc":"but this here ain't structured at all!","length":38,"topic":"topic_A"}
{"doc":[{"id":"2"},"also structured in a different (but totally valid) way"],"length":69,"topic":"topic_A"}

The Bloblang processor in our consumer config has remapped the original message to a new field doc, first attempting to extract it as a structured document, but falling back to a stringified version of it when it's unstructured. We've also added a field length which contains the length of the original message, and topic which contains the Kafka topic the message was consumed from.

That's it for now, if you're still hungry for more then check out the Benthos website at https://www.benthos.dev, and you can learn more about the Benthos mapping language Bloblang in this guide.