Skip to main content

Advanced Bloblang

Map Parameters​

A map definition only has one input parameter, which is the context that it is called upon:

map formatting {
root = "(%v)".format(this)
}
root.a = this.a.apply("formatting")
root.b = this.b.apply("formatting")
# In: {"a":"foo","b":"bar"}
# Out: {"a":"(foo)","b":"(bar)"}

However, in cases where we wish to provide multiple named parameters to a mapping we can execute them on object literals for the same effect:

However, we can still use object literals for this purpose. Imagine if we wanted a map that is the exact same as above except the pattern is [%v] instead, with the potential for even more patterns in the future. To do that we can pass an object with a field value with our target to map and a field pattern which allows us to specify the pattern to apply:

map formatting {
root = this.pattern.format(this.value)
}
root.a = {
"value":this.a,
"pattern":this.pattern,
}.apply("formatting")
root.b = {
"value":this.b,
"pattern":this.pattern,
}.apply("formatting")
# In: {"a":"foo","b":"bar","pattern":"[%v]"}
# Out: {"a":"[foo]","b":"[bar]"}

Walking the Tree​

Sometimes it's necessary to perform a mapping on all values within an unknown tree structure. You can do that easily with recursive mapping:

map unescape_values {
root = match {
this.type() == "object" => this.map_each(item -> item.value.apply("unescape_values")),
this.type() == "array" => this.map_each(ele -> ele.apply("unescape_values")),
this.type() == "string" => this.unescape_html(),
this.type() == "bytes" => this.unescape_html(),
_ => this,
}
}
root = this.apply("unescape_values")
# In: {"first":{"nested":"foo & bar"},"second":10,"third":["1 < 2",{"also_nested":"2 > 1"}]}
# Out: {"first":{"nested":"foo & bar"},"second":10,"third":["1 < 2",{"also_nested":"2 > 1"}]}

Message Expansion​

Expanding a single message into multiple messages can be done by mapping messages into an array and following it up with an unarchive processor. For example, given documents of this format:

{
"id": "foobar",
"items": [
{"content":"foo"},
{"content":"bar"},
{"content":"baz"}
]
}

We can pull items out to the root with root = items with a bloblang processor and follow it with an unarchive processor to expand each element into its own independent message:

pipeline:
processors:
- bloblang: root = this.items
- unarchive:
format: json_array

However, most of the time we also need to map the elements before expanding them, and often that includes copying fields outside of our target array. We can do that with methods such as map_each and merge:

root = this.items.map_each(ele -> this.without("items").merge(ele))
# In: {"id":"foobar","items":[{"content":"foo"},{"content":"bar"},{"content":"baz"}]}
# Out: [{"content":"foo","id":"foobar"},{"content":"bar","id":"foobar"},{"content":"baz","id":"foobar"}]

However, the above mapping is slightly inefficient as we would create a copy of our source object for each element with the this.without("items") part. A more efficient way to do this would be to capture that query within a variable:

let doc_root = this.without("items")
root = this.items.map_each($doc_root.merge(this))
# In: {"id":"foobar","items":[{"content":"foo"},{"content":"bar"},{"content":"baz"}]}
# Out: [{"content":"foo","id":"foobar"},{"content":"bar","id":"foobar"},{"content":"baz","id":"foobar"}]

Also note that when we set doc_root we remove the field items from the target document. The full config would now be:

pipeline:
processors:
- bloblang: |
let doc_root = this.without("items")
root = this.items.map_each($doc_root.merge(this))
- unarchive:
format: json_array

Creating CSV​

Benthos has a few different ways of outputting a stream of CSV data. However, the best way to do it is by converting the documents into CSV rows with Bloblang as this gives you full control over exactly how the schema is generated, erroneous data is handled, and escaping of column data is performed.

A common and simple use case is to simply flatten documents and write out the column values in alphabetical order. The first row we generate should also be prefixed with a row containing those column names. Here's a mapping that achieves this by using a count function to detect the very first invocation of the mapping in a stream pipeline:

map escape_csv {
root = if this.re_match("[\"\n,]+") {
"\"" + this.replace("\"", "\"\"") + "\""
} else {
this
}
}
# Extract key/value pairs as an array and sort by the key
let kvs = this.key_values().sort_by(v -> v.key)
# Create a header prefix for our output only on the first row
let header = if count("rows_in_file") == 1 {
$kvs.map_each(kv -> kv.key.apply("escape_csv")).join(",") + "\n"
} else { "" }
root = $header + $kvs.map_each(kv -> kv.value.string().apply("escape_csv")).join(",")

And with this mapping we can write the data to a newly created CSV file using an output with a simple lines codec:

output:
file:
path: ./result.csv
codec: lines

Perhaps the first expansion of this mapping that would be worthwhile is to add an explicit list of column names, or at least confirm that the number of values in a row matches an expected count.