From f8ae4ef3ee14b5f64b7c0ce54387d8a9945a992d Mon Sep 17 00:00:00 2001
From: Shivam Malhotra
Date: Tue, 30 Jan 2024 23:55:51 +0530
Subject: [PATCH 01/45] Copied methods from inside parquet hadoop to write
 metadata files

---
 extensions/parquet/base/build.gradle          |   1 +
 .../parquet/base/ColumnChunkReaderImpl.java   |   2 +-
 .../parquet/base/ParquetFileWriter.java       | 145 ++++++++++++++++++
 .../parquet/base/RowGroupReaderImpl.java      |  15 +-
 .../table/ParquetTableReadWriteTest.java      |  15 ++
 5 files changed, 176 insertions(+), 2 deletions(-)

diff --git a/extensions/parquet/base/build.gradle b/extensions/parquet/base/build.gradle
index 028731cd85a..3dc24f29c84 100644
--- a/extensions/parquet/base/build.gradle
+++ b/extensions/parquet/base/build.gradle
@@ -9,6 +9,7 @@ dependencies {
     api project(':util-channel')
 
     Classpaths.inheritParquetHadoop(project)
+    Classpaths.inheritParquetHadoopConfiguration(project)
 
     implementation project(':extensions-parquet-compression')
     implementation project(':Base')
diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java
index 9a1570aac53..88de4065b1e 100644
--- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java
+++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java
@@ -125,7 +125,7 @@ private URI getURI() {
         if (uri != null) {
             return uri;
         }
-        if (columnChunk.isSetFile_path() && FILE_URI_SCHEME.equals(uri.getScheme())) {
+        if (columnChunk.isSetFile_path() && FILE_URI_SCHEME.equals(rootURI.getScheme())) {
             return uri = Path.of(rootURI).resolve(columnChunk.getFile_path()).toUri();
         } else {
             // TODO(deephaven-core#5066): Add support for reading metadata files from non-file URIs
diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileWriter.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileWriter.java
index db4515ed364..9572c84a5ed 100644
--- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileWriter.java
+++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileWriter.java
@@ -3,6 +3,8 @@
  */
 package io.deephaven.parquet.base;
 
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.crypto.InternalFileEncryptor;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import io.deephaven.util.channel.SeekableChannelsProvider;
 import io.deephaven.parquet.compress.CompressorAdapter;
@@ -12,18 +14,26 @@
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.format.Util;
+import org.apache.parquet.hadoop.Footer;
 import org.apache.parquet.hadoop.metadata.*;
 import org.apache.parquet.internal.column.columnindex.OffsetIndex;
 import org.apache.parquet.internal.hadoop.metadata.IndexReference;
+import org.apache.parquet.io.ParquetEncodingException;
 import org.apache.parquet.schema.MessageType;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import static org.apache.parquet.format.Util.writeFileMetaData;
+import static org.apache.parquet.hadoop.ParquetFileWriter.CURRENT_VERSION;
+import static org.apache.parquet.hadoop.ParquetFileWriter.MAGIC;
 
 public final class ParquetFileWriter {
     private static final ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter();
@@ -38,6 +48,10 @@ public final class ParquetFileWriter {
     private final Map<String, String> extraMetaData;
     private final List<BlockMetaData> blocks = new ArrayList<>();
     private final List<List<OffsetIndex>> offsetIndexes = new ArrayList<>();
+    private final String filePath;
+    private final SeekableChannelsProvider channelsProvider;
+    private static final String PARQUET_METADATA_FILE = "_metadata";
+    private static final String PARQUET_COMMON_METADATA_FILE = "_common_metadata";
 
     public ParquetFileWriter(
             final String filePath,
@@ -55,6 +69,8 @@ public ParquetFileWriter(
         bufferedOutput.write(ParquetFileReader.MAGIC);
         this.type = type;
         this.compressorAdapter = DeephavenCompressorAdapterFactory.getInstance().getByName(codecName);
+        this.filePath = filePath;
+        this.channelsProvider = channelsProvider;
     }
 
     public RowGroupWriter addRowGroup(final long size) {
@@ -74,8 +90,137 @@ public void close() throws IOException {
         // Flush any buffered data and close the channel
         bufferedOutput.close();
         compressorAdapter.close();
+        // Write the metadata file
+        final Path metadataDirPath = new Path(new File(filePath).getParent());
+        final String actualFilePath = filePath.replace(".NEW_", "");
+        final Footer fsFooter = new Footer(new Path(actualFilePath), footer);
+        final ArrayList
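
---
For context: the new imports (`Footer`, `ParquetFileWriter.CURRENT_VERSION`,
`ParquetFileWriter.MAGIC`) and the `_metadata` / `_common_metadata` constants
all point at parquet-hadoop's summary-file support, which is deprecated
upstream and coupled to Hadoop's `Path` and `Configuration` types; that is
presumably why this patch copies the relevant methods rather than calling
them. For orientation, a minimal sketch of the equivalent upstream
(deprecated) API, assuming parquet-hadoop on the classpath; the class name
and directory path below are hypothetical, not from the patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.Footer;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetFileWriter;

import java.io.IOException;
import java.util.List;

public final class SummaryFileSketch {
    @SuppressWarnings("deprecation")
    public static void main(String[] args) throws IOException {
        final Configuration conf = new Configuration();
        // Hypothetical directory containing the parquet data files.
        final Path tableDir = new Path("/tmp/example-table");

        // Collect each data file's footer (schema + row-group metadata).
        final List<Footer> footers = ParquetFileReader.readFooters(conf, tableDir);

        // Merge the footers and write the `_metadata` (full row-group
        // metadata) and `_common_metadata` (schema only) sidecar files
        // into tableDir, next to the data files.
        ParquetFileWriter.writeMetadataFile(conf, tableDir, footers);
    }
}

The copied code reproduces this merge-and-write step on top of Deephaven's
own `SeekableChannelsProvider` instead of a Hadoop `FileSystem`.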