Skip to content

Commit

Permalink
Make project in Elem mandatory (#5170)
Browse files Browse the repository at this point in the history
Co-authored-by: Simon Dumas <simon.dumas@epfl.ch>
  • Loading branch information
imsdu and Simon Dumas authored Oct 4, 2024
1 parent 0c5969a commit 24f88bc
Show file tree
Hide file tree
Showing 59 changed files with 285 additions and 317 deletions.
9 changes: 1 addition & 8 deletions delta/app/src/main/resources/app.conf
Original file line number Diff line number Diff line change
Expand Up @@ -299,15 +299,8 @@ app {

# SSE configuration
sse {
# the schemas event-log configuration
# the SSE query configuration
query = ${app.defaults.query}
# the sse cache to get projects uuids
cache {
# The max number of tokens in the projects uuids cache
max-size = 500
# The duration after which the cache expires
expire-after = 15 minutes
}
}

# projection configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,8 @@ class SchemaJobRoutesSpec extends BaseRouteSpec {
val projectionMetadata = SchemaValidationCoordinator.projectionMetadata(project.ref)

val reason = FailureReason("ValidationFail", json"""{ "details": "..." }""")
val fail1 = FailedElem(EntityType("ACL"), resourceId, Some(project.ref), Instant.EPOCH, Offset.At(42L), reason, rev)
val fail2 =
FailedElem(EntityType("Schema"), resourceId, Some(project.ref), Instant.EPOCH, Offset.At(42L), reason, rev)
val fail1 = FailedElem(EntityType("ACL"), resourceId, project.ref, Instant.EPOCH, Offset.At(42L), reason, rev)
val fail2 = FailedElem(EntityType("Schema"), resourceId, project.ref, Instant.EPOCH, Offset.At(42L), reason, rev)

(
projections.save(projectionMetadata, progress) >>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,9 +299,7 @@ final class BlazegraphViews(
}

private def toIndexViewDef(elem: Elem.SuccessElem[BlazegraphViewState]) =
elem.withProject(elem.value.project).traverse { v =>
IndexingViewDef(v, prefix)
}
elem.traverse { v => IndexingViewDef(v, prefix) }

private def eval(cmd: BlazegraphViewCommand): IO[ViewResource] =
log.evaluate(cmd.project, cmd.id, cmd).map(_._2.toResource)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@
}
},
{
"id": "https://bluebrain.github.io/nexus/vocabulary/myid",
"id": "https://bluebrain.github.io/nexus/vocabulary/myid2",
"offset": {
"@type": "At",
"value": 42
"value": 43
},
"project": "org/proj",
"_rev": 1,
"reason": {
"type": "UnexpectedError",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class BlazegraphIndexingActionSuite extends NexusSuite with Fixtures {
SuccessElem(
tpe = BlazegraphViews.entityType,
id = view1.ref.viewId,
project = Some(project),
project = project,
instant = Instant.EPOCH,
offset = Offset.at(1L),
value = view1,
Expand All @@ -89,7 +89,7 @@ class BlazegraphIndexingActionSuite extends NexusSuite with Fixtures {
SuccessElem(
tpe = BlazegraphViews.entityType,
id = view2.ref.viewId,
project = Some(project),
project = project,
instant = Instant.EPOCH,
offset = Offset.at(2L),
value = view2,
Expand All @@ -98,7 +98,7 @@ class BlazegraphIndexingActionSuite extends NexusSuite with Fixtures {
SuccessElem(
tpe = BlazegraphViews.entityType,
id = view3.ref.viewId,
project = Some(project),
project = project,
instant = Instant.EPOCH,
offset = Offset.at(3L),
value = view3,
Expand All @@ -107,7 +107,7 @@ class BlazegraphIndexingActionSuite extends NexusSuite with Fixtures {
SuccessElem(
tpe = BlazegraphViews.entityType,
id = view4.ref.viewId,
project = Some(project),
project = project,
instant = Instant.EPOCH,
offset = Offset.at(4L),
value = view4,
Expand Down Expand Up @@ -141,7 +141,7 @@ class BlazegraphIndexingActionSuite extends NexusSuite with Fixtures {
private val elem = SuccessElem(
tpe = PullRequest.entityType,
id = pr.id,
project = Some(project),
project = project,
instant = pr.updatedAt,
offset = Offset.at(1L),
value = PullRequestState.toGraphResource(pr, base),
Expand Down Expand Up @@ -171,7 +171,7 @@ class BlazegraphIndexingActionSuite extends NexusSuite with Fixtures {
val failed = FailedElem(
tpe = PullRequest.entityType,
id = pr.id,
project = Some(project),
project = project,
instant = pr.updatedAt,
offset = Offset.at(1L),
new IllegalStateException("Boom"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class BlazegraphCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixture
SuccessElem(
tpe = BlazegraphViews.entityType,
id = view1.ref.viewId,
project = Some(project),
project = project,
instant = Instant.EPOCH,
offset = Offset.at(1L),
value = view1,
Expand All @@ -101,7 +101,7 @@ class BlazegraphCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixture
SuccessElem(
tpe = BlazegraphViews.entityType,
id = view2.ref.viewId,
project = Some(project),
project = project,
instant = Instant.EPOCH,
offset = Offset.at(2L),
value = view2,
Expand All @@ -110,7 +110,7 @@ class BlazegraphCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixture
SuccessElem(
tpe = BlazegraphViews.entityType,
id = view3.ref.viewId,
project = Some(project),
project = project,
instant = Instant.EPOCH,
offset = Offset.at(3L),
value = view3,
Expand All @@ -120,7 +120,7 @@ class BlazegraphCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixture
SuccessElem(
tpe = BlazegraphViews.entityType,
id = deprecatedView1.ref.viewId,
project = Some(project),
project = project,
instant = Instant.EPOCH,
offset = Offset.at(4L),
value = deprecatedView1,
Expand All @@ -129,7 +129,7 @@ class BlazegraphCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixture
SuccessElem(
tpe = BlazegraphViews.entityType,
id = updatedView2.ref.viewId,
project = Some(project),
project = project,
instant = Instant.EPOCH,
offset = Offset.at(5L),
value = updatedView2,
Expand All @@ -139,7 +139,7 @@ class BlazegraphCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixture
SuccessElem(
tpe = BlazegraphViews.entityType,
id = updatedView2.ref.viewId,
project = Some(project),
project = project,
instant = Instant.EPOCH,
offset = Offset.at(6L),
value = updatedView2,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import ch.epfl.bluebrain.nexus.delta.rdf.graph.{Graph, NTriples}
import ch.epfl.bluebrain.nexus.delta.rdf.query.SparqlQuery.SparqlConstructQuery
import ch.epfl.bluebrain.nexus.delta.sdk.implicits._
import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, Label}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, Label, ProjectRef}
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{DroppedElem, SuccessElem}
import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite
Expand All @@ -32,6 +32,8 @@ class BlazegraphSinkSuite extends NexusSuite with BlazegraphClientSetup.Fixture

private lazy val sink = createSink(namespace)

private val project = ProjectRef.unsafe("org", "project")

private val resource1Id = iri"https://bbp.epfl.ch/resource1"
private val resource1Ntriples = NTriples(contentOf("sparql/resource1.ntriples"), resource1Id)
private val resource1NtriplesUpdated = NTriples(contentOf("sparql/resource1_updated.ntriples"), resource1Id)
Expand All @@ -54,14 +56,14 @@ class BlazegraphSinkSuite extends NexusSuite with BlazegraphClientSetup.Fixture

private def asElems(chunk: Chunk[(Iri, NTriples)]) =
chunk.zipWithIndex.map { case ((id, ntriples), index) =>
SuccessElem(entityType, id, None, Instant.EPOCH, Offset.at(index.toLong + 1), ntriples, 1)
SuccessElem(entityType, id, project, Instant.EPOCH, Offset.at(index.toLong + 1), ntriples, 1)
}

private def createGraph(chunk: Chunk[(Iri, NTriples)]) = chunk.foldLeft(Graph.empty) { case (acc, (_, ntriples)) =>
acc ++ Graph(ntriples).getOrElse(Graph.empty)
}

private def dropped(id: Iri, offset: Offset) = DroppedElem(entityType, id, None, Instant.EPOCH, offset, 1)
private def dropped(id: Iri, offset: Offset) = DroppedElem(entityType, id, project, Instant.EPOCH, offset, 1)

private def query(namespace: String) =
client
Expand Down Expand Up @@ -96,7 +98,7 @@ class BlazegraphSinkSuite extends NexusSuite with BlazegraphClientSetup.Fixture

test("Report errors when the id is not a valid absolute iri") {
val chunk = Chunk(
SuccessElem(entityType, nxv + "é-wrong", None, Instant.EPOCH, Offset.at(5L), resource1Ntriples, 1)
SuccessElem(entityType, nxv + "é-wrong", project, Instant.EPOCH, Offset.at(5L), resource1Ntriples, 1)
)
val expected = createGraph(Chunk(resource1Id -> resource1Ntriples, resource3Id -> resource3Ntriples))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class BlazegraphViewsIndexingRoutesSpec extends BlazegraphViewRoutesFixtures {
private lazy val projectionErrors = ProjectionErrors(xas, queryConfig, clock)

private val myId = nxv + "myid"
private val myId2 = nxv + "myid2"
private val indexingView = ActiveViewDef(
ViewRef(projectRef, myId),
"projection",
Expand Down Expand Up @@ -65,8 +66,8 @@ class BlazegraphViewsIndexingRoutesSpec extends BlazegraphViewRoutesFixtures {
super.beforeAll()
val error = new Exception("boom")
val rev = 1
val fail1 = FailedElem(EntityType("ACL"), myId, Some(projectRef), Instant.EPOCH, Offset.At(42L), error, rev)
val fail2 = FailedElem(EntityType("Schema"), myId, None, Instant.EPOCH, Offset.At(42L), error, rev)
val fail1 = FailedElem(EntityType("ACL"), myId, projectRef, Instant.EPOCH, Offset.At(42L), error, rev)
val fail2 = FailedElem(EntityType("Schema"), myId2, projectRef, Instant.EPOCH, Offset.At(43L), error, rev)
val save = for {
_ <- projections.save(indexingView.projectionMetadata, progress)
_ <- projectionErrors.saveFailedElems(indexingView.projectionMetadata, List(fail1, fail2))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,13 +303,11 @@ final class CompositeViews private (
/**
* Return the indexing views in a non-ending stream
*/
def views(start: Offset): ElemStream[CompositeViewDef] =
def views(start: Offset): SuccessElemStream[CompositeViewDef] =
log.states(Scope.Root, start).map(toCompositeViewDef)

private def toCompositeViewDef(elem: Elem.SuccessElem[CompositeViewState]) =
elem.withProject(elem.value.project).mapValue { v =>
CompositeViewDef(v)
}
elem.mapValue { v => CompositeViewDef(v) }

private def eval(cmd: CompositeViewCommand): IO[ViewResource] =
log.evaluate(cmd.project, cmd.id, cmd).map(_._2.toResource)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ final class CompositeRestartStore(xas: Transactors) {
|LIMIT 1""".stripMargin
.query[(Offset, ProjectRef, Iri, Json, Instant)]
.map { case (offset, project, id, json, instant) =>
Elem.fromEither(entityType, id, Some(project), instant, offset, json.as[CompositeRestart], 1)
Elem.fromEither(entityType, id, project, instant, offset, json.as[CompositeRestart], 1)
}
.option
.transact(xas.read)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@
}
},
{
"id": "https://bluebrain.github.io/nexus/vocabulary/myid",
"id": "https://bluebrain.github.io/nexus/vocabulary/myid2",
"offset": {
"@type": "At",
"value": 42
"value": 43
},
"project": "myorg/myproj",
"_rev": 1,
"reason": {
"type": "UnexpectedError",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,7 @@ class DeltaClientSpec
private val invalidTag = Some(UserTag.unsafe("unknowntag"))

private def elem(i: Int): Elem[Unit] =
SuccessElem(
EntityType("test"),
iri"https://bbp.epfl.ch/$i",
Some(project),
Instant.EPOCH,
Offset.at(i.toLong),
(),
1
)
SuccessElem(EntityType("test"), iri"https://bbp.epfl.ch/$i", project, Instant.EPOCH, Offset.at(i.toLong), (), 1)

override def beforeAll(): Unit = {
super.beforeAll()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ abstract class CompositeIndexingSuite(sinkConfig: SinkConfig, query: SparqlConst
} yield SuccessElem(
entityType,
value.id,
Some(project),
project,
Instant.EPOCH,
Offset.at(offset),
resource,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class CompositeViewsIndexingRoutesSpec extends CompositeViewsRoutesFixtures {
private val nowPlus5 = now.plusSeconds(5)

private val myId = nxv + "myid"
private val myId2 = nxv + "myid2"
private val view = CompositeViewsGen.resourceFor(projectRef, myId, uuid, viewValue, source = Json.obj())
private val indexingView = ActiveViewDef(
ViewRef(view.value.project, view.id),
Expand Down Expand Up @@ -93,8 +94,8 @@ class CompositeViewsIndexingRoutesSpec extends CompositeViewsRoutesFixtures {
super.beforeAll()
val error = new Exception("boom")
val rev = 1
val fail1 = FailedElem(EntityType("ACL"), myId, Some(projectRef), Instant.EPOCH, Offset.At(42L), error, rev)
val fail2 = FailedElem(EntityType("Schema"), myId, None, Instant.EPOCH, Offset.At(42L), error, rev)
val fail1 = FailedElem(EntityType("ACL"), myId, projectRef, Instant.EPOCH, Offset.At(42L), error, rev)
val fail2 = FailedElem(EntityType("Schema"), myId2, projectRef, Instant.EPOCH, Offset.At(43L), error, rev)
val saveFailedElems = projectionErrors.saveFailedElems(indexingView.metadata, List(fail1, fail2))

saveFailedElems.accepted
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class CompositeRestartStoreSuite extends NexusSuite with Doobie.Fixture with Doo
private val cr3 = PartialRebuild(viewRef, projection, Instant.EPOCH.plusSeconds(5L), Anonymous)

private def toElem(offset: Offset, restart: CompositeRestart) =
SuccessElem(entityType, restart.id, Some(restart.project), restart.instant, offset, restart, 1)
SuccessElem(entityType, restart.id, restart.project, restart.instant, offset, restart, 1)

test("Save composite restarts") {
for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class RemoteGraphStreamSuite extends NexusSuite {
private val id = iri"https://example.com/testresource"

private val project = ProjectRef.unsafe("org", "proj")
private val elem = SuccessElem(EntityType("test"), id, Some(project), Instant.EPOCH, Offset.Start, (), 1)
private val elem = SuccessElem(EntityType("test"), id, project, Instant.EPOCH, Offset.Start, (), 1)
private val nQuads = NQuads(contentOf("remote/resource.nq"), id)

test("Metadata should be filtered correctly") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ final class ElasticSearchViews private (
}

private def toIndexViewDef(elem: Elem.SuccessElem[ElasticSearchViewState]) =
elem.withProject(elem.value.project).traverse { v =>
elem.traverse { v =>
IndexingViewDef(v, defaultElasticsearchMapping, defaultElasticsearchSettings, prefix)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,8 @@ object ElasticSearchSink {
* @return
* a function that maps an elem to a documentId based on the project (if available), the elem id, and the revision
*/
private val eventDocumentId: Elem[_] => String = elem =>
elem.project match {
case Some(project) => s"$project/${elem.id}:${elem.rev}"
case None => s"${elem.id}/${elem.rev}"
}
private val eventDocumentId: Elem[_] => String =
elem => s"${elem.project}/${elem.id}:${elem.rev}"

/**
* Mark and update the elements according to the elasticsearch response
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,7 @@ object EventMetricsProjection {
init: IO[Unit]
): IO[EventMetricsProjection] = {

val source = Source { (offset: Offset) =>
metrics(offset).map(e => e.withProject(e.value.project))
}
val source = Source { (offset: Offset) => metrics(offset) }

val compiledProjection =
CompiledProjection.compile(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@
}
},
{
"id": "https://bluebrain.github.io/nexus/vocabulary/myid",
"id": "https://bluebrain.github.io/nexus/vocabulary/myid2",
"offset": {
"@type": "At",
"value": 42
"value": 43
},
"project": "myorg/myproject",
"_rev": 1,
"reason": {
"type": "UnexpectedError",
Expand Down
Loading

0 comments on commit 24f88bc

Please sign in to comment.