
Commit 4771759
Calculate content size from the stream without re-processing the entire stream
shinyhappydan committed Apr 18, 2024
1 parent cd0961f commit 4771759
Showing 2 changed files with 24 additions and 41 deletions.
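
In short: the save path previously uploaded the file and then folded over the byte stream a second time just to count its length. The new uploadFile bumps a cats-effect Ref from evalTap while the same stream feeds the S3 upload pipe, returning both the MD5 and the size from a single traversal; collectFileMetadata and computeSize are deleted, and the digest algorithm recorded in the metadata is now explicitly DigestAlgorithm.MD5. A runnable sketch of the counting pattern follows the uploadFile hunk below.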
@@ -5,12 +5,13 @@ import akka.actor.ActorSystem
 import akka.http.scaladsl.model.{BodyPartEntity, Uri}
 import akka.stream.scaladsl.Source
 import akka.util.ByteString
-import cats.effect.IO
+import cats.effect.{IO, Ref}
 import cats.implicits._
 import ch.epfl.bluebrain.nexus.delta.kernel.Logger
 import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF
 import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileAttributes.FileAttributesOrigin.Client
 import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.{Digest, FileStorageMetadata}
+import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.DigestAlgorithm
 import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.Storage.S3Storage
 import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.FileOperations.intermediateFolders
 import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.SaveFileRejection._
@@ -43,39 +44,23 @@ final class S3StorageSaveFile(s3StorageClient: S3StorageClient)(implicit
     val fileData: Stream[IO, Byte] = convertStream(entity.dataBytes)

     (for {
-      _    <- log(key, s"Checking for object existence")
-      _    <- validateObjectDoesNotExist(bucket, key)
-      _    <- log(key, s"Beginning upload")
-      md5  <- uploadFile(fileData, bucket, key)
-      _    <- log(key, s"Finished upload. MD5: $md5")
-      attr <- collectFileMetadata(fileData, key, uuid, md5)
+      _               <- log(key, s"Checking for object existence")
+      _               <- validateObjectDoesNotExist(bucket, key)
+      _               <- log(key, s"Beginning upload")
+      (md5, fileSize) <- uploadFile(fileData, bucket, key)
+      _               <- log(key, s"Finished upload. MD5: $md5")
+      attr            <- fileMetadata(bucket, key, uuid, fileSize, md5)
     } yield attr)
       .onError(e => logger.error(e)("Unexpected error when storing file"))
       .adaptError { err => UnexpectedSaveError(key, err.getMessage) }
   }

-  def collectFileMetadata(
-      bytes: Stream[IO, Byte],
-      key: String,
-      uuid: UUID,
-      md5: String
-  ): IO[FileStorageMetadata] = {
-    // TODO our tests expect specific values for digests and the only algorithm currently used is SHA-256.
-    // If we want to continue to check this, but allow for different algorithms, this needs to be abstracted
-    // in the tests and verified for specific file contents.
-    // The result will als depend on whether we use a multipart upload or a standard put object.
-    for {
-      fileSize <- computeSize(bytes)
-      metadata <- fileMetadata(bucket, key, uuid, fileSize, md5)
-    } yield metadata
-  }
-
-  def fileMetadata(bucket: String, key: String, uuid: UUID, fileSize: Long, digest: String) =
+  def fileMetadata(bucket: String, key: String, uuid: UUID, fileSize: Long, md5: String) =
     s3StorageClient.baseEndpoint.map { base =>
       FileStorageMetadata(
         uuid = uuid,
         bytes = fileSize,
-        digest = Digest.ComputedDigest(storage.value.algorithm, digest),
+        digest = Digest.ComputedDigest(DigestAlgorithm.MD5, md5),
         origin = Client,
         location = base / bucket / Uri.Path(key),
         path = Uri.Path(key)
@@ -107,13 +92,19 @@ final class S3StorageSaveFile(s3StorageClient: S3StorageClient)(implicit
         .mapMaterializedValue(_ => NotUsed)
     )

-  private def uploadFile(fileData: Stream[IO, Byte], bucket: String, key: String): IO[String] =
-    fileData
-      .through(
-        uploadFilePipe(bucket, key)
-      )
-      .compile
-      .onlyOrError
+  private def uploadFile(fileData: Stream[IO, Byte], bucket: String, key: String): IO[(String, Long)] = {
+    for {
+      fileSizeAcc <- Ref.of[IO, Long](0L)
+      md5         <- fileData
+                       .evalTap(_ => fileSizeAcc.update(_ + 1))
+                       .through(
+                         uploadFilePipe(bucket, key)
+                       )
+                       .compile
+                       .onlyOrError
+      fileSize    <- fileSizeAcc.get
+    } yield (md5, fileSize)
+  }

   private def uploadFilePipe(bucket: String, key: String): Pipe[IO, Byte, String] = { in =>
     fs2.Stream.eval {
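
The new uploadFile above is the core of the change: a Ref[IO, Long] is bumped by evalTap for every byte flowing into the upload pipe, so the size is known after the one traversal the upload already performs. Below is a minimal, self-contained sketch of the pattern, assuming only fs2 and cats-effect; uploadPipe is a hypothetical stand-in for the real uploadFilePipe.

import cats.effect.{IO, IOApp, Ref}
import fs2.{Pipe, Stream}

object SingleTraversalSizeDemo extends IOApp.Simple {

  // Hypothetical stand-in for uploadFilePipe: drains the bytes and emits one
  // result, the way the real pipe emits the MD5 reported by S3.
  def uploadPipe: Pipe[IO, Byte, String] =
    in => Stream.eval(in.compile.drain.as("fake-md5"))

  // Single traversal: the Ref counts elements as evalTap observes them on the
  // way into the upload pipe, so no second pass over the data is needed.
  def uploadCountingSize(data: Stream[IO, Byte]): IO[(String, Long)] =
    for {
      sizeAcc <- Ref.of[IO, Long](0L)
      md5     <- data
                   .evalTap(_ => sizeAcc.update(_ + 1))
                   .through(uploadPipe)
                   .compile
                   .onlyOrError
      size    <- sizeAcc.get
    } yield (md5, size)

  val run: IO[Unit] =
    uploadCountingSize(Stream.emits("hello world".getBytes.toList).covary[IO])
      .flatMap { case (md5, size) => IO.println(s"md5=$md5 size=$size") } // size=11
}

One trade-off worth noting: evalTap as written costs a Ref update per byte. If that overhead mattered, counting per chunk, e.g. data.chunks.evalTap(c => sizeAcc.update(_ + c.size)).unchunks (assuming a recent fs2), would touch the Ref once per chunk while leaving the byte stream unchanged.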
@@ -135,13 +126,4 @@ final class S3StorageSaveFile(s3StorageClient: S3StorageClient)(implicit

   private def getFileAttributes(bucket: String, key: String): IO[GetObjectAttributesResponse] =
     s3StorageClient.getFileAttributes(bucket, key)
-
-  // TODO issue fetching attributes when tested against localstack, only after the object is saved
-  // Verify if it's the same for real S3. Error msg: 'Could not parse XML response.'
-  // For now we just compute it manually.
-  //  private def getFileSize(key: String) =
-  //    getFileAttributes(key).flatMap { attr =>
-  //      log(key, s"File attributes from S3: $attr").as(attr.objectSize())
-  //    }
-  private def computeSize(bytes: Stream[IO, Byte]): IO[Long] = bytes.fold(0L)((acc, _) => acc + 1).compile.lastOrError
 }
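
The deleted computeSize is why this commit exists: the old code compiled fileData once for the upload and then folded over it again just to count bytes, re-running whatever effects back the stream. A small sketch of that cost, using a Ref as an effect counter (the numbers in the final comment are what this program prints):

import cats.effect.{IO, IOApp, Ref}
import fs2.Stream

object TwoPassCostDemo extends IOApp.Simple {
  val run: IO[Unit] =
    for {
      pulls <- Ref.of[IO, Long](0L)
      // an effectful source: every element pulled bumps the counter
      bytes  = Stream.emits(List[Byte](1, 2, 3)).covary[IO].evalTap(_ => pulls.update(_ + 1))
      _     <- bytes.compile.drain                                     // first pass: the "upload"
      size  <- bytes.fold(0L)((acc, _) => acc + 1).compile.lastOrError // second pass: old computeSize
      n     <- pulls.get
      _     <- IO.println(s"size=$size, source pulled $n times in total") // size=3, pulled 6 times
    } yield ()
}

For a one-shot source such as a request body, that second pass may not even be replayable, so counting in flight is safer as well as cheaper.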
@@ -79,6 +79,7 @@ class S3FileOperationsSuite
     val result = for {
       attr   <- fileOps.save(storage, filename, entity)
       _       = assertEquals(attr.digest, ComputedDigest(DigestAlgorithm.MD5, hashOfContent))
+      _       = assertEquals(attr.bytes, content.length.toLong)
       source <- fileOps.fetch(bucket, attr.path)
     } yield consume(source)

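The added assertion checks that the size counted during upload matches content.length. The digest assertion compares against a precomputed hashOfContent; if one wanted to derive that expected value instead of hard-coding it, a hedged sketch using fs2's hash pipe (md5Hex is an illustrative helper, not part of the suite):

import cats.effect.IO
import fs2.Stream

object TestDigests {
  // Illustrative helper: MD5 of a string as a lowercase hex digest,
  // streamed through fs2's built-in fs2.hash.md5 pipe.
  def md5Hex(content: String): IO[String] =
    Stream.emits(content.getBytes.toList)
      .covary[IO]
      .through(fs2.hash.md5)
      .compile
      .to(Array)
      .map(_.map("%02x".format(_)).mkString)
}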
