parquet

EXPERIMENTAL

This component is experimental and therefore subject to change or removal outside of major version releases.

Converts batches of documents to or from Parquet files.

Introduced in version 3.62.0.

# Config fields, showing default values
label: ""
parquet:
  operator: ""
  compression: snappy
  schema_file: ""
  schema: ""

Troubleshooting

This processor is experimental and the error messages that it provides are often vague and unhelpful. When writing parquet files, an error message of the form interface {} is nil, not <value type> implies that a field of the given type was expected but not found in the processed message.

Unfortunately the name of the field will sometimes be missing from the error. In that case it's worth double checking the schema you provided to make sure that there are no typos in the field names. If that doesn't reveal the issue, it can help to mark fields as OPTIONAL in the schema and gradually change them back to REQUIRED until the error returns, which identifies the problem field.
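
For example, if you suspect a particular field, it can be relaxed on its own while you narrow the problem down. The snippet below takes the age field used in the schema examples on this page and temporarily marks it OPTIONAL; if the error disappears, that is the field missing from your input documents:

{"Tag": "name=age, inname=Age, type=INT32, repetitiontype=OPTIONAL"}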

Defining the Schema

The schema must be specified as a JSON string, containing an object that describes the fields expected at the root of each document. Each field can itself have more fields defined, allowing for nested structures:

{
  "Tag": "name=root, repetitiontype=REQUIRED",
  "Fields": [
    {"Tag": "name=name, inname=NameIn, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=REQUIRED"},
    {"Tag": "name=age, inname=Age, type=INT32, repetitiontype=REQUIRED"},
    {"Tag": "name=id, inname=Id, type=INT64, repetitiontype=REQUIRED"},
    {"Tag": "name=weight, inname=Weight, type=FLOAT, repetitiontype=REQUIRED"},
    {
      "Tag": "name=favPokemon, inname=FavPokemon, type=LIST, repetitiontype=OPTIONAL",
      "Fields": [
        {"Tag": "name=name, inname=PokeName, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=REQUIRED"},
        {"Tag": "name=coolness, inname=Coolness, type=FLOAT, repetitiontype=REQUIRED"}
      ]
    }
  ]
}
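
As a rough illustration, a JSON document with values for each of these fields might look like the following sketch. The keys shown follow the name tags in the schema; whether your setup matches keys against the name or inname values should be verified against your own data:

{
  "name": "red",
  "age": 10,
  "id": 12345,
  "weight": 48.5,
  "favPokemon": [
    { "name": "pikachu", "coolness": 99.5 }
  ]
}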

Fields

operator

Determines whether the processor converts messages into a parquet file or expands parquet files into messages. Converting into JSON allows subsequent processors and mappings to convert the data into any other format.

Type: string

Option     Summary
from_json  Compress a batch of JSON documents into a parquet file.
to_json    Expand a parquet file into one or more JSON messages.
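
As a rough sketch of the to_json direction, the following pipeline expands each consumed parquet file into JSON messages and then reshapes them with a bloblang mapping. The schemas/foo.json path is reused from the schema_file example below, and the mapping is illustrative only:

pipeline:
  processors:
    - parquet:
        operator: to_json
        schema_file: schemas/foo.json
    # Illustrative follow-up mapping; it assumes each expanded message
    # contains a name field, as in the example schemas on this page.
    - bloblang: |
        root = this
        root.name_upper = this.name.uppercase()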

compression

The type of compression to use when writing parquet files. This field is ignored when consuming parquet files.

Type: string
Default: "snappy"
Options: uncompressed, snappy, gzip, lz4, zstd.
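
For example, to favour smaller files over write speed you might select zstd when writing. A minimal sketch, where the schema_file path is just a placeholder:

# Sketch: write parquet files with zstd compression
parquet:
  operator: from_json
  compression: zstd
  schema_file: schemas/foo.json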

schema_file

A file path containing a schema used to describe the parquet files being generated or consumed. The schema is a JSON document detailing the tag and fields of documents, following the format documented at https://pkg.go.dev/github.com/xitongsys/parquet-go#readme-json. Either a schema_file or schema field must be specified.

Type: string

# Examples
schema_file: schemas/foo.json

schema

A schema used to describe the parquet files being generated or consumed. The schema is a JSON document detailing the tag and fields of documents, following the format documented at https://pkg.go.dev/github.com/xitongsys/parquet-go#readme-json. Either a schema_file or schema field must be specified.

Type: string

# Examples
schema: |-
  {
    "Tag": "name=root, repetitiontype=REQUIRED",
    "Fields": [
      {"Tag":"name=name,inname=NameIn,type=BYTE_ARRAY,convertedtype=UTF8, repetitiontype=REQUIRED"},
      {"Tag":"name=age,inname=Age,type=INT32,repetitiontype=REQUIRED"}
    ]
  }

Examples

Parquet is often used to write batches of documents to a file store.

output:
  broker:
    outputs:
      - file:
          path: ./stuff-${! uuid_v4() }.parquet
          codec: all-bytes
    batching:
      count: 100
      period: 30s
      processors:
        - parquet:
            operator: from_json
            schema: |-
              {
                "Tag": "name=root, repetitiontype=REQUIRED",
                "Fields": [
                  {"Tag":"name=name,inname=NameIn,type=BYTE_ARRAY,convertedtype=UTF8, repetitiontype=REQUIRED"},
                  {"Tag":"name=age,inname=Age,type=INT32,repetitiontype=REQUIRED"}
                ]
              }
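
The reverse direction pairs a file input with a to_json parquet processor. A rough sketch, assuming a previously written file is read back with the all-bytes codec so that each file arrives as a single message (the path and schema file are placeholders):

input:
  file:
    paths: [ ./stuff.parquet ]
    codec: all-bytes
  processors:
    - parquet:
        operator: to_json
        schema_file: schemas/foo.json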