From efab603ebd4610768e5a34bbfbab29a3ba432210 Mon Sep 17 00:00:00 2001 From: Simon Date: Wed, 25 Sep 2024 14:13:05 +0200 Subject: [PATCH] Refactoring failed elem to allow adding further details, introduce the projection to validate resources within a project (#5150) * Refactoring failed elem to allow adding further details, use json to make it more structured --------- Co-authored-by: Simon Dumas --- delta/app/src/main/resources/app.conf | 6 + .../nexus/delta/wiring/SchemasModule.scala | 26 +++- .../delta/routes/SchemasRoutesSpec.scala | 4 +- .../routes/list-indexing-errors.json | 22 ++- .../BlazegraphViewsIndexingRoutesSpec.scala | 4 +- .../compositeviews/CompositeViews.scala | 2 +- .../deletion/CompositeViewsDeletionTask.scala | 2 +- .../routes/list-indexing-errors.json | 22 ++- .../CompositeViewsIndexingRoutesSpec.scala | 2 +- .../indexing/ElasticSearchSink.scala | 29 ++-- .../routes/list-indexing-errors.json | 22 ++- .../indexing/ElasticSearchSinkSuite.scala | 17 ++- .../ElasticSearchIndexingRoutesSpec.scala | 3 +- .../storages/StorageDeletionTask.scala | 4 +- .../plugins/storage/storages/Storages.scala | 5 +- .../nexus/delta/sdk/ResourceShift.scala | 11 +- .../sdk/resolvers/ResourceResolution.scala | 5 +- .../nexus/delta/sdk/resources/Resources.scala | 10 ++ .../delta/sdk/resources/ResourcesImpl.scala | 6 +- .../sdk/resources/ValidateResource.scala | 6 +- .../nexus/delta/sdk/schemas/FetchSchema.scala | 41 +++--- .../delta/sdk/schemas/SchemasConfig.scala | 3 +- .../nexus/delta/sdk/schemas/SchemasImpl.scala | 8 +- .../job/SchemaValidationCoordinator.scala | 46 ++++++ .../schemas/job/SchemaValidationStream.scala | 51 +++++++ .../delta/sdk/generators/SchemaGen.scala | 18 ++- .../resources/ValidateResourceFixture.scala | 26 +++- .../delta/sdk/schemas/SchemasImplSuite.scala | 3 +- .../SchemaValidationCoordinatorSuite.scala | 133 ++++++++++++++++++ ...V1_11_M02_001__change_failed_elem_logs.ddl | 6 + .../delta/sourcing/PurgeElemFailures.scala | 12 +- .../sourcing/model/FailedElemLogRow.scala | 31 ++-- .../projections/FailedElemLogStore.scala | 33 +++-- .../projections/ProjectionErrors.scala | 10 ++ .../nexus/delta/sourcing/stream/Elem.scala | 38 ++--- .../delta/sourcing/stream/FailureReason.scala | 40 ++++++ .../delta/sourcing/stream/NoopSink.scala | 5 +- .../delta/sourcing/stream/Supervisor.scala | 11 +- .../projections/FailedElemLogStoreSuite.scala | 36 +++-- .../stream/ProjectionAssertions.scala | 35 +++++ .../nexus/ship/resources/ResourceWiring.scala | 2 +- 41 files changed, 626 insertions(+), 170 deletions(-) create mode 100644 delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/job/SchemaValidationCoordinator.scala create mode 100644 delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/job/SchemaValidationStream.scala create mode 100644 delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/job/SchemaValidationCoordinatorSuite.scala create mode 100644 delta/sourcing-psql/src/main/resources/scripts/postgres/init/V1_11_M02_001__change_failed_elem_logs.ddl create mode 100644 delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/FailureReason.scala rename delta/sourcing-psql/src/{test => main}/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/NoopSink.scala (78%) create mode 100644 delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectionAssertions.scala diff --git a/delta/app/src/main/resources/app.conf b/delta/app/src/main/resources/app.conf index 4bcea20f0d..446dbd9343 100644 --- 
a/delta/app/src/main/resources/app.conf +++ b/delta/app/src/main/resources/app.conf @@ -282,6 +282,12 @@ app { schemas { # the schemas event-log configuration event-log = ${app.defaults.event-log} + cache { + # The max number of schemas in cache + max-size = 50 + # The duration after an entry in the cache expires + expire-after = 5 minutes + } } # Type hierarchy configuration diff --git a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/SchemasModule.scala b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/SchemasModule.scala index 016021523f..18d29ddd85 100644 --- a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/SchemasModule.scala +++ b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/SchemasModule.scala @@ -21,11 +21,13 @@ import ch.epfl.bluebrain.nexus.delta.sdk.model.metrics.ScopedEventMetricEncoder import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ApiMappings import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.{ResolverContextResolution, Resolvers} -import ch.epfl.bluebrain.nexus.delta.sdk.resources.FetchResource +import ch.epfl.bluebrain.nexus.delta.sdk.resources.{FetchResource, Resources, ValidateResource} import ch.epfl.bluebrain.nexus.delta.sdk.schemas.Schemas.{SchemaDefinition, SchemaLog} import ch.epfl.bluebrain.nexus.delta.sdk.schemas._ +import ch.epfl.bluebrain.nexus.delta.sdk.schemas.job.{SchemaValidationCoordinator, SchemaValidationStream} import ch.epfl.bluebrain.nexus.delta.sdk.schemas.model.{Schema, SchemaEvent} import ch.epfl.bluebrain.nexus.delta.sdk.sse.SseEncoder +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Supervisor import ch.epfl.bluebrain.nexus.delta.sourcing.{ScopedEventLog, Transactors} import izumi.distage.model.definition.{Id, ModuleDef} @@ -36,6 +38,8 @@ object SchemasModule extends ModuleDef { implicit private val loader: ClasspathResourceLoader = ClasspathResourceLoader.withContext(getClass) + make[SchemasConfig].from { config: AppConfig => config.schemas } + make[ValidateShacl].fromEffect { (rcr: RemoteContextResolution @Id("aggregate")) => ValidateShacl(rcr) } make[ValidateSchema].from { (validateShacl: ValidateShacl, api: JsonLdApi) => ValidateSchema(validateShacl)(api) } @@ -44,8 +48,8 @@ object SchemasModule extends ModuleDef { Schemas.definition(validateSchema, clock) } - make[SchemaLog].from { (scopedDefinition: SchemaDefinition, config: AppConfig, xas: Transactors) => - ScopedEventLog(scopedDefinition, config.schemas.eventLog, xas) + make[SchemaLog].from { (scopedDefinition: SchemaDefinition, config: SchemasConfig, xas: Transactors) => + ScopedEventLog(scopedDefinition, config.eventLog, xas) } make[FetchSchema].from { (schemaLog: SchemaLog) => @@ -79,6 +83,22 @@ object SchemasModule extends ModuleDef { SchemaImports(aclCheck, resolvers, fetchSchema, fetchResource) } + make[SchemaValidationStream].fromEffect { + (resources: Resources, fetchSchema: FetchSchema, validateResource: ValidateResource, config: SchemasConfig) => + FetchSchema.cached(fetchSchema, config.cache).map { cached => + SchemaValidationStream( + resources.currentStates, + cached, + validateResource + ) + } + + } + + make[SchemaValidationCoordinator].from { (supervisor: Supervisor, schemaValidationStream: SchemaValidationStream) => + SchemaValidationCoordinator(supervisor, schemaValidationStream) + } + make[SchemasRoutes].from { ( identities: Identities, diff --git a/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/routes/SchemasRoutesSpec.scala 
b/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/routes/SchemasRoutesSpec.scala index c86bf1da82..0ed7191d6a 100644 --- a/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/routes/SchemasRoutesSpec.scala +++ b/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/routes/SchemasRoutesSpec.scala @@ -76,10 +76,8 @@ class SchemasRoutesSpec extends BaseRouteSpec with IOFromMap with CatsIOValues { private val fetchContext = FetchContextDummy(List(project.value)) private val groupDirectives = DeltaSchemeDirectives(fetchContext) - private val config = SchemasConfig(eventLogConfig) - private val schemaDef = Schemas.definition(ValidateSchema(ValidateShacl(rcr).accepted), clock) - private lazy val schemaLog = ScopedEventLog(schemaDef, config.eventLog, xas) + private lazy val schemaLog = ScopedEventLog(schemaDef, eventLogConfig, xas) private lazy val routes = Route.seal( diff --git a/delta/plugins/blazegraph/src/test/resources/routes/list-indexing-errors.json b/delta/plugins/blazegraph/src/test/resources/routes/list-indexing-errors.json index 8c2880cfbe..f0921daf06 100644 --- a/delta/plugins/blazegraph/src/test/resources/routes/list-indexing-errors.json +++ b/delta/plugins/blazegraph/src/test/resources/routes/list-indexing-errors.json @@ -7,25 +7,35 @@ "_total": 2, "_results": [ { - "errorType": "java.lang.Exception", "id": "https://bluebrain.github.io/nexus/vocabulary/myid", - "message": "boom", "offset": { "@type": "At", "value": 42 }, "project": "org/proj", - "_rev": 1 + "_rev": 1, + "reason": { + "type": "UnexpectedError", + "message": "boom", + "details": { + "exception" : "java.lang.Exception" + } + } }, { - "errorType": "java.lang.Exception", "id": "https://bluebrain.github.io/nexus/vocabulary/myid", - "message": "boom", "offset": { "@type": "At", "value": 42 }, - "_rev": 1 + "_rev": 1, + "reason": { + "type": "UnexpectedError", + "message": "boom", + "details": { + "exception" : "java.lang.Exception" + } + } } ] } \ No newline at end of file diff --git a/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/routes/BlazegraphViewsIndexingRoutesSpec.scala b/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/routes/BlazegraphViewsIndexingRoutesSpec.scala index b802baaee8..9f7deaccaa 100644 --- a/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/routes/BlazegraphViewsIndexingRoutesSpec.scala +++ b/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/routes/BlazegraphViewsIndexingRoutesSpec.scala @@ -12,6 +12,7 @@ import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.AclAddress import ch.epfl.bluebrain.nexus.delta.sdk.model.IdSegment import ch.epfl.bluebrain.nexus.delta.sdk.model.IdSegment.{IriSegment, StringSegment} +import ch.epfl.bluebrain.nexus.delta.sdk.syntax._ import ch.epfl.bluebrain.nexus.delta.sdk.views.ViewRef import ch.epfl.bluebrain.nexus.delta.sourcing.model.EntityType import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Anonymous @@ -127,7 +128,6 @@ class BlazegraphViewsIndexingRoutesSpec extends BlazegraphViewRoutesFixtures { } "fail to restart offset from view without resources/write permission" in { - Delete(s"$viewEndpoint/offset") ~> routes ~> check { response.shouldBeForbidden } @@ -155,7 +155,7 @@ class BlazegraphViewsIndexingRoutesSpec extends BlazegraphViewRoutesFixtures { aclCheck.append(AclAddress.Root, Anonymous -> Set(permissions.write)).accepted 
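      // Editorial sketch (not part of the patch): the list-indexing-errors fixtures above now
      // nest each failure under a structured `reason` object instead of the former flat
      // errorType/message/stackTrace fields. The local helper below is hypothetical and only
      // illustrates that shape; the `stacktrace` entry inside `details` is environment-dependent,
      // which is why the assertion that follows strips it with `removeAllKeys("stacktrace")`.
      import io.circe.Json
      import io.circe.syntax._
      def expectedReason(message: String, exception: String): Json =
        Json.obj(
          "type"    := "UnexpectedError",
          "message" := message,
          "details" := Json.obj("exception" := exception)
        )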
Get(s"$viewEndpoint/failures") ~> routes ~> check { response.status shouldBe StatusCodes.OK - response.asJson shouldEqual jsonContentOf("routes/list-indexing-errors.json") + response.asJson.removeAllKeys("stacktrace") shouldEqual jsonContentOf("routes/list-indexing-errors.json") } } diff --git a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViews.scala b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViews.scala index 70e41ffce6..bb17a8ee6d 100644 --- a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViews.scala +++ b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViews.scala @@ -297,7 +297,7 @@ final class CompositeViews private ( /** * Return all existing views for the given project in a finite stream */ - def currentViews(project: ProjectRef): ElemStream[CompositeViewDef] = + def currentViews(project: ProjectRef): SuccessElemStream[CompositeViewDef] = log.currentStates(Scope.Project(project)).map(toCompositeViewDef) /** diff --git a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/deletion/CompositeViewsDeletionTask.scala b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/deletion/CompositeViewsDeletionTask.scala index 2a28c218a4..a521d19d07 100644 --- a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/deletion/CompositeViewsDeletionTask.scala +++ b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/deletion/CompositeViewsDeletionTask.scala @@ -43,7 +43,7 @@ object CompositeViewsDeletionTask { def apply(views: CompositeViews) = new CompositeViewsDeletionTask( - project => views.currentViews(project).evalMapFilter(_.toIO), + project => views.currentViews(project).map(_.value), (v: ActiveViewDef, subject: Subject) => views .internalDeprecate(v.ref.viewId, v.ref.project, v.rev)(subject) diff --git a/delta/plugins/composite-views/src/test/resources/routes/list-indexing-errors.json b/delta/plugins/composite-views/src/test/resources/routes/list-indexing-errors.json index 1af24479be..3b3327233c 100644 --- a/delta/plugins/composite-views/src/test/resources/routes/list-indexing-errors.json +++ b/delta/plugins/composite-views/src/test/resources/routes/list-indexing-errors.json @@ -7,25 +7,35 @@ "_total": 2, "_results": [ { - "errorType": "java.lang.Exception", "id": "https://bluebrain.github.io/nexus/vocabulary/myid", - "message": "boom", "offset": { "@type": "At", "value": 42 }, "project": "myorg/myproj", - "_rev": 1 + "_rev": 1, + "reason": { + "type": "UnexpectedError", + "message": "boom", + "details": { + "exception" : "java.lang.Exception" + } + } }, { - "errorType": "java.lang.Exception", "id": "https://bluebrain.github.io/nexus/vocabulary/myid", - "message": "boom", "offset": { "@type": "At", "value": 42 }, - "_rev": 1 + "_rev": 1, + "reason": { + "type": "UnexpectedError", + "message": "boom", + "details": { + "exception" : "java.lang.Exception" + } + } } ] } \ No newline at end of file diff --git a/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/routes/CompositeViewsIndexingRoutesSpec.scala 
b/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/routes/CompositeViewsIndexingRoutesSpec.scala index 54892beb7f..f8a4688c74 100644 --- a/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/routes/CompositeViewsIndexingRoutesSpec.scala +++ b/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/routes/CompositeViewsIndexingRoutesSpec.scala @@ -238,7 +238,7 @@ class CompositeViewsIndexingRoutesSpec extends CompositeViewsRoutesFixtures { "return failures as a listing" in { Get(s"$viewEndpoint/failures") ~> asWriter ~> routes ~> check { response.status shouldBe StatusCodes.OK - response.asJson shouldEqual jsonContentOf("routes/list-indexing-errors.json") + response.asJson.removeAllKeys("stacktrace") shouldEqual jsonContentOf("routes/list-indexing-errors.json") } } } diff --git a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchSink.scala b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchSink.scala index 2237841424..dda5ea9760 100644 --- a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchSink.scala +++ b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchSink.scala @@ -7,16 +7,16 @@ import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchA import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient.BulkResponse.{MixedOutcomes, Success} import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient.{BulkResponse, Refresh} import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.{ElasticSearchAction, ElasticSearchClient, IndexLabel} +import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri import ch.epfl.bluebrain.nexus.delta.sdk.implicits._ -import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.FailedElem import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Sink +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{Elem, FailureReason} import fs2.Chunk import io.circe.{Json, JsonObject} import shapeless.Typeable import scala.concurrent.duration.FiniteDuration -import scala.util.control.NoStackTrace /** * Sink that pushes json documents into an Elasticsearch index @@ -99,16 +99,9 @@ object ElasticSearchSink { case element: FailedElem => element case element => items.get(documentId(element)) match { - case None => - element.failed( - BulkUpdateException( - JsonObject( - "reason" -> Json.fromString(s"${element.id} was not found in Elasticsearch response") - ) - ) - ) + case None => element.failed(onMissingInResponse(element.id)) case Some(MixedOutcomes.Outcome.Success) => element.void - case Some(MixedOutcomes.Outcome.Error(json)) => element.failed(BulkUpdateException(json)) + case Some(MixedOutcomes.Outcome.Error(json)) => element.failed(onIndexingFailure(element.id, json)) } } } @@ -173,7 +166,15 @@ object ElasticSearchSink { refresh ) - final case class BulkUpdateException(json: JsonObject) - extends Exception("Error updating elasticsearch: " + Json.fromJsonObject(json).noSpaces) - with NoStackTrace + private def onMissingInResponse(id: Iri) = FailureReason( + "MissingInResponse", + s"$id was not found in Elasticsearch response", + 
JsonObject.empty + ) + + private def onIndexingFailure(id: Iri, error: JsonObject) = FailureReason( + "IndexingFailure", + s"$id could not be indexed", + error + ) } diff --git a/delta/plugins/elasticsearch/src/test/resources/routes/list-indexing-errors.json b/delta/plugins/elasticsearch/src/test/resources/routes/list-indexing-errors.json index 5a9cb102e4..b58a981b79 100644 --- a/delta/plugins/elasticsearch/src/test/resources/routes/list-indexing-errors.json +++ b/delta/plugins/elasticsearch/src/test/resources/routes/list-indexing-errors.json @@ -7,25 +7,35 @@ "_total": 2, "_results": [ { - "errorType": "java.lang.Exception", "id": "https://bluebrain.github.io/nexus/vocabulary/myid", - "message": "boom", "offset": { "@type": "At", "value": 42 }, "project": "myorg/myproject", - "_rev": 1 + "_rev": 1, + "reason": { + "type": "UnexpectedError", + "message": "boom", + "details": { + "exception" : "java.lang.Exception" + } + } }, { - "errorType": "java.lang.Exception", "id": "https://bluebrain.github.io/nexus/vocabulary/myid", - "message": "boom", "offset": { "@type": "At", "value": 42 }, - "_rev": 1 + "_rev": 1, + "reason": { + "type": "UnexpectedError", + "message": "boom", + "details": { + "exception" : "java.lang.Exception" + } + } } ] } \ No newline at end of file diff --git a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchSinkSuite.scala b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchSinkSuite.scala index e15084b7ce..edf0551cf0 100644 --- a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchSinkSuite.scala +++ b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchSinkSuite.scala @@ -5,13 +5,13 @@ import akka.http.scaladsl.model.Uri.Query import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.ElasticSearchClientSetup import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient.Refresh import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.{IndexLabel, QueryBuilder} -import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.indexing.ElasticSearchSink.BulkUpdateException import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClientError import ch.epfl.bluebrain.nexus.delta.sourcing.model.EntityType import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{DroppedElem, FailedElem, SuccessElem} +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.FailureReason import ch.epfl.bluebrain.nexus.testkit.CirceLiteral import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite import fs2.Chunk @@ -106,10 +106,17 @@ class ElasticSearchSinkSuite extends NexusSuite with ElasticSearchClientSetup.Fi // The failed elem should be return intact _ = assertEquals(Some(failed), result.headOption) // The invalid one should hold the Elasticsearch error - _ = assert( - result.lift(1).flatMap(_.toThrowable).exists(_.isInstanceOf[BulkUpdateException]), - "We expect a 'BulkUpdateException' as an error here" - ) + _ = result.lift(1) match { + case Some(f: FailedElem) => + f.throwable match { + case reason: FailureReason => + assertEquals(reason.`type`, "IndexingFailure") + val detailKeys = reason.details.asObject.map(_.keys.toSet) + 
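              // Editorial sketch (not part of the patch): `reason.details` now carries the raw
              // Elasticsearch bulk-item error verbatim, which is where the keys asserted just
              // below come from. A hypothetical error payload of that shape (error type and
              // messages are illustrative only):
              import io.circe.Json
              import io.circe.syntax._
              val hypotheticalDetails: Json = Json.obj(
                "type"      := "mapper_parsing_exception",
                "reason"    := "failed to parse field [name]",
                "caused_by" := Json.obj(
                  "type"   := "illegal_argument_exception",
                  "reason" := "field value is not a string"
                )
              )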
assertEquals(detailKeys, Some(Set("type", "reason", "caused_by"))) + case t => fail(s"An indexing failure was expected, got '$t'", t) + } + case other => fail(s"A failed elem was expected, got '$other'") + } // The valid one should remain a success and hold a Unit value _ = assert(result.lift(2).flatMap(_.toOption).contains(())) _ <- client diff --git a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/routes/ElasticSearchIndexingRoutesSpec.scala b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/routes/ElasticSearchIndexingRoutesSpec.scala index 7950070b34..6271847492 100644 --- a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/routes/ElasticSearchIndexingRoutesSpec.scala +++ b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/routes/ElasticSearchIndexingRoutesSpec.scala @@ -17,6 +17,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.model.IdSegment.{IriSegment, StringSegm import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions.events import ch.epfl.bluebrain.nexus.delta.sdk.projects.{FetchContext, FetchContextDummy} import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.ResolverContextResolution +import ch.epfl.bluebrain.nexus.delta.sdk.syntax._ import ch.epfl.bluebrain.nexus.delta.sdk.views.{IndexingRev, ViewRef} import ch.epfl.bluebrain.nexus.delta.sourcing.model.EntityType import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Anonymous @@ -193,7 +194,7 @@ class ElasticSearchIndexingRoutesSpec extends ElasticSearchViewsRoutesFixtures { aclCheck.append(AclAddress.Root, Anonymous -> Set(esPermissions.write)).accepted Get(s"$viewEndpoint/failures") ~> routes ~> check { response.status shouldBe StatusCodes.OK - response.asJson shouldEqual jsonContentOf("routes/list-indexing-errors.json") + response.asJson.removeAllKeys("stacktrace") shouldEqual jsonContentOf("routes/list-indexing-errors.json") } } diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/StorageDeletionTask.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/StorageDeletionTask.scala index 4ec2a2e2a0..d9f11e7e1b 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/StorageDeletionTask.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/StorageDeletionTask.scala @@ -64,8 +64,6 @@ object StorageDeletionTask { new StorageDeletionTask(project => storages .currentStorages(project) - .evalMapFilter { - _.map(_.value).toIO - } + .map(_.value.value) ) } diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/Storages.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/Storages.scala index 12ddaaa171..4bb9b0f6d1 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/Storages.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/Storages.scala @@ -29,8 +29,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ApiMappings import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.ResolverContextResolution import ch.epfl.bluebrain.nexus.delta.sourcing._ import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject -import 
ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, ProjectRef} -import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem +import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, ProjectRef, SuccessElemStream} import fs2.Stream import io.circe.Json import org.typelevel.log4cats @@ -254,7 +253,7 @@ final class Storages private ( /** * Return the existing storages in a project in a finite stream */ - def currentStorages(project: ProjectRef): Stream[IO, Elem[StorageState]] = + def currentStorages(project: ProjectRef): SuccessElemStream[StorageState] = log.currentStates(Scope.Project(project)).map { e => e.withProject(e.value.project) } diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/ResourceShift.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/ResourceShift.scala index c7593d9bb4..986a413e05 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/ResourceShift.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/ResourceShift.scala @@ -81,7 +81,16 @@ abstract class ResourceShift[State <: ScopedState, A, M]( def toGraphResourceElem(project: ProjectRef, resource: ResourceF[A])(implicit cr: RemoteContextResolution ): IO[Elem[GraphResource]] = toGraphResource(project, resource).redeem( - err => FailedElem(entityType, resource.id, Some(project), resource.updatedAt, Offset.Start, err, resource.rev), + err => + FailedElem( + entityType, + resource.id, + Some(project), + resource.updatedAt, + Offset.Start, + err, + resource.rev + ), graph => SuccessElem(entityType, resource.id, Some(project), resource.updatedAt, Offset.Start, graph, resource.rev) ) diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/resolvers/ResourceResolution.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/resolvers/ResourceResolution.scala index 0f350c51b0..132cd1c116 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/resolvers/ResourceResolution.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/resolvers/ResourceResolution.scala @@ -110,14 +110,15 @@ object ResourceResolution { resolvers: Resolvers, fetchSchema: FetchSchema, excludeDeprecated: Boolean - ): ResourceResolution[Schema] = + ): ResourceResolution[Schema] = { apply( aclCheck, resolvers, - fetchSchema.fetch _, + fetchSchema.option _, Permissions.schemas.read, excludeDeprecated ) + } private def deprecationCheck[R](excludeDeprecated: Boolean) = DeprecationCheck[ResourceF[R]](excludeDeprecated, _.deprecated) diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/resources/Resources.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/resources/Resources.scala index 18e938419b..84c9e76a7b 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/resources/Resources.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/resources/Resources.scala @@ -20,6 +20,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.ScopedEntityDefinition.Tagger import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag import ch.epfl.bluebrain.nexus.delta.sourcing.model._ +import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.{ScopedEntityDefinition, ScopedEventLog, StateMachine} import io.circe.Json @@ -260,6 +261,15 @@ trait Resources { projectRef: ProjectRef ): IO[DataResource] = fetch(IdSegmentRef(resourceRef), projectRef, None) + + /** + 
* Return the current states of resources for the given project + * @param project + * the project where the resources belong + * @param offset + * the offset to start from + */ + def currentStates(project: ProjectRef, offset: Offset): SuccessElemStream[ResourceState] } object Resources { diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/resources/ResourcesImpl.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/resources/ResourcesImpl.scala index 21d09a89e4..fb83b763f6 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/resources/ResourcesImpl.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/resources/ResourcesImpl.scala @@ -20,8 +20,9 @@ import ch.epfl.bluebrain.nexus.delta.sdk.resources.model.ResourceRejection.{NoCh import ch.epfl.bluebrain.nexus.delta.sdk.resources.model.{ResourceCommand, ResourceEvent, ResourceRejection, ResourceState} import ch.epfl.bluebrain.nexus.delta.sourcing._ import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject -import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef +import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ProjectRef, SuccessElemStream} import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag +import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import io.circe.Json final class ResourcesImpl private ( @@ -188,6 +189,9 @@ final class ResourcesImpl private ( |in project '${cmd.project}', returning the original value.""".stripMargin logger.info(message).as(currentState.toResource) } + + override def currentStates(project: ProjectRef, offset: Offset): SuccessElemStream[ResourceState] = + log.currentStates(Scope(project), offset) } object ResourcesImpl { diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/resources/ValidateResource.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/resources/ValidateResource.scala index 98f8264503..89d5b6fcbc 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/resources/ValidateResource.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/resources/ValidateResource.scala @@ -7,6 +7,7 @@ import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.contexts import ch.epfl.bluebrain.nexus.delta.rdf.shacl.{ValidateShacl, ValidationReport} +import ch.epfl.bluebrain.nexus.delta.sdk.SchemaResource import ch.epfl.bluebrain.nexus.delta.sdk.jsonld.JsonLdAssembly import ch.epfl.bluebrain.nexus.delta.sdk.model.ResourceF import ch.epfl.bluebrain.nexus.delta.sdk.resources.Resources.kamonComponent @@ -43,10 +44,7 @@ trait ValidateResource { * @param schema * the schema to validate against */ - def apply( - jsonld: JsonLdAssembly, - schema: ResourceF[Schema] - ): IO[ValidationResult] + def apply(jsonld: JsonLdAssembly, schema: SchemaResource): IO[ValidationResult] } object ValidateResource { diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/FetchSchema.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/FetchSchema.scala index 484549f69f..b18cf32b67 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/FetchSchema.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/FetchSchema.scala @@ -1,44 +1,43 @@ package ch.epfl.bluebrain.nexus.delta.sdk.schemas import cats.effect.IO +import ch.epfl.bluebrain.nexus.delta.kernel.cache.{CacheConfig, LocalCache} 
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri +import ch.epfl.bluebrain.nexus.delta.sdk.SchemaResource import ch.epfl.bluebrain.nexus.delta.sdk.model.Fetch.FetchF -import ch.epfl.bluebrain.nexus.delta.sdk.model.IdSegmentRef -import ch.epfl.bluebrain.nexus.delta.sdk.model.IdSegmentRef.{Latest, Revision, Tag} import ch.epfl.bluebrain.nexus.delta.sdk.schemas.model.SchemaRejection.{RevisionNotFound, SchemaNotFound, TagNotFound} import ch.epfl.bluebrain.nexus.delta.sdk.schemas.model.{Schema, SchemaRejection, SchemaState} import ch.epfl.bluebrain.nexus.delta.sourcing.ScopedEventLogReadOnly +import ch.epfl.bluebrain.nexus.delta.sourcing.model.ResourceRef.{Latest, Revision, Tag} import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ProjectRef, ResourceRef} trait FetchSchema { - /** Fetch the referenced resource in the given project */ - def fetch(ref: ResourceRef, project: ProjectRef): FetchF[Schema] - - def stateOrNotFound(id: IdSegmentRef, iri: Iri, ref: ProjectRef): IO[SchemaState] + /** Fetch the referenced schema in the given project */ + def apply(ref: ResourceRef, project: ProjectRef): IO[SchemaResource] + def option(ref: ResourceRef, project: ProjectRef): FetchF[Schema] = apply(ref, project).option } object FetchSchema { - def apply(log: ScopedEventLogReadOnly[Iri, SchemaState, SchemaRejection]): FetchSchema = { + def cached(underlying: FetchSchema, config: CacheConfig): IO[FetchSchema] = + LocalCache[(ResourceRef, ProjectRef), SchemaResource](config).map { + cache => (ref: ResourceRef, project: ProjectRef) => + cache.getOrElseUpdate((ref, project), underlying(ref, project)) + } - def notFound(iri: Iri, ref: ProjectRef) = SchemaNotFound(iri, ref) + def apply(log: ScopedEventLogReadOnly[Iri, SchemaState, SchemaRejection]): FetchSchema = { - new FetchSchema { - override def fetch(ref: ResourceRef, project: ProjectRef): FetchF[Schema] = - stateOrNotFound(IdSegmentRef(ref), ref.iri, project).attempt - .map(_.toOption) - .map(_.map(_.toResource)) + def notFound(iri: Iri, project: ProjectRef) = SchemaNotFound(iri, project) - override def stateOrNotFound(id: IdSegmentRef, iri: Iri, ref: ProjectRef): IO[SchemaState] = - id match { - case Latest(_) => log.stateOr(ref, iri, notFound(iri, ref)) - case Revision(_, rev) => log.stateOr(ref, iri, rev, notFound(iri, ref), RevisionNotFound) - case Tag(_, tag) => log.stateOr(ref, iri, tag, notFound(iri, ref), TagNotFound(tag)) + (ref: ResourceRef, project: ProjectRef) => + { + ref match { + case Latest(iri) => log.stateOr(project, iri, notFound(iri, project)) + case Revision(_, iri, rev) => log.stateOr(project, iri, rev, notFound(iri, project), RevisionNotFound) + case Tag(_, iri, tag) => log.stateOr(project, iri, tag, notFound(iri, project), TagNotFound(tag)) } - } - + }.map(_.toResource) } - } diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/SchemasConfig.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/SchemasConfig.scala index 31697bf3de..0629dfc0fb 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/SchemasConfig.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/SchemasConfig.scala @@ -1,5 +1,6 @@ package ch.epfl.bluebrain.nexus.delta.sdk.schemas +import ch.epfl.bluebrain.nexus.delta.kernel.cache.CacheConfig import ch.epfl.bluebrain.nexus.delta.sourcing.config.EventLogConfig import pureconfig.ConfigReader import pureconfig.generic.semiauto.deriveReader @@ -10,7 +11,7 @@ import pureconfig.generic.semiauto.deriveReader * @param 
eventLog * configuration of the event log */ -final case class SchemasConfig(eventLog: EventLogConfig) +final case class SchemasConfig(eventLog: EventLogConfig, cache: CacheConfig) object SchemasConfig { implicit final val schemasConfigReader: ConfigReader[SchemasConfig] = diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/SchemasImpl.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/SchemasImpl.scala index 39e6733df9..8faf9bd110 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/SchemasImpl.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/SchemasImpl.scala @@ -139,10 +139,10 @@ final class SchemasImpl private ( override def fetch(id: IdSegmentRef, projectRef: ProjectRef): IO[SchemaResource] = { for { - pc <- fetchContext.onRead(projectRef) - iri <- expandIri(id.value, pc) - state <- FetchSchema(log).stateOrNotFound(id, iri, projectRef) - } yield state.toResource + pc <- fetchContext.onRead(projectRef) + ref <- expandIri(id, pc) + resource <- FetchSchema(log)(ref, projectRef) + } yield resource }.span("fetchSchema") private def eval(cmd: SchemaCommand) = diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/job/SchemaValidationCoordinator.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/job/SchemaValidationCoordinator.scala new file mode 100644 index 0000000000..e0db77524f --- /dev/null +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/job/SchemaValidationCoordinator.scala @@ -0,0 +1,46 @@ +package ch.epfl.bluebrain.nexus.delta.sdk.schemas.job + +import cats.effect.IO +import ch.epfl.bluebrain.nexus.delta.kernel.Logger +import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef +import ch.epfl.bluebrain.nexus.delta.sourcing.stream._ + +/** + * Allows to run a revalidation of the different data resouces in the given project + */ +trait SchemaValidationCoordinator { + + def run(project: ProjectRef): IO[Unit] + +} + +object SchemaValidationCoordinator { + + private val logger = Logger[SchemaValidationCoordinator] + + private[job] def projectionMetadata(project: ProjectRef): ProjectionMetadata = + ProjectionMetadata("schema", s"schema-$project-validate-resources", Some(project), None) + + def apply(supervisor: Supervisor, schemaValidationStream: SchemaValidationStream): SchemaValidationCoordinator = + new SchemaValidationCoordinator { + + private def compile(project: ProjectRef): IO[CompiledProjection] = + IO.fromEither( + CompiledProjection.compile( + projectionMetadata(project), + ExecutionStrategy.PersistentSingleNode, + Source(schemaValidationStream(project, _)), + new NoopSink[Unit] + ) + ) + + override def run(project: ProjectRef): IO[Unit] = { + for { + _ <- logger.info(s"Starting validation of resources for project '$project'") + compiled <- compile(project) + _ <- supervisor.destroy(compiled.metadata.name) + _ <- supervisor.run(compiled) + } yield () + } + } +} diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/job/SchemaValidationStream.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/job/SchemaValidationStream.scala new file mode 100644 index 0000000000..cfca1556cb --- /dev/null +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/job/SchemaValidationStream.scala @@ -0,0 +1,51 @@ +package ch.epfl.bluebrain.nexus.delta.sdk.schemas.job + +import cats.effect.IO +import cats.syntax.all._ +import 
ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.schemas +import ch.epfl.bluebrain.nexus.delta.sdk.resources.ValidateResource +import ch.epfl.bluebrain.nexus.delta.sdk.resources.model.{ResourceRejection, ResourceState} +import ch.epfl.bluebrain.nexus.delta.sdk.schemas.FetchSchema +import ch.epfl.bluebrain.nexus.delta.sourcing.model.ResourceRef.Latest +import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, ProjectRef, SuccessElemStream} +import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.FailureReason + +/** + * Streams the latest version of resources from a project and revalidate them with the latest version of the schema + * they are currently validated with. + * - Deprecated resources are skipped + * - Resources not validated with a schema are skipped too + */ +trait SchemaValidationStream { + + def apply(project: ProjectRef, offset: Offset): ElemStream[Unit] +} + +object SchemaValidationStream { + + def apply( + resourceStream: (ProjectRef, Offset) => SuccessElemStream[ResourceState], + fetchSchema: FetchSchema, + validateResource: ValidateResource + ): SchemaValidationStream = new SchemaValidationStream { + + private def validateSingle(resource: ResourceState) = + for { + jsonld <- IO.fromEither(resource.toAssembly) + schema <- fetchSchema(Latest(resource.schema.iri), resource.schemaProject) + _ <- validateResource(jsonld, schema).adaptErr { case r: ResourceRejection => + FailureReason("ValidateSchema", r.reason, r) + } + } yield (Some(())) + + override def apply(project: ProjectRef, offset: Offset): ElemStream[Unit] = + resourceStream(project, offset).evalMap { + _.evalMapFilter { + case r if r.deprecated => IO.none + case r if r.schema.iri == schemas.resources => IO.none + case r => validateSingle(r) + } + } + } +} diff --git a/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/generators/SchemaGen.scala b/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/generators/SchemaGen.scala index 49453d6829..202a49f329 100644 --- a/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/generators/SchemaGen.scala +++ b/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/generators/SchemaGen.scala @@ -4,7 +4,7 @@ import cats.data.NonEmptyList import cats.effect.IO import cats.effect.unsafe.implicits._ import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri -import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.ExpandedJsonLd +import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.{CompactedJsonLd, ExpandedJsonLd} import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.{JsonLdApi, JsonLdJavaApi} import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution import ch.epfl.bluebrain.nexus.delta.sdk.SchemaResource @@ -22,6 +22,22 @@ object SchemaGen { // We put a lenient api for schemas otherwise the api checks data types before the actual schema validation process implicit val api: JsonLdApi = JsonLdJavaApi.lenient + def empty(id: Iri, project: ProjectRef) = + SchemaState( + id, + project, + Json.obj(), + CompactedJsonLd.empty, + NonEmptyList.of(ExpandedJsonLd.empty), + 1, + deprecated = false, + Tags.empty, + Instant.EPOCH, + Anonymous, + Instant.EPOCH, + Anonymous + ).toResource + def currentState( schema: Schema, rev: Int = 1, diff --git a/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/resources/ValidateResourceFixture.scala b/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/resources/ValidateResourceFixture.scala index 4cf42222d8..a8328cd302 100644 --- 
a/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/resources/ValidateResourceFixture.scala +++ b/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/resources/ValidateResourceFixture.scala @@ -3,12 +3,13 @@ package ch.epfl.bluebrain.nexus.delta.sdk.resources import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri import ch.epfl.bluebrain.nexus.delta.rdf.shacl.ValidationReport +import ch.epfl.bluebrain.nexus.delta.sdk.SchemaResource import ch.epfl.bluebrain.nexus.delta.sdk.jsonld.JsonLdAssembly import ch.epfl.bluebrain.nexus.delta.sdk.model.ResourceF import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.model.ResourceResolutionReport +import ch.epfl.bluebrain.nexus.delta.sdk.resources.SchemaClaim.DefinedSchemaClaim import ch.epfl.bluebrain.nexus.delta.sdk.resources.ValidationResult._ import ch.epfl.bluebrain.nexus.delta.sdk.resources.model.ResourceRejection -import ch.epfl.bluebrain.nexus.delta.sdk.resources.SchemaClaim.DefinedSchemaClaim import ch.epfl.bluebrain.nexus.delta.sdk.resources.model.ResourceRejection.InvalidSchemaRejection import ch.epfl.bluebrain.nexus.delta.sdk.schemas.model.Schema import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ProjectRef, ResourceRef} @@ -84,4 +85,27 @@ trait ValidateResourceFixture { ) } + def validateForResources(validResources: Set[Iri]): ValidateResource = new ValidateResource { + + override def apply(jsonld: JsonLdAssembly, schema: SchemaClaim, enforceSchema: Boolean): IO[ValidationResult] = ??? + + override def apply(jsonld: JsonLdAssembly, schema: SchemaResource): IO[ValidationResult] = + if (validResources.contains(jsonld.id)) { + IO.pure( + Validated( + schema.value.project, + ResourceRef.Revision(schema.id, schema.rev), + defaultReport + ) + ) + } else + IO.raiseError( + InvalidSchemaRejection( + ResourceRef.Revision(schema.id, schema.rev), + schema.value.project, + ResourceResolutionReport() + ) + ) + } + } diff --git a/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/SchemasImplSuite.scala b/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/SchemasImplSuite.scala index 0dd7ae6475..929aeb136b 100644 --- a/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/SchemasImplSuite.scala +++ b/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/SchemasImplSuite.scala @@ -72,10 +72,9 @@ class SchemasImplSuite extends NexusSuite with Doobie.Fixture with ConfigFixture private val tag = UserTag.unsafe("tag") private val fetchContext = FetchContextDummy(Map(project.ref -> project.context), Set(projectDeprecated.ref)) - private val config = SchemasConfig(eventLogConfig) private val schemaDef = Schemas.definition(ValidateSchema(ValidateShacl(rcr).accepted), clock) - private lazy val schemaLog = ScopedEventLog(schemaDef, config.eventLog, xas) + private lazy val schemaLog = ScopedEventLog(schemaDef, eventLogConfig, xas) private lazy val schemas: Schemas = SchemasImpl(schemaLog, fetchContext, schemaImports, resolverContextResolution) diff --git a/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/job/SchemaValidationCoordinatorSuite.scala b/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/job/SchemaValidationCoordinatorSuite.scala new file mode 100644 index 0000000000..d233e4764a --- /dev/null +++ b/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/job/SchemaValidationCoordinatorSuite.scala @@ -0,0 +1,133 @@ +package ch.epfl.bluebrain.nexus.delta.sdk.schemas.job + +import cats.effect.IO +import 
ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri +import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.{nxv, schemas} +import ch.epfl.bluebrain.nexus.delta.sdk.SchemaResource +import ch.epfl.bluebrain.nexus.delta.sdk.generators.SchemaGen +import ch.epfl.bluebrain.nexus.delta.sdk.model.Tags +import ch.epfl.bluebrain.nexus.delta.sdk.resources.model.ResourceState +import ch.epfl.bluebrain.nexus.delta.sdk.resources.{ResourceInstanceFixture, Resources, ValidateResourceFixture} +import ch.epfl.bluebrain.nexus.delta.sdk.schemas.FetchSchema +import ch.epfl.bluebrain.nexus.delta.sdk.schemas.job.SchemaValidationCoordinator.projectionMetadata +import ch.epfl.bluebrain.nexus.delta.sdk.schemas.model.SchemaRejection.SchemaNotFound +import ch.epfl.bluebrain.nexus.delta.sdk.utils.Fixtures +import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Anonymous +import ch.epfl.bluebrain.nexus.delta.sourcing.model.ResourceRef.{Latest, Revision} +import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ProjectRef, ResourceRef} +import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.SuccessElem +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.SupervisorSetup.unapply +import ch.epfl.bluebrain.nexus.delta.sourcing.stream._ +import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite +import ch.epfl.bluebrain.nexus.testkit.mu.ce.PatienceConfig +import fs2.Stream +import munit.AnyFixture + +import scala.concurrent.duration._ + +import java.time.Instant + +class SchemaValidationCoordinatorSuite + extends NexusSuite + with Fixtures + with SupervisorSetup.Fixture + with ProjectionAssertions + with ResourceInstanceFixture + with ValidateResourceFixture { + + override def munitFixtures: Seq[AnyFixture[_]] = List(supervisor) + + implicit private val patienceConfig: PatienceConfig = PatienceConfig(10.seconds, 10.millis) + + private lazy val (sv, projections, projectionErrors) = unapply(supervisor()) + + private val project = ProjectRef.unsafe("org", "proj") + private val projectionName = projectionMetadata(project).name + + private val validResource = nxv + "valid" + private val invalidResourceId = nxv + "invalid" + private val deprecated = nxv + "deprecated" + private val unconstrained = nxv + "unconstrained" + + private val schemaId = nxv + "myschema" + private val noSchema = schemas.resources + + private val schema = SchemaGen.empty(schemaId, project) + + private val fetchSchema = new FetchSchema { + + /** Fetch the referenced schema in the given project */ + override def apply(ref: ResourceRef, project: ProjectRef): IO[SchemaResource] = + (ref, project) match { + case (Latest(`schemaId`), `project`) => IO.pure(schema) + case _ => IO.raiseError(SchemaNotFound(ref.iri, project)) + } + } + + private def createResource(id: Iri, deprecated: Boolean, schemaId: Iri) = + ResourceState( + id, + projectRef, + projectRef, + source, + compacted, + expanded, + remoteContexts, + rev = 1, + deprecated = deprecated, + Revision(schemaId, 1), + types, + Tags.empty, + createdAt = Instant.EPOCH, + createdBy = Anonymous, + updatedAt = Instant.EPOCH, + updatedBy = Anonymous + ) + + private val validateResource = validateForResources(Set(validResource)) + + private def runValidation(resources: ResourceState*) = { + val stream = Stream.emits(resources.zipWithIndex).map { case (resource, index) => + SuccessElem( + Resources.entityType, + resource.id, + Some(resource.project), + resource.updatedAt, + Offset.at(index.toLong + 1), + resource, + resource.rev + ) + } + val 
schemaValidationStream = SchemaValidationStream( + (_, _) => stream, + fetchSchema, + validateResource + ) + + SchemaValidationCoordinator(sv, schemaValidationStream).run(project) + } + + test("Revalidate resources from a project") { + for { + _ <- runValidation( + // Valid resource - success + createResource(validResource, deprecated = false, schemaId), + // Deprecated resource - dropped + createResource(deprecated, deprecated = true, schemaId), + // Unconstrained resource - dropped + createResource(unconstrained, deprecated = false, noSchema), + // Invalid resource - dropped + createResource(invalidResourceId, deprecated = false, schemaId) + ) + _ <- waitProjectionCompletion(sv, projectionName) + expectedProgress = ProjectionProgress(Offset.at(4L), Instant.EPOCH, 4, 2, 1) + _ <- assertProgress(projections, projectionName)(expectedProgress) + _ <- projectionErrors + .failedElemEntries(projectionName, Offset.start) + .map { error => error.failedElemData.id -> error.failedElemData.reason.`type` } + .assert(invalidResourceId -> "ValidateSchema") + } yield () + } + +} diff --git a/delta/sourcing-psql/src/main/resources/scripts/postgres/init/V1_11_M02_001__change_failed_elem_logs.ddl b/delta/sourcing-psql/src/main/resources/scripts/postgres/init/V1_11_M02_001__change_failed_elem_logs.ddl new file mode 100644 index 0000000000..ef0e6bd6e0 --- /dev/null +++ b/delta/sourcing-psql/src/main/resources/scripts/postgres/init/V1_11_M02_001__change_failed_elem_logs.ddl @@ -0,0 +1,6 @@ +-- Stacktrace and message are now deprecated so they can be nullable +ALTER TABLE failed_elem_logs ALTER COLUMN stack_trace DROP NOT NULL; + +-- Adding a new column +ALTER TABLE failed_elem_logs ADD COLUMN IF NOT EXISTS details JSONB; + diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/PurgeElemFailures.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/PurgeElemFailures.scala index 45d900a2a7..380a066147 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/PurgeElemFailures.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/PurgeElemFailures.scala @@ -17,13 +17,11 @@ final class PurgeElemFailures private[sourcing] (xas: Transactors) { * Deletes the projection errors that are older than the given instant. 
*/ def apply(instant: Instant): IO[Unit] = - for { - deleted <- sql""" - | DELETE FROM public.failed_elem_logs - | WHERE instant < $instant - """.stripMargin.update.run.transact(xas.write) - _ <- IO.whenA(deleted > 0)(logger.info(s"Deleted $deleted old indexing failures.")) - } yield () + sql"""DELETE FROM public.failed_elem_logs WHERE instant < $instant""".stripMargin.update.run + .transact(xas.write) + .flatMap { deleted => + IO.whenA(deleted > 0)(logger.info(s"Deleted $deleted old projection failures.")) + } } object PurgeElemFailures { diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/model/FailedElemLogRow.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/model/FailedElemLogRow.scala index 4cc8da541e..f5e7146f1d 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/model/FailedElemLogRow.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/model/FailedElemLogRow.scala @@ -7,10 +7,10 @@ import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.encoder.JsonLdEncoder import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._ import ch.epfl.bluebrain.nexus.delta.sourcing.model.FailedElemLogRow.FailedElemData import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset -import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ProjectionMetadata +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{FailureReason, ProjectionMetadata} import doobie._ import doobie.postgres.implicits._ -import io.circe.Encoder +import io.circe.{Encoder, Json} import io.circe.generic.semiauto.deriveEncoder import java.time.Instant @@ -40,8 +40,9 @@ object FailedElemLogRow { Int, String, String, - String, - Instant + Option[String], + Instant, + Option[Json] ) /** @@ -53,17 +54,14 @@ object FailedElemLogRow { entityType: EntityType, offset: Offset, rev: Int, - errorType: String, - message: String, - stackTrace: String + reason: FailureReason ) val context: ContextValue = ContextValue(contexts.error) - implicit val failedElemDataEncoder: Encoder.AsObject[FailedElemData] = - deriveEncoder[FailedElemData] - .mapJsonObject(_.remove("stackTrace")) - .mapJsonObject(_.remove("entityType")) + implicit val failedElemDataEncoder: Encoder.AsObject[FailedElemData] = + deriveEncoder[FailedElemData].mapJsonObject(_.remove("entityType")) + implicit val failedElemDataJsonLdEncoder: JsonLdEncoder[FailedElemData] = JsonLdEncoder.computeFromCirce(ContextValue(contexts.error)) @@ -83,12 +81,19 @@ object FailedElemLogRow { errorType, message, stackTrace, - instant + instant, + details ) => + val reason = details + .map { d => + FailureReason(errorType, message, d) + } + .getOrElse(FailureReason(errorType, message, stackTrace)) + FailedElemLogRow( ordering, ProjectionMetadata(module, name, project, resourceId), - FailedElemData(elemId, elemProject, entityType, elemOffset, revision, errorType, message, stackTrace), + FailedElemData(elemId, elemProject, entityType, elemOffset, revision, reason), instant ) } diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/FailedElemLogStore.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/FailedElemLogStore.scala index 00908d59a3..9b21cfe809 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/FailedElemLogStore.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/FailedElemLogStore.scala @@ -2,18 
+2,16 @@ package ch.epfl.bluebrain.nexus.delta.sourcing.projections import cats.effect.{Clock, IO} import cats.implicits._ - import ch.epfl.bluebrain.nexus.delta.kernel.Logger import ch.epfl.bluebrain.nexus.delta.kernel.search.Pagination.FromPagination import ch.epfl.bluebrain.nexus.delta.kernel.search.TimeRange -import ch.epfl.bluebrain.nexus.delta.kernel.utils.ThrowableUtils._ import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._ import ch.epfl.bluebrain.nexus.delta.sourcing.model.{FailedElemLogRow, ProjectRef} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.FailedElem -import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{ProjectionMetadata, ProjectionStore} +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{FailureReason, ProjectionMetadata, ProjectionStore} import ch.epfl.bluebrain.nexus.delta.sourcing.{FragmentEncoder, Transactors} import doobie._ import doobie.syntax.all._ @@ -110,6 +108,13 @@ trait FailedElemLogStore { timeRange: TimeRange ): IO[List[FailedElemLogRow]] + /** + * Delete the errors related to the given projection + * @param projectionName + * the projection name + */ + def deleteEntriesForProjection(projectionName: String): IO[Unit] + } object FailedElemLogStore { @@ -139,7 +144,11 @@ object FailedElemLogStore { metadata: ProjectionMetadata, failure: FailedElem, instant: Instant - ): ConnectionIO[Unit] = + ): ConnectionIO[Unit] = { + val failureReason = failure.throwable match { + case f: FailureReason => f + case t => FailureReason(t) + } sql""" | INSERT INTO public.failed_elem_logs ( | projection_name, @@ -153,7 +162,7 @@ object FailedElemLogStore { | rev, | error_type, | message, - | stack_trace, + | details, | instant | ) | VALUES ( @@ -166,11 +175,12 @@ object FailedElemLogStore { | ${failure.id}, | ${failure.project}, | ${failure.rev}, - | ${failure.throwable.getClass.getCanonicalName}, - | ${failure.throwable.getMessage}, - | ${stackTraceAsString(failure.throwable)}, + | ${failureReason.`type`}, + | ${failureReason.message}, + | ${failureReason.details}, | $instant | )""".stripMargin.update.run.void + } override def stream( projectionProject: ProjectRef, @@ -220,6 +230,13 @@ object FailedElemLogStore { Some(fr"projection_id = $projectionId"), timeRange.asFragment ) + + override def deleteEntriesForProjection(projectionName: String): IO[Unit] = + sql"""DELETE FROM public.failed_elem_logs WHERE projection_name = $projectionName""".stripMargin.update.run + .transact(xas.write) + .flatMap { deleted => + IO.whenA(deleted > 0)(logger.info(s"Deleted $deleted projection failures for '$projectionName'.")) + } } } diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/ProjectionErrors.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/ProjectionErrors.scala index d2d7e75c2c..deca4329c2 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/ProjectionErrors.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/ProjectionErrors.scala @@ -66,6 +66,13 @@ trait ProjectionErrors { timeRange: TimeRange ): IO[List[FailedElemLogRow]] + /** + * Delete the errors related to the given projection + * @param projectionName + * the projection + */ + def deleteEntriesForProjection(projectionName: String): IO[Unit] + } 
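// Editorial sketch (not part of the patch): with `deleteEntriesForProjection` exposed on the
// trait above, a job that re-runs a projection from scratch can first drop the failures recorded
// by previous runs so the error listing only reflects the latest execution. The object below is
// hypothetical wiring only; the PR itself wires destroy/run in SchemaValidationCoordinator, and
// clearing errors beforehand is shown here merely as a possible use of the new method.
object ProjectionErrorsUsageSketch {
  import cats.effect.IO
  import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{CompiledProjection, Supervisor}

  // Delete previously recorded failures, then restart the projection from scratch.
  def restartClean(errors: ProjectionErrors, supervisor: Supervisor)(compiled: CompiledProjection): IO[Unit] =
    errors.deleteEntriesForProjection(compiled.metadata.name) *>
      supervisor.destroy(compiled.metadata.name) *>
      supervisor.run(compiled).void
}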
 object ProjectionErrors {
@@ -90,6 +97,9 @@ object ProjectionErrors {
         pagination: FromPagination,
         timeRange: TimeRange
     ): IO[List[FailedElemLogRow]] = store.list(project, projectionId, pagination, timeRange)
+
+    override def deleteEntriesForProjection(projectionName: String): IO[Unit] =
+      store.deleteEntriesForProjection(projectionName)
   }
 
 }
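For illustration, a minimal sketch of calling the cleanup entry point added above from application code (the `projectionErrors` instance is assumed to come from the existing wiring; this snippet is not part of the diff):

  import cats.effect.IO
  import ch.epfl.bluebrain.nexus.delta.sourcing.projections.ProjectionErrors

  // Remove every persisted failure for a projection, e.g. before registering a
  // replacement projection under the same name.
  def purgeProjectionFailures(projectionErrors: ProjectionErrors, projectionName: String): IO[Unit] =
    projectionErrors.deleteEntriesForProjection(projectionName)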
diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Elem.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Elem.scala
index 36ae51f7d2..0a7226f827 100644
--- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Elem.scala
+++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Elem.scala
@@ -9,10 +9,9 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, ProjectRef}
 import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
 import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{DroppedElem, FailedElem, SuccessElem}
 import doobie.Read
-import io.circe.{Decoder, Encoder}
 import io.circe.generic.extras.Configuration
-import io.circe.generic.extras.semiauto.{deriveConfiguredDecoder, deriveConfiguredEncoder}
-import io.circe.syntax.EncoderOps
+import io.circe.generic.extras.semiauto.deriveConfiguredCodec
+import io.circe.{Codec, Decoder, Encoder}
 
 import java.time.Instant
 
@@ -65,7 +64,8 @@ sealed trait Elem[+A] extends Product with Serializable {
    * @param throwable
    *   the error why the element processing failed
    */
-  def failed(throwable: Throwable): FailedElem = FailedElem(tpe, id, project, instant, offset, throwable, rev)
+  def failed(throwable: Throwable): FailedElem =
+    FailedElem(tpe, id, project, instant, offset, throwable, rev)
 
   /**
    * Produces a new [[SuccessElem]] with the provided value copying the common properties.
@@ -154,24 +154,6 @@ sealed trait Elem[+A] extends Product with Serializable {
     case _: DroppedElem    => None
   }
 
-  /**
-   * Returns the value as an [[IO]], raising a error on the failed case
-   */
-  def toIO: IO[Option[A]] = this match {
-    case e: SuccessElem[A] => IO.pure(Some(e.value))
-    case f: FailedElem     => IO.raiseError(f.throwable)
-    case _: DroppedElem    => IO.none
-  }
-
-  /**
-   * Returns the underlying error for a [[FailedElem]]
-   */
-  def toThrowable: Option[Throwable] = this match {
-    case _: SuccessElem[A] => None
-    case f: FailedElem     => Some(f.throwable)
-    case _: DroppedElem    => None
-  }
-
   override def toString: String =
     s"${this.getClass.getSimpleName}[${project.fold("")(_.toString)}/$id:$rev]{${offset.value}}"
 }
@@ -326,13 +308,11 @@ object Elem {
 
   implicit private val config: Configuration = Configuration.default.withDiscriminator(keywords.tpe)
 
-  implicit val elemUnitEncoder: Encoder.AsObject[Elem[Unit]] = {
-    implicit val throwableEncoder: Encoder[Throwable] = Encoder.instance[Throwable](_.getMessage.asJson)
-    deriveConfiguredEncoder[Elem[Unit]]
-  }
+  implicit val elemUnitEncoder: Codec.AsObject[Elem[Unit]] = {
+    implicit val throwableEncoder: Encoder[Throwable] = FailureReason.failureReasonCodec.contramap(FailureReason(_))
+    implicit val throwableDecoder: Decoder[Throwable] = FailureReason.failureReasonCodec.map(identity)
 
-  implicit val elemUnitDecoder: Decoder[Elem[Unit]] = {
-    implicit val throwableDecoder: Decoder[Throwable] = Decoder.decodeString.map(new Exception(_))
-    deriveConfiguredDecoder[Elem[Unit]]
+    deriveConfiguredCodec[Elem[Unit]]
   }
+
 }
diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/FailureReason.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/FailureReason.scala
new file mode 100644
index 0000000000..995ae76804
--- /dev/null
+++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/FailureReason.scala
@@ -0,0 +1,40 @@
+package ch.epfl.bluebrain.nexus.delta.sourcing.stream
+
+import ch.epfl.bluebrain.nexus.delta.kernel.utils.ThrowableUtils.stackTraceAsString
+import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.JsonLdContext.keywords
+import io.circe.generic.extras.Configuration
+import io.circe.generic.extras.semiauto.deriveConfiguredCodec
+import io.circe.syntax.{EncoderOps, KeyOps}
+import io.circe.{Codec, Encoder, Json}
+
+import scala.util.control.NoStackTrace
+
+case class FailureReason(`type`: String, message: String, details: Json) extends Exception with NoStackTrace
+
+object FailureReason {
+
+  implicit private val config: Configuration = Configuration.default.withDiscriminator(keywords.tpe)
+
+  implicit val failureReasonCodec: Codec.AsObject[FailureReason] = deriveConfiguredCodec[FailureReason]
+
+  def apply(throwable: Throwable): FailureReason =
+    apply(throwable.getClass.getCanonicalName, throwable.getMessage, Some(stackTraceAsString(throwable)))
+
+  def apply(errorType: String, message: String, stackTrace: Option[String]): FailureReason =
+    FailureReason(
+      "UnexpectedError",
+      message,
+      Json.obj(
+        "exception"  := errorType,
+        "stacktrace" := stackTrace
+      )
+    )
+
+  def apply[A: Encoder](tpe: String, message: String, value: A): FailureReason =
+    FailureReason(
+      tpe,
+      message,
+      value.asJson
+    )
+
+}
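`FailureReason` replaces the raw stack-trace column with a structured error payload. A rough usage sketch under stated assumptions (the domain error type and its encoder below are hypothetical, not part of this patch):

  object FailureReasonExamples {
    import ch.epfl.bluebrain.nexus.delta.sourcing.stream.FailureReason
    import io.circe.syntax.EncoderOps
    import io.circe.{Encoder, Json}

    // Arbitrary throwables are normalised to the "UnexpectedError" type; the exception
    // class and stack trace end up inside the structured `details` JSON.
    val unexpected: FailureReason = FailureReason(new RuntimeException("boom"))

    // A domain failure can carry a typed payload instead of a stack trace.
    final case class ValidationDetails(schema: String, violations: List[String])
    implicit val validationDetailsEncoder: Encoder[ValidationDetails] =
      Encoder.forProduct2("schema", "violations")(d => (d.schema, d.violations))

    val invalid: FailureReason =
      FailureReason(
        "InvalidResource",
        "Resource does not conform to its schema",
        ValidationDetails("https://example.com/schemas/my-schema", List("minCount violated"))
      )

    // Both values encode through `failureReasonCodec`, roughly as
    // {"type": ..., "message": ..., "details": {...}}.
    val invalidAsJson: Json = invalid.asJson
  }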
diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/NoopSink.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/NoopSink.scala
similarity index 78%
rename from delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/NoopSink.scala
rename to delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/NoopSink.scala
index e3e3565f9c..5a75be21fe 100644
--- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/NoopSink.scala
+++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/NoopSink.scala
@@ -5,10 +5,9 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Sink
 import fs2.Chunk
 import shapeless.Typeable
 
-import scala.concurrent.duration.FiniteDuration
-import scala.concurrent.duration._
+import scala.concurrent.duration.{FiniteDuration, _}
 
-class NoopSink[A: Typeable] extends Sink {
+final class NoopSink[A: Typeable] extends Sink {
 
   override type In = A
 
diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Supervisor.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Supervisor.scala
index 5d14ace5e6..05a16b993a 100644
--- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Supervisor.scala
+++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Supervisor.scala
@@ -9,7 +9,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.config.ProjectionConfig
 import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
 import ch.epfl.bluebrain.nexus.delta.sourcing.projections.model.ProjectionRestart
 import ch.epfl.bluebrain.nexus.delta.sourcing.projections.{ProjectionErrors, Projections}
-import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{FailedElem, SuccessElem}
+import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.SuccessElem
 import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ExecutionStatus.Ignored
 import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ExecutionStrategy.{EveryNode, PersistentSingleNode, TransientSingleNode}
 import fs2.Stream
@@ -142,7 +142,7 @@
       supervision    <- supervisionTask(semaphore, mapRef, signal, cfg).start
       supervisionRef <- Ref.of[IO, Fiber[IO, Throwable, Unit]](supervision)
       supervisor      =
-        new Impl(projections, projectionErrors.saveFailedElems, cfg, semaphore, mapRef, signal, supervisionRef)
+        new Impl(projections, projectionErrors, cfg, semaphore, mapRef, signal, supervisionRef)
       _              <- watchRestarts(supervisor, projections)
       _              <- log.info("Delta supervisor is up")
     } yield supervisor
@@ -250,7 +250,7 @@
 
   private class Impl(
       projections: Projections,
-      saveFailedElems: (ProjectionMetadata, List[FailedElem]) => IO[Unit],
+      projectionErrors: ProjectionErrors,
       cfg: ProjectionConfig,
       semaphore: Semaphore[IO],
       mapRef: Ref[IO, Map[String, Supervised]],
@@ -301,10 +301,10 @@
           (
             projections.progress(projection.metadata.name),
             projections.save(projection.metadata, _),
-            saveFailedElems(projection.metadata, _)
+            projectionErrors.saveFailedElems(projection.metadata, _)
           )
         case TransientSingleNode | EveryNode  =>
-          (IO.none, (_: ProjectionProgress) => IO.unit, saveFailedElems(projection.metadata, _))
+          (IO.none, (_: ProjectionProgress) => IO.unit, projectionErrors.saveFailedElems(projection.metadata, _))
       }
       Projection(projection, fetchProgress, saveProgress, saveErrors)(cfg.batch)
     }
@@ -350,6 +350,7 @@
       _ <- log.info(s"Destroying '${metadata.module}/${metadata.name}'...")
       _ <- stopProjection(s)
       _ <- IO.whenA(s.executionStrategy == PersistentSingleNode)(projections.delete(name))
+      _ <- projectionErrors.deleteEntriesForProjection(name)
       _ <- onDestroy
              .retry(retryStrategy)
             .handleError(_ => ())
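At the call site this means destroying a projection now also clears its failed_elem_logs rows. A small sketch (the exact signature of `Supervisor#destroy` is assumed from the surrounding code and may differ):

  import cats.effect.IO
  import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Supervisor

  // Stops the projection, deletes its persisted progress (persistent strategies only)
  // and, with this change, its recorded failures, before any onDestroy hook runs.
  def tearDown(supervisor: Supervisor, projectionName: String): IO[Unit] =
    supervisor.destroy(projectionName).void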
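The next file adds small munit helpers for projection tests. A sketch of how a suite could mix them in (the suite and projection name are made up, the PatienceConfig construction is assumed, and a real suite would obtain the Supervisor from a fixture):

  import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{ProjectionAssertions, Supervisor}
  import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite
  import ch.epfl.bluebrain.nexus.testkit.mu.ce.PatienceConfig
  import scala.concurrent.duration._

  class MyProjectionSuite extends NexusSuite with ProjectionAssertions {

    implicit private val patience: PatienceConfig = PatienceConfig(5.seconds, 100.millis)

    test("Wait for the 'my-projection' projection to complete") {
      val supervisor: Supervisor = ??? // provided by a fixture in a real suite
      waitProjectionCompletion(supervisor, "my-projection")
    }
  }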
diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectionAssertions.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectionAssertions.scala
new file mode 100644
index 0000000000..c6d54ba20f
--- /dev/null
+++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectionAssertions.scala
@@ -0,0 +1,35 @@
+package ch.epfl.bluebrain.nexus.delta.sourcing.stream
+
+import cats.effect.IO
+import ch.epfl.bluebrain.nexus.delta.sourcing.projections.Projections
+import ch.epfl.bluebrain.nexus.testkit.mu.ce.{CatsEffectEventually, PatienceConfig}
+import munit.{Assertions, CatsEffectAssertions, Location}
+
+trait ProjectionAssertions extends CatsEffectEventually {
+  self: Assertions with CatsEffectAssertions =>
+
+  /**
+    * Wait for the given projection to complete its execution on the supervisor
+    */
+  def waitProjectionCompletion(supervisor: Supervisor, projectionName: String)(implicit
+      loc: Location,
+      patience: PatienceConfig
+  ): IO[Unit] =
+    supervisor
+      .describe(projectionName)
+      .map(_.map(_.status))
+      .assertEquals(Some(ExecutionStatus.Completed))
+      .eventually
+
+  /**
+    * Assert that the projection reaches the expected progress
+    */
+  def assertProgress(projections: Projections, projectionName: String)(
+      expected: ProjectionProgress
+  )(implicit loc: Location, patience: PatienceConfig): IO[Unit] =
+    projections
+      .progress(projectionName)
+      .assertEquals(Some(expected))
+      .eventually
+
+}
diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/resources/ResourceWiring.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/resources/ResourceWiring.scala
index 332590c46e..974149f692 100644
--- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/resources/ResourceWiring.scala
+++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/resources/ResourceWiring.scala
@@ -40,7 +40,7 @@ object ResourceWiring {
       override def apply(jsonld: JsonLdAssembly, schema: SchemaClaim, enforceSchema: Boolean): IO[ValidationResult] =
         schema match {
           case defined: DefinedSchemaClaim =>
-            fetchSchema.fetch(defined.schemaRef, schema.project).flatMap {
+            fetchSchema.option(defined.schemaRef, schema.project).flatMap {
               case Some(value) =>
                 IO.pure {
                   Validated(