Skip to content

Commit

Permalink
Update multi part copy operation with checksum (#4938)
Browse files Browse the repository at this point in the history
  • Loading branch information
olivergrabinski authored May 6, 2024
1 parent d008b98 commit 9af6ef1
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import java.net.URLDecoder
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Base64
import scala.concurrent.duration.DurationInt
import scala.util.{Failure, Success, Try}

trait S3FileOperations {
def checkBucketExists(bucket: String): IO[Unit]
Expand Down Expand Up @@ -127,10 +128,17 @@ object S3FileOperations {
private def checksumFrom(response: HeadObject) = IO.fromOption {
response.sha256Checksum
.map { checksum =>
Digest.ComputedDigest(
DigestAlgorithm.default,
Hex.encodeHexString(Base64.getDecoder.decode(checksum))
)
Try {
Base64.getDecoder.decode(checksum)
} match {
case Failure(_) => Digest.NotComputedDigest
case Success(decodedValue) =>
Digest.ComputedDigest(
DigestAlgorithm.default,
Hex.encodeHexString(decodedValue)
)
}

}
}(new IllegalArgumentException("Missing checksum"))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ trait S3StorageClient {
sourceBucket: String,
sourceKey: String,
destinationBucket: String,
destinationKey: String
destinationKey: String,
checksumAlgorithm: ChecksumAlgorithm
): IO[CompleteMultipartUploadResponse]

def uploadFile(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ private[client] object S3StorageClientDisabled extends S3StorageClient {
sourceBucket: String,
sourceKey: String,
destinationBucket: String,
destinationKey: String
destinationKey: String,
checksumAlgorithm: ChecksumAlgorithm
): IO[CompleteMultipartUploadResponse] = raiseDisabledErr
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,14 @@ final private[client] class S3StorageClientImpl(client: S3AsyncClientOp[IO], val
sourceBucket: String,
sourceKey: String,
destinationBucket: String,
destinationKey: String
destinationKey: String,
checksumAlgorithm: ChecksumAlgorithm
): IO[CompleteMultipartUploadResponse] = {
val partSize = 5_000_000_000L // 5GB
for {
// Initiate the multipart upload
createMultipartUploadResponse <-
client.createMultipartUpload(createMultipartUploadRequest(destinationBucket, destinationKey))
client.createMultipartUpload(createMultipartUploadRequest(destinationBucket, destinationKey, checksumAlgorithm))
// Get the object size
objectSize <- headObject(sourceBucket, sourceKey).map(_.fileSize)
// Copy the object using 5 MB parts
Expand Down Expand Up @@ -200,10 +201,11 @@ final private[client] class S3StorageClientImpl(client: S3AsyncClientOp[IO], val
.build()
}

private def createMultipartUploadRequest(bucket: String, fileKey: String) =
private def createMultipartUploadRequest(bucket: String, fileKey: String, checksumAlgorithm: ChecksumAlgorithm) =
CreateMultipartUploadRequest.builder
.bucket(bucket)
.key(fileKey)
.checksumAlgorithm(checksumAlgorithm)
.build

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ object FileCopier {
val key = path.toString
val FIVE_GB = 5_000_000_000L

// TODO: Check if we only use SHA256 or not? If not we need to pass the right algo
if (fileSize >= FIVE_GB)
s3StorageClient.copyObjectMultiPart(importBucket, key, targetBucket, key).void
s3StorageClient.copyObjectMultiPart(importBucket, key, targetBucket, key, ChecksumAlgorithm.SHA256).void
else
// TODO: Check if we only use SHA256 or not? If not we need to pass the right algo
s3StorageClient.copyObject(importBucket, key, targetBucket, key, ChecksumAlgorithm.SHA256).void
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ object RunShipSuite {
sourceBucket: String,
sourceKey: String,
destinationBucket: String,
destinationKey: String
destinationKey: String,
checksumAlgorithm: ChecksumAlgorithm
): IO[CompleteMultipartUploadResponse] =
IO.raiseError(new NotImplementedError("copyObjectMultiPart is not implemented"))
}
Expand Down

0 comments on commit 9af6ef1

Please sign in to comment.