Added support to write metadata files in parquet #5105

Merged · 50 commits · Apr 2, 2024
Changes from 21 commits
f8ae4ef
Copied methods from inside parquet hadoop to write metadata files
malhotrashivam Jan 30, 2024
0ef7f79
Calling methods from inside ParquetHadoop for writing metadata files
malhotrashivam Jan 31, 2024
44d031f
Cleaned up the code and added a lot of TODOs for review
malhotrashivam Feb 13, 2024
6394bac
Merge branch 'main' into sm-pq-metadata
malhotrashivam Feb 13, 2024
6c6b075
Some more changes
malhotrashivam Feb 13, 2024
9263aa0
WIP commit
malhotrashivam Feb 14, 2024
8c887c4
Added a custom metadata file writer
malhotrashivam Feb 16, 2024
05400c5
Merge branch 'main' into sm-pq-metadata
malhotrashivam Feb 16, 2024
b696649
Minor fix
malhotrashivam Feb 16, 2024
797b3dc
Fixed failing test
malhotrashivam Feb 17, 2024
d987a06
Moved some code around
malhotrashivam Feb 20, 2024
d4da175
Minor change
malhotrashivam Feb 20, 2024
f33d712
Review comments
malhotrashivam Feb 22, 2024
7d66445
Merge branch 'main' into sm-pq-metadata
malhotrashivam Feb 26, 2024
238f5f5
Read offset index from column chunk on demand
malhotrashivam Feb 26, 2024
7a3652d
Fixed failing test
malhotrashivam Feb 26, 2024
1f41eef
Added support for partitioned parquet writing
malhotrashivam Feb 27, 2024
c8aa764
Added some more tests
malhotrashivam Feb 27, 2024
ff99a36
Added some more tests
malhotrashivam Feb 27, 2024
3e48937
Added a new API for writing a partitioned table directly
malhotrashivam Feb 28, 2024
e584343
Improved the tests
malhotrashivam Feb 29, 2024
f659f3c
Review with Ryan part 1
malhotrashivam Mar 4, 2024
3651e5f
Added more tests
malhotrashivam Mar 4, 2024
51eefe1
Iterating using chunked iterators
malhotrashivam Mar 4, 2024
60ee1f5
Removed some unnecessary includes
malhotrashivam Mar 4, 2024
5c7353f
Added support for {index} and {partition} in file basename
malhotrashivam Mar 4, 2024
c174452
Review comments
malhotrashivam Mar 5, 2024
ab73df0
Minor touchups
malhotrashivam Mar 5, 2024
1cb8b81
Added fix and tests for big decimals
malhotrashivam Mar 6, 2024
20e8204
Updated a comment
malhotrashivam Mar 6, 2024
9892d14
Review with Ryan part 1
malhotrashivam Mar 7, 2024
1d98927
Review with Ryan part 2
malhotrashivam Mar 11, 2024
5018feb
Minor touchups
malhotrashivam Mar 11, 2024
a529fc2
Fixed failing tests
malhotrashivam Mar 15, 2024
48906dd
Merge branch 'main' into sm-pq-metadata
malhotrashivam Mar 15, 2024
819a1b9
Added python APIs and improved comments
malhotrashivam Mar 16, 2024
10b7e0d
Added more fixes for python
malhotrashivam Mar 19, 2024
9f7c55e
Review with Ryan and Chip
malhotrashivam Mar 21, 2024
b62abb7
Merge branch 'main' into sm-pq-metadata
malhotrashivam Mar 21, 2024
0d7c62e
Review with Chip part 2
malhotrashivam Mar 22, 2024
5a3de8e
Review with Chip and Jianfeng Part 3
malhotrashivam Mar 22, 2024
f68551f
Review with Chip and Jianfeng continued
malhotrashivam Mar 22, 2024
b11ebd1
Added new APIs for managing indexes
malhotrashivam Mar 25, 2024
3bdc92b
Trigger CI jobs
malhotrashivam Mar 25, 2024
3dd6097
Review with Ryan
malhotrashivam Mar 26, 2024
d8bc0a5
Added python support for writing indexes
malhotrashivam Mar 27, 2024
f700883
Reordered comments
malhotrashivam Mar 27, 2024
6246289
Added more details to python comments
malhotrashivam Mar 27, 2024
fa2b0c5
Moved from list to sequence
malhotrashivam Mar 27, 2024
7909ea0
Added fixes and tests for windows
malhotrashivam Apr 1, 2024
@@ -31,10 +31,17 @@ public interface ColumnChunkReader {
int getMaxRl();

/**
* @return The offset index for this column chunk, or null if it is not found in the metadata.
* @return Whether the column chunk has offset index information set in the metadata or not.
*/
@Nullable
OffsetIndex getOffsetIndex();
boolean hasOffsetIndex();

/**
* Get the offset index for a column chunk.
*
* @param context The channel context to use for reading the offset index.
*
*/
OffsetIndex getOffsetIndex(final SeekableChannelContext context);

/**
* Used to iterate over column page readers for each page with the capability to set the channel context for reading
@@ -69,9 +76,9 @@ interface ColumnPageDirectAccessor {
}

/**
* @return An accessor for individual parquet pages.
* @return An accessor for individual parquet pages which uses the provided offset index.
*/
ColumnPageDirectAccessor getPageAccessor();
ColumnPageDirectAccessor getPageAccessor(OffsetIndex offsetIndex);

/**
* @return Whether this column chunk uses a dictionary-based encoding on every page.
@@ -27,31 +27,24 @@
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Path;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

import static io.deephaven.parquet.base.ParquetFileReader.FILE_URI_SCHEME;
import static org.apache.parquet.format.Encoding.PLAIN_DICTIONARY;
import static org.apache.parquet.format.Encoding.RLE_DICTIONARY;

final class ColumnChunkReaderImpl implements ColumnChunkReader {

private final ColumnChunk columnChunk;
private final SeekableChannelsProvider channelsProvider;
/**
* If reading a single parquet file, root URI is the URI of the file, else the parent directory for a metadata file
*/
private final URI rootURI;
private final CompressorAdapter decompressor;
private final ColumnDescriptor path;
private final OffsetIndex offsetIndex;
private OffsetIndexReader offsetIndexReader;
private final List<Type> fieldTypes;
private final Function<SeekableChannelContext, Dictionary> dictionarySupplier;
private final PageMaterializerFactory nullMaterializerFactory;

private URI uri;
private final URI columnChunkURI;
/**
* Number of rows in the row group of this column chunk.
*/
@@ -61,12 +54,11 @@ final class ColumnChunkReaderImpl implements ColumnChunkReader {
*/
private final String version;

ColumnChunkReaderImpl(ColumnChunk columnChunk, SeekableChannelsProvider channelsProvider, URI rootURI,
MessageType type, OffsetIndex offsetIndex, List<Type> fieldTypes, final long numRows,
final String version) {
ColumnChunkReaderImpl(ColumnChunk columnChunk, SeekableChannelsProvider channelsProvider, URI columnChunkURI,
MessageType type, List<Type> fieldTypes, final long numRows, final String version) {
this.channelsProvider = channelsProvider;
this.columnChunk = columnChunk;
this.rootURI = rootURI;
this.columnChunkURI = columnChunkURI;
this.path = type
.getColumnDescription(columnChunk.meta_data.getPath_in_schema().toArray(new String[0]));
if (columnChunk.getMeta_data().isSetCodec()) {
@@ -75,12 +67,15 @@ final class ColumnChunkReaderImpl implements ColumnChunkReader {
} else {
decompressor = CompressorAdapter.PASSTHRU;
}
this.offsetIndex = offsetIndex;
this.fieldTypes = fieldTypes;
this.dictionarySupplier = new LazyCachingFunction<>(this::getDictionary);
this.nullMaterializerFactory = PageMaterializer.factoryForType(path.getPrimitiveType().getPrimitiveTypeName());
this.numRows = numRows;
this.version = version;
// Construct the reader object but don't read the offset index yet
this.offsetIndexReader = (columnChunk.isSetOffset_index_offset())
? new OffsetIndexReaderImpl(channelsProvider, columnChunk, columnChunkURI)
: OffsetIndexReader.NULL;
}

@Override
@@ -98,8 +93,15 @@ public int getMaxRl() {
return path.getMaxRepetitionLevel();
}

public OffsetIndex getOffsetIndex() {
return offsetIndex;
@Override
public boolean hasOffsetIndex() {
return columnChunk.isSetOffset_index_offset();
}

@Override
public OffsetIndex getOffsetIndex(final SeekableChannelContext context) {
// Read the offset index if it hasn't been read yet
return offsetIndexReader.getOffsetIndex(context);
}

@Override
@@ -108,23 +110,15 @@ public ColumnPageReaderIterator getPageIterator() {
}

@Override
public final ColumnPageDirectAccessor getPageAccessor() {
public ColumnPageDirectAccessor getPageAccessor(final OffsetIndex offsetIndex) {
if (offsetIndex == null) {
throw new UnsupportedOperationException("Cannot use direct accessor without offset index");
}
return new ColumnPageDirectAccessorImpl();
return new ColumnPageDirectAccessorImpl(offsetIndex);
}

private URI getURI() {
if (uri != null) {
return uri;
}
if (columnChunk.isSetFile_path() && FILE_URI_SCHEME.equals(rootURI.getScheme())) {
return uri = Path.of(rootURI).resolve(columnChunk.getFile_path()).toUri();
} else {
// TODO(deephaven-core#5066): Add support for reading metadata files from non-file URIs
return uri = rootURI;
}
return columnChunkURI;
}

@Override
@@ -307,7 +301,11 @@ private static int getNumValues(PageHeader pageHeader) {

private final class ColumnPageDirectAccessorImpl implements ColumnPageDirectAccessor {

ColumnPageDirectAccessorImpl() {}
private final OffsetIndex offsetIndex;

ColumnPageDirectAccessorImpl(final OffsetIndex offsetIndex) {
this.offsetIndex = offsetIndex;
}

@Override
public ColumnPageReader getPageReader(final int pageNum, final SeekableChannelContext channelContext) {
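The `getPageAccessor(OffsetIndex)` change above makes the accessor's dependency explicit: the caller supplies the offset index and the method fails fast when it is absent, instead of silently constructing an accessor that cannot locate pages. A minimal sketch of that fail-fast pattern (all names here are hypothetical stand-ins, not the Deephaven API):

```java
// Sketch: require the dependency as a parameter and reject null up front,
// mirroring the getPageAccessor(OffsetIndex) guard in the diff above.
public final class PageAccessorGuard {
    // Stand-in for org.apache.parquet's OffsetIndex.
    record FakeOffsetIndex(long firstPageOffset) {}

    static String describeAccessor(FakeOffsetIndex offsetIndex) {
        if (offsetIndex == null) {
            // Fail fast rather than returning a half-usable accessor.
            throw new UnsupportedOperationException(
                    "Cannot use direct accessor without offset index");
        }
        return "accessor@" + offsetIndex.firstPageOffset();
    }

    public static void main(String[] args) {
        System.out.println(describeAccessor(new FakeOffsetIndex(128)));
        try {
            describeAccessor(null);
        } catch (UnsupportedOperationException expected) {
            System.out.println("rejected null offset index");
        }
    }
}
```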
@@ -0,0 +1,25 @@
package io.deephaven.parquet.base;

import org.apache.parquet.hadoop.metadata.ParquetMetadata;

import java.io.File;

/**
* A no-op implementation of {@link ParquetMetadataFileWriter}, used when we don't want to write metadata files for Parquet files.
*/
public final class NullParquetMetadataFileWriter implements ParquetMetadataFileWriter {

public static final NullParquetMetadataFileWriter INSTANCE = new NullParquetMetadataFileWriter();

private NullParquetMetadataFileWriter() {}

@Override
public void addParquetFileMetadata(final File parquetFile, final ParquetMetadata metadata) {}

@Override
public void writeMetadataFiles(final File metadataFile, final File commonMetadataFile) {}

@Override
public void clear() {}
}
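`NullParquetMetadataFileWriter` is a classic null-object: callers that have metadata writing disabled pass the shared `INSTANCE` and never need per-call null checks. A self-contained sketch of the same pattern with simplified stand-in names (not the Deephaven API):

```java
// Sketch of the null-object pattern: a shared do-nothing sink lets callers
// write files unconditionally, with no "is metadata enabled?" branching.
import java.util.ArrayList;
import java.util.List;

public final class NullObjectDemo {
    interface MetadataSink {
        void add(String fileName);

        // Shared do-nothing instance, analogous to NullParquetMetadataFileWriter.INSTANCE.
        MetadataSink NULL = fileName -> {};
    }

    static final class CollectingSink implements MetadataSink {
        final List<String> seen = new ArrayList<>();

        @Override
        public void add(String fileName) {
            seen.add(fileName);
        }
    }

    static void writeFile(String name, MetadataSink sink) {
        // ... write the parquet file itself, then record its metadata ...
        sink.add(name); // no null check required
    }

    public static void main(String[] args) {
        CollectingSink sink = new CollectingSink();
        writeFile("part-0.parquet", sink);
        writeFile("part-1.parquet", MetadataSink.NULL); // silently ignored
        System.out.println(sink.seen); // [part-0.parquet]
    }
}
```

The design choice is that the "disabled" case costs one interned lambda rather than a nullable field threaded through every writer.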
@@ -0,0 +1,22 @@
package io.deephaven.parquet.base;

import io.deephaven.util.channel.SeekableChannelContext;
import org.apache.parquet.internal.column.columnindex.OffsetIndex;

/**
* Interface for reading the offset index for a column chunk.
*/
public interface OffsetIndexReader {

/**
* Get the offset index for a column chunk.
*
* @param context The channel context to use for reading the offset index.
*/
OffsetIndex getOffsetIndex(SeekableChannelContext context);

/**
* A null implementation of the offset index reader.
*/
OffsetIndexReader NULL = context -> null;
}
@@ -0,0 +1,55 @@
package io.deephaven.parquet.base;

import io.deephaven.util.channel.SeekableChannelContext;
import io.deephaven.util.channel.SeekableChannelsProvider;
import org.apache.parquet.format.ColumnChunk;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.internal.column.columnindex.OffsetIndex;
import org.apache.parquet.format.Util;
import org.jetbrains.annotations.NotNull;

import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.channels.SeekableByteChannel;

/**
* Implementation of {@link OffsetIndexReader}, which reads the offset index for a column chunk on demand.
*/
final class OffsetIndexReaderImpl implements OffsetIndexReader {

private final SeekableChannelsProvider channelsProvider;
private final ColumnChunk chunk;
private final URI columnChunkURI;
private OffsetIndex offsetIndex;

OffsetIndexReaderImpl(final SeekableChannelsProvider channelsProvider, final ColumnChunk chunk,
final URI columnChunkURI) {
this.channelsProvider = channelsProvider;
this.chunk = chunk;
this.columnChunkURI = columnChunkURI;
this.offsetIndex = null;
}

@Override
public OffsetIndex getOffsetIndex(@NotNull final SeekableChannelContext context) {
if (offsetIndex != null) {
return offsetIndex;
}
return readOffsetIndex(context);
}

private OffsetIndex readOffsetIndex(@NotNull final SeekableChannelContext channelContext) {
try (
final SeekableChannelContext.ContextHolder holder =
SeekableChannelContext.ensureContext(channelsProvider, channelContext);
final SeekableByteChannel readChannel = channelsProvider.getReadChannel(holder.get(), columnChunkURI);
final InputStream in =
channelsProvider.getInputStream(readChannel.position(chunk.getOffset_index_offset()))) {
return (offsetIndex = ParquetMetadataConverter.fromParquetOffsetIndex(Util.readOffsetIndex(in)));
} catch (final IOException e) {
throw new UncheckedIOException(e);
}
}
}
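`OffsetIndexReaderImpl` reads the offset index once, on first demand, and caches the result for subsequent calls. The memoization itself is independent of parquet; a generic sketch of the same read-once-then-cache shape (hypothetical names, not the Deephaven API):

```java
// Sketch: memoize an expensive load so the underlying read happens at most
// once, mirroring how OffsetIndexReaderImpl caches the parsed offset index.
// Not thread-safe, matching the single-threaded sketch here; the real reader
// is used per channel context.
import java.util.function.Supplier;

public final class MemoizingReader<T> {
    private final Supplier<T> loader;
    private T cached;

    public MemoizingReader(Supplier<T> loader) {
        this.loader = loader;
    }

    public T get() {
        if (cached == null) {
            cached = loader.get(); // read once, on first demand
        }
        return cached;
    }

    public static void main(String[] args) {
        final int[] loads = {0};
        MemoizingReader<String> reader = new MemoizingReader<>(() -> {
            loads[0]++;
            return "offset-index";
        });
        reader.get();
        reader.get();
        System.out.println(loads[0]); // the underlying load ran exactly once
    }
}
```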
@@ -27,7 +27,7 @@
public class ParquetFileReader {
private static final int FOOTER_LENGTH_SIZE = 4;
private static final String MAGIC_STR = "PAR1";
static final byte[] MAGIC = MAGIC_STR.getBytes(StandardCharsets.US_ASCII);
public static final byte[] MAGIC = MAGIC_STR.getBytes(StandardCharsets.US_ASCII);
public static final String FILE_URI_SCHEME = "file";

public final FileMetaData fileMetaData;
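`MAGIC` is made public here so the writer can emit the same bytes the reader validates: a parquet file both begins and ends with the 4-byte ASCII magic `PAR1`. A minimal sketch of checking the trailing magic on an in-memory byte array (simplified; real readers work on seekable channels):

```java
// Sketch: verify the trailing "PAR1" magic of a parquet file held in memory.
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public final class MagicCheck {
    static final byte[] MAGIC = "PAR1".getBytes(StandardCharsets.US_ASCII);

    static boolean hasTrailingMagic(byte[] file) {
        if (file.length < MAGIC.length) {
            return false;
        }
        byte[] tail = Arrays.copyOfRange(file, file.length - MAGIC.length, file.length);
        return Arrays.equals(tail, MAGIC);
    }

    public static void main(String[] args) {
        byte[] fake = "PAR1....footer...PAR1".getBytes(StandardCharsets.US_ASCII);
        System.out.println(hasTrailingMagic(fake)); // true
        System.out.println(hasTrailingMagic(new byte[] {1, 2, 3})); // false
    }
}
```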
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
import org.apache.parquet.internal.column.columnindex.OffsetIndex;
import org.apache.parquet.internal.hadoop.metadata.IndexReference;
import org.apache.parquet.schema.MessageType;
import org.jetbrains.annotations.NotNull;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -38,23 +40,29 @@ public final class ParquetFileWriter {
private final Map<String, String> extraMetaData;
private final List<BlockMetaData> blocks = new ArrayList<>();
private final List<List<OffsetIndex>> offsetIndexes = new ArrayList<>();
private final File metadataFilePath;
private final ParquetMetadataFileWriter metadataFileWriter;

public ParquetFileWriter(
final String filePath,
final File destFile,
final File metadataFilePath,
final SeekableChannelsProvider channelsProvider,
final int targetPageSize,
final ByteBufferAllocator allocator,
final MessageType type,
final String codecName,
final Map<String, String> extraMetaData) throws IOException {
final Map<String, String> extraMetaData,
@NotNull final ParquetMetadataFileWriter metadataFileWriter) throws IOException {
this.targetPageSize = targetPageSize;
this.allocator = allocator;
this.extraMetaData = new HashMap<>(extraMetaData);
bufferedOutput = new PositionedBufferedOutputStream(channelsProvider.getWriteChannel(filePath, false),
bufferedOutput = new PositionedBufferedOutputStream(channelsProvider.getWriteChannel(destFile.getPath(), false),
OUTPUT_BUFFER_SIZE);
bufferedOutput.write(ParquetFileReader.MAGIC);
this.type = type;
this.compressorAdapter = DeephavenCompressorAdapterFactory.getInstance().getByName(codecName);
this.metadataFilePath = metadataFilePath;
this.metadataFileWriter = metadataFileWriter;
}

public RowGroupWriter addRowGroup(final long size) {
@@ -70,13 +78,16 @@ public void close() throws IOException {
serializeOffsetIndexes();
final ParquetMetadata footer =
new ParquetMetadata(new FileMetaData(type, extraMetaData, Version.FULL_VERSION), blocks);
serializeFooter(footer);
serializeFooter(footer, bufferedOutput);
metadataFileWriter.addParquetFileMetadata(metadataFilePath, footer);
// Flush any buffered data and close the channel
bufferedOutput.close();
compressorAdapter.close();
}

private void serializeFooter(final ParquetMetadata footer) throws IOException {
public static void serializeFooter(final ParquetMetadata footer,
final PositionedBufferedOutputStream bufferedOutput)
throws IOException {
final long footerIndex = bufferedOutput.position();
final org.apache.parquet.format.FileMetaData parquetMetadata =
metadataConverter.toParquetMetadata(VERSION, footer);
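`serializeFooter` is made `public static` above so the same routine can finish both data files and the combined metadata files. The tail layout it produces is fixed by the parquet format: serialized footer bytes, then a 4-byte little-endian footer length, then the `PAR1` magic. A sketch of that layout over byte arrays (the footer bytes here are an arbitrary stand-in, not real thrift):

```java
// Sketch: append a parquet-style tail — <footer><4-byte LE length><"PAR1"> —
// to an in-memory file body.
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

public final class FooterLayout {
    static byte[] appendFooter(byte[] body, byte[] footer) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(body);
        long footerIndex = out.size();        // stream position before the footer
        out.write(footer);
        int footerLength = (int) (out.size() - footerIndex);
        out.write(ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN)
                .putInt(footerLength).array()); // length of the footer only
        out.write("PAR1".getBytes(StandardCharsets.US_ASCII));
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] file = appendFooter(new byte[] {0, 1, 2}, new byte[] {9, 9});
        System.out.println(file.length); // 3 body + 2 footer + 4 length + 4 magic = 13
    }
}
```

This is why the writer tracks its stream position: the footer length is computed from positions, not from re-serializing.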
@@ -0,0 +1,34 @@
package io.deephaven.parquet.base;

import org.apache.parquet.hadoop.metadata.ParquetMetadata;

import java.io.File;
import java.io.IOException;

/**
* Used to write _metadata and _common_metadata files for Parquet.
*/
public interface ParquetMetadataFileWriter {

/**
* Add the parquet metadata for the provided parquet file to the list of metadata to be written to combined metadata
* files.
*
* @param parquetFile The parquet file destination path
* @param metadata The parquet metadata corresponding to the parquet file
*/
void addParquetFileMetadata(File parquetFile, ParquetMetadata metadata);

/**
* Write the combined metadata files for all metadata accumulated so far and clear the list.
*
* @param metadataFile The destination file for the _metadata file
* @param commonMetadataFile The destination file for the _common_metadata file
*/
void writeMetadataFiles(File metadataFile, File commonMetadataFile) throws IOException;

/**
* Clear the list of metadata accumulated so far.
*/
void clear();
}
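The interface above defines an accumulate-then-write contract: `addParquetFileMetadata` is called once per data file, and a single `writeMetadataFiles` call later emits the combined `_metadata` and `_common_metadata` files and clears the accumulated state. A sketch of that lifecycle with simplified stand-in types (not the Deephaven implementation; real code writes parquet footers, not strings):

```java
// Sketch: accumulate per-file metadata, then emit everything in one shot and
// reset, mirroring the ParquetMetadataFileWriter contract.
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public final class CombinedMetadataDemo {
    private final Map<String, String> accumulated = new LinkedHashMap<>();

    void addParquetFileMetadata(String parquetFile, String metadata) {
        accumulated.put(parquetFile, metadata);
    }

    List<String> writeMetadataFiles() {
        // Conventionally _metadata carries per-file row-group info and
        // _common_metadata carries only the schema; here we just render entries.
        List<String> lines = new ArrayList<>();
        accumulated.forEach((file, meta) -> lines.add(file + " -> " + meta));
        accumulated.clear(); // contract: clear the accumulated list after writing
        return lines;
    }

    public static void main(String[] args) {
        CombinedMetadataDemo writer = new CombinedMetadataDemo();
        writer.addParquetFileMetadata("part-0.parquet", "rows=10");
        writer.addParquetFileMetadata("part-1.parquet", "rows=20");
        System.out.println(writer.writeMetadataFiles());
        System.out.println(writer.writeMetadataFiles()); // [] — state was cleared
    }
}
```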
@@ -5,17 +5,22 @@
import java.nio.channels.Channels;
import java.nio.channels.SeekableByteChannel;

final class PositionedBufferedOutputStream extends BufferedOutputStream {
public final class PositionedBufferedOutputStream extends BufferedOutputStream {

private final SeekableByteChannel writeChannel;

public PositionedBufferedOutputStream(final SeekableByteChannel writeChannel) {
super(Channels.newOutputStream(writeChannel));
this.writeChannel = writeChannel;
}

PositionedBufferedOutputStream(final SeekableByteChannel writeChannel, final int size) {
super(Channels.newOutputStream(writeChannel), size);
this.writeChannel = writeChannel;
}

/**
* Get total number of bytes written to this stream
* Get the total number of bytes written to this stream
*/
long position() throws IOException {
// Number of bytes buffered in the stream + bytes written to the underlying channel
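`PositionedBufferedOutputStream.position()` reports the logical write position as bytes already flushed to the channel plus bytes still sitting in the buffer. The same idea can be sketched with `java.io.BufferedOutputStream`, whose protected `count` field exposes the buffered byte count to subclasses (a simplified stand-in over `ByteArrayOutputStream`, not the Deephaven class):

```java
// Sketch: logical position = sink bytes + buffered bytes, via the protected
// `count` field of BufferedOutputStream.
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

public final class PositionDemo {
    static final class CountingStream extends BufferedOutputStream {
        private final ByteArrayOutputStream sink;

        CountingStream(ByteArrayOutputStream sink, int size) {
            super(sink, size);
            this.sink = sink;
        }

        long position() {
            // Bytes written to the underlying sink + bytes still buffered.
            return sink.size() + this.count;
        }
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (CountingStream out = new CountingStream(sink, 8)) {
            out.write(new byte[5]);
            System.out.println(out.position()); // 5 (all still buffered)
            out.write(new byte[5]);             // overflows the 8-byte buffer
            System.out.println(out.position()); // 10 (5 flushed + 5 buffered)
        }
    }
}
```

Tracking the position this way lets the writer compute footer offsets without flushing on every query.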