Skip to main content

kinesis_balanced

DEPRECATED

This component is deprecated and will be removed in the next major version release. Please consider moving onto alternative components.

Receives messages from a Kinesis stream and automatically balances shards across consumers.

# Common config fields, showing default values
input:
label: ""
kinesis_balanced:
stream: ""
dynamodb_table: ""
start_from_oldest: true
region: eu-west-1
batching:
count: 0
byte_size: 0
period: ""
check: ""

Alternatives​

This input is being replaced with the shiny new aws_kinesis input, which has improved features, consider trying it out instead.

Metadata​

This input adds the following metadata fields to each message:

- kinesis_shard
- kinesis_partition_key
- kinesis_sequence_number

You can access these metadata fields using function interpolation.

Fields​

stream​

The Kinesis stream to consume from.

Type: string
Default: ""

dynamodb_table​

A DynamoDB table to use for offset storage.

Type: string
Default: ""

dynamodb_billing_mode​

A billing mode to set for the offset DynamoDB table.

Type: string
Default: ""

dynamodb_read_provision​

The read capacity of the offset DynamoDB table.

Type: int
Default: 0

dynamodb_write_provision​

The write capacity of the offset DynamoDB table.

Type: int
Default: 0

start_from_oldest​

Whether to consume from the oldest message when an offset does not yet exist for the stream.

Type: bool
Default: true

region​

The AWS region to target.

Type: string
Default: "eu-west-1"

endpoint​

Allows you to specify a custom endpoint for the AWS API.

Type: string
Default: ""

credentials​

Optional manual configuration of AWS credentials to use. More information can be found in this document.

Type: object

credentials.profile​

A profile from ~/.aws/credentials to use.

Type: string
Default: ""

credentials.id​

The ID of credentials to use.

Type: string
Default: ""

credentials.secret​

The secret for the credentials being used.

Type: string
Default: ""

credentials.token​

The token for the credentials being used, required when using short term credentials.

Type: string
Default: ""

credentials.role​

A role ARN to assume.

Type: string
Default: ""

credentials.role_external_id​

An external ID to provide when assuming a role.

Type: string
Default: ""

batching​

Allows you to configure a batching policy.

Type: object

# Examples
batching:
byte_size: 5000
count: 0
period: 1s
batching:
count: 10
period: 1s
batching:
check: this.contains("END BATCH")
count: 0
period: 1m

batching.count​

A number of messages at which the batch should be flushed. If 0 disables count based batching.

Type: int
Default: 0

batching.byte_size​

An amount of bytes at which the batch should be flushed. If 0 disables size based batching.

Type: int
Default: 0

batching.period​

A period in which an incomplete batch should be flushed regardless of its size.

Type: string
Default: ""

# Examples
period: 1s
period: 1m
period: 500ms

batching.check​

A Bloblang query that should return a boolean value indicating whether a message should end a batch.

Type: string
Default: ""

# Examples
check: this.type == "end_of_transaction"

batching.processors​

A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op.

Type: array
Default: []

# Examples
processors:
- archive:
format: lines
processors:
- archive:
format: json_array
processors:
- merge_json: {}