From 24f88bcc3e99b89f1dd873c755e03dad4d04b4e1 Mon Sep 17 00:00:00 2001 From: Simon Date: Fri, 4 Oct 2024 18:23:35 +0200 Subject: [PATCH] Make project in Elem mandatory (#5170) Co-authored-by: Simon Dumas --- delta/app/src/main/resources/app.conf | 9 +- .../delta/routes/SchemaJobRoutesSpec.scala | 5 +- .../plugins/blazegraph/BlazegraphViews.scala | 4 +- .../routes/list-indexing-errors.json | 5 +- .../BlazegraphIndexingActionSuite.scala | 12 +- .../indexing/BlazegraphCoordinatorSuite.scala | 12 +- .../indexing/BlazegraphSinkSuite.scala | 10 +- .../BlazegraphViewsIndexingRoutesSpec.scala | 5 +- .../compositeviews/CompositeViews.scala | 6 +- .../store/CompositeRestartStore.scala | 2 +- .../routes/list-indexing-errors.json | 5 +- .../client/DeltaClientSpec.scala | 10 +- .../indexing/CompositeIndexingSuite.scala | 2 +- .../CompositeViewsIndexingRoutesSpec.scala | 5 +- .../store/CompositeRestartStoreSuite.scala | 2 +- .../stream/RemoteGraphStreamSuite.scala | 2 +- .../elasticsearch/ElasticSearchViews.scala | 2 +- .../indexing/ElasticSearchSink.scala | 7 +- .../metrics/EventMetricsProjection.scala | 4 +- .../routes/list-indexing-errors.json | 5 +- .../ElasticSearchIndexingActionSuite.scala | 12 +- .../ElasticSearchCoordinatorSuite.scala | 12 +- .../indexing/ElasticSearchSinkSuite.scala | 12 +- .../GraphResourceToDocumentSuite.scala | 2 +- .../ElasticSearchIndexingRoutesSpec.scala | 5 +- .../GraphAnalyticsCoordinatorSuite.scala | 12 +- .../indexing/GraphAnalyticsSinkSuite.scala | 13 +-- .../plugins/storage/storages/Storages.scala | 4 +- .../nexus/delta/sdk/ResourceShift.scala | 4 +- .../nexus/delta/sdk/projects/Projects.scala | 4 +- .../delta/sdk/projects/ProjectsImpl.scala | 7 +- .../nexus/delta/sdk/sse/SseConfig.scala | 3 +- .../metrics/ProjectScopedMetricStream.scala | 2 +- .../SchemaValidationCoordinatorSuite.scala | 2 +- .../delta/sdk/sse/SseElemStreamSuite.scala | 2 +- .../delta/sdk/sse/SseEventLogSuite.scala | 10 +- .../projections/ProjectionRestartStore.scala | 15 +-- .../sourcing/projections/Projections.scala | 8 +- .../projections/model/ProjectionRestart.scala | 17 +-- .../delta/sourcing/query/StreamingQuery.scala | 8 +- .../nexus/delta/sourcing/stream/Elem.scala | 24 ++-- .../delta/sourcing/stream/Supervisor.scala | 29 +---- .../delta/sourcing/stream/WatchRestarts.scala | 57 +++++++++ .../projections/FailedElemLogStoreSuite.scala | 2 +- .../ProjectionRestartStoreSuite.scala | 9 +- .../sourcing/query/StreamingQuerySuite.scala | 108 ++++++++---------- .../state/ScopedStateStoreSuite.scala | 47 ++++---- .../delta/sourcing/stream/CacheSink.scala | 7 +- .../stream/FailedElemPersistenceSuite.scala | 9 +- .../sourcing/stream/OperationSuite.scala | 6 +- .../sourcing/stream/PullRequestStream.scala | 8 +- .../sourcing/stream/SupervisorSuite.scala | 12 +- .../pipes/DataConstructQuerySuite.scala | 2 +- .../stream/pipes/DiscardMetadataSuite.scala | 2 +- .../stream/pipes/FilterBySchemaSuite.scala | 2 +- .../stream/pipes/FilterByTypeSuite.scala | 2 +- .../stream/pipes/FilterDeprecatedSuite.scala | 4 +- .../stream/pipes/SelectPredicatesSuite.scala | 2 +- .../stream/pipes/SourceAsTextSuite.scala | 2 +- 59 files changed, 285 insertions(+), 317 deletions(-) create mode 100644 delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/WatchRestarts.scala diff --git a/delta/app/src/main/resources/app.conf b/delta/app/src/main/resources/app.conf index 9b50fcc310..1342ca10fd 100644 --- a/delta/app/src/main/resources/app.conf +++ b/delta/app/src/main/resources/app.conf @@ -299,15 +299,8 @@ app { # SSE configuration sse { - # the schemas event-log configuration + # the SSE query configuration query = ${app.defaults.query} - # the sse cache to get projects uuids - cache { - # The max number of tokens in the projects uuids cache - max-size = 500 - # The duration after which the cache expires - expire-after = 15 minutes - } } # projection configuration diff --git a/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/routes/SchemaJobRoutesSpec.scala b/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/routes/SchemaJobRoutesSpec.scala index 9ad3347816..83723505af 100644 --- a/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/routes/SchemaJobRoutesSpec.scala +++ b/delta/app/src/test/scala/ch/epfl/bluebrain/nexus/delta/routes/SchemaJobRoutesSpec.scala @@ -67,9 +67,8 @@ class SchemaJobRoutesSpec extends BaseRouteSpec { val projectionMetadata = SchemaValidationCoordinator.projectionMetadata(project.ref) val reason = FailureReason("ValidationFail", json"""{ "details": "..." }""") - val fail1 = FailedElem(EntityType("ACL"), resourceId, Some(project.ref), Instant.EPOCH, Offset.At(42L), reason, rev) - val fail2 = - FailedElem(EntityType("Schema"), resourceId, Some(project.ref), Instant.EPOCH, Offset.At(42L), reason, rev) + val fail1 = FailedElem(EntityType("ACL"), resourceId, project.ref, Instant.EPOCH, Offset.At(42L), reason, rev) + val fail2 = FailedElem(EntityType("Schema"), resourceId, project.ref, Instant.EPOCH, Offset.At(42L), reason, rev) ( projections.save(projectionMetadata, progress) >> diff --git a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphViews.scala b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphViews.scala index bc9935b467..cc6003d414 100644 --- a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphViews.scala +++ b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphViews.scala @@ -299,9 +299,7 @@ final class BlazegraphViews( } private def toIndexViewDef(elem: Elem.SuccessElem[BlazegraphViewState]) = - elem.withProject(elem.value.project).traverse { v => - IndexingViewDef(v, prefix) - } + elem.traverse { v => IndexingViewDef(v, prefix) } private def eval(cmd: BlazegraphViewCommand): IO[ViewResource] = log.evaluate(cmd.project, cmd.id, cmd).map(_._2.toResource) diff --git a/delta/plugins/blazegraph/src/test/resources/routes/list-indexing-errors.json b/delta/plugins/blazegraph/src/test/resources/routes/list-indexing-errors.json index c4b3d5f714..8ab0180373 100644 --- a/delta/plugins/blazegraph/src/test/resources/routes/list-indexing-errors.json +++ b/delta/plugins/blazegraph/src/test/resources/routes/list-indexing-errors.json @@ -23,11 +23,12 @@ } }, { - "id": "https://bluebrain.github.io/nexus/vocabulary/myid", + "id": "https://bluebrain.github.io/nexus/vocabulary/myid2", "offset": { "@type": "At", - "value": 42 + "value": 43 }, + "project": "org/proj", "_rev": 1, "reason": { "type": "UnexpectedError", diff --git a/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphIndexingActionSuite.scala b/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphIndexingActionSuite.scala index c35298b210..f380f700b3 100644 --- a/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphIndexingActionSuite.scala +++ b/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphIndexingActionSuite.scala @@ -80,7 +80,7 @@ class BlazegraphIndexingActionSuite extends NexusSuite with Fixtures { SuccessElem( tpe = BlazegraphViews.entityType, id = view1.ref.viewId, - project = Some(project), + project = project, instant = Instant.EPOCH, offset = Offset.at(1L), value = view1, @@ -89,7 +89,7 @@ class BlazegraphIndexingActionSuite extends NexusSuite with Fixtures { SuccessElem( tpe = BlazegraphViews.entityType, id = view2.ref.viewId, - project = Some(project), + project = project, instant = Instant.EPOCH, offset = Offset.at(2L), value = view2, @@ -98,7 +98,7 @@ class BlazegraphIndexingActionSuite extends NexusSuite with Fixtures { SuccessElem( tpe = BlazegraphViews.entityType, id = view3.ref.viewId, - project = Some(project), + project = project, instant = Instant.EPOCH, offset = Offset.at(3L), value = view3, @@ -107,7 +107,7 @@ class BlazegraphIndexingActionSuite extends NexusSuite with Fixtures { SuccessElem( tpe = BlazegraphViews.entityType, id = view4.ref.viewId, - project = Some(project), + project = project, instant = Instant.EPOCH, offset = Offset.at(4L), value = view4, @@ -141,7 +141,7 @@ class BlazegraphIndexingActionSuite extends NexusSuite with Fixtures { private val elem = SuccessElem( tpe = PullRequest.entityType, id = pr.id, - project = Some(project), + project = project, instant = pr.updatedAt, offset = Offset.at(1L), value = PullRequestState.toGraphResource(pr, base), @@ -171,7 +171,7 @@ class BlazegraphIndexingActionSuite extends NexusSuite with Fixtures { val failed = FailedElem( tpe = PullRequest.entityType, id = pr.id, - project = Some(project), + project = project, instant = pr.updatedAt, offset = Offset.at(1L), new IllegalStateException("Boom"), diff --git a/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/indexing/BlazegraphCoordinatorSuite.scala b/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/indexing/BlazegraphCoordinatorSuite.scala index 19df5b9cf6..5183922e58 100644 --- a/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/indexing/BlazegraphCoordinatorSuite.scala +++ b/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/indexing/BlazegraphCoordinatorSuite.scala @@ -92,7 +92,7 @@ class BlazegraphCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixture SuccessElem( tpe = BlazegraphViews.entityType, id = view1.ref.viewId, - project = Some(project), + project = project, instant = Instant.EPOCH, offset = Offset.at(1L), value = view1, @@ -101,7 +101,7 @@ class BlazegraphCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixture SuccessElem( tpe = BlazegraphViews.entityType, id = view2.ref.viewId, - project = Some(project), + project = project, instant = Instant.EPOCH, offset = Offset.at(2L), value = view2, @@ -110,7 +110,7 @@ class BlazegraphCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixture SuccessElem( tpe = BlazegraphViews.entityType, id = view3.ref.viewId, - project = Some(project), + project = project, instant = Instant.EPOCH, offset = Offset.at(3L), value = view3, @@ -120,7 +120,7 @@ class BlazegraphCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixture SuccessElem( tpe = BlazegraphViews.entityType, id = deprecatedView1.ref.viewId, - project = Some(project), + project = project, instant = Instant.EPOCH, offset = Offset.at(4L), value = deprecatedView1, @@ -129,7 +129,7 @@ class BlazegraphCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixture SuccessElem( tpe = BlazegraphViews.entityType, id = updatedView2.ref.viewId, - project = Some(project), + project = project, instant = Instant.EPOCH, offset = Offset.at(5L), value = updatedView2, @@ -139,7 +139,7 @@ class BlazegraphCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixture SuccessElem( tpe = BlazegraphViews.entityType, id = updatedView2.ref.viewId, - project = Some(project), + project = project, instant = Instant.EPOCH, offset = Offset.at(6L), value = updatedView2, diff --git a/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/indexing/BlazegraphSinkSuite.scala b/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/indexing/BlazegraphSinkSuite.scala index 1a6f29594e..08320fcb00 100644 --- a/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/indexing/BlazegraphSinkSuite.scala +++ b/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/indexing/BlazegraphSinkSuite.scala @@ -9,7 +9,7 @@ import ch.epfl.bluebrain.nexus.delta.rdf.graph.{Graph, NTriples} import ch.epfl.bluebrain.nexus.delta.rdf.query.SparqlQuery.SparqlConstructQuery import ch.epfl.bluebrain.nexus.delta.sdk.implicits._ import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri -import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, Label} +import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, Label, ProjectRef} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{DroppedElem, SuccessElem} import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite @@ -32,6 +32,8 @@ class BlazegraphSinkSuite extends NexusSuite with BlazegraphClientSetup.Fixture private lazy val sink = createSink(namespace) + private val project = ProjectRef.unsafe("org", "project") + private val resource1Id = iri"https://bbp.epfl.ch/resource1" private val resource1Ntriples = NTriples(contentOf("sparql/resource1.ntriples"), resource1Id) private val resource1NtriplesUpdated = NTriples(contentOf("sparql/resource1_updated.ntriples"), resource1Id) @@ -54,14 +56,14 @@ class BlazegraphSinkSuite extends NexusSuite with BlazegraphClientSetup.Fixture private def asElems(chunk: Chunk[(Iri, NTriples)]) = chunk.zipWithIndex.map { case ((id, ntriples), index) => - SuccessElem(entityType, id, None, Instant.EPOCH, Offset.at(index.toLong + 1), ntriples, 1) + SuccessElem(entityType, id, project, Instant.EPOCH, Offset.at(index.toLong + 1), ntriples, 1) } private def createGraph(chunk: Chunk[(Iri, NTriples)]) = chunk.foldLeft(Graph.empty) { case (acc, (_, ntriples)) => acc ++ Graph(ntriples).getOrElse(Graph.empty) } - private def dropped(id: Iri, offset: Offset) = DroppedElem(entityType, id, None, Instant.EPOCH, offset, 1) + private def dropped(id: Iri, offset: Offset) = DroppedElem(entityType, id, project, Instant.EPOCH, offset, 1) private def query(namespace: String) = client @@ -96,7 +98,7 @@ class BlazegraphSinkSuite extends NexusSuite with BlazegraphClientSetup.Fixture test("Report errors when the id is not a valid absolute iri") { val chunk = Chunk( - SuccessElem(entityType, nxv + "é-wrong", None, Instant.EPOCH, Offset.at(5L), resource1Ntriples, 1) + SuccessElem(entityType, nxv + "é-wrong", project, Instant.EPOCH, Offset.at(5L), resource1Ntriples, 1) ) val expected = createGraph(Chunk(resource1Id -> resource1Ntriples, resource3Id -> resource3Ntriples)) diff --git a/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/routes/BlazegraphViewsIndexingRoutesSpec.scala b/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/routes/BlazegraphViewsIndexingRoutesSpec.scala index 9f7deaccaa..114d3a031e 100644 --- a/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/routes/BlazegraphViewsIndexingRoutesSpec.scala +++ b/delta/plugins/blazegraph/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/routes/BlazegraphViewsIndexingRoutesSpec.scala @@ -30,6 +30,7 @@ class BlazegraphViewsIndexingRoutesSpec extends BlazegraphViewRoutesFixtures { private lazy val projectionErrors = ProjectionErrors(xas, queryConfig, clock) private val myId = nxv + "myid" + private val myId2 = nxv + "myid2" private val indexingView = ActiveViewDef( ViewRef(projectRef, myId), "projection", @@ -65,8 +66,8 @@ class BlazegraphViewsIndexingRoutesSpec extends BlazegraphViewRoutesFixtures { super.beforeAll() val error = new Exception("boom") val rev = 1 - val fail1 = FailedElem(EntityType("ACL"), myId, Some(projectRef), Instant.EPOCH, Offset.At(42L), error, rev) - val fail2 = FailedElem(EntityType("Schema"), myId, None, Instant.EPOCH, Offset.At(42L), error, rev) + val fail1 = FailedElem(EntityType("ACL"), myId, projectRef, Instant.EPOCH, Offset.At(42L), error, rev) + val fail2 = FailedElem(EntityType("Schema"), myId2, projectRef, Instant.EPOCH, Offset.At(43L), error, rev) val save = for { _ <- projections.save(indexingView.projectionMetadata, progress) _ <- projectionErrors.saveFailedElems(indexingView.projectionMetadata, List(fail1, fail2)) diff --git a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViews.scala b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViews.scala index bb17a8ee6d..645d3f2cc7 100644 --- a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViews.scala +++ b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViews.scala @@ -303,13 +303,11 @@ final class CompositeViews private ( /** * Return the indexing views in a non-ending stream */ - def views(start: Offset): ElemStream[CompositeViewDef] = + def views(start: Offset): SuccessElemStream[CompositeViewDef] = log.states(Scope.Root, start).map(toCompositeViewDef) private def toCompositeViewDef(elem: Elem.SuccessElem[CompositeViewState]) = - elem.withProject(elem.value.project).mapValue { v => - CompositeViewDef(v) - } + elem.mapValue { v => CompositeViewDef(v) } private def eval(cmd: CompositeViewCommand): IO[ViewResource] = log.evaluate(cmd.project, cmd.id, cmd).map(_._2.toResource) diff --git a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/store/CompositeRestartStore.scala b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/store/CompositeRestartStore.scala index 1048ab99aa..aefe917332 100644 --- a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/store/CompositeRestartStore.scala +++ b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/store/CompositeRestartStore.scala @@ -81,7 +81,7 @@ final class CompositeRestartStore(xas: Transactors) { |LIMIT 1""".stripMargin .query[(Offset, ProjectRef, Iri, Json, Instant)] .map { case (offset, project, id, json, instant) => - Elem.fromEither(entityType, id, Some(project), instant, offset, json.as[CompositeRestart], 1) + Elem.fromEither(entityType, id, project, instant, offset, json.as[CompositeRestart], 1) } .option .transact(xas.read) diff --git a/delta/plugins/composite-views/src/test/resources/routes/list-indexing-errors.json b/delta/plugins/composite-views/src/test/resources/routes/list-indexing-errors.json index 98eeb66b7c..acea13d0b8 100644 --- a/delta/plugins/composite-views/src/test/resources/routes/list-indexing-errors.json +++ b/delta/plugins/composite-views/src/test/resources/routes/list-indexing-errors.json @@ -23,11 +23,12 @@ } }, { - "id": "https://bluebrain.github.io/nexus/vocabulary/myid", + "id": "https://bluebrain.github.io/nexus/vocabulary/myid2", "offset": { "@type": "At", - "value": 42 + "value": 43 }, + "project": "myorg/myproj", "_rev": 1, "reason": { "type": "UnexpectedError", diff --git a/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/client/DeltaClientSpec.scala b/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/client/DeltaClientSpec.scala index 6cc8119ea2..8223bed408 100644 --- a/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/client/DeltaClientSpec.scala +++ b/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/client/DeltaClientSpec.scala @@ -73,15 +73,7 @@ class DeltaClientSpec private val invalidTag = Some(UserTag.unsafe("unknowntag")) private def elem(i: Int): Elem[Unit] = - SuccessElem( - EntityType("test"), - iri"https://bbp.epfl.ch/$i", - Some(project), - Instant.EPOCH, - Offset.at(i.toLong), - (), - 1 - ) + SuccessElem(EntityType("test"), iri"https://bbp.epfl.ch/$i", project, Instant.EPOCH, Offset.at(i.toLong), (), 1) override def beforeAll(): Unit = { super.beforeAll() diff --git a/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/CompositeIndexingSuite.scala b/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/CompositeIndexingSuite.scala index 0f2a50aa32..31d8986074 100644 --- a/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/CompositeIndexingSuite.scala +++ b/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/indexing/CompositeIndexingSuite.scala @@ -166,7 +166,7 @@ abstract class CompositeIndexingSuite(sinkConfig: SinkConfig, query: SparqlConst } yield SuccessElem( entityType, value.id, - Some(project), + project, Instant.EPOCH, Offset.at(offset), resource, diff --git a/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/routes/CompositeViewsIndexingRoutesSpec.scala b/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/routes/CompositeViewsIndexingRoutesSpec.scala index f8a4688c74..b76a080905 100644 --- a/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/routes/CompositeViewsIndexingRoutesSpec.scala +++ b/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/routes/CompositeViewsIndexingRoutesSpec.scala @@ -38,6 +38,7 @@ class CompositeViewsIndexingRoutesSpec extends CompositeViewsRoutesFixtures { private val nowPlus5 = now.plusSeconds(5) private val myId = nxv + "myid" + private val myId2 = nxv + "myid2" private val view = CompositeViewsGen.resourceFor(projectRef, myId, uuid, viewValue, source = Json.obj()) private val indexingView = ActiveViewDef( ViewRef(view.value.project, view.id), @@ -93,8 +94,8 @@ class CompositeViewsIndexingRoutesSpec extends CompositeViewsRoutesFixtures { super.beforeAll() val error = new Exception("boom") val rev = 1 - val fail1 = FailedElem(EntityType("ACL"), myId, Some(projectRef), Instant.EPOCH, Offset.At(42L), error, rev) - val fail2 = FailedElem(EntityType("Schema"), myId, None, Instant.EPOCH, Offset.At(42L), error, rev) + val fail1 = FailedElem(EntityType("ACL"), myId, projectRef, Instant.EPOCH, Offset.At(42L), error, rev) + val fail2 = FailedElem(EntityType("Schema"), myId2, projectRef, Instant.EPOCH, Offset.At(43L), error, rev) val saveFailedElems = projectionErrors.saveFailedElems(indexingView.metadata, List(fail1, fail2)) saveFailedElems.accepted diff --git a/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/store/CompositeRestartStoreSuite.scala b/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/store/CompositeRestartStoreSuite.scala index 8c1d0da24b..79d22317fd 100644 --- a/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/store/CompositeRestartStoreSuite.scala +++ b/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/store/CompositeRestartStoreSuite.scala @@ -37,7 +37,7 @@ class CompositeRestartStoreSuite extends NexusSuite with Doobie.Fixture with Doo private val cr3 = PartialRebuild(viewRef, projection, Instant.EPOCH.plusSeconds(5L), Anonymous) private def toElem(offset: Offset, restart: CompositeRestart) = - SuccessElem(entityType, restart.id, Some(restart.project), restart.instant, offset, restart, 1) + SuccessElem(entityType, restart.id, restart.project, restart.instant, offset, restart, 1) test("Save composite restarts") { for { diff --git a/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/stream/RemoteGraphStreamSuite.scala b/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/stream/RemoteGraphStreamSuite.scala index ccbb2ce150..37cce1e348 100644 --- a/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/stream/RemoteGraphStreamSuite.scala +++ b/delta/plugins/composite-views/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/stream/RemoteGraphStreamSuite.scala @@ -36,7 +36,7 @@ class RemoteGraphStreamSuite extends NexusSuite { private val id = iri"https://example.com/testresource" private val project = ProjectRef.unsafe("org", "proj") - private val elem = SuccessElem(EntityType("test"), id, Some(project), Instant.EPOCH, Offset.Start, (), 1) + private val elem = SuccessElem(EntityType("test"), id, project, Instant.EPOCH, Offset.Start, (), 1) private val nQuads = NQuads(contentOf("remote/resource.nq"), id) test("Metadata should be filtered correctly") { diff --git a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchViews.scala b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchViews.scala index ac42669b49..d1a12f25f5 100644 --- a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchViews.scala +++ b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchViews.scala @@ -364,7 +364,7 @@ final class ElasticSearchViews private ( } private def toIndexViewDef(elem: Elem.SuccessElem[ElasticSearchViewState]) = - elem.withProject(elem.value.project).traverse { v => + elem.traverse { v => IndexingViewDef(v, defaultElasticsearchMapping, defaultElasticsearchSettings, prefix) } diff --git a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchSink.scala b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchSink.scala index 4583bb1c81..6f10648668 100644 --- a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchSink.scala +++ b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchSink.scala @@ -77,11 +77,8 @@ object ElasticSearchSink { * @return * a function that maps an elem to a documentId based on the project (if available), the elem id, and the revision */ - private val eventDocumentId: Elem[_] => String = elem => - elem.project match { - case Some(project) => s"$project/${elem.id}:${elem.rev}" - case None => s"${elem.id}/${elem.rev}" - } + private val eventDocumentId: Elem[_] => String = + elem => s"${elem.project}/${elem.id}:${elem.rev}" /** * Mark and update the elements according to the elasticsearch response diff --git a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/metrics/EventMetricsProjection.scala b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/metrics/EventMetricsProjection.scala index 65a86924eb..32a8d8b4a7 100644 --- a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/metrics/EventMetricsProjection.scala +++ b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/metrics/EventMetricsProjection.scala @@ -100,9 +100,7 @@ object EventMetricsProjection { init: IO[Unit] ): IO[EventMetricsProjection] = { - val source = Source { (offset: Offset) => - metrics(offset).map(e => e.withProject(e.value.project)) - } + val source = Source { (offset: Offset) => metrics(offset) } val compiledProjection = CompiledProjection.compile( diff --git a/delta/plugins/elasticsearch/src/test/resources/routes/list-indexing-errors.json b/delta/plugins/elasticsearch/src/test/resources/routes/list-indexing-errors.json index 50c2b44be2..dc92d5d6fc 100644 --- a/delta/plugins/elasticsearch/src/test/resources/routes/list-indexing-errors.json +++ b/delta/plugins/elasticsearch/src/test/resources/routes/list-indexing-errors.json @@ -23,11 +23,12 @@ } }, { - "id": "https://bluebrain.github.io/nexus/vocabulary/myid", + "id": "https://bluebrain.github.io/nexus/vocabulary/myid2", "offset": { "@type": "At", - "value": 42 + "value": 43 }, + "project": "myorg/myproject", "_rev": 1, "reason": { "type": "UnexpectedError", diff --git a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchIndexingActionSuite.scala b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchIndexingActionSuite.scala index 19f6c7812b..a91ab0fde4 100644 --- a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchIndexingActionSuite.scala +++ b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchIndexingActionSuite.scala @@ -92,7 +92,7 @@ class ElasticSearchIndexingActionSuite extends NexusSuite with CirceLiteral with SuccessElem( tpe = ElasticSearchViews.entityType, id = view1.ref.viewId, - project = Some(project), + project = project, instant = Instant.EPOCH, offset = Offset.at(1L), value = view1, @@ -101,7 +101,7 @@ class ElasticSearchIndexingActionSuite extends NexusSuite with CirceLiteral with SuccessElem( tpe = ElasticSearchViews.entityType, id = view2.ref.viewId, - project = Some(project), + project = project, instant = Instant.EPOCH, offset = Offset.at(2L), value = view2, @@ -110,7 +110,7 @@ class ElasticSearchIndexingActionSuite extends NexusSuite with CirceLiteral with SuccessElem( tpe = ElasticSearchViews.entityType, id = view3.ref.viewId, - project = Some(project), + project = project, instant = Instant.EPOCH, offset = Offset.at(3L), value = view3, @@ -119,7 +119,7 @@ class ElasticSearchIndexingActionSuite extends NexusSuite with CirceLiteral with SuccessElem( tpe = ElasticSearchViews.entityType, id = view4.ref.viewId, - project = Some(project), + project = project, instant = Instant.EPOCH, offset = Offset.at(4L), value = view4, @@ -153,7 +153,7 @@ class ElasticSearchIndexingActionSuite extends NexusSuite with CirceLiteral with private val elem = SuccessElem( tpe = PullRequest.entityType, id = pr.id, - project = Some(project), + project = project, instant = pr.updatedAt, offset = Offset.at(1L), value = PullRequestState.toGraphResource(pr, base), @@ -183,7 +183,7 @@ class ElasticSearchIndexingActionSuite extends NexusSuite with CirceLiteral with val failed = FailedElem( tpe = PullRequest.entityType, id = pr.id, - project = Some(project), + project = project, instant = pr.updatedAt, offset = Offset.at(1L), new IllegalStateException("Boom"), diff --git a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchCoordinatorSuite.scala b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchCoordinatorSuite.scala index 3c889d5fb3..5cb38f0946 100644 --- a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchCoordinatorSuite.scala +++ b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchCoordinatorSuite.scala @@ -105,7 +105,7 @@ class ElasticSearchCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixt SuccessElem( tpe = ElasticSearchViews.entityType, id = view1.ref.viewId, - project = Some(project), + project = project, instant = Instant.EPOCH, offset = Offset.at(1L), value = view1, @@ -114,7 +114,7 @@ class ElasticSearchCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixt SuccessElem( tpe = ElasticSearchViews.entityType, id = view2.ref.viewId, - project = Some(project), + project = project, instant = Instant.EPOCH, offset = Offset.at(2L), value = view2, @@ -123,7 +123,7 @@ class ElasticSearchCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixt SuccessElem( tpe = ElasticSearchViews.entityType, id = view3.ref.viewId, - project = Some(project), + project = project, instant = Instant.EPOCH, offset = Offset.at(3L), value = view3, @@ -133,7 +133,7 @@ class ElasticSearchCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixt SuccessElem( tpe = ElasticSearchViews.entityType, id = deprecatedView1.ref.viewId, - project = Some(project), + project = project, instant = Instant.EPOCH, offset = Offset.at(4L), value = deprecatedView1, @@ -142,7 +142,7 @@ class ElasticSearchCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixt SuccessElem( tpe = ElasticSearchViews.entityType, id = updatedView2.ref.viewId, - project = Some(project), + project = project, instant = Instant.EPOCH, offset = Offset.at(5L), value = updatedView2, @@ -152,7 +152,7 @@ class ElasticSearchCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixt SuccessElem( tpe = ElasticSearchViews.entityType, id = updatedView2.ref.viewId, - project = Some(project), + project = project, instant = Instant.EPOCH, offset = Offset.at(6L), value = updatedView2, diff --git a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchSinkSuite.scala b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchSinkSuite.scala index c8ff0a4e18..62ebc4ccf9 100644 --- a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchSinkSuite.scala +++ b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/ElasticSearchSinkSuite.scala @@ -8,7 +8,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.{IndexLabel, Q import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClientError -import ch.epfl.bluebrain.nexus.delta.sourcing.model.EntityType +import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, ProjectRef} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{DroppedElem, FailedElem, SuccessElem} import ch.epfl.bluebrain.nexus.delta.sourcing.stream.FailureReason @@ -39,17 +39,19 @@ class ElasticSearchSinkSuite extends NexusSuite with ElasticSearchClientSetup.Fi private val brian = (nxv + "brian", json"""{"name": "Brian", "age": 19 }""") private val judy = (nxv + "judy", json"""{"name": "Judy", "age": 47 }""") + private val project = ProjectRef.unsafe("bbp", "members") + private val members = Set(alice, bob, brian, judy) val rev = 1 private def asChunk(values: Iterable[(Iri, Json)]) = Chunk.from(values).zipWithIndex.map { case ((id, json), index) => - SuccessElem(membersEntity, id, None, Instant.EPOCH, Offset.at(index.toLong + 1), json, rev) + SuccessElem(membersEntity, id, project, Instant.EPOCH, Offset.at(index.toLong + 1), json, rev) } private def dropped(id: Iri, offset: Offset) = - DroppedElem(membersEntity, id, None, Instant.EPOCH, offset, rev) + DroppedElem(membersEntity, id, project, Instant.EPOCH, offset, rev) test("Create the index") { client.createIndex(index, None, None).assertEquals(true) @@ -83,7 +85,7 @@ class ElasticSearchSinkSuite extends NexusSuite with ElasticSearchClientSetup.Fi val failed = FailedElem( membersEntity, nxv + "fail", - None, + project, Instant.EPOCH, Offset.at(1L), new IllegalArgumentException("Boom"), @@ -94,7 +96,7 @@ class ElasticSearchSinkSuite extends NexusSuite with ElasticSearchClientSetup.Fi Seq( Chunk.singleton(failed), Chunk(invalidElement, alice).map { case (id, json) => - SuccessElem(membersEntity, id, None, Instant.EPOCH, Offset.at(members.size.toLong + 1), json, rev) + SuccessElem(membersEntity, id, project, Instant.EPOCH, Offset.at(members.size.toLong + 1), json, rev) } ) ) diff --git a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/GraphResourceToDocumentSuite.scala b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/GraphResourceToDocumentSuite.scala index ac4382d2ff..50685793b5 100644 --- a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/GraphResourceToDocumentSuite.scala +++ b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/indexing/GraphResourceToDocumentSuite.scala @@ -130,7 +130,7 @@ class GraphResourceToDocumentSuite extends NexusSuite with Fixtures with JsonAss metadataGraph, source ) - SuccessElem(entityType, id, Some(project), Instant.EPOCH, Offset.start, graphResource, 1) + SuccessElem(entityType, id, project, Instant.EPOCH, Offset.start, graphResource, 1) } } diff --git a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/routes/ElasticSearchIndexingRoutesSpec.scala b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/routes/ElasticSearchIndexingRoutesSpec.scala index 6271847492..1b23ff7326 100644 --- a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/routes/ElasticSearchIndexingRoutesSpec.scala +++ b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/routes/ElasticSearchIndexingRoutesSpec.scala @@ -40,6 +40,7 @@ class ElasticSearchIndexingRoutesSpec extends ElasticSearchViewsRoutesFixtures { implicit private val fetchContext: FetchContext = FetchContextDummy(Map(project.value.ref -> project.value.context)) private val myId = nxv + "myid" + private val myId2 = nxv + "myid2" private val indexingView = ActiveViewDef( ViewRef(projectRef, myId), "projection", @@ -104,8 +105,8 @@ class ElasticSearchIndexingRoutesSpec extends ElasticSearchViewsRoutesFixtures { super.beforeAll() val error = new Exception("boom") val rev = 1 - val fail1 = FailedElem(EntityType("ACL"), myId, Some(projectRef), Instant.EPOCH, Offset.At(42L), error, rev) - val fail2 = FailedElem(EntityType("Schema"), myId, None, Instant.EPOCH, Offset.At(42L), error, rev) + val fail1 = FailedElem(EntityType("ACL"), myId, projectRef, Instant.EPOCH, Offset.At(42L), error, rev) + val fail2 = FailedElem(EntityType("Schema"), myId2, projectRef, Instant.EPOCH, Offset.At(43L), error, rev) val save = for { _ <- projections.save(indexingView.projectionMetadata, progress) _ <- projectionErrors.saveFailedElems(indexingView.projectionMetadata, List(fail1, fail2)) diff --git a/delta/plugins/graph-analytics/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalyticsCoordinatorSuite.scala b/delta/plugins/graph-analytics/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalyticsCoordinatorSuite.scala index af9ed3e015..090a64d9c6 100644 --- a/delta/plugins/graph-analytics/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalyticsCoordinatorSuite.scala +++ b/delta/plugins/graph-analytics/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/GraphAnalyticsCoordinatorSuite.scala @@ -35,14 +35,14 @@ class GraphAnalyticsCoordinatorSuite extends NexusSuite with SupervisorSetup.Fix private val project2 = ProjectRef.unsafe("org", "proj2") private val project2Id = Projects.encodeId(project1) - private def success[A](ref: ProjectRef, id: Iri, value: A, offset: Long): Elem[A] = - SuccessElem(tpe = Projects.entityType, id, Some(ref), Instant.EPOCH, Offset.at(offset), value, 1) + private def success[A](project: ProjectRef, id: Iri, value: A, offset: Long): Elem[A] = + SuccessElem(tpe = Projects.entityType, id, project, Instant.EPOCH, Offset.at(offset), value, 1) - private def dropped[A](ref: ProjectRef, id: Iri, offset: Long): Elem[A] = - DroppedElem(tpe = Projects.entityType, id, Some(ref), Instant.EPOCH, Offset.at(offset), 1) + private def dropped[A](project: ProjectRef, id: Iri, offset: Long): Elem[A] = + DroppedElem(tpe = Projects.entityType, id, project, Instant.EPOCH, Offset.at(offset), 1) - private def failed[A](ref: ProjectRef, id: Iri, error: Throwable, offset: Long): Elem[A] = - FailedElem(tpe = Projects.entityType, id, Some(ref), Instant.EPOCH, Offset.at(offset), error, 1) + private def failed[A](project: ProjectRef, id: Iri, error: Throwable, offset: Long): Elem[A] = + FailedElem(tpe = Projects.entityType, id, project, Instant.EPOCH, Offset.at(offset), error, 1) private val resumeSignal = SignallingRef[IO, Boolean](false).unsafeRunSync() diff --git a/delta/plugins/graph-analytics/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/indexing/GraphAnalyticsSinkSuite.scala b/delta/plugins/graph-analytics/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/indexing/GraphAnalyticsSinkSuite.scala index f69fa90b99..4564944642 100644 --- a/delta/plugins/graph-analytics/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/indexing/GraphAnalyticsSinkSuite.scala +++ b/delta/plugins/graph-analytics/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/graph/analytics/indexing/GraphAnalyticsSinkSuite.scala @@ -93,7 +93,7 @@ class GraphAnalyticsSinkSuite extends NexusSuite with ElasticSearchClientSetup.F } private def success(id: Iri, result: GraphAnalyticsResult) = - SuccessElem(Resources.entityType, id, Some(project), Instant.EPOCH, Offset.start, result, 1) + SuccessElem(Resources.entityType, id, project, Instant.EPOCH, Offset.start, result, 1) test("Push index results") { def indexActive(id: Iri, expanded: ExpandedJsonLd) = { @@ -137,18 +137,11 @@ class GraphAnalyticsSinkSuite extends NexusSuite with ElasticSearchClientSetup.F } test("Push update by query result results") { + val error = new IllegalStateException("BOOM") val chunk = Chunk( success(file1, GraphAnalyticsResult.UpdateByQuery(file1, Set(nxvFile))), success(resource3, GraphAnalyticsResult.Noop), - FailedElem( - Resources.entityType, - resource3, - Some(project), - Instant.EPOCH, - Offset.start, - new IllegalStateException("BOOM"), - 1 - ) + FailedElem(Resources.entityType, resource3, project, Instant.EPOCH, Offset.start, error, 1) ) for { diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/Storages.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/Storages.scala index 4bb9b0f6d1..274456f632 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/Storages.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/Storages.scala @@ -254,9 +254,7 @@ final class Storages private ( * Return the existing storages in a project in a finite stream */ def currentStorages(project: ProjectRef): SuccessElemStream[StorageState] = - log.currentStates(Scope.Project(project)).map { e => - e.withProject(e.value.project) - } + log.currentStates(Scope.Project(project)) private def unsetPreviousDefaultIfRequired( project: ProjectRef, diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/ResourceShift.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/ResourceShift.scala index 986a413e05..5ced779a43 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/ResourceShift.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/ResourceShift.scala @@ -85,13 +85,13 @@ abstract class ResourceShift[State <: ScopedState, A, M]( FailedElem( entityType, resource.id, - Some(project), + project, resource.updatedAt, Offset.Start, err, resource.rev ), - graph => SuccessElem(entityType, resource.id, Some(project), resource.updatedAt, Offset.Start, graph, resource.rev) + graph => SuccessElem(entityType, resource.id, project, resource.updatedAt, Offset.Start, graph, resource.rev) ) private def toGraphResource(project: ProjectRef, resource: ResourceF[A])(implicit diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/projects/Projects.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/projects/Projects.scala index 985420e5d9..b780451c90 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/projects/Projects.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/projects/Projects.scala @@ -15,7 +15,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ProjectRejection.{Incorr import ch.epfl.bluebrain.nexus.delta.sdk.projects.model._ import ch.epfl.bluebrain.nexus.delta.sdk.syntax._ import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject -import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, EntityType, ProjectRef} +import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, ProjectRef, SuccessElemStream} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.{ScopedEntityDefinition, StateMachine} import fs2.Stream @@ -143,7 +143,7 @@ trait Projects { /** * Stream project states in a non-finite stream */ - def states(offset: Offset): ElemStream[ProjectState] + def states(offset: Offset): SuccessElemStream[ProjectState] /** * The default api mappings diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/projects/ProjectsImpl.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/projects/ProjectsImpl.scala index 4b1820d9f6..4c5d4505a8 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/projects/ProjectsImpl.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/projects/ProjectsImpl.scala @@ -18,7 +18,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.syntax._ import ch.epfl.bluebrain.nexus.delta.sourcing._ import ch.epfl.bluebrain.nexus.delta.sourcing.config.EventLogConfig import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject -import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, ProjectRef} +import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ProjectRef, SuccessElemStream} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import fs2.Stream @@ -89,10 +89,7 @@ final class ProjectsImpl private ( override def currentRefs: Stream[IO, ProjectRef] = log.currentStates(Scope.root).map(_.value.project) - override def states(offset: Offset): ElemStream[ProjectState] = - log.states(Scope.root, offset).map { e => - e.withProject { e.value.project } - } + override def states(offset: Offset): SuccessElemStream[ProjectState] = log.states(Scope.root, offset) private def eval(cmd: ProjectCommand): IO[ProjectResource] = log.evaluate(cmd.ref, cmd.ref, cmd).map(_._2.toResource(defaultApiMappings)) diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseConfig.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseConfig.scala index 0e0e229b60..77103d010b 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseConfig.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseConfig.scala @@ -1,11 +1,10 @@ package ch.epfl.bluebrain.nexus.delta.sdk.sse -import ch.epfl.bluebrain.nexus.delta.kernel.cache.CacheConfig import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig import pureconfig.ConfigReader import pureconfig.generic.semiauto.deriveReader -final case class SseConfig(query: QueryConfig, cache: CacheConfig) +final case class SseConfig(query: QueryConfig) object SseConfig { diff --git a/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/metrics/ProjectScopedMetricStream.scala b/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/metrics/ProjectScopedMetricStream.scala index 2bf95b69ea..5d0c3e9fc0 100644 --- a/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/metrics/ProjectScopedMetricStream.scala +++ b/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/metrics/ProjectScopedMetricStream.scala @@ -17,6 +17,6 @@ object ProjectScopedMetricStream { ) private def elem(entityType: EntityType, metric: ProjectScopedMetric, offset: Long) = - Elem.SuccessElem(entityType, metric.id, Some(metric.project), metric.instant, Offset.At(offset), metric, metric.rev) + Elem.SuccessElem(entityType, metric.id, metric.project, metric.instant, Offset.At(offset), metric, metric.rev) } diff --git a/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/job/SchemaValidationCoordinatorSuite.scala b/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/job/SchemaValidationCoordinatorSuite.scala index d233e4764a..851783a5e8 100644 --- a/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/job/SchemaValidationCoordinatorSuite.scala +++ b/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/schemas/job/SchemaValidationCoordinatorSuite.scala @@ -92,7 +92,7 @@ class SchemaValidationCoordinatorSuite SuccessElem( Resources.entityType, resource.id, - Some(resource.project), + resource.project, resource.updatedAt, Offset.at(index.toLong + 1), resource, diff --git a/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseElemStreamSuite.scala b/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseElemStreamSuite.scala index 78282be6f8..3d4f055707 100644 --- a/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseElemStreamSuite.scala +++ b/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseElemStreamSuite.scala @@ -16,7 +16,7 @@ class SseElemStreamSuite extends FunSuite { val elem = SuccessElem( Resources.entityType, iri"https://bbp.epfl.ch/my-resource", - Some(ProjectRef.unsafe("org", "proj")), + ProjectRef.unsafe("org", "proj"), Instant.EPOCH, Offset.at(42L), (), diff --git a/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseEventLogSuite.scala b/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseEventLogSuite.scala index 423711cd33..5ecfe18d98 100644 --- a/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseEventLogSuite.scala +++ b/delta/sdk/src/test/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseEventLogSuite.scala @@ -18,12 +18,12 @@ class SseEventLogSuite extends NexusSuite with ConfigFixtures { implicit private val jo: JsonKeyOrdering = JsonKeyOrdering.alphabetical - private val ref = ProjectRef.unsafe("org", "proj") + private val project = ProjectRef.unsafe("org", "proj") private def makeSuccessElem(sseData: SseData) = Elem.SuccessElem( EntityType("Person"), nxv + "1", - None, + project, Instant.now(), Offset.at(5L), sseData, @@ -44,7 +44,7 @@ class SseEventLogSuite extends NexusSuite with ConfigFixtures { val elem = Elem.SuccessElem( EntityType("Person"), nxv + "1", - None, + project, Instant.now(), Offset.at(5L), SseData("Person", Some(ProjectRef.unsafe("xxx", "xxx")), JsonObject("name" -> "John Doe".asJson)), @@ -60,10 +60,10 @@ class SseEventLogSuite extends NexusSuite with ConfigFixtures { val elem = Elem.SuccessElem( EntityType("Person"), nxv + "1", - None, + project, Instant.now(), Offset.at(5L), - SseData("Person", Some(ref), JsonObject("name" -> "John Doe".asJson)), + SseData("Person", Some(project), JsonObject("name" -> "John Doe".asJson)), 4 ) assertEquals( diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/ProjectionRestartStore.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/ProjectionRestartStore.scala index 6119e15a88..80bed63e5b 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/ProjectionRestartStore.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/ProjectionRestartStore.scala @@ -4,19 +4,17 @@ import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.kernel.Logger import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig -import ch.epfl.bluebrain.nexus.delta.sourcing.model.ElemStream import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._ import ch.epfl.bluebrain.nexus.delta.sourcing.projections.ProjectionRestartStore.logger import ch.epfl.bluebrain.nexus.delta.sourcing.projections.model.ProjectionRestart -import ch.epfl.bluebrain.nexus.delta.sourcing.projections.model.ProjectionRestart.{entityType, restartId} import ch.epfl.bluebrain.nexus.delta.sourcing.query.StreamingQuery -import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem import doobie.syntax.all._ import doobie.postgres.implicits._ -import io.circe.Json import io.circe.syntax.EncoderOps +import fs2.Stream + import java.time.Instant /** @@ -46,21 +44,18 @@ final class ProjectionRestartStore(xas: Transactors, config: QueryConfig) { } .void - def stream(offset: Offset): ElemStream[ProjectionRestart] = + def stream(offset: Offset): Stream[IO, (Offset, ProjectionRestart)] = StreamingQuery - .apply[(Offset, Json, Instant)]( + .apply[(Offset, ProjectionRestart)]( offset, o => sql"""SELECT ordering, value, instant from public.projection_restarts |WHERE ordering > $o and acknowledged = false |ORDER BY ordering ASC - |LIMIT ${config.batchSize}""".stripMargin.query[(Offset, Json, Instant)], + |LIMIT ${config.batchSize}""".stripMargin.query[(Offset, ProjectionRestart)], _._1, config, xas ) - .map { case (id, json, instant) => - Elem.fromEither(entityType, restartId(id), None, instant, id, json.as[ProjectionRestart], 1) - } } object ProjectionRestartStore { diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/Projections.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/Projections.scala index 3741dc40cd..d74617327b 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/Projections.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/Projections.scala @@ -3,13 +3,14 @@ package ch.epfl.bluebrain.nexus.delta.sourcing.projections import cats.effect.{Clock, IO} import ch.epfl.bluebrain.nexus.delta.sourcing.config.{PurgeConfig, QueryConfig} import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject -import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, ProjectRef} +import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.projections.model.ProjectionRestart import ch.epfl.bluebrain.nexus.delta.sourcing.query.{SelectFilter, StreamingQuery} import ch.epfl.bluebrain.nexus.delta.sourcing.stream.PurgeProjectionCoordinator.PurgeProjection import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{ProjectionMetadata, ProjectionProgress, ProjectionStore} import ch.epfl.bluebrain.nexus.delta.sourcing.{ProgressStatistics, Transactors} +import fs2.Stream import java.time.Instant @@ -69,7 +70,7 @@ trait Projections { * @param offset * the offset to start from */ - def restarts(offset: Offset): ElemStream[ProjectionRestart] + def restarts(offset: Offset): Stream[IO, (Offset, ProjectionRestart)] /** * Acknowledge that a restart has been performed @@ -119,7 +120,8 @@ object Projections { } } - override def restarts(offset: Offset): ElemStream[ProjectionRestart] = projectionRestartStore.stream(offset) + override def restarts(offset: Offset): Stream[IO, (Offset, ProjectionRestart)] = + projectionRestartStore.stream(offset) override def acknowledgeRestart(id: Offset): IO[Unit] = projectionRestartStore.acknowledge(id) diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/model/ProjectionRestart.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/model/ProjectionRestart.scala index 5fe58f6eab..335ee7b8f1 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/model/ProjectionRestart.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/model/ProjectionRestart.scala @@ -1,11 +1,9 @@ package ch.epfl.bluebrain.nexus.delta.sourcing.projections.model -import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri -import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv import ch.epfl.bluebrain.nexus.delta.sourcing.Serializer -import ch.epfl.bluebrain.nexus.delta.sourcing.model.EntityType +import ch.epfl.bluebrain.nexus.delta.sourcing.implicits.pgDecoderGetT import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject -import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset +import doobie.Get import io.circe.Codec import io.circe.generic.extras.Configuration import io.circe.generic.extras.semiauto.deriveConfiguredCodec @@ -25,19 +23,12 @@ final case class ProjectionRestart(name: String, instant: Instant, subject: Subj object ProjectionRestart { - val entityType: EntityType = EntityType("projection-restart") - - /** - * Create an [[Iri]] for the projection restart from its offset - * @param offset - * @return - */ - def restartId(offset: Offset): Iri = nxv + s"projection/restart/${offset.value}" - implicit val projectionRestartCodec: Codec[ProjectionRestart] = { import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Database._ implicit val configuration: Configuration = Serializer.circeConfiguration deriveConfiguredCodec[ProjectionRestart] } + implicit val projectionRestartGet: Get[ProjectionRestart] = pgDecoderGetT[ProjectionRestart] + } diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query/StreamingQuery.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query/StreamingQuery.scala index a4e608d397..dd34904ec1 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query/StreamingQuery.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query/StreamingQuery.scala @@ -99,9 +99,9 @@ object StreamingQuery { |LIMIT ${cfg.batchSize} |""".stripMargin.query[(String, EntityType, Iri, Label, Label, Instant, Long, Int)].map { case (`newState`, entityType, id, org, project, instant, offset, rev) => - SuccessElem(entityType, id, Some(ProjectRef(org, project)), instant, Offset.at(offset), (), rev) + SuccessElem(entityType, id, ProjectRef(org, project), instant, Offset.at(offset), (), rev) case (_, entityType, id, org, project, instant, offset, rev) => - DroppedElem(entityType, id, Some(ProjectRef(org, project)), instant, Offset.at(offset), rev) + DroppedElem(entityType, id, ProjectRef(org, project), instant, Offset.at(offset), rev) } } StreamingQuery[Elem[Unit], Iri](start, query, _.offset, _.id, cfg, xas) @@ -154,9 +154,9 @@ object StreamingQuery { |LIMIT ${cfg.batchSize} |""".stripMargin.query[(String, EntityType, Iri, Label, Label, Option[Json], Instant, Long, Int)].map { case (`newState`, entityType, id, org, project, Some(json), instant, offset, rev) => - SuccessElem(entityType, id, Some(ProjectRef(org, project)), instant, Offset.at(offset), json, rev) + SuccessElem(entityType, id, ProjectRef(org, project), instant, Offset.at(offset), json, rev) case (_, entityType, id, org, project, _, instant, offset, rev) => - DroppedElem(entityType, id, Some(ProjectRef(org, project)), instant, Offset.at(offset), rev) + DroppedElem(entityType, id, ProjectRef(org, project), instant, Offset.at(offset), rev) } } StreamingQuery[Elem[Json], Iri](start, query, _.offset, _.id, cfg, xas) diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Elem.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Elem.scala index 0a7226f827..28543c009c 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Elem.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Elem.scala @@ -5,7 +5,7 @@ import cats.implicits.{toFoldableOps, toFunctorOps, toTraverseOps} import cats.{Applicative, Eval, Traverse} import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.JsonLdContext.keywords -import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, ProjectRef} +import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, Label, ProjectRef} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{DroppedElem, FailedElem, SuccessElem} import doobie.Read @@ -39,7 +39,7 @@ sealed trait Elem[+A] extends Product with Serializable { * @return * the underlying project if there is one */ - def project: Option[ProjectRef] + def project: ProjectRef /** * @return @@ -110,8 +110,8 @@ sealed trait Elem[+A] extends Product with Serializable { } /** - * Like `[[Elem#map]]`, but accepts a function returning a [[Task]]. If the task failed, the [[Elem.SuccessElem]] - * will become a [[Elem.FailedElem]] + * Like `[[Elem#map]]`, but accepts a function returning a [[IO]]. If the io failed, the [[Elem.SuccessElem]] will + * become a [[Elem.FailedElem]] * @param f * the mapping function */ @@ -155,7 +155,7 @@ sealed trait Elem[+A] extends Product with Serializable { } override def toString: String = - s"${this.getClass.getSimpleName}[${project.fold("")(_.toString)}/$id:$rev]{${offset.value}}" + s"${this.getClass.getSimpleName}[$project/$id:$rev]{${offset.value}}" } object Elem { @@ -180,7 +180,7 @@ object Elem { def fromEither[A]( tpe: EntityType, id: Iri, - project: Option[ProjectRef], + project: ProjectRef, instant: Instant, offset: Offset, either: Either[Throwable, A], @@ -209,15 +209,13 @@ object Elem { final case class SuccessElem[+A]( tpe: EntityType, id: Iri, - project: Option[ProjectRef], + project: ProjectRef, instant: Instant, offset: Offset, value: A, rev: Int ) extends Elem[A] { def mapValue[B](f: A => B): Elem.SuccessElem[B] = copy(value = f(value)) - - def withProject(project: ProjectRef): Elem.SuccessElem[A] = this.copy(project = Some(project)) } object SuccessElem { @@ -226,9 +224,9 @@ object Elem { import doobie.postgres.implicits._ import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._ implicit val v: Get[Value] = pgDecoderGetT[Value] - Read[(EntityType, Iri, Value, Int, Instant, Long, String, String)].map { + Read[(EntityType, Iri, Value, Int, Instant, Long, Label, Label)].map { case (tpe, id, value, rev, instant, offset, org, proj) => - SuccessElem(tpe, id, Some(ProjectRef.unsafe(org, proj)), instant, Offset.at(offset), value, rev) + SuccessElem(tpe, id, ProjectRef(org, proj), instant, Offset.at(offset), value, rev) } } @@ -258,7 +256,7 @@ object Elem { final case class FailedElem( tpe: EntityType, id: Iri, - project: Option[ProjectRef], + project: ProjectRef, instant: Instant, offset: Offset, throwable: Throwable, @@ -279,7 +277,7 @@ object Elem { final case class DroppedElem( tpe: EntityType, id: Iri, - project: Option[ProjectRef], + project: ProjectRef, instant: Instant, offset: Offset, rev: Int diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Supervisor.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Supervisor.scala index 05a16b993a..d499d26a5d 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Supervisor.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Supervisor.scala @@ -6,10 +6,7 @@ import cats.implicits._ import ch.epfl.bluebrain.nexus.delta.kernel.syntax._ import ch.epfl.bluebrain.nexus.delta.kernel.{Logger, RetryStrategy} import ch.epfl.bluebrain.nexus.delta.sourcing.config.ProjectionConfig -import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset -import ch.epfl.bluebrain.nexus.delta.sourcing.projections.model.ProjectionRestart import ch.epfl.bluebrain.nexus.delta.sourcing.projections.{ProjectionErrors, Projections} -import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.SuccessElem import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ExecutionStatus.Ignored import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ExecutionStrategy.{EveryNode, PersistentSingleNode, TransientSingleNode} import fs2.Stream @@ -116,8 +113,6 @@ object Supervisor { stop = IO.unit ) - private[sourcing] val watchRestartMetadata = ProjectionMetadata("system", "watch-restarts", None, None) - /** * Constructs a new [[Supervisor]] instance using the provided `store` and `cfg`. * @@ -143,7 +138,7 @@ object Supervisor { supervisionRef <- Ref.of[IO, Fiber[IO, Throwable, Unit]](supervision) supervisor = new Impl(projections, projectionErrors, cfg, semaphore, mapRef, signal, supervisionRef) - _ <- watchRestarts(supervisor, projections) + _ <- WatchRestarts(supervisor, projections) _ <- log.info("Delta supervisor is up") } yield supervisor @@ -200,28 +195,6 @@ object Supervisor { } } - private def watchRestarts(supervisor: Supervisor, projections: Projections) = { - supervisor.run( - CompiledProjection.fromStream( - watchRestartMetadata, - ExecutionStrategy.EveryNode, - (offset: Offset) => - projections - .restarts(offset) - .evalMap { - case s: SuccessElem[ProjectionRestart] => - supervisor.restart(s.value.name).flatMap { status => - if (status.exists(_ != ExecutionStatus.Ignored)) - projections.acknowledgeRestart(s.offset).as(s.void) - else - IO.pure(s.dropped) - } - case other => IO.pure(other.void) - } - ) - ) - } - final private case class Supervised( metadata: ProjectionMetadata, executionStrategy: ExecutionStrategy, diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/WatchRestarts.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/WatchRestarts.scala new file mode 100644 index 0000000000..a814928b80 --- /dev/null +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/WatchRestarts.scala @@ -0,0 +1,57 @@ +package ch.epfl.bluebrain.nexus.delta.sourcing.stream + +import cats.effect.IO +import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri +import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv +import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, ProjectRef} +import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset +import ch.epfl.bluebrain.nexus.delta.sourcing.projections.Projections +import ch.epfl.bluebrain.nexus.delta.sourcing.projections.model.ProjectionRestart +import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.SuccessElem + +/** + * Schedule an internal projection so that restarts can be executed and acknowledged + */ +object WatchRestarts { + + private[sourcing] val projectionMetadata = ProjectionMetadata("system", "watch-restarts", None, None) + + private val entityType: EntityType = EntityType("projection-restart") + + private def restartId(offset: Offset): Iri = nxv + s"projection/restart/${offset.value}" + + private def success(offset: Offset, restart: ProjectionRestart): SuccessElem[Unit] = + SuccessElem( + entityType, + restartId(offset), + ProjectRef.unsafe("projection", "restart"), + restart.instant, + offset, + (), + 1 + ) + + private def dropped(offset: Offset, restart: ProjectionRestart): Elem.DroppedElem = success(offset, restart).dropped + + //FIXME: Execute watch restarts so that they don't require to be mapped as elems + def apply(supervisor: Supervisor, projections: Projections): IO[ExecutionStatus] = { + supervisor.run( + CompiledProjection.fromStream( + projectionMetadata, + ExecutionStrategy.EveryNode, + (offset: Offset) => + projections + .restarts(offset) + .evalMap { case (offset, restart) => + supervisor.restart(restart.name).flatMap { status => + if (status.exists(_ != ExecutionStatus.Ignored)) + projections.acknowledgeRestart(offset).as(success(offset, restart)) + else + IO.pure(dropped(offset, restart)) + } + } + ) + ) + } + +} diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/FailedElemLogStoreSuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/FailedElemLogStoreSuite.scala index 5ee2652bbc..ae1a74ee90 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/FailedElemLogStoreSuite.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/FailedElemLogStoreSuite.scala @@ -48,7 +48,7 @@ class FailedElemLogStoreSuite extends NexusSuite with MutableClock.Fixture with private val entityType = EntityType("Test") private def createFailedElem(project: ProjectRef, offset: Long) = - FailedElem(entityType, id, Some(project), start.plusSeconds(offset), Offset.at(offset), error, rev) + FailedElem(entityType, id, project, start.plusSeconds(offset), Offset.at(offset), error, rev) private val fail1 = createFailedElem(project1, 1L) private val fail2 = createFailedElem(project1, 2L) diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/ProjectionRestartStoreSuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/ProjectionRestartStoreSuite.scala index 843ddd69b6..759dfa4505 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/ProjectionRestartStoreSuite.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/ProjectionRestartStoreSuite.scala @@ -5,9 +5,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Anonymous import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie import ch.epfl.bluebrain.nexus.delta.sourcing.projections.model.ProjectionRestart -import ch.epfl.bluebrain.nexus.delta.sourcing.projections.model.ProjectionRestart.{entityType, restartId} import ch.epfl.bluebrain.nexus.delta.sourcing.query.RefreshStrategy -import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.SuccessElem import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite import munit.AnyFixture @@ -24,9 +22,6 @@ class ProjectionRestartStoreSuite extends NexusSuite with Doobie.Fixture with Do private val pr1 = ProjectionRestart("proj1", Instant.EPOCH, Anonymous) private val pr2 = ProjectionRestart("proj2", Instant.EPOCH.plusSeconds(5L), Anonymous) - private def toElem(id: Offset, restart: ProjectionRestart) = - SuccessElem(entityType, restartId(id), None, restart.instant, id, restart, 1) - test("Save a projection restart") { store.save(pr1).assertEquals(()) } @@ -38,13 +33,13 @@ class ProjectionRestartStoreSuite extends NexusSuite with Doobie.Fixture with Do test("Stream projection restarts") { store .stream(Offset.start) - .assert(toElem(Offset.at(1L), pr1), toElem(Offset.at(2L), pr2)) + .assert((Offset.at(1L), pr1), (Offset.at(2L), pr2)) } test("Delete older restarts and stream again") { for { _ <- store.deleteExpired(Instant.EPOCH.plusSeconds(2L)) - _ <- store.stream(Offset.start).assert(toElem(Offset.at(2L), pr2)) + _ <- store.stream(Offset.start).assert((Offset.at(2L), pr2)) } yield () } diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query/StreamingQuerySuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query/StreamingQuerySuite.scala index 69b1e2594f..6c675b3152 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query/StreamingQuerySuite.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query/StreamingQuerySuite.scala @@ -63,18 +63,19 @@ class StreamingQuerySuite extends NexusSuite with Doobie.Fixture { private val customTag = UserTag.unsafe("v0.1") private val rev = 1 - private val prState11 = PullRequestActive(id1, project1, rev, Instant.EPOCH, Anonymous, Instant.EPOCH, alice) - private val prState12 = PullRequestActive(id2, project1, rev, Instant.EPOCH, Anonymous, Instant.EPOCH, alice) - private val prState13 = - PullRequestActive(id3, project1, rev, Instant.EPOCH, Anonymous, Instant.EPOCH, alice, Set(nxv + "Fix")) - private val prState14 = - PullRequestActive(id4, project1, rev, Instant.EPOCH, Anonymous, Instant.EPOCH, alice, Set(nxv + "Feature")) - private val prState21 = PullRequestActive(id1, project2, rev, Instant.EPOCH, Anonymous, Instant.EPOCH, alice) - private val prState34 = PullRequestActive(id4, project3, rev, Instant.EPOCH, Anonymous, Instant.EPOCH, alice) - - private val release11 = Release(nxv + "a", project1, rev, Instant.EPOCH, Anonymous, Instant.EPOCH, alice) - private val release12 = Release(nxv + "b", project1, rev, Instant.EPOCH, Anonymous, Instant.EPOCH, alice) - private val release21 = Release(nxv + "c", project2, rev, Instant.EPOCH, Anonymous, Instant.EPOCH, alice) + private val epoch: Instant = Instant.EPOCH + private val prState11 = PullRequestActive(id1, project1, rev, epoch, Anonymous, epoch, alice) + private val prState12 = PullRequestActive(id2, project1, rev, epoch, Anonymous, epoch, alice) + private val prState13 = + PullRequestActive(id3, project1, rev, epoch, Anonymous, epoch, alice, Set(nxv + "Fix")) + private val prState14 = + PullRequestActive(id4, project1, rev, epoch, Anonymous, epoch, alice, Set(nxv + "Feature")) + private val prState21 = PullRequestActive(id1, project2, rev, epoch, Anonymous, epoch, alice) + private val prState34 = PullRequestActive(id4, project3, rev, epoch, Anonymous, epoch, alice) + + private val release11 = Release(nxv + "a", project1, rev, epoch, Anonymous, epoch, alice) + private val release12 = Release(nxv + "b", project1, rev, epoch, Anonymous, epoch, alice) + private val release21 = Release(nxv + "c", project2, rev, epoch, Anonymous, epoch, alice) private def decodeValue(entityType: EntityType, json: Json) = IO.fromEither { @@ -119,12 +120,12 @@ class StreamingQuerySuite extends NexusSuite with Doobie.Fixture { val (iri, void) = stream(project1, Offset.start, SelectFilter.latest) val expected = List( - SuccessElem(PullRequest.entityType, id1, Some(project1), Instant.EPOCH, Offset.at(1L), id1, rev), - SuccessElem(PullRequest.entityType, id2, Some(project1), Instant.EPOCH, Offset.at(2L), id2, rev), - SuccessElem(Release.entityType, release11.id, Some(project1), Instant.EPOCH, Offset.at(3L), release11.id, rev), - SuccessElem(PullRequest.entityType, id3, Some(project1), Instant.EPOCH, Offset.at(7L), id3, rev), - SuccessElem(Release.entityType, release12.id, Some(project1), Instant.EPOCH, Offset.at(8L), release12.id, rev), - SuccessElem(PullRequest.entityType, id4, Some(project1), Instant.EPOCH, Offset.at(15L), id4, rev) + SuccessElem(PullRequest.entityType, id1, project1, epoch, Offset.at(1L), id1, rev), + SuccessElem(PullRequest.entityType, id2, project1, epoch, Offset.at(2L), id2, rev), + SuccessElem(Release.entityType, release11.id, project1, epoch, Offset.at(3L), release11.id, rev), + SuccessElem(PullRequest.entityType, id3, project1, epoch, Offset.at(7L), id3, rev), + SuccessElem(Release.entityType, release12.id, project1, epoch, Offset.at(8L), release12.id, rev), + SuccessElem(PullRequest.entityType, id4, project1, epoch, Offset.at(15L), id4, rev) ) iri.compile.toList.assertEquals(expected) @@ -135,9 +136,9 @@ class StreamingQuerySuite extends NexusSuite with Doobie.Fixture { val (iri, void) = stream(project1, Offset.at(3L), SelectFilter.latest) val expected = List( - SuccessElem(PullRequest.entityType, id3, Some(project1), Instant.EPOCH, Offset.at(7L), id3, rev), - SuccessElem(Release.entityType, release12.id, Some(project1), Instant.EPOCH, Offset.at(8L), release12.id, rev), - SuccessElem(PullRequest.entityType, id4, Some(project1), Instant.EPOCH, Offset.at(15L), id4, rev) + SuccessElem(PullRequest.entityType, id3, project1, epoch, Offset.at(7L), id3, rev), + SuccessElem(Release.entityType, release12.id, project1, epoch, Offset.at(8L), release12.id, rev), + SuccessElem(PullRequest.entityType, id4, project1, epoch, Offset.at(15L), id4, rev) ) iri.compile.toList.assertEquals(expected) @@ -149,8 +150,8 @@ class StreamingQuerySuite extends NexusSuite with Doobie.Fixture { val (iri, void) = stream(project1, Offset.start, SelectFilter(None, allowedViewTypes, Tag.Latest)) val expected = List( - SuccessElem(PullRequest.entityType, id3, Some(project1), Instant.EPOCH, Offset.at(7L), id3, rev), - SuccessElem(PullRequest.entityType, id4, Some(project1), Instant.EPOCH, Offset.at(15L), id4, rev) + SuccessElem(PullRequest.entityType, id3, project1, epoch, Offset.at(7L), id3, rev), + SuccessElem(PullRequest.entityType, id4, project1, epoch, Offset.at(15L), id4, rev) ) iri.compile.toList.assertEquals(expected) @@ -161,13 +162,13 @@ class StreamingQuerySuite extends NexusSuite with Doobie.Fixture { val (iri, void) = stream(project1, Offset.start, SelectFilter.tag(customTag)) val expected = List( - SuccessElem(PullRequest.entityType, id1, Some(project1), Instant.EPOCH, Offset.at(6L), id1, rev), - SuccessElem(Release.entityType, release12.id, Some(project1), Instant.EPOCH, Offset.at(9L), release12.id, rev), - DroppedElem(PullRequest.entityType, id3, Some(project1), Instant.EPOCH, Offset.at(11L), -1), - SuccessElem(PullRequest.entityType, id2, Some(project1), Instant.EPOCH, Offset.at(12L), id2, rev), - DroppedElem(PullRequest.entityType, id1, Some(project1), Instant.EPOCH, Offset.at(14L), -1), - DroppedElem(Release.entityType, release12.id, Some(project1), Instant.EPOCH, Offset.at(16L), -1), - SuccessElem(PullRequest.entityType, id4, Some(project1), Instant.EPOCH, Offset.at(17L), id4, rev) + SuccessElem(PullRequest.entityType, id1, project1, epoch, Offset.at(6L), id1, rev), + SuccessElem(Release.entityType, release12.id, project1, epoch, Offset.at(9L), release12.id, rev), + DroppedElem(PullRequest.entityType, id3, project1, epoch, Offset.at(11L), -1), + SuccessElem(PullRequest.entityType, id2, project1, epoch, Offset.at(12L), id2, rev), + DroppedElem(PullRequest.entityType, id1, project1, epoch, Offset.at(14L), -1), + DroppedElem(Release.entityType, release12.id, project1, epoch, Offset.at(16L), -1), + SuccessElem(PullRequest.entityType, id4, project1, epoch, Offset.at(17L), id4, rev) ) iri.compile.toList.assertEquals(expected) @@ -177,10 +178,10 @@ class StreamingQuerySuite extends NexusSuite with Doobie.Fixture { test(s"Running a stream on states with tag '${customTag.value}' on project 1 from offset 11") { val (iri, void) = stream(project1, Offset.at(11L), SelectFilter.tag(customTag)) val expected = List( - SuccessElem(PullRequest.entityType, id2, Some(project1), Instant.EPOCH, Offset.at(12L), id2, rev), - DroppedElem(PullRequest.entityType, id1, Some(project1), Instant.EPOCH, Offset.at(14L), -1), - DroppedElem(Release.entityType, release12.id, Some(project1), Instant.EPOCH, Offset.at(16L), -1), - SuccessElem(PullRequest.entityType, id4, Some(project1), Instant.EPOCH, Offset.at(17L), id4, rev) + SuccessElem(PullRequest.entityType, id2, project1, epoch, Offset.at(12L), id2, rev), + DroppedElem(PullRequest.entityType, id1, project1, epoch, Offset.at(14L), -1), + DroppedElem(Release.entityType, release12.id, project1, epoch, Offset.at(16L), -1), + SuccessElem(PullRequest.entityType, id4, project1, epoch, Offset.at(17L), id4, rev) ) iri.compile.toList.assertEquals(expected) @@ -198,31 +199,16 @@ class StreamingQuerySuite extends NexusSuite with Doobie.Fixture { } } - val result = StreamingQuery.elems[Iri](project1, Offset.start, SelectFilter.latest, qc, xas, incompleteDecode) + val result = StreamingQuery.elems[Iri](project1, Offset.start, SelectFilter.latest, qc, xas, incompleteDecode) + val releaseDecodingFailure = decodingFailure(Release.entityType) result.compile.toList.assertEquals( List( - SuccessElem(PullRequest.entityType, id1, Some(project1), Instant.EPOCH, Offset.at(1L), id1, rev), - SuccessElem(PullRequest.entityType, id2, Some(project1), Instant.EPOCH, Offset.at(2L), id2, rev), - FailedElem( - Release.entityType, - release11.id, - Some(project1), - Instant.EPOCH, - Offset.at(3L), - decodingFailure(Release.entityType), - rev - ), - SuccessElem(PullRequest.entityType, id3, Some(project1), Instant.EPOCH, Offset.at(7L), id3, rev), - FailedElem( - Release.entityType, - release12.id, - Some(project1), - Instant.EPOCH, - Offset.at(8L), - decodingFailure(Release.entityType), - rev - ), - SuccessElem(PullRequest.entityType, id4, Some(project1), Instant.EPOCH, Offset.at(15L), id4, rev) + SuccessElem(PullRequest.entityType, id1, project1, epoch, Offset.at(1L), id1, rev), + SuccessElem(PullRequest.entityType, id2, project1, epoch, Offset.at(2L), id2, rev), + FailedElem(Release.entityType, release11.id, project1, epoch, Offset.at(3L), releaseDecodingFailure, rev), + SuccessElem(PullRequest.entityType, id3, project1, epoch, Offset.at(7L), id3, rev), + FailedElem(Release.entityType, release12.id, project1, epoch, Offset.at(8L), releaseDecodingFailure, rev), + SuccessElem(PullRequest.entityType, id4, project1, epoch, Offset.at(15L), id4, rev) ) ) } @@ -232,7 +218,7 @@ class StreamingQuerySuite extends NexusSuite with Doobie.Fixture { .remaining(project1, SelectFilter.latest, Offset.start, xas) .assertEquals( Some( - RemainingElems(6L, Instant.EPOCH) + RemainingElems(6L, epoch) ) ) } @@ -242,7 +228,7 @@ class StreamingQuerySuite extends NexusSuite with Doobie.Fixture { .remaining(project1, SelectFilter.latestOfEntity(PullRequest.entityType), Offset.start, xas) .assertEquals( Some( - RemainingElems(4L, Instant.EPOCH) + RemainingElems(4L, epoch) ) ) } @@ -252,7 +238,7 @@ class StreamingQuerySuite extends NexusSuite with Doobie.Fixture { .remaining(project1, SelectFilter.latest, Offset.at(6L), xas) .assertEquals( Some( - RemainingElems(3L, Instant.EPOCH) + RemainingElems(3L, epoch) ) ) } @@ -262,7 +248,7 @@ class StreamingQuerySuite extends NexusSuite with Doobie.Fixture { .remaining(project1, SelectFilter.tag(customTag), Offset.at(6L), xas) .assertEquals( Some( - RemainingElems(4L, Instant.EPOCH) + RemainingElems(4L, epoch) ) ) } diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/ScopedStateStoreSuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/ScopedStateStoreSuite.scala index d0b20c31ea..2db0cedf1c 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/ScopedStateStoreSuite.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/ScopedStateStoreSuite.scala @@ -5,13 +5,13 @@ import cats.syntax.all._ import ch.epfl.bluebrain.nexus.delta.kernel.error.ThrowableValue import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv +import ch.epfl.bluebrain.nexus.delta.sourcing.PullRequest.PullRequestState import ch.epfl.bluebrain.nexus.delta.sourcing.PullRequest.PullRequestState.{PullRequestActive, PullRequestClosed} -import ch.epfl.bluebrain.nexus.delta.sourcing.PullRequest.{entityType, PullRequestState} import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._ import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Anonymous, User} import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.{Latest, UserTag} -import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Label, ProjectRef} +import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, Label, ProjectRef} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie import ch.epfl.bluebrain.nexus.delta.sourcing.query.RefreshStrategy @@ -31,8 +31,9 @@ class ScopedStateStoreSuite extends NexusSuite with Doobie.Fixture with Doobie.A private lazy val xas = doobie() - private lazy val store = ScopedStateStore[Iri, PullRequestState]( - PullRequest.entityType, + private val entityType: EntityType = PullRequest.entityType + private lazy val store = ScopedStateStore[Iri, PullRequestState]( + entityType, PullRequestState.serializer, QueryConfig(1, RefreshStrategy.Delay(500.millis)), xas @@ -50,27 +51,21 @@ class ScopedStateStoreSuite extends NexusSuite with Doobie.Fixture with Doobie.A private val customTag = UserTag.unsafe("v0.1") - private val state1 = PullRequestActive(id1, project1, 1, Instant.EPOCH, Anonymous, Instant.EPOCH, alice) - private val state2 = PullRequestActive(id2, project1, 1, Instant.EPOCH, Anonymous, Instant.EPOCH, alice) - private val updatedState1 = PullRequestClosed(id1, project1, 2, Instant.EPOCH, Anonymous, Instant.EPOCH, alice) - - private val state3 = PullRequestActive(id1, project2, 1, Instant.EPOCH, Anonymous, Instant.EPOCH, alice) - private val state4 = PullRequestActive(id4, project3, 1, Instant.EPOCH, Anonymous, Instant.EPOCH, alice) - - private val elem1 = - Elem.SuccessElem(PullRequest.entityType, id1, Some(project1), Instant.EPOCH, Offset.at(1L), state1, 1) - private val elem2 = - Elem.SuccessElem(PullRequest.entityType, id2, Some(project1), Instant.EPOCH, Offset.at(2L), state2, 1) - private val elem3 = - Elem.SuccessElem(PullRequest.entityType, id1, Some(project2), Instant.EPOCH, Offset.at(3L), state3, 1) - private val elem4 = - Elem.SuccessElem(PullRequest.entityType, id4, Some(project3), Instant.EPOCH, Offset.at(4L), state4, 1) - private val elem1Tagged = - Elem.SuccessElem(PullRequest.entityType, id1, Some(project1), Instant.EPOCH, Offset.at(5L), state1, 1) - private val elem3Tagged = - Elem.SuccessElem(PullRequest.entityType, id1, Some(project2), Instant.EPOCH, Offset.at(6L), state3, 1) - private val elem1Updated = - Elem.SuccessElem(PullRequest.entityType, id1, Some(project1), Instant.EPOCH, Offset.at(7L), updatedState1, 2) + private val epoch: Instant = Instant.EPOCH + private val state1 = PullRequestActive(id1, project1, 1, epoch, Anonymous, epoch, alice) + private val state2 = PullRequestActive(id2, project1, 1, epoch, Anonymous, epoch, alice) + private val updatedState1 = PullRequestClosed(id1, project1, 2, epoch, Anonymous, epoch, alice) + + private val state3 = PullRequestActive(id1, project2, 1, epoch, Anonymous, epoch, alice) + private val state4 = PullRequestActive(id4, project3, 1, epoch, Anonymous, epoch, alice) + + private val elem1 = Elem.SuccessElem(entityType, id1, project1, epoch, Offset.at(1L), state1, 1) + private val elem2 = Elem.SuccessElem(entityType, id2, project1, epoch, Offset.at(2L), state2, 1) + private val elem3 = Elem.SuccessElem(entityType, id1, project2, epoch, Offset.at(3L), state3, 1) + private val elem4 = Elem.SuccessElem(entityType, id4, project3, epoch, Offset.at(4L), state4, 1) + private val elem1Tagged = Elem.SuccessElem(entityType, id1, project1, epoch, Offset.at(5L), state1, 1) + private val elem3Tagged = Elem.SuccessElem(entityType, id1, project2, epoch, Offset.at(6L), state3, 1) + private val elem1Updated = Elem.SuccessElem(entityType, id1, project1, epoch, Offset.at(7L), updatedState1, 2) private def assertCount(expected: Int) = sql"select count(*) from scoped_states".query[Int].unique.transact(xas.read).assertEquals(expected) @@ -183,7 +178,7 @@ class ScopedStateStoreSuite extends NexusSuite with Doobie.Fixture with Doobie.A project1, xas ) - .assertEquals(Some(PullRequest.entityType)) + .assertEquals(Some(entityType)) } test("Get no entity type for an unknown id") { diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/CacheSink.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/CacheSink.scala index e7f75ccc4d..c0759834f6 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/CacheSink.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/CacheSink.scala @@ -43,11 +43,8 @@ final class CacheSink[A: Typeable] private (documentId: Elem[A] => Iri) extends } object CacheSink { - private val eventDocumentId: Elem[_] => Iri = elem => - elem.project match { - case Some(project) => iri"$project/${elem.id}:${elem.rev}" - case None => iri"${elem.id}:${elem.rev}" - } + private val eventDocumentId: Elem[_] => Iri = + elem => iri"${elem.project}/${elem.id}:${elem.rev}" /** CacheSink for events */ def events[A: Typeable]: CacheSink[A] = new CacheSink(eventDocumentId) diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/FailedElemPersistenceSuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/FailedElemPersistenceSuite.scala index f2c31c8a45..07a46c02c3 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/FailedElemPersistenceSuite.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/FailedElemPersistenceSuite.scala @@ -3,7 +3,7 @@ package ch.epfl.bluebrain.nexus.delta.sourcing.stream import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv import ch.epfl.bluebrain.nexus.delta.sourcing.config.BatchConfig -import ch.epfl.bluebrain.nexus.delta.sourcing.model.EntityType +import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, ProjectRef} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem._ import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite @@ -20,6 +20,7 @@ class FailedElemPersistenceSuite extends NexusSuite { implicit private val patienceConfig: PatienceConfig = PatienceConfig(500.millis, 10.millis) private val projection1 = ProjectionMetadata("test", "name1", None, None) + private val project = ProjectRef.unsafe("org", "proj") private val id = nxv + "id" private val rev = 1 @@ -31,7 +32,7 @@ class FailedElemPersistenceSuite extends NexusSuite { FailedElem( EntityType("entity"), id, - None, + project, Instant.EPOCH, Offset.at(value.toLong), new RuntimeException("boom"), @@ -43,7 +44,9 @@ class FailedElemPersistenceSuite extends NexusSuite { (_: Offset) => Stream .range(1, 11) - .map { value => SuccessElem(EntityType("entity"), id, None, Instant.EPOCH, Offset.at(value.toLong), (), rev) } + .map { value => + SuccessElem(EntityType("entity"), id, project, Instant.EPOCH, Offset.at(value.toLong), (), rev) + } private val saveFailedElems: MutableSet[FailedElem] => List[FailedElem] => IO[Unit] = failedElemStore => failedElems => IO.delay { failedElems.foreach(failedElemStore.add) } diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/OperationSuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/OperationSuite.scala index c756989e96..fb48c8b774 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/OperationSuite.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/OperationSuite.scala @@ -2,7 +2,7 @@ package ch.epfl.bluebrain.nexus.delta.sourcing.stream import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv -import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, Label} +import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, Label, ProjectRef} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.SuccessElem import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Pipe @@ -70,9 +70,11 @@ object OperationSuite { private val numberType = EntityType("number") + private val project = ProjectRef.unsafe("org", "proj") + private def until(n: Int): Source = Source(offset => Stream.range(offset.value.toInt, n).covary[IO].map { i => - SuccessElem(numberType, nxv + i.toString, None, Instant.EPOCH, Offset.at(i.toLong), i, 1) + SuccessElem(numberType, nxv + i.toString, project, Instant.EPOCH, Offset.at(i.toLong), i, 1) } ) diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/PullRequestStream.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/PullRequestStream.scala index d8565b3bf8..0245414268 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/PullRequestStream.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/PullRequestStream.scala @@ -44,7 +44,7 @@ object PullRequestStream { SuccessElem( tpe = PullRequest.entityType, id = pr1.id, - project = Some(projectRef), + project = projectRef, instant = pr1.updatedAt, offset = Offset.at(1L), value = PullRequestState.toGraphResource(pr1, base), @@ -53,7 +53,7 @@ object PullRequestStream { DroppedElem( tpe = PullRequest.entityType, id = nxv + "dropped", - project = Some(projectRef), + project = projectRef, Instant.EPOCH, Offset.at(2L), rev = 1 @@ -61,7 +61,7 @@ object PullRequestStream { SuccessElem( tpe = PullRequest.entityType, id = pr2.id, - project = Some(projectRef), + project = projectRef, instant = pr2.updatedAt, offset = Offset.at(3L), value = PullRequestState.toGraphResource(pr2, base), @@ -70,7 +70,7 @@ object PullRequestStream { FailedElem( tpe = PullRequest.entityType, id = nxv + "failed", - project = Some(projectRef), + project = projectRef, Instant.EPOCH, Offset.at(4L), new IllegalStateException("This is an error message"), diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/SupervisorSuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/SupervisorSuite.scala index 54c64d4885..62187f08a0 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/SupervisorSuite.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/SupervisorSuite.scala @@ -3,7 +3,7 @@ package ch.epfl.bluebrain.nexus.delta.sourcing.stream import cats.effect.IO import cats.syntax.all._ import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv -import ch.epfl.bluebrain.nexus.delta.sourcing.model.EntityType +import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, ProjectRef} import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Anonymous, Subject} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie @@ -36,6 +36,8 @@ class SupervisorSuite extends NexusSuite with SupervisorSetup.Fixture with Doobi private val ignoredByNode1 = ProjectionMetadata("test", "name2", None, None) private val random = ProjectionMetadata("test", "name3", None, None) + private val project = ProjectRef.unsafe("org", "proj") + private val rev = 1 private def evalStream(start: IO[Unit]) = @@ -43,7 +45,7 @@ class SupervisorSuite extends NexusSuite with SupervisorSetup.Fixture with Doobi Stream.eval(start) >> Stream .range(1, 21) .map { value => - SuccessElem(EntityType("entity"), nxv + "id", None, Instant.EPOCH, Offset.at(value.toLong), (), rev) + SuccessElem(EntityType("entity"), nxv + "id", project, Instant.EPOCH, Offset.at(value.toLong), (), rev) } private val expectedProgress = ProjectionProgress(Offset.at(20L), Instant.EPOCH, 20, 0, 0) @@ -94,7 +96,7 @@ class SupervisorSuite extends NexusSuite with SupervisorSetup.Fixture with Doobi private def assertWatchRestarts(offset: Offset, processed: Long, discarded: Long)(implicit loc: Location) = { val progress = ProjectionProgress(offset, Instant.EPOCH, processed, discarded, 0) - assertDescribe(Supervisor.watchRestartMetadata, EveryNode, 0, Running, progress) + assertDescribe(WatchRestarts.projectionMetadata, EveryNode, 0, Running, progress) } test("Watching restart projection restarts should be running") { @@ -137,7 +139,7 @@ class SupervisorSuite extends NexusSuite with SupervisorSetup.Fixture with Doobi .assertEquals( List( SupervisedDescription( - metadata = Supervisor.watchRestartMetadata, + metadata = WatchRestarts.projectionMetadata, EveryNode, 0, Running, @@ -247,7 +249,7 @@ class SupervisorSuite extends NexusSuite with SupervisorSetup.Fixture with Doobi .assertEquals( List( SupervisedDescription( - metadata = Supervisor.watchRestartMetadata, + WatchRestarts.projectionMetadata, EveryNode, restarts = 0, Running, diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/pipes/DataConstructQuerySuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/pipes/DataConstructQuerySuite.scala index 9f47a71d05..d0cb151459 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/pipes/DataConstructQuerySuite.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/pipes/DataConstructQuerySuite.scala @@ -40,7 +40,7 @@ class DataConstructQuerySuite extends NexusSuite { SuccessElem( tpe = PullRequest.entityType, id = base / "id", - project = Some(project), + project = project, instant = instant, offset = Offset.at(1L), value = graph, diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/pipes/DiscardMetadataSuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/pipes/DiscardMetadataSuite.scala index dee6c22057..9bfe07886a 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/pipes/DiscardMetadataSuite.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/pipes/DiscardMetadataSuite.scala @@ -38,7 +38,7 @@ class DiscardMetadataSuite extends NexusSuite { val elem = SuccessElem( tpe = PullRequest.entityType, id = base / "id", - project = Some(project), + project = project, instant = instant, offset = Offset.at(1L), value = graph, diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/pipes/FilterBySchemaSuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/pipes/FilterBySchemaSuite.scala index a3679162ea..04f7f2c4b2 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/pipes/FilterBySchemaSuite.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/pipes/FilterBySchemaSuite.scala @@ -40,7 +40,7 @@ class FilterBySchemaSuite extends NexusSuite { SuccessElem( tpe = PullRequest.entityType, id = base / "id", - project = Some(project), + project = project, instant = instant, offset = Offset.at(1L), value = graph.copy(schema = schema), diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/pipes/FilterByTypeSuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/pipes/FilterByTypeSuite.scala index 960233d010..6ca3efe59e 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/pipes/FilterByTypeSuite.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/pipes/FilterByTypeSuite.scala @@ -39,7 +39,7 @@ class FilterByTypeSuite extends NexusSuite { SuccessElem( tpe = PullRequest.entityType, id = base / "id", - project = Some(project), + project = project, instant = instant, offset = Offset.at(1L), value = graph.copy(types = types), diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/pipes/FilterDeprecatedSuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/pipes/FilterDeprecatedSuite.scala index b155aa15ad..54c007aa87 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/pipes/FilterDeprecatedSuite.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/pipes/FilterDeprecatedSuite.scala @@ -45,7 +45,7 @@ class FilterDeprecatedSuite extends NexusSuite { val elem = SuccessElem( tpe = PullRequest.entityType, id = base / "id", - project = Some(project), + project = project, instant = instant, offset = Offset.at(1L), value = graph.copy(deprecated = true), @@ -59,7 +59,7 @@ class FilterDeprecatedSuite extends NexusSuite { val elem = SuccessElem( tpe = PullRequest.entityType, id = base / "id", - project = Some(project), + project = project, instant = instant, offset = Offset.at(1L), value = graph, diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/pipes/SelectPredicatesSuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/pipes/SelectPredicatesSuite.scala index d26f6f80ae..9fb364aee9 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/pipes/SelectPredicatesSuite.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/pipes/SelectPredicatesSuite.scala @@ -42,7 +42,7 @@ class SelectPredicatesSuite extends NexusSuite { SuccessElem( tpe = PullRequest.entityType, id = base / "id", - project = Some(project), + project = project, instant = instant, offset = Offset.at(1L), value = graph, diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/pipes/SourceAsTextSuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/pipes/SourceAsTextSuite.scala index bfb270f04d..e1ee1fde80 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/pipes/SourceAsTextSuite.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/pipes/SourceAsTextSuite.scala @@ -39,7 +39,7 @@ class SourceAsTextSuite extends NexusSuite { val elem = SuccessElem( tpe = PullRequest.entityType, id = base / "id", - project = Some(project), + project = project, instant = instant, offset = Offset.at(1L), value = graph,