Benthos
Fancy stream processing made operationally mundane
It's boringly easy to use
Written in Go, deployed as a static binary, declarative configuration. Open source and cloud native as utter heck.
- Curl
- Homebrew
- Docker
```shell
# Install
curl -Lsf https://sh.benthos.dev | bash

# Make a config
benthos create nats/protobuf/aws_sqs > ./config.yaml

# Run
benthos -c ./config.yaml
```
```shell
# Install
brew install benthos

# Make a config
benthos create nats/protobuf/aws_sqs > ./config.yaml

# Run
benthos -c ./config.yaml
```
```shell
# Pull
docker pull jeffail/benthos

# Make a config
docker run --rm jeffail/benthos create nats/protobuf/aws_sqs > ./config.yaml

# Run
docker run --rm -v $(pwd)/config.yaml:/benthos.yaml jeffail/benthos
```
- Mapping
- Multiplexing
- Windowing
- Enrichments
Mapping:

```yaml
input:
  gcp_pubsub:
    project: foo
    subscription: bar

pipeline:
  processors:
    - bloblang: |
        root.message = this
        root.meta.link_count = this.links.length()
        root.user.age = this.user.age.number()

output:
  redis_streams:
    url: tcp://TODO:6379
    stream: baz
    max_in_flight: 20
```
Multiplexing:

```yaml
input:
  kafka:
    addresses: [ TODO ]
    topics: [ foo, bar ]
    consumer_group: foogroup

output:
  switch:
    cases:
      - check: doc.tags.contains("AWS")
        output:
          aws_sqs:
            url: https://sqs.us-west-2.amazonaws.com/TODO/TODO
            max_in_flight: 20

      - output:
          redis_pubsub:
            url: tcp://TODO:6379
            channel: baz
            max_in_flight: 20
```
Windowing:

```yaml
input:
  nats_jetstream:
    urls: [ nats://TODO:4222 ]
    queue: myqueue
    subject: traffic.light.events
    deliver: all

buffer:
  system_window:
    timestamp_mapping: root = this.created_at
    size: 1h

pipeline:
  processors:
    - group_by_value:
        value: '${! json("traffic_light_id") }'
    - bloblang: |
        root = if batch_index() == 0 {
          {
            "traffic_light_id": this.traffic_light_id,
            "created_at": meta("window_end_timestamp"),
            "total_cars": json("registration_plate").from_all().unique().length(),
            "passengers": json("passengers").from_all().sum(),
          }
        } else { deleted() }

output:
  pulsar:
    url: pulsar://TODO:6650
    topic: traffic_windows
```
Enrichments:

```yaml
input:
  mqtt:
    urls: [ tcp://TODO:1883 ]
    topics: [ foo ]

pipeline:
  processors:
    - branch:
        request_map: |
          root.id = this.doc.id
          root.content = this.doc.body
        processors:
          - aws_lambda:
              function: sentiment_analysis
        result_map: root.results.sentiment = this

output:
  aws_s3:
    bucket: TODO
    path: '${! meta("partition") }/${! timestamp_unix_nano() }.tar.gz'
    batching:
      count: 100
      period: 10s
      processors:
        - archive:
            format: tar
        - compress:
            algorithm: gzip
```
Takes Care of the Dull Stuff
Benthos solves common data engineering tasks such as transformations, integrations, and multiplexing with declarative and unit-testable configuration. This allows you to easily and incrementally adapt your data pipelines as requirements change, letting you focus on the more exciting stuff.
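As a sketch of what unit-testable configuration looks like (the mapping and test values here are illustrative), tests can live in the same file as the pipeline they cover:

```yaml
pipeline:
  processors:
    - bloblang: root = content().uppercase()

tests:
  - name: uppercases content
    target_processors: '/pipeline/processors'
    input_batch:
      - content: 'hello world'
    output_batches:
      - - content_equals: 'HELLO WORLD'
```

Running `benthos test ./config.yaml` executes the targeted processors against the input batch and asserts on the output.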
It comes armed with a wide range of processors, a lit mapping language, stateless windowed processing capabilities and an industry leading mascot.
Well Connected
Benthos is able to glue a wide range of sources and sinks together and hook into a variety of databases, caches, HTTP APIs, lambdas and more, enabling you to seamlessly drop it into your existing infrastructure.
Working with disparate APIs and services can be a daunting task, doubly so in a streaming data context. With Benthos it's possible to break these tasks down and automatically parallelize them as a streaming workflow.
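As a rough sketch of such a workflow (the branch names and service URLs below are placeholders), the `workflow` processor runs enrichment branches as a DAG, with branches in the same tier executed in parallel:

```yaml
pipeline:
  processors:
    - workflow:
        meta_path: meta.workflow
        # Branches within the same tier run in parallel
        order: [ [ sentiment, entities ] ]
        branches:
          sentiment:
            request_map: 'root = this.text'
            processors:
              - http:
                  url: http://TODO/sentiment
                  verb: POST
            result_map: 'root.results.sentiment = this'
          entities:
            request_map: 'root = this.text'
            processors:
              - http:
                  url: http://TODO/entities
                  verb: POST
            result_map: 'root.results.entities = this'
```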
Reliable and Operationally Simple
Benthos runs fast and processes messages using a transaction model, making it able to guarantee at-least-once delivery even in the event of crashes, disk corruption, or other unexpected server faults.
It's completely stateless, with no need for disk-persisted state, allowing for easy deployment and liberal scaling. It also exposes metrics and tracing events to targets of your choice.
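For example, a config might route metrics and traces like so (a minimal sketch of the `metrics` and `tracer` sections; the Jaeger agent address is a placeholder):

```yaml
metrics:
  prometheus: {} # exposed for Prometheus to scrape

tracer:
  jaeger:
    agent_address: localhost:6831
```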
At Meltwater it's enriching over 450 million documents per day with a network of more than 20 NLP services. It sounds very interesting but rest assured, it's totally drab.
Extendable
Sometimes the components that come with Benthos aren't enough. Luckily, Benthos has been designed so that you can easily plug in whatever components you need.
You can either write plugins directly in Go (recommended) or you can have Benthos run your plugin as a subprocess.
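A minimal sketch of the subprocess route (the `./my-plugin` executable is hypothetical): the `subprocess` processor writes each message to the child program's stdin and reads the processed result back from its stdout, one line per message:

```yaml
pipeline:
  processors:
    - subprocess:
        # Hypothetical executable: reads a message per line on stdin,
        # writes the transformed message to stdout
        name: ./my-plugin
```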