
Kafka protobuf consumer support #4375

Merged · 32 commits · Sep 7, 2023

Conversation

devinrsmith (Member)

This adds support for Kafka serdes-protobuf deserialization via KafkaTools#protobufSpec.

There is an extensive amount of unit testing, both in the form of "how does this message structure get parsed into column types" and "how do schema changes get adapted into the same table".

While this feature is primarily targeted for use via KafkaTools, the underlying extensions-protobuf library is generalized to work in other cases as well. For example, it would work just as well for parsing protobuf messages received via gRPC or websockets (at which point, it could easily be adapted into a blink table).

The implementation is underpinned by a set of functional interfaces for extracting primitives or Objects from a generic type; in the case of protobuf, the generic type is com.google.protobuf.Message. Coupled with functional composition, these interfaces provide useful primitives for fluently expressing and executing parsing expressions in a reusable way.
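The extractor-plus-composition design described above can be sketched roughly as follows. Note that the interface and method names here (`ToObjectFn`, `ToIntFn`, `mapToInt`) and the stand-in `Message` record are illustrative assumptions, not the library's actual API:

```java
// Illustrative sketch: typed extractor interfaces over a generic message type.
// In the real library the generic type would be com.google.protobuf.Message;
// a plain record stands in here so the sketch is self-contained.
record Message(int id, String name) {}

@FunctionalInterface
interface ToIntFn<T> {
    int applyAsInt(T value);
}

@FunctionalInterface
interface ToObjectFn<T, R> {
    R apply(T value);

    // Functional composition: adapt an extractor on R into an extractor on T.
    default ToIntFn<T> mapToInt(ToIntFn<R> next) {
        return value -> next.applyAsInt(apply(value));
    }
}

public class ExtractorSketch {
    public static void main(String[] args) {
        ToObjectFn<Message, String> name = Message::name;
        // Compose "get the name" with "get its length" into one reusable extractor.
        ToIntFn<Message> nameLength = name.mapToInt(String::length);
        System.out.println(nameLength.applyAsInt(new Message(1, "hello"))); // prints 5
    }
}
```

Composing small extractors this way is what lets one parsing expression be reused across column types without boxing primitives.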

See https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/serdes-protobuf.html

Fixes #3832

@devinrsmith
Member Author

Note: a large part of the +20k diff is checked-in protobuf test code - see notes in TESTING.md for why I had to check it in.

}

private Optional<ProtobufFunctions> wellKnown() {
// todo: eventually have support for parsing specific fields in specific ways
Member

do you want to track this with an issue?

Member

I'm confused about this. Are we recursively expanding nested fields?

py/server/deephaven/stream/kafka/consumer.py
* </tr>
* </table>
*
* ^1 Unsigned 32-bit and 64-bit integers are represented using their signed counterpart, with the top bit being
Member

This is not what we typically do.

Member Author

I'll add documentation pointing to java.lang.Integer#toUnsignedLong, which I feel is the appropriate way to handle this from the user's perspective if they want this behavior.
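For reference, a minimal example of the `Integer#toUnsignedLong` approach mentioned here, using only standard JDK methods (this is a sketch of the user-side workaround, not library code):

```java
public class UnsignedIntDemo {
    public static void main(String[] args) {
        // A uint32 wire value of 4294967295 arrives as the signed int -1
        // under the "represent unsigned with the signed counterpart" scheme.
        int signed = -1;
        // The standard JDK helper recovers the unsigned value as a long.
        long unsigned = Integer.toUnsignedLong(signed);
        System.out.println(unsigned); // prints 4294967295
    }
}
```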

Member Author

Also, java.lang.Long#toUnsignedBigInteger exists, but it's private :(
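Since `Long#toUnsignedBigInteger` is private, one public-API workaround (a sketch of what a user could do, not what this PR implements) is to round-trip through `Long.toUnsignedString`:

```java
import java.math.BigInteger;

public class UnsignedLongDemo {
    public static void main(String[] args) {
        // A uint64 wire value of 2^64 - 1 arrives as the signed long -1.
        long signed = -1L;
        // Long.toUnsignedString is public, so parse its decimal form back
        // into a BigInteger to get the unsigned interpretation.
        BigInteger unsigned = new BigInteger(Long.toUnsignedString(signed));
        System.out.println(unsigned); // prints 18446744073709551615
    }
}
```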

Member Author

If you feel very strongly, or as an additional feature in the future, we could introduce this behavior as part of FieldOptions...

rcaudy
rcaudy previously approved these changes Sep 6, 2023
jmao-denver
jmao-denver previously approved these changes Sep 7, 2023
Contributor

@jmao-denver jmao-denver left a comment


The Python changes LGTM

@devinrsmith devinrsmith enabled auto-merge (squash) September 7, 2023 15:43
@devinrsmith devinrsmith merged commit 35fd583 into deephaven:main Sep 7, 2023
10 checks passed
@devinrsmith devinrsmith deleted the kafka-protobuf branch September 7, 2023 16:30
@github-actions github-actions bot locked and limited conversation to collaborators Sep 7, 2023
@deephaven-internal
Contributor

Labels indicate documentation is required. Issues for documentation have been opened:

How-to: https://github.com/deephaven/deephaven.io/issues/3150
Reference: https://github.com/deephaven/deephaven.io/issues/3149


Successfully merging this pull request may close these issues.

Kafka Protobuf support
6 participants