Skip to content

Commit

Permalink
feat: Added support to read parquet metadata files from S3 (#5777)
Browse files Browse the repository at this point in the history
  • Loading branch information
malhotrashivam committed Jul 19, 2024
1 parent 6ca0c89 commit cb9c83d
Show file tree
Hide file tree
Showing 17 changed files with 449 additions and 248 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,14 @@ public boolean isCompatibleWith(@NotNull final SeekableChannelContext channelCon
return wrappedProvider.isCompatibleWith(channelContext);
}

@Override
public boolean exists(@NotNull final URI uri) {
return wrappedProvider.exists(uri);
}

@Override
public SeekableByteChannel getReadChannel(@NotNull final SeekableChannelContext channelContext,
@NotNull final URI uri)
throws IOException {
@NotNull final URI uri) throws IOException {
final String uriString = uri.toString();
final KeyedObjectHashMap<String, PerPathPool> channelPool = channelPools.get(ChannelType.Read);
final CachedChannel result = tryGetPooledChannel(uriString, channelPool);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ public boolean isCompatibleWith(@Nullable final SeekableChannelContext channelCo
return true;
}

@Override
public boolean exists(@NotNull final URI uri) {
return Files.exists(Path.of(uri));
}

@Override
public SeekableByteChannel getReadChannel(@Nullable final SeekableChannelContext channelContext,
@NotNull final URI uri)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,14 @@ default SeekableChannelContext makeSingleUseContext() {
*/
boolean isCompatibleWith(@NotNull SeekableChannelContext channelContext);

/**
* Returns true if the given URI exists in the underlying storage.
*
* @param uri the URI to check
* @return true if the URI exists
*/
boolean exists(@NotNull URI uri);

default SeekableByteChannel getReadChannel(@NotNull SeekableChannelContext channelContext, @NotNull String uriStr)
throws IOException {
return getReadChannel(channelContext, convertToURI(uriStr, false));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,11 @@ public boolean isCompatibleWith(@NotNull SeekableChannelContext channelContext)
return channelContext == SeekableChannelContext.NULL;
}

@Override
public boolean exists(@NotNull URI uri) {
throw new UnsupportedOperationException("exists");
}

@Override
public SeekableByteChannel getReadChannel(@NotNull SeekableChannelContext channelContext,
@NotNull String path) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3976,7 +3976,7 @@ public void testMultiPartitionSymbolTableBy() throws IOException {
t4.updateView("Date=`2021-07-21`", "Num=400")).moveColumnsUp("Date", "Num");

final Table loaded = ParquetTools.readTable(
new ParquetKeyValuePartitionedLayout(testRootFile.toURI(), 2, ParquetInstructions.EMPTY),
ParquetKeyValuePartitionedLayout.create(testRootFile.toURI(), 2, ParquetInstructions.EMPTY, null),
ParquetInstructions.EMPTY);

// verify the sources are identical
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import io.deephaven.parquet.compress.DeephavenCompressorAdapterFactory;
import io.deephaven.util.channel.SeekableChannelContext.ContextHolder;
import io.deephaven.util.datastructures.SoftCachingFunction;
import org.apache.commons.io.FilenameUtils;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Dictionary;
Expand All @@ -28,14 +27,12 @@
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Path;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

import static io.deephaven.base.FileUtils.convertToURI;
import static io.deephaven.parquet.base.ParquetUtils.resolve;
import static io.deephaven.parquet.base.ColumnPageReaderImpl.getDecompressorHolder;
import static io.deephaven.parquet.base.ParquetFileReader.FILE_URI_SCHEME;
import static org.apache.parquet.format.Encoding.PLAIN_DICTIONARY;
import static org.apache.parquet.format.Encoding.RLE_DICTIONARY;

Expand Down Expand Up @@ -83,12 +80,10 @@ final class ColumnChunkReaderImpl implements ColumnChunkReader {
this.dictionarySupplier = new SoftCachingFunction<>(this::getDictionary);
this.numRows = numRows;
this.version = version;
if (columnChunk.isSetFile_path() && FILE_URI_SCHEME.equals(rootURI.getScheme())) {
final String relativePath = FilenameUtils.separatorsToSystem(columnChunk.getFile_path());
this.columnChunkURI = convertToURI(Path.of(rootURI).resolve(relativePath), false);
if (columnChunk.isSetFile_path()) {
columnChunkURI = resolve(rootURI, columnChunk.getFile_path());
} else {
// TODO(deephaven-core#5066): Add support for reading metadata files from non-file URIs
this.columnChunkURI = rootURI;
columnChunkURI = rootURI;
}
// Construct the reader object but don't read the offset index yet
this.offsetIndexReader = (columnChunk.isSetOffset_index_offset())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,34 @@
//
package io.deephaven.parquet.base;

import io.deephaven.UncheckedDeephavenException;
import org.jetbrains.annotations.NotNull;

import java.io.File;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;

import static io.deephaven.base.FileUtils.URI_SEPARATOR;
import static io.deephaven.base.FileUtils.URI_SEPARATOR_CHAR;

public final class ParquetUtils {

public static final String PARQUET_FILE_EXTENSION = ".parquet";

public static final String METADATA_FILE_NAME = "_metadata";
public static final String COMMON_METADATA_FILE_NAME = "_common_metadata";
public static final String PARQUET_FILE_EXTENSION = ".parquet";
public static final String METADATA_FILE_URI_SUFFIX = URI_SEPARATOR_CHAR + METADATA_FILE_NAME;
public static final String COMMON_METADATA_FILE_URI_SUFFIX = URI_SEPARATOR_CHAR + COMMON_METADATA_FILE_NAME;
public static final String METADATA_FILE_SUFFIX = File.separatorChar + METADATA_FILE_NAME;
public static final String COMMON_METADATA_FILE_SUFFIX = File.separatorChar + COMMON_METADATA_FILE_NAME;
private static final String METADATA_FILE_SUFFIX = File.separatorChar + METADATA_FILE_NAME;
private static final String COMMON_METADATA_FILE_SUFFIX = File.separatorChar + COMMON_METADATA_FILE_NAME;

private static final String MAGIC_STR = "PAR1";
public static final byte[] MAGIC = MAGIC_STR.getBytes(StandardCharsets.US_ASCII);

private static final String WINDOWS_FILE_SEPARATOR = "\\";

/**
* The number of bytes to buffer before flushing while writing parquet files and metadata files.
*/
Expand All @@ -42,17 +50,23 @@ public static String getPerFileMetadataKey(final String filePath) {
}

/**
* This method verifies if the source points to a parquet file or a metadata file. Provided source can be a local
* file path or a URI. Also, it can point to a parquet file, metadata file or a directory.
* This method verifies if the source points to a parquet file. Provided source can be a local file path or a URI.
*/
public static boolean isParquetFile(@NotNull final String source) {
boolean ret = source.endsWith(PARQUET_FILE_EXTENSION)
|| source.endsWith(METADATA_FILE_URI_SUFFIX)
|| source.endsWith(COMMON_METADATA_FILE_URI_SUFFIX);
return source.endsWith(PARQUET_FILE_EXTENSION);
}

/**
* This method verifies if the source points to a metadata file. Provided source can be a local file path or a URI.
*/
public static boolean isMetadataFile(@NotNull final String source) {
if (source.endsWith(METADATA_FILE_URI_SUFFIX) || source.endsWith(COMMON_METADATA_FILE_URI_SUFFIX)) {
return true;
}
if (File.separatorChar != URI_SEPARATOR_CHAR) {
ret = ret || source.endsWith(METADATA_FILE_SUFFIX) || source.endsWith(COMMON_METADATA_FILE_SUFFIX);
return source.endsWith(METADATA_FILE_SUFFIX) || source.endsWith(COMMON_METADATA_FILE_SUFFIX);
}
return ret;
return false;
}

/**
Expand All @@ -74,4 +88,18 @@ public static boolean isVisibleParquetFile(@NotNull final Path rootDir, @NotNull
}
return true;
}

/**
* Resolve a relative path against a base URI. The path can be from Windows or Unix systems.
*/
public static URI resolve(final URI base, final String relativePath) {
final URI relativeURI;
try {
// Sanitize the relative path before resolving it to avoid issues with separators and special characters
relativeURI = new URI(null, null, relativePath.replace(WINDOWS_FILE_SEPARATOR, URI_SEPARATOR), null);
} catch (final URISyntaxException e) {
throw new UncheckedDeephavenException("Failed to create URI from relative path: " + relativePath, e);
}
return base.resolve(relativeURI);
}
}
Loading

0 comments on commit cb9c83d

Please sign in to comment.