Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka protobuf consumer support #4375

Merged
merged 32 commits into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
543da37
Kafka protobuf consumer support
devinrsmith Aug 24, 2023
e64b976
Fix protobuf generation for extensions-protobuf
devinrsmith Aug 24, 2023
d641a30
Add schema_message_name
devinrsmith Aug 25, 2023
911189a
Add FieldOptions to better support how the options may evolve in the …
devinrsmith Aug 25, 2023
6f0d04b
Fix and simplify python side
devinrsmith Sep 5, 2023
e4c9ab9
f
devinrsmith Sep 5, 2023
513e7cf
Rename to ToX function
devinrsmith Sep 5, 2023
1abe406
Rename mapToX
devinrsmith Sep 5, 2023
0a5687a
More explicit visitor return types where applicable
devinrsmith Sep 5, 2023
73c0624
Remove equals/hashcode
devinrsmith Sep 5, 2023
99ba1bf
Cast instead of primitive
devinrsmith Sep 5, 2023
0ddbda0
Remove extraneous ToDoubleFunction.map
devinrsmith Sep 5, 2023
4114db4
Make param arguments more generic
devinrsmith Sep 5, 2023
05dc323
Update javadocs
devinrsmith Sep 5, 2023
8c6f86c
Add ByteString support for BytesValue
devinrsmith Sep 5, 2023
011032d
remove unused
devinrsmith Sep 5, 2023
f538b08
javadoc
devinrsmith Sep 5, 2023
5e9d7b9
box
devinrsmith Sep 6, 2023
0a9182d
more fixes
devinrsmith Sep 6, 2023
dc72866
Add canonical parsers
devinrsmith Sep 6, 2023
f569254
Add multi message parser support
devinrsmith Sep 6, 2023
1c4b1cd
Use DH nulls when bypassing null inputs
devinrsmith Sep 6, 2023
0e9fce2
Comments
devinrsmith Sep 6, 2023
04f09b7
python comment changes
devinrsmith Sep 6, 2023
c45a57d
FieldPath match and testing
devinrsmith Sep 6, 2023
b7a1538
ToXFunction testing
devinrsmith Sep 6, 2023
4a975f2
Make MessageParserSingle extend MessageParser
devinrsmith Sep 6, 2023
56b713d
Add BytesValue javadoc about ByteString
devinrsmith Sep 6, 2023
3302e23
final review points
devinrsmith Sep 6, 2023
bd93a6c
Kafka consume test
devinrsmith Sep 7, 2023
66bdec8
Better docs for schemaMessageName / schema_message_name
devinrsmith Sep 7, 2023
7baad9f
formatting
devinrsmith Sep 7, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions extensions/kafka/TESTING.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# testing

## ProtobufImpl schema change testing

Most of the "simple" protobuf testing is done in the protobuf project, `project(':extensions-protobuf')`. One of
the important parts with respect to the kafka integration is how schema changes are handled, and so that is tested here,
[ProtobufImplSchemaChangeTest.java](src/test/java/io/deephaven/kafka/ProtobufImplSchemaChangeTest.java).

