Added support to write metadata files in parquet #5105

Merged
merged 50 commits on Apr 2, 2024
Commits
f8ae4ef
Copied methods from inside parquet hadoop to write metadata files
malhotrashivam Jan 30, 2024
0ef7f79
Calling methods from inside ParquetHadoop for writing metadata files
malhotrashivam Jan 31, 2024
44d031f
Cleaned up the code and added a lot of TODOs for review
malhotrashivam Feb 13, 2024
6394bac
Merge branch 'main' into sm-pq-metadata
malhotrashivam Feb 13, 2024
6c6b075
Some more changes
malhotrashivam Feb 13, 2024
9263aa0
WIP commit
malhotrashivam Feb 14, 2024
8c887c4
Added a custom metadata file writer
malhotrashivam Feb 16, 2024
05400c5
Merge branch 'main' into sm-pq-metadata
malhotrashivam Feb 16, 2024
b696649
Minor fix
malhotrashivam Feb 16, 2024
797b3dc
Fixed failing test
malhotrashivam Feb 17, 2024
d987a06
Moved some code around
malhotrashivam Feb 20, 2024
d4da175
Minor change
malhotrashivam Feb 20, 2024
f33d712
Review comments
malhotrashivam Feb 22, 2024
7d66445
Merge branch 'main' into sm-pq-metadata
malhotrashivam Feb 26, 2024
238f5f5
Read offset index from column chunk on demand
malhotrashivam Feb 26, 2024
7a3652d
Fixed failing test
malhotrashivam Feb 26, 2024
1f41eef
Added support for partitioned parquet writing
malhotrashivam Feb 27, 2024
c8aa764
Added some more tests
malhotrashivam Feb 27, 2024
ff99a36
Added some more tests
malhotrashivam Feb 27, 2024
3e48937
Added a new API for writing a partitioned table directly
malhotrashivam Feb 28, 2024
e584343
Improved the tests
malhotrashivam Feb 29, 2024
f659f3c
Review with Ryan part 1
malhotrashivam Mar 4, 2024
3651e5f
Added more tests
malhotrashivam Mar 4, 2024
51eefe1
Iterating using chunked iterators
malhotrashivam Mar 4, 2024
60ee1f5
Removed some unnecessary includes
malhotrashivam Mar 4, 2024
5c7353f
Added support for {index} and {partition} in file basename
malhotrashivam Mar 4, 2024
c174452
Review comments
malhotrashivam Mar 5, 2024
ab73df0
Minor touchups
malhotrashivam Mar 5, 2024
1cb8b81
Added fix and tests for big decimals
malhotrashivam Mar 6, 2024
20e8204
Updated a comment
malhotrashivam Mar 6, 2024
9892d14
Review with Ryan part 1
malhotrashivam Mar 7, 2024
1d98927
Review with Ryan part 2
malhotrashivam Mar 11, 2024
5018feb
Minor touchups
malhotrashivam Mar 11, 2024
a529fc2
Fixed failing tests
malhotrashivam Mar 15, 2024
48906dd
Merge branch 'main' into sm-pq-metadata
malhotrashivam Mar 15, 2024
819a1b9
Added python APIs and improved comments
malhotrashivam Mar 16, 2024
10b7e0d
Added more fixes for python
malhotrashivam Mar 19, 2024
9f7c55e
Review with Ryan and Chip
malhotrashivam Mar 21, 2024
b62abb7
Merge branch 'main' into sm-pq-metadata
malhotrashivam Mar 21, 2024
0d7c62e
Review with Chip part 2
malhotrashivam Mar 22, 2024
5a3de8e
Review with Chip and Jianfeng Part 3
malhotrashivam Mar 22, 2024
f68551f
Review with Chip and Jianfeng continued
malhotrashivam Mar 22, 2024
b11ebd1
Added new APIs for managing indexes
malhotrashivam Mar 25, 2024
3bdc92b
Trigger CI jobs
malhotrashivam Mar 25, 2024
3dd6097
Review with Ryan
malhotrashivam Mar 26, 2024
d8bc0a5
Added python support for writing indexes
malhotrashivam Mar 27, 2024
f700883
Reordered comments
malhotrashivam Mar 27, 2024
6246289
Added more details to python comments
malhotrashivam Mar 27, 2024
fa2b0c5
Moved from list to sequence
malhotrashivam Mar 27, 2024
7909ea0
Added fixes and tests for windows
malhotrashivam Apr 1, 2024
@@ -54,9 +54,12 @@
* API for writing DH tables in parquet format
*/
public class ParquetTableWriter {
public static final String BEGIN_POS = "dh_begin_pos";
public static final String END_POS = "dh_end_pos";
public static final String GROUPING_KEY = "dh_key";
public static final String GROUPING_KEY_COLUMN_NAME = "dh_key";
public static final String GROUPING_BEGIN_POS_COLUMN_NAME = "dh_begin_pos";
public static final String GROUPING_END_POS_COLUMN_NAME = "dh_end_pos";

public static final String INDEX_ROW_SET_COLUMN_NAME = "dh_row_set";


/**
* Helper struct used to pass information about where to write the index files
@@ -71,7 +74,7 @@ static class IndexWritingInfo {
*/
final String[] parquetColumnNames;
/**
* File path to be added in the index metadata of main parquet file
* File path to be added in the index metadata of the main parquet file
*/
final File destFileForMetadata;
/**
@@ -81,8 +84,13 @@ static class IndexWritingInfo {
*/
final File destFile;

GroupingColumnWritingInfo(final String parquetColumnName, final File destFileForMetadata, final File destFile) {
this.parquetColumnName = parquetColumnName;
IndexWritingInfo(
final String[] indexColumnNames,
final String[] parquetColumnNames,
final File destFileForMetadata,
final File destFile) {
this.indexColumnNames = indexColumnNames;
this.parquetColumnNames = parquetColumnNames;
this.destFileForMetadata = destFileForMetadata;
this.destFile = destFile;
}
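For orientation, here is a minimal sketch of how one of these entries might be populated for an index on a single column. This is illustrative only: IndexWritingInfo is package-private, and the file names and column name below are hypothetical, not the naming scheme the writer actually uses.

// Hypothetical sketch: describe an index on column "Sym" to be written as a sidecar table.
final File indexFileForMetadata = new File("/data/MyTable/.dh_metadata/indexes/Sym/index_Sym_MyTable.parquet");
final File shadowIndexFile = new File("/data/MyTable/.dh_metadata/indexes/Sym/.NEW_index_Sym_MyTable.parquet");
final ParquetTableWriter.IndexWritingInfo symIndexInfo = new ParquetTableWriter.IndexWritingInfo(
        new String[] {"Sym"},   // engine column names backing the index
        new String[] {"Sym"},   // corresponding parquet column names
        indexFileForMetadata,   // path recorded in the main file's index metadata
        shadowIndexFile);       // shadow file that is written first, then installed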
@@ -99,8 +107,8 @@ static class IndexWritingInfo {
* {@code destFilePath} if we are writing the parquet file to a shadow location first since the metadata
* should always hold the accurate path.
* @param incomingMeta A map of metadata values to be stored in the file footer
* @param groupingColumnsWritingInfoMap A map of grouping column names to their respective info used for writing
* grouping files
* @param indexInfoList Arrays containing the column names for indexes to persist as sidecar tables. Indexes that
* are specified but missing will be computed on demand.
* @param metadataFileWriter The writer for the {@value ParquetUtils#METADATA_FILE_NAME} and
* {@value ParquetUtils#COMMON_METADATA_FILE_NAME} files
* @param computedCache When we need to perform some computation depending on column data to make a decision
@@ -117,31 +125,53 @@ static void write(
@NotNull final String destFilePath,
@NotNull final String destFilePathForMetadata,
@NotNull final Map<String, String> incomingMeta,
@Nullable final Map<String, GroupingColumnWritingInfo> groupingColumnsWritingInfoMap,
@Nullable final List<ParquetTableWriter.IndexWritingInfo> indexInfoList,
@NotNull final ParquetMetadataFileWriter metadataFileWriter,
@NotNull final Map<String, Map<ParquetCacheTags, Object>> computedCache)
throws IOException {
@NotNull final Map<String, Map<ParquetCacheTags, Object>> computedCache) throws IOException {
if (t.isRefreshing()) {
/*
* We mustn't write inconsistent tables or data indexes. This check is "basic". Snapshotting logic here
* would probably be inappropriate, as we might be writing very large tables. Hopefully users aren't naively
* writing Parquet tables from within listeners or transforms without ensuring proper dependency
* satisfaction for the table and any indexes it has.
*/
t.getUpdateGraph().checkInitiateSerialTableOperation();
}

final TableInfo.Builder tableInfoBuilder = TableInfo.builder();
List<File> cleanupFiles = null;
try {
if (groupingColumnsWritingInfoMap != null) {
cleanupFiles = new ArrayList<>(groupingColumnsWritingInfoMap.size());
if (indexInfoList != null) {
cleanupFiles = new ArrayList<>(indexInfoList.size());
final Path destDirPath = Paths.get(destFilePath).getParent();
for (final Map.Entry<String, GroupingColumnWritingInfo> entry : groupingColumnsWritingInfoMap
.entrySet()) {
final String groupingColumnName = entry.getKey();
final Table auxiliaryTable = groupingAsTable(t, groupingColumnName);
final String parquetColumnName = entry.getValue().parquetColumnName;
final File groupingFileDestForMetadata = entry.getValue().destFileForMetadata;
final File groupingFileDest = entry.getValue().destFile;
cleanupFiles.add(groupingFileDest);
tableInfoBuilder.addGroupingColumns(GroupingColumnInfo.of(parquetColumnName,
destDirPath.relativize(groupingFileDestForMetadata.toPath()).toString()));
// We don't accumulate metadata from grouping files into the main metadata file
write(auxiliaryTable, auxiliaryTable.getDefinition(), writeInstructions,
groupingFileDest.getAbsolutePath(),
groupingFileDestForMetadata.getAbsolutePath(), Collections.emptyMap(), TableInfo.builder(),
NullParquetMetadataFileWriter.INSTANCE, computedCache);
for (final ParquetTableWriter.IndexWritingInfo info : indexInfoList) {
try (final SafeCloseable ignored = t.isRefreshing() ? LivenessScopeStack.open() : null) {
// This will retrieve an existing index if one exists, or create a new one if not
final BasicDataIndex dataIndex = Optional
.ofNullable(DataIndexer.getDataIndex(t, info.indexColumnNames))
.or(() -> Optional.of(DataIndexer.getOrCreateDataIndex(t, info.indexColumnNames)))
.get()
.transform(DataIndexTransformer.builder().invertRowSet(t.getRowSet()).build());
final Table indexTable = dataIndex.table();

cleanupFiles.add(info.destFile);
tableInfoBuilder.addDataIndexes(DataIndexInfo.of(
destDirPath.relativize(info.destFileForMetadata.toPath()).toString(),
info.parquetColumnNames));
final ParquetInstructions writeInstructionsToUse;
if (INDEX_ROW_SET_COLUMN_NAME.equals(dataIndex.rowSetColumnName())) {
writeInstructionsToUse = writeInstructions;
} else {
writeInstructionsToUse = new ParquetInstructions.Builder(writeInstructions)
.addColumnNameMapping(INDEX_ROW_SET_COLUMN_NAME, dataIndex.rowSetColumnName())
.build();
}
// We don't accumulate metadata from grouping files into the main metadata file
write(indexTable, indexTable.getDefinition(), writeInstructionsToUse,
info.destFile.getAbsolutePath(),
info.destFileForMetadata.getAbsolutePath(), Collections.emptyMap(), TableInfo.builder(),
NullParquetMetadataFileWriter.INSTANCE, computedCache);
}
}
}
write(t, definition, writeInstructions, destFilePath, destFilePathForMetadata, incomingMeta,
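One practical consequence of the index-writing loop above, shown as a hedged sketch: a caller can materialize a data index ahead of time so that DataIndexer.getDataIndex finds it at write time, rather than the writer computing it on demand. The table and column name are made up for illustration; the DataIndexer call is the same one used in the diff.

// Illustrative only: pre-build a data index on "Sym" before writing so the writer can reuse it.
final Table table = TableTools.emptyTable(1_000)
        .update("Sym = ii % 2 == 0 ? `AAPL` : `MSFT`", "Price = ii * 0.1");
DataIndexer.getOrCreateDataIndex(table, "Sym"); // same call the writer falls back to on demand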
@@ -27,6 +27,7 @@
import io.deephaven.util.channel.SeekableChannelsProviderPlugin;
import io.deephaven.vector.*;
import io.deephaven.stringset.StringSet;
import io.deephaven.engine.table.*;
import io.deephaven.engine.table.impl.PartitionAwareSourceTable;
import io.deephaven.engine.table.impl.SimpleSourceTable;
import io.deephaven.engine.table.impl.indexer.DataIndexer;
@@ -36,9 +37,7 @@
import io.deephaven.engine.table.impl.locations.impl.PollingTableLocationProvider;
import io.deephaven.engine.table.impl.locations.impl.StandaloneTableKey;
import io.deephaven.engine.table.impl.locations.impl.TableLocationKeyFinder;
import io.deephaven.engine.table.impl.locations.util.TableDataRefreshService;
import io.deephaven.engine.table.impl.sources.regioned.RegionedTableComponentFactoryImpl;
import io.deephaven.engine.updategraph.UpdateSourceRegistrar;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.parquet.base.ParquetFileReader;
@@ -49,14 +48,9 @@
import io.deephaven.parquet.table.location.ParquetTableLocationFactory;
import io.deephaven.parquet.table.location.ParquetTableLocationKey;
import io.deephaven.parquet.table.metadata.ColumnTypeInfo;
import io.deephaven.stringset.StringSet;
import io.deephaven.util.SimpleTypeMap;
import io.deephaven.util.annotations.VisibleForTesting;
import io.deephaven.util.channel.CachedChannelProvider;
import io.deephaven.util.channel.SeekableChannelsProvider;
import io.deephaven.util.channel.SeekableChannelsProviderLoader;
import io.deephaven.util.channel.SeekableChannelsProviderPlugin;
import io.deephaven.vector.*;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
@@ -81,10 +75,10 @@
import static io.deephaven.parquet.table.ParquetInstructions.FILE_INDEX_TOKEN;
import static io.deephaven.parquet.table.ParquetInstructions.PARTITIONS_TOKEN;
import static io.deephaven.parquet.table.ParquetInstructions.UUID_TOKEN;
import static io.deephaven.parquet.table.ParquetTableWriter.getSchemaForTable;
import static io.deephaven.parquet.base.ParquetUtils.PARQUET_FILE_EXTENSION;
import static io.deephaven.parquet.base.ParquetUtils.COMMON_METADATA_FILE_NAME;
import static io.deephaven.parquet.base.ParquetUtils.METADATA_FILE_NAME;
import static io.deephaven.parquet.table.ParquetTableWriter.getSchemaForTable;
import static io.deephaven.util.type.TypeUtils.getUnboxedTypeIfBoxed;

/**
@@ -697,9 +691,11 @@ private static void writeKeyValuePartitionedTableImpl(@NotNull final Partitioned
}
final Map<String, Map<ParquetCacheTags, Object>> computedCache =
buildComputedCache(() -> sourceTable.orElseGet(partitionedTable::merge), leafDefinition);
writeParquetTablesImpl(partitionedData.toArray(Table[]::new), leafDefinition, writeInstructions,
destinations.toArray(File[]::new), leafDefinition.getGroupingColumnNamesArray(),
partitioningColumnsSchema, new File(destinationRoot), computedCache);
final Table[] partitionedDataArray = partitionedData.toArray(Table[]::new);
// TODO Verify correctness with Ryan/Larry
writeParquetTablesImpl(partitionedDataArray, leafDefinition, writeInstructions,
destinations.toArray(File[]::new), indexedColumnNames(partitionedDataArray), partitioningColumnsSchema,
new File(destinationRoot), computedCache);
}

/**
Expand Down Expand Up @@ -756,25 +752,25 @@ private static Map<String, Map<ParquetCacheTags, Object>> buildComputedCache(
}

/**
* Writes tables to disk in parquet format to a supplied set of destinations. If you specify grouping columns, there
* must already be grouping information for those columns in the sources. This can be accomplished with
* {@code .groupBy(<grouping columns>).ungroup()} or {@code .sort(<grouping column>)}.
* Writes tables to disk in parquet format to a supplied set of destinations.
*
* @param sources The tables to write
* @param definition The common definition for all the tables to write
* @param writeInstructions Write instructions for customizations while writing
* @param destinations The destination paths. Any non-existing directories in the paths provided are created. If
* there is an error, any intermediate directories previously created are removed; note this makes this
* method unsafe for concurrent use
* @param groupingColumns List of columns the tables are grouped by (the write operation will store the grouping
* info)
* method unsafe for concurrent use.
* @param indexColumnArr Arrays containing the column names for indexes to persist. The write operation will store
* the index info as sidecar tables. This argument is used to narrow the set of indexes to write, or to be
* explicit about the expected set of indexes present on all sources. Indexes that are specified but missing
* will be computed on demand.
*/
public static void writeParquetTables(
@NotNull final Table[] sources,
@NotNull final TableDefinition definition,
@NotNull final ParquetInstructions writeInstructions,
@NotNull final File[] destinations,
@Nullable final String[] groupingColumns) {
@Nullable final String[][] indexColumnArr) {
final File metadataRootDir;
if (writeInstructions.generateMetadataFiles()) {
// We insist on writing the metadata file in the same directory as the destination files, thus all
@@ -795,19 +791,19 @@ public static void writeParquetTables(
buildComputedCache(() -> PartitionedTableFactory.ofTables(definition, sources).merge(), definition);
// We do not have any additional schema for partitioning columns in this case. Schema for all columns will be
// generated at the time of writing the parquet files and merged to generate the metadata files.
writeParquetTablesImpl(sources, definition, writeInstructions, destinations, groupingColumns,
writeParquetTablesImpl(sources, definition, writeInstructions, destinations, indexColumnArr,
null, metadataRootDir, computedCache);
}
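To make the reworked signature concrete, a hedged usage sketch follows. It assumes this method is exposed on ParquetTools (as the surrounding file suggests); the tables, definition, and destination paths are invented for illustration, and ParquetInstructions.EMPTY stands in for real write instructions.

// Hypothetical usage: write two tables, persisting a data index on "Sym" for each.
final Table t1 = TableTools.emptyTable(100).update("Sym = `AAPL`", "Price = ii * 0.1");
final Table t2 = TableTools.emptyTable(100).update("Sym = `MSFT`", "Price = ii * 0.2");
final TableDefinition definition = t1.getDefinition();
final String[][] indexColumns = {{"Sym"}}; // one index, on the "Sym" column
ParquetTools.writeParquetTables(
        new Table[] {t1, t2},
        definition,
        ParquetInstructions.EMPTY,
        new File[] {new File("/tmp/out/t1.parquet"), new File("/tmp/out/t2.parquet")},
        indexColumns);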

/**
* Refer to {@link #writeParquetTables(Table[], TableDefinition, ParquetInstructions, File[], String[])} for more
* Refer to {@link #writeParquetTables(Table[], TableDefinition, ParquetInstructions, File[], String[][])} for more
* details.
*/
private static void writeParquetTablesImpl(@NotNull final Table[] sources,
@NotNull final TableDefinition definition,
@NotNull final ParquetInstructions writeInstructions,
@NotNull final File[] destinations,
@Nullable final String[] groupingColumns,
@Nullable final String[][] indexColumnArr,
@Nullable final MessageType partitioningColumnsSchema,
@Nullable final File metadataRootDir,
@NotNull final Map<String, Map<ParquetCacheTags, Object>> computedCache) {
@@ -840,16 +836,16 @@ private static void writeParquetTablesImpl(@NotNull final Table[] sources,
// List of all destination files (including index files), to roll back in case of exceptions
final List<File> destFiles = new ArrayList<>();
try {
final List<Map<String, ParquetTableWriter.GroupingColumnWritingInfo>> groupingColumnWritingInfoMaps;
if (groupingColumns == null || groupingColumns.length == 0) {
// Write the tables without any grouping info
groupingColumnWritingInfoMaps = null;
final List<List<ParquetTableWriter.IndexWritingInfo>> indexInfoLists;
if (indexColumnArr == null || indexColumnArr.length == 0) {
// Write the tables without any index info
indexInfoLists = null;
for (int tableIdx = 0; tableIdx < sources.length; tableIdx++) {
shadowFiles.add(shadowDestFiles[tableIdx]);
final Table source = sources[tableIdx];
ParquetTableWriter.write(source, definition, writeInstructions, shadowDestFiles[tableIdx].getPath(),
destinations[tableIdx].getPath(), Collections.emptyMap(),
(Map<String, ParquetTableWriter.GroupingColumnWritingInfo>) null, metadataFileWriter,
(List<ParquetTableWriter.IndexWritingInfo>) null, metadataFileWriter,
computedCache);
}
} else {
@@ -875,7 +871,7 @@ private static void writeParquetTablesImpl(@NotNull final Table[] sources,
final Table sourceTable = sources[tableIdx];
ParquetTableWriter.write(sourceTable, definition, writeInstructions,
shadowDestFiles[tableIdx].getPath(), tableDestination.getPath(), Collections.emptyMap(),
groupingColumnWritingInfoMap, metadataFileWriter, computedCache);
indexInfoList, metadataFileWriter, computedCache);
}
}

@@ -898,12 +894,11 @@ private static void writeParquetTablesImpl(@NotNull final Table[] sources,
for (int tableIdx = 0; tableIdx < sources.length; tableIdx++) {
destFiles.add(destinations[tableIdx]);
installShadowFile(destinations[tableIdx], shadowDestFiles[tableIdx]);
if (groupingColumnWritingInfoMaps != null) {
final Map<String, ParquetTableWriter.GroupingColumnWritingInfo> gcwim =
groupingColumnWritingInfoMaps.get(tableIdx);
for (final ParquetTableWriter.GroupingColumnWritingInfo gfwi : gcwim.values()) {
final File indexDestFile = gfwi.destFileForMetadata;
final File shadowIndexFile = gfwi.destFile;
if (indexInfoLists != null) {
final List<ParquetTableWriter.IndexWritingInfo> indexInfoList = indexInfoLists.get(tableIdx);
for (final ParquetTableWriter.IndexWritingInfo info : indexInfoList) {
final File indexDestFile = info.destFileForMetadata;
final File shadowIndexFile = info.destFile;
destFiles.add(indexDestFile);
installShadowFile(indexDestFile, shadowIndexFile);
}