Added support to read parquet metadata files with custom table definition
malhotrashivam committed Oct 11, 2024
1 parent 4f8aa29 commit 261738c
Showing 4 changed files with 180 additions and 40 deletions.
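For context, a minimal usage sketch of what this commit enables, mirroring the new test added below; the import package names and the directory path are assumptions for illustration and are not part of this diff:

import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.parquet.table.ParquetTools;

// Read a metadata-partitioned parquet directory with a caller-supplied definition
// instead of inferring one from the _metadata / _common_metadata files.
final TableDefinition customDefinition = TableDefinition.of(
        ColumnDefinition.ofString("PC1").withPartitioning(),
        ColumnDefinition.ofString("PC2").withPartitioning(),
        ColumnDefinition.ofLong("numbers"));
final ParquetInstructions readInstructions = ParquetInstructions.builder()
        .setTableDefinition(customDefinition)
        .setFileLayout(ParquetInstructions.ParquetFileLayout.METADATA_PARTITIONED)
        .build();
// "/path/to/partitioned/table" is a placeholder for a directory written with metadata files
final Table table = ParquetTools.readTable("/path/to/partitioned/table", readInstructions);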
@@ -25,7 +25,7 @@
import static io.deephaven.base.FileUtils.convertToURI;

/**
- * Top level accessor for a parquet file which can read both from a CLI style file URI, ex."s3://bucket/key".
+ * Top level accessor for a parquet file which can read from a CLI style file URI, ex."s3://bucket/key".
*/
public class ParquetFileReader {
private static final int FOOTER_LENGTH_SIZE = 4;
@@ -970,10 +970,6 @@ private static Table readPartitionedTableWithMetadata(
@NotNull final ParquetInstructions readInstructions,
@Nullable final SeekableChannelsProvider channelsProvider) {
verifyFileLayout(readInstructions, ParquetFileLayout.METADATA_PARTITIONED);
-        if (readInstructions.getTableDefinition().isPresent()) {
-            throw new UnsupportedOperationException("Detected table definition inside read instructions, reading " +
-                    "metadata files with custom table definition is currently not supported");
-        }
final ParquetMetadataFileLayout layout =
ParquetMetadataFileLayout.create(sourceURI, readInstructions, channelsProvider);
return readTable(layout,
@@ -120,43 +120,53 @@ private ParquetMetadataFileLayout(
final ParquetFileReader metadataFileReader = ParquetFileReader.create(metadataFileURI, channelsProvider);
final ParquetMetadataConverter converter = new ParquetMetadataConverter();
final ParquetMetadata metadataFileMetadata = convertMetadata(metadataFileURI, metadataFileReader, converter);
-        final Pair<List<ColumnDefinition<?>>, ParquetInstructions> leafSchemaInfo = ParquetSchemaReader.convertSchema(
-                metadataFileReader.getSchema(),
-                metadataFileMetadata.getFileMetaData().getKeyValueMetaData(),
-                inputInstructions);
-
-        if (channelsProvider.exists(commonMetadataFileURI)) {
-            final ParquetFileReader commonMetadataFileReader =
-                    ParquetFileReader.create(commonMetadataFileURI, channelsProvider);
-            final Pair<List<ColumnDefinition<?>>, ParquetInstructions> fullSchemaInfo =
-                    ParquetSchemaReader.convertSchema(
-                            commonMetadataFileReader.getSchema(),
-                            convertMetadata(commonMetadataFileURI, commonMetadataFileReader, converter)
-                                    .getFileMetaData()
-                                    .getKeyValueMetaData(),
-                            leafSchemaInfo.getSecond());
-            final Collection<ColumnDefinition<?>> adjustedColumnDefinitions = new ArrayList<>();
-            final Map<String, ColumnDefinition<?>> leafDefinitionsMap =
-                    leafSchemaInfo.getFirst().stream().collect(toMap(ColumnDefinition::getName, Function.identity()));
-            for (final ColumnDefinition<?> fullDefinition : fullSchemaInfo.getFirst()) {
-                final ColumnDefinition<?> leafDefinition = leafDefinitionsMap.get(fullDefinition.getName());
-                if (leafDefinition == null) {
-                    adjustedColumnDefinitions.add(adjustPartitionDefinition(fullDefinition));
-                } else if (fullDefinition.equals(leafDefinition)) {
-                    adjustedColumnDefinitions.add(fullDefinition); // No adjustments to apply in this case
-                } else {
-                    final List<String> differences = new ArrayList<>();
-                    fullDefinition.describeDifferences(differences, leafDefinition, "full schema", "file schema",
-                            "", false);
-                    throw new TableDataException(String.format("Schema mismatch between %s and %s for column %s: %s",
-                            metadataFileURI, commonMetadataFileURI, fullDefinition.getName(), differences));
-                }
-            }
-            definition = TableDefinition.of(adjustedColumnDefinitions);
-            instructions = fullSchemaInfo.getSecond();
-        } else {
-            definition = TableDefinition.of(leafSchemaInfo.getFirst());
-            instructions = leafSchemaInfo.getSecond();
-        }
+        if (inputInstructions.getTableDefinition().isEmpty()) {
+            // Infer the definition from the metadata file
+            final Pair<List<ColumnDefinition<?>>, ParquetInstructions> leafSchemaInfo =
+                    ParquetSchemaReader.convertSchema(
+                            metadataFileReader.getSchema(),
+                            metadataFileMetadata.getFileMetaData().getKeyValueMetaData(),
+                            inputInstructions);
+
+            if (channelsProvider.exists(commonMetadataFileURI)) {
+                // Infer the partitioning columns using the common metadata file
+                final ParquetFileReader commonMetadataFileReader =
+                        ParquetFileReader.create(commonMetadataFileURI, channelsProvider);
+                final Pair<List<ColumnDefinition<?>>, ParquetInstructions> fullSchemaInfo =
+                        ParquetSchemaReader.convertSchema(
+                                commonMetadataFileReader.getSchema(),
+                                convertMetadata(commonMetadataFileURI, commonMetadataFileReader, converter)
+                                        .getFileMetaData()
+                                        .getKeyValueMetaData(),
+                                leafSchemaInfo.getSecond());
+                final Collection<ColumnDefinition<?>> adjustedColumnDefinitions = new ArrayList<>();
+                final Map<String, ColumnDefinition<?>> leafDefinitionsMap =
+                        leafSchemaInfo.getFirst().stream()
+                                .collect(toMap(ColumnDefinition::getName, Function.identity()));
+                for (final ColumnDefinition<?> fullDefinition : fullSchemaInfo.getFirst()) {
+                    final ColumnDefinition<?> leafDefinition = leafDefinitionsMap.get(fullDefinition.getName());
+                    if (leafDefinition == null) {
+                        adjustedColumnDefinitions.add(adjustPartitionDefinition(fullDefinition));
+                    } else if (fullDefinition.equals(leafDefinition)) {
+                        adjustedColumnDefinitions.add(fullDefinition); // No adjustments to apply in this case
+                    } else {
+                        final List<String> differences = new ArrayList<>();
+                        fullDefinition.describeDifferences(differences, leafDefinition, "full schema", "file schema",
+                                "", false);
+                        throw new TableDataException(
+                                String.format("Schema mismatch between %s and %s for column %s: %s",
+                                        metadataFileURI, commonMetadataFileURI, fullDefinition.getName(), differences));
+                    }
+                }
+                definition = TableDefinition.of(adjustedColumnDefinitions);
+                instructions = fullSchemaInfo.getSecond();
+            } else {
+                definition = TableDefinition.of(leafSchemaInfo.getFirst());
+                instructions = leafSchemaInfo.getSecond();
+            }
+        } else {
+            definition = inputInstructions.getTableDefinition().get();
+            instructions = inputInstructions;
+        }

final List<ColumnDefinition<?>> partitioningColumns = definition.getPartitioningColumns();
@@ -187,8 +197,10 @@ private ParquetMetadataFileLayout(
final int numPartitions = filePath.getNameCount() - 1;
if (numPartitions != partitioningColumns.size()) {
throw new TableDataException(String.format(
"Unexpected number of path elements in %s for partitions %s",
relativePathString, partitions.keySet()));
"Unexpected number of path elements in %s for partitions %s, found %d elements, expected " +
"%d based on definition %s",
relativePathString, partitions.keySet(), numPartitions, partitioningColumns.size(),
definition));
}
final boolean useHiveStyle = filePath.getName(0).toString().contains("=");
for (int pi = 0; pi < numPartitions; ++pi) {
@@ -1782,6 +1782,138 @@ public void decimalLogicalTypeTest() {
}
}

@Test
public void metadataPartitionedDataWithCustomTableDefinition() {
final Table inputData = TableTools.emptyTable(100).update("PC1 = (ii%2 == 0) ? `Apple` : `Ball`",
"PC2 = (ii%3 == 0) ? `Pen` : `Pencil`",
"numbers = ii",
"characters = (char) (65 + (ii % 23))");
final TableDefinition tableDefinition = TableDefinition.of(
ColumnDefinition.ofString("PC1").withPartitioning(),
ColumnDefinition.ofString("PC2").withPartitioning(),
ColumnDefinition.ofLong("numbers"),
ColumnDefinition.ofChar("characters"));
final File parentDir = new File(rootFile, "metadataPartitionedDataWithCustomTableDefinition");
final ParquetInstructions writeInstructions = ParquetInstructions.builder()
.setGenerateMetadataFiles(true)
.setTableDefinition(tableDefinition)
.build();
writeKeyValuePartitionedTable(inputData, parentDir.getAbsolutePath(), writeInstructions);
final Table fromDisk = readTable(parentDir.getPath(),
EMPTY.withLayout(ParquetInstructions.ParquetFileLayout.METADATA_PARTITIONED));
assertEquals(fromDisk.getDefinition(), tableDefinition);
assertTableEquals(inputData.sort("PC1", "PC2"), fromDisk.sort("PC1", "PC2"));

        // Now set different table definitions and read the data
{
// Remove a column
final TableDefinition newTableDefinition = TableDefinition.of(
ColumnDefinition.ofString("PC1").withPartitioning(),
ColumnDefinition.ofString("PC2").withPartitioning(),
ColumnDefinition.ofLong("numbers"));
final ParquetInstructions readInstructions = ParquetInstructions.builder()
.setTableDefinition(newTableDefinition)
.setFileLayout(ParquetInstructions.ParquetFileLayout.METADATA_PARTITIONED)
.build();
final Table fromDiskWithNewDefinition = readTable(parentDir.getPath(), readInstructions);
assertEquals(newTableDefinition, fromDiskWithNewDefinition.getDefinition());
assertTableEquals(inputData.select("PC1", "PC2", "numbers").sort("PC1", "PC2"),
fromDiskWithNewDefinition.sort("PC1", "PC2"));
}

{
// Remove another column
final TableDefinition newTableDefinition = TableDefinition.of(
ColumnDefinition.ofString("PC1").withPartitioning(),
ColumnDefinition.ofString("PC2").withPartitioning(),
ColumnDefinition.ofChar("characters"));
final ParquetInstructions readInstructions = ParquetInstructions.builder()
.setTableDefinition(newTableDefinition)
.setFileLayout(ParquetInstructions.ParquetFileLayout.METADATA_PARTITIONED)
.build();
final Table fromDiskWithNewDefinition = readTable(parentDir.getPath(), readInstructions);
assertEquals(newTableDefinition, fromDiskWithNewDefinition.getDefinition());
assertTableEquals(inputData.select("PC1", "PC2", "characters").sort("PC1", "PC2"),
fromDiskWithNewDefinition.sort("PC1", "PC2"));
}

{
// Remove a partitioning column
final TableDefinition newTableDefinition = TableDefinition.of(
ColumnDefinition.ofString("PC1").withPartitioning(),
ColumnDefinition.ofChar("characters"),
ColumnDefinition.ofLong("numbers"));
final ParquetInstructions readInstructions = ParquetInstructions.builder()
.setTableDefinition(newTableDefinition)
.setFileLayout(ParquetInstructions.ParquetFileLayout.METADATA_PARTITIONED)
.build();
try {
readTable(parentDir.getPath(), readInstructions);
fail("Exception expected because of missing partitioning column in table definition");
} catch (final RuntimeException expected) {
}
}

{
// Add an extra column
final TableDefinition newTableDefinition = TableDefinition.of(
ColumnDefinition.ofString("PC1").withPartitioning(),
ColumnDefinition.ofString("PC2").withPartitioning(),
ColumnDefinition.ofLong("numbers"),
ColumnDefinition.ofChar("characters"),
ColumnDefinition.ofInt("extraColumn"));
final ParquetInstructions readInstructions = ParquetInstructions.builder()
.setTableDefinition(newTableDefinition)
.setFileLayout(ParquetInstructions.ParquetFileLayout.METADATA_PARTITIONED)
.build();
final Table fromDiskWithNewDefinition = readTable(parentDir.getPath(), readInstructions);
assertEquals(newTableDefinition, fromDiskWithNewDefinition.getDefinition());
assertTableEquals(inputData.update("extraColumn = (int) null").sort("PC1", "PC2"),
fromDiskWithNewDefinition.sort("PC1", "PC2"));
}

{
// Reorder partitioning and non-partitioning columns
final TableDefinition newTableDefinition = TableDefinition.of(
ColumnDefinition.ofString("PC2").withPartitioning(),
ColumnDefinition.ofString("PC1").withPartitioning(),
ColumnDefinition.ofChar("characters"),
ColumnDefinition.ofLong("numbers"));
final ParquetInstructions readInstructions = ParquetInstructions.builder()
.setTableDefinition(newTableDefinition)
.setFileLayout(ParquetInstructions.ParquetFileLayout.METADATA_PARTITIONED)
.build();
final Table fromDiskWithNewDefinition = readTable(parentDir.getPath(), readInstructions);
assertEquals(newTableDefinition, fromDiskWithNewDefinition.getDefinition());
assertTableEquals(inputData.select("PC2", "PC1", "characters", "numbers").sort("PC2", "PC1"),
fromDiskWithNewDefinition.sort("PC2", "PC1"));
}

{
            // Rename partitioning and non-partitioning columns
final TableDefinition newTableDefinition = TableDefinition.of(
ColumnDefinition.ofString("PartCol1").withPartitioning(),
ColumnDefinition.ofString("PartCol2").withPartitioning(),
ColumnDefinition.ofLong("nums"),
ColumnDefinition.ofChar("chars"));
final ParquetInstructions readInstructions = ParquetInstructions.builder()
.setTableDefinition(newTableDefinition)
.setFileLayout(ParquetInstructions.ParquetFileLayout.METADATA_PARTITIONED)
.addColumnNameMapping("PC1", "PartCol1")
.addColumnNameMapping("PC2", "PartCol2")
.addColumnNameMapping("numbers", "nums")
.addColumnNameMapping("characters", "chars")
.build();
final Table fromDiskWithNewDefinition = readTable(parentDir.getPath(), readInstructions);
assertEquals(newTableDefinition, fromDiskWithNewDefinition.getDefinition());
assertTableEquals(
inputData.sort("PC1", "PC2"),
fromDiskWithNewDefinition
.renameColumns("PC1 = PartCol1", "PC2 = PartCol2", "numbers = nums", "characters = chars")
.sort("PC1", "PC2"));
}
}

@Test
public void testVectorColumns() {
final Table table = getTableFlat(20000, true, false);
