Skip to content

Commit

Permalink
Commit #3, create vector/array chunk readers to do type lookups once
Browse files Browse the repository at this point in the history
  • Loading branch information
niloc132 committed Jun 25, 2024
1 parent a2eb630 commit 3b0789b
Show file tree
Hide file tree
Showing 6 changed files with 230 additions and 200 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,22 +68,17 @@ public Field componentArrowField() {
* @param factor
* @param typeInfo
* @return
* @throws IOException
*/
ChunkReader extractChunkFromInputStream(
final StreamReaderOptions options,
final int factor,
final ChunkTypeInfo typeInfo) throws IOException;
ChunkReader extractChunkFromInputStream(final StreamReaderOptions options, final int factor,
final ChunkTypeInfo typeInfo);

/**
*
* @param options
* @param typeInfo
* @return
* @throws IOException
*/
default ChunkReader extractChunkFromInputStream(final StreamReaderOptions options, final ChunkTypeInfo typeInfo)
throws IOException {
default ChunkReader extractChunkFromInputStream(final StreamReaderOptions options, final ChunkTypeInfo typeInfo) {
return extractChunkFromInputStream(options, 1, typeInfo);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,20 @@
package io.deephaven.extensions.barrage.chunk;

import com.google.common.base.Charsets;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.extensions.barrage.ColumnConversionMode;
import io.deephaven.extensions.barrage.util.StreamReaderOptions;
import io.deephaven.time.DateTimeUtils;
import io.deephaven.util.QueryConstants;
import io.deephaven.util.type.TypeUtils;
import io.deephaven.vector.Vector;

import java.io.DataInput;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.Iterator;
import java.util.PrimitiveIterator;

import static io.deephaven.extensions.barrage.chunk.ChunkInputStreamGenerator.MS_PER_DAY;

Expand All @@ -37,7 +31,7 @@ public final class DefaultChunkReadingFactory implements ChunkReadingFactory {

@Override
public ChunkReader extractChunkFromInputStream(StreamReaderOptions options, int factor,
ChunkTypeInfo typeInfo) throws IOException {
ChunkTypeInfo typeInfo) {
// TODO (deephaven-core#5453): pass in ArrowType to enable ser/deser of single java class in multiple formats
switch (typeInfo.chunkType()) {
case Boolean:
Expand Down Expand Up @@ -93,17 +87,11 @@ public ChunkReader extractChunkFromInputStream(StreamReaderOptions options, int
(buf, off, len) -> Arrays.copyOfRange(buf, off, off + len),
outChunk, outOffset, totalRows);
} else {
return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset,
totalRows) -> VarListChunkInputStreamGenerator.extractChunkFromInputStream(options,
typeInfo,
fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows, this);
return new VarListChunkReader<>(options, typeInfo, this);
}
}
if (Vector.class.isAssignableFrom(typeInfo.type())) {
return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset,
totalRows) -> VectorChunkInputStreamGenerator.extractChunkFromInputStream(options,
typeInfo, fieldNodeIter, bufferInfoIter,
is, outChunk, outOffset, totalRows, this);
return new VectorChunkReader(options, typeInfo, this);
}
if (typeInfo.type() == BigInteger.class) {
return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,95 +233,5 @@ public int drainTo(final OutputStream outputStream) throws IOException {
}
}

/**
 * Deserializes one variable-length list (Java array) column from the stream into a chunk holding
 * one array per row; rows whose validity bit is clear are set to null.
 *
 * Wire layout consumed, in order: a validity bitmap buffer, an offsets buffer of
 * (numElements + 1) ints, then the flattened component column (read by a nested ChunkReader).
 *
 * @param options stream-reading options, forwarded to the component reader
 * @param typeInfo type info for the outer array column; its component type selects the nested reader
 * @param fieldNodeIter per-column field node metadata; this method consumes one node (plus whatever
 *        the component reader consumes)
 * @param bufferInfoIter buffer lengths; this method consumes two (validity, offsets)
 * @param is the stream to read from
 * @param outChunk optional destination chunk; may be null
 * @param outOffset offset into outChunk at which results are written
 * @param totalRows total row count used when contracting into outChunk
 * @param chunkReadingFactory factory used to build the reader for the component column
 * @return the populated object chunk of arrays
 * @throws IOException if the underlying stream cannot be read
 */
static <T> WritableObjectChunk<T, Values> extractChunkFromInputStream(
final StreamReaderOptions options,
final ChunkReadingFactory.ChunkTypeInfo typeInfo,
final Iterator<FieldNodeInfo> fieldNodeIter,
final PrimitiveIterator.OfLong bufferInfoIter,
final DataInput is,
final WritableChunk<Values> outChunk,
final int outOffset,
final int totalRows,
ChunkReadingFactory chunkReadingFactory) throws IOException {

// Element type of the outer array, and (for nested arrays) that element type's own component.
final Class<?> componentType = typeInfo.type().getComponentType();
final Class<?> innerComponentType = componentType != null ? componentType.getComponentType() : null;

final ChunkType chunkType;
if (componentType == boolean.class || componentType == Boolean.class) {
// Note: Internally booleans are passed around as bytes, but the wire format is packed bits.
chunkType = ChunkType.Byte;
} else {
chunkType = ChunkType.fromElementType(componentType);
}

// Resolve the reader for the flattened component column.
ChunkReader componentReader = chunkReadingFactory.extractChunkFromInputStream(
options,
new ChunkReadingFactory.ChunkTypeInfo(chunkType, componentType, innerComponentType,
typeInfo.componentArrowField()));

final FieldNodeInfo nodeInfo = fieldNodeIter.next();
final long validityBuffer = bufferInfoIter.nextLong();
final long offsetsBuffer = bufferInfoIter.nextLong();

if (nodeInfo.numElements == 0) {
// No rows: still read (and discard) the component column, then return an empty chunk.
try (final WritableChunk<Values> ignored =
componentReader.read(fieldNodeIter, bufferInfoIter, is, null, 0, 0)) {
return WritableObjectChunk.makeWritableChunk(nodeInfo.numElements);
}
}

final WritableObjectChunk<T, Values> chunk;
// One long per 64 validity bits, rounded up.
final int numValidityLongs = (nodeInfo.numElements + 63) / 64;
try (final WritableLongChunk<Values> isValid = WritableLongChunk.makeWritableChunk(numValidityLongs);
final WritableIntChunk<ChunkPositions> offsets =
WritableIntChunk.makeWritableChunk(nodeInfo.numElements + 1)) {
// Read validity buffer:
int jj = 0;
for (; jj < Math.min(numValidityLongs, validityBuffer / 8); ++jj) {
isValid.set(jj, is.readLong());
}
final long valBufRead = jj * 8L;
if (valBufRead < validityBuffer) {
// Skip any trailing padding the sender included beyond the bits we need.
is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, validityBuffer - valBufRead));
}
// we support short validity buffers
for (; jj < numValidityLongs; ++jj) {
isValid.set(jj, -1); // -1 is bit-wise representation of all ones
}
// consumed entire validity buffer by here

// Read offsets:
final long offBufRead = (nodeInfo.numElements + 1L) * Integer.BYTES;
if (offsetsBuffer < offBufRead) {
throw new IllegalStateException("offset buffer is too short for the expected number of elements");
}
for (int i = 0; i < nodeInfo.numElements + 1; ++i) {
offsets.set(i, is.readInt());
}
if (offBufRead < offsetsBuffer) {
// Skip padding after the offsets we consumed.
is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, offsetsBuffer - offBufRead));
}

