kinesis_balanced
DEPRECATED
This component is deprecated and will be removed in the next major version release. Please consider moving to alternative components.
Receives messages from a Kinesis stream and automatically balances shards across consumers.
- Common
- Advanced
```yaml
# Common config fields, showing default values
input:
  label: ""
  kinesis_balanced:
    stream: ""
    dynamodb_table: ""
    start_from_oldest: true
    region: eu-west-1
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
```
```yaml
# All config fields, showing default values
input:
  label: ""
  kinesis_balanced:
    stream: ""
    dynamodb_table: ""
    dynamodb_billing_mode: ""
    dynamodb_read_provision: 0
    dynamodb_write_provision: 0
    start_from_oldest: true
    region: eu-west-1
    endpoint: ""
    credentials:
      profile: ""
      id: ""
      secret: ""
      token: ""
      role: ""
      role_external_id: ""
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: []
```
Alternatives
This input is being replaced by the new aws_kinesis input, which has improved features; consider trying it out instead.
Metadata
This input adds the following metadata fields to each message:
- kinesis_shard
- kinesis_partition_key
- kinesis_sequence_number
You can access these metadata fields using function interpolation.
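For instance, a short sketch of lifting these metadata fields into the message payload with a bloblang processor (the field names match the list above; the mapping itself is only an illustration):

```yaml
# Example only: copy Kinesis metadata into the message body
# using the meta() function within a bloblang mapping.
pipeline:
  processors:
    - bloblang: |
        root = this
        root.shard = meta("kinesis_shard")
        root.partition_key = meta("kinesis_partition_key")
        root.sequence = meta("kinesis_sequence_number")
```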
Fields
stream
The Kinesis stream to consume from.
Type: string
Default: ""
dynamodb_table
A DynamoDB table to use for offset storage.
Type: string
Default: ""
dynamodb_billing_mode
A billing mode to set for the offset DynamoDB table.
Type: string
Default: ""
dynamodb_read_provision
The read capacity of the offset DynamoDB table.
Type: int
Default: 0
dynamodb_write_provision
The write capacity of the offset DynamoDB table.
Type: int
Default: 0
start_from_oldest
Whether to consume from the oldest message when an offset does not yet exist for the stream.
Type: bool
Default: true
region
The AWS region to target.
Type: string
Default: "eu-west-1"
endpoint
Allows you to specify a custom endpoint for the AWS API.
Type: string
Default: ""
credentials
Optional manual configuration of AWS credentials to use. More information can be found in this document.
Type: object
credentials.profile

A profile from ~/.aws/credentials to use.
Type: string
Default: ""
credentials.id
The ID of credentials to use.
Type: string
Default: ""
credentials.secret
The secret for the credentials being used.
Type: string
Default: ""
credentials.token
The token for the credentials being used, required when using short term credentials.
Type: string
Default: ""
credentials.role
A role ARN to assume.
Type: string
Default: ""
credentials.role_external_id
An external ID to provide when assuming a role.
Type: string
Default: ""
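Putting the credentials fields together, a sketch of consuming via an assumed role (the stream name, table name, ARN and external ID below are all placeholders, not values from this document):

```yaml
# Example only: manual credentials with role assumption.
input:
  kinesis_balanced:
    stream: example-stream          # placeholder
    dynamodb_table: example-offsets # placeholder
    region: eu-west-1
    credentials:
      profile: default
      role: arn:aws:iam::123456789012:role/example-role # placeholder ARN
      role_external_id: example-external-id             # placeholder
```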
batching
Allows you to configure a batching policy.
Type: object
```yaml
# Examples

batching:
  byte_size: 5000
  count: 0
  period: 1s

batching:
  count: 10
  period: 1s

batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m
```
batching.count

The number of messages at which the batch should be flushed. If 0, count-based batching is disabled.
Type: int
Default: 0
batching.byte_size

The number of bytes at which the batch should be flushed. If 0, size-based batching is disabled.
Type: int
Default: 0
batching.period
A period in which an incomplete batch should be flushed regardless of its size.
Type: string
Default: ""
```yaml
# Examples
period: 1s
period: 1m
period: 500ms
```
batching.check
A Bloblang query that should return a boolean value indicating whether a message should end a batch.
Type: string
Default: ""
```yaml
# Examples
check: this.type == "end_of_transaction"
```
batching.processors
A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op.
Type: array
Default: []
```yaml
# Examples

processors:
  - archive:
      format: lines

processors:
  - archive:
      format: json_array

processors:
  - merge_json: {}
```