Commit: Iterating using chunked iterators

malhotrashivam committed Mar 4, 2024
1 parent 3651e5f commit 51eefe1

Showing 3 changed files with 81 additions and 51 deletions.

File 1 of 3:
@@ -9,8 +9,8 @@
 import io.deephaven.base.Pair;
 import io.deephaven.base.verify.Require;
 import io.deephaven.engine.context.ExecutionContext;
+import io.deephaven.engine.primitive.iterator.CloseableIterator;
 import io.deephaven.engine.table.ColumnDefinition;
-import io.deephaven.engine.table.ColumnSource;
 import io.deephaven.engine.table.PartitionedTable;
 import io.deephaven.engine.table.Table;
 import io.deephaven.engine.table.TableDefinition;
@@ -60,7 +60,6 @@
 import java.util.*;
 import java.util.stream.Collectors;

-import static io.deephaven.engine.table.impl.partitioned.PartitionedTableCreatorImpl.CONSTITUENT;
 import static io.deephaven.parquet.base.ParquetFileReader.FILE_URI_SCHEME;
 import static io.deephaven.parquet.table.ParquetTableWriter.getSchemaForTable;
 import static io.deephaven.parquet.table.ParquetUtils.PARQUET_FILE_EXTENSION;
@@ -546,8 +545,7 @@ public static void writeKeyValuePartitionedTable(@NotNull final PartitionedTable
         final Set<String> keyColumnNames = partitionedTable.keyColumnNames();
         final Collection<ColumnDefinition<?>> columnDefinitions = new ArrayList<>(keyColumnNames.size() +
                 partitionedTable.constituentDefinition().numColumns());
-        partitionedTable.table().getDefinition().getColumns().stream()
-                .filter(columnDefinition -> keyColumnNames.contains(columnDefinition.getName()))
+        keyColumnNames.stream().map(keyColName -> partitionedTable.table().getDefinition().getColumn(keyColName))
                 .forEach(columnDefinitions::add);
         partitionedTable.constituentDefinition().getColumns().stream()
                 .filter(columnDefinition -> !keyColumnNames.contains(columnDefinition.getName()))
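The hunk above swaps a filter over the full table definition for a direct lookup by key column
name, so the collected definitions follow the iteration order of keyColumnNames rather than the
table's column order. A minimal sketch of the two shapes, using only the TableDefinition methods
that already appear in this diff (class and variable names are hypothetical):

    import io.deephaven.engine.table.ColumnDefinition;
    import io.deephaven.engine.table.TableDefinition;

    import java.util.List;
    import java.util.Set;
    import java.util.stream.Collectors;

    public class KeyColumnLookupSketch {
        // Old shape: keep whichever columns of the definition are key columns,
        // in the definition's own column order.
        static List<ColumnDefinition<?>> byFilter(TableDefinition def, Set<String> keys) {
            return def.getColumns().stream()
                    .filter(col -> keys.contains(col.getName()))
                    .collect(Collectors.toList());
        }

        // New shape: look each key column up directly, in the key set's
        // iteration order; this is what the rewritten stream does.
        static List<ColumnDefinition<?>> byLookup(TableDefinition def, Set<String> keys) {
            return keys.stream()
                    .map(def::getColumn)
                    .collect(Collectors.toList());
        }
    }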
@@ -576,55 +574,57 @@ public static void writeKeyValuePartitionedTable(@NotNull final PartitionedTable
             @NotNull final String baseName,
             @NotNull final ParquetInstructions writeInstructions) {
         final String[] partitioningColumnNames = partitionedTable.keyColumnNames().toArray(String[]::new);
-        if (partitionedTable.table().numColumns() == partitioningColumnNames.length) {
-            throw new IllegalArgumentException(
-                    "Cannot write a partitioned parquet table with no non-partitioning columns");
+        final TableDefinition partitionedTableDefinition = getNonKeyTableDefiniton(partitionedTable.keyColumnNames(),
+                definition);
+        if (partitionedTableDefinition.numColumns() == 0) {
+            throw new IllegalArgumentException("Cannot write a partitioned parquet table without any non-partitioning "
+                    + "columns");
         }
         // Note that there can be multiple constituents with the same key values, so cannot directly use the
         // partitionedTable.constituentFor(keyValues) method, and we need to group them together
         final Table withGroupConstituents = partitionedTable.table().groupBy(partitioningColumnNames);
-        final ColumnSource<ObjectVector<Table>> consituentVectorColumnSource =
-                withGroupConstituents.getColumnSource(partitionedTable.constituentColumnName());
-        if (consituentVectorColumnSource == null) {
-            throw new IllegalStateException("Grouped partitioned table must have a constituent column of type " +
-                    "ObjectVector");
-        }
-        final Collection<Table> partitionedData = new ArrayList<>();
-        final Collection<File> destinations = new ArrayList<>();
-        withGroupConstituents.getRowSet().forAllRowKeys(key -> {
-            final StringBuilder relativePathBuilder = new StringBuilder();
-            for (final String partitioningColumnName : partitioningColumnNames) {
-                final ColumnSource<?> partitioningColSource =
-                        withGroupConstituents.getColumnSource(partitioningColumnName);
-                final Object partitioningKey = partitioningColSource.get(key);
-                if (partitioningKey == null) {
-                    throw new IllegalStateException("Partitioning column values must be non-null, found null " +
-                            "value for column " + partitioningColumnName);
-                }
-                final String partitioningValue = PartitionFormatter.formatToString(partitioningKey);
-                relativePathBuilder.append(partitioningColumnName).append("=").append(partitioningValue)
-                        .append(File.separator);
-            }
-            final String relativePath = relativePathBuilder.toString();
-            final ObjectVector<? extends Table> constituentVector = consituentVectorColumnSource.get(key);
-            if (constituentVector == null) {
-                throw new IllegalStateException("Grouped partitioned table must have a vector constituent column for" +
-                        " key = " + key);
-            }
-            int count = 0;
-            for (final Table constituent : constituentVector) {
-                final File destination;
-                if (partitionedTable.uniqueKeys()) {
-                    destination = new File(destinationDir, relativePath + baseName + ".parquet");
-                } else {
-                    destination = new File(destinationDir, relativePath + baseName + "-part-" + count + ".parquet");
-                }
-                destinations.add(destination);
-                partitionedData.add(constituent);
-                count++;
-            }
-        });
+        final List<StringBuilder> relativePathBuilders = new ArrayList<>();
+        final long numRows = withGroupConstituents.size();
+        for (long i = 0; i < numRows; i++) {
+            relativePathBuilders.add(new StringBuilder());
+        }
+        // For partitioning column for each row, accumulate the values in a key=value format
+        Arrays.stream(partitioningColumnNames).forEach(columnName -> {
+            try (final CloseableIterator<?> valueIterator = withGroupConstituents.columnIterator(columnName)) {
+                int row = 0;
+                while (valueIterator.hasNext()) {
+                    final String partitioningValue = PartitionFormatter.formatToString(valueIterator.next());
+                    relativePathBuilders.get(row).append(columnName).append("=").append(partitioningValue)
+                            .append(File.separator);
+                    row++;
+                }
+            }
+        });
+
+        // For constituent column for each row, build final file paths
+        final Collection<Table> partitionedData = new ArrayList<>();
+        final Collection<File> destinations = new ArrayList<>();
+        try (final CloseableIterator<ObjectVector<? extends Table>> constituentIterator =
+                withGroupConstituents.objectColumnIterator(partitionedTable.constituentColumnName())) {
+            int row = 0;
+            while (constituentIterator.hasNext()) {
+                final ObjectVector<? extends Table> constituentVector = constituentIterator.next();
+                final String relativePath = relativePathBuilders.get(row).toString();
+                int count = 0;
+                for (final Table constituent : constituentVector) {
+                    final File destination;
+                    if (partitionedTable.uniqueKeys()) {
+                        destination = new File(destinationDir, relativePath + baseName + ".parquet");
+                    } else {
+                        destination = new File(destinationDir, relativePath + baseName + "-part-" + count + ".parquet");
+                    }
+                    destinations.add(destination);
+                    partitionedData.add(constituent);
+                    count++;
+                }
+                row++;
+            }
+        }
         // If needed, generate schema for _common_metadata file from key table
         final ParquetInstructions updatedWriteInstructions;
         if (!ParquetInstructions.DEFAULT_METADATA_ROOT_DIR.equals(writeInstructions.getMetadataRootDir())) {
@@ -633,14 +633,12 @@ public static void writeKeyValuePartitionedTable(@NotNull final PartitionedTable
         } else {
             updatedWriteInstructions = writeInstructions;
         }
-        final TableDefinition partitionedTablesDefinition = getNonKeyTableDefiniton(partitionedTable.keyColumnNames(),
-                definition);
         ParquetTools.writeParquetTables(
                 partitionedData.toArray(Table[]::new),
-                partitionedTablesDefinition,
+                partitionedTableDefinition,
                 updatedWriteInstructions,
                 destinations.toArray(File[]::new),
-                partitionedTablesDefinition.getGroupingColumnNamesArray());
+                partitionedTableDefinition.getGroupingColumnNamesArray());
     }

     /**
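The heart of this commit is in the hunks above: per-row ColumnSource.get(rowKey) access is replaced
with CloseableIterator-based column iteration, which fetches values in chunks internally. Below is a
minimal sketch of the pattern, assuming a live Deephaven engine context (for example, a script
session); the table and column names are hypothetical:

    import io.deephaven.engine.primitive.iterator.CloseableIterator;
    import io.deephaven.engine.table.Table;
    import io.deephaven.engine.util.TableTools;

    public class ChunkedIteratorSketch {
        public static void main(String[] args) {
            // Ten rows with a single String column named "Label"
            final Table source = TableTools.emptyTable(10).update("Label = `row-` + ii");
            // try-with-resources matters here: closing the iterator releases
            // the underlying chunk buffers once iteration finishes
            try (final CloseableIterator<String> labels = source.objectColumnIterator("Label")) {
                while (labels.hasNext()) {
                    System.out.println(labels.next());
                }
            }
        }
    }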
File 2 of 3:

@@ -12,7 +12,8 @@
 import java.util.Map;

 /**
- * This class takes an object read from a parquet file and formats it to a string, only for the supported types.
+ * This class takes an object read from a parquet file and formats it to a string used for generating partitioning
+ * column values.
  */
 enum PartitionFormatter {
     ForString {
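PartitionFormatter's output supplies the values for the key=value directory fragments assembled by
the writer above. The enum itself is package-private, so the sketch below only illustrates how those
fragments compose into a Hive-style partition path; the literal values stand in for formatted output:

    import java.io.File;

    public class PartitionPathSketch {
        public static void main(String[] args) {
            // "PC1=3" and "PC2=1" play the role of column-name=formatted-value pairs
            final String relativePath = "PC1=3" + File.separator + "PC2=1" + File.separator;
            final File destination = new File("/tmp/output", relativePath + "data.parquet");
            System.out.println(destination); // /tmp/output/PC1=3/PC2=1/data.parquet
        }
    }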
File 3 of 3:

@@ -598,6 +598,27 @@ public void writeKeyValuePartitionedDataWithIntegerPartitionsTest() {
         assertTableEquals(inputData.sort("PC1", "PC2"), fromDiskWithMetadata.sort("PC1", "PC2"));
     }

+    @Test
+    public void writeKeyValuePartitionedDataWithNoNonPartitioningColumnsTest() {
+        final TableDefinition definition = TableDefinition.of(
+                ColumnDefinition.ofInt("PC1").withPartitioning(),
+                ColumnDefinition.ofInt("PC2").withPartitioning());
+        final Table inputData = ((QueryTable) TableTools.emptyTable(20)
+                .updateView("PC1 = (int)(ii%3)",
+                        "PC2 = (int)(ii%2)"))
+                .withDefinitionUnsafe(definition);
+
+        final File parentDir = new File(rootFile, "writeKeyValuePartitionedDataTest");
+        final ParquetInstructions writeInstructions = ParquetInstructions.builder()
+                .setMetadataRootDir(parentDir.getAbsolutePath())
+                .build();
+        try {
+            writeKeyValuePartitionedTable(inputData, parentDir, "data", writeInstructions);
+            fail("Expected exception when writing the partitioned table with no non-partitioning columns");
+        } catch (final RuntimeException expected) {
+        }
+    }
+
     @Test
     public void writeKeyValuePartitionedDataWithNonUniqueKeys() {
         final TableDefinition definition = TableDefinition.of(
@@ -783,7 +804,7 @@ public void testAllPartitioningColumnTypes() {
                 "PC8 = (float)(ii % 2)",
                 "PC9 = (double)(ii % 2)",
                 "PC10 = java.math.BigInteger.valueOf(ii)",
-                "PC11 = java.math.BigDecimal.valueOf(ii)",
+                "PC11 = java.math.BigDecimal.valueOf((double)ii)",
                 "PC12 = java.time.Instant.ofEpochSecond(ii)",
                 "PC13 = java.time.LocalDate.ofEpochDay(ii)",
                 "PC14 = java.time.LocalTime.of(i%24, i%60, (i+10)%60)",
@@ -795,7 +816,17 @@
                 .setMetadataRootDir(parentDir.getAbsolutePath())
                 .build();
         writeKeyValuePartitionedTable(inputData, parentDir, "data", writeInstructions);
-        readKeyValuePartitionedTable(parentDir, EMPTY).select();
+
+        // Verify that we can read the partition values, but types like LocalDate or LocalTime will be read as strings
+        // Therefore, we cannot compare the tables directly
+        final Table fromDisk = readKeyValuePartitionedTable(parentDir, EMPTY).select();
+
+        // Reading with metadata file should deduce the correct type, so we can compare the tables
+        final File commonMetadata = new File(parentDir, "_common_metadata");
+        final Table fromDiskWithMetadata = readTable(commonMetadata);
+        final String[] partitioningColumns = definition.getPartitioningColumns().stream()
+                .map(ColumnDefinition::getName).toArray(String[]::new);
+        assertTableEquals(inputData.sort(partitioningColumns), fromDiskWithMetadata.sort(partitioningColumns));
     }

     @Test