final ArrayExpansionKernel kernel = ArrayExpansionKernel.makeExpansionKernel(chunkType, componentType);
try (final WritableChunk<Values> inner =
componentReader.read(fieldNodeIter, bufferInfoIter, is, null, 0, 0)) {
// Contract the flat component chunk into one array per row.
chunk = kernel.contract(inner, offsets, outChunk, outOffset, totalRows);

// Apply the validity bitmap: rows whose bit is 0 become null.
long nextValid = 0;
for (int ii = 0; ii < nodeInfo.numElements; ++ii) {
if ((ii % 64) == 0) {
nextValid = isValid.get(ii / 64);
}
if ((nextValid & 0x1) == 0x0) {
chunk.set(outOffset + ii, null);
}
nextValid >>= 1;
}
}
}

return chunk;
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.extensions.barrage.chunk;

import io.deephaven.chunk.ChunkType;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.WritableIntChunk;
import io.deephaven.chunk.WritableLongChunk;
import io.deephaven.chunk.WritableObjectChunk;
import io.deephaven.chunk.attributes.ChunkPositions;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.extensions.barrage.chunk.array.ArrayExpansionKernel;
import io.deephaven.extensions.barrage.util.StreamReaderOptions;
import io.deephaven.util.datastructures.LongSizedDataStructure;

