gcp_cloud_storage
BETA
This component is mostly stable but breaking changes could still be made outside of major version releases if a fundamental problem with the component is found.
Sends message parts as objects to a Google Cloud Storage bucket. Each object is uploaded with the path specified in the path field.
Introduced in version 3.43.0.
```yaml
# Common config fields, showing default values
output:
  label: ""
  gcp_cloud_storage:
    bucket: ""
    path: ${!count("files")}-${!timestamp_unix_nano()}.txt
    content_type: application/octet-stream
    collision_mode: overwrite
    max_in_flight: 1
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
```
```yaml
# All config fields, showing default values
output:
  label: ""
  gcp_cloud_storage:
    bucket: ""
    path: ${!count("files")}-${!timestamp_unix_nano()}.txt
    content_type: application/octet-stream
    collision_mode: overwrite
    content_encoding: ""
    chunk_size: 16777216
    max_in_flight: 1
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: []
```
In order to have a different path for each object you should use function interpolations described here, which are calculated per message of a batch.
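For instance, a path built from message contents might look like the following sketch, where doc.type is a hypothetical field of your JSON payloads:

```yaml
# A sketch of a per-message path, assuming each message is a JSON
# document with a hypothetical `doc.type` field.
output:
  gcp_cloud_storage:
    bucket: TODO
    path: ${!json("doc.type")}/${!timestamp_unix()}-${!uuid_v4()}.json
```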
Metadata
Metadata fields on messages will be sent as headers. In order to mutate these values (or remove them) check out the metadata docs.
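As a sketch, metadata can be mutated with a bloblang processor before the upload; internal_trace_id here is a hypothetical key set upstream:

```yaml
# A sketch of dropping a metadata key before upload, assuming a
# hypothetical `internal_trace_id` metadata field exists.
pipeline:
  processors:
    - bloblang: |
        root = this
        meta internal_trace_id = deleted()
```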
Credentials
By default Benthos will use a shared credentials file when connecting to GCP services. You can find out more in this document.
Batching
It's common to want to upload messages to Google Cloud Storage as batched archives. The easiest way to do this is to batch your messages at the output level and join the batch of messages with an archive and/or compress processor.
For example, if we wished to upload messages as a .tar.gz archive of documents we could achieve that with the following config:
```yaml
output:
  gcp_cloud_storage:
    bucket: TODO
    path: ${!count("files")}-${!timestamp_unix_nano()}.tar.gz
    batching:
      count: 100
      period: 10s
      processors:
        - archive:
            format: tar
        - compress:
            algorithm: gzip
```
Alternatively, if we wished to upload JSON documents as a single large document containing an array of objects we can do that with:
```yaml
output:
  gcp_cloud_storage:
    bucket: TODO
    path: ${!count("files")}-${!timestamp_unix_nano()}.json
    batching:
      count: 100
      processors:
        - archive:
            format: json_array
```
Performance
This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the maximum number of in-flight messages with the field max_in_flight.
This output benefits from sending messages as a batch for improved performance. Batches can be formed at both the input and output level. You can find out more in this doc.
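As a rough sketch, tuning both together might look like this; the values are illustrative starting points, not recommendations:

```yaml
output:
  gcp_cloud_storage:
    bucket: TODO
    path: ${!count("files")}-${!timestamp_unix_nano()}.txt
    max_in_flight: 64 # illustrative; tune against your workload
    batching:
      count: 100
      period: 5s
```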
Fields
bucket
The bucket to upload messages to.
Type: string
Default: ""
path
The path of each message to upload. This field supports interpolation functions.
Type: string
Default: "${!count(\"files\")}-${!timestamp_unix_nano()}.txt"
```yaml
# Examples

path: ${!count("files")}-${!timestamp_unix_nano()}.txt

path: ${!meta("kafka_key")}.json

path: ${!json("doc.namespace")}/${!json("doc.id")}.json
```
content_type
The content type to set for each object. This field supports interpolation functions.
Type: string
Default: "application/octet-stream"
collision_mode
Determines how file path collisions should be dealt with.
Type: string
Default: "overwrite"
Requires version 3.53.0 or newer
| Option | Summary |
|---|---|
| overwrite | Replace the existing file with the new one. |
| append | Append the message bytes to the original file. |
| error-if-exists | Return an error; this is the equivalent of a nack. |
| ignore | Do not modify the original file; the new data will be dropped. |
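As a sketch, append combined with a stable interpolated path accumulates messages into a single object; log_stream here is a hypothetical metadata key:

```yaml
output:
  gcp_cloud_storage:
    bucket: TODO
    # `log_stream` is a hypothetical metadata key; a stable path makes
    # append meaningful, since colliding writes extend the same object.
    path: ${!meta("log_stream")}.log
    collision_mode: append
```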
content_encoding
An optional content encoding to set for each object. This field supports interpolation functions.
Type: string
Default: ""
chunk_size
An optional chunk size which controls the maximum number of bytes of the object that the writer will attempt to send to the server in a single request. If chunk_size is set to zero, chunking will be disabled.
Type: int
Default: 16777216
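For instance, a sketch that disables chunking, which may suit workloads that only ever write small objects (each object is then sent in a single request):

```yaml
output:
  gcp_cloud_storage:
    bucket: TODO
    path: ${!count("files")}-${!timestamp_unix_nano()}.txt
    chunk_size: 0 # disable chunking; uploads are sent in one request
```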
max_in_flight
The maximum number of messages to have in flight at a given time. Increase this to improve throughput.
Type: int
Default: 1
batching
Allows you to configure a batching policy.
Type: object
```yaml
# Examples

batching:
  byte_size: 5000
  count: 0
  period: 1s

batching:
  count: 10
  period: 1s

batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m
```
batching.count
A number of messages at which the batch should be flushed. If set to 0, count-based batching is disabled.
Type: int
Default: 0
batching.byte_size
An amount of bytes at which the batch should be flushed. If set to 0, size-based batching is disabled.
Type: int
Default: 0
batching.period
A period in which an incomplete batch should be flushed regardless of its size.
Type: string
Default: ""
```yaml
# Examples

period: 1s

period: 1m

period: 500ms
```
batching.check
A Bloblang query that should return a boolean value indicating whether a message should end a batch.
Type: string
Default: ""
```yaml
# Examples

check: this.type == "end_of_transaction"
```
batching.processors
A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch; therefore, splitting the batch into smaller batches using these processors is a no-op.
Type: array
Default: []
```yaml
# Examples

processors:
  - archive:
      format: lines

processors:
  - archive:
      format: json_array

processors:
  - merge_json: {}
```