Skip to content

Commit

Permalink
Refactoring failed elem to allow adding further details, introduce th…
Browse files Browse the repository at this point in the history
…e projection to validate resources within a project (#5150)

* Refactoring failed elem to allow adding further details, use json to make it more structured

---------

Co-authored-by: Simon Dumas <simon.dumas@epfl.ch>
  • Loading branch information
imsdu and Simon Dumas authored Sep 25, 2024
1 parent 5daecf9 commit efab603
Show file tree
Hide file tree
Showing 41 changed files with 626 additions and 170 deletions.
6 changes: 6 additions & 0 deletions delta/app/src/main/resources/app.conf
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,12 @@ app {
schemas {
# the schemas event-log configuration
event-log = ${app.defaults.event-log}
cache {
# The max number of schemas in cache
max-size = 50
# The duration after an entry in the cache expires
expire-after = 5 minutes
}
}

# Type hierarchy configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ import ch.epfl.bluebrain.nexus.delta.sdk.model.metrics.ScopedEventMetricEncoder
import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext
import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ApiMappings
import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.{ResolverContextResolution, Resolvers}
import ch.epfl.bluebrain.nexus.delta.sdk.resources.FetchResource
import ch.epfl.bluebrain.nexus.delta.sdk.resources.{FetchResource, Resources, ValidateResource}
import ch.epfl.bluebrain.nexus.delta.sdk.schemas.Schemas.{SchemaDefinition, SchemaLog}
import ch.epfl.bluebrain.nexus.delta.sdk.schemas._
import ch.epfl.bluebrain.nexus.delta.sdk.schemas.job.{SchemaValidationCoordinator, SchemaValidationStream}
import ch.epfl.bluebrain.nexus.delta.sdk.schemas.model.{Schema, SchemaEvent}
import ch.epfl.bluebrain.nexus.delta.sdk.sse.SseEncoder
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Supervisor
import ch.epfl.bluebrain.nexus.delta.sourcing.{ScopedEventLog, Transactors}
import izumi.distage.model.definition.{Id, ModuleDef}

Expand All @@ -36,6 +38,8 @@ object SchemasModule extends ModuleDef {

implicit private val loader: ClasspathResourceLoader = ClasspathResourceLoader.withContext(getClass)

make[SchemasConfig].from { config: AppConfig => config.schemas }

make[ValidateShacl].fromEffect { (rcr: RemoteContextResolution @Id("aggregate")) => ValidateShacl(rcr) }

make[ValidateSchema].from { (validateShacl: ValidateShacl, api: JsonLdApi) => ValidateSchema(validateShacl)(api) }
Expand All @@ -44,8 +48,8 @@ object SchemasModule extends ModuleDef {
Schemas.definition(validateSchema, clock)
}

make[SchemaLog].from { (scopedDefinition: SchemaDefinition, config: AppConfig, xas: Transactors) =>
ScopedEventLog(scopedDefinition, config.schemas.eventLog, xas)
make[SchemaLog].from { (scopedDefinition: SchemaDefinition, config: SchemasConfig, xas: Transactors) =>
ScopedEventLog(scopedDefinition, config.eventLog, xas)
}

make[FetchSchema].from { (schemaLog: SchemaLog) =>
Expand Down Expand Up @@ -79,6 +83,22 @@ object SchemasModule extends ModuleDef {
SchemaImports(aclCheck, resolvers, fetchSchema, fetchResource)
}

make[SchemaValidationStream].fromEffect {
(resources: Resources, fetchSchema: FetchSchema, validateResource: ValidateResource, config: SchemasConfig) =>
FetchSchema.cached(fetchSchema, config.cache).map { cached =>
SchemaValidationStream(
resources.currentStates,
cached,
validateResource
)
}

}

make[SchemaValidationCoordinator].from { (supervisor: Supervisor, schemaValidationStream: SchemaValidationStream) =>
SchemaValidationCoordinator(supervisor, schemaValidationStream)
}

make[SchemasRoutes].from {
(
identities: Identities,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,8 @@ class SchemasRoutesSpec extends BaseRouteSpec with IOFromMap with CatsIOValues {
private val fetchContext = FetchContextDummy(List(project.value))
private val groupDirectives = DeltaSchemeDirectives(fetchContext)

private val config = SchemasConfig(eventLogConfig)

private val schemaDef = Schemas.definition(ValidateSchema(ValidateShacl(rcr).accepted), clock)
private lazy val schemaLog = ScopedEventLog(schemaDef, config.eventLog, xas)
private lazy val schemaLog = ScopedEventLog(schemaDef, eventLogConfig, xas)

private lazy val routes =
Route.seal(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,35 @@
"_total": 2,
"_results": [
{
"errorType": "java.lang.Exception",
"id": "https://bluebrain.github.io/nexus/vocabulary/myid",
"message": "boom",
"offset": {
"@type": "At",
"value": 42
},
"project": "org/proj",
"_rev": 1
"_rev": 1,
"reason": {
"type": "UnexpectedError",
"message": "boom",
"details": {
"exception" : "java.lang.Exception"
}
}
},
{
"errorType": "java.lang.Exception",
"id": "https://bluebrain.github.io/nexus/vocabulary/myid",
"message": "boom",
"offset": {
"@type": "At",
"value": 42
},
"_rev": 1
"_rev": 1,
"reason": {
"type": "UnexpectedError",
"message": "boom",
"details": {
"exception" : "java.lang.Exception"
}
}
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv
import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.AclAddress
import ch.epfl.bluebrain.nexus.delta.sdk.model.IdSegment
import ch.epfl.bluebrain.nexus.delta.sdk.model.IdSegment.{IriSegment, StringSegment}
import ch.epfl.bluebrain.nexus.delta.sdk.syntax._
import ch.epfl.bluebrain.nexus.delta.sdk.views.ViewRef
import ch.epfl.bluebrain.nexus.delta.sourcing.model.EntityType
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Anonymous
Expand Down Expand Up @@ -127,7 +128,6 @@ class BlazegraphViewsIndexingRoutesSpec extends BlazegraphViewRoutesFixtures {
}

"fail to restart offset from view without resources/write permission" in {

Delete(s"$viewEndpoint/offset") ~> routes ~> check {
response.shouldBeForbidden
}
Expand Down Expand Up @@ -155,7 +155,7 @@ class BlazegraphViewsIndexingRoutesSpec extends BlazegraphViewRoutesFixtures {
aclCheck.append(AclAddress.Root, Anonymous -> Set(permissions.write)).accepted
Get(s"$viewEndpoint/failures") ~> routes ~> check {
response.status shouldBe StatusCodes.OK
response.asJson shouldEqual jsonContentOf("routes/list-indexing-errors.json")
response.asJson.removeAllKeys("stacktrace") shouldEqual jsonContentOf("routes/list-indexing-errors.json")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ final class CompositeViews private (
/**
* Return all existing views for the given project in a finite stream
*/
def currentViews(project: ProjectRef): ElemStream[CompositeViewDef] =
def currentViews(project: ProjectRef): SuccessElemStream[CompositeViewDef] =
log.currentStates(Scope.Project(project)).map(toCompositeViewDef)

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ object CompositeViewsDeletionTask {

def apply(views: CompositeViews) =
new CompositeViewsDeletionTask(
project => views.currentViews(project).evalMapFilter(_.toIO),
project => views.currentViews(project).map(_.value),
(v: ActiveViewDef, subject: Subject) =>
views
.internalDeprecate(v.ref.viewId, v.ref.project, v.rev)(subject)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,35 @@
"_total": 2,
"_results": [
{
"errorType": "java.lang.Exception",
"id": "https://bluebrain.github.io/nexus/vocabulary/myid",
"message": "boom",
"offset": {
"@type": "At",
"value": 42
},
"project": "myorg/myproj",
"_rev": 1
"_rev": 1,
"reason": {
"type": "UnexpectedError",
"message": "boom",
"details": {
"exception" : "java.lang.Exception"
}
}
},
{
"errorType": "java.lang.Exception",
"id": "https://bluebrain.github.io/nexus/vocabulary/myid",
"message": "boom",
"offset": {
"@type": "At",
"value": 42
},
"_rev": 1
"_rev": 1,
"reason": {
"type": "UnexpectedError",
"message": "boom",
"details": {
"exception" : "java.lang.Exception"
}
}
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ class CompositeViewsIndexingRoutesSpec extends CompositeViewsRoutesFixtures {
"return failures as a listing" in {
Get(s"$viewEndpoint/failures") ~> asWriter ~> routes ~> check {
response.status shouldBe StatusCodes.OK
response.asJson shouldEqual jsonContentOf("routes/list-indexing-errors.json")
response.asJson.removeAllKeys("stacktrace") shouldEqual jsonContentOf("routes/list-indexing-errors.json")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@ import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchA
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient.BulkResponse.{MixedOutcomes, Success}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient.{BulkResponse, Refresh}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.{ElasticSearchAction, ElasticSearchClient, IndexLabel}
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.sdk.implicits._
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.FailedElem
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Sink
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{Elem, FailureReason}
import fs2.Chunk
import io.circe.{Json, JsonObject}
import shapeless.Typeable

import scala.concurrent.duration.FiniteDuration
import scala.util.control.NoStackTrace

/**
* Sink that pushes json documents into an Elasticsearch index
Expand Down Expand Up @@ -99,16 +99,9 @@ object ElasticSearchSink {
case element: FailedElem => element
case element =>
items.get(documentId(element)) match {
case None =>
element.failed(
BulkUpdateException(
JsonObject(
"reason" -> Json.fromString(s"${element.id} was not found in Elasticsearch response")
)
)
)
case None => element.failed(onMissingInResponse(element.id))
case Some(MixedOutcomes.Outcome.Success) => element.void
case Some(MixedOutcomes.Outcome.Error(json)) => element.failed(BulkUpdateException(json))
case Some(MixedOutcomes.Outcome.Error(json)) => element.failed(onIndexingFailure(element.id, json))
}
}
}
Expand Down Expand Up @@ -173,7 +166,15 @@ object ElasticSearchSink {
refresh
)

final case class BulkUpdateException(json: JsonObject)
extends Exception("Error updating elasticsearch: " + Json.fromJsonObject(json).noSpaces)
with NoStackTrace
private def onMissingInResponse(id: Iri) = FailureReason(
"MissingInResponse",
s"$id was not found in Elasticsearch response",
JsonObject.empty
)

private def onIndexingFailure(id: Iri, error: JsonObject) = FailureReason(
"IndexingFailure",
s"$id could not be indexed",
error
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,35 @@
"_total": 2,
"_results": [
{
"errorType": "java.lang.Exception",
"id": "https://bluebrain.github.io/nexus/vocabulary/myid",
"message": "boom",
"offset": {
"@type": "At",
"value": 42
},
"project": "myorg/myproject",
"_rev": 1
"_rev": 1,
"reason": {
"type": "UnexpectedError",
"message": "boom",
"details": {
"exception" : "java.lang.Exception"
}
}
},
{
"errorType": "java.lang.Exception",
"id": "https://bluebrain.github.io/nexus/vocabulary/myid",
"message": "boom",
"offset": {
"@type": "At",
"value": 42
},
"_rev": 1
"_rev": 1,
"reason": {
"type": "UnexpectedError",
"message": "boom",
"details": {
"exception" : "java.lang.Exception"
}
}
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ import akka.http.scaladsl.model.Uri.Query
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.ElasticSearchClientSetup
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient.Refresh
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.{IndexLabel, QueryBuilder}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.indexing.ElasticSearchSink.BulkUpdateException
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv
import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClientError
import ch.epfl.bluebrain.nexus.delta.sourcing.model.EntityType
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{DroppedElem, FailedElem, SuccessElem}
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.FailureReason
import ch.epfl.bluebrain.nexus.testkit.CirceLiteral
import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite
import fs2.Chunk
Expand Down Expand Up @@ -106,10 +106,17 @@ class ElasticSearchSinkSuite extends NexusSuite with ElasticSearchClientSetup.Fi
// The failed elem should be return intact
_ = assertEquals(Some(failed), result.headOption)
// The invalid one should hold the Elasticsearch error
_ = assert(
result.lift(1).flatMap(_.toThrowable).exists(_.isInstanceOf[BulkUpdateException]),
"We expect a 'BulkUpdateException' as an error here"
)
_ = result.lift(1) match {
case Some(f: FailedElem) =>
f.throwable match {
case reason: FailureReason =>
assertEquals(reason.`type`, "IndexingFailure")
val detailKeys = reason.details.asObject.map(_.keys.toSet)
assertEquals(detailKeys, Some(Set("type", "reason", "caused_by")))
case t => fail(s"An indexing failure was expected, got '$t'", t)
}
case other => fail(s"A failed elem was expected, got '$other'")
}
// The valid one should remain a success and hold a Unit value
_ = assert(result.lift(2).flatMap(_.toOption).contains(()))
_ <- client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.model.IdSegment.{IriSegment, StringSegm
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions.events
import ch.epfl.bluebrain.nexus.delta.sdk.projects.{FetchContext, FetchContextDummy}
import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.ResolverContextResolution
import ch.epfl.bluebrain.nexus.delta.sdk.syntax._
import ch.epfl.bluebrain.nexus.delta.sdk.views.{IndexingRev, ViewRef}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.EntityType
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Anonymous
Expand Down Expand Up @@ -193,7 +194,7 @@ class ElasticSearchIndexingRoutesSpec extends ElasticSearchViewsRoutesFixtures {
aclCheck.append(AclAddress.Root, Anonymous -> Set(esPermissions.write)).accepted
Get(s"$viewEndpoint/failures") ~> routes ~> check {
response.status shouldBe StatusCodes.OK
response.asJson shouldEqual jsonContentOf("routes/list-indexing-errors.json")
response.asJson.removeAllKeys("stacktrace") shouldEqual jsonContentOf("routes/list-indexing-errors.json")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ object StorageDeletionTask {
new StorageDeletionTask(project =>
storages
.currentStorages(project)
.evalMapFilter {
_.map(_.value).toIO
}
.map(_.value.value)
)
}
Loading

0 comments on commit efab603

Please sign in to comment.