Add KafkaTools.Consume#objectProcessorSpec for custom KeyOrValueSpecs
This introduces a new public interface, ObjectProcessor, that gives end-users a means to create custom KeyOrValueSpecs for use with Kafka ingestion. The interface isn't specific to Kafka, though, and should be extensible to other domains in the future.

The existing KeyOrValueSpec implementations are left unchanged, but there may be value in porting them over to ObjectProcessor in the future.
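
As a usage sketch (illustrative only, not part of this commit): a custom processor needs just the two methods this diff exercises, outputTypes() and processAll(...); the exact ObjectProcessor signatures are assumed from the call sites below.

    import io.deephaven.chunk.ObjectChunk;
    import io.deephaven.chunk.WritableChunk;
    import io.deephaven.chunk.WritableIntChunk;
    import io.deephaven.kafka.KafkaTools.Consume;
    import io.deephaven.kafka.KafkaTools.Consume.KeyOrValueSpec;
    import io.deephaven.processor.ObjectProcessor;
    import io.deephaven.qst.type.Type;

    import java.util.List;

    // Hypothetical processor: expose each record's payload length as one int column.
    ObjectProcessor<byte[]> lengthProcessor = new ObjectProcessor<byte[]>() {
        @Override
        public List<Type<?>> outputTypes() {
            return List.of(Type.intType());
        }

        @Override
        public void processAll(ObjectChunk<? extends byte[], ?> in, List<WritableChunk<?>> out) {
            // One output chunk per output type; here, just the single int column.
            final WritableIntChunk<?> lengths = out.get(0).asWritableIntChunk();
            for (int i = 0; i < in.size(); ++i) {
                lengths.add(in.get(i) == null ? 0 : in.get(i).length);
            }
        }
    };

    // Uses the byte-array overload, which supplies a ByteArrayDeserializer.
    KeyOrValueSpec valueSpec = Consume.objectProcessorSpec(lengthProcessor, List.of("PayloadLength"));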

Fixes deephaven#4346
devinrsmith committed Oct 16, 2023
1 parent b14b349 commit b8bb1f3
Showing 11 changed files with 1,203 additions and 26 deletions.
KafkaTools.java
@@ -16,6 +16,7 @@
import io.deephaven.annotations.SimpleStyle;
import io.deephaven.annotations.SingletonStyle;
import io.deephaven.chunk.ChunkType;
import io.deephaven.processor.ObjectProcessor;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.liveness.LivenessManager;
import io.deephaven.engine.liveness.LivenessScope;
@@ -122,7 +123,11 @@
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.*;
import java.util.function.Function;
import java.util.function.IntPredicate;
import java.util.function.IntToLongFunction;
import java.util.function.Predicate;
import java.util.function.Supplier;

import static io.deephaven.kafka.ingest.KafkaStreamPublisher.NULL_COLUMN_INDEX;

@@ -569,6 +574,41 @@ public static KeyOrValueSpec simpleSpec(final String columnName) {
public static KeyOrValueSpec rawSpec(ColumnHeader<?> header, Class<? extends Deserializer<?>> deserializer) {
return new RawConsume(ColumnDefinition.from(header), deserializer);
}

/**
* Creates a Kafka key or value spec implementation from an {@link ObjectProcessor}.
*
* <p>
* The respective column definitions are derived from the combination of {@code columnNames} and
* {@link ObjectProcessor#outputTypes()}.
*
* @param deserializer the deserializer
* @param processor the object processor
* @param columnNames the column names
* @return the Kafka key or value spec
* @param <T> the object type
*/
public static <T> KeyOrValueSpec objectProcessorSpec(
Deserializer<T> deserializer,
ObjectProcessor<T> processor,
List<String> columnNames) {
return new KeyOrValueSpecObjectProcessorImpl<>(deserializer, processor, columnNames);
}

/**
* Creates a Kafka key or value spec implementation from a byte-array {@link ObjectProcessor}.
*
* <p>
* Equivalent to {@code objectProcessorSpec(new ByteArrayDeserializer(), processor, columnNames)}.
*
* @param processor the byte-array object processor
* @param columnNames the column names
* @return the Kafka key or value spec
*/
@SuppressWarnings("unused")
public static KeyOrValueSpec objectProcessorSpec(ObjectProcessor<byte[]> processor, List<String> columnNames) {
return objectProcessorSpec(new ByteArrayDeserializer(), processor, columnNames);
}
}

public static class Produce {
KeyOrValueSpecObjectProcessorImpl.java
@@ -0,0 +1,121 @@
package io.deephaven.kafka;

import io.confluent.kafka.schemaregistry.SchemaProvider;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.deephaven.chunk.ObjectChunk;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.kafka.KafkaTools.Consume.KeyOrValueSpec;
import io.deephaven.kafka.KafkaTools.KeyOrValue;
import io.deephaven.kafka.KafkaTools.KeyOrValueIngestData;
import io.deephaven.kafka.ingest.KafkaStreamPublisher;
import io.deephaven.kafka.ingest.KeyOrValueProcessor;
import io.deephaven.kafka.ingest.MultiFieldChunkAdapter;
import io.deephaven.processor.ObjectProcessor;
import io.deephaven.qst.type.Type;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;

/**
* This implementation presents an easier onboarding ramp and a better (and public) interface,
* {@link KafkaTools.Consume#objectProcessorSpec(Deserializer, ObjectProcessor, List)}, for end-users. The
* {@link ObjectProcessor} is a user-visible replacement for {@link KeyOrValueProcessor}. In the meantime, we adapt
* into a {@link KeyOrValueProcessor} until {@link KafkaStreamPublisher} can be re-written to take advantage of these
* better interfaces.
*/
class KeyOrValueSpecObjectProcessorImpl<T> extends KeyOrValueSpec implements KeyOrValueProcessor {
private final Deserializer<T> deserializer;
private final ObjectProcessor<T> processor;
private final List<String> columnNames;
private Function<WritableChunk<?>[], List<WritableChunk<?>>> offsetsAdapter;

KeyOrValueSpecObjectProcessorImpl(
Deserializer<T> deserializer, ObjectProcessor<T> processor, List<String> columnNames) {
if (columnNames.size() != processor.outputTypes().size()) {
throw new IllegalArgumentException("Expected columnNames and processor.outputTypes() to be the same size");
}
if (columnNames.stream().distinct().count() != columnNames.size()) {
throw new IllegalArgumentException("Expected columnNames to have distinct values");
}
this.deserializer = Objects.requireNonNull(deserializer);
this.processor = Objects.requireNonNull(processor);
this.columnNames = List.copyOf(columnNames);
}

@Override
public Optional<SchemaProvider> getSchemaProvider() {
return Optional.empty();
}

@Override
protected Deserializer<T> getDeserializer(KeyOrValue keyOrValue, SchemaRegistryClient schemaRegistryClient,
Map<String, ?> configs) {
return deserializer;
}

@Override
protected KeyOrValueIngestData getIngestData(KeyOrValue keyOrValue, SchemaRegistryClient schemaRegistryClient,
Map<String, ?> configs, MutableInt nextColumnIndexMut, List<ColumnDefinition<?>> columnDefinitionsOut) {
final KeyOrValueIngestData data = new KeyOrValueIngestData();
data.fieldPathToColumnName = new LinkedHashMap<>();
final int L = columnNames.size();
for (int i = 0; i < L; ++i) {
final String columnName = columnNames.get(i);
final Type<?> type = processor.outputTypes().get(i);
data.fieldPathToColumnName.put(columnName, columnName);
columnDefinitionsOut.add(ColumnDefinition.of(columnName, type));
}
return data;
}

@Override
protected KeyOrValueProcessor getProcessor(TableDefinition tableDef, KeyOrValueIngestData data) {
offsetsAdapter = offsetsFunction(MultiFieldChunkAdapter.chunkOffsets(tableDef, data.fieldPathToColumnName));
return this;
}

@Override
public void handleChunk(ObjectChunk<Object, Values> inputChunk, WritableChunk<Values>[] publisherChunks) {
// noinspection unchecked
final ObjectChunk<T, ?> in = (ObjectChunk<T, ?>) inputChunk;
// we expect isInOrder to be true, so apply should be an O(1) op no matter how many columns there are.
processor.processAll(in, offsetsAdapter.apply(publisherChunks));
}

private static <T> Function<T[], List<T>> offsetsFunction(int[] offsets) {
return offsets.length == 0
? array -> Collections.emptyList()
: isInOrder(offsets)
? array -> Arrays.asList(array).subList(offsets[0], offsets[0] + offsets.length)
: array -> reorder(array, offsets);
}

private static boolean isInOrder(int[] offsets) {
for (int i = 1; i < offsets.length; ++i) {
if (offsets[i - 1] + 1 != offsets[i]) {
return false;
}
}
return true;
}

private static <T> List<T> reorder(T[] array, int[] offsets) {
final List<T> out = new ArrayList<>(offsets.length);
for (int offset : offsets) {
out.add(array[offset]);
}
return out;
}
}
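
For intuition about the offset adaptation above, here is a plain-Java rendering of the two cases (offsetsFunction is private, so this mirrors its logic with illustrative values rather than calling it):

    import java.util.Arrays;
    import java.util.List;

    public class OffsetsDemo {
        public static void main(String[] args) {
            String[] chunks = {"c0", "c1", "c2", "c3", "c4"};
            // In-order offsets {2, 3, 4}: an O(1) subList view, no copying.
            List<String> inOrder = Arrays.asList(chunks).subList(2, 2 + 3);
            System.out.println(inOrder); // [c2, c3, c4]
            // Out-of-order offsets {3, 1}: the reorder path copies element by element.
            List<String> reordered = List.of(chunks[3], chunks[1]);
            System.out.println(reordered); // [c3, c1]
        }
    }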
ChunkUtils.java
@@ -13,21 +13,22 @@
import io.deephaven.chunk.WritableLongChunk;
import io.deephaven.chunk.WritableObjectChunk;
import io.deephaven.chunk.WritableShortChunk;
import io.deephaven.functions.ToBooleanFunction;
import io.deephaven.functions.ToByteFunction;
import io.deephaven.functions.ToCharFunction;
import io.deephaven.functions.ToDoubleFunction;
import io.deephaven.functions.ToFloatFunction;
import io.deephaven.functions.ToIntFunction;
import io.deephaven.functions.ToLongFunction;
import io.deephaven.functions.ToObjectFunction;
import io.deephaven.functions.ToShortFunction;

class ChunkUtils {
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.ToDoubleFunction;
import java.util.function.ToIntFunction;
import java.util.function.ToLongFunction;

public class ChunkUtils {

public static <T> void applyInto(
ToBooleanFunction<T> booleanFunction,
ObjectChunk<T, ?> src,
Predicate<? super T> booleanFunction,
ObjectChunk<? extends T, ?> src,
int srcOffset,
WritableBooleanChunk<?> dest,
int destOffset,
@@ -38,8 +39,8 @@ public static <T> void applyInto(
}

public static <T> void applyInto(
ToByteFunction<T> byteFunction,
ObjectChunk<T, ?> src,
ToByteFunction<? super T> byteFunction,
ObjectChunk<? extends T, ?> src,
int srcOffset,
WritableByteChunk<?> dest,
int destOffset,
@@ -50,8 +51,8 @@ public static <T> void applyInto(
}

public static <T> void applyInto(
ToCharFunction<T> charFunction,
ObjectChunk<T, ?> src,
ToCharFunction<? super T> charFunction,
ObjectChunk<? extends T, ?> src,
int srcOffset,
WritableCharChunk<?> dest,
int destOffset,
@@ -62,8 +63,8 @@ public static <T> void applyInto(
}

public static <T> void applyInto(
ToShortFunction<T> shortFunction,
ObjectChunk<T, ?> src,
ToShortFunction<? super T> shortFunction,
ObjectChunk<? extends T, ?> src,
int srcOffset,
WritableShortChunk<?> dest,
int destOffset,
@@ -74,8 +75,8 @@ public static <T> void applyInto(
}

public static <T> void applyInto(
ToIntFunction<T> intFunction,
ObjectChunk<T, ?> src,
ToIntFunction<? super T> intFunction,
ObjectChunk<? extends T, ?> src,
int srcOffset,
WritableIntChunk<?> dest,
int destOffset,
@@ -85,10 +86,9 @@ public static <T> void applyInto(
}
}


public static <T> void applyInto(
ToLongFunction<T> longFunction,
ObjectChunk<T, ?> src,
ToLongFunction<? super T> longFunction,
ObjectChunk<? extends T, ?> src,
int srcOffset,
WritableLongChunk<?> dest,
int destOffset,
@@ -99,8 +99,8 @@ public static <T> void applyInto(
}

public static <T> void applyInto(
ToFloatFunction<T> floatFunction,
ObjectChunk<T, ?> src,
ToFloatFunction<? super T> floatFunction,
ObjectChunk<? extends T, ?> src,
int srcOffset,
WritableFloatChunk<?> dest,
int destOffset,
@@ -111,8 +111,8 @@ public static <T> void applyInto(
}

public static <T> void applyInto(
ToDoubleFunction<T> doubleFunction,
ObjectChunk<T, ?> src,
ToDoubleFunction<? super T> doubleFunction,
ObjectChunk<? extends T, ?> src,
int srcOffset,
WritableDoubleChunk<?> dest,
int destOffset,
@@ -123,8 +123,8 @@ public static <T> void applyInto(
}

public static <T, R> void applyInto(
ToObjectFunction<T, R> objFunction,
ObjectChunk<T, ?> src,
Function<? super T, ? extends R> objFunction,
ObjectChunk<? extends T, ?> src,
int srcOffset,
WritableObjectChunk<R, ?> dest,
int destOffset,
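
The signature changes above follow the standard PECS wildcard guideline ("producer extends, consumer super"). A self-contained sketch of what this buys at call sites, using plain Java rather than Deephaven chunk types:

    import java.util.List;
    import java.util.Objects;
    import java.util.function.Predicate;

    public class VarianceDemo {
        // Mirrors the widened applyInto shape: the function consumes T (super),
        // the source produces T (extends).
        static <T> int countMatches(Predicate<? super T> fn, List<? extends T> src) {
            int n = 0;
            for (T t : src) {
                if (fn.test(t)) {
                    ++n;
                }
            }
            return n;
        }

        public static void main(String[] args) {
            // A Predicate<Object> now works against a String source, just as a
            // Predicate<? super T> works against an ObjectChunk<? extends T, ?> above.
            Predicate<Object> nonNull = Objects::nonNull;
            System.out.println(countMatches(nonNull, List.of("a", "b"))); // 2
        }
    }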
(8 more changed files not shown)
