From 7480812cb79036e9ca6f909de32a0f78c9e7baf7 Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Fri, 6 Sep 2024 13:10:34 -0500 Subject: [PATCH] feat: Added native support to read/write parquet files from GCS URIs (#6007) Closes #5999 --- .../URIStreamKeyValuePartitionLayout.java | 3 +- .../parquet/table/S3ParquetRemoteTest.java | 57 +++++++++++++++ .../s3/GCSSeekableChannelProvider.java | 72 +++++++++++++++++++ .../s3/GCSSeekableChannelProviderPlugin.java | 63 ++++++++++++++++ .../extensions/s3/S3Instructions.java | 2 + .../s3/S3SeekableChannelProvider.java | 23 ++++-- 6 files changed, 213 insertions(+), 7 deletions(-) create mode 100644 extensions/s3/src/main/java/io/deephaven/extensions/s3/GCSSeekableChannelProvider.java create mode 100644 extensions/s3/src/main/java/io/deephaven/extensions/s3/GCSSeekableChannelProviderPlugin.java diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/local/URIStreamKeyValuePartitionLayout.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/local/URIStreamKeyValuePartitionLayout.java index 2defb0a762b..1fb27e1d6b4 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/local/URIStreamKeyValuePartitionLayout.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/local/URIStreamKeyValuePartitionLayout.java @@ -90,7 +90,8 @@ protected final void findKeys(@NotNull final Stream uriStream, buildLocationKeys(locationTable, targetURIs, locationKeyObserver); } - private void getPartitions(@NotNull final URI relativePath, + private void getPartitions( + @NotNull final URI relativePath, @NotNull final Set partitionKeys, @NotNull final Collection partitionValues, @NotNull final TIntObjectMap partitionColInfo, diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetRemoteTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetRemoteTest.java index 53106574553..6af4c2e45a1 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetRemoteTest.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetRemoteTest.java @@ -22,6 +22,7 @@ import static io.deephaven.engine.testutil.TstUtils.assertTableEquals; import static io.deephaven.parquet.table.ParquetTools.readTable; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; /** * These tests verify the behavior of Parquet implementation when reading against remote S3 servers. @@ -100,6 +101,59 @@ public void readSampleParquetFilesFromPublicS3Part3() { readTable("s3://redshift-downloads/redset/serverless/full.parquet", readInstructions).head(10).select(); } + @Test + public void readSampleParquetFromPublicGCS() { + Assume.assumeTrue("Skipping test because s3 testing disabled.", ENABLE_REMOTE_S3_TESTING); + final Table tableWithEndpointOverride; + { + final ParquetInstructions readInstructions = new ParquetInstructions.Builder() + .setSpecialInstructions(S3Instructions.builder() + .readTimeout(Duration.ofSeconds(60)) + .credentials(Credentials.anonymous()) + .regionName("us-east-1") + .endpointOverride("https://storage.googleapis.com") + .build()) + .build(); + tableWithEndpointOverride = ParquetTools.readTable( + "s3://cloud-samples-data/bigquery/us-states/us-states.parquet", readInstructions).select(); + assertEquals(2, tableWithEndpointOverride.numColumns()); + assertEquals(50, tableWithEndpointOverride.size()); + } + + final Table tableWithoutEndpointOverride; + { + final ParquetInstructions readInstructions = new ParquetInstructions.Builder() + .setSpecialInstructions(S3Instructions.builder() + .readTimeout(Duration.ofSeconds(60)) + .regionName("us-east-1") + .credentials(Credentials.anonymous()) + .build()) + .build(); + tableWithoutEndpointOverride = ParquetTools.readTable( + "gs://cloud-samples-data/bigquery/us-states/us-states.parquet", readInstructions).select(); + assertEquals(2, tableWithoutEndpointOverride.numColumns()); + assertEquals(50, tableWithoutEndpointOverride.size()); + } + assertTableEquals(tableWithEndpointOverride, tableWithoutEndpointOverride); + } + + @Test + public void testReadFromGCSFailure() { + final ParquetInstructions readInstructions = new ParquetInstructions.Builder() + .setSpecialInstructions(S3Instructions.builder() + .readTimeout(Duration.ofSeconds(60)) + .credentials(Credentials.anonymous()) + .endpointOverride("https://storage.com") + .build()) + .build(); + try { + ParquetTools.readTable( + "gs://cloud-samples-data/bigquery/us-states/us-states.parquet", readInstructions).select(); + } catch (final IllegalArgumentException e) { + assertTrue(e.toString().contains("endpoint override")); + } + } + @Test public void readKeyValuePartitionedParquetFromPublicS3() { Assume.assumeTrue("Skipping test because s3 testing disabled.", ENABLE_REMOTE_S3_TESTING); @@ -120,6 +174,9 @@ public void readKeyValuePartitionedParquetFromPublicS3() { assertEquals(2, table.numColumns()); } + /** + * The follow test reads from Deephaven's s3 bucket, thus requires the credentials to be set up. + */ @Test public void readMetadataPartitionedParquetFromS3() { Assume.assumeTrue("Skipping test because s3 testing disabled.", ENABLE_REMOTE_S3_TESTING); diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/GCSSeekableChannelProvider.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/GCSSeekableChannelProvider.java new file mode 100644 index 00000000000..cbf7b410101 --- /dev/null +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/GCSSeekableChannelProvider.java @@ -0,0 +1,72 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.extensions.s3; + +import io.deephaven.internal.log.LoggerFactory; +import io.deephaven.io.logger.Logger; +import io.deephaven.util.channel.CompletableOutputStream; +import io.deephaven.util.channel.SeekableChannelContext; +import org.jetbrains.annotations.NotNull; + +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.channels.SeekableByteChannel; +import java.util.stream.Stream; + +import static io.deephaven.extensions.s3.GCSSeekableChannelProviderPlugin.GCS_URI_SCHEME; +import static io.deephaven.extensions.s3.S3SeekableChannelProviderPlugin.S3_URI_SCHEME; + +final class GCSSeekableChannelProvider extends S3SeekableChannelProvider { + + private static final Logger log = LoggerFactory.getLogger(GCSSeekableChannelProvider.class); + + GCSSeekableChannelProvider(@NotNull final S3Instructions s3Instructions) { + super(s3Instructions); + } + + @Override + public boolean exists(@NotNull final URI uri) { + return super.exists(gcsToS3Uri(uri)); + } + + @Override + public SeekableByteChannel getReadChannel( + @NotNull final SeekableChannelContext channelContext, + @NotNull final URI uri) { + return super.getReadChannel(channelContext, gcsToS3Uri(uri)); + } + + @Override + public CompletableOutputStream getOutputStream(@NotNull final URI uri, final int bufferSizeHint) { + return super.getOutputStream(gcsToS3Uri(uri), bufferSizeHint); + } + + @Override + public Stream list(@NotNull final URI directory) { + if (log.isDebugEnabled()) { + log.debug().append("Fetching child URIs for directory: ").append(directory.toString()).endl(); + } + return createStream(gcsToS3Uri(directory), false, GCS_URI_SCHEME); + } + + @Override + public Stream walk(@NotNull final URI directory) { + if (log.isDebugEnabled()) { + log.debug().append("Performing recursive traversal from directory: ").append(directory.toString()).endl(); + } + return createStream(gcsToS3Uri(directory), true, GCS_URI_SCHEME); + } + + private static URI gcsToS3Uri(@NotNull final URI uri) { + try { + if (S3_URI_SCHEME.equals(uri.getScheme())) { + return uri; + } + return new URI(S3_URI_SCHEME, uri.getUserInfo(), uri.getHost(), uri.getPort(), uri.getPath(), + uri.getQuery(), uri.getFragment()); + } catch (final URISyntaxException e) { + throw new IllegalArgumentException("Failed to convert GCS URI " + uri + " to s3 URI", e); + } + } +} diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/GCSSeekableChannelProviderPlugin.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/GCSSeekableChannelProviderPlugin.java new file mode 100644 index 00000000000..6c3ca91ac8e --- /dev/null +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/GCSSeekableChannelProviderPlugin.java @@ -0,0 +1,63 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.extensions.s3; + +import com.google.auto.service.AutoService; +import io.deephaven.util.channel.SeekableChannelsProvider; +import io.deephaven.util.channel.SeekableChannelsProviderPlugin; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import java.net.URI; + +/** + * {@link SeekableChannelsProviderPlugin} implementation used for reading files from Google Cloud Storage. + */ +@AutoService(SeekableChannelsProviderPlugin.class) +public final class GCSSeekableChannelProviderPlugin implements SeekableChannelsProviderPlugin { + + static final String GCS_URI_SCHEME = "gs"; + + private static final String ENDPOINT_OVERRIDE_SUFFIX = ".googleapis.com"; + private static final URI DEFAULT_ENDPOINT_OVERRIDE = URI.create("https://storage.googleapis.com"); + private static final S3Instructions DEFAULT_INSTRUCTIONS = + S3Instructions.builder().endpointOverride(DEFAULT_ENDPOINT_OVERRIDE).build(); + + @Override + public boolean isCompatible(@NotNull final URI uri, @Nullable final Object config) { + return GCS_URI_SCHEME.equals(uri.getScheme()); + } + + @Override + public SeekableChannelsProvider createProvider(@NotNull final URI uri, @Nullable final Object config) { + if (!isCompatible(uri, config)) { + throw new IllegalArgumentException("Arguments not compatible, provided uri " + uri); + } + return new GCSSeekableChannelProvider(s3Instructions(config)); + } + + /** + * Get the S3Instructions from the config object, or use the default if the config is null. + */ + private static S3Instructions s3Instructions(@Nullable final Object config) { + if (config == null) { + return DEFAULT_INSTRUCTIONS; + } + if (!(config instanceof S3Instructions)) { + throw new IllegalArgumentException("Only S3Instructions are valid when reading GCS URIs, " + + "provided config instance of class " + config.getClass().getName()); + } + final S3Instructions s3Instructions = (S3Instructions) config; + if (s3Instructions.endpointOverride().isEmpty()) { + return s3Instructions.withEndpointOverride(DEFAULT_ENDPOINT_OVERRIDE); + } + if (!(s3Instructions.endpointOverride().get()).toString().endsWith(ENDPOINT_OVERRIDE_SUFFIX)) { + throw new IllegalArgumentException("Provided endpoint override=(" + + s3Instructions.endpointOverride().get() + " not supported when reading GCS URIs, must end with " + + ENDPOINT_OVERRIDE_SUFFIX); + } + return s3Instructions; + } +} + diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Instructions.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Instructions.java index f6a259c26aa..9fa3e955a2b 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Instructions.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Instructions.java @@ -145,6 +145,8 @@ public LogOutput append(final LogOutput logOutput) { */ public abstract Optional endpointOverride(); + public abstract S3Instructions withEndpointOverride(final URI endpointOverride); + public interface Builder { Builder regionName(String regionName); diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java index 4bd06a1b661..84d1566a374 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java @@ -51,7 +51,7 @@ /** * {@link SeekableChannelsProvider} implementation that is used to fetch objects from an S3-compatible API. */ -final class S3SeekableChannelProvider implements SeekableChannelsProvider { +class S3SeekableChannelProvider implements SeekableChannelsProvider { private static final int MAX_KEYS_PER_BATCH = 1000; private static final int UNKNOWN_SIZE = -1; @@ -97,7 +97,8 @@ public boolean exists(@NotNull final URI uri) { } @Override - public SeekableByteChannel getReadChannel(@NotNull final SeekableChannelContext channelContext, + public SeekableByteChannel getReadChannel( + @NotNull final SeekableChannelContext channelContext, @NotNull final URI uri) { final S3Uri s3Uri = s3AsyncClient.utilities().parseUri(uri); // context is unused here, will be set before reading from the channel @@ -140,7 +141,7 @@ public Stream list(@NotNull final URI directory) { if (log.isDebugEnabled()) { log.debug().append("Fetching child URIs for directory: ").append(directory.toString()).endl(); } - return createStream(directory, false); + return createStream(directory, false, S3_URI_SCHEME); } @Override @@ -148,10 +149,20 @@ public Stream walk(@NotNull final URI directory) { if (log.isDebugEnabled()) { log.debug().append("Performing recursive traversal from directory: ").append(directory.toString()).endl(); } - return createStream(directory, true); + return createStream(directory, true, S3_URI_SCHEME); } - private Stream createStream(@NotNull final URI directory, final boolean isRecursive) { + /** + * Create a stream of URIs, the elements of which are the entries in the directory. + * + * @param directory The parent directory to list. + * @param isRecursive Whether to list the entries recursively. + * @param childScheme The scheme to apply to the children URIs in the returned stream. + */ + Stream createStream( + @NotNull final URI directory, + final boolean isRecursive, + @NotNull final String childScheme) { // The following iterator fetches URIs from S3 in batches and creates a stream final Iterator iterator = new Iterator<>() { private final String bucketName; @@ -222,7 +233,7 @@ private void fetchNextBatch() throws IOException { } final URI uri; try { - uri = new URI(S3_URI_SCHEME, directory.getUserInfo(), directory.getHost(), + uri = new URI(childScheme, directory.getUserInfo(), directory.getHost(), directory.getPort(), path, null, null); } catch (final URISyntaxException e) { throw new UncheckedDeephavenException("Failed to create URI for S3 object with key: "