For the last few weeks I've been working on improving the workflow story in Benthos. That means reducing the number of processors, simplifying them, and at the same time making them more powerful than before. The new functionality outlined here can be used in the latest release v3.26.0.
The Motivationโ
After similar efforts to improve the mapping story in Benthos it seemed sensible to target workflows. Specifically, I've added a new branch
processor for wrapping child processors in request/result maps, and have reworked the workflow
processor to use them.
If you haven't used workflows in Benthos then there's a section in the new workflow
processor page outlining why they're useful. In short, when performing multiple integrations within a pipeline such as hitting HTTP services, lambdas, caches, etc, it's best to perform them in parallel when possible in order to reduce the processing latency of messages, organizing these integrations into a topology with a workflow makes it easier to manage their interdependencies and ensure they're executed in the right order.
In the old world you could use the process_dag
processor which has child process_map
processors, where the mappings were a series of clunky to/from dot paths, separated into optional and non-optional mappings. There was no way to manually specify the dependency tree, and conditional flows required a separate list of conditions which didn't factor into dependency resolution.
Having such complex and brittle mapping capabilities meant these processors were difficult to document and more so to understand and use.
Leaning into Bloblangโ
Thankfully, with Bloblang now finished it was pretty easy to replace most of the complexity of the workflow mappings for the language itself.
For example, when mapping the request payload for an integration you can express a bunch of different patterns...
Empty request body:
request_map: root = ""
Sub-object (foo
) as request body, if the sub-object doesn't exist (or is null) the integration is abandoned:
request_map: root = this.foo.not_null()
Sub-object as request body which can be obtained from one of a number of possible paths:
request_map: root = this.(foo | bar | baz).doc.not_null()
Conditional integration applies when the type
is foo
, with an unmodified message as request body:
request_map: |root = if this.type != "foo" {deleted()}
Conditional integration applies when the type
is foo
, with a sub-object as the request body:
request_map: |root = if this.type == "foo" {this.foo.not_null()} else {deleted()}
Similarly, it's possible to express a bunch of things in the result mapping...
Discard the result (the original message is unchanged):
result_map: ""
Place the entire result at a path:
result_map: root.foo = this
Place the result in a metadata field:
result_map: meta foo = this
If you want to see what it looks like there is an enrichment cookbook that demonstrates workflows in action, but there are also smaller examples on the workflow page such as the following snippet:
pipeline:processors:- workflow:meta_path: meta.workflowbranches:foo:request_map: 'root = ""'processors:- http:url: TODOresult_map: 'root.foo = this'bar:request_map: 'root = this.body'processors:- lambda:function: TODOresult_map: 'root.bar = this'baz:request_map: |root.fooid = this.foo.idroot.barstuff = this.bar.contentprocessors:- cache:resource: TODOoperator: setkey: ${! json("fooid") }value: ${! json("barstuff") }
Conclusionโ
The docs have been updated to use these new goodies. Obviously the old processors are still being maintained but in a mostly dormant state. The workflow and branch processors are currently labelled as beta
, but their general behavior is stable with the only exceptions being odd edge cases that might arise.
With the behavior of these processors being dramatically simplified I've also been able to simplify the documentation for them, which also means using more space on the page for example configs.
If you have feedback then get the absolute heck in the chat you utter recluse.