Added fixes and tests for Windows
malhotrashivam committed Apr 1, 2024
1 parent fa2b0c5 commit 7909ea0
Showing 11 changed files with 51 additions and 22 deletions.
@@ -10,6 +10,7 @@
 import io.deephaven.parquet.compress.DeephavenCompressorAdapterFactory;
 import io.deephaven.util.channel.SeekableChannelContext.ContextHolder;
 import io.deephaven.util.datastructures.LazyCachingFunction;
+import org.apache.commons.io.FilenameUtils;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.Dictionary;
@@ -75,7 +76,8 @@ final class ColumnChunkReaderImpl implements ColumnChunkReader {
         this.numRows = numRows;
         this.version = version;
         if (columnChunk.isSetFile_path() && FILE_URI_SCHEME.equals(rootURI.getScheme())) {
-            this.columnChunkURI = convertToURI(Path.of(rootURI).resolve(columnChunk.getFile_path()), false);
+            final String relativePath = FilenameUtils.separatorsToSystem(columnChunk.getFile_path());
+            this.columnChunkURI = convertToURI(Path.of(rootURI).resolve(relativePath), false);
         } else {
             // TODO(deephaven-core#5066): Add support for reading metadata files from non-file URIs
             this.columnChunkURI = rootURI;
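
The file_path recorded in a parquet footer may use either separator depending on which platform wrote the file; FilenameUtils.separatorsToSystem converts whichever form appears into the current platform's separator before the path is resolved against the root URI. A minimal sketch of that behavior (the sample path is hypothetical), assuming commons-io is on the classpath:

import org.apache.commons.io.FilenameUtils;

public class SeparatorDemo {
    public static void main(String[] args) {
        // A relative path as it might appear in a footer written on Windows.
        final String fromMetadata = "year=2021\\data.parquet";
        // On unix this becomes "year=2021/data.parquet"; on Windows it is returned as-is.
        System.out.println(FilenameUtils.separatorsToSystem(fromMetadata));
        // A null file_path (e.g. when a column chunk lives in the same file) passes through as null.
        System.out.println(FilenameUtils.separatorsToSystem(null));
    }
}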
@@ -319,12 +319,12 @@ public static void writeTable(
         writeTables(new Table[] {sourceTable}, definition, new File[] {destFile}, writeInstructions);
     }
 
-    private static File getShadowFile(File destFile) {
+    private static File getShadowFile(final File destFile) {
         return new File(destFile.getParent(), ".NEW_" + destFile.getName());
     }
 
     @VisibleForTesting
-    static File getBackupFile(File destFile) {
+    static File getBackupFile(final File destFile) {
         return new File(destFile.getParent(), ".OLD_" + destFile.getName());
     }
 
@@ -339,15 +339,16 @@ private static String minusParquetSuffix(@NotNull final String s) {
      * Generates the index file path relative to the table destination file path.
      *
      * @param tableDest Destination path for the main table containing these indexing columns
-     * @param columnNames Array of names of the indexing columns
+     * @param columnNames Array of indexing column names
      *
     * @return The relative index file path. For example, for table with destination {@code "table.parquet"} and
     *         indexing column {@code "IndexingColName"}, the method will return
-     *         {@code ".dh_metadata/indexes/IndexingColName/index_IndexingColName_table.parquet"}
+     *         {@code ".dh_metadata/indexes/IndexingColName/index_IndexingColName_table.parquet"} on unix systems.
      */
     public static String getRelativeIndexFilePath(@NotNull final File tableDest, @NotNull final String[] columnNames) {
         final String columns = String.join(",", columnNames);
-        return String.format(".dh_metadata/indexes/%s/index_%s_%s", columns, columns, tableDest.getName());
+        return String.format(".dh_metadata%sindexes%s%s%sindex_%s_%s", File.separator, File.separator, columns,
+                File.separator, columns, tableDest.getName());
     }
 
     /**
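
Building the relative index path from File.separator rather than hard-coded slashes keeps getRelativeIndexFilePath consistent with the platform-native paths used elsewhere in this commit. A small standalone sketch of the same format string (the column and table names are illustrative):

import java.io.File;

public class IndexPathDemo {
    public static void main(String[] args) {
        final String columns = "IndexingColName";
        final String tableName = "table.parquet";
        // One File.separator between each path element, matching the format string in the diff.
        final String relativePath = String.format(".dh_metadata%sindexes%s%s%sindex_%s_%s",
                File.separator, File.separator, columns, File.separator, columns, tableName);
        // unix:    .dh_metadata/indexes/IndexingColName/index_IndexingColName_table.parquet
        // Windows: .dh_metadata\indexes\IndexingColName\index_IndexingColName_table.parquet
        System.out.println(relativePath);
    }
}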
@@ -16,6 +16,7 @@
 import io.deephaven.parquet.table.location.ParquetTableLocationKey;
 import io.deephaven.parquet.table.ParquetInstructions;
 import io.deephaven.parquet.base.ParquetFileReader;
+import org.apache.commons.io.FilenameUtils;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import io.deephaven.util.type.TypeUtils;
 import org.apache.commons.lang3.mutable.MutableInt;
@@ -143,34 +144,33 @@ public ParquetMetadataFileLayout(
         final Map<String, PartitionParser> partitionKeyToParser = partitioningColumns.stream().collect(toMap(
                 ColumnDefinition::getName,
                 cd -> PartitionParser.lookupSupported(cd.getDataType(), cd.getComponentType())));
-        final Map<String, TIntList> fileNameToRowGroupIndices = new LinkedHashMap<>();
+        final Map<String, TIntList> filePathToRowGroupIndices = new LinkedHashMap<>();
         final List<RowGroup> rowGroups = metadataFileReader.fileMetaData.getRow_groups();
         final int numRowGroups = rowGroups.size();
         for (int rgi = 0; rgi < numRowGroups; ++rgi) {
-            fileNameToRowGroupIndices
-                    .computeIfAbsent(rowGroups.get(rgi).getColumns().get(0).getFile_path(), fn -> new TIntArrayList())
-                    .add(rgi);
+            final String relativePath =
+                    FilenameUtils.separatorsToSystem(rowGroups.get(rgi).getColumns().get(0).getFile_path());
+            filePathToRowGroupIndices.computeIfAbsent(relativePath, fn -> new TIntArrayList()).add(rgi);
         }
         final File directory = metadataFile.getParentFile();
         final MutableInt partitionOrder = new MutableInt(0);
-        keys = fileNameToRowGroupIndices.entrySet().stream().map(entry -> {
-            final String filePathString = entry.getKey();
+        keys = filePathToRowGroupIndices.entrySet().stream().map(entry -> {
+            final String relativePathString = entry.getKey();
             final int[] rowGroupIndices = entry.getValue().toArray();
 
-            if (filePathString == null || filePathString.isEmpty()) {
+            if (relativePathString == null || relativePathString.isEmpty()) {
                throw new TableDataException(String.format(
                        "Missing parquet file name for row groups %s in %s",
                        Arrays.toString(rowGroupIndices), metadataFile));
            }
             final LinkedHashMap<String, Comparable<?>> partitions =
                     partitioningColumns.isEmpty() ? null : new LinkedHashMap<>();
             if (partitions != null) {
-                final Path filePath = Paths.get(filePathString);
+                final Path filePath = Paths.get(relativePathString);
                 final int numPartitions = filePath.getNameCount() - 1;
                 if (numPartitions != partitioningColumns.size()) {
                     throw new TableDataException(String.format(
                             "Unexpected number of path elements in %s for partitions %s",
-                            filePathString, partitions.keySet()));
+                            relativePathString, partitions.keySet()));
                 }
                 final boolean useHiveStyle = filePath.getName(0).toString().contains("=");
                 for (int pi = 0; pi < numPartitions; ++pi) {
@@ -182,7 +182,7 @@
                         if (pathComponents.length != 2) {
                             throw new TableDataException(String.format(
                                     "Unexpected path format found for hive-style partitioning from %s for %s",
-                                    filePathString, metadataFile));
+                                    relativePathString, metadataFile));
                         }
                         partitionKey = instructions.getColumnNameFromParquetColumnNameOrDefault(pathComponents[0]);
                         partitionValueRaw = pathComponents[1];
@@ -195,16 +195,16 @@
                 if (partitions.containsKey(partitionKey)) {
                     throw new TableDataException(String.format(
                             "Unexpected duplicate partition key %s when parsing %s for %s",
-                            partitionKey, filePathString, metadataFile));
+                            partitionKey, relativePathString, metadataFile));
                 }
                 partitions.put(partitionKey, partitionValue);
             }
         }
-        final File partitionFile = new File(directory, filePathString);
+        final File partitionFile = new File(directory, relativePathString);
         final ParquetTableLocationKey tlk = new ParquetTableLocationKey(partitionFile,
                 partitionOrder.getAndIncrement(), partitions, inputInstructions);
         tlk.setFileReader(metadataFileReader);
-        tlk.setMetadata(getParquetMetadataForFile(filePathString, metadataFileMetadata));
+        tlk.setMetadata(getParquetMetadataForFile(relativePathString, metadataFileMetadata));
         tlk.setRowGroupIndices(rowGroupIndices);
         return tlk;
     }).collect(Collectors.toList());
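
Keying the row-group map by the normalized path also means Paths.get sees platform-native separators when the layout later splits hive-style partitions out of each relative path. A hedged sketch of that splitting with a hypothetical two-level partition (the real code also handles non-hive layouts and parses typed partition values):

import java.nio.file.Path;
import java.nio.file.Paths;

public class PartitionSplitDemo {
    public static void main(String[] args) {
        // Multi-argument Paths.get joins with the platform separator, standing in
        // for a normalized relative path taken from the _metadata footer.
        final Path filePath = Paths.get("year=2021", "animal=Dog", "data.parquet");
        final int numPartitions = filePath.getNameCount() - 1; // 2: the last element is the file name
        for (int pi = 0; pi < numPartitions; ++pi) {
            // Hive-style components are "key=value" pairs.
            final String[] pathComponents = filePath.getName(pi).toString().split("=", 2);
            System.out.println(pathComponents[0] + " -> " + pathComponents[1]);
        }
    }
}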
@@ -9,6 +9,7 @@
 import io.deephaven.engine.table.impl.locations.TableDataException;
 import io.deephaven.engine.table.impl.locations.TableLocationKey;
 import io.deephaven.parquet.base.ParquetFileReader;
+import org.apache.commons.io.FilenameUtils;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.format.RowGroup;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
@@ -190,7 +191,8 @@ public synchronized int[] getRowGroupIndices() {
                 // While it seems that row group *could* have column chunks splayed out into multiple files,
                 // we're not expecting that in this code path. To support it, discovery tools should figure out
                 // the row groups for a partition themselves and call setRowGroupReaders.
-                final String filePath = rowGroups.get(rgi).getColumns().get(0).getFile_path();
+                final String filePath =
+                        FilenameUtils.separatorsToSystem(rowGroups.get(rgi).getColumns().get(0).getFile_path());
                 return filePath == null || convertToURI(filePath, false).equals(uri);
             }).toArray();
     }
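
Here the normalization guards a comparison: convertToURI(filePath, false).equals(uri) can only match if the footer's file_path and the location's URI agree on separators. A sketch of why the raw string would not split correctly on unix (plain java.nio plus commons-io; convertToURI itself is a Deephaven-internal helper and is not reproduced here):

import java.nio.file.Paths;
import org.apache.commons.io.FilenameUtils;

public class SeparatorCompareDemo {
    public static void main(String[] args) {
        final String fromMetadata = "year=2021\\data.parquet"; // written on Windows
        final String normalized = FilenameUtils.separatorsToSystem(fromMetadata);
        // On unix, Paths.get does not treat '\' as a separator, so the raw string
        // parses as one odd file name while the normalized one splits into two elements.
        System.out.println(Paths.get(fromMetadata).getNameCount()); // 1 on unix
        System.out.println(Paths.get(normalized).getNameCount()); // 2 on unix
    }
}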
@@ -1433,7 +1433,7 @@ private Table maybeFixBigDecimal(Table toFix) {
 
     private static Table readParquetFileFromGitLFS(final File dest) {
         try {
-            return readSingleFileTable(dest, EMPTY);
+            return readTable(dest, EMPTY);
         } catch (final RuntimeException e) {
             if (e.getCause() instanceof InvalidParquetFileException) {
                 final String InvalidParquetFileErrorMsgString = "Invalid parquet file detected, please ensure the " +
@@ -1623,6 +1623,18 @@ public void basicWriteTests() {
         basicWriteTestsImpl(MULTI_WRITER);
     }
 
+    @Test
+    public void readPartitionedDataGeneratedOnWindows() {
+        final String path = ParquetTableReadWriteTest.class
+                .getResource("/referencePartitionedDataFromWindows").getFile();
+        final Table partitionedDataFromWindows = readParquetFileFromGitLFS(new File(path)).select();
+        final Table expected = TableTools.newTable(
+                longCol("year", 2019, 2020, 2021, 2021, 2022, 2022),
+                longCol("n_legs", 5, 2, 4, 100, 2, 4),
+                stringCol("animal", "Brittle stars", "Flamingo", "Dog", "Centipede", "Parrot", "Horse"));
+        assertTableEquals(expected, partitionedDataFromWindows.sort("year"));
+    }
+
     private static void basicWriteTestsImpl(TestParquetTableWriter writer) {
         // Create an empty parent directory
         final File parentDir = new File(rootFile, "tempDir");
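
The test helper now goes through readTable rather than readSingleFileTable, which lets the same helper resolve the new reference data: a directory of partitioned parquet files rather than a single file. A hedged usage sketch of the two call shapes, mirroring the two-argument form visible in the diff (the paths are hypothetical):

import java.io.File;
import io.deephaven.engine.table.Table;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.parquet.table.ParquetTools;

public class ReadTableDemo {
    public static void main(String[] args) {
        // readTable infers the layout from what the path points at:
        // a lone .parquet file or a partitioned directory tree.
        final Table single = ParquetTools.readTable(
                new File("/data/table.parquet"), ParquetInstructions.EMPTY);
        final Table partitioned = ParquetTools.readTable(
                new File("/data/referencePartitionedDataFromWindows"), ParquetInstructions.EMPTY);
        System.out.println(single.size() + " / " + partitioned.size());
    }
}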
Binary file not shown.
Binary file not shown.
Git LFS file not shown
Git LFS file not shown
Git LFS file not shown
Git LFS file not shown
