diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReader.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReader.java index b65c7502471..b9290e96407 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReader.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReader.java @@ -4,18 +4,15 @@ package io.deephaven.parquet.base; import org.apache.parquet.column.Dictionary; +import org.apache.parquet.internal.column.columnindex.OffsetIndex; import org.apache.parquet.schema.PrimitiveType; +import org.jetbrains.annotations.Nullable; import java.io.IOException; import java.util.Iterator; import java.util.function.Supplier; public interface ColumnChunkReader { - /** - * @return -1 if the current column doesn't guarantee fixed page size, otherwise the fixed page size - */ - int getPageFixedSize(); - /** * @return The number of rows in this ColumnChunk, or -1 if it's unknown. */ @@ -32,15 +29,28 @@ public interface ColumnChunkReader { */ int getMaxRl(); - interface ColumnPageReaderIterator extends Iterator, AutoCloseable { - @Override - void close() throws IOException; - } + /** + * @return The offset index for this column chunk, or null if it is not found in the metadata. + */ + @Nullable + OffsetIndex getOffsetIndex(); /** * @return An iterator over individual parquet pages */ - ColumnPageReaderIterator getPageIterator() throws IOException; + Iterator getPageIterator() throws IOException; + + interface ColumnPageDirectAccessor { + /** + * Directly access a page reader for a given page number. + */ + ColumnPageReader getPageReader(final int pageNum); + } + + /** + * @return An accessor for individual parquet pages + */ + ColumnPageDirectAccessor getPageAccessor(); /** * @return Whether this column chunk uses a dictionary-based encoding on every page @@ -69,4 +79,10 @@ public int getMaxId() { } PrimitiveType getType(); + + /** + * @return The "version" string from deephaven specific parquet metadata, or null if it's not present.
+ */ + @Nullable + String getVersion(); } diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java index f56216ebe86..8de3d8b9281 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java @@ -15,6 +15,7 @@ import org.apache.parquet.column.page.DictionaryPage; import org.apache.parquet.format.*; import org.apache.parquet.internal.column.columnindex.OffsetIndex; +import org.apache.parquet.io.ParquetDecodingException; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; @@ -27,7 +28,9 @@ import java.nio.channels.ReadableByteChannel; import java.nio.channels.SeekableByteChannel; import java.nio.file.Path; +import java.util.Iterator; import java.util.List; +import java.util.NoSuchElementException; import java.util.function.Supplier; import static org.apache.parquet.format.Encoding.PLAIN_DICTIONARY; @@ -46,10 +49,18 @@ public class ColumnChunkReaderImpl implements ColumnChunkReader { private final PageMaterializer.Factory nullMaterializerFactory; private Path filePath; - - ColumnChunkReaderImpl( - ColumnChunk columnChunk, SeekableChannelsProvider channelsProvider, - Path rootPath, MessageType type, OffsetIndex offsetIndex, List fieldTypes) { + /** + * Number of rows in the row group of this column chunk. + */ + private final long numRows; + /** + * Version string from deephaven specific parquet metadata, or null if it's not present. + */ + private final String version; + + ColumnChunkReaderImpl(ColumnChunk columnChunk, SeekableChannelsProvider channelsProvider, Path rootPath, + MessageType type, OffsetIndex offsetIndex, List fieldTypes, final long numRows, + final String version) { this.channelsProvider = channelsProvider; this.columnChunk = columnChunk; this.rootPath = rootPath; @@ -65,16 +76,13 @@ public class ColumnChunkReaderImpl implements ColumnChunkReader { this.fieldTypes = fieldTypes; this.dictionarySupplier = new LazyCachingSupplier<>(this::getDictionary); this.nullMaterializerFactory = PageMaterializer.factoryForType(path.getPrimitiveType().getPrimitiveTypeName()); - } - - @Override - public int getPageFixedSize() { - return -1; + this.numRows = numRows; + this.version = version; } @Override public long numRows() { - return numValues(); + return numRows; } @Override @@ -87,15 +95,26 @@ public int getMaxRl() { return path.getMaxRepetitionLevel(); } + public final OffsetIndex getOffsetIndex() { + return offsetIndex; + } + @Override - public ColumnPageReaderIterator getPageIterator() { + public Iterator getPageIterator() { final long dataPageOffset = columnChunk.meta_data.getData_page_offset(); if (offsetIndex == null) { - return new ColumnPageReaderIteratorImpl(dataPageOffset, columnChunk.getMeta_data().getNum_values(), - path, channelsProvider); + return new ColumnPageReaderIteratorImpl(dataPageOffset, columnChunk.getMeta_data().getNum_values()); } else { - return new ColumnPageReaderIteratorIndexImpl(path, channelsProvider); + return new ColumnPageReaderIteratorIndexImpl(); + } + } + + @Override + public final ColumnPageDirectAccessor getPageAccessor() { + if (offsetIndex == null) { + throw new UnsupportedOperationException("Cannot use direct accessor without offset index"); } + return new ColumnPageDirectAccessorImpl(); } 
private Path getFilePath() { @@ -166,6 +185,11 @@ public PrimitiveType getType() { return path.getPrimitiveType(); } + @Override + public String getVersion() { + return version; + } + @NotNull private Dictionary readDictionary(ReadableByteChannel file) throws IOException { // explicitly not closing this, caller is responsible @@ -192,21 +216,13 @@ private Dictionary readDictionary(ReadableByteChannel file) throws IOException { return dictionaryPage.getEncoding().initDictionary(path, dictionaryPage); } - class ColumnPageReaderIteratorImpl implements ColumnPageReaderIterator { - private final SeekableChannelsProvider channelsProvider; + private final class ColumnPageReaderIteratorImpl implements Iterator { private long currentOffset; - private final ColumnDescriptor path; - private long remainingValues; - ColumnPageReaderIteratorImpl(final long startOffset, - final long numValues, - @NotNull final ColumnDescriptor path, - @NotNull final SeekableChannelsProvider channelsProvider) { + ColumnPageReaderIteratorImpl(final long startOffset, final long numValues) { this.remainingValues = numValues; this.currentOffset = startOffset; - this.path = path; - this.channelsProvider = channelsProvider; } @Override @@ -217,7 +233,7 @@ public boolean hasNext() { @Override public ColumnPageReader next() { if (!hasNext()) { - throw new RuntimeException("No next element"); + throw new NoSuchElementException("No next element"); } // NB: The channels provider typically caches channels; this avoids maintaining a handle per column chunk try (final SeekableByteChannel readChannel = channelsProvider.getReadChannel(getFilePath())) { @@ -254,28 +270,19 @@ public ColumnPageReader next() { (encoding == PLAIN_DICTIONARY || encoding == RLE_DICTIONARY) ? dictionarySupplier : () -> NULL_DICTIONARY; - return new ColumnPageReaderImpl( - channelsProvider, decompressor, pageDictionarySupplier, + return new ColumnPageReaderImpl(channelsProvider, decompressor, pageDictionarySupplier, nullMaterializerFactory, path, getFilePath(), fieldTypes, readChannel.position(), pageHeader, - -1); + ColumnPageReaderImpl.NULL_NUM_VALUES); } catch (IOException e) { - throw new RuntimeException("Error reading page header", e); + throw new UncheckedDeephavenException("Error reading page header", e); } } - - @Override - public void close() {} } - class ColumnPageReaderIteratorIndexImpl implements ColumnPageReaderIterator { - private final SeekableChannelsProvider channelsProvider; + private final class ColumnPageReaderIteratorIndexImpl implements Iterator { private int pos; - private final ColumnDescriptor path; - ColumnPageReaderIteratorIndexImpl(ColumnDescriptor path, - SeekableChannelsProvider channelsProvider) { - this.path = path; - this.channelsProvider = channelsProvider; + ColumnPageReaderIteratorIndexImpl() { pos = 0; } @@ -287,20 +294,37 @@ public boolean hasNext() { @Override public ColumnPageReader next() { if (!hasNext()) { - throw new RuntimeException("No next element"); + throw new NoSuchElementException("No next element"); } - int rowCount = - (int) (offsetIndex.getLastRowIndex(pos, columnChunk.getMeta_data().getNum_values()) - - offsetIndex.getFirstRowIndex(pos) + 1); + // Following logic assumes that offsetIndex will store the number of values for a page instead of number + // of rows (which can be different for array and vector columns). 
This behavior is because of a bug on + // parquet writing side which got fixed in deephaven-core/pull/4844 and is only kept to support reading + // parquet files written before deephaven-core/pull/4844. + final int numValues = (int) (offsetIndex.getLastRowIndex(pos, columnChunk.getMeta_data().getNum_values()) + - offsetIndex.getFirstRowIndex(pos) + 1); ColumnPageReaderImpl columnPageReader = new ColumnPageReaderImpl(channelsProvider, decompressor, dictionarySupplier, nullMaterializerFactory, path, getFilePath(), fieldTypes, offsetIndex.getOffset(pos), null, - rowCount); + numValues); pos++; return columnPageReader; } + } + + private final class ColumnPageDirectAccessorImpl implements ColumnPageDirectAccessor { + + ColumnPageDirectAccessorImpl() {} @Override - public void close() throws IOException {} + public ColumnPageReader getPageReader(final int pageNum) { + if (pageNum < 0 || pageNum >= offsetIndex.getPageCount()) { + throw new IndexOutOfBoundsException( + "pageNum=" + pageNum + ", offsetIndex.getPageCount()=" + offsetIndex.getPageCount()); + } + // Page header and number of values will be populated later when we read the page header from the file + return new ColumnPageReaderImpl(channelsProvider, decompressor, dictionarySupplier, nullMaterializerFactory, + path, getFilePath(), fieldTypes, offsetIndex.getOffset(pageNum), null, + ColumnPageReaderImpl.NULL_NUM_VALUES); + } } } diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java index 142499f1fac..1e367b42758 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java @@ -45,6 +45,7 @@ public class ColumnPageReaderImpl implements ColumnPageReader { private static final int MAX_HEADER = 8192; private static final int START_HEADER = 128; public static final int NULL_OFFSET = -1; + static final int NULL_NUM_VALUES = -1; private final SeekableChannelsProvider channelsProvider; private final CompressorAdapter compressorAdapter; @@ -54,11 +55,32 @@ public class ColumnPageReaderImpl implements ColumnPageReader { private final Path filePath; private final List fieldTypes; + /** + * Stores the offset from where the next byte should be read. Can be the offset of page header if + * {@link #pageHeader} is {@code null}, else will be the offset of data. + */ private long offset; private PageHeader pageHeader; private int numValues; private int rowCount = -1; + /** + * Returns a {@link ColumnPageReader} object for reading the column page data from the file. + * + * @param channelsProvider The provider for {@link SeekableByteChannel} for reading the file. + * @param compressorAdapter The adapter for decompressing the data. + * @param dictionarySupplier The supplier for dictionary data, set as {@link ColumnChunkReader#NULL_DICTIONARY} if + * page isn't dictionary encoded + * @param materializerFactory The factory for creating {@link PageMaterializer}. + * @param path The path of the column. + * @param filePath The path of the file. + * @param fieldTypes The types of the fields in the column. + * @param offset The offset for page header if supplied {@code pageHeader} is {@code null}. Else, the offset of data + * following the header in the page. + * @param pageHeader The page header if it is already read from the file. Else, {@code null}. 
+ * @param numValues The number of values in the page if it is already read from the file. Else, + * {@value #NULL_NUM_VALUES} + */ ColumnPageReaderImpl(SeekableChannelsProvider channelsProvider, CompressorAdapter compressorAdapter, Supplier dictionarySupplier, @@ -84,7 +106,6 @@ public class ColumnPageReaderImpl implements ColumnPageReader { @Override public Object materialize(Object nullValue) throws IOException { try (final SeekableByteChannel readChannel = channelsProvider.getReadChannel(filePath)) { - readChannel.position(offset); ensurePageHeader(readChannel); return readDataPage(nullValue, readChannel); } @@ -92,7 +113,6 @@ public Object materialize(Object nullValue) throws IOException { public int readRowCount() throws IOException { try (final SeekableByteChannel readChannel = channelsProvider.getReadChannel(filePath)) { - readChannel.position(offset); ensurePageHeader(readChannel); return readRowCountFromDataPage(readChannel); } @@ -102,38 +122,66 @@ public int readRowCount() throws IOException { @Override public IntBuffer readKeyValues(IntBuffer keyDest, int nullPlaceholder) throws IOException { try (final SeekableByteChannel readChannel = channelsProvider.getReadChannel(filePath)) { - readChannel.position(offset); ensurePageHeader(readChannel); return readKeyFromDataPage(keyDest, nullPlaceholder, readChannel); } } - private synchronized void ensurePageHeader(SeekableByteChannel file) throws IOException { - if (pageHeader == null) { - offset = file.position(); - int maxHeader = START_HEADER; - boolean success; - do { - ByteBuffer headerBuffer = ByteBuffer.allocate(maxHeader); - file.read(headerBuffer); - headerBuffer.flip(); - ByteBufferInputStream bufferedIS = ByteBufferInputStream.wrap(headerBuffer); - try { - pageHeader = Util.readPageHeader(bufferedIS); - offset += bufferedIS.position(); - success = true; - } catch (IOException e) { - success = false; - if (maxHeader > MAX_HEADER) { - throw e; + /** + * If {@link #pageHeader} is {@code null}, read it from the file, and increment the {@link #offset} by the length of + * page header. Channel position would be set to the end of page header or beginning of data before returning. + */ + private void ensurePageHeader(final SeekableByteChannel file) throws IOException { + // Set this channel's position to appropriate offset for reading. If pageHeader is null, this offset would be + // the offset of page header, else it would be the offset of data. 
+ file.position(offset); + synchronized (this) { + if (pageHeader == null) { + int maxHeader = START_HEADER; + boolean success; + do { + final ByteBuffer headerBuffer = ByteBuffer.allocate(maxHeader); + file.read(headerBuffer); + headerBuffer.flip(); + final ByteBufferInputStream bufferedIS = ByteBufferInputStream.wrap(headerBuffer); + try { + pageHeader = Util.readPageHeader(bufferedIS); + offset += bufferedIS.position(); + success = true; + } catch (IOException e) { + success = false; + if (maxHeader > MAX_HEADER) { + throw e; + } + maxHeader <<= 1; + file.position(offset); + } + } while (!success); + file.position(offset); + if (numValues >= 0) { + final int numValuesFromHeader = readNumValuesFromPageHeader(pageHeader); + if (numValues != numValuesFromHeader) { + throw new IllegalStateException( + "numValues = " + numValues + " different from number of values " + + "read from the page header = " + numValuesFromHeader + " for column " + path); } - maxHeader *= 2; - file.position(offset); } - } while (!success); - file.position(offset); + } + if (numValues == NULL_NUM_VALUES) { + numValues = readNumValuesFromPageHeader(pageHeader); + } + } + } + + private static int readNumValuesFromPageHeader(@NotNull final PageHeader header) throws IOException { + switch (header.type) { + case DATA_PAGE: + return header.getData_page_header().getNum_values(); + case DATA_PAGE_V2: + return header.getData_page_header_v2().getNum_values(); + default: + throw new IOException(String.format("Unexpected page of type {%s}", header.getType())); } - ensureNumValues(); } private int readRowCountFromDataPage(ReadableByteChannel file) throws IOException { @@ -550,9 +598,8 @@ private ValuesReader getDataReader(Encoding dataEncoding, ByteBuffer in, int val if (dataEncoding.usesDictionary()) { final Dictionary dictionary = dictionarySupplier.get(); if (dictionary == ColumnChunkReader.NULL_DICTIONARY) { - throw new ParquetDecodingException( - "Could not read page in col " + path - + " as the dictionary was missing for encoding " + dataEncoding); + throw new ParquetDecodingException("Could not read page in col " + path + " as the dictionary was " + + "missing for encoding " + dataEncoding); } dataReader = new DictionaryValuesReader(dictionary); } else { @@ -569,25 +616,14 @@ private ValuesReader getDataReader(Encoding dataEncoding, ByteBuffer in, int val @Override public int numValues() throws IOException { - ensureNumValues(); - return numValues; - } - - private void ensureNumValues() throws IOException { if (numValues >= 0) { - return; + return numValues; } - Assert.neqNull(pageHeader, "pageHeader"); - switch (pageHeader.type) { - case DATA_PAGE: - numValues = pageHeader.getData_page_header().getNum_values(); - break; - case DATA_PAGE_V2: - numValues = pageHeader.getData_page_header_v2().getNum_values(); - break; - default: - throw new IOException( - String.format("Unexpected page of type {%s}", pageHeader.getType())); + try (final SeekableByteChannel readChannel = channelsProvider.getReadChannel(filePath)) { + ensurePageHeader(readChannel); + // Above will block till it populates numValues + Assert.geqZero(numValues, "numValues"); + return numValues; } } diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnWriterImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnWriterImpl.java index e17107a648b..1aea3081a3b 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnWriterImpl.java +++ 
b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnWriterImpl.java @@ -102,7 +102,7 @@ public void addPageNoNulls(@NotNull final Object pageData, dlEncoder.writeInt(1); // TODO implement a bulk RLE writer } } - writePage(bulkWriter.getByteBufferView(), valuesCount); + writePage(bulkWriter.getByteBufferView(), valuesCount, valuesCount); bulkWriter.reset(); } @@ -210,7 +210,7 @@ public void addPage(@NotNull final Object pageData, initWriter(); // noinspection unchecked bulkWriter.writeBulkFilterNulls(pageData, dlEncoder, valuesCount, statistics); - writePage(bulkWriter.getByteBufferView(), valuesCount); + writePage(bulkWriter.getByteBufferView(), valuesCount, valuesCount); bulkWriter.reset(); } @@ -229,7 +229,7 @@ public void addVectorPage( // noinspection unchecked final int valueCount = bulkWriter.writeBulkVector(pageData, repeatCount, rlEncoder, dlEncoder, nonNullValueCount, statistics); - writePage(bulkWriter.getByteBufferView(), valueCount); + writePage(bulkWriter.getByteBufferView(), valueCount, repeatCount.limit()); bulkWriter.reset(); } @@ -313,8 +313,8 @@ public void writePageV2( compressedData.writeAllTo(bufferedOutput); } - private void writePage(final BytesInput bytes, final int valueCount, final Encoding valuesEncoding) - throws IOException { + private void writePage(final BytesInput bytes, final int valueCount, final long rowCount, + final Encoding valuesEncoding) throws IOException { final long initialOffset = bufferedOutput.position(); if (firstDataPageOffset == -1) { firstDataPageOffset = initialOffset; @@ -354,7 +354,7 @@ private void writePage(final BytesInput bytes, final int valueCount, final Encod this.pageCount += 1; compressedBytes.writeAllTo(bufferedOutput); - offsetIndexBuilder.add((int) (bufferedOutput.position() - initialOffset), valueCount); + offsetIndexBuilder.add((int) (bufferedOutput.position() - initialOffset), rowCount); encodings.add(valuesEncoding); encodingStatsBuilder.addDataEncoding(valuesEncoding); } @@ -389,9 +389,10 @@ private static PageHeader newDataPageHeader( /** * writes the current data to a new page in the page store * - * @param valueCount how many rows have been written so far + * @param valueCount how many values have been written so far + * @param rowCount how many rows have been written so far, can be different from valueCount for vector/arrays */ - private void writePage(final ByteBuffer encodedData, final long valueCount) { + private void writePage(final ByteBuffer encodedData, final long valueCount, final long rowCount) { try { BytesInput bytes = BytesInput.from(encodedData); if (dlEncoder != null) { @@ -402,9 +403,7 @@ private void writePage(final ByteBuffer encodedData, final long valueCount) { final BytesInput rlBytesInput = rlEncoder.toBytes(); bytes = BytesInput.concat(BytesInput.fromInt((int) rlBytesInput.size()), rlBytesInput, bytes); } - writePage( - bytes, - (int) valueCount, hasDictionary ? Encoding.RLE_DICTIONARY : Encoding.PLAIN); + writePage(bytes, (int) valueCount, rowCount, hasDictionary ? 
Encoding.RLE_DICTIONARY : Encoding.PLAIN); } catch (IOException e) { throw new ParquetEncodingException("could not write page for " + column.getPath()[0], e); } diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java index 1db38879652..4545c52b855 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java @@ -173,13 +173,19 @@ private int readIntLittleEndian(SeekableByteChannel f) throws IOException { return tempBuf.getInt(); } - public RowGroupReader getRowGroup(int groupNumber) { + /** + * Create a {@link RowGroupReader} object for provided row group number + * + * @param version The "version" string from deephaven specific parquet metadata, or null if it's not present. + */ + public RowGroupReader getRowGroup(final int groupNumber, final String version) { return new RowGroupReaderImpl( fileMetaData.getRow_groups().get(groupNumber), channelsProvider, rootPath, type, - getSchema()); + getSchema(), + version); } private static MessageType fromParquetSchema(List schema, List columnOrders) diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/RowGroupReaderImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/RowGroupReaderImpl.java index 33007f26c7d..90d4e20add3 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/RowGroupReaderImpl.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/RowGroupReaderImpl.java @@ -12,6 +12,7 @@ import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.Type; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import java.io.BufferedInputStream; import java.io.IOException; @@ -34,13 +35,15 @@ public class RowGroupReaderImpl implements RowGroupReader { private final Map chunkMap = new HashMap<>(); private final Path rootPath; + private final String version; RowGroupReaderImpl( @NotNull final RowGroup rowGroup, @NotNull final SeekableChannelsProvider channelsProvider, @NotNull final Path rootPath, @NotNull final MessageType type, - @NotNull final MessageType schema) { + @NotNull final MessageType schema, + @Nullable final String version) { this.channelsProvider = channelsProvider; this.rowGroup = rowGroup; this.rootPath = rootPath; @@ -59,6 +62,7 @@ public class RowGroupReaderImpl implements RowGroupReader { } schemaMap.put(key, nonRequiredFields); } + this.version = version; } @Override @@ -80,8 +84,8 @@ public ColumnChunkReaderImpl getColumnChunk(@NotNull final List path) { throw new UncheckedIOException(e); } } - return new ColumnChunkReaderImpl(columnChunk, channelsProvider, rootPath, - type, offsetIndex, fieldTypes); + return new ColumnChunkReaderImpl(columnChunk, channelsProvider, rootPath, type, offsetIndex, fieldTypes, + numRows(), version); } @Override diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java index 40a4aaf6fdd..6c61c9276e5 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java @@ -197,12 +197,14 @@ public 
METADATA_TYPE getMetadata(@NotNull final ColumnDefinition } } } - final Map columnTypes = ParquetSchemaReader.parseMetadata( + final Optional tableInfo = ParquetSchemaReader.parseMetadata( new ParquetMetadataConverter().fromParquetMetadata(parquetFileReader.fileMetaData) - .getFileMetaData().getKeyValueMetaData()) - .map(TableInfo::columnTypeMap).orElse(Collections.emptyMap()); + .getFileMetaData().getKeyValueMetaData()); + final Map columnTypes = + tableInfo.map(TableInfo::columnTypeMap).orElse(Collections.emptyMap()); + final String version = tableInfo.map(TableInfo::version).orElse(null); - final RowGroupReader rowGroupReader = parquetFileReader.getRowGroup(0); + final RowGroupReader rowGroupReader = parquetFileReader.getRowGroup(0, version); final ColumnChunkReader groupingKeyReader = rowGroupReader.getColumnChunk(Collections.singletonList(GROUPING_KEY)); final ColumnChunkReader beginPosReader = @@ -225,17 +227,20 @@ public METADATA_TYPE getMetadata(@NotNull final ColumnDefinition localPageCache.castAttr(), groupingKeyReader, ROW_KEY_TO_SUB_REGION_ROW_INDEX_MASK, makeToPage(columnTypes.get(GROUPING_KEY), ParquetInstructions.EMPTY, - GROUPING_KEY, groupingKeyReader, columnDefinition)).pageStore, + GROUPING_KEY, groupingKeyReader, columnDefinition), + columnDefinition).pageStore, ColumnChunkPageStore.create( localPageCache.castAttr(), beginPosReader, ROW_KEY_TO_SUB_REGION_ROW_INDEX_MASK, makeToPage(columnTypes.get(BEGIN_POS), ParquetInstructions.EMPTY, BEGIN_POS, - beginPosReader, FIRST_KEY_COL_DEF)).pageStore, + beginPosReader, FIRST_KEY_COL_DEF), + columnDefinition).pageStore, ColumnChunkPageStore.create( localPageCache.castAttr(), endPosReader, ROW_KEY_TO_SUB_REGION_ROW_INDEX_MASK, makeToPage(columnTypes.get(END_POS), ParquetInstructions.EMPTY, END_POS, - endPosReader, LAST_KEY_COL_DEF)).pageStore) + endPosReader, LAST_KEY_COL_DEF), + columnDefinition).pageStore) .get(); } catch (IOException e) { throw new UncheckedIOException(e); @@ -431,7 +436,8 @@ private void fetchValues(@NotNull final ColumnDefinition columnDefinition) { tl().getRegionParameters().regionMask, makeToPage(tl().getColumnTypes().get(parquetColumnName), tl().getReadInstructions(), parquetColumnName, columnChunkReader, - columnDefinition)); + columnDefinition), + columnDefinition); pageStores[psi] = creatorResult.pageStore; dictionaryChunkSuppliers[psi] = creatorResult.dictionaryChunkSupplier; dictionaryKeysPageStores[psi] = creatorResult.dictionaryKeysPageStore; @@ -469,7 +475,7 @@ public Object get() { if (metaData != null) { return metaData; } - final int numRows = (int) keyColumn.size(); + final int numRows = (int) keyColumn.numRows(); try ( final ChunkBoxer.BoxerKernel boxerKernel = diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java index 78b8796e81c..88d2ae87acc 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java @@ -42,6 +42,7 @@ public class ParquetTableLocation extends AbstractTableLocation { private final Map parquetColumnNameToPath; private final Map groupingColumns; private final Map columnTypes; + private final String version; private volatile RowGroupReader[] rowGroupReaders; @@ -84,6 +85,7 @@ public ParquetTableLocation(@NotNull final TableKey tableKey, 
ParquetSchemaReader.parseMetadata(parquetMetadata.getFileMetaData().getKeyValueMetaData()); groupingColumns = tableInfo.map(TableInfo::groupingColumnMap).orElse(Collections.emptyMap()); columnTypes = tableInfo.map(TableInfo::columnTypeMap).orElse(Collections.emptyMap()); + version = tableInfo.map(TableInfo::version).orElse(null); handleUpdate(computeIndex(), tableLocationKey.getFile().lastModified()); } @@ -130,7 +132,7 @@ private RowGroupReader[] getRowGroupReaders() { return local; } return rowGroupReaders = IntStream.of(rowGroupIndices) - .mapToObj(parquetFileReader::getRowGroup) + .mapToObj(idx -> parquetFileReader.getRowGroup(idx, version)) .sorted(Comparator.comparingInt(rgr -> rgr.getRowGroup().getOrdinal())) .toArray(RowGroupReader[]::new); } diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/metadata/TableInfo.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/metadata/TableInfo.java index 125e112500d..fb6b8da2002 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/metadata/TableInfo.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/metadata/TableInfo.java @@ -61,11 +61,16 @@ public final Map columnTypeMap() { } /** - * @return The Deephaven release version when this metadata format was defined + * @return The Deephaven release version used to write the parquet file */ @Value.Default public String version() { - return "0.4.0"; + final String version = TableInfo.class.getPackage().getImplementationVersion(); + if (version == null) { + // When the code is run from class files as opposed to jars, like in unit tests + return "unknown"; + } + return version; } /** diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/ColumnChunkPageStore.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/ColumnChunkPageStore.java index d98dd9c638e..dcd7677ccf5 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/ColumnChunkPageStore.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/ColumnChunkPageStore.java @@ -4,6 +4,7 @@ package io.deephaven.parquet.table.pagestore; import io.deephaven.base.verify.Require; +import io.deephaven.engine.table.ColumnDefinition; import io.deephaven.parquet.table.pagestore.topage.ToPage; import io.deephaven.engine.table.Releasable; import io.deephaven.chunk.attributes.Any; @@ -16,22 +17,24 @@ import io.deephaven.parquet.base.ColumnChunkReader; import io.deephaven.parquet.base.ColumnPageReader; import io.deephaven.util.SafeCloseable; +import io.deephaven.vector.Vector; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.VisibleForTesting; import java.io.IOException; -import java.io.UncheckedIOException; import java.util.function.Supplier; +import java.util.regex.Matcher; +import java.util.regex.Pattern; public abstract class ColumnChunkPageStore implements PageStore>, Page, SafeCloseable, Releasable { - protected final PageCache pageCache; + final PageCache pageCache; private final ColumnChunkReader columnChunkReader; private final long mask; private final ToPage toPage; - private final long size; - final ColumnChunkReader.ColumnPageReaderIterator columnPageReaderIterator; + private final long numRows; public static class CreatorResult { @@ -48,25 +51,63 @@ private CreatorResult(@NotNull final ColumnChunkPageStore pageStore, } } + private static boolean canUseOffsetIndexBasedPageStore(@NotNull final 
ColumnChunkReader columnChunkReader, + @NotNull final ColumnDefinition columnDefinition) { + if (columnChunkReader.getOffsetIndex() == null) { + return false; + } + final String version = columnChunkReader.getVersion(); + if (version == null) { + // Parquet file not written by deephaven, can use offset index + return true; + } + // For vector and array column types, versions before 0.31.0 had a bug in offset index calculation, fixed as + // part of deephaven-core#4844 + final Class columnType = columnDefinition.getDataType(); + if (columnType.isArray() || Vector.class.isAssignableFrom(columnType)) { + return hasCorrectVectorOffsetIndexes(version); + } + return true; + } + + private static final Pattern VERSION_PATTERN = Pattern.compile("(\\d+)\\.(\\d+)\\.(\\d+)"); + + /** + * Check if the version is greater than or equal to 0.31.0, or it doesn't follow the versioning schema X.Y.Z + */ + @VisibleForTesting + public static boolean hasCorrectVectorOffsetIndexes(@NotNull final String version) { + final Matcher matcher = VERSION_PATTERN.matcher(version); + if (!matcher.matches()) { + // Could be unit tests or some other versioning scheme + return true; + } + final int major = Integer.parseInt(matcher.group(1)); + final int minor = Integer.parseInt(matcher.group(2)); + return major > 0 || major == 0 && minor >= 31; + } + public static CreatorResult create( @NotNull final PageCache pageCache, @NotNull final ColumnChunkReader columnChunkReader, final long mask, - @NotNull final ToPage toPage) throws IOException { - final boolean fixedSizePages = columnChunkReader.getPageFixedSize() >= 1; - final ColumnChunkPageStore columnChunkPageStore = fixedSizePages - ? new FixedPageSizeColumnChunkPageStore<>(pageCache, columnChunkReader, mask, toPage) + @NotNull final ToPage toPage, + @NotNull final ColumnDefinition columnDefinition) throws IOException { + final boolean canUseOffsetIndex = canUseOffsetIndexBasedPageStore(columnChunkReader, columnDefinition); + // TODO(deephaven-core#4879): Rather than this fall back logic for supporting incorrect offset index, we should + // instead log an error and explain to user how to fix the parquet file + final ColumnChunkPageStore columnChunkPageStore = canUseOffsetIndex + ? new OffsetIndexBasedColumnChunkPageStore<>(pageCache, columnChunkReader, mask, toPage) : new VariablePageSizeColumnChunkPageStore<>(pageCache, columnChunkReader, mask, toPage); final ToPage dictionaryKeysToPage = toPage.getDictionaryKeysToPage(); final ColumnChunkPageStore dictionaryKeysColumnChunkPageStore = dictionaryKeysToPage == null ? null - : fixedSizePages - ? new FixedPageSizeColumnChunkPageStore<>(pageCache.castAttr(), columnChunkReader, mask, - dictionaryKeysToPage) + : canUseOffsetIndex + ? 
new OffsetIndexBasedColumnChunkPageStore<>(pageCache.castAttr(), columnChunkReader, + mask, dictionaryKeysToPage) : new VariablePageSizeColumnChunkPageStore<>(pageCache.castAttr(), columnChunkReader, - mask, - dictionaryKeysToPage); + mask, dictionaryKeysToPage); return new CreatorResult<>(columnChunkPageStore, toPage::getDictionaryChunk, dictionaryKeysColumnChunkPageStore); } @@ -82,8 +123,7 @@ public static CreatorResult create( this.mask = mask; this.toPage = toPage; - this.size = Require.inRange(columnChunkReader.numRows(), "numRows", mask, "mask"); - this.columnPageReaderIterator = columnChunkReader.getPageIterator(); + this.numRows = Require.inRange(columnChunkReader.numRows(), "numRows", mask, "mask"); } ChunkPage toPage(final long offset, @NotNull final ColumnPageReader columnPageReader) @@ -101,8 +141,11 @@ public long firstRowOffset() { return 0; } - public long size() { - return size; + /** + * @return The number of rows in this ColumnChunk + */ + public long numRows() { + return numRows; } @Override @@ -127,11 +170,5 @@ public boolean usesDictionaryOnEveryPage() { } @Override - public void close() { - try { - columnPageReaderIterator.close(); - } catch (IOException except) { - throw new UncheckedIOException(except); - } - } + public void close() {} } diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/FixedPageSizeColumnChunkPageStore.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/FixedPageSizeColumnChunkPageStore.java deleted file mode 100644 index 3a66f93b3a3..00000000000 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/FixedPageSizeColumnChunkPageStore.java +++ /dev/null @@ -1,95 +0,0 @@ -/** - * Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending - */ -package io.deephaven.parquet.table.pagestore; - -import io.deephaven.base.verify.Assert; -import io.deephaven.base.verify.Require; -import io.deephaven.chunk.attributes.Any; -import io.deephaven.engine.page.ChunkPage; -import io.deephaven.parquet.table.pagestore.topage.ToPage; -import io.deephaven.parquet.base.ColumnChunkReader; -import io.deephaven.parquet.base.ColumnPageReader; -import org.jetbrains.annotations.NotNull; - -import java.io.IOException; -import java.io.UncheckedIOException; -import java.lang.ref.WeakReference; -import java.util.Arrays; - -class FixedPageSizeColumnChunkPageStore extends ColumnChunkPageStore { - - private final int pageFixedSize; - private volatile int numPages = 0; - private final ColumnPageReader[] columnPageReaders; - private final WeakReference>[] pages; - - FixedPageSizeColumnChunkPageStore(@NotNull final PageCache pageCache, - @NotNull final ColumnChunkReader columnChunkReader, - final long mask, - @NotNull final ToPage toPage) throws IOException { - super(pageCache, columnChunkReader, mask, toPage); - - this.pageFixedSize = columnChunkReader.getPageFixedSize(); - - Require.gtZero(pageFixedSize, "pageFixedSize"); - - final int numPages = Math.toIntExact((size() - 1) / pageFixedSize + 1); - this.columnPageReaders = new ColumnPageReader[numPages]; - - // noinspection unchecked - this.pages = (WeakReference>[]) new WeakReference[numPages]; - Arrays.fill(pages, PageCache.getNullPage()); - } - - private void fillToPage(final int pageNum) { - - while (numPages <= pageNum) { - synchronized (this) { - if (numPages <= pageNum) { - Assert.assertion(columnPageReaderIterator.hasNext(), - "columnPageReaderIterator.hasNext()", - "Parquet fixed page size and page iterator don't match, not 
enough pages."); - columnPageReaders[numPages++] = columnPageReaderIterator.next(); - } - } - } - } - - private ChunkPage getPage(final int pageNum) { - PageCache.IntrusivePage page = pages[pageNum].get(); - - if (page == null) { - synchronized (columnPageReaders[pageNum]) { - page = pages[pageNum].get(); - - if (page == null) { - try { - page = new PageCache.IntrusivePage<>( - toPage((long) pageNum * pageFixedSize, columnPageReaders[pageNum])); - } catch (IOException except) { - throw new UncheckedIOException(except); - } - - pages[pageNum] = new WeakReference<>(page); - } - } - } - - pageCache.touch(page); - return page.getPage(); - } - - @Override - public @NotNull ChunkPage getPageContaining(FillContext fillContext, - final long elementIndex) { - final long row = elementIndex & mask(); - Require.inRange(row, "row", size(), "numRows"); - - // This is safe because of our check in the constructor, and we know the row is in range. - final int pageNum = (int) (row / pageFixedSize); - - fillToPage(pageNum); - return getPage(pageNum); - } -} diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/OffsetIndexBasedColumnChunkPageStore.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/OffsetIndexBasedColumnChunkPageStore.java new file mode 100644 index 00000000000..62f0f22cfa0 --- /dev/null +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/OffsetIndexBasedColumnChunkPageStore.java @@ -0,0 +1,151 @@ +/** + * Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending + */ +package io.deephaven.parquet.table.pagestore; + +import io.deephaven.base.verify.Assert; +import io.deephaven.base.verify.Require; +import io.deephaven.chunk.attributes.Any; +import io.deephaven.engine.page.ChunkPage; +import io.deephaven.parquet.table.pagestore.topage.ToPage; +import io.deephaven.parquet.base.ColumnChunkReader; +import io.deephaven.parquet.base.ColumnPageReader; +import org.apache.parquet.internal.column.columnindex.OffsetIndex; +import org.jetbrains.annotations.NotNull; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.lang.ref.WeakReference; +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicReferenceArray; + +/** + * A {@link ColumnChunkPageStore} that uses {@link OffsetIndex} to find the page containing a row. + */ +final class OffsetIndexBasedColumnChunkPageStore extends ColumnChunkPageStore { + private static final long PAGE_SIZE_NOT_FIXED = -1; + + private static final class PageState { + private volatile WeakReference> pageRef; + + PageState() { + pageRef = null; // Initialized when used for the first time + } + } + + private final OffsetIndex offsetIndex; + private final int numPages; + /** + * Fixed number of rows per page. Set as positive value if first ({@link #numPages}-1) pages have equal number of + * rows, else equal to {@value #PAGE_SIZE_NOT_FIXED}. We cannot find the number of rows in the last page size from + * offset index, because it only has the first row index of each page. And we don't want to materialize any pages. + * So as a workaround, in case first ({@link #numPages}-1) pages have equal size, we can assume all pages to be of + * the same size and calculate the page number as {@code row_index / fixed_page_size -> page_number}. If it is + * greater than {@link #numPages}, we will infer that the row is coming from last page. 
+ */ + private final long fixedPageSize; + private final AtomicReferenceArray> pageStates; + private final ColumnChunkReader.ColumnPageDirectAccessor columnPageDirectAccessor; + + OffsetIndexBasedColumnChunkPageStore(@NotNull final PageCache pageCache, + @NotNull final ColumnChunkReader columnChunkReader, + final long mask, + @NotNull final ToPage toPage) throws IOException { + super(pageCache, columnChunkReader, mask, toPage); + offsetIndex = columnChunkReader.getOffsetIndex(); + Assert.neqNull(offsetIndex, "offsetIndex"); + numPages = offsetIndex.getPageCount(); + Assert.gtZero(numPages, "numPages"); + pageStates = new AtomicReferenceArray<>(numPages); + columnPageDirectAccessor = columnChunkReader.getPageAccessor(); + + if (numPages == 1) { + fixedPageSize = numRows(); + return; + } + boolean isPageSizeFixed = true; + final long firstPageSize = offsetIndex.getFirstRowIndex(1) - offsetIndex.getFirstRowIndex(0); + for (int i = 2; i < numPages; ++i) { + if (offsetIndex.getFirstRowIndex(i) - offsetIndex.getFirstRowIndex(i - 1) != firstPageSize) { + isPageSizeFixed = false; + break; + } + } + fixedPageSize = isPageSizeFixed ? firstPageSize : PAGE_SIZE_NOT_FIXED; + } + + /** + * Binary search in offset index to find the page number that contains the row. Logic duplicated from + * {@link Arrays#binarySearch(long[], long)} to use the offset index. + */ + private static int findPageNumUsingOffsetIndex(final OffsetIndex offsetIndex, final long row) { + int low = 0; + int high = offsetIndex.getPageCount() - 1; + + while (low <= high) { + final int mid = (low + high) >>> 1; + final long midVal = offsetIndex.getFirstRowIndex(mid); + if (midVal < row) { + low = mid + 1; + } else if (midVal > row) { + high = mid - 1; + } else { + return mid; // 'row' is the first row of page + } + } + return (low - 1); // 'row' is somewhere in the middle of page + } + + private ChunkPage getPage(final int pageNum) { + if (pageNum < 0 || pageNum >= numPages) { + throw new IllegalArgumentException("pageNum " + pageNum + " is out of range [0, " + numPages + ")"); + } + PageState pageState; + while ((pageState = pageStates.get(pageNum)) == null) { + pageState = new PageState<>(); + if (pageStates.weakCompareAndSetVolatile(pageNum, null, pageState)) { + break; + } + } + PageCache.IntrusivePage page; + WeakReference> localRef; + if ((localRef = pageState.pageRef) == null || (page = localRef.get()) == null) { + synchronized (pageState) { + // Make sure no one materialized this page as we waited for the lock + if ((localRef = pageState.pageRef) == null || (page = localRef.get()) == null) { + final ColumnPageReader reader = columnPageDirectAccessor.getPageReader(pageNum); + try { + page = new PageCache.IntrusivePage<>(toPage(offsetIndex.getFirstRowIndex(pageNum), reader)); + } catch (final IOException except) { + throw new UncheckedIOException(except); + } + pageState.pageRef = new WeakReference<>(page); + } + } + } + pageCache.touch(page); + return page.getPage(); + } + + @NotNull + @Override + public ChunkPage getPageContaining(@NotNull final FillContext fillContext, long row) { + row &= mask(); + Require.inRange(row, "row", numRows(), "numRows"); + + int pageNum; + if (fixedPageSize == PAGE_SIZE_NOT_FIXED) { + pageNum = findPageNumUsingOffsetIndex(offsetIndex, row); + } else { + pageNum = (int) (row / fixedPageSize); + if (pageNum >= numPages) { + // This can happen if the last page is larger than rest of the pages, which are all the same size. + // We have already checked that row is less than numRows. 
+ Assert.geq(row, "row", offsetIndex.getFirstRowIndex(numPages - 1), + "offsetIndex.getFirstRowIndex(numPages - 1)"); + pageNum = (numPages - 1); + } + } + return getPage(pageNum); + } +} diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/VariablePageSizeColumnChunkPageStore.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/VariablePageSizeColumnChunkPageStore.java index 2356393ea6d..9975ebdbb5d 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/VariablePageSizeColumnChunkPageStore.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/VariablePageSizeColumnChunkPageStore.java @@ -16,19 +16,19 @@ import java.io.UncheckedIOException; import java.lang.ref.WeakReference; import java.util.Arrays; +import java.util.Iterator; -class VariablePageSizeColumnChunkPageStore extends ColumnChunkPageStore { +final class VariablePageSizeColumnChunkPageStore extends ColumnChunkPageStore { // We will set numPages after changing all of these arrays in place and/or setting additional - // elements to the - // end of the array. Thus, for i < numPages, array[i] will always have the same value, and be - // valid to use, as - // long as we fetch numPages before accessing the arrays. This is the thread-safe pattern used + // elements to the end of the array. Thus, for i < numPages, array[i] will always have the same value, and be + // valid to use, as long as we fetch numPages before accessing the arrays. This is the thread-safe pattern used // throughout. private volatile int numPages = 0; private volatile long[] pageRowOffsets; private volatile ColumnPageReader[] columnPageReaders; + private final Iterator columnPageReaderIterator; private volatile WeakReference>[] pages; VariablePageSizeColumnChunkPageStore(@NotNull final PageCache pageCache, @@ -41,6 +41,7 @@ class VariablePageSizeColumnChunkPageStore extends ColumnChunk pageRowOffsets = new long[INIT_ARRAY_SIZE + 1]; pageRowOffsets[0] = 0; columnPageReaders = new ColumnPageReader[INIT_ARRAY_SIZE]; + columnPageReaderIterator = columnChunkReader.getPageIterator(); // noinspection unchecked pages = (WeakReference>[]) new WeakReference[INIT_ARRAY_SIZE]; @@ -139,7 +140,7 @@ private ChunkPage getPage(final int pageNum) { @Override public ChunkPage getPageContaining(@NotNull final FillContext fillContext, long row) { row &= mask(); - Require.inRange(row - pageRowOffsets[0], "row", size(), "numRows"); + Require.inRange(row - pageRowOffsets[0], "row", numRows(), "numRows"); int localNumPages = numPages; int pageNum = Arrays.binarySearch(pageRowOffsets, 1, localNumPages + 1, row); diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java index 4727ba36205..1ea62f13652 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java @@ -29,6 +29,7 @@ import io.deephaven.parquet.base.NullStatistics; import io.deephaven.parquet.base.InvalidParquetFileException; import io.deephaven.parquet.table.location.ParquetTableLocationKey; +import io.deephaven.parquet.table.pagestore.ColumnChunkPageStore; import io.deephaven.parquet.table.transfer.StringDictionary; import io.deephaven.stringset.ArrayStringSet; import io.deephaven.engine.table.Table; 
@@ -627,6 +628,117 @@ private Table maybeFixBigDecimal(Table toFix) { .dropColumns("bdColE"); } + private static Table readParquetFileFromGitLFS(final File dest) { + try { + return readSingleFileTable(dest, EMPTY); + } catch (final RuntimeException e) { + if (e.getCause() instanceof InvalidParquetFileException) { + final String InvalidParquetFileErrorMsgString = "Invalid parquet file detected, please ensure the " + + "file is fetched properly from Git LFS. Run commands 'git lfs install; git lfs pull' inside " + + "the repo to pull the files from LFS. Check cause of exception for more details."; + throw new UncheckedDeephavenException(InvalidParquetFileErrorMsgString, e.getCause()); + } + throw e; + } + } + + /** + * Test if the current code can read the parquet data written by the old code. There is logic in + * {@link ColumnChunkPageStore#create} that decides page store based on the version of the parquet file. The old + * data is generated using following logic: + * + *
+     *  // Enforce a smaller page size to write multiple pages
+     *  final ParquetInstructions writeInstructions = new ParquetInstructions.Builder()
+     *        .setTargetPageSize(ParquetInstructions.MIN_TARGET_PAGE_SIZE)
+     *        .build();
+     *
+     *  final Table table = getTableFlat(5000, true, false);
+     *  ParquetTools.writeTable(table, new File("ReferenceParquetData.parquet"), writeInstructions);
+     *
+     *  Table vectorTable = table.groupBy().select();
+     *  vectorTable = vectorTable.join(TableTools.emptyTable(100)).select();
+     *  ParquetTools.writeTable(vectorTable, new File("ReferenceParquetVectorData.parquet"), writeInstructions);
+     *
+     *  final Table arrayTable = vectorTable.updateView(vectorTable.getColumnSourceMap().keySet().stream()
+     *         .map(name -> name + " = " + name + ".toArray()")
+     *         .toArray(String[]::new));
+     *  ParquetTools.writeTable(arrayTable, new File("ReferenceParquetArrayData.parquet"), writeInstructions);
+     * 
+ */ + @Test + public void testReadOldParquetData() { + String path = ParquetTableReadWriteTest.class.getResource("/ReferenceParquetData.parquet").getFile(); + readParquetFileFromGitLFS(new File(path)).select(); + final ParquetMetadata metadata = new ParquetTableLocationKey(new File(path), 0, null).getMetadata(); + assertTrue(metadata.getFileMetaData().getKeyValueMetaData().get("deephaven").contains("\"version\":\"0.4.0\"")); + + path = ParquetTableReadWriteTest.class.getResource("/ReferenceParquetVectorData.parquet").getFile(); + readParquetFileFromGitLFS(new File(path)).select(); + + path = ParquetTableReadWriteTest.class.getResource("/ReferenceParquetArrayData.parquet").getFile(); + readParquetFileFromGitLFS(new File(path)).select(); + } + + @Test + public void testVersionChecks() { + assertFalse(ColumnChunkPageStore.hasCorrectVectorOffsetIndexes("0.0.0")); + assertFalse(ColumnChunkPageStore.hasCorrectVectorOffsetIndexes("0.4.0")); + assertTrue(ColumnChunkPageStore.hasCorrectVectorOffsetIndexes("0.3")); + assertTrue(ColumnChunkPageStore.hasCorrectVectorOffsetIndexes("0.31.0")); + assertTrue(ColumnChunkPageStore.hasCorrectVectorOffsetIndexes("0.31.1")); + assertTrue(ColumnChunkPageStore.hasCorrectVectorOffsetIndexes("0.32.0")); + assertTrue(ColumnChunkPageStore.hasCorrectVectorOffsetIndexes("1.3.0")); + assertTrue(ColumnChunkPageStore.hasCorrectVectorOffsetIndexes("unknown")); + assertTrue(ColumnChunkPageStore.hasCorrectVectorOffsetIndexes("0.31.0-SNAPSHOT")); + } + + + /** + * Test if the parquet reading code can read pre-generated parquet files which have different number of rows in each + * page. Following is how these files are generated. + * + *
+     * Table arrayTable = TableTools.emptyTable(100).update(
+     *         "intArrays = java.util.stream.IntStream.range(0, i).toArray()").reverse();
+     * File dest = new File(rootFile, "ReferenceParquetFileWithDifferentPageSizes1.parquet");
+     * final ParquetInstructions writeInstructions = new ParquetInstructions.Builder()
+     *         .setTargetPageSize(ParquetInstructions.MIN_TARGET_PAGE_SIZE)
+     *         .build();
+     * ParquetTools.writeTable(arrayTable, dest, writeInstructions);
+     *
+     * arrayTable = TableTools.emptyTable(1000).update(
+     *         "intArrays = (i <= 900) ? java.util.stream.IntStream.range(i, i+50).toArray() : " +
+     *                 "java.util.stream.IntStream.range(i, i+2).toArray()");
+     * dest = new File(rootFile, "ReferenceParquetFileWithDifferentPageSizes2.parquet");
+     * ParquetTools.writeTable(arrayTable, dest, writeInstructions);
+     * 
+ */ + @Test + public void testReadingParquetFilesWithDifferentPageSizes() { + Table expected = TableTools.emptyTable(100).update( + "intArrays = java.util.stream.IntStream.range(0, i).toArray()").reverse(); + String path = ParquetTableReadWriteTest.class + .getResource("/ReferenceParquetFileWithDifferentPageSizes1.parquet").getFile(); + Table fromDisk = readParquetFileFromGitLFS(new File(path)); + assertTableEquals(expected, fromDisk); + + path = ParquetTableReadWriteTest.class + .getResource("/ReferenceParquetFileWithDifferentPageSizes2.parquet").getFile(); + fromDisk = readParquetFileFromGitLFS(new File(path)); + + // Access something on the last page to make sure we can read it + final int[] data = (int[]) fromDisk.getColumnSource("intArrays").get(998); + assertNotNull(data); + assertEquals(2, data.length); + assertEquals(998, data[0]); + assertEquals(999, data[1]); + + expected = TableTools.emptyTable(1000).update( + "intArrays = (i <= 900) ? java.util.stream.IntStream.range(i, i+50).toArray() : " + + "java.util.stream.IntStream.range(i, i+2).toArray()"); + assertTableEquals(expected, fromDisk); + } // Following is used for testing both writing APIs for parquet tables private interface TestParquetTableWriter { @@ -842,18 +954,7 @@ public void legacyGroupingFileReadTest() { final File destFile = new File(path); // Read the legacy file and verify that grouping column is read correctly - final Table fromDisk; - try { - fromDisk = readSingleFileTable(destFile, EMPTY); - } catch (RuntimeException e) { - if (e.getCause() instanceof InvalidParquetFileException) { - final String InvalidParquetFileErrorMsgString = "Invalid parquet file detected, please ensure the " + - "file is fetched properly from Git LFS. Run commands 'git lfs install; git lfs pull' inside " + - "the repo to pull the files from LFS. Check cause of exception for more details."; - throw new UncheckedDeephavenException(InvalidParquetFileErrorMsgString, e.getCause()); - } - throw e; - } + final Table fromDisk = readParquetFileFromGitLFS(destFile); final String groupingColName = "gcol"; assertTrue(fromDisk.getDefinition().getColumn(groupingColName).isGrouping()); @@ -1346,18 +1447,8 @@ public void readWriteDateTimeTest() { public void verifyPyArrowStatistics() { final String path = ParquetTableReadWriteTest.class.getResource("/e0/pyarrow_stats.parquet").getFile(); final File pyarrowDest = new File(path); - final Table pyarrowFromDisk; - try { - pyarrowFromDisk = readSingleFileTable(pyarrowDest, EMPTY); - } catch (RuntimeException e) { - if (e.getCause() instanceof InvalidParquetFileException) { - final String InvalidParquetFileErrorMsgString = "Invalid parquet file detected, please ensure the " + - "file is fetched properly from Git LFS. Run commands 'git lfs install; git lfs pull' inside " + - "the repo to pull the files from LFS. Check cause of exception for more details."; - throw new UncheckedDeephavenException(InvalidParquetFileErrorMsgString, e.getCause()); - } - throw e; - } + final Table pyarrowFromDisk = readParquetFileFromGitLFS(pyarrowDest); + // Verify that our verification code works for a pyarrow generated table. 
assertTableStatistics(pyarrowFromDisk, pyarrowDest); diff --git a/extensions/parquet/table/src/test/resources/ReferenceParquetArrayData.parquet b/extensions/parquet/table/src/test/resources/ReferenceParquetArrayData.parquet new file mode 100644 index 00000000000..16851b6b630 --- /dev/null +++ b/extensions/parquet/table/src/test/resources/ReferenceParquetArrayData.parquet @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:a21ee158eb36ed817b85d28202215fe70e90aaad81bebfdfb44e9da049e17a18 +size 22075750 diff --git a/extensions/parquet/table/src/test/resources/ReferenceParquetData.parquet b/extensions/parquet/table/src/test/resources/ReferenceParquetData.parquet new file mode 100644 index 00000000000..6010fc66edf --- /dev/null +++ b/extensions/parquet/table/src/test/resources/ReferenceParquetData.parquet @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:dbbb899c95833c163717f3be3984f1ea3efbc858f536b794d70f2f5dabcd6cd7 +size 320991 diff --git a/extensions/parquet/table/src/test/resources/ReferenceParquetFileWithDifferentPageSizes1.parquet b/extensions/parquet/table/src/test/resources/ReferenceParquetFileWithDifferentPageSizes1.parquet new file mode 100644 index 00000000000..c864f329d45 --- /dev/null +++ b/extensions/parquet/table/src/test/resources/ReferenceParquetFileWithDifferentPageSizes1.parquet @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:e180f3133320f8fc5174d2cfbab89b30ae735c0a6485b6d13a98a5aace5f8740 +size 5259 diff --git a/extensions/parquet/table/src/test/resources/ReferenceParquetFileWithDifferentPageSizes2.parquet b/extensions/parquet/table/src/test/resources/ReferenceParquetFileWithDifferentPageSizes2.parquet new file mode 100644 index 00000000000..4bad668401a --- /dev/null +++ b/extensions/parquet/table/src/test/resources/ReferenceParquetFileWithDifferentPageSizes2.parquet @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:e5e70548a903371acfc038629c4e5155a3ed5ed79cd4d59ee622750e7285b4cf +size 35751 diff --git a/extensions/parquet/table/src/test/resources/ReferenceParquetVectorData.parquet b/extensions/parquet/table/src/test/resources/ReferenceParquetVectorData.parquet new file mode 100644 index 00000000000..58ee60ef1af --- /dev/null +++ b/extensions/parquet/table/src/test/resources/ReferenceParquetVectorData.parquet @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:8f1b80574b46b9508c2a0b35aa93b45062e52ff3c133eb0734fb3d493086b661 +size 22098162
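
For readers following the OffsetIndexBasedColumnChunkPageStore change above, here is a minimal, standalone sketch of the row-to-page lookup it performs. It assumes only a plain long[] of first-row indexes in place of Parquet's OffsetIndex; the class and member names below (PageLookupSketch, firstRowIndex, pageContaining) are illustrative and do not appear in the patch. When the first (numPages - 1) pages share a size, the page number is computed by division and clamped to the last page (which may be larger); otherwise a binary search over the first-row indexes is used, mirroring findPageNumUsingOffsetIndex in the patch.

// Minimal sketch (not part of the patch): row -> page lookup over an offset index.
// The long[] stands in for OffsetIndex#getFirstRowIndex(pageNum).
final class PageLookupSketch {
    private static final long PAGE_SIZE_NOT_FIXED = -1;

    private final long[] firstRowIndex; // firstRowIndex[p] = first row of page p
    private final long fixedPageSize;   // common size of the first (numPages - 1) pages, or PAGE_SIZE_NOT_FIXED

    PageLookupSketch(final long[] firstRowIndex, final long numRows) {
        this.firstRowIndex = firstRowIndex;
        final int numPages = firstRowIndex.length;
        if (numPages == 1) {
            fixedPageSize = numRows;
            return;
        }
        // The offset index only stores first-row indexes, so the size of the last page is unknown
        // without materializing it; the fast path is used only when all leading pages agree.
        final long firstPageSize = firstRowIndex[1] - firstRowIndex[0];
        boolean fixed = true;
        for (int p = 2; p < numPages; ++p) {
            if (firstRowIndex[p] - firstRowIndex[p - 1] != firstPageSize) {
                fixed = false;
                break;
            }
        }
        fixedPageSize = fixed ? firstPageSize : PAGE_SIZE_NOT_FIXED;
    }

    int pageContaining(final long row) {
        final int numPages = firstRowIndex.length;
        if (fixedPageSize != PAGE_SIZE_NOT_FIXED) {
            // Fast path: divide, then clamp to the last page, which may be larger than the rest.
            return (int) Math.min(row / fixedPageSize, numPages - 1);
        }
        // Slow path: binary search for the last page whose first row index is <= row.
        int low = 0;
        int high = numPages - 1;
        while (low <= high) {
            final int mid = (low + high) >>> 1;
            if (firstRowIndex[mid] < row) {
                low = mid + 1;
            } else if (firstRowIndex[mid] > row) {
                high = mid - 1;
            } else {
                return mid; // row is the first row of page mid
            }
        }
        return low - 1; // row falls somewhere inside page (low - 1)
    }
}

As a usage example under those assumptions, new PageLookupSketch(new long[] {0, 50, 100, 150}, 180).pageContaining(170) returns 3 via the fixed-size fast path (the last page holds rows 150..179), while ragged page sizes such as {0, 50, 52, 102} fall back to the binary search.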