From 9d55a6383c8382cf9b3e3e9ba872430f4df5d9ed Mon Sep 17 00:00:00 2001 From: Simon Date: Mon, 6 May 2024 13:44:00 +0200 Subject: [PATCH] Add progress logging, allow to skip file events (#4932) * Add progress logging, allow to skip file events * Run static analysis for all modules --------- Co-authored-by: Simon Dumas --- build.sbt | 1 - .../operations/s3/S3FileOperations.scala | 2 +- ship/src/main/resources/ship-default.conf | 3 +++ .../bluebrain/nexus/ship/EventProcessor.scala | 22 ++++++++++--------- .../nexus/ship/config/InputConfig.scala | 3 ++- .../nexus/ship/files/FileProcessor.scala | 11 +++++++++- .../ship/config/ShipConfigFixtures.scala | 3 ++- 7 files changed, 30 insertions(+), 15 deletions(-) diff --git a/build.sbt b/build.sbt index bd3330c316..099e4b05a5 100755 --- a/build.sbt +++ b/build.sbt @@ -1085,7 +1085,6 @@ val coreModules = List("kernel", "rdf", "sdk", "sourcingPsql", "testkit") val staticAnalysis = s""" |scalafmtSbtCheck ; - |project delta ; |scalafmtCheck ; |Test/scalafmtCheck ; |scapegoat ; diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3FileOperations.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3FileOperations.scala index 3140fc4e59..7474134d50 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3FileOperations.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3FileOperations.scala @@ -85,7 +85,7 @@ object S3FileOperations { uuidF: UUIDF ): IO[S3FileMetadata] = { for { - _ <- log.info(s"Fetching attributes for S3 file. Bucket $bucket at path $path") + _ <- log.debug(s"Fetching attributes for S3 file. Bucket $bucket at path $path") resp <- client.headObject(bucket, path.toString()) metadata <- mkS3Metadata(path, resp) } yield metadata diff --git a/ship/src/main/resources/ship-default.conf b/ship/src/main/resources/ship-default.conf index b789b1ceb2..15b1b15ef8 100644 --- a/ship/src/main/resources/ship-default.conf +++ b/ship/src/main/resources/ship-default.conf @@ -85,6 +85,9 @@ ship { # If true, no resource validation is performed disable-resource-validation = false + # To skip file events to make the batch run faster and focus on other events + skip-file-events = false + storages { # S3 compatible storage configuration diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/EventProcessor.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/EventProcessor.scala index 0615131d50..fe17486163 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/EventProcessor.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/EventProcessor.scala @@ -37,18 +37,20 @@ object EventProcessor { } eventStream .evalScan(ImportReport.start) { case (report, event) => + val processed = report.progress.foldLeft(0L) { case (acc, (_, stats)) => acc + stats.success + stats.dropped } processorsMap.get(event.`type`) match { case Some(processor) => - processor - .evaluate(event) - .map { status => - report + (event, status) - } - .onError { err => - logger.error(err)( - s"Error while processing event with offset '${event.ordering.value}' with processor '${event.`type`}'." - ) - } + IO.whenA(processed % 1000 == 0)(logger.info(s"Current progress is: ${report.progress}")) >> + processor + .evaluate(event) + .map { status => + report + (event, status) + } + .onError { err => + logger.error(err)( + s"Error while processing event with offset '${event.ordering.value}' with processor '${event.`type`}'." + ) + } case None => logger.warn(s"No processor is provided for '${event.`type`}', skipping...") >> IO.pure(report + (event, ImportStatus.Dropped)) diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/config/InputConfig.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/config/InputConfig.scala index d758dc66ef..5035d4a083 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/config/InputConfig.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/config/InputConfig.scala @@ -22,7 +22,8 @@ final case class InputConfig( storages: StoragesConfig, importBucket: String, targetBucket: String, - disableResourceValidation: Boolean + disableResourceValidation: Boolean, + skipFileEvents: Boolean ) object InputConfig { diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/files/FileProcessor.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/files/FileProcessor.scala index b3a73ed742..a60837a648 100644 --- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/files/FileProcessor.scala +++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/files/FileProcessor.scala @@ -93,6 +93,14 @@ object FileProcessor { private val logger = Logger[FileProcessor] + private val noop = new EventProcessor[FileEvent] { + override def resourceType: EntityType = Files.entityType + + override def decoder: Decoder[FileEvent] = FileEvent.serializer.codec + + override def evaluate(event: FileEvent): IO[ImportStatus] = IO.pure(ImportStatus.Dropped) + } + def apply( fetchContext: FetchContext, s3Client: S3StorageClient, @@ -101,7 +109,8 @@ object FileProcessor { config: InputConfig, clock: EventClock, xas: Transactors - )(implicit jsonLdApi: JsonLdApi): FileProcessor = { + )(implicit jsonLdApi: JsonLdApi): EventProcessor[FileEvent] = if (config.skipFileEvents) noop + else { val storages = StorageWiring.storages(fetchContext, rcr, config, clock, xas) diff --git a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfigFixtures.scala b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfigFixtures.scala index 24f3a86f42..1d36dd3947 100644 --- a/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfigFixtures.scala +++ b/ship/src/test/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfigFixtures.scala @@ -59,7 +59,8 @@ trait ShipConfigFixtures extends ConfigFixtures with StorageFixtures with Classp StoragesConfig(eventLogConfig, pagination, config.copy(amazon = Some(amazonConfig))), importBucket, targetBucket, - disableResourceValidation = false + disableResourceValidation = false, + skipFileEvents = false ) }