Added support to write metadata files in parquet #5105

Merged Apr 2, 2024: 50 commits from branch sm-pq-metadata into main.

Commits
f8ae4ef
Copied methods from inside parquet hadoop to write metadata files
malhotrashivam Jan 30, 2024
0ef7f79
Calling methods from inside ParquetHadoop for writing metadata files
malhotrashivam Jan 31, 2024
44d031f
Cleaned up the code and added a lot of TODOs for review
malhotrashivam Feb 13, 2024
6394bac
Merge branch 'main' into sm-pq-metadata
malhotrashivam Feb 13, 2024
6c6b075
Some more changes
malhotrashivam Feb 13, 2024
9263aa0
WIP commit
malhotrashivam Feb 14, 2024
8c887c4
Added a custom metadata file writer
malhotrashivam Feb 16, 2024
05400c5
Merge branch 'main' into sm-pq-metadata
malhotrashivam Feb 16, 2024
b696649
Minor fix
malhotrashivam Feb 16, 2024
797b3dc
Fixed failing test
malhotrashivam Feb 17, 2024
d987a06
Moved some code around
malhotrashivam Feb 20, 2024
d4da175
Minor change
malhotrashivam Feb 20, 2024
f33d712
Review comments
malhotrashivam Feb 22, 2024
7d66445
Merge branch 'main' into sm-pq-metadata
malhotrashivam Feb 26, 2024
238f5f5
Read offset index from column chunk on demand
malhotrashivam Feb 26, 2024
7a3652d
Fixed failing test
malhotrashivam Feb 26, 2024
1f41eef
Added support for partitioned parquet writing
malhotrashivam Feb 27, 2024
c8aa764
Added some more tests
malhotrashivam Feb 27, 2024
ff99a36
Added some more tests
malhotrashivam Feb 27, 2024
3e48937
Added a new API for writing a partitioned table directly
malhotrashivam Feb 28, 2024
e584343
Improved the tests
malhotrashivam Feb 29, 2024
f659f3c
Review with Ryan part 1
malhotrashivam Mar 4, 2024
3651e5f
Added more tests
malhotrashivam Mar 4, 2024
51eefe1
Iterating using chunked iterators
malhotrashivam Mar 4, 2024
60ee1f5
Removed some unnecessary includes
malhotrashivam Mar 4, 2024
5c7353f
Added support for {index} and {partition} in file basename
malhotrashivam Mar 4, 2024
c174452
Review comments
malhotrashivam Mar 5, 2024
ab73df0
Minor touchups
malhotrashivam Mar 5, 2024
1cb8b81
Added fix and tests for big decimals
malhotrashivam Mar 6, 2024
20e8204
Updated a comment
malhotrashivam Mar 6, 2024
9892d14
Review with Ryan part 1
malhotrashivam Mar 7, 2024
1d98927
Review with Ryan part 2
malhotrashivam Mar 11, 2024
5018feb
Minor touchups
malhotrashivam Mar 11, 2024
a529fc2
Fixed failing tests
malhotrashivam Mar 15, 2024
48906dd
Merge branch 'main' into sm-pq-metadata
malhotrashivam Mar 15, 2024
819a1b9
Added python APIs and improved comments
malhotrashivam Mar 16, 2024
10b7e0d
Added more fixes for python
malhotrashivam Mar 19, 2024
9f7c55e
Review with Ryan and Chip
malhotrashivam Mar 21, 2024
b62abb7
Merge branch 'main' into sm-pq-metadata
malhotrashivam Mar 21, 2024
0d7c62e
Review with Chip part 2
malhotrashivam Mar 22, 2024
5a3de8e
Review with Chip and Jianfeng Part 3
malhotrashivam Mar 22, 2024
f68551f
Review with Chip and Jianfeng continued
malhotrashivam Mar 22, 2024
b11ebd1
Added new APIs for managing indexes
malhotrashivam Mar 25, 2024
3bdc92b
Trigger CI jobs
malhotrashivam Mar 25, 2024
3dd6097
Review with Ryan
malhotrashivam Mar 26, 2024
d8bc0a5
Added python support for writing indexes
malhotrashivam Mar 27, 2024
f700883
Reordered comments
malhotrashivam Mar 27, 2024
6246289
Added more details to python comments
malhotrashivam Mar 27, 2024
fa2b0c5
Moved from list to sequence
malhotrashivam Mar 27, 2024
7909ea0
Added fixes and tests for windows
malhotrashivam Apr 1, 2024
@@ -75,7 +75,7 @@ final class ColumnChunkReaderImpl implements ColumnChunkReader {
this.numRows = numRows;
this.version = version;
if (columnChunk.isSetFile_path() && FILE_URI_SCHEME.equals(rootURI.getScheme())) {
this.columnChunkURI = Path.of(rootURI).resolve(columnChunk.getFile_path()).toUri();
this.columnChunkURI = convertToURI(Path.of(rootURI).resolve(columnChunk.getFile_path()), false);
} else {
// TODO(deephaven-core#5066): Add support for reading metadata files from non-file URIs
this.columnChunkURI = rootURI;
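The hunk above resolves each column chunk's relative file_path against the root URI of the metadata file, and swaps Path.toUri() for Deephaven's convertToURI helper, presumably to keep URI formatting consistent across the codebase. A minimal, self-contained sketch of the resolution step, with a hypothetical directory layout:

```java
import java.net.URI;
import java.nio.file.Path;

public class ColumnChunkUriSketch {
    public static void main(String[] args) {
        // Hypothetical root directory that contained the _metadata file.
        final URI rootURI = Path.of("/data/mytable").toUri();
        // Relative path stored in a column chunk's file_path field.
        final String filePath = "PC=partition1/table.parquet";
        // Resolve the relative path against the root, then convert back to a URI.
        // The PR routes this last step through convertToURI(..., false); the plain
        // JDK call is used here so the sketch stays standalone.
        final URI columnChunkURI = Path.of(rootURI).resolve(filePath).toUri();
        System.out.println(columnChunkURI); // file:///data/mytable/PC=partition1/table.parquet
    }
}
```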
@@ -5,7 +5,6 @@

import org.apache.parquet.hadoop.metadata.ParquetMetadata;

import java.io.File;
import java.io.IOException;

/**
@@ -3,6 +3,7 @@
//
package io.deephaven.parquet.base;

import java.io.File;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;

@@ -33,9 +34,10 @@ public static boolean fileNameMatches(final Path path) {
}

/**
* @return the key value derived from the file name, used for storing each file's metadata in the metadata files.
* @return the key value derived from the file path, used for storing each file's metadata in the combined
* {@value #METADATA_FILE_NAME} and {@value #COMMON_METADATA_FILE_NAME} files.
*/
public static String getKeyForFile(final String fileName) {
return "deephaven_per_file_" + fileName;
public static String getPerFileMetadataKey(final String filePath) {
return "deephaven_per_file_" + filePath.replace(File.separatorChar, '_');
}
}
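Since the key now folds the whole relative path in, files with the same name under different partition directories no longer collide. A standalone sketch of the derivation, assuming a POSIX separator; the helper body mirrors the diff, while the demo class is hypothetical:

```java
import java.io.File;

public final class PerFileKeyDemo {
    // Mirrors getPerFileMetadataKey above: flatten path separators to
    // underscores so a nested relative path becomes a single flat key.
    static String getPerFileMetadataKey(final String filePath) {
        return "deephaven_per_file_" + filePath.replace(File.separatorChar, '_');
    }

    public static void main(String[] args) {
        // On POSIX this prints: deephaven_per_file_PC=partition1_table.parquet
        System.out.println(getPerFileMetadataKey("PC=partition1" + File.separator + "table.parquet"));
    }
}
```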
@@ -118,6 +118,11 @@ public static int getDefaultTargetPageSize() {

private static final boolean DEFAULT_GENERATE_METADATA_FILES = false;

static final String UUID_TOKEN = "{uuid}";
static final String PARTITIONS_TOKEN = "{partitions}";
static final String FILE_INDEX_TOKEN = "{i}";
private static final String DEFAULT_BASE_NAME_FOR_PARTITIONED_PARQUET_DATA = UUID_TOKEN;

public ParquetInstructions() {}

public final String getColumnNameFromParquetColumnNameOrDefault(final String parquetColumnName) {
@@ -173,6 +178,14 @@ public final String getColumnNameFromParquetColumnNameOrDefault(final String par
*/
public abstract boolean generateMetadataFiles();


/**
* @return the base name for partitioned parquet data. Check
* {@link Builder#setBaseNameForPartitionedParquetData(String) setBaseNameForPartitionedParquetData} for
* more details about different tokens that can be used in the base name.
*/
public abstract String baseNameForPartitionedParquetData();

@VisibleForTesting
public static boolean sameColumnNamesAndCodecMappings(final ParquetInstructions i1, final ParquetInstructions i2) {
if (i1 == EMPTY) {
@@ -252,6 +265,11 @@ public boolean isRefreshing() {
public boolean generateMetadataFiles() {
return DEFAULT_GENERATE_METADATA_FILES;
}

@Override
public String baseNameForPartitionedParquetData() {
return DEFAULT_BASE_NAME_FOR_PARTITIONED_PARQUET_DATA;
}
};

private static class ColumnInstructions {
@@ -321,6 +339,7 @@ private static final class ReadOnly extends ParquetInstructions {
private final boolean isRefreshing;
private final Object specialInstructions;
private final boolean generateMetadataFiles;
private final String baseNameForPartitionedParquetData;

private ReadOnly(
final KeyedObjectHashMap<String, ColumnInstructions> columnNameToInstructions,
@@ -332,7 +351,8 @@ private ReadOnly(
final int targetPageSize,
final boolean isRefreshing,
final Object specialInstructions,
final boolean generateMetadataFiles) {
final boolean generateMetadataFiles,
final String baseNameForPartitionedParquetData) {
this.columnNameToInstructions = columnNameToInstructions;
this.parquetColumnNameToInstructions = parquetColumnNameToColumnName;
this.compressionCodecName = compressionCodecName;
@@ -343,6 +363,7 @@ private ReadOnly(
this.isRefreshing = isRefreshing;
this.specialInstructions = specialInstructions;
this.generateMetadataFiles = generateMetadataFiles;
this.baseNameForPartitionedParquetData = baseNameForPartitionedParquetData;
}

private String getOrDefault(final String columnName, final String defaultValue,
@@ -441,6 +462,11 @@ public boolean generateMetadataFiles() {
return generateMetadataFiles;
}

@Override
public String baseNameForPartitionedParquetData() {
return baseNameForPartitionedParquetData;
}

KeyedObjectHashMap<String, ColumnInstructions> copyColumnNameToInstructions() {
// noinspection unchecked
return (columnNameToInstructions == null)
@@ -493,6 +519,7 @@ public static class Builder {
private boolean isRefreshing = DEFAULT_IS_REFRESHING;
private Object specialInstructions;
private boolean generateMetadataFiles = DEFAULT_GENERATE_METADATA_FILES;
private String baseNameForPartitionedParquetData = DEFAULT_BASE_NAME_FOR_PARTITIONED_PARQUET_DATA;

public Builder() {}

@@ -687,6 +714,30 @@ public Builder setGenerateMetadataFiles(final boolean generateMetadataFiles) {
return this;
}

/**
* Set the base name for partitioned parquet data. This is used to generate the file name for partitioned
* parquet files, and therefore, this parameter is only used when writing partitioned parquet data. Users can
* provide the following tokens to be replaced in the base name:
* <ul>
* <li>The token {@value #FILE_INDEX_TOKEN} will be replaced with an automatically incremented integer for files
* in a directory. For example, a base name of "table-{i}" will result in files named like
* "PC=partition1/table-0.parquet", "PC=partition1/table-1.parquet", etc., where PC is a partitioning
* column.</li>
* <li>The token {@value #UUID_TOKEN} will be replaced with a random UUID. For example, a base name of
* "table-{uuid}" will result in files named like
* "table-8e8ab6b2-62f2-40d1-8191-1c5b70c5f330.parquet.parquet".</li>
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
* <li>The token {@value #PARTITIONS_TOKEN} will be replaced with an underscore-delimited, concatenated string
* of partition values. For example, a base name of "{partitions}-table" will result in files like
* "PC1=partition1/PC2=partitionA/PC1=partition1_PC2=partitionA-table.parquet", where "PC1" and "PC2" are
* partitioning columns.</li>
* </ul>
* The default value of this parameter is {@value #DEFAULT_BASE_NAME_FOR_PARTITIONED_PARQUET_DATA}.
*/
public Builder setBaseNameForPartitionedParquetData(final String baseNameForPartitionedParquetData) {
this.baseNameForPartitionedParquetData = baseNameForPartitionedParquetData;
return this;
}

public ParquetInstructions build() {
final KeyedObjectHashMap<String, ColumnInstructions> columnNameToInstructionsOut = columnNameToInstructions;
columnNameToInstructions = null;
@@ -695,7 +746,7 @@
parquetColumnNameToInstructions = null;
return new ReadOnly(columnNameToInstructionsOut, parquetColumnNameToColumnNameOut, compressionCodecName,
maximumDictionaryKeys, maximumDictionarySize, isLegacyParquet, targetPageSize, isRefreshing,
specialInstructions, generateMetadataFiles);
specialInstructions, generateMetadataFiles, baseNameForPartitionedParquetData);
}
}

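Taken together, the new builder knob and the existing metadata switch could be combined as below; a hypothetical usage sketch in which the two setters come from this diff and the values are made up for illustration:

```java
// Emit the combined metadata files and name partitioned files with both
// the partition values and a per-directory file index.
final ParquetInstructions instructions = new ParquetInstructions.Builder()
        .setGenerateMetadataFiles(true)
        .setBaseNameForPartitionedParquetData("{partitions}-table-{i}")
        .build();
// For partitioning columns PC1 and PC2, files would then land at paths like:
// PC1=p1/PC2=pA/PC1=p1_PC2=pA-table-0.parquet
```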
@@ -32,11 +32,12 @@
import static io.deephaven.base.FileUtils.convertToURI;
import static io.deephaven.parquet.base.ParquetUtils.MAGIC;
import static io.deephaven.parquet.base.ParquetUtils.METADATA_KEY;
import static io.deephaven.parquet.base.ParquetUtils.getKeyForFile;
import static io.deephaven.parquet.base.ParquetUtils.getPerFileMetadataKey;

/**
* Used to generate a combined {@value ParquetUtils#METADATA_FILE_NAME} and
* {@value ParquetUtils#COMMON_METADATA_FILE_NAME} file for provided Parquet files.
* {@value ParquetUtils#COMMON_METADATA_FILE_NAME} file for provided Parquet files. This class is stateful and therefore
* should not be used by multiple threads concurrently.
*/
final class ParquetMetadataFileWriterImpl implements ParquetMetadataFileWriter {

@@ -82,24 +83,6 @@ private static class ParquetFileMetadata {
}
this.metadataRootDirAbsPath = metadataRootDir.getAbsoluteFile().toPath();
final String metadataRootDirAbsPathString = metadataRootDirAbsPath.toString();
final File firstDestination = destinations[0];
for (int i = 0; i < destinations.length; i++) {
final File destination = destinations[i];
if (!destination.getAbsolutePath().startsWith(metadataRootDirAbsPathString)) {
throw new UncheckedDeephavenException("All destinations must be nested under the provided metadata root"
+ " directory, provided destination " + destination.getAbsolutePath() + " is not under " +
metadataRootDirAbsPathString);
}
// TODO How should I change the basename in the API for writeKeyValuePartitioned data for this check?
if (i > 0) {
// We use filename to generate the key for each file's metadata in the common metadata file, therefore
// all files must have unique names.
if (destination.getName().equals(firstDestination.getName())) {
throw new UncheckedDeephavenException("When generating common metadata for multiple parquet files, "
+ "all files must have unique names, but " + destination.getName() + " is repeated.");
}
}
}
for (final File destination : destinations) {
if (!destination.getAbsolutePath().startsWith(metadataRootDirAbsPathString)) {
throw new UncheckedDeephavenException("All destinations must be nested under the provided metadata root"
@@ -121,10 +104,7 @@ private static class ParquetFileMetadata {
}

/**
* Add parquet metadata for the provided parquet file the combined metadata file. We store deephaven-specific
* metadata for each file individually inside the key-value metadata of the combined metadata file, with keys being
* derived from the file names and values being the metadata for the file. Therefore, the provided parquet files
* must have unique names.
* Add parquet metadata for the provided parquet file to the combined metadata file.
*
* @param parquetFilePath The parquet file destination path
* @param metadata The parquet metadata
@@ -170,8 +150,9 @@ private void mergeMetadata() throws IOException {
for (final ParquetFileMetadata parquetFileMetadata : parquetFileMetadataList) {
final FileMetaData fileMetaData = parquetFileMetadata.metadata.getFileMetaData();
mergedSchema = mergeSchemaInto(fileMetaData.getSchema(), mergedSchema);
mergeKeyValueMetaData(parquetFileMetadata);
mergeBlocksInto(parquetFileMetadata, metadataRootDirAbsPath, mergedBlocks);
final String relativePath = getRelativePath(parquetFileMetadata.filePath, metadataRootDirAbsPath);
mergeKeyValueMetaData(parquetFileMetadata, relativePath);
mergeBlocksInto(parquetFileMetadata, relativePath, mergedBlocks);
mergedCreatedBy.add(fileMetaData.getCreatedBy());
}
if (mergedKeyValueMetaData.size() != parquetFileMetadataList.size()) {
@@ -212,7 +193,8 @@ private static MessageType mergeSchemaInto(final MessageType schema, final Messa
* well as accumulate the required fields to generate a common table info later once all files are processed.</li>
* </ul>
*/
private void mergeKeyValueMetaData(@NotNull final ParquetFileMetadata parquetFileMetadata) throws IOException {
private void mergeKeyValueMetaData(@NotNull final ParquetFileMetadata parquetFileMetadata,
@NotNull final String relativePath) throws IOException {
final Map<String, String> keyValueMetaData =
parquetFileMetadata.metadata.getFileMetaData().getKeyValueMetaData();
for (final Map.Entry<String, String> entry : keyValueMetaData.entrySet()) {
@@ -232,12 +214,11 @@
});
} else {
// Add a separate entry for each file
final String fileKey = getKeyForFile(new File(parquetFileMetadata.filePath).getName());
final String fileKey = getPerFileMetadataKey(relativePath);
// Assuming the keys are unique for each file because file names are unique, verified in the constructor
if (mergedKeyValueMetaData.containsKey(fileKey)) {
throw new UncheckedDeephavenException("Could not merge metadata for for file " +
parquetFileMetadata.filePath + " because has conflicting file name with another file. For "
+ " generating metadata files, file names should be unique");
throw new IllegalStateException("Could not merge metadata for file " +
parquetFileMetadata.filePath + " because it has conflicting file key: " + fileKey);
}
mergedKeyValueMetaData.put(fileKey, entry.getValue());

@@ -265,21 +246,24 @@ private void mergeKeyValueMetaData(@NotNull final ParquetFileMetadata parquetFil
}

private static void mergeBlocksInto(final ParquetFileMetadata parquetFileMetadata,
final Path metadataRootDirAbsPath, final Collection<BlockMetaData> mergedBlocks) {
final Path parquetFileAbsPath = new File(parquetFileMetadata.filePath).getAbsoluteFile().toPath();
String fileRelativePathString = metadataRootDirAbsPath.relativize(parquetFileAbsPath).toString();
// Remove leading slashes from the relative path
int pos = 0;
while (pos < fileRelativePathString.length() && fileRelativePathString.charAt(pos) == '/') {
pos++;
}
fileRelativePathString = fileRelativePathString.substring(pos);
final String fileRelativePathString, final Collection<BlockMetaData> mergedBlocks) {
for (final BlockMetaData block : parquetFileMetadata.metadata.getBlocks()) {
block.setPath(fileRelativePathString);
mergedBlocks.add(block);
}
}

private static String getRelativePath(final String parquetFilePath, final Path metadataRootDirAbsPath) {
final Path parquetFileAbsPath = new File(parquetFilePath).getAbsoluteFile().toPath();
final String relativePath = metadataRootDirAbsPath.relativize(parquetFileAbsPath).toString();
// Remove leading slashes from the relative path
int pos = 0;
while (pos < relativePath.length() && relativePath.charAt(pos) == '/') {
pos++;
}
return relativePath.substring(pos);
}
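A minimal sketch of the relativization above; on typical platforms Path.relativize does not produce a leading separator, so the stripping loop reads as a defensive guard. The paths below are hypothetical:

```java
import java.nio.file.Path;

public class RelativePathSketch {
    public static void main(String[] args) {
        // Metadata root directory and one partitioned parquet file under it.
        final Path metadataRootDirAbsPath = Path.of("/data/mytable").toAbsolutePath();
        final Path parquetFileAbsPath = Path.of("/data/mytable/PC=p1/table.parquet").toAbsolutePath();
        // Prints: PC=p1/table.parquet (no leading separator)
        System.out.println(metadataRootDirAbsPath.relativize(parquetFileAbsPath));
    }
}
```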

private void writeMetadataFile(final ParquetMetadata metadataFooter, final String outputPath) throws IOException {
final PositionedBufferedOutputStream metadataOutputStream =
new PositionedBufferedOutputStream(channelsProvider.getWriteChannel(outputPath, false),