aws_s3
Sends message parts as objects to an Amazon S3 bucket. Each object is uploaded to the path specified by the path field.
Introduced in version 3.36.0.
# Common config fields, showing default values
output:
  label: ""
  aws_s3:
    bucket: ""
    path: ${!count("files")}-${!timestamp_unix_nano()}.txt
    tags: {}
    content_type: application/octet-stream
    metadata:
      exclude_prefixes: []
    max_in_flight: 1
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
    region: eu-west-1
# All config fields, showing default values
output:
  label: ""
  aws_s3:
    bucket: ""
    path: ${!count("files")}-${!timestamp_unix_nano()}.txt
    tags: {}
    content_type: application/octet-stream
    content_encoding: ""
    cache_control: ""
    content_disposition: ""
    content_language: ""
    website_redirect_location: ""
    metadata:
      exclude_prefixes: []
    storage_class: STANDARD
    kms_key_id: ""
    server_side_encryption: ""
    force_path_style_urls: false
    max_in_flight: 1
    timeout: 5s
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: []
    region: eu-west-1
    endpoint: ""
    credentials:
      profile: ""
      id: ""
      secret: ""
      token: ""
      role: ""
      role_external_id: ""
To give each object a different path, use function interpolations in the path field. These are calculated for each message of a batch.
Metadata
Metadata fields on messages will be sent as headers. To mutate these values (or remove them), see the metadata docs.
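As a rough illustration of controlling which metadata reaches S3, the sketch below uses the metadata.exclude_prefixes field to keep keys with a given prefix out of the object headers. The bucket name and the kafka_ prefix are assumptions chosen for the example, not values taken from this page.

```yaml
output:
  aws_s3:
    bucket: my-example-bucket # hypothetical bucket name
    path: ${!count("files")}-${!timestamp_unix_nano()}.txt
    metadata:
      # Assumed prefix: metadata keys starting with kafka_ are not attached
      exclude_prefixes:
        - kafka_
```

Excluding by prefix tends to suit metadata added by an input, where the keys usually share a common prefix.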
Tags
The tags field allows you to specify key/value pairs to attach to objects as tags, where the values support interpolation functions:
output:
  aws_s3:
    bucket: TODO
    path: ${!count("files")}-${!timestamp_unix_nano()}.tar.gz
    tags:
      Key1: Value1
      Timestamp: ${!meta("Timestamp")}
Credentials
By default Benthos will use a shared credentials file when connecting to AWS services. It's also possible to set credentials explicitly at the component level, allowing you to transfer data across accounts. You can find out more in this document.
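A minimal sketch of component-level credentials for a cross-account upload, using the credentials.role and credentials.role_external_id fields listed under Fields. The bucket name, role ARN and external ID are placeholders invented for this example.

```yaml
output:
  aws_s3:
    bucket: my-example-bucket # hypothetical bucket in the target account
    path: ${!count("files")}-${!timestamp_unix_nano()}.txt
    region: eu-west-1
    credentials:
      # Hypothetical role ARN and external ID; replace with your own values
      role: arn:aws:iam::123456789012:role/example-s3-writer
      role_external_id: example-external-id
```

Leaving the credentials block out entirely falls back to the default behaviour described above.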
Batching
It's common to want to upload messages to S3 as batched archives. The easiest way to do this is to batch your messages at the output level and join the batch of messages with an archive and/or compress processor.
For example, if we wished to upload messages as a .tar.gz archive of documents we could achieve that with the following config:
output:
  aws_s3:
    bucket: TODO
    path: ${!count("files")}-${!timestamp_unix_nano()}.tar.gz
    batching:
      count: 100
      period: 10s
      processors:
        - archive:
            format: tar
        - compress:
            algorithm: gzip
Alternatively, if we wished to upload JSON documents as a single large document containing an array of objects, we could do that with:
output:
  aws_s3:
    bucket: TODO
    path: ${!count("files")}-${!timestamp_unix_nano()}.json
    batching:
      count: 100
      processors:
        - archive:
            format: json_array
Performance
This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the max number of in flight messages with the field max_in_flight.
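For instance, a config that raises the in-flight limit might look like the sketch below. The value 64 and the bucket name are illustrative assumptions, not recommendations. A suitable number depends on object size and available bandwidth.

```yaml
output:
  aws_s3:
    bucket: my-example-bucket # hypothetical bucket name
    path: ${!count("files")}-${!timestamp_unix_nano()}.txt
    # Illustrative value only; the default is 1
    max_in_flight: 64
```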
Fields
bucket
The bucket to upload messages to.
Type: string
Default: ""
path
The path of each message to upload. This field supports interpolation functions.
Type: string
Default: "${!count(\"files\")}-${!timestamp_unix_nano()}.txt"
# Examples

path: ${!count("files")}-${!timestamp_unix_nano()}.txt

path: ${!meta("kafka_key")}.json

path: ${!json("doc.namespace")}/${!json("doc.id")}.json
tags
Key/value pairs to store with the object as tags. This field supports interpolation functions.
Type: object
Default: {}
# Examples

tags:
  Key1: Value1
  Timestamp: ${!meta("Timestamp")}
content_type
The content type to set for each object. This field supports interpolation functions.
Type: string
Default: "application/octet-stream"
content_encoding
An optional content encoding to set for each object. This field supports interpolation functions.
Type: string
Default: ""
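As a sketch of how content_type and content_encoding might be combined with output-level batching, the config below uploads gzip-compressed JSON arrays and labels the objects accordingly. It reuses the archive and compress processors shown in the Batching section. The bucket name and the .json.gz extension are assumptions for the example.

```yaml
output:
  aws_s3:
    bucket: my-example-bucket # hypothetical bucket name
    path: ${!count("files")}-${!timestamp_unix_nano()}.json.gz
    content_type: application/json
    content_encoding: gzip
    batching:
      count: 100
      processors:
        # Join the batch into one JSON array, then gzip the result
        - archive:
            format: json_array
        - compress:
            algorithm: gzip
```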
cache_control
The cache control to set for each object. This field supports interpolation functions.
Type: string
Default: ""
content_disposition
The content disposition to set for each object. This field supports interpolation functions.
Type: string
Default: ""
content_language
The content language to set for each object. This field supports interpolation functions.
Type: string
Default: ""
website_redirect_location
The website redirect location to set for each object. This field supports interpolation functions.
Type: string
Default: ""
metadata
Specify criteria for which metadata values are attached to objects as headers.
Type: object
metadata.exclude_prefixes
Provide a list of explicit metadata key prefixes to be excluded when adding metadata to sent messages.
Type: array
Default: []
storage_class
The storage class to set for each object. This field supports interpolation functions.
Type: string
Default: "STANDARD"
Options: STANDARD, REDUCED_REDUNDANCY, GLACIER, STANDARD_IA, ONEZONE_IA, INTELLIGENT_TIERING, DEEP_ARCHIVE.
kms_key_id
An optional server side encryption key.
Type: string
Default: ""
server_side_encryption
An optional server side encryption algorithm.
Type: string
Default: ""
Requires version 3.63.0 or newer
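A hedged sketch of how kms_key_id and server_side_encryption could be set together. It assumes the value is passed to S3 as the server-side encryption algorithm, for which S3 itself accepts AES256 and aws:kms. The bucket name and key ARN are placeholders.

```yaml
output:
  aws_s3:
    bucket: my-example-bucket # hypothetical bucket name
    path: ${!count("files")}-${!timestamp_unix_nano()}.txt
    # Assumption: forwarded to S3 as the encryption algorithm
    server_side_encryption: aws:kms
    # Hypothetical KMS key ARN; replace with your own key
    kms_key_id: arn:aws:kms:eu-west-1:123456789012:key/EXAMPLE-KEY-ID
```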
force_path_style_urls
Forces the client API to use path style URLs, which helps when connecting to custom endpoints.
Type: bool
Default: false
max_in_flight
The maximum number of messages to have in flight at a given time. Increase this to improve throughput.
Type: int
Default: 1
timeout
The maximum period to wait on an upload before abandoning it and reattempting.
Type: string
Default: "5s"
batching
Allows you to configure a batching policy.
Type: object
# Examples

batching:
  byte_size: 5000
  count: 0
  period: 1s

batching:
  count: 10
  period: 1s

batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m
batching.count
The number of messages at which the batch should be flushed. Set to 0 to disable count-based batching.
Type: int
Default: 0
batching.byte_size
The number of bytes at which the batch should be flushed. Set to 0 to disable size-based batching.
Type: int
Default: 0
batching.period
A period in which an incomplete batch should be flushed regardless of its size.
Type: string
Default: ""
# Examples

period: 1s

period: 1m

period: 500ms
batching.check
A Bloblang query that should return a boolean value indicating whether a message should end a batch.
Type: string
Default: ""
# Examples

check: this.type == "end_of_transaction"
batching.processors
A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op.
Type: array
Default: []
# Examples

processors:
  - archive:
      format: lines

processors:
  - archive:
      format: json_array

processors:
  - merge_json: {}
region
The AWS region to target.
Type: string
Default: "eu-west-1"
endpoint
Allows you to specify a custom endpoint for the AWS API.
Type: string
Default: ""
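To illustrate how endpoint pairs with force_path_style_urls, the sketch below targets an S3-compatible service running locally. The endpoint URL and bucket name are assumptions for the example. Whether path style URLs are required depends on the service you connect to.

```yaml
output:
  aws_s3:
    bucket: my-example-bucket # hypothetical bucket name
    path: ${!count("files")}-${!timestamp_unix_nano()}.txt
    region: eu-west-1
    # Assumed address of a local S3-compatible service
    endpoint: http://localhost:9000
    # Path style URLs help when connecting to custom endpoints
    force_path_style_urls: true
```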
credentials
Optional manual configuration of AWS credentials to use. More information can be found in this document.
Type: object
credentials.profile
A profile from ~/.aws/credentials to use.
Type: string
Default: ""
credentials.id
The ID of credentials to use.
Type: string
Default: ""
credentials.secret
The secret for the credentials being used.
Type: string
Default: ""
credentials.token
The token for the credentials being used. Required when using short-term credentials.
Type: string
Default: ""
credentials.role
A role ARN to assume.
Type: string
Default: ""
credentials.role_external_id
An external ID to provide when assuming a role.
Type: string
Default: ""