Commit
WIP - multipart S3 upload
dantb committed Apr 8, 2024
1 parent 2aa48cf commit 465ac16
Showing 4 changed files with 185 additions and 4 deletions.
@@ -99,7 +99,7 @@ object Storage {
override val storageValue: StorageValue = value

def fetchFile(client: S3StorageClient): FetchFile =
-    new S3StorageFetchFile(client, value)
+    new S3StorageFetchFile(client, value.bucket)

def saveFile(config: StorageTypeConfig)(implicit as: ActorSystem, uuidf: UUIDF): SaveFile =
new S3StorageSaveFile(this, config)
@@ -5,7 +5,6 @@ import akka.stream.scaladsl.Source
import akka.util.ByteString
import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileAttributes
- import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageValue.S3StorageValue
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.FetchFile
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.FetchFileRejection._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient
@@ -16,7 +15,7 @@ import java.net.URLDecoder
import java.nio.charset.StandardCharsets.UTF_8
import scala.concurrent.duration.DurationInt

- final class S3StorageFetchFile(client: S3StorageClient, value: S3StorageValue) extends FetchFile {
+ final class S3StorageFetchFile(client: S3StorageClient, bucket: String) extends FetchFile {

override def apply(attributes: FileAttributes): IO[AkkaSource] =
apply(attributes.path)
@@ -26,7 +25,7 @@ final class S3StorageFetchFile(client: S3StorageClient, value: S3StorageValue) extends FetchFile {
Source.fromGraph(
StreamConverter(
client
-        .readFile(value.bucket, URLDecoder.decode(path.toString, UTF_8.toString))
+        .readFile(bucket, URLDecoder.decode(path.toString, UTF_8.toString))
.groupWithin(8192, 1.second)
.map(bytes => ByteString(bytes.toArray))
)
@@ -0,0 +1,99 @@
package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3

import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.model.Uri.Path
import akka.http.scaladsl.model.{BodyPartEntity, Uri}
import akka.stream.scaladsl.Source
import cats.effect.IO
import cats.implicits._
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileAttributes.FileAttributesOrigin.Client
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.{Digest, FileStorageMetadata}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.DigestAlgorithm
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.Storage.S3Storage
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.SaveFile.intermediateFolders
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.SaveFileRejection._
import ch.epfl.bluebrain.nexus.delta.sdk.stream.StreamConverter
import eu.timepit.refined.types.string.NonEmptyString
import fs2.aws.s3.S3
import fs2.aws.s3.S3.MultipartETagValidation
import fs2.aws.s3.models.Models.{BucketName, ETag, FileKey, PartSizeMB}
import io.laserdisc.pure.s3.tagless.S3AsyncClientOp
import software.amazon.awssdk.services.s3.model.{GetObjectAttributesRequest, ObjectAttributes}

import java.util.UUID

final class S3StorageSaveFile2(client: S3AsyncClientOp[IO], storage: S3Storage)(implicit
as: ActorSystem,
uuidf: UUIDF
) {
  private val fileAlreadyExistException = new IllegalArgumentException("Collision, file already exists")

def apply(
filename: String,
entity: BodyPartEntity
): IO[FileStorageMetadata] = {
for {
uuid <- uuidf()
path = Uri.Path(intermediateFolders(storage.project, uuid, filename))
result <- storeFile(path, uuid, entity)
} yield result
}

private def storeFile(path: Path, uuid: UUID, entity: BodyPartEntity): IO[FileStorageMetadata] = {
val key = path.toString()
val s3 = S3.create(client)
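    // Bridge the Akka ByteString source into an fs2 Stream[IO, Byte] so it
    // can be piped through fs2-aws's multipart upload.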
val convertedStream: fs2.Stream[IO, Byte] = StreamConverter(
entity.dataBytes.flatMapConcat(x => Source.fromIterator(() => x.iterator)).mapMaterializedValue(_ => NotUsed)
)

    // The ETag of the completed upload is expected as the last element emitted
    // by uploadFileMultipart; the nested Options distinguish an empty input
    // stream from an upload that produced no ETag.
    val uploadResult: IO[Option[Option[ETag]]] = convertedStream
.through(
s3.uploadFileMultipart(
BucketName(NonEmptyString.unsafeFrom(storage.value.bucket)),
FileKey(NonEmptyString.unsafeFrom(key)),
PartSizeMB.unsafeFrom(5),
multipartETagValidation = MultipartETagValidation.create[IO].some
)
)
.compile
.last
final case class Attr(fileSize: Long, checksum: String)

    // TODO: a checksum is only returned if one was supplied at upload time; it
    // is unclear whether fs2-aws sets one, so checksumSHA256() may be null here.
    val getAttributes: IO[Attr] =
      client
        .getObjectAttributes(
          GetObjectAttributesRequest.builder().bucket(storage.value.bucket).key(key)
            .objectAttributes(ObjectAttributes.OBJECT_SIZE, ObjectAttributes.CHECKSUM).build()
        )
        .map(x => Attr(x.objectSize(), x.checksum().checksumSHA256()))

    val fileMetadata: IO[FileStorageMetadata] = uploadResult.flatMap { a =>
      a.flatten match {
        case Some(_) =>
          getAttributes.map { case Attr(fileSize, checksum) =>
FileStorageMetadata(
uuid = uuid,
bytes = fileSize,
// TODO the digest for multipart uploads is a concatenation
// Add this as an option to the ADT
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html#large-object-checksums
digest = Digest.ComputedDigest(DigestAlgorithm.default, checksum),
origin = Client,
location = Uri(key), // both of these are absolute URIs?
path = Uri.Path(key)
)
}
case None => IO.raiseError(UnexpectedSaveError(key, "S3 multipart upload did not complete"))
      }
    }

    fileMetadata
.adaptError {
case `fileAlreadyExistException` => ResourceAlreadyExists(key)
case err => UnexpectedSaveError(key, err.getMessage)
}
}

}
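A note on the digest TODO above: for multipart uploads AWS reports a composite checksum, i.e. the checksum of the concatenated binary per-part checksums with the part count appended after a dash (see the AWS object-integrity docs linked in the comment). A minimal sketch of how such a composite digest could be derived from base64-encoded per-part SHA-256 checksums; the helper is hypothetical and not part of this commit:

import java.security.MessageDigest
import java.util.Base64

// Hypothetical helper: digest of the concatenated per-part digests, suffixed
// with "-<part count>", mirroring how AWS reports multipart checksums.
def compositeSha256(partChecksumsBase64: List[String]): String = {
  val md = MessageDigest.getInstance("SHA-256")
  partChecksumsBase64.foreach(c => md.update(Base64.getDecoder.decode(c)))
  s"${Base64.getEncoder.encodeToString(md.digest())}-${partChecksumsBase64.size}"
}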
@@ -0,0 +1,83 @@
package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3

import akka.actor.ActorSystem
import akka.http.scaladsl.model.HttpEntity
import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StorageFixtures
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.DigestAlgorithm
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.Storage.S3Storage
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageValue.S3StorageValue
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient.S3StorageClientImpl
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.permissions.{read, write}
import ch.epfl.bluebrain.nexus.delta.rdf.syntax.iriStringContextSyntax
import ch.epfl.bluebrain.nexus.delta.sdk.actor.ActorSystemSetup
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import ch.epfl.bluebrain.nexus.testkit.minio.LocalStackS3
import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite
import io.circe.Json
import io.laserdisc.pure.s3.tagless.S3AsyncClientOp
import munit.AnyFixture
import software.amazon.awssdk.services.s3.model.{CreateBucketRequest, DeleteBucketRequest}

import java.util.UUID
import scala.concurrent.duration.{Duration, DurationInt}

class S3StorageFetchSaveSpecLocalStack
extends NexusSuite
with StorageFixtures
with LocalStackS3.Fixture
with ActorSystemSetup.Fixture {

override def munitIOTimeout: Duration = 60.seconds

override def munitFixtures: Seq[AnyFixture[_]] = List(localStackS3Client, actorSystem)

private val uuid = UUID.fromString("8049ba90-7cc6-4de5-93a1-802c04200dcc")
implicit private val uuidf: UUIDF = UUIDF.fixed(uuid)

private lazy val s3Client: S3AsyncClientOp[IO] = localStackS3Client()
implicit private lazy val as: ActorSystem = actorSystem()
private lazy val s3StorageClient: S3StorageClient = new S3StorageClientImpl(s3Client)

test("Save and fetch an object in a bucket") {
println(s"Did we get here?")
givenAnS3Bucket { bucket =>
val s3Fetch = new S3StorageFetchFile(s3StorageClient, bucket)
val storageValue = S3StorageValue(
default = false,
algorithm = DigestAlgorithm.default,
bucket = bucket,
endpoint = None,
region = None,
readPermission = read,
writePermission = write,
maxFileSize = 20
)
val iri = iri"http://localhost/s3"
val project = ProjectRef.unsafe("org", "project")
val storage = S3Storage(iri, project, storageValue, Json.obj())
val s3Save = new S3StorageSaveFile2(s3Client, storage)

val filename = "myfile.txt"
val content = "file content"
val entity = HttpEntity(content)

IO.println(s"Saving file") >>
s3Save.apply(filename, entity).flatMap { attr =>
IO.println(s"Saved file attributes: $attr") >>
// TODO check returned file
s3Fetch.apply(attr.path) >>
IO.println(s"Fetched file")
}
}
}

def givenAnS3Bucket(test: String => IO[Unit]): IO[Unit] = {
val bucket = genString()
s3Client.createBucket(CreateBucketRequest.builder().bucket(bucket).build) >>
test(bucket) >>
s3Client.deleteBucket(DeleteBucketRequest.builder().bucket(bucket).build).void
}
}
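The "check returned file" TODO above could be closed by draining the fetched source and comparing it with the uploaded content. A sketch, assuming an implicit Materializer is derivable from the ActorSystem in scope and that NexusSuite exposes munit-cats-effect's assertEquals syntax:

// Hypothetical assertion: drain the fetched AkkaSource and compare the bytes
// with the content that was uploaded.
s3Fetch.apply(attr.path).flatMap { source =>
  IO.fromFuture(IO(source.runFold(akka.util.ByteString.empty)(_ ++ _)))
    .map(_.utf8String)
    .assertEquals(content)
}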
