feat: Improvements to the S3 API #6028

Open · wants to merge 6 commits into base: main
@@ -75,16 +75,43 @@ public void readSampleParquetFilesFromPublicS3Part1() {
@Test
public void readSampleParquetFilesFromPublicS3Part2() {
Assume.assumeTrue("Skipping test because s3 testing disabled.", ENABLE_REMOTE_S3_TESTING);
final S3Instructions s3Instructions = S3Instructions.builder()
.regionName("eu-west-3")
.readTimeout(Duration.ofSeconds(60))
.credentials(Credentials.anonymous())
.build();
final ParquetInstructions readInstructions = new ParquetInstructions.Builder()
.setSpecialInstructions(s3Instructions)
.build();
readTable("s3://datasets-documentation/pypi/2023/pypi_66_7_29.snappy.parquet", readInstructions)
.head(10).select();
{
final S3Instructions s3Instructions = S3Instructions.builder()
.regionName("eu-west-3")
.readTimeout(Duration.ofSeconds(60))
.credentials(Credentials.anonymous())
.build();
final ParquetInstructions readInstructions = new ParquetInstructions.Builder()
.setSpecialInstructions(s3Instructions)
.build();
readTable("s3://datasets-documentation/pypi/2023/pypi_66_7_29.snappy.parquet", readInstructions)
.head(10).select();
}

// Now read the same file without a region
{
final S3Instructions s3Instructions = S3Instructions.builder()
.readTimeout(Duration.ofSeconds(60))
.credentials(Credentials.anonymous())
.build();
final ParquetInstructions readInstructions = new ParquetInstructions.Builder()
.setSpecialInstructions(s3Instructions)
.build();
readTable("s3://datasets-documentation/pypi/2023/pypi_66_7_29.snappy.parquet", readInstructions)
.head(10).select();
}

// Now read the same file with credentials not set as anonymous
{
final S3Instructions s3Instructions = S3Instructions.builder()
.readTimeout(Duration.ofSeconds(60))
.build();
final ParquetInstructions readInstructions = new ParquetInstructions.Builder()
.setSpecialInstructions(s3Instructions)
.build();
readTable("s3://datasets-documentation/pypi/2023/pypi_66_7_29.snappy.parquet", readInstructions)
.head(10).select();
}
}

@Test
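Reviewer note: the net effect of the hunk above is that both the region and the explicit anonymous credentials become optional when reading a public bucket. A minimal sketch of the resulting call pattern, reusing only names that already appear in the test (bucket URL, timeout, `readTable`):

```java
// Minimal user-facing pattern exercised by the new test variants: no regionName(...) and
// no credentials(...) call; region and credentials are resolved automatically.
final S3Instructions s3Instructions = S3Instructions.builder()
        .readTimeout(Duration.ofSeconds(60))
        .build();
final ParquetInstructions readInstructions = new ParquetInstructions.Builder()
        .setSpecialInstructions(s3Instructions)
        .build();
readTable("s3://datasets-documentation/pypi/2023/pypi_66_7_29.snappy.parquet", readInstructions)
        .head(10).select();
```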
@@ -29,6 +29,7 @@
import java.io.IOException;
import java.net.URI;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

@@ -91,22 +92,28 @@ public final void readSingleParquetFile()

@Test
public final void readWriteSingleParquetFile() {
readWriteSingleParquetFileHelper(0); // Empty table
readWriteSingleParquetFileHelper(5_000);
readWriteSingleParquetFileHelper(50_000);
readWriteSingleParquetFileHelper(500_000);
readWriteSingleParquetFileHelper(0, true); // Empty table
readWriteSingleParquetFileHelper(0, false);
readWriteSingleParquetFileHelper(5_000, true);
readWriteSingleParquetFileHelper(5_000, false);
readWriteSingleParquetFileHelper(50_000, true);
readWriteSingleParquetFileHelper(500_000, true);
}

private void readWriteSingleParquetFileHelper(final int numRows) {
private void readWriteSingleParquetFileHelper(final int numRows, boolean withRegion) {
final Table table = getTable(numRows);
final URI uri = uri("table.parquet");
S3Instructions s3Instructions = s3Instructions(
S3Instructions.builder()
.writePartSize(5 << 20)
.numConcurrentWriteParts(5)
.readTimeout(Duration.ofSeconds(10)))
.build();
if (!withRegion) {
s3Instructions = s3Instructions.withRegionName(Optional.empty());
}
final ParquetInstructions instructions = ParquetInstructions.builder()
.setSpecialInstructions(s3Instructions(
S3Instructions.builder()
.writePartSize(5 << 20)
.numConcurrentWriteParts(5)
.readTimeout(Duration.ofSeconds(10)))
.build())
.setSpecialInstructions(s3Instructions)
.build();

// Write the table to S3 using ParquetTools write API
@@ -8,8 +8,7 @@ public interface Credentials {
/**
* The default credentials.
*
* @see <a href="https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials-chain.html">Default
* credentials provider chain</a>
* @see DefaultCredentials
*/
static Credentials defaultCredentials() {
return DefaultCredentials.DEFAULT_CREDENTIALS;
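For context on how the documented default is selected from user code, a hedged sketch: the explicit `credentials(...)` call mirrors the anonymous variant used in the tests above, and passing `defaultCredentials()` is presumably equivalent to omitting the call altogether.

```java
// Hedged sketch: explicitly opting into the default credentials chain via the builder.
final S3Instructions instructions = S3Instructions.builder()
        .regionName("eu-west-3")
        .credentials(Credentials.defaultCredentials())
        .build();
```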
@@ -3,18 +3,34 @@
//
package io.deephaven.extensions.s3;

import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;

/**
* Default AWS credentials provider that looks for credentials at a number of locations as described in
* {@link DefaultCredentialsProvider}
* {@link DefaultCredentialsProvider} and falls back to anonymous credentials if no credentials are found.
*/
enum DefaultCredentials implements AwsSdkV2Credentials {
enum DefaultCredentials implements AwsSdkV2Credentials, AwsCredentialsProvider {
DEFAULT_CREDENTIALS;

private static final AwsCredentialsProviderChain PROVIDER_CHAIN = AwsCredentialsProviderChain.builder()
.reuseLastProviderEnabled(true)
.credentialsProviders(new AwsCredentialsProvider[] {
DefaultCredentialsProvider.create(),
AnonymousCredentialsProvider.create()
})
.build();

@Override
public final AwsCredentialsProvider awsV2CredentialsProvider() {
return DefaultCredentialsProvider.create();
return DEFAULT_CREDENTIALS;
}

@Override
public AwsCredentials resolveCredentials() {
return PROVIDER_CHAIN.resolveCredentials();
}
}
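A small illustration of the fallback the updated javadoc describes, assuming an environment with no AWS credentials configured anywhere in the default chain:

```java
// With nothing configured, DefaultCredentialsProvider alone would throw when resolving;
// the new chain instead falls back to the anonymous provider.
final AwsCredentials resolved = DefaultCredentials.DEFAULT_CREDENTIALS.resolveCredentials();
// resolved now carries anonymous (unsigned) credentials rather than failing.
```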
@@ -9,10 +9,8 @@
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption;
import software.amazon.awssdk.core.retry.RetryMode;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient;
import software.amazon.awssdk.http.crt.AwsCrtHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
@@ -58,9 +56,27 @@ static S3AsyncClient getAsyncClient(@NotNull final S3Instructions instructions)
.scheduledExecutorService(ensureScheduledExecutor())
.build())
.credentialsProvider(instructions.awsV2CredentialsProvider());
instructions.regionName().map(Region::of).ifPresent(builder::region);
if (instructions.regionName().isPresent()) {
builder.region(Region.of(instructions.regionName().get()));
} else {
// If region is not provided, we enable cross-region access to allow the SDK to determine the region
// based on the bucket location and cache it for future requests.
builder.crossRegionAccessEnabled(true);
}
instructions.endpointOverride().ifPresent(builder::endpointOverride);
final S3AsyncClient s3AsyncClient = builder.build();
S3AsyncClient s3AsyncClient;
try {
s3AsyncClient = builder.build();
} catch (final Exception e) {
if (e.getMessage().contains("region") && instructions.regionName().isEmpty()) {
// TODO Check with Devin if this feels okay, this seems hacky to me but I am not able to figure out a
// better way to handle this fallback. If I just set crossRegionAccessEnabled, it still requires one
// region to be set or derived from the default chain (as mentioned in DefaultAwsRegionProviderChain).
s3AsyncClient = builder.region(Region.US_EAST_1).build();
} else {
throw e;
}
}
if (log.isDebugEnabled()) {
log.debug().append("Building S3AsyncClient with instructions: ").append(instructions).endl();
}
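For reviewers unfamiliar with the SDK feature this fallback relies on, a hedged sketch of the behavior (bucket and region values are placeholders, not part of this PR):

```java
// With cross-region access enabled, the configured region is only a starting point; the SDK
// discovers the bucket's actual region on first use and caches it for subsequent requests.
final S3AsyncClient client = S3AsyncClient.builder()
        .region(Region.US_EAST_1)                // fallback region, as in the catch block above
        .crossRegionAccessEnabled(true)
        .credentialsProvider(AnonymousCredentialsProvider.create())
        .build();
// e.g. a GetObject against a bucket that actually lives in eu-west-3 is redirected transparently.
```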
@@ -6,6 +6,7 @@
import io.deephaven.annotations.CopyableStyle;
import io.deephaven.base.log.LogOutput;
import io.deephaven.base.log.LogOutputAppendable;
import io.deephaven.util.annotations.VisibleForTesting;
import org.immutables.value.Value.Check;
import org.immutables.value.Value.Default;
import org.immutables.value.Value.Immutable;
@@ -51,7 +52,9 @@ public static Builder builder() {
/**
* The region name to use when reading or writing to S3. If not provided, the region name is picked by the AWS SDK
* from 'aws.region' system property, "AWS_REGION" environment variable, the {user.home}/.aws/credentials or
* {user.home}/.aws/config files, or from EC2 metadata service, if running in EC2.
* {user.home}/.aws/config files, or from EC2 metadata service, if running in EC2. If no region name can be derived
* from the above chain, or the derived region name is incorrect for the bucket accessed, the correct region name
* will be derived internally, at the cost of one additional request.
*/
public abstract Optional<String> regionName();
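A short sketch of the builder usage this javadoc now allows, plus the test-only helper added in a later hunk (values are illustrative):

```java
// Region omitted: resolution is deferred to the SDK chain and, failing that, to the
// cross-region lookup described above.
final S3Instructions instructions = S3Instructions.builder()
        .readTimeout(Duration.ofSeconds(10))
        .build();

// Test-only: clear the region on an existing instance (added in this PR as @VisibleForTesting).
final S3Instructions noRegion = instructions.withRegionName(Optional.empty());
```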

@@ -177,6 +180,9 @@ default Builder endpointOverride(String endpointOverride) {
abstract S3Instructions withReadAheadCount(int readAheadCount);

@VisibleForTesting
public abstract S3Instructions withRegionName(Optional<String> regionName);

@Lazy
S3Instructions singleUse() {
final int readAheadCount = Math.min(DEFAULT_READ_AHEAD_COUNT, readAheadCount());
3 changes: 2 additions & 1 deletion py/server/deephaven/experimental/s3.py
@@ -51,7 +51,8 @@ def __init__(self,
region_name (str): the region name for reading parquet files. If not provided, the default region will be
picked by the AWS SDK from 'aws.region' system property, "AWS_REGION" environment variable, the
{user.home}/.aws/credentials or {user.home}/.aws/config files, or from EC2 metadata service, if running
in EC2.
in EC2. If no region name can be derived from the above chain, or the derived region name is incorrect for
the bucket accessed, the correct region name will be derived internally, at the cost of one additional request.
max_concurrent_requests (int): the maximum number of concurrent requests for reading files, default is 256.
read_ahead_count (int): the number of fragments to send asynchronous read requests for while reading the current
fragment. Defaults to 32, which means fetch the next 32 fragments in advance when reading the current fragment.