Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka protobuf consumer support #4375

Merged
merged 32 commits into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
543da37
Kafka protobuf consumer support
devinrsmith Aug 24, 2023
e64b976
Fix protobuf generation for extensions-protobuf
devinrsmith Aug 24, 2023
d641a30
Add schema_message_name
devinrsmith Aug 25, 2023
911189a
Add FieldOptions to better support how the options may evolve in the …
devinrsmith Aug 25, 2023
6f0d04b
Fix and simplify python side
devinrsmith Sep 5, 2023
e4c9ab9
f
devinrsmith Sep 5, 2023
513e7cf
Rename to ToX function
devinrsmith Sep 5, 2023
1abe406
Rename mapToX
devinrsmith Sep 5, 2023
0a5687a
More explicit visitor return types where applicable
devinrsmith Sep 5, 2023
73c0624
Remove equals/hashcode
devinrsmith Sep 5, 2023
99ba1bf
Cast instead of primitive
devinrsmith Sep 5, 2023
0ddbda0
Remove extraneous ToDoubleFunction.map
devinrsmith Sep 5, 2023
4114db4
Make param arguments more generic
devinrsmith Sep 5, 2023
05dc323
Update javadocs
devinrsmith Sep 5, 2023
8c6f86c
Add ByteString support for BytesValue
devinrsmith Sep 5, 2023
011032d
remove unused
devinrsmith Sep 5, 2023
f538b08
javadoc
devinrsmith Sep 5, 2023
5e9d7b9
box
devinrsmith Sep 6, 2023
0a9182d
more fixes
devinrsmith Sep 6, 2023
dc72866
Add canonical parsers
devinrsmith Sep 6, 2023
f569254
Add multi message parser support
devinrsmith Sep 6, 2023
1c4b1cd
Use DH nulls when bypassing null inputs
devinrsmith Sep 6, 2023
0e9fce2
Comments
devinrsmith Sep 6, 2023
04f09b7
python comment changes
devinrsmith Sep 6, 2023
c45a57d
FieldPath match and testing
devinrsmith Sep 6, 2023
b7a1538
ToXFunction testing
devinrsmith Sep 6, 2023
4a975f2
Make MessageParserSingle extend MessageParser
devinrsmith Sep 6, 2023
56b713d
Add BytesValue javadoc about ByteString
devinrsmith Sep 6, 2023
3302e23
final review points
devinrsmith Sep 6, 2023
bd93a6c
Kafka consume test
devinrsmith Sep 7, 2023
66bdec8
Better docs for schemaMessageName / schema_message_name
devinrsmith Sep 7, 2023
7baad9f
formatting
devinrsmith Sep 7, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ static <T> ToByteFunction<T> cast() {
*
* @param f the inner function
* @param g the outer function
* @return the boolean function
* @return the byte function
* @param <T> the input type
* @param <R> the intermediate type
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* The {@link #path()} to a {@link com.google.protobuf.Descriptors.Descriptor Descriptor's} field.
Expand All @@ -26,14 +25,33 @@
@SimpleStyle
public abstract class FieldPath {

private static final FieldPath EMPTY = of(List.of());

/**
* Creates an empty field path. Equivalent to {@code of(List.of())}.
*
* @return the empty field path
*/
public static FieldPath empty() {
return of(List.of());
return EMPTY;
}

/**
* Creates a field path with {@code descriptors}.
*
* @param descriptors the descriptors
* @return the field path
*/
public static FieldPath of(FieldDescriptor... descriptors) {
return of(Arrays.asList(descriptors));
}

/**
* Creates a field path with {@code descriptors}.
*
* @param descriptors the descriptors
* @return the field path
*/
public static FieldPath of(List<FieldDescriptor> descriptors) {
return ImmutableFieldPath.of(descriptors);
}
Expand Down Expand Up @@ -87,27 +105,36 @@ public static List<String> toNamePath(String simplePath) {
return Arrays.asList(simplePath.split("/"));
}

/**
* The ordered field descriptors which make up the field path.
*
* @return the path
*/
@Parameter
public abstract List<FieldDescriptor> path();

/**
* The number path for this field path. Equivalent to
* {@code FieldNumberPath.of(path().stream().mapToInt(FieldDescriptor::getNumber).toArray())}.
*
* @return the number path
*/
@Lazy
public FieldNumberPath numberPath() {
return FieldNumberPath.of(path().stream().mapToInt(FieldDescriptor::getNumber).toArray());
}

/**
* The name path for this field path. Equivalent to
* {@code path().stream().map(FieldDescriptor::getName).collect(Collectors.toList())}.
*
* @return the name path
**/
@Lazy
public List<String> namePath() {
return path().stream().map(FieldDescriptor::getName).collect(Collectors.toList());
}

public final FieldPath prefixWith(FieldDescriptor prefix) {
return FieldPath.of(Stream.concat(Stream.of(prefix), path().stream()).collect(Collectors.toList()));
}

public final FieldPath append(FieldDescriptor fieldDescriptor) {
return FieldPath.of(Stream.concat(path().stream(), Stream.of(fieldDescriptor)).collect(Collectors.toList()));
}

public final boolean startsWith(List<String> prefix) {
return startsWith(namePath(), prefix);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ProtobufDescriptorParserImpl {

Expand Down Expand Up @@ -125,7 +126,7 @@ private class FieldContext {
public FieldContext(DescriptorContext parent, FieldDescriptor fd) {
this.parent = Objects.requireNonNull(parent);
this.fd = Objects.requireNonNull(fd);
this.fieldPath = parent.fieldPath.append(fd);
this.fieldPath = append(parent.fieldPath, fd);
}

private ProtobufFunctions functions() {
Expand Down Expand Up @@ -240,7 +241,7 @@ private ProtobufFunctions functions() {
? e.function()
: BypassOnNull.of(e.function());
builder.addFunctions(
ProtobufFunction.of(e.path().prefixWith(fd), value.mapInput(fieldAsMessage)));
ProtobufFunction.of(prepend(e.path(), fd), value.mapInput(fieldAsMessage)));
}
return builder.build();
}
Expand Down Expand Up @@ -275,7 +276,7 @@ private ProtobufFunctions functions() {
if (valueFd == null) {
throw new IllegalStateException("Expected map to have field descriptor number 2 (value)");
}
final DescriptorContext dc = new DescriptorContext(parent.fieldPath.append(fd), fd.getMessageType());
final DescriptorContext dc = new DescriptorContext(append(parent.fieldPath, fd), fd.getMessageType());

// Note: maps are a "special" case, where even though we don't include the key / value FDs as a return
// io.deephaven.protobuf.ProtobufFunction#path, it's important that we force their inclusion if we've
Expand Down Expand Up @@ -356,7 +357,7 @@ private ProtobufFunctions functions() {
final Builder builder = ProtobufFunctions.builder();
for (ProtobufFunction f : functions.functions()) {
final ToObjectFunction<Message, ?> repeatedTf = f.function().walk(new ToRepeatedType());
builder.addFunctions(ProtobufFunction.of(f.path().prefixWith(fd), repeatedTf));
builder.addFunctions(ProtobufFunction.of(prepend(f.path(), fd), repeatedTf));
}
return builder.build();
}
Expand Down Expand Up @@ -543,4 +544,12 @@ private static <T> T[] toArray(Message message, FieldDescriptor fd, ToObjectFunc
}
return array;
}

private static FieldPath prepend(FieldPath f, FieldDescriptor fd) {
return FieldPath.of(Stream.concat(Stream.of(fd), f.path().stream()).collect(Collectors.toList()));
}

private static FieldPath append(FieldPath f, FieldDescriptor fd) {
return FieldPath.of(Stream.concat(f.path().stream(), Stream.of(fd)).collect(Collectors.toList()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,23 @@
@SimpleStyle
public abstract class ProtobufFunction {

public static ProtobufFunction of(TypedFunction<Message> f) {
/**
* Creates the unnamed protobuf function. Equivalent to {@code of(FieldPath.empty(), f)}.
*
* @param f the function
* @return the unnamed protobuf function
*/
public static ProtobufFunction unnammed(TypedFunction<Message> f) {
devinrsmith marked this conversation as resolved.
Show resolved Hide resolved
return of(FieldPath.empty(), f);
}

/**
* Creates the protobuf function.
*
* @param path the field path
* @param f the function
* @return the protobuf function
*/
public static ProtobufFunction of(FieldPath path, TypedFunction<Message> f) {
return ImmutableProtobufFunction.of(path, f);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,39 @@
@BuildableStyle
public abstract class ProtobufFunctions {

private static final ProtobufFunctions EMPTY = builder().build();

public static Builder builder() {
return ImmutableProtobufFunctions.builder();
}

/**
* Creates an empty protobuf functions. Equivalent to {@code builder().build()}.
*
* @return the empty protobuf functions
*/
public static ProtobufFunctions empty() {
return builder().build();
return EMPTY;
}

public static ProtobufFunctions unnamed(TypedFunction<Message> tf) {
return of(ProtobufFunction.of(tf));
/**
* Creates a protobuf functions with a single, unnamed {@code function}. Equivalent to
* {@code builder().addFunctions(ProtobufFunction.unnammed(function)).build()}.
*
* @param function the function
* @return the protobuf functions
*/
public static ProtobufFunctions unnamed(TypedFunction<Message> function) {
return builder().addFunctions(ProtobufFunction.unnammed(function)).build();
}

/**
* Creates a protobuf fuctions with {@code functions}. Equivalent to
* {@code builder().addFunctions(functions).build()}.
*
* @param functions the functions
* @return the protobuf functions
*/
public static ProtobufFunctions of(ProtobufFunction... functions) {
return builder().addFunctions(functions).build();
}
Expand Down