import java.io.DataInput;
import java.io.IOException;
import java.util.Iterator;
import java.util.PrimitiveIterator;

public class VarListChunkReader<T> implements ChunkReader {
    private static final String DEBUG_NAME = "VarListChunkReader";

    /** Kernel used to contract the flat component chunk into one Java array per row. */
    private final ArrayExpansionKernel kernel;
    /** Reader for the flattened component column, resolved once at construction so type lookups
     * are not repeated on every read. */
    private final ChunkReader componentReader;

    /**
     * Creates a reader for a variable-length list (array) column. The component chunk type, the
     * expansion kernel, and the nested component reader are all resolved here, once.
     *
     * @param options stream-reading options, forwarded to the component reader
     * @param typeInfo type info for the outer array column; its component type selects the nested reader
     * @param chunkReadingFactory factory used to obtain the component column's reader
     */
    public VarListChunkReader(final StreamReaderOptions options, final ChunkReadingFactory.ChunkTypeInfo typeInfo,
            ChunkReadingFactory chunkReadingFactory) {
        final Class<?> componentType = typeInfo.type().getComponentType();
        final Class<?> innerComponentType = componentType != null ? componentType.getComponentType() : null;

        final ChunkType chunkType;
        if (componentType == boolean.class || componentType == Boolean.class) {
            // Note: Internally booleans are passed around as bytes, but the wire format is packed bits.
            chunkType = ChunkType.Byte;
        } else {
            chunkType = ChunkType.fromElementType(componentType);
        }
        kernel = ArrayExpansionKernel.makeExpansionKernel(chunkType, componentType);

        componentReader = chunkReadingFactory.extractChunkFromInputStream(
                options,
                new ChunkReadingFactory.ChunkTypeInfo(chunkType, componentType, innerComponentType,
                        typeInfo.componentArrowField()));
    }

