Skip to content

Commit

Permalink
Review with Devin
Browse files Browse the repository at this point in the history
  • Loading branch information
malhotrashivam committed Sep 16, 2024
1 parent 2a75abd commit 2dde5eb
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@
//
package io.deephaven.extensions.s3;

import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;

public interface Credentials {

/**
* The default credentials.
* Default credentials provider that looks for credentials at a number of locations as described in
* {@link DefaultCredentialsProvider} and falls back to anonymous credentials if no credentials are found.
*
* @see DefaultCredentials
* @see DefaultCredentialsProvider
*/
static Credentials defaultCredentials() {
return DefaultCredentials.DEFAULT_CREDENTIALS;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
/**
* Default AWS credentials provider that looks for credentials at a number of locations as described in
* {@link DefaultCredentialsProvider} and falls back to anonymous credentials if no credentials are found.
*
* @see DefaultCredentialsProvider
*/
enum DefaultCredentials implements AwsSdkV2Credentials, AwsCredentialsProvider {
DEFAULT_CREDENTIALS;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,25 @@ class S3AsyncClientFactory {
private static volatile ScheduledExecutorService scheduledExecutor;

static S3AsyncClient getAsyncClient(@NotNull final S3Instructions instructions) {
S3AsyncClient s3AsyncClient;
try {
s3AsyncClient = getAsyncClientBuilder(instructions).build();
} catch (final RuntimeException e) {
if (instructions.regionName().isEmpty() && e.getMessage().contains("region")) {
// We might have failed because region was not provided and could not be determined by the SDK.
// We will try again with a default region.
s3AsyncClient = getAsyncClientBuilder(instructions).region(Region.US_EAST_2).build();
} else {
throw e;
}
}
if (log.isDebugEnabled()) {
log.debug().append("Building S3AsyncClient with instructions: ").append(instructions).endl();
}
return s3AsyncClient;
}

private static S3AsyncClientBuilder getAsyncClientBuilder(@NotNull final S3Instructions instructions) {
final S3AsyncClientBuilder builder = S3AsyncClient.builder()
.asyncConfiguration(
b -> b.advancedOption(SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR,
Expand All @@ -64,23 +83,7 @@ static S3AsyncClient getAsyncClient(@NotNull final S3Instructions instructions)
builder.crossRegionAccessEnabled(true);
}
instructions.endpointOverride().ifPresent(builder::endpointOverride);
S3AsyncClient s3AsyncClient;
try {
s3AsyncClient = builder.build();
} catch (final Exception e) {
if (e.getMessage().contains("region") && instructions.regionName().isEmpty()) {
// TODO Check with Devin if this feels okay, this seems hacky to me but I am not able to figure out a
// better way to handle this fallback. If I just set crossRegionAccessEnabled, it still requires one
// region to be set or derived from the default chain (as mentioned in DefaultAwsRegionProviderChain).
s3AsyncClient = builder.region(Region.US_EAST_1).build();
} else {
throw e;
}
}
if (log.isDebugEnabled()) {
log.debug().append("Building S3AsyncClient with instructions: ").append(instructions).endl();
}
return s3AsyncClient;
return builder;
}

private static class HttpClientConfig {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public static Builder builder() {
* The region name to use when reading or writing to S3. If not provided, the region name is picked by the AWS SDK
* from 'aws.region' system property, "AWS_REGION" environment variable, the {user.home}/.aws/credentials or
* {user.home}/.aws/config files, or from EC2 metadata service, if running in EC2. If no region name is derived from
* the above chain or the derived region name derived is incorrect for the bucket accessed, the correct region name
* the above chain or derived the region name derived is incorrect for the bucket accessed, the correct region name
* will be derived internally, at the cost of one additional request.
*/
public abstract Optional<String> regionName();
Expand Down
14 changes: 8 additions & 6 deletions py/server/deephaven/experimental/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,13 @@ def __init__(self,
region_name (str): the region name for reading parquet files. If not provided, the default region will be
picked by the AWS SDK from 'aws.region' system property, "AWS_REGION" environment variable, the
{user.home}/.aws/credentials or {user.home}/.aws/config files, or from EC2 metadata service, if running
in EC2. If no region name is derived from the above chain or the region name derived is incorrect for the
bucket accessed, the correct region name will be derived internally, at the cost of one additional request.
in EC2. If no region name is derived from the above chain or the derived region name is incorrect for
the bucket accessed, the correct region name will be derived internally, at the cost of one additional
request.
max_concurrent_requests (int): the maximum number of concurrent requests for reading files, default is 256.
read_ahead_count (int): the number of fragments to send asynchronous read requests for while reading the current
fragment. Defaults to 32, which means fetch the next 32 fragments in advance when reading the current fragment.
read_ahead_count (int): the number of fragments to send asynchronous read requests for while reading the
current fragment. Defaults to 32, which means fetch the next 32 fragments in advance when reading the
current fragment.
fragment_size (int): the maximum size of each fragment to read, defaults to 64 KiB. If there are fewer bytes
remaining in the file, the fetched fragment can be smaller.
connection_timeout (DurationLike):
Expand All @@ -66,8 +68,8 @@ def __init__(self,
the amount of time to wait when reading a fragment before giving up and timing out, can be expressed as
an integer in nanoseconds, a time interval string, e.g. "PT00:00:00.001" or "PT1s", or other time
duration types. Default to 2 seconds.
access_key_id (str): the access key for reading files. Both access key and secret access key must be provided
to use static credentials, else default credentials will be used.
access_key_id (str): the access key for reading files. Both access key and secret access key must be
provided to use static credentials, else default credentials will be used.
secret_access_key (str): the secret access key for reading files. Both access key and secret key must be
provided to use static credentials, else default credentials will be used.
anonymous_access (bool): use anonymous credentials, this is useful when the S3 policy has been set to allow
Expand Down

0 comments on commit 2dde5eb

Please sign in to comment.