Skip to content

Commit

Permalink
Add progress logging, allow to skip file events (#4932)
Browse files Browse the repository at this point in the history
* Add progress logging, allow to skip file events

* Run static analysis for all modules

---------

Co-authored-by: Simon Dumas <simon.dumas@epfl.ch>
  • Loading branch information
imsdu and Simon Dumas authored May 6, 2024
1 parent 3869746 commit 9d55a63
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 15 deletions.
1 change: 0 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -1085,7 +1085,6 @@ val coreModules = List("kernel", "rdf", "sdk", "sourcingPsql", "testkit")
val staticAnalysis =
s"""
|scalafmtSbtCheck ;
|project delta ;
|scalafmtCheck ;
|Test/scalafmtCheck ;
|scapegoat ;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ object S3FileOperations {
uuidF: UUIDF
): IO[S3FileMetadata] = {
for {
_ <- log.info(s"Fetching attributes for S3 file. Bucket $bucket at path $path")
_ <- log.debug(s"Fetching attributes for S3 file. Bucket $bucket at path $path")
resp <- client.headObject(bucket, path.toString())
metadata <- mkS3Metadata(path, resp)
} yield metadata
Expand Down
3 changes: 3 additions & 0 deletions ship/src/main/resources/ship-default.conf
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ ship {
# If true, no resource validation is performed
disable-resource-validation = false

# If true, file events are skipped so that the batch runs faster and focuses on the other events
skip-file-events = false

storages {

# S3 compatible storage configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,20 @@ object EventProcessor {
}
eventStream
.evalScan(ImportReport.start) { case (report, event) =>
val processed = report.progress.foldLeft(0L) { case (acc, (_, stats)) => acc + stats.success + stats.dropped }
processorsMap.get(event.`type`) match {
case Some(processor) =>
processor
.evaluate(event)
.map { status =>
report + (event, status)
}
.onError { err =>
logger.error(err)(
s"Error while processing event with offset '${event.ordering.value}' with processor '${event.`type`}'."
)
}
IO.whenA(processed % 1000 == 0)(logger.info(s"Current progress is: ${report.progress}")) >>
processor
.evaluate(event)
.map { status =>
report + (event, status)
}
.onError { err =>
logger.error(err)(
s"Error while processing event with offset '${event.ordering.value}' with processor '${event.`type`}'."
)
}
case None =>
logger.warn(s"No processor is provided for '${event.`type`}', skipping...") >>
IO.pure(report + (event, ImportStatus.Dropped))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ final case class InputConfig(
storages: StoragesConfig,
importBucket: String,
targetBucket: String,
disableResourceValidation: Boolean
disableResourceValidation: Boolean,
skipFileEvents: Boolean
)

object InputConfig {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,14 @@ object FileProcessor {

private val logger = Logger[FileProcessor]

/**
  * Processor selected when the configuration asks to skip file events:
  * it does not evaluate anything and immediately reports every event
  * as [[ImportStatus.Dropped]].
  */
private val noop: EventProcessor[FileEvent] = new EventProcessor[FileEvent] {

  // Reuses the codec exposed by the file event serializer
  override def decoder: Decoder[FileEvent] = FileEvent.serializer.codec

  override def resourceType: EntityType = Files.entityType

  // The incoming event is ignored, the status is always `Dropped`
  override def evaluate(event: FileEvent): IO[ImportStatus] = IO.pure(ImportStatus.Dropped)
}

def apply(
fetchContext: FetchContext,
s3Client: S3StorageClient,
Expand All @@ -101,7 +109,8 @@ object FileProcessor {
config: InputConfig,
clock: EventClock,
xas: Transactors
)(implicit jsonLdApi: JsonLdApi): FileProcessor = {
)(implicit jsonLdApi: JsonLdApi): EventProcessor[FileEvent] = if (config.skipFileEvents) noop
else {

val storages = StorageWiring.storages(fetchContext, rcr, config, clock, xas)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ trait ShipConfigFixtures extends ConfigFixtures with StorageFixtures with Classp
StoragesConfig(eventLogConfig, pagination, config.copy(amazon = Some(amazonConfig))),
importBucket,
targetBucket,
disableResourceValidation = false
disableResourceValidation = false,
skipFileEvents = false
)

}

0 comments on commit 9d55a63

Please sign in to comment.