feat: Read parquet metadata files with custom table definition #6196

Merged 3 commits on Oct 16, 2024
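
For context, a minimal usage sketch of what this change enables: a metadata-partitioned parquet read can now carry a caller-supplied table definition instead of being rejected. All paths and column names below are hypothetical, and the builder method names are assumptions based on this diff, not confirmed by it.

import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.parquet.table.ParquetTools;

public final class MetadataReadSketch {
    public static void main(final String[] args) {
        // Hypothetical definition for the dataset; column names are illustrative only.
        final TableDefinition definition = TableDefinition.of(
                ColumnDefinition.ofString("Name"),
                ColumnDefinition.ofString("Class"),
                ColumnDefinition.ofDouble("Grade"));
        // Assumed builder method name; the change only requires that the read
        // instructions carry a table definition.
        final ParquetInstructions instructions = ParquetInstructions.builder()
                .setTableDefinition(definition)
                .build();
        // Hypothetical path; pointing at the _metadata file is assumed to select the
        // metadata-partitioned layout (the layout can also be set explicitly).
        final Table table = ParquetTools.readTable("/data/grades_meta/_metadata", instructions);
        System.out.println(table.size());
    }
}
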
Changes from all commits
@@ -25,8 +25,7 @@
import static io.deephaven.base.FileUtils.convertToURI;

/**
* Top level accessor for a parquet file which can read both from a file path string or a CLI style file URI,
* ex. "s3://bucket/key".
* Top level accessor for a parquet file which can read from a CLI style file URI, ex. "s3://bucket/key".
*/
public class ParquetFileReader {
private static final int FOOTER_LENGTH_SIZE = 4;
@@ -39,25 +38,7 @@ public class ParquetFileReader {
* If reading a single parquet file, root URI is the URI of the file, else the parent directory for a metadata file
*/
private final URI rootURI;
private final MessageType type;

/**
* Make a {@link ParquetFileReader} for the supplied {@link File}. Wraps {@link IOException} as
* {@link UncheckedIOException}.
*
* @param parquetFile The parquet file or the parquet metadata file
* @param channelsProvider The {@link SeekableChannelsProvider} to use for reading the file
* @return The new {@link ParquetFileReader}
*/
public static ParquetFileReader create(
@NotNull final File parquetFile,
@NotNull final SeekableChannelsProvider channelsProvider) {
try {
return new ParquetFileReader(convertToURI(parquetFile, false), channelsProvider);
} catch (final IOException e) {
throw new UncheckedIOException("Failed to create Parquet file reader: " + parquetFile, e);
}
}
private final MessageType schema;

/**
* Make a {@link ParquetFileReader} for the supplied {@link URI}. Wraps {@link IOException} as
@@ -91,7 +72,6 @@ private ParquetFileReader(
// Construct a new file URI for the parent directory
rootURI = convertToURI(new File(parquetFileURI).getParentFile(), true);
} else {
// TODO(deephaven-core#5066): Add support for reading metadata files from non-file URIs
rootURI = parquetFileURI;
}
try (
@@ -102,7 +82,7 @@ private ParquetFileReader(
fileMetaData = Util.readFileMetaData(in);
}
}
type = fromParquetSchema(fileMetaData.schema, fileMetaData.column_orders);
schema = fromParquetSchema(fileMetaData.schema, fileMetaData.column_orders);
}

/**
@@ -148,87 +128,6 @@ public SeekableChannelsProvider getChannelsProvider() {
return channelsProvider;
}

private Set<String> columnsWithDictionaryUsedOnEveryDataPage = null;

/**
* Get the name of all columns that we can know for certain (a) have a dictionary, and (b) use the dictionary on all
* data pages.
*
* @return A set of parquet column names that satisfies the required condition.
*/
@SuppressWarnings("unused")
public Set<String> getColumnsWithDictionaryUsedOnEveryDataPage() {
if (columnsWithDictionaryUsedOnEveryDataPage == null) {
columnsWithDictionaryUsedOnEveryDataPage =
calculateColumnsWithDictionaryUsedOnEveryDataPage();
}
return columnsWithDictionaryUsedOnEveryDataPage;
}

/**
* True only if we are certain every data page in this column chunk uses dictionary encoding; note false also covers
* the "we can't tell" case.
*/
private static boolean columnChunkUsesDictionaryOnEveryPage(final ColumnChunk columnChunk) {
final ColumnMetaData columnMeta = columnChunk.getMeta_data();
if (columnMeta.encoding_stats == null) {
return false; // this is false as "don't know".
}
for (PageEncodingStats encodingStat : columnMeta.encoding_stats) {
if (encodingStat.page_type != PageType.DATA_PAGE
&& encodingStat.page_type != PageType.DATA_PAGE_V2) {
// skip non-data pages.
continue;
}
// this is a data page.
if (encodingStat.encoding != Encoding.PLAIN_DICTIONARY
&& encodingStat.encoding != Encoding.RLE_DICTIONARY) {
return false;
}
}
return true;
}

private Set<String> calculateColumnsWithDictionaryUsedOnEveryDataPage() {
final Set<String> result = new HashSet<>(fileMetaData.getSchemaSize());
final List<RowGroup> rowGroups = fileMetaData.getRow_groups();
final Iterator<RowGroup> riter = rowGroups.iterator();
if (!riter.hasNext()) {
// For an empty file we say all columns satisfy the property.
for (SchemaElement se : fileMetaData.getSchema()) {
if (!se.isSetNum_children()) { // We want only the leaves.
result.add(se.getName());
}
}
return result;
}
// On the first pass, for row group zero, we are going to add all columns to the set
// that satisfy the restriction.
// On later passes after zero, we will remove any column that does not satisfy
// the restriction.
final RowGroup rg0 = riter.next();
for (ColumnChunk columnChunk : rg0.columns) {
if (columnChunkUsesDictionaryOnEveryPage(columnChunk)) {
final String parquetColumnName = columnChunk.getMeta_data().path_in_schema.get(0);
result.add(parquetColumnName);
}
}

while (riter.hasNext()) {
final RowGroup rowGroup = riter.next();
for (ColumnChunk columnChunk : rowGroup.columns) {
final String parquetColumnName = columnChunk.getMeta_data().path_in_schema.get(0);
if (!result.contains(parquetColumnName)) {
continue;
}
if (!columnChunkUsesDictionaryOnEveryPage(columnChunk)) {
result.remove(parquetColumnName);
}
}
}
return result;
}

/**
* Create a {@link RowGroupReader} object for provided row group number
*
@@ -239,7 +138,7 @@ public RowGroupReader getRowGroup(final int groupNumber, final String version) {
fileMetaData.getRow_groups().get(groupNumber),
channelsProvider,
rootURI,
type,
schema,
getSchema(),
version);
}
@@ -463,24 +362,7 @@ private static LogicalTypeAnnotation getLogicalTypeFromConvertedType(
}
}

/**
* Helper method to determine if a logical type is adjusted to UTC.
*
* @param logicalType the logical type to check
* @return true if the logical type is a timestamp adjusted to UTC, false otherwise
*/
private static boolean isAdjustedToUTC(final LogicalType logicalType) {
if (logicalType.getSetField() == LogicalType._Fields.TIMESTAMP) {
return logicalType.getTIMESTAMP().isAdjustedToUTC;
}
return false;
}

public MessageType getSchema() {
return type;
return schema;
}

public int rowGroupCount() {
return fileMetaData.getRow_groups().size();
}
}
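
With the File-based create overload removed above, a call site that previously passed a File converts it to a URI itself and uses the URI-based create that remains. A small sketch under assumed package locations:

import java.io.File;

import io.deephaven.parquet.base.ParquetFileReader;
import io.deephaven.util.channel.SeekableChannelsProvider;

import static io.deephaven.base.FileUtils.convertToURI;

final class ReaderFromFileSketch {
    // Mirrors what the removed overload did internally: convert the File to a URI,
    // passing false because the target is a file rather than a directory, then
    // delegate to the URI-based factory.
    static ParquetFileReader readerFor(final File parquetFile,
            final SeekableChannelsProvider channelsProvider) {
        return ParquetFileReader.create(convertToURI(parquetFile, false), channelsProvider);
    }
}
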
@@ -970,10 +970,6 @@ private static Table readPartitionedTableWithMetadata(
@NotNull final ParquetInstructions readInstructions,
@Nullable final SeekableChannelsProvider channelsProvider) {
verifyFileLayout(readInstructions, ParquetFileLayout.METADATA_PARTITIONED);
if (readInstructions.getTableDefinition().isPresent()) {
throw new UnsupportedOperationException("Detected table definition inside read instructions, reading " +
"metadata files with custom table definition is currently not supported");
}
final ParquetMetadataFileLayout layout =
ParquetMetadataFileLayout.create(sourceURI, readInstructions, channelsProvider);
return readTable(layout,
@@ -120,43 +120,53 @@ private ParquetMetadataFileLayout(
final ParquetFileReader metadataFileReader = ParquetFileReader.create(metadataFileURI, channelsProvider);
final ParquetMetadataConverter converter = new ParquetMetadataConverter();
final ParquetMetadata metadataFileMetadata = convertMetadata(metadataFileURI, metadataFileReader, converter);
final Pair<List<ColumnDefinition<?>>, ParquetInstructions> leafSchemaInfo = ParquetSchemaReader.convertSchema(
metadataFileReader.getSchema(),
metadataFileMetadata.getFileMetaData().getKeyValueMetaData(),
inputInstructions);

if (channelsProvider.exists(commonMetadataFileURI)) {
final ParquetFileReader commonMetadataFileReader =
ParquetFileReader.create(commonMetadataFileURI, channelsProvider);
final Pair<List<ColumnDefinition<?>>, ParquetInstructions> fullSchemaInfo =
if (inputInstructions.getTableDefinition().isEmpty()) {
// Infer the definition from the metadata file
final Pair<List<ColumnDefinition<?>>, ParquetInstructions> leafSchemaInfo =
ParquetSchemaReader.convertSchema(
commonMetadataFileReader.getSchema(),
convertMetadata(commonMetadataFileURI, commonMetadataFileReader, converter)
.getFileMetaData()
.getKeyValueMetaData(),
leafSchemaInfo.getSecond());
final Collection<ColumnDefinition<?>> adjustedColumnDefinitions = new ArrayList<>();
final Map<String, ColumnDefinition<?>> leafDefinitionsMap =
leafSchemaInfo.getFirst().stream().collect(toMap(ColumnDefinition::getName, Function.identity()));
for (final ColumnDefinition<?> fullDefinition : fullSchemaInfo.getFirst()) {
final ColumnDefinition<?> leafDefinition = leafDefinitionsMap.get(fullDefinition.getName());
if (leafDefinition == null) {
adjustedColumnDefinitions.add(adjustPartitionDefinition(fullDefinition));
} else if (fullDefinition.equals(leafDefinition)) {
adjustedColumnDefinitions.add(fullDefinition); // No adjustments to apply in this case
} else {
final List<String> differences = new ArrayList<>();
fullDefinition.describeDifferences(differences, leafDefinition, "full schema", "file schema",
"", false);
throw new TableDataException(String.format("Schema mismatch between %s and %s for column %s: %s",
metadataFileURI, commonMetadataFileURI, fullDefinition.getName(), differences));
metadataFileReader.getSchema(),
metadataFileMetadata.getFileMetaData().getKeyValueMetaData(),
inputInstructions);

if (channelsProvider.exists(commonMetadataFileURI)) {
// Infer the partitioning columns using the common metadata file
final ParquetFileReader commonMetadataFileReader =
ParquetFileReader.create(commonMetadataFileURI, channelsProvider);
final Pair<List<ColumnDefinition<?>>, ParquetInstructions> fullSchemaInfo =
ParquetSchemaReader.convertSchema(
commonMetadataFileReader.getSchema(),
convertMetadata(commonMetadataFileURI, commonMetadataFileReader, converter)
.getFileMetaData()
.getKeyValueMetaData(),
leafSchemaInfo.getSecond());
final Collection<ColumnDefinition<?>> adjustedColumnDefinitions = new ArrayList<>();
final Map<String, ColumnDefinition<?>> leafDefinitionsMap =
leafSchemaInfo.getFirst().stream()
.collect(toMap(ColumnDefinition::getName, Function.identity()));
for (final ColumnDefinition<?> fullDefinition : fullSchemaInfo.getFirst()) {
final ColumnDefinition<?> leafDefinition = leafDefinitionsMap.get(fullDefinition.getName());
if (leafDefinition == null) {
adjustedColumnDefinitions.add(adjustPartitionDefinition(fullDefinition));
} else if (fullDefinition.equals(leafDefinition)) {
adjustedColumnDefinitions.add(fullDefinition); // No adjustments to apply in this case
} else {
final List<String> differences = new ArrayList<>();
fullDefinition.describeDifferences(differences, leafDefinition, "full schema", "file schema",
"", false);
throw new TableDataException(
String.format("Schema mismatch between %s and %s for column %s: %s",
metadataFileURI, commonMetadataFileURI, fullDefinition.getName(), differences));
}
}
definition = TableDefinition.of(adjustedColumnDefinitions);
instructions = fullSchemaInfo.getSecond();
} else {
definition = TableDefinition.of(leafSchemaInfo.getFirst());
instructions = leafSchemaInfo.getSecond();
}
definition = TableDefinition.of(adjustedColumnDefinitions);
instructions = fullSchemaInfo.getSecond();
} else {
definition = TableDefinition.of(leafSchemaInfo.getFirst());
instructions = leafSchemaInfo.getSecond();
definition = inputInstructions.getTableDefinition().get();
instructions = inputInstructions;
}

final List<ColumnDefinition<?>> partitioningColumns = definition.getPartitioningColumns();
@@ -187,8 +197,10 @@ private ParquetMetadataFileLayout(
final int numPartitions = filePath.getNameCount() - 1;
if (numPartitions != partitioningColumns.size()) {
throw new TableDataException(String.format(
"Unexpected number of path elements in %s for partitions %s",
relativePathString, partitions.keySet()));
"Unexpected number of path elements in %s for partitions %s, found %d elements, expected " +
"%d based on definition %s",
relativePathString, partitions.keySet(), numPartitions, partitioningColumns.size(),
definition));
}
final boolean useHiveStyle = filePath.getName(0).toString().contains("=");
for (int pi = 0; pi < numPartitions; ++pi) {
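
The path-element check above now reports both the observed and the expected count. As a hedged illustration with hypothetical column names: for a two-level Hive-style layout such as Year=2024/Month=10/data.parquet, a caller-supplied definition needs exactly two partitioning columns.

import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.TableDefinition;

final class PartitionedDefinitionSketch {
    // Two partitioning columns to match the two directory levels in the layout;
    // all names here are illustrative, not taken from this change.
    static TableDefinition twoLevelDefinition() {
        return TableDefinition.of(
                ColumnDefinition.ofInt("Year").withPartitioning(),
                ColumnDefinition.ofInt("Month").withPartitioning(),
                ColumnDefinition.ofString("Symbol"),
                ColumnDefinition.ofDouble("Price"));
    }
}
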
@@ -0,0 +1,77 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.parquet.table;

import io.deephaven.engine.table.Table;
import io.deephaven.engine.testutil.junit4.EngineCleanup;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;

import java.nio.file.Path;

import static io.deephaven.engine.testutil.TstUtils.assertTableEquals;
import static org.junit.Assume.assumeTrue;

/**
* Assumes that there is already a checkout of <a href="https://github.com/deephaven/examples">deephaven-examples</a>.
* This is currently meant to be run locally.
*/
public class ParquetDeephavenExamplesTest {

private static final Path CHECKOUT_ROOT = null; // Path.of("/path/to/deephaven-examples");

@Rule
public final EngineCleanup framework = new EngineCleanup();

@BeforeClass
public static void beforeClass() {
assumeTrue(CHECKOUT_ROOT != null);
}

@Test
public void pems() {
read("Pems/parquet/pems");
}

@Test
public void crypto() {
read("CryptoCurrencyHistory/Parquet/crypto.parquet");
read("CryptoCurrencyHistory/Parquet/crypto_sept7.parquet");
read("CryptoCurrencyHistory/Parquet/crypto_sept8.parquet");
read("CryptoCurrencyHistory/Parquet/CryptoTrades_20210922.parquet");
read("CryptoCurrencyHistory/Parquet/FakeCryptoTrades_20230209.parquet");
}

@Test
public void taxi() {
read("Taxi/parquet/taxi.parquet");
}

@Test
public void sensorData() {
read("SensorData/parquet/SensorData_gzip.parquet");
}

@Test
public void grades() {
assertTableEquals(
read("ParquetExamples/grades"),
read("ParquetExamples/grades_meta"));
assertTableEquals(
read("ParquetExamples/grades_flat").sort("Name", "Class"),
read("ParquetExamples/grades_flat_meta").sort("Name", "Class"));
assertTableEquals(
read("ParquetExamples/grades_kv").sort("Name", "Class"),
read("ParquetExamples/grades_kv_meta").sort("Name", "Class"));
}

private static Table read(String name) {
final String path = CHECKOUT_ROOT
.resolve(name)
.toUri()
.toString();
return ParquetTools.readTable(path).select();
}
}