Retry when S3 fails to copy the file during import (#5069)
Co-authored-by: Simon Dumas <simon.dumas@epfl.ch>
imsdu and Simon Dumas authored Jul 24, 2024
1 parent 9fbf328 commit 7cee429
Showing 1 changed file with 46 additions and 35 deletions.
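
This change makes the S3 copy performed during an import resilient to transient server-side failures: the copy flow in FileCopier is now wrapped in RetryStrategy.constant(30.seconds, 10, ...), so an attempt is retried only when the underlying S3Exception reports a 5xx status code, and every failed attempt is logged through logError(logger, "s3Copy").

As a rough illustration of the behaviour being introduced, here is a minimal sketch in plain cats-effect. It is not the project's RetryStrategy API; the helper name retryOnS3ServerError and the explicit recursion are assumptions for illustration only, while the actual code in the diff below relies on RetryStrategy.constant and the .retry syntax.

  import cats.effect.IO
  import scala.concurrent.duration._
  import software.amazon.awssdk.services.s3.model.S3Exception

  // Hedged sketch, not the Nexus Delta API: re-run an IO up to maxRetries times,
  // waiting a constant delay between attempts, but only for S3 server-side (5xx) errors.
  def retryOnS3ServerError[A](io: IO[A], maxRetries: Int = 10, delay: FiniteDuration = 30.seconds): IO[A] =
    io.handleErrorWith {
      case e: S3Exception if e.statusCode() >= 500 && e.statusCode() < 600 && maxRetries > 0 =>
        IO.sleep(delay) >> retryOnS3ServerError(io, maxRetries - 1, delay)
      case other => IO.raiseError(other)
    }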
@@ -3,7 +3,8 @@ package ch.epfl.bluebrain.nexus.ship.files
import akka.http.scaladsl.model.Uri
import akka.http.scaladsl.model.Uri.Path
import cats.effect.IO
-import ch.epfl.bluebrain.nexus.delta.kernel.Logger
+import ch.epfl.bluebrain.nexus.delta.kernel.RetryStrategy.logError
+import ch.epfl.bluebrain.nexus.delta.kernel.{Logger, RetryStrategy}
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UrlUtils
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileAttributes
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.{CopyOptions, S3LocationGenerator}
@@ -12,6 +13,8 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import ch.epfl.bluebrain.nexus.ship.config.FileProcessingConfig
import ch.epfl.bluebrain.nexus.ship.files.FileCopier.CopyResult
import ch.epfl.bluebrain.nexus.ship.files.FileCopier.CopyResult.{CopySkipped, CopySuccess}
+import ch.epfl.bluebrain.nexus.delta.sdk.syntax._
+import software.amazon.awssdk.services.s3.model.S3Exception

import scala.concurrent.duration.DurationInt

@@ -27,6 +30,13 @@ object FileCopier {

  private val longCopyThreshold = 5.seconds

+  private val copyRetryStrategy: RetryStrategy[S3Exception] = RetryStrategy.constant(
+    30.seconds,
+    10,
+    e => e.statusCode() >= 500 && e.statusCode() < 600,
+    logError(logger, "s3Copy")
+  )

  sealed trait CopyResult extends Product with Serializable

  object CopyResult {
@@ -44,40 +54,41 @@ val importBucket = config.importBucket
    val importBucket = config.importBucket
    val targetBucket = config.targetBucket
    val locationGenerator = new S3LocationGenerator(config.prefix.getOrElse(Path.Empty))
-    (project: ProjectRef, attributes: FileAttributes) => {
-      val origin = attributes.path
-      val patchedFileName = if (attributes.filename.isEmpty) "file" else attributes.filename
-      val target = locationGenerator.file(project, attributes.uuid, patchedFileName).path
-      val FIVE_GB = 5_000_000_000L
-
-      val originKey = UrlUtils.decode(origin)
-      val targetKey = UrlUtils.decode(target)
-
-      val copyOptions = CopyOptions(overwriteTarget = false, attributes.mediaType)
-
-      def copy = {
-        if (attributes.bytes >= FIVE_GB) {
-          logger.info(s"Attempting to copy a large file from $importBucket/$originKey to $targetBucket/$targetKey") >>
-            s3StorageClient.copyObjectMultiPart(importBucket, originKey, targetBucket, targetKey, copyOptions)
-        } else
-          s3StorageClient.copyObject(importBucket, originKey, targetBucket, targetKey, copyOptions)
-      }.timed.flatMap { case (duration, _) =>
-        IO.whenA(duration > longCopyThreshold)(
-          logger.info(s"Copy file ${attributes.path} of size ${attributes.bytes} took ${duration.toSeconds} seconds.")
-        )
-      }
-
-      for {
-        isObject <- s3StorageClient.objectExists(importBucket, originKey)
-        isFolder <-
-          if (isObject) IO.pure(false) else s3StorageClient.listObjectsV2(importBucket, originKey).map(_.hasContents)
-        _ <- IO.whenA(isObject) { copy }
-        _ <- IO.whenA(isFolder) { logger.info(s"$target has been found to be a folder, skipping the file copy...") }
-        _ <- IO.whenA(!isFolder && !isObject) {
-          logger.error(s"$target is neither an object or folder, something is wrong.")
-        }
-      } yield if (isObject) CopySuccess(target) else CopySkipped
-    }
+    (project: ProjectRef, attributes: FileAttributes) =>
+      {
+        val origin = attributes.path
+        val patchedFileName = if (attributes.filename.isEmpty) "file" else attributes.filename
+        val target = locationGenerator.file(project, attributes.uuid, patchedFileName).path
+        val FIVE_GB = 5_000_000_000L
+
+        val originKey = UrlUtils.decode(origin)
+        val targetKey = UrlUtils.decode(target)
+
+        val copyOptions = CopyOptions(overwriteTarget = false, attributes.mediaType)
+
+        def copy = {
+          if (attributes.bytes >= FIVE_GB) {
+            logger.info(s"Attempting to copy a large file from $importBucket/$originKey to $targetBucket/$targetKey") >>
+              s3StorageClient.copyObjectMultiPart(importBucket, originKey, targetBucket, targetKey, copyOptions)
+          } else
+            s3StorageClient.copyObject(importBucket, originKey, targetBucket, targetKey, copyOptions)
+        }.timed.flatMap { case (duration, _) =>
+          IO.whenA(duration > longCopyThreshold)(
+            logger.info(s"Copy file ${attributes.path} of size ${attributes.bytes} took ${duration.toSeconds} seconds.")
+          )
+        }
+
+        for {
+          isObject <- s3StorageClient.objectExists(importBucket, originKey)
+          isFolder <-
+            if (isObject) IO.pure(false) else s3StorageClient.listObjectsV2(importBucket, originKey).map(_.hasContents)
+          _ <- IO.whenA(isObject) { copy }
+          _ <- IO.whenA(isFolder) { logger.info(s"$target has been found to be a folder, skipping the file copy...") }
+          _ <- IO.whenA(!isFolder && !isObject) {
+            logger.error(s"$target is neither an object or folder, something is wrong.")
+          }
+        } yield if (isObject) CopySuccess(target) else CopySkipped
+      }.retry(copyRetryStrategy)
  }

  def apply(): FileCopier = (_: ProjectRef, attributes: FileAttributes) => IO.pure(CopySuccess(attributes.path))
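
Note on the scope and safety of the retry above: .retry(copyRetryStrategy) wraps the whole block, so a failed attempt re-runs the objectExists / listObjectsV2 checks as well as the copy itself. Re-running is harmless here because the copy is issued with overwriteTarget = false, so a retry that follows a request that actually succeeded server-side will not overwrite the already copied object. The FIVE_GB threshold mirrors an S3 limit: a single CopyObject request can copy objects of at most 5 GB, so larger files take the multipart path via copyObjectMultiPart.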
