
hdfs

Sends message parts as files to an HDFS directory.

# Common config fields, showing default values
output:
  label: ""
  hdfs:
    hosts:
      - localhost:9000
    user: benthos_hdfs
    directory: ""
    path: ${!count("files")}-${!timestamp_unix_nano()}.txt
    max_in_flight: 1
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""

Each file is written to the path specified by the 'path' field. To give each object a different path, use interpolation functions in that field.
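As a minimal sketch of how the 'path' field can produce a distinct file per message, the config below combines a counter with a nanosecond timestamp. The directory /data/events and the events- prefix are assumptions chosen for illustration; the two interpolation functions are the ones already used in the default path.

```yaml
output:
  hdfs:
    hosts:
      - localhost:9000
    user: benthos_hdfs
    # Hypothetical target directory; it is created if it does not exist.
    directory: /data/events
    # The counter and the timestamp together keep each file path distinct.
    path: events-${!count("files")}-${!timestamp_unix_nano()}.json
```

A static path with no interpolation would give every message the same file name, which is why some varying component is needed.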

Performance

This output performs better when multiple messages are sent in flight in parallel. You can tune the maximum number of in-flight messages with the max_in_flight field.
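For illustration, a sketch of a config that raises the in-flight limit from its default of 1. The value 8 and the directory are arbitrary assumptions, not recommendations; a suitable value depends on the cluster and workload.

```yaml
output:
  hdfs:
    hosts:
      - localhost:9000
    user: benthos_hdfs
    # Hypothetical target directory.
    directory: /data/events
    path: ${!count("files")}-${!timestamp_unix_nano()}.txt
    # Allow up to 8 messages in flight at once (the default is 1).
    max_in_flight: 8
```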

Fields

hosts

A list of hosts to connect to.

Type: array
Default: ["localhost:9000"]

# Examples
hosts: localhost:9000
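Because the field is an array, more than one host can be written in list form. The sketch below uses hypothetical host names that are placeholders only.

```yaml
# Hypothetical host names; substitute the addresses of your own cluster.
hosts:
  - namenode-1.example.com:9000
  - namenode-2.example.com:9000
```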

user

A user identifier.

Type: string
Default: "benthos_hdfs"

directory

A directory to store message files within. If the directory does not exist it will be created.

Type: string
Default: ""

path

The path to upload messages as. This field supports interpolation functions, which should be used to generate unique file paths.

Type: string
Default: "${!count(\"files\")}-${!timestamp_unix_nano()}.txt"

# Examples
path: ${!count("files")}-${!timestamp_unix_nano()}.txt

max_in_flight

The maximum number of messages to have in flight at a given time. Increase this to improve throughput.

Type: int
Default: 1

batching

Allows you to configure a batching policy.

Type: object

# Examples
batching:
  byte_size: 5000
  count: 0
  period: 1s

batching:
  count: 10
  period: 1s

batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m

batching.count

The number of messages at which the batch should be flushed. If set to 0, count-based batching is disabled.

Type: int
Default: 0

batching.byte_size

The number of bytes at which the batch should be flushed. If set to 0, size-based batching is disabled.

Type: int
Default: 0

batching.period

A period in which an incomplete batch should be flushed regardless of its size.

Type: string
Default: ""

# Examples
period: 1s
period: 1m
period: 500ms

batching.check

A Bloblang query that should return a boolean value indicating whether a message should end a batch.

Type: string
Default: ""

# Examples
check: this.type == "end_of_transaction"

batching.processors

A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Note that all resulting messages are flushed as a single batch, so splitting the batch into smaller batches using these processors is a no-op.

Type: array
Default: []

# Examples
processors:
  - archive:
      format: lines

processors:
  - archive:
      format: json_array

processors:
  - merge_json: {}
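To show how the batching fields fit together inside the output, here is a sketch that flushes a batch after 100 messages or 5 seconds, whichever comes first, and archives it into one newline-delimited message before writing. The thresholds, the directory, and the batch- prefix are illustrative assumptions. Since the output writes message parts as files, each archived batch should end up as a single file.

```yaml
output:
  hdfs:
    hosts:
      - localhost:9000
    user: benthos_hdfs
    # Hypothetical target directory.
    directory: /data/events
    path: batch-${!count("files")}-${!timestamp_unix_nano()}.txt
    batching:
      # Flush at 100 messages, or after 5s if the batch is still incomplete.
      count: 100
      period: 5s
      processors:
        # Combine the batch into one message, one original message per line.
        - archive:
            format: lines
```

Without the archive processor, each message in the flushed batch would remain a separate part and be written to its own path.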