Skip to content

Commit

Permalink
Merge branch 'master' into refactor-s3-storage-access
Browse files Browse the repository at this point in the history
  • Loading branch information
dantb authored Mar 28, 2024
2 parents 69e7896 + aef5755 commit af4829f
Show file tree
Hide file tree
Showing 15 changed files with 194 additions and 84 deletions.
7 changes: 5 additions & 2 deletions .github/workflows/ci-delta-ship.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ jobs:
- name: Clean, build Delta & Storage images
run: |
sbt -Dsbt.color=always -Dsbt.supershell=false \
clean \
app/Docker/publishLocal
- name: Start services
run: docker-compose -f tests/docker/docker-compose.yml up -d
Expand All @@ -46,4 +45,8 @@ jobs:
- name: Unit tests
run: |
sbt -Dsbt.color=always -Dsbt.supershell=false \
ship-unit-tests
"ship/testOnly *Suite"
- name: Integration tests
run: |
sbt -Dsbt.color=always -Dsbt.supershell=false \
"ship/testOnly *Spec"
14 changes: 8 additions & 6 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -743,16 +743,18 @@ lazy val delta = project
lazy val ship = project
.in(file("ship"))
.settings(
name := "nexus-ship",
moduleName := "nexus-ship"
name := "nexus-ship",
moduleName := "nexus-ship",
Test / parallelExecution := false
)
.enablePlugins(UniversalPlugin, JavaAppPackaging, JavaAgent, DockerPlugin, BuildInfoPlugin)
.settings(shared, compilation, servicePackaging, assertJavaVersion, kamonSettings, coverage, release)
.dependsOn(
sdk % "compile->compile;test->test",
blazegraphPlugin % "compile->compile",
elasticsearchPlugin % "compile->compile",
tests % "test->compile;test->test"
sdk % "compile->compile;test->test",
blazegraphPlugin % "compile->compile",
compositeViewsPlugin % "compile->compile",
elasticsearchPlugin % "compile->compile",
tests % "test->compile;test->test"
)
.settings(
libraryDependencies ++= Seq(declineEffect),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import ch.epfl.bluebrain.nexus.delta.kernel.kamon.KamonMetricComponent
import ch.epfl.bluebrain.nexus.delta.kernel.search.Pagination.FromPagination
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.CompositeViews._
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.config.CompositeViewsConfig
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.CompositeViewDef
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.CompositeViewDef.{ActiveViewDef, DeprecatedViewDef}
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewCommand._
Expand All @@ -27,13 +26,16 @@ import ch.epfl.bluebrain.nexus.delta.sdk.projects.{FetchContext, Projects}
import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.ResolverContextResolution
import ch.epfl.bluebrain.nexus.delta.sdk.views.IndexingRev
import ch.epfl.bluebrain.nexus.delta.sourcing._
import ch.epfl.bluebrain.nexus.delta.sourcing.config.EventLogConfig
import ch.epfl.bluebrain.nexus.delta.sourcing.model.EntityDependency.DependsOn
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject
import ch.epfl.bluebrain.nexus.delta.sourcing.model._
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem
import io.circe.Json

import scala.concurrent.duration.FiniteDuration

/**
* Composite views resource lifecycle operations.
*/
Expand Down Expand Up @@ -467,7 +469,8 @@ object CompositeViews {
fetchContext: FetchContext,
contextResolution: ResolverContextResolution,
validate: ValidateCompositeView,
config: CompositeViewsConfig,
minIntervalRebuild: FiniteDuration,
eventLogConfig: EventLogConfig,
xas: Transactors,
clock: Clock[IO]
)(implicit
Expand All @@ -476,13 +479,13 @@ object CompositeViews {
): IO[CompositeViews] =
IO
.delay(
CompositeViewFieldsJsonLdSourceDecoder(uuidF, contextResolution, config.minIntervalRebuild)
CompositeViewFieldsJsonLdSourceDecoder(uuidF, contextResolution, minIntervalRebuild)
)
.map { sourceDecoder =>
new CompositeViews(
ScopedEventLog(
definition(validate, clock),
config.eventLog,
eventLogConfig,
xas
),
fetchContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ class CompositeViewsPluginModule(priority: Int) extends ModuleDef {
fetchContext,
contextResolution,
validate,
config,
config.minIntervalRebuild,
config.eventLog,
xas,
clock
)(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import cats.data.NonEmptyList
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeView.RebuildStrategy
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.JsonLdContext.keywords
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.decoder.JsonLdDecoder
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.decoder.{Configuration, JsonLdDecoder}
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.decoder.JsonLdDecoderError.ParsingFailure
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.decoder.configuration.semiauto.deriveConfigJsonLdDecoder
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.decoder.semiauto.deriveDefaultJsonLdDecoder
import ch.epfl.bluebrain.nexus.delta.rdf.syntax.iriStringContextSyntax
import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import io.circe.syntax.EncoderOps
import io.circe.{Encoder, Json}
Expand Down Expand Up @@ -73,6 +75,12 @@ object CompositeViewFields {
}
deriveDefaultJsonLdDecoder[RebuildStrategy]
}
deriveDefaultJsonLdDecoder[CompositeViewFields]

val ctx = Configuration.default.context
.addAliasIdType("description", iri"http://schema.org/description")
.addAliasIdType("name", iri"http://schema.org/name")
implicit val config = Configuration.default.copy(context = ctx)

deriveConfigJsonLdDecoder[CompositeViewFields]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ class CompositeViewsSpec
fetchContext,
ResolverContextResolution(rcr),
alwaysValidate,
config,
config.minIntervalRebuild,
config.eventLog,
xas,
clock
).accepted
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ class CompositeViewsRoutesSpec extends CompositeViewsRoutesFixtures {
fetchContext,
ResolverContextResolution(rcr),
alwaysValidate,
config,
config.minIntervalRebuild,
config.eventLog,
xas,
clock
).accepted
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ class SearchScopeInitializationSpec
fetchContext,
ResolverContextResolution(rcr),
alwaysValidate,
config,
config.minIntervalRebuild,
config.eventLog,
xas,
clock
).accepted
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,34 @@ import ch.epfl.bluebrain.nexus.ship.resolvers.ResolverWiring

import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.{contexts => esContexts}
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.{contexts => bgContexts}
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.{contexts => compositeViewContexts}

object ContextWiring {

implicit private val loader: ClasspathResourceLoader = ClasspathResourceLoader.withContext(getClass)

def remoteContextResolution: IO[RemoteContextResolution] =
for {
metadataCtx <- ContextValue.fromFile("contexts/metadata.json")
pipelineCtx <- ContextValue.fromFile("contexts/pipeline.json")
shaclCtx <- ContextValue.fromFile("contexts/shacl.json")
schemasMetaCtx <- ContextValue.fromFile("contexts/schemas-metadata.json")
elasticsearchCtx <- ContextValue.fromFile("contexts/elasticsearch.json")
blazegraphCtx <- ContextValue.fromFile("contexts/sparql.json")
compositeCtx <- ContextValue.fromFile("contexts/composite-views.json")
} yield RemoteContextResolution.fixed(
// Delta
contexts.pipeline -> pipelineCtx,
contexts.metadata -> metadataCtx,
contexts.pipeline -> pipelineCtx,
// Schema
contexts.shacl -> shaclCtx,
contexts.schemasMetadata -> schemasMetaCtx,
contexts.shacl -> shaclCtx,
contexts.schemasMetadata -> schemasMetaCtx,
// ElasticSearch
esContexts.elasticsearch -> elasticsearchCtx,
esContexts.elasticsearch -> elasticsearchCtx,
// Blazegraph
bgContexts.blazegraph -> blazegraphCtx
bgContexts.blazegraph -> blazegraphCtx,
// Composite views
compositeViewContexts.compositeViews -> compositeCtx
)

def resolverContextResolution(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import ch.epfl.bluebrain.nexus.ship.projects.ProjectProcessor
import ch.epfl.bluebrain.nexus.ship.resolvers.ResolverProcessor
import ch.epfl.bluebrain.nexus.ship.resources.{ResourceProcessor, ResourceWiring}
import ch.epfl.bluebrain.nexus.ship.schemas.{SchemaProcessor, SchemaWiring}
import ch.epfl.bluebrain.nexus.ship.views.{BlazegraphViewProcessor, ElasticSearchViewProcessor}
import ch.epfl.bluebrain.nexus.ship.views.{BlazegraphViewProcessor, CompositeViewProcessor, ElasticSearchViewProcessor}
import fs2.Stream
import fs2.io.file.{Files, Path}
import io.circe.parser.decode
Expand Down Expand Up @@ -66,6 +66,7 @@ class RunShip {
resourceProcessor = ResourceProcessor(resourceLog, fetchContext, eventClock)
esViewsProcessor <- ElasticSearchViewProcessor(fetchContext, rcr, eventLogConfig, eventClock, xas)
bgViewsProcessor = BlazegraphViewProcessor(fetchContext, rcr, eventLogConfig, eventClock, xas)
compositeViewsProcessor = CompositeViewProcessor(fetchContext, rcr, eventLogConfig, eventClock, xas)
report <- EventProcessor
.run(
events,
Expand All @@ -74,7 +75,8 @@ class RunShip {
schemaProcessor,
resourceProcessor,
esViewsProcessor,
bgViewsProcessor
bgViewsProcessor,
compositeViewsProcessor
)
} yield report
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ object ShipConfig {
def merge(externalConfigPath: Option[Path]): IO[(ShipConfig, Config)] =
for {
externalConfig <- Configs.parseFile(externalConfigPath.map(_.toNioPath.toFile))
defaultConfig <- Configs.parseResource("default.conf")
defaultConfig <- Configs.parseResource("ship-default.conf")
result <- Configs.merge[ShipConfig]("ship", externalConfig, defaultConfig)
} yield result

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package ch.epfl.bluebrain.nexus.ship.views

import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewEvent._
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewRejection.{IncorrectRev, ResourceAlreadyExists}
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.{CompositeViewEvent, CompositeViewValue}
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.{CompositeViews, ValidateCompositeView}
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi
import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller
import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext
import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.ResolverContextResolution
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.config.EventLogConfig
import ch.epfl.bluebrain.nexus.delta.sourcing.model.EntityType
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject
import ch.epfl.bluebrain.nexus.ship.views.CompositeViewProcessor.logger
import ch.epfl.bluebrain.nexus.ship.{EventClock, EventProcessor, ImportStatus}
import io.circe.Decoder

import java.util.UUID
import scala.concurrent.duration.DurationInt

class CompositeViewProcessor(views: UUID => IO[CompositeViews], clock: EventClock)
extends EventProcessor[CompositeViewEvent] {
override def resourceType: EntityType = CompositeViews.entityType

override def decoder: Decoder[CompositeViewEvent] = CompositeViewEvent.serializer.codec

override def evaluate(event: CompositeViewEvent): IO[ImportStatus] =
for {
_ <- clock.setInstant(event.instant)
result <- evaluateInternal(event)
} yield result

private def evaluateInternal(event: CompositeViewEvent): IO[ImportStatus] = {
implicit val s: Subject = event.subject
implicit val c: Caller = Caller(s, Set.empty)
val cRev = event.rev - 1

event match {
case e: CompositeViewCreated => views(event.uuid).flatMap(_.create(e.project, e.source))
case e: CompositeViewUpdated => views(event.uuid).flatMap(_.update(e.id, e.project, cRev, e.source))
case e: CompositeViewDeprecated => views(event.uuid).flatMap(_.deprecate(e.id, e.project, cRev))
case e: CompositeViewUndeprecated => views(event.uuid).flatMap(_.undeprecate(e.id, e.project, cRev))
case _: CompositeViewTagAdded => IO.unit // TODO: Can/should we tag?
}
}.redeemWith(
{
case a: ResourceAlreadyExists => logger.warn(a)("The resource already exists").as(ImportStatus.Dropped)
case i: IncorrectRev => logger.warn(i)("An incorrect revision has been provided").as(ImportStatus.Dropped)
case other => IO.raiseError(other)
},
_ => IO.pure(ImportStatus.Success)
)
}

object CompositeViewProcessor {

private val logger = Logger[CompositeViewProcessor]

def apply(
fetchContext: FetchContext,
rcr: ResolverContextResolution,
config: EventLogConfig,
clock: EventClock,
xas: Transactors
)(implicit
jsonLdApi: JsonLdApi
): CompositeViewProcessor = {
val noValidation = new ValidateCompositeView {
override def apply(uuid: UUID, value: CompositeViewValue): IO[Unit] = IO.unit
}

val views = (uuid: UUID) =>
CompositeViews(
fetchContext,
rcr,
noValidation,
3.seconds,
config,
xas,
clock
)(jsonLdApi, UUIDF.fixed(uuid))

new CompositeViewProcessor(views, clock)

}

}
55 changes: 0 additions & 55 deletions ship/src/test/resources/default.conf

This file was deleted.

Loading

0 comments on commit af4829f

Please sign in to comment.