Streams Via Config Files
When running Benthos in streams
mode it's possible to create streams with their own static configurations, simply list one or more files after the streams
subcommand:
benthos streams ./foo.yaml ./configs/*.yaml
Resources​
A stream configuration should only include the base stream component fields (input
, buffer
, pipeline
, output
), and therefore should NOT include any resources. Instead, define resources separately and import them using the -r
/--resources
flag:
benthos -r "./resources/prod/*.yaml" streams ./stream_configs/*.yaml
Walkthrough​
Make a directory of stream configs:
$ mkdir ./streams$ cat > ./streams/foo.yaml <<EOFinput:http_server: {}pipeline:threads: 4processors:- bloblang: 'root = {"id": this.user.id, "content": this.body.content}'output:http_server: {}EOF$ cat > ./streams/bar.yaml <<EOFinput:kafka:addresses:- localhost:9092topics:- my_topicpipeline:threads: 1processors:- bloblang: 'root = this.uppercase()'output:elasticsearch:urls:- http://localhost:9200EOF
Run Benthos in streams mode, pointing to our directory of streams:
$ benthos streams ./streams/*.yaml
On a separate terminal you can query the set of streams loaded:
$ curl http://localhost:4195/streams | jq '.'{"bar": {"active": true,"uptime": 19.381001424,"uptime_str": "19.381001552s"},"foo": {"active": true,"uptime": 19.380582951,"uptime_str": "19.380583306s"}}
You can also query a specific stream to see the loaded configuration:
$ curl http://localhost:4195/streams/foo | jq '.'{"active": true,"uptime": 69.334717193,"uptime_str": "1m9.334717193s","config": {"input": {"http_server": {"address": "","cert_file": "","key_file": "","path": "/post","timeout": "5s"}},"buffer": {"memory": {"limit": 10000000}},"pipeline": {"processors": [{"bloblang": "root = {\"id\": this.user.id, \"content\": this.body.content}",}],"threads": 4},"output": {"http_server": {"address": "","cert_file": "","key_file": "","path": "/get","stream_path": "/get/stream","timeout": "5s"}}}}
You can then send data to the stream via it's namespaced URL:
$ curl http://localhost:4195/foo/post -d '{"user":{"id":"foo"},"body":{"content":"bar"}}'
There are other endpoints in the REST API for creating, updating and deleting streams.