    /**
     * Reads one list column: a validity bitmap, an offsets buffer of numElements + 1 ints, then the
     * flattened component column, which is contracted into one array per row. Rows whose validity
     * bit is clear are set to null.
     *
     * @throws IOException if the underlying stream cannot be read
     */
    @Override
    public WritableObjectChunk<T, Values> read(Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
            PrimitiveIterator.OfLong bufferInfoIter, DataInput is, WritableChunk<Values> outChunk, int outOffset,
            int totalRows) throws IOException {
        final ChunkInputStreamGenerator.FieldNodeInfo nodeInfo = fieldNodeIter.next();
        final long validityBuffer = bufferInfoIter.nextLong();
        final long offsetsBuffer = bufferInfoIter.nextLong();

        if (nodeInfo.numElements == 0) {
            // No rows: still read (and discard) the component column for this node.
            try (final WritableChunk<Values> ignored =
                    componentReader.read(fieldNodeIter, bufferInfoIter, is, null, 0, 0)) {
                // Fix for consistency with VectorChunkReader: honor a caller-supplied outChunk, and
                // size the fallback by totalRows. Previously this returned a fresh zero-size chunk
                // even when the caller had passed an outChunk to accumulate into.
                if (outChunk != null) {
                    return outChunk.asWritableObjectChunk();
                }
                return WritableObjectChunk.makeWritableChunk(totalRows);
            }
        }

        final WritableObjectChunk<T, Values> chunk;
        // One long per 64 validity bits, rounded up.
        final int numValidityLongs = (nodeInfo.numElements + 63) / 64;
        try (final WritableLongChunk<Values> isValid = WritableLongChunk.makeWritableChunk(numValidityLongs);
                final WritableIntChunk<ChunkPositions> offsets =
                        WritableIntChunk.makeWritableChunk(nodeInfo.numElements + 1)) {
            // Read validity buffer:
            int jj = 0;
            for (; jj < Math.min(numValidityLongs, validityBuffer / 8); ++jj) {
                isValid.set(jj, is.readLong());
            }
            final long valBufRead = jj * 8L;
            if (valBufRead < validityBuffer) {
                // Skip any trailing padding beyond the bits we need.
                is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, validityBuffer - valBufRead));
            }
            // we support short validity buffers
            for (; jj < numValidityLongs; ++jj) {
                isValid.set(jj, -1); // -1 is bit-wise representation of all ones
            }
            // consumed entire validity buffer by here

            // Read offsets:
            final long offBufRead = (nodeInfo.numElements + 1L) * Integer.BYTES;
            if (offsetsBuffer < offBufRead) {
                throw new IllegalStateException("offset buffer is too short for the expected number of elements");
            }
            for (int i = 0; i < nodeInfo.numElements + 1; ++i) {
                offsets.set(i, is.readInt());
            }
            if (offBufRead < offsetsBuffer) {
                // Skip padding after the offsets we consumed.
                is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, offsetsBuffer - offBufRead));
            }

            try (final WritableChunk<Values> inner =
                    componentReader.read(fieldNodeIter, bufferInfoIter, is, null, 0, 0)) {
                // Contract the flat component chunk into one array per row.
                chunk = kernel.contract(inner, offsets, outChunk, outOffset, totalRows);

                // Apply the validity bitmap: rows whose bit is 0 become null.
                long nextValid = 0;
                for (int ii = 0; ii < nodeInfo.numElements; ++ii) {
                    if ((ii % 64) == 0) {
                        nextValid = isValid.get(ii / 64);
                    }
                    if ((nextValid & 0x1) == 0x0) {
                        chunk.set(outOffset + ii, null);
                    }
                    nextValid >>= 1;
                }
            }
        }

        return chunk;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -232,91 +232,4 @@ public int drainTo(final OutputStream outputStream) throws IOException {
return LongSizedDataStructure.intSize(DEBUG_NAME, bytesWritten);
}
}

/**
 * Deserializes one Vector-typed column from the stream into a chunk holding one Vector per row;
 * rows whose validity bit is clear are set to null.
 *
 * Wire layout consumed, in order: a validity bitmap buffer, an offsets buffer of
 * (numElements + 1) ints, then the flattened component column (read by a nested ChunkReader).
 *
 * @param options stream-reading options, forwarded to the component reader
 * @param typeInfo type info for the outer Vector column; its component type selects the nested reader
 * @param fieldNodeIter per-column field node metadata; this method consumes one node (plus whatever
 *        the component reader consumes)
 * @param bufferInfoIter buffer lengths; this method consumes two (validity, offsets)
 * @param is the stream to read from
 * @param outChunk optional destination chunk; may be null
 * @param outOffset offset into outChunk at which results are written
 * @param totalRows total row count used when contracting into outChunk
 * @param chunkReadingFactory factory used to build the reader for the component column
 * @return the populated object chunk of Vectors
 * @throws IOException if the underlying stream cannot be read
 */
