Kafka protobuf consumer support (#4375)
This adds support for kafka serdes-protobuf deserialization via KafkaTools#protobufSpec.

There is extensive unit testing, covering both "how does this message structure get parsed into
column types" and "how do schema changes get adapted into the same table".

While this feature is primarily targeted for use via KafkaTools, the underlying extensions-protobuf library is
generalized to work in other cases as well. For example, it would work just as well for parsing protobuf messages
received via gRPC or websockets (at which point, it could easily be adapted into a blink table).

The implementation is underpinned by a set of functional interfaces for extracting primitives or Objects from a
generic type; in the case of protobuf, the generic type is com.google.protobuf.Message. Coupled with functional
composition, these interfaces provide useful primitives for fluently expressing and executing parsing expressions in
a reusable way.
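
As a rough illustration of the idea in the paragraph above, here is a minimal, self-contained sketch of primitive and object extractors that compose with an outer accessor. Every name in it (`ExtractorSketch`, `ToIntExtractor`, `ToObjectExtractor`, `Trade`, `Envelope`, `after`) is hypothetical and is not part of extensions-protobuf; the plain classes stand in for com.google.protobuf.Message so the example runs without dependencies.

```java
import java.util.function.Function;

// Hypothetical sketch; none of these names come from the actual library.
final class ExtractorSketch {

    // Stand-ins for parsed messages (the real generic type is com.google.protobuf.Message).
    static final class Trade {
        final String symbol;
        final int size;

        Trade(String symbol, int size) {
            this.symbol = symbol;
            this.size = size;
        }
    }

    static final class Envelope {
        final Trade trade;

        Envelope(Trade trade) {
            this.trade = trade;
        }
    }

    // Extracts a primitive int from a T without boxing.
    @FunctionalInterface
    interface ToIntExtractor<T> {
        int applyAsInt(T value);

        // Composition: first map an outer S to a T, then extract the int.
        default <S> ToIntExtractor<S> after(Function<? super S, ? extends T> outer) {
            return s -> applyAsInt(outer.apply(s));
        }
    }

    // Extracts an Object of type R from a T.
    @FunctionalInterface
    interface ToObjectExtractor<T, R> {
        R apply(T value);

        // Composition: first map an outer S to a T, then extract the object.
        default <S> ToObjectExtractor<S, R> after(Function<? super S, ? extends T> outer) {
            return s -> apply(outer.apply(s));
        }
    }

    // Reusable field-level extractors.
    static final ToIntExtractor<Trade> SIZE = t -> t.size;
    static final ToObjectExtractor<Trade, String> SYMBOL = t -> t.symbol;

    // The same extractors, reused against the enclosing message type.
    static final ToIntExtractor<Envelope> ENVELOPE_SIZE = SIZE.after(e -> e.trade);
    static final ToObjectExtractor<Envelope, String> ENVELOPE_SYMBOL = SYMBOL.after(e -> e.trade);

    public static void main(String[] args) {
        Envelope envelope = new Envelope(new Trade("ABC", 100));
        System.out.println(ENVELOPE_SIZE.applyAsInt(envelope)); // prints 100
        System.out.println(ENVELOPE_SYMBOL.apply(envelope)); // prints ABC
    }
}
```

Keeping a separate interface per primitive type lets a parsing expression be built once and applied to many messages without boxing on the primitive paths.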

See https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/serdes-protobuf.html

Fixes #3832
devinrsmith authored Sep 7, 2023
1 parent 4039d8d commit 35fd583
Showing 98 changed files with 20,779 additions and 9 deletions.
20 changes: 20 additions & 0 deletions extensions/kafka/TESTING.md
@@ -0,0 +1,20 @@
# testing

## ProtobufImpl schema change testing

Most of the "simple" protobuf testing is done in the protobuf project, `project(':extensions-protobuf')`. An important
part of the Kafka integration is how schema changes are handled; that is tested here, in
[ProtobufImplSchemaChangeTest.java](src/test/java/io/deephaven/kafka/ProtobufImplSchemaChangeTest.java).

The test development setup is a little unconventional because protoc won't generate multiple versions of the same
message type, at least not within the same protoc invocation. To work around this, adding a new message or message
version takes a small manual workflow. It requires:
* Uncommenting the proto plugin logic in [build.gradle](build.gradle)
* Adding a single new .proto file per version of the message you want to create; it must have `option java_multiple_files = false;` (only one .proto file with a given message name can compile at any given time)
* Generating the new test code, `./gradlew :extensions-kafka:generateTestProto`
* Moving the output from [build/generated/source/proto/test/java/io/deephaven/kafka/protobuf/gen](build/generated/source/proto/test/java/io/deephaven/kafka/protobuf/gen) into [src/test/java/io/deephaven/kafka/protobuf/gen](src/test/java/io/deephaven/kafka/protobuf/gen)
* Renaming the .proto file to .proto.txt (to ensure it doesn't get re-generated later)
* Commenting out the proto plugin logic in [build.gradle](build.gradle)

While it is likely possible to automate the above, doing so would take a lot of bespoke work that would probably not
see much use elsewhere.
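
The `java_multiple_files = false` requirement above presumably exists because, with that setting, protoc emits one outer Java class per .proto file and nests the message classes inside it. Each message version then gets its own fully qualified name. The sketch below mimics that layout with hand-written classes; the names `MyMessageV1`, `MyMessageV2`, and `MyMessage` are hypothetical stand-ins, not the generated test classes in this repository.

```java
// Hypothetical sketch of the class layout; real generated code is far larger.
final class MessageVersionsSketch {

    // Stand-in for the outer class generated from a hypothetical first version of the .proto file.
    static final class MyMessageV1 {
        static final class MyMessage {
            final String name;

            MyMessage(String name) {
                this.name = name;
            }
        }
    }

    // Stand-in for the outer class generated from a hypothetical second version, which adds a field.
    static final class MyMessageV2 {
        static final class MyMessage {
            final String name;
            final int age;

            MyMessage(String name, int age) {
                this.name = name;
                this.age = age;
            }
        }
    }

    public static void main(String[] args) {
        MyMessageV1.MyMessage v1 = new MyMessageV1.MyMessage("foo");
        MyMessageV2.MyMessage v2 = new MyMessageV2.MyMessage("foo", 42);
        // Same simple name, different enclosing classes, so both versions coexist on one classpath.
        System.out.println(v1.getClass().getSimpleName()); // prints MyMessage
        System.out.println(v2.getClass().getSimpleName()); // prints MyMessage
        System.out.println(v1.getClass().getName().equals(v2.getClass().getName())); // prints false
    }
}
```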
20 changes: 16 additions & 4 deletions extensions/kafka/build.gradle
@@ -1,21 +1,25 @@
 plugins {
     id 'java-library'
     id 'io.deephaven.project.register'
+    // See TESTING.md
+    // id 'com.google.protobuf' version '0.9.4'
 }

 description 'Kafka: Integrating Engine tables with Kafka'

 dependencies {
     api project(':engine-table')

-    api 'org.apache.avro:avro:1.11.1'
+    api 'org.apache.avro:avro:1.11.2'

     // Using io.confluent dependencies requires code in the toplevel build.gradle to add their maven repository.
     // Note: the -ccs flavor is provided by confluent as their community edition. It is equivalent to the maven central
     // version, but has a different version to make it easier to keep confluent dependencies aligned.
-    api 'org.apache.kafka:kafka-clients:7.3.0-ccs'
-    api 'io.confluent:kafka-avro-serializer:7.3.0'
-    runtimeOnly 'io.confluent:kafka-protobuf-serializer:7.3.0'
+    api 'org.apache.kafka:kafka-clients:7.4.0-ccs'
+    api 'io.confluent:kafka-avro-serializer:7.4.0'
+
+    api 'io.confluent:kafka-protobuf-serializer:7.4.0'
+    api project(':extensions-protobuf')

     implementation project(':Configuration')
     implementation project(':log-factory')
@@ -37,6 +41,14 @@ spotless {
     java {
         targetExclude(
             '**/**FieldCopier.java',
+            'src/test/java/io/deephaven/kafka/protobuf/gen/**',
         )
     }
 }
+
+// See TESTING.md
+//protobuf {
+//    protoc {
+//        artifact = 'com.google.protobuf:protoc:3.24.1'
+//    }
+//}
289 changes: 289 additions & 0 deletions extensions/kafka/src/main/java/io/deephaven/kafka/BoxTransform.java
@@ -0,0 +1,289 @@
/**
* Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.kafka;

import io.deephaven.functions.ToBooleanFunction;
import io.deephaven.functions.ToByteFunction;
import io.deephaven.functions.ToCharFunction;
import io.deephaven.functions.ToDoubleFunction;
import io.deephaven.functions.ToFloatFunction;
import io.deephaven.functions.ToIntFunction;
import io.deephaven.functions.ToLongFunction;
import io.deephaven.functions.ToObjectFunction;
import io.deephaven.functions.ToPrimitiveFunction;
import io.deephaven.functions.ToShortFunction;
import io.deephaven.functions.TypedFunction;
import io.deephaven.qst.type.BoxedBooleanType;
import io.deephaven.qst.type.BoxedByteType;
import io.deephaven.qst.type.BoxedCharType;
import io.deephaven.qst.type.BoxedDoubleType;
import io.deephaven.qst.type.BoxedFloatType;
import io.deephaven.qst.type.BoxedIntType;
import io.deephaven.qst.type.BoxedLongType;
import io.deephaven.qst.type.BoxedShortType;
import io.deephaven.util.type.TypeUtils;

class BoxTransform {

/**
* Creates the function composition {@code box ∘ f}.
*
* <p>
* For primitive functions {@code f}, see {@link #of(ToPrimitiveFunction)}.
*
* <p>
* For object functions {@code f}, {@code box} is the identity function (and {@code f} will simply be returned).
*
* @param f the function
* @return the object function
* @param <T> the input type
* @see #of(ToBooleanFunction)
* @see #of(ToCharFunction)
* @see #of(ToByteFunction)
* @see #of(ToShortFunction)
* @see #of(ToIntFunction)
* @see #of(ToLongFunction)
* @see #of(ToFloatFunction)
* @see #of(ToDoubleFunction)
*/
public static <T> ToObjectFunction<T, ?> of(TypedFunction<T> f) {
return BoxedVisitor.of(f);
}

/**
* Creates the function composition {@code box ∘ f}.
*
* <p>
* The returned object function will have return type {@code f.returnType().boxedType()}.
*
* @param f the primitive function
* @return the object function
* @param <T> the input type
* @see #of(ToBooleanFunction)
* @see #of(ToCharFunction)
* @see #of(ToByteFunction)
* @see #of(ToShortFunction)
* @see #of(ToIntFunction)
* @see #of(ToLongFunction)
* @see #of(ToFloatFunction)
* @see #of(ToDoubleFunction)
*/
public static <T> ToObjectFunction<T, ?> of(ToPrimitiveFunction<T> f) {
return BoxedVisitor.of(f);
}

/**
* Creates the function composition {@code box ∘ f}, where {@code box} is simply a cast from {@code boolean} to
* {@code Boolean}.
*
* <p>
* Equivalent to {@code x -> (Boolean)f.test(x)}.
*
* @param f the boolean function
* @return the object function
* @param <T> the input type
*/
public static <T> ToObjectFunction<T, Boolean> of(ToBooleanFunction<T> f) {
return ToObjectFunction.of(f::test, BoxedBooleanType.of());
}

/**
* Creates the function composition {@code box ∘ f}.
*
* <p>
* Equivalent to {@code x -> TypeUtils.box(f.applyAsChar(x))}.
*
* @param f the char function
* @return the object function
* @param <T> the input type
* @see TypeUtils#box(char)
*/
public static <T> ToObjectFunction<T, Character> of(ToCharFunction<T> f) {
return ToObjectFunction.of(t -> box(f, t), BoxedCharType.of());
}

/**
* Creates the function composition {@code box ∘ f}.
*
* <p>
* Equivalent to {@code x -> TypeUtils.box(f.applyAsByte(x))}.
*
* @param f the byte function
* @return the object function
* @param <T> the input type
* @see TypeUtils#box(byte)
*/
public static <T> ToObjectFunction<T, Byte> of(ToByteFunction<T> f) {
return ToObjectFunction.of(t -> box(f, t), BoxedByteType.of());
}

/**
* Creates the function composition {@code box ∘ f}.
*
* <p>
* Equivalent to {@code x -> TypeUtils.box(f.applyAsShort(x))}.
*
* @param f the short function
* @return the object function
* @param <T> the input type
* @see TypeUtils#box(short)
*/
public static <T> ToObjectFunction<T, Short> of(ToShortFunction<T> f) {
return ToObjectFunction.of(t -> box(f, t), BoxedShortType.of());
}

/**
* Creates the function composition {@code box ∘ f}.
*
* <p>
* Equivalent to {@code x -> TypeUtils.box(f.applyAsInt(x))}.
*
* @param f the int function
* @return the object function
* @param <T> the input type
* @see TypeUtils#box(int)
*/
public static <T> ToObjectFunction<T, Integer> of(ToIntFunction<T> f) {
return ToObjectFunction.of(t -> box(f, t), BoxedIntType.of());
}

/**
* Creates the function composition {@code box ∘ f}.
*
* <p>
* Equivalent to {@code x -> TypeUtils.box(f.applyAsLong(x))}.
*
* @param f the long function
* @return the object function
* @param <T> the input type
* @see TypeUtils#box(long)
*/
public static <T> ToObjectFunction<T, Long> of(ToLongFunction<T> f) {
return ToObjectFunction.of(t -> box(f, t), BoxedLongType.of());
}

/**
* Creates the function composition {@code box ∘ f}.
*
* <p>
* Equivalent to {@code x -> TypeUtils.box(f.applyAsFloat(x))}.
*
* @param f the float function
* @return the object function
* @param <T> the input type
* @see TypeUtils#box(float)
*/
public static <T> ToObjectFunction<T, Float> of(ToFloatFunction<T> f) {
return ToObjectFunction.of(t -> box(f, t), BoxedFloatType.of());
}

/**
* Creates the function composition {@code box ∘ f}.
*
* <p>
* Equivalent to {@code x -> TypeUtils.box(f.applyAsDouble(x))}.
*
* @param f the double function
* @return the object function
* @param <T> the input type
* @see TypeUtils#box(double)
*/
public static <T> ToObjectFunction<T, Double> of(ToDoubleFunction<T> f) {
return ToObjectFunction.of(t -> box(f, t), BoxedDoubleType.of());
}

private enum BoxedVisitor implements TypedFunction.Visitor<Object, ToObjectFunction<Object, ?>>,
ToPrimitiveFunction.Visitor<Object, ToObjectFunction<Object, ?>> {
INSTANCE;

public static <T> ToObjectFunction<T, ?> of(TypedFunction<T> f) {
// noinspection unchecked
return f.walk((TypedFunction.Visitor<T, ToObjectFunction<T, ?>>) (TypedFunction.Visitor<?, ?>) INSTANCE);
}

public static <T> ToObjectFunction<T, ?> of(ToPrimitiveFunction<T> f) {
// noinspection unchecked
return f.walk(
(ToPrimitiveFunction.Visitor<T, ToObjectFunction<T, ?>>) (ToPrimitiveFunction.Visitor<?, ?>) INSTANCE);
}

@Override
public ToObjectFunction<Object, ?> visit(ToPrimitiveFunction<Object> f) {
return BoxTransform.of(f);
}

@Override
public ToObjectFunction<Object, ?> visit(ToObjectFunction<Object, ?> f) {
return f;
}

@Override
public ToObjectFunction<Object, Boolean> visit(ToBooleanFunction<Object> f) {
return BoxTransform.of(f);
}

@Override
public ToObjectFunction<Object, Character> visit(ToCharFunction<Object> f) {
return BoxTransform.of(f);
}

@Override
public ToObjectFunction<Object, Byte> visit(ToByteFunction<Object> f) {
return BoxTransform.of(f);
}

@Override
public ToObjectFunction<Object, Short> visit(ToShortFunction<Object> f) {
return BoxTransform.of(f);
}

@Override
public ToObjectFunction<Object, Integer> visit(ToIntFunction<Object> f) {
return BoxTransform.of(f);
}

@Override
public ToObjectFunction<Object, Long> visit(ToLongFunction<Object> f) {
return BoxTransform.of(f);
}

@Override
public ToObjectFunction<Object, Float> visit(ToFloatFunction<Object> f) {
return BoxTransform.of(f);
}

@Override
public ToObjectFunction<Object, Double> visit(ToDoubleFunction<Object> f) {
return BoxTransform.of(f);
}
}

private static <T> Character box(ToCharFunction<T> f, T x) {
return TypeUtils.box(f.applyAsChar(x));
}

private static <T> Byte box(ToByteFunction<T> f, T x) {
return TypeUtils.box(f.applyAsByte(x));
}

private static <T> Short box(ToShortFunction<T> f, T x) {
return TypeUtils.box(f.applyAsShort(x));
}

private static <T> Integer box(ToIntFunction<T> f, T x) {
return TypeUtils.box(f.applyAsInt(x));
}

private static <T> Long box(ToLongFunction<T> f, T x) {
return TypeUtils.box(f.applyAsLong(x));
}

private static <T> Float box(ToFloatFunction<T> f, T x) {
return TypeUtils.box(f.applyAsFloat(x));
}

private static <T> Double box(ToDoubleFunction<T> f, T x) {
return TypeUtils.box(f.applyAsDouble(x));
}
}
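
The dispatch used by `BoxedVisitor` can be hard to follow from the diff alone, so here is a reduced, dependency-free sketch of the same shape: a stateless enum singleton implements the visitor for `Object` and is re-typed per call through an unchecked double cast. The types below (`TypedFn`, `ToIntFn`, `ToObjFn`, `Visitor`, `Boxer`) are hypothetical stand-ins for the `io.deephaven.functions` interfaces, cut down to one primitive case. Boxing here is plain Java auto-boxing; the behavior of `TypeUtils.box` is not reproduced.

```java
// Hypothetical sketch; these are not the io.deephaven.functions types.
final class BoxSketch {

    interface TypedFn<T> {
        <R> R walk(Visitor<T, R> visitor);
    }

    interface Visitor<T, R> {
        R visit(ToIntFn<T> f);

        R visit(ToObjFn<T, ?> f);
    }

    @FunctionalInterface
    interface ToIntFn<T> extends TypedFn<T> {
        int applyAsInt(T value);

        @Override
        default <R> R walk(Visitor<T, R> visitor) {
            return visitor.visit(this);
        }
    }

    @FunctionalInterface
    interface ToObjFn<T, V> extends TypedFn<T> {
        V apply(T value);

        @Override
        default <R> R walk(Visitor<T, R> visitor) {
            return visitor.visit(this);
        }
    }

    // box ∘ f for the int case, using plain auto-boxing.
    static <T> ToObjFn<T, Integer> box(ToIntFn<T> f) {
        return x -> f.applyAsInt(x);
    }

    // The visitor holds no state, so one instance can serve every input type T.
    enum Boxer implements Visitor<Object, ToObjFn<Object, ?>> {
        INSTANCE;

        @SuppressWarnings("unchecked")
        static <T> ToObjFn<T, ?> of(TypedFn<T> f) {
            // Safe only because the visitor never inspects T itself.
            return f.walk((Visitor<T, ToObjFn<T, ?>>) (Visitor<?, ?>) INSTANCE);
        }

        @Override
        public ToObjFn<Object, ?> visit(ToIntFn<Object> f) {
            return box(f);
        }

        @Override
        public ToObjFn<Object, ?> visit(ToObjFn<Object, ?> f) {
            // Object functions are already "boxed"; return them unchanged.
            return f;
        }
    }

    public static void main(String[] args) {
        ToIntFn<String> length = String::length;
        ToObjFn<String, ?> boxed = Boxer.of(length);
        System.out.println(boxed.apply("kafka")); // prints 5

        ToObjFn<String, String> trim = s -> s.trim();
        System.out.println(Boxer.of(trim) == trim); // prints true
    }
}
```

The double cast trades compile-time type checking for a single shared visitor instance, which is reasonable here because the visitor only forwards its argument.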