diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java
index b8d59f2417a..87ce0690771 100644
--- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java
+++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java
@@ -25,7 +25,7 @@
 import static io.deephaven.base.FileUtils.convertToURI;
 
 /**
- * Top level accessor for a parquet file which can read both from a CLI style file URI, ex."s3://bucket/key".
+ * Top level accessor for a parquet file which can read from a CLI style file URI, ex."s3://bucket/key".
  */
 public class ParquetFileReader {
     private static final int FOOTER_LENGTH_SIZE = 4;
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java
index ab035bdd0c8..2460670ee3a 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java
@@ -970,10 +970,6 @@ private static Table readPartitionedTableWithMetadata(
             @NotNull final ParquetInstructions readInstructions,
             @Nullable final SeekableChannelsProvider channelsProvider) {
         verifyFileLayout(readInstructions, ParquetFileLayout.METADATA_PARTITIONED);
-        if (readInstructions.getTableDefinition().isPresent()) {
-            throw new UnsupportedOperationException("Detected table definition inside read instructions, reading " +
-                    "metadata files with custom table definition is currently not supported");
-        }
         final ParquetMetadataFileLayout layout =
                 ParquetMetadataFileLayout.create(sourceURI, readInstructions, channelsProvider);
         return readTable(layout,
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetMetadataFileLayout.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetMetadataFileLayout.java
index 4ec93d85a1b..84546de519e 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetMetadataFileLayout.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetMetadataFileLayout.java
@@ -120,43 +120,53 @@ private ParquetMetadataFileLayout(
         final ParquetFileReader metadataFileReader = ParquetFileReader.create(metadataFileURI, channelsProvider);
         final ParquetMetadataConverter converter = new ParquetMetadataConverter();
         final ParquetMetadata metadataFileMetadata = convertMetadata(metadataFileURI, metadataFileReader, converter);
-        final Pair<List<ColumnDefinition<?>>, ParquetInstructions> leafSchemaInfo = ParquetSchemaReader.convertSchema(
-                metadataFileReader.getSchema(),
-                metadataFileMetadata.getFileMetaData().getKeyValueMetaData(),
-                inputInstructions);
-
-        if (channelsProvider.exists(commonMetadataFileURI)) {
-            final ParquetFileReader commonMetadataFileReader =
-                    ParquetFileReader.create(commonMetadataFileURI, channelsProvider);
-            final Pair<List<ColumnDefinition<?>>, ParquetInstructions> fullSchemaInfo =
+        if (inputInstructions.getTableDefinition().isEmpty()) {
+            // Infer the definition from the metadata file
+            final Pair<List<ColumnDefinition<?>>, ParquetInstructions> leafSchemaInfo =
                     ParquetSchemaReader.convertSchema(
-                            commonMetadataFileReader.getSchema(),
-                            convertMetadata(commonMetadataFileURI, commonMetadataFileReader, converter)
-                                    .getFileMetaData()
-                                    .getKeyValueMetaData(),
-                            leafSchemaInfo.getSecond());
-            final Collection<ColumnDefinition<?>> adjustedColumnDefinitions = new ArrayList<>();
-            final Map<String, ColumnDefinition<?>> leafDefinitionsMap =
-                    leafSchemaInfo.getFirst().stream().collect(toMap(ColumnDefinition::getName, Function.identity()));
-            for (final ColumnDefinition<?> fullDefinition : fullSchemaInfo.getFirst()) {
-                final ColumnDefinition<?> leafDefinition = leafDefinitionsMap.get(fullDefinition.getName());
-                if (leafDefinition == null) {
-                    adjustedColumnDefinitions.add(adjustPartitionDefinition(fullDefinition));
-                } else if (fullDefinition.equals(leafDefinition)) {
-                    adjustedColumnDefinitions.add(fullDefinition); // No adjustments to apply in this case
-                } else {
-                    final List<String> differences = new ArrayList<>();
-                    fullDefinition.describeDifferences(differences, leafDefinition, "full schema", "file schema",
-                            "", false);
-                    throw new TableDataException(String.format("Schema mismatch between %s and %s for column %s: %s",
-                            metadataFileURI, commonMetadataFileURI, fullDefinition.getName(), differences));
+                            metadataFileReader.getSchema(),
+                            metadataFileMetadata.getFileMetaData().getKeyValueMetaData(),
+                            inputInstructions);
+
+            if (channelsProvider.exists(commonMetadataFileURI)) {
+                // Infer the partitioning columns using the common metadata file
+                final ParquetFileReader commonMetadataFileReader =
+                        ParquetFileReader.create(commonMetadataFileURI, channelsProvider);
+                final Pair<List<ColumnDefinition<?>>, ParquetInstructions> fullSchemaInfo =
+                        ParquetSchemaReader.convertSchema(
+                                commonMetadataFileReader.getSchema(),
+                                convertMetadata(commonMetadataFileURI, commonMetadataFileReader, converter)
+                                        .getFileMetaData()
+                                        .getKeyValueMetaData(),
+                                leafSchemaInfo.getSecond());
+                final Collection<ColumnDefinition<?>> adjustedColumnDefinitions = new ArrayList<>();
+                final Map<String, ColumnDefinition<?>> leafDefinitionsMap =
+                        leafSchemaInfo.getFirst().stream()
+                                .collect(toMap(ColumnDefinition::getName, Function.identity()));
+                for (final ColumnDefinition<?> fullDefinition : fullSchemaInfo.getFirst()) {
+                    final ColumnDefinition<?> leafDefinition = leafDefinitionsMap.get(fullDefinition.getName());
+                    if (leafDefinition == null) {
+                        adjustedColumnDefinitions.add(adjustPartitionDefinition(fullDefinition));
+                    } else if (fullDefinition.equals(leafDefinition)) {
+                        adjustedColumnDefinitions.add(fullDefinition); // No adjustments to apply in this case
+                    } else {
+                        final List<String> differences = new ArrayList<>();
+                        fullDefinition.describeDifferences(differences, leafDefinition, "full schema", "file schema",
+                                "", false);
+                        throw new TableDataException(
+                                String.format("Schema mismatch between %s and %s for column %s: %s",
+                                        metadataFileURI, commonMetadataFileURI, fullDefinition.getName(), differences));
+                    }
                 }
+                definition = TableDefinition.of(adjustedColumnDefinitions);
+                instructions = fullSchemaInfo.getSecond();
+            } else {
+                definition = TableDefinition.of(leafSchemaInfo.getFirst());
+                instructions = leafSchemaInfo.getSecond();
             }
-            definition = TableDefinition.of(adjustedColumnDefinitions);
-            instructions = fullSchemaInfo.getSecond();
         } else {
-            definition = TableDefinition.of(leafSchemaInfo.getFirst());
-            instructions = leafSchemaInfo.getSecond();
+            definition = inputInstructions.getTableDefinition().get();
+            instructions = inputInstructions;
         }
 
         final List<ColumnDefinition<?>> partitioningColumns = definition.getPartitioningColumns();
@@ -187,8 +197,10 @@ private ParquetMetadataFileLayout(
             final int numPartitions = filePath.getNameCount() - 1;
             if (numPartitions != partitioningColumns.size()) {
                 throw new TableDataException(String.format(
-                        "Unexpected number of path elements in %s for partitions %s",
-                        relativePathString, partitions.keySet()));
+                        "Unexpected number of path elements in %s for partitions %s, found %d elements, expected " +
+                                "%d based on definition %s",
+                        relativePathString, partitions.keySet(), numPartitions, partitioningColumns.size(),
+                        definition));
             }
             final boolean useHiveStyle = filePath.getName(0).toString().contains("=");
             for (int pi = 0; pi < numPartitions; ++pi) {
diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java
index a6d683afcc0..beb23ce5a1b 100644
--- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java
+++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java
@@ -1782,6 +1782,138 @@ public void decimalLogicalTypeTest() {
         }
     }
 
+    @Test
+    public void metadataPartitionedDataWithCustomTableDefinition() {
+        final Table inputData = TableTools.emptyTable(100).update("PC1 = (ii%2 == 0) ? `Apple` : `Ball`",
+                "PC2 = (ii%3 == 0) ? `Pen` : `Pencil`",
+                "numbers = ii",
+                "characters = (char) (65 + (ii % 23))");
+        final TableDefinition tableDefinition = TableDefinition.of(
+                ColumnDefinition.ofString("PC1").withPartitioning(),
+                ColumnDefinition.ofString("PC2").withPartitioning(),
+                ColumnDefinition.ofLong("numbers"),
+                ColumnDefinition.ofChar("characters"));
+        final File parentDir = new File(rootFile, "metadataPartitionedDataWithCustomTableDefinition");
+        final ParquetInstructions writeInstructions = ParquetInstructions.builder()
+                .setGenerateMetadataFiles(true)
+                .setTableDefinition(tableDefinition)
+                .build();
+        writeKeyValuePartitionedTable(inputData, parentDir.getAbsolutePath(), writeInstructions);
+        final Table fromDisk = readTable(parentDir.getPath(),
+                EMPTY.withLayout(ParquetInstructions.ParquetFileLayout.METADATA_PARTITIONED));
+        assertEquals(fromDisk.getDefinition(), tableDefinition);
+        assertTableEquals(inputData.sort("PC1", "PC2"), fromDisk.sort("PC1", "PC2"));
+
+        // Now set a different table definition and read the data
+        {
+            // Remove a column
+            final TableDefinition newTableDefinition = TableDefinition.of(
+                    ColumnDefinition.ofString("PC1").withPartitioning(),
+                    ColumnDefinition.ofString("PC2").withPartitioning(),
+                    ColumnDefinition.ofLong("numbers"));
+            final ParquetInstructions readInstructions = ParquetInstructions.builder()
+                    .setTableDefinition(newTableDefinition)
+                    .setFileLayout(ParquetInstructions.ParquetFileLayout.METADATA_PARTITIONED)
+                    .build();
+            final Table fromDiskWithNewDefinition = readTable(parentDir.getPath(), readInstructions);
+            assertEquals(newTableDefinition, fromDiskWithNewDefinition.getDefinition());
+            assertTableEquals(inputData.select("PC1", "PC2", "numbers").sort("PC1", "PC2"),
+                    fromDiskWithNewDefinition.sort("PC1", "PC2"));
+        }
+
+        {
+            // Remove another column
+            final TableDefinition newTableDefinition = TableDefinition.of(
+                    ColumnDefinition.ofString("PC1").withPartitioning(),
+                    ColumnDefinition.ofString("PC2").withPartitioning(),
+                    ColumnDefinition.ofChar("characters"));
+            final ParquetInstructions readInstructions = ParquetInstructions.builder()
+                    .setTableDefinition(newTableDefinition)
+                    .setFileLayout(ParquetInstructions.ParquetFileLayout.METADATA_PARTITIONED)
+                    .build();
+            final Table fromDiskWithNewDefinition = readTable(parentDir.getPath(), readInstructions);
+            assertEquals(newTableDefinition, fromDiskWithNewDefinition.getDefinition());
+            assertTableEquals(inputData.select("PC1", "PC2", "characters").sort("PC1", "PC2"),
+                    fromDiskWithNewDefinition.sort("PC1", "PC2"));
+        }
+
+        {
+            // Remove a partitioning column
+            final TableDefinition newTableDefinition = TableDefinition.of(
+                    ColumnDefinition.ofString("PC1").withPartitioning(),
+                    ColumnDefinition.ofChar("characters"),
+                    ColumnDefinition.ofLong("numbers"));
+            final ParquetInstructions readInstructions = ParquetInstructions.builder()
+                    .setTableDefinition(newTableDefinition)
+                    .setFileLayout(ParquetInstructions.ParquetFileLayout.METADATA_PARTITIONED)
+                    .build();
+            try {
+                readTable(parentDir.getPath(), readInstructions);
+                fail("Exception expected because of missing partitioning column in table definition");
+            } catch (final RuntimeException expected) {
+            }
+        }
+
+        {
+            // Add an extra column
+            final TableDefinition newTableDefinition = TableDefinition.of(
+                    ColumnDefinition.ofString("PC1").withPartitioning(),
+                    ColumnDefinition.ofString("PC2").withPartitioning(),
+                    ColumnDefinition.ofLong("numbers"),
+                    ColumnDefinition.ofChar("characters"),
+                    ColumnDefinition.ofInt("extraColumn"));
+            final ParquetInstructions readInstructions = ParquetInstructions.builder()
+                    .setTableDefinition(newTableDefinition)
+                    .setFileLayout(ParquetInstructions.ParquetFileLayout.METADATA_PARTITIONED)
+                    .build();
+            final Table fromDiskWithNewDefinition = readTable(parentDir.getPath(), readInstructions);
+            assertEquals(newTableDefinition, fromDiskWithNewDefinition.getDefinition());
+            assertTableEquals(inputData.update("extraColumn = (int) null").sort("PC1", "PC2"),
+                    fromDiskWithNewDefinition.sort("PC1", "PC2"));
+        }
+
+        {
+            // Reorder partitioning and non-partitioning columns
+            final TableDefinition newTableDefinition = TableDefinition.of(
+                    ColumnDefinition.ofString("PC2").withPartitioning(),
+                    ColumnDefinition.ofString("PC1").withPartitioning(),
+                    ColumnDefinition.ofChar("characters"),
+                    ColumnDefinition.ofLong("numbers"));
+            final ParquetInstructions readInstructions = ParquetInstructions.builder()
+                    .setTableDefinition(newTableDefinition)
+                    .setFileLayout(ParquetInstructions.ParquetFileLayout.METADATA_PARTITIONED)
+                    .build();
+            final Table fromDiskWithNewDefinition = readTable(parentDir.getPath(), readInstructions);
+            assertEquals(newTableDefinition, fromDiskWithNewDefinition.getDefinition());
+            assertTableEquals(inputData.select("PC2", "PC1", "characters", "numbers").sort("PC2", "PC1"),
+                    fromDiskWithNewDefinition.sort("PC2", "PC1"));
+        }
+
+        {
+            // Rename partitioning and non-partitioning columns
+            final TableDefinition newTableDefinition = TableDefinition.of(
+                    ColumnDefinition.ofString("PartCol1").withPartitioning(),
+                    ColumnDefinition.ofString("PartCol2").withPartitioning(),
+                    ColumnDefinition.ofLong("nums"),
+                    ColumnDefinition.ofChar("chars"));
+            final ParquetInstructions readInstructions = ParquetInstructions.builder()
+                    .setTableDefinition(newTableDefinition)
+                    .setFileLayout(ParquetInstructions.ParquetFileLayout.METADATA_PARTITIONED)
+                    .addColumnNameMapping("PC1", "PartCol1")
+                    .addColumnNameMapping("PC2", "PartCol2")
+                    .addColumnNameMapping("numbers", "nums")
+                    .addColumnNameMapping("characters", "chars")
+                    .build();
+            final Table fromDiskWithNewDefinition = readTable(parentDir.getPath(), readInstructions);
+            assertEquals(newTableDefinition, fromDiskWithNewDefinition.getDefinition());
+            assertTableEquals(
+                    inputData.sort("PC1", "PC2"),
+                    fromDiskWithNewDefinition
+                            .renameColumns("PC1 = PartCol1", "PC2 = PartCol2", "numbers = nums", "characters = chars")
+                            .sort("PC1", "PC2"));
+        }
+    }
+
     @Test
     public void testVectorColumns() {
         final Table table = getTableFlat(20000, true, false);
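
Usage sketch (not part of the patch): the changes above let a metadata-partitioned Parquet read honor a caller-supplied table definition instead of the one inferred from the metadata files. The snippet below mirrors what the new test exercises; the directory path is hypothetical, and only API calls that appear in this diff (ParquetInstructions.builder(), setTableDefinition, setFileLayout, ParquetTools.readTable) are assumed.

    // Hypothetical partitioned directory previously written with metadata file generation enabled
    final TableDefinition definition = TableDefinition.of(
            ColumnDefinition.ofString("PC1").withPartitioning(),
            ColumnDefinition.ofLong("numbers"));
    final ParquetInstructions instructions = ParquetInstructions.builder()
            .setTableDefinition(definition)
            .setFileLayout(ParquetInstructions.ParquetFileLayout.METADATA_PARTITIONED)
            .build();
    // Columns absent from the definition are skipped; partitioning columns must still be declared
    final Table table = ParquetTools.readTable("/path/to/partitioned/dir", instructions);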