WIP - check object existence, fetch metadata for file size, refactor
dantb committed Apr 8, 2024
1 parent 465ac16 commit 79c1581
Showing 13 changed files with 253 additions and 214 deletions.
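
In outline, the commit threads the S3 async client from S3StorageClient (via a new underlyingClient accessor) through Files and SaveFile into a reworked S3StorageSaveFile2, which first checks that the target key does not already exist, streams the payload to S3 as a multipart upload, and then reads the object attributes back to obtain the stored file size. The sketch below is not part of the commit: it is a minimal, self-contained illustration of that check-then-upload flow, assuming an already-configured fs2-aws S3AsyncClientOp[IO] and a bucket name. The class and method names (CheckThenUpload, objectAttributes, failIfExists, save) are invented for the example; the library calls mirror those visible in the diff.

import cats.effect.IO
import eu.timepit.refined.types.string.NonEmptyString
import fs2.Stream
import fs2.aws.s3.S3
import fs2.aws.s3.models.Models.{BucketName, ETag, FileKey, PartSizeMB}
import io.laserdisc.pure.s3.tagless.S3AsyncClientOp
import software.amazon.awssdk.services.s3.model.{GetObjectAttributesRequest, GetObjectAttributesResponse, NoSuchKeyException, ObjectAttributes}

final class CheckThenUpload(client: S3AsyncClientOp[IO], bucketName: String) {

  // Same construction as in the diff: wrap the tagless async client in the fs2-aws S3 algebra.
  private val s3     = S3.create(client)
  private val bucket = BucketName(NonEmptyString.unsafeFrom(bucketName))

  private def objectAttributes(key: String): IO[GetObjectAttributesResponse] =
    client.getObjectAttributes(
      GetObjectAttributesRequest
        .builder()
        .bucket(bucketName)
        .key(key)
        .objectAttributes(ObjectAttributes.knownValues())
        .build()
    )

  // Fail fast when the key is already taken; NoSuchKeyException means the key is free.
  private def failIfExists(key: String): IO[Unit] =
    objectAttributes(key).redeemWith(
      {
        case _: NoSuchKeyException => IO.unit
        case e                     => IO.raiseError(e)
      },
      _ => IO.raiseError(new IllegalStateException(s"Object $key already exists in bucket $bucketName"))
    )

  // Stream the bytes to S3 as a multipart upload, then read the attributes back for the stored size.
  def save(key: String, data: Stream[IO, Byte]): IO[(List[Option[ETag]], Long)] =
    for {
      _     <- failIfExists(key)
      etags <- data
                 .through(
                   s3.uploadFileMultipart(
                     bucket,
                     FileKey(NonEmptyString.unsafeFrom(key)),
                     PartSizeMB.unsafeFrom(5),
                     uploadEmptyFiles = true
                   )
                 )
                 .compile
                 .to(List)
      attrs <- objectAttributes(key)
    } yield (etags, attrs.objectSize())
}

The hunks below show the same calls in context; the remaining changes remove the now-unused StorageTypeConfig parameter from Files and its wiring, and add an MD5 DigestAlgorithm and the underlyingClient accessor that the refactor relies on.
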
@@ -161,7 +161,6 @@ class StoragePluginModule(priority: Int) extends ModuleDef {
make[Files].from {
(
cfg: StoragePluginConfig,
storageTypeConfig: StorageTypeConfig,
aclCheck: AclCheck,
fetchContext: FetchContext,
storages: Storages,
@@ -179,7 +178,6 @@ class StoragePluginModule(priority: Int) extends ModuleDef {
storages,
storagesStatistics,
xas,
storageTypeConfig,
cfg.files,
remoteDiskStorageClient,
s3Client,
@@ -16,7 +16,6 @@ import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileEvent._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileRejection._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.schemas.{files => fileSchema}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.StorageTypeConfig
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageRejection.{StorageFetchRejection, StorageIsDeprecated}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.{DigestAlgorithm, Storage, StorageRejection, StorageType}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.{FetchAttributeRejection, FetchFileRejection, SaveFileRejection}
@@ -57,8 +56,7 @@ final class Files(
storages: FetchStorage,
storagesStatistics: StoragesStatistics,
remoteDiskStorageClient: RemoteDiskStorageClient,
s3Client: S3StorageClient,
config: StorageTypeConfig
s3Client: S3StorageClient
)(implicit
uuidF: UUIDF,
system: ClassicActorSystem
@@ -506,7 +504,7 @@ final class Files(
metadata: FileDescription,
source: BodyPartEntity
): IO[FileStorageMetadata] =
SaveFile(storage, remoteDiskStorageClient, config)
SaveFile(storage, remoteDiskStorageClient, s3Client.underlyingClient)
.apply(metadata.filename, source)
.adaptError { case e: SaveFileRejection => SaveRejection(iri, storage.id, e) }

@@ -764,7 +762,6 @@ object Files {
storages: FetchStorage,
storagesStatistics: StoragesStatistics,
xas: Transactors,
storageTypeConfig: StorageTypeConfig,
config: FilesConfig,
remoteDiskStorageClient: RemoteDiskStorageClient,
s3Client: S3StorageClient,
@@ -782,8 +779,7 @@
storages,
storagesStatistics,
remoteDiskStorageClient,
s3Client,
storageTypeConfig
s3Client
)
}
}
@@ -29,6 +29,8 @@ object DigestAlgorithm {
final val default: DigestAlgorithm =
new DigestAlgorithm("SHA-256")

final val md5: DigestAlgorithm = new DigestAlgorithm("MD5")

/**
* Safely construct an [[DigestAlgorithm]]
*/
@@ -1,16 +1,16 @@
package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model

import akka.actor.ActorSystem
import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.StorageTypeConfig
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.Storage.Metadata
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageValue.{DiskStorageValue, RemoteDiskStorageValue, S3StorageValue}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.disk.{DiskStorageFetchFile, DiskStorageSaveFile}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote.client.RemoteDiskStorageClient
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.{S3StorageFetchFile, S3StorageSaveFile}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.{S3StorageFetchFile, S3StorageSaveFile2}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.{contexts, Storages}
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.ContextValue
@@ -22,6 +22,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.{OrderingFields, ResourceShift}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import io.circe.syntax._
import io.circe.{Encoder, Json, JsonObject}
import io.laserdisc.pure.s3.tagless.S3AsyncClientOp

sealed trait Storage extends Product with Serializable {

@@ -101,8 +102,8 @@ object Storage {
def fetchFile(client: S3StorageClient): FetchFile =
new S3StorageFetchFile(client, value.bucket)

def saveFile(config: StorageTypeConfig)(implicit as: ActorSystem, uuidf: UUIDF): SaveFile =
new S3StorageSaveFile(this, config)
def saveFile(client: S3AsyncClientOp[IO])(implicit as: ActorSystem, uuidf: UUIDF): SaveFile =
new S3StorageSaveFile2(client, this)
}

/**
@@ -8,10 +8,10 @@ import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.Digest.ComputedDigest
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileStorageMetadata
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.StorageTypeConfig
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.{DigestAlgorithm, Storage}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote.client.RemoteDiskStorageClient
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import io.laserdisc.pure.s3.tagless.S3AsyncClientOp

import java.util.UUID
import scala.concurrent.{ExecutionContext, Future}
@@ -34,13 +34,13 @@ object SaveFile {
/**
* Construct a [[SaveFile]] from the given ''storage''.
*/
def apply(storage: Storage, client: RemoteDiskStorageClient, config: StorageTypeConfig)(implicit
def apply(storage: Storage, client: RemoteDiskStorageClient, s3Client: S3AsyncClientOp[IO])(implicit
as: ActorSystem,
uuidf: UUIDF
): SaveFile =
storage match {
case storage: Storage.DiskStorage => storage.saveFile
case storage: Storage.S3Storage => storage.saveFile(config)
case storage: Storage.S3Storage => storage.saveFile(s3Client)
case storage: Storage.RemoteDiskStorage => storage.saveFile(client)
}

@@ -5,30 +5,40 @@ import akka.actor.ActorSystem
import akka.http.scaladsl.model.Uri.Path
import akka.http.scaladsl.model.{BodyPartEntity, Uri}
import akka.stream.scaladsl.Source
import akka.util.ByteString
import cats.effect.IO
import cats.implicits._
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileAttributes.FileAttributesOrigin.Client
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.{Digest, FileStorageMetadata}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.DigestAlgorithm
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.Storage.S3Storage
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.SaveFile
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.SaveFile.intermediateFolders
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.SaveFileRejection._
import ch.epfl.bluebrain.nexus.delta.sdk.stream.StreamConverter
import eu.timepit.refined.refineMV
import eu.timepit.refined.types.string.NonEmptyString
import fs2.aws.s3.S3
import fs2.aws.s3.S3.MultipartETagValidation
import fs2.aws.s3.models.Models.{BucketName, ETag, FileKey, PartSizeMB}
import io.laserdisc.pure.s3.tagless.S3AsyncClientOp
import software.amazon.awssdk.services.s3.model.GetObjectAttributesRequest
import software.amazon.awssdk.services.s3.model._
import fs2.Stream

import java.util.UUID

final class S3StorageSaveFile2(client: S3AsyncClientOp[IO], storage: S3Storage)(implicit
as: ActorSystem,
uuidf: UUIDF
) {
private val fileAlreadyExistException = new IllegalArgumentException("Collision, file already exist")
) extends SaveFile {

private val s3 = S3.create(client)
private val multipartETagValidation = MultipartETagValidation.create[IO]
private val logger = Logger[S3StorageSaveFile2]
private val partSizeMB: PartSizeMB = refineMV(5)
private val bucket = BucketName(NonEmptyString.unsafeFrom(storage.value.bucket))

def apply(
filename: String,
@@ -42,58 +52,86 @@ final class S3StorageSaveFile2(client: S3AsyncClientOp[IO], storage: S3Storage)(
}

private def storeFile(path: Path, uuid: UUID, entity: BodyPartEntity): IO[FileStorageMetadata] = {
val key = path.toString()
val s3 = S3.create(client)
val convertedStream: fs2.Stream[IO, Byte] = StreamConverter(
entity.dataBytes.flatMapConcat(x => Source.fromIterator(() => x.iterator)).mapMaterializedValue(_ => NotUsed)
val key = path.toString()
val fileData: Stream[IO, Byte] = convertStream(entity.dataBytes)

(for {
_ <- log(key, s"Checking for object existence")
_ <- getFileAttributes(key).redeemWith(
{
case _: NoSuchKeyException => IO.unit
case e => IO.raiseError(e)
},
_ => IO.raiseError(ResourceAlreadyExists(key))
)
_ <- log(key, s"Beginning multipart upload")
maybeEtags <- uploadFileMultipart(fileData, key)
_ <- log(key, s"Finished multipart upload. Etag by part: $maybeEtags")
attr <- collectFileMetadata(key, uuid, maybeEtags)
} yield attr)
.adaptError { err => UnexpectedSaveError(key, err.getMessage) }
}

private def convertStream(source: Source[ByteString, Any]): Stream[IO, Byte] =
StreamConverter(
source
.flatMapConcat(x => Source.fromIterator(() => x.iterator))
.mapMaterializedValue(_ => NotUsed)
)

// TODO where to get the etag returned?
val thing: IO[Option[Option[ETag]]] = convertedStream
// TODO test etags and file round trip for true multipart uploads.
// It's not clear whether the last value in the stream will be the final aggregate checksum
private def uploadFileMultipart(fileData: Stream[IO, Byte], key: String): IO[List[Option[ETag]]] =
fileData
.through(
s3.uploadFileMultipart(
BucketName(NonEmptyString.unsafeFrom(storage.value.bucket)),
bucket,
FileKey(NonEmptyString.unsafeFrom(key)),
PartSizeMB.unsafeFrom(5),
multipartETagValidation = MultipartETagValidation.create[IO].some
partSizeMB,
uploadEmptyFiles = true,
multipartETagValidation = multipartETagValidation.some
)
)
.compile
.last
final case class Attr(fileSize: Long, checksum: String)

// todo not sure this library sets the checksum on the object
val getSize: IO[Attr] =
client
.getObjectAttributes(GetObjectAttributesRequest.builder().bucket(storage.value.bucket).key(key).build())
.map(x => Attr(x.objectSize(), x.checksum().checksumSHA256()))
.to(List)

val otherThing: IO[FileStorageMetadata] = thing.flatMap { a =>
a.flatten match {
case Some(_) =>
getSize.map { case Attr(fileSize, checksum) =>
FileStorageMetadata(
uuid = uuid,
bytes = fileSize,
// TODO the digest for multipart uploads is a concatenation
// Add this as an option to the ADT
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html#large-object-checksums
digest = Digest.ComputedDigest(DigestAlgorithm.default, checksum),
origin = Client,
location = Uri(key), // both of these are absolute URIs?
path = Uri.Path(key)
)
}
case None => IO.raiseError(UnexpectedSaveError(key, "S3 multipart upload did not complete"))
private def getFileAttributes(key: String): IO[GetObjectAttributesResponse] =
client
.getObjectAttributes(
GetObjectAttributesRequest
.builder()
.bucket(bucket.value.value)
.key(key)
.objectAttributes(ObjectAttributes.knownValues())
.build()
)
.onError { e =>
logger.error(e)(s"Error fetching S3 file attributes for key $key")
}

private def getFileSize(key: String) =
getFileAttributes(key).flatMap { attr =>
log(key, s"File attributes from S3: $attr").as(attr.objectSize())
}

otherThing
.adaptError {
case `fileAlreadyExistException` => ResourceAlreadyExists(key)
case err => UnexpectedSaveError(key, err.getMessage)
}
}
private def collectFileMetadata(key: String, uuid: UUID, maybeEtags: List[Option[ETag]]): IO[FileStorageMetadata] =
maybeEtags.sequence match {
case Some(onlyPartETag :: Nil) => getFileSize(key).map(fileMetadata(key, uuid, _, onlyPartETag))
case Some(other) => raiseUnexpectedErr(key, s"S3 multipart upload returned multiple etags unexpectedly: $other")
case None => raiseUnexpectedErr(key, "S3 multipart upload was aborted because no data was received")
}

private def fileMetadata(key: String, uuid: UUID, fileSize: Long, digest: String) =
FileStorageMetadata(
uuid = uuid,
bytes = fileSize,
digest = Digest.ComputedDigest(DigestAlgorithm.md5, digest),
origin = Client,
location = Uri(key), // both of these are now absolute URIs?
path = Uri.Path(key)
)

private def raiseUnexpectedErr[A](key: String, msg: String): IO[A] = IO.raiseError(UnexpectedSaveError(key, msg))

private def log(key: String, msg: String) = logger.info(s"Bucket: ${bucket.value}. Key: $key. $msg")
}
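
The TODOs above leave open how to derive a digest when an upload genuinely spans multiple parts. For reference only (this is not part of the commit), the snippet below shows the composite scheme S3 is commonly documented to use for multipart ETags, which the linked AWS page generalises to "checksums of checksums": each part is hashed, the binary digests are concatenated and hashed again, and the part count is appended. The object and method names (MultipartETag, compositeETag) are invented for illustration.

import java.security.MessageDigest

object MultipartETag {

  private def md5(bytes: Array[Byte]): Array[Byte] =
    MessageDigest.getInstance("MD5").digest(bytes)

  private def hex(bytes: Array[Byte]): String =
    bytes.map("%02x".format(_)).mkString

  // Expected composite ETag for a multipart upload, e.g. "9b2cf535f27731c974343645a3985328-3":
  // MD5 of the concatenated binary MD5 digests of the parts, suffixed with the part count.
  def compositeETag(parts: List[Array[Byte]]): String = {
    val partDigests = parts.map(md5)
    s"${hex(md5(partDigests.flatten.toArray))}-${parts.size}"
  }
}
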
@@ -20,6 +20,8 @@ trait S3StorageClient {
def listObjectsV2(bucket: String): IO[ListObjectsV2Response]

def readFile(bucket: String, fileKey: String): fs2.Stream[IO, Byte]

def underlyingClient: S3AsyncClientOp[IO]
}

object S3StorageClient {
@@ -56,6 +58,8 @@ object S3StorageClient {
fk <- fs2.Stream.fromEither[IO].apply(refineV[NonEmpty](fileKey).leftMap(e => new IllegalArgumentException(e)))
bytes <- s3.readFile(BucketName(bk), FileKey(fk))
} yield bytes

override def underlyingClient: S3AsyncClientOp[IO] = client
}

final case object S3StorageClientDisabled extends S3StorageClient {
@@ -66,5 +70,7 @@

override def readFile(bucket: String, fileKey: String): fs2.Stream[IO, Byte] =
fs2.Stream.raiseError[IO](disabledErr)

override def underlyingClient: S3AsyncClientOp[IO] = throw disabledErr
}
}
@@ -144,7 +144,6 @@ class FilesSpec(fixture: RemoteStorageClientFixtures)
storages,
storageStatistics,
xas,
cfg,
FilesConfig(eventLogConfig, MediaTypeDetectorConfig.Empty),
remoteDiskStorageClient,
S3StorageClientDisabled,
@@ -129,7 +129,6 @@ class FilesRoutesSpec
storages,
storagesStatistics,
xas,
config,
FilesConfig(eventLogConfig, MediaTypeDetectorConfig.Empty),
remoteDiskStorageClient,
S3StorageClientDisabled,