gcp_bigquery

EXPERIMENTAL

This component is experimental and therefore subject to change or removal outside of major version releases.

Sends messages as new rows to a Google Cloud BigQuery table.

Introduced in version 3.55.0.

# Common config fields, showing default values
output:
  label: ""
  gcp_bigquery:
    project: ""
    dataset: ""
    table: ""
    format: NEWLINE_DELIMITED_JSON
    max_in_flight: 64
    csv:
      header: []
      field_delimiter: ','
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""

Credentials

By default Benthos will use a shared credentials file when connecting to GCP services. You can find out more in this document.

Format

This output currently supports only CSV and NEWLINE_DELIMITED_JSON formats. Learn more about how to use GCP BigQuery with them here:

Each message may contain multiple elements separated by newlines. For example a single message containing:

{"key": "1"}
{"key": "2"}

Is equivalent to two separate messages:

{"key": "1"}

And:

{"key": "2"}

The same is true for the CSV format.

CSV

For the CSV format, when the field csv.header is specified, a header row will be inserted as the first line of each message batch. If this field is not provided, then the first message of each message batch must include a header line.
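
As a minimal sketch of the header behaviour described above, the following config sets csv.header so that a header row is inserted for each batch. The project, dataset, table and column names are hypothetical placeholders chosen for illustration.

```yaml
output:
  gcp_bigquery:
    project: my-project      # hypothetical project ID
    dataset: my_dataset      # hypothetical dataset ID
    table: users             # hypothetical table name
    format: CSV
    csv:
      # Inserted as the first line of each message batch.
      header: [ id, name, signup_date ]
```

With a config along these lines, each message would carry data rows only, for example 1,alice,2021-01-01.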

Performance

This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the max number of in flight messages with the field max_in_flight.

This output benefits from sending messages as a batch for improved performance. Batches can be formed at both the input and output level. You can find out more in this doc.
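
The two tuning options above can be combined in one config. This is only a sketch: the project, dataset and table names are hypothetical, and the numbers are illustrative starting points rather than recommended values.

```yaml
output:
  gcp_bigquery:
    project: my-project   # hypothetical project ID
    dataset: my_dataset   # hypothetical dataset ID
    table: events         # hypothetical table name
    # Upper bound on messages in flight in parallel.
    max_in_flight: 16
    batching:
      # Flush after 500 messages, or after 5s if the batch is still incomplete.
      count: 500
      period: 5s
```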

Fields

project

The project ID of the dataset to insert data to. If not set, it will be inferred from the credentials or read from the GOOGLE_CLOUD_PROJECT environment variable.

Type: string
Default: ""

dataset

The BigQuery Dataset ID.

Type: string

table

The table to insert messages to.

Type: string

format

The format of each incoming message.

Type: string
Default: "NEWLINE_DELIMITED_JSON"
Options: NEWLINE_DELIMITED_JSON, CSV.

max_in_flight

The maximum number of messages to have in flight at a given time. Increase this to improve throughput.

Type: int
Default: 64

write_disposition

Specifies how existing data in a destination table is treated.

Type: string
Default: "WRITE_APPEND"
Options: WRITE_APPEND, WRITE_EMPTY, WRITE_TRUNCATE.

create_disposition

Specifies the circumstances under which the destination table will be created. If CREATE_IF_NEEDED is used, GCP BigQuery will create the table if it does not already exist; tables are created atomically on successful completion of a job. The CREATE_NEVER option requires that the table already exists, and it will not be created automatically.

Type: string
Default: "CREATE_IF_NEEDED"
Options: CREATE_IF_NEEDED, CREATE_NEVER.
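
The two disposition fields are typically set together. A minimal sketch, assuming a hypothetical project, dataset and table that were created ahead of time:

```yaml
output:
  gcp_bigquery:
    project: my-project   # hypothetical project ID
    dataset: my_dataset   # hypothetical dataset ID
    table: events         # hypothetical table name, assumed to exist already
    # Add new rows to whatever the table already holds (the default).
    write_disposition: WRITE_APPEND
    # Never create the table automatically; it must already exist.
    create_disposition: CREATE_NEVER
```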

ignore_unknown_values

Causes values not matching the schema to be tolerated. Unknown values are ignored. For CSV this ignores extra values at the end of a line. For JSON this ignores named values that do not match any column name. If this field is set to false (the default value), records containing unknown values are treated as bad records. The max_bad_records field can be used to customize how bad records are handled.

Type: bool
Default: false

max_bad_records

The maximum number of bad records that will be ignored when reading data.

Type: int
Default: 0
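
ignore_unknown_values and max_bad_records work together, so a combined example may help. This is a sketch under assumed names (project, dataset and table are hypothetical), and the limit of 10 is arbitrary.

```yaml
output:
  gcp_bigquery:
    project: my-project   # hypothetical project ID
    dataset: my_dataset   # hypothetical dataset ID
    table: events         # hypothetical table name
    # Records containing unknown values are treated as bad records...
    ignore_unknown_values: false
    # ...and up to 10 bad records are ignored when reading data.
    max_bad_records: 10
```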

auto_detect

Indicates whether the options and schema for CSV and JSON sources should be inferred automatically. If the table doesn't exist and this field is set to false, the output may not be able to insert data and will return an insertion error. Be careful when using this field, since it delegates schema detection to the GCP BigQuery service, and values like "no" may be treated as booleans for the CSV format.

Type: bool
Default: false
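
For a table that does not exist yet, schema detection can be paired with automatic table creation. A minimal sketch, with hypothetical project, dataset and table names:

```yaml
output:
  gcp_bigquery:
    project: my-project     # hypothetical project ID
    dataset: my_dataset     # hypothetical dataset ID
    table: new_events       # hypothetical table name, assumed not to exist yet
    format: NEWLINE_DELIMITED_JSON
    # Let BigQuery create the table if it is missing.
    create_disposition: CREATE_IF_NEEDED
    # Delegate schema detection to the BigQuery service.
    auto_detect: true
```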

csv

Specify how CSV data should be interpreted.

Type: object

csv.header

A list of values to use as the header for each batch of messages. If not specified, the first line of each message will be used as the header.

Type: array
Default: []

csv.field_delimiter

The separator for fields in a CSV file, used when reading or exporting data.

Type: string
Default: ","

csv.allow_jagged_rows

Causes missing trailing optional columns to be tolerated when reading CSV data. Missing values are treated as nulls.

Type: bool
Default: false

csv.allow_quoted_newlines

Sets whether quoted data sections containing newlines are allowed when reading CSV data.

Type: bool
Default: false

csv.encoding

The character encoding of the data to be read.

Type: string
Default: "UTF-8"
Options: UTF-8, ISO-8859-1.

csv.skip_leading_rows

The number of rows at the top of a CSV file that BigQuery will skip when reading data. The default value is 1 since Benthos will add the specified header in the first line of each batch sent to BigQuery.

Type: int
Default: 1
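
The csv fields above can be combined for input that is not plain comma-separated data. The following is a sketch only: the project, dataset and table names are hypothetical, and the semicolon delimiter is an assumed property of the incoming data.

```yaml
output:
  gcp_bigquery:
    project: my-project   # hypothetical project ID
    dataset: my_dataset   # hypothetical dataset ID
    table: imports        # hypothetical table name
    format: CSV
    csv:
      header: [ id, name, comment ]   # hypothetical column names
      field_delimiter: ';'            # assumes semicolon-separated input
      allow_jagged_rows: true         # missing trailing optional columns become nulls
      allow_quoted_newlines: true     # quoted sections may contain newlines
      encoding: UTF-8
      skip_leading_rows: 1            # BigQuery skips the header row added to each batch
```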

batching

Allows you to configure a batching policy.

Type: object

# Examples

batching:
  byte_size: 5000
  count: 0
  period: 1s

batching:
  count: 10
  period: 1s

batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m

batching.count

The number of messages at which the batch should be flushed. If 0, count-based batching is disabled.

Type: int
Default: 0

batching.byte_size

The number of bytes at which the batch should be flushed. If 0, size-based batching is disabled.

Type: int
Default: 0

batching.period

A period in which an incomplete batch should be flushed regardless of its size.

Type: string
Default: ""

# Examples
period: 1s
period: 1m
period: 500ms

batching.check

A Bloblang query that should return a boolean value indicating whether a message should end a batch.

Type: string
Default: ""

# Examples
check: this.type == "end_of_transaction"

batching.processors

A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op.

Type: array

# Examples

processors:
  - archive:
      format: lines

processors:
  - archive:
      format: json_array

processors:
  - merge_json: {}