Translate between different versions of prost-types
This is absolutely awful, but do you know what was even more awful?
Trying to upgrade every prost package and all the packages that use prost under the hood.
goakley committed Oct 1, 2024
1 parent e967d63 commit 6968364
Showing 4 changed files with 164 additions and 7 deletions.
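The underlying problem, per the commit message and the doc comment on the new function: prost_reflect pulls in a newer prost-types than the one Vector depends on directly, so the two `DescriptorProto` structs are distinct Rust types even though they describe the same protobuf message. A rough illustration of the mismatch (hypothetical code, not part of this diff; the function names here are invented):

// Hypothetical sketch, not part of the commit: with two semver-incompatible
// copies of prost-types in the dependency tree, the compiler treats their
// `DescriptorProto` structs as unrelated types.
fn takes_vectors_descriptor(_descriptor: prost_types::DescriptorProto) {}

fn demo(_from_reflect: prost_reflect::prost_types::DescriptorProto) {
    // This would not compile (error[E0308]: mismatched types), because
    // `_from_reflect` uses the newer prost-types re-exported by prost_reflect:
    // takes_vectors_descriptor(_from_reflect);

    // Hence the field-by-field translation added in this commit:
    // takes_vectors_descriptor(translate_descriptor_proto(_from_reflect));
}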
1 change: 0 additions & 1 deletion build.rs
@@ -271,4 +271,3 @@ fn main() {
    // Emit the aforementioned stanzas.
    tracker.emit_rerun_stanzas();
}

2 changes: 1 addition & 1 deletion src/sinks/gcp/bigquery/config.rs
@@ -1,8 +1,8 @@
-use vector_lib::codecs::encoding::ProtobufSerializerConfig;
use futures::FutureExt;
use http::Uri;
use indoc::indoc;
use tonic::transport::Channel;
+use vector_lib::codecs::encoding::ProtobufSerializerConfig;
use vector_lib::configurable::configurable_component;

use super::proto::third_party::google::cloud::bigquery::storage::v1 as proto;
162 changes: 159 additions & 3 deletions src/sinks/gcp/bigquery/request_builder.rs
@@ -1,10 +1,10 @@
use bytes::BytesMut;
-use vector_lib::codecs::encoding::ProtobufSerializer;
use prost::Message;
use std::num::NonZeroUsize;
use tokio_util::codec::Encoder;
-use vector_lib::request_metadata::RequestMetadata;
+use vector_lib::codecs::encoding::ProtobufSerializer;
use vector_lib::event::Finalizable;
+use vector_lib::request_metadata::RequestMetadata;

use super::proto::third_party::google::cloud::bigquery::storage::v1 as proto;
use super::service::BigqueryRequest;
@@ -48,7 +48,9 @@ impl BigqueryRequestBuilder {
    ) -> (NonZeroUsize, proto::append_rows_request::ProtoData) {
        let proto_data = proto::append_rows_request::ProtoData {
            writer_schema: Some(proto::ProtoSchema {
-                proto_descriptor: Some(self.protobuf_serializer.descriptor_proto().clone()),
+                proto_descriptor: Some(translate_descriptor_proto(
+                    self.protobuf_serializer.descriptor_proto().clone(),
+                )),
            }),
            rows: Some(proto::ProtoRows { serialized_rows }),
        };
@@ -136,6 +138,160 @@ impl IncrementalRequestBuilder<Vec<Event>> for BigqueryRequestBuilder {
}
}

/// Convert from `prost_reflect::prost_types::DescriptorProto` to `prost_types::DescriptorProto`
///
/// Someone upgraded `prost_reflect` without upgrading the other prost crates,
/// so the `prost_types` version used by `prost_reflect` is newer than the version used by vector.
///
/// This function discards any `UninterpretedOption`s.
///
/// "Why don't you just upgrade `prost_types` to match the version used by `prost_reflect`?
/// Ha. Hahaha. Hahahahahahaha. My branches are littered with the corpses of such attempts.
fn translate_descriptor_proto(
    old_descriptor: prost_reflect::prost_types::DescriptorProto,
) -> prost_types::DescriptorProto {
    prost_types::DescriptorProto {
        name: old_descriptor.name,
        field: old_descriptor
            .field
            .into_iter()
            .map(|field| prost_types::FieldDescriptorProto {
                name: field.name,
                number: field.number,
                label: field.label,
                r#type: field.r#type,
                type_name: field.type_name,
                extendee: field.extendee,
                default_value: field.default_value,
                oneof_index: field.oneof_index,
                json_name: field.json_name,
                options: field.options.map(|options| prost_types::FieldOptions {
                    ctype: options.ctype,
                    packed: options.packed,
                    jstype: options.jstype,
                    lazy: options.lazy,
                    deprecated: options.deprecated,
                    weak: options.weak,
                    uninterpreted_option: Default::default(),
                }),
                proto3_optional: field.proto3_optional,
            })
            .collect(),
        extension: old_descriptor
            .extension
            .into_iter()
            .map(|field| prost_types::FieldDescriptorProto {
                name: field.name,
                number: field.number,
                label: field.label,
                r#type: field.r#type,
                type_name: field.type_name,
                extendee: field.extendee,
                default_value: field.default_value,
                oneof_index: field.oneof_index,
                json_name: field.json_name,
                options: field.options.map(|options| prost_types::FieldOptions {
                    ctype: options.ctype,
                    packed: options.packed,
                    jstype: options.jstype,
                    lazy: options.lazy,
                    deprecated: options.deprecated,
                    weak: options.weak,
                    uninterpreted_option: Default::default(),
                }),
                proto3_optional: field.proto3_optional,
            })
            .collect(),
        nested_type: old_descriptor
            .nested_type
            .into_iter()
            .map(translate_descriptor_proto)
            .collect(),
        enum_type: old_descriptor
            .enum_type
            .into_iter()
            .map(|enum_descriptor| prost_types::EnumDescriptorProto {
                name: enum_descriptor.name,
                value: enum_descriptor
                    .value
                    .into_iter()
                    .map(|value| prost_types::EnumValueDescriptorProto {
                        name: value.name,
                        number: value.number,
                        options: value.options.map(|options| prost_types::EnumValueOptions {
                            deprecated: options.deprecated,
                            uninterpreted_option: Default::default(),
                        }),
                    })
                    .collect(),
                options: enum_descriptor
                    .options
                    .map(|options| prost_types::EnumOptions {
                        allow_alias: options.allow_alias,
                        deprecated: options.deprecated,
                        uninterpreted_option: Default::default(),
                    }),
                reserved_range: enum_descriptor
                    .reserved_range
                    .into_iter()
                    .map(
                        |reserved_range| prost_types::enum_descriptor_proto::EnumReservedRange {
                            start: reserved_range.start,
                            end: reserved_range.end,
                        },
                    )
                    .collect(),
                reserved_name: enum_descriptor.reserved_name,
            })
            .collect(),
        extension_range: old_descriptor
            .extension_range
            .into_iter()
            .map(
                |extension_range| prost_types::descriptor_proto::ExtensionRange {
                    start: extension_range.start,
                    end: extension_range.end,
                    options: extension_range
                        .options
                        .map(|_| prost_types::ExtensionRangeOptions {
                            uninterpreted_option: Default::default(),
                        }),
                },
            )
            .collect(),
        oneof_decl: old_descriptor
            .oneof_decl
            .into_iter()
            .map(|oneof| prost_types::OneofDescriptorProto {
                name: oneof.name,
                options: oneof.options.map(|_| prost_types::OneofOptions {
                    uninterpreted_option: Default::default(),
                }),
            })
            .collect(),
        options: old_descriptor
            .options
            .map(|options| prost_types::MessageOptions {
                message_set_wire_format: options.message_set_wire_format,
                no_standard_descriptor_accessor: options.no_standard_descriptor_accessor,
                deprecated: options.deprecated,
                map_entry: options.map_entry,
                uninterpreted_option: Default::default(),
            }),
        reserved_range: old_descriptor
            .reserved_range
            .into_iter()
            .map(
                |reserved_range| prost_types::descriptor_proto::ReservedRange {
                    start: reserved_range.start,
                    end: reserved_range.end,
                },
            )
            .collect(),
        reserved_name: old_descriptor.reserved_name,
    }
}

#[cfg(test)]
mod test {
use bytes::{BufMut, Bytes, BytesMut};
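For illustration only (not part of this commit): a minimal check of the translation might look like the sketch below. It assumes it lives inside the existing `mod test` in `request_builder.rs` (hence `super::translate_descriptor_proto`), the test name is invented, and it only exercises the `name`/`number` fields that the diff copies across.

#[test]
fn translates_a_minimal_descriptor() {
    // Build a descriptor using the prost-types version re-exported by prost_reflect...
    let old = prost_reflect::prost_types::DescriptorProto {
        name: Some("Row".to_string()),
        field: vec![prost_reflect::prost_types::FieldDescriptorProto {
            name: Some("message".to_string()),
            number: Some(1),
            ..Default::default()
        }],
        ..Default::default()
    };

    // ...and translate it into the version Vector depends on directly.
    let new = super::translate_descriptor_proto(old);

    assert_eq!(new.name.as_deref(), Some("Row"));
    assert_eq!(new.field.len(), 1);
    assert_eq!(new.field[0].name.as_deref(), Some("message"));
    assert_eq!(new.field[0].number, Some(1));
}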
6 changes: 4 additions & 2 deletions src/sinks/gcp/bigquery/service.rs
@@ -7,8 +7,8 @@ use tonic::service::Interceptor;
use tonic::transport::Channel;
use tonic::{Request, Status};
use tower::Service;
-use vector_lib::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata};
use vector_lib::event::EventStatus;
+use vector_lib::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata};
use vector_lib::stream::DriverResponse;

use super::proto::third_party::google::cloud::bigquery::storage::v1 as proto;
@@ -81,7 +81,9 @@ impl DriverResponse for BigqueryResponse {
            // these errors can't be retried because the event payload is almost definitely bad
            Ok(super::proto::third_party::google::rpc::Code::InvalidArgument)
            | Ok(super::proto::third_party::google::rpc::Code::NotFound)
-            | Ok(super::proto::third_party::google::rpc::Code::AlreadyExists) => EventStatus::Rejected,
+            | Ok(super::proto::third_party::google::rpc::Code::AlreadyExists) => {
+                EventStatus::Rejected
+            }
            // everything else can probably be retried
            _ => EventStatus::Errored,
        }