The test development setup for this is a little unconventional due to the fact that protoc won't let you generate
multiple versions of the same message type, at least not within the same protoc invocation. To work around this, there
is a little bit of manual test development workflow needed to add a new message / message version. It requires:
* Uncommenting the proto plugin logic in [build.gradle](build.gradle)
* Adding a single new .proto file per version of the message you want to create; it must have `option java_multiple_files = false;` (only one .proto file with a given message name can compile at any given time)
* Generating the new test code, `./gradlew :extensions-kafka:generateTestProto`
* Moving the output from [build/generated/source/proto/test/java/io/deephaven/kafka/protobuf/gen](build/generated/source/proto/test/java/io/deephaven/kafka/protobuf/gen) into [src/test/java/io/deephaven/kafka/protobuf/gen](src/test/java/io/deephaven/kafka/protobuf/gen)
* Renaming the .proto file to .proto.txt (to ensure it doesn't get re-generated later).
* Commenting out the proto plugin logic in [build.gradle](build.gradle)

While it's likely possible to automate the above, it would likely be a lot of bespoke work that would probably not see
much use elsewhere.
20 changes: 16 additions & 4 deletions extensions/kafka/build.gradle
Original file line number Diff line number Diff line change
@@ -1,21 +1,25 @@
plugins {
id 'java-library'
id 'io.deephaven.project.register'
// See TESTING.md
// id 'com.google.protobuf' version '0.9.4'
}

description 'Kafka: Integrating Engine tables with Kafka'

dependencies {
api project(':engine-table')

api 'org.apache.avro:avro:1.11.1'
api 'org.apache.avro:avro:1.11.2'

// Using io.confluent dependencies requires code in the toplevel build.gradle to add their maven repository.
// Note: the -ccs flavor is provided by confluent as their community edition. It is equivalent to the maven central
// version, but has a different version to make it easier to keep confluent dependencies aligned.
api 'org.apache.kafka:kafka-clients:7.3.0-ccs'
api 'io.confluent:kafka-avro-serializer:7.3.0'
runtimeOnly 'io.confluent:kafka-protobuf-serializer:7.3.0'
api 'org.apache.kafka:kafka-clients:7.4.0-ccs'
api 'io.confluent:kafka-avro-serializer:7.4.0'

api 'io.confluent:kafka-protobuf-serializer:7.4.0'
api project(':extensions-protobuf')

implementation project(':Configuration')
implementation project(':log-factory')
Expand All @@ -37,6 +41,14 @@ spotless {
java {
targetExclude(
'**/**FieldCopier.java',
'src/test/java/io/deephaven/kafka/protobuf/gen/**',
)
}
}

// See TESTING.md
//protobuf {
// protoc {
// artifact = 'com.google.protobuf:protoc:3.24.1'
// }
//}
289 changes: 289 additions & 0 deletions extensions/kafka/src/main/java/io/deephaven/kafka/BoxTransform.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,289 @@
/**
* Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.kafka;

import io.deephaven.functions.BooleanFunction;
import io.deephaven.functions.ByteFunction;
import io.deephaven.functions.CharFunction;
import io.deephaven.functions.DoubleFunction;
import io.deephaven.functions.FloatFunction;
import io.deephaven.functions.IntFunction;
import io.deephaven.functions.LongFunction;
import io.deephaven.functions.ObjectFunction;
import io.deephaven.functions.PrimitiveFunction;
import io.deephaven.functions.ShortFunction;
import io.deephaven.functions.TypedFunction;
import io.deephaven.qst.type.BoxedBooleanType;
import io.deephaven.qst.type.BoxedByteType;
import io.deephaven.qst.type.BoxedCharType;
import io.deephaven.qst.type.BoxedDoubleType;
import io.deephaven.qst.type.BoxedFloatType;
import io.deephaven.qst.type.BoxedIntType;
import io.deephaven.qst.type.BoxedLongType;
import io.deephaven.qst.type.BoxedShortType;
import io.deephaven.util.type.TypeUtils;

class BoxTransform {

/**
* Creates the function composition {@code box ∘ f}.
*
* <p>
* For primitive functions {@code f}, see {@link #of(PrimitiveFunction)}.
*
* <p>
* For object functions {@code f}, {@code box} is the identity function (and {@code f} will simply be returned).
*
* @param f the function
* @return the object function
* @param <T> the input type
* @see #of(BooleanFunction)
* @see #of(CharFunction)
* @see #of(ByteFunction)
* @see #of(ShortFunction)
* @see #of(IntFunction)
* @see #of(LongFunction)
* @see #of(FloatFunction)
* @see #of(DoubleFunction)
*/
public static <T> ObjectFunction<T, ?> of(TypedFunction<T> f) {
return BoxedVisitor.of(f);
}

/**
* Creates the function composition {@code box ∘ f}.
*
* <p>
* The returned object function will have return type {@code f.returnType().boxedType()}.
*
* @param f the primitive function
* @return the object function
* @param <T> the input type
* @see #of(BooleanFunction)
* @see #of(CharFunction)
* @see #of(ByteFunction)
* @see #of(ShortFunction)
* @see #of(IntFunction)
* @see #of(LongFunction)
* @see #of(FloatFunction)
* @see #of(DoubleFunction)
*/
public static <T> ObjectFunction<T, ?> of(PrimitiveFunction<T> f) {
return BoxedVisitor.of(f);
}

/**
* Creates the function composition {@code box ∘ f}, where {@code box} is simply a cast from {@code boolean} to
* {@code Boolean}.
*
* <p>
* Equivalent to {@code x -> (Boolean)f.test(x)}.
*
* @param f the boolean function
* @return the object function
* @param <T> the input type
*/
public static <T> ObjectFunction<T, Boolean> of(BooleanFunction<T> f) {
return ObjectFunction.of(f::test, BoxedBooleanType.of());
}

/**
* Creates the function composition {@code box ∘ f}.
*
* <p>
* Equivalent to {@code x -> TypeUtils.box(f.applyAsChar(x))}.
*
* @param f the char function
* @return the object function
* @param <T> the input type
* @see TypeUtils#box(char)
*/
public static <T> ObjectFunction<T, Character> of(CharFunction<T> f) {
return ObjectFunction.of(t -> box(f, t), BoxedCharType.of());
}

/**
* Creates the function composition {@code box ∘ f}.
*
* <p>
* Equivalent to {@code x -> TypeUtils.box(f.applyAsByte(x))}.
*
* @param f the byte function
* @return the object function
* @param <T> the input type
* @see TypeUtils#box(byte)
*/
public static <T> ObjectFunction<T, Byte> of(ByteFunction<T> f) {
return ObjectFunction.of(t -> box(f, t), BoxedByteType.of());
}

/**
* Creates the function composition {@code box ∘ f}.
*
* <p>
* Equivalent to {@code x -> TypeUtils.box(f.applyAsShort(x))}.
*
* @param f the short function
* @return the object function
* @param <T> the input type
* @see TypeUtils#box(short)
*/
public static <T> ObjectFunction<T, Short> of(ShortFunction<T> f) {
return ObjectFunction.of(t -> box(f, t), BoxedShortType.of());
}

/**
* Creates the function composition {@code box ∘ f}.
*
* <p>
* Equivalent to {@code x -> TypeUtils.box(f.applyAsInt(x))}.
*
* @param f the int function
* @return the object function
* @param <T> the input type
* @see TypeUtils#box(int)
*/
public static <T> ObjectFunction<T, Integer> of(IntFunction<T> f) {
return ObjectFunction.of(t -> box(f, t), BoxedIntType.of());
}

/**
* Creates the function composition {@code box ∘ f}.
*
* <p>
* Equivalent to {@code x -> TypeUtils.box(f.applyAsLong(x))}.
*
* @param f the long function
* @return the object function
* @param <T> the input type
* @see TypeUtils#box(long)
*/
public static <T> ObjectFunction<T, Long> of(LongFunction<T> f) {
return ObjectFunction.of(t -> box(f, t), BoxedLongType.of());
}

/**
* Creates the function composition {@code box ∘ f}.
*
* <p>
* Equivalent to {@code x -> TypeUtils.box(f.applyAsFloat(x))}.
*
* @param f the float function
* @return the object function
* @param <T> the input type
* @see TypeUtils#box(float)
*/
public static <T> ObjectFunction<T, Float> of(FloatFunction<T> f) {
return ObjectFunction.of(t -> box(f, t), BoxedFloatType.of());
}

/**
* Creates the function composition {@code box ∘ f}.
*
* <p>
* Equivalent to {@code x -> TypeUtils.box(f.applyAsDouble(x))}.
*
* @param f the double function
* @return the object function
* @param <T> the input type
* @see TypeUtils#box(double)
*/
public static <T> ObjectFunction<T, Double> of(DoubleFunction<T> f) {
return ObjectFunction.of(t -> box(f, t), BoxedDoubleType.of());
}

private enum BoxedVisitor implements TypedFunction.Visitor<Object, ObjectFunction<Object, ?>>,
PrimitiveFunction.Visitor<Object, ObjectFunction<Object, ?>> {
INSTANCE;

public static <T> ObjectFunction<T, ?> of(TypedFunction<T> f) {
// noinspection unchecked
return f.walk((TypedFunction.Visitor<T, ObjectFunction<T, ?>>) (TypedFunction.Visitor<?, ?>) INSTANCE);
}

public static <T> ObjectFunction<T, ?> of(PrimitiveFunction<T> f) {
// noinspection unchecked
return f.walk(
(PrimitiveFunction.Visitor<T, ObjectFunction<T, ?>>) (PrimitiveFunction.Visitor<?, ?>) INSTANCE);
}

@Override
public ObjectFunction<Object, ?> visit(PrimitiveFunction<Object> f) {
return BoxTransform.of(f);
}

@Override
public ObjectFunction<Object, ?> visit(ObjectFunction<Object, ?> f) {
return f;
}

@Override
public ObjectFunction<Object, ?> visit(BooleanFunction<Object> f) {
return BoxTransform.of(f);
}

@Override
public ObjectFunction<Object, ?> visit(CharFunction<Object> f) {
return BoxTransform.of(f);
}

@Override
public ObjectFunction<Object, ?> visit(ByteFunction<Object> f) {
return BoxTransform.of(f);
}

@Override
public ObjectFunction<Object, ?> visit(ShortFunction<Object> f) {
return BoxTransform.of(f);
}

@Override
public ObjectFunction<Object, ?> visit(IntFunction<Object> f) {
return BoxTransform.of(f);
}

@Override
public ObjectFunction<Object, ?> visit(LongFunction<Object> f) {
return BoxTransform.of(f);
}

@Override
public ObjectFunction<Object, ?> visit(FloatFunction<Object> f) {
return BoxTransform.of(f);
}

@Override
public ObjectFunction<Object, ?> visit(DoubleFunction<Object> f) {
return BoxTransform.of(f);
}
}

private static <T> Character box(CharFunction<T> f, T x) {
return TypeUtils.box(f.applyAsChar(x));
}

private static <T> Byte box(ByteFunction<T> f, T x) {
return TypeUtils.box(f.applyAsByte(x));
}

private static <T> Short box(ShortFunction<T> f, T x) {
return TypeUtils.box(f.applyAsShort(x));
}

private static <T> Integer box(IntFunction<T> f, T x) {
return TypeUtils.box(f.applyAsInt(x));
}

private static <T> Long box(LongFunction<T> f, T x) {
return TypeUtils.box(f.applyAsLong(x));
}

private static <T> Float box(FloatFunction<T> f, T x) {
return TypeUtils.box(f.applyAsFloat(x));
}

private static <T> Double box(DoubleFunction<T> f, T x) {
return TypeUtils.box(f.applyAsDouble(x));
}
}
Loading
Loading