diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java b/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java index 55523475eb0..2ffdc15116d 100644 --- a/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java +++ b/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java @@ -16,6 +16,7 @@ import io.deephaven.annotations.SimpleStyle; import io.deephaven.annotations.SingletonStyle; import io.deephaven.chunk.ChunkType; +import io.deephaven.processor.ObjectProcessor; import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.liveness.LivenessManager; import io.deephaven.engine.liveness.LivenessScope; @@ -122,7 +123,11 @@ import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.*; +import java.util.function.Function; +import java.util.function.IntPredicate; +import java.util.function.IntToLongFunction; +import java.util.function.Predicate; +import java.util.function.Supplier; import static io.deephaven.kafka.ingest.KafkaStreamPublisher.NULL_COLUMN_INDEX; @@ -569,6 +574,41 @@ public static KeyOrValueSpec simpleSpec(final String columnName) { public static KeyOrValueSpec rawSpec(ColumnHeader header, Class> deserializer) { return new RawConsume(ColumnDefinition.from(header), deserializer); } + + /** + * Creates a kafka key or value spec implementation from an {@link ObjectProcessor}. + * + *

+     * The respective column definitions are derived from the combination of {@code columnNames} and
+     * {@link ObjectProcessor#outputTypes()}.
+     *
+     * @param deserializer the deserializer
+     * @param processor the object processor
+     * @param columnNames the column names
+     * @return the Kafka key or value spec
+     * @param <T> the object type
+     */
+    public static <T> KeyOrValueSpec objectProcessorSpec(
+            Deserializer<? extends T> deserializer,
+            ObjectProcessor<? super T> processor,
+            List<String> columnNames) {
+        return new KeyOrValueSpecObjectProcessorImpl<>(deserializer, processor, columnNames);
+    }
+
+    /**
+     * Creates a Kafka key or value spec implementation from a byte-array {@link ObjectProcessor}.
+     *
+     * <p>
+ * Equivalent to {@code objectProcessorSpec(new ByteArrayDeserializer(), processor, columnNames)}. + * + * @param processor the byte-array object processor + * @param columnNames the column names + * @return the Kafka key or value spec + */ + @SuppressWarnings("unused") + public static KeyOrValueSpec objectProcessorSpec(ObjectProcessor processor, List columnNames) { + return objectProcessorSpec(new ByteArrayDeserializer(), processor, columnNames); + } } public static class Produce { diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/KeyOrValueSpecObjectProcessorImpl.java b/extensions/kafka/src/main/java/io/deephaven/kafka/KeyOrValueSpecObjectProcessorImpl.java new file mode 100644 index 00000000000..548026e7a14 --- /dev/null +++ b/extensions/kafka/src/main/java/io/deephaven/kafka/KeyOrValueSpecObjectProcessorImpl.java @@ -0,0 +1,121 @@ +package io.deephaven.kafka; + +import io.confluent.kafka.schemaregistry.SchemaProvider; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.deephaven.chunk.ObjectChunk; +import io.deephaven.chunk.WritableChunk; +import io.deephaven.chunk.attributes.Values; +import io.deephaven.engine.table.ColumnDefinition; +import io.deephaven.engine.table.TableDefinition; +import io.deephaven.kafka.KafkaTools.Consume.KeyOrValueSpec; +import io.deephaven.kafka.KafkaTools.KeyOrValue; +import io.deephaven.kafka.KafkaTools.KeyOrValueIngestData; +import io.deephaven.kafka.ingest.KafkaStreamPublisher; +import io.deephaven.kafka.ingest.KeyOrValueProcessor; +import io.deephaven.kafka.ingest.MultiFieldChunkAdapter; +import io.deephaven.processor.ObjectProcessor; +import io.deephaven.qst.type.Type; +import org.apache.commons.lang3.mutable.MutableInt; +import org.apache.kafka.common.serialization.Deserializer; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.function.Function; + +/** + * This implementation is useful for presenting an easier onboarding ramp and better (and public) interface + * {@link KafkaTools.Consume#objectProcessorSpec(Deserializer, ObjectProcessor, List)} for end-users. The + * {@link ObjectProcessor} is a user-visible replacement for {@link KeyOrValueProcessor}. In the meantime though, we are + * adapting into a {@link KeyOrValueProcessor} until such a time when {@link KafkaStreamPublisher} can be re-written to + * take advantage of these better interfaces. 
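+ *
+ * <p>
+ * A minimal usage sketch (the processor and column names here are illustrative, not part of this change):
+ *
+ * <pre>{@code
+ * ObjectProcessor<byte[]> processor = ...;
+ * KeyOrValueSpec valueSpec = KafkaTools.Consume.objectProcessorSpec(processor, List.of("Id", "Name"));
+ * }</pre>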
+ */ +class KeyOrValueSpecObjectProcessorImpl extends KeyOrValueSpec implements KeyOrValueProcessor { + private final Deserializer deserializer; + private final ObjectProcessor processor; + private final List columnNames; + private Function[], List>> offsetsAdapter; + + KeyOrValueSpecObjectProcessorImpl( + Deserializer deserializer, ObjectProcessor processor, List columnNames) { + if (columnNames.size() != processor.outputTypes().size()) { + throw new IllegalArgumentException("Expected columnNames and processor.outputTypes() to be the same size"); + } + if (columnNames.stream().distinct().count() != columnNames.size()) { + throw new IllegalArgumentException("Expected columnNames to have distinct values"); + } + this.deserializer = Objects.requireNonNull(deserializer); + this.processor = Objects.requireNonNull(processor); + this.columnNames = List.copyOf(columnNames); + } + + @Override + public Optional getSchemaProvider() { + return Optional.empty(); + } + + @Override + protected Deserializer getDeserializer(KeyOrValue keyOrValue, SchemaRegistryClient schemaRegistryClient, + Map configs) { + return deserializer; + } + + @Override + protected KeyOrValueIngestData getIngestData(KeyOrValue keyOrValue, SchemaRegistryClient schemaRegistryClient, + Map configs, MutableInt nextColumnIndexMut, List> columnDefinitionsOut) { + final KeyOrValueIngestData data = new KeyOrValueIngestData(); + data.fieldPathToColumnName = new LinkedHashMap<>(); + final int L = columnNames.size(); + for (int i = 0; i < L; ++i) { + final String columnName = columnNames.get(i); + final Type type = processor.outputTypes().get(i); + data.fieldPathToColumnName.put(columnName, columnName); + columnDefinitionsOut.add(ColumnDefinition.of(columnName, type)); + } + return data; + } + + @Override + protected KeyOrValueProcessor getProcessor(TableDefinition tableDef, KeyOrValueIngestData data) { + offsetsAdapter = offsetsFunction(MultiFieldChunkAdapter.chunkOffsets(tableDef, data.fieldPathToColumnName)); + return this; + } + + @Override + public void handleChunk(ObjectChunk inputChunk, WritableChunk[] publisherChunks) { + // noinspection unchecked + final ObjectChunk in = (ObjectChunk) inputChunk; + // we except isInOrder to be true, so apply should be an O(1) op no matter how many columns there are. + processor.processAll(in, offsetsAdapter.apply(publisherChunks)); + } + + private static Function> offsetsFunction(int[] offsets) { + return offsets.length == 0 + ? array -> Collections.emptyList() + : isInOrder(offsets) + ? 
array -> Arrays.asList(array).subList(offsets[0], offsets[0] + offsets.length) + : array -> reorder(array, offsets); + } + + private static boolean isInOrder(int[] offsets) { + for (int i = 1; i < offsets.length; ++i) { + if (offsets[i - 1] + 1 != offsets[i]) { + return false; + } + } + return true; + } + + private static List reorder(T[] array, int[] offsets) { + final List out = new ArrayList<>(offsets.length); + for (int offset : offsets) { + out.add(array[offset]); + } + return out; + } +} diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/ingest/ChunkUtils.java b/extensions/kafka/src/main/java/io/deephaven/kafka/ingest/ChunkUtils.java index 4b08821302f..f7bdb400b4d 100644 --- a/extensions/kafka/src/main/java/io/deephaven/kafka/ingest/ChunkUtils.java +++ b/extensions/kafka/src/main/java/io/deephaven/kafka/ingest/ChunkUtils.java @@ -13,21 +13,22 @@ import io.deephaven.chunk.WritableLongChunk; import io.deephaven.chunk.WritableObjectChunk; import io.deephaven.chunk.WritableShortChunk; -import io.deephaven.functions.ToBooleanFunction; import io.deephaven.functions.ToByteFunction; import io.deephaven.functions.ToCharFunction; -import io.deephaven.functions.ToDoubleFunction; import io.deephaven.functions.ToFloatFunction; -import io.deephaven.functions.ToIntFunction; -import io.deephaven.functions.ToLongFunction; -import io.deephaven.functions.ToObjectFunction; import io.deephaven.functions.ToShortFunction; -class ChunkUtils { +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.function.ToDoubleFunction; +import java.util.function.ToIntFunction; +import java.util.function.ToLongFunction; + +public class ChunkUtils { public static void applyInto( - ToBooleanFunction booleanFunction, - ObjectChunk src, + Predicate booleanFunction, + ObjectChunk src, int srcOffset, WritableBooleanChunk dest, int destOffset, @@ -38,8 +39,8 @@ public static void applyInto( } public static void applyInto( - ToByteFunction byteFunction, - ObjectChunk src, + ToByteFunction byteFunction, + ObjectChunk src, int srcOffset, WritableByteChunk dest, int destOffset, @@ -50,8 +51,8 @@ public static void applyInto( } public static void applyInto( - ToCharFunction charFunction, - ObjectChunk src, + ToCharFunction charFunction, + ObjectChunk src, int srcOffset, WritableCharChunk dest, int destOffset, @@ -62,8 +63,8 @@ public static void applyInto( } public static void applyInto( - ToShortFunction shortFunction, - ObjectChunk src, + ToShortFunction shortFunction, + ObjectChunk src, int srcOffset, WritableShortChunk dest, int destOffset, @@ -74,8 +75,8 @@ public static void applyInto( } public static void applyInto( - ToIntFunction intFunction, - ObjectChunk src, + ToIntFunction intFunction, + ObjectChunk src, int srcOffset, WritableIntChunk dest, int destOffset, @@ -85,10 +86,9 @@ public static void applyInto( } } - public static void applyInto( - ToLongFunction longFunction, - ObjectChunk src, + ToLongFunction longFunction, + ObjectChunk src, int srcOffset, WritableLongChunk dest, int destOffset, @@ -99,8 +99,8 @@ public static void applyInto( } public static void applyInto( - ToFloatFunction floatFunction, - ObjectChunk src, + ToFloatFunction floatFunction, + ObjectChunk src, int srcOffset, WritableFloatChunk dest, int destOffset, @@ -111,8 +111,8 @@ public static void applyInto( } public static void applyInto( - ToDoubleFunction doubleFunction, - ObjectChunk src, + ToDoubleFunction doubleFunction, + ObjectChunk src, int srcOffset, WritableDoubleChunk dest, int destOffset, 
@@ -123,8 +123,8 @@ public static void applyInto( } public static void applyInto( - ToObjectFunction objFunction, - ObjectChunk src, + Function objFunction, + ObjectChunk src, int srcOffset, WritableObjectChunk dest, int destOffset, diff --git a/extensions/kafka/src/main/java/io/deephaven/processor/ObjectProcessor.java b/extensions/kafka/src/main/java/io/deephaven/processor/ObjectProcessor.java new file mode 100644 index 00000000000..439590262e8 --- /dev/null +++ b/extensions/kafka/src/main/java/io/deephaven/processor/ObjectProcessor.java @@ -0,0 +1,127 @@ +/** + * Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending + */ +package io.deephaven.processor; + +import io.deephaven.chunk.ChunkType; +import io.deephaven.chunk.ObjectChunk; +import io.deephaven.chunk.WritableChunk; +import io.deephaven.qst.type.BooleanType; +import io.deephaven.qst.type.BoxedType; +import io.deephaven.qst.type.ByteType; +import io.deephaven.qst.type.CharType; +import io.deephaven.qst.type.DoubleType; +import io.deephaven.qst.type.FloatType; +import io.deephaven.qst.type.GenericType; +import io.deephaven.qst.type.InstantType; +import io.deephaven.qst.type.IntType; +import io.deephaven.qst.type.LongType; +import io.deephaven.qst.type.ShortType; +import io.deephaven.qst.type.Type; + +import java.time.Instant; +import java.util.List; + +/** + * An interface for processing data from one or more input objects into output chunks on a 1-to-1 input to output basis. + * + * @param the object type + * @see ObjectProcessorRowLimited + */ +public interface ObjectProcessor { + + /** + * Creates or returns an implementation that adds strict safety checks around {@code delegate} + * {@link #processAll(ObjectChunk, List)}. The may be useful during for development or debugging purposes. + * + * @param delegate the delegate + * @return the strict implementation + * @param the object type + */ + static ObjectProcessor strict(ObjectProcessor delegate) { + return ObjectProcessorStrict.create(delegate); + } + + /** + * The relationship between {@link #outputTypes() output types} and the {@link #processAll(ObjectChunk, List) + * processAll out param} {@link WritableChunk#getChunkType()}. + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
+     * <table>
+     * <tr><th>{@link Type}</th><th>{@link ChunkType}</th></tr>
+     * <tr><td>{@link ByteType}</td><td>{@link ChunkType#Byte}</td></tr>
+     * <tr><td>{@link ShortType}</td><td>{@link ChunkType#Short}</td></tr>
+     * <tr><td>{@link IntType}</td><td>{@link ChunkType#Int}</td></tr>
+     * <tr><td>{@link LongType}</td><td>{@link ChunkType#Long}</td></tr>
+     * <tr><td>{@link FloatType}</td><td>{@link ChunkType#Float}</td></tr>
+     * <tr><td>{@link DoubleType}</td><td>{@link ChunkType#Double}</td></tr>
+     * <tr><td>{@link CharType}</td><td>{@link ChunkType#Char}</td></tr>
+     * <tr><td>{@link BooleanType}</td><td>{@link ChunkType#Byte} (not {@link ChunkType#Boolean})</td></tr>
+     * <tr><td>{@link BoxedType}</td><td>Same as {@link BoxedType#primitiveType()} would yield.</td></tr>
+     * <tr><td>{@link InstantType}</td><td>{@link ChunkType#Long} ({@link io.deephaven.time.DateTimeUtils#epochNanos(Instant)})</td></tr>
+     * <tr><td>All other {@link GenericType}</td><td>{@link ChunkType#Object}</td></tr>
+     * </table>
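+     *
+     * <p>
+     * For example (illustrative): {@code chunkType(Type.booleanType())} returns {@link ChunkType#Byte}.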
+ */ + static ChunkType chunkType(Type type) { + return ObjectProcessorTypes.of(type); + } + + /** + * The logical output types {@code this} instance processes. The size and types correspond to the expected size and + * {@link io.deephaven.chunk.ChunkType chunk types} for {@link #processAll(ObjectChunk, List)} as specified by + * {@link #chunkType(Type)}. + * + * @return the output types + */ + List> outputTypes(); + + /** + * Processes {@code in} into {@code out} by appending {@code in.size()} values to each chunk. The size of each + * {@code out} chunk will be incremented by {@code in.size()}. Implementations are free to process the data in a + * row-oriented, column-oriented, or mix-oriented fashion. Implementations must not keep any references to the + * passed-in chunks. + * + *
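+     * <p>
+     * For example (illustrative): if {@code in.size() == 5} and an output chunk has {@code size() == 7} before the
+     * call, that chunk will have {@code size() == 12} after the call.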

+     * <p>
+     * If an exception is thrown, the output chunks will be in an unspecified state.
+     *
+     * @param in the input objects
+     * @param out the output chunks as specified by {@link #outputTypes()}; each chunk must have remaining capacity of
+     *        at least {@code in.size()}
+     */
+    void processAll(ObjectChunk<? extends T, ?> in, List<WritableChunk<?>> out);
+}
diff --git a/extensions/kafka/src/main/java/io/deephaven/processor/ObjectProcessorRowBase.java b/extensions/kafka/src/main/java/io/deephaven/processor/ObjectProcessorRowBase.java
new file mode 100644
index 00000000000..8e906dac008
--- /dev/null
+++ b/extensions/kafka/src/main/java/io/deephaven/processor/ObjectProcessorRowBase.java
@@ -0,0 +1,49 @@
+/**
+ * Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
+ */
+package io.deephaven.processor;
+
+import io.deephaven.chunk.ObjectChunk;
+import io.deephaven.chunk.WritableChunk;
+
+import java.util.List;
+
+/**
+ * A specialization that {@link #processAll(ObjectChunk, List) processes all} one row at a time.
+ *
+ * <p>

+ * In particular, this is a useful construct when {@code T} is a {@link java.nio.ByteBuffer} or {@code byte[]} type and + * it makes sense to parse straight from bytes into output chunks (as opposed to parsing into a more intermediate state + * and then processing into chunks in a column-oriented fashion). + * + * @param the object type + */ +public abstract class ObjectProcessorRowBase implements ObjectProcessorRowLimited { + + @Override + public final int rowLimit() { + return 1; + } + + /** + * Implementations are responsible for parsing {@code in} into {@code out} without increasing the output + * chunk size; as such implementations are responsible for setting the cell at position {@code chunk.size() + i} for + * each output {@code chunk}. Implementations must not keep any references to the passed-in chunks. + * + * @param in in the input object + * @param out the output chunks as specified by {@link #outputTypes()} + * @param i the cell offset relative to each output {@code chunk.size()} + */ + protected abstract void parse(T in, List> out, int i); + + @Override + public final void processAll(ObjectChunk in, List> out) { + final int inSize = in.size(); + for (int i = 0; i < inSize; ++i) { + parse(in.get(i), out, i); + } + for (WritableChunk outChunk : out) { + outChunk.setSize(outChunk.size() + inSize); + } + } +} diff --git a/extensions/kafka/src/main/java/io/deephaven/processor/ObjectProcessorRowLimited.java b/extensions/kafka/src/main/java/io/deephaven/processor/ObjectProcessorRowLimited.java new file mode 100644 index 00000000000..2a62cab8275 --- /dev/null +++ b/extensions/kafka/src/main/java/io/deephaven/processor/ObjectProcessorRowLimited.java @@ -0,0 +1,60 @@ +/** + * Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending + */ +package io.deephaven.processor; + +import io.deephaven.chunk.ObjectChunk; + +import java.util.List; + +/** + * A specialization of {@link ObjectProcessor} that provides row-limited guarantees around the implementation of + * {@link #processAll(ObjectChunk, List)}. + * + *
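+ * <p>
+ * For example (illustrative), {@code ObjectProcessorRowLimited.of(processor, 1024)} wraps a columnar
+ * {@code processor} such that each delegated {@code processAll} invocation sees at most 1024 rows.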

+ * <p>
+ * For use-cases that are naturally performed one row at a time, callers are encouraged to extend
+ * {@link ObjectProcessorRowBase}. For use-cases where the implementation can be columnar but callers would like to add
+ * a row limit, callers are encouraged to create a columnar implementation against {@link ObjectProcessor} and use
+ * {@link #of(ObjectProcessor, int)}.
+ *
+ * @param <T> the object type
+ */
+public interface ObjectProcessorRowLimited<T> extends ObjectProcessor<T> {
+
+    /**
+     * Creates or returns a row-limited implementation. If {@code delegate} is already limited more than
+     * {@code rowLimit}, {@code delegate} is returned. Otherwise, a row-limited implementation is created that wraps
+     * {@code delegate} and invokes {@link #processAll(ObjectChunk, List) delegate#processAll} with {@code rowLimit}
+     * sized out chunks, except for the last invocation, which may have size less than {@code rowLimit}.
+     *
+     * <p>

+ * Adding a row-limit may be useful in cases where the input objects are "wide". By limiting the number of rows + * considered at any given time, there may be better opportunity for read caching. + * + * @param delegate the delegate + * @param rowLimit the max chunk size + * @return the row-limited processor + * @param the object type + */ + static ObjectProcessorRowLimited of(ObjectProcessor delegate, int rowLimit) { + if (delegate instanceof ObjectProcessorRowLimited) { + final ObjectProcessorRowLimited limited = (ObjectProcessorRowLimited) delegate; + if (limited.rowLimit() <= rowLimit) { + // already limited more than rowLimit + return limited; + } + if (limited instanceof ObjectProcessorRowLimitedImpl) { + // don't want to wrap multiple times, so operate on the inner delegate + return of(((ObjectProcessorRowLimitedImpl) limited).delegate(), rowLimit); + } + } + return new ObjectProcessorRowLimitedImpl<>(delegate, rowLimit); + } + + /** + * A guarantee that {@link #processAll(ObjectChunk, List)} operates on at most row-limit rows at a time. + * + * @return the row-limit + */ + int rowLimit(); +} diff --git a/extensions/kafka/src/main/java/io/deephaven/processor/ObjectProcessorRowLimitedImpl.java b/extensions/kafka/src/main/java/io/deephaven/processor/ObjectProcessorRowLimitedImpl.java new file mode 100644 index 00000000000..12bdbf57208 --- /dev/null +++ b/extensions/kafka/src/main/java/io/deephaven/processor/ObjectProcessorRowLimitedImpl.java @@ -0,0 +1,102 @@ +/** + * Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending + */ +package io.deephaven.processor; + +import io.deephaven.chunk.ObjectChunk; +import io.deephaven.chunk.ResettableObjectChunk; +import io.deephaven.chunk.WritableChunk; +import io.deephaven.chunk.attributes.Any; +import io.deephaven.engine.primitive.iterator.CloseableIterator; +import io.deephaven.qst.type.Type; + +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Objects; + +final class ObjectProcessorRowLimitedImpl implements ObjectProcessorRowLimited { + private final ObjectProcessor delegate; + private final int rowLimit; + + ObjectProcessorRowLimitedImpl(ObjectProcessor delegate, int rowLimit) { + if (rowLimit <= 0) { + throw new IllegalArgumentException("rowLimit must be positive"); + } + if (rowLimit == Integer.MAX_VALUE) { + throw new IllegalArgumentException("rowLimit must be less than Integer.MAX_VALUE"); + } + this.delegate = Objects.requireNonNull(delegate); + this.rowLimit = rowLimit; + } + + ObjectProcessor delegate() { + return delegate; + } + + @Override + public List> outputTypes() { + return delegate.outputTypes(); + } + + @Override + public int rowLimit() { + return rowLimit; + } + + @Override + public void processAll(ObjectChunk in, List> out) { + for (ObjectChunk slice : iterable(in, rowLimit)) { + delegate.processAll(slice, out); + } + } + + // Ideally, these would be built into Chunk impls + + private static Iterable> iterable( + ObjectChunk source, int sliceSize) { + if (source.size() <= sliceSize) { + return List.of(source); + } + // Note: we need to create the "for pool" version to guarantee we are getting a plain version; we know we don't + // need to close them. 
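+        // For example (illustrative): with source.size() == 10 and sliceSize == 4, the returned iterable yields
+        // slices of sizes 4, 4, and 2, resetting the same resettable chunk over [0, 4), [4, 8), and [8, 10).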
+ return () -> iterator(source, ResettableObjectChunk.makeResettableChunkForPool(), sliceSize); + } + + private static Iterator> iterator( + ObjectChunk source, ResettableObjectChunk slice, int sliceSize) { + return new ObjectChunkSliceIterator<>(source, slice, sliceSize); + } + + private static class ObjectChunkSliceIterator + implements CloseableIterator> { + private final ObjectChunk source; + private final ResettableObjectChunk slice; + private final int sliceSize; + private int ix = 0; + + private ObjectChunkSliceIterator( + ObjectChunk source, + ResettableObjectChunk slice, + int sliceSize) { + this.source = Objects.requireNonNull(source); + this.slice = slice; + this.sliceSize = sliceSize; + } + + @Override + public boolean hasNext() { + return ix < source.size() && ix >= 0; + } + + @Override + public ObjectChunk next() { + if (ix >= source.size() || ix < 0) { + throw new NoSuchElementException(); + } + slice.resetFromTypedChunk(source, ix, Math.min(sliceSize, source.size() - ix)); + ix += sliceSize; + return slice; + } + } +} diff --git a/extensions/kafka/src/main/java/io/deephaven/processor/ObjectProcessorStrict.java b/extensions/kafka/src/main/java/io/deephaven/processor/ObjectProcessorStrict.java new file mode 100644 index 00000000000..51456d62a8a --- /dev/null +++ b/extensions/kafka/src/main/java/io/deephaven/processor/ObjectProcessorStrict.java @@ -0,0 +1,72 @@ +/** + * Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending + */ +package io.deephaven.processor; + +import io.deephaven.UncheckedDeephavenException; +import io.deephaven.chunk.ChunkType; +import io.deephaven.chunk.ObjectChunk; +import io.deephaven.chunk.WritableChunk; +import io.deephaven.qst.type.Type; + +import java.util.List; +import java.util.Objects; + +class ObjectProcessorStrict implements ObjectProcessor { + + static ObjectProcessor create(ObjectProcessor delegate) { + if (delegate instanceof ObjectProcessorStrict) { + return delegate; + } + return new ObjectProcessorStrict<>(delegate); + } + + private final ObjectProcessor delegate; + + ObjectProcessorStrict(ObjectProcessor delegate) { + this.delegate = Objects.requireNonNull(delegate); + } + + @Override + public List> outputTypes() { + return delegate.outputTypes(); + } + + @Override + public void processAll(ObjectChunk in, List> out) { + final int numColumns = delegate.outputTypes().size(); + if (numColumns != out.size()) { + throw new IllegalArgumentException(String.format( + "Expected delegate.outputTypes().size() == out.size(). delegate.outputTypes().size()=%d, out.size()=%d", + numColumns, out.size())); + } + final int[] originalSizes = new int[numColumns]; + for (int i = 0; i < numColumns; ++i) { + final WritableChunk chunk = out.get(i); + if (chunk.capacity() - chunk.size() < in.size()) { + throw new IllegalArgumentException(String.format( + "out chunk does not have enough remaining capacity. i=%d, in.size()=%d, chunk.size()=%d, chunk.capacity()=%d", + i, in.size(), chunk.size(), chunk.capacity())); + } + final Type type = delegate.outputTypes().get(i); + final ChunkType expectedChunkType = ObjectProcessor.chunkType(type); + final ChunkType actualChunkType = chunk.getChunkType(); + if (!expectedChunkType.equals(actualChunkType)) { + throw new IllegalArgumentException(String.format( + "Improper ChunkType. 
i=%d, outputType=%s, expectedChunkType=%s, actualChunkType=%s", i, type, + expectedChunkType, actualChunkType)); + } + originalSizes[i] = chunk.size(); + } + delegate.processAll(in, out); + for (int i = 0; i < numColumns; ++i) { + final WritableChunk chunk = out.get(i); + final int expectedSize = originalSizes[i] + in.size(); + if (chunk.size() != expectedSize) { + throw new UncheckedDeephavenException(String.format( + "Implementation did not increment chunk size correctly. i=%d, (before) chunk.size()=%d, (after) chunk.size()=%d, in.size()=%d", + i, originalSizes[i], chunk.size(), in.size())); + } + } + } +} diff --git a/extensions/kafka/src/main/java/io/deephaven/processor/ObjectProcessorTypes.java b/extensions/kafka/src/main/java/io/deephaven/processor/ObjectProcessorTypes.java new file mode 100644 index 00000000000..563914d0bc5 --- /dev/null +++ b/extensions/kafka/src/main/java/io/deephaven/processor/ObjectProcessorTypes.java @@ -0,0 +1,118 @@ +/** + * Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending + */ +package io.deephaven.processor; + +import io.deephaven.chunk.ChunkType; +import io.deephaven.qst.type.ArrayType; +import io.deephaven.qst.type.BooleanType; +import io.deephaven.qst.type.BoxedType; +import io.deephaven.qst.type.ByteType; +import io.deephaven.qst.type.CharType; +import io.deephaven.qst.type.CustomType; +import io.deephaven.qst.type.DoubleType; +import io.deephaven.qst.type.FloatType; +import io.deephaven.qst.type.GenericType; +import io.deephaven.qst.type.InstantType; +import io.deephaven.qst.type.IntType; +import io.deephaven.qst.type.LongType; +import io.deephaven.qst.type.PrimitiveType; +import io.deephaven.qst.type.ShortType; +import io.deephaven.qst.type.StringType; +import io.deephaven.qst.type.Type; + +final class ObjectProcessorTypes { + + static ChunkType of(Type type) { + return type.walk(Impl.INSTANCE); + } + + static ChunkType of(PrimitiveType type) { + return type.walk((PrimitiveType.Visitor) Impl.INSTANCE); + } + + static ChunkType of(GenericType type) { + return type.walk((GenericType.Visitor) Impl.INSTANCE); + } + + private enum Impl + implements Type.Visitor, PrimitiveType.Visitor, GenericType.Visitor { + INSTANCE; + + @Override + public ChunkType visit(PrimitiveType primitiveType) { + return of(primitiveType); + } + + @Override + public ChunkType visit(GenericType genericType) { + return of(genericType); + } + + @Override + public ChunkType visit(BooleanType booleanType) { + return ChunkType.Byte; + } + + @Override + public ChunkType visit(ByteType byteType) { + return ChunkType.Byte; + } + + @Override + public ChunkType visit(CharType charType) { + return ChunkType.Char; + } + + @Override + public ChunkType visit(ShortType shortType) { + return ChunkType.Short; + } + + @Override + public ChunkType visit(IntType intType) { + return ChunkType.Int; + } + + @Override + public ChunkType visit(LongType longType) { + return ChunkType.Long; + } + + @Override + public ChunkType visit(FloatType floatType) { + return ChunkType.Float; + } + + @Override + public ChunkType visit(DoubleType doubleType) { + return ChunkType.Double; + } + + + @Override + public ChunkType visit(BoxedType boxedType) { + return of(boxedType.primitiveType()); + } + + @Override + public ChunkType visit(StringType stringType) { + return ChunkType.Object; + } + + @Override + public ChunkType visit(InstantType instantType) { + return ChunkType.Long; + } + + @Override + public ChunkType visit(ArrayType arrayType) { + return ChunkType.Object; + } + + @Override + public ChunkType 
visit(CustomType customType) { + return ChunkType.Object; + } + } +} diff --git a/extensions/kafka/src/main/java/io/deephaven/processor/functions/ObjectProcessorFunctions.java b/extensions/kafka/src/main/java/io/deephaven/processor/functions/ObjectProcessorFunctions.java new file mode 100644 index 00000000000..aa1539c3912 --- /dev/null +++ b/extensions/kafka/src/main/java/io/deephaven/processor/functions/ObjectProcessorFunctions.java @@ -0,0 +1,29 @@ +/** + * Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending + */ +package io.deephaven.processor.functions; + +import io.deephaven.chunk.ObjectChunk; +import io.deephaven.functions.TypedFunction; +import io.deephaven.processor.ObjectProcessor; + +import java.util.List; + +public final class ObjectProcessorFunctions { + + /** + * Creates a function-based processor whose {@link ObjectProcessor#outputTypes()} is the + * {@link TypedFunction#returnType()} from each function in {@code functions}. + * + *
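+     * <p>
+     * A minimal sketch (illustrative; the lambdas, names, and exact factory helpers are assumptions):
+     *
+     * <pre>{@code
+     * List<TypedFunction<? super byte[]>> functions = List.of(
+     *         (ToIntFunction<byte[]>) b -> b.length,
+     *         ToObjectFunction.of(b -> new String(b, StandardCharsets.UTF_8), Type.stringType()));
+     * ObjectProcessor<byte[]> processor = ObjectProcessorFunctions.of(functions);
+     * }</pre>
+     *
+     * <p>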

+ * The implementation of {@link ObjectProcessor#processAll(ObjectChunk, List)} is column-oriented with a virtual + * call and cast per-column. + * + * @param functions the functions + * @return the function-based processor + * @param the object type + */ + public static ObjectProcessor of(List> functions) { + return ObjectProcessorFunctionsImpl.create(functions); + } +} diff --git a/extensions/kafka/src/main/java/io/deephaven/processor/functions/ObjectProcessorFunctionsImpl.java b/extensions/kafka/src/main/java/io/deephaven/processor/functions/ObjectProcessorFunctionsImpl.java new file mode 100644 index 00000000000..43fd81c3ccc --- /dev/null +++ b/extensions/kafka/src/main/java/io/deephaven/processor/functions/ObjectProcessorFunctionsImpl.java @@ -0,0 +1,459 @@ +/** + * Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending + */ +package io.deephaven.processor.functions; + +import io.deephaven.chunk.ObjectChunk; +import io.deephaven.chunk.WritableBooleanChunk; +import io.deephaven.chunk.WritableByteChunk; +import io.deephaven.chunk.WritableCharChunk; +import io.deephaven.chunk.WritableChunk; +import io.deephaven.chunk.WritableDoubleChunk; +import io.deephaven.chunk.WritableFloatChunk; +import io.deephaven.chunk.WritableIntChunk; +import io.deephaven.chunk.WritableLongChunk; +import io.deephaven.chunk.WritableObjectChunk; +import io.deephaven.chunk.WritableShortChunk; +import io.deephaven.functions.ToBooleanFunction; +import io.deephaven.functions.ToByteFunction; +import io.deephaven.functions.ToCharFunction; +import io.deephaven.functions.ToDoubleFunction; +import io.deephaven.functions.ToFloatFunction; +import io.deephaven.functions.ToIntFunction; +import io.deephaven.functions.ToLongFunction; +import io.deephaven.functions.ToObjectFunction; +import io.deephaven.functions.ToPrimitiveFunction; +import io.deephaven.functions.ToShortFunction; +import io.deephaven.functions.TypedFunction; +import io.deephaven.kafka.ingest.ChunkUtils; +import io.deephaven.processor.ObjectProcessor; +import io.deephaven.qst.type.ArrayType; +import io.deephaven.qst.type.BoxedBooleanType; +import io.deephaven.qst.type.BoxedByteType; +import io.deephaven.qst.type.BoxedCharType; +import io.deephaven.qst.type.BoxedDoubleType; +import io.deephaven.qst.type.BoxedFloatType; +import io.deephaven.qst.type.BoxedIntType; +import io.deephaven.qst.type.BoxedLongType; +import io.deephaven.qst.type.BoxedShortType; +import io.deephaven.qst.type.BoxedType; +import io.deephaven.qst.type.CustomType; +import io.deephaven.qst.type.GenericType; +import io.deephaven.qst.type.InstantType; +import io.deephaven.qst.type.StringType; +import io.deephaven.qst.type.Type; +import io.deephaven.time.DateTimeUtils; +import io.deephaven.util.BooleanUtils; +import io.deephaven.util.type.TypeUtils; + +import java.util.List; +import java.util.Objects; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +final class ObjectProcessorFunctionsImpl implements ObjectProcessor { + + interface Appender { + + void append(WritableChunk dest, ObjectChunk src); + } + + static ObjectProcessorFunctionsImpl create(List> functions) { + final List> logicalTypes = functions.stream() + .map(TypedFunction::returnType) + .collect(Collectors.toList()); + final List> appenders = functions.stream() + .map(AppenderVisitor::of) + .collect(Collectors.toList()); + return new ObjectProcessorFunctionsImpl<>(logicalTypes, appenders); + } + + private final List> logicalTypes; + private final List> appenders; + 
+ private ObjectProcessorFunctionsImpl(List> logicalTypes, List> appenders) { + this.logicalTypes = List.copyOf(Objects.requireNonNull(logicalTypes)); + this.appenders = Objects.requireNonNull(appenders); + } + + @Override + public List> outputTypes() { + return logicalTypes; + } + + @Override + public void processAll(ObjectChunk in, List> out) { + checkChunks(out); + final int L = appenders.size(); + for (int i = 0; i < L; ++i) { + appenders.get(i).append(out.get(i), in); + } + } + + private void checkChunks(List> out) { + final int numColumns = appenders.size(); + if (numColumns != out.size()) { + throw new IllegalArgumentException(String.format( + "Expected appenders.size() == out.size(). appenders.size()=%d, out.size()=%d", + numColumns, out.size())); + } + // we'll catch mismatched chunk types later when we try to cast them + } + + private static class AppenderVisitor implements + TypedFunction.Visitor>, + ToPrimitiveFunction.Visitor> { + + static Appender of(TypedFunction f) { + return f.walk(new AppenderVisitor<>()); + } + + @Override + public Appender visit(ToPrimitiveFunction f) { + return f.walk((ToPrimitiveFunction.Visitor>) this); + } + + @Override + public ByteAppender visit(ToBooleanFunction f) { + return ByteAppender.from(f); + } + + @Override + public CharAppender visit(ToCharFunction f) { + return new CharAppender<>(f); + } + + @Override + public ByteAppender visit(ToByteFunction f) { + return new ByteAppender<>(f); + } + + @Override + public ShortAppender visit(ToShortFunction f) { + return new ShortAppender<>(f); + } + + @Override + public IntAppender visit(ToIntFunction f) { + return new IntAppender<>(f); + } + + @Override + public LongAppender visit(ToLongFunction f) { + return new LongAppender<>(f); + } + + @Override + public FloatAppender visit(ToFloatFunction f) { + return new FloatAppender<>(f); + } + + @Override + public DoubleAppender visit(ToDoubleFunction f) { + return new DoubleAppender<>(f); + } + + @Override + public Appender visit(ToObjectFunction f) { + return f.returnType().walk(new ObjectFunctionAppenderVisitor<>(this, f)); + } + } + + private static class ObjectFunctionAppenderVisitor + implements GenericType.Visitor>, BoxedType.Visitor> { + + private final AppenderVisitor v; + private final ToObjectFunction f; + + private ObjectFunctionAppenderVisitor(AppenderVisitor v, ToObjectFunction f) { + this.v = Objects.requireNonNull(v); + this.f = Objects.requireNonNull(f); + } + + @Override + public Appender visit(BoxedType boxedType) { + return boxedType.walk((BoxedType.Visitor>) this); + } + + @Override + public ByteAppender visit(BoxedBooleanType booleanType) { + return ByteAppender.from(f.cast(booleanType)); + } + + @Override + public ByteAppender visit(BoxedByteType byteType) { + return v.visit(f.cast(byteType).mapToByte(TypeUtils::unbox)); + } + + @Override + public CharAppender visit(BoxedCharType charType) { + return v.visit(f.cast(charType).mapToChar(TypeUtils::unbox)); + } + + @Override + public ShortAppender visit(BoxedShortType shortType) { + return v.visit(f.cast(shortType).mapToShort(TypeUtils::unbox)); + } + + @Override + public IntAppender visit(BoxedIntType intType) { + return v.visit(f.cast(intType).mapToInt(TypeUtils::unbox)); + } + + @Override + public LongAppender visit(BoxedLongType longType) { + return v.visit(f.cast(longType).mapToLong(TypeUtils::unbox)); + } + + @Override + public FloatAppender visit(BoxedFloatType floatType) { + return v.visit(f.cast(floatType).mapToFloat(TypeUtils::unbox)); + } + + @Override + public DoubleAppender 
visit(BoxedDoubleType doubleType) { + return v.visit(f.cast(doubleType).mapToDouble(TypeUtils::unbox)); + } + + @Override + public ObjectAppender visit(StringType stringType) { + return new ObjectAppender<>(f); + } + + @Override + public LongAppender visit(InstantType instantType) { + // to long function + return v.visit(f.cast(instantType).mapToLong(DateTimeUtils::epochNanos)); + } + + @Override + public ObjectAppender visit(ArrayType arrayType) { + return new ObjectAppender<>(f); + } + + @Override + public ObjectAppender visit(CustomType customType) { + return new ObjectAppender<>(f); + } + } + + private static class ObjectAppender implements Appender { + private final ToObjectFunction f; + + ObjectAppender(ToObjectFunction f) { + this.f = Objects.requireNonNull(f); + } + + @Override + public void append(WritableChunk dest, ObjectChunk src) { + ObjectProcessorFunctionsImpl.append(dest.asWritableObjectChunk(), f, src); + } + } + + // private static class BooleanAppender implements Appender { + // private final Predicate f; + // + // BooleanAppender(Predicate f) { + // this.f = Objects.requireNonNull(f); + // } + // + // @Override + // public Type returnType() { + // return Type.booleanType(); + // } + // + // @Override + // public void add(WritableChunk dest, T src) { + // dest.asWritableBooleanChunk().add(f.test(src)); + // } + // + // @Override + // public void append(WritableChunk dest, ObjectChunk src) { + // ObjectProcessorFunctionsImpl.append(dest.asWritableBooleanChunk(), f, src); + // } + // } + + private static class CharAppender implements Appender { + private final ToCharFunction f; + + CharAppender(ToCharFunction f) { + this.f = Objects.requireNonNull(f); + } + + @Override + public void append(WritableChunk dest, ObjectChunk src) { + ObjectProcessorFunctionsImpl.append(dest.asWritableCharChunk(), f, src); + } + } + + private static class ByteAppender implements Appender { + + static ByteAppender from(ToBooleanFunction f) { + return new ByteAppender<>(x -> BooleanUtils.booleanAsByte(f.test(x))); + } + + static ByteAppender from(ToObjectFunction f) { + return new ByteAppender<>(f.mapToByte(BooleanUtils::booleanAsByte)); + } + + private final ToByteFunction f; + + ByteAppender(ToByteFunction f) { + this.f = Objects.requireNonNull(f); + } + + @Override + public void append(WritableChunk dest, ObjectChunk src) { + ObjectProcessorFunctionsImpl.append(dest.asWritableByteChunk(), f, src); + } + } + + private static class ShortAppender implements Appender { + private final ToShortFunction f; + + ShortAppender(ToShortFunction f) { + this.f = Objects.requireNonNull(f); + } + + @Override + public void append(WritableChunk dest, ObjectChunk src) { + ObjectProcessorFunctionsImpl.append(dest.asWritableShortChunk(), f, src); + } + } + + private static class IntAppender implements Appender { + private final java.util.function.ToIntFunction f; + + IntAppender(java.util.function.ToIntFunction f) { + this.f = Objects.requireNonNull(f); + } + + @Override + public void append(WritableChunk dest, ObjectChunk src) { + ObjectProcessorFunctionsImpl.append(dest.asWritableIntChunk(), f, src); + } + } + + private static class LongAppender implements Appender { + private final java.util.function.ToLongFunction f; + + LongAppender(java.util.function.ToLongFunction f) { + this.f = Objects.requireNonNull(f); + } + + @Override + public void append(WritableChunk dest, ObjectChunk src) { + ObjectProcessorFunctionsImpl.append(dest.asWritableLongChunk(), f, src); + } + } + + private static class FloatAppender 
implements Appender { + private final ToFloatFunction f; + + FloatAppender(ToFloatFunction f) { + this.f = Objects.requireNonNull(f); + } + + @Override + public void append(WritableChunk dest, ObjectChunk src) { + ObjectProcessorFunctionsImpl.append(dest.asWritableFloatChunk(), f, src); + } + } + + private static class DoubleAppender implements Appender { + private final java.util.function.ToDoubleFunction f; + + DoubleAppender(java.util.function.ToDoubleFunction f) { + this.f = Objects.requireNonNull(f); + } + + @Override + public void append(WritableChunk dest, ObjectChunk src) { + ObjectProcessorFunctionsImpl.append(dest.asWritableDoubleChunk(), f, src); + } + } + + // Ideally, these would be built into WritableChunk impls + + private static void append( + WritableBooleanChunk dest, + Predicate booleanFunction, + ObjectChunk src) { + final int destSize = dest.size(); + ChunkUtils.applyInto(booleanFunction, src, 0, dest, destSize, src.size()); + dest.setSize(destSize + src.size()); + } + + private static void append( + WritableByteChunk dest, + ToByteFunction byteFunction, + ObjectChunk src) { + final int destSize = dest.size(); + ChunkUtils.applyInto(byteFunction, src, 0, dest, destSize, src.size()); + dest.setSize(destSize + src.size()); + } + + private static void append( + WritableCharChunk dest, + ToCharFunction charFunction, + ObjectChunk src) { + final int destSize = dest.size(); + ChunkUtils.applyInto(charFunction, src, 0, dest, destSize, src.size()); + dest.setSize(destSize + src.size()); + } + + private static void append( + WritableShortChunk dest, + ToShortFunction shortFunction, + ObjectChunk src) { + final int destSize = dest.size(); + ChunkUtils.applyInto(shortFunction, src, 0, dest, destSize, src.size()); + dest.setSize(destSize + src.size()); + } + + private static void append( + WritableIntChunk dest, + java.util.function.ToIntFunction intFunction, + ObjectChunk src) { + final int destSize = dest.size(); + ChunkUtils.applyInto(intFunction, src, 0, dest, destSize, src.size()); + dest.setSize(destSize + src.size()); + } + + private static void append( + WritableLongChunk dest, + java.util.function.ToLongFunction longFunction, + ObjectChunk src) { + final int destSize = dest.size(); + ChunkUtils.applyInto(longFunction, src, 0, dest, destSize, src.size()); + dest.setSize(destSize + src.size()); + } + + private static void append( + WritableFloatChunk dest, + ToFloatFunction floatFunction, + ObjectChunk src) { + final int destSize = dest.size(); + ChunkUtils.applyInto(floatFunction, src, 0, dest, destSize, src.size()); + dest.setSize(destSize + src.size()); + } + + private static void append( + WritableDoubleChunk dest, + java.util.function.ToDoubleFunction doubleFunction, + ObjectChunk src) { + final int destSize = dest.size(); + ChunkUtils.applyInto(doubleFunction, src, 0, dest, destSize, src.size()); + dest.setSize(destSize + src.size()); + } + + private static void append( + WritableObjectChunk dest, + Function objFunction, + ObjectChunk src) { + final int destSize = dest.size(); + ChunkUtils.applyInto(objFunction, src, 0, dest, destSize, src.size()); + dest.setSize(destSize + src.size()); + } +}
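
A minimal, self-contained sketch of a row-oriented processor built on the new ObjectProcessorRowBase (not part of the change itself; the "<int>,<string>" UTF-8 record layout and all names here are illustrative assumptions):

import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.WritableIntChunk;
import io.deephaven.chunk.WritableObjectChunk;
import io.deephaven.processor.ObjectProcessorRowBase;
import io.deephaven.qst.type.Type;

import java.nio.charset.StandardCharsets;
import java.util.List;

class IdNameProcessor extends ObjectProcessorRowBase<byte[]> {
    @Override
    public List<Type<?>> outputTypes() {
        // one int column ("Id") and one String column ("Name")
        return List.of(Type.intType(), Type.stringType());
    }

    @Override
    protected void parse(byte[] in, List<WritableChunk<?>> out, int i) {
        // split "<int>,<string>" into its two fields
        final String[] parts = new String(in, StandardCharsets.UTF_8).split(",", 2);
        final WritableIntChunk<?> ids = out.get(0).asWritableIntChunk();
        final WritableObjectChunk<String, ?> names = out.get(1).asWritableObjectChunk();
        // parse must not change chunk sizes; write at the offset relative to the current size
        ids.set(ids.size() + i, Integer.parseInt(parts[0]));
        names.set(names.size() + i, parts[1]);
    }
}

Such a processor could then be wired into a Kafka value spec via KafkaTools.Consume.objectProcessorSpec(new IdNameProcessor(), List.of("Id", "Name")), with ObjectProcessorRowLimited.of available to bound the rows per delegated call.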