diff --git a/Util/channel/src/main/java/io/deephaven/util/channel/CachedChannelProvider.java b/Util/channel/src/main/java/io/deephaven/util/channel/CachedChannelProvider.java index bb5bafda49b..92666983263 100644 --- a/Util/channel/src/main/java/io/deephaven/util/channel/CachedChannelProvider.java +++ b/Util/channel/src/main/java/io/deephaven/util/channel/CachedChannelProvider.java @@ -87,10 +87,14 @@ public boolean isCompatibleWith(@NotNull final SeekableChannelContext channelCon return wrappedProvider.isCompatibleWith(channelContext); } + @Override + public boolean exists(@NotNull final URI uri) { + return wrappedProvider.exists(uri); + } + @Override public SeekableByteChannel getReadChannel(@NotNull final SeekableChannelContext channelContext, - @NotNull final URI uri) - throws IOException { + @NotNull final URI uri) throws IOException { final String uriString = uri.toString(); final KeyedObjectHashMap channelPool = channelPools.get(ChannelType.Read); final CachedChannel result = tryGetPooledChannel(uriString, channelPool); diff --git a/Util/channel/src/main/java/io/deephaven/util/channel/LocalFSChannelProvider.java b/Util/channel/src/main/java/io/deephaven/util/channel/LocalFSChannelProvider.java index 7b7b23b24ba..48083b074c3 100644 --- a/Util/channel/src/main/java/io/deephaven/util/channel/LocalFSChannelProvider.java +++ b/Util/channel/src/main/java/io/deephaven/util/channel/LocalFSChannelProvider.java @@ -33,6 +33,11 @@ public boolean isCompatibleWith(@Nullable final SeekableChannelContext channelCo return true; } + @Override + public boolean exists(@NotNull final URI uri) { + return Files.exists(Path.of(uri)); + } + @Override public SeekableByteChannel getReadChannel(@Nullable final SeekableChannelContext channelContext, @NotNull final URI uri) diff --git a/Util/channel/src/main/java/io/deephaven/util/channel/SeekableChannelsProvider.java b/Util/channel/src/main/java/io/deephaven/util/channel/SeekableChannelsProvider.java index aca50b64cbf..951224b7d8f 100644 --- a/Util/channel/src/main/java/io/deephaven/util/channel/SeekableChannelsProvider.java +++ b/Util/channel/src/main/java/io/deephaven/util/channel/SeekableChannelsProvider.java @@ -58,6 +58,14 @@ default SeekableChannelContext makeSingleUseContext() { */ boolean isCompatibleWith(@NotNull SeekableChannelContext channelContext); + /** + * Returns true if the given URI exists in the underlying storage. 
+ * + * @param uri the URI to check + * @return true if the URI exists + */ + boolean exists(@NotNull URI uri); + default SeekableByteChannel getReadChannel(@NotNull SeekableChannelContext channelContext, @NotNull String uriStr) throws IOException { return getReadChannel(channelContext, convertToURI(uriStr, false)); diff --git a/Util/channel/src/test/java/io/deephaven/util/channel/CachedChannelProviderTest.java b/Util/channel/src/test/java/io/deephaven/util/channel/CachedChannelProviderTest.java index 66e401633cb..0f23fab7d39 100644 --- a/Util/channel/src/test/java/io/deephaven/util/channel/CachedChannelProviderTest.java +++ b/Util/channel/src/test/java/io/deephaven/util/channel/CachedChannelProviderTest.java @@ -208,6 +208,11 @@ public boolean isCompatibleWith(@NotNull SeekableChannelContext channelContext) return channelContext == SeekableChannelContext.NULL; } + @Override + public boolean exists(@NotNull URI uri) { + throw new UnsupportedOperationException("exists"); + } + @Override public SeekableByteChannel getReadChannel(@NotNull SeekableChannelContext channelContext, @NotNull String path) { diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableAggregationTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableAggregationTest.java index 09f0075f025..7dc900779b8 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableAggregationTest.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableAggregationTest.java @@ -3976,7 +3976,7 @@ public void testMultiPartitionSymbolTableBy() throws IOException { t4.updateView("Date=`2021-07-21`", "Num=400")).moveColumnsUp("Date", "Num"); final Table loaded = ParquetTools.readTable( - new ParquetKeyValuePartitionedLayout(testRootFile.toURI(), 2, ParquetInstructions.EMPTY), + ParquetKeyValuePartitionedLayout.create(testRootFile.toURI(), 2, ParquetInstructions.EMPTY, null), ParquetInstructions.EMPTY); // verify the sources are identical diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java index b0b188dc432..042a0b5c72d 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java @@ -10,7 +10,6 @@ import io.deephaven.parquet.compress.DeephavenCompressorAdapterFactory; import io.deephaven.util.channel.SeekableChannelContext.ContextHolder; import io.deephaven.util.datastructures.SoftCachingFunction; -import org.apache.commons.io.FilenameUtils; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.Dictionary; @@ -28,14 +27,12 @@ import java.io.UncheckedIOException; import java.net.URI; import java.nio.channels.SeekableByteChannel; -import java.nio.file.Path; import java.util.List; import java.util.NoSuchElementException; import java.util.function.Function; -import static io.deephaven.base.FileUtils.convertToURI; +import static io.deephaven.parquet.base.ParquetUtils.resolve; import static io.deephaven.parquet.base.ColumnPageReaderImpl.getDecompressorHolder; -import static io.deephaven.parquet.base.ParquetFileReader.FILE_URI_SCHEME; import static org.apache.parquet.format.Encoding.PLAIN_DICTIONARY; import static org.apache.parquet.format.Encoding.RLE_DICTIONARY; @@ -83,12 +80,10 @@ final class 
ColumnChunkReaderImpl implements ColumnChunkReader { this.dictionarySupplier = new SoftCachingFunction<>(this::getDictionary); this.numRows = numRows; this.version = version; - if (columnChunk.isSetFile_path() && FILE_URI_SCHEME.equals(rootURI.getScheme())) { - final String relativePath = FilenameUtils.separatorsToSystem(columnChunk.getFile_path()); - this.columnChunkURI = convertToURI(Path.of(rootURI).resolve(relativePath), false); + if (columnChunk.isSetFile_path()) { + columnChunkURI = resolve(rootURI, columnChunk.getFile_path()); } else { - // TODO(deephaven-core#5066): Add support for reading metadata files from non-file URIs - this.columnChunkURI = rootURI; + columnChunkURI = rootURI; } // Construct the reader object but don't read the offset index yet this.offsetIndexReader = (columnChunk.isSetOffset_index_offset()) diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetUtils.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetUtils.java index 5af9e80e98b..70f83f9adfc 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetUtils.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetUtils.java @@ -3,26 +3,34 @@ // package io.deephaven.parquet.base; +import io.deephaven.UncheckedDeephavenException; import org.jetbrains.annotations.NotNull; import java.io.File; +import java.net.URI; +import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; import java.nio.file.Path; +import static io.deephaven.base.FileUtils.URI_SEPARATOR; import static io.deephaven.base.FileUtils.URI_SEPARATOR_CHAR; public final class ParquetUtils { + public static final String PARQUET_FILE_EXTENSION = ".parquet"; + public static final String METADATA_FILE_NAME = "_metadata"; public static final String COMMON_METADATA_FILE_NAME = "_common_metadata"; - public static final String PARQUET_FILE_EXTENSION = ".parquet"; public static final String METADATA_FILE_URI_SUFFIX = URI_SEPARATOR_CHAR + METADATA_FILE_NAME; public static final String COMMON_METADATA_FILE_URI_SUFFIX = URI_SEPARATOR_CHAR + COMMON_METADATA_FILE_NAME; - public static final String METADATA_FILE_SUFFIX = File.separatorChar + METADATA_FILE_NAME; - public static final String COMMON_METADATA_FILE_SUFFIX = File.separatorChar + COMMON_METADATA_FILE_NAME; + private static final String METADATA_FILE_SUFFIX = File.separatorChar + METADATA_FILE_NAME; + private static final String COMMON_METADATA_FILE_SUFFIX = File.separatorChar + COMMON_METADATA_FILE_NAME; + private static final String MAGIC_STR = "PAR1"; public static final byte[] MAGIC = MAGIC_STR.getBytes(StandardCharsets.US_ASCII); + private static final String WINDOWS_FILE_SEPARATOR = "\\"; + /** * The number of bytes to buffer before flushing while writing parquet files and metadata files. */ @@ -42,17 +50,23 @@ public static String getPerFileMetadataKey(final String filePath) { } /** - * This method verifies if the source points to a parquet file or a metadata file. Provided source can be a local - * file path or a URI. Also, it can point to a parquet file, metadata file or a directory. + * This method verifies if the source points to a parquet file. Provided source can be a local file path or a URI. 
*/ public static boolean isParquetFile(@NotNull final String source) { - boolean ret = source.endsWith(PARQUET_FILE_EXTENSION) - || source.endsWith(METADATA_FILE_URI_SUFFIX) - || source.endsWith(COMMON_METADATA_FILE_URI_SUFFIX); + return source.endsWith(PARQUET_FILE_EXTENSION); + } + + /** + * This method verifies if the source points to a metadata file. Provided source can be a local file path or a URI. + */ + public static boolean isMetadataFile(@NotNull final String source) { + if (source.endsWith(METADATA_FILE_URI_SUFFIX) || source.endsWith(COMMON_METADATA_FILE_URI_SUFFIX)) { + return true; + } if (File.separatorChar != URI_SEPARATOR_CHAR) { - ret = ret || source.endsWith(METADATA_FILE_SUFFIX) || source.endsWith(COMMON_METADATA_FILE_SUFFIX); + return source.endsWith(METADATA_FILE_SUFFIX) || source.endsWith(COMMON_METADATA_FILE_SUFFIX); } - return ret; + return false; } /** @@ -74,4 +88,18 @@ public static boolean isVisibleParquetFile(@NotNull final Path rootDir, @NotNull } return true; } + + /** + * Resolve a relative path against a base URI. The path can be from Windows or Unix systems. + */ + public static URI resolve(final URI base, final String relativePath) { + final URI relativeURI; + try { + // Sanitize the relative path before resolving it to avoid issues with separators and special characters + relativeURI = new URI(null, null, relativePath.replace(WINDOWS_FILE_SEPARATOR, URI_SEPARATOR), null); + } catch (final URISyntaxException e) { + throw new UncheckedDeephavenException("Failed to create URI from relative path: " + relativePath, e); + } + return base.resolve(relativeURI); + } } diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java index bf2c227828e..19d8ff09c71 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java @@ -22,6 +22,8 @@ import io.deephaven.parquet.base.ParquetMetadataFileWriter; import io.deephaven.parquet.base.NullParquetMetadataFileWriter; import io.deephaven.util.SafeCloseable; +import io.deephaven.util.channel.SeekableChannelsProvider; +import io.deephaven.util.channel.SeekableChannelsProviderLoader; import io.deephaven.util.channel.SeekableChannelsProviderPlugin; import io.deephaven.vector.*; import io.deephaven.engine.table.*; @@ -37,7 +39,7 @@ import io.deephaven.engine.table.impl.sources.regioned.RegionedTableComponentFactoryImpl; import io.deephaven.internal.log.LoggerFactory; import io.deephaven.io.logger.Logger; -import io.deephaven.parquet.base.ParquetFileReader; +import io.deephaven.parquet.base.ParquetUtils; import io.deephaven.parquet.table.layout.ParquetFlatPartitionedLayout; import io.deephaven.parquet.table.layout.ParquetKeyValuePartitionedLayout; import io.deephaven.parquet.table.layout.ParquetMetadataFileLayout; @@ -51,22 +53,14 @@ import org.jetbrains.annotations.Nullable; import java.io.File; -import java.io.IOException; import java.math.BigDecimal; import java.net.URI; -import java.nio.file.DirectoryStream; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.attribute.BasicFileAttributes; import java.util.*; import java.util.function.Supplier; import java.util.stream.Collectors; import static io.deephaven.base.FileUtils.convertToURI; import static io.deephaven.parquet.base.ParquetFileReader.FILE_URI_SCHEME; -import static 
io.deephaven.parquet.base.ParquetUtils.COMMON_METADATA_FILE_URI_SUFFIX; -import static io.deephaven.parquet.base.ParquetUtils.METADATA_FILE_URI_SUFFIX; -import static io.deephaven.parquet.base.ParquetUtils.isParquetFile; import static io.deephaven.parquet.table.ParquetInstructions.FILE_INDEX_TOKEN; import static io.deephaven.parquet.table.ParquetInstructions.PARTITIONS_TOKEN; import static io.deephaven.parquet.table.ParquetInstructions.UUID_TOKEN; @@ -132,7 +126,9 @@ public static Table readTable(@NotNull final String source) { public static Table readTable( @NotNull final String source, @NotNull final ParquetInstructions readInstructions) { - final boolean isDirectory = !isParquetFile(source); + final boolean isParquetFile = ParquetUtils.isParquetFile(source); + final boolean isMetadataFile = !isParquetFile && ParquetUtils.isMetadataFile(source); + final boolean isDirectory = !isParquetFile && !isMetadataFile; final URI sourceURI = convertToURI(source, isDirectory); if (readInstructions.getFileLayout().isPresent()) { switch (readInstructions.getFileLayout().get()) { @@ -141,23 +137,18 @@ public static Table readTable( case FLAT_PARTITIONED: return readFlatPartitionedTable(sourceURI, readInstructions); case KV_PARTITIONED: - return readKeyValuePartitionedTable(sourceURI, readInstructions); + return readKeyValuePartitionedTable(sourceURI, readInstructions, null); case METADATA_PARTITIONED: - return readPartitionedTableWithMetadata(sourceURI, readInstructions); + return readPartitionedTableWithMetadata(sourceURI, readInstructions, null); } } - if (FILE_URI_SCHEME.equals(sourceURI.getScheme())) { - return readTableFromFileUri(sourceURI, readInstructions); - } - if (source.endsWith(METADATA_FILE_URI_SUFFIX) || source.endsWith(COMMON_METADATA_FILE_URI_SUFFIX)) { - throw new UnsupportedOperationException("We currently do not support reading parquet metadata files " + - "from non local storage"); - } - if (!isDirectory) { + if (isParquetFile) { return readSingleFileTable(sourceURI, readInstructions); } - // Both flat partitioned and key-value partitioned data can be read under key-value partitioned layout - return readKeyValuePartitionedTable(sourceURI, readInstructions); + if (isMetadataFile) { + return readPartitionedTableWithMetadata(sourceURI, readInstructions, null); + } + return readPartitionedTableDirectory(sourceURI, readInstructions); } /** @@ -930,82 +921,6 @@ public static void deleteTable(final String path) { FileUtils.deleteRecursivelyOnNFS(new File(path)); } - /** - * This method attempts to "do the right thing." It examines the source file URI to determine if it's a single - * parquet file, a metadata file, or a directory. If it's a directory, it additionally tries to guess the layout to - * use. Unless a metadata file is supplied or discovered in the directory, the highest (by - * {@link ParquetTableLocationKey location key} order) location found will be used to infer schema. 
- * - * @param source The source URI with {@value ParquetFileReader#FILE_URI_SCHEME} scheme - * @param instructions Instructions for reading - * @return A {@link Table} - */ - private static Table readTableFromFileUri( - @NotNull final URI source, - @NotNull final ParquetInstructions instructions) { - final Path sourcePath = Path.of(source); - if (!Files.exists(sourcePath)) { - throw new TableDataException("Source file " + source + " does not exist"); - } - final String sourceFileName = sourcePath.getFileName().toString(); - final BasicFileAttributes sourceAttr = readAttributes(sourcePath); - if (sourceAttr.isRegularFile()) { - if (sourceFileName.endsWith(PARQUET_FILE_EXTENSION)) { - return readSingleFileTable(source, instructions); - } - final URI parentDirURI = convertToURI(sourcePath.getParent(), true); - if (sourceFileName.equals(METADATA_FILE_NAME)) { - return readPartitionedTableWithMetadata(parentDirURI, instructions); - } - if (sourceFileName.equals(COMMON_METADATA_FILE_NAME)) { - return readPartitionedTableWithMetadata(parentDirURI, instructions); - } - throw new TableDataException( - "Source file " + source + " does not appear to be a parquet file or metadata file"); - } - if (sourceAttr.isDirectory()) { - final Path metadataPath = sourcePath.resolve(METADATA_FILE_NAME); - if (Files.exists(metadataPath)) { - return readPartitionedTableWithMetadata(source, instructions); - } - final Path firstEntryPath; - // Ignore dot files while looking for the first entry - try (final DirectoryStream sourceStream = - Files.newDirectoryStream(sourcePath, ParquetTools::ignoreDotFiles)) { - final Iterator entryIterator = sourceStream.iterator(); - if (!entryIterator.hasNext()) { - throw new TableDataException("Source directory " + source + " is empty"); - } - firstEntryPath = entryIterator.next(); - } catch (IOException e) { - throw new TableDataException("Error reading source directory " + source, e); - } - final String firstEntryFileName = firstEntryPath.getFileName().toString(); - final BasicFileAttributes firstEntryAttr = readAttributes(firstEntryPath); - if (firstEntryAttr.isDirectory() && firstEntryFileName.contains("=")) { - return readKeyValuePartitionedTable(source, instructions); - } - if (firstEntryAttr.isRegularFile() && firstEntryFileName.endsWith(PARQUET_FILE_EXTENSION)) { - return readFlatPartitionedTable(source, instructions); - } - throw new TableDataException("No recognized Parquet table layout found in " + source); - } - throw new TableDataException("Source " + source + " is neither a directory nor a regular file"); - } - - private static boolean ignoreDotFiles(Path path) { - final String filename = path.getFileName().toString(); - return !filename.isEmpty() && filename.charAt(0) != '.'; - } - - private static BasicFileAttributes readAttributes(@NotNull final Path path) { - try { - return Files.readAttributes(path, BasicFileAttributes.class); - } catch (IOException e) { - throw new TableDataException("Failed to read " + path + " file attributes", e); - } - } - /** * Reads in a table from a single parquet file using the table definition provided through the * {@link ParquetInstructions}. 
@@ -1142,26 +1057,43 @@ private static KnownLocationKeyFinder toKnownKeys( : KnownLocationKeyFinder.copyFrom(keyFinder, Comparator.naturalOrder()); } - private static Table readPartitionedTableWithMetadata( - @NotNull final URI sourceURI, + private static Table readPartitionedTableDirectory( + @NotNull final URI tableRootDirectory, @NotNull final ParquetInstructions readInstructions) { - if (!FILE_URI_SCHEME.equals(sourceURI.getScheme())) { - throw new UnsupportedOperationException("Reading metadata files from non local storage is not supported"); + // Check if the directory has a metadata file + final URI metadataFileURI = tableRootDirectory.resolve(METADATA_FILE_NAME); + final SeekableChannelsProvider channelsProvider = + SeekableChannelsProviderLoader.getInstance().fromServiceLoader(tableRootDirectory, + readInstructions.getSpecialInstructions()); + if (channelsProvider.exists(metadataFileURI)) { + return readPartitionedTableWithMetadata(metadataFileURI, readInstructions, channelsProvider); } + // Both flat partitioned and key-value partitioned data can be read under key-value partitioned layout + return readKeyValuePartitionedTable(tableRootDirectory, readInstructions, channelsProvider); + } + + /** + * Creates a partitioned table via the metadata parquet files from the root {@code directory}, inferring the table + * definition from those files. + * + * @param sourceURI the path or URI for the directory to search for .parquet files, the + * {@value ParquetUtils#METADATA_FILE_NAME} file or the {@value ParquetUtils#COMMON_METADATA_FILE_NAME} file. + * Note that the {@value ParquetUtils#METADATA_FILE_NAME} file must be present in the same directory. + * @param readInstructions the instructions for customizations while reading + * @param channelsProvider the provider for creating seekable channels. If null, a new provider will be created and + * used for all channels created while reading the table. + */ + private static Table readPartitionedTableWithMetadata( + @NotNull final URI sourceURI, + @NotNull final ParquetInstructions readInstructions, + @Nullable final SeekableChannelsProvider channelsProvider) { verifyFileLayout(readInstructions, ParquetFileLayout.METADATA_PARTITIONED); if (readInstructions.getTableDefinition().isPresent()) { throw new UnsupportedOperationException("Detected table definition inside read instructions, reading " + "metadata files with custom table definition is currently not supported"); } - final File sourceFile = new File(sourceURI); - final String fileName = sourceFile.getName(); - final File directory; - if (fileName.equals(METADATA_FILE_NAME) || fileName.equals(COMMON_METADATA_FILE_NAME)) { - directory = sourceFile.getParentFile(); - } else { - directory = sourceFile; - } - final ParquetMetadataFileLayout layout = new ParquetMetadataFileLayout(directory, readInstructions); + final ParquetMetadataFileLayout layout = + ParquetMetadataFileLayout.create(sourceURI, readInstructions, channelsProvider); return readTable(layout, ensureTableDefinition(layout.getInstructions(), layout.getTableDefinition(), true)); } @@ -1184,22 +1116,22 @@ private static void verifyFileLayout( * * @param directoryUri the URI for the root directory to search for .parquet files * @param readInstructions the instructions for customizations while reading + * @param channelsProvider the provider for creating seekable channels. If null, a new provider will be created and + * used for all channels created while reading the table.
* @return the table */ private static Table readKeyValuePartitionedTable( @NotNull final URI directoryUri, - @NotNull final ParquetInstructions readInstructions) { + @NotNull final ParquetInstructions readInstructions, + @Nullable final SeekableChannelsProvider channelsProvider) { verifyFileLayout(readInstructions, ParquetFileLayout.KV_PARTITIONED); if (readInstructions.getTableDefinition().isEmpty()) { - return readTable(new ParquetKeyValuePartitionedLayout(directoryUri, - MAX_PARTITIONING_LEVELS_INFERENCE, readInstructions), readInstructions); + return readTable(ParquetKeyValuePartitionedLayout.create(directoryUri, + MAX_PARTITIONING_LEVELS_INFERENCE, readInstructions, channelsProvider), readInstructions); } final TableDefinition tableDefinition = readInstructions.getTableDefinition().get(); - if (tableDefinition.getColumnStream().noneMatch(ColumnDefinition::isPartitioning)) { - throw new IllegalArgumentException("No partitioning columns"); - } - return readTable(new ParquetKeyValuePartitionedLayout(directoryUri, tableDefinition, - readInstructions), readInstructions); + return readTable(ParquetKeyValuePartitionedLayout.create(directoryUri, tableDefinition, + readInstructions, channelsProvider), readInstructions); } /** diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetKeyValuePartitionedLayout.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetKeyValuePartitionedLayout.java index 53a057c0c95..c2b10421600 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetKeyValuePartitionedLayout.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetKeyValuePartitionedLayout.java @@ -18,6 +18,7 @@ import io.deephaven.util.channel.SeekableChannelsProvider; import io.deephaven.util.channel.SeekableChannelsProviderLoader; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import java.io.IOException; import java.net.URI; @@ -27,7 +28,6 @@ import java.util.function.Predicate; import java.util.stream.Stream; -import static io.deephaven.base.FileUtils.convertToURI; import static io.deephaven.parquet.base.ParquetFileReader.FILE_URI_SCHEME; import static io.deephaven.parquet.base.ParquetUtils.isVisibleParquetFile; @@ -49,13 +49,27 @@ public class ParquetKeyValuePartitionedLayout private final SeekableChannelsProvider channelsProvider; - public ParquetKeyValuePartitionedLayout( + /** + * Create a new {@link ParquetKeyValuePartitionedLayout} for the given {@code tableRootDirectory} and + * {@code tableDefinition}. + * + * @param tableRootDirectory The root directory for the table. + * @param tableDefinition The table definition to use for the layout. + * @param readInstructions The instructions for customizations while reading. + * @param channelsProvider The provider for seekable channels. If {@code null}, a new provider will be created and + * used for all location keys. 
+ */ + public static ParquetKeyValuePartitionedLayout create( @NotNull final URI tableRootDirectory, @NotNull final TableDefinition tableDefinition, - @NotNull final ParquetInstructions readInstructions) { - this(tableRootDirectory, tableDefinition, readInstructions, - SeekableChannelsProviderLoader.getInstance().fromServiceLoader(tableRootDirectory, - readInstructions.getSpecialInstructions())); + @NotNull final ParquetInstructions readInstructions, + @Nullable SeekableChannelsProvider channelsProvider) { + if (channelsProvider == null) { + channelsProvider = SeekableChannelsProviderLoader.getInstance().fromServiceLoader(tableRootDirectory, + readInstructions.getSpecialInstructions()); + } + return new ParquetKeyValuePartitionedLayout(tableRootDirectory, tableDefinition, readInstructions, + channelsProvider); } private ParquetKeyValuePartitionedLayout( @@ -71,13 +85,28 @@ private ParquetKeyValuePartitionedLayout( this.channelsProvider = channelsProvider; } - public ParquetKeyValuePartitionedLayout( + /** + * Create a new {@link ParquetKeyValuePartitionedLayout} for the given {@code tableRootDirectory}. The table + * definition will be inferred from the data. + * + * @param tableRootDirectory The root directory for the table. + * @param maxPartitioningLevels The maximum number of partitioning levels to use. + * @param readInstructions The instructions for customizations while reading. + * @param channelsProvider The provider for seekable channels. If {@code null}, a new provider will be created and + * used for all location keys. + */ + + public static ParquetKeyValuePartitionedLayout create( @NotNull final URI tableRootDirectory, final int maxPartitioningLevels, - @NotNull final ParquetInstructions readInstructions) { - this(tableRootDirectory, maxPartitioningLevels, readInstructions, - SeekableChannelsProviderLoader.getInstance().fromServiceLoader(tableRootDirectory, - readInstructions.getSpecialInstructions())); + @NotNull final ParquetInstructions readInstructions, + @Nullable SeekableChannelsProvider channelsProvider) { + if (channelsProvider == null) { + channelsProvider = SeekableChannelsProviderLoader.getInstance().fromServiceLoader(tableRootDirectory, + readInstructions.getSpecialInstructions()); + } + return new ParquetKeyValuePartitionedLayout(tableRootDirectory, maxPartitioningLevels, readInstructions, + channelsProvider); } private ParquetKeyValuePartitionedLayout( diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetMetadataFileLayout.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetMetadataFileLayout.java index cb5c495b6e7..4ec93d85a1b 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetMetadataFileLayout.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetMetadataFileLayout.java @@ -28,13 +28,13 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import java.io.File; import java.io.IOException; import java.net.URI; import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -42,11 +42,13 @@ import java.util.function.Function; import java.util.stream.Collectors; -import static io.deephaven.base.FileUtils.convertToURI; import static
io.deephaven.parquet.base.ParquetUtils.COMMON_METADATA_FILE_NAME; +import static io.deephaven.parquet.base.ParquetUtils.COMMON_METADATA_FILE_URI_SUFFIX; import static io.deephaven.parquet.base.ParquetUtils.METADATA_FILE_NAME; +import static io.deephaven.parquet.base.ParquetUtils.METADATA_FILE_URI_SUFFIX; import static io.deephaven.parquet.base.ParquetUtils.METADATA_KEY; import static io.deephaven.parquet.base.ParquetUtils.getPerFileMetadataKey; +import static io.deephaven.parquet.base.ParquetUtils.resolve; import static java.util.stream.Collectors.toMap; /** @@ -67,64 +69,73 @@ */ public class ParquetMetadataFileLayout implements TableLocationKeyFinder { - private final File metadataFile; - private final File commonMetadataFile; + private final URI metadataFileURI; + private final URI commonMetadataFileURI; private final TableDefinition definition; private final ParquetInstructions instructions; private final List keys; - private final SeekableChannelsProvider channelsProvider; - public ParquetMetadataFileLayout(@NotNull final File directory) { - this(directory, ParquetInstructions.EMPTY); - } - - public ParquetMetadataFileLayout( - @NotNull final File directory, - @NotNull final ParquetInstructions inputInstructions) { - this(new File(directory, METADATA_FILE_NAME), new File(directory, COMMON_METADATA_FILE_NAME), - inputInstructions); - } - - public ParquetMetadataFileLayout( - @NotNull final File metadataFile, - @Nullable final File commonMetadataFile) { - this(metadataFile, commonMetadataFile, ParquetInstructions.EMPTY); + /** + * Create a new {@link ParquetMetadataFileLayout} for the given {@code source} and {@code inputInstructions}. + * + * @param source The source URI for the metadata file or directory containing the metadata file. + * @param inputInstructions The instructions for customizations while reading. + * @param channelsProvider The provider for seekable channels. If {@code null}, a new provider will be created and + * used for all location keys. + */ + public static ParquetMetadataFileLayout create( + @NotNull final URI source, + @NotNull final ParquetInstructions inputInstructions, + @Nullable SeekableChannelsProvider channelsProvider) { + final String path = source.getRawPath(); + final boolean isMetadataFile = path.endsWith(METADATA_FILE_URI_SUFFIX); + final boolean isCommonMetadataFile = !isMetadataFile && path.endsWith(COMMON_METADATA_FILE_URI_SUFFIX); + final boolean isDirectory = !isMetadataFile && !isCommonMetadataFile; + final URI directory = isDirectory ? source : source.resolve("."); + final URI metadataFileURI = isMetadataFile ? source : directory.resolve(METADATA_FILE_NAME); + final URI commonMetadataFileURI = isCommonMetadataFile ? 
source : directory.resolve(COMMON_METADATA_FILE_NAME); + if (channelsProvider == null) { + channelsProvider = SeekableChannelsProviderLoader.getInstance().fromServiceLoader(source, + inputInstructions.getSpecialInstructions()); + } + return new ParquetMetadataFileLayout(directory, metadataFileURI, commonMetadataFileURI, inputInstructions, + channelsProvider); } - public ParquetMetadataFileLayout( - @NotNull final File metadataFile, - @Nullable final File commonMetadataFile, - @NotNull final ParquetInstructions inputInstructions) { + private ParquetMetadataFileLayout( + @NotNull final URI tableRootDirectory, + @NotNull final URI metadataFileURI, + @NotNull final URI commonMetadataFileURI, + @NotNull final ParquetInstructions inputInstructions, + @NotNull final SeekableChannelsProvider channelsProvider) { if (inputInstructions.isRefreshing()) { throw new IllegalArgumentException("ParquetMetadataFileLayout does not support refreshing"); } - this.metadataFile = metadataFile; - this.commonMetadataFile = commonMetadataFile; - channelsProvider = - SeekableChannelsProviderLoader.getInstance().fromServiceLoader(convertToURI(metadataFile, false), - inputInstructions.getSpecialInstructions()); - if (!metadataFile.exists()) { - throw new TableDataException(String.format("Parquet metadata file %s does not exist", metadataFile)); + this.metadataFileURI = metadataFileURI; + this.commonMetadataFileURI = commonMetadataFileURI; + if (!channelsProvider.exists(metadataFileURI)) { + throw new TableDataException(String.format("Parquet metadata file %s does not exist", metadataFileURI)); } - final ParquetFileReader metadataFileReader = ParquetFileReader.create(metadataFile, channelsProvider); + final ParquetFileReader metadataFileReader = ParquetFileReader.create(metadataFileURI, channelsProvider); final ParquetMetadataConverter converter = new ParquetMetadataConverter(); - final ParquetMetadata metadataFileMetadata = convertMetadata(metadataFile, metadataFileReader, converter); + final ParquetMetadata metadataFileMetadata = convertMetadata(metadataFileURI, metadataFileReader, converter); final Pair>, ParquetInstructions> leafSchemaInfo = ParquetSchemaReader.convertSchema( metadataFileReader.getSchema(), metadataFileMetadata.getFileMetaData().getKeyValueMetaData(), inputInstructions); - if (commonMetadataFile != null && commonMetadataFile.exists()) { + if (channelsProvider.exists(commonMetadataFileURI)) { final ParquetFileReader commonMetadataFileReader = - ParquetFileReader.create(commonMetadataFile, channelsProvider); + ParquetFileReader.create(commonMetadataFileURI, channelsProvider); final Pair>, ParquetInstructions> fullSchemaInfo = ParquetSchemaReader.convertSchema( commonMetadataFileReader.getSchema(), - convertMetadata(commonMetadataFile, commonMetadataFileReader, converter).getFileMetaData() + convertMetadata(commonMetadataFileURI, commonMetadataFileReader, converter) + .getFileMetaData() .getKeyValueMetaData(), leafSchemaInfo.getSecond()); - final List> adjustedColumnDefinitions = new ArrayList<>(); + final Collection> adjustedColumnDefinitions = new ArrayList<>(); final Map> leafDefinitionsMap = leafSchemaInfo.getFirst().stream().collect(toMap(ColumnDefinition::getName, Function.identity())); for (final ColumnDefinition fullDefinition : fullSchemaInfo.getFirst()) { @@ -138,7 +149,7 @@ public ParquetMetadataFileLayout( fullDefinition.describeDifferences(differences, leafDefinition, "full schema", "file schema", "", false); throw new TableDataException(String.format("Schema mismatch between %s and %s 
for column %s: %s", - metadataFile, commonMetadataFile, fullDefinition.getName(), differences)); + metadataFileURI, commonMetadataFileURI, fullDefinition.getName(), differences)); } } definition = TableDefinition.of(adjustedColumnDefinitions); @@ -160,7 +171,6 @@ public ParquetMetadataFileLayout( FilenameUtils.separatorsToSystem(rowGroups.get(rgi).getColumns().get(0).getFile_path()); filePathToRowGroupIndices.computeIfAbsent(relativePath, fn -> new TIntArrayList()).add(rgi); } - final File directory = metadataFile.getParentFile(); final MutableInt partitionOrder = new MutableInt(0); keys = filePathToRowGroupIndices.entrySet().stream().map(entry -> { final String relativePathString = entry.getKey(); @@ -168,7 +178,7 @@ public ParquetMetadataFileLayout( if (relativePathString == null || relativePathString.isEmpty()) { throw new TableDataException(String.format( "Missing parquet file name for row groups %s in %s", - Arrays.toString(rowGroupIndices), metadataFile)); + Arrays.toString(rowGroupIndices), metadataFileURI)); } final LinkedHashMap> partitions = partitioningColumns.isEmpty() ? null : new LinkedHashMap<>(); @@ -190,7 +200,7 @@ public ParquetMetadataFileLayout( if (pathComponents.length != 2) { throw new TableDataException(String.format( "Unexpected path format found for hive-style partitioning from %s for %s", - relativePathString, metadataFile)); + relativePathString, metadataFileURI)); } partitionKey = instructions.getColumnNameFromParquetColumnNameOrDefault(pathComponents[0]); partitionValueRaw = pathComponents[1]; @@ -203,12 +213,12 @@ public ParquetMetadataFileLayout( if (partitions.containsKey(partitionKey)) { throw new TableDataException(String.format( "Unexpected duplicate partition key %s when parsing %s for %s", - partitionKey, relativePathString, metadataFile)); + partitionKey, relativePathString, metadataFileURI)); } partitions.put(partitionKey, partitionValue); } } - final URI partitionFileURI = convertToURI(new File(directory, relativePathString), false); + final URI partitionFileURI = resolve(tableRootDirectory, relativePathString); final ParquetTableLocationKey tlk = new ParquetTableLocationKey(partitionFileURI, partitionOrder.getAndIncrement(), partitions, inputInstructions, channelsProvider); tlk.setFileReader(metadataFileReader); @@ -246,16 +256,17 @@ private static ParquetMetadata getParquetMetadataForFile(@NotNull final String p } public String toString() { - return ParquetMetadataFileLayout.class.getSimpleName() + '[' + metadataFile + ',' + commonMetadataFile + ']'; + return ParquetMetadataFileLayout.class.getSimpleName() + '[' + metadataFileURI + ',' + commonMetadataFileURI + + ']'; } - private static ParquetMetadata convertMetadata(@NotNull final File file, + private static ParquetMetadata convertMetadata(@NotNull final URI uri, @NotNull final ParquetFileReader fileReader, @NotNull final ParquetMetadataConverter converter) { try { return converter.fromParquetMetadata(fileReader.fileMetaData); } catch (IOException e) { - throw new TableDataException("Error while converting file metadata from " + file); + throw new TableDataException("Error while converting file metadata from " + uri); } } diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java index 2a3c96e8d35..416013bf376 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java +++ 
b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java @@ -622,8 +622,6 @@ public void basicParquetWithMetadataTest() { assertTableEquals(table, fromDiskWithMetadata); fromDiskWithCommonMetadata = readTable(commonMetadataFile.getPath(), readInstructions); assertTableEquals(table, fromDiskWithCommonMetadata); - - } @Test @@ -1144,10 +1142,24 @@ public void someMoreKeyValuePartitionedTestsWithComplexKeys() { assertTableEquals(inputData.sort("symbol", "epic_collection_id"), fromDisk.sort("symbol", "epic_collection_id")); - final File commonMetadata = new File(parentDir, "_common_metadata"); - final Table fromDiskWithMetadata = readTable(commonMetadata.getPath()); + final Table fromDiskMetadataPartitioned = + readTable(parentDir.getPath(), + EMPTY.withLayout(ParquetInstructions.ParquetFileLayout.METADATA_PARTITIONED)); + for (final String col : partitioningCols) { + assertTrue(fromDiskMetadataPartitioned.getDefinition().getColumn(col).isPartitioning()); + } + assertTableEquals(inputData.sort("symbol", "epic_collection_id"), + fromDiskMetadataPartitioned.sort("symbol", "epic_collection_id")); + + final File metadata = new File(parentDir, "_metadata"); + final Table fromDiskWithMetadata = readTable(metadata.getPath()); assertTableEquals(inputData.sort("symbol", "epic_collection_id"), fromDiskWithMetadata.sort("symbol", "epic_collection_id")); + + final File commonMetadata = new File(parentDir, "_common_metadata"); + final Table fromDiskWithCommonMetadata = readTable(commonMetadata.getPath()); + assertTableEquals(inputData.sort("symbol", "epic_collection_id"), + fromDiskWithCommonMetadata.sort("symbol", "epic_collection_id")); } @Test @@ -1193,13 +1205,15 @@ public void testAllPartitioningColumnTypes() { .build(); writeKeyValuePartitionedTable(inputData, parentDir.getPath(), writeInstructions); - // Verify that we can read the partition values, but types like LocalDate or LocalTime will be read as strings - // Therefore, we cannot compare the tables directly - readTable(parentDir.getPath(), EMPTY.withLayout(ParquetInstructions.ParquetFileLayout.KV_PARTITIONED)).select(); + // Verify that we can read the partition values, but types like LocalDate or LocalTime will be read as strings, + // and byte, short will be read as integers. 
Therefore, we cannot compare the tables directly + final Table fromDiskPartitioned = readTable(parentDir.getPath(), + EMPTY.withLayout(ParquetInstructions.ParquetFileLayout.KV_PARTITIONED)); + assertNotEquals(fromDiskPartitioned.getDefinition(), inputData.getDefinition()); - // Reading with metadata file should deduce the correct type, so we can compare the tables - final File commonMetadata = new File(parentDir, "_common_metadata"); - final Table fromDiskWithMetadata = readTable(commonMetadata.getPath()); + // Reading the directory directly should correctly detect the metadata files and deduce the correct types + final Table fromDiskWithMetadata = readTable(parentDir.getPath()); + assertEquals(fromDiskWithMetadata.getDefinition(), inputData.getDefinition()); final String[] partitioningColumns = definition.getPartitioningColumns().stream() .map(ColumnDefinition::getName).toArray(String[]::new); assertTableEquals(inputData.sort(partitioningColumns), fromDiskWithMetadata.sort(partitioningColumns)); diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetRemoteTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetRemoteTest.java index 70df7cf5534..662e17e8256 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetRemoteTest.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetRemoteTest.java @@ -6,7 +6,9 @@ import io.deephaven.engine.table.ColumnDefinition; import io.deephaven.engine.table.Table; import io.deephaven.engine.table.TableDefinition; +import io.deephaven.engine.table.impl.QueryTable; import io.deephaven.engine.testutil.junit4.EngineCleanup; +import io.deephaven.engine.util.TableTools; import io.deephaven.extensions.s3.Credentials; import io.deephaven.extensions.s3.S3Instructions; import io.deephaven.test.types.OutOfBandTest; @@ -17,6 +19,8 @@ import java.time.Duration; +import static io.deephaven.engine.testutil.TstUtils.assertTableEquals; +import static io.deephaven.parquet.table.ParquetTools.readTable; import static org.junit.Assert.assertEquals; /** @@ -86,4 +90,39 @@ public void readKeyValuePartitionedParquetFromPublicS3() { readInstructions).head(10).select(); assertEquals(2, table.numColumns()); } + + @Test + public void readMetadataPartitionedParquetFromS3() { + Assume.assumeTrue("Skipping test because s3 testing disabled.", ENABLE_REMOTE_S3_TESTING); + final TableDefinition definition = TableDefinition.of( + ColumnDefinition.ofInt("PC1").withPartitioning(), + ColumnDefinition.ofInt("PC2").withPartitioning(), + ColumnDefinition.ofLong("I")); + final Table source = ((QueryTable) TableTools.emptyTable(1_000_000) + .updateView("PC1 = (int)(ii%3)", + "PC2 = (int)(ii%2)", + "I = ii")) + .withDefinitionUnsafe(definition); + + final S3Instructions s3Instructions = S3Instructions.builder() + .regionName("us-east-1") + .build(); + final ParquetInstructions readInstructions = new ParquetInstructions.Builder() + .setSpecialInstructions(s3Instructions) + .setFileLayout(ParquetInstructions.ParquetFileLayout.METADATA_PARTITIONED) + .build(); + final Table fromS3Partitioned = readTable("s3://dh-s3-parquet-test1/keyValuePartitionedWithMetadataTest/", + readInstructions); + assertTableEquals(source.sort("PC1", "PC2"), fromS3Partitioned.sort("PC1", "PC2")); + + final Table fromDiskWithMetadata = + readTable("s3://dh-s3-parquet-test1/keyValuePartitionedWithMetadataTest/_metadata", + readInstructions); + assertTableEquals(source.sort("PC1", "PC2"), 
fromDiskWithMetadata.sort("PC1", "PC2")); + + final Table fromDiskWithCommonMetadata = + readTable("s3://dh-s3-parquet-test1/keyValuePartitionedWithMetadataTest/_common_metadata", + readInstructions); + assertTableEquals(source.sort("PC1", "PC2"), fromDiskWithCommonMetadata.sort("PC1", "PC2")); + } } diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetTestBase.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetTestBase.java index 1aa86dc9b5d..17e99079e1c 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetTestBase.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetTestBase.java @@ -7,13 +7,13 @@ import io.deephaven.engine.table.Table; import io.deephaven.engine.table.TableDefinition; import io.deephaven.engine.table.impl.QueryTable; +import io.deephaven.engine.table.impl.locations.TableDataException; import io.deephaven.engine.testutil.junit4.EngineCleanup; import io.deephaven.engine.util.TableTools; import io.deephaven.extensions.s3.S3Instructions; import io.deephaven.extensions.s3.testlib.S3SeekableChannelTestSetup; import io.deephaven.test.types.OutOfBandTest; import org.junit.After; -import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -32,6 +32,7 @@ import static io.deephaven.engine.util.TableTools.merge; import static io.deephaven.parquet.table.ParquetTools.writeKeyValuePartitionedTable; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; @Category(OutOfBandTest.class) abstract class S3ParquetTestBase extends S3SeekableChannelTestSetup { @@ -135,6 +136,11 @@ public final void readFlatPartitionedParquetDataAsKVPartitioned() final Table fromS3AsKV = ParquetTools.readTable(uri.toString(), readInstructions.withLayout(ParquetInstructions.ParquetFileLayout.KV_PARTITIONED)); assertTableEquals(expected, fromS3AsKV); + + // Read with definition without layout + final Table fromS3AsFlatWithDefinition = ParquetTools.readTable(uri.toString(), + readInstructions.withTableDefinition(expected.getDefinition())); + assertTableEquals(expected, fromS3AsFlatWithDefinition); } @Test @@ -167,9 +173,25 @@ public void readKeyValuePartitionedParquetData() .setTableDefinition(definition) .build(); final Table fromS3 = ParquetTools.readTable(uri.toString(), readInstructions); - assertTrue(fromS3.getDefinition().getColumn("PC1").isPartitioning()); - assertTrue(fromS3.getDefinition().getColumn("PC2").isPartitioning()); - assertTableEquals(table.sort("PC1", "PC2"), fromS3); + final Table fromDisk = ParquetTools.readTable(destDir.getPath()); + readPartitionedParquetTestHelper(fromDisk, fromS3); + + // Failure cases for missing metadata files + try { + ParquetTools.readTable(uri.toString(), + readInstructions.withTableDefinitionAndLayout(null, + ParquetInstructions.ParquetFileLayout.METADATA_PARTITIONED)); + fail("Expected exception because metadata file is not present"); + } catch (final TableDataException expected) { + assertTrue(expected.getMessage().contains("metadata")); + } + final URI metadataFileURI = uri(destDirName + "/_metadata"); + try { + ParquetTools.readTable(metadataFileURI.toString(), readInstructions.withTableDefinition(null)); + fail("Expected exception because metadata file is not present"); + } catch (final TableDataException expected) { + assertTrue(expected.getMessage().contains("metadata")); + } } @Test @@ -196,25 +218,78 @@ public void readMetadataPartitionedParquetData() 
assertTrue(new File(destDir, "_metadata").exists()); assertTrue(new File(destDir, "_common_metadata").exists()); uploadDirectory(destDir.toPath(), destDirName); - final URI metadataFileURI = uri(destDirName + "/_metadata"); final ParquetInstructions readInstructions = ParquetInstructions.builder() .setSpecialInstructions(s3Instructions( S3Instructions.builder() .readTimeout(Duration.ofSeconds(10))) .build()) - .setTableDefinition(definition) .build(); - try { - ParquetTools.readTable(metadataFileURI.toString(), readInstructions); - Assert.fail("Exception expected for unsupported metadata file read from S3"); - } catch (UnsupportedOperationException e) { - } final URI directoryURI = uri(destDirName); + final Table fromS3MetadataPartitioned = ParquetTools.readTable(directoryURI.toString(), + readInstructions.withLayout(ParquetInstructions.ParquetFileLayout.METADATA_PARTITIONED)); + final Table fromDiskMetadataPartitioned = ParquetTools.readTable(destDir.getPath(), + ParquetInstructions.EMPTY.withLayout(ParquetInstructions.ParquetFileLayout.METADATA_PARTITIONED)); + readPartitionedParquetTestHelper(fromDiskMetadataPartitioned, fromS3MetadataPartitioned); + + final URI metadataFileURI = uri(destDirName + "/_metadata"); + final Table fromS3WithMetadata = ParquetTools.readTable(metadataFileURI.toString(), readInstructions); + final Table fromDiskWithMetadata = ParquetTools.readTable(new File(destDir, "_metadata").getPath()); + readPartitionedParquetTestHelper(fromDiskWithMetadata, fromS3WithMetadata); + + final URI commonMetadataFileURI = uri(destDirName + "/_common_metadata"); + final Table fromS3WithCommonMetadata = + ParquetTools.readTable(commonMetadataFileURI.toString(), readInstructions); + final Table fromDiskWithCommonMetadata = + ParquetTools.readTable(new File(destDir, "_common_metadata").getPath()); + readPartitionedParquetTestHelper(fromDiskWithCommonMetadata, fromS3WithCommonMetadata); + } + + private static void readPartitionedParquetTestHelper(final Table expected, final Table fromS3) { + assertTrue(fromS3.getDefinition().getColumn("PC1").isPartitioning()); + assertTrue(fromS3.getDefinition().getColumn("PC2").isPartitioning()); + assertTableEquals(expected, fromS3); + } + + @Test + public void readMetadataPartitionedParquetWithMissingMetadataFile() + throws ExecutionException, InterruptedException, TimeoutException, IOException { + final TableDefinition definition = TableDefinition.of( + ColumnDefinition.ofInt("PC1").withPartitioning(), + ColumnDefinition.ofInt("PC2").withPartitioning(), + ColumnDefinition.ofInt("someIntColumn"), + ColumnDefinition.ofString("someStringColumn")); + final Table table = ((QueryTable) TableTools.emptyTable(500_000) + .updateView("PC1 = (int)(ii%3)", + "PC2 = (int)(ii%2)", + "someIntColumn = (int) i", + "someStringColumn = String.valueOf(i)")) + .withDefinitionUnsafe(definition); + final String destDirName = "metadataPartitionedDataDir"; + final File destDir = new File(folder.newFolder(), destDirName); + final ParquetInstructions writeInstructions = ParquetInstructions.builder() + .setBaseNameForPartitionedParquetData("data") + .setGenerateMetadataFiles(true) + .build(); + writeKeyValuePartitionedTable(table, destDir.getPath(), writeInstructions); + + // Delete the metadata file before uploading + final File metadataFile = new File(destDir, "_metadata"); + metadataFile.delete(); + + uploadDirectory(destDir.toPath(), destDirName); + final URI directoryURI = uri(destDirName); + final ParquetInstructions readInstructions = ParquetInstructions.builder() + 
.setSpecialInstructions(s3Instructions( + S3Instructions.builder() + .readTimeout(Duration.ofSeconds(10))) + .build()) + .build(); try { ParquetTools.readTable(directoryURI.toString(), readInstructions.withLayout(ParquetInstructions.ParquetFileLayout.METADATA_PARTITIONED)); - Assert.fail("Exception expected for unsupported metadata file read from S3"); - } catch (UnsupportedOperationException e) { + fail("Expected exception because metadata file is not present"); + } catch (final TableDataException expected) { + assertTrue(expected.getMessage().contains("metadata")); } } } diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/TestParquetTools.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/TestParquetTools.java index 03077542e4b..de89778aefa 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/TestParquetTools.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/TestParquetTools.java @@ -353,7 +353,7 @@ public void testPartitionedRead() { final TableDefinition partitionedDefinition = TableDefinition.of(allColumns); final Table result = ParquetTools.readTable( - new ParquetKeyValuePartitionedLayout(testRootFile.toURI(), 2, ParquetInstructions.EMPTY), + ParquetKeyValuePartitionedLayout.create(testRootFile.toURI(), 2, ParquetInstructions.EMPTY, null), ParquetInstructions.EMPTY); TestCase.assertEquals(partitionedDefinition, result.getDefinition()); final Table expected = TableTools.merge( diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ChannelContext.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ChannelContext.java index fb58ee43632..9bf54929446 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ChannelContext.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ChannelContext.java @@ -10,15 +10,13 @@ import org.jetbrains.annotations.NotNull; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Uri; -import software.amazon.awssdk.services.s3.model.HeadObjectRequest; -import software.amazon.awssdk.services.s3.model.HeadObjectResponse; +import software.amazon.awssdk.services.s3.model.NoSuchKeyException; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Objects; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; /** @@ -184,6 +182,9 @@ static IOException handleS3Exception(final Exception e, final String operationDe return new IOException(String.format("Thread interrupted while %s", operationDescription), e); } if (e instanceof ExecutionException) { + if (e.getCause() instanceof NoSuchKeyException) { + throw (NoSuchKeyException) e.getCause(); + } return new IOException(String.format("Execution exception occurred while %s", operationDescription), e); } if (e instanceof TimeoutException) { @@ -201,25 +202,7 @@ private void ensureSize() throws IOException { if (size != UNINITIALIZED_SIZE) { return; } - if (log.isDebugEnabled()) { - log.debug().append("Head: ").append(ctxStr()).endl(); - } - // Fetch the size of the file on the first read using a blocking HEAD request, and store it in the context - // for future use - final HeadObjectResponse headObjectResponse; - try { - headObjectResponse = client - .headObject(HeadObjectRequest.builder() - .bucket(uri.bucket().orElseThrow()) - .key(uri.key().orElseThrow()) - .build()) - 
.get(instructions.readTimeout().toNanos(), TimeUnit.NANOSECONDS); - } catch (final InterruptedException | ExecutionException | TimeoutException | CancellationException e) { - throw handleS3Exception(e, String.format("fetching HEAD for file %s, %s", uri, ctxStr()), instructions); - } - final long fileSize = headObjectResponse.contentLength(); - setSize(fileSize); - provider.updateFileSizeCache(uri.uri(), fileSize); + setSize(provider.fetchFileSize(uri)); } private void setSize(final long size) { diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java index d819fc4d3c6..7083e1c22cf 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java @@ -16,8 +16,11 @@ import org.jetbrains.annotations.NotNull; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Uri; +import software.amazon.awssdk.services.s3.model.HeadObjectRequest; +import software.amazon.awssdk.services.s3.model.HeadObjectResponse; import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; +import software.amazon.awssdk.services.s3.model.NoSuchKeyException; import java.io.IOException; import java.io.InputStream; @@ -51,6 +54,7 @@ final class S3SeekableChannelProvider implements SeekableChannelsProvider { private static final int MAX_KEYS_PER_BATCH = 1000; + private static final int UNKNOWN_SIZE = -1; private static final Logger log = LoggerFactory.getLogger(S3SeekableChannelProvider.class); @@ -76,14 +80,30 @@ final class S3SeekableChannelProvider implements SeekableChannelsProvider { this.fileSizeCacheRef = new SoftReference<>(new KeyedObjectHashMap<>(FileSizeInfo.URI_MATCH_KEY)); } + @Override + public boolean exists(@NotNull final URI uri) { + if (getCachedSize(uri) != UNKNOWN_SIZE) { + return true; + } + final S3Uri s3Uri = s3AsyncClient.utilities().parseUri(uri); + try { + fetchFileSize(s3Uri); + } catch (final NoSuchKeyException e) { + return false; + } catch (final IOException e) { + throw new UncheckedDeephavenException("Error fetching file size for URI " + uri, e); + } + return true; + } + @Override public SeekableByteChannel getReadChannel(@NotNull final SeekableChannelContext channelContext, @NotNull final URI uri) { final S3Uri s3Uri = s3AsyncClient.utilities().parseUri(uri); // context is unused here, will be set before reading from the channel - final Map fileSizeCache = fileSizeCacheRef.get(); - if (fileSizeCache != null && fileSizeCache.containsKey(uri)) { - return new S3SeekableByteChannel(s3Uri, fileSizeCache.get(uri).size); + final long cachedSize = getCachedSize(uri); + if (cachedSize != UNKNOWN_SIZE) { + return new S3SeekableByteChannel(s3Uri, cachedSize); } return new S3SeekableByteChannel(s3Uri); } @@ -234,10 +254,58 @@ private Map getFileSizeCache() { return cache; } + /** + * Fetch the size of the file at the given S3 URI. 
+ * + * @throws NoSuchKeyException if the file does not exist + * @throws IOException if there is an error fetching the file size + */ + long fetchFileSize(@NotNull final S3Uri s3Uri) throws IOException { + final long cachedSize = getCachedSize(s3Uri.uri()); + if (cachedSize != UNKNOWN_SIZE) { + return cachedSize; + } + // Fetch the size of the file using a blocking HEAD request, and store it in the cache for future use + if (log.isDebugEnabled()) { + log.debug().append("Head: ").append(s3Uri.toString()).endl(); + } + final HeadObjectResponse headObjectResponse; + try { + headObjectResponse = s3AsyncClient + .headObject(HeadObjectRequest.builder() + .bucket(s3Uri.bucket().orElseThrow()) + .key(s3Uri.key().orElseThrow()) + .build()) + .get(s3Instructions.readTimeout().toNanos(), TimeUnit.NANOSECONDS); + } catch (final InterruptedException | ExecutionException | TimeoutException | CancellationException e) { + throw handleS3Exception(e, String.format("fetching HEAD for file %s", s3Uri), s3Instructions); + } + final long fileSize = headObjectResponse.contentLength(); + updateFileSizeCache(s3Uri.uri(), fileSize); + return fileSize; + } + + /** + * Get the cached size for the given URI, or {@value UNKNOWN_SIZE} if the size is not cached. + */ + private long getCachedSize(final URI uri) { + final Map fileSizeCache = fileSizeCacheRef.get(); + if (fileSizeCache != null) { + final FileSizeInfo sizeInfo = fileSizeCache.get(uri); + if (sizeInfo != null) { + return sizeInfo.size; + } + } + return UNKNOWN_SIZE; + } + /** * Cache the file size for the given URI. */ - void updateFileSizeCache(@NotNull final URI uri, final long size) { + private void updateFileSizeCache(@NotNull final URI uri, final long size) { + if (size < 0) { + throw new IllegalArgumentException("Invalid file size: " + size + " for URI " + uri); + } final Map fileSizeCache = getFileSizeCache(); fileSizeCache.compute(uri, (key, existingInfo) -> { if (existingInfo == null) { diff --git a/extensions/trackedfile/src/main/java/io/deephaven/extensions/trackedfile/TrackedSeekableChannelsProvider.java b/extensions/trackedfile/src/main/java/io/deephaven/extensions/trackedfile/TrackedSeekableChannelsProvider.java index 56235d6c336..345894bba64 100644 --- a/extensions/trackedfile/src/main/java/io/deephaven/extensions/trackedfile/TrackedSeekableChannelsProvider.java +++ b/extensions/trackedfile/src/main/java/io/deephaven/extensions/trackedfile/TrackedSeekableChannelsProvider.java @@ -52,6 +52,11 @@ public boolean isCompatibleWith(@Nullable final SeekableChannelContext channelCo return channelContext instanceof BaseSeekableChannelContext; } + @Override + public boolean exists(@NotNull final URI uri) { + return Files.exists(Path.of(uri)); + } + @Override public SeekableByteChannel getReadChannel(@Nullable final SeekableChannelContext channelContext, @NotNull final URI uri) throws IOException {