Use S3 digests
shinyhappydan committed Apr 18, 2024
1 parent ad846de commit e986f80
Showing 8 changed files with 107 additions and 33 deletions.
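In outline, the diff below stops hard-coding MD5 for S3 storages: S3StorageConfig and S3StorageValue gain a DigestAlgorithm field, S3StorageSaveFile threads it through to the upload, and for SHA-1/SHA-256 the upload request asks S3 to compute the checksum (MD5 is still read from the ETag). A rough sketch of the new configuration surface, built only from names visible in this diff, with placeholder values taken from the fixtures further down:

    // Sketch only: the digest algorithm is now the first field of S3StorageConfig.
    val amazonConfig: S3StorageConfig =
      S3StorageConfig(
        DigestAlgorithm.default,
        "https://s3.us-east-1.amazonaws.com",
        Secret("my-key"),
        Secret("my-secret-key"),
        permissions.read,
        permissions.write,
        showLocation = false,
        60
      )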
@@ -168,6 +168,7 @@ object StoragesConfig {
* the default maximum allowed file size (in bytes) for uploaded files
*/
final case class S3StorageConfig(
digestAlgorithm: DigestAlgorithm,
defaultEndpoint: Uri,
defaultAccessKey: Secret[String],
defaultSecretKey: Secret[String],
@@ -151,6 +151,7 @@ object StorageFields {
name,
description,
default,
cfg.digestAlgorithm,
bucket,
readPermission.getOrElse(cfg.defaultReadPermission),
writePermission.getOrElse(cfg.defaultWritePermission),
@@ -120,14 +120,14 @@ object StorageValue {
name: Option[String],
description: Option[String],
default: Boolean,
algorithm: DigestAlgorithm,
bucket: String,
readPermission: Permission,
writePermission: Permission,
maxFileSize: Long
) extends StorageValue {

override val tpe: StorageType = StorageType.S3Storage
override val algorithm: DigestAlgorithm = DigestAlgorithm.MD5
override val tpe: StorageType = StorageType.S3Storage
}

object S3StorageValue {
@@ -138,6 +138,7 @@ object StorageValue {
*/
def apply(
default: Boolean,
algorithm: DigestAlgorithm,
bucket: String,
readPermission: Permission,
writePermission: Permission,
@@ -147,6 +148,7 @@
None,
None,
default,
algorithm,
bucket,
readPermission,
writePermission,
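Since S3StorageValue no longer fixes the algorithm to MD5, callers supply it explicitly, typically from cfg.digestAlgorithm. A minimal usage sketch based on the apply signature above; bucket, permissions and size are placeholders mirroring the test suite further down:

    // Placeholder values; passing DigestAlgorithm.MD5 keeps the previous behaviour.
    val storageValue = S3StorageValue(
      default = false,
      algorithm = DigestAlgorithm.default,
      bucket = "my-bucket",
      readPermission = permissions.read,
      writePermission = permissions.write,
      maxFileSize = 20
    )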
@@ -19,10 +19,11 @@ import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.clie
import ch.epfl.bluebrain.nexus.delta.rdf.syntax.uriSyntax
import ch.epfl.bluebrain.nexus.delta.sdk.stream.StreamConverter
import fs2.{Chunk, Pipe, Stream}
import org.apache.commons.codec.binary.Hex
import software.amazon.awssdk.core.async.AsyncRequestBody
import software.amazon.awssdk.services.s3.model._

import java.util.UUID
import java.util.{Base64, UUID}

final class S3StorageSaveFile(s3StorageClient: S3StorageClient)(implicit
as: ActorSystem,
@@ -41,20 +41,26 @@ final class S3StorageSaveFile(s3StorageClient: S3StorageClient)(implicit
for {
uuid <- uuidf()
path = Uri.Path(intermediateFolders(storage.project, uuid, filename))
result <- storeFile(storage.value.bucket, path.toString(), uuid, entity)
result <- storeFile(storage.value.bucket, path.toString(), uuid, entity, storage.value.algorithm)
} yield result
}

private def storeFile(bucket: String, key: String, uuid: UUID, entity: BodyPartEntity): IO[FileStorageMetadata] = {
private def storeFile(
bucket: String,
key: String,
uuid: UUID,
entity: BodyPartEntity,
algorithm: DigestAlgorithm
): IO[FileStorageMetadata] = {
val fileData: Stream[IO, Byte] = convertStream(entity.dataBytes)

(for {
_ <- log(bucket, key, s"Checking for object existence")
_ <- validateObjectDoesNotExist(bucket, key)
_ <- log(bucket, key, s"Beginning upload")
(md5, fileSize) <- uploadFile(fileData, bucket, key)
_ <- log(bucket, key, s"Finished upload. MD5: $md5")
attr <- fileMetadata(bucket, key, uuid, fileSize, md5)
_ <- log(bucket, key, s"Checking for object existence")
_ <- validateObjectDoesNotExist(bucket, key)
_ <- log(bucket, key, s"Beginning upload")
(digest, fileSize) <- uploadFile(fileData, bucket, key, algorithm)
_ <- log(bucket, key, s"Finished upload. Digest: $digest")
attr <- fileMetadata(bucket, key, uuid, fileSize, algorithm, digest)
} yield attr)
.onError(e => logger.error(e)("Unexpected error when storing file"))
.adaptError { err => UnexpectedSaveError(key, err.getMessage) }
@@ -65,13 +72,14 @@ final class S3StorageSaveFile(s3StorageClient: S3StorageClient)(implicit
key: String,
uuid: UUID,
fileSize: Long,
md5: String
algorithm: DigestAlgorithm,
digest: String
): IO[FileStorageMetadata] =
s3StorageClient.baseEndpoint.map { base =>
FileStorageMetadata(
uuid = uuid,
bytes = fileSize,
digest = Digest.ComputedDigest(DigestAlgorithm.MD5, md5),
digest = Digest.ComputedDigest(algorithm, digest),
origin = Client,
location = base / bucket / Uri.Path(key),
path = Uri.Path(key)
@@ -94,38 +102,68 @@ final class S3StorageSaveFile(s3StorageClient: S3StorageClient)(implicit
.mapMaterializedValue(_ => NotUsed)
)

private def uploadFile(fileData: Stream[IO, Byte], bucket: String, key: String): IO[(String, Long)] = {
private def uploadFile(
fileData: Stream[IO, Byte],
bucket: String,
key: String,
algorithm: DigestAlgorithm
): IO[(String, Long)] = {
for {
fileSizeAcc <- Ref.of[IO, Long](0L)
md5 <- fileData
digest <- fileData
.evalTap(_ => fileSizeAcc.update(_ + 1))
.through(
uploadFilePipe(bucket, key)
uploadFilePipe(bucket, key, algorithm)
)
.compile
.onlyOrError
fileSize <- fileSizeAcc.get
} yield (md5, fileSize)
} yield (digest, fileSize)
}

private def uploadFilePipe(bucket: String, key: String): Pipe[IO, Byte, String] = { in =>
private def uploadFilePipe(bucket: String, key: String, algorithm: DigestAlgorithm): Pipe[IO, Byte, String] = { in =>
fs2.Stream.eval {
in.compile.to(Chunk).flatMap { chunks =>
val bs = chunks.toByteBuffer
s3.putObject(
PutObjectRequest
.builder()
.bucket(bucket)
.key(key)
.build(),
AsyncRequestBody.fromByteBuffer(bs)
).map { response =>
response.eTag().filter(_ != '"')
val bs = chunks.toByteBuffer
val request = PutObjectRequest
.builder()
.bucket(bucket)
.key(key)

for {
fullRequest <- setAlgorithm(request, algorithm)
response <- s3.putObject(
fullRequest
.build(),
AsyncRequestBody.fromByteBuffer(bs)
)
} yield {
parseResponse(response, algorithm)
}
}
}
}

private def parseResponse(response: PutObjectResponse, algorithm: DigestAlgorithm): String = {
algorithm.value match {
case "MD5" => response.eTag().stripPrefix("\"").stripSuffix("\"")
case "SHA-256" => Hex.encodeHexString(Base64.getDecoder.decode(response.checksumSHA256()))
case "SHA-1" => Hex.encodeHexString(Base64.getDecoder.decode(response.checksumSHA1()))
case _ => throw new IllegalArgumentException(s"Unsupported algorithm for S3: ${algorithm.value}")
}
}

private def setAlgorithm(
request: PutObjectRequest.Builder,
algorithm: DigestAlgorithm
): IO[PutObjectRequest.Builder] =
algorithm.value match {
case "MD5" => IO.pure(request)
case "SHA-256" => IO.delay(request.checksumAlgorithm(ChecksumAlgorithm.SHA256))
case "SHA-1" => IO.delay(request.checksumAlgorithm(ChecksumAlgorithm.SHA1))
case _ => IO.raiseError(new IllegalArgumentException(s"Unsupported algorithm for S3: ${algorithm.value}"))
}

private def getFileAttributes(bucket: String, key: String): IO[GetObjectAttributesResponse] =
s3StorageClient.getFileAttributes(bucket, key)

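A note on parseResponse above: S3 reports MD5 through the object's ETag as quoted hex, while checksumSHA1()/checksumSHA256() return base64-encoded digests, so the code decodes them and re-encodes as hex to match the digest format stored elsewhere. A self-contained sketch of that conversion (the sample value is the base64 SHA-256 of "foo"):

    import java.util.Base64
    import org.apache.commons.codec.binary.Hex

    // Base64 checksum as returned by PutObjectResponse.checksumSHA256(), re-encoded as lowercase hex.
    val checksumFromS3 = "LCa0a2j/xo/5m0U8HTBBNBNCLXBkg7+g+YpeiGJm564="
    val hexDigest      = Hex.encodeHexString(Base64.getDecoder.decode(checksumFromS3))
    // hexDigest == "2c26b46b68ffc68ff99b453c1d30413413422d706483bfa0f98a5e886266e7ae"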
@@ -32,7 +32,7 @@ trait StorageFixtures extends CirceLiteral {
// format: off
implicit val config: StorageTypeConfig = StorageTypeConfig(
disk = DiskStorageConfig(diskVolume, Set(diskVolume,tmpVolume), DigestAlgorithm.default, permissions.read, permissions.write, showLocation = false, 50),
amazon = Some(S3StorageConfig("localhost", Secret(MinioDocker.RootUser), Secret(MinioDocker.RootPassword),
amazon = Some(S3StorageConfig(DigestAlgorithm.default, "localhost", Secret(MinioDocker.RootUser), Secret(MinioDocker.RootPassword),
permissions.read, permissions.write, showLocation = false, 60)),
remoteDisk = Some(RemoteDiskStorageConfig(DigestAlgorithm.default, BaseUri("http://localhost", Label.unsafe("v1")), Anonymous, permissions.read, permissions.write, showLocation = false, 70, 50.millis)),
)
@@ -4,6 +4,7 @@ import akka.http.scaladsl.model.Uri
import cats.effect.{IO, Resource}
import ch.epfl.bluebrain.nexus.delta.kernel.Secret
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.S3StorageConfig
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.DigestAlgorithm
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.permissions
import ch.epfl.bluebrain.nexus.testkit.minio.LocalStackS3
@@ -34,6 +35,7 @@ object LocalStackS3StorageClient {
val creds = localstack.staticCredentialsProvider.resolveCredentials()
val (accessKey, secretKey) = (creds.accessKeyId(), creds.secretAccessKey())
val conf: S3StorageConfig = S3StorageConfig(
digestAlgorithm = DigestAlgorithm.default,
defaultEndpoint = Uri(localstack.endpointOverride(LocalStackS3.ServiceType).toString),
defaultAccessKey = Secret(accessKey),
defaultSecretKey = Secret(secretKey),
@@ -43,8 +43,8 @@ class S3FileOperationsSuite

private lazy val fileOps = S3FileOperations.mk(s3StorageClient)

private def md5Hash(content: String) = {
Hex.encodeHexString(DigestAlgorithm.MD5.digest.digest(content.getBytes(StandardCharsets.UTF_8)))
private def makeContentHash(algorithm: DigestAlgorithm, content: String) = {
Hex.encodeHexString(algorithm.digest.digest(content.getBytes(StandardCharsets.UTF_8)))
}

test("List objects in an existing bucket") {
@@ -61,6 +61,7 @@
givenAnS3Bucket { bucket =>
val storageValue = S3StorageValue(
default = false,
algorithm = DigestAlgorithm.default,
bucket = bucket,
readPermission = read,
writePermission = write,
@@ -73,17 +74,44 @@

val filename = "myfile.txt"
val content = genString()
val hashOfContent = md5Hash(content)
val hashOfContent = makeContentHash(DigestAlgorithm.default, content)
val entity = HttpEntity(content)

val result = for {
attr <- fileOps.save(storage, filename, entity)
_ = assertEquals(attr.digest, ComputedDigest(DigestAlgorithm.MD5, hashOfContent))
_ = assertEquals(attr.digest, ComputedDigest(DigestAlgorithm.default, hashOfContent))
_ = assertEquals(attr.bytes, content.length.toLong)
source <- fileOps.fetch(bucket, attr.path)
} yield consume(source)

assertIO(result, content)
}
}

test("Use MD5 to calculate a checksum") {
givenAnS3Bucket { bucket =>
val storageValue = S3StorageValue(
default = false,
algorithm = DigestAlgorithm.MD5,
bucket = bucket,
readPermission = read,
writePermission = write,
maxFileSize = 20
)

val iri = iri"http://localhost/s3"
val project = ProjectRef.unsafe("org", "project")
val storage = S3Storage(iri, project, storageValue, Json.obj())

val filename = "myfile.txt"
val content = genString()
val hashOfContent = makeContentHash(DigestAlgorithm.MD5, content)
val entity = HttpEntity(content)

for {
attr <- fileOps.save(storage, filename, entity)
_ = assertEquals(attr.digest, ComputedDigest(DigestAlgorithm.MD5, hashOfContent))
} yield ()
}
}
}
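One reason the MD5 test above can compare against a locally computed hash: for a plain single-part PutObject (no multipart upload, no SSE-KMS), the ETag returned by S3 is the hex-encoded MD5 of the body, which is exactly what parseResponse strips the quotes from. A quick sketch of the equivalent local computation, assuming DigestAlgorithm.MD5.digest wraps a standard MessageDigest:

    import java.nio.charset.StandardCharsets
    import java.security.MessageDigest
    import org.apache.commons.codec.binary.Hex

    // Equivalent to makeContentHash(DigestAlgorithm.MD5, content) in the suite above.
    val content  = "some file content"
    val expected = Hex.encodeHexString(
      MessageDigest.getInstance("MD5").digest(content.getBytes(StandardCharsets.UTF_8))
    )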
@@ -3,6 +3,7 @@ package ch.epfl.bluebrain.nexus.ship.config
import ch.epfl.bluebrain.nexus.delta.kernel.Secret
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.S3StorageConfig
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.DigestAlgorithm
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.{permissions, StorageFixtures, StoragesConfig}
import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.ServiceAccount
import ch.epfl.bluebrain.nexus.delta.sdk.model.{BaseUri, ServiceAccountConfig}
@@ -32,6 +33,7 @@ trait ShipConfigFixtures extends ConfigFixtures with StorageFixtures with Classp

private val amazonConfig: S3StorageConfig =
S3StorageConfig(
DigestAlgorithm.default,
"https://s3.us-east-1.amazonaws.com",
Secret("my-key"),
Secret("my-secret-key"),