diff --git a/.github/actions/spelling/allow.txt b/.github/actions/spelling/allow.txt index 65efc220b73ed..ac398ea512504 100644 --- a/.github/actions/spelling/allow.txt +++ b/.github/actions/spelling/allow.txt @@ -244,6 +244,7 @@ armhf backpressure backticks bigendian +bigquery bindir binfmt bitcast diff --git a/.github/actions/spelling/excludes.txt b/.github/actions/spelling/excludes.txt index 07a8e2a32cbc8..9f4b7c9f9fdf7 100644 --- a/.github/actions/spelling/excludes.txt +++ b/.github/actions/spelling/excludes.txt @@ -118,4 +118,5 @@ ^\Qwebsite/layouts/shortcodes/config/unit-tests.html\E$ ^lib/codecs/tests/data/native_encoding/ ^\Qwebsite/config.toml\E$ +^proto/google/ ignore$ diff --git a/.github/actions/spelling/expect.txt b/.github/actions/spelling/expect.txt index f239a69d464e9..79881d4b15da5 100644 --- a/.github/actions/spelling/expect.txt +++ b/.github/actions/spelling/expect.txt @@ -524,6 +524,7 @@ ingesters ingestor initdb initech +Insertdata installdeb Instrumentable interpolatedstring diff --git a/build.rs b/build.rs index 5419a99b1b4d5..45b8ca5992977 100644 --- a/build.rs +++ b/build.rs @@ -117,6 +117,7 @@ fn main() { #[cfg(feature = "protobuf-build")] { println!("cargo:rerun-if-changed=proto/third-party/dnstap.proto"); + println!("cargo:rerun-if-changed=proto/third-party/google/cloud/bigquery/storage/v1/storage.proto"); println!("cargo:rerun-if-changed=proto/third-party/google/pubsub/v1/pubsub.proto"); println!("cargo:rerun-if-changed=proto/third-party/google/rpc/status.proto"); println!("cargo:rerun-if-changed=proto/vector/dd_metric.proto"); @@ -148,6 +149,7 @@ fn main() { "proto/vector/ddsketch_full.proto", "proto/vector/dd_metric.proto", "proto/vector/dd_trace.proto", + "proto/third-party/google/cloud/bigquery/storage/v1/storage.proto", "proto/third-party/google/pubsub/v1/pubsub.proto", "proto/third-party/google/rpc/status.proto", "proto/vector/vector.proto", @@ -268,3 +270,4 @@ fn main() { // Emit the aforementioned stanzas. tracker.emit_rerun_stanzas(); } + diff --git a/lib/codecs/tests/data/protobuf/integration.desc b/lib/codecs/tests/data/protobuf/integration.desc new file mode 100644 index 0000000000000..3164f30f2b192 --- /dev/null +++ b/lib/codecs/tests/data/protobuf/integration.desc @@ -0,0 +1,11 @@ + +ˇ +integration.prototest"… + Integration +time (Rtime +count (Rcount + +percentile (R +percentile +message ( Rmessage +user ( Ruser \ No newline at end of file diff --git a/lib/codecs/tests/data/protobuf/integration.proto b/lib/codecs/tests/data/protobuf/integration.proto new file mode 100644 index 0000000000000..dd1c7ad016aa3 --- /dev/null +++ b/lib/codecs/tests/data/protobuf/integration.proto @@ -0,0 +1,14 @@ +// Remember to recompile `integration.desc` when you update this file: +// protoc -I . -o integration.desc integration.proto + +syntax = "proto2"; + +package test; + +message Integration { + required int64 time = 1; + required int64 count = 2; + optional float percentile = 3; + required string message = 4; + optional string user = 5; +} diff --git a/proto/third-party/google/cloud/bigquery/storage/v1/arrow.proto b/proto/third-party/google/cloud/bigquery/storage/v1/arrow.proto new file mode 100644 index 0000000000000..05036d21db528 --- /dev/null +++ b/proto/third-party/google/cloud/bigquery/storage/v1/arrow.proto @@ -0,0 +1,64 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package google.cloud.bigquery.storage.v1; + +option csharp_namespace = "Google.Cloud.BigQuery.Storage.V1"; +option go_package = "cloud.google.com/go/bigquery/storage/apiv1/storagepb;storagepb"; +option java_multiple_files = true; +option java_outer_classname = "ArrowProto"; +option java_package = "com.google.cloud.bigquery.storage.v1"; +option php_namespace = "Google\\Cloud\\BigQuery\\Storage\\V1"; + +// Arrow schema as specified in +// https://arrow.apache.org/docs/python/api/datatypes.html +// and serialized to bytes using IPC: +// https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc +// +// See code samples on how this message can be deserialized. +message ArrowSchema { + // IPC serialized Arrow schema. + bytes serialized_schema = 1; +} + +// Arrow RecordBatch. +message ArrowRecordBatch { + // IPC-serialized Arrow RecordBatch. + bytes serialized_record_batch = 1; + + // [Deprecated] The count of rows in `serialized_record_batch`. + // Please use the format-independent ReadRowsResponse.row_count instead. + int64 row_count = 2 [deprecated = true]; +} + +// Contains options specific to Arrow Serialization. +message ArrowSerializationOptions { + // Compression codec's supported by Arrow. + enum CompressionCodec { + // If unspecified no compression will be used. + COMPRESSION_UNSPECIFIED = 0; + + // LZ4 Frame (https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md) + LZ4_FRAME = 1; + + // Zstandard compression. + ZSTD = 2; + } + + // The compression codec to use for Arrow buffers in serialized record + // batches. + CompressionCodec buffer_compression = 2; +} diff --git a/proto/third-party/google/cloud/bigquery/storage/v1/avro.proto b/proto/third-party/google/cloud/bigquery/storage/v1/avro.proto new file mode 100644 index 0000000000000..588406aba31c5 --- /dev/null +++ b/proto/third-party/google/cloud/bigquery/storage/v1/avro.proto @@ -0,0 +1,56 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package google.cloud.bigquery.storage.v1; + +option csharp_namespace = "Google.Cloud.BigQuery.Storage.V1"; +option go_package = "cloud.google.com/go/bigquery/storage/apiv1/storagepb;storagepb"; +option java_multiple_files = true; +option java_outer_classname = "AvroProto"; +option java_package = "com.google.cloud.bigquery.storage.v1"; +option php_namespace = "Google\\Cloud\\BigQuery\\Storage\\V1"; + +// Avro schema. +message AvroSchema { + // Json serialized schema, as described at + // https://avro.apache.org/docs/1.8.1/spec.html. 
+ string schema = 1; +} + +// Avro rows. +message AvroRows { + // Binary serialized rows in a block. + bytes serialized_binary_rows = 1; + + // [Deprecated] The count of rows in the returning block. + // Please use the format-independent ReadRowsResponse.row_count instead. + int64 row_count = 2 [deprecated = true]; +} + +// Contains options specific to Avro Serialization. +message AvroSerializationOptions { + // Enable displayName attribute in Avro schema. + // + // The Avro specification requires field names to be alphanumeric. By + // default, in cases when column names do not conform to these requirements + // (e.g. non-ascii unicode codepoints) and Avro is requested as an output + // format, the CreateReadSession call will fail. + // + // Setting this field to true, populates avro field names with a placeholder + // value and populates a "displayName" attribute for every avro field with the + // original column name. + bool enable_display_name_attribute = 1; +} diff --git a/proto/third-party/google/cloud/bigquery/storage/v1/protobuf.proto b/proto/third-party/google/cloud/bigquery/storage/v1/protobuf.proto new file mode 100644 index 0000000000000..e12f4d8db0794 --- /dev/null +++ b/proto/third-party/google/cloud/bigquery/storage/v1/protobuf.proto @@ -0,0 +1,48 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package google.cloud.bigquery.storage.v1; + +import "google/protobuf/descriptor.proto"; + +option csharp_namespace = "Google.Cloud.BigQuery.Storage.V1"; +option go_package = "cloud.google.com/go/bigquery/storage/apiv1/storagepb;storagepb"; +option java_multiple_files = true; +option java_outer_classname = "ProtoBufProto"; +option java_package = "com.google.cloud.bigquery.storage.v1"; +option php_namespace = "Google\\Cloud\\BigQuery\\Storage\\V1"; + +// ProtoSchema describes the schema of the serialized protocol buffer data rows. +message ProtoSchema { + // Descriptor for input message. The provided descriptor must be self + // contained, such that data rows sent can be fully decoded using only the + // single descriptor. For data rows that are compositions of multiple + // independent messages, this means the descriptor may need to be transformed + // to only use nested types: + // https://developers.google.com/protocol-buffers/docs/proto#nested + // + // For additional information for how proto types and values map onto BigQuery + // see: https://cloud.google.com/bigquery/docs/write-api#data_type_conversions + google.protobuf.DescriptorProto proto_descriptor = 1; +} + +message ProtoRows { + // A sequence of rows serialized as a Protocol Buffer. + // + // See https://developers.google.com/protocol-buffers/docs/overview for more + // information on deserializing this field. 
+ repeated bytes serialized_rows = 1; +} diff --git a/proto/third-party/google/cloud/bigquery/storage/v1/storage.proto b/proto/third-party/google/cloud/bigquery/storage/v1/storage.proto new file mode 100644 index 0000000000000..187bf549dac7a --- /dev/null +++ b/proto/third-party/google/cloud/bigquery/storage/v1/storage.proto @@ -0,0 +1,757 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package google.cloud.bigquery.storage.v1; + +import "google/api/annotations.proto"; +import "google/api/client.proto"; +import "google/api/field_behavior.proto"; +import "google/api/resource.proto"; +import "google/cloud/bigquery/storage/v1/arrow.proto"; +import "google/cloud/bigquery/storage/v1/avro.proto"; +import "google/cloud/bigquery/storage/v1/protobuf.proto"; +import "google/cloud/bigquery/storage/v1/stream.proto"; +import "google/cloud/bigquery/storage/v1/table.proto"; +import "google/protobuf/timestamp.proto"; +import "google/protobuf/wrappers.proto"; +import "google/rpc/status.proto"; + +option csharp_namespace = "Google.Cloud.BigQuery.Storage.V1"; +option go_package = "cloud.google.com/go/bigquery/storage/apiv1/storagepb;storagepb"; +option java_multiple_files = true; +option java_outer_classname = "StorageProto"; +option java_package = "com.google.cloud.bigquery.storage.v1"; +option php_namespace = "Google\\Cloud\\BigQuery\\Storage\\V1"; +option (google.api.resource_definition) = { + type: "bigquery.googleapis.com/Table" + pattern: "projects/{project}/datasets/{dataset}/tables/{table}" +}; + +// BigQuery Read API. +// +// The Read API can be used to read data from BigQuery. +service BigQueryRead { + option (google.api.default_host) = "bigquerystorage.googleapis.com"; + option (google.api.oauth_scopes) = + "https://www.googleapis.com/auth/bigquery," + "https://www.googleapis.com/auth/cloud-platform"; + + // Creates a new read session. A read session divides the contents of a + // BigQuery table into one or more streams, which can then be used to read + // data from the table. The read session also specifies properties of the + // data to be read, such as a list of columns or a push-down filter describing + // the rows to be returned. + // + // A particular row can be read by at most one stream. When the caller has + // reached the end of each stream in the session, then all the data in the + // table has been read. + // + // Data is assigned to each stream such that roughly the same number of + // rows can be read from each stream. Because the server-side unit for + // assigning data is collections of rows, the API does not guarantee that + // each stream will return the same number or rows. Additionally, the + // limits are enforced based on the number of pre-filtered rows, so some + // filters can lead to lopsided assignments. + // + // Read sessions automatically expire 6 hours after they are created and do + // not require manual clean-up by the caller. 
+ rpc CreateReadSession(CreateReadSessionRequest) returns (ReadSession) { + option (google.api.http) = { + post: "/v1/{read_session.table=projects/*/datasets/*/tables/*}" + body: "*" + }; + option (google.api.method_signature) = + "parent,read_session,max_stream_count"; + } + + // Reads rows from the stream in the format prescribed by the ReadSession. + // Each response contains one or more table rows, up to a maximum of 100 MiB + // per response; read requests which attempt to read individual rows larger + // than 100 MiB will fail. + // + // Each request also returns a set of stream statistics reflecting the current + // state of the stream. + rpc ReadRows(ReadRowsRequest) returns (stream ReadRowsResponse) { + option (google.api.http) = { + get: "/v1/{read_stream=projects/*/locations/*/sessions/*/streams/*}" + }; + option (google.api.method_signature) = "read_stream,offset"; + } + + // Splits a given `ReadStream` into two `ReadStream` objects. These + // `ReadStream` objects are referred to as the primary and the residual + // streams of the split. The original `ReadStream` can still be read from in + // the same manner as before. Both of the returned `ReadStream` objects can + // also be read from, and the rows returned by both child streams will be + // the same as the rows read from the original stream. + // + // Moreover, the two child streams will be allocated back-to-back in the + // original `ReadStream`. Concretely, it is guaranteed that for streams + // original, primary, and residual, that original[0-j] = primary[0-j] and + // original[j-n] = residual[0-m] once the streams have been read to + // completion. + rpc SplitReadStream(SplitReadStreamRequest) + returns (SplitReadStreamResponse) { + option (google.api.http) = { + get: "/v1/{name=projects/*/locations/*/sessions/*/streams/*}" + }; + } +} + +// BigQuery Write API. +// +// The Write API can be used to write data to BigQuery. +// +// For supplementary information about the Write API, see: +// https://cloud.google.com/bigquery/docs/write-api +service BigQueryWrite { + option (google.api.default_host) = "bigquerystorage.googleapis.com"; + option (google.api.oauth_scopes) = + "https://www.googleapis.com/auth/bigquery," + "https://www.googleapis.com/auth/bigquery.insertdata," + "https://www.googleapis.com/auth/cloud-platform"; + + // Creates a write stream to the given table. + // Additionally, every table has a special stream named '_default' + // to which data can be written. This stream doesn't need to be created using + // CreateWriteStream. It is a stream that can be used simultaneously by any + // number of clients. Data written to this stream is considered committed as + // soon as an acknowledgement is received. + rpc CreateWriteStream(CreateWriteStreamRequest) returns (WriteStream) { + option (google.api.http) = { + post: "/v1/{parent=projects/*/datasets/*/tables/*}" + body: "write_stream" + }; + option (google.api.method_signature) = "parent,write_stream"; + } + + // Appends data to the given stream. + // + // If `offset` is specified, the `offset` is checked against the end of + // stream. The server returns `OUT_OF_RANGE` in `AppendRowsResponse` if an + // attempt is made to append to an offset beyond the current end of the stream + // or `ALREADY_EXISTS` if user provides an `offset` that has already been + // written to. User can retry with adjusted offset within the same RPC + // connection. If `offset` is not specified, append happens at the end of the + // stream. 
+ // + // The response contains an optional offset at which the append + // happened. No offset information will be returned for appends to a + // default stream. + // + // Responses are received in the same order in which requests are sent. + // There will be one response for each successful inserted request. Responses + // may optionally embed error information if the originating AppendRequest was + // not successfully processed. + // + // The specifics of when successfully appended data is made visible to the + // table are governed by the type of stream: + // + // * For COMMITTED streams (which includes the default stream), data is + // visible immediately upon successful append. + // + // * For BUFFERED streams, data is made visible via a subsequent `FlushRows` + // rpc which advances a cursor to a newer offset in the stream. + // + // * For PENDING streams, data is not made visible until the stream itself is + // finalized (via the `FinalizeWriteStream` rpc), and the stream is explicitly + // committed via the `BatchCommitWriteStreams` rpc. + rpc AppendRows(stream AppendRowsRequest) returns (stream AppendRowsResponse) { + option (google.api.http) = { + post: "/v1/{write_stream=projects/*/datasets/*/tables/*/streams/*}" + body: "*" + }; + option (google.api.method_signature) = "write_stream"; + } + + // Gets information about a write stream. + rpc GetWriteStream(GetWriteStreamRequest) returns (WriteStream) { + option (google.api.http) = { + post: "/v1/{name=projects/*/datasets/*/tables/*/streams/*}" + body: "*" + }; + option (google.api.method_signature) = "name"; + } + + // Finalize a write stream so that no new data can be appended to the + // stream. Finalize is not supported on the '_default' stream. + rpc FinalizeWriteStream(FinalizeWriteStreamRequest) + returns (FinalizeWriteStreamResponse) { + option (google.api.http) = { + post: "/v1/{name=projects/*/datasets/*/tables/*/streams/*}" + body: "*" + }; + option (google.api.method_signature) = "name"; + } + + // Atomically commits a group of `PENDING` streams that belong to the same + // `parent` table. + // + // Streams must be finalized before commit and cannot be committed multiple + // times. Once a stream is committed, data in the stream becomes available + // for read operations. + rpc BatchCommitWriteStreams(BatchCommitWriteStreamsRequest) + returns (BatchCommitWriteStreamsResponse) { + option (google.api.http) = { + get: "/v1/{parent=projects/*/datasets/*/tables/*}" + }; + option (google.api.method_signature) = "parent"; + } + + // Flushes rows to a BUFFERED stream. + // + // If users are appending rows to BUFFERED stream, flush operation is + // required in order for the rows to become available for reading. A + // Flush operation flushes up to any previously flushed offset in a BUFFERED + // stream, to the offset specified in the request. + // + // Flush is not supported on the _default stream, since it is not BUFFERED. + rpc FlushRows(FlushRowsRequest) returns (FlushRowsResponse) { + option (google.api.http) = { + post: "/v1/{write_stream=projects/*/datasets/*/tables/*/streams/*}" + body: "*" + }; + option (google.api.method_signature) = "write_stream"; + } +} + +// Request message for `CreateReadSession`. +message CreateReadSessionRequest { + // Required. The request project that owns the session, in the form of + // `projects/{project_id}`. + string parent = 1 [ + (google.api.field_behavior) = REQUIRED, + (google.api.resource_reference) = { + type: "cloudresourcemanager.googleapis.com/Project" + } + ]; + + // Required. 
Session to be created. + ReadSession read_session = 2 [(google.api.field_behavior) = REQUIRED]; + + // Max initial number of streams. If unset or zero, the server will + // provide a value of streams so as to produce reasonable throughput. Must be + // non-negative. The number of streams may be lower than the requested number, + // depending on the amount parallelism that is reasonable for the table. + // There is a default system max limit of 1,000. + // + // This must be greater than or equal to preferred_min_stream_count. + // Typically, clients should either leave this unset to let the system to + // determine an upper bound OR set this a size for the maximum "units of work" + // it can gracefully handle. + int32 max_stream_count = 3; + + // The minimum preferred stream count. This parameter can be used to inform + // the service that there is a desired lower bound on the number of streams. + // This is typically a target parallelism of the client (e.g. a Spark + // cluster with N-workers would set this to a low multiple of N to ensure + // good cluster utilization). + // + // The system will make a best effort to provide at least this number of + // streams, but in some cases might provide less. + int32 preferred_min_stream_count = 4; +} + +// Request message for `ReadRows`. +message ReadRowsRequest { + // Required. Stream to read rows from. + string read_stream = 1 [ + (google.api.field_behavior) = REQUIRED, + (google.api.resource_reference) = { + type: "bigquerystorage.googleapis.com/ReadStream" + } + ]; + + // The offset requested must be less than the last row read from Read. + // Requesting a larger offset is undefined. If not specified, start reading + // from offset zero. + int64 offset = 2; +} + +// Information on if the current connection is being throttled. +message ThrottleState { + // How much this connection is being throttled. Zero means no throttling, + // 100 means fully throttled. + int32 throttle_percent = 1; +} + +// Estimated stream statistics for a given read Stream. +message StreamStats { + message Progress { + // The fraction of rows assigned to the stream that have been processed by + // the server so far, not including the rows in the current response + // message. + // + // This value, along with `at_response_end`, can be used to interpolate + // the progress made as the rows in the message are being processed using + // the following formula: `at_response_start + (at_response_end - + // at_response_start) * rows_processed_from_response / rows_in_response`. + // + // Note that if a filter is provided, the `at_response_end` value of the + // previous response may not necessarily be equal to the + // `at_response_start` value of the current response. + double at_response_start = 1; + + // Similar to `at_response_start`, except that this value includes the + // rows in the current response. + double at_response_end = 2; + } + + // Represents the progress of the current stream. + Progress progress = 2; +} + +// Response from calling `ReadRows` may include row data, progress and +// throttling information. +message ReadRowsResponse { + // Row data is returned in format specified during session creation. + oneof rows { + // Serialized row data in AVRO format. + AvroRows avro_rows = 3; + + // Serialized row data in Arrow RecordBatch format. + ArrowRecordBatch arrow_record_batch = 4; + } + + // Number of serialized rows in the rows block. + int64 row_count = 6; + + // Statistics for the stream. + StreamStats stats = 2; + + // Throttling state. 
If unset, the latest response still describes + // the current throttling status. + ThrottleState throttle_state = 5; + + // The schema for the read. If read_options.selected_fields is set, the + // schema may be different from the table schema as it will only contain + // the selected fields. This schema is equivalent to the one returned by + // CreateSession. This field is only populated in the first ReadRowsResponse + // RPC. + oneof schema { + // Output only. Avro schema. + AvroSchema avro_schema = 7 [(google.api.field_behavior) = OUTPUT_ONLY]; + + // Output only. Arrow schema. + ArrowSchema arrow_schema = 8 [(google.api.field_behavior) = OUTPUT_ONLY]; + } +} + +// Request message for `SplitReadStream`. +message SplitReadStreamRequest { + // Required. Name of the stream to split. + string name = 1 [ + (google.api.field_behavior) = REQUIRED, + (google.api.resource_reference) = { + type: "bigquerystorage.googleapis.com/ReadStream" + } + ]; + + // A value in the range (0.0, 1.0) that specifies the fractional point at + // which the original stream should be split. The actual split point is + // evaluated on pre-filtered rows, so if a filter is provided, then there is + // no guarantee that the division of the rows between the new child streams + // will be proportional to this fractional value. Additionally, because the + // server-side unit for assigning data is collections of rows, this fraction + // will always map to a data storage boundary on the server side. + double fraction = 2; +} + +// Response message for `SplitReadStream`. +message SplitReadStreamResponse { + // Primary stream, which contains the beginning portion of + // |original_stream|. An empty value indicates that the original stream can no + // longer be split. + ReadStream primary_stream = 1; + + // Remainder stream, which contains the tail of |original_stream|. An empty + // value indicates that the original stream can no longer be split. + ReadStream remainder_stream = 2; +} + +// Request message for `CreateWriteStream`. +message CreateWriteStreamRequest { + // Required. Reference to the table to which the stream belongs, in the format + // of `projects/{project}/datasets/{dataset}/tables/{table}`. + string parent = 1 [ + (google.api.field_behavior) = REQUIRED, + (google.api.resource_reference) = { type: "bigquery.googleapis.com/Table" } + ]; + + // Required. Stream to be created. + WriteStream write_stream = 2 [(google.api.field_behavior) = REQUIRED]; +} + +// Request message for `AppendRows`. +// +// Because AppendRows is a bidirectional streaming RPC, certain parts of the +// AppendRowsRequest need only be specified for the first request before +// switching table destinations. You can also switch table destinations within +// the same connection for the default stream. +// +// The size of a single AppendRowsRequest must be less than 10 MB in size. +// Requests larger than this return an error, typically `INVALID_ARGUMENT`. +message AppendRowsRequest { + // ProtoData contains the data rows and schema when constructing append + // requests. + message ProtoData { + // The protocol buffer schema used to serialize the data. Provide this value + // whenever: + // + // * You send the first request of an RPC connection. + // + // * You change the input schema. + // + // * You specify a new destination table. + ProtoSchema writer_schema = 1; + + // Serialized row data in protobuf message format. 
+ // Currently, the backend expects the serialized rows to adhere to + // proto2 semantics when appending rows, particularly with respect to + // how default values are encoded. + ProtoRows rows = 2; + } + + // An enum to indicate how to interpret missing values of fields that are + // present in user schema but missing in rows. A missing value can represent a + // NULL or a column default value defined in BigQuery table schema. + enum MissingValueInterpretation { + // Invalid missing value interpretation. Requests with this value will be + // rejected. + MISSING_VALUE_INTERPRETATION_UNSPECIFIED = 0; + + // Missing value is interpreted as NULL. + NULL_VALUE = 1; + + // Missing value is interpreted as column default value if declared in the + // table schema, NULL otherwise. + DEFAULT_VALUE = 2; + } + + // Required. The write_stream identifies the append operation. It must be + // provided in the following scenarios: + // + // * In the first request to an AppendRows connection. + // + // * In all subsequent requests to an AppendRows connection, if you use the + // same connection to write to multiple tables or change the input schema for + // default streams. + // + // For explicitly created write streams, the format is: + // + // * `projects/{project}/datasets/{dataset}/tables/{table}/streams/{id}` + // + // For the special default stream, the format is: + // + // * `projects/{project}/datasets/{dataset}/tables/{table}/streams/_default`. + // + // An example of a possible sequence of requests with write_stream fields + // within a single connection: + // + // * r1: {write_stream: stream_name_1} + // + // * r2: {write_stream: /*omit*/} + // + // * r3: {write_stream: /*omit*/} + // + // * r4: {write_stream: stream_name_2} + // + // * r5: {write_stream: stream_name_2} + // + // The destination changed in request_4, so the write_stream field must be + // populated in all subsequent requests in this stream. + string write_stream = 1 [ + (google.api.field_behavior) = REQUIRED, + (google.api.resource_reference) = { + type: "bigquerystorage.googleapis.com/WriteStream" + } + ]; + + // If present, the write is only performed if the next append offset is same + // as the provided value. If not present, the write is performed at the + // current end of stream. Specifying a value for this field is not allowed + // when calling AppendRows for the '_default' stream. + google.protobuf.Int64Value offset = 2; + + // Input rows. The `writer_schema` field must be specified at the initial + // request and currently, it will be ignored if specified in following + // requests. Following requests must have data in the same format as the + // initial request. + oneof rows { + // Rows in proto format. + ProtoData proto_rows = 4; + } + + // Id set by client to annotate its identity. Only initial request setting is + // respected. + string trace_id = 6; + + // A map to indicate how to interpret missing value for some fields. Missing + // values are fields present in user schema but missing in rows. The key is + // the field name. The value is the interpretation of missing values for the + // field. + // + // For example, a map {'foo': NULL_VALUE, 'bar': DEFAULT_VALUE} means all + // missing values in field foo are interpreted as NULL, all missing values in + // field bar are interpreted as the default value of field bar in table + // schema. + // + // If a field is not in this map and has missing values, the missing values + // in this field are interpreted as NULL. 
+ // + // This field only applies to the current request, it won't affect other + // requests on the connection. + // + // Currently, field name can only be top-level column name, can't be a struct + // field path like 'foo.bar'. + map missing_value_interpretations = 7; + + // Optional. Default missing value interpretation for all columns in the + // table. When a value is specified on an `AppendRowsRequest`, it is applied + // to all requests on the connection from that point forward, until a + // subsequent `AppendRowsRequest` sets it to a different value. + // `missing_value_interpretation` can override + // `default_missing_value_interpretation`. For example, if you want to write + // `NULL` instead of using default values for some columns, you can set + // `default_missing_value_interpretation` to `DEFAULT_VALUE` and at the same + // time, set `missing_value_interpretations` to `NULL_VALUE` on those columns. + MissingValueInterpretation default_missing_value_interpretation = 8 + [(google.api.field_behavior) = OPTIONAL]; +} + +// Response message for `AppendRows`. +message AppendRowsResponse { + // AppendResult is returned for successful append requests. + message AppendResult { + // The row offset at which the last append occurred. The offset will not be + // set if appending using default streams. + google.protobuf.Int64Value offset = 1; + } + + oneof response { + // Result if the append is successful. + AppendResult append_result = 1; + + // Error returned when problems were encountered. If present, + // it indicates rows were not accepted into the system. + // Users can retry or continue with other append requests within the + // same connection. + // + // Additional information about error signalling: + // + // ALREADY_EXISTS: Happens when an append specified an offset, and the + // backend already has received data at this offset. Typically encountered + // in retry scenarios, and can be ignored. + // + // OUT_OF_RANGE: Returned when the specified offset in the stream is beyond + // the current end of the stream. + // + // INVALID_ARGUMENT: Indicates a malformed request or data. + // + // ABORTED: Request processing is aborted because of prior failures. The + // request can be retried if previous failure is addressed. + // + // INTERNAL: Indicates server side error(s) that can be retried. + google.rpc.Status error = 2; + } + + // If backend detects a schema update, pass it to user so that user can + // use it to input new type of message. It will be empty when no schema + // updates have occurred. + TableSchema updated_schema = 3; + + // If a request failed due to corrupted rows, no rows in the batch will be + // appended. The API will return row level error info, so that the caller can + // remove the bad rows and retry the request. + repeated RowError row_errors = 4; + + // The target of the append operation. Matches the write_stream in the + // corresponding request. + string write_stream = 5; +} + +// Request message for `GetWriteStreamRequest`. +message GetWriteStreamRequest { + // Required. Name of the stream to get, in the form of + // `projects/{project}/datasets/{dataset}/tables/{table}/streams/{stream}`. + string name = 1 [ + (google.api.field_behavior) = REQUIRED, + (google.api.resource_reference) = { + type: "bigquerystorage.googleapis.com/WriteStream" + } + ]; + + // Indicates whether to get full or partial view of the WriteStream. If + // not set, view returned will be basic. + WriteStreamView view = 3; +} + +// Request message for `BatchCommitWriteStreams`. 
+message BatchCommitWriteStreamsRequest { + // Required. Parent table that all the streams should belong to, in the form + // of `projects/{project}/datasets/{dataset}/tables/{table}`. + string parent = 1 [ + (google.api.field_behavior) = REQUIRED, + (google.api.resource_reference) = { type: "bigquery.googleapis.com/Table" } + ]; + + // Required. The group of streams that will be committed atomically. + repeated string write_streams = 2 [(google.api.field_behavior) = REQUIRED]; +} + +// Response message for `BatchCommitWriteStreams`. +message BatchCommitWriteStreamsResponse { + // The time at which streams were committed in microseconds granularity. + // This field will only exist when there are no stream errors. + // **Note** if this field is not set, it means the commit was not successful. + google.protobuf.Timestamp commit_time = 1; + + // Stream level error if commit failed. Only streams with error will be in + // the list. + // If empty, there is no error and all streams are committed successfully. + // If non empty, certain streams have errors and ZERO stream is committed due + // to atomicity guarantee. + repeated StorageError stream_errors = 2; +} + +// Request message for invoking `FinalizeWriteStream`. +message FinalizeWriteStreamRequest { + // Required. Name of the stream to finalize, in the form of + // `projects/{project}/datasets/{dataset}/tables/{table}/streams/{stream}`. + string name = 1 [ + (google.api.field_behavior) = REQUIRED, + (google.api.resource_reference) = { + type: "bigquerystorage.googleapis.com/WriteStream" + } + ]; +} + +// Response message for `FinalizeWriteStream`. +message FinalizeWriteStreamResponse { + // Number of rows in the finalized stream. + int64 row_count = 1; +} + +// Request message for `FlushRows`. +message FlushRowsRequest { + // Required. The stream that is the target of the flush operation. + string write_stream = 1 [ + (google.api.field_behavior) = REQUIRED, + (google.api.resource_reference) = { + type: "bigquerystorage.googleapis.com/WriteStream" + } + ]; + + // Ending offset of the flush operation. Rows before this offset(including + // this offset) will be flushed. + google.protobuf.Int64Value offset = 2; +} + +// Respond message for `FlushRows`. +message FlushRowsResponse { + // The rows before this offset (including this offset) are flushed. + int64 offset = 1; +} + +// Structured custom BigQuery Storage error message. The error can be attached +// as error details in the returned rpc Status. In particular, the use of error +// codes allows more structured error handling, and reduces the need to evaluate +// unstructured error text strings. +message StorageError { + // Error code for `StorageError`. + enum StorageErrorCode { + // Default error. + STORAGE_ERROR_CODE_UNSPECIFIED = 0; + + // Table is not found in the system. + TABLE_NOT_FOUND = 1; + + // Stream is already committed. + STREAM_ALREADY_COMMITTED = 2; + + // Stream is not found. + STREAM_NOT_FOUND = 3; + + // Invalid Stream type. + // For example, you try to commit a stream that is not pending. + INVALID_STREAM_TYPE = 4; + + // Invalid Stream state. + // For example, you try to commit a stream that is not finalized or is + // garbaged. + INVALID_STREAM_STATE = 5; + + // Stream is finalized. + STREAM_FINALIZED = 6; + + // There is a schema mismatch and it is caused by user schema has extra + // field than bigquery schema. + SCHEMA_MISMATCH_EXTRA_FIELDS = 7; + + // Offset already exists. + OFFSET_ALREADY_EXISTS = 8; + + // Offset out of range. 
+ OFFSET_OUT_OF_RANGE = 9; + + // Customer-managed encryption key (CMEK) not provided for CMEK-enabled + // data. + CMEK_NOT_PROVIDED = 10; + + // Customer-managed encryption key (CMEK) was incorrectly provided. + INVALID_CMEK_PROVIDED = 11; + + // There is an encryption error while using customer-managed encryption key. + CMEK_ENCRYPTION_ERROR = 12; + + // Key Management Service (KMS) service returned an error, which can be + // retried. + KMS_SERVICE_ERROR = 13; + + // Permission denied while using customer-managed encryption key. + KMS_PERMISSION_DENIED = 14; + } + + // BigQuery Storage specific error code. + StorageErrorCode code = 1; + + // Name of the failed entity. + string entity = 2; + + // Message that describes the error. + string error_message = 3; +} + +// The message that presents row level error info in a request. +message RowError { + // Error code for `RowError`. + enum RowErrorCode { + // Default error. + ROW_ERROR_CODE_UNSPECIFIED = 0; + + // One or more fields in the row has errors. + FIELDS_ERROR = 1; + } + + // Index of the malformed row in the request. + int64 index = 1; + + // Structured error reason for a row error. + RowErrorCode code = 2; + + // Description of the issue encountered when processing the row. + string message = 3; +} diff --git a/proto/third-party/google/cloud/bigquery/storage/v1/stream.proto b/proto/third-party/google/cloud/bigquery/storage/v1/stream.proto new file mode 100644 index 0000000000000..785c74f788df8 --- /dev/null +++ b/proto/third-party/google/cloud/bigquery/storage/v1/stream.proto @@ -0,0 +1,315 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package google.cloud.bigquery.storage.v1; + +import "google/api/field_behavior.proto"; +import "google/api/resource.proto"; +import "google/cloud/bigquery/storage/v1/arrow.proto"; +import "google/cloud/bigquery/storage/v1/avro.proto"; +import "google/cloud/bigquery/storage/v1/table.proto"; +import "google/protobuf/timestamp.proto"; + +option csharp_namespace = "Google.Cloud.BigQuery.Storage.V1"; +option go_package = "cloud.google.com/go/bigquery/storage/apiv1/storagepb;storagepb"; +option java_multiple_files = true; +option java_outer_classname = "StreamProto"; +option java_package = "com.google.cloud.bigquery.storage.v1"; +option php_namespace = "Google\\Cloud\\BigQuery\\Storage\\V1"; + +// Data format for input or output data. +enum DataFormat { + // Data format is unspecified. + DATA_FORMAT_UNSPECIFIED = 0; + + // Avro is a standard open source row based file format. + // See https://avro.apache.org/ for more details. + AVRO = 1; + + // Arrow is a standard open source column-based message format. + // See https://arrow.apache.org/ for more details. + ARROW = 2; +} + +// Information about the ReadSession. 
+message ReadSession { + option (google.api.resource) = { + type: "bigquerystorage.googleapis.com/ReadSession" + pattern: "projects/{project}/locations/{location}/sessions/{session}" + }; + + // Additional attributes when reading a table. + message TableModifiers { + // The snapshot time of the table. If not set, interpreted as now. + google.protobuf.Timestamp snapshot_time = 1; + } + + // Options dictating how we read a table. + message TableReadOptions { + // Optional. The names of the fields in the table to be returned. If no + // field names are specified, then all fields in the table are returned. + // + // Nested fields -- the child elements of a STRUCT field -- can be selected + // individually using their fully-qualified names, and will be returned as + // record fields containing only the selected nested fields. If a STRUCT + // field is specified in the selected fields list, all of the child elements + // will be returned. + // + // As an example, consider a table with the following schema: + // + // { + // "name": "struct_field", + // "type": "RECORD", + // "mode": "NULLABLE", + // "fields": [ + // { + // "name": "string_field1", + // "type": "STRING", + // . "mode": "NULLABLE" + // }, + // { + // "name": "string_field2", + // "type": "STRING", + // "mode": "NULLABLE" + // } + // ] + // } + // + // Specifying "struct_field" in the selected fields list will result in a + // read session schema with the following logical structure: + // + // struct_field { + // string_field1 + // string_field2 + // } + // + // Specifying "struct_field.string_field1" in the selected fields list will + // result in a read session schema with the following logical structure: + // + // struct_field { + // string_field1 + // } + // + // The order of the fields in the read session schema is derived from the + // table schema and does not correspond to the order in which the fields are + // specified in this list. + repeated string selected_fields = 1; + + // SQL text filtering statement, similar to a WHERE clause in a query. + // Aggregates are not supported. + // + // Examples: "int_field > 5" + // "date_field = CAST('2014-9-27' as DATE)" + // "nullable_field is not NULL" + // "st_equals(geo_field, st_geofromtext("POINT(2, 2)"))" + // "numeric_field BETWEEN 1.0 AND 5.0" + // + // Restricted to a maximum length for 1 MB. + string row_restriction = 2; + + oneof output_format_serialization_options { + // Optional. Options specific to the Apache Arrow output format. + ArrowSerializationOptions arrow_serialization_options = 3 + [(google.api.field_behavior) = OPTIONAL]; + + // Optional. Options specific to the Apache Avro output format + AvroSerializationOptions avro_serialization_options = 4 + [(google.api.field_behavior) = OPTIONAL]; + } + + // Optional. Specifies a table sampling percentage. Specifically, the query + // planner will use TABLESAMPLE SYSTEM (sample_percentage PERCENT). The + // sampling percentage is applied at the data block granularity. It will + // randomly choose for each data block whether to read the rows in that data + // block. For more details, see + // https://cloud.google.com/bigquery/docs/table-sampling) + optional double sample_percentage = 5 + [(google.api.field_behavior) = OPTIONAL]; + } + + // Output only. Unique identifier for the session, in the form + // `projects/{project_id}/locations/{location}/sessions/{session_id}`. + string name = 1 [(google.api.field_behavior) = OUTPUT_ONLY]; + + // Output only. Time at which the session becomes invalid. 
After this time, + // subsequent requests to read this Session will return errors. The + // expire_time is automatically assigned and currently cannot be specified or + // updated. + google.protobuf.Timestamp expire_time = 2 + [(google.api.field_behavior) = OUTPUT_ONLY]; + + // Immutable. Data format of the output data. DATA_FORMAT_UNSPECIFIED not + // supported. + DataFormat data_format = 3 [(google.api.field_behavior) = IMMUTABLE]; + + // The schema for the read. If read_options.selected_fields is set, the + // schema may be different from the table schema as it will only contain + // the selected fields. + oneof schema { + // Output only. Avro schema. + AvroSchema avro_schema = 4 [(google.api.field_behavior) = OUTPUT_ONLY]; + + // Output only. Arrow schema. + ArrowSchema arrow_schema = 5 [(google.api.field_behavior) = OUTPUT_ONLY]; + } + + // Immutable. Table that this ReadSession is reading from, in the form + // `projects/{project_id}/datasets/{dataset_id}/tables/{table_id}` + string table = 6 [ + (google.api.field_behavior) = IMMUTABLE, + (google.api.resource_reference) = { type: "bigquery.googleapis.com/Table" } + ]; + + // Optional. Any modifiers which are applied when reading from the specified + // table. + TableModifiers table_modifiers = 7 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. Read options for this session (e.g. column selection, filters). + TableReadOptions read_options = 8 [(google.api.field_behavior) = OPTIONAL]; + + // Output only. A list of streams created with the session. + // + // At least one stream is created with the session. In the future, larger + // request_stream_count values *may* result in this list being unpopulated, + // in that case, the user will need to use a List method to get the streams + // instead, which is not yet available. + repeated ReadStream streams = 10 [(google.api.field_behavior) = OUTPUT_ONLY]; + + // Output only. An estimate on the number of bytes this session will scan when + // all streams are completely consumed. This estimate is based on + // metadata from the table which might be incomplete or stale. + int64 estimated_total_bytes_scanned = 12 + [(google.api.field_behavior) = OUTPUT_ONLY]; + + // Output only. A pre-projected estimate of the total physical size of files + // (in bytes) that this session will scan when all streams are consumed. This + // estimate is independent of the selected columns and can be based on + // incomplete or stale metadata from the table. This field is only set for + // BigLake tables. + int64 estimated_total_physical_file_size = 15 + [(google.api.field_behavior) = OUTPUT_ONLY]; + + // Output only. An estimate on the number of rows present in this session's + // streams. This estimate is based on metadata from the table which might be + // incomplete or stale. + int64 estimated_row_count = 14 [(google.api.field_behavior) = OUTPUT_ONLY]; + + // Optional. ID set by client to annotate a session identity. This does not + // need to be strictly unique, but instead the same ID should be used to group + // logically connected sessions (e.g. All using the same ID for all sessions + // needed to complete a Spark SQL query is reasonable). + // + // Maximum length is 256 bytes. + string trace_id = 13 [(google.api.field_behavior) = OPTIONAL]; +} + +// Information about a single stream that gets data out of the storage system. +// Most of the information about `ReadStream` instances is aggregated, making +// `ReadStream` lightweight. 
+message ReadStream { + option (google.api.resource) = { + type: "bigquerystorage.googleapis.com/ReadStream" + pattern: "projects/{project}/locations/{location}/sessions/{session}/streams/{stream}" + }; + + // Output only. Name of the stream, in the form + // `projects/{project_id}/locations/{location}/sessions/{session_id}/streams/{stream_id}`. + string name = 1 [(google.api.field_behavior) = OUTPUT_ONLY]; +} + +// WriteStreamView is a view enum that controls what details about a write +// stream should be returned. +enum WriteStreamView { + // The default / unset value. + WRITE_STREAM_VIEW_UNSPECIFIED = 0; + + // The BASIC projection returns basic metadata about a write stream. The + // basic view does not include schema information. This is the default view + // returned by GetWriteStream. + BASIC = 1; + + // The FULL projection returns all available write stream metadata, including + // the schema. CreateWriteStream returns the full projection of write stream + // metadata. + FULL = 2; +} + +// Information about a single stream that gets data inside the storage system. +message WriteStream { + option (google.api.resource) = { + type: "bigquerystorage.googleapis.com/WriteStream" + pattern: "projects/{project}/datasets/{dataset}/tables/{table}/streams/{stream}" + }; + + // Type enum of the stream. + enum Type { + // Unknown type. + TYPE_UNSPECIFIED = 0; + + // Data will commit automatically and appear as soon as the write is + // acknowledged. + COMMITTED = 1; + + // Data is invisible until the stream is committed. + PENDING = 2; + + // Data is only visible up to the offset to which it was flushed. + BUFFERED = 3; + } + + // Mode enum of the stream. + enum WriteMode { + // Unknown type. + WRITE_MODE_UNSPECIFIED = 0; + + // Insert new records into the table. + // It is the default value if customers do not specify it. + INSERT = 1; + } + + // Output only. Name of the stream, in the form + // `projects/{project}/datasets/{dataset}/tables/{table}/streams/{stream}`. + string name = 1 [(google.api.field_behavior) = OUTPUT_ONLY]; + + // Immutable. Type of the stream. + Type type = 2 [(google.api.field_behavior) = IMMUTABLE]; + + // Output only. Create time of the stream. For the _default stream, this is + // the creation_time of the table. + google.protobuf.Timestamp create_time = 3 + [(google.api.field_behavior) = OUTPUT_ONLY]; + + // Output only. Commit time of the stream. + // If a stream is of `COMMITTED` type, then it will have a commit_time same as + // `create_time`. If the stream is of `PENDING` type, empty commit_time + // means it is not committed. + google.protobuf.Timestamp commit_time = 4 + [(google.api.field_behavior) = OUTPUT_ONLY]; + + // Output only. The schema of the destination table. It is only returned in + // `CreateWriteStream` response. Caller should generate data that's + // compatible with this schema to send in initial `AppendRowsRequest`. + // The table schema could go out of date during the life time of the stream. + TableSchema table_schema = 5 [(google.api.field_behavior) = OUTPUT_ONLY]; + + // Immutable. Mode of the stream. + WriteMode write_mode = 7 [(google.api.field_behavior) = IMMUTABLE]; + + // Immutable. The geographic location where the stream's dataset resides. See + // https://cloud.google.com/bigquery/docs/locations for supported + // locations. 
+ string location = 8 [(google.api.field_behavior) = IMMUTABLE]; +} diff --git a/proto/third-party/google/cloud/bigquery/storage/v1/table.proto b/proto/third-party/google/cloud/bigquery/storage/v1/table.proto new file mode 100644 index 0000000000000..47629c510e672 --- /dev/null +++ b/proto/third-party/google/cloud/bigquery/storage/v1/table.proto @@ -0,0 +1,171 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package google.cloud.bigquery.storage.v1; + +import "google/api/field_behavior.proto"; + +option csharp_namespace = "Google.Cloud.BigQuery.Storage.V1"; +option go_package = "cloud.google.com/go/bigquery/storage/apiv1/storagepb;storagepb"; +option java_multiple_files = true; +option java_outer_classname = "TableProto"; +option java_package = "com.google.cloud.bigquery.storage.v1"; +option php_namespace = "Google\\Cloud\\BigQuery\\Storage\\V1"; + +// Schema of a table. This schema is a subset of +// google.cloud.bigquery.v2.TableSchema containing information necessary to +// generate valid message to write to BigQuery. +message TableSchema { + // Describes the fields in a table. + repeated TableFieldSchema fields = 1; +} + +// TableFieldSchema defines a single field/column within a table schema. +message TableFieldSchema { + enum Type { + // Illegal value + TYPE_UNSPECIFIED = 0; + + // 64K, UTF8 + STRING = 1; + + // 64-bit signed + INT64 = 2; + + // 64-bit IEEE floating point + DOUBLE = 3; + + // Aggregate type + STRUCT = 4; + + // 64K, Binary + BYTES = 5; + + // 2-valued + BOOL = 6; + + // 64-bit signed usec since UTC epoch + TIMESTAMP = 7; + + // Civil date - Year, Month, Day + DATE = 8; + + // Civil time - Hour, Minute, Second, Microseconds + TIME = 9; + + // Combination of civil date and civil time + DATETIME = 10; + + // Geography object + GEOGRAPHY = 11; + + // Numeric value + NUMERIC = 12; + + // BigNumeric value + BIGNUMERIC = 13; + + // Interval + INTERVAL = 14; + + // JSON, String + JSON = 15; + } + + enum Mode { + // Illegal value + MODE_UNSPECIFIED = 0; + + NULLABLE = 1; + + REQUIRED = 2; + + REPEATED = 3; + } + + // Required. The field name. The name must contain only letters (a-z, A-Z), + // numbers (0-9), or underscores (_), and must start with a letter or + // underscore. The maximum length is 128 characters. + string name = 1 [(google.api.field_behavior) = REQUIRED]; + + // Required. The field data type. + Type type = 2 [(google.api.field_behavior) = REQUIRED]; + + // Optional. The field mode. The default value is NULLABLE. + Mode mode = 3 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. Describes the nested schema fields if the type property is set to + // STRUCT. + repeated TableFieldSchema fields = 4 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. The field description. The maximum length is 1,024 characters. + string description = 6 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. Maximum length of values of this field for STRINGS or BYTES. 
+ // + // If max_length is not specified, no maximum length constraint is imposed + // on this field. + // + // If type = "STRING", then max_length represents the maximum UTF-8 + // length of strings in this field. + // + // If type = "BYTES", then max_length represents the maximum number of + // bytes in this field. + // + // It is invalid to set this field if type is not "STRING" or "BYTES". + int64 max_length = 7 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. Precision (maximum number of total digits in base 10) and scale + // (maximum number of digits in the fractional part in base 10) constraints + // for values of this field for NUMERIC or BIGNUMERIC. + // + // It is invalid to set precision or scale if type is not "NUMERIC" or + // "BIGNUMERIC". + // + // If precision and scale are not specified, no value range constraint is + // imposed on this field insofar as values are permitted by the type. + // + // Values of this NUMERIC or BIGNUMERIC field must be in this range when: + // + // * Precision (P) and scale (S) are specified: + // [-10^(P-S) + 10^(-S), 10^(P-S) - 10^(-S)] + // * Precision (P) is specified but not scale (and thus scale is + // interpreted to be equal to zero): + // [-10^P + 1, 10^P - 1]. + // + // Acceptable values for precision and scale if both are specified: + // + // * If type = "NUMERIC": + // 1 <= precision - scale <= 29 and 0 <= scale <= 9. + // * If type = "BIGNUMERIC": + // 1 <= precision - scale <= 38 and 0 <= scale <= 38. + // + // Acceptable values for precision if only precision is specified but not + // scale (and thus scale is interpreted to be equal to zero): + // + // * If type = "NUMERIC": 1 <= precision <= 29. + // * If type = "BIGNUMERIC": 1 <= precision <= 38. + // + // If scale is specified but not precision, then it is invalid. + int64 precision = 8 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. See documentation for precision. + int64 scale = 9 [(google.api.field_behavior) = OPTIONAL]; + + // Optional. A SQL expression to specify the [default value] + // (https://cloud.google.com/bigquery/docs/default-values) for this field. + string default_value_expression = 10 [(google.api.field_behavior) = OPTIONAL]; +} diff --git a/proto/third-party/google/protobuf/wrappers.proto b/proto/third-party/google/protobuf/wrappers.proto new file mode 100644 index 0000000000000..1959fa55a4e7f --- /dev/null +++ b/proto/third-party/google/protobuf/wrappers.proto @@ -0,0 +1,123 @@ +// Protocol Buffers - Google's data interchange format +// Copyright 2008 Google Inc. All rights reserved. +// https://developers.google.com/protocol-buffers/ +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. 
+// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// +// Wrappers for primitive (non-message) types. These types are useful +// for embedding primitives in the `google.protobuf.Any` type and for places +// where we need to distinguish between the absence of a primitive +// typed field and its default value. +// +// These wrappers have no meaningful use within repeated fields as they lack +// the ability to detect presence on individual elements. +// These wrappers have no meaningful use within a map or a oneof since +// individual entries of a map or fields of a oneof can already detect presence. + +syntax = "proto3"; + +package google.protobuf; + +option cc_enable_arenas = true; +option go_package = "google.golang.org/protobuf/types/known/wrapperspb"; +option java_package = "com.google.protobuf"; +option java_outer_classname = "WrappersProto"; +option java_multiple_files = true; +option objc_class_prefix = "GPB"; +option csharp_namespace = "Google.Protobuf.WellKnownTypes"; + +// Wrapper message for `double`. +// +// The JSON representation for `DoubleValue` is JSON number. +message DoubleValue { + // The double value. + double value = 1; +} + +// Wrapper message for `float`. +// +// The JSON representation for `FloatValue` is JSON number. +message FloatValue { + // The float value. + float value = 1; +} + +// Wrapper message for `int64`. +// +// The JSON representation for `Int64Value` is JSON string. +message Int64Value { + // The int64 value. + int64 value = 1; +} + +// Wrapper message for `uint64`. +// +// The JSON representation for `UInt64Value` is JSON string. +message UInt64Value { + // The uint64 value. + uint64 value = 1; +} + +// Wrapper message for `int32`. +// +// The JSON representation for `Int32Value` is JSON number. +message Int32Value { + // The int32 value. + int32 value = 1; +} + +// Wrapper message for `uint32`. +// +// The JSON representation for `UInt32Value` is JSON number. +message UInt32Value { + // The uint32 value. + uint32 value = 1; +} + +// Wrapper message for `bool`. +// +// The JSON representation for `BoolValue` is JSON `true` and `false`. +message BoolValue { + // The bool value. + bool value = 1; +} + +// Wrapper message for `string`. +// +// The JSON representation for `StringValue` is JSON string. +message StringValue { + // The string value. + string value = 1; +} + +// Wrapper message for `bytes`. +// +// The JSON representation for `BytesValue` is JSON string. +message BytesValue { + // The bytes value. 
+ bytes value = 1; +} diff --git a/proto/third-party/google/rpc/code.proto b/proto/third-party/google/rpc/code.proto new file mode 100644 index 0000000000000..7c810af40f08b --- /dev/null +++ b/proto/third-party/google/rpc/code.proto @@ -0,0 +1,186 @@ +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package google.rpc; + +option go_package = "google.golang.org/genproto/googleapis/rpc/code;code"; +option java_multiple_files = true; +option java_outer_classname = "CodeProto"; +option java_package = "com.google.rpc"; +option objc_class_prefix = "RPC"; + +// The canonical error codes for gRPC APIs. +// +// +// Sometimes multiple error codes may apply. Services should return +// the most specific error code that applies. For example, prefer +// `OUT_OF_RANGE` over `FAILED_PRECONDITION` if both codes apply. +// Similarly prefer `NOT_FOUND` or `ALREADY_EXISTS` over `FAILED_PRECONDITION`. +enum Code { + // Not an error; returned on success. + // + // HTTP Mapping: 200 OK + OK = 0; + + // The operation was cancelled, typically by the caller. + // + // HTTP Mapping: 499 Client Closed Request + CANCELLED = 1; + + // Unknown error. For example, this error may be returned when + // a `Status` value received from another address space belongs to + // an error space that is not known in this address space. Also + // errors raised by APIs that do not return enough error information + // may be converted to this error. + // + // HTTP Mapping: 500 Internal Server Error + UNKNOWN = 2; + + // The client specified an invalid argument. Note that this differs + // from `FAILED_PRECONDITION`. `INVALID_ARGUMENT` indicates arguments + // that are problematic regardless of the state of the system + // (e.g., a malformed file name). + // + // HTTP Mapping: 400 Bad Request + INVALID_ARGUMENT = 3; + + // The deadline expired before the operation could complete. For operations + // that change the state of the system, this error may be returned + // even if the operation has completed successfully. For example, a + // successful response from a server could have been delayed long + // enough for the deadline to expire. + // + // HTTP Mapping: 504 Gateway Timeout + DEADLINE_EXCEEDED = 4; + + // Some requested entity (e.g., file or directory) was not found. + // + // Note to server developers: if a request is denied for an entire class + // of users, such as gradual feature rollout or undocumented allowlist, + // `NOT_FOUND` may be used. If a request is denied for some users within + // a class of users, such as user-based access control, `PERMISSION_DENIED` + // must be used. + // + // HTTP Mapping: 404 Not Found + NOT_FOUND = 5; + + // The entity that a client attempted to create (e.g., file or directory) + // already exists. + // + // HTTP Mapping: 409 Conflict + ALREADY_EXISTS = 6; + + // The caller does not have permission to execute the specified + // operation. 
`PERMISSION_DENIED` must not be used for rejections + // caused by exhausting some resource (use `RESOURCE_EXHAUSTED` + // instead for those errors). `PERMISSION_DENIED` must not be + // used if the caller can not be identified (use `UNAUTHENTICATED` + // instead for those errors). This error code does not imply the + // request is valid or the requested entity exists or satisfies + // other pre-conditions. + // + // HTTP Mapping: 403 Forbidden + PERMISSION_DENIED = 7; + + // The request does not have valid authentication credentials for the + // operation. + // + // HTTP Mapping: 401 Unauthorized + UNAUTHENTICATED = 16; + + // Some resource has been exhausted, perhaps a per-user quota, or + // perhaps the entire file system is out of space. + // + // HTTP Mapping: 429 Too Many Requests + RESOURCE_EXHAUSTED = 8; + + // The operation was rejected because the system is not in a state + // required for the operation's execution. For example, the directory + // to be deleted is non-empty, an rmdir operation is applied to + // a non-directory, etc. + // + // Service implementors can use the following guidelines to decide + // between `FAILED_PRECONDITION`, `ABORTED`, and `UNAVAILABLE`: + // (a) Use `UNAVAILABLE` if the client can retry just the failing call. + // (b) Use `ABORTED` if the client should retry at a higher level. For + // example, when a client-specified test-and-set fails, indicating the + // client should restart a read-modify-write sequence. + // (c) Use `FAILED_PRECONDITION` if the client should not retry until + // the system state has been explicitly fixed. For example, if an "rmdir" + // fails because the directory is non-empty, `FAILED_PRECONDITION` + // should be returned since the client should not retry unless + // the files are deleted from the directory. + // + // HTTP Mapping: 400 Bad Request + FAILED_PRECONDITION = 9; + + // The operation was aborted, typically due to a concurrency issue such as + // a sequencer check failure or transaction abort. + // + // See the guidelines above for deciding between `FAILED_PRECONDITION`, + // `ABORTED`, and `UNAVAILABLE`. + // + // HTTP Mapping: 409 Conflict + ABORTED = 10; + + // The operation was attempted past the valid range. E.g., seeking or + // reading past end-of-file. + // + // Unlike `INVALID_ARGUMENT`, this error indicates a problem that may + // be fixed if the system state changes. For example, a 32-bit file + // system will generate `INVALID_ARGUMENT` if asked to read at an + // offset that is not in the range [0,2^32-1], but it will generate + // `OUT_OF_RANGE` if asked to read from an offset past the current + // file size. + // + // There is a fair bit of overlap between `FAILED_PRECONDITION` and + // `OUT_OF_RANGE`. We recommend using `OUT_OF_RANGE` (the more specific + // error) when it applies so that callers who are iterating through + // a space can easily look for an `OUT_OF_RANGE` error to detect when + // they are done. + // + // HTTP Mapping: 400 Bad Request + OUT_OF_RANGE = 11; + + // The operation is not implemented or is not supported/enabled in this + // service. + // + // HTTP Mapping: 501 Not Implemented + UNIMPLEMENTED = 12; + + // Internal errors. This means that some invariants expected by the + // underlying system have been broken. This error code is reserved + // for serious errors. + // + // HTTP Mapping: 500 Internal Server Error + INTERNAL = 13; + + // The service is currently unavailable. 
This is most likely a + // transient condition, which can be corrected by retrying with + // a backoff. Note that it is not always safe to retry + // non-idempotent operations. + // + // See the guidelines above for deciding between `FAILED_PRECONDITION`, + // `ABORTED`, and `UNAVAILABLE`. + // + // HTTP Mapping: 503 Service Unavailable + UNAVAILABLE = 14; + + // Unrecoverable data loss or corruption. + // + // HTTP Mapping: 500 Internal Server Error + DATA_LOSS = 15; +} diff --git a/src/gcp.rs b/src/gcp.rs index 148fa9dec501a..782408896cae7 100644 --- a/src/gcp.rs +++ b/src/gcp.rs @@ -30,6 +30,7 @@ const METADATA_TOKEN_EXPIRY_MARGIN_SECS: u64 = 200; const METADATA_TOKEN_ERROR_RETRY_SECS: u64 = 2; +pub const BIGQUERY_STORAGE_URL: &str = "https://bigquerystorage.googleapis.com:443"; pub const PUBSUB_URL: &str = "https://pubsub.googleapis.com"; pub static PUBSUB_ADDRESS: Lazy<String> = Lazy::new(|| { diff --git a/src/sinks/gcp/bigquery/config.rs b/src/sinks/gcp/bigquery/config.rs new file mode 100644 index 0000000000000..0fda77182994b --- /dev/null +++ b/src/sinks/gcp/bigquery/config.rs @@ -0,0 +1,208 @@ +use codecs::encoding::ProtobufSerializerConfig; +use futures::FutureExt; +use http::Uri; +use indoc::indoc; +use tonic::transport::Channel; +use vector_config::configurable_component; + +use super::proto::google::cloud::bigquery::storage::v1 as proto; +use super::request_builder::{BigqueryRequestBuilder, MAX_BATCH_PAYLOAD_SIZE}; +use super::service::{AuthInterceptor, BigqueryService}; +use super::sink::BigquerySink; +use crate::config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext}; +use crate::gcp::{GcpAuthConfig, GcpAuthenticator, Scope, BIGQUERY_STORAGE_URL}; +use crate::sinks::util::{BatchConfig, SinkBatchSettings, TowerRequestConfig}; +use crate::sinks::{Healthcheck, VectorSink}; + +fn default_endpoint() -> String { + BIGQUERY_STORAGE_URL.to_string() +} + +#[derive(Clone, Copy, Debug, Default)] +pub struct BigqueryDefaultBatchSettings; + +impl SinkBatchSettings for BigqueryDefaultBatchSettings { + const MAX_EVENTS: Option<usize> = Some(50_000); // arbitrary default; BigQuery imposes no hard limit on events per request + const MAX_BYTES: Option<usize> = Some(MAX_BATCH_PAYLOAD_SIZE); + const TIMEOUT_SECS: f64 = 1.0; +} + +/// Configuration for the `bigquery` sink. +#[configurable_component(sink("bigquery", "Store events in BigQuery."))] +#[derive(Clone, Debug)] +#[serde(deny_unknown_fields)] +pub struct BigqueryConfig { + /// The project name to which to publish events. + #[configurable(metadata(docs::examples = "vector-123456"))] + pub project: String, + + /// The dataset within the project to which to publish events. + #[configurable(metadata(docs::examples = "this-is-a-dataset"))] + pub dataset: String, + + /// The table within the dataset to which to publish events. + #[configurable(metadata(docs::examples = "this-is-a-table"))] + pub table: String, + + /// The endpoint to which to publish events. + /// + /// The scheme (`http` or `https`) must be specified. No path should be included since the paths defined + /// by the [`GCP BigQuery`][bigquery_api] API are used. + /// + /// The trailing slash `/` must not be included.
+ /// + /// [bigquery_api]: https://cloud.google.com/bigquery/docs/reference/rest + #[serde(default = "default_endpoint")] + #[configurable(metadata(docs::examples = "https://bigquerystorage.googleapis.com:443"))] + pub endpoint: String, + + #[serde(default, flatten)] + pub auth: GcpAuthConfig, + + #[configurable(derived)] + #[serde(default)] + pub batch: BatchConfig, + + #[configurable(derived)] + #[serde(default)] + pub request: TowerRequestConfig, + + // we only support protobuf encoding because that's what the API uses (gRPC) + #[configurable(derived)] + encoding: ProtobufSerializerConfig, + + #[configurable(derived)] + #[serde( + default, + deserialize_with = "crate::serde::bool_or_struct", + skip_serializing_if = "crate::serde::skip_serializing_if_default" + )] + acknowledgements: AcknowledgementsConfig, +} + +impl BigqueryConfig { + fn get_write_stream(&self) -> String { + // TODO: support non-default streams + // https://cloud.google.com/bigquery/docs/write-api#application-created_streams + format!( + "projects/{}/datasets/{}/tables/{}/streams/_default", + self.project, self.dataset, self.table + ) + } +} + +impl GenerateConfig for BigqueryConfig { + fn generate_config() -> toml::Value { + toml::from_str(indoc! {r#" + project = "my-project" + dataset = "my-dataset" + table = "my-table" + encoding.protobuf.desc_file = "/etc/vector/proto.desc" + encoding.protobuf.message_type = "BigqueryMessage" + "#}) + .unwrap() + } +} + +/// Create a future that sends a single nothing-request to BigQuery +async fn healthcheck_future( + uri: Uri, + auth: GcpAuthenticator, + write_stream: String, +) -> crate::Result<()> { + let channel = Channel::builder(uri) + .tls_config(tonic::transport::channel::ClientTlsConfig::new()) + .unwrap() + .connect_timeout(std::time::Duration::from_secs(10)) + .connect() + .await?; + let mut client = proto::big_query_write_client::BigQueryWriteClient::with_interceptor( + channel, + AuthInterceptor { auth }, + ); + // specify the write_stream so that there's enough information to perform an IAM check + let stream = tokio_stream::once(proto::AppendRowsRequest { + write_stream, + ..proto::AppendRowsRequest::default() + }); + let mut response = client.append_rows(stream).await?; + // the result is expected to be `InvalidArgument` + // because we use a bunch of empty values in the request + // (and `InvalidArgument` specifically means we made it past the auth check) + if let Err(status) = response.get_mut().message().await { + if status.code() != tonic::Code::InvalidArgument { + return Err(status.into()); + } + } + Ok(()) +} + +#[async_trait::async_trait] +#[typetag::serde(name = "gcp_bigquery")] +impl SinkConfig for BigqueryConfig { + async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { + // `cx.proxy` doesn't apply well to tonic gRPC connections, + // so we don't use it when building the sink. + + // Configure auth and make sure it's constantly up-to-date + let auth = self.auth.build(Scope::BigQueryInsertdata).await?; + auth.spawn_regenerate_token(); + + // Kick off the healthcheck + let healthcheck: Healthcheck = if cx.healthcheck.enabled { + healthcheck_future( + self.endpoint.parse()?, + auth.clone(), + self.get_write_stream(), + ) + .boxed() + } else { + Box::pin(async move { Ok(()) }) + }; + + // Create the gRPC client + let channel = Channel::builder(self.endpoint.parse()?) 
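+            // The channel itself carries no credentials; each request is authenticated by the
+            // `AuthInterceptor` that `BigqueryService::with_auth` installs below.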
+ .connect_timeout(std::time::Duration::from_secs(10)) + .connect() + .await?; + let service = BigqueryService::with_auth(channel, auth).await?; + + let batcher_settings = self + .batch + .validate()? + .limit_max_bytes(MAX_BATCH_PAYLOAD_SIZE)? + .into_batcher_settings()?; + + let protobuf_serializer = self.encoding.build()?; + let write_stream = self.get_write_stream(); + let request_builder = BigqueryRequestBuilder { + protobuf_serializer, + write_stream, + }; + + let sink = BigquerySink { + service, + batcher_settings, + request_builder, + }; + Ok((VectorSink::from_event_streamsink(sink), healthcheck)) + } + + fn input(&self) -> Input { + Input::new(self.encoding.input_type()) + } + + fn acknowledgements(&self) -> &AcknowledgementsConfig { + &self.acknowledgements + } +} + +#[cfg(test)] +mod test { + use super::BigqueryConfig; + + #[test] + fn generate_config() { + crate::test_util::test_generate_config::(); + } +} diff --git a/src/sinks/gcp/bigquery/integration_tests.rs b/src/sinks/gcp/bigquery/integration_tests.rs new file mode 100644 index 0000000000000..7af3b08de0ad9 --- /dev/null +++ b/src/sinks/gcp/bigquery/integration_tests.rs @@ -0,0 +1,67 @@ +use chrono::{Local, Utc}; +use indoc::formatdoc; +use regex::Regex; +use std::collections::BTreeMap; +use vector_common::finalization::BatchStatus; +use vector_core::event::{BatchNotifier, Event, LogEvent, Value}; +use vector_core::sink::VectorSink; + +use crate::config::SinkConfig; +use crate::sinks::gcp::bigquery::BigqueryConfig; +use crate::test_util::components::{run_and_assert_sink_compliance, SINK_TAGS}; +use crate::test_util::{generate_events_with_stream, random_string, trace_init}; + +/// An event generator that can be used with `generate_events_with_stream` +fn event_generator(index: usize) -> Event { + let now = Local::now().with_timezone(&Utc); + let value = Value::Object(BTreeMap::from([ + ("time".into(), Value::Timestamp(now)), + ("count".into(), Value::Integer(index as i64)), + ("message".into(), Value::from(random_string(64))), + ("user".into(), Value::from("Bob".to_string())), + ])); + Event::Log(LogEvent::from_parts(value, Default::default())) +} + +/// Create a BigquerySink from the local environment +async fn create_sink() -> VectorSink { + let desc_file = std::path::PathBuf::from(std::env::var_os("CARGO_MANIFEST_DIR").unwrap()) + .join("lib/codecs/tests/data/protobuf/integration.desc") + .to_string_lossy() + .into_owned(); + let message_type = "test.Integration"; + let write_stream = std::env::var("TEST_GCP_BIGQUERY_WRITE_STREAM") + .expect("couldn't find the BigQuery write stream in environment variables"); + let re = + Regex::new("^projects/([^/]+)/datasets/([^/]+)/tables/([^/]+)/streams/_default$").unwrap(); + let captures = re + .captures(&write_stream) + .expect("malformed BigQuery write stream in environment variables"); + let project = captures.get(1).unwrap().as_str(); + let dataset = captures.get(2).unwrap().as_str(); + let table = captures.get(3).unwrap().as_str(); + let config = formatdoc! 
{r#" + project = "{project}" + dataset = "{dataset}" + table = "{table}" + encoding.protobuf.desc_file = "{desc_file}" + encoding.protobuf.message_type = "{message_type}" + "#}; + let (bigquery_config, cx) = + crate::sinks::util::test::load_sink::(&config).unwrap(); + let (bigquery_sink, bigquery_healthcheck) = bigquery_config.build(cx).await.unwrap(); + bigquery_healthcheck + .await + .expect("BigQuery healthcheck failed"); + bigquery_sink +} + +#[tokio::test] +async fn gcp_bigquery_sink() { + trace_init(); + let bigquery_sink = create_sink().await; + let (batch, mut receiver) = BatchNotifier::new_with_receiver(); + let (_, events) = generate_events_with_stream(event_generator, 10, Some(batch)); + run_and_assert_sink_compliance(bigquery_sink, events, &SINK_TAGS).await; + assert_eq!(Ok(BatchStatus::Delivered), receiver.try_recv()); +} diff --git a/src/sinks/gcp/bigquery/mod.rs b/src/sinks/gcp/bigquery/mod.rs new file mode 100644 index 0000000000000..6936522b00fa8 --- /dev/null +++ b/src/sinks/gcp/bigquery/mod.rs @@ -0,0 +1,35 @@ +//! The BigQuery [`vector_core::sink::VectorSink`] +//! +//! This module contains the [`vector_core::sink::VectorSink`] instance responsible for taking +//! a stream of [`vector_core::event::Event`] and storing them in a BigQuery table. +//! This module uses the BigQuery Storage Write (gRPC) API. + +#[cfg(all(test, feature = "gcp-bigquery-integration-tests"))] +mod integration_tests; +#[cfg(test)] +mod tests; + +mod config; +mod request_builder; +mod service; +mod sink; + +#[allow(warnings, clippy::pedantic, clippy::nursery)] +pub(crate) mod proto { + pub(crate) mod google { + pub(crate) mod cloud { + pub(crate) mod bigquery { + pub(crate) mod storage { + pub(crate) mod v1 { + tonic::include_proto!("google.cloud.bigquery.storage.v1"); + } + } + } + } + pub(crate) mod rpc { + tonic::include_proto!("google.rpc"); + } + } +} + +pub use self::config::BigqueryConfig; diff --git a/src/sinks/gcp/bigquery/request_builder.rs b/src/sinks/gcp/bigquery/request_builder.rs new file mode 100644 index 0000000000000..6e0b3e985caa8 --- /dev/null +++ b/src/sinks/gcp/bigquery/request_builder.rs @@ -0,0 +1,190 @@ +use bytes::BytesMut; +use codecs::encoding::ProtobufSerializer; +use prost::Message; +use std::num::NonZeroUsize; +use tokio_util::codec::Encoder; +use vector_common::request_metadata::RequestMetadata; +use vector_core::event::Finalizable; + +use super::proto::google::cloud::bigquery::storage::v1 as proto; +use super::service::BigqueryRequest; +use crate::event::{Event, EventFinalizers}; +use crate::sinks::util::metadata::RequestMetadataBuilder; +use crate::sinks::util::IncrementalRequestBuilder; + +// 10MB maximum message size: +// https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1#appendrowsrequest +pub const MAX_BATCH_PAYLOAD_SIZE: usize = 10_000_000; + +#[derive(Debug, snafu::Snafu)] +pub enum BigqueryRequestBuilderError { + #[snafu(display("Encoding protobuf failed: {}", message))] + ProtobufEncoding { message: String }, // `error` needs to be some concrete type +} + +impl From for BigqueryRequestBuilderError { + fn from(error: vector_common::Error) -> Self { + BigqueryRequestBuilderError::ProtobufEncoding { + message: format!("{:?}", error), + } + } +} + +#[derive(Default)] +pub struct BigqueryRequestMetadata { + request_metadata: RequestMetadata, + finalizers: EventFinalizers, +} + +pub struct BigqueryRequestBuilder { + pub protobuf_serializer: ProtobufSerializer, + pub write_stream: String, +} + +impl BigqueryRequestBuilder 
{ + fn build_proto_data( + &self, + serialized_rows: Vec>, + ) -> (NonZeroUsize, proto::append_rows_request::ProtoData) { + let proto_data = proto::append_rows_request::ProtoData { + writer_schema: Some(proto::ProtoSchema { + proto_descriptor: Some(self.protobuf_serializer.descriptor_proto().clone()), + }), + rows: Some(proto::ProtoRows { serialized_rows }), + }; + let size = NonZeroUsize::new(proto_data.encoded_len()) + .expect("encoded payload can never be empty"); + (size, proto_data) + } +} + +impl IncrementalRequestBuilder> for BigqueryRequestBuilder { + type Metadata = BigqueryRequestMetadata; + type Payload = proto::append_rows_request::ProtoData; + type Request = BigqueryRequest; + type Error = BigqueryRequestBuilderError; + + fn encode_events_incremental( + &mut self, + input: Vec, + ) -> Vec> { + let base_proto_data_size: NonZeroUsize = self.build_proto_data(vec![]).0; + let max_serialized_rows_len: usize = MAX_BATCH_PAYLOAD_SIZE - base_proto_data_size.get(); + let metadata = RequestMetadataBuilder::from_events(&input); + let mut errors: Vec = vec![]; + let mut bodies: Vec<(EventFinalizers, (NonZeroUsize, Self::Payload))> = vec![]; + let mut event_finalizers = EventFinalizers::DEFAULT; + let mut serialized_rows: Vec> = vec![]; + let mut serialized_rows_len: usize = 0; + for mut event in input.into_iter() { + let current_event_finalizers = event.take_finalizers(); + let mut bytes = BytesMut::new(); + if let Err(e) = self.protobuf_serializer.encode(event, &mut bytes) { + errors.push(BigqueryRequestBuilderError::ProtobufEncoding { + message: format!("{:?}", e), + }); + } else { + if bytes.len() + serialized_rows_len > max_serialized_rows_len { + // there's going to be too many events to send in one body; + // flush the current events and start a new body + bodies.push((event_finalizers, self.build_proto_data(serialized_rows))); + event_finalizers = EventFinalizers::DEFAULT; + serialized_rows = vec![]; + serialized_rows_len = 0; + } + event_finalizers.merge(current_event_finalizers); + serialized_rows_len += bytes.len(); + serialized_rows.push(bytes.into()); + } + } + // flush the final body (if there are any events left) + if !serialized_rows.is_empty() { + bodies.push((event_finalizers, self.build_proto_data(serialized_rows))); + } + // throw everything together into the expected IncrementalRequestBuilder return type + bodies + .into_iter() + .map(|(event_finalizers, (size, proto_data))| { + Ok(( + BigqueryRequestMetadata { + finalizers: event_finalizers, + request_metadata: metadata.with_request_size(size), + }, + proto_data, + )) + }) + .chain(errors.into_iter().map(Err)) + .collect() + } + + fn build_request(&mut self, metadata: Self::Metadata, payload: Self::Payload) -> Self::Request { + let request = proto::AppendRowsRequest { + write_stream: self.write_stream.clone(), + offset: None, // not supported by _default stream + trace_id: Default::default(), + missing_value_interpretations: Default::default(), + default_missing_value_interpretation: 0, + rows: Some(proto::append_rows_request::Rows::ProtoRows(payload)), + }; + let uncompressed_size = request.encoded_len(); + BigqueryRequest { + request, + metadata: metadata.request_metadata, + finalizers: metadata.finalizers, + uncompressed_size, + } + } +} + +#[cfg(test)] +mod test { + use bytes::{BufMut, Bytes, BytesMut}; + use codecs::encoding::{ProtobufSerializerConfig, ProtobufSerializerOptions}; + use std::collections::BTreeMap; + use std::path::PathBuf; + use vector_core::event::{Event, EventMetadata, LogEvent, Value}; + + use 
super::BigqueryRequestBuilder; + use crate::sinks::util::IncrementalRequestBuilder; + + #[test] + fn encode_events_incremental() { + // build the request builder + let desc_file = PathBuf::from(std::env::var_os("CARGO_MANIFEST_DIR").unwrap()) + .join("lib/codecs/tests/data/protobuf/test.desc"); + let protobuf_serializer = ProtobufSerializerConfig { + protobuf: ProtobufSerializerOptions { + desc_file, + message_type: "test.Bytes".into(), + }, + } + .build() + .unwrap(); + let mut request_builder = BigqueryRequestBuilder { + protobuf_serializer, + write_stream: "/projects/123/datasets/456/tables/789/streams/_default".to_string(), + }; + // check that we break up large batches to avoid api limits + let mut events = vec![]; + let mut data = BytesMut::with_capacity(63336); + for i in 1..data.capacity() { + data.put_u64(i as u64); + } + for _ in 0..128 { + let event = Event::Log(LogEvent::from_parts( + Value::Object(BTreeMap::from([ + ("text".into(), Value::Bytes(Bytes::from("hello world"))), + ("binary".into(), Value::Bytes(data.clone().into())), + ])), + EventMetadata::default(), + )); + events.push(event); + } + let results = request_builder.encode_events_incremental(events); + assert!(results.iter().all(|r| r.is_ok())); + assert!(results.len() > 1); + // check that we don't generate bodies with no events in them + let results = request_builder.encode_events_incremental(vec![]); + assert!(results.is_empty()); + } +} diff --git a/src/sinks/gcp/bigquery/service.rs b/src/sinks/gcp/bigquery/service.rs new file mode 100644 index 0000000000000..3fec202992ed4 --- /dev/null +++ b/src/sinks/gcp/bigquery/service.rs @@ -0,0 +1,381 @@ +use futures::future::BoxFuture; +use snafu::Snafu; +use std::task::{Context, Poll}; +use tonic::metadata::MetadataValue; +use tonic::service::interceptor::InterceptedService; +use tonic::service::Interceptor; +use tonic::transport::Channel; +use tonic::{Request, Status}; +use tower::Service; +use vector_common::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata}; +use vector_core::event::EventStatus; +use vector_core::stream::DriverResponse; + +use super::proto::google::cloud::bigquery::storage::v1 as proto; +use crate::event::{EventFinalizers, Finalizable}; +use crate::gcp::GcpAuthenticator; + +#[derive(Clone)] +pub struct AuthInterceptor { + pub auth: GcpAuthenticator, +} + +impl Interceptor for AuthInterceptor { + fn call(&mut self, mut request: Request<()>) -> Result, Status> { + if let Some(token) = self.auth.make_token() { + let value: MetadataValue<_> = token + .parse() + .map_err(|e| tonic::Status::unauthenticated(format!("{e:?}")))?; + request.metadata_mut().insert("authorization", value); + } + Ok(request) + } +} + +pub struct BigqueryRequest { + pub request: proto::AppendRowsRequest, + pub metadata: RequestMetadata, + pub finalizers: EventFinalizers, + pub uncompressed_size: usize, +} + +impl Finalizable for BigqueryRequest { + fn take_finalizers(&mut self) -> EventFinalizers { + std::mem::take(&mut self.finalizers) + } +} + +impl MetaDescriptive for BigqueryRequest { + fn get_metadata(&self) -> &RequestMetadata { + &self.metadata + } + + fn metadata_mut(&mut self) -> &mut RequestMetadata { + &mut self.metadata + } +} + +#[derive(Debug)] +pub struct BigqueryResponse { + body: proto::AppendRowsResponse, + request_byte_size: GroupedCountByteSize, + request_uncompressed_size: usize, +} + +impl DriverResponse for BigqueryResponse { + fn event_status(&self) -> EventStatus { + if !self.body.row_errors.is_empty() { + // The AppendRowsResponse 
reports on specific rows that failed to append, + // meaning that in theory on failures we can retry the request without the bad events. + // Unfortunately there's no good mechanism for doing this in the Vector model; + // it's assumed either the whole thing is successful or it is not. + return EventStatus::Rejected; + } + match &self.body.response { + None => EventStatus::Dropped, + Some(proto::append_rows_response::Response::AppendResult(_)) => EventStatus::Delivered, + Some(proto::append_rows_response::Response::Error(status)) => { + match super::proto::google::rpc::Code::try_from(status.code) { + // we really shouldn't be able to get here, but just in case + Ok(super::proto::google::rpc::Code::Ok) => EventStatus::Delivered, + // these errors can't be retried because the event payload is almost definitely bad + Ok(super::proto::google::rpc::Code::InvalidArgument) + | Ok(super::proto::google::rpc::Code::NotFound) + | Ok(super::proto::google::rpc::Code::AlreadyExists) => EventStatus::Rejected, + // everything else can probably be retried + _ => EventStatus::Errored, + } + } + } + } + fn events_sent(&self) -> &GroupedCountByteSize { + &self.request_byte_size + } + fn bytes_sent(&self) -> Option { + Some(self.request_uncompressed_size) + } +} + +#[derive(Debug, Snafu)] +pub enum BigqueryServiceError { + #[snafu(display("Error communicating with BigQuery: {}", error))] + Transport { error: tonic::transport::Error }, + #[snafu(display("BigQuery request failure: {}", status))] + Request { status: tonic::Status }, + #[snafu(display("BigQuery row write failures: {:?}", row_errors))] + RowWrite { row_errors: Vec }, +} + +impl From for BigqueryServiceError { + fn from(error: tonic::transport::Error) -> Self { + Self::Transport { error } + } +} + +impl From for BigqueryServiceError { + fn from(status: tonic::Status) -> Self { + Self::Request { status } + } +} + +impl From> for BigqueryServiceError { + fn from(row_errors: Vec) -> Self { + Self::RowWrite { row_errors } + } +} + +type BigQueryWriteClient = proto::big_query_write_client::BigQueryWriteClient< + InterceptedService, +>; + +pub struct BigqueryService { + service: BigQueryWriteClient, +} + +impl BigqueryService { + pub async fn with_auth(channel: Channel, auth: GcpAuthenticator) -> crate::Result { + let service = proto::big_query_write_client::BigQueryWriteClient::with_interceptor( + channel, + AuthInterceptor { auth }, + ); + Ok(Self { service }) + } +} + +impl Service for BigqueryService { + type Response = BigqueryResponse; + type Error = BigqueryServiceError; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, _cx: &mut Context) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, mut request: BigqueryRequest) -> Self::Future { + let metadata = std::mem::take(request.metadata_mut()); + let request_byte_size = metadata.into_events_estimated_json_encoded_byte_size(); + let request_uncompressed_size = request.uncompressed_size; + + let mut client = self.service.clone(); + + Box::pin(async move { + // Ideally, we would maintain the gRPC stream, detect when auth expired and re-request with new auth. + // But issuing a new request every time leads to more comprehensible code with reasonable performance. + trace!( + message = "Sending request to BigQuery", + request = format!("{:?}", request.request), + ); + let stream = tokio_stream::once(request.request); + let response = client.append_rows(stream).await?; + match response.into_inner().message().await? 
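+            // Only a single `AppendRowsRequest` was written to the stream above, so exactly one
+            // response message is expected; `None` means the server closed the stream without replying.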
{ + Some(body) => { + trace!( + message = "Received response body from BigQuery", + body = format!("{:?}", body), + ); + if body.row_errors.is_empty() { + Ok(BigqueryResponse { + body, + request_byte_size, + request_uncompressed_size, + }) + } else { + Err(body.row_errors.into()) + } + } + None => Err(tonic::Status::unknown("response stream closed").into()), + } + }) + } +} + +#[cfg(test)] +mod test { + use futures::FutureExt; + use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; + use tokio_stream::wrappers::UnboundedReceiverStream; + use tonic::{Request, Response, Status}; + use tower::Service; + + use super::{proto, BigqueryRequest, BigqueryService}; + + /// A dumb BigQueryWrite server that can be used to test the BigqueryService. + struct BigqueryServer { + error: Option, + append_rows_request_sender: UnboundedSender<( + proto::AppendRowsRequest, + tokio::sync::oneshot::Sender>, + )>, + } + + #[async_trait::async_trait] + impl proto::big_query_write_server::BigQueryWrite for BigqueryServer { + async fn create_write_stream( + &self, + _request: Request, + ) -> Result, Status> { + unimplemented!() + } + + type AppendRowsStream = + UnboundedReceiverStream>; + + async fn append_rows( + &self, + request: Request>, + ) -> std::result::Result, Status> { + if let Some(error) = &self.error { + return Err(error.clone()); + } + let mut streaming = request.into_inner(); + let (sender, receiver) = + unbounded_channel::>(); + let message_sender = self.append_rows_request_sender.clone(); + tokio::spawn(async move { + loop { + match streaming.message().await.unwrap() { + Some(message) => { + let (stream_sender, stream_receiver) = tokio::sync::oneshot::channel(); + message_sender.send((message, stream_sender)).unwrap(); + let response = stream_receiver.await.unwrap(); + sender.send(response).unwrap(); + } + None => { + return; + } + } + } + }); + let receiver_stream = UnboundedReceiverStream::new(receiver); + Ok(Response::new(receiver_stream)) + } + + async fn get_write_stream( + &self, + _request: Request, + ) -> std::result::Result, Status> { + unimplemented!() + } + + async fn finalize_write_stream( + &self, + _request: Request, + ) -> std::result::Result, Status> { + unimplemented!() + } + + async fn batch_commit_write_streams( + &self, + _request: Request, + ) -> std::result::Result, Status> { + unimplemented!() + } + + async fn flush_rows( + &self, + _request: Request, + ) -> std::result::Result, Status> { + unimplemented!() + } + } + + /// Create a TcpListener on some arbitrary local address and an HTTP Channel that's connected to it + async fn create_tcp_listener() -> (tokio::net::TcpListener, tonic::transport::Channel) { + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let uri = tonic::transport::Uri::builder() + .scheme("http") + .authority(format!("{}:{}", addr.ip(), addr.port())) + .path_and_query("/") + .build() + .unwrap(); + let channel = tonic::transport::Channel::builder(uri) + .connect() + .await + .unwrap(); + (listener, channel) + } + + /// Run a fake BigqueryServer, providing a client, a request handler, and a handle to shut it down. 
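+    ///
+    /// The returned tuple is: the `BigqueryService` under test, a receiver yielding each incoming
+    /// `AppendRowsRequest` paired with a oneshot responder for crafting its reply, a oneshot
+    /// sender that triggers server shutdown, and the `JoinHandle` of the server task.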
+ async fn run_server() -> ( + BigqueryService, + UnboundedReceiver<( + proto::AppendRowsRequest, + tokio::sync::oneshot::Sender>, + )>, + tokio::sync::oneshot::Sender<()>, + tokio::task::JoinHandle<()>, + ) { + let (sender, receiver) = unbounded_channel(); + let bigquery_server = BigqueryServer { + error: None, + append_rows_request_sender: sender, + }; + let (listener, channel) = create_tcp_listener().await; + let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>(); + let router = tonic::transport::server::Server::builder().add_service( + proto::big_query_write_server::BigQueryWriteServer::new(bigquery_server), + ); + let join_handle = tokio::spawn(async move { + router + .serve_with_incoming_shutdown( + tokio_stream::wrappers::TcpListenerStream::new(listener), + shutdown_rx.map(|x| x.unwrap()), + ) + .await + .unwrap(); + }); + let service = BigqueryService::with_auth(channel, crate::gcp::GcpAuthenticator::None) + .await + .unwrap(); + (service, receiver, shutdown_tx, join_handle) + } + + #[tokio::test] + async fn bigquery_service_stream() { + let (mut service, mut receiver, shutdown, server_future) = run_server().await; + // send a request and process the response + let client_future = tokio::spawn(async move { + assert!(service + .poll_ready(&mut std::task::Context::from_waker( + futures::task::noop_waker_ref(), + )) + .is_ready()); + let response = service + .call(BigqueryRequest { + request: proto::AppendRowsRequest { + write_stream: "test".to_string(), + offset: None, + trace_id: "".to_string(), + missing_value_interpretations: Default::default(), + default_missing_value_interpretation: 0, + rows: None, + }, + metadata: Default::default(), + finalizers: Default::default(), + uncompressed_size: 1, + }) + .await + .unwrap(); + assert_eq!("ack", response.body.write_stream); + }); + // validate the request + let (request, responder) = receiver.recv().await.unwrap(); + assert_eq!("test", request.write_stream); + // respond to the request + responder + .send(Ok(proto::AppendRowsResponse { + response: Some(proto::append_rows_response::Response::AppendResult( + proto::append_rows_response::AppendResult { offset: None }, + )), + write_stream: "ack".into(), + updated_schema: None, + row_errors: Default::default(), + })) + .unwrap(); + // clean everything up + shutdown.send(()).unwrap(); + client_future.await.unwrap(); + server_future.await.unwrap(); + } +} diff --git a/src/sinks/gcp/bigquery/sink.rs b/src/sinks/gcp/bigquery/sink.rs new file mode 100644 index 0000000000000..a1ffebbce9159 --- /dev/null +++ b/src/sinks/gcp/bigquery/sink.rs @@ -0,0 +1,47 @@ +use futures_util::{ + stream::{self, BoxStream}, + StreamExt, +}; +use vector_core::event::Event; +use vector_core::sink::StreamSink; +use vector_core::stream::BatcherSettings; + +use super::request_builder::BigqueryRequestBuilder; +use super::service::BigqueryService; +use crate::sinks::prelude::SinkRequestBuildError; +use crate::sinks::util::builder::SinkBuilderExt; + +pub struct BigquerySink { + pub service: BigqueryService, + pub batcher_settings: BatcherSettings, + pub request_builder: BigqueryRequestBuilder, +} + +impl BigquerySink { + async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { + input + .batched(self.batcher_settings.as_byte_size_config()) + .incremental_request_builder(self.request_builder) + .flat_map(stream::iter) + .filter_map(|request| async move { + match request { + Err(error) => { + emit!(SinkRequestBuildError { error }); + None + } + Ok(req) => Some(req), + } + }) + 
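+            // Hand the surviving requests to the driver backed by `BigqueryService`; the
+            // protocol label below tags the sink's request and byte telemetry as gRPC.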
.into_driver(self.service) + .protocol("gRPC") + .run() + .await + } +} + +#[async_trait::async_trait] +impl StreamSink for BigquerySink { + async fn run(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { + self.run_inner(input).await + } +} diff --git a/src/sinks/gcp/bigquery/tests.rs b/src/sinks/gcp/bigquery/tests.rs new file mode 100644 index 0000000000000..8b137891791fe --- /dev/null +++ b/src/sinks/gcp/bigquery/tests.rs @@ -0,0 +1 @@ + diff --git a/src/sinks/gcp/mod.rs b/src/sinks/gcp/mod.rs index a288a66024fed..26800790b79e2 100644 --- a/src/sinks/gcp/mod.rs +++ b/src/sinks/gcp/mod.rs @@ -3,6 +3,8 @@ use std::collections::HashMap; use serde::{Deserialize, Serialize}; use vector_lib::configurable::configurable_component; +pub mod bigquery; +pub mod chronicle_unstructured; pub mod cloud_storage; pub mod pubsub; pub mod stackdriver; diff --git a/website/content/en/docs/reference/configuration/sinks/gcp_bigquery.md b/website/content/en/docs/reference/configuration/sinks/gcp_bigquery.md new file mode 100644 index 0000000000000..82c721f96016a --- /dev/null +++ b/website/content/en/docs/reference/configuration/sinks/gcp_bigquery.md @@ -0,0 +1,14 @@ +--- +title: GCP BigQuery +description: Publish events to GCP's [BigQuery](https://cloud.google.com/bigquery) data store +kind: sink +layout: component +tags: ["gcp", "bigquery", "component", "sink"] +--- + +{{/* +This doc is generated using: + +1. The template in layouts/docs/component.html +2. The relevant CUE data in cue/reference/components/... +*/}} diff --git a/website/cue/reference/components/sinks/base/bigquery.cue b/website/cue/reference/components/sinks/base/bigquery.cue new file mode 100644 index 0000000000000..b84edd0a7bb0d --- /dev/null +++ b/website/cue/reference/components/sinks/base/bigquery.cue @@ -0,0 +1,312 @@ +package metadata + +base: components: sinks: bigquery: configuration: { + acknowledgements: { + description: """ + Controls how acknowledgements are handled for this sink. + + See [End-to-end Acknowledgements][e2e_acks] for more information on how event acknowledgement is handled. + + [e2e_acks]: https://vector.dev/docs/about/under-the-hood/architecture/end-to-end-acknowledgements/ + """ + required: false + type: object: options: enabled: { + description: """ + Whether or not end-to-end acknowledgements are enabled. + + When enabled for a sink, any source connected to that sink, where the source supports + end-to-end acknowledgements as well, waits for events to be acknowledged by the sink + before acknowledging them at the source. + + Enabling or disabling acknowledgements at the sink level takes precedence over any global + [`acknowledgements`][global_acks] configuration. + + [global_acks]: https://vector.dev/docs/reference/configuration/global-options/#acknowledgements + """ + required: false + type: bool: {} + } + } + api_key: { + description: """ + An [API key][gcp_api_key]. + + Either an API key or a path to a service account credentials JSON file can be specified. + + If both are unset, the `GOOGLE_APPLICATION_CREDENTIALS` environment variable is checked for a filename. If no + filename is named, an attempt is made to fetch an instance service account for the compute instance the program is + running on. If this is not on a GCE instance, then you must define it with an API key or service account + credentials JSON file. + + [gcp_api_key]: https://cloud.google.com/docs/authentication/api-keys + """ + required: false + type: string: {} + } + batch: { + description: "Event batching behavior." 
+ required: false + type: object: options: { + max_bytes: { + description: """ + The maximum size of a batch that is processed by a sink. + + This is based on the uncompressed size of the batched events, before they are + serialized/compressed. + """ + required: false + type: uint: { + default: 10000000 + unit: "bytes" + } + } + max_events: { + description: "The maximum size of a batch before it is flushed." + required: false + type: uint: { + default: 50000 + unit: "events" + } + } + timeout_secs: { + description: "The maximum age of a batch before it is flushed." + required: false + type: float: { + default: 1.0 + unit: "seconds" + } + } + } + } + credentials_path: { + description: """ + Path to a [service account][gcp_service_account_credentials] credentials JSON file. + + Either an API key or a path to a service account credentials JSON file can be specified. + + If both are unset, the `GOOGLE_APPLICATION_CREDENTIALS` environment variable is checked for a filename. If no + filename is named, an attempt is made to fetch an instance service account for the compute instance the program is + running on. If this is not on a GCE instance, then you must define it with an API key or service account + credentials JSON file. + + [gcp_service_account_credentials]: https://cloud.google.com/docs/authentication/production#manually + """ + required: false + type: string: {} + } + dataset: { + description: "The dataset within the project to which to publish events." + required: true + type: string: examples: ["this-is-a-dataset"] + } + encoding: { + description: "Config used to build a `ProtobufSerializer`." + required: true + type: object: options: protobuf: { + description: "Options for the Protobuf serializer." + required: true + type: object: options: { + desc_file: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -o ...` + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + } + } + } + endpoint: { + description: """ + The endpoint to which to publish events. + + The scheme (`http` or `https`) must be specified. No path should be included since the paths defined + by the [`GCP BigQuery`][bigquery_api] API are used. + + The trailing slash `/` must not be included. + + [bigquery_api]: https://cloud.google.com/bigquery/docs/reference/rest + """ + required: false + type: string: { + default: "https://bigquerystorage.googleapis.com:443" + examples: ["https://bigquerystorage.googleapis.com:443"] + } + } + project: { + description: "The project name to which to publish events." + required: true + type: string: examples: ["vector-123456"] + } + request: { + description: """ + Middleware settings for outbound requests. + + Various settings can be configured, such as concurrency and rate limits, timeouts, etc. + """ + required: false + type: object: options: { + adaptive_concurrency: { + description: """ + Configuration of adaptive concurrency parameters. + + These parameters typically do not require changes from the default, and incorrect values can lead to meta-stable or + unstable performance and sink behavior. Proceed with caution. + """ + required: false + type: object: options: { + decrease_ratio: { + description: """ + The fraction of the current value to set the new concurrency limit when decreasing the limit. 
+ + Valid values are greater than `0` and less than `1`. Smaller values cause the algorithm to scale back rapidly + when latency increases. + + Note that the new limit is rounded down after applying this ratio. + """ + required: false + type: float: default: 0.9 + } + ewma_alpha: { + description: """ + The weighting of new measurements compared to older measurements. + + Valid values are greater than `0` and less than `1`. + + ARC uses an exponentially weighted moving average (EWMA) of past RTT measurements as a reference to compare with + the current RTT. Smaller values cause this reference to adjust more slowly, which may be useful if a service has + unusually high response variability. + """ + required: false + type: float: default: 0.4 + } + initial_concurrency: { + description: """ + The initial concurrency limit to use. If not specified, the initial limit will be 1 (no concurrency). + + It is recommended to set this value to your service's average limit if you're seeing that it takes a + long time to ramp up adaptive concurrency after a restart. You can find this value by looking at the + `adaptive_concurrency_limit` metric. + """ + required: false + type: uint: default: 1 + } + rtt_deviation_scale: { + description: """ + Scale of RTT deviations which are not considered anomalous. + + Valid values are greater than or equal to `0`, and we expect reasonable values to range from `1.0` to `3.0`. + + When calculating the past RTT average, we also compute a secondary “deviation” value that indicates how variable + those values are. We use that deviation when comparing the past RTT average to the current measurements, so we + can ignore increases in RTT that are within an expected range. This factor is used to scale up the deviation to + an appropriate range. Larger values cause the algorithm to ignore larger increases in the RTT. + """ + required: false + type: float: default: 2.5 + } + } + } + concurrency: { + description: """ + Configuration for outbound request concurrency. + + This can be set either to one of the below enum values or to a positive integer, which denotes + a fixed concurrency limit. + """ + required: false + type: { + string: { + default: "adaptive" + enum: { + adaptive: """ + Concurrency will be managed by Vector's [Adaptive Request Concurrency][arc] feature. + + [arc]: https://vector.dev/docs/about/under-the-hood/networking/arc/ + """ + none: """ + A fixed concurrency of 1. + + Only one request can be outstanding at any given time. + """ + } + } + uint: {} + } + } + rate_limit_duration_secs: { + description: "The time window used for the `rate_limit_num` option." + required: false + type: uint: { + default: 1 + unit: "seconds" + } + } + rate_limit_num: { + description: "The maximum number of requests allowed within the `rate_limit_duration_secs` time window." + required: false + type: uint: { + default: 9223372036854775807 + unit: "requests" + } + } + retry_attempts: { + description: """ + The maximum number of retries to make for failed requests. + + The default, for all intents and purposes, represents an infinite number of retries. + """ + required: false + type: uint: { + default: 9223372036854775807 + unit: "retries" + } + } + retry_initial_backoff_secs: { + description: """ + The amount of time to wait before attempting the first retry for a failed request. + + After the first retry has failed, the fibonacci sequence is used to select future backoffs. 
+ """ + required: false + type: uint: { + default: 1 + unit: "seconds" + } + } + retry_max_duration_secs: { + description: "The maximum amount of time to wait between retries." + required: false + type: uint: { + default: 3600 + unit: "seconds" + } + } + timeout_secs: { + description: """ + The time a request can take before being aborted. + + Datadog highly recommends that you do not lower this value below the service's internal timeout, as this could + create orphaned requests, pile on retries, and result in duplicate data downstream. + """ + required: false + type: uint: { + default: 60 + unit: "seconds" + } + } + } + } + table: { + description: "The table within the dataset to which to publish events." + required: true + type: string: examples: ["this-is-a-table"] + } +} diff --git a/website/cue/reference/services/gcp_bigquery.cue b/website/cue/reference/services/gcp_bigquery.cue new file mode 100644 index 0000000000000..cdfae1d776e71 --- /dev/null +++ b/website/cue/reference/services/gcp_bigquery.cue @@ -0,0 +1,10 @@ +package metadata + +services: gcp_bigquery: { + name: "GCP BigQuery" + thing: "a \(name) pipeline" + url: urls.gcp_bigquery + versions: null + + description: "[GCP BigQuery](\(urls.gcp_bigquery)) is a fully-managed data warehouse that allows you to store and query large amounts of structured data. This makes it a great sink for structured log data." +} diff --git a/website/cue/reference/urls.cue b/website/cue/reference/urls.cue index cc32b4ce52c11..b43e3b99fedac 100644 --- a/website/cue/reference/urls.cue +++ b/website/cue/reference/urls.cue @@ -207,6 +207,7 @@ urls: { gcp_authentication_api_key: "\(gcp)/docs/authentication/api-keys" gcp_authentication_server_to_server: "\(gcp)/docs/authentication/production" gcp_authentication_service_account: "\(gcp)/docs/authentication/production#obtaining_and_providing_service_account_credentials_manually" + gcp_bigquery: "\(gcp)/bigquery/" gcp_cloud_storage: "\(gcp)/storage" gcp_chronicle: "https://chronicle.security" gcp_folders: "\(gcp)/resource-manager/docs/creating-managing-folders"
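For orientation, the pieces above assemble into an end-user sink definition along the following lines. This is a hypothetical sketch, not part of the diff: the `type` key is assumed from the `sink("bigquery", ...)` registration in `config.rs`, the option names come from `BigqueryConfig` and `bigquery.cue`, and every value (component names, project, dataset, table, and file paths) is a placeholder.

# Hypothetical Vector configuration exercising the new sink; all values are placeholders.
[sinks.bigquery_out]
type = "bigquery"                        # as registered via `sink("bigquery", ...)`
inputs = ["some_source"]                 # any source or transform producing logs
project = "my-gcp-project"               # GCP project containing the dataset
dataset = "telemetry"                    # dataset within the project
table = "vector_events"                  # table within the dataset
# `endpoint` defaults to "https://bigquerystorage.googleapis.com:443".
credentials_path = "/etc/vector/gcp-credentials.json"  # or `api_key`, or ambient GCP credentials
# Rows are protobuf-encoded; the descriptor and message type must match the table schema.
encoding.protobuf.desc_file = "/etc/vector/proto.desc"
encoding.protobuf.message_type = "BigqueryMessage"
# Defaults mirror BigqueryDefaultBatchSettings: 50,000 events, 10 MB, 1 second.
batch.max_events = 50000
batch.timeout_secs = 1.0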