From bc4944feacf6854eeefbc0e2846c5e556813cac5 Mon Sep 17 00:00:00 2001 From: ilyaberdnikov <95850631+ilyaberdnikov@users.noreply.github.com> Date: Tue, 19 Sep 2023 03:27:46 +0100 Subject: [PATCH] DH to work with byte arrays (#4507) --- .../chunk/ChunkInputStreamGenerator.java | 15 +++++- .../client/DeephavenFlightSessionTest.java | 9 ++-- .../client/examples/AddToInputTable.java | 19 +++++-- .../deephaven/client/impl/FieldAdapter.java | 10 +++- .../client/impl/FieldVectorAdapter.java | 49 ++++++------------- .../deephaven/client/impl/VectorHelper.java | 15 ++++++ 6 files changed, 72 insertions(+), 45 deletions(-) diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkInputStreamGenerator.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkInputStreamGenerator.java index 85aefa37ebb..01c1c72b90d 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkInputStreamGenerator.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkInputStreamGenerator.java @@ -30,6 +30,7 @@ import java.time.Instant; import java.time.ZoneId; import java.time.ZonedDateTime; +import java.util.Arrays; import java.util.Iterator; public interface ChunkInputStreamGenerator extends SafeCloseable { @@ -204,8 +205,18 @@ static WritableChunk extractChunkFromInputStream( Double.BYTES, options,fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows); case Object: if (type.isArray()) { - return VarListChunkInputStreamGenerator.extractChunkFromInputStream( - options, type, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows); + if(componentType == byte.class) { + return VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream( + is, + fieldNodeIter, + bufferInfoIter, + (buf, off, len) -> Arrays.copyOfRange(buf, off, off + len), + outChunk, outOffset, totalRows + ); + } else { + return VarListChunkInputStreamGenerator.extractChunkFromInputStream( + options, type, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows); + } } if (Vector.class.isAssignableFrom(type)) { //noinspection unchecked diff --git a/java-client/flight-dagger/src/test/java/io/deephaven/client/DeephavenFlightSessionTest.java b/java-client/flight-dagger/src/test/java/io/deephaven/client/DeephavenFlightSessionTest.java index 181289aaa94..53035744866 100644 --- a/java-client/flight-dagger/src/test/java/io/deephaven/client/DeephavenFlightSessionTest.java +++ b/java-client/flight-dagger/src/test/java/io/deephaven/client/DeephavenFlightSessionTest.java @@ -127,13 +127,14 @@ private NewTable newTable() { ColumnHeader.ofFloat("Float"), ColumnHeader.ofDouble("Double"), ColumnHeader.ofString("String"), - ColumnHeader.ofInstant("Instant")) + ColumnHeader.ofInstant("Instant"), + ColumnHeader.of("ByteVector", byte[].class)) .start(3) .row(true, (byte) 42, 'a', (short) 32_000, 1234567, 1234567890123L, 3.14f, 3.14d, "Hello, World", - Instant.now()) - .row(null, null, null, null, null, null, null, null, null, (Instant) null) + Instant.now(), "abc".getBytes()) + .row(null, null, null, null, null, null, null, null, null, (Instant) null, (byte[]) null) .row(false, (byte) -42, 'b', (short) -32_000, -1234567, -1234567890123L, -3.14f, -3.14d, "Goodbye.", - Instant.ofEpochMilli(0)) + Instant.ofEpochMilli(0), new byte[] {0x32, 0x02, 0x17, 0x42}) .newTable(); } } diff --git a/java-client/flight-examples/src/main/java/io/deephaven/client/examples/AddToInputTable.java b/java-client/flight-examples/src/main/java/io/deephaven/client/examples/AddToInputTable.java index 7b954876c04..b4934fb11e5 100644 --- a/java-client/flight-examples/src/main/java/io/deephaven/client/examples/AddToInputTable.java +++ b/java-client/flight-examples/src/main/java/io/deephaven/client/examples/AddToInputTable.java @@ -27,11 +27,21 @@ class AddToInputTable extends FlightExampleBase { @Override protected void execute(FlightSession flight) throws Exception { - final ColumnHeader header = ColumnHeader.ofInstant("Timestamp"); - + final var header = ColumnHeader.of( + ColumnHeader.ofBoolean("Boolean"), + ColumnHeader.ofByte("Byte"), + ColumnHeader.ofChar("Char"), + ColumnHeader.ofShort("Short"), + ColumnHeader.ofInt("Int"), + ColumnHeader.ofLong("Long"), + ColumnHeader.ofFloat("Float"), + ColumnHeader.ofDouble("Double"), + ColumnHeader.ofString("String"), + ColumnHeader.ofInstant("Instant"), + ColumnHeader.of("ByteVector", byte[].class)); final TableSpec timestamp = InMemoryAppendOnlyInputTable.of(TableHeader.of(header)); final TableSpec timestampLastBy = - timestamp.aggBy(Collections.singletonList(Aggregation.AggLast("Timestamp"))); + timestamp.aggBy(Collections.singletonList(Aggregation.AggLast("Instant"))); final List handles = flight.session().batch().execute(Arrays.asList(timestamp, timestampLastBy)); try ( @@ -44,7 +54,8 @@ protected void execute(FlightSession flight) throws Exception { while (true) { // Add a new row, at least once every second - final NewTable newRow = header.row(Instant.now()).newTable(); + final NewTable newRow = header.row(true, (byte) 42, 'a', (short) 32_000, 1234567, 1234567890123L, 3.14f, + 3.14d, "Hello, World", Instant.now(), "abc".getBytes()).newTable(); flight.addToInputTable(timestampHandle, newRow, bufferAllocator).get(5, TimeUnit.SECONDS); Thread.sleep(ThreadLocalRandom.current().nextLong(1000)); } diff --git a/java-client/flight/src/main/java/io/deephaven/client/impl/FieldAdapter.java b/java-client/flight/src/main/java/io/deephaven/client/impl/FieldAdapter.java index 4cb4a6a1aca..71864d6fadb 100644 --- a/java-client/flight/src/main/java/io/deephaven/client/impl/FieldAdapter.java +++ b/java-client/flight/src/main/java/io/deephaven/client/impl/FieldAdapter.java @@ -81,6 +81,10 @@ public static Field stringField(String name) { return field(name, MinorType.VARCHAR.getType(), "java.lang.String"); } + public static Field byteVectorField(String name) { + return field(name, MinorType.VARBINARY.getType(), "byte[]"); + } + public static Field instantField(String name) { return field(name, new ArrowType.Timestamp(TimeUnit.NANOSECOND, "UTC"), "java.time.Instant"); } @@ -125,7 +129,11 @@ public Field visit(InstantType instantType) { @Override public Field visit(ArrayType arrayType) { - throw new UnsupportedOperationException(); + if (arrayType.componentType().equals(Type.find(byte.class))) { + return byteVectorField(name); + } else { + throw new UnsupportedOperationException(); + } } @Override diff --git a/java-client/flight/src/main/java/io/deephaven/client/impl/FieldVectorAdapter.java b/java-client/flight/src/main/java/io/deephaven/client/impl/FieldVectorAdapter.java index 7bed3fc4558..1f41cdf2c8b 100644 --- a/java-client/flight/src/main/java/io/deephaven/client/impl/FieldVectorAdapter.java +++ b/java-client/flight/src/main/java/io/deephaven/client/impl/FieldVectorAdapter.java @@ -15,44 +15,14 @@ import io.deephaven.qst.array.PrimitiveArray; import io.deephaven.qst.array.ShortArray; import io.deephaven.qst.column.Column; -import io.deephaven.qst.type.ArrayType; -import io.deephaven.qst.type.BooleanType; -import io.deephaven.qst.type.BoxedBooleanType; -import io.deephaven.qst.type.BoxedByteType; -import io.deephaven.qst.type.BoxedCharType; -import io.deephaven.qst.type.BoxedDoubleType; -import io.deephaven.qst.type.BoxedFloatType; -import io.deephaven.qst.type.BoxedIntType; -import io.deephaven.qst.type.BoxedLongType; -import io.deephaven.qst.type.BoxedShortType; -import io.deephaven.qst.type.BoxedType; -import io.deephaven.qst.type.ByteType; -import io.deephaven.qst.type.CharType; -import io.deephaven.qst.type.CustomType; -import io.deephaven.qst.type.DoubleType; -import io.deephaven.qst.type.FloatType; +import io.deephaven.qst.type.*; import io.deephaven.qst.type.GenericType.Visitor; -import io.deephaven.qst.type.InstantType; -import io.deephaven.qst.type.IntType; -import io.deephaven.qst.type.LongType; -import io.deephaven.qst.type.PrimitiveType; -import io.deephaven.qst.type.ShortType; -import io.deephaven.qst.type.StringType; import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.vector.BigIntVector; -import org.apache.arrow.vector.BitVector; -import org.apache.arrow.vector.FieldVector; -import org.apache.arrow.vector.Float4Vector; -import org.apache.arrow.vector.Float8Vector; -import org.apache.arrow.vector.IntVector; -import org.apache.arrow.vector.SmallIntVector; -import org.apache.arrow.vector.TimeStampNanoTZVector; -import org.apache.arrow.vector.TinyIntVector; -import org.apache.arrow.vector.UInt2Vector; -import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.*; import org.apache.arrow.vector.types.pojo.Field; import java.time.Instant; +import java.util.List; import java.util.Objects; /** @@ -156,7 +126,11 @@ public FieldVector visit(InstantType instantType) { @Override public FieldVector visit(ArrayType arrayType) { - throw new UnsupportedOperationException(); + if (arrayType.componentType().equals(Type.find(byte.class))) { + return visitByteVectorArray(generic.cast(arrayType)); + } else { + throw new UnsupportedOperationException(); + } } @Override @@ -293,6 +267,13 @@ FieldVector visitStringArray(GenericArray stringArray) { return vector; } + FieldVector visitByteVectorArray(GenericArray byteVectorArray) { + Field field = FieldAdapter.byteVectorField(name); + VarBinaryVector vector = new VarBinaryVector(field, allocator); + VectorHelper.fill(vector, (List) byteVectorArray.values()); + return vector; + } + FieldVector visitInstantArray(GenericArray instantArray) { Field field = FieldAdapter.instantField(name); TimeStampNanoTZVector vector = new TimeStampNanoTZVector(field, allocator); diff --git a/java-client/flight/src/main/java/io/deephaven/client/impl/VectorHelper.java b/java-client/flight/src/main/java/io/deephaven/client/impl/VectorHelper.java index 4f80c780a23..0509a03bfe7 100644 --- a/java-client/flight/src/main/java/io/deephaven/client/impl/VectorHelper.java +++ b/java-client/flight/src/main/java/io/deephaven/client/impl/VectorHelper.java @@ -14,6 +14,7 @@ import org.apache.arrow.vector.TinyIntVector; import org.apache.arrow.vector.UInt2Vector; import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VarBinaryVector; import java.nio.charset.StandardCharsets; import java.time.Instant; @@ -219,6 +220,20 @@ public static void fill(VarCharVector vector, Collection array) { vector.setValueCount(array.size()); } + public static void fill(VarBinaryVector vector, Collection array) { + vector.allocateNew(array.size()); + int i = 0; + for (byte[] value : array) { + if (value == null) { + vector.setNull(i); + } else { + vector.setSafe(i, value); + } + ++i; + } + vector.setValueCount(array.size()); + } + public static void fill(TimeStampNanoTZVector vector, Collection array) { vector.allocateNew(array.size()); int i = 0;