Skip to content

Commit

Permalink
[WIP] Improved handling of writing NULLs in parquet (#4191)
Browse files Browse the repository at this point in the history
* Properly record nulls in the presence bitmap per field
* Address vector writing bugs for arrays/vectors of Instants, Strings, and other well-supported types
* Clean up Parquet writing code somewhat

---------

Co-authored-by: Ryan Caudy <rcaudy@gmail.com>
  • Loading branch information
malhotrashivam and rcaudy authored Jul 26, 2023
1 parent 52f87ce commit acc5474
Show file tree
Hide file tree
Showing 8 changed files with 374 additions and 161 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,15 @@ public static boolean codecRequired(@NotNull final Class<?> dataType, @Nullable
"Array type " + dataType + " does not match component type " + componentType);
}
// Arrays of primitives or basic types do not require codecs
return !(componentType.isPrimitive() || noCodecRequired(dataType));
return !(componentType.isPrimitive() || noCodecRequired(componentType));
}
if (Vector.class.isAssignableFrom(dataType)) {
if (componentType == null) {
throw new IllegalArgumentException("Vector type " + dataType + " requires a component type");
}
if (ObjectVector.class.isAssignableFrom(dataType)) {
// Vectors of basic types do not require codecs
return !noCodecRequired(dataType);
return !noCodecRequired(componentType);
}
// VectorBases of primitive types do not require codecs
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import io.deephaven.parquet.base.tempfix.ParquetMetadataConverter;
import io.deephaven.parquet.compress.CompressorAdapter;
import io.deephaven.util.QueryConstants;
import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.column.ColumnDescriptor;
Expand All @@ -18,6 +19,7 @@
import org.apache.parquet.internal.column.columnindex.OffsetIndex;
import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder;
import org.apache.parquet.io.ParquetEncodingException;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;

import java.io.ByteArrayOutputStream;
Expand Down Expand Up @@ -164,6 +166,17 @@ private BulkWriter getWriter(final PrimitiveType primitiveType) {
case FIXED_LEN_BYTE_ARRAY:
throw new UnsupportedOperationException("No support for writing FIXED_LENGTH or INT96 types");
case INT32:
LogicalTypeAnnotation annotation = primitiveType.getLogicalTypeAnnotation();
if (annotation != null) {
// Appropriately set the null value for different type of integers
if (LogicalTypeAnnotation.intType(8, true).equals(annotation)) {
return new PlainIntChunkedWriter(targetPageSize, allocator, QueryConstants.NULL_BYTE);
} else if (LogicalTypeAnnotation.intType(16, true).equals(annotation)) {
return new PlainIntChunkedWriter(targetPageSize, allocator, QueryConstants.NULL_SHORT);
} else if (LogicalTypeAnnotation.intType(16, false).equals(annotation)) {
return new PlainIntChunkedWriter(targetPageSize, allocator, QueryConstants.NULL_CHAR);
}
}
return new PlainIntChunkedWriter(targetPageSize, allocator);
case INT64:
return new PlainLongChunkedWriter(targetPageSize, allocator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,28 @@
*/
public class PlainIntChunkedWriter extends AbstractBulkValuesWriter<IntBuffer> {
private static final int MAXIMUM_TOTAL_CAPACITY = Integer.MAX_VALUE / Integer.BYTES;

/**
* This variable stores a type-specific {@code null} representation for writing. This is useful for Byte, Char, and
* Short data types which are written as primitive ints but have a different definition of {@code null}.
*/
private final int nullValue;

private final ByteBufferAllocator allocator;

private IntBuffer targetBuffer;
private ByteBuffer innerBuffer;


PlainIntChunkedWriter(final int targetPageSize, @NotNull final ByteBufferAllocator allocator) {
/**
 * Creates a writer that treats {@code nullValue} as the sentinel for null entries when bulk-writing.
 * This supports Byte, Char, and Short columns, which are written as primitive ints but carry a
 * type-specific null sentinel (see the INT32 logical-type dispatch in the page-writer factory).
 *
 * @param targetPageSize initial size hint forwarded to {@code realloc} for the backing buffers
 * @param allocator allocator used for the backing {@code ByteBuffer}
 * @param nullValue the int value recognized as null when filtering bulk values
 */
PlainIntChunkedWriter(final int targetPageSize, @NotNull final ByteBufferAllocator allocator, final int nullValue) {
// Assign allocator before realloc — NOTE(review): realloc presumably uses it to size
// targetBuffer/innerBuffer; confirm in realloc's definition (not visible here).
this.allocator = allocator;
this.nullValue = nullValue;
realloc(targetPageSize);
}

/**
 * Convenience constructor for plain int columns, defaulting the null sentinel to
 * {@code QueryConstants.NULL_INT}.
 *
 * @param targetPageSize initial size hint forwarded to {@code realloc} for the backing buffers
 * @param allocator allocator used for the backing {@code ByteBuffer}
 */
PlainIntChunkedWriter(final int targetPageSize, @NotNull final ByteBufferAllocator allocator) {
this(targetPageSize, allocator, QueryConstants.NULL_INT);
}

@Override
public final void writeInteger(int v) {
targetBuffer.put(v);
Expand Down Expand Up @@ -95,7 +106,7 @@ public WriteResult writeBulkFilterNulls(@NotNull final IntBuffer bulkValues,
ensureCapacityFor(bulkValues);
while (bulkValues.hasRemaining()) {
final int next = bulkValues.get();
if (next != QueryConstants.NULL_INT) {
if (next != nullValue) {
writeInteger(next);
dlEncoder.writeInt(DL_ITEM_PRESENT);
} else {
Expand All @@ -114,7 +125,7 @@ public WriteResult writeBulkFilterNulls(@NotNull final IntBuffer bulkValues,
IntBuffer nullOffsets = IntBuffer.allocate(4);
while (bulkValues.hasRemaining()) {
final int next = bulkValues.get();
if (next != QueryConstants.NULL_INT) {
if (next != nullValue) {
writeInteger(next);
} else {
nullOffsets = Helpers.ensureCapacity(nullOffsets);
Expand Down
Loading

0 comments on commit acc5474

Please sign in to comment.