Skip to content

Commit

Permalink
Refactored parquet writing code (#4541)
Browse files Browse the repository at this point in the history
Read vector/array columns without ungrouping
  • Loading branch information
malhotrashivam committed Oct 12, 2023
1 parent 9ab5065 commit 0dcbd6c
Show file tree
Hide file tree
Showing 65 changed files with 2,410 additions and 836 deletions.
57 changes: 57 additions & 0 deletions engine/vector/src/main/java/io/deephaven/vector/VectorFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ public enum VectorFactory {
// @formatter:off

Boolean() {
@Override
@NotNull
public Class<? extends Vector<?>> vectorType() {
throw new UnsupportedOperationException("Vector is not implemented for primitive booleans");
}

@Override
@NotNull
public final Vector<?> vectorWrap(@NotNull final Object array) {
Expand All @@ -28,6 +34,12 @@ public Vector<?> vectorWrap(@NotNull final Object array, int offset, int capacit
},

Char() {
@Override
@NotNull
public Class<? extends Vector<?>> vectorType() {
return CharVector.class;
}

@Override
@NotNull
public final CharVector vectorWrap(@NotNull final Object array) {
Expand All @@ -42,6 +54,12 @@ public CharVector vectorWrap(@NotNull final Object array, int offset, int capaci
},

Byte() {
@Override
@NotNull
public Class<? extends Vector<?>> vectorType() {
return ByteVector.class;
}

@Override
@NotNull
public final ByteVector vectorWrap(@NotNull final Object array) {
Expand All @@ -56,6 +74,12 @@ public ByteVector vectorWrap(@NotNull final Object array, int offset, int capaci
},

Short() {
@Override
@NotNull
public Class<? extends Vector<?>> vectorType() {
return ShortVector.class;
}

@Override
@NotNull
public final ShortVector vectorWrap(@NotNull final Object array) {
Expand All @@ -70,6 +94,12 @@ public ShortVector vectorWrap(@NotNull final Object array, int offset, int capac
},

Int() {
@Override
@NotNull
public Class<? extends Vector<?>> vectorType() {
return IntVector.class;
}

@Override
@NotNull
public final IntVector vectorWrap(@NotNull final Object array) {
Expand All @@ -84,6 +114,12 @@ public IntVector vectorWrap(@NotNull final Object array, int offset, int capacit
},

Long() {
@Override
@NotNull
public Class<? extends Vector<?>> vectorType() {
return LongVector.class;
}

@Override
@NotNull
public final LongVector vectorWrap(@NotNull final Object array) {
Expand All @@ -98,6 +134,12 @@ public LongVector vectorWrap(@NotNull final Object array, int offset, int capaci
},

Float() {
@Override
@NotNull
public Class<? extends Vector<?>> vectorType() {
return FloatVector.class;
}

@Override
@NotNull
public final FloatVector vectorWrap(@NotNull final Object array) {
Expand All @@ -112,6 +154,12 @@ public FloatVector vectorWrap(@NotNull final Object array, int offset, int capac
},

Double() {
@Override
@NotNull
public Class<? extends Vector<?>> vectorType() {
return DoubleVector.class;
}

@Override
@NotNull
public final DoubleVector vectorWrap(@NotNull final Object array) {
Expand All @@ -126,6 +174,13 @@ public DoubleVector vectorWrap(@NotNull final Object array, int offset, int capa
},

Object() {
@Override
@NotNull
public Class<? extends Vector<?>> vectorType() {
//noinspection unchecked
return (Class<? extends Vector<?>>) (Object) ObjectVector.class;
}

@Override
@NotNull
public final ObjectVector<?> vectorWrap(@NotNull final Object array) {
Expand All @@ -149,6 +204,8 @@ public static VectorFactory forElementType(@NotNull final Class<?> clazz) {
return BY_ELEMENT_TYPE.get(clazz);
}

public abstract @NotNull Class<? extends Vector<?>> vectorType();

@NotNull
public abstract Vector<?> vectorWrap(@NotNull Object array);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ public class PlainBinaryChunkedWriter extends AbstractBulkValuesWriter<Binary[]>

private final ByteBufferAllocator allocator;

ByteBuffer innerBuffer;
private ByteBuffer innerBuffer;
private IntBuffer nullOffsets;

public PlainBinaryChunkedWriter(final int pageSize, @NotNull final ByteBufferAllocator allocator) {
PlainBinaryChunkedWriter(final int pageSize, @NotNull final ByteBufferAllocator allocator) {
innerBuffer = allocator.allocate(pageSize);
innerBuffer.order(ByteOrder.LITTLE_ENDIAN);
this.allocator = allocator;
innerBuffer.mark();
nullOffsets = IntBuffer.allocate(4);
}

@Override
Expand Down Expand Up @@ -121,7 +123,7 @@ public WriteResult writeBulkFilterNulls(@NotNull final Binary[] bulkValues,
public @NotNull WriteResult writeBulkVectorFilterNulls(@NotNull Binary[] bulkValues,
final int nonNullLeafCount,
@NotNull final Statistics<?> statistics) {
IntBuffer nullOffsets = IntBuffer.allocate(4);
nullOffsets.clear();
for (int i = 0; i < nonNullLeafCount; i++) {
if (bulkValues[i] != null) {
final Binary v = bulkValues[i];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
*/
public class PlainBooleanChunkedWriter extends AbstractBulkValuesWriter<ByteBuffer> {
private final BooleanPlainValuesWriter writer;
private IntBuffer nullOffsets;

public PlainBooleanChunkedWriter() {
PlainBooleanChunkedWriter() {
writer = new BooleanPlainValuesWriter();
nullOffsets = IntBuffer.allocate(4);
}

@Override
Expand Down Expand Up @@ -107,7 +109,7 @@ public WriteResult writeBulkFilterNulls(@NotNull ByteBuffer bulkValues,
public @NotNull WriteResult writeBulkVectorFilterNulls(@NotNull ByteBuffer bulkValues,
final int rowCount,
@NotNull final Statistics<?> statistics) {
IntBuffer nullOffsets = IntBuffer.allocate(4);
nullOffsets.clear();
int i = 0;
while (bulkValues.hasRemaining()) {
final byte next = bulkValues.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@ public class PlainDoubleChunkedWriter extends AbstractBulkValuesWriter<DoubleBuf

private DoubleBuffer targetBuffer;
private ByteBuffer innerBuffer;
private IntBuffer nullOffsets;


PlainDoubleChunkedWriter(final int targetPageSize, @NotNull final ByteBufferAllocator allocator) {
this.allocator = allocator;
realloc(targetPageSize);
nullOffsets = IntBuffer.allocate(4);
}

@Override
Expand Down Expand Up @@ -128,7 +130,7 @@ public WriteResult writeBulkVectorFilterNulls(@NotNull final DoubleBuffer bulkVa
@NotNull final Statistics<?> statistics) {
ensureCapacityFor(bulkValues);
int i = 0;
IntBuffer nullOffsets = IntBuffer.allocate(4);
nullOffsets.clear();
while (bulkValues.hasRemaining()) {
final double v = bulkValues.get();
if (v != QueryConstants.NULL_DOUBLE) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@ public class PlainFloatChunkedWriter extends AbstractBulkValuesWriter<FloatBuffe

private FloatBuffer targetBuffer;
private ByteBuffer innerBuffer;

private IntBuffer nullOffsets;

PlainFloatChunkedWriter(final int targetPageSize, @NotNull final ByteBufferAllocator allocator) {
this.allocator = allocator;
realloc(targetPageSize);
nullOffsets = IntBuffer.allocate(4);
}

@Override
Expand Down Expand Up @@ -128,7 +129,7 @@ public WriteResult writeBulkVectorFilterNulls(@NotNull final FloatBuffer bulkVal
@NotNull final Statistics<?> statistics) {
ensureCapacityFor(bulkValues);
int i = 0;
IntBuffer nullOffsets = IntBuffer.allocate(4);
nullOffsets.clear();
while (bulkValues.hasRemaining()) {
final float v = bulkValues.get();
if (v != QueryConstants.NULL_FLOAT) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@ public class PlainIntChunkedWriter extends AbstractBulkValuesWriter<IntBuffer> {

private IntBuffer targetBuffer;
private ByteBuffer innerBuffer;
private IntBuffer nullOffsets;

PlainIntChunkedWriter(final int targetPageSize, @NotNull final ByteBufferAllocator allocator, final int nullValue) {
this.allocator = allocator;
this.nullValue = nullValue;
realloc(targetPageSize);
nullOffsets = IntBuffer.allocate(4);
}

PlainIntChunkedWriter(final int targetPageSize, @NotNull final ByteBufferAllocator allocator) {
Expand Down Expand Up @@ -133,7 +135,7 @@ public WriteResult writeBulkVectorFilterNulls(@NotNull final IntBuffer bulkValue
@NotNull final Statistics<?> statistics) {
ensureCapacityFor(bulkValues);
int i = 0;
IntBuffer nullOffsets = IntBuffer.allocate(4);
nullOffsets.clear();
while (bulkValues.hasRemaining()) {
final int v = bulkValues.get();
if (v != nullValue) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@ public class PlainLongChunkedWriter extends AbstractBulkValuesWriter<LongBuffer>

private LongBuffer targetBuffer;
private ByteBuffer innerBuffer;

private IntBuffer nullOffsets;

PlainLongChunkedWriter(final int targetPageSize, @NotNull final ByteBufferAllocator allocator) {
this.allocator = allocator;
realloc(targetPageSize);
nullOffsets = IntBuffer.allocate(4);
}

@Override
Expand Down Expand Up @@ -128,7 +129,7 @@ public WriteResult writeBulkVectorFilterNulls(@NotNull final LongBuffer bulkValu
@NotNull final Statistics<?> statistics) {
ensureCapacityFor(bulkValues);
int i = 0;
IntBuffer nullOffsets = IntBuffer.allocate(4);
nullOffsets.clear();
while (bulkValues.hasRemaining()) {
final long v = bulkValues.get();
if (v != QueryConstants.NULL_LONG) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,14 @@ public class RleIntChunkedWriter extends AbstractBulkValuesWriter<IntBuffer> {

private final RunLengthBitPackingHybridEncoder encoder;
private final byte bitWidth;
private IntBuffer nullOffsets;

RleIntChunkedWriter(int pageSize, ByteBufferAllocator allocator, byte bitWidth) {
encoder = new RunLengthBitPackingHybridEncoder(bitWidth, pageSize, pageSize, allocator);
this.bitWidth = bitWidth;
nullOffsets = IntBuffer.allocate(4);
}


@Override
public final void writeInteger(int v) {
try {
Expand Down Expand Up @@ -132,7 +133,7 @@ public WriteResult writeBulkFilterNulls(@NotNull IntBuffer bulkValues,
public @NotNull WriteResult writeBulkVectorFilterNulls(@NotNull IntBuffer bulkValues,
final int rowCount,
@NotNull final Statistics<?> statistics) {
IntBuffer nullOffsets = IntBuffer.allocate(4);
nullOffsets.clear();
int i = 0;
while (bulkValues.hasRemaining()) {
int v = bulkValues.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import io.deephaven.UncheckedDeephavenException;
import org.jetbrains.annotations.NotNull;

final class DictionarySizeExceededException extends UncheckedDeephavenException {
public final class DictionarySizeExceededException extends UncheckedDeephavenException {
public DictionarySizeExceededException(@NotNull final String message) {
super(message);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,10 @@ public static int getDefaltMaximumDictionarySize() {
return defaultMaximumDictionarySize;
}

private static final int MIN_TARGET_PAGE_SIZE =
public static final int MIN_TARGET_PAGE_SIZE =
Configuration.getInstance().getIntegerWithDefault("Parquet.minTargetPageSize", 2 << 10);
private static final int DEFAULT_TARGET_PAGE_SIZE =
Configuration.getInstance().getIntegerWithDefault("Parquet.defaultTargetPageSize", 8 << 10);
Configuration.getInstance().getIntegerWithDefault("Parquet.defaultTargetPageSize", 1 << 20);
private static volatile int defaultTargetPageSize = DEFAULT_TARGET_PAGE_SIZE;

private static final boolean DEFAULT_IS_REFRESHING = false;
Expand Down
Loading

0 comments on commit 0dcbd6c

Please sign in to comment.