diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/AbstractBulkValuesWriter.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/AbstractBulkValuesWriter.java index e506ce3e401..9277a23d77b 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/AbstractBulkValuesWriter.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/AbstractBulkValuesWriter.java @@ -4,6 +4,7 @@ package io.deephaven.parquet.base; import io.deephaven.util.QueryConstants; +import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.column.values.ValuesWriter; import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder; import org.jetbrains.annotations.NotNull; @@ -36,9 +37,11 @@ public int writeBulkVector(@NotNull final BUFFER_TYPE bulkValues, @NotNull final IntBuffer vectorSizes, @NotNull final RunLengthBitPackingHybridEncoder rlEncoder, @NotNull final RunLengthBitPackingHybridEncoder dlEncoder, - final int nonNullValueCount) throws IOException { - final IntBuffer nullsOffsets = writeBulkFilterNulls(bulkValues, nonNullValueCount).nullOffsets; - return applyDlAndRl(vectorSizes, rlEncoder, dlEncoder, nullsOffsets); + final int nonNullValueCount, + @NotNull Statistics statistics) throws IOException { + final IntBuffer nullsOffsets = + writeBulkVectorFilterNulls(bulkValues, nonNullValueCount, statistics).nullOffsets; + return applyDlAndRl(vectorSizes, rlEncoder, dlEncoder, nullsOffsets, statistics); } /** @@ -102,7 +105,8 @@ public int writeBulkVector(@NotNull final BUFFER_TYPE bulkValues, int applyDlAndRl(@NotNull final IntBuffer vectorSizes, @NotNull final RunLengthBitPackingHybridEncoder rlEncoder, @NotNull final RunLengthBitPackingHybridEncoder dlEncoder, - @NotNull final IntBuffer nullsOffsets) throws IOException { + @NotNull final IntBuffer nullsOffsets, + @NotNull Statistics statistics) throws IOException { int valueCount = 0; int leafCount = 0; @@ -139,6 +143,8 @@ int applyDlAndRl(@NotNull final IntBuffer vectorSizes, valueCount++; dlEncoder.writeInt(DL_VECTOR_NULL_VECTOR); rlEncoder.writeInt(RL_FIRST_ELEM); + // This row is null so include it in the overall null counts for this column. + statistics.incrementNumNulls(); } } return valueCount; diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/BulkWriter.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/BulkWriter.java index 4a52d382059..8b46160493c 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/BulkWriter.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/BulkWriter.java @@ -3,6 +3,7 @@ */ package io.deephaven.parquet.base; +import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -36,7 +37,7 @@ public WriteResult(final int valueCount, @Nullable final IntBuffer nullOffsets) * @param bulkValues the buffer of values * @param rowCount the total number of rows to write. */ - void writeBulk(@NotNull BUFFER_TYPE bulkValues, int rowCount); + void writeBulk(@NotNull BUFFER_TYPE bulkValues, int rowCount, @NotNull Statistics statistics); /** * Write a buffer's worth of values to the underlying page. 
This method will find, without writing, {@code null} @@ -46,13 +47,15 @@ public WriteResult(final int valueCount, @Nullable final IntBuffer nullOffsets) * @param bulkValues the values to write * @param dlEncoder the encoder for definition levels * @param rowCount the number of rows being written + * @param statistics the {@link Statistics} object to modify. * @return a {@link WriteResult} containing the statistics of the result. * @throws IOException if there was an error during write. */ @NotNull WriteResult writeBulkFilterNulls(@NotNull BUFFER_TYPE bulkValues, @NotNull RunLengthBitPackingHybridEncoder dlEncoder, - int rowCount) throws IOException; + final int rowCount, + @NotNull Statistics statistics) throws IOException; /** * Write a buffer's worth of packed vector values to the underlying page. This method will set the proper definition @@ -70,7 +73,8 @@ int writeBulkVector(@NotNull final BUFFER_TYPE bulkValues, @NotNull final IntBuffer vectorSizes, @NotNull final RunLengthBitPackingHybridEncoder rlEncoder, @NotNull final RunLengthBitPackingHybridEncoder dlEncoder, - final int nonNullValueCount) throws IOException; + final int nonNullValueCount, + @NotNull Statistics statistics) throws IOException; /** * Write a buffer's worth of packed vector values to the underlying page, skipping null values. This method will @@ -81,7 +85,9 @@ int writeBulkVector(@NotNull final BUFFER_TYPE bulkValues, * @return a {@link WriteResult} containing the statistics of the result. */ @NotNull - WriteResult writeBulkFilterNulls(@NotNull BUFFER_TYPE bulkValues, int rowCount); + WriteResult writeBulkVectorFilterNulls(@NotNull BUFFER_TYPE bulkValues, + final int rowCount, + @NotNull Statistics statistics); /** * Clear all internal state. @@ -93,7 +99,7 @@ int writeBulkVector(@NotNull final BUFFER_TYPE bulkValues, * * @return a {@link ByteBuffer} containing the written data. * - * @throws IOException + * @throws IOException if there is an exception reading the data. */ ByteBuffer getByteBufferView() throws IOException; diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnWriter.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnWriter.java index b9511dd1073..b4e62a95e74 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnWriter.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnWriter.java @@ -5,16 +5,45 @@ import io.deephaven.util.SafeCloseable; import org.jetbrains.annotations.NotNull; +import io.deephaven.util.annotations.FinalDefault; +import org.apache.parquet.column.statistics.Statistics; import java.io.IOException; import java.nio.IntBuffer; public interface ColumnWriter extends SafeCloseable { - void addPageNoNulls(@NotNull Object pageData, int valuesCount) throws IOException; + /** + * Add a page with no nulls to the file. + */ + void addPageNoNulls(@NotNull Object pageData, int valuesCount, @NotNull Statistics statistics) + throws IOException; + /** + * Add a dictionary page to the file. + */ void addDictionaryPage(@NotNull Object dictionaryValues, int valuesCount) throws IOException; - void addPage(@NotNull Object pageData, int valuesCount) throws IOException; + /** + * Add a page (potentially containing nulls) to the file. + */ + void addPage(Object pageData, int valuesCount, Statistics statistics) throws IOException; - void addVectorPage(@NotNull Object pageData, @NotNull IntBuffer repeatCount, int valuesCount) throws IOException; + /** + * Add a vector page to the file. 
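+     * Values written and nulls encountered are accumulated into the supplied {@link Statistics} parameter.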
+ */ + void addVectorPage(@NotNull Object pageData, + @NotNull IntBuffer repeatCount, + int valuesCount, + @NotNull Statistics statistics) + throws IOException; + + /** + * Reset the statistics for this column. This must be called between each row group. + */ + void resetStats(); + + /** + * Return the current statistics. + */ + Statistics getStats(); } diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnWriterImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnWriterImpl.java index cc512e42e26..41f84f3cffc 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnWriterImpl.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnWriterImpl.java @@ -46,6 +46,7 @@ public class ColumnWriterImpl implements ColumnWriter { private final CompressorAdapter compressorAdapter; private boolean hasDictionary; private int pageCount = 0; + private Statistics statistics; private static final ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter(); @@ -85,13 +86,17 @@ public class ColumnWriterImpl implements ColumnWriter { getWidthFromMaxInt(column.getMaxRepetitionLevel()), MIN_SLAB_SIZE, targetPageSize, allocator); this.owner = owner; offsetIndexBuilder = OffsetIndexBuilder.getBuilder(); + statistics = Statistics.createStats(column.getPrimitiveType()); } @Override - public void addPageNoNulls(@NotNull final Object pageData, final int valuesCount) throws IOException { + public void addPageNoNulls(@NotNull final Object pageData, + final int valuesCount, + @NotNull final Statistics statistics) + throws IOException { initWriter(); // noinspection unchecked - bulkWriter.writeBulk(pageData, valuesCount); + bulkWriter.writeBulk(pageData, valuesCount, statistics); if (dlEncoder != null) { for (int i = 0; i < valuesCount; i++) { dlEncoder.writeInt(1); // TODO implement a bulk RLE writer @@ -126,7 +131,7 @@ public void addDictionaryPage(@NotNull final Object dictionaryValues, final int final BulkWriter dictionaryWriter = getWriter(column.getPrimitiveType()); // noinspection unchecked - dictionaryWriter.writeBulk(dictionaryValues, valuesCount); + dictionaryWriter.writeBulk(dictionaryValues, valuesCount, NullStatistics.INSTANCE); dictionaryOffset = writeChannel.position(); writeDictionaryPage(dictionaryWriter.getByteBufferView(), valuesCount); pageCount++; @@ -196,13 +201,16 @@ private BulkWriter getWriter(final PrimitiveType primitiveType) { } @Override - public void addPage(@NotNull final Object pageData, final int valuesCount) throws IOException { + public void addPage(@NotNull final Object pageData, + final int valuesCount, + @NotNull Statistics statistics) + throws IOException { if (dlEncoder == null) { throw new IllegalStateException("Null values not supported"); } initWriter(); // noinspection unchecked - bulkWriter.writeBulkFilterNulls(pageData, dlEncoder, valuesCount); + bulkWriter.writeBulkFilterNulls(pageData, dlEncoder, valuesCount, statistics); writePage(bulkWriter.getByteBufferView(), valuesCount); bulkWriter.reset(); } @@ -210,7 +218,8 @@ public void addPage(@NotNull final Object pageData, final int valuesCount) throw public void addVectorPage( @NotNull final Object pageData, @NotNull final IntBuffer repeatCount, - final int nonNullValueCount) throws IOException { + final int nonNullValueCount, + @NotNull final Statistics statistics) throws IOException { if (dlEncoder == null) { throw new IllegalStateException("Null values not supported"); } @@ -220,7 +229,7 @@ public 
void addVectorPage( initWriter(); // noinspection unchecked int valueCount = - bulkWriter.writeBulkVector(pageData, repeatCount, rlEncoder, dlEncoder, nonNullValueCount); + bulkWriter.writeBulkVector(pageData, repeatCount, rlEncoder, dlEncoder, nonNullValueCount, statistics); writePage(bulkWriter.getByteBufferView(), valueCount); bulkWriter.reset(); } @@ -416,7 +425,7 @@ public void close() { compressorAdapter.getCodecName(), encodingStatsBuilder.build(), encodings, - Statistics.createStats(column.getPrimitiveType()), + statistics, firstDataPageOffset, dictionaryOffset, totalValueCount, @@ -431,4 +440,14 @@ public ColumnDescriptor getColumn() { public OffsetIndex getOffsetIndex() { return offsetIndexBuilder.build(firstDataPageOffset); } + + @Override + public void resetStats() { + statistics = Statistics.createStats(column.getPrimitiveType()); + } + + @Override + public Statistics getStats() { + return statistics; + } } diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/NullStatistics.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/NullStatistics.java new file mode 100644 index 00000000000..694b028971b --- /dev/null +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/NullStatistics.java @@ -0,0 +1,35 @@ +package io.deephaven.parquet.base; + +import org.apache.parquet.column.statistics.IntStatistics; +import org.apache.parquet.io.api.Binary; + +/** + * A lightweight statistics object that does nothing. This should be passed to BulkWriters when we don't want to track + * statistics. + */ +public class NullStatistics extends IntStatistics { + public static final NullStatistics INSTANCE = new NullStatistics(); + + public void updateStats(int value) {} + + public void updateStats(long value) {} + + public void updateStats(float value) {} + + public void updateStats(double value) {} + + public void updateStats(boolean value) {} + + public void updateStats(Binary value) {} + + @Override + public void incrementNumNulls() {} + + @Override + public void incrementNumNulls(long increment) {} + + @Override + public String toString() { + return "NullStatistic"; + } +} diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/PlainBinaryChunkedWriter.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/PlainBinaryChunkedWriter.java index 543ea98beb5..a11bab7ba5d 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/PlainBinaryChunkedWriter.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/PlainBinaryChunkedWriter.java @@ -8,6 +8,7 @@ import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder; import org.apache.parquet.io.api.Binary; import org.jetbrains.annotations.NotNull; @@ -86,9 +87,13 @@ public String memUsageString(String prefix) { } @Override - public void writeBulk(@NotNull Binary[] bulkValues, int rowCount) { + public void writeBulk(@NotNull Binary[] bulkValues, + final int rowCount, + @NotNull final Statistics statistics) { for (int i = 0; i < rowCount; i++) { - writeBytes(bulkValues[i]); + final Binary v = bulkValues[i]; + writeBytes(v); + statistics.updateStats(v); } } @@ -96,12 +101,16 @@ public void writeBulk(@NotNull Binary[] bulkValues, int rowCount) { @Override public WriteResult writeBulkFilterNulls(@NotNull final Binary[] 
bulkValues, @NotNull final RunLengthBitPackingHybridEncoder dlEncoder, - final int rowCount) throws IOException { + final int rowCount, + @NotNull final Statistics statistics) throws IOException { for (int i = 0; i < rowCount; i++) { if (bulkValues[i] != null) { - writeBytes(bulkValues[i]); + final Binary v = bulkValues[i]; + writeBytes(v); + statistics.updateStats(v); dlEncoder.writeInt(DL_ITEM_PRESENT); } else { + statistics.incrementNumNulls(); dlEncoder.writeInt(DL_ITEM_NULL); } } @@ -109,14 +118,19 @@ public WriteResult writeBulkFilterNulls(@NotNull final Binary[] bulkValues, } @Override - public @NotNull WriteResult writeBulkFilterNulls(@NotNull Binary[] bulkValues, int nonNullLeafCount) { + public @NotNull WriteResult writeBulkVectorFilterNulls(@NotNull Binary[] bulkValues, + final int nonNullLeafCount, + @NotNull final Statistics statistics) { IntBuffer nullOffsets = IntBuffer.allocate(4); for (int i = 0; i < nonNullLeafCount; i++) { if (bulkValues[i] != null) { - writeBytes(bulkValues[i]); + final Binary v = bulkValues[i]; + writeBytes(v); + statistics.updateStats(v); } else { nullOffsets = Helpers.ensureCapacity(nullOffsets); nullOffsets.put(i); + statistics.incrementNumNulls(); } } return new WriteResult(nonNullLeafCount, nullOffsets); diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/PlainBooleanChunkedWriter.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/PlainBooleanChunkedWriter.java index bb40de9a6ee..4bdbd217063 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/PlainBooleanChunkedWriter.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/PlainBooleanChunkedWriter.java @@ -5,9 +5,9 @@ import io.deephaven.parquet.base.util.Helpers; import io.deephaven.util.QueryConstants; -import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.column.values.plain.BooleanPlainValuesWriter; import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder; import org.jetbrains.annotations.NotNull; @@ -72,21 +72,31 @@ public String memUsageString(String prefix) { } @Override - public void writeBulk(@NotNull ByteBuffer bulkValues, int rowCount) { + public void writeBulk(@NotNull ByteBuffer bulkValues, + final int rowCount, + @NotNull final Statistics statistics) { while (bulkValues.hasRemaining()) { - writeBoolean(bulkValues.get() == 1); + final boolean v = bulkValues.get() == 1; + writeBoolean(v); + statistics.updateStats(v); } } @NotNull @Override - public WriteResult writeBulkFilterNulls(@NotNull ByteBuffer bulkValues, @NotNull RunLengthBitPackingHybridEncoder dlEncoder, int rowCount) throws IOException { + public WriteResult writeBulkFilterNulls(@NotNull ByteBuffer bulkValues, + @NotNull RunLengthBitPackingHybridEncoder dlEncoder, + final int rowCount, + @NotNull final Statistics statistics) throws IOException { while (bulkValues.hasRemaining()) { final byte next = bulkValues.get(); if (next != QueryConstants.NULL_BYTE) { - writeBoolean(next == 1); + final boolean v = next == 1; + writeBoolean(v); + statistics.updateStats(v); dlEncoder.writeInt(DL_ITEM_PRESENT); } else { + statistics.incrementNumNulls(); dlEncoder.writeInt(DL_ITEM_NULL); } } @@ -94,16 +104,21 @@ public WriteResult writeBulkFilterNulls(@NotNull ByteBuffer bulkValues, @NotNull } @Override - public @NotNull WriteResult writeBulkFilterNulls(@NotNull 
ByteBuffer bulkValues, int rowCount) { + public @NotNull WriteResult writeBulkVectorFilterNulls(@NotNull ByteBuffer bulkValues, + final int rowCount, + @NotNull final Statistics statistics) { IntBuffer nullOffsets = IntBuffer.allocate(4); int i = 0; while (bulkValues.hasRemaining()) { final byte next = bulkValues.get(); if (next != QueryConstants.NULL_BYTE) { - writeBoolean(next == 1); + final boolean v = next == 1; + writeBoolean(v); + statistics.updateStats(v); } else { nullOffsets = Helpers.ensureCapacity(nullOffsets); nullOffsets.put(i); + statistics.incrementNumNulls(); } i++; } diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/PlainDoubleChunkedWriter.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/PlainDoubleChunkedWriter.java index 2af1ba097f6..583db477019 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/PlainDoubleChunkedWriter.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/PlainDoubleChunkedWriter.java @@ -8,13 +8,12 @@ */ package io.deephaven.parquet.base; -import java.nio.IntBuffer; - import io.deephaven.parquet.base.util.Helpers; import io.deephaven.util.QueryConstants; import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder; import org.jetbrains.annotations.NotNull; @@ -22,6 +21,7 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.DoubleBuffer; +import java.nio.IntBuffer; /** * A writer for encoding doubles in the PLAIN format @@ -89,8 +89,14 @@ public String memUsageString(String prefix) { } @Override - public void writeBulk(@NotNull DoubleBuffer bulkValues, int rowCount) { + public void writeBulk(@NotNull DoubleBuffer bulkValues, + final int rowCount, + @NotNull final Statistics statistics) { ensureCapacityFor(bulkValues); + // Generate statistics before we perform the bulk write. 
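+        // Using the absolute get(i) overload keeps the buffer's position unchanged, so the relative
+        // bulk targetBuffer.put(bulkValues) below still copies every value.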
+ for (int i = 0; i < rowCount; i++) { + statistics.updateStats(bulkValues.get(i)); + } targetBuffer.put(bulkValues); } @@ -98,14 +104,17 @@ public void writeBulk(@NotNull DoubleBuffer bulkValues, int rowCount) { @Override public WriteResult writeBulkFilterNulls(@NotNull final DoubleBuffer bulkValues, @NotNull final RunLengthBitPackingHybridEncoder dlEncoder, - final int rowCount) throws IOException { + final int rowCount, + @NotNull final Statistics statistics) throws IOException { ensureCapacityFor(bulkValues); while (bulkValues.hasRemaining()) { - final double next = bulkValues.get(); - if (next != QueryConstants.NULL_DOUBLE) { - writeDouble(next); + final double v = bulkValues.get(); + if (v != QueryConstants.NULL_DOUBLE) { + writeDouble(v); + statistics.updateStats(v); dlEncoder.writeInt(DL_ITEM_PRESENT); } else { + statistics.incrementNumNulls(); dlEncoder.writeInt(DL_ITEM_NULL); } } @@ -114,18 +123,21 @@ public WriteResult writeBulkFilterNulls(@NotNull final DoubleBuffer bulkValues, @NotNull @Override - public WriteResult writeBulkFilterNulls(@NotNull final DoubleBuffer bulkValues, - final int rowCount) { + public WriteResult writeBulkVectorFilterNulls(@NotNull final DoubleBuffer bulkValues, + final int rowCount, + @NotNull final Statistics statistics) { ensureCapacityFor(bulkValues); int i = 0; IntBuffer nullOffsets = IntBuffer.allocate(4); while (bulkValues.hasRemaining()) { - final double next = bulkValues.get(); - if (next != QueryConstants.NULL_DOUBLE) { - writeDouble(next); + final double v = bulkValues.get(); + if (v != QueryConstants.NULL_DOUBLE) { + writeDouble(v); + statistics.updateStats(v); } else { nullOffsets = Helpers.ensureCapacity(nullOffsets); nullOffsets.put(i); + statistics.incrementNumNulls(); } i++; } diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/PlainFloatChunkedWriter.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/PlainFloatChunkedWriter.java index a189dde052e..6bb4f2d1198 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/PlainFloatChunkedWriter.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/PlainFloatChunkedWriter.java @@ -8,13 +8,12 @@ */ package io.deephaven.parquet.base; -import java.nio.IntBuffer; - import io.deephaven.parquet.base.util.Helpers; import io.deephaven.util.QueryConstants; import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder; import org.jetbrains.annotations.NotNull; @@ -22,6 +21,7 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.FloatBuffer; +import java.nio.IntBuffer; /** * A writer for encoding floats in the PLAIN format @@ -89,8 +89,14 @@ public String memUsageString(String prefix) { } @Override - public void writeBulk(@NotNull FloatBuffer bulkValues, int rowCount) { + public void writeBulk(@NotNull FloatBuffer bulkValues, + final int rowCount, + @NotNull final Statistics statistics) { ensureCapacityFor(bulkValues); + // Generate statistics before we perform the bulk write. 
+ for (int i = 0; i < rowCount; i++) { + statistics.updateStats(bulkValues.get(i)); + } targetBuffer.put(bulkValues); } @@ -98,14 +104,17 @@ public void writeBulk(@NotNull FloatBuffer bulkValues, int rowCount) { @Override public WriteResult writeBulkFilterNulls(@NotNull final FloatBuffer bulkValues, @NotNull final RunLengthBitPackingHybridEncoder dlEncoder, - final int rowCount) throws IOException { + final int rowCount, + @NotNull final Statistics statistics) throws IOException { ensureCapacityFor(bulkValues); while (bulkValues.hasRemaining()) { - final float next = bulkValues.get(); - if (next != QueryConstants.NULL_FLOAT) { - writeFloat(next); + final float v = bulkValues.get(); + if (v != QueryConstants.NULL_FLOAT) { + writeFloat(v); + statistics.updateStats(v); dlEncoder.writeInt(DL_ITEM_PRESENT); } else { + statistics.incrementNumNulls(); dlEncoder.writeInt(DL_ITEM_NULL); } } @@ -114,18 +123,21 @@ public WriteResult writeBulkFilterNulls(@NotNull final FloatBuffer bulkValues, @NotNull @Override - public WriteResult writeBulkFilterNulls(@NotNull final FloatBuffer bulkValues, - final int rowCount) { + public WriteResult writeBulkVectorFilterNulls(@NotNull final FloatBuffer bulkValues, + final int rowCount, + @NotNull final Statistics statistics) { ensureCapacityFor(bulkValues); int i = 0; IntBuffer nullOffsets = IntBuffer.allocate(4); while (bulkValues.hasRemaining()) { - final float next = bulkValues.get(); - if (next != QueryConstants.NULL_FLOAT) { - writeFloat(next); + final float v = bulkValues.get(); + if (v != QueryConstants.NULL_FLOAT) { + writeFloat(v); + statistics.updateStats(v); } else { nullOffsets = Helpers.ensureCapacity(nullOffsets); nullOffsets.put(i); + statistics.incrementNumNulls(); } i++; } diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/PlainIntChunkedWriter.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/PlainIntChunkedWriter.java index 58917d03741..6ec8f96690e 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/PlainIntChunkedWriter.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/PlainIntChunkedWriter.java @@ -8,6 +8,7 @@ import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder; import org.jetbrains.annotations.NotNull; @@ -93,8 +94,14 @@ public String memUsageString(String prefix) { } @Override - public void writeBulk(@NotNull IntBuffer bulkValues, int rowCount) { + public void writeBulk(@NotNull IntBuffer bulkValues, + final int rowCount, + @NotNull final Statistics statistics) { ensureCapacityFor(bulkValues); + // Generate statistics before we perform the bulk write. 
+ for (int i = 0; i < rowCount; i++) { + statistics.updateStats(bulkValues.get(i)); + } targetBuffer.put(bulkValues); } @@ -102,14 +109,17 @@ public void writeBulk(@NotNull IntBuffer bulkValues, int rowCount) { @Override public WriteResult writeBulkFilterNulls(@NotNull final IntBuffer bulkValues, @NotNull final RunLengthBitPackingHybridEncoder dlEncoder, - final int rowCount) throws IOException { + final int rowCount, + @NotNull final Statistics statistics) throws IOException { ensureCapacityFor(bulkValues); while (bulkValues.hasRemaining()) { - final int next = bulkValues.get(); - if (next != nullValue) { - writeInteger(next); + final int v = bulkValues.get(); + if (v != nullValue) { + writeInteger(v); + statistics.updateStats(v); dlEncoder.writeInt(DL_ITEM_PRESENT); } else { + statistics.incrementNumNulls(); dlEncoder.writeInt(DL_ITEM_NULL); } } @@ -118,18 +128,21 @@ public WriteResult writeBulkFilterNulls(@NotNull final IntBuffer bulkValues, @NotNull @Override - public WriteResult writeBulkFilterNulls(@NotNull final IntBuffer bulkValues, - final int rowCount) { + public WriteResult writeBulkVectorFilterNulls(@NotNull final IntBuffer bulkValues, + final int rowCount, + @NotNull final Statistics statistics) { ensureCapacityFor(bulkValues); int i = 0; IntBuffer nullOffsets = IntBuffer.allocate(4); while (bulkValues.hasRemaining()) { - final int next = bulkValues.get(); - if (next != nullValue) { - writeInteger(next); + final int v = bulkValues.get(); + if (v != nullValue) { + writeInteger(v); + statistics.updateStats(v); } else { nullOffsets = Helpers.ensureCapacity(nullOffsets); nullOffsets.put(i); + statistics.incrementNumNulls(); } i++; } diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/PlainLongChunkedWriter.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/PlainLongChunkedWriter.java index b505a8f65a7..9eead6b352d 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/PlainLongChunkedWriter.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/PlainLongChunkedWriter.java @@ -8,19 +8,19 @@ */ package io.deephaven.parquet.base; -import java.nio.IntBuffer; - import io.deephaven.parquet.base.util.Helpers; import io.deephaven.util.QueryConstants; import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder; import org.jetbrains.annotations.NotNull; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import java.nio.IntBuffer; import java.nio.LongBuffer; /** @@ -89,8 +89,14 @@ public String memUsageString(String prefix) { } @Override - public void writeBulk(@NotNull LongBuffer bulkValues, int rowCount) { + public void writeBulk(@NotNull LongBuffer bulkValues, + final int rowCount, + @NotNull final Statistics statistics) { ensureCapacityFor(bulkValues); + // Generate statistics before we perform the bulk write. 
+ for (int i = 0; i < rowCount; i++) { + statistics.updateStats(bulkValues.get(i)); + } targetBuffer.put(bulkValues); } @@ -98,14 +104,17 @@ public void writeBulk(@NotNull LongBuffer bulkValues, int rowCount) { @Override public WriteResult writeBulkFilterNulls(@NotNull final LongBuffer bulkValues, @NotNull final RunLengthBitPackingHybridEncoder dlEncoder, - final int rowCount) throws IOException { + final int rowCount, + @NotNull final Statistics statistics) throws IOException { ensureCapacityFor(bulkValues); while (bulkValues.hasRemaining()) { - final long next = bulkValues.get(); - if (next != QueryConstants.NULL_LONG) { - writeLong(next); + final long v = bulkValues.get(); + if (v != QueryConstants.NULL_LONG) { + writeLong(v); + statistics.updateStats(v); dlEncoder.writeInt(DL_ITEM_PRESENT); } else { + statistics.incrementNumNulls(); dlEncoder.writeInt(DL_ITEM_NULL); } } @@ -114,18 +123,21 @@ public WriteResult writeBulkFilterNulls(@NotNull final LongBuffer bulkValues, @NotNull @Override - public WriteResult writeBulkFilterNulls(@NotNull final LongBuffer bulkValues, - final int rowCount) { + public WriteResult writeBulkVectorFilterNulls(@NotNull final LongBuffer bulkValues, + final int rowCount, + @NotNull final Statistics statistics) { ensureCapacityFor(bulkValues); int i = 0; IntBuffer nullOffsets = IntBuffer.allocate(4); while (bulkValues.hasRemaining()) { - final long next = bulkValues.get(); - if (next != QueryConstants.NULL_LONG) { - writeLong(next); + final long v = bulkValues.get(); + if (v != QueryConstants.NULL_LONG) { + writeLong(v); + statistics.updateStats(v); } else { nullOffsets = Helpers.ensureCapacity(nullOffsets); nullOffsets.put(i); + statistics.incrementNumNulls(); } i++; } diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/RleIntChunkedWriter.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/RleIntChunkedWriter.java index 87b55837ecf..a9ef7901718 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/RleIntChunkedWriter.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/RleIntChunkedWriter.java @@ -8,8 +8,10 @@ import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,7 +28,7 @@ public class RleIntChunkedWriter extends AbstractBulkValuesWriter { private static final Logger LOG = LoggerFactory.getLogger(org.apache.parquet.column.values.plain.PlainValuesWriter.class); private final RunLengthBitPackingHybridEncoder encoder; - private byte bitWidth; + private final byte bitWidth; RleIntChunkedWriter(int pageSize, ByteBufferAllocator allocator, byte bitWidth) { encoder = new RunLengthBitPackingHybridEncoder(bitWidth, pageSize, pageSize, allocator); @@ -95,22 +97,31 @@ public String memUsageString(String prefix) { } @Override - public void writeBulk(@NotNull IntBuffer bulkValues, int rowCount) { - + public void writeBulk(@NotNull IntBuffer bulkValues, + final int rowCount, + @Nullable final Statistics statistics) { + // Track statistics while we write the values. 
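+        // Unlike the plain writers there is no backing buffer to bulk-copy; each value goes through the
+        // RLE encoder individually, so the statistics are updated inline with each write.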
for (int i = 0; i < rowCount; i++) { - writeInteger(bulkValues.get()); + final int v = bulkValues.get(); + writeInteger(v); + statistics.updateStats(v); } } @NotNull @Override - public WriteResult writeBulkFilterNulls(@NotNull IntBuffer bulkValues, @NotNull RunLengthBitPackingHybridEncoder dlEncoder, int rowCount) throws IOException { + public WriteResult writeBulkFilterNulls(@NotNull IntBuffer bulkValues, + @NotNull RunLengthBitPackingHybridEncoder dlEncoder, + final int rowCount, + @NotNull final Statistics statistics) throws IOException { while (bulkValues.hasRemaining()) { - int next = bulkValues.get(); - if (next != QueryConstants.NULL_INT) { - writeInteger(next); + int v = bulkValues.get(); + if (v != QueryConstants.NULL_INT) { + writeInteger(v); + statistics.updateStats(v); dlEncoder.writeInt(DL_ITEM_PRESENT); } else { + statistics.incrementNumNulls(); dlEncoder.writeInt(DL_ITEM_NULL); } } @@ -118,16 +129,20 @@ public WriteResult writeBulkFilterNulls(@NotNull IntBuffer bulkValues, @NotNull } @Override - public @NotNull WriteResult writeBulkFilterNulls(@NotNull IntBuffer bulkValues, int rowCount) { + public @NotNull WriteResult writeBulkVectorFilterNulls(@NotNull IntBuffer bulkValues, + final int rowCount, + @NotNull final Statistics statistics) { IntBuffer nullOffsets = IntBuffer.allocate(4); int i = 0; while (bulkValues.hasRemaining()) { - int next = bulkValues.get(); - if (next != QueryConstants.NULL_INT) { - writeInteger(next); + int v = bulkValues.get(); + if (v != QueryConstants.NULL_INT) { + writeInteger(v); + statistics.updateStats(v); } else { nullOffsets = Helpers.ensureCapacity(nullOffsets); nullOffsets.put(i); + statistics.incrementNumNulls(); } i++; } diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTableWriter.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTableWriter.java index 2c4c28c333b..fae0ab70426 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTableWriter.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTableWriter.java @@ -44,6 +44,8 @@ import io.deephaven.vector.Vector; import org.apache.commons.lang3.tuple.Pair; import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.column.statistics.IntStatistics; +import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.io.api.Binary; import org.jetbrains.annotations.NotNull; @@ -57,7 +59,6 @@ import java.nio.file.Paths; import java.time.Instant; import java.util.*; -import java.util.function.Function; import java.util.function.IntSupplier; import static io.deephaven.util.QueryConstants.NULL_INT; @@ -594,6 +595,7 @@ private static void encodePlain(@NotNull final ParquetInstructions w final VectorColumnWriterHelper vectorHelper = writingHelper.isVectorFormat() ? (VectorColumnWriterHelper) writingHelper : null; + final Statistics statistics = columnWriter.getStats(); // @formatter:off try (final RowSequence.Iterator lengthRowSetIterator = vectorHelper != null ? 
tableRowSet.getRowSequenceIterator() @@ -625,10 +627,10 @@ private static void encodePlain(@NotNull final ParquetInstructions w .asIntChunk(); lenChunk.copyToTypedBuffer(0, repeatCount, 0, lenChunk.size()); repeatCount.limit(lenChunk.size()); - columnWriter.addVectorPage(bufferToWrite, repeatCount, transferObject.rowCount()); + columnWriter.addVectorPage(bufferToWrite, repeatCount, transferObject.rowCount(), statistics); repeatCount.clear(); } else { - columnWriter.addPage(bufferToWrite, transferObject.rowCount()); + columnWriter.addPage(bufferToWrite, transferObject.rowCount(), statistics); } } } @@ -655,8 +657,10 @@ private static boolean tryEncodeDictionary( final VectorColumnWriterHelper vectorHelper = writingHelper.isVectorFormat() ? (VectorColumnWriterHelper) writingHelper : null; + final Statistics statistics = columnWriter.getStats(); try { final List pageBuffers = new ArrayList<>(); + final BitSet pageBufferHasNull = new BitSet(); Binary[] encodedKeys = new Binary[Math.min(INITIAL_DICTIONARY_SIZE, maxKeys)]; final TObjectIntHashMap keyToPos = @@ -671,6 +675,7 @@ private static boolean tryEncodeDictionary( ? vectorHelper.valueRowSet.getRowSequenceIterator() : tableRowSet.getRowSequenceIterator()) { for (int curPage = 0; curPage < pageCount; curPage++) { + boolean pageHasNulls = false; final RowSequence rs = it.getNextRowSequenceWithLength(valuePageSizeGetter.getAsInt()); final ObjectChunk chunk = valueSource.getChunk(context, rs).asObjectChunk(); @@ -679,18 +684,22 @@ private static boolean tryEncodeDictionary( final String key = chunk.get(vi); int dictionaryPos = keyToPos.get(key); if (dictionaryPos == keyToPos.getNoEntryValue()) { + // Track the min/max statistics while the dictionary is being built. if (key == null) { - hasNulls = true; + hasNulls = pageHasNulls = true; } else { if (keyCount == encodedKeys.length) { if (keyCount >= maxKeys) { + // Reset the stats because we will re-encode these in PLAIN encoding. + columnWriter.resetStats(); throw new DictionarySizeExceededException( "Dictionary maximum size exceeded for " + columnDefinition.getName()); } - encodedKeys = Arrays.copyOf(encodedKeys, (int) Math.min(keyCount * 2L, maxKeys)); } - encodedKeys[keyCount] = Binary.fromString(key); + final Binary encodedKey = Binary.fromString(key); + encodedKeys[keyCount] = encodedKey; + statistics.updateStats(encodedKey); dictionaryPos = keyCount; keyCount++; } @@ -699,10 +708,13 @@ private static boolean tryEncodeDictionary( posInDictionary.put(dictionaryPos); } pageBuffers.add(posInDictionary); + pageBufferHasNull.set(curPage, pageHasNulls); } } if (keyCount == 0 && hasNulls) { + // Reset the stats because we will re-encode these in PLAIN encoding. + columnWriter.resetStats(); return false; } @@ -727,18 +739,27 @@ private static boolean tryEncodeDictionary( columnWriter.addDictionaryPage(encodedKeys, keyCount); final Iterator arraySizeIt = arraySizeBuffers == null ? null : arraySizeBuffers.iterator(); - for (final IntBuffer pageBuffer : pageBuffers) { + // We've already determined min/max statistics while building the dictionary. Now use an integer statistics + // object to track the number of nulls that will be written. 
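+            // The page buffers hold dictionary positions rather than the values themselves, so only the
+            // null count gathered in tmpStats below is meaningful; it is folded into the column statistics
+            // once all pages have been written.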
+ Statistics tmpStats = new IntStatistics(); + for (int i = 0; i < pageBuffers.size(); ++i) { + final IntBuffer pageBuffer = pageBuffers.get(i); + final boolean pageHasNulls = pageBufferHasNull.get(i); pageBuffer.flip(); if (vectorHelper != null) { - columnWriter.addVectorPage(pageBuffer, arraySizeIt.next(), pageBuffer.remaining()); - } else if (hasNulls) { - columnWriter.addPage(pageBuffer, pageBuffer.remaining()); + columnWriter.addVectorPage(pageBuffer, arraySizeIt.next(), pageBuffer.remaining(), tmpStats); + } else if (pageHasNulls) { + columnWriter.addPage(pageBuffer, pageBuffer.remaining(), tmpStats); } else { - columnWriter.addPageNoNulls(pageBuffer, pageBuffer.remaining()); + columnWriter.addPageNoNulls(pageBuffer, pageBuffer.remaining(), tmpStats); } } + // Add the count of nulls to the overall stats. + statistics.incrementNumNulls(tmpStats.getNumNulls()); return true; } catch (final DictionarySizeExceededException ignored) { + // Reset the stats because we will re-encode these in PLAIN encoding. + columnWriter.resetStats(); return false; } } @@ -1015,7 +1036,6 @@ static class StringTransfer implements TransferObject { private final Binary[] buffer; private final ColumnSource columnSource; - StringTransfer(ColumnSource columnSource, int targetSize) { this.columnSource = columnSource; this.buffer = new Binary[targetSize]; @@ -1060,7 +1080,6 @@ static class CodecTransfer implements TransferObject { private final Binary[] buffer; private final ColumnSource columnSource; - CodecTransfer(ColumnSource columnSource, ObjectCodec codec, int targetSize) { this.columnSource = columnSource; this.buffer = new Binary[targetSize]; diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java index 8eb1b292b67..744ba7cf52a 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java @@ -6,11 +6,19 @@ import io.deephaven.UncheckedDeephavenException; import io.deephaven.api.Selectable; import io.deephaven.base.FileUtils; +import io.deephaven.datastructures.util.CollectionUtil; import io.deephaven.engine.context.ExecutionContext; +import io.deephaven.engine.primitive.function.ByteConsumer; +import io.deephaven.engine.primitive.function.CharConsumer; +import io.deephaven.engine.primitive.function.FloatConsumer; +import io.deephaven.engine.primitive.function.ShortConsumer; import io.deephaven.engine.primitive.iterator.CloseableIterator; import io.deephaven.engine.table.ColumnDefinition; +import io.deephaven.engine.table.ColumnSource; +import io.deephaven.engine.table.impl.sources.ReinterpretUtils; import io.deephaven.engine.table.impl.util.ColumnHolder; import io.deephaven.engine.table.impl.select.FormulaEvaluationException; +import io.deephaven.engine.table.iterators.*; import io.deephaven.engine.testutil.TstUtils; import io.deephaven.engine.testutil.junit4.EngineCleanup; import io.deephaven.engine.util.BigDecimalUtils; @@ -23,7 +31,16 @@ import io.deephaven.engine.util.TableTools; import io.deephaven.engine.table.impl.QueryTable; import io.deephaven.test.types.OutOfBandTest; +import io.deephaven.time.DateTimeUtils; +import io.deephaven.util.compare.DoubleComparisons; +import io.deephaven.util.compare.FloatComparisons; +import io.deephaven.vector.*; import junit.framework.TestCase; +import 
org.apache.commons.lang3.mutable.*; +import org.apache.parquet.column.statistics.Statistics; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.io.api.Binary; import org.junit.After; import org.junit.Before; import org.junit.Ignore; @@ -34,12 +51,18 @@ import java.io.Serializable; import java.math.BigDecimal; import java.math.BigInteger; +import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.function.DoubleConsumer; +import java.util.function.IntConsumer; +import java.util.function.LongConsumer; + import org.junit.experimental.categories.Category; import static io.deephaven.engine.testutil.TstUtils.assertTableEquals; +import static io.deephaven.util.QueryConstants.*; import static org.junit.Assert.*; @Category(OutOfBandTest.class) @@ -795,4 +818,1430 @@ public void readModifyWriteTestsImpl(TestParquetTableWriter writer) { // swapped out, then we would not be able to read from the file TstUtils.assertTableEquals(tableToSave, fromDisk); } + + @Test + public void readWriteStatisticsTest() { + // Test simple structured table. + final ColumnDefinition columnDefinition = + ColumnDefinition.fromGenericType("VariableWidthByteArrayColumn", byte[].class, byte.class); + final TableDefinition tableDefinition = TableDefinition.of(columnDefinition); + final byte[] byteArray = new byte[] {1, 2, 3, 4, NULL_BYTE, 6, 7, 8, 9, NULL_BYTE, 11, 12, 13}; + final Table simpleTable = TableTools.newTable(tableDefinition, + TableTools.col("VariableWidthByteArrayColumn", null, byteArray, byteArray, byteArray, byteArray, + byteArray)); + final File simpleTableDest = new File(rootFile, "ParquetTest_simple_statistics_test.parquet"); + ParquetTools.writeTable(simpleTable, simpleTableDest); + + final Table simpleFromDisk = ParquetTools.readTable(simpleTableDest); + TstUtils.assertTableEquals(simpleTable, simpleFromDisk); + + assertTableStatistics(simpleTable, simpleTableDest); + + // Test flat columns. + final Table flatTableToSave = getTableFlat(10_000, true, true); + final File flatTableDest = new File(rootFile, "ParquetTest_flat_statistics_test.parquet"); + ParquetTools.writeTable(flatTableToSave, flatTableDest); + + final Table flatFromDisk = ParquetTools.readTable(flatTableDest); + TstUtils.assertTableEquals(maybeFixBigDecimal(flatTableToSave), flatFromDisk); + + assertTableStatistics(flatTableToSave, flatTableDest); + + // Test nested columns. + final Table groupedTableToSave = getGroupedTable(10_000, true); + final File groupedTableDest = new File(rootFile, "ParquetTest_grouped_statistics_test.parquet"); + ParquetTools.writeTable(groupedTableToSave, groupedTableDest, groupedTableToSave.getDefinition()); + + final Table groupedFromDisk = ParquetTools.readTable(groupedTableDest); + TstUtils.assertTableEquals(groupedTableToSave, groupedFromDisk); + + assertTableStatistics(groupedTableToSave, groupedTableDest); + } + + /** + * Test our manual verification techniques against a file generated by pyarrow. Here is the code to produce the file + * when/if this file needs to be re-generated or changed. + * + *
+     * ###############################################################################
+     * import pyarrow.parquet
+     *
+     * pa_table = pyarrow.table({
+     *     'int': [0, None, 100, -100],
+     *     'float': [0.0, None, 100.0, -100.0],
+     *     'string': ["aaa", None, "111", "ZZZ"],
+     *     'intList': [
+     *         [0, None, 2],
+     *         None,
+     *         [3, 4, 6, 7, 8, 9, 10, 100, -100],
+     *         [5]
+     *     ],
+     *     'floatList': [
+     *         [0.0, None, 2.0],
+     *         None,
+     *         [3.0, 4.0, 6.0, 7.0, 8.0, 9.0, 10.0, 100.0, -100.0],
+     *         [5.0]
+     *     ],
+     *     'stringList': [
+     *         ["aaa", None, None],
+     *         None,
+     *         ["111", "zzz", "ZZZ", "AAA"],
+     *         ["ccc"]
+     *     ]})
+     * pyarrow.parquet.write_table(pa_table, './extensions/parquet/table/src/test/resources/e0/pyarrow_stats.parquet')
+     * ###############################################################################
+     * </pre>
+ */ + @Test + public void verifyPyArrowStatistics() { + final String path = ParquetTableReadWriteTest.class.getResource("/e0/pyarrow_stats.parquet").getFile(); + final File pyarrowDest = new File(path); + final Table pyarrowFromDisk = ParquetTools.readTable(pyarrowDest); + + // Verify that our verification code works for a pyarrow generated table. + assertTableStatistics(pyarrowFromDisk, pyarrowDest); + + // Write the table to disk using our code. + final File dhDest = new File(rootFile, "ParquetTest_statistics_test.parquet"); + ParquetTools.writeTable(pyarrowFromDisk, dhDest); + + // Read the table back in using our code. + final Table dhFromDisk = ParquetTools.readTable(dhDest); + + // Verify the two tables loaded from disk are equal. + TstUtils.assertTableEquals(pyarrowFromDisk, dhFromDisk); + + // Run the verification code against DHC writer stats. + assertTableStatistics(pyarrowFromDisk, dhDest); + assertTableStatistics(dhFromDisk, dhDest); + } + + private void assertTableStatistics(Table inputTable, File dest) { + // Verify that the columns have the correct statistics. + final ParquetMetadata metadata = new ParquetTableLocationKey(dest, 0, null).getMetadata(); + + final String[] colNames = + inputTable.getColumnSourceMap().keySet().toArray(CollectionUtil.ZERO_LENGTH_STRING_ARRAY); + for (int colIdx = 0; colIdx < inputTable.numColumns(); ++colIdx) { + final String colName = colNames[colIdx]; + + final ColumnSource columnSource = inputTable.getColumnSource(colName); + final ColumnChunkMetaData columnChunkMetaData = metadata.getBlocks().get(0).getColumns().get(colIdx); + final Statistics statistics = columnChunkMetaData.getStatistics(); + + final Class csType = columnSource.getType(); + + if (csType == boolean.class || csType == Boolean.class) { + assertBooleanColumnStatistics( + new SerialByteColumnIterator( + ReinterpretUtils.booleanToByteSource((ColumnSource) columnSource), + inputTable.getRowSet()), + (Statistics) statistics); + } else if (csType == Boolean[].class) { + assertBooleanArrayColumnStatistics( + new SerialObjectColumnIterator<>( + (ColumnSource) columnSource, + inputTable.getRowSet()), + (Statistics) statistics); + } else if (csType == byte.class || csType == Byte.class) { + assertByteColumnStatistics( + new SerialByteColumnIterator( + (ColumnSource) columnSource, inputTable.getRowSet()), + (Statistics) statistics); + } else if (csType == byte[].class) { + assertByteArrayColumnStatistics( + new SerialObjectColumnIterator<>( + (ColumnSource) columnSource, + inputTable.getRowSet()), + (Statistics) statistics); + } else if (csType == ByteVector.class) { + assertByteVectorColumnStatistics( + new SerialObjectColumnIterator<>( + (ColumnSource) columnSource, + inputTable.getRowSet()), + (Statistics) statistics); + } else if (csType == char.class || csType == Character.class) { + assertCharColumnStatistics( + new SerialCharacterColumnIterator( + (ColumnSource) columnSource, inputTable.getRowSet()), + (Statistics) statistics); + } else if (csType == char[].class) { + assertCharArrayColumnStatistics( + new SerialObjectColumnIterator<>( + (ColumnSource) columnSource, + inputTable.getRowSet()), + (Statistics) statistics); + } else if (csType == CharVector.class) { + assertCharVectorColumnStatistics( + new SerialObjectColumnIterator<>( + (ColumnSource) columnSource, + inputTable.getRowSet()), + (Statistics) statistics); + } else if (csType == short.class || csType == Short.class) { + assertShortColumnStatistics( + new SerialShortColumnIterator( + (ColumnSource) columnSource, 
inputTable.getRowSet()), + (Statistics) statistics); + } else if (csType == short[].class) { + assertShortArrayColumnStatistics( + new SerialObjectColumnIterator<>( + (ColumnSource) columnSource, + inputTable.getRowSet()), + (Statistics) statistics); + } else if (csType == ShortVector.class) { + assertShortVectorColumnStatistics( + new SerialObjectColumnIterator<>( + (ColumnSource) columnSource, + inputTable.getRowSet()), + (Statistics) statistics); + } else if (csType == int.class || csType == Integer.class) { + assertIntColumnStatistics( + new SerialIntegerColumnIterator( + (ColumnSource) columnSource, inputTable.getRowSet()), + (Statistics) statistics); + } else if (csType == int[].class) { + assertIntArrayColumnStatistics( + new SerialObjectColumnIterator<>( + (ColumnSource) columnSource, + inputTable.getRowSet()), + (Statistics) statistics); + } else if (csType == IntVector.class) { + assertIntVectorColumnStatistics( + new SerialObjectColumnIterator<>( + (ColumnSource) columnSource, + inputTable.getRowSet()), + (Statistics) statistics); + } else if (csType == long.class || csType == Long.class) { + assertLongColumnStatistics( + new SerialLongColumnIterator( + (ColumnSource) columnSource, inputTable.getRowSet()), + (Statistics) statistics); + } else if (csType == long[].class) { + assertLongArrayColumnStatistics( + new SerialObjectColumnIterator<>( + (ColumnSource) columnSource, + inputTable.getRowSet()), + (Statistics) statistics); + } else if (csType == LongVector.class) { + assertLongVectorColumnStatistics( + new SerialObjectColumnIterator<>( + (ColumnSource) columnSource, + inputTable.getRowSet()), + (Statistics) statistics); + } else if (csType == float.class || csType == Float.class) { + assertFloatColumnStatistics( + new SerialFloatColumnIterator( + (ColumnSource) columnSource, inputTable.getRowSet()), + (Statistics) statistics); + } else if (csType == float[].class) { + assertFloatArrayColumnStatistics( + new SerialObjectColumnIterator<>( + (ColumnSource) columnSource, + inputTable.getRowSet()), + (Statistics) statistics); + } else if (csType == FloatVector.class) { + assertFloatVectorColumnStatistics( + new SerialObjectColumnIterator<>( + (ColumnSource) columnSource, + inputTable.getRowSet()), + (Statistics) statistics); + } else if (csType == double.class || csType == Double.class) { + assertDoubleColumnStatistics( + new SerialDoubleColumnIterator( + (ColumnSource) columnSource, inputTable.getRowSet()), + (Statistics) statistics); + } else if (csType == double[].class) { + assertDoubleArrayColumnStatistics( + new SerialObjectColumnIterator<>( + (ColumnSource) columnSource, + inputTable.getRowSet()), + (Statistics) statistics); + } else if (csType == DoubleVector.class) { + assertDoubleVectorColumnStatistics( + new SerialObjectColumnIterator<>( + (ColumnSource) columnSource, + inputTable.getRowSet()), + (Statistics) statistics); + } else if (csType == String.class) { + assertStringColumnStatistics( + new SerialObjectColumnIterator<>( + (ColumnSource) columnSource, inputTable.getRowSet()), + (Statistics) statistics); + } else if (csType == String[].class) { + assertStringArrayColumnStatistics( + new SerialObjectColumnIterator<>( + (ColumnSource) columnSource, + inputTable.getRowSet()), + (Statistics) statistics); + } else if (csType == ObjectVector.class && columnSource.getComponentType() == String.class) { + assertStringVectorColumnStatistics( + new SerialObjectColumnIterator<>( + (ColumnSource>) columnSource, + inputTable.getRowSet()), + (Statistics) statistics); + } else if 
(csType == BigInteger.class) { + assertBigIntegerColumnStatistics( + new SerialObjectColumnIterator( + (ColumnSource) columnSource, inputTable.getRowSet()), + (Statistics) statistics); + } else if (csType == BigDecimal.class) { + assertBigDecimalColumnStatistics( + new SerialObjectColumnIterator<>( + (ColumnSource) columnSource, inputTable.getRowSet()), + (Statistics) statistics); + } else if (csType == Instant.class) { + assertInstantColumnStatistic( + new SerialObjectColumnIterator<>( + (ColumnSource) columnSource, inputTable.getRowSet()), + (Statistics) statistics); + } else if (csType == Instant[].class) { + assertInstantArrayColumnStatistics( + new SerialObjectColumnIterator<>( + (ColumnSource) columnSource, + inputTable.getRowSet()), + (Statistics) statistics); + } else if (csType == ObjectVector.class && columnSource.getComponentType() == Instant.class) { + assertInstantVectorColumnStatistics( + new SerialObjectColumnIterator<>( + (ColumnSource>) columnSource, + inputTable.getRowSet()), + (Statistics) statistics); + } else { + // We can't verify statistics for this column type, so just skip it. + System.out.println("Ignoring column " + colName + " of type " + csType.getName()); + } + } + } + + // region Column Statistics Assertions + private void assertBooleanColumnStatistics(SerialByteColumnIterator iterator, Statistics statistics) { + MutableLong itemCount = new MutableLong(0); + MutableLong nullCount = new MutableLong(0); + MutableInt min = new MutableInt(NULL_BYTE); + MutableInt max = new MutableInt(NULL_BYTE); + + iterator.forEachRemaining((ByteConsumer) value -> { + itemCount.increment(); + if (value == NULL_BYTE) { + nullCount.increment(); + } else { + if (min.getValue() == NULL_BYTE || value < min.getValue()) { + min.setValue(value); + } + if (max.getValue() == NULL_BYTE || value > max.getValue()) { + max.setValue(value); + } + } + }); + + assertEquals(nullCount.intValue(), statistics.getNumNulls()); + if (!itemCount.getValue().equals(nullCount.getValue())) { + // There are some non-null values, so min and max should be non-null and equal to observed values. + assertEquals(min.getValue() == 1, statistics.genericGetMin()); + assertEquals(max.getValue() == 1, statistics.genericGetMax()); + } else { + // Everything is null, statistics should be empty. + assertFalse(statistics.hasNonNullValue()); + } + } + + private void assertBooleanArrayColumnStatistics(SerialObjectColumnIterator iterator, + Statistics statistics) { + MutableLong itemCount = new MutableLong(0); + MutableLong nullCount = new MutableLong(0); + MutableInt min = new MutableInt(NULL_BYTE); + MutableInt max = new MutableInt(NULL_BYTE); + + iterator.forEachRemaining(values -> { + if (values == null) { + itemCount.increment(); + nullCount.increment(); + return; + } + for (final Boolean value : values) { + itemCount.increment(); + if (value == null) { + nullCount.increment(); + } else { + if (min.getValue() == NULL_BYTE || (value ? 1 : 0) < min.getValue()) { + min.setValue(value ? 1 : 0); + } + if (max.getValue() == NULL_BYTE || (value ? 1 : 0) > max.getValue()) { + max.setValue(value ? 1 : 0); + } + } + } + }); + + assertEquals(nullCount.intValue(), statistics.getNumNulls()); + if (!itemCount.getValue().equals(nullCount.getValue())) { + // There are some non-null values, so min and max should be non-null and equal to observed values. 
+ assertEquals(min.getValue() == 1, statistics.genericGetMin()); + assertEquals(max.getValue() == 1, statistics.genericGetMax()); + } else { + // Everything is null, statistics should be empty. + assertFalse(statistics.hasNonNullValue()); + } + } + + private void assertByteColumnStatistics(SerialByteColumnIterator iterator, Statistics statistics) { + MutableLong itemCount = new MutableLong(0); + MutableLong nullCount = new MutableLong(0); + MutableInt min = new MutableInt(NULL_BYTE); + MutableInt max = new MutableInt(NULL_BYTE); + + iterator.forEachRemaining((ByteConsumer) value -> { + itemCount.increment(); + if (value == NULL_BYTE) { + nullCount.increment(); + } else { + if (min.getValue() == NULL_BYTE || value < min.getValue()) { + min.setValue(value); + } + if (max.getValue() == NULL_BYTE || value > max.getValue()) { + max.setValue(value); + } + } + }); + + assertEquals(nullCount.intValue(), statistics.getNumNulls()); + if (!itemCount.getValue().equals(nullCount.getValue())) { + // There are some non-null values, so min and max should be non-null and equal to observed values. + assertEquals(min.getValue(), statistics.genericGetMin()); + assertEquals(max.getValue(), statistics.genericGetMax()); + } else { + // Everything is null, statistics should be empty. + assertFalse(statistics.hasNonNullValue()); + } + } + + private void assertByteArrayColumnStatistics(SerialObjectColumnIterator iterator, + Statistics statistics) { + MutableLong itemCount = new MutableLong(0); + MutableLong nullCount = new MutableLong(0); + MutableInt min = new MutableInt(NULL_BYTE); + MutableInt max = new MutableInt(NULL_BYTE); + + iterator.forEachRemaining(values -> { + if (values == null) { + itemCount.increment(); + nullCount.increment(); + return; + } + for (final byte value : values) { + itemCount.increment(); + if (value == NULL_BYTE) { + nullCount.increment(); + } else { + if (min.getValue() == NULL_BYTE || value < min.getValue()) { + min.setValue(value); + } + if (max.getValue() == NULL_BYTE || value > max.getValue()) { + max.setValue(value); + } + } + } + }); + + assertEquals(nullCount.intValue(), statistics.getNumNulls()); + if (!itemCount.getValue().equals(nullCount.getValue())) { + // There are some non-null values, so min and max should be non-null and equal to observed values. + assertEquals(min.getValue(), statistics.genericGetMin()); + assertEquals(max.getValue(), statistics.genericGetMax()); + } else { + // Everything is null, statistics should be empty. + assertFalse(statistics.hasNonNullValue()); + } + } + + private void assertByteVectorColumnStatistics(SerialObjectColumnIterator iterator, + Statistics statistics) { + MutableLong itemCount = new MutableLong(0); + MutableLong nullCount = new MutableLong(0); + MutableInt min = new MutableInt(NULL_BYTE); + MutableInt max = new MutableInt(NULL_BYTE); + + iterator.forEachRemaining(values -> { + if (values == null) { + itemCount.increment(); + nullCount.increment(); + return; + } + for (final byte value : values) { + itemCount.increment(); + if (value == NULL_BYTE) { + nullCount.increment(); + } else { + if (min.getValue() == NULL_BYTE || value < min.getValue()) { + min.setValue(value); + } + if (max.getValue() == NULL_BYTE || value > max.getValue()) { + max.setValue(value); + } + } + } + }); + + assertEquals(nullCount.intValue(), statistics.getNumNulls()); + if (!itemCount.getValue().equals(nullCount.getValue())) { + // There are some non-null values, so min and max should be non-null and equal to observed values. 
+ assertEquals(min.getValue(), statistics.genericGetMin()); + assertEquals(max.getValue(), statistics.genericGetMax()); + } else { + // Everything is null, statistics should be empty. + assertFalse(statistics.hasNonNullValue()); + } + } + + private void assertCharColumnStatistics(SerialCharacterColumnIterator iterator, Statistics statistics) { + MutableLong itemCount = new MutableLong(0); + MutableLong nullCount = new MutableLong(0); + MutableInt min = new MutableInt(NULL_CHAR); + MutableInt max = new MutableInt(NULL_CHAR); + + iterator.forEachRemaining((CharConsumer) value -> { + itemCount.increment(); + if (value == NULL_CHAR) { + nullCount.increment(); + } else { + if (min.getValue() == NULL_CHAR || value < min.getValue()) { + min.setValue(value); + } + if (max.getValue() == NULL_CHAR || value > max.getValue()) { + max.setValue(value); + } + } + }); + + assertEquals(nullCount.intValue(), statistics.getNumNulls()); + if (!itemCount.getValue().equals(nullCount.getValue())) { + // There are some non-null values, so min and max should be non-null and equal to observed values. + assertEquals(min.getValue(), statistics.genericGetMin()); + assertEquals(max.getValue(), statistics.genericGetMax()); + } else { + // Everything is null, statistics should be empty. + assertFalse(statistics.hasNonNullValue()); + } + } + + private void assertCharArrayColumnStatistics(SerialObjectColumnIterator iterator, + Statistics statistics) { + MutableLong itemCount = new MutableLong(0); + MutableLong nullCount = new MutableLong(0); + MutableInt min = new MutableInt(NULL_CHAR); + MutableInt max = new MutableInt(NULL_CHAR); + + iterator.forEachRemaining(values -> { + if (values == null) { + itemCount.increment(); + nullCount.increment(); + return; + } + for (final char value : values) { + itemCount.increment(); + if (value == NULL_CHAR) { + nullCount.increment(); + } else { + if (min.getValue() == NULL_CHAR || value < min.getValue()) { + min.setValue(value); + } + if (max.getValue() == NULL_CHAR || value > max.getValue()) { + max.setValue(value); + } + } + } + }); + + assertEquals(nullCount.intValue(), statistics.getNumNulls()); + if (!itemCount.getValue().equals(nullCount.getValue())) { + // There are some non-null values, so min and max should be non-null and equal to observed values. + assertEquals(min.getValue(), statistics.genericGetMin()); + assertEquals(max.getValue(), statistics.genericGetMax()); + } else { + // Everything is null, statistics should be empty. + assertFalse(statistics.hasNonNullValue()); + } + } + + private void assertCharVectorColumnStatistics(SerialObjectColumnIterator iterator, + Statistics statistics) { + MutableLong itemCount = new MutableLong(0); + MutableLong nullCount = new MutableLong(0); + MutableInt min = new MutableInt(NULL_CHAR); + MutableInt max = new MutableInt(NULL_CHAR); + + iterator.forEachRemaining(values -> { + if (values == null) { + itemCount.increment(); + nullCount.increment(); + return; + } + for (final char value : values) { + itemCount.increment(); + if (value == NULL_CHAR) { + nullCount.increment(); + } else { + if (min.getValue() == NULL_CHAR || value < min.getValue()) { + min.setValue(value); + } + if (max.getValue() == NULL_CHAR || value > max.getValue()) { + max.setValue(value); + } + } + } + }); + + assertEquals(nullCount.intValue(), statistics.getNumNulls()); + if (!itemCount.getValue().equals(nullCount.getValue())) { + // There are some non-null values, so min and max should be non-null and equal to observed values. 
+ assertEquals(min.getValue(), statistics.genericGetMin()); + assertEquals(max.getValue(), statistics.genericGetMax()); + } else { + // Everything is null, statistics should be empty. + assertFalse(statistics.hasNonNullValue()); + } + } + + private void assertShortColumnStatistics(SerialShortColumnIterator iterator, Statistics statistics) { + MutableLong itemCount = new MutableLong(0); + MutableLong nullCount = new MutableLong(0); + MutableInt min = new MutableInt(NULL_SHORT); + MutableInt max = new MutableInt(NULL_SHORT); + + iterator.forEachRemaining((ShortConsumer) value -> { + itemCount.increment(); + if (value == NULL_SHORT) { + nullCount.increment(); + } else { + if (min.getValue() == NULL_SHORT || value < min.getValue()) { + min.setValue(value); + } + if (max.getValue() == NULL_SHORT || value > max.getValue()) { + max.setValue(value); + } + } + }); + + assertEquals(nullCount.intValue(), statistics.getNumNulls()); + if (!itemCount.getValue().equals(nullCount.getValue())) { + // There are some non-null values, so min and max should be non-null and equal to observed values. + assertEquals(min.getValue(), statistics.genericGetMin()); + assertEquals(max.getValue(), statistics.genericGetMax()); + } else { + // Everything is null, statistics should be empty. + assertFalse(statistics.hasNonNullValue()); + } + } + + private void assertShortArrayColumnStatistics(SerialObjectColumnIterator iterator, + Statistics statistics) { + MutableLong itemCount = new MutableLong(0); + MutableLong nullCount = new MutableLong(0); + MutableInt min = new MutableInt(NULL_SHORT); + MutableInt max = new MutableInt(NULL_SHORT); + + iterator.forEachRemaining(values -> { + if (values == null) { + itemCount.increment(); + nullCount.increment(); + return; + } + for (final short value : values) { + itemCount.increment(); + if (value == NULL_SHORT) { + nullCount.increment(); + } else { + if (min.getValue() == NULL_SHORT || value < min.getValue()) { + min.setValue(value); + } + if (max.getValue() == NULL_SHORT || value > max.getValue()) { + max.setValue(value); + } + } + } + }); + + assertEquals(nullCount.intValue(), statistics.getNumNulls()); + if (!itemCount.getValue().equals(nullCount.getValue())) { + // There are some non-null values, so min and max should be non-null and equal to observed values. + assertEquals(min.getValue(), statistics.genericGetMin()); + assertEquals(max.getValue(), statistics.genericGetMax()); + } else { + // Everything is null, statistics should be empty. + assertFalse(statistics.hasNonNullValue()); + } + } + + private void assertShortVectorColumnStatistics(SerialObjectColumnIterator iterator, + Statistics statistics) { + MutableLong itemCount = new MutableLong(0); + MutableLong nullCount = new MutableLong(0); + MutableInt min = new MutableInt(NULL_SHORT); + MutableInt max = new MutableInt(NULL_SHORT); + + iterator.forEachRemaining(values -> { + if (values == null) { + itemCount.increment(); + nullCount.increment(); + return; + } + for (final short value : values) { + itemCount.increment(); + if (value == NULL_SHORT) { + nullCount.increment(); + } else { + if (min.getValue() == NULL_SHORT || value < min.getValue()) { + min.setValue(value); + } + if (max.getValue() == NULL_SHORT || value > max.getValue()) { + max.setValue(value); + } + } + } + }); + + assertEquals(nullCount.intValue(), statistics.getNumNulls()); + if (!itemCount.getValue().equals(nullCount.getValue())) { + // There are some non-null values, so min and max should be non-null and equal to observed values. 
+ assertEquals(min.getValue(), statistics.genericGetMin()); + assertEquals(max.getValue(), statistics.genericGetMax()); + } else { + // Everything is null, statistics should be empty. + assertFalse(statistics.hasNonNullValue()); + } + } + + private void assertIntColumnStatistics(SerialIntegerColumnIterator iterator, Statistics statistics) { + MutableLong itemCount = new MutableLong(0); + MutableLong nullCount = new MutableLong(0); + MutableInt min = new MutableInt(NULL_INT); + MutableInt max = new MutableInt(NULL_INT); + + iterator.forEachRemaining((IntConsumer) value -> { + itemCount.increment(); + if (value == NULL_INT) { + nullCount.increment(); + } else { + if (min.getValue() == NULL_INT || value < min.getValue()) { + min.setValue(value); + } + if (max.getValue() == NULL_INT || value > max.getValue()) { + max.setValue(value); + } + } + }); + + assertEquals(nullCount.intValue(), statistics.getNumNulls()); + if (!itemCount.getValue().equals(nullCount.getValue())) { + // There are some non-null values, so min and max should be non-null and equal to observed values. + assertEquals(min.getValue(), statistics.genericGetMin()); + assertEquals(max.getValue(), statistics.genericGetMax()); + } else { + // Everything is null, statistics should be empty. + assertFalse(statistics.hasNonNullValue()); + } + } + + private void assertIntArrayColumnStatistics(SerialObjectColumnIterator iterator, + Statistics statistics) { + MutableLong itemCount = new MutableLong(0); + MutableLong nullCount = new MutableLong(0); + MutableInt min = new MutableInt(NULL_INT); + MutableInt max = new MutableInt(NULL_INT); + + iterator.forEachRemaining(values -> { + if (values == null) { + itemCount.increment(); + nullCount.increment(); + return; + } + for (final int value : values) { + itemCount.increment(); + if (value == NULL_INT) { + nullCount.increment(); + } else { + if (min.getValue() == NULL_INT || value < min.getValue()) { + min.setValue(value); + } + if (max.getValue() == NULL_INT || value > max.getValue()) { + max.setValue(value); + } + } + } + }); + + assertEquals(nullCount.intValue(), statistics.getNumNulls()); + if (!itemCount.getValue().equals(nullCount.getValue())) { + // There are some non-null values, so min and max should be non-null and equal to observed values. + assertEquals(min.getValue(), statistics.genericGetMin()); + assertEquals(max.getValue(), statistics.genericGetMax()); + } else { + // Everything is null, statistics should be empty. + assertFalse(statistics.hasNonNullValue()); + } + } + + private void assertIntVectorColumnStatistics(SerialObjectColumnIterator iterator, + Statistics statistics) { + MutableLong itemCount = new MutableLong(0); + MutableLong nullCount = new MutableLong(0); + MutableInt min = new MutableInt(NULL_INT); + MutableInt max = new MutableInt(NULL_INT); + + iterator.forEachRemaining(values -> { + if (values == null) { + itemCount.increment(); + nullCount.increment(); + return; + } + for (final int value : values) { + itemCount.increment(); + if (value == NULL_INT) { + nullCount.increment(); + } else { + if (min.getValue() == NULL_INT || value < min.getValue()) { + min.setValue(value); + } + if (max.getValue() == NULL_INT || value > max.getValue()) { + max.setValue(value); + } + } + } + }); + + assertEquals(nullCount.intValue(), statistics.getNumNulls()); + if (!itemCount.getValue().equals(nullCount.getValue())) { + // There are some non-null values, so min and max should be non-null and equal to observed values. 
+ assertEquals(min.getValue(), statistics.genericGetMin()); + assertEquals(max.getValue(), statistics.genericGetMax()); + } else { + // Everything is null, statistics should be empty. + assertFalse(statistics.hasNonNullValue()); + } + } + + private void assertLongColumnStatistics(SerialLongColumnIterator iterator, Statistics statistics) { + MutableLong itemCount = new MutableLong(0); + MutableLong nullCount = new MutableLong(0); + MutableLong min = new MutableLong(NULL_LONG); + MutableLong max = new MutableLong(NULL_LONG); + + iterator.forEachRemaining((LongConsumer) value -> { + itemCount.increment(); + if (value == NULL_LONG) { + nullCount.increment(); + } else { + if (min.getValue() == NULL_LONG || value < min.getValue()) { + min.setValue(value); + } + if (max.getValue() == NULL_LONG || value > max.getValue()) { + max.setValue(value); + } + } + }); + + assertEquals(nullCount.intValue(), statistics.getNumNulls()); + if (!itemCount.getValue().equals(nullCount.getValue())) { + // There are some non-null values, so min and max should be non-null and equal to observed values. + assertEquals(min.getValue(), statistics.genericGetMin()); + assertEquals(max.getValue(), statistics.genericGetMax()); + } else { + // Everything is null, statistics should be empty. + assertFalse(statistics.hasNonNullValue()); + } + } + + private void assertLongArrayColumnStatistics(SerialObjectColumnIterator iterator, + Statistics statistics) { + MutableLong itemCount = new MutableLong(0); + MutableLong nullCount = new MutableLong(0); + MutableLong min = new MutableLong(NULL_LONG); + MutableLong max = new MutableLong(NULL_LONG); + + iterator.forEachRemaining(values -> { + if (values == null) { + itemCount.increment(); + nullCount.increment(); + return; + } + for (final long value : values) { + itemCount.increment(); + if (value == NULL_LONG) { + nullCount.increment(); + } else { + if (min.getValue() == NULL_LONG || value < min.getValue()) { + min.setValue(value); + } + if (max.getValue() == NULL_LONG || value > max.getValue()) { + max.setValue(value); + } + } + } + }); + + assertEquals(nullCount.intValue(), statistics.getNumNulls()); + if (!itemCount.getValue().equals(nullCount.getValue())) { + // There are some non-null values, so min and max should be non-null and equal to observed values. + assertEquals(min.getValue(), statistics.genericGetMin()); + assertEquals(max.getValue(), statistics.genericGetMax()); + } else { + // Everything is null, statistics should be empty. + assertFalse(statistics.hasNonNullValue()); + } + } + + private void assertLongVectorColumnStatistics(SerialObjectColumnIterator iterator, + Statistics statistics) { + MutableLong itemCount = new MutableLong(0); + MutableLong nullCount = new MutableLong(0); + MutableLong min = new MutableLong(NULL_LONG); + MutableLong max = new MutableLong(NULL_LONG); + + iterator.forEachRemaining(values -> { + if (values == null) { + itemCount.increment(); + nullCount.increment(); + return; + } + for (final long value : values) { + itemCount.increment(); + if (value == NULL_LONG) { + nullCount.increment(); + } else { + if (min.getValue() == NULL_LONG || value < min.getValue()) { + min.setValue(value); + } + if (max.getValue() == NULL_LONG || value > max.getValue()) { + max.setValue(value); + } + } + } + }); + + assertEquals(nullCount.intValue(), statistics.getNumNulls()); + if (!itemCount.getValue().equals(nullCount.getValue())) { + // There are some non-null values, so min and max should be non-null and equal to observed values. 
+ assertEquals(min.getValue(), statistics.genericGetMin()); + assertEquals(max.getValue(), statistics.genericGetMax()); + } else { + // Everything is null, statistics should be empty. + assertFalse(statistics.hasNonNullValue()); + } + } + + private void assertFloatColumnStatistics(SerialFloatColumnIterator iterator, Statistics statistics) { + MutableLong itemCount = new MutableLong(0); + MutableLong nullCount = new MutableLong(0); + MutableFloat min = new MutableFloat(NULL_FLOAT); + MutableFloat max = new MutableFloat(NULL_FLOAT); + + iterator.forEachRemaining((FloatConsumer) value -> { + itemCount.increment(); + if (value == NULL_FLOAT) { + nullCount.increment(); + } else { + if (min.getValue() == NULL_FLOAT || value < min.getValue()) { + min.setValue(value); + } + if (max.getValue() == NULL_FLOAT || value > max.getValue()) { + max.setValue(value); + } + } + }); + + assertEquals(nullCount.intValue(), statistics.getNumNulls()); + if (!itemCount.getValue().equals(nullCount.getValue())) { + // There are some non-null values, so min and max should be non-null and equal to observed values. + // Use FloatComparisons.compare() to handle -0.0f == 0.0f properly + assertEquals(FloatComparisons.compare(min.getValue(), statistics.genericGetMin()), 0); + assertEquals(FloatComparisons.compare(max.getValue(), statistics.genericGetMax()), 0); + } else { + // Everything is null, statistics should be empty. + assertFalse(statistics.hasNonNullValue()); + } + } + + private void assertFloatArrayColumnStatistics(SerialObjectColumnIterator iterator, + Statistics statistics) { + MutableLong itemCount = new MutableLong(0); + MutableLong nullCount = new MutableLong(0); + MutableFloat min = new MutableFloat(NULL_FLOAT); + MutableFloat max = new MutableFloat(NULL_FLOAT); + + iterator.forEachRemaining(values -> { + if (values == null) { + itemCount.increment(); + nullCount.increment(); + return; + } + for (final float value : values) { + itemCount.increment(); + if (value == NULL_FLOAT) { + nullCount.increment(); + } else { + if (min.getValue() == NULL_FLOAT || value < min.getValue()) { + min.setValue(value); + } + if (max.getValue() == NULL_FLOAT || value > max.getValue()) { + max.setValue(value); + } + } + } + }); + + assertEquals(nullCount.intValue(), statistics.getNumNulls()); + if (!itemCount.getValue().equals(nullCount.getValue())) { + // There are some non-null values, so min and max should be non-null and equal to observed values. + // Use FloatComparisons.compare() to handle -0.0f == 0.0f properly + assertEquals(FloatComparisons.compare(min.getValue(), statistics.genericGetMin()), 0); + assertEquals(FloatComparisons.compare(max.getValue(), statistics.genericGetMax()), 0); + } else { + // Everything is null, statistics should be empty. 
+ assertFalse(statistics.hasNonNullValue()); + } + } + + private void assertFloatVectorColumnStatistics(SerialObjectColumnIterator iterator, + Statistics statistics) { + MutableLong itemCount = new MutableLong(0); + MutableLong nullCount = new MutableLong(0); + MutableFloat min = new MutableFloat(NULL_FLOAT); + MutableFloat max = new MutableFloat(NULL_FLOAT); + + iterator.forEachRemaining(values -> { + if (values == null) { + itemCount.increment(); + nullCount.increment(); + return; + } + for (final float value : values) { + itemCount.increment(); + if (value == NULL_FLOAT) { + nullCount.increment(); + } else { + if (min.getValue() == NULL_FLOAT || value < min.getValue()) { + min.setValue(value); + } + if (max.getValue() == NULL_FLOAT || value > max.getValue()) { + max.setValue(value); + } + } + } + }); + + assertEquals(nullCount.intValue(), statistics.getNumNulls()); + if (!itemCount.getValue().equals(nullCount.getValue())) { + // There are some non-null values, so min and max should be non-null and equal to observed values. + // Use FloatComparisons.compare() to handle -0.0f == 0.0f properly + assertEquals(FloatComparisons.compare(min.getValue(), statistics.genericGetMin()), 0); + assertEquals(FloatComparisons.compare(max.getValue(), statistics.genericGetMax()), 0); + } else { + // Everything is null, statistics should be empty. + assertFalse(statistics.hasNonNullValue()); + } + } + + private void assertDoubleColumnStatistics(SerialDoubleColumnIterator iterator, Statistics statistics) { + MutableLong itemCount = new MutableLong(0); + MutableLong nullCount = new MutableLong(0); + MutableDouble min = new MutableDouble(NULL_DOUBLE); + MutableDouble max = new MutableDouble(NULL_DOUBLE); + + iterator.forEachRemaining((DoubleConsumer) value -> { + itemCount.increment(); + if (value == NULL_DOUBLE) { + nullCount.increment(); + } else { + if (min.getValue() == NULL_DOUBLE || value < min.getValue()) { + min.setValue(value); + } + if (max.getValue() == NULL_DOUBLE || value > max.getValue()) { + max.setValue(value); + } + } + }); + + assertEquals(nullCount.intValue(), statistics.getNumNulls()); + if (!itemCount.getValue().equals(nullCount.getValue())) { + // There are some non-null values, so min and max should be non-null and equal to observed values. + // Use DoubleComparisons.compare() to handle -0.0f == 0.0f properly + assertEquals(DoubleComparisons.compare(min.getValue(), statistics.genericGetMin()), 0); + assertEquals(DoubleComparisons.compare(max.getValue(), statistics.genericGetMax()), 0); + } else { + // Everything is null, statistics should be empty. 
+ assertFalse(statistics.hasNonNullValue()); + } + } + + private void assertDoubleArrayColumnStatistics(SerialObjectColumnIterator iterator, + Statistics statistics) { + MutableLong itemCount = new MutableLong(0); + MutableLong nullCount = new MutableLong(0); + MutableDouble min = new MutableDouble(NULL_DOUBLE); + MutableDouble max = new MutableDouble(NULL_DOUBLE); + + iterator.forEachRemaining(values -> { + if (values == null) { + itemCount.increment(); + nullCount.increment(); + return; + } + for (final double value : values) { + itemCount.increment(); + if (value == NULL_DOUBLE) { + nullCount.increment(); + } else { + if (min.getValue() == NULL_DOUBLE || value < min.getValue()) { + min.setValue(value); + } + if (max.getValue() == NULL_DOUBLE || value > max.getValue()) { + max.setValue(value); + } + } + } + }); + + assertEquals(nullCount.intValue(), statistics.getNumNulls()); + if (!itemCount.getValue().equals(nullCount.getValue())) { + // There are some non-null values, so min and max should be non-null and equal to observed values. + // Use DoubleComparisons.compare() to handle -0.0f == 0.0f properly + assertEquals(DoubleComparisons.compare(min.getValue(), statistics.genericGetMin()), 0); + assertEquals(DoubleComparisons.compare(max.getValue(), statistics.genericGetMax()), 0); + } else { + // Everything is null, statistics should be empty. + assertFalse(statistics.hasNonNullValue()); + } + } + + private void assertDoubleVectorColumnStatistics(SerialObjectColumnIterator iterator, + Statistics statistics) { + MutableLong itemCount = new MutableLong(0); + MutableLong nullCount = new MutableLong(0); + MutableDouble min = new MutableDouble(NULL_DOUBLE); + MutableDouble max = new MutableDouble(NULL_DOUBLE); + + iterator.forEachRemaining(values -> { + if (values == null) { + itemCount.increment(); + nullCount.increment(); + return; + } + for (final double value : values) { + itemCount.increment(); + if (value == NULL_DOUBLE) { + nullCount.increment(); + } else { + if (min.getValue() == NULL_DOUBLE || value < min.getValue()) { + min.setValue(value); + } + if (max.getValue() == NULL_DOUBLE || value > max.getValue()) { + max.setValue(value); + } + } + } + }); + + assertEquals(nullCount.intValue(), statistics.getNumNulls()); + if (!itemCount.getValue().equals(nullCount.getValue())) { + // There are some non-null values, so min and max should be non-null and equal to observed values. + // Use DoubleComparisons.compare() to handle -0.0f == 0.0f properly + assertEquals(DoubleComparisons.compare(min.getValue(), statistics.genericGetMin()), 0); + assertEquals(DoubleComparisons.compare(max.getValue(), statistics.genericGetMax()), 0); + } else { + // Everything is null, statistics should be empty. 
+ assertFalse(statistics.hasNonNullValue()); + } + } + + private void assertStringColumnStatistics(SerialObjectColumnIterator iterator, + Statistics statistics) { + MutableLong itemCount = new MutableLong(0); + MutableLong nullCount = new MutableLong(0); + MutableObject min = new MutableObject<>(null); + MutableObject max = new MutableObject<>(null); + + iterator.forEachRemaining((value) -> { + itemCount.increment(); + if (value == null) { + nullCount.increment(); + } else { + if (min.getValue() == null || value.compareTo(min.getValue()) < 0) { + min.setValue(value); + } + if (max.getValue() == null || value.compareTo(max.getValue()) > 0) { + max.setValue(value); + } + } + }); + + assertEquals(nullCount.intValue(), statistics.getNumNulls()); + if (!itemCount.getValue().equals(nullCount.getValue())) { + // There are some non-null values, so min and max should be non-null and equal to observed values. + assertEquals(Binary.fromString(min.getValue()), statistics.genericGetMin()); + assertEquals(Binary.fromString(max.getValue()), statistics.genericGetMax()); + } else { + // Everything is null, statistics should be empty. + assertFalse(statistics.hasNonNullValue()); + } + } + + private void assertStringArrayColumnStatistics(SerialObjectColumnIterator iterator, + Statistics statistics) { + MutableLong itemCount = new MutableLong(0); + MutableLong nullCount = new MutableLong(0); + MutableObject min = new MutableObject<>(null); + MutableObject max = new MutableObject<>(null); + + iterator.forEachRemaining(values -> { + if (values == null) { + itemCount.increment(); + nullCount.increment(); + return; + } + for (final String value : values) { + itemCount.increment(); + if (value == null) { + nullCount.increment(); + } else { + if (min.getValue() == null || value.compareTo(min.getValue()) < 0) { + min.setValue(value); + } + if (max.getValue() == null || value.compareTo(max.getValue()) > 0) { + max.setValue(value); + } + } + } + }); + + assertEquals(nullCount.intValue(), statistics.getNumNulls()); + if (!itemCount.getValue().equals(nullCount.getValue())) { + // There are some non-null values, so min and max should be non-null and equal to observed values. + assertEquals(Binary.fromString(min.getValue()), statistics.genericGetMin()); + assertEquals(Binary.fromString(max.getValue()), statistics.genericGetMax()); + } else { + // Everything is null, statistics should be empty. + assertFalse(statistics.hasNonNullValue()); + } + } + + private void assertStringVectorColumnStatistics(SerialObjectColumnIterator> iterator, + Statistics statistics) { + MutableLong itemCount = new MutableLong(0); + MutableLong nullCount = new MutableLong(0); + MutableObject min = new MutableObject<>(null); + MutableObject max = new MutableObject<>(null); + + iterator.forEachRemaining(values -> { + if (values == null) { + itemCount.increment(); + nullCount.increment(); + return; + } + for (String value : values) { + itemCount.increment(); + if (value == null) { + nullCount.increment(); + } else { + if (min.getValue() == null || value.compareTo(min.getValue()) < 0) { + min.setValue(value); + } + if (max.getValue() == null || value.compareTo(max.getValue()) > 0) { + max.setValue(value); + } + } + } + }); + + assertEquals(nullCount.intValue(), statistics.getNumNulls()); + if (!itemCount.getValue().equals(nullCount.getValue())) { + // There are some non-null values, so min and max should be non-null and equal to observed values. 
+ assertEquals(Binary.fromString(min.getValue()), statistics.genericGetMin()); + assertEquals(Binary.fromString(max.getValue()), statistics.genericGetMax()); + } else { + // Everything is null, statistics should be empty. + assertFalse(statistics.hasNonNullValue()); + } + } + + private void assertInstantColumnStatistic(SerialObjectColumnIterator iterator, + Statistics statistics) { + MutableLong itemCount = new MutableLong(0); + MutableLong nullCount = new MutableLong(0); + MutableLong min = new MutableLong(NULL_LONG); + MutableLong max = new MutableLong(NULL_LONG); + + iterator.forEachRemaining((value) -> { + itemCount.increment(); + if (value == null) { + nullCount.increment(); + } else { + // DateTimeUtils.epochNanos() is the correct conversion for Instant to long. + if (min.getValue() == NULL_LONG || DateTimeUtils.epochNanos(value) < min.getValue()) { + min.setValue(DateTimeUtils.epochNanos(value)); + } + if (max.getValue() == NULL_LONG || DateTimeUtils.epochNanos(value) > max.getValue()) { + max.setValue(DateTimeUtils.epochNanos(value)); + } + } + }); + + assertEquals(nullCount.intValue(), statistics.getNumNulls()); + if (!itemCount.getValue().equals(nullCount.getValue())) { + // There are some non-null values, so min and max should be non-null and equal to observed values. + assertEquals(min.getValue(), statistics.genericGetMin()); + assertEquals(max.getValue(), statistics.genericGetMax()); + } else { + // Everything is null, statistics should be empty. + assertFalse(statistics.hasNonNullValue()); + } + } + + private void assertInstantArrayColumnStatistics(SerialObjectColumnIterator iterator, + Statistics statistics) { + MutableLong itemCount = new MutableLong(0); + MutableLong nullCount = new MutableLong(0); + MutableLong min = new MutableLong(NULL_LONG); + MutableLong max = new MutableLong(NULL_LONG); + + iterator.forEachRemaining(values -> { + if (values == null) { + itemCount.increment(); + nullCount.increment(); + return; + } + for (final Instant value : values) { + itemCount.increment(); + if (value == null) { + nullCount.increment(); + } else { + // DateTimeUtils.epochNanos() is the correct conversion for Instant to long. + if (min.getValue() == NULL_LONG || DateTimeUtils.epochNanos(value) < min.getValue()) { + min.setValue(DateTimeUtils.epochNanos(value)); + } + if (max.getValue() == NULL_LONG || DateTimeUtils.epochNanos(value) > max.getValue()) { + max.setValue(DateTimeUtils.epochNanos(value)); + } + } + } + }); + + assertEquals(nullCount.intValue(), statistics.getNumNulls()); + if (!itemCount.getValue().equals(nullCount.getValue())) { + // There are some non-null values, so min and max should be non-null and equal to observed values. + assertEquals(min.getValue(), statistics.genericGetMin()); + assertEquals(max.getValue(), statistics.genericGetMax()); + } else { + // Everything is null, statistics should be empty. 
+ assertFalse(statistics.hasNonNullValue()); + } + } + + private void assertInstantVectorColumnStatistics(SerialObjectColumnIterator> iterator, + Statistics statistics) { + MutableLong itemCount = new MutableLong(0); + MutableLong nullCount = new MutableLong(0); + MutableLong min = new MutableLong(NULL_LONG); + MutableLong max = new MutableLong(NULL_LONG); + + iterator.forEachRemaining(values -> { + if (values == null) { + itemCount.increment(); + nullCount.increment(); + return; + } + for (Instant value : values) { + itemCount.increment(); + if (value == null) { + nullCount.increment(); + } else { + // DateTimeUtils.epochNanos() is the correct conversion for Instant to long. + if (min.getValue() == NULL_LONG || DateTimeUtils.epochNanos(value) < min.getValue()) { + min.setValue(DateTimeUtils.epochNanos(value)); + } + if (max.getValue() == NULL_LONG || DateTimeUtils.epochNanos(value) > max.getValue()) { + max.setValue(DateTimeUtils.epochNanos(value)); + } + } + } + }); + + assertEquals(nullCount.intValue(), statistics.getNumNulls()); + if (!itemCount.getValue().equals(nullCount.getValue())) { + // There are some non-null values, so min and max should be non-null and equal to observed + // values. + assertEquals(min.getValue(), statistics.genericGetMin()); + assertEquals(max.getValue(), statistics.genericGetMax()); + } else { + // Everything is null, statistics should be empty. + assertFalse(statistics.hasNonNullValue()); + } + } + + private void assertBigDecimalColumnStatistics(SerialObjectColumnIterator iterator, + Statistics statistics) { + MutableLong itemCount = new MutableLong(0); + MutableLong nullCount = new MutableLong(0); + MutableObject min = new MutableObject<>(null); + MutableObject max = new MutableObject<>(null); + + iterator.forEachRemaining((value) -> { + itemCount.increment(); + if (value == null) { + nullCount.increment(); + } else { + if (min.getValue() == null || value.compareTo(min.getValue()) < 0) { + min.setValue(value); + } + if (max.getValue() == null || value.compareTo(max.getValue()) > 0) { + max.setValue(value); + } + } + }); + + assertEquals(nullCount.intValue(), statistics.getNumNulls()); + if (!itemCount.getValue().equals(nullCount.getValue())) { + // There are some non-null values, so min and max should be non-null and equal to observed values. + assertEquals(Binary.fromConstantByteArray(min.getValue().unscaledValue().toByteArray()), + statistics.genericGetMin()); + assertEquals(Binary.fromConstantByteArray(max.getValue().unscaledValue().toByteArray()), + statistics.genericGetMax()); + } else { + // Everything is null, statistics should be empty. + assertFalse(statistics.hasNonNullValue()); + } + } + + private void assertBigIntegerColumnStatistics(SerialObjectColumnIterator iterator, + Statistics statistics) { + MutableLong itemCount = new MutableLong(0); + MutableLong nullCount = new MutableLong(0); + MutableObject min = new MutableObject<>(null); + MutableObject max = new MutableObject<>(null); + + iterator.forEachRemaining((value) -> { + itemCount.increment(); + if (value == null) { + nullCount.increment(); + } else { + if (min.getValue() == null || value.compareTo(min.getValue()) < 0) { + min.setValue(value); + } + if (max.getValue() == null || value.compareTo(max.getValue()) > 0) { + max.setValue(value); + } + } + }); + + assertEquals(nullCount.intValue(), statistics.getNumNulls()); + if (!itemCount.getValue().equals(nullCount.getValue())) { + // There are some non-null values, so min and max should be non-null and equal to observed values. 
+            assertEquals(Binary.fromConstantByteArray(min.getValue().toByteArray()), statistics.genericGetMin());
+            assertEquals(Binary.fromConstantByteArray(max.getValue().toByteArray()), statistics.genericGetMax());
+        } else {
+            // Everything is null, statistics should be empty.
+            assertFalse(statistics.hasNonNullValue());
+        }
+    }
+
+    // endregion Column Statistics Assertions
 }
diff --git a/extensions/parquet/table/src/test/resources/e0/pyarrow_stats.parquet b/extensions/parquet/table/src/test/resources/e0/pyarrow_stats.parquet
new file mode 100644
index 00000000000..503301ca035
Binary files /dev/null and b/extensions/parquet/table/src/test/resources/e0/pyarrow_stats.parquet differ
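The per-type assertion methods above re-iterate the Deephaven column sources and compare the observed null counts and min/max values against the Statistics objects taken from the written file's metadata. As a minimal, self-contained sketch (not part of this diff), the standard parquet-hadoop footer API can be used to dump the same per-column-chunk statistics from any file these tests produce; the file path below is hypothetical.

// Illustrative only: dump column-chunk statistics from a written Parquet file's footer.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.util.HadoopInputFile;

import java.io.IOException;

public final class DumpColumnStatistics {
    public static void main(final String[] args) throws IOException {
        // Hypothetical output location of a statistics test run.
        final Path path = new Path("/tmp/ParquetTest_statistics_test.parquet");
        try (final ParquetFileReader reader =
                ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) {
            for (final BlockMetaData block : reader.getFooter().getBlocks()) {
                for (final ColumnChunkMetaData column : block.getColumns()) {
                    final Statistics<?> statistics = column.getStatistics();
                    // min/max are only meaningful when at least one non-null value was written,
                    // which mirrors the hasNonNullValue() branch used by the assertions above.
                    System.out.println(column.getPath() + ": numNulls=" + statistics.getNumNulls()
                            + (statistics.hasNonNullValue()
                                    ? ", min=" + statistics.genericGetMin() + ", max=" + statistics.genericGetMax()
                                    : ", no non-null values"));
                }
            }
        }
    }
}

Note that getNumNulls() covers both null rows and null elements inside array/vector rows, which is why the array and vector assertion methods fold both cases into a single nullCount before comparing.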