Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka protobuf consumer support #4375

Merged
merged 32 commits into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
543da37
Kafka protobuf consumer support
devinrsmith Aug 24, 2023
e64b976
Fix protobuf generation for extensions-protobuf
devinrsmith Aug 24, 2023
d641a30
Add schema_message_name
devinrsmith Aug 25, 2023
911189a
Add FieldOptions to better support how the options may evolve in the …
devinrsmith Aug 25, 2023
6f0d04b
Fix and simplify python side
devinrsmith Sep 5, 2023
e4c9ab9
f
devinrsmith Sep 5, 2023
513e7cf
Rename to ToX function
devinrsmith Sep 5, 2023
1abe406
Rename mapToX
devinrsmith Sep 5, 2023
0a5687a
More explicit visitor return types where applicable
devinrsmith Sep 5, 2023
73c0624
Remove equals/hashcode
devinrsmith Sep 5, 2023
99ba1bf
Cast instead of primitive
devinrsmith Sep 5, 2023
0ddbda0
Remove extraneous ToDoubleFunction.map
devinrsmith Sep 5, 2023
4114db4
Make param arguments more generic
devinrsmith Sep 5, 2023
05dc323
Update javadocs
devinrsmith Sep 5, 2023
8c6f86c
Add ByteString support for BytesValue
devinrsmith Sep 5, 2023
011032d
remove unused
devinrsmith Sep 5, 2023
f538b08
javadoc
devinrsmith Sep 5, 2023
5e9d7b9
box
devinrsmith Sep 6, 2023
0a9182d
more fixes
devinrsmith Sep 6, 2023
dc72866
Add canonical parsers
devinrsmith Sep 6, 2023
f569254
Add multi message parser support
devinrsmith Sep 6, 2023
1c4b1cd
Use DH nulls when bypassing null inputs
devinrsmith Sep 6, 2023
0e9fce2
Comments
devinrsmith Sep 6, 2023
04f09b7
python comment changes
devinrsmith Sep 6, 2023
c45a57d
FieldPath match and testing
devinrsmith Sep 6, 2023
b7a1538
ToXFunction testing
devinrsmith Sep 6, 2023
4a975f2
Make MessageParserSingle extend MessageParser
devinrsmith Sep 6, 2023
56b713d
Add BytesValue javadoc about ByteString
devinrsmith Sep 6, 2023
3302e23
final review points
devinrsmith Sep 6, 2023
bd93a6c
Kafka consume test
devinrsmith Sep 7, 2023
66bdec8
Better docs for schemaMessageName / schema_message_name
devinrsmith Sep 7, 2023
7baad9f
formatting
devinrsmith Sep 7, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
import io.deephaven.UncheckedDeephavenException;
import io.deephaven.api.ColumnName;
import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.functions.ToBooleanFunction;
Expand All @@ -35,6 +36,7 @@
import io.deephaven.kafka.ingest.KeyOrValueProcessor;
import io.deephaven.kafka.ingest.MultiFieldChunkAdapter;
import io.deephaven.kafka.protobuf.ProtobufConsumeOptions;
import io.deephaven.kafka.protobuf.ProtobufConsumeOptions.FieldPathToColumnName;
import io.deephaven.protobuf.FieldNumberPath;
import io.deephaven.protobuf.FieldOptions;
import io.deephaven.protobuf.FieldPath;
Expand Down Expand Up @@ -118,9 +120,12 @@ KeyOrValueIngestData getIngestData(KeyOrValue keyOrValue, SchemaRegistryClient s
final List<FieldCopier> fieldCopiers = new ArrayList<>(functions.functions().size());
final KeyOrValueIngestData data = new KeyOrValueIngestData();
data.fieldPathToColumnName = new LinkedHashMap<>();
final Function<FieldPath, String> pathToColumnName = specs.pathToColumnName();
final FieldPathToColumnName fieldPathToColumnName = specs.pathToColumnName();
final Map<FieldPath, Integer> counts = new HashMap<>();
devinrsmith marked this conversation as resolved.
Show resolved Hide resolved
for (ProtobufFunction f : functions.functions()) {
add(pathToColumnName.apply(f.path()), f.function(), data, columnDefinitionsOut, fieldCopiers);
final int ix = counts.compute(f.path(), (fieldPath, ixCount) -> ixCount == null ? 0 : ixCount + 1);
final ColumnName columnName = fieldPathToColumnName.columnName(f.path(), ix);
add(columnName, f.function(), data, columnDefinitionsOut, fieldCopiers);
}
// we don't have enough info at this time to create KeyOrValueProcessorImpl
// data.extra = new KeyOrValueProcessorImpl(MultiFieldChunkAdapter.chunkOffsets(null, null), fieldCopiers,
Expand All @@ -130,13 +135,13 @@ KeyOrValueIngestData getIngestData(KeyOrValue keyOrValue, SchemaRegistryClient s
}

private void add(
String columnName,
ColumnName columnName,
TypedFunction<Message> function,
KeyOrValueIngestData data,
List<ColumnDefinition<?>> columnDefinitionsOut,
List<FieldCopier> fieldCopiersOut) {
data.fieldPathToColumnName.put(columnName, columnName);
columnDefinitionsOut.add(ColumnDefinition.of(columnName, function.returnType()));
data.fieldPathToColumnName.put(columnName.name(), columnName.name());
columnDefinitionsOut.add(ColumnDefinition.of(columnName.name(), function.returnType()));
fieldCopiersOut.add(FieldCopierAdapter.of(PROTOBUF_MESSAGE_OBJ.map(ToChunkTypeTransform.of(function))));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import com.google.protobuf.Descriptors.Descriptor;
import io.deephaven.annotations.BuildableStyle;
import io.deephaven.api.ColumnName;
import io.deephaven.kafka.KafkaTools.Consume;
import io.deephaven.protobuf.FieldPath;
import io.deephaven.protobuf.ProtobufDescriptorParserOptions;
Expand All @@ -31,6 +32,20 @@
@BuildableStyle
public abstract class ProtobufConsumeOptions {

@FunctionalInterface
public interface FieldPathToColumnName {
/**
* Creates a unique column name from {@code fieldPath} and {@code indexOccurrence}. Implementations will need to
* take notice when {@code indexOccurrence > 0}, as that means a column name for {@code fieldPath} has already
* been generated {@code indexOccurrence} times.
devinrsmith marked this conversation as resolved.
Show resolved Hide resolved
*
* @param fieldPath the field path
* @param indexOccurrence the number of times a column name for fieldPath has already been generated
* @return the column name
*/
ColumnName columnName(FieldPath fieldPath, int indexOccurrence);
}

/**
* The builder.
*
Expand All @@ -41,13 +56,15 @@ public static Builder builder() {
}

/**
* Joins the name paths with underscores. Equivalent to {@code String.join("_", path.namePath())}.
* Joins the name paths with underscores, appending {@code indexOccurrence + 1} if {@code indexOccurrence != 0}.
*
* @param path the path
* @param indexOccurrence the number of times this field path has been used
* @return the underscore joined path names
*/
public static String joinNamePathWithUnderscore(FieldPath path) {
return String.join("_", path.namePath());
public static ColumnName joinNamePathWithUnderscore(FieldPath path, int indexOccurrence) {
final String simple = String.join("_", path.namePath());
return ColumnName.of(indexOccurrence == 0 ? simple : simple + "_" + (indexOccurrence + 1));
}

/**
Expand Down Expand Up @@ -94,12 +111,12 @@ public ProtobufDescriptorParserOptions parserOptions() {

/**
* The function to turn field paths into column names. By default, this is the function
* {@link #joinNamePathWithUnderscore(FieldPath)}.
* {@link #joinNamePathWithUnderscore(FieldPath, int)}.
*
* @return the function to create column names
*/
@Default
public Function<FieldPath, String> pathToColumnName() {
public FieldPathToColumnName pathToColumnName() {
return ProtobufConsumeOptions::joinNamePathWithUnderscore;
}

Expand All @@ -113,7 +130,7 @@ public interface Builder {

Builder parserOptions(ProtobufDescriptorParserOptions options);

Builder pathToColumnName(Function<FieldPath, String> pathToColumnName);
Builder pathToColumnName(FieldPathToColumnName pathToColumnName);

ProtobufConsumeOptions build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import io.deephaven.functions.ToIntFunction;
import io.deephaven.functions.ToLongFunction;
import io.deephaven.functions.ToObjectFunction;
import io.deephaven.functions.TypedFunction;
import io.deephaven.protobuf.FieldOptions.BytesBehavior;
import io.deephaven.qst.type.CustomType;
import io.deephaven.qst.type.GenericType;
Expand All @@ -44,7 +43,7 @@ class Builtin {
// Due to https://github.com/confluentinc/schema-registry/issues/2708, the parsers need to be built in such a way
// that they work with DynamicMessages.

static List<SingleValuedMessageParser> parsers() {
static List<MessageParserSingle> parsers() {
// Update javadoc in io.deephaven.protobuf.MessageParserSingle.builtin when editing
return List.of(
TimestampParser.of(),
Expand All @@ -62,7 +61,7 @@ static List<SingleValuedMessageParser> parsers() {
customParser(FieldMask.class));
}

static <T extends Message> SingleValuedMessageParser customParser(Class<T> clazz) {
static <T extends Message> MessageParserSingle customParser(Class<T> clazz) {
try {
final Method method = clazz.getDeclaredMethod("getDescriptor");
final Descriptor descriptor = (Descriptor) method.invoke(null);
Expand All @@ -72,14 +71,14 @@ static <T extends Message> SingleValuedMessageParser customParser(Class<T> clazz
}
}

private enum TimestampParser implements SingleValuedMessageParser {
private enum TimestampParser implements MessageParserSingle {
INSTANCE;

private static final ToObjectFunction<Message, Instant> CANONICAL_FUNCTION = ToObjectFunction
.<Message, Timestamp>identity(Type.ofCustom(Timestamp.class))
.mapToObj(TimestampParser::parseCanonical, Type.instantType());

public static SingleValuedMessageParser of() {
public static MessageParserSingle of() {
return INSTANCE;
}

Expand All @@ -104,15 +103,15 @@ public ToObjectFunction<Message, Instant> messageParser(Descriptor descriptor,
}
}

private enum DurationParser implements SingleValuedMessageParser {
private enum DurationParser implements MessageParserSingle {
INSTANCE;

private static final ToObjectFunction<Message, Duration> CANONICAL_FUNCTION =
ToObjectFunction.<Message, com.google.protobuf.Duration>identity(
Type.ofCustom(com.google.protobuf.Duration.class))
.mapToObj(DurationParser::parseCanonical, Type.ofCustom(Duration.class));

public static SingleValuedMessageParser of() {
public static MessageParserSingle of() {
return INSTANCE;
}

Expand All @@ -137,14 +136,14 @@ public ToObjectFunction<Message, Duration> messageParser(Descriptor descriptor,
}
}

private enum BoolValueParser implements SingleValuedMessageParser {
private enum BoolValueParser implements MessageParserSingle {
INSTANCE;

private static final ToBooleanFunction<Message> CANONICAL_FUNCTION = ToObjectFunction
.<Message, BoolValue>identity(Type.ofCustom(BoolValue.class))
.mapToBoolean(BoolValue::getValue);

public static SingleValuedMessageParser of() {
public static MessageParserSingle of() {
return INSTANCE;
}

Expand All @@ -164,14 +163,14 @@ public ToBooleanFunction<Message> messageParser(Descriptor descriptor, ProtobufD
}
}

private enum Int32ValueParser implements SingleValuedMessageParser {
private enum Int32ValueParser implements MessageParserSingle {
INSTANCE;

private static final ToIntFunction<Message> CANONICAL_FUNCTION = ToObjectFunction
.<Message, Int32Value>identity(Type.ofCustom(Int32Value.class))
.mapToInt(Int32Value::getValue);

public static SingleValuedMessageParser of() {
public static MessageParserSingle of() {
return INSTANCE;
}

Expand All @@ -191,14 +190,14 @@ public ToIntFunction<Message> messageParser(Descriptor descriptor, ProtobufDescr
}
}

private enum UInt32ValueParser implements SingleValuedMessageParser {
private enum UInt32ValueParser implements MessageParserSingle {
INSTANCE;

private static final ToIntFunction<Message> CANONICAL_FUNCTION = ToObjectFunction
.<Message, UInt32Value>identity(Type.ofCustom(UInt32Value.class))
.mapToInt(UInt32Value::getValue);

public static SingleValuedMessageParser of() {
public static MessageParserSingle of() {
return INSTANCE;
}

Expand All @@ -218,14 +217,14 @@ public ToIntFunction<Message> messageParser(Descriptor descriptor, ProtobufDescr
}
}

private enum Int64ValueParser implements SingleValuedMessageParser {
private enum Int64ValueParser implements MessageParserSingle {
INSTANCE;

private static final ToLongFunction<Message> CANONICAL_FUNCTION = ToObjectFunction
.<Message, Int64Value>identity(Type.ofCustom(Int64Value.class))
.mapToLong(Int64Value::getValue);

public static SingleValuedMessageParser of() {
public static MessageParserSingle of() {
return INSTANCE;
}

Expand All @@ -245,14 +244,14 @@ public ToLongFunction<Message> messageParser(Descriptor descriptor, ProtobufDesc
}
}

private enum UInt64ValueParser implements SingleValuedMessageParser {
private enum UInt64ValueParser implements MessageParserSingle {
INSTANCE;

private static final ToLongFunction<Message> CANONICAL_FUNCTION = ToObjectFunction
.<Message, UInt64Value>identity(Type.ofCustom(UInt64Value.class))
.mapToLong(UInt64Value::getValue);

public static SingleValuedMessageParser of() {
public static MessageParserSingle of() {
return INSTANCE;
}

Expand All @@ -272,14 +271,14 @@ public ToLongFunction<Message> messageParser(Descriptor descriptor, ProtobufDesc
}
}

private enum FloatValueParser implements SingleValuedMessageParser {
private enum FloatValueParser implements MessageParserSingle {
INSTANCE;

private static final ToFloatFunction<Message> CANONICAL_FUNCTION = ToObjectFunction
.<Message, FloatValue>identity(Type.ofCustom(FloatValue.class))
.mapToFloat(FloatValue::getValue);

public static SingleValuedMessageParser of() {
public static MessageParserSingle of() {
return INSTANCE;
}

Expand All @@ -299,14 +298,14 @@ public ToFloatFunction<Message> messageParser(Descriptor descriptor, ProtobufDes
}
}

private enum DoubleValueParser implements SingleValuedMessageParser {
private enum DoubleValueParser implements MessageParserSingle {
INSTANCE;

private static final ToDoubleFunction<Message> CANONICAL_FUNCTION = ToObjectFunction
.<Message, DoubleValue>identity(Type.ofCustom(DoubleValue.class))
.mapToDouble(DoubleValue::getValue);

public static SingleValuedMessageParser of() {
public static MessageParserSingle of() {
return INSTANCE;
}

Expand All @@ -326,14 +325,14 @@ public ToDoubleFunction<Message> messageParser(Descriptor descriptor, ProtobufDe
}
}

private enum StringValueParser implements SingleValuedMessageParser {
private enum StringValueParser implements MessageParserSingle {
INSTANCE;

private static final ToObjectFunction<Message, String> CANONICAL_FUNCTION = ToObjectFunction
.<Message, StringValue>identity(Type.ofCustom(StringValue.class))
.mapToObj(StringValue::getValue, Type.stringType());

public static SingleValuedMessageParser of() {
public static MessageParserSingle of() {
return INSTANCE;
}

Expand All @@ -354,7 +353,7 @@ public ToObjectFunction<Message, String> messageParser(Descriptor descriptor,
}
}

private enum BytesValueParser implements SingleValuedMessageParser {
private enum BytesValueParser implements MessageParserSingle {
INSTANCE;

private static final ToObjectFunction<Message, ByteString> CANONICAL_FUNCTION_BYTESTRING = ToObjectFunction
Expand All @@ -365,7 +364,7 @@ private enum BytesValueParser implements SingleValuedMessageParser {
private static final ToObjectFunction<Message, byte[]> CANONICAL_FUNCTION_BYTEARRAY =
CANONICAL_FUNCTION_BYTESTRING.mapToObj(BYTESTRING_TO_BYTES);

public static SingleValuedMessageParser of() {
public static MessageParserSingle of() {
return INSTANCE;
}

Expand Down Expand Up @@ -393,7 +392,7 @@ public Descriptor canonicalDescriptor() {
}
}

private static class GenericSVMP<T extends Message> implements SingleValuedMessageParser {
private static class GenericSVMP<T extends Message> implements MessageParserSingle {
private final GenericType<T> type;
private final Descriptor descriptor;

Expand Down
Loading