diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/Files.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/Files.scala index 4ede36c92a..75b9e69631 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/Files.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/Files.scala @@ -504,7 +504,7 @@ final class Files( metadata: FileDescription, source: BodyPartEntity ): IO[FileStorageMetadata] = - SaveFile(storage, remoteDiskStorageClient, s3Client.underlyingClient) + SaveFile(storage, remoteDiskStorageClient, s3Client) .apply(metadata.filename, source) .adaptError { case e: SaveFileRejection => SaveRejection(iri, storage.id, e) } diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/model/DigestAlgorithm.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/model/DigestAlgorithm.scala index 54065bddd3..63fd2cd9ce 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/model/DigestAlgorithm.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/model/DigestAlgorithm.scala @@ -29,8 +29,6 @@ object DigestAlgorithm { final val default: DigestAlgorithm = new DigestAlgorithm("SHA-256") - final val md5: DigestAlgorithm = new DigestAlgorithm("MD5") - /** * Safely construct an [[DigestAlgorithm]] */ diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/model/Storage.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/model/Storage.scala index 4c183c45c6..d0ad7aec2c 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/model/Storage.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/model/Storage.scala @@ -1,7 +1,6 @@ package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model import akka.actor.ActorSystem -import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.Storage.Metadata import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageValue.{DiskStorageValue, RemoteDiskStorageValue, S3StorageValue} @@ -10,7 +9,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.disk.{D import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote._ import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote.client.RemoteDiskStorageClient import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient -import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.{S3StorageFetchFile, S3StorageSaveFile2} +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.{S3StorageFetchFile, S3StorageSaveFile} import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.{contexts, Storages} import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.ContextValue @@ -22,7 +21,6 @@ import ch.epfl.bluebrain.nexus.delta.sdk.{OrderingFields, ResourceShift} import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef import io.circe.syntax._ import io.circe.{Encoder, Json, JsonObject} -import io.laserdisc.pure.s3.tagless.S3AsyncClientOp sealed trait Storage extends Product with Serializable { @@ -102,8 +100,8 @@ object Storage { def fetchFile(client: S3StorageClient): FetchFile = new S3StorageFetchFile(client, value.bucket) - def saveFile(client: S3AsyncClientOp[IO])(implicit as: ActorSystem, uuidf: UUIDF): SaveFile = - new S3StorageSaveFile2(client, this) + def saveFile(s3StorageClient: S3StorageClient)(implicit as: ActorSystem, uuidf: UUIDF): SaveFile = + new S3StorageSaveFile(s3StorageClient, this) } /** diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/SaveFile.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/SaveFile.scala index 2056e16d2d..9e95286430 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/SaveFile.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/SaveFile.scala @@ -10,8 +10,8 @@ import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.Digest.Computed import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileStorageMetadata import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.{DigestAlgorithm, Storage} import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote.client.RemoteDiskStorageClient +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef -import io.laserdisc.pure.s3.tagless.S3AsyncClientOp import java.util.UUID import scala.concurrent.{ExecutionContext, Future} @@ -34,7 +34,7 @@ object SaveFile { /** * Construct a [[SaveFile]] from the given ''storage''. */ - def apply(storage: Storage, client: RemoteDiskStorageClient, s3Client: S3AsyncClientOp[IO])(implicit + def apply(storage: Storage, client: RemoteDiskStorageClient, s3Client: S3StorageClient)(implicit as: ActorSystem, uuidf: UUIDF ): SaveFile = diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageSaveFile.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageSaveFile.scala index dbb513a5f4..87f217cebc 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageSaveFile.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageSaveFile.scala @@ -1,76 +1,168 @@ package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3 +import akka.NotUsed import akka.actor.ActorSystem import akka.http.scaladsl.model.Uri.Path -import akka.http.scaladsl.model.Uri.Path.Slash import akka.http.scaladsl.model.{BodyPartEntity, Uri} -import akka.stream.alpakka.s3.scaladsl.S3 -import akka.stream.alpakka.s3.{S3Attributes, S3Exception} -import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source +import akka.util.ByteString import cats.effect.IO import cats.implicits._ +import ch.epfl.bluebrain.nexus.delta.kernel.Logger import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileAttributes.FileAttributesOrigin.Client -import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileStorageMetadata -import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.StorageTypeConfig +import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.{Digest, FileStorageMetadata} +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.DigestAlgorithm import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.Storage.S3Storage import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.SaveFile -import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.SaveFile.{digestSink, intermediateFolders, sizeSink} +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.SaveFile.intermediateFolders import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.SaveFileRejection._ -import ch.epfl.bluebrain.nexus.delta.plugins.storage.utils.SinkUtils +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient +import ch.epfl.bluebrain.nexus.delta.rdf.syntax.uriSyntax +import ch.epfl.bluebrain.nexus.delta.sdk.stream.StreamConverter +import eu.timepit.refined.refineMV +import eu.timepit.refined.types.string.NonEmptyString +import fs2.Stream +import fs2.aws.s3.S3 +import fs2.aws.s3.S3.MultipartETagValidation +import fs2.aws.s3.models.Models.{BucketName, ETag, FileKey, PartSizeMB} +import software.amazon.awssdk.services.s3.model._ import java.util.UUID -import scala.concurrent.Future -final class S3StorageSaveFile(storage: S3Storage, config: StorageTypeConfig)(implicit +final class S3StorageSaveFile(s3StorageClient: S3StorageClient, storage: S3Storage)(implicit as: ActorSystem, uuidf: UUIDF ) extends SaveFile { - import as.dispatcher - private val fileAlreadyExistException = new IllegalArgumentException("Collision, file already exist") - override def apply( + private val client = s3StorageClient.underlyingClient + private val s3 = S3.create(client) + private val multipartETagValidation = MultipartETagValidation.create[IO] + private val logger = Logger[S3StorageSaveFile] + private val partSizeMB: PartSizeMB = refineMV(5) + private val bucket = BucketName(NonEmptyString.unsafeFrom(storage.value.bucket)) + + def apply( filename: String, entity: BodyPartEntity - ): IO[FileStorageMetadata] = { + ): IO[FileStorageMetadata] = for { uuid <- uuidf() path = Uri.Path(intermediateFolders(storage.project, uuid, filename)) result <- storeFile(path, uuid, entity) } yield result - } private def storeFile(path: Path, uuid: UUID, entity: BodyPartEntity): IO[FileStorageMetadata] = { - val key = path.toString() - val attributes = S3Attributes.settings(storage.value.alpakkaSettings(config)) - def s3Sink = S3.multipartUpload(storage.value.bucket, key).withAttributes(attributes) - IO.fromFuture( - IO.delay( - S3.getObjectMetadata(storage.value.bucket, key) - .withAttributes(attributes) - .runWith(Sink.last) - .flatMap { - case None => - entity.dataBytes.runWith(SinkUtils.combineMat(digestSink(storage.value.algorithm), sizeSink, s3Sink) { - case (digest, bytes, s3Result) => - Future.successful( - FileStorageMetadata( - uuid = uuid, - bytes = bytes, - digest = digest, - origin = Client, - location = s3Result.location.withPath(Slash(path)), - path = Uri.Path(key) + val key = path.toString() + val fileData: Stream[IO, Byte] = convertStream(entity.dataBytes) + + (for { + _ <- log(key, s"Checking for object existence") + _ <- getFileAttributes(key).redeemWith( + { + case _: NoSuchKeyException => IO.unit + case e => IO.raiseError(e) + }, + _ => IO.raiseError(ResourceAlreadyExists(key)) ) - ) - }) - case Some(_) => Future.failed(fileAlreadyExistException) - } + _ <- log(key, s"Beginning multipart upload") + maybeEtags <- uploadFileMultipart(fileData, key) + _ <- log(key, s"Finished multipart upload. Etag by part: $maybeEtags") + attr <- collectFileMetadata(fileData, key, uuid, maybeEtags) + } yield attr) + .adaptError { err => UnexpectedSaveError(key, err.getMessage) } + } + + private def convertStream(source: Source[ByteString, Any]): Stream[IO, Byte] = + StreamConverter( + source + .flatMapConcat(x => Source.fromIterator(() => x.iterator)) + .mapMaterializedValue(_ => NotUsed) + ) + + // TODO test etags and file round trip for true multipart uploads. + // It's not clear whether the last value in the stream will be the final aggregate checksum + private def uploadFileMultipart(fileData: Stream[IO, Byte], key: String): IO[List[Option[ETag]]] = + fileData + .through( + s3.uploadFileMultipart( + bucket, + FileKey(NonEmptyString.unsafeFrom(key)), + partSizeMB, + uploadEmptyFiles = true, + multipartETagValidation = multipartETagValidation.some + ) + ) + .compile + .to(List) + + private def getFileAttributes(key: String): IO[GetObjectAttributesResponse] = + client + .getObjectAttributes( + GetObjectAttributesRequest + .builder() + .bucket(bucket.value.value) + .key(key) + .objectAttributes(ObjectAttributes.OBJECT_SIZE) // TODO get all values + .build() ) - ).adaptError { - case `fileAlreadyExistException` => ResourceAlreadyExists(key) - case err: S3Exception => UnexpectedSaveError(key, err.toString()) - case err => UnexpectedSaveError(key, err.getMessage) + + private def collectFileMetadata( + bytes: Stream[IO, Byte], + key: String, + uuid: UUID, + maybeEtags: List[Option[ETag]] + ): IO[FileStorageMetadata] = + maybeEtags.sequence match { + case Some(onlyPartETag :: Nil) => + // TODO our tests expect specific values for digests and the only algorithm currently used is SHA-256. + // If we want to continue to check this, but allow for different algorithms, this needs to be abstracted + // in the tests and checked on specific file contents. + // The result will depend on whether we use a multipart upload or a standard put object. + for { + _ <- log(key, s"Received ETag for single part upload: $onlyPartETag") + fileSize <- computeSize(bytes) + digest <- computeDigest(bytes, storage.storageValue.algorithm) + metadata <- fileMetadata(key, uuid, fileSize, digest) + } yield metadata + case Some(other) => raiseUnexpectedErr(key, s"S3 multipart upload returned multiple etags unexpectedly: $other") + case None => raiseUnexpectedErr(key, "S3 multipart upload was aborted because no data was received") } + + // TODO issue fetching attributes when tested against localstack, only after the object is saved + // Verify if it's the same for real S3. Error msg: 'Could not parse XML response.' + // For now we just compute it manually. + // private def getFileSize(key: String) = + // getFileAttributes(key).flatMap { attr => + // log(key, s"File attributes from S3: $attr").as(attr.objectSize()) + // } + private def computeSize(bytes: Stream[IO, Byte]): IO[Long] = bytes.fold(0L)(_ + _).compile.lastOrError + + private def computeDigest(bytes: Stream[IO, Byte], algorithm: DigestAlgorithm): IO[String] = { + val digest = algorithm.digest + bytes.chunks + .evalMap(chunk => IO(digest.update(chunk.toArray))) + .compile + .lastOrError + .map(_ => digest.digest().map("%02x".format(_)).mkString) + } + + private def fileMetadata(key: String, uuid: UUID, fileSize: Long, digest: String) = + s3StorageClient.baseEndpoint.map { base => + FileStorageMetadata( + uuid = uuid, + bytes = fileSize, + digest = Digest.ComputedDigest(storage.value.algorithm, digest), + origin = Client, + location = base / bucket.value.value / Uri.Path(key), + path = Uri.Path(key) + ) + } + + private def raiseUnexpectedErr[A](key: String, msg: String): IO[A] = IO.raiseError(UnexpectedSaveError(key, msg)) + + private def log(key: String, msg: String) = { + val thing = s"Bucket: ${bucket.value}. Key: $key. $msg" + logger.info(s"Bucket: ${bucket.value}. Key: $key. $msg") >> IO.println(thing) } } diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageSaveFile2.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageSaveFile2.scala deleted file mode 100644 index f980f76d09..0000000000 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageSaveFile2.scala +++ /dev/null @@ -1,137 +0,0 @@ -package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3 - -import akka.NotUsed -import akka.actor.ActorSystem -import akka.http.scaladsl.model.Uri.Path -import akka.http.scaladsl.model.{BodyPartEntity, Uri} -import akka.stream.scaladsl.Source -import akka.util.ByteString -import cats.effect.IO -import cats.implicits._ -import ch.epfl.bluebrain.nexus.delta.kernel.Logger -import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF -import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileAttributes.FileAttributesOrigin.Client -import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.{Digest, FileStorageMetadata} -import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.DigestAlgorithm -import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.Storage.S3Storage -import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.SaveFile -import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.SaveFile.intermediateFolders -import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.SaveFileRejection._ -import ch.epfl.bluebrain.nexus.delta.sdk.stream.StreamConverter -import eu.timepit.refined.refineMV -import eu.timepit.refined.types.string.NonEmptyString -import fs2.aws.s3.S3 -import fs2.aws.s3.S3.MultipartETagValidation -import fs2.aws.s3.models.Models.{BucketName, ETag, FileKey, PartSizeMB} -import io.laserdisc.pure.s3.tagless.S3AsyncClientOp -import software.amazon.awssdk.services.s3.model._ -import fs2.Stream - -import java.util.UUID - -final class S3StorageSaveFile2(client: S3AsyncClientOp[IO], storage: S3Storage)(implicit - as: ActorSystem, - uuidf: UUIDF -) extends SaveFile { - - private val s3 = S3.create(client) - private val multipartETagValidation = MultipartETagValidation.create[IO] - private val logger = Logger[S3StorageSaveFile2] - private val partSizeMB: PartSizeMB = refineMV(5) - private val bucket = BucketName(NonEmptyString.unsafeFrom(storage.value.bucket)) - - def apply( - filename: String, - entity: BodyPartEntity - ): IO[FileStorageMetadata] = { - for { - uuid <- uuidf() - path = Uri.Path(intermediateFolders(storage.project, uuid, filename)) - result <- storeFile(path, uuid, entity) - } yield result - } - - private def storeFile(path: Path, uuid: UUID, entity: BodyPartEntity): IO[FileStorageMetadata] = { - val key = path.toString() - val fileData: Stream[IO, Byte] = convertStream(entity.dataBytes) - - (for { - _ <- log(key, s"Checking for object existence") - _ <- getFileAttributes(key).redeemWith( - { - case _: NoSuchKeyException => IO.unit - case e => IO.raiseError(e) - }, - _ => IO.raiseError(ResourceAlreadyExists(key)) - ) - _ <- log(key, s"Beginning multipart upload") - maybeEtags <- uploadFileMultipart(fileData, key) - _ <- log(key, s"Finished multipart upload. Etag by part: $maybeEtags") - attr <- collectFileMetadata(key, uuid, maybeEtags) - } yield attr) - .adaptError { err => UnexpectedSaveError(key, err.getMessage) } - } - - private def convertStream(source: Source[ByteString, Any]): Stream[IO, Byte] = - StreamConverter( - source - .flatMapConcat(x => Source.fromIterator(() => x.iterator)) - .mapMaterializedValue(_ => NotUsed) - ) - - // TODO test etags and file round trip for true multipart uploads. - // It's not clear whether the last value in the stream will be the final aggregate checksum - private def uploadFileMultipart(fileData: Stream[IO, Byte], key: String): IO[List[Option[ETag]]] = - fileData - .through( - s3.uploadFileMultipart( - bucket, - FileKey(NonEmptyString.unsafeFrom(key)), - partSizeMB, - uploadEmptyFiles = true, - multipartETagValidation = multipartETagValidation.some - ) - ) - .compile - .to(List) - - private def getFileAttributes(key: String): IO[GetObjectAttributesResponse] = - client - .getObjectAttributes( - GetObjectAttributesRequest - .builder() - .bucket(bucket.value.value) - .key(key) - .objectAttributes(ObjectAttributes.knownValues()) - .build() - ) - .onError { e => - logger.error(e)(s"Error fetching S3 file attributes for key $key") - } - - private def getFileSize(key: String) = - getFileAttributes(key).flatMap { attr => - log(key, s"File attributes from S3: $attr").as(attr.objectSize()) - } - - private def collectFileMetadata(key: String, uuid: UUID, maybeEtags: List[Option[ETag]]): IO[FileStorageMetadata] = - maybeEtags.sequence match { - case Some(onlyPartETag :: Nil) => getFileSize(key).map(fileMetadata(key, uuid, _, onlyPartETag)) - case Some(other) => raiseUnexpectedErr(key, s"S3 multipart upload returned multiple etags unexpectedly: $other") - case None => raiseUnexpectedErr(key, "S3 multipart upload was aborted because no data was received") - } - - private def fileMetadata(key: String, uuid: UUID, fileSize: Long, digest: String) = - FileStorageMetadata( - uuid = uuid, - bytes = fileSize, - digest = Digest.ComputedDigest(DigestAlgorithm.md5, digest), - origin = Client, - location = Uri(key), // both of these are now absolute URIs? - path = Uri.Path(key) - ) - - private def raiseUnexpectedErr[A](key: String, msg: String): IO[A] = IO.raiseError(UnexpectedSaveError(key, msg)) - - private def log(key: String, msg: String) = logger.info(s"Bucket: ${bucket.value}. Key: $key. $msg") -} diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/client/S3StorageClient.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/client/S3StorageClient.scala index 7e6fc52dc4..ba8843e0d0 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/client/S3StorageClient.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/client/S3StorageClient.scala @@ -1,5 +1,6 @@ package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client +import akka.http.scaladsl.model.Uri import cats.effect.{IO, Resource} import cats.implicits.toBifunctorOps import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.S3StorageConfig @@ -13,6 +14,7 @@ import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCrede import software.amazon.awssdk.regions.Region import software.amazon.awssdk.services.s3.S3AsyncClient import software.amazon.awssdk.services.s3.model.{ListObjectsV2Request, ListObjectsV2Response} +import fs2.Stream import java.net.URI @@ -22,6 +24,8 @@ trait S3StorageClient { def readFile(bucket: String, fileKey: String): fs2.Stream[IO, Byte] def underlyingClient: S3AsyncClientOp[IO] + + def baseEndpoint: IO[Uri] } object S3StorageClient { @@ -41,12 +45,12 @@ object S3StorageClient { .forcePathStyle(true) .region(Region.US_EAST_1) ) - .map(new S3StorageClientImpl(_)) + .map(new S3StorageClientImpl(_, cfg.defaultEndpoint)) case None => Resource.pure(S3StorageClientDisabled) } - final class S3StorageClientImpl(client: S3AsyncClientOp[IO]) extends S3StorageClient { + final class S3StorageClientImpl(client: S3AsyncClientOp[IO], baseEndpoint: Uri) extends S3StorageClient { private val s3: S3[IO] = S3.create(client) override def listObjectsV2(bucket: String): IO[ListObjectsV2Response] = @@ -54,12 +58,14 @@ object S3StorageClient { def readFile(bucket: String, fileKey: String): fs2.Stream[IO, Byte] = for { - bk <- fs2.Stream.fromEither[IO].apply(refineV[NonEmpty](bucket).leftMap(e => new IllegalArgumentException(e))) - fk <- fs2.Stream.fromEither[IO].apply(refineV[NonEmpty](fileKey).leftMap(e => new IllegalArgumentException(e))) + bk <- Stream.fromEither[IO].apply(refineV[NonEmpty](bucket).leftMap(e => new IllegalArgumentException(e))) + fk <- Stream.fromEither[IO].apply(refineV[NonEmpty](fileKey).leftMap(e => new IllegalArgumentException(e))) bytes <- s3.readFile(BucketName(bk), FileKey(fk)) } yield bytes override def underlyingClient: S3AsyncClientOp[IO] = client + + override def baseEndpoint: IO[Uri] = IO.pure(baseEndpoint) } final case object S3StorageClientDisabled extends S3StorageClient { @@ -72,5 +78,7 @@ object S3StorageClient { fs2.Stream.raiseError[IO](disabledErr) override def underlyingClient: S3AsyncClientOp[IO] = throw disabledErr + + override def baseEndpoint: IO[Uri] = raiseDisabledErr } } diff --git a/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/LocalStackS3StorageClient.scala b/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/LocalStackS3StorageClient.scala new file mode 100644 index 0000000000..1bda05edb8 --- /dev/null +++ b/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/LocalStackS3StorageClient.scala @@ -0,0 +1,41 @@ +package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3 + +import akka.http.scaladsl.model.Uri +import cats.effect.{IO, Resource} +import ch.epfl.bluebrain.nexus.delta.kernel.Secret +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.S3StorageConfig +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.DigestAlgorithm +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.permissions +import ch.epfl.bluebrain.nexus.testkit.minio.LocalStackS3 +import munit.CatsEffectSuite +import munit.catseffect.IOFixture +import org.testcontainers.containers.localstack.LocalStackContainer.Service + +object LocalStackS3StorageClient { + val ServiceType = Service.S3 + + def s3StorageClientResource(): Resource[IO, (S3StorageClient, S3StorageConfig)] = + LocalStackS3.localstackS3().flatMap { localstack => + LocalStackS3.fs2ClientFromLocalstack(localstack).map { client => + val creds = localstack.staticCredentialsProvider.resolveCredentials() + val (accessKey, secretKey) = (creds.accessKeyId(), creds.secretAccessKey()) + val conf: S3StorageConfig = S3StorageConfig( + digestAlgorithm = DigestAlgorithm.default, + defaultEndpoint = Uri(localstack.endpointOverride(LocalStackS3.ServiceType).toString), + defaultAccessKey = Secret(accessKey), + defaultSecretKey = Secret(secretKey), + defaultReadPermission = permissions.read, + defaultWritePermission = permissions.write, + showLocation = false, + defaultMaxFileSize = 1 + ) + (new S3StorageClient.S3StorageClientImpl(client, conf.defaultEndpoint), conf) + } + } + + trait Fixture { self: CatsEffectSuite => + val localStackS3Client: IOFixture[(S3StorageClient, S3StorageConfig)] = + ResourceSuiteLocalFixture("s3storageclient", s3StorageClientResource()) + } +} diff --git a/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageAccessSpecLocalStack.scala b/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageAccessSpecLocalStack.scala index 2a1188172d..cf1962786f 100644 --- a/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageAccessSpecLocalStack.scala +++ b/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageAccessSpecLocalStack.scala @@ -4,11 +4,8 @@ import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StorageFixtures import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageRejection.StorageNotAccessible import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient -import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient.S3StorageClientImpl import ch.epfl.bluebrain.nexus.delta.sdk.actor.ActorSystemSetup -import ch.epfl.bluebrain.nexus.testkit.minio.LocalStackS3 import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite -import io.laserdisc.pure.s3.tagless.S3AsyncClientOp import munit.AnyFixture import software.amazon.awssdk.services.s3.model.{CreateBucketRequest, DeleteBucketRequest} @@ -17,16 +14,15 @@ import scala.concurrent.duration.{Duration, DurationInt} class S3StorageAccessSpecLocalStack extends NexusSuite with StorageFixtures - with LocalStackS3.Fixture + with LocalStackS3StorageClient.Fixture with ActorSystemSetup.Fixture { override def munitIOTimeout: Duration = 60.seconds override def munitFixtures: Seq[AnyFixture[_]] = List(localStackS3Client) - private lazy val s3Client: S3AsyncClientOp[IO] = localStackS3Client() - private lazy val s3StorageClient: S3StorageClient = new S3StorageClientImpl(s3Client) - private lazy val s3Access = new S3StorageAccess(s3StorageClient) + private lazy val (s3Client: S3StorageClient, _) = localStackS3Client() + private lazy val s3Access = new S3StorageAccess(s3Client) test("List objects in an existing bucket") { givenAnS3Bucket { bucket => @@ -40,8 +36,8 @@ class S3StorageAccessSpecLocalStack def givenAnS3Bucket(test: String => IO[Unit]): IO[Unit] = { val bucket = genString() - s3Client.createBucket(CreateBucketRequest.builder().bucket(bucket).build) >> + s3Client.underlyingClient.createBucket(CreateBucketRequest.builder().bucket(bucket).build) >> test(bucket) >> - s3Client.deleteBucket(DeleteBucketRequest.builder().bucket(bucket).build).void + s3Client.underlyingClient.deleteBucket(DeleteBucketRequest.builder().bucket(bucket).build).void } } diff --git a/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageFetchSaveSpecLocalStack.scala b/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageFetchSaveSpecLocalStack.scala index c2270e7ff2..0a6830797b 100644 --- a/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageFetchSaveSpecLocalStack.scala +++ b/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageFetchSaveSpecLocalStack.scala @@ -3,32 +3,33 @@ package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3 import akka.actor.ActorSystem import akka.http.scaladsl.model.HttpEntity import cats.effect.IO +import cats.syntax.all._ import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StorageFixtures import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.DigestAlgorithm import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.Storage.S3Storage import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageValue.S3StorageValue +import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.AkkaSourceHelpers import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient -import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient.S3StorageClientImpl import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.permissions.{read, write} import ch.epfl.bluebrain.nexus.delta.rdf.syntax.iriStringContextSyntax import ch.epfl.bluebrain.nexus.delta.sdk.actor.ActorSystemSetup import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef -import ch.epfl.bluebrain.nexus.testkit.minio.LocalStackS3 import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite import io.circe.Json -import io.laserdisc.pure.s3.tagless.S3AsyncClientOp import munit.AnyFixture -import software.amazon.awssdk.services.s3.model.{CreateBucketRequest, DeleteBucketRequest} +import software.amazon.awssdk.services.s3.model.{CreateBucketRequest, DeleteBucketRequest, DeleteObjectRequest} import java.util.UUID import scala.concurrent.duration.{Duration, DurationInt} +import scala.jdk.CollectionConverters.ListHasAsScala class S3StorageFetchSaveSpecLocalStack extends NexusSuite with StorageFixtures - with LocalStackS3.Fixture - with ActorSystemSetup.Fixture { + with ActorSystemSetup.Fixture + with LocalStackS3StorageClient.Fixture + with AkkaSourceHelpers { override def munitIOTimeout: Duration = 120.seconds @@ -37,12 +38,10 @@ class S3StorageFetchSaveSpecLocalStack private val uuid = UUID.fromString("8049ba90-7cc6-4de5-93a1-802c04200dcc") implicit private val uuidf: UUIDF = UUIDF.fixed(uuid) - private lazy val s3Client: S3AsyncClientOp[IO] = localStackS3Client() - implicit private lazy val as: ActorSystem = actorSystem() - private lazy val s3StorageClient: S3StorageClient = new S3StorageClientImpl(s3Client) + private lazy val (s3StorageClient: S3StorageClient, _) = localStackS3Client() + implicit private lazy val as: ActorSystem = actorSystem() test("Save and fetch an object in a bucket") { - println(s"Did we get here?") givenAnS3Bucket { bucket => val s3Fetch = new S3StorageFetchFile(s3StorageClient, bucket) val storageValue = S3StorageValue( @@ -58,26 +57,38 @@ class S3StorageFetchSaveSpecLocalStack val iri = iri"http://localhost/s3" val project = ProjectRef.unsafe("org", "project") val storage = S3Storage(iri, project, storageValue, Json.obj()) - val s3Save = new S3StorageSaveFile2(s3Client, storage) + val s3Save = new S3StorageSaveFile(s3StorageClient, storage) val filename = "myfile.txt" - val content = "file content" + val content = genString() val entity = HttpEntity(content) - IO.println(s"Saving file") >> - s3Save.apply(filename, entity).flatMap { attr => - IO.println(s"Saved file attributes: $attr") >> - // TODO check returned file - s3Fetch.apply(attr.path) >> - IO.println(s"Fetched file") - } + val result = for { + attr <- s3Save.apply(filename, entity) + source <- s3Fetch.apply(attr.path) + } yield consume(source) + + assertIO(result, content) } } def givenAnS3Bucket(test: String => IO[Unit]): IO[Unit] = { val bucket = genString() - s3Client.createBucket(CreateBucketRequest.builder().bucket(bucket).build) >> + s3StorageClient.underlyingClient.createBucket(CreateBucketRequest.builder().bucket(bucket).build) >> test(bucket) >> - s3Client.deleteBucket(DeleteBucketRequest.builder().bucket(bucket).build).void + emptyBucket(bucket) >> + s3StorageClient.underlyingClient.deleteBucket(DeleteBucketRequest.builder().bucket(bucket).build).void } + + def emptyBucket(bucket: String): IO[Unit] = + s3StorageClient + .listObjectsV2(bucket) + .flatMap { resp => + val keys: List[String] = resp.contents.asScala.map(_.key()).toList + keys.traverse(deleteObject(bucket, _)) + } + .void + + def deleteObject(bucket: String, key: String): IO[Unit] = + s3StorageClient.underlyingClient.deleteObject(DeleteObjectRequest.builder().bucket(bucket).key(key).build()).void } diff --git a/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/minio/LocalStackS3.scala b/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/minio/LocalStackS3.scala index 072d10d6ec..5c34f5bfa8 100644 --- a/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/minio/LocalStackS3.scala +++ b/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/minio/LocalStackS3.scala @@ -11,7 +11,7 @@ import software.amazon.awssdk.services.s3.S3AsyncClient object LocalStackS3 { val ServiceType = Service.S3 - private def resource(): Resource[IO, LocalStackV2Container] = { + def localstackS3(): Resource[IO, LocalStackV2Container] = { def acquire: IO[LocalStackV2Container] = IO.delay { val containerDef = LocalStackV2Container.Def(services = Seq(ServiceType)) @@ -23,20 +23,22 @@ object LocalStackS3 { Resource.make(acquire)(release) } - private def fs2ClientResource(): Resource[IO, S3AsyncClientOp[IO]] = resource().flatMap { container => - val endpoint = container.endpointOverride(LocalStackS3.ServiceType) + def fs2ClientFromLocalstack(localstack: LocalStackV2Container): Resource[IO, S3AsyncClientOp[IO]] = { + val endpoint = localstack.endpointOverride(LocalStackS3.ServiceType) Interpreter[IO].S3AsyncClientOpResource( S3AsyncClient .builder() - .credentialsProvider(container.staticCredentialsProvider) + .credentialsProvider(localstack.staticCredentialsProvider) .endpointOverride(endpoint) .forcePathStyle(true) - .region(container.region) + .region(localstack.region) ) } + def fs2Client(): Resource[IO, S3AsyncClientOp[IO]] = localstackS3().flatMap(fs2ClientFromLocalstack) + trait Fixture { self: CatsEffectSuite => - val localStackS3Client: IOFixture[S3AsyncClientOp[IO]] = ResourceSuiteLocalFixture("s3client", fs2ClientResource()) + val localStackS3Client: IOFixture[S3AsyncClientOp[IO]] = ResourceSuiteLocalFixture("s3client", fs2Client()) } } diff --git a/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/kg/files/S3StorageSpec.scala b/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/kg/files/S3StorageSpec.scala index 4bb5709ccb..a42c8f5a68 100644 --- a/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/kg/files/S3StorageSpec.scala +++ b/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/kg/files/S3StorageSpec.scala @@ -34,7 +34,7 @@ class S3StorageSpec extends StorageSpec { private val logoKey = "some/path/to/nexus-logo.png" val s3Endpoint: String = "http://s3.localhost.localstack.cloud:4566" - val s3BucketEndpoint: String = s"http://$bucket.s3.localhost.localstack.cloud:4566" + val s3BucketEndpoint: String = s"http://s3.localhost.localstack.cloud:4566/$bucket" private val credentialsProvider = (s3Config.accessKey, s3Config.secretKey) match { case (Some(ak), Some(sk)) => StaticCredentialsProvider.create(AwsBasicCredentials.create(ak, sk)) diff --git a/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/kg/files/StorageSpec.scala b/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/kg/files/StorageSpec.scala index 11778a0550..9f7c4d2a63 100644 --- a/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/kg/files/StorageSpec.scala +++ b/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/kg/files/StorageSpec.scala @@ -152,7 +152,10 @@ abstract class StorageSpec extends BaseIntegrationSpec { locationPrefix.foreach { l => location.getOption(json).value should startWith(l) } - filterMetadataKeys.andThen(filterKey("_location"))(json) shouldEqual expected + val actual = filterMetadataKeys.andThen(filterNestedKeys("_location", "_digest"))(json) + println(s"====== EXPECTED ====== $expected") + println(s"====== ACTUAL ====== $actual") + actual shouldEqual expected } } } @@ -247,9 +250,12 @@ abstract class StorageSpec extends BaseIntegrationSpec { "storageId" -> storageId, "storageType" -> storageType ) - val expected = jsonContentOf("kg/files/list.json", mapping: _*) - filterSearchMetadata - .andThen(filterResults(Set("_location")))(json) should equalIgnoreArrayOrder(expected) + val expected = equalIgnoreArrayOrder(jsonContentOf("kg/files/list.json", mapping: _*)) + val actual = filterSearchMetadata.andThen(filterNestedKeys("_location", "_digest"))(json) + + println(s"====== EXPECTED ====== $expected") + println(s"====== ACTUAL ====== $actual") + actual shouldEqual expected } } @@ -400,7 +406,7 @@ abstract class StorageSpec extends BaseIntegrationSpec { deltaClient.get[Json](s"/files/$projectRef/attachment:attachment2", Coyote) { (json, response) => response.status shouldEqual StatusCodes.OK - filterMetadataKeys.andThen(filterKey("_location"))(json) shouldEqual expected + filterMetadataKeys.andThen(filterNestedKeys("_location", "_digest"))(json) shouldEqual expected } } }