Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support to write metadata files in parquet #5105

Merged
merged 50 commits into from
Apr 2, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
f8ae4ef
Copied methods from inside parquet hadoop to write metadata files
malhotrashivam Jan 30, 2024
0ef7f79
Calling methods from inside ParquetHadoop for writing metadata files
malhotrashivam Jan 31, 2024
44d031f
Cleaned up the code and added a lot of TODOs for review
malhotrashivam Feb 13, 2024
6394bac
Merge branch 'main' into sm-pq-metadata
malhotrashivam Feb 13, 2024
6c6b075
Some more changes
malhotrashivam Feb 13, 2024
9263aa0
WIP commit
malhotrashivam Feb 14, 2024
8c887c4
Added a custom metadata file writer
malhotrashivam Feb 16, 2024
05400c5
Merge branch 'main' into sm-pq-metadata
malhotrashivam Feb 16, 2024
b696649
Minor fix
malhotrashivam Feb 16, 2024
797b3dc
Fixed failing test
malhotrashivam Feb 17, 2024
d987a06
Moved some code around
malhotrashivam Feb 20, 2024
d4da175
Minor change
malhotrashivam Feb 20, 2024
f33d712
Review comments
malhotrashivam Feb 22, 2024
7d66445
Merge branch 'main' into sm-pq-metadata
malhotrashivam Feb 26, 2024
238f5f5
Read offset index from column chunk on demand
malhotrashivam Feb 26, 2024
7a3652d
Fixed failing test
malhotrashivam Feb 26, 2024
1f41eef
Added support for partitioned parquet writing
malhotrashivam Feb 27, 2024
c8aa764
Added some more tests
malhotrashivam Feb 27, 2024
ff99a36
Added some more tests
malhotrashivam Feb 27, 2024
3e48937
Added a new API for writing a partitioned table directly
malhotrashivam Feb 28, 2024
e584343
Improved the tests
malhotrashivam Feb 29, 2024
f659f3c
Review with Ryan part 1
malhotrashivam Mar 4, 2024
3651e5f
Added more tests
malhotrashivam Mar 4, 2024
51eefe1
Iterating using chunked iterators
malhotrashivam Mar 4, 2024
60ee1f5
Removed some unnecessary includes
malhotrashivam Mar 4, 2024
5c7353f
Added support for {index} and {partition} in file basename
malhotrashivam Mar 4, 2024
c174452
Review comments
malhotrashivam Mar 5, 2024
ab73df0
Minor touchups
malhotrashivam Mar 5, 2024
1cb8b81
Added fix and tests for big decimals
malhotrashivam Mar 6, 2024
20e8204
Updated a comment
malhotrashivam Mar 6, 2024
9892d14
Review with Ryan part 1
malhotrashivam Mar 7, 2024
1d98927
Review with Ryan part 2
malhotrashivam Mar 11, 2024
5018feb
Minor touchups
malhotrashivam Mar 11, 2024
a529fc2
Fixed failing tests
malhotrashivam Mar 15, 2024
48906dd
Merge branch 'main' into sm-pq-metadata
malhotrashivam Mar 15, 2024
819a1b9
Added python APIs and improved comments
malhotrashivam Mar 16, 2024
10b7e0d
Added more fixes for python
malhotrashivam Mar 19, 2024
9f7c55e
Review with Ryan and Chip
malhotrashivam Mar 21, 2024
b62abb7
Merge branch 'main' into sm-pq-metadata
malhotrashivam Mar 21, 2024
0d7c62e
Review with Chip part 2
malhotrashivam Mar 22, 2024
5a3de8e
Review with Chip and Jianfeng Part 3
malhotrashivam Mar 22, 2024
f68551f
Review with Chip and Jianfeng continued
malhotrashivam Mar 22, 2024
b11ebd1
Added new APIs for managing indexes
malhotrashivam Mar 25, 2024
3bdc92b
Trigger CI jobs
malhotrashivam Mar 25, 2024
3dd6097
Review with Ryan
malhotrashivam Mar 26, 2024
d8bc0a5
Added python support for writing indexes
malhotrashivam Mar 27, 2024
f700883
Reordered comments
malhotrashivam Mar 27, 2024
6246289
Added more details to python comments
malhotrashivam Mar 27, 2024
fa2b0c5
Moved from list to sequence
malhotrashivam Mar 27, 2024
7909ea0
Added fixes and tests for windows
malhotrashivam Apr 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,13 @@ public class ColumnChunkReaderImpl implements ColumnChunkReader {

private final ColumnChunk columnChunk;
private final SeekableChannelsProvider channelsProvider;
/**
* If reading a single parquet file, root URI is the URI of the file, else the parent directory for a metadata file
*/
private final URI rootURI;
private final CompressorAdapter decompressor;
private final ColumnDescriptor path;
private final OffsetIndex offsetIndex;
private final List<Type> fieldTypes;
private final Function<SeekableChannelContext, Dictionary> dictionarySupplier;
private final PageMaterializer.Factory nullMaterializerFactory;

private URI uri;
private final URI columnChunkURI;
/**
* Number of rows in the row group of this column chunk.
*/
Expand All @@ -62,12 +57,12 @@ public class ColumnChunkReaderImpl implements ColumnChunkReader {
*/
private final String version;

ColumnChunkReaderImpl(ColumnChunk columnChunk, SeekableChannelsProvider channelsProvider, URI rootURI,
ColumnChunkReaderImpl(ColumnChunk columnChunk, SeekableChannelsProvider channelsProvider, URI columnChunkURI,
MessageType type, OffsetIndex offsetIndex, List<Type> fieldTypes, final long numRows,
final String version) {
this.channelsProvider = channelsProvider;
this.columnChunk = columnChunk;
this.rootURI = rootURI;
this.columnChunkURI = columnChunkURI;
this.path = type
.getColumnDescription(columnChunk.meta_data.getPath_in_schema().toArray(new String[0]));
if (columnChunk.getMeta_data().isSetCodec()) {
Expand Down Expand Up @@ -122,15 +117,7 @@ public final ColumnPageDirectAccessor getPageAccessor() {
}

private URI getURI() {
if (uri != null) {
return uri;
}
if (columnChunk.isSetFile_path() && FILE_URI_SCHEME.equals(rootURI.getScheme())) {
return uri = Path.of(rootURI).resolve(columnChunk.getFile_path()).toUri();
} else {
// TODO(deephaven-core#5066): Add support for reading metadata files from non-file URIs
return uri = rootURI;
}
return columnChunkURI;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.deephaven.parquet.base;

import org.apache.parquet.hadoop.metadata.ParquetMetadata;

import java.io.File;

/**
 * A no-op implementation of {@link ParquetMetadataFileWriter}, used when the caller does not want combined
 * {@code _metadata} / {@code _common_metadata} files written alongside the Parquet files.
 */
public final class NullParquetMetadataFileWriter implements ParquetMetadataFileWriter {

    // Stateless, so a single shared instance suffices; the private constructor below enforces singleton use.
    public static final NullParquetMetadataFileWriter INSTANCE = new NullParquetMetadataFileWriter();

    private NullParquetMetadataFileWriter() {}

    /** No-op: the per-file metadata is intentionally discarded. */
    @Override
    public void addParquetFileMetadata(final File parquetFile, final ParquetMetadata metadata) {}

    /** No-op: no combined metadata files are written. */
    @Override
    public void writeMetadataFiles(final File metadataFile, final File commonMetadataFile) {}
}

Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
public class ParquetFileReader {
private static final int FOOTER_LENGTH_SIZE = 4;
private static final String MAGIC_STR = "PAR1";
static final byte[] MAGIC = MAGIC_STR.getBytes(StandardCharsets.US_ASCII);
public static final byte[] MAGIC = MAGIC_STR.getBytes(StandardCharsets.US_ASCII);
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
public static final String FILE_URI_SCHEME = "file";

public final FileMetaData fileMetaData;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
import org.apache.parquet.internal.column.columnindex.OffsetIndex;
import org.apache.parquet.internal.hadoop.metadata.IndexReference;
import org.apache.parquet.schema.MessageType;
import org.jetbrains.annotations.NotNull;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -38,23 +40,29 @@ public final class ParquetFileWriter {
private final Map<String, String> extraMetaData;
private final List<BlockMetaData> blocks = new ArrayList<>();
private final List<List<OffsetIndex>> offsetIndexes = new ArrayList<>();
private final File metadataFilePath;
private final ParquetMetadataFileWriter metadataFileWriter;

public ParquetFileWriter(
final String filePath,
final File destFile,
final File metadataFilePath,
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
final SeekableChannelsProvider channelsProvider,
final int targetPageSize,
final ByteBufferAllocator allocator,
final MessageType type,
final String codecName,
final Map<String, String> extraMetaData) throws IOException {
final Map<String, String> extraMetaData,
@NotNull final ParquetMetadataFileWriter metadataFileWriter) throws IOException {
this.targetPageSize = targetPageSize;
this.allocator = allocator;
this.extraMetaData = new HashMap<>(extraMetaData);
bufferedOutput = new PositionedBufferedOutputStream(channelsProvider.getWriteChannel(filePath, false),
bufferedOutput = new PositionedBufferedOutputStream(channelsProvider.getWriteChannel(destFile.getPath(), false),
OUTPUT_BUFFER_SIZE);
bufferedOutput.write(ParquetFileReader.MAGIC);
this.type = type;
this.compressorAdapter = DeephavenCompressorAdapterFactory.getInstance().getByName(codecName);
this.metadataFilePath = metadataFilePath;
this.metadataFileWriter = metadataFileWriter;
}

public RowGroupWriter addRowGroup(final long size) {
Expand All @@ -70,13 +78,16 @@ public void close() throws IOException {
serializeOffsetIndexes();
final ParquetMetadata footer =
new ParquetMetadata(new FileMetaData(type, extraMetaData, Version.FULL_VERSION), blocks);
serializeFooter(footer);
serializeFooter(footer, bufferedOutput);
metadataFileWriter.addParquetFileMetadata(metadataFilePath, footer);
// Flush any buffered data and close the channel
bufferedOutput.close();
compressorAdapter.close();
}

private void serializeFooter(final ParquetMetadata footer) throws IOException {
public static void serializeFooter(final ParquetMetadata footer,
final PositionedBufferedOutputStream bufferedOutput)
throws IOException {
final long footerIndex = bufferedOutput.position();
final org.apache.parquet.format.FileMetaData parquetMetadata =
metadataConverter.toParquetMetadata(VERSION, footer);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.deephaven.parquet.base;

import org.apache.parquet.hadoop.metadata.ParquetMetadata;

import java.io.File;
import java.io.IOException;

/**
 * Used to write combined {@code _metadata} and {@code _common_metadata} files summarizing a set of Parquet files.
 * Implementations accumulate per-file metadata via {@link #addParquetFileMetadata} and flush the accumulated state
 * in {@link #writeMetadataFiles}.
 */
public interface ParquetMetadataFileWriter {

    /**
     * Add the parquet metadata for the provided parquet file to the list of metadata to be written to combined
     * metadata files.
     *
     * @param parquetFile The parquet file destination path
     * @param metadata The parquet metadata corresponding to the parquet file
     */
    void addParquetFileMetadata(File parquetFile, ParquetMetadata metadata);

    /**
     * Write the combined metadata files for all metadata accumulated so far and clear the list.
     *
     * @param metadataFile The destination file for the {@code _metadata} file
     * @param commonMetadataFile The destination file for the {@code _common_metadata} file
     * @throws IOException if writing either metadata file fails
     */
    void writeMetadataFiles(File metadataFile, File commonMetadataFile) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,22 @@
import java.nio.channels.Channels;
import java.nio.channels.SeekableByteChannel;

final class PositionedBufferedOutputStream extends BufferedOutputStream {
public final class PositionedBufferedOutputStream extends BufferedOutputStream {

private final SeekableByteChannel writeChannel;

public PositionedBufferedOutputStream(final SeekableByteChannel writeChannel) {
super(Channels.newOutputStream(writeChannel));
this.writeChannel = writeChannel;
}

PositionedBufferedOutputStream(final SeekableByteChannel writeChannel, final int size) {
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
super(Channels.newOutputStream(writeChannel), size);
this.writeChannel = writeChannel;
}

/**
* Get total number of bytes written to this stream
* Get the total number of bytes written to this stream
*/
long position() throws IOException {
// Number of bytes buffered in the stream + bytes written to the underlying channel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@
import java.net.URI;
import java.nio.channels.Channels;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static io.deephaven.parquet.base.ParquetFileReader.FILE_URI_SCHEME;

public class RowGroupReaderImpl implements RowGroupReader {

private static final int BUFFER_SIZE = 65536;
Expand Down Expand Up @@ -79,17 +82,26 @@ public ColumnChunkReaderImpl getColumnChunk(@NotNull final List<String> path,
return null;
}

final URI columnChunkURI;
if (columnChunk.isSetFile_path() && FILE_URI_SCHEME.equals(rootURI.getScheme())) {
columnChunkURI = Path.of(rootURI).resolve(columnChunk.getFile_path()).toUri();
} else {
// TODO(deephaven-core#5066): Add support for reading metadata files from non-file URIs
columnChunkURI = rootURI;
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
}

OffsetIndex offsetIndex = null;
if (columnChunk.isSetOffset_index_offset()) {
try (final SeekableByteChannel readChannel = channelsProvider.getReadChannel(channelContext, rootURI)) {
try (final SeekableByteChannel readChannel =
channelsProvider.getReadChannel(channelContext, columnChunkURI)) {
readChannel.position(columnChunk.getOffset_index_offset());
offsetIndex = ParquetMetadataConverter.fromParquetOffsetIndex(Util.readOffsetIndex(
new BufferedInputStream(Channels.newInputStream(readChannel), BUFFER_SIZE)));
} catch (final IOException e) {
throw new UncheckedIOException(e);
}
}
return new ColumnChunkReaderImpl(columnChunk, channelsProvider, rootURI, type, offsetIndex, fieldTypes,
return new ColumnChunkReaderImpl(columnChunk, channelsProvider, columnChunkURI, type, offsetIndex, fieldTypes,
numRows(), version);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ public static int getDefaultTargetPageSize() {
return defaultTargetPageSize;
}

static final String DEFAULT_METADATA_ROOT_DIR = ""; // Empty = No metadata files written

public ParquetInstructions() {}

public final String getColumnNameFromParquetColumnNameOrDefault(final String parquetColumnName) {
Expand Down Expand Up @@ -164,6 +166,11 @@ public final String getColumnNameFromParquetColumnNameOrDefault(final String par
*/
public abstract boolean isRefreshing();

/**
* @return the directory in which metadata files should be stored.
*/
public abstract String getMetadataRootDir();

@VisibleForTesting
public static boolean sameColumnNamesAndCodecMappings(final ParquetInstructions i1, final ParquetInstructions i2) {
if (i1 == EMPTY) {
Expand Down Expand Up @@ -238,6 +245,11 @@ public int getTargetPageSize() {
public boolean isRefreshing() {
return DEFAULT_IS_REFRESHING;
}

@Override
public String getMetadataRootDir() {
return DEFAULT_METADATA_ROOT_DIR;
}
};

private static class ColumnInstructions {
Expand Down Expand Up @@ -306,6 +318,7 @@ private static final class ReadOnly extends ParquetInstructions {
private final int targetPageSize;
private final boolean isRefreshing;
private final Object specialInstructions;
private final String metadataRootDir;

private ReadOnly(
final KeyedObjectHashMap<String, ColumnInstructions> columnNameToInstructions,
Expand All @@ -316,7 +329,8 @@ private ReadOnly(
final boolean isLegacyParquet,
final int targetPageSize,
final boolean isRefreshing,
final Object specialInstructions) {
final Object specialInstructions,
final String metadataRootDir) {
this.columnNameToInstructions = columnNameToInstructions;
this.parquetColumnNameToInstructions = parquetColumnNameToColumnName;
this.compressionCodecName = compressionCodecName;
Expand All @@ -326,6 +340,7 @@ private ReadOnly(
this.targetPageSize = targetPageSize;
this.isRefreshing = isRefreshing;
this.specialInstructions = specialInstructions;
this.metadataRootDir = metadataRootDir;
}

private String getOrDefault(final String columnName, final String defaultValue,
Expand Down Expand Up @@ -419,6 +434,10 @@ public boolean isRefreshing() {
return specialInstructions;
}

@Override
public String getMetadataRootDir() {
return metadataRootDir;
}

KeyedObjectHashMap<String, ColumnInstructions> copyColumnNameToInstructions() {
// noinspection unchecked
Expand Down Expand Up @@ -471,6 +490,7 @@ public static class Builder {
private int targetPageSize = defaultTargetPageSize;
private boolean isRefreshing = DEFAULT_IS_REFRESHING;
private Object specialInstructions;
private String metadataRootDir = DEFAULT_METADATA_ROOT_DIR;

public Builder() {}

Expand Down Expand Up @@ -647,6 +667,17 @@ public Builder setSpecialInstructions(final Object specialInstructions) {
return this;
}

/**
* Set the default metadata root directory.
*
* @param metadataRootDir the root directory to store metadata files in. All the parquet destinations should be
* inside this directory.
*/
public Builder setMetadataRootDir(final String metadataRootDir) {
this.metadataRootDir = metadataRootDir;
return this;
}

public ParquetInstructions build() {
final KeyedObjectHashMap<String, ColumnInstructions> columnNameToInstructionsOut = columnNameToInstructions;
columnNameToInstructions = null;
Expand All @@ -655,7 +686,7 @@ public ParquetInstructions build() {
parquetColumnNameToInstructions = null;
return new ReadOnly(columnNameToInstructionsOut, parquetColumnNameToColumnNameOut, compressionCodecName,
maximumDictionaryKeys, maximumDictionarySize, isLegacyParquet, targetPageSize, isRefreshing,
specialInstructions);
specialInstructions, metadataRootDir);
}
}

Expand Down
Loading
Loading