static WritableObjectChunk<Vector<?>, Values> extractChunkFromInputStream(
final StreamReaderOptions options,
final ChunkReadingFactory.ChunkTypeInfo typeInfo,
final Iterator<FieldNodeInfo> fieldNodeIter,
final PrimitiveIterator.OfLong bufferInfoIter,
final DataInput is,
final WritableChunk<Values> outChunk,
final int outOffset,
final int totalRows,
ChunkReadingFactory chunkReadingFactory) throws IOException {

// Resolve the element type held by this Vector type, then the reader for the flattened column.
final Class<?> componentType =
VectorExpansionKernel.getComponentType(typeInfo.type(), typeInfo.componentType());
final ChunkType chunkType = ChunkType.fromElementType(componentType);
ChunkReader componentReader = chunkReadingFactory.extractChunkFromInputStream(
options,
new ChunkReadingFactory.ChunkTypeInfo(chunkType, componentType, componentType.getComponentType(),
typeInfo.componentArrowField()));

final FieldNodeInfo nodeInfo = fieldNodeIter.next();
final long validityBuffer = bufferInfoIter.nextLong();
final long offsetsBuffer = bufferInfoIter.nextLong();


if (nodeInfo.numElements == 0) {
// No rows: still read (and discard) the component column, then return outChunk (when supplied)
// or a new chunk sized for totalRows.
try (final WritableChunk<Values> ignored =
componentReader.read(fieldNodeIter, bufferInfoIter, is, null, 0, 0)) {
if (outChunk != null) {
return outChunk.asWritableObjectChunk();
}
return WritableObjectChunk.makeWritableChunk(totalRows);
}
}

final WritableObjectChunk<Vector<?>, Values> chunk;
// One long per 64 validity bits, rounded up.
final int numValidityLongs = (nodeInfo.numElements + 63) / 64;
try (final WritableLongChunk<Values> isValid = WritableLongChunk.makeWritableChunk(numValidityLongs);
final WritableIntChunk<ChunkPositions> offsets =
WritableIntChunk.makeWritableChunk(nodeInfo.numElements + 1)) {
// Read validity buffer:
int jj = 0;
for (; jj < Math.min(numValidityLongs, validityBuffer / 8); ++jj) {
isValid.set(jj, is.readLong());
}
final long valBufRead = jj * 8L;
if (valBufRead < validityBuffer) {
// Skip any trailing padding the sender included beyond the bits we need.
is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, validityBuffer - valBufRead));
}
// we support short validity buffers
for (; jj < numValidityLongs; ++jj) {
isValid.set(jj, -1); // -1 is bit-wise representation of all ones
}
// consumed entire validity buffer by here

// Read offsets:
final long offBufRead = (nodeInfo.numElements + 1L) * Integer.BYTES;
if (offsetsBuffer < offBufRead) {
throw new IllegalStateException("offset buffer is too short for the expected number of elements");
}
for (int i = 0; i < nodeInfo.numElements + 1; ++i) {
offsets.set(i, is.readInt());
}
if (offBufRead < offsetsBuffer) {
// Skip padding after the offsets we consumed.
is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, offsetsBuffer - offBufRead));
}

final VectorExpansionKernel kernel = VectorExpansionKernel.makeExpansionKernel(chunkType, componentType);
try (final WritableChunk<Values> inner =
componentReader.read(fieldNodeIter, bufferInfoIter, is, null, 0, 0)) {
// Contract the flat component chunk into one Vector per row.
chunk = kernel.contract(inner, offsets, outChunk, outOffset, totalRows);

// Apply the validity bitmap: rows whose bit is 0 become null.
long nextValid = 0;
for (int ii = 0; ii < nodeInfo.numElements; ++ii) {
if ((ii % 64) == 0) {
nextValid = isValid.get(ii / 64);
}
if ((nextValid & 0x1) == 0x0) {
chunk.set(outOffset + ii, null);
}
nextValid >>= 1;
}
}
}

return chunk;
}
}
Loading

0 comments on commit 3b0789b

Please sign in to comment.