feat: Read parquet metadata files with custom table definition (#6196)
Closes #6174 

Also added tests for reading parquet files from the deephaven-examples repo; a brief usage sketch of the new capability follows the changed-file summary below.
malhotrashivam authored Oct 16, 2024
1 parent 3692f7a commit 98851b4
Showing 7 changed files with 338 additions and 176 deletions.
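The change removes the guard in readPartitionedTableWithMetadata (shown in the second file below) so that a caller-supplied table definition is honored when reading a _metadata-backed, partitioned dataset. What follows is a minimal sketch of the intended usage, not a verified API: the builder method names (setTableDefinition, setFileLayout), the dataset path, and the column names are assumptions inferred from the accessors referenced in this change.

import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.parquet.table.ParquetTools;

public class MetadataWithDefinitionExample {
    public static void main(String[] args) {
        // Hypothetical dataset: /data/grades/_metadata plus per-partition parquet files,
        // partitioned by "Class" in this sketch.
        final TableDefinition definition = TableDefinition.of(
                ColumnDefinition.ofString("Class").withPartitioning(),
                ColumnDefinition.ofString("Name"),
                ColumnDefinition.ofDouble("Score"));

        // Builder method names below are assumptions based on the getters referenced in
        // this change (getTableDefinition, ParquetFileLayout), not verified signatures.
        final ParquetInstructions instructions = ParquetInstructions.builder()
                .setTableDefinition(definition)
                .setFileLayout(ParquetInstructions.ParquetFileLayout.METADATA_PARTITIONED)
                .build();

        // Read the dataset with the caller-supplied definition instead of inferring one
        // from the _metadata / _common_metadata files.
        final Table grades = ParquetTools.readTable("/data/grades", instructions);
        System.out.println(grades.getDefinition());
    }
}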
@@ -25,8 +25,7 @@
import static io.deephaven.base.FileUtils.convertToURI;

/**
* Top level accessor for a parquet file which can read both from a file path string or a CLI style file URI,
* ex."s3://bucket/key".
* Top level accessor for a parquet file which can read from a CLI style file URI, ex."s3://bucket/key".
*/
public class ParquetFileReader {
private static final int FOOTER_LENGTH_SIZE = 4;
@@ -39,25 +38,7 @@ public class ParquetFileReader {
* If reading a single parquet file, root URI is the URI of the file, else the parent directory for a metadata file
*/
private final URI rootURI;
private final MessageType type;

/**
* Make a {@link ParquetFileReader} for the supplied {@link File}. Wraps {@link IOException} as
* {@link UncheckedIOException}.
*
* @param parquetFile The parquet file or the parquet metadata file
* @param channelsProvider The {@link SeekableChannelsProvider} to use for reading the file
* @return The new {@link ParquetFileReader}
*/
public static ParquetFileReader create(
@NotNull final File parquetFile,
@NotNull final SeekableChannelsProvider channelsProvider) {
try {
return new ParquetFileReader(convertToURI(parquetFile, false), channelsProvider);
} catch (final IOException e) {
throw new UncheckedIOException("Failed to create Parquet file reader: " + parquetFile, e);
}
}
private final MessageType schema;

/**
* Make a {@link ParquetFileReader} for the supplied {@link URI}. Wraps {@link IOException} as
@@ -91,7 +72,6 @@ private ParquetFileReader(
// Construct a new file URI for the parent directory
rootURI = convertToURI(new File(parquetFileURI).getParentFile(), true);
} else {
// TODO(deephaven-core#5066): Add support for reading metadata files from non-file URIs
rootURI = parquetFileURI;
}
try (
@@ -102,7 +82,7 @@ private ParquetFileReader(
fileMetaData = Util.readFileMetaData(in);
}
}
type = fromParquetSchema(fileMetaData.schema, fileMetaData.column_orders);
schema = fromParquetSchema(fileMetaData.schema, fileMetaData.column_orders);
}

/**
@@ -148,87 +128,6 @@ public SeekableChannelsProvider getChannelsProvider() {
return channelsProvider;
}

private Set<String> columnsWithDictionaryUsedOnEveryDataPage = null;

/**
* Get the name of all columns that we can know for certain (a) have a dictionary, and (b) use the dictionary on all
* data pages.
*
* @return A set of parquet column names that satisfies the required condition.
*/
@SuppressWarnings("unused")
public Set<String> getColumnsWithDictionaryUsedOnEveryDataPage() {
if (columnsWithDictionaryUsedOnEveryDataPage == null) {
columnsWithDictionaryUsedOnEveryDataPage =
calculateColumnsWithDictionaryUsedOnEveryDataPage();
}
return columnsWithDictionaryUsedOnEveryDataPage;
}

/**
* True only if we are certain every data page in this column chunk uses dictionary encoding; note false also covers
* the "we can't tell" case.
*/
private static boolean columnChunkUsesDictionaryOnEveryPage(final ColumnChunk columnChunk) {
final ColumnMetaData columnMeta = columnChunk.getMeta_data();
if (columnMeta.encoding_stats == null) {
return false; // this is false as "don't know".
}
for (PageEncodingStats encodingStat : columnMeta.encoding_stats) {
if (encodingStat.page_type != PageType.DATA_PAGE
&& encodingStat.page_type != PageType.DATA_PAGE_V2) {
// skip non-data pages.
continue;
}
// this is a data page.
if (encodingStat.encoding != Encoding.PLAIN_DICTIONARY
&& encodingStat.encoding != Encoding.RLE_DICTIONARY) {
return false;
}
}
return true;
}

private Set<String> calculateColumnsWithDictionaryUsedOnEveryDataPage() {
final Set<String> result = new HashSet<>(fileMetaData.getSchemaSize());
final List<RowGroup> rowGroups = fileMetaData.getRow_groups();
final Iterator<RowGroup> riter = rowGroups.iterator();
if (!riter.hasNext()) {
// For an empty file we say all columns satisfy the property.
for (SchemaElement se : fileMetaData.getSchema()) {
if (!se.isSetNum_children()) { // We want only the leaves.
result.add(se.getName());
}
}
return result;
}
// On the first pass, for row group zero, we are going to add all columns to the set
// that satisfy the restriction.
// On later passes after zero, we will remove any column that does not satisfy
// the restriction.
final RowGroup rg0 = riter.next();
for (ColumnChunk columnChunk : rg0.columns) {
if (columnChunkUsesDictionaryOnEveryPage(columnChunk)) {
final String parquetColumnName = columnChunk.getMeta_data().path_in_schema.get(0);
result.add(parquetColumnName);
}
}

while (riter.hasNext()) {
final RowGroup rowGroup = riter.next();
for (ColumnChunk columnChunk : rowGroup.columns) {
final String parquetColumnName = columnChunk.getMeta_data().path_in_schema.get(0);
if (!result.contains(parquetColumnName)) {
continue;
}
if (!columnChunkUsesDictionaryOnEveryPage(columnChunk)) {
result.remove(parquetColumnName);
}
}
}
return result;
}

/**
* Create a {@link RowGroupReader} object for provided row group number
*
@@ -239,7 +138,7 @@ public RowGroupReader getRowGroup(final int groupNumber, final String version) {
fileMetaData.getRow_groups().get(groupNumber),
channelsProvider,
rootURI,
type,
schema,
getSchema(),
version);
}
@@ -463,24 +362,7 @@ private static LogicalTypeAnnotation getLogicalTypeFromConvertedType(
}
}

/**
* Helper method to determine if a logical type is adjusted to UTC.
*
* @param logicalType the logical type to check
* @return true if the logical type is a timestamp adjusted to UTC, false otherwise
*/
private static boolean isAdjustedToUTC(final LogicalType logicalType) {
if (logicalType.getSetField() == LogicalType._Fields.TIMESTAMP) {
return logicalType.getTIMESTAMP().isAdjustedToUTC;
}
return false;
}

public MessageType getSchema() {
return type;
}

public int rowGroupCount() {
return fileMetaData.getRow_groups().size();
return schema;
}
}
@@ -973,10 +973,6 @@ private static Table readPartitionedTableWithMetadata(
@NotNull final ParquetInstructions readInstructions,
@Nullable final SeekableChannelsProvider channelsProvider) {
verifyFileLayout(readInstructions, ParquetFileLayout.METADATA_PARTITIONED);
if (readInstructions.getTableDefinition().isPresent()) {
throw new UnsupportedOperationException("Detected table definition inside read instructions, reading " +
"metadata files with custom table definition is currently not supported");
}
final ParquetMetadataFileLayout layout =
ParquetMetadataFileLayout.create(sourceURI, readInstructions, channelsProvider);
return readTable(layout,
@@ -120,43 +120,53 @@ private ParquetMetadataFileLayout(
final ParquetFileReader metadataFileReader = ParquetFileReader.create(metadataFileURI, channelsProvider);
final ParquetMetadataConverter converter = new ParquetMetadataConverter();
final ParquetMetadata metadataFileMetadata = convertMetadata(metadataFileURI, metadataFileReader, converter);
final Pair<List<ColumnDefinition<?>>, ParquetInstructions> leafSchemaInfo = ParquetSchemaReader.convertSchema(
metadataFileReader.getSchema(),
metadataFileMetadata.getFileMetaData().getKeyValueMetaData(),
inputInstructions);

if (channelsProvider.exists(commonMetadataFileURI)) {
final ParquetFileReader commonMetadataFileReader =
ParquetFileReader.create(commonMetadataFileURI, channelsProvider);
final Pair<List<ColumnDefinition<?>>, ParquetInstructions> fullSchemaInfo =
if (inputInstructions.getTableDefinition().isEmpty()) {
// Infer the definition from the metadata file
final Pair<List<ColumnDefinition<?>>, ParquetInstructions> leafSchemaInfo =
ParquetSchemaReader.convertSchema(
commonMetadataFileReader.getSchema(),
convertMetadata(commonMetadataFileURI, commonMetadataFileReader, converter)
.getFileMetaData()
.getKeyValueMetaData(),
leafSchemaInfo.getSecond());
final Collection<ColumnDefinition<?>> adjustedColumnDefinitions = new ArrayList<>();
final Map<String, ColumnDefinition<?>> leafDefinitionsMap =
leafSchemaInfo.getFirst().stream().collect(toMap(ColumnDefinition::getName, Function.identity()));
for (final ColumnDefinition<?> fullDefinition : fullSchemaInfo.getFirst()) {
final ColumnDefinition<?> leafDefinition = leafDefinitionsMap.get(fullDefinition.getName());
if (leafDefinition == null) {
adjustedColumnDefinitions.add(adjustPartitionDefinition(fullDefinition));
} else if (fullDefinition.equals(leafDefinition)) {
adjustedColumnDefinitions.add(fullDefinition); // No adjustments to apply in this case
} else {
final List<String> differences = new ArrayList<>();
fullDefinition.describeDifferences(differences, leafDefinition, "full schema", "file schema",
"", false);
throw new TableDataException(String.format("Schema mismatch between %s and %s for column %s: %s",
metadataFileURI, commonMetadataFileURI, fullDefinition.getName(), differences));
metadataFileReader.getSchema(),
metadataFileMetadata.getFileMetaData().getKeyValueMetaData(),
inputInstructions);

if (channelsProvider.exists(commonMetadataFileURI)) {
// Infer the partitioning columns using the common metadata file
final ParquetFileReader commonMetadataFileReader =
ParquetFileReader.create(commonMetadataFileURI, channelsProvider);
final Pair<List<ColumnDefinition<?>>, ParquetInstructions> fullSchemaInfo =
ParquetSchemaReader.convertSchema(
commonMetadataFileReader.getSchema(),
convertMetadata(commonMetadataFileURI, commonMetadataFileReader, converter)
.getFileMetaData()
.getKeyValueMetaData(),
leafSchemaInfo.getSecond());
final Collection<ColumnDefinition<?>> adjustedColumnDefinitions = new ArrayList<>();
final Map<String, ColumnDefinition<?>> leafDefinitionsMap =
leafSchemaInfo.getFirst().stream()
.collect(toMap(ColumnDefinition::getName, Function.identity()));
for (final ColumnDefinition<?> fullDefinition : fullSchemaInfo.getFirst()) {
final ColumnDefinition<?> leafDefinition = leafDefinitionsMap.get(fullDefinition.getName());
if (leafDefinition == null) {
adjustedColumnDefinitions.add(adjustPartitionDefinition(fullDefinition));
} else if (fullDefinition.equals(leafDefinition)) {
adjustedColumnDefinitions.add(fullDefinition); // No adjustments to apply in this case
} else {
final List<String> differences = new ArrayList<>();
fullDefinition.describeDifferences(differences, leafDefinition, "full schema", "file schema",
"", false);
throw new TableDataException(
String.format("Schema mismatch between %s and %s for column %s: %s",
metadataFileURI, commonMetadataFileURI, fullDefinition.getName(), differences));
}
}
definition = TableDefinition.of(adjustedColumnDefinitions);
instructions = fullSchemaInfo.getSecond();
} else {
definition = TableDefinition.of(leafSchemaInfo.getFirst());
instructions = leafSchemaInfo.getSecond();
}
definition = TableDefinition.of(adjustedColumnDefinitions);
instructions = fullSchemaInfo.getSecond();
} else {
definition = TableDefinition.of(leafSchemaInfo.getFirst());
instructions = leafSchemaInfo.getSecond();
definition = inputInstructions.getTableDefinition().get();
instructions = inputInstructions;
}

final List<ColumnDefinition<?>> partitioningColumns = definition.getPartitioningColumns();
@@ -187,8 +197,10 @@ private ParquetMetadataFileLayout(
final int numPartitions = filePath.getNameCount() - 1;
if (numPartitions != partitioningColumns.size()) {
throw new TableDataException(String.format(
"Unexpected number of path elements in %s for partitions %s",
relativePathString, partitions.keySet()));
"Unexpected number of path elements in %s for partitions %s, found %d elements, expected " +
"%d based on definition %s",
relativePathString, partitions.keySet(), numPartitions, partitioningColumns.size(),
definition));
}
final boolean useHiveStyle = filePath.getName(0).toString().contains("=");
for (int pi = 0; pi < numPartitions; ++pi) {
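The expanded error message above now reports both the number of path elements found and the number of partitioning columns expected from the (possibly caller-supplied) definition. A small standalone sketch of that counting, assuming a hypothetical Hive-style relative path; only java.nio is used:

import java.nio.file.Path;

public class PartitionDepthExample {
    public static void main(String[] args) {
        // Hypothetical relative path of one leaf file inside a metadata-partitioned dataset.
        final Path filePath = Path.of("Class=Math", "Name=Alice", "grades.parquet");

        // Mirrors the layout's check: every path element except the file name itself is
        // expected to map to one partitioning column of the table definition.
        final int numPartitions = filePath.getNameCount() - 1; // 2 in this sketch

        // Hive-style partitioning encodes "column=value" in each directory name.
        final boolean useHiveStyle = filePath.getName(0).toString().contains("=");

        System.out.printf("partitions=%d, hiveStyle=%b%n", numPartitions, useHiveStyle);
    }
}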
@@ -0,0 +1,77 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.parquet.table;

import io.deephaven.engine.table.Table;
import io.deephaven.engine.testutil.junit4.EngineCleanup;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;

import java.nio.file.Path;

import static io.deephaven.engine.testutil.TstUtils.assertTableEquals;
import static org.junit.Assume.assumeTrue;

/**
* Assumes that there is already a checkout of <a href="https://github.com/deephaven/examples">deephaven-examples</a>.
* This is currently meant to be run locally.
*/
public class ParquetDeephavenExamplesTest {

private static final Path CHECKOUT_ROOT = null; // Path.of("/path/to/deephaven-examples");

@Rule
public final EngineCleanup framework = new EngineCleanup();

@BeforeClass
public static void beforeClass() {
assumeTrue(CHECKOUT_ROOT != null);
}

@Test
public void pems() {
read("Pems/parquet/pems");
}

@Test
public void crypto() {
read("CryptoCurrencyHistory/Parquet/crypto.parquet");
read("CryptoCurrencyHistory/Parquet/crypto_sept7.parquet");
read("CryptoCurrencyHistory/Parquet/crypto_sept8.parquet");
read("CryptoCurrencyHistory/Parquet/CryptoTrades_20210922.parquet");
read("CryptoCurrencyHistory/Parquet/FakeCryptoTrades_20230209.parquet");
}

@Test
public void taxi() {
read("Taxi/parquet/taxi.parquet");
}

@Test
public void sensorData() {
read("SensorData/parquet/SensorData_gzip.parquet");
}

@Test
public void grades() {
assertTableEquals(
read("ParquetExamples/grades"),
read("ParquetExamples/grades_meta"));
assertTableEquals(
read("ParquetExamples/grades_flat").sort("Name", "Class"),
read("ParquetExamples/grades_flat_meta").sort("Name", "Class"));
assertTableEquals(
read("ParquetExamples/grades_kv").sort("Name", "Class"),
read("ParquetExamples/grades_kv_meta").sort("Name", "Class"));
}

private static Table read(String name) {
final String path = CHECKOUT_ROOT
.resolve(name)
.toUri()
.toString();
return ParquetTools.readTable(path).select();
}
}