diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetRemoteTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetRemoteTest.java
index 6af4c2e45a1..e93bb00fefb 100644
--- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetRemoteTest.java
+++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetRemoteTest.java
@@ -75,16 +75,43 @@ public void readSampleParquetFilesFromPublicS3Part1() {
     @Test
     public void readSampleParquetFilesFromPublicS3Part2() {
         Assume.assumeTrue("Skipping test because s3 testing disabled.", ENABLE_REMOTE_S3_TESTING);
-        final S3Instructions s3Instructions = S3Instructions.builder()
-                .regionName("eu-west-3")
-                .readTimeout(Duration.ofSeconds(60))
-                .credentials(Credentials.anonymous())
-                .build();
-        final ParquetInstructions readInstructions = new ParquetInstructions.Builder()
-                .setSpecialInstructions(s3Instructions)
-                .build();
-        readTable("s3://datasets-documentation/pypi/2023/pypi_66_7_29.snappy.parquet", readInstructions)
-                .head(10).select();
+        {
+            final S3Instructions s3Instructions = S3Instructions.builder()
+                    .regionName("eu-west-3")
+                    .readTimeout(Duration.ofSeconds(60))
+                    .credentials(Credentials.anonymous())
+                    .build();
+            final ParquetInstructions readInstructions = new ParquetInstructions.Builder()
+                    .setSpecialInstructions(s3Instructions)
+                    .build();
+            readTable("s3://datasets-documentation/pypi/2023/pypi_66_7_29.snappy.parquet", readInstructions)
+                    .head(10).select();
+        }
+
+        // Now read the same file without a region
+        {
+            final S3Instructions s3Instructions = S3Instructions.builder()
+                    .readTimeout(Duration.ofSeconds(60))
+                    .credentials(Credentials.anonymous())
+                    .build();
+            final ParquetInstructions readInstructions = new ParquetInstructions.Builder()
+                    .setSpecialInstructions(s3Instructions)
+                    .build();
+            readTable("s3://datasets-documentation/pypi/2023/pypi_66_7_29.snappy.parquet", readInstructions)
+                    .head(10).select();
+        }
+
+        // Now read the same file with default credentials instead of anonymous ones
+        {
+            final S3Instructions s3Instructions = S3Instructions.builder()
+                    .readTimeout(Duration.ofSeconds(60))
+                    .build();
+            final ParquetInstructions readInstructions = new ParquetInstructions.Builder()
+                    .setSpecialInstructions(s3Instructions)
+                    .build();
+            readTable("s3://datasets-documentation/pypi/2023/pypi_66_7_29.snappy.parquet", readInstructions)
+                    .head(10).select();
+        }
     }
 
     @Test
@@ -135,6 +162,23 @@ public void readSampleParquetFromPublicGCS() {
             assertEquals(50, tableWithoutEndpointOverride.size());
         }
         assertTableEquals(tableWithEndpointOverride, tableWithoutEndpointOverride);
+
+        final Table tableWithNoRegionAndCredentials;
+        {
+            // Note that this assumes that credentials are not present in the credentials file; if they are, this
+            // test will fail.
+            final ParquetInstructions readInstructions = new ParquetInstructions.Builder()
+                    .setSpecialInstructions(S3Instructions.builder()
+                            .readTimeout(Duration.ofSeconds(60))
+                            .endpointOverride("https://storage.googleapis.com")
+                            .build())
+                    .build();
+            tableWithNoRegionAndCredentials = ParquetTools.readTable(
+                    "gs://cloud-samples-data/bigquery/us-states/us-states.parquet", readInstructions).select();
+            assertEquals(2, tableWithNoRegionAndCredentials.numColumns());
+            assertEquals(50, tableWithNoRegionAndCredentials.size());
+        }
+        assertTableEquals(tableWithEndpointOverride, tableWithNoRegionAndCredentials);
     }
 
     @Test
@@ -175,7 +219,7 @@ public void readKeyValuePartitionedParquetFromPublicS3() {
     }
 
     /**
-     * The follow test reads from Deephaven's s3 bucket, thus requires the credentials to be set up.
+     * The following test reads from Deephaven's s3 bucket and thus requires credentials to be set up; otherwise it will fail.
      */
     @Test
     public void readMetadataPartitionedParquetFromS3() {
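(Aside, not part of the patch: the third block above is the interesting new case from a user's perspective. A minimal sketch of the same call outside the test harness, assuming the Deephaven Parquet and S3 extension APIs shown in this diff; the class name is hypothetical.)

```java
import io.deephaven.engine.table.Table;
import io.deephaven.extensions.s3.S3Instructions;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.parquet.table.ParquetTools;

import java.time.Duration;

public class NoRegionNoCredentialsSketch {
    public static void main(String[] args) {
        // No region and no explicit credentials: the client falls back to
        // cross-region lookup and, if nothing else resolves, anonymous access.
        final Table head = ParquetTools.readTable(
                "s3://datasets-documentation/pypi/2023/pypi_66_7_29.snappy.parquet",
                new ParquetInstructions.Builder()
                        .setSpecialInstructions(S3Instructions.builder()
                                .readTimeout(Duration.ofSeconds(60))
                                .build())
                        .build())
                .head(10).select();
    }
}
```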
diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetTestBase.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetTestBase.java
index b1ce204a5ea..8700dce1e36 100644
--- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetTestBase.java
+++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/S3ParquetTestBase.java
@@ -29,6 +29,7 @@
 import java.io.IOException;
 import java.net.URI;
 import java.time.Duration;
+import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 
@@ -91,22 +92,28 @@ public final void readSingleParquetFile()
 
     @Test
     public final void readWriteSingleParquetFile() {
-        readWriteSingleParquetFileHelper(0); // Empty table
-        readWriteSingleParquetFileHelper(5_000);
-        readWriteSingleParquetFileHelper(50_000);
-        readWriteSingleParquetFileHelper(500_000);
+        readWriteSingleParquetFileHelper(0, true); // Empty table
+        readWriteSingleParquetFileHelper(0, false);
+        readWriteSingleParquetFileHelper(5_000, true);
+        readWriteSingleParquetFileHelper(5_000, false);
+        readWriteSingleParquetFileHelper(50_000, true);
+        readWriteSingleParquetFileHelper(500_000, true);
     }
 
-    private void readWriteSingleParquetFileHelper(final int numRows) {
+    private void readWriteSingleParquetFileHelper(final int numRows, final boolean withRegion) {
         final Table table = getTable(numRows);
         final URI uri = uri("table.parquet");
+        S3Instructions s3Instructions = s3Instructions(
+                S3Instructions.builder()
+                        .writePartSize(5 << 20)
+                        .numConcurrentWriteParts(5)
+                        .readTimeout(Duration.ofSeconds(10)))
+                .build();
+        if (!withRegion) {
+            s3Instructions = s3Instructions.withRegionName(Optional.empty());
+        }
         final ParquetInstructions instructions = ParquetInstructions.builder()
-                .setSpecialInstructions(s3Instructions(
-                        S3Instructions.builder()
-                                .writePartSize(5 << 20)
-                                .numConcurrentWriteParts(5)
-                                .readTimeout(Duration.ofSeconds(10)))
-                        .build())
+                .setSpecialInstructions(s3Instructions)
                 .build();
 
         // Write the table to S3 using ParquetTools write API
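(Aside: the helper above depends on the withRegionName wither added to S3Instructions later in this patch. A standalone sketch of the pattern; the class name and the "us-east-1" region are illustrative only.)

```java
import io.deephaven.extensions.s3.S3Instructions;

import java.time.Duration;
import java.util.Optional;

public class WithRegionNameSketch {
    public static void main(String[] args) {
        // Build instructions with a region, then strip it to exercise the
        // "no region" code path; Immutables generates the copy method.
        final S3Instructions withRegion = S3Instructions.builder()
                .regionName("us-east-1")
                .readTimeout(Duration.ofSeconds(10))
                .build();
        final S3Instructions withoutRegion = withRegion.withRegionName(Optional.empty());
        System.out.println(withoutRegion.regionName().isPresent()); // prints false
    }
}
```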
diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/Credentials.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/Credentials.java
index 597c0435bb0..4cbcbc20498 100644
--- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/Credentials.java
+++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/Credentials.java
@@ -3,13 +3,15 @@
 //
 package io.deephaven.extensions.s3;
 
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+
 public interface Credentials {
 
     /**
-     * The default credentials.
+     * Default credentials provider that looks for credentials at a number of locations as described in
+     * {@link DefaultCredentialsProvider} and falls back to anonymous credentials if no credentials are found.
      *
-     * @see Default
-     *      credentials provider chain
+     * @see DefaultCredentialsProvider
      */
     static Credentials defaultCredentials() {
         return DefaultCredentials.DEFAULT_CREDENTIALS;
diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/DefaultCredentials.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/DefaultCredentials.java
index 07936577e52..e3f506c975b 100644
--- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/DefaultCredentials.java
+++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/DefaultCredentials.java
@@ -3,18 +3,36 @@
 //
 package io.deephaven.extensions.s3;
 
+import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.AwsCredentials;
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain;
 import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
 
 /**
  * Default AWS credentials provider that looks for credentials at a number of locations as described in
- * {@link DefaultCredentialsProvider}
+ * {@link DefaultCredentialsProvider} and falls back to anonymous credentials if no credentials are found.
+ *
+ * @see DefaultCredentialsProvider
  */
-enum DefaultCredentials implements AwsSdkV2Credentials {
+enum DefaultCredentials implements AwsSdkV2Credentials, AwsCredentialsProvider {
     DEFAULT_CREDENTIALS;
 
+    private static final AwsCredentialsProviderChain PROVIDER_CHAIN = AwsCredentialsProviderChain.builder()
+            .reuseLastProviderEnabled(true)
+            .credentialsProviders(new AwsCredentialsProvider[] {
+                    DefaultCredentialsProvider.create(),
+                    AnonymousCredentialsProvider.create()
+            })
+            .build();
+
     @Override
     public final AwsCredentialsProvider awsV2CredentialsProvider() {
-        return DefaultCredentialsProvider.create();
+        return DEFAULT_CREDENTIALS;
+    }
+
+    @Override
+    public AwsCredentials resolveCredentials() {
+        return PROVIDER_CHAIN.resolveCredentials();
     }
 }
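(Aside: the fallback behavior can be reproduced directly with the AWS SDK v2 classes used above; a minimal standalone sketch, class name hypothetical. With no credentials configured anywhere, the chain resolves to anonymous credentials instead of throwing.)

```java
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;

public class CredentialsChainSketch {
    public static void main(String[] args) {
        // Same chain as DefaultCredentials: standard AWS lookup first, anonymous
        // last; reuseLastProviderEnabled caches whichever provider succeeded.
        final AwsCredentialsProviderChain chain = AwsCredentialsProviderChain.builder()
                .reuseLastProviderEnabled(true)
                .credentialsProviders(
                        DefaultCredentialsProvider.create(),
                        AnonymousCredentialsProvider.create())
                .build();
        final AwsCredentials credentials = chain.resolveCredentials();
        System.out.println(credentials);
    }
}
```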
diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3AsyncClientFactory.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3AsyncClientFactory.java
index f8c0ae3f5b4..e029dff0e5e 100644
--- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3AsyncClientFactory.java
+++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3AsyncClientFactory.java
@@ -8,11 +8,10 @@
 import org.jetbrains.annotations.NotNull;
 import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
 import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption;
+import software.amazon.awssdk.core.exception.SdkClientException;
 import software.amazon.awssdk.core.retry.RetryMode;
-import software.amazon.awssdk.http.SdkHttpClient;
 import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
 import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient;
-import software.amazon.awssdk.http.crt.AwsCrtHttpClient;
 import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.s3.S3AsyncClient;
 import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
@@ -41,6 +40,25 @@ class S3AsyncClientFactory {
     private static volatile ScheduledExecutorService scheduledExecutor;
 
     static S3AsyncClient getAsyncClient(@NotNull final S3Instructions instructions) {
+        S3AsyncClient s3AsyncClient;
+        try {
+            s3AsyncClient = getAsyncClientBuilder(instructions).build();
+        } catch (final SdkClientException e) {
+            if (instructions.regionName().isEmpty() && e.getMessage().contains("Unable to load region")) {
+                // We might have failed because the region was not provided and could not be determined by the
+                // SDK, so we try again with a default region.
+                s3AsyncClient = getAsyncClientBuilder(instructions).region(Region.US_EAST_1).build();
+            } else {
+                throw e;
+            }
+        }
+        if (log.isDebugEnabled()) {
+            log.debug().append("Building S3AsyncClient with instructions: ").append(instructions).endl();
+        }
+        return s3AsyncClient;
+    }
+
+    private static S3AsyncClientBuilder getAsyncClientBuilder(@NotNull final S3Instructions instructions) {
         final S3AsyncClientBuilder builder = S3AsyncClient.builder()
                 .asyncConfiguration(
                         b -> b.advancedOption(SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR,
@@ -58,13 +76,15 @@ static S3AsyncClient getAsyncClient(@NotNull final S3Instructions instructions)
                         .scheduledExecutorService(ensureScheduledExecutor())
                         .build())
                 .credentialsProvider(instructions.awsV2CredentialsProvider());
-        instructions.regionName().map(Region::of).ifPresent(builder::region);
-        instructions.endpointOverride().ifPresent(builder::endpointOverride);
-        final S3AsyncClient s3AsyncClient = builder.build();
-        if (log.isDebugEnabled()) {
-            log.debug().append("Building S3AsyncClient with instructions: ").append(instructions).endl();
+        if (instructions.regionName().isPresent()) {
+            builder.region(Region.of(instructions.regionName().get()));
+        } else {
+            // If region is not provided, we enable cross-region access to allow the SDK to determine the region
+            // based on the bucket location and cache it for future requests.
+            builder.crossRegionAccessEnabled(true);
         }
-        return s3AsyncClient;
+        instructions.endpointOverride().ifPresent(builder::endpointOverride);
+        return builder;
     }
 
     private static class HttpClientConfig {
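(Aside: the retry in getAsyncClient is a generic build-then-fallback pattern. A simplified standalone sketch, omitting the instructions-specific configuration; the class name is hypothetical, and US_EAST_1 is the same arbitrary default the factory uses.)

```java
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;

public final class RegionFallbackSketch {
    static S3AsyncClient buildWithFallback() {
        try {
            // Cross-region access lets the SDK discover and cache the bucket's
            // actual region on first use.
            return S3AsyncClient.builder().crossRegionAccessEnabled(true).build();
        } catch (final SdkClientException e) {
            if (e.getMessage() != null && e.getMessage().contains("Unable to load region")) {
                // No region anywhere in the SDK's lookup chain: pick a default
                // and let cross-region access redirect per bucket.
                return S3AsyncClient.builder()
                        .crossRegionAccessEnabled(true)
                        .region(Region.US_EAST_1)
                        .build();
            }
            throw e;
        }
    }
}
```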
diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Instructions.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Instructions.java
index 9fa3e955a2b..e6383f465ec 100644
--- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Instructions.java
+++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Instructions.java
@@ -6,6 +6,7 @@
 import io.deephaven.annotations.CopyableStyle;
 import io.deephaven.base.log.LogOutput;
 import io.deephaven.base.log.LogOutputAppendable;
+import io.deephaven.util.annotations.VisibleForTesting;
 import org.immutables.value.Value.Check;
 import org.immutables.value.Value.Default;
 import org.immutables.value.Value.Immutable;
@@ -51,7 +52,9 @@ public static Builder builder() {
     /**
      * The region name to use when reading or writing to S3. If not provided, the region name is picked by the AWS SDK
      * from 'aws.region' system property, "AWS_REGION" environment variable, the {user.home}/.aws/credentials or
-     * {user.home}/.aws/config files, or from EC2 metadata service, if running in EC2.
+     * {user.home}/.aws/config files, or from EC2 metadata service, if running in EC2. If no region name is derived
+     * from the above chain, or the derived region name is incorrect for the bucket accessed, the correct region name
+     * will be derived internally, at the cost of one additional request.
      */
     public abstract Optional<String> regionName();
 
@@ -177,6 +180,9 @@ default Builder endpointOverride(String endpointOverride) {
 
     abstract S3Instructions withReadAheadCount(int readAheadCount);
 
+    @VisibleForTesting
+    public abstract S3Instructions withRegionName(Optional<String> regionName);
+
     @Lazy
     S3Instructions singleUse() {
         final int readAheadCount = Math.min(DEFAULT_READ_AHEAD_COUNT, readAheadCount());
diff --git a/py/server/deephaven/experimental/s3.py b/py/server/deephaven/experimental/s3.py
index ade4e5f8629..f11e311cd78 100644
--- a/py/server/deephaven/experimental/s3.py
+++ b/py/server/deephaven/experimental/s3.py
@@ -51,10 +51,13 @@ def __init__(self,
             region_name (str): the region name for reading parquet files. If not provided, the default region will be
                 picked by the AWS SDK from 'aws.region' system property, "AWS_REGION" environment variable, the
                 {user.home}/.aws/credentials or {user.home}/.aws/config files, or from EC2 metadata service, if running
-                in EC2.
+                in EC2. If no region name is derived from the above chain, or the derived region name is incorrect for
+                the bucket accessed, the correct region name will be derived internally, at the cost of one additional
+                request.
             max_concurrent_requests (int): the maximum number of concurrent requests for reading files, default is 256.
-            read_ahead_count (int): the number of fragments to send asynchronous read requests for while reading the current
-                fragment. Defaults to 32, which means fetch the next 32 fragments in advance when reading the current fragment.
+            read_ahead_count (int): the number of fragments to send asynchronous read requests for while reading the
+                current fragment. Defaults to 32, which means fetch the next 32 fragments in advance when reading the
+                current fragment.
             fragment_size (int): the maximum size of each fragment to read, defaults to 64 KiB. If there are fewer bytes
                 remaining in the file, the fetched fragment can be smaller.
             connection_timeout (DurationLike):
@@ -65,8 +68,8 @@ def __init__(self,
                 the amount of time to wait when reading a fragment before giving up and timing out, can be expressed as
                 an integer in nanoseconds, a time interval string, e.g. "PT00:00:00.001" or "PT1s", or other time
                 duration types. Default to 2 seconds.
-            access_key_id (str): the access key for reading files. Both access key and secret access key must be provided
-                to use static credentials, else default credentials will be used.
+            access_key_id (str): the access key for reading files. Both access key and secret access key must be
+                provided to use static credentials, else default credentials will be used.
             secret_access_key (str): the secret access key for reading files. Both access key and secret key must be
                 provided to use static credentials, else default credentials will be used.
             anonymous_access (bool): use anonymous credentials, this is useful when the S3 policy has been set to allow