Added statistics tracking to DH parquet table writing. (#4339)
* Added statistics tracking to DH parquet table writing.

* Added verification against pyarrow gen'd stats.

* Verify pyarrow statistics match DHC statistics.
lbooker42 authored Aug 25, 2023
1 parent 92a7c9c commit 3c5370b
Showing 15 changed files with 1,760 additions and 104 deletions.
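For context: the change threads org.apache.parquet.column.statistics.Statistics accumulators through the entire write path. Below is a minimal sketch of that parquet-mr API (not part of this commit; the column name is illustrative):

    import org.apache.parquet.column.statistics.Statistics;
    import org.apache.parquet.schema.PrimitiveType;
    import org.apache.parquet.schema.Types;

    public class StatisticsSketch {
        public static void main(String[] args) {
            // One accumulator per column chunk, typed from the column's primitive
            // type; this mirrors what ColumnWriterImpl's constructor does below.
            final PrimitiveType type =
                    Types.required(PrimitiveType.PrimitiveTypeName.INT32).named("someColumn");
            final Statistics<?> statistics = Statistics.createStats(type);

            statistics.updateStats(7);      // non-null values fold into min/max
            statistics.updateStats(-3);
            statistics.incrementNumNulls(); // nulls only bump the null count

            System.out.println(statistics.genericGetMin()); // -3
            System.out.println(statistics.genericGetMax()); // 7
            System.out.println(statistics.getNumNulls());   // 1
        }
    }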
File 1 (bulk writer base class):

@@ -4,6 +4,7 @@
 package io.deephaven.parquet.base;
 
 import io.deephaven.util.QueryConstants;
+import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.column.values.ValuesWriter;
 import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder;
 import org.jetbrains.annotations.NotNull;
@@ -36,9 +37,11 @@ public int writeBulkVector(@NotNull final BUFFER_TYPE bulkValues,
             @NotNull final IntBuffer vectorSizes,
             @NotNull final RunLengthBitPackingHybridEncoder rlEncoder,
             @NotNull final RunLengthBitPackingHybridEncoder dlEncoder,
-            final int nonNullValueCount) throws IOException {
-        final IntBuffer nullsOffsets = writeBulkFilterNulls(bulkValues, nonNullValueCount).nullOffsets;
-        return applyDlAndRl(vectorSizes, rlEncoder, dlEncoder, nullsOffsets);
+            final int nonNullValueCount,
+            @NotNull Statistics<?> statistics) throws IOException {
+        final IntBuffer nullsOffsets =
+                writeBulkVectorFilterNulls(bulkValues, nonNullValueCount, statistics).nullOffsets;
+        return applyDlAndRl(vectorSizes, rlEncoder, dlEncoder, nullsOffsets, statistics);
     }
 
     /**
@@ -102,7 +105,8 @@ public int writeBulkVector(@NotNull final BUFFER_TYPE bulkValues,
     int applyDlAndRl(@NotNull final IntBuffer vectorSizes,
             @NotNull final RunLengthBitPackingHybridEncoder rlEncoder,
             @NotNull final RunLengthBitPackingHybridEncoder dlEncoder,
-            @NotNull final IntBuffer nullsOffsets) throws IOException {
+            @NotNull final IntBuffer nullsOffsets,
+            @NotNull Statistics<?> statistics) throws IOException {
         int valueCount = 0;
         int leafCount = 0;

@@ -139,6 +143,8 @@ int applyDlAndRl(@NotNull final IntBuffer vectorSizes,
                 valueCount++;
                 dlEncoder.writeInt(DL_VECTOR_NULL_VECTOR);
                 rlEncoder.writeInt(RL_FIRST_ELEM);
+                // This row is null so include it in the overall null counts for this column.
+                statistics.incrementNumNulls();
             }
         }
         return valueCount;
File 2 (bulk writer interface):

@@ -3,6 +3,7 @@
  */
 package io.deephaven.parquet.base;
 
+import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
@@ -36,7 +37,7 @@ public WriteResult(final int valueCount, @Nullable final IntBuffer nullOffsets)
      * @param bulkValues the buffer of values
      * @param rowCount the total number of rows to write.
      */
-    void writeBulk(@NotNull BUFFER_TYPE bulkValues, int rowCount);
+    void writeBulk(@NotNull BUFFER_TYPE bulkValues, int rowCount, @NotNull Statistics<?> statistics);
 
     /**
      * Write a buffer's worth of values to the underlying page. This method will find, without writing, {@code null}
@@ -46,13 +47,15 @@ public WriteResult(final int valueCount, @Nullable final IntBuffer nullOffsets)
      * @param bulkValues the values to write
      * @param dlEncoder the encoder for definition levels
      * @param rowCount the number of rows being written
+     * @param statistics the {@link Statistics} object to modify.
      * @return a {@link WriteResult} containing the statistics of the result.
      * @throws IOException if there was an error during write.
      */
     @NotNull
     WriteResult writeBulkFilterNulls(@NotNull BUFFER_TYPE bulkValues,
             @NotNull RunLengthBitPackingHybridEncoder dlEncoder,
-            int rowCount) throws IOException;
+            final int rowCount,
+            @NotNull Statistics<?> statistics) throws IOException;
 
     /**
      * Write a buffer's worth of packed vector values to the underlying page. This method will set the proper definition
@@ -70,7 +73,8 @@ int writeBulkVector(@NotNull final BUFFER_TYPE bulkValues,
             @NotNull final IntBuffer vectorSizes,
             @NotNull final RunLengthBitPackingHybridEncoder rlEncoder,
             @NotNull final RunLengthBitPackingHybridEncoder dlEncoder,
-            final int nonNullValueCount) throws IOException;
+            final int nonNullValueCount,
+            @NotNull Statistics<?> statistics) throws IOException;
 
     /**
      * Write a buffer's worth of packed vector values to the underlying page, skipping null values. This method will
@@ -81,7 +85,9 @@ int writeBulkVector(@NotNull final BUFFER_TYPE bulkValues,
      * @return a {@link WriteResult} containing the statistics of the result.
      */
     @NotNull
-    WriteResult writeBulkFilterNulls(@NotNull BUFFER_TYPE bulkValues, int rowCount);
+    WriteResult writeBulkVectorFilterNulls(@NotNull BUFFER_TYPE bulkValues,
+            final int rowCount,
+            @NotNull Statistics<?> statistics);
 
     /**
      * Clear all internal state.
@@ -93,7 +99,7 @@ int writeBulkVector(@NotNull final BUFFER_TYPE bulkValues,
      *
      * @return a {@link ByteBuffer} containing the written data.
      *
-     * @throws IOException
+     * @throws IOException if there is an exception reading the data.
      */
     ByteBuffer getByteBufferView() throws IOException;

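To make the contract above concrete, here is a toy sketch of a writeBulkFilterNulls implementation for an int payload. This is hypothetical code, not from this commit; DL_ITEM_PRESENT, DL_ITEM_NULL, and the null sentinel are stand-ins for the real constants, and the data-page encoding step is elided (see the Binary implementation in File 6 for the real pattern):

    import java.io.IOException;
    import java.nio.IntBuffer;
    import org.apache.parquet.column.statistics.Statistics;
    import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder;

    final class IntBulkWriterSketch {
        private static final int DL_ITEM_NULL = 0;    // stand-in definition levels
        private static final int DL_ITEM_PRESENT = 1;
        private static final int NULL_INT = Integer.MIN_VALUE; // stand-in null sentinel

        // The statistics accumulator must match the column's primitive type
        // (here, an IntStatistics created from an INT32 column).
        static void writeBulkFilterNulls(IntBuffer bulkValues,
                RunLengthBitPackingHybridEncoder dlEncoder,
                int rowCount,
                Statistics<?> statistics) throws IOException {
            for (int i = 0; i < rowCount; i++) {
                final int v = bulkValues.get(i);
                if (v != NULL_INT) {
                    // A real writer would also encode v onto the data page here.
                    statistics.updateStats(v);
                    dlEncoder.writeInt(DL_ITEM_PRESENT);
                } else {
                    statistics.incrementNumNulls();
                    dlEncoder.writeInt(DL_ITEM_NULL);
                }
            }
        }
    }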
File 3 (ColumnWriter):

@@ -5,16 +5,45 @@
 
 import io.deephaven.util.SafeCloseable;
 import org.jetbrains.annotations.NotNull;
+import io.deephaven.util.annotations.FinalDefault;
+import org.apache.parquet.column.statistics.Statistics;
 
 import java.io.IOException;
 import java.nio.IntBuffer;
 
 public interface ColumnWriter extends SafeCloseable {
-    void addPageNoNulls(@NotNull Object pageData, int valuesCount) throws IOException;
+    /**
+     * Add a page with no nulls to the file.
+     */
+    void addPageNoNulls(@NotNull Object pageData, int valuesCount, @NotNull Statistics<?> statistics)
+            throws IOException;
 
+    /**
+     * Add a dictionary page to the file.
+     */
     void addDictionaryPage(@NotNull Object dictionaryValues, int valuesCount) throws IOException;
 
-    void addPage(@NotNull Object pageData, int valuesCount) throws IOException;
+    /**
+     * Add a page (potentially containing nulls) to the file.
+     */
+    void addPage(Object pageData, int valuesCount, Statistics<?> statistics) throws IOException;
 
-    void addVectorPage(@NotNull Object pageData, @NotNull IntBuffer repeatCount, int valuesCount) throws IOException;
+    /**
+     * Add a vector page to the file.
+     */
+    void addVectorPage(@NotNull Object pageData,
+            @NotNull IntBuffer repeatCount,
+            int valuesCount,
+            @NotNull Statistics<?> statistics)
+            throws IOException;
+
+    /**
+     * Reset the statistics for this column. This must be called between each row group.
+     */
+    void resetStats();
+
+    /**
+     * Return the current statistics.
+     */
+    Statistics<?> getStats();
 }
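A sketch of how a caller might drive this interface across row groups. This is hypothetical driver code, assuming the caller passes the writer's own accumulator into addPage so that page-level updates roll up into the column-chunk statistics recorded at close:

    import java.io.IOException;

    final class ColumnWriterLifecycleSketch {
        // Hypothetical: write one row group's pages, then reset for the next.
        static void writeRowGroup(final ColumnWriter columnWriter,
                final Object[] pages, final int pageSize) throws IOException {
            for (final Object page : pages) {
                // Feed the writer's own accumulator back in so every page's
                // min/max and null counts accumulate for this row group.
                columnWriter.addPage(page, pageSize, columnWriter.getStats());
            }
            // Statistics are tracked per row group; clear before the next one.
            columnWriter.resetStats();
        }
    }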
File 4 (ColumnWriterImpl):

@@ -46,6 +46,7 @@ public class ColumnWriterImpl implements ColumnWriter {
     private final CompressorAdapter compressorAdapter;
     private boolean hasDictionary;
     private int pageCount = 0;
+    private Statistics<?> statistics;
     private static final ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter();
 
 
@@ -85,13 +86,17 @@ public class ColumnWriterImpl implements ColumnWriter {
                 getWidthFromMaxInt(column.getMaxRepetitionLevel()), MIN_SLAB_SIZE, targetPageSize, allocator);
         this.owner = owner;
         offsetIndexBuilder = OffsetIndexBuilder.getBuilder();
+        statistics = Statistics.createStats(column.getPrimitiveType());
     }
 
     @Override
-    public void addPageNoNulls(@NotNull final Object pageData, final int valuesCount) throws IOException {
+    public void addPageNoNulls(@NotNull final Object pageData,
+            final int valuesCount,
+            @NotNull final Statistics<?> statistics)
+            throws IOException {
         initWriter();
         // noinspection unchecked
-        bulkWriter.writeBulk(pageData, valuesCount);
+        bulkWriter.writeBulk(pageData, valuesCount, statistics);
         if (dlEncoder != null) {
             for (int i = 0; i < valuesCount; i++) {
                 dlEncoder.writeInt(1); // TODO implement a bulk RLE writer
@@ -126,7 +131,7 @@ public void addDictionaryPage(@NotNull final Object dictionaryValues, final int
         final BulkWriter dictionaryWriter = getWriter(column.getPrimitiveType());
 
         // noinspection unchecked
-        dictionaryWriter.writeBulk(dictionaryValues, valuesCount);
+        dictionaryWriter.writeBulk(dictionaryValues, valuesCount, NullStatistics.INSTANCE);
         dictionaryOffset = writeChannel.position();
         writeDictionaryPage(dictionaryWriter.getByteBufferView(), valuesCount);
         pageCount++;
@@ -196,21 +201,25 @@ private BulkWriter getWriter(final PrimitiveType primitiveType) {
     }
 
     @Override
-    public void addPage(@NotNull final Object pageData, final int valuesCount) throws IOException {
+    public void addPage(@NotNull final Object pageData,
+            final int valuesCount,
+            @NotNull Statistics<?> statistics)
+            throws IOException {
         if (dlEncoder == null) {
             throw new IllegalStateException("Null values not supported");
         }
         initWriter();
         // noinspection unchecked
-        bulkWriter.writeBulkFilterNulls(pageData, dlEncoder, valuesCount);
+        bulkWriter.writeBulkFilterNulls(pageData, dlEncoder, valuesCount, statistics);
         writePage(bulkWriter.getByteBufferView(), valuesCount);
         bulkWriter.reset();
     }
 
     public void addVectorPage(
             @NotNull final Object pageData,
             @NotNull final IntBuffer repeatCount,
-            final int nonNullValueCount) throws IOException {
+            final int nonNullValueCount,
+            @NotNull final Statistics<?> statistics) throws IOException {
         if (dlEncoder == null) {
             throw new IllegalStateException("Null values not supported");
         }
@@ -220,7 +229,7 @@ public void addVectorPage(
         initWriter();
         // noinspection unchecked
         int valueCount =
-                bulkWriter.writeBulkVector(pageData, repeatCount, rlEncoder, dlEncoder, nonNullValueCount);
+                bulkWriter.writeBulkVector(pageData, repeatCount, rlEncoder, dlEncoder, nonNullValueCount, statistics);
         writePage(bulkWriter.getByteBufferView(), valueCount);
         bulkWriter.reset();
     }
@@ -416,7 +425,7 @@ public void close() {
                 compressorAdapter.getCodecName(),
                 encodingStatsBuilder.build(),
                 encodings,
-                Statistics.createStats(column.getPrimitiveType()),
+                statistics,
                 firstDataPageOffset,
                 dictionaryOffset,
                 totalValueCount,
@@ -431,4 +440,14 @@ public ColumnDescriptor getColumn() {
     public OffsetIndex getOffsetIndex() {
         return offsetIndexBuilder.build(firstDataPageOffset);
     }
+
+    @Override
+    public void resetStats() {
+        statistics = Statistics.createStats(column.getPrimitiveType());
+    }
+
+    @Override
+    public Statistics<?> getStats() {
+        return statistics;
+    }
 }
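Once written, the statistics land in the column-chunk metadata and can be read back from the file footer. A minimal sketch using the parquet-hadoop API (the file name is illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.column.statistics.Statistics;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.util.HadoopInputFile;

    public class ReadStatsSketch {
        public static void main(String[] args) throws Exception {
            try (ParquetFileReader reader = ParquetFileReader.open(
                    HadoopInputFile.fromPath(new Path("data.parquet"), new Configuration()))) {
                // Statistics for the first column chunk of the first row group.
                final Statistics<?> stats = reader.getFooter().getBlocks().get(0)
                        .getColumns().get(0).getStatistics();
                System.out.println(stats.genericGetMin() + " .. " + stats.genericGetMax()
                        + ", nulls=" + stats.getNumNulls());
            }
        }
    }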
File 5 (NullStatistics, new file):

@@ -0,0 +1,35 @@
+package io.deephaven.parquet.base;
+
+import org.apache.parquet.column.statistics.IntStatistics;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * A lightweight statistics object that does nothing. This should be passed to BulkWriters when we don't want to track
+ * statistics.
+ */
+public class NullStatistics extends IntStatistics {
+    public static final NullStatistics INSTANCE = new NullStatistics();
+
+    public void updateStats(int value) {}
+
+    public void updateStats(long value) {}
+
+    public void updateStats(float value) {}
+
+    public void updateStats(double value) {}
+
+    public void updateStats(boolean value) {}
+
+    public void updateStats(Binary value) {}
+
+    @Override
+    public void incrementNumNulls() {}
+
+    @Override
+    public void incrementNumNulls(long increment) {}
+
+    @Override
+    public String toString() {
+        return "NullStatistic";
+    }
+}
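A quick illustration of the no-op behavior (hypothetical usage, not part of the commit; assumes io.deephaven.parquet.base.NullStatistics is on the classpath):

    import org.apache.parquet.io.api.Binary;

    public class NullStatisticsDemo {
        public static void main(String[] args) {
            final NullStatistics stats = NullStatistics.INSTANCE;
            stats.updateStats(42);                       // dropped
            stats.updateStats(Binary.fromString("abc")); // dropped
            stats.incrementNumNulls();                   // dropped
            System.out.println(stats.getNumNulls());     // still 0: nothing was recorded
        }
    }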
File 6 (Binary bulk writer):

@@ -8,6 +8,7 @@
 import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder;
 import org.apache.parquet.io.api.Binary;
 import org.jetbrains.annotations.NotNull;
@@ -86,37 +87,50 @@ public String memUsageString(String prefix) {
     }
 
     @Override
-    public void writeBulk(@NotNull Binary[] bulkValues, int rowCount) {
+    public void writeBulk(@NotNull Binary[] bulkValues,
+            final int rowCount,
+            @NotNull final Statistics<?> statistics) {
         for (int i = 0; i < rowCount; i++) {
-            writeBytes(bulkValues[i]);
+            final Binary v = bulkValues[i];
+            writeBytes(v);
+            statistics.updateStats(v);
         }
     }
 
     @NotNull
     @Override
     public WriteResult writeBulkFilterNulls(@NotNull final Binary[] bulkValues,
             @NotNull final RunLengthBitPackingHybridEncoder dlEncoder,
-            final int rowCount) throws IOException {
+            final int rowCount,
+            @NotNull final Statistics<?> statistics) throws IOException {
         for (int i = 0; i < rowCount; i++) {
             if (bulkValues[i] != null) {
-                writeBytes(bulkValues[i]);
+                final Binary v = bulkValues[i];
+                writeBytes(v);
+                statistics.updateStats(v);
                 dlEncoder.writeInt(DL_ITEM_PRESENT);
             } else {
+                statistics.incrementNumNulls();
                 dlEncoder.writeInt(DL_ITEM_NULL);
             }
         }
         return new WriteResult(rowCount);
     }
 
     @Override
-    public @NotNull WriteResult writeBulkFilterNulls(@NotNull Binary[] bulkValues, int nonNullLeafCount) {
+    public @NotNull WriteResult writeBulkVectorFilterNulls(@NotNull Binary[] bulkValues,
+            final int nonNullLeafCount,
+            @NotNull final Statistics<?> statistics) {
         IntBuffer nullOffsets = IntBuffer.allocate(4);
         for (int i = 0; i < nonNullLeafCount; i++) {
             if (bulkValues[i] != null) {
-                writeBytes(bulkValues[i]);
+                final Binary v = bulkValues[i];
+                writeBytes(v);
+                statistics.updateStats(v);
             } else {
                 nullOffsets = Helpers.ensureCapacity(nullOffsets);
                 nullOffsets.put(i);
+                statistics.incrementNumNulls();
             }
         }
         return new WriteResult(nonNullLeafCount, nullOffsets);
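For BINARY columns, updateStats(Binary) folds each value into byte-wise min/max, which is what the rewritten writer above now does per non-null value. A small sketch (not part of this commit; the column name is illustrative):

    import org.apache.parquet.column.statistics.BinaryStatistics;
    import org.apache.parquet.column.statistics.Statistics;
    import org.apache.parquet.io.api.Binary;
    import org.apache.parquet.schema.PrimitiveType;
    import org.apache.parquet.schema.Types;

    public class BinaryStatsSketch {
        public static void main(String[] args) {
            final PrimitiveType type =
                    Types.optional(PrimitiveType.PrimitiveTypeName.BINARY).named("strings");
            final BinaryStatistics statistics = (BinaryStatistics) Statistics.createStats(type);
            statistics.updateStats(Binary.fromString("pear"));
            statistics.updateStats(Binary.fromString("apple"));
            statistics.incrementNumNulls(); // one null row, as in the null branches above
            // min/max compare the underlying bytes:
            System.out.println(statistics.genericGetMin().toStringUsingUTF8()); // apple
            System.out.println(statistics.genericGetMax().toStringUsingUTF8()); // pear
        }
    }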
[Diff for the remaining changed files not shown.]
