diff --git a/extensions/kafka/TESTING.md b/extensions/kafka/TESTING.md
new file mode 100644
index 00000000000..d8a4c3c92a7
--- /dev/null
+++ b/extensions/kafka/TESTING.md
@@ -0,0 +1,20 @@
+# Testing
+
+## ProtobufImpl schema change testing
+
+Most of the "simple" protobuf testing is done in the protobuf project, `project(':extensions-protobuf')`. An
+important part of the kafka integration is how schema changes are handled, so that is tested here, in
+[ProtobufImplSchemaChangeTest.java](src/test/java/io/deephaven/kafka/ProtobufImplSchemaChangeTest.java).
+
+The test development setup for this is a little unconventional because protoc won't let you generate multiple
+versions of the same message type, at least not within the same protoc invocation. To work around this, a small
+manual workflow is needed to add a new message or message version. It requires:
+ * Uncommenting the proto plugin logic in [build.gradle](build.gradle)
+ * Adding a single new .proto file per version of the message you want to create; it must have `option java_multiple_files = false;` (only one .proto file with a given message name can compile at any given time); see the sketch at the end of this document
+ * Generating the new test code, `./gradlew :extensions-kafka:generateTestProto`
+ * Moving the output from [build/generated/source/proto/test/java/io/deephaven/kafka/protobuf/gen](build/generated/source/proto/test/java/io/deephaven/kafka/protobuf/gen) into [src/test/java/io/deephaven/kafka/protobuf/gen](src/test/java/io/deephaven/kafka/protobuf/gen)
+ * Renaming the .proto file to .proto.txt (to ensure it doesn't get re-generated later)
+ * Commenting out the proto plugin logic in [build.gradle](build.gradle)
+
+While it's likely possible to automate the above, it would be a lot of bespoke work that would probably not see
+much use elsewhere.
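+
+For reference, here is a sketch of what one version of such a .proto file might look like. The package, message
+name, and fields are hypothetical; only the `option java_multiple_files = false;` line is required by the
+workflow above:
+
+```proto
+syntax = "proto3";
+
+package io.deephaven.kafka.test;
+
+// Only one .proto file with a given message name can compile at a time, so each
+// version lives in its own file and superseded files are renamed to .proto.txt.
+option java_multiple_files = false;
+option java_package = "io.deephaven.kafka.protobuf.gen";
+
+message MyMessage {
+  string name = 1;
+  int32 age = 2;
+}
+```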
diff --git a/extensions/kafka/build.gradle b/extensions/kafka/build.gradle
index cfb627fa8e4..745a949797e 100644
--- a/extensions/kafka/build.gradle
+++ b/extensions/kafka/build.gradle
@@ -1,6 +1,8 @@
plugins {
id 'java-library'
id 'io.deephaven.project.register'
+ // See TESTING.md
+ // id 'com.google.protobuf' version '0.9.4'
}
description 'Kafka: Integrating Engine tables with Kafka'
@@ -8,14 +10,16 @@ description 'Kafka: Integrating Engine tables with Kafka'
dependencies {
api project(':engine-table')
- api 'org.apache.avro:avro:1.11.1'
+ api 'org.apache.avro:avro:1.11.2'
// Using io.confluent dependencies requires code in the toplevel build.gradle to add their maven repository.
// Note: the -ccs flavor is provided by confluent as their community edition. It is equivalent to the maven central
// version, but has a different version to make it easier to keep confluent dependencies aligned.
- api 'org.apache.kafka:kafka-clients:7.3.0-ccs'
- api 'io.confluent:kafka-avro-serializer:7.3.0'
- runtimeOnly 'io.confluent:kafka-protobuf-serializer:7.3.0'
+ api 'org.apache.kafka:kafka-clients:7.4.0-ccs'
+ api 'io.confluent:kafka-avro-serializer:7.4.0'
+
+ api 'io.confluent:kafka-protobuf-serializer:7.4.0'
+ api project(':extensions-protobuf')
implementation project(':Configuration')
implementation project(':log-factory')
@@ -37,6 +41,14 @@ spotless {
java {
targetExclude(
'**/**FieldCopier.java',
+ 'src/test/java/io/deephaven/kafka/protobuf/gen/**',
)
}
}
+
+// See TESTING.md
+//protobuf {
+// protoc {
+// artifact = 'com.google.protobuf:protoc:3.24.1'
+// }
+//}
diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/BoxTransform.java b/extensions/kafka/src/main/java/io/deephaven/kafka/BoxTransform.java
new file mode 100644
index 00000000000..70ed02548c0
--- /dev/null
+++ b/extensions/kafka/src/main/java/io/deephaven/kafka/BoxTransform.java
@@ -0,0 +1,289 @@
+/**
+ * Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
+ */
+package io.deephaven.kafka;
+
+import io.deephaven.functions.ToBooleanFunction;
+import io.deephaven.functions.ToByteFunction;
+import io.deephaven.functions.ToCharFunction;
+import io.deephaven.functions.ToDoubleFunction;
+import io.deephaven.functions.ToFloatFunction;
+import io.deephaven.functions.ToIntFunction;
+import io.deephaven.functions.ToLongFunction;
+import io.deephaven.functions.ToObjectFunction;
+import io.deephaven.functions.ToPrimitiveFunction;
+import io.deephaven.functions.ToShortFunction;
+import io.deephaven.functions.TypedFunction;
+import io.deephaven.qst.type.BoxedBooleanType;
+import io.deephaven.qst.type.BoxedByteType;
+import io.deephaven.qst.type.BoxedCharType;
+import io.deephaven.qst.type.BoxedDoubleType;
+import io.deephaven.qst.type.BoxedFloatType;
+import io.deephaven.qst.type.BoxedIntType;
+import io.deephaven.qst.type.BoxedLongType;
+import io.deephaven.qst.type.BoxedShortType;
+import io.deephaven.util.type.TypeUtils;
+
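+/**
+ * Transforms {@link TypedFunction typed functions} into equivalent functions with boxed object return types.
+ */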
+class BoxTransform {
+
+ /**
+ * Creates the function composition {@code box ∘ f}.
+ *
+ * <p>
+ * For primitive functions {@code f}, see {@link #of(ToPrimitiveFunction)}.
+ *
+ * <p>
+ * For object functions {@code f}, {@code box} is the identity function (and {@code f} will simply be returned).
+ *
+ * @param f the function
+ * @return the object function
+ * @param <T> the input type
+ * @see #of(ToBooleanFunction)
+ * @see #of(ToCharFunction)
+ * @see #of(ToByteFunction)
+ * @see #of(ToShortFunction)
+ * @see #of(ToIntFunction)
+ * @see #of(ToLongFunction)
+ * @see #of(ToFloatFunction)
+ * @see #of(ToDoubleFunction)
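+ *
+ * <p>
+ * An illustrative sketch (not from the original source):
+ *
+ * <pre>{@code
+ * TypedFunction<String> length = (ToIntFunction<String>) String::length;
+ * ToObjectFunction<String, ?> boxed = BoxTransform.of(length); // Integer-valued
+ * }</pre>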
+ */
+ public static <T> ToObjectFunction<T, ?> of(TypedFunction<T> f) {
+ return BoxedVisitor.of(f);
+ }
+
+ /**
+ * Creates the function composition {@code box ∘ f}.
+ *
+ * <p>
+ * The returned object function will have return type {@code f.returnType().boxedType()}.
+ *
+ * @param f the primitive function
+ * @return the object function
+ * @param <T> the input type
+ * @see #of(ToBooleanFunction)
+ * @see #of(ToCharFunction)
+ * @see #of(ToByteFunction)
+ * @see #of(ToShortFunction)
+ * @see #of(ToIntFunction)
+ * @see #of(ToLongFunction)
+ * @see #of(ToFloatFunction)
+ * @see #of(ToDoubleFunction)
+ */
+ public static <T> ToObjectFunction<T, ?> of(ToPrimitiveFunction<T> f) {
+ return BoxedVisitor.of(f);
+ }
+
+ /**
+ * Creates the function composition {@code box ∘ f}, where {@code box} is simply a cast from {@code boolean} to
+ * {@code Boolean}.
+ *
+ * <p>
+ * Equivalent to {@code x -> (Boolean) f.test(x)}.
+ *
+ * @param f the boolean function
+ * @return the object function
+ * @param <T> the input type
+ */
+ public static <T> ToObjectFunction<T, Boolean> of(ToBooleanFunction<T> f) {
+ return ToObjectFunction.of(f::test, BoxedBooleanType.of());
+ }
+
+ /**
+ * Creates the function composition {@code box ∘ f}.
+ *
+ * <p>
+ * Equivalent to {@code x -> TypeUtils.box(f.applyAsChar(x))}.
+ *
+ * @param f the char function
+ * @return the object function
+ * @param <T> the input type
+ * @see TypeUtils#box(char)
+ */
+ public static <T> ToObjectFunction<T, Character> of(ToCharFunction<T> f) {
+ return ToObjectFunction.of(t -> box(f, t), BoxedCharType.of());
+ }
+
+ /**
+ * Creates the function composition {@code box ∘ f}.
+ *
+ * <p>
+ * Equivalent to {@code x -> TypeUtils.box(f.applyAsByte(x))}.
+ *
+ * @param f the byte function
+ * @return the object function
+ * @param <T> the input type
+ * @see TypeUtils#box(byte)
+ */
+ public static <T> ToObjectFunction<T, Byte> of(ToByteFunction<T> f) {
+ return ToObjectFunction.of(t -> box(f, t), BoxedByteType.of());
+ }
+
+ /**
+ * Creates the function composition {@code box ∘ f}.
+ *
+ * <p>
+ * Equivalent to {@code x -> TypeUtils.box(f.applyAsShort(x))}.
+ *
+ * @param f the short function
+ * @return the object function
+ * @param <T> the input type
+ * @see TypeUtils#box(short)
+ */
+ public static <T> ToObjectFunction<T, Short> of(ToShortFunction<T> f) {
+ return ToObjectFunction.of(t -> box(f, t), BoxedShortType.of());
+ }
+
+ /**
+ * Creates the function composition {@code box ∘ f}.
+ *
+ * <p>
+ * Equivalent to {@code x -> TypeUtils.box(f.applyAsInt(x))}.
+ *
+ * @param f the int function
+ * @return the object function
+ * @param <T> the input type
+ * @see TypeUtils#box(int)
+ */
+ public static <T> ToObjectFunction<T, Integer> of(ToIntFunction<T> f) {
+ return ToObjectFunction.of(t -> box(f, t), BoxedIntType.of());
+ }
+
+ /**
+ * Creates the function composition {@code box ∘ f}.
+ *
+ * <p>
+ * Equivalent to {@code x -> TypeUtils.box(f.applyAsLong(x))}.
+ *
+ * @param f the long function
+ * @return the object function
+ * @param <T> the input type
+ * @see TypeUtils#box(long)
+ */
+ public static <T> ToObjectFunction<T, Long> of(ToLongFunction<T> f) {
+ return ToObjectFunction.of(t -> box(f, t), BoxedLongType.of());
+ }
+
+ /**
+ * Creates the function composition {@code box ∘ f}.
+ *
+ * <p>
+ * Equivalent to {@code x -> TypeUtils.box(f.applyAsFloat(x))}.
+ *
+ * @param f the float function
+ * @return the object function
+ * @param <T> the input type
+ * @see TypeUtils#box(float)
+ */
+ public static <T> ToObjectFunction<T, Float> of(ToFloatFunction<T> f) {
+ return ToObjectFunction.of(t -> box(f, t), BoxedFloatType.of());
+ }
+
+ /**
+ * Creates the function composition {@code box ∘ f}.
+ *
+ * <p>
+ * Equivalent to {@code x -> TypeUtils.box(f.applyAsDouble(x))}.
+ *
+ * @param f the double function
+ * @return the object function
+ * @param <T> the input type
+ * @see TypeUtils#box(double)
+ */
+ public static <T> ToObjectFunction<T, Double> of(ToDoubleFunction<T> f) {
+ return ToObjectFunction.of(t -> box(f, t), BoxedDoubleType.of());
+ }
+
+ private enum BoxedVisitor implements TypedFunction.Visitor<Object, ToObjectFunction<Object, ?>>,
+ ToPrimitiveFunction.Visitor<Object, ToObjectFunction<Object, ?>> {
+ INSTANCE;
+
+ public static <T> ToObjectFunction<T, ?> of(TypedFunction<T> f) {
+ // noinspection unchecked
+ return f.walk((TypedFunction.Visitor<T, ToObjectFunction<T, ?>>) (TypedFunction.Visitor<?, ?>) INSTANCE);
+ }
+
+ public static <T> ToObjectFunction<T, ?> of(ToPrimitiveFunction<T> f) {
+ // noinspection unchecked
+ return f.walk(
+ (ToPrimitiveFunction.Visitor<T, ToObjectFunction<T, ?>>) (ToPrimitiveFunction.Visitor<?, ?>) INSTANCE);
+ }
+
+ @Override
+ public ToObjectFunction<Object, ?> visit(ToPrimitiveFunction<Object> f) {
+ return BoxTransform.of(f);
+ }
+
+ @Override
+ public ToObjectFunction<Object, ?> visit(ToObjectFunction<Object, ?> f) {
+ return f;
+ }
+
+ @Override
+ public ToObjectFunction<Object, ?> visit(ToBooleanFunction<Object> f) {
+ return BoxTransform.of(f);
+ }
+
+ @Override
+ public ToObjectFunction<Object, ?> visit(ToCharFunction<Object> f) {
+ return BoxTransform.of(f);
+ }
+
+ @Override
+ public ToObjectFunction<Object, ?> visit(ToByteFunction<Object> f) {
+ return BoxTransform.of(f);
+ }
+
+ @Override
+ public ToObjectFunction<Object, ?> visit(ToShortFunction<Object> f) {
+ return BoxTransform.of(f);
+ }
+
+ @Override
+ public ToObjectFunction<Object, ?> visit(ToIntFunction<Object> f) {
+ return BoxTransform.of(f);
+ }
+
+ @Override
+ public ToObjectFunction<Object, ?> visit(ToLongFunction<Object> f) {
+ return BoxTransform.of(f);
+ }
+
+ @Override
+ public ToObjectFunction<Object, ?> visit(ToFloatFunction<Object> f) {
+ return BoxTransform.of(f);
+ }
+
+ @Override
+ public ToObjectFunction<Object, ?> visit(ToDoubleFunction<Object> f) {
+ return BoxTransform.of(f);
+ }
+ }
+
+ private static <T> Character box(ToCharFunction<T> f, T x) {
+ return TypeUtils.box(f.applyAsChar(x));
+ }
+
+ private static <T> Byte box(ToByteFunction<T> f, T x) {
+ return TypeUtils.box(f.applyAsByte(x));
+ }
+
+ private static <T> Short box(ToShortFunction<T> f, T x) {
+ return TypeUtils.box(f.applyAsShort(x));
+ }
+
+ private static <T> Integer box(ToIntFunction<T> f, T x) {
+ return TypeUtils.box(f.applyAsInt(x));
+ }
+
+ private static <T> Long box(ToLongFunction<T> f, T x) {
+ return TypeUtils.box(f.applyAsLong(x));
+ }
+
+ private static <T> Float box(ToFloatFunction<T> f, T x) {
+ return TypeUtils.box(f.applyAsFloat(x));
+ }
+
+ private static <T> Double box(ToDoubleFunction<T> f, T x) {
+ return TypeUtils.box(f.applyAsDouble(x));
+ }
+}
diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/DhNullableTypeTransform.java b/extensions/kafka/src/main/java/io/deephaven/kafka/DhNullableTypeTransform.java
new file mode 100644
index 00000000000..9132913f185
--- /dev/null
+++ b/extensions/kafka/src/main/java/io/deephaven/kafka/DhNullableTypeTransform.java
@@ -0,0 +1,94 @@
+/**
+ * Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
+ */
+package io.deephaven.kafka;
+
+import io.deephaven.functions.ToBooleanFunction;
+import io.deephaven.functions.ToByteFunction;
+import io.deephaven.functions.ToCharFunction;
+import io.deephaven.functions.ToDoubleFunction;
+import io.deephaven.functions.ToFloatFunction;
+import io.deephaven.functions.ToIntFunction;
+import io.deephaven.functions.ToLongFunction;
+import io.deephaven.functions.ToObjectFunction;
+import io.deephaven.functions.ToPrimitiveFunction;
+import io.deephaven.functions.ToShortFunction;
+import io.deephaven.functions.TypedFunction;
+
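+/**
+ * Transforms functions into their most appropriate Deephaven nullable-typed equivalents. Primitive functions
+ * pass through unchanged (their Deephaven types already have null sentinels), while {@code boolean} functions
+ * are boxed to {@code Boolean}, the only nullable representation available for booleans.
+ */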
+class DhNullableTypeTransform {
+
+ public static <T> TypedFunction<T> of(TypedFunction<T> f) {
+ return NullableTypeVisitor.of(f);
+ }
+
+ public static <T> TypedFunction<T> of(ToPrimitiveFunction<T> f) {
+ return NullableTypeVisitor.of(f);
+ }
+
+ private enum NullableTypeVisitor implements TypedFunction.Visitor<Object, TypedFunction<Object>>,
+ ToPrimitiveFunction.Visitor<Object, TypedFunction<Object>> {
+ INSTANCE;
+
+ public static <T> TypedFunction<T> of(TypedFunction<T> f) {
+ // noinspection unchecked
+ return f.walk((TypedFunction.Visitor<T, TypedFunction<T>>) (TypedFunction.Visitor<?, ?>) INSTANCE);
+ }
+
+ public static <T> TypedFunction<T> of(ToPrimitiveFunction<T> f) {
+ // noinspection unchecked
+ return f.walk(
+ (ToPrimitiveFunction.Visitor<T, TypedFunction<T>>) (ToPrimitiveFunction.Visitor<?, ?>) INSTANCE);
+ }
+
+ @Override
+ public TypedFunction<Object> visit(ToPrimitiveFunction<Object> f) {
+ return of(f);
+ }
+
+ @Override
+ public TypedFunction<Object> visit(ToObjectFunction<Object, ?> f) {
+ return f;
+ }
+
+ @Override
+ public ToObjectFunction<Object, Boolean> visit(ToBooleanFunction<Object> f) {
+ // BooleanFunction is the only function / primitive type that doesn't natively have a "null" type.
+ return BoxTransform.of(f);
+ }
+
+ @Override
+ public ToCharFunction<Object> visit(ToCharFunction<Object> f) {
+ return f;
+ }
+
+ @Override
+ public ToByteFunction<Object> visit(ToByteFunction<Object> f) {
+ return f;
+ }
+
+ @Override
+ public ToShortFunction<Object> visit(ToShortFunction<Object> f) {
+ return f;
+ }
+
+ @Override
+ public ToIntFunction<Object> visit(ToIntFunction<Object> f) {
+ return f;
+ }
+
+ @Override
+ public ToLongFunction<Object> visit(ToLongFunction<Object> f) {
+ return f;
+ }
+
+ @Override
+ public ToFloatFunction<Object> visit(ToFloatFunction<Object> f) {
+ return f;
+ }
+
+ @Override
+ public ToDoubleFunction<Object> visit(ToDoubleFunction<Object> f) {
+ return f;
+ }
+ }
+}
diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java b/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java
index 1d337f8e697..acea6cff6a9 100644
--- a/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java
+++ b/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java
@@ -4,6 +4,7 @@
package io.deephaven.kafka;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.protobuf.Descriptors.Descriptor;
import gnu.trove.map.hash.TIntLongHashMap;
import io.confluent.kafka.schemaregistry.SchemaProvider;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
@@ -53,6 +54,7 @@
import io.deephaven.kafka.KafkaTools.TableType.Blink;
import io.deephaven.kafka.KafkaTools.TableType.Ring;
import io.deephaven.kafka.KafkaTools.TableType.Visitor;
+import io.deephaven.kafka.ProtobufImpl.ProtobufConsumeImpl;
import io.deephaven.kafka.RawImpl.RawConsume;
import io.deephaven.kafka.RawImpl.RawProduce;
import io.deephaven.kafka.SimpleImpl.SimpleConsume;
@@ -62,9 +64,11 @@
import io.deephaven.kafka.ingest.KafkaRecordConsumer;
import io.deephaven.kafka.ingest.KafkaStreamPublisher;
import io.deephaven.kafka.ingest.KeyOrValueProcessor;
+import io.deephaven.kafka.protobuf.ProtobufConsumeOptions;
import io.deephaven.kafka.publish.KafkaPublisherException;
import io.deephaven.kafka.publish.KeyOrValueSerializer;
import io.deephaven.kafka.publish.PublishToKafka;
+import io.deephaven.protobuf.ProtobufDescriptorParserOptions;
import io.deephaven.qst.column.header.ColumnHeader;
import io.deephaven.stream.StreamChunkUtils;
import io.deephaven.stream.StreamConsumer;
@@ -513,6 +517,33 @@ public static KeyOrValueSpec avroSpec(final String schemaName) {
return new AvroConsume(schemaName, AVRO_LATEST_VERSION, DIRECT_MAPPING);
}
+ /**
+ * The kafka protobuf spec. This will fetch the {@link com.google.protobuf.Descriptors.Descriptor protobuf
+ * descriptor} for the {@link ProtobufConsumeOptions#schemaSubject() schema subject} from the schema registry
+ * using the {@link ProtobufConsumeOptions#schemaVersion() schema version} and create
+ * {@link com.google.protobuf.Message message} parsing functions according to
+ * {@link io.deephaven.protobuf.ProtobufDescriptorParser#parse(Descriptor, ProtobufDescriptorParserOptions)}.
+ * These functions will be adapted to handle schema changes.
+ *
+ * <p>
+ * For reproducibility across restarts where schema changes may occur, it is advisable for callers to set a
+ * specific {@link ProtobufConsumeOptions#schemaVersion() schema version}. This ensures the resulting
+ * {@link io.deephaven.engine.table.TableDefinition table definition} will not change across restarts, and
+ * gives the caller an explicit opportunity to update any downstream consumers before changing the schema
+ * version.
+ *
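+ * <p>
+ * A minimal usage sketch (the builder methods and subject name shown here are assumptions based on
+ * {@link ProtobufConsumeOptions}' properties, not a confirmed API):
+ *
+ * <pre>{@code
+ * KeyOrValueSpec valueSpec = protobufSpec(ProtobufConsumeOptions.builder()
+ *         .schemaSubject("my-topic-value") // hypothetical subject
+ *         .schemaVersion(1) // pin for reproducibility across restarts
+ *         .build());
+ * }</pre>
+ *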
+ * @param options the options
+ * @return the key or value spec
+ * @see io.deephaven.protobuf.ProtobufDescriptorParser#parse(Descriptor, ProtobufDescriptorParserOptions)
+ * parsing
+ * @see <a href="https://docs.confluent.io/platform/current/schema-registry/serdes-develop/serdes-protobuf.html">kafka
+ * protobuf serdes</a>
+ */
+ public static KeyOrValueSpec protobufSpec(ProtobufConsumeOptions options) {
+ return new ProtobufConsumeImpl(options);
+ }
+
/**
* If {@code columnName} is set, that column name will be used. Otherwise, the names for the key or value
* columns can be provided in the properties as {@value KEY_COLUMN_NAME_PROPERTY} or
diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/KeyOrValueProcessorImpl.java b/extensions/kafka/src/main/java/io/deephaven/kafka/KeyOrValueProcessorImpl.java
new file mode 100644
index 00000000000..48607660a31
--- /dev/null
+++ b/extensions/kafka/src/main/java/io/deephaven/kafka/KeyOrValueProcessorImpl.java
@@ -0,0 +1,44 @@
+/**
+ * Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
+ */
+package io.deephaven.kafka;
+
+import io.deephaven.chunk.ObjectChunk;
+import io.deephaven.chunk.WritableChunk;
+import io.deephaven.chunk.attributes.Values;
+import io.deephaven.kafka.ingest.FieldCopier;
+import io.deephaven.kafka.ingest.KafkaIngesterException;
+import io.deephaven.kafka.ingest.KeyOrValueProcessor;
+
+import java.util.List;
+import java.util.Objects;
+
+final class KeyOrValueProcessorImpl implements KeyOrValueProcessor {
+
+ private final int[] chunkOffsets;
+ private final List<FieldCopier> copiers;
+ private final boolean allowNulls;
+
+ public KeyOrValueProcessorImpl(final int[] chunkOffsets, List<FieldCopier> copiers, boolean allowNulls) {
+ this.chunkOffsets = Objects.requireNonNull(chunkOffsets);
+ this.copiers = Objects.requireNonNull(copiers);
+ this.allowNulls = allowNulls;
+ }
+
+ @Override
+ public void handleChunk(ObjectChunk<Object, Values> inputChunk, WritableChunk<Values>[] publisherChunks) {
+ if (!allowNulls) {
+ for (int ii = 0; ii < inputChunk.size(); ++ii) {
+ if (inputChunk.get(ii) == null) {
+ throw new KafkaIngesterException("Null records are not permitted");
+ }
+ }
+ }
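+ // chunkOffsets[i] maps the i-th copier to its destination publisher chunk (i.e., the output column);
+ // each copier appends this batch of rows starting at the chunk's previous size.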
+ for (int i = 0; i < chunkOffsets.length; ++i) {
+ final WritableChunk<Values> publisherChunk = publisherChunks[chunkOffsets[i]];
+ final int existingSize = publisherChunk.size();
+ publisherChunk.setSize(existingSize + inputChunk.size());
+ copiers.get(i).copyField(inputChunk, publisherChunk, 0, existingSize, inputChunk.size());
+ }
+ }
+}
diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/NullFunctions.java b/extensions/kafka/src/main/java/io/deephaven/kafka/NullFunctions.java
new file mode 100644
index 00000000000..7e71c9ddae8
--- /dev/null
+++ b/extensions/kafka/src/main/java/io/deephaven/kafka/NullFunctions.java
@@ -0,0 +1,131 @@
+/**
+ * Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
+ */
+package io.deephaven.kafka;
+
+import io.deephaven.qst.type.BooleanType;
+import io.deephaven.qst.type.ByteType;
+import io.deephaven.qst.type.CharType;
+import io.deephaven.qst.type.DoubleType;
+import io.deephaven.qst.type.FloatType;
+import io.deephaven.qst.type.GenericType;
+import io.deephaven.qst.type.IntType;
+import io.deephaven.qst.type.LongType;
+import io.deephaven.qst.type.PrimitiveType;
+import io.deephaven.qst.type.ShortType;
+import io.deephaven.qst.type.Type;
+import io.deephaven.functions.ToBooleanFunction;
+import io.deephaven.functions.ToByteFunction;
+import io.deephaven.functions.ToCharFunction;
+import io.deephaven.functions.ToDoubleFunction;
+import io.deephaven.functions.ToFloatFunction;
+import io.deephaven.functions.ToIntFunction;
+import io.deephaven.functions.ToLongFunction;
+import io.deephaven.functions.ToObjectFunction;
+import io.deephaven.functions.ToPrimitiveFunction;
+import io.deephaven.functions.ToShortFunction;
+import io.deephaven.functions.TypedFunction;
+import io.deephaven.util.QueryConstants;
+
+import java.util.Optional;
+
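+/**
+ * Produces functions that always return the Deephaven null equivalent for a given return type (for example,
+ * {@link QueryConstants#NULL_INT} for {@code int}). {@code boolean} has no primitive null sentinel, so
+ * {@link BooleanType} yields no function here and the returned {@link Optional} is empty.
+ */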
+class NullFunctions {
+
+ public static <T> Optional<TypedFunction<T>> of(Type<?> returnType) {
+ // noinspection unchecked
+ return Optional.ofNullable((TypedFunction<T>) returnType.walk(NullFunctionVisitor.INSTANCE));
+ }
+
+ public static <T> Optional<ToPrimitiveFunction<T>> of(PrimitiveType<?> returnType) {
+ // noinspection unchecked
+ return Optional.ofNullable((ToPrimitiveFunction<T>) returnType
+ .walk((PrimitiveType.Visitor<ToPrimitiveFunction<?>>) NullFunctionVisitor.INSTANCE));
+ }
+
+ public static <T, R> ToObjectFunction<T, R> of(GenericType<R> returnType) {
+ return ToObjectFunction.of(x -> null, returnType);
+ }
+
+ public static <T> ToCharFunction<T> nullCharFunction() {
+ return x -> QueryConstants.NULL_CHAR;
+ }
+
+ public static <T> ToByteFunction<T> nullByteFunction() {
+ return x -> QueryConstants.NULL_BYTE;
+ }
+
+ public static <T> ToShortFunction<T> nullShortFunction() {
+ return x -> QueryConstants.NULL_SHORT;
+ }
+
+ public static <T> ToIntFunction<T> nullIntFunction() {
+ return x -> QueryConstants.NULL_INT;
+ }
+
+ public static <T> ToLongFunction<T> nullLongFunction() {
+ return x -> QueryConstants.NULL_LONG;
+ }
+
+ public static <T> ToFloatFunction<T> nullFloatFunction() {
+ return x -> QueryConstants.NULL_FLOAT;
+ }
+
+ public static <T> ToDoubleFunction<T> nullDoubleFunction() {
+ return x -> QueryConstants.NULL_DOUBLE;
+ }
+
+ private enum NullFunctionVisitor
+ implements Type.Visitor<TypedFunction<?>>, PrimitiveType.Visitor<ToPrimitiveFunction<?>> {
+ INSTANCE;
+
+ @Override
+ public ToPrimitiveFunction<?> visit(PrimitiveType<?> primitiveType) {
+ return of(primitiveType).orElse(null);
+ }
+
+ @Override
+ public TypedFunction<?> visit(GenericType<?> genericType) {
+ return of(genericType);
+ }
+
+ @Override
+ public ToBooleanFunction<?> visit(BooleanType booleanType) {
+ return null;
+ }
+
+ @Override
+ public ToByteFunction<?> visit(ByteType byteType) {
+ return nullByteFunction();
+ }
+
+ @Override
+ public ToPrimitiveFunction<?> visit(CharType charType) {
+ return nullCharFunction();
+ }
+
+ @Override
+ public ToPrimitiveFunction<?> visit(ShortType shortType) {
+ return nullShortFunction();
+ }
+
+ @Override
+ public ToPrimitiveFunction<?> visit(IntType intType) {
+ return nullIntFunction();
+ }
+
+ @Override
+ public ToPrimitiveFunction<?> visit(LongType longType) {
+ return nullLongFunction();
+ }
+
+ @Override
+ public ToPrimitiveFunction<?> visit(FloatType floatType) {
+ return nullFloatFunction();
+ }
+
+ @Override
+ public ToPrimitiveFunction<?> visit(DoubleType doubleType) {
+ return nullDoubleFunction();
+ }
+ }
+}
diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/ProtobufImpl.java b/extensions/kafka/src/main/java/io/deephaven/kafka/ProtobufImpl.java
new file mode 100644
index 00000000000..93f60275d2c
--- /dev/null
+++ b/extensions/kafka/src/main/java/io/deephaven/kafka/ProtobufImpl.java
@@ -0,0 +1,501 @@
+/**
+ * Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
+ */
+package io.deephaven.kafka;
+
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.Message;
+import io.confluent.kafka.schemaregistry.SchemaProvider;
+import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
+import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
+import io.deephaven.UncheckedDeephavenException;
+import io.deephaven.api.ColumnName;
+import io.deephaven.engine.table.ColumnDefinition;
+import io.deephaven.engine.table.TableDefinition;
+import io.deephaven.functions.ToBooleanFunction;
+import io.deephaven.functions.ToByteFunction;
+import io.deephaven.functions.ToCharFunction;
+import io.deephaven.functions.ToDoubleFunction;
+import io.deephaven.functions.ToFloatFunction;
+import io.deephaven.functions.ToIntFunction;
+import io.deephaven.functions.ToLongFunction;
+import io.deephaven.functions.ToObjectFunction;
+import io.deephaven.functions.ToPrimitiveFunction;
+import io.deephaven.functions.ToShortFunction;
+import io.deephaven.functions.TypedFunction;
+import io.deephaven.kafka.KafkaTools.Consume;
+import io.deephaven.kafka.KafkaTools.KeyOrValue;
+import io.deephaven.kafka.KafkaTools.KeyOrValueIngestData;
+import io.deephaven.kafka.ingest.FieldCopier;
+import io.deephaven.kafka.ingest.FieldCopierAdapter;
+import io.deephaven.kafka.ingest.KeyOrValueProcessor;
+import io.deephaven.kafka.ingest.MultiFieldChunkAdapter;
+import io.deephaven.kafka.protobuf.ProtobufConsumeOptions;
+import io.deephaven.kafka.protobuf.ProtobufConsumeOptions.FieldPathToColumnName;
+import io.deephaven.protobuf.FieldNumberPath;
+import io.deephaven.protobuf.FieldOptions;
+import io.deephaven.protobuf.FieldPath;
+import io.deephaven.protobuf.ProtobufDescriptorParser;
+import io.deephaven.protobuf.ProtobufDescriptorParserOptions;
+import io.deephaven.protobuf.ProtobufFunction;
+import io.deephaven.protobuf.ProtobufFunctions;
+import io.deephaven.protobuf.ProtobufFunctions.Builder;
+import io.deephaven.qst.type.ArrayType;
+import io.deephaven.qst.type.BooleanType;
+import io.deephaven.qst.type.BoxedType;
+import io.deephaven.qst.type.ByteType;
+import io.deephaven.qst.type.CharType;
+import io.deephaven.qst.type.CustomType;
+import io.deephaven.qst.type.DoubleType;
+import io.deephaven.qst.type.FloatType;
+import io.deephaven.qst.type.GenericType;
+import io.deephaven.qst.type.InstantType;
+import io.deephaven.qst.type.IntType;
+import io.deephaven.qst.type.LongType;
+import io.deephaven.qst.type.PrimitiveType;
+import io.deephaven.qst.type.ShortType;
+import io.deephaven.qst.type.StringType;
+import io.deephaven.qst.type.Type;
+import io.deephaven.qst.type.Type.Visitor;
+import io.deephaven.util.annotations.VisibleForTesting;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.kafka.common.serialization.Deserializer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Function;
+
+/**
+ * This layer builds on top of {@link ProtobufDescriptorParser#parse(Descriptor, ProtobufDescriptorParserOptions)} by
+ * further transforming the functions according to {@link #withMostAppropriateType(TypedFunction)}, and then
+ * adapting the functions to ensure they work for the expected chunk types ({@link ToChunkTypeTransform}). This
+ * layer is also responsible for managing schema changes; in essence, ensuring that newly
+ * {@link ProtobufDescriptorParser#parse(Descriptor, ProtobufDescriptorParserOptions) parsed} {@link Descriptor
+ * descriptor} {@link TypedFunction functions} can be adapted into the original function type.
+ */
+class ProtobufImpl {
+
+ @VisibleForTesting
+ static ProtobufFunctions schemaChangeAwareFunctions(Descriptor descriptor,
+ ProtobufDescriptorParserOptions options) {
+ return new ParsedStates(descriptor, options).functionsForSchemaChanges();
+ }
+
+ static final class ProtobufConsumeImpl extends Consume.KeyOrValueSpec {
+
+ private static final ToObjectFunction<Message, Message> PROTOBUF_MESSAGE_OBJ =
+ ToObjectFunction.identity(Type.ofCustom(Message.class));
+
+ private final ProtobufConsumeOptions specs;
+
+ ProtobufConsumeImpl(ProtobufConsumeOptions specs) {
+ this.specs = Objects.requireNonNull(specs);
+ }
+
+ @Override
+ public Optional<SchemaProvider> getSchemaProvider() {
+ return Optional.of(new ProtobufSchemaProvider());
+ }
+
+ @Override
+ Deserializer<?> getDeserializer(KeyOrValue keyOrValue, SchemaRegistryClient schemaRegistryClient,
+ Map<String, ?> configs) {
+ return new KafkaProtobufDeserializer<>(Objects.requireNonNull(schemaRegistryClient));
+ }
+
+ @Override
+ KeyOrValueIngestData getIngestData(KeyOrValue keyOrValue, SchemaRegistryClient schemaRegistryClient,
+ Map<String, ?> configs, MutableInt nextColumnIndexMut, List<ColumnDefinition<?>> columnDefinitionsOut) {
+ final Descriptor descriptor;
+ try {
+ descriptor = getDescriptor(schemaRegistryClient);
+ } catch (RestClientException | IOException e) {
+ throw new UncheckedDeephavenException(e);
+ }
+ final ProtobufFunctions functions = schemaChangeAwareFunctions(descriptor, specs.parserOptions());
+ final List<FieldCopier> fieldCopiers = new ArrayList<>(functions.functions().size());
+ final KeyOrValueIngestData data = new KeyOrValueIngestData();
+ data.fieldPathToColumnName = new LinkedHashMap<>();
+ final FieldPathToColumnName fieldPathToColumnName = specs.pathToColumnName();
+ final Map<FieldPath, Integer> indices = new HashMap<>();
+ for (ProtobufFunction f : functions.functions()) {
+ final int ix = indices.compute(f.path(), (fieldPath, i) -> i == null ? 0 : i + 1);
+ final ColumnName columnName = fieldPathToColumnName.columnName(f.path(), ix);
+ add(columnName, f.function(), data, columnDefinitionsOut, fieldCopiers);
+ }
+ // we don't have enough info at this time to create KeyOrValueProcessorImpl
+ // data.extra = new KeyOrValueProcessorImpl(MultiFieldChunkAdapter.chunkOffsets(null, null), fieldCopiers,
+ // false);
+ data.extra = fieldCopiers;
+ return data;
+ }
+
+ private void add(
+ ColumnName columnName,
+ TypedFunction<Message> function,
+ KeyOrValueIngestData data,
+ List<ColumnDefinition<?>> columnDefinitionsOut,
+ List<FieldCopier> fieldCopiersOut) {
+ data.fieldPathToColumnName.put(columnName.name(), columnName.name());
+ columnDefinitionsOut.add(ColumnDefinition.of(columnName.name(), function.returnType()));
+ fieldCopiersOut.add(FieldCopierAdapter.of(PROTOBUF_MESSAGE_OBJ.map(ToChunkTypeTransform.of(function))));
+ }
+
+ @Override
+ KeyOrValueProcessor getProcessor(TableDefinition tableDef, KeyOrValueIngestData data) {
+ // noinspection unchecked
+ return new KeyOrValueProcessorImpl(
+ MultiFieldChunkAdapter.chunkOffsets(tableDef, data.fieldPathToColumnName),
+ (List<FieldCopier>) data.extra, false);
+ }
+
+ private Descriptor getDescriptor(SchemaRegistryClient schemaRegistryClient)
+ throws RestClientException, IOException {
+ final SchemaMetadata metadata = specs.schemaVersion().isPresent()
+ ? schemaRegistryClient.getSchemaMetadata(specs.schemaSubject(), specs.schemaVersion().getAsInt())
+ : schemaRegistryClient.getLatestSchemaMetadata(specs.schemaSubject());
+ if (!ProtobufSchema.TYPE.equals(metadata.getSchemaType())) {
+ throw new IllegalStateException(String.format("Expected schema type %s but was %s", ProtobufSchema.TYPE,
+ metadata.getSchemaType()));
+ }
+ final ProtobufSchema protobufSchema = (ProtobufSchema) schemaRegistryClient
+ .getSchemaBySubjectAndId(specs.schemaSubject(), metadata.getId());
+ // The potential need to set io.deephaven.kafka.protobuf.ProtobufConsumeOptions#schemaMessageName
+ // seems unfortunate; I'm surprised the information is not part of the kafka serdes protocol.
+ // Maybe it's so that a single schema can be used, and different topics with different root messages can
+ // all share that common schema?
+ return specs.schemaMessageName().isPresent()
+ ? protobufSchema.toDescriptor(specs.schemaMessageName().get())
+ : protobufSchema.toDescriptor();
+ }
+ }
+
+
+ private static ProtobufFunctions withMostAppropriateType(ProtobufFunctions functions) {
+ final Builder builder = ProtobufFunctions.builder();
+ for (ProtobufFunction f : functions.functions()) {
+ builder.addFunctions(ProtobufFunction.of(f.path(), withMostAppropriateType(f.function())));
+ }
+ return builder.build();
+ }
+
+ /**
+ * Adapts {@code f} to the most appropriate Deephaven equivalent / nullable type.
+ *
+ * <pre>
+ * boolean -> Boolean
+ * Byte -> byte
+ * Character -> char
+ * Short -> short
+ * Integer -> int
+ * Long -> long
+ * Float -> float
+ * Double -> double
+ * </pre>
+ */
+ private static <T> TypedFunction<T> withMostAppropriateType(TypedFunction<T> f) {
+ final TypedFunction<T> f2 = DhNullableTypeTransform.of(f);
+ final ToPrimitiveFunction<T> unboxed = UnboxTransform.of(f2).orElse(null);
+ return unboxed != null ? unboxed : f2;
+ }
+
+ private static class ParsedStates {
+ private final Descriptor originalDescriptor;
+ private final ProtobufDescriptorParserOptions options;
+ private final Map<Descriptor, ProtobufFunctions> parsed;
+
+ private ParsedStates(Descriptor originalDescriptor, ProtobufDescriptorParserOptions options) {
+ this.originalDescriptor = Objects.requireNonNull(originalDescriptor);
+ this.options = Objects.requireNonNull(options);
+ this.parsed = new HashMap<>();
+ getOrCreate(originalDescriptor);
+ }
+
+ public ProtobufFunctions functionsForSchemaChanges() {
+ final Builder builder = ProtobufFunctions.builder();
+ for (ProtobufFunction f : getOrCreate(originalDescriptor).functions()) {
+ builder.addFunctions(ProtobufFunction.of(f.path(), new ForPath(f).adaptForSchemaChanges()));
+ }
+ return builder.build();
+ }
+
+ private ProtobufFunctions getOrCreate(Descriptor descriptor) {
+ return parsed.computeIfAbsent(descriptor, this::create);
+ }
+
+ private ProtobufFunctions create(Descriptor newDescriptor) {
+ if (!originalDescriptor.getFullName().equals(newDescriptor.getFullName())) {
+ throw new IllegalArgumentException(String.format(
+ "Expected descriptor names to match. expected='%s', actual='%s'. You may need to explicitly set schema_message_name.",
+ originalDescriptor.getFullName(), newDescriptor.getFullName()));
+ }
+ if (newDescriptor == originalDescriptor) {
+ return withMostAppropriateType(ProtobufDescriptorParser.parse(newDescriptor, options));
+ }
+ final Function<FieldPath, FieldOptions> adaptedOptions = fieldPath -> {
+ final FieldPath originalFieldPath = adaptFieldPath(fieldPath).orElse(null);
+ if (originalFieldPath == null) {
+ // This must be a new field, exclude it.
+ return FieldOptions.exclude();
+ }
+ return options.fieldOptions().apply(originalFieldPath);
+ };
+ final ProtobufDescriptorParserOptions a = ProtobufDescriptorParserOptions.builder()
+ .parsers(options.parsers())
+ .fieldOptions(adaptedOptions)
+ .build();
+ return withMostAppropriateType(ProtobufDescriptorParser.parse(newDescriptor, a));
+ }
+
+ private Optional<FieldPath> adaptFieldPath(FieldPath path) {
+ if (path.path().isEmpty()) {
+ return Optional.of(path);
+ }
+ final List<FieldDescriptor> originalFds = new ArrayList<>(path.path().size());
+ Descriptor descriptor = originalDescriptor;
+ final Iterator<FieldDescriptor> it = path.path().iterator();
+ while (true) {
+ final FieldDescriptor currentFd = it.next();
+ final FieldDescriptor originalFd = descriptor.findFieldByNumber(currentFd.getNumber());
+ if (originalFd == null) {
+ // originalFd does not exist
+ return Optional.empty();
+ }
+ originalFds.add(originalFd);
+ if (!it.hasNext()) {
+ break;
+ }
+ descriptor = originalFd.getMessageType();
+ }
+ return Optional.of(FieldPath.of(originalFds));
+ }
+
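+ /**
+ * Adapts a single parsed {@link ProtobufFunction} so it keeps working as schema changes arrive: the
+ * descriptor of each incoming message is looked up (and parsed on first sight), and the per-descriptor
+ * function is cached. The adapted function always reports the originally-parsed return type.
+ */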
+ private class ForPath {
+ private final ProtobufFunction originalFunction;
+ private final Map<Descriptor, TypedFunction<Message>> functions;
+
+ public ForPath(ProtobufFunction originalFunction) {
+ this.originalFunction = Objects.requireNonNull(originalFunction);
+ this.functions = new HashMap<>();
+ }
+
+ public TypedFunction<Message> adaptForSchemaChanges() {
+ final Type<?> originalReturnType = originalReturnType();
+ final TypedFunction<Message> out = originalReturnType.walk(new AdaptForSchemaChanges());
+ if (!originalReturnType.equals(out.returnType())) {
+ throw new IllegalStateException(String.format(
+ "AdaptForSchemaChanges error, mismatched types for %s. expected=%s, actual=%s",
+ originalFunction.path().namePath(), originalReturnType, out.returnType()));
+ }
+ return out;
+ }
+
+ private Type<?> originalReturnType() {
+ return originalFunction.function().returnType();
+ }
+
+ private boolean test(Message message) {
+ return ((ToBooleanFunction<Message>) getOrCreateForType(message)).test(message);
+ }
+
+ private char applyAsChar(Message message) {
+ return ((ToCharFunction<Message>) getOrCreateForType(message)).applyAsChar(message);
+ }
+
+ private byte applyAsByte(Message message) {
+ return ((ToByteFunction<Message>) getOrCreateForType(message)).applyAsByte(message);
+ }
+
+ private short applyAsShort(Message message) {
+ return ((ToShortFunction<Message>) getOrCreateForType(message)).applyAsShort(message);
+ }
+
+ private int applyAsInt(Message message) {
+ return ((ToIntFunction<Message>) getOrCreateForType(message)).applyAsInt(message);
+ }
+
+ private long applyAsLong(Message message) {
+ return ((ToLongFunction<Message>) getOrCreateForType(message)).applyAsLong(message);
+ }
+
+ private float applyAsFloat(Message message) {
+ return ((ToFloatFunction<Message>) getOrCreateForType(message)).applyAsFloat(message);
+ }
+
+ private double applyAsDouble(Message message) {
+ return ((ToDoubleFunction<Message>) getOrCreateForType(message)).applyAsDouble(message);
+ }
+
+ private <T> T applyAsObject(Message message) {
+ return ((ToObjectFunction<Message, T>) getOrCreateForType(message)).apply(message);
+ }
+
+ private TypedFunction<Message> getOrCreateForType(Message message) {
+ return getOrCreate(message.getDescriptorForType());
+ }
+
+ private TypedFunction<Message> getOrCreate(Descriptor descriptor) {
+ return functions.computeIfAbsent(descriptor, this::createFunctionFor);
+ }
+
+ private TypedFunction<Message> createFunctionFor(Descriptor descriptor) {
+ final Type<?> originalReturnType = originalReturnType();
+ final TypedFunction<Message> newFunction =
+ find(ParsedStates.this.getOrCreate(descriptor), originalFunction.path().numberPath())
+ .map(ProtobufFunction::function)
+ .orElse(null);
+ final TypedFunction<Message> adaptedFunction =
+ SchemaChangeAdaptFunction.of(newFunction, originalReturnType).orElse(null);
+ if (adaptedFunction == null) {
+ throw new UncheckedDeephavenException(
+ String.format("Incompatible schema change for %s, originalType=%s, newType=%s",
+ originalFunction.path().namePath(), originalReturnType,
+ newFunction == null ? null : newFunction.returnType()));
+ }
+ if (!originalReturnType.equals(adaptedFunction.returnType())) {
+ // If this happens, must be a logical error in SchemaChangeAdaptFunction
+ throw new IllegalStateException(String.format(
+ "Expected adapted return types to be equal for %s, originalType=%s, adaptedType=%s",
+ originalFunction.path().namePath(), originalReturnType, adaptedFunction.returnType()));
+ }
+ return adaptedFunction;
+ }
+
+ class AdaptForSchemaChanges
+ implements Visitor<TypedFunction<Message>>, PrimitiveType.Visitor<TypedFunction<Message>> {
+
+ @Override
+ public TypedFunction<Message> visit(PrimitiveType<?> primitiveType) {
+ return primitiveType.walk((PrimitiveType.Visitor<TypedFunction<Message>>) this);
+ }
+
+ @Override
+ public ToObjectFunction<Message, ?> visit(GenericType<?> genericType) {
+ // noinspection unchecked
+ return ToObjectFunction.of(ForPath.this::applyAsObject, (GenericType<Object>) genericType);
+ }
+
+ @Override
+ public ToBooleanFunction<Message> visit(BooleanType booleanType) {
+ return ForPath.this::test;
+ }
+
+ @Override
+ public ToByteFunction<Message> visit(ByteType byteType) {
+ return ForPath.this::applyAsByte;
+ }
+
+ @Override
+ public ToCharFunction<Message> visit(CharType charType) {
+ return ForPath.this::applyAsChar;
+ }
+
+ @Override
+ public ToShortFunction<Message> visit(ShortType shortType) {
+ return ForPath.this::applyAsShort;
+ }
+
+ @Override
+ public ToIntFunction<Message> visit(IntType intType) {
+ return ForPath.this::applyAsInt;
+ }
+
+ @Override
+ public ToLongFunction<Message> visit(LongType longType) {
+ return ForPath.this::applyAsLong;
+ }
+
+ @Override
+ public ToFloatFunction<Message> visit(FloatType floatType) {
+ return ForPath.this::applyAsFloat;
+ }
+
+ @Override
+ public ToDoubleFunction<Message> visit(DoubleType doubleType) {
+ return ForPath.this::applyAsDouble;
+ }
+ }
+ }
+ }
+
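+ /**
+ * Adapts a function parsed from a new descriptor back to the originally-declared return type: a missing
+ * function is replaced with an appropriate null-returning function, and a function whose return type
+ * differs from the original only in boxedness is boxed or unboxed to match.
+ */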
+ private static class SchemaChangeAdaptFunction<T> implements TypedFunction.Visitor<T, TypedFunction<T>> {
+
+ public static <T> Optional<TypedFunction<T>> of(TypedFunction<T> f, Type<?> desiredReturnType) {
+ if (f == null) {
+ return NullFunctions.of(desiredReturnType);
+ }
+ if (desiredReturnType.equals(f.returnType())) {
+ return Optional.of(f);
+ }
+ return Optional.ofNullable(f.walk(new SchemaChangeAdaptFunction<>(desiredReturnType)));
+ }
+
+ private final Type<?> desiredReturnType;
+
+ public SchemaChangeAdaptFunction(Type<?> desiredReturnType) {
+ this.desiredReturnType = Objects.requireNonNull(desiredReturnType);
+ }
+
+ @Override
+ public TypedFunction<T> visit(ToPrimitiveFunction<T> f) {
+ if (desiredReturnType.equals(f.returnType().boxedType())) {
+ return BoxTransform.of(f);
+ }
+ return null;
+ }
+
+ @Override
+ public TypedFunction<T> visit(ToObjectFunction<T, ?> f) {
+ return f.returnType().walk(new GenericType.Visitor<TypedFunction<T>>() {
+ @Override
+ public TypedFunction<T> visit(BoxedType<?> boxedType) {
+ if (desiredReturnType.equals(boxedType.primitiveType())) {
+ return UnboxTransform.of(f).orElse(null);
+ }
+ return null;
+ }
+
+ @Override
+ public TypedFunction<T> visit(StringType stringType) {
+ return null;
+ }
+
+ @Override
+ public TypedFunction<T> visit(InstantType instantType) {
+ return null;
+ }
+
+ @Override
+ public TypedFunction<T> visit(ArrayType<?, ?> arrayType) {
+ return null;
+ }
+
+ @Override
+ public TypedFunction<T> visit(CustomType<?> customType) {
+ return null;
+ }
+ });
+ }
+ }
+
+ private static Optional<ProtobufFunction> find(ProtobufFunctions f, FieldNumberPath numberPath) {
+ for (ProtobufFunction function : f.functions()) {
+ if (numberPath.equals(function.path().numberPath())) {
+ return Optional.of(function);
+ }
+ }
+ return Optional.empty();
+ }
+}
diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/ToChunkTypeTransform.java b/extensions/kafka/src/main/java/io/deephaven/kafka/ToChunkTypeTransform.java
new file mode 100644
index 00000000000..0e03f8dfd09
--- /dev/null
+++ b/extensions/kafka/src/main/java/io/deephaven/kafka/ToChunkTypeTransform.java
@@ -0,0 +1,210 @@
+/**
+ * Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
+ */
+package io.deephaven.kafka;
+
+import io.deephaven.functions.ToByteFunction;
+import io.deephaven.functions.ToLongFunction;
+import io.deephaven.functions.ToObjectFunction;
+import io.deephaven.functions.ToPrimitiveFunction;
+import io.deephaven.functions.TypedFunction;
+import io.deephaven.qst.type.ArrayType;
+import io.deephaven.qst.type.BoxedBooleanType;
+import io.deephaven.qst.type.BoxedByteType;
+import io.deephaven.qst.type.BoxedCharType;
+import io.deephaven.qst.type.BoxedDoubleType;
+import io.deephaven.qst.type.BoxedFloatType;
+import io.deephaven.qst.type.BoxedIntType;
+import io.deephaven.qst.type.BoxedLongType;
+import io.deephaven.qst.type.BoxedShortType;
+import io.deephaven.qst.type.BoxedType;
+import io.deephaven.qst.type.CustomType;
+import io.deephaven.qst.type.GenericType;
+import io.deephaven.qst.type.InstantType;
+import io.deephaven.qst.type.StringType;
+import io.deephaven.time.DateTimeUtils;
+import io.deephaven.util.BooleanUtils;
+
+import java.time.Instant;
+import java.util.Objects;
+
+import static io.deephaven.kafka.UnboxTransform.unboxByte;
+import static io.deephaven.kafka.UnboxTransform.unboxChar;
+import static io.deephaven.kafka.UnboxTransform.unboxDouble;
+import static io.deephaven.kafka.UnboxTransform.unboxFloat;
+import static io.deephaven.kafka.UnboxTransform.unboxInt;
+import static io.deephaven.kafka.UnboxTransform.unboxLong;
+import static io.deephaven.kafka.UnboxTransform.unboxShort;
+
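+/**
+ * Transforms functions into functions whose return types match the engine's expected chunk types:
+ * {@code Boolean} values are stored as bytes ({@link BooleanUtils#booleanAsByte(Boolean)}), {@link Instant}
+ * values as epoch nanos ({@link DateTimeUtils#epochNanos(Instant)}), and other boxed values are unboxed.
+ */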
+class ToChunkTypeTransform {
+
+ private static final ToByteFunction<Boolean> BOOLEAN_AS_BYTE = BooleanUtils::booleanAsByte;
+ private static final ToLongFunction<Instant> EPOCH_NANOS = DateTimeUtils::epochNanos;
+
+ /**
+ * Transform the {@link TypedFunction function} {@code f} into its expected chunk type function.
+ *
+ * @param f the function
+ * @return the chunk type function
+ * @param <T> the input type
+ * @see #of(ToPrimitiveFunction)
+ * @see #of(ToObjectFunction)
+ */
+ public static <T> TypedFunction<T> of(TypedFunction<T> f) {
+ return FunctionVisitor.of(f);
+ }
+
+ /**
+ * Transform the {@link ToPrimitiveFunction function} {@code f} into its expected chunk type function.
+ *
+ * @param f the function
+ * @return the chunk type function
+ * @param <T> the input type
+ */
+ public static <T> TypedFunction<T> of(ToPrimitiveFunction<T> f) {
+ return f;
+ }
+
+ /**
+ * Transform the {@link ToObjectFunction function} {@code f} into its expected chunk type function.
+ *
+ * @param f the Object function
+ * @return the chunk type function
+ * @param <T> the input type
+ * @see #toEpochNanos(ToObjectFunction)
+ * @see #unboxBooleanAsByte(ToObjectFunction)
+ * @see UnboxTransform#unboxByte(ToObjectFunction)
+ * @see UnboxTransform#unboxChar(ToObjectFunction)
+ * @see UnboxTransform#unboxDouble(ToObjectFunction)
+ * @see UnboxTransform#unboxFloat(ToObjectFunction)
+ * @see UnboxTransform#unboxInt(ToObjectFunction)
+ * @see UnboxTransform#unboxLong(ToObjectFunction)
+ * @see UnboxTransform#unboxShort(ToObjectFunction)
+ */
+ public static <T> TypedFunction<T> of(ToObjectFunction<T, ?> f) {
+ return ObjectFunctionVisitor.of(f);
+ }
+
+ /**
+ * Equivalent to {@code f.mapToByte(BooleanUtils::booleanAsByte)}.
+ *
+ * @param f the Boolean function
+ * @return the byte function
+ * @param <T> the input type
+ * @see BooleanUtils#booleanAsByte(Boolean)
+ */
+ public static <T> ToByteFunction<T> unboxBooleanAsByte(ToObjectFunction<T, Boolean> f) {
+ return f.mapToByte(BOOLEAN_AS_BYTE);
+ }
+
+ /**
+ * Equivalent to {@code f.mapToLong(DateTimeUtils::epochNanos)}.
+ *
+ * @param f the instant function
+ * @return the epoch nanos function
+ * @param <T> the input type
+ * @see DateTimeUtils#epochNanos(Instant)
+ */
+ public static <T> ToLongFunction<T> toEpochNanos(ToObjectFunction<T, Instant> f) {
+ return f.mapToLong(EPOCH_NANOS);
+ }
+
+ private static class FunctionVisitor implements TypedFunction.Visitor