Skip to content

Commit

Permalink
DH to work with byte arrays (#4507)
Browse files Browse the repository at this point in the history
  • Loading branch information
ilyaberdnikov committed Sep 19, 2023
1 parent d7d1977 commit bc4944f
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.Iterator;

public interface ChunkInputStreamGenerator extends SafeCloseable {
Expand Down Expand Up @@ -204,8 +205,18 @@ static WritableChunk<Values> extractChunkFromInputStream(
Double.BYTES, options,fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
case Object:
if (type.isArray()) {
return VarListChunkInputStreamGenerator.extractChunkFromInputStream(
options, type, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
if(componentType == byte.class) {
return VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(
is,
fieldNodeIter,
bufferInfoIter,
(buf, off, len) -> Arrays.copyOfRange(buf, off, off + len),
outChunk, outOffset, totalRows
);
} else {
return VarListChunkInputStreamGenerator.extractChunkFromInputStream(
options, type, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
}
}
if (Vector.class.isAssignableFrom(type)) {
//noinspection unchecked
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,14 @@ private NewTable newTable() {
ColumnHeader.ofFloat("Float"),
ColumnHeader.ofDouble("Double"),
ColumnHeader.ofString("String"),
ColumnHeader.ofInstant("Instant"))
ColumnHeader.ofInstant("Instant"),
ColumnHeader.of("ByteVector", byte[].class))
.start(3)
.row(true, (byte) 42, 'a', (short) 32_000, 1234567, 1234567890123L, 3.14f, 3.14d, "Hello, World",
Instant.now())
.row(null, null, null, null, null, null, null, null, null, (Instant) null)
Instant.now(), "abc".getBytes())
.row(null, null, null, null, null, null, null, null, null, (Instant) null, (byte[]) null)
.row(false, (byte) -42, 'b', (short) -32_000, -1234567, -1234567890123L, -3.14f, -3.14d, "Goodbye.",
Instant.ofEpochMilli(0))
Instant.ofEpochMilli(0), new byte[] {0x32, 0x02, 0x17, 0x42})
.newTable();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,21 @@ class AddToInputTable extends FlightExampleBase {

@Override
protected void execute(FlightSession flight) throws Exception {
final ColumnHeader<Instant> header = ColumnHeader.ofInstant("Timestamp");

final var header = ColumnHeader.of(
ColumnHeader.ofBoolean("Boolean"),
ColumnHeader.ofByte("Byte"),
ColumnHeader.ofChar("Char"),
ColumnHeader.ofShort("Short"),
ColumnHeader.ofInt("Int"),
ColumnHeader.ofLong("Long"),
ColumnHeader.ofFloat("Float"),
ColumnHeader.ofDouble("Double"),
ColumnHeader.ofString("String"),
ColumnHeader.ofInstant("Instant"),
ColumnHeader.of("ByteVector", byte[].class));
final TableSpec timestamp = InMemoryAppendOnlyInputTable.of(TableHeader.of(header));
final TableSpec timestampLastBy =
timestamp.aggBy(Collections.singletonList(Aggregation.AggLast("Timestamp")));
timestamp.aggBy(Collections.singletonList(Aggregation.AggLast("Instant")));

final List<TableHandle> handles = flight.session().batch().execute(Arrays.asList(timestamp, timestampLastBy));
try (
Expand All @@ -44,7 +54,8 @@ protected void execute(FlightSession flight) throws Exception {

while (true) {
// Add a new row, at least once every second
final NewTable newRow = header.row(Instant.now()).newTable();
final NewTable newRow = header.row(true, (byte) 42, 'a', (short) 32_000, 1234567, 1234567890123L, 3.14f,
3.14d, "Hello, World", Instant.now(), "abc".getBytes()).newTable();
flight.addToInputTable(timestampHandle, newRow, bufferAllocator).get(5, TimeUnit.SECONDS);
Thread.sleep(ThreadLocalRandom.current().nextLong(1000));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ public static Field stringField(String name) {
return field(name, MinorType.VARCHAR.getType(), "java.lang.String");
}

/**
 * Creates the Arrow {@link Field} used for {@code byte[]} columns.
 *
 * <p>
 * Delegates to {@code field(...)} with the Arrow type obtained from {@code MinorType.VARBINARY} and the label
 * {@code "byte[]"}.
 *
 * @param name the field name
 * @return the field produced by {@code field(...)}
 */
public static Field byteVectorField(String name) {
    // NOTE(review): presumably the Java type name recorded with the field; confirm against field(...)
    final String typeLabel = "byte[]";
    return field(name, MinorType.VARBINARY.getType(), typeLabel);
}

/**
 * Creates the Arrow {@link Field} used for {@link java.time.Instant} columns.
 *
 * <p>
 * Delegates to {@code field(...)} with a nanosecond-unit timestamp type whose timezone is {@code "UTC"} and the
 * label {@code "java.time.Instant"}.
 *
 * @param name the field name
 * @return the field produced by {@code field(...)}
 */
public static Field instantField(String name) {
    final ArrowType.Timestamp utcNanos = new ArrowType.Timestamp(TimeUnit.NANOSECOND, "UTC");
    // NOTE(review): presumably the Java type name recorded with the field; confirm against field(...)
    final String typeLabel = "java.time.Instant";
    return field(name, utcNanos, typeLabel);
}
Expand Down Expand Up @@ -125,7 +129,11 @@ public Field visit(InstantType instantType) {

/**
 * Maps an array type to an Arrow {@link Field}.
 *
 * <p>
 * Only arrays whose component type is the primitive {@code byte} are handled; they are mapped through
 * {@code byteVectorField(name)}. Every other array type is rejected.
 *
 * @param arrayType the array type being visited
 * @return the field for a {@code byte[]} column
 * @throws UnsupportedOperationException if the component type is not {@code byte}
 */
@Override
public Field visit(ArrayType<?, ?> arrayType) {
    // The stale unconditional throw that used to precede this check made the byte[] branch unreachable.
    if (arrayType.componentType().equals(Type.find(byte.class))) {
        return byteVectorField(name);
    }
    throw new UnsupportedOperationException("Unsupported array type: " + arrayType);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,44 +15,14 @@
import io.deephaven.qst.array.PrimitiveArray;
import io.deephaven.qst.array.ShortArray;
import io.deephaven.qst.column.Column;
import io.deephaven.qst.type.ArrayType;
import io.deephaven.qst.type.BooleanType;
import io.deephaven.qst.type.BoxedBooleanType;
import io.deephaven.qst.type.BoxedByteType;
import io.deephaven.qst.type.BoxedCharType;
import io.deephaven.qst.type.BoxedDoubleType;
import io.deephaven.qst.type.BoxedFloatType;
import io.deephaven.qst.type.BoxedIntType;
import io.deephaven.qst.type.BoxedLongType;
import io.deephaven.qst.type.BoxedShortType;
import io.deephaven.qst.type.BoxedType;
import io.deephaven.qst.type.ByteType;
import io.deephaven.qst.type.CharType;
import io.deephaven.qst.type.CustomType;
import io.deephaven.qst.type.DoubleType;
import io.deephaven.qst.type.FloatType;
import io.deephaven.qst.type.*;
import io.deephaven.qst.type.GenericType.Visitor;
import io.deephaven.qst.type.InstantType;
import io.deephaven.qst.type.IntType;
import io.deephaven.qst.type.LongType;
import io.deephaven.qst.type.PrimitiveType;
import io.deephaven.qst.type.ShortType;
import io.deephaven.qst.type.StringType;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.SmallIntVector;
import org.apache.arrow.vector.TimeStampNanoTZVector;
import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.UInt2Vector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.*;
import org.apache.arrow.vector.types.pojo.Field;

import java.time.Instant;
import java.util.List;
import java.util.Objects;

/**
Expand Down Expand Up @@ -156,7 +126,11 @@ public FieldVector visit(InstantType instantType) {

/**
 * Builds an Arrow vector for an array-typed column.
 *
 * <p>
 * Only arrays whose component type is the primitive {@code byte} are handled; they are delegated to
 * {@code visitByteVectorArray}. Every other array type is rejected.
 *
 * @param arrayType the array type being visited
 * @return the vector produced by {@code visitByteVectorArray}
 * @throws UnsupportedOperationException if the component type is not {@code byte}
 */
@Override
public FieldVector visit(ArrayType<?, ?> arrayType) {
    // The stale unconditional throw that used to precede this check made the byte[] branch unreachable.
    if (arrayType.componentType().equals(Type.find(byte.class))) {
        return visitByteVectorArray(generic.cast(arrayType));
    }
    throw new UnsupportedOperationException("Unsupported array type: " + arrayType);
}

@Override
Expand Down Expand Up @@ -293,6 +267,13 @@ FieldVector visitStringArray(GenericArray<String> stringArray) {
return vector;
}

/**
 * Builds a {@link VarBinaryVector} from a generic array whose elements are expected to be {@code byte[]}.
 *
 * <p>
 * The vector is created from {@code FieldAdapter.byteVectorField(name)} with this adapter's allocator and is
 * populated through {@code VectorHelper.fill}. If populating fails, the vector is closed before the failure is
 * rethrown, because the caller never receives a reference it could close itself.
 *
 * @param byteVectorArray the array supplying the values
 * @return the populated vector
 */
FieldVector visitByteVectorArray(GenericArray<?> byteVectorArray) {
    Field field = FieldAdapter.byteVectorField(name);
    VarBinaryVector vector = new VarBinaryVector(field, allocator);
    try {
        // NOTE(review): relies on callers passing only arrays of byte[] elements. The ArrayType visitor
        // checks that the component type is byte before dispatching here; confirm no other call sites exist.
        @SuppressWarnings("unchecked")
        final List<byte[]> values = (List<byte[]>) byteVectorArray.values();
        VectorHelper.fill(vector, values);
    } catch (RuntimeException | Error e) {
        vector.close();
        throw e;
    }
    return vector;
}

FieldVector visitInstantArray(GenericArray<Instant> instantArray) {
Field field = FieldAdapter.instantField(name);
TimeStampNanoTZVector vector = new TimeStampNanoTZVector(field, allocator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.UInt2Vector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VarBinaryVector;

import java.nio.charset.StandardCharsets;
import java.time.Instant;
Expand Down Expand Up @@ -219,6 +220,20 @@ public static void fill(VarCharVector vector, Collection<String> array) {
vector.setValueCount(array.size());
}

/**
 * Copies the given byte arrays into {@code vector}, one element per index in iteration order.
 *
 * <p>
 * The vector is allocated for {@code array.size()} values. A {@code null} element is written with
 * {@code setNull}; any other element is written with {@code setSafe}. Finally the value count is set to the
 * collection size.
 *
 * @param vector the vector to populate
 * @param array the values to write; elements may be {@code null}
 */
public static void fill(VarBinaryVector vector, Collection<byte[]> array) {
    final int count = array.size();
    vector.allocateNew(count);
    int index = 0;
    for (final byte[] bytes : array) {
        if (bytes != null) {
            vector.setSafe(index, bytes);
        } else {
            vector.setNull(index);
        }
        index++;
    }
    vector.setValueCount(count);
}

public static void fill(TimeStampNanoTZVector vector, Collection<Instant> array) {
vector.allocateNew(array.size());
int i = 0;
Expand Down

0 comments on commit bc4944f

Please sign in to comment.