diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageSaveFile.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageSaveFile.scala
index 5da54bb9d6..8003af9aeb 100644
--- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageSaveFile.scala
+++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3StorageSaveFile.scala
@@ -5,12 +5,13 @@ import akka.actor.ActorSystem
 import akka.http.scaladsl.model.{BodyPartEntity, Uri}
 import akka.stream.scaladsl.Source
 import akka.util.ByteString
-import cats.effect.IO
+import cats.effect.{IO, Ref}
 import cats.implicits._
 import ch.epfl.bluebrain.nexus.delta.kernel.Logger
 import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF
 import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileAttributes.FileAttributesOrigin.Client
 import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.{Digest, FileStorageMetadata}
+import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.DigestAlgorithm
 import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.Storage.S3Storage
 import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.FileOperations.intermediateFolders
 import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.SaveFileRejection._
@@ -43,39 +44,23 @@ final class S3StorageSaveFile(s3StorageClient: S3StorageClient)(implicit
       val fileData: Stream[IO, Byte] = convertStream(entity.dataBytes)
 
       (for {
-        _    <- log(key, s"Checking for object existence")
-        _    <- validateObjectDoesNotExist(bucket, key)
-        _    <- log(key, s"Beginning upload")
-        md5  <- uploadFile(fileData, bucket, key)
-        _    <- log(key, s"Finished upload. MD5: $md5")
-        attr <- collectFileMetadata(fileData, key, uuid, md5)
+        _               <- log(key, s"Checking for object existence")
+        _               <- validateObjectDoesNotExist(bucket, key)
+        _               <- log(key, s"Beginning upload")
+        (md5, fileSize) <- uploadFile(fileData, bucket, key)
+        _               <- log(key, s"Finished upload. MD5: $md5")
+        attr            <- fileMetadata(bucket, key, uuid, fileSize, md5)
       } yield attr)
         .onError(e => logger.error(e)("Unexpected error when storing file"))
         .adaptError { err => UnexpectedSaveError(key, err.getMessage) }
     }
 
-    def collectFileMetadata(
-        bytes: Stream[IO, Byte],
-        key: String,
-        uuid: UUID,
-        md5: String
-    ): IO[FileStorageMetadata] = {
-      // TODO our tests expect specific values for digests and the only algorithm currently used is SHA-256.
-      // If we want to continue to check this, but allow for different algorithms, this needs to be abstracted
-      // in the tests and verified for specific file contents.
-      // The result will als depend on whether we use a multipart upload or a standard put object.
-      for {
-        fileSize <- computeSize(bytes)
-        metadata <- fileMetadata(bucket, key, uuid, fileSize, md5)
-      } yield metadata
-    }
-
-    def fileMetadata(bucket: String, key: String, uuid: UUID, fileSize: Long, digest: String) =
+    def fileMetadata(bucket: String, key: String, uuid: UUID, fileSize: Long, md5: String) =
      s3StorageClient.baseEndpoint.map { base =>
        FileStorageMetadata(
          uuid = uuid,
          bytes = fileSize,
-         digest = Digest.ComputedDigest(storage.value.algorithm, digest),
+         digest = Digest.ComputedDigest(DigestAlgorithm.MD5, md5),
          origin = Client,
          location = base / bucket / Uri.Path(key),
          path = Uri.Path(key)
@@ -107,13 +92,19 @@ final class S3StorageSaveFile(s3StorageClient: S3StorageClient)(implicit
         .mapMaterializedValue(_ => NotUsed)
     )
 
-  private def uploadFile(fileData: Stream[IO, Byte], bucket: String, key: String): IO[String] =
-    fileData
-      .through(
-        uploadFilePipe(bucket, key)
-      )
-      .compile
-      .onlyOrError
+  private def uploadFile(fileData: Stream[IO, Byte], bucket: String, key: String): IO[(String, Long)] = {
+    for {
+      fileSizeAcc <- Ref.of[IO, Long](0L)
+      md5         <- fileData
+                       .evalTap(_ => fileSizeAcc.update(_ + 1))
+                       .through(
+                         uploadFilePipe(bucket, key)
+                       )
+                       .compile
+                       .onlyOrError
+      fileSize    <- fileSizeAcc.get
+    } yield (md5, fileSize)
+  }
 
   private def uploadFilePipe(bucket: String, key: String): Pipe[IO, Byte, String] = { in =>
     fs2.Stream.eval {
@@ -135,13 +126,4 @@ final class S3StorageSaveFile(s3StorageClient: S3StorageClient)(implicit
   private def getFileAttributes(bucket: String, key: String): IO[GetObjectAttributesResponse] =
     s3StorageClient.getFileAttributes(bucket, key)
-
-  // TODO issue fetching attributes when tested against localstack, only after the object is saved
-  // Verify if it's the same for real S3. Error msg: 'Could not parse XML response.'
-  // For now we just compute it manually.
-  //  private def getFileSize(key: String) =
-  //    getFileAttributes(key).flatMap { attr =>
-  //      log(key, s"File attributes from S3: $attr").as(attr.objectSize())
-  //    }
-
-  private def computeSize(bytes: Stream[IO, Byte]): IO[Long] = bytes.fold(0L)((acc, _) => acc + 1).compile.lastOrError
 }
diff --git a/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3FileOperationsSuite.scala b/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3FileOperationsSuite.scala
index e5f76a1c98..e94509133c 100644
--- a/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3FileOperationsSuite.scala
+++ b/delta/plugins/storage/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3FileOperationsSuite.scala
@@ -79,6 +79,7 @@ class S3FileOperationsSuite
     val result = for {
       attr   <- fileOps.save(storage, filename, entity)
       _       = assertEquals(attr.digest, ComputedDigest(DigestAlgorithm.MD5, hashOfContent))
+      _       = assertEquals(attr.bytes, content.length.toLong)
       source <- fileOps.fetch(bucket, attr.path)
     } yield consume(source)
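
For context only, not part of the patch above: the new uploadFile threads the byte stream through the upload pipe once while accumulating the byte count in a cats-effect Ref, so the size no longer needs a second pass over the stream (the old computeSize). The standalone sketch below illustrates that pattern under stated assumptions; SizeWhileUploading, fakeUploadPipe and the sample input are hypothetical stand-ins, since the real pipe performs the S3 upload and emits the MD5 digest.

import cats.effect.{IO, IOApp, Ref}
import fs2.{Pipe, Stream}

object SizeWhileUploading extends IOApp.Simple {

  // Hypothetical stand-in for uploadFilePipe: drains the bytes and emits a
  // single placeholder "digest", the way the real pipe emits the MD5.
  private def fakeUploadPipe: Pipe[IO, Byte, String] =
    in => Stream.eval(in.compile.drain.map(_ => "fake-md5"))

  // Count bytes in a Ref while the same stream feeds the upload pipe,
  // mirroring the Ref.of + evalTap combination introduced in the patch.
  private def uploadAndMeasure(data: Stream[IO, Byte]): IO[(String, Long)] =
    for {
      sizeAcc <- Ref.of[IO, Long](0L)
      digest  <- data
                   .evalTap(_ => sizeAcc.update(_ + 1))
                   .through(fakeUploadPipe)
                   .compile
                   .onlyOrError
      size    <- sizeAcc.get
    } yield (digest, size)

  def run: IO[Unit] =
    uploadAndMeasure(Stream.emits("hello".getBytes("UTF-8").toList).covary[IO])
      .flatMap { case (digest, size) => IO.println(s"digest=$digest, size=$size") } // size = 5
}

One trade-off worth noting: evalTap here runs one effect per byte; if that overhead mattered, counting per chunk (e.g. data.chunks.evalTap(c => sizeAcc.update(_ + c.size)).unchunks in recent fs2) would give the same total with far fewer Ref updates.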