Make sha256 default for s3 storages (#4950)
olivergrabinski authored May 7, 2024
1 parent 46a9e48 commit a742162
Showing 15 changed files with 47 additions and 137 deletions.
2 changes: 0 additions & 2 deletions delta/plugins/storage/src/main/resources/storage.conf
@@ -28,8 +28,6 @@ plugins.storage {
amazon {
# to enable s3 storage
enabled = false
# the default digest algorithm
digest-algorithm = "SHA-256"
# the default endpoint of the current storage
default-endpoint = "https://s3.us-east-1.amazonaws.com"
# flag to use the default aws credential provider (ignores the provided keys)
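
With `digest-algorithm` removed from storage.conf, the digest is no longer a deployment choice; SHA-256 is pinned in code. A minimal sketch of what a config consumer can now assume, using the typesafe-config API that pureconfig builds on (the object name here is hypothetical):

import com.typesafe.config.ConfigFactory

object TrimmedAmazonBlockSketch extends App {
  // The `amazon` block after this commit, with the digest key gone.
  val conf = ConfigFactory.parseString(
    """
      |amazon {
      |  enabled = false
      |  default-endpoint = "https://s3.us-east-1.amazonaws.com"
      |}
      |""".stripMargin
  )
  assert(!conf.hasPath("amazon.digest-algorithm")) // the key simply no longer exists
  println(conf.getString("amazon.default-endpoint"))
}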
StoragesConfig.scala
@@ -10,11 +10,10 @@ import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import ch.epfl.bluebrain.nexus.delta.sdk.model.search.PaginationConfig
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.model.Permission
import ch.epfl.bluebrain.nexus.delta.sourcing.config.EventLogConfig
import pureconfig.ConfigReader.Result
import pureconfig.ConvertHelpers.{catchReadError, optF}
import pureconfig.error.{CannotConvert, ConfigReaderFailures, ConvertFailure, FailureReason}
import pureconfig.generic.auto._
import pureconfig.{ConfigConvert, ConfigObjectCursor, ConfigReader}
import pureconfig.{ConfigConvert, ConfigReader}

import scala.concurrent.duration.FiniteDuration

@@ -75,33 +74,10 @@ object StoragesConfig {

object StorageTypeConfig {

final case class WrongAllowedKeys(defaultVolume: AbsolutePath) extends FailureReason {
final private case class WrongAllowedKeys(defaultVolume: AbsolutePath) extends FailureReason {
val description: String = s"'allowed-volumes' must contain at least '$defaultVolume' (default-volume)"
}

final case class DigestNotSupportedOnS3(digestAlgorithm: DigestAlgorithm) extends FailureReason {
val description: String = s"Digest algorithm '${digestAlgorithm.value}' is not supported on S3"
}

private def assertValidS3Algorithm(
digestAlgorithm: DigestAlgorithm,
amazonCursor: ConfigObjectCursor
): Result[Unit] = {
digestAlgorithm.value match {
case "SHA-256" | "SHA-1" => Right(())
case _ =>
Left(
ConfigReaderFailures(
ConvertFailure(
DigestNotSupportedOnS3(digestAlgorithm),
None,
amazonCursor.atKeyOrUndefined("digest-algorithm").path
)
)
)
}
}

implicit val storageTypeConfigReader: ConfigReader[StorageTypeConfig] = ConfigReader.fromCursor { cursor =>
for {
obj <- cursor.asObjectCursor
@@ -118,7 +94,6 @@ object StoragesConfig {
amazonEnabledCursor <- amazonCursor.atKey("enabled")
amazonEnabled <- amazonEnabledCursor.asBoolean
amazon <- ConfigReader[S3StorageConfig].from(amazonCursor)
_ <- assertValidS3Algorithm(amazon.digestAlgorithm, amazonCursor)
remoteCursor <- obj.atKeyOrUndefined("remote-disk").asObjectCursor
remoteEnabledCursor <- remoteCursor.atKey("enabled")
remoteEnabled <- remoteEnabledCursor.asBoolean
@@ -173,8 +148,6 @@ object StoragesConfig {
/**
* Amazon S3 compatible storage configuration
*
* @param digestAlgorithm
* algorithm for checksum calculation
* @param defaultEndpoint
* the default endpoint of the current storage
* @param useDefaultCredentialProvider
@@ -199,7 +172,6 @@
* global prefix prepended to the path of all saved S3 files
*/
final case class S3StorageConfig(
digestAlgorithm: DigestAlgorithm,
defaultEndpoint: Uri,
useDefaultCredentialProvider: Boolean,
defaultAccessKey: Secret[String],
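
`S3StorageConfig` loses its `digestAlgorithm` field, so the pureconfig reader no longer needs `assertValidS3Algorithm`: an unsupported algorithm can no longer even be expressed. A construction sketch mirroring the updated `StorageFixtures` near the end of this diff (`permissions`, `Secret`, and the implicit String-to-`Uri` conversion are assumed from the repo; the argument labels in comments are inferred):

// Sketch only: positional arguments follow the fixture below.
val amazon = S3StorageConfig(
  "https://s3.us-east-1.amazonaws.com", // default endpoint
  useDefaultCredentialProvider = false,
  Secret("my_key"),                     // default access key
  Secret("my_secret_key"),              // default secret key
  permissions.read,
  permissions.write,
  showLocation = false,
  60,                                   // default max file size
  defaultBucket = "my-bucket",
  prefix = None
)
// No digestAlgorithm argument exists any more; SHA-256 is implied.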
StorageFields.scala
@@ -145,7 +145,6 @@ object StorageFields
name,
description,
default,
cfg.digestAlgorithm,
bucket.getOrElse(cfg.defaultBucket),
readPermission.getOrElse(cfg.defaultReadPermission),
writePermission.getOrElse(cfg.defaultWritePermission),
StorageValue.scala
@@ -9,7 +9,6 @@ import io.circe.syntax._
import io.circe.{Codec, Encoder}

import java.io.File

import scala.reflect.io.Directory

sealed trait StorageValue extends Product with Serializable {
@@ -116,11 +115,11 @@ object StorageValue {
name: Option[String],
description: Option[String],
default: Boolean,
algorithm: DigestAlgorithm,
bucket: String,
readPermission: Permission,
writePermission: Permission,
maxFileSize: Long
maxFileSize: Long,
algorithm: DigestAlgorithm = DigestAlgorithm.default
) extends StorageValue {

override val tpe: StorageType = StorageType.S3Storage
@@ -134,7 +133,6 @@
*/
def apply(
default: Boolean,
algorithm: DigestAlgorithm,
bucket: String,
readPermission: Permission,
writePermission: Permission,
@@ -144,7 +142,6 @@
None,
None,
default,
algorithm,
bucket,
readPermission,
writePermission,
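
On `S3StorageValue`, the algorithm moves to a trailing parameter defaulting to `DigestAlgorithm.default` (SHA-256), and the convenience `apply` stops accepting it altogether. A sketch of the new call shape (the permission literals are illustrative, and `Permission.unsafe` is assumed from the SDK):

val value = StorageValue.S3StorageValue(
  default = true,
  bucket = "my-bucket",
  readPermission = Permission.unsafe("resources/read"),
  writePermission = Permission.unsafe("files/write"),
  maxFileSize = 100L
)
// The defaulted field kicks in:
assert(value.algorithm == DigestAlgorithm.default)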
S3StorageSaveFile.scala
@@ -55,7 +55,7 @@ final class S3StorageSaveFile(s3StorageClient: S3StorageClient)(implicit
_ <- log(bucket, key, s"Checking for object existence")
_ <- validateObjectDoesNotExist(bucket, key)
_ <- log(bucket, key, s"Beginning upload")
uploadMetadata <- s3StorageClient.uploadFile(fileData, bucket, key, algorithm)
uploadMetadata <- s3StorageClient.uploadFile(fileData, bucket, key)
_ <- log(bucket, key, s"Finished upload. Digest: ${uploadMetadata.checksum}")
attr = fileMetadata(key, uuid, algorithm, uploadMetadata)
} yield attr)
@@ -94,5 +94,5 @@
)

private def log(bucket: String, key: String, msg: String): IO[Unit] =
logger.info(s"Bucket: ${bucket}. Key: $key. $msg")
logger.info(s"Bucket: $bucket. Key: $key. $msg")
}
S3StorageClient.scala
@@ -3,7 +3,6 @@ package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.cli
import akka.http.scaladsl.model.Uri
import cats.effect.{IO, Resource}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.S3StorageConfig
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.DigestAlgorithm
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient.{HeadObject, UploadMetadata}
import fs2.Stream
import io.laserdisc.pure.s3.tagless.{Interpreter, S3AsyncClientOp}
@@ -15,6 +14,9 @@ import software.amazon.awssdk.services.s3.model._
import java.net.URI

trait S3StorageClient {

final val checksumAlgorithm: ChecksumAlgorithm = ChecksumAlgorithm.SHA256

def listObjectsV2(bucket: String): IO[ListObjectsV2Response]

def listObjectsV2(bucket: String, prefix: String): IO[ListObjectsV2Response]
@@ -29,23 +31,20 @@ trait S3StorageClient
sourceBucket: String,
sourceKey: String,
destinationBucket: String,
destinationKey: String,
checksumAlgorithm: ChecksumAlgorithm
destinationKey: String
): IO[CopyObjectResponse]

def copyObjectMultiPart(
sourceBucket: String,
sourceKey: String,
destinationBucket: String,
destinationKey: String,
checksumAlgorithm: ChecksumAlgorithm
destinationKey: String
): IO[CompleteMultipartUploadResponse]

def uploadFile(
fileData: Stream[IO, Byte],
bucket: String,
key: String,
algorithm: DigestAlgorithm
key: String
): IO[UploadMetadata]

def objectExists(bucket: String, key: String): IO[Boolean]
@@ -62,8 +61,7 @@ object S3StorageClient {
case class HeadObject(
fileSize: Long,
contentType: Option[String],
sha256Checksum: Option[String],
sha1Checksum: Option[String]
sha256Checksum: Option[String]
)

def resource(s3Config: Option[S3StorageConfig]): Resource[IO, S3StorageClient] = s3Config match {
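
The client trait now fixes `checksumAlgorithm` to `ChecksumAlgorithm.SHA256` and drops the algorithm parameter from `uploadFile` and both copy operations. A hypothetical caller sketch under the new signatures:

import cats.effect.IO
import fs2.Stream

// Callers no longer choose a digest; UploadMetadata.checksum is always SHA-256.
def saveBytes(client: S3StorageClient, bucket: String, key: String, bytes: Array[Byte]): IO[S3StorageClient.UploadMetadata] =
  client.uploadFile(Stream.emits(bytes).covary[IO], bucket, key)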
S3StorageClientDisabled.scala
@@ -2,7 +2,6 @@ package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.cli

import akka.http.scaladsl.model.Uri
import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.DigestAlgorithm
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient.{HeadObject, UploadMetadata}
import ch.epfl.bluebrain.nexus.delta.sdk.error.ServiceError.FeatureDisabled
import fs2.Stream
@@ -26,17 +25,15 @@ private[client] object S3StorageClientDisabled extends S3StorageClient {
sourceBucket: String,
sourceKey: String,
destinationBucket: String,
destinationKey: String,
checksumAlgorithm: ChecksumAlgorithm
destinationKey: String
): IO[CopyObjectResponse] = raiseDisabledErr

override def objectExists(bucket: String, key: String): IO[Boolean] = raiseDisabledErr

override def uploadFile(
fileData: Stream[IO, Byte],
bucket: String,
key: String,
algorithm: DigestAlgorithm
key: String
): IO[UploadMetadata] = raiseDisabledErr

override def bucketExists(bucket: String): IO[Boolean] = raiseDisabledErr
@@ -47,8 +44,7 @@ private[client] object S3StorageClientDisabled extends S3StorageClient {
sourceBucket: String,
sourceKey: String,
destinationBucket: String,
destinationKey: String,
checksumAlgorithm: ChecksumAlgorithm
destinationKey: String
): IO[CompleteMultipartUploadResponse] = raiseDisabledErr

override def readFileMultipart(bucket: String, fileKey: String): Stream[IO, Byte] = throw disabledErr
S3StorageClientImpl.scala
@@ -3,10 +3,8 @@ package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.cli
import akka.http.scaladsl.model.Uri
import cats.effect.{IO, Ref}
import cats.implicits._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.DigestAlgorithm
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageRejection.StorageNotAccessible
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient.{HeadObject, UploadMetadata}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClientImpl._
import ch.epfl.bluebrain.nexus.delta.rdf.syntax.uriSyntax
import eu.timepit.refined.refineMV
import eu.timepit.refined.types.string.NonEmptyString
@@ -21,9 +19,13 @@ import software.amazon.awssdk.services.s3.model._

import java.util.Base64
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}

final private[client] class S3StorageClientImpl(client: S3AsyncClientOp[IO], val baseEndpoint: Uri, val prefix: Uri)
extends S3StorageClient {
final private[client] class S3StorageClientImpl(
client: S3AsyncClientOp[IO],
val baseEndpoint: Uri,
val prefix: Uri
) extends S3StorageClient {

override def listObjectsV2(bucket: String): IO[ListObjectsV2Response] =
client.listObjectsV2(ListObjectsV2Request.builder().bucket(bucket).build())
@@ -59,17 +61,15 @@
HeadObject(
resp.contentLength(),
Option(resp.contentType()),
Option(resp.checksumSHA256()),
Option(resp.checksumSHA1())
Option(resp.checksumSHA256())
)
)

override def copyObject(
sourceBucket: String,
sourceKey: String,
destinationBucket: String,
destinationKey: String,
checksumAlgorithm: ChecksumAlgorithm
destinationKey: String
): IO[CopyObjectResponse] =
client.copyObject(
CopyObjectRequest
@@ -86,8 +86,7 @@ final private[client] class S3StorageClientImpl(client: S3AsyncClientOp[IO], val
sourceBucket: String,
sourceKey: String,
destinationBucket: String,
destinationKey: String,
checksumAlgorithm: ChecksumAlgorithm
destinationKey: String
): IO[CompleteMultipartUploadResponse] = {
val partSize = 5_000_000_000L // 5GB
for {
@@ -115,6 +114,7 @@ final private[client] class S3StorageClientImpl(client: S3AsyncClientOp[IO], val
CompletedPart.builder
.partNumber(partNumber + 1)
.eTag(response.copyPartResult.eTag)
.checksumSHA256(response.copyPartResult.checksumSHA256)
.build
)
}
@@ -147,15 +147,14 @@ final private[client] class S3StorageClientImpl(client: S3AsyncClientOp[IO], val
override def uploadFile(
fileData: Stream[IO, Byte],
bucket: String,
key: String,
algorithm: DigestAlgorithm
key: String
): IO[UploadMetadata] = {
for {
fileSizeAcc <- Ref.of[IO, Long](0L)
digest <- fileData
.evalTap(_ => fileSizeAcc.update(_ + 1))
.through(
uploadFilePipe(bucket, key, algorithm)
uploadFilePipe(bucket, key)
)
.compile
.onlyOrError
@@ -164,7 +163,7 @@ final private[client] class S3StorageClientImpl(client: S3AsyncClientOp[IO], val
} yield UploadMetadata(digest, fileSize, location)
}

private def uploadFilePipe(bucket: String, key: String, algorithm: DigestAlgorithm): Pipe[IO, Byte, String] = { in =>
private def uploadFilePipe(bucket: String, key: String): Pipe[IO, Byte, String] = { in =>
fs2.Stream.eval {
in.compile.to(Chunk).flatMap { chunks =>
val bs = chunks.toByteBuffer
@@ -173,23 +172,23 @@ final private[client] class S3StorageClientImpl(client: S3AsyncClientOp[IO], val
PutObjectRequest
.builder()
.bucket(bucket)
.deltaDigest(algorithm)
.checksumAlgorithm(checksumAlgorithm)
.key(key)
.build(),
AsyncRequestBody.fromByteBuffer(bs)
)
} yield {
checksumFromResponse(response, algorithm)
}
checksum <- checksumFromResponse(response)
} yield checksum
}
}
}

private def checksumFromResponse(response: PutObjectResponse, algorithm: DigestAlgorithm): String = {
algorithm.value match {
case "SHA-256" => Hex.encodeHexString(Base64.getDecoder.decode(response.checksumSHA256()))
case "SHA-1" => Hex.encodeHexString(Base64.getDecoder.decode(response.checksumSHA1()))
case _ => throw new IllegalArgumentException(s"Unsupported algorithm for S3: ${algorithm.value}")
private def checksumFromResponse(response: PutObjectResponse): IO[String] = {
Try {
Hex.encodeHexString(Base64.getDecoder.decode(response.checksumSHA256()))
} match {
case Failure(exception) => IO.raiseError(exception)
case Success(checksum) => IO.pure(checksum)
}
}

@@ -221,14 +220,3 @@ final private[client] class S3StorageClientImpl(client: S3AsyncClientOp[IO], val
.build

}

private object S3StorageClientImpl {
implicit class PutObjectRequestOps(request: PutObjectRequest.Builder) {
def deltaDigest(algorithm: DigestAlgorithm): PutObjectRequest.Builder =
algorithm.value match {
case "SHA-256" => request.checksumAlgorithm(ChecksumAlgorithm.SHA256)
case "SHA-1" => request.checksumAlgorithm(ChecksumAlgorithm.SHA1)
case _ => throw new IllegalArgumentException(s"Unsupported algorithm for S3: ${algorithm.value}")
}
}
}
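
Since only SHA-256 remains, `checksumFromResponse` reduces to decoding the base64 checksum S3 returns and re-encoding it as hex, now wrapped in `IO` so a malformed value surfaces as a raised error rather than a thrown exception. A standalone sketch of that conversion (the example value is the SHA-256 of "test"):

import java.util.Base64
import org.apache.commons.codec.binary.Hex

// S3 returns checksums base64-encoded; Nexus stores hex digests.
val base64Checksum = "n4bQgYhMfWWaL+qgxVrQFaO/TxsrC4Is0V1sFbDwCgg="
val hexChecksum    = Hex.encodeHexString(Base64.getDecoder.decode(base64Checksum))
// hexChecksum == "9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08"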
StorageFixtures.scala
@@ -31,7 +31,7 @@ trait StorageFixtures extends CirceLiteral {
// format: off
implicit val config: StorageTypeConfig = StorageTypeConfig(
disk = DiskStorageConfig(diskVolume, Set(diskVolume,tmpVolume), DigestAlgorithm.default, permissions.read, permissions.write, showLocation = false, 50),
amazon = Some(S3StorageConfig(DigestAlgorithm.default, "localhost", useDefaultCredentialProvider = false, Secret("my_key"), Secret("my_secret_key"),
amazon = Some(S3StorageConfig("localhost", useDefaultCredentialProvider = false, Secret("my_key"), Secret("my_secret_key"),
permissions.read, permissions.write, showLocation = false, 60, defaultBucket = "potato", prefix = None)),
remoteDisk = Some(RemoteDiskStorageConfig(DigestAlgorithm.default, BaseUri("http://localhost", Label.unsafe("v1")), Anonymous, permissions.read, permissions.write, showLocation = false, 70, 50.millis)),
)
(Remaining changed files not shown.)