Skip to content

Commit

Permalink
Optimizations in parquet page materialization (deephaven#5582)
Browse files (browse the repository at this point in the history)
  • Loading branch information
malhotrashivam committed Jun 17, 2024
1 parent f2eed4b commit 643cc9a
Show file tree
Hide file tree
Showing 56 changed files with 1,793 additions and 953 deletions.
2 changes: 2 additions & 0 deletions extensions/parquet/base/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ dependencies {
implementation project(':extensions-parquet-compression')
implementation project(':Base')
implementation project(':Util')
+ implementation project(':engine-time')
+ implementation project(':Configuration')
implementation depCommonsIo

compileOnly depAnnotations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ final class ColumnChunkReaderImpl implements ColumnChunkReader {
private final OffsetIndexReader offsetIndexReader;
private final List<Type> fieldTypes;
private final Function<SeekableChannelContext, Dictionary> dictionarySupplier;
- private final PageMaterializerFactory nullMaterializerFactory;
+ private final PageMaterializerFactory pageMaterializerFactory;
private final URI columnChunkURI;
/**
* Number of rows in the row group of this column chunk.
Expand Down Expand Up @@ -81,7 +81,7 @@ final class ColumnChunkReaderImpl implements ColumnChunkReader {
}
this.fieldTypes = fieldTypes;
this.dictionarySupplier = new SoftCachingFunction<>(this::getDictionary);
- this.nullMaterializerFactory = PageMaterializer.factoryForType(path.getPrimitiveType().getPrimitiveTypeName());
+ this.pageMaterializerFactory = PageMaterializer.factoryForType(path.getPrimitiveType());
this.numRows = numRows;
this.version = version;
if (columnChunk.isSetFile_path() && FILE_URI_SCHEME.equals(rootURI.getScheme())) {
Expand Down Expand Up @@ -289,7 +289,7 @@ public ColumnPageReader next(@NotNull final SeekableChannelContext channelContex
final Function<SeekableChannelContext, Dictionary> pageDictionarySupplier =
getPageDictionarySupplier(pageHeader);
return new ColumnPageReaderImpl(columnName, channelsProvider, decompressor, pageDictionarySupplier,
- nullMaterializerFactory, path, getURI(), fieldTypes, dataOffset, pageHeader, numValuesInPage);
+ pageMaterializerFactory, path, getURI(), fieldTypes, dataOffset, pageHeader, numValuesInPage);
} catch (IOException e) {
throw new UncheckedDeephavenException("Error reading page header at offset " + headerOffset + " for " +
"column: " + columnName + ", uri: " + getURI(), e);
Expand Down Expand Up @@ -364,7 +364,7 @@ public ColumnPageReader getPageReader(final int pageNum, final SeekableChannelCo
final Function<SeekableChannelContext, Dictionary> pageDictionarySupplier =
getPageDictionarySupplier(pageHeader);
return new ColumnPageReaderImpl(columnName, channelsProvider, decompressor, pageDictionarySupplier,
- nullMaterializerFactory, path, getURI(), fieldTypes, dataOffset, pageHeader,
+ pageMaterializerFactory, path, getURI(), fieldTypes, dataOffset, pageHeader,
getNumValues(pageHeader));
} catch (final IOException e) {
throw new UncheckedDeephavenException("Error reading page header for page number " + pageNum +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.deephaven.UncheckedDeephavenException;
import io.deephaven.base.Pair;
import io.deephaven.base.verify.Require;
+ import io.deephaven.parquet.base.materializers.IntMaterializer;
import io.deephaven.parquet.compress.CompressorAdapter;
import io.deephaven.util.channel.SeekableChannelContext;
import io.deephaven.util.channel.SeekableChannelsProvider;
Expand Down Expand Up @@ -344,8 +345,7 @@ private IntBuffer readKeysFromPageCommon(
final RunLengthBitPackingHybridBufferDecoder rlDecoder,
final RunLengthBitPackingHybridBufferDecoder dlDecoder,
final ValuesReader dataReader) throws IOException {
- final Object result = materialize(PageMaterializer.IntFactory, dlDecoder, rlDecoder, dataReader,
- nullPlaceholder);
+ final Object result = materialize(IntMaterializer.Factory, dlDecoder, rlDecoder, dataReader, nullPlaceholder);
if (result instanceof DataWithOffsets) {
keyDest.put((int[]) ((DataWithOffsets) result).materializeResult);
return ((DataWithOffsets) result).offsets;
Expand Down
Loading

0 comments on commit 643cc9a

Please sign in to comment.