Add schema validation job endpoints (#5155)
* Add schema validation job endpoints

---------

Co-authored-by: Simon Dumas <simon.dumas@epfl.ch>
imsdu and Simon Dumas authored Sep 27, 2024
1 parent efab603 commit 772ebfb
Showing 34 changed files with 535 additions and 76 deletions.
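For orientation before the diffs: the new SchemaJobRoutes file below exposes three endpoints under /v1/jobs/schemas/validation/{org}/{project} — POST to trigger a validation job, GET .../statistics to follow its progress, and GET .../errors to download the accumulated failures as NDJSON — guarded by the new schemas/run permission. The exact paths are exercised in SchemaJobRoutesSpec further down. The client-side sketch below is illustrative only and not part of the commit; the host, the org/project segments and the bearer token are placeholder assumptions.

// Illustrative client sketch (not part of this commit). Assumes a Delta instance at
// http://localhost:8080, a project "org/project" and a token for a caller holding
// the new schemas/run permission.
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.{Authorization, OAuth2BearerToken}

object SchemaValidationJobClient extends App {
  implicit val system: ActorSystem = ActorSystem("schema-job-client")
  import system.dispatcher

  private val base  = "http://localhost:8080/v1/jobs/schemas/validation/org/project"
  private val token = Authorization(OAuth2BearerToken("<token>"))

  // Trigger the validation job for the project (expects 202 Accepted).
  val start  = Http().singleRequest(HttpRequest(HttpMethods.POST, uri = base, headers = List(token)))
  // Read the job statistics (expects 200 OK with a statistics document).
  val stats  = Http().singleRequest(HttpRequest(HttpMethods.GET, uri = s"$base/statistics", headers = List(token)))
  // Download the validation errors as newline-delimited JSON.
  val errors = Http().singleRequest(HttpRequest(HttpMethods.GET, uri = s"$base/errors", headers = List(token)))

  start.zip(stats).zip(errors).foreach(_ => system.terminate())
}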
1 change: 1 addition & 0 deletions delta/app/src/main/resources/app.conf
@@ -135,6 +135,7 @@ app {
"views/query",
"views/write",
"schemas/write",
"schemas/run",
"files/write",
"storages/write",
"version/read",
@@ -56,20 +56,24 @@ class ElemRoutes(
concat(
(get & pathPrefix("continuous")) {
operationName(s"$prefixSegment/$project/elems/continuous") {
emit(sseElemStream.continuous(project, SelectFilter(types, tag.getOrElse(Latest)), offset))
emit(
sseElemStream.continuous(project, SelectFilter(None, types, tag.getOrElse(Latest)), offset)
)
}
},
(get & pathPrefix("currents")) {
operationName(s"$prefixSegment/$project/elems/currents") {
emit(sseElemStream.currents(project, SelectFilter(types, tag.getOrElse(Latest)), offset))
emit(sseElemStream.currents(project, SelectFilter(None, types, tag.getOrElse(Latest)), offset))
}
},
(get & pathPrefix("remaining")) {
operationName(s"$prefixSegment/$project/elems/remaining") {
emit(
sseElemStream.remaining(project, SelectFilter(types, tag.getOrElse(Latest)), offset).map {
r => r.getOrElse(RemainingElems(0L, Instant.EPOCH))
}
sseElemStream
.remaining(project, SelectFilter(None, types, tag.getOrElse(Latest)), offset)
.map { r =>
r.getOrElse(RemainingElems(0L, Instant.EPOCH))
}
)
}
},
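The ElemRoutes changes above, together with the SelectFilter.latestOfEntity(Resources.entityType) call in the new SchemaJobRoutes below, suggest that SelectFilter gained a leading optional parameter restricting the stream to a single entity type; its actual definition is not part of this excerpt. A rough stand-in, inferred from these call sites only:

// Inferred shape only — not the actual ch.epfl.bluebrain.nexus.delta.sourcing.query.SelectFilter.
object SelectFilterSketch {
  final case class SelectFilter(
      entityType: Option[String], // hypothetical: passed as None in ElemRoutes, set by latestOfEntity
      types: Set[String],         // resource types to keep in the stream
      tag: String                 // tag to read from, e.g. the latest one
  )
}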
@@ -0,0 +1,96 @@
package ch.epfl.bluebrain.nexus.delta.routes

import akka.http.scaladsl.model.{ContentTypes, StatusCodes}
import akka.http.scaladsl.server.Route
import akka.util.ByteString
import cats.effect.IO
import cats.implicits.catsSyntaxApplicativeError
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution
import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaDirectives._
import ch.epfl.bluebrain.nexus.delta.sdk.directives.{AuthDirectives, FileResponse}
import ch.epfl.bluebrain.nexus.delta.sdk.identities.Identities
import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions
import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext
import ch.epfl.bluebrain.nexus.delta.sdk.resources.Resources
import ch.epfl.bluebrain.nexus.delta.sdk.schemas.job.SchemaValidationCoordinator
import ch.epfl.bluebrain.nexus.delta.sdk.stream.StreamConverter
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.projections.{ProjectionErrors, Projections}
import ch.epfl.bluebrain.nexus.delta.sourcing.query.SelectFilter
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.utils.StreamingUtils

/**
* Routes to trigger and get results from a schema validation job
*/
class SchemaJobRoutes(
identities: Identities,
aclCheck: AclCheck,
fetchContext: FetchContext,
schemaValidationCoordinator: SchemaValidationCoordinator,
projections: Projections,
projectionsErrors: ProjectionErrors
)(implicit
baseUri: BaseUri,
cr: RemoteContextResolution,
ordering: JsonKeyOrdering
) extends AuthDirectives(identities, aclCheck) {

private def projectionName(project: ProjectRef) = SchemaValidationCoordinator.projectionMetadata(project).name

private def projectExists(project: ProjectRef) = fetchContext.onRead(project).void

private def streamValidationErrors(project: ProjectRef): IO[FileResponse] = {
IO.delay {
StreamConverter(
projectionsErrors
.failedElemEntries(projectionName(project), Offset.start)
.map(_.failedElemData)
.through(StreamingUtils.ndjson)
.map(ByteString(_))
)
}
}.map { s =>
FileResponse("validation.json", ContentTypes.`application/json`, None, s)
}

def routes: Route =
baseUriPrefix(baseUri.prefix) {
pathPrefix("jobs") {
extractCaller { implicit caller =>
pathPrefix("schemas") {
(pathPrefix("validation") & projectRef) { project =>
authorizeFor(project, Permissions.schemas.run).apply {
concat(
(post & pathEndOrSingleSlash) {
emit(
StatusCodes.Accepted,
projectExists(project) >> schemaValidationCoordinator.run(project).start.void
)
},
(pathPrefix("statistics") & get & pathEndOrSingleSlash) {
emit(
projectExists(project) >> projections
.statistics(
project,
SelectFilter.latestOfEntity(Resources.entityType),
projectionName(project)
)
)
},
(pathPrefix("errors") & get & pathEndOrSingleSlash) {
emit(
projectExists(project) >> streamValidationErrors(project).attemptNarrow[Nothing]
)
}
)
}
}
}
}
}
}
}
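The errors endpoint above streams each failed element as one JSON document per line, which is also the expected response format in SchemaJobRoutesSpec below. StreamingUtils.ndjson itself is not shown in this commit; the self-contained sketch that follows is a stand-in under that assumption, not the actual utility.

import cats.effect.{IO, IOApp}
import fs2.{Pipe, Stream}
import io.circe.Encoder
import io.circe.syntax._

object NdjsonSketch extends IOApp.Simple {

  // Simplified failure record for illustration; the real FailedElemData carries more fields.
  final case class FailedElemData(id: String, reason: String)

  implicit val failedElemEncoder: Encoder[FailedElemData] =
    Encoder.forProduct2("id", "reason")(f => (f.id, f.reason))

  // Stand-in for StreamingUtils.ndjson: render each element as a single JSON line.
  def ndjson[A: Encoder]: Pipe[IO, A, String] =
    _.map(a => a.asJson.noSpaces + "\n")

  private val failures = Stream(
    FailedElemData("https://bluebrain.github.io/nexus/vocabulary/myid", "ValidationFail"),
    FailedElemData("https://bluebrain.github.io/nexus/vocabulary/other", "ValidationFail")
  )

  // Prints two NDJSON lines, mirroring what the errors endpoint returns as a file download.
  def run: IO[Unit] =
    failures.covary[IO].through(ndjson[FailedElemData]).evalMap(line => IO.print(line)).compile.drain
}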
@@ -9,7 +9,7 @@ import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution}
import ch.epfl.bluebrain.nexus.delta.rdf.shacl.ValidateShacl
import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering
import ch.epfl.bluebrain.nexus.delta.routes.SchemasRoutes
import ch.epfl.bluebrain.nexus.delta.routes.{SchemaJobRoutes, SchemasRoutes}
import ch.epfl.bluebrain.nexus.delta.sdk.IndexingAction.AggregateIndexingAction
import ch.epfl.bluebrain.nexus.delta.sdk._
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
@@ -27,6 +27,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.schemas._
import ch.epfl.bluebrain.nexus.delta.sdk.schemas.job.{SchemaValidationCoordinator, SchemaValidationStream}
import ch.epfl.bluebrain.nexus.delta.sdk.schemas.model.{Schema, SchemaEvent}
import ch.epfl.bluebrain.nexus.delta.sdk.sse.SseEncoder
import ch.epfl.bluebrain.nexus.delta.sourcing.projections.{ProjectionErrors, Projections}
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Supervisor
import ch.epfl.bluebrain.nexus.delta.sourcing.{ScopedEventLog, Transactors}
import izumi.distage.model.definition.{Id, ModuleDef}
@@ -120,6 +121,32 @@ object SchemasModule extends ModuleDef {
)
}

make[SchemaJobRoutes].from {
(
identities: Identities,
aclCheck: AclCheck,
fetchContext: FetchContext,
schemaValidationCoordinator: SchemaValidationCoordinator,
projections: Projections,
projectionsErrors: ProjectionErrors,
baseUri: BaseUri,
cr: RemoteContextResolution @Id("aggregate"),
ordering: JsonKeyOrdering
) =>
new SchemaJobRoutes(
identities,
aclCheck,
fetchContext,
schemaValidationCoordinator,
projections,
projectionsErrors
)(
baseUri,
cr,
ordering
)
}

many[SseEncoder[_]].add { base: BaseUri => SchemaEvent.sseEncoder(base) }

many[ScopedEventMetricEncoder[_]].add { SchemaEvent.schemaEventMetricEncoder }
@@ -144,6 +171,10 @@ object SchemasModule extends ModuleDef {
PriorityRoute(pluginsMaxPriority + 8, route.routes, requiresStrictEntity = true)
}

many[PriorityRoute].add { (route: SchemaJobRoutes) =>
PriorityRoute(pluginsMaxPriority + 8, route.routes, requiresStrictEntity = true)
}

make[Schema.Shift].from { (schemas: Schemas, base: BaseUri) =>
Schema.shift(schemas)(base)
}
@@ -0,0 +1,160 @@
package ch.epfl.bluebrain.nexus.delta.routes

import akka.http.scaladsl.model.MediaRanges.`*/*`
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.headers.{Accept, OAuth2BearerToken}
import akka.http.scaladsl.server.Route
import cats.effect.{IO, Ref}
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclSimpleCheck
import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.AclAddress.Root
import ch.epfl.bluebrain.nexus.delta.sdk.generators.ProjectGen
import ch.epfl.bluebrain.nexus.delta.sdk.identities.IdentitiesDummy
import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions
import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContextDummy
import ch.epfl.bluebrain.nexus.delta.sdk.schemas.job.SchemaValidationCoordinator
import ch.epfl.bluebrain.nexus.delta.sdk.utils.BaseRouteSpec
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Anonymous, Authenticated, Group}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, ProjectRef}
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.projections.{ProjectionErrors, Projections}
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.FailedElem
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{FailureReason, ProjectionProgress}

import java.time.Instant

class SchemaJobRoutesSpec extends BaseRouteSpec {

private val caller = Caller(alice, Set(alice, Anonymous, Authenticated(realm), Group("group", realm)))

private val asAlice = addCredentials(OAuth2BearerToken("alice"))

private val project = ProjectGen.project("org", "project")
private val rev = 1
private val resourceId = nxv + "myid"

private val fetchContext = FetchContextDummy(List(project))

private val identities = IdentitiesDummy(caller)

private val aclCheck = AclSimpleCheck((alice, Root, Set(Permissions.schemas.run))).accepted

private lazy val projections = Projections(xas, queryConfig, clock)
private lazy val projectionErrors = ProjectionErrors(xas, queryConfig, clock)

private val progress = ProjectionProgress(Offset.at(15L), Instant.EPOCH, 9000L, 400L, 30L)

private val runTrigger = Ref.unsafe[IO, Boolean](false)

private val schemaValidationCoordinator = new SchemaValidationCoordinator {
override def run(project: ProjectRef): IO[Unit] = runTrigger.set(true).void
}

private lazy val routes = Route.seal(
new SchemaJobRoutes(
identities,
aclCheck,
fetchContext,
schemaValidationCoordinator,
projections,
projectionErrors
).routes
)

override def beforeAll(): Unit = {
super.beforeAll()
val projectionMetadata = SchemaValidationCoordinator.projectionMetadata(project.ref)

val reason = FailureReason("ValidationFail", json"""{ "details": "..." }""")
val fail1 = FailedElem(EntityType("ACL"), resourceId, Some(project.ref), Instant.EPOCH, Offset.At(42L), reason, rev)
val fail2 =
FailedElem(EntityType("Schema"), resourceId, Some(project.ref), Instant.EPOCH, Offset.At(42L), reason, rev)

(
projections.save(projectionMetadata, progress) >>
projectionErrors.saveFailedElems(projectionMetadata, List(fail1, fail2))
).accepted
}

"The schema validation job route" should {
"fail to start a validation job without permission" in {
Post("/v1/jobs/schemas/validation/org/project") ~> routes ~> check {
response.shouldBeForbidden
runTrigger.get.accepted shouldEqual false
}
}

"fail to start a validation job for an unknown project" in {
Post("/v1/jobs/schemas/validation/xxx/xxx") ~> asAlice ~> routes ~> check {
response.shouldFail(StatusCodes.NotFound, "ProjectNotFound")
runTrigger.get.accepted shouldEqual false
}
}

"start a validation job on the project with appropriate access" in {
Post("/v1/jobs/schemas/validation/org/project") ~> asAlice ~> routes ~> check {
response.status shouldEqual StatusCodes.Accepted
runTrigger.get.accepted shouldEqual true
}
}

"fail to get statistics on a validation job without permission" in {
Get("/v1/jobs/schemas/validation/org/project/statistics") ~> routes ~> check {
response.shouldBeForbidden
}
}

"fail to get statistics on a validation job for an unknown project" in {
Get("/v1/jobs/schemas/validation/xxx/xxx/statistics") ~> asAlice ~> routes ~> check {
response.shouldFail(StatusCodes.NotFound, "ProjectNotFound")
}
}

"get statistics on a validation job with appropriate access" in {
val expectedResponse =
json"""
{
"@context": "https://bluebrain.github.io/nexus/contexts/statistics.json",
"delayInSeconds" : 0,
"discardedEvents": 400,
"evaluatedEvents": 8570,
"failedEvents": 30,
"lastEventDateTime": "${Instant.EPOCH}",
"lastProcessedEventDateTime": "${Instant.EPOCH}",
"processedEvents": 9000,
"remainingEvents": 0,
"totalEvents": 9000
}"""

Get("/v1/jobs/schemas/validation/org/project/statistics") ~> asAlice ~> routes ~> check {
response.status shouldEqual StatusCodes.OK
response.asJson shouldEqual expectedResponse
}
}

"fail to download errors on a validation job without permission" in {
Get("/v1/jobs/schemas/validation/org/project/errors") ~> routes ~> check {
response.shouldBeForbidden
}
}

"fail to download errors on a validation job for an unknown project" in {
Get("/v1/jobs/schemas/validation/xxx/xxx/errors") ~> asAlice ~> routes ~> check {
response.shouldFail(StatusCodes.NotFound, "ProjectNotFound")
}
}

"download errors on a validation job with appropriate access" in {
val expectedResponse =
s"""{"id":"$resourceId","project":"${project.ref}","offset":{"value":42,"@type":"At"},"rev":1,"reason":{"type":"ValidationFail","value":{"details":"..."}}}
|{"id":"$resourceId","project":"${project.ref}","offset":{"value":42,"@type":"At"},"rev":1,"reason":{"type":"ValidationFail","value":{"details":"..."}}}
|""".stripMargin

Get("/v1/jobs/schemas/validation/org/project/errors") ~> Accept(`*/*`) ~> asAlice ~> routes ~> check {
response.status shouldEqual StatusCodes.OK
response.asString shouldEqual expectedResponse
}
}
}
}
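A note on the expected statistics document in the test above: it is internally consistent with the saved ProjectionProgress(Offset.at(15L), Instant.EPOCH, 9000L, 400L, 30L), assuming — from the JSON itself, not from the Projections implementation — that totalEvents equals processedEvents when nothing remains to process. The object below is an illustrative consistency check, not part of the commit.

// Consistency check for the expected statistics above (illustrative only).
object StatisticsArithmetic extends App {
  val processed = 9000L     // ProjectionProgress processed count
  val discarded = 400L      // ProjectionProgress discarded count
  val failed    = 30L       // ProjectionProgress failed count
  val total     = processed // assumption: no remaining elements, so totalEvents == processedEvents

  val evaluated = processed - discarded - failed // 9000 - 400 - 30 = 8570 evaluatedEvents
  val remaining = total - processed              // 9000 - 9000 = 0 remainingEvents

  assert(evaluated == 8570L && remaining == 0L)
  println(s"evaluatedEvents = $evaluated, remainingEvents = $remaining")
}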
@@ -78,7 +78,7 @@ class ArchiveRoutes(

private def emitArchiveFile(source: IO[AkkaSource]) = {
val response = source.map { s =>
FileResponse(s"archive.zip", Zip.contentType, 0L, s)
FileResponse(s"archive.zip", Zip.contentType, None, s)
}
emit(response.attemptNarrow[ArchiveRejection])
}
@@ -123,11 +123,21 @@ class ArchiveDownloadSpec
val fetchFileContent: (Iri, ProjectRef) => IO[FileResponse] = {
case (`id1`, `projectRef`) =>
IO.pure(
FileResponse(file1Name, ContentTypes.`text/plain(UTF-8)`, file1Size, Source.single(ByteString(file1Content)))
FileResponse(
file1Name,
ContentTypes.`text/plain(UTF-8)`,
Some(file1Size),
Source.single(ByteString(file1Content))
)
)
case (`id2`, `projectRef`) =>
IO.pure(
FileResponse(file2Name, ContentTypes.`text/plain(UTF-8)`, file2Size, Source.single(ByteString(file2Content)))
FileResponse(
file2Name,
ContentTypes.`text/plain(UTF-8)`,
Some(file2Size),
Source.single(ByteString(file2Content))
)
)
case (id, ref) =>
IO.raiseError(FileNotFound(id, ref))
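The two archive hunks above follow a signature change that is only implied by this excerpt: the content-length argument of FileResponse moves from a plain Long (0L, file1Size) to an Option[Long] (None, Some(file1Size)). A rough stand-in for the shape, inferred from these call sites only:

// Inferred shape only — not the actual ch.epfl.bluebrain.nexus.delta.sdk.directives.FileResponse.
import akka.http.scaladsl.model.ContentType
import akka.stream.scaladsl.Source
import akka.util.ByteString

object FileResponseSketch {
  final case class FileResponse(
      filename: String,                // e.g. "archive.zip" or "validation.json"
      contentType: ContentType,        // e.g. Zip.contentType or ContentTypes.`application/json`
      contentLength: Option[Long],     // previously a plain Long such as 0L; now optional
      content: Source[ByteString, Any] // the streamed payload
  )
}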