diff --git a/extensions/s3/build.gradle b/extensions/s3/build.gradle index d96e2cf4713..64954314387 100644 --- a/extensions/s3/build.gradle +++ b/extensions/s3/build.gradle @@ -31,6 +31,7 @@ dependencies { testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine' testRuntimeOnly 'org.junit.platform:junit-platform-launcher' + testImplementation 'software.amazon.awssdk:s3-transfer-manager' testImplementation "org.testcontainers:testcontainers:1.19.4" testImplementation "org.testcontainers:junit-jupiter:1.19.4" testImplementation "org.testcontainers:localstack:1.19.4" diff --git a/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelLocalStackTest.java b/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelLocalStackTest.java index 63c47e5d498..41daca3e3a6 100644 --- a/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelLocalStackTest.java +++ b/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelLocalStackTest.java @@ -5,10 +5,10 @@ import io.deephaven.extensions.s3.S3Instructions.Builder; -import io.deephaven.extensions.s3.SingletonContainers.LocalStack; +import io.deephaven.extensions.s3.testlib.SingletonContainers.LocalStack; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Tag; -import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.S3AsyncClient; @Tag("testcontainers") public class S3SeekableChannelLocalStackTest extends S3SeekableChannelTestBase { @@ -25,7 +25,7 @@ public Builder s3Instructions(Builder builder) { } @Override - public S3Client s3Client() { - return LocalStack.s3Client(); + public S3AsyncClient s3AsyncClient() { + return LocalStack.s3AsyncClient(); } } diff --git a/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelMinIOTest.java b/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelMinIOTest.java index b7c2464c4c7..e427fd3e64e 100644 --- a/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelMinIOTest.java +++ b/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelMinIOTest.java @@ -5,12 +5,12 @@ import io.deephaven.extensions.s3.S3Instructions.Builder; -import io.deephaven.extensions.s3.SingletonContainers.MinIO; +import io.deephaven.extensions.s3.testlib.SingletonContainers.MinIO; import io.deephaven.stats.util.OSUtil; import org.junit.jupiter.api.Assumptions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Tag; -import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.S3AsyncClient; @Tag("testcontainers") public class S3SeekableChannelMinIOTest extends S3SeekableChannelTestBase { @@ -29,7 +29,7 @@ public Builder s3Instructions(Builder builder) { } @Override - public S3Client s3Client() { - return MinIO.s3Client(); + public S3AsyncClient s3AsyncClient() { + return MinIO.s3AsyncClient(); } } diff --git a/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelTestBase.java b/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelTestBase.java index 41a788ea634..74e941966f2 100644 --- a/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelTestBase.java +++ b/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelTestBase.java @@ -4,82 +4,101 @@ package io.deephaven.extensions.s3; +import io.deephaven.extensions.s3.testlib.S3Helper; import io.deephaven.util.channel.CachedChannelProvider; import io.deephaven.util.channel.SeekableChannelContext; import io.deephaven.util.channel.SeekableChannelsProvider; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import software.amazon.awssdk.core.sync.RequestBody; -import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.CreateBucketRequest; import software.amazon.awssdk.services.s3.model.DeleteBucketRequest; -import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import java.io.IOException; import java.io.InputStream; import java.net.URI; +import java.net.URISyntaxException; import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; import java.nio.channels.SeekableByteChannel; -import java.util.ArrayList; -import java.util.List; +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; +import java.time.Duration; import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static org.assertj.core.api.Assertions.assertThat; public abstract class S3SeekableChannelTestBase { - public abstract S3Client s3Client(); + public abstract S3AsyncClient s3AsyncClient(); public abstract S3Instructions.Builder s3Instructions(S3Instructions.Builder builder); - private S3Client client; - + private ExecutorService executor; + private S3AsyncClient asyncClient; private String bucket; - private final List keys = new ArrayList<>(); - @BeforeEach - void setUp() { + void setUp() throws ExecutionException, InterruptedException, TimeoutException { + executor = Executors.newCachedThreadPool(); bucket = UUID.randomUUID().toString(); - client = s3Client(); - client.createBucket(CreateBucketRequest.builder().bucket(bucket).build()); + asyncClient = s3AsyncClient(); + asyncClient.createBucket(CreateBucketRequest.builder().bucket(bucket).build()).get(5, TimeUnit.SECONDS); } @AfterEach - void tearDown() { - for (String key : keys) { - client.deleteObject(DeleteObjectRequest.builder().bucket(bucket).key(key).build()); - } - keys.clear(); - client.deleteBucket(DeleteBucketRequest.builder().bucket(bucket).build()); - client.close(); + void tearDown() throws ExecutionException, InterruptedException, TimeoutException { + S3Helper.deleteAllKeys(asyncClient, bucket); + asyncClient.deleteBucket(DeleteBucketRequest.builder().bucket(bucket).build()).get(5, TimeUnit.SECONDS); + asyncClient.close(); + executor.shutdownNow(); } @Test - void readEmptyFile() throws IOException { - putObject("empty.txt", RequestBody.empty()); - final URI uri = uri("empty.txt"); - final ByteBuffer buffer = ByteBuffer.allocate(1); - try ( - final SeekableChannelsProvider providerImpl = providerImpl(uri); - final SeekableChannelsProvider provider = new CachedChannelProvider(providerImpl, 32); - final SeekableChannelContext context = provider.makeContext(); - final SeekableByteChannel readChannel = provider.getReadChannel(context, uri)) { - assertThat(readChannel.read(buffer)).isEqualTo(-1); + void readSimpleFiles() + throws IOException, URISyntaxException, ExecutionException, InterruptedException, TimeoutException { + uploadDirectory("readSimpleFiles"); + { + final URI uri = uri("empty.txt"); + final ByteBuffer buffer = ByteBuffer.allocate(1); + try ( + final SeekableChannelsProvider providerImpl = providerImpl(uri); + final SeekableChannelsProvider provider = new CachedChannelProvider(providerImpl, 32); + final SeekableChannelContext context = provider.makeContext(); + final SeekableByteChannel readChannel = provider.getReadChannel(context, uri)) { + assertThat(readChannel.read(buffer)).isEqualTo(-1); + } + } + { + final URI uri = uri("hello/world.txt"); + try ( + final SeekableChannelsProvider providerImpl = providerImpl(uri); + final SeekableChannelsProvider provider = new CachedChannelProvider(providerImpl, 32); + final SeekableChannelContext context = provider.makeContext(); + final SeekableByteChannel readChannel = provider.getReadChannel(context, uri)) { + final ByteBuffer bytes = readAll(readChannel, 32); + assertThat(bytes).isEqualTo(ByteBuffer.wrap("Hello, world!".getBytes(StandardCharsets.UTF_8))); + } } } @Test - void read32MiB() throws IOException { + void read32MiB() throws IOException, ExecutionException, InterruptedException, TimeoutException { final int numBytes = 33554432; - putObject("32MiB.bin", RequestBody.fromInputStream(new InputStream() { + putObject("32MiB.bin", AsyncRequestBody.fromInputStream(new InputStream() { @Override public int read() { return 42; } - }, numBytes)); + }, (long) numBytes, executor)); final URI uri = uri("32MiB.bin"); final ByteBuffer buffer = ByteBuffer.allocate(1); try ( @@ -96,13 +115,24 @@ public int read() { } } + private void uploadDirectory(String resourceDir) + throws URISyntaxException, ExecutionException, InterruptedException, TimeoutException { + S3Helper.uploadDirectory( + asyncClient, + Path.of(S3SeekableChannelTestBase.class.getResource(resourceDir).toURI()), + bucket, + null, + Duration.ofSeconds(5)); + } + private URI uri(String key) { return URI.create(String.format("s3://%s/%s", bucket, key)); } - private void putObject(String key, RequestBody body) { - client.putObject(PutObjectRequest.builder().bucket(bucket).key(key).build(), body); - keys.add(key); + private void putObject(String key, AsyncRequestBody body) + throws ExecutionException, InterruptedException, TimeoutException { + asyncClient.putObject(PutObjectRequest.builder().bucket(bucket).key(key).build(), body).get(5, + TimeUnit.SECONDS); } private SeekableChannelsProvider providerImpl(URI uri) { @@ -110,4 +140,18 @@ private SeekableChannelsProvider providerImpl(URI uri) { final S3Instructions instructions = s3Instructions(S3Instructions.builder()).build(); return plugin.createProvider(uri, instructions); } + + private static ByteBuffer readAll(ReadableByteChannel channel, int maxBytes) throws IOException { + final ByteBuffer dst = ByteBuffer.allocate(maxBytes); + while (dst.remaining() > 0 && channel.read(dst) != -1) { + // continue + } + if (dst.remaining() == 0) { + if (channel.read(ByteBuffer.allocate(1)) != -1) { + throw new RuntimeException(String.format("channel has more than %d bytes", maxBytes)); + } + } + dst.flip(); + return dst; + } } diff --git a/extensions/s3/src/test/java/io/deephaven/extensions/s3/testlib/S3Helper.java b/extensions/s3/src/test/java/io/deephaven/extensions/s3/testlib/S3Helper.java new file mode 100644 index 00000000000..6d2b839f471 --- /dev/null +++ b/extensions/s3/src/test/java/io/deephaven/extensions/s3/testlib/S3Helper.java @@ -0,0 +1,88 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.extensions.s3.testlib; + +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.Delete; +import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; +import software.amazon.awssdk.services.s3.model.ObjectIdentifier; +import software.amazon.awssdk.services.s3.model.S3Object; +import software.amazon.awssdk.transfer.s3.S3TransferManager; +import software.amazon.awssdk.transfer.s3.model.CompletedDirectoryUpload; +import software.amazon.awssdk.transfer.s3.model.DirectoryUpload; +import software.amazon.awssdk.transfer.s3.model.UploadDirectoryRequest; + +import java.nio.file.Path; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +public final class S3Helper { + public static void uploadDirectory( + S3AsyncClient s3AsyncClient, + Path dir, + String bucket, + String prefix, + Duration timeout) throws ExecutionException, InterruptedException, TimeoutException { + try (final S3TransferManager manager = S3TransferManager.builder().s3Client(s3AsyncClient).build()) { + uploadDirectory(manager, dir, bucket, prefix, timeout); + } + } + + public static void uploadDirectory( + S3TransferManager transferManager, + Path dir, + String bucket, + String prefix, + Duration timeout) throws ExecutionException, InterruptedException, TimeoutException { + // Not a way to get a list of the uploaded files, even when using a TransferListener. + final DirectoryUpload directoryUpload = transferManager.uploadDirectory(UploadDirectoryRequest.builder() + .source(dir) + .bucket(bucket) + .s3Prefix(prefix) + .build()); + final CompletedDirectoryUpload upload = + directoryUpload.completionFuture().get(timeout.toNanos(), TimeUnit.NANOSECONDS); + if (!upload.failedTransfers().isEmpty()) { + throw new RuntimeException("Upload has failed transfers"); + } + } + + public static void deleteAllKeys(S3AsyncClient s3AsyncClient, String bucket) + throws ExecutionException, InterruptedException, TimeoutException { + ListObjectsV2Response response = s3AsyncClient + .listObjectsV2(ListObjectsV2Request.builder().bucket(bucket).build()).get(5, TimeUnit.SECONDS); + final List> futures = new ArrayList<>(); + while (true) { + final List deletes = response.contents() + .stream() + .map(S3Object::key) + .map(S3Helper::objectId) + .collect(Collectors.toList()); + futures.add(s3AsyncClient.deleteObjects(DeleteObjectsRequest.builder() + .bucket(bucket) + .delete(Delete.builder().objects(deletes).build()) + .build())); + final String nextContinuationToken = response.nextContinuationToken(); + if (nextContinuationToken == null) { + break; + } + response = s3AsyncClient.listObjectsV2( + ListObjectsV2Request.builder().bucket(bucket).continuationToken(nextContinuationToken).build()) + .get(5, TimeUnit.SECONDS); + } + CompletableFuture.allOf(futures.stream().toArray(CompletableFuture[]::new)).get(5, TimeUnit.SECONDS); + } + + private static ObjectIdentifier objectId(String o) { + return ObjectIdentifier.builder().key(o).build(); + } +} diff --git a/extensions/s3/src/test/java/io/deephaven/extensions/s3/SingletonContainers.java b/extensions/s3/src/test/java/io/deephaven/extensions/s3/testlib/SingletonContainers.java similarity index 80% rename from extensions/s3/src/test/java/io/deephaven/extensions/s3/SingletonContainers.java rename to extensions/s3/src/test/java/io/deephaven/extensions/s3/testlib/SingletonContainers.java index f4432b6e1e4..3711235a4ec 100644 --- a/extensions/s3/src/test/java/io/deephaven/extensions/s3/SingletonContainers.java +++ b/extensions/s3/src/test/java/io/deephaven/extensions/s3/testlib/SingletonContainers.java @@ -1,8 +1,10 @@ // // Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending // -package io.deephaven.extensions.s3; +package io.deephaven.extensions.s3.testlib; +import io.deephaven.extensions.s3.Credentials; +import io.deephaven.extensions.s3.S3Instructions.Builder; import org.testcontainers.DockerClientFactory; import org.testcontainers.containers.MinIOContainer; import org.testcontainers.containers.localstack.LocalStackContainer; @@ -11,18 +13,18 @@ import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.S3AsyncClient; import java.net.URI; -final class SingletonContainers { +public final class SingletonContainers { // This pattern allows the respective images to be spun up as a container once per-JVM as opposed to once per-class // or once per-test. // https://java.testcontainers.org/test_framework_integration/manual_lifecycle_control/#singleton-containers // https://testcontainers.com/guides/testcontainers-container-lifecycle/#_using_singleton_containers - static final class LocalStack { + public static final class LocalStack { private static final LocalStackContainer LOCALSTACK_S3 = new LocalStackContainer(DockerImageName.parse(System.getProperty("testcontainers.localstack.image"))) .withServices(Service.S3); @@ -30,20 +32,20 @@ static final class LocalStack { LOCALSTACK_S3.start(); } - static void init() { + public static void init() { // no-op, ensures this class is initialized } - static S3Instructions.Builder s3Instructions(S3Instructions.Builder builder) { + public static Builder s3Instructions(Builder builder) { return builder .endpointOverride(LOCALSTACK_S3.getEndpoint()) .regionName(LOCALSTACK_S3.getRegion()) .credentials(Credentials.basic(LOCALSTACK_S3.getAccessKey(), LOCALSTACK_S3.getSecretKey())); } - static S3Client s3Client() { - return S3Client - .builder() + public static S3AsyncClient s3AsyncClient() { + return S3AsyncClient + .crtBuilder() .endpointOverride(LOCALSTACK_S3.getEndpoint()) .region(Region.of(LOCALSTACK_S3.getRegion())) .credentialsProvider(StaticCredentialsProvider.create( @@ -52,7 +54,7 @@ static S3Client s3Client() { } } - static final class MinIO { + public static final class MinIO { // MINIO_DOMAIN is set so MinIO will accept virtual-host style requests; see virtual-host style implementation // comments in S3Instructions. // https://min.io/docs/minio/linux/reference/minio-server/settings/core.html#domain @@ -63,20 +65,20 @@ static final class MinIO { MINIO.start(); } - static void init() { + public static void init() { // no-op, ensures this class is initialized } - static S3Instructions.Builder s3Instructions(S3Instructions.Builder builder) { + public static Builder s3Instructions(Builder builder) { return builder .endpointOverride(URI.create(MINIO.getS3URL())) .regionName(Region.AWS_GLOBAL.id()) .credentials(Credentials.basic(MINIO.getUserName(), MINIO.getPassword())); } - static S3Client s3Client() { - return S3Client - .builder() + public static S3AsyncClient s3AsyncClient() { + return S3AsyncClient + .crtBuilder() .endpointOverride(URI.create(MINIO.getS3URL())) .region(Region.AWS_GLOBAL) .credentialsProvider(StaticCredentialsProvider.create( diff --git a/extensions/s3/src/test/resources/io/deephaven/extensions/s3/readSimpleFiles/empty.txt b/extensions/s3/src/test/resources/io/deephaven/extensions/s3/readSimpleFiles/empty.txt new file mode 100644 index 00000000000..e69de29bb2d diff --git a/extensions/s3/src/test/resources/io/deephaven/extensions/s3/readSimpleFiles/hello/world.txt b/extensions/s3/src/test/resources/io/deephaven/extensions/s3/readSimpleFiles/hello/world.txt new file mode 100644 index 00000000000..5dd01c177f5 --- /dev/null +++ b/extensions/s3/src/test/resources/io/deephaven/extensions/s3/readSimpleFiles/hello/world.txt @@ -0,0 +1 @@ +Hello, world! \ No newline at end of file