Skip to content

Commit

Permalink
Apply indexing rev rules to composite view indexing (#4168)
Browse files Browse the repository at this point in the history
* Apply indexing rev rules to composite view indexing

---------

Co-authored-by: Simon Dumas <simon.dumas@epfl.ch>
  • Loading branch information
imsdu and Simon Dumas authored Aug 18, 2023
1 parent a6921e1 commit fea3c31
Show file tree
Hide file tree
Showing 24 changed files with 874 additions and 224 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.BlazegraphClient
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.client.DeltaClient
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.config.CompositeViewsConfig
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.deletion.CompositeViewsDeletionTask
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.{CompositeProjectionLifeCycle, CompositeSpaces, CompositeViewsCoordinator, MetadataPredicates}
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.{CompositeProjectionLifeCycle, CompositeSinks, CompositeSpaces, CompositeViewsCoordinator, MetadataPredicates}
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.migration.MigrateCompositeViews
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewRejection.ProjectContextRejection
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model._
Expand Down Expand Up @@ -157,15 +157,24 @@ class CompositeViewsPluginModule(priority: Int) extends ModuleDef {
.as(compositeProjections)
}

make[CompositeSpaces.Builder].from {
make[CompositeSpaces].from {
(
esClient: ElasticSearchClient,
blazeClient: BlazegraphClient @Id("blazegraph-composite-indexing-client"),
cfg: CompositeViewsConfig
) =>
CompositeSpaces(cfg.prefix, esClient, blazeClient)
}

make[CompositeSinks].from {
(
esClient: ElasticSearchClient,
blazeClient: BlazegraphClient @Id("blazegraph-composite-indexing-client"),
cfg: CompositeViewsConfig,
baseUri: BaseUri,
cr: RemoteContextResolution @Id("aggregate")
) =>
CompositeSpaces.Builder(cfg.prefix, esClient, blazeClient, cfg)(
CompositeSinks(cfg.prefix, esClient, blazeClient, cfg)(
baseUri,
cr
)
Expand Down Expand Up @@ -198,14 +207,16 @@ class CompositeViewsPluginModule(priority: Int) extends ModuleDef {
hooks: Set[CompositeProjectionLifeCycle.Hook],
registry: ReferenceRegistry,
graphStream: CompositeGraphStream,
buildSpaces: CompositeSpaces.Builder,
spaces: CompositeSpaces,
sinks: CompositeSinks,
compositeProjections: CompositeProjections
) =>
CompositeProjectionLifeCycle(
hooks,
PipeChain.compile(_, registry),
graphStream,
buildSpaces.apply,
spaces,
sinks,
compositeProjections
)
}
Expand Down Expand Up @@ -301,7 +312,7 @@ class CompositeViewsPluginModule(priority: Int) extends ModuleDef {
aclCheck,
views.fetchIndexingView,
views.expand,
CompositeIndexingDetails(projections, graphStream),
CompositeIndexingDetails(projections, graphStream, config.prefix),
projections,
projectionErrors,
schemeDirectives
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing

import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.CompositeViewDef.ActiveViewDef
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.CompositeViewDef.{ActiveViewDef, DeprecatedViewDef}
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewProjection
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.projections.CompositeProjections
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.stream.CompositeGraphStream
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ExecutionStrategy.TransientSingleNode
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{CompiledProjection, PipeChain}
import monix.bio.Task
import monix.bio.{IO, Task}

/**
* Handle the different life stages of a composite view projection
Expand All @@ -23,13 +26,15 @@ trait CompositeProjectionLifeCycle {
def build(view: ActiveViewDef): Task[CompiledProjection]

/**
* Destroy the projection related to the view
* Destroy the projection related to the view if changes related to indexing need to be applied
*/
def destroy(view: ActiveViewDef): Task[Unit]
def destroyOnIndexingChange(prev: ActiveViewDef, next: CompositeViewDef): Task[Unit]
}

object CompositeProjectionLifeCycle {

private val logger: Logger = Logger[CompositeProjectionLifeCycle]

/**
* Hook that allows to capture changes to apply before starting the indexing of a composite view
*/
Expand All @@ -50,28 +55,36 @@ object CompositeProjectionLifeCycle {
hooks: Set[Hook],
compilePipeChain: PipeChain.Compile,
graphStream: CompositeGraphStream,
buildSpaces: ActiveViewDef => CompositeSpaces,
spaces: CompositeSpaces,
sink: CompositeSinks,
compositeProjections: CompositeProjections
): CompositeProjectionLifeCycle = {
def init(view: ActiveViewDef): Task[Unit] = buildSpaces(view).init
def init(view: ActiveViewDef): Task[Unit] = spaces.init(view)

def index(view: ActiveViewDef): Task[CompiledProjection] =
CompositeViewDef.compile(view, buildSpaces(view), compilePipeChain, graphStream, compositeProjections)
CompositeViewDef.compile(view, sink, compilePipeChain, graphStream, compositeProjections)

def destroy(view: ActiveViewDef): Task[Unit] =
def destroyAll(view: ActiveViewDef): Task[Unit] =
for {
_ <- buildSpaces(view).destroy
_ <- spaces.destroyAll(view)
_ <- compositeProjections.deleteAll(view.indexingRef)
} yield ()

apply(hooks, init, index, destroy)
def destroyProjection(view: ActiveViewDef, projection: CompositeViewProjection): Task[Unit] =
for {
_ <- spaces.destroyProjection(view, projection)
_ <- compositeProjections.partialRebuild(view.ref, projection.id)
} yield ()

apply(hooks, init, index, destroyAll, destroyProjection)
}

private[indexing] def apply(
hooks: Set[Hook],
onInit: ActiveViewDef => Task[Unit],
index: ActiveViewDef => Task[CompiledProjection],
onDestroy: ActiveViewDef => Task[Unit]
destroyAll: ActiveViewDef => Task[Unit],
destroyProjection: (ActiveViewDef, CompositeViewProjection) => Task[Unit]
): CompositeProjectionLifeCycle = new CompositeProjectionLifeCycle {

override def init(view: ActiveViewDef): Task[Unit] = onInit(view)
Expand All @@ -93,7 +106,35 @@ object CompositeProjectionLifeCycle {
}
}

override def destroy(view: ActiveViewDef): Task[Unit] = onDestroy(view)
override def destroyOnIndexingChange(prev: ActiveViewDef, next: CompositeViewDef): Task[Unit] =
(prev, next) match {
case (prev, next) if prev.ref != next.ref =>
IO.terminate(new IllegalArgumentException(s"Different views were provided: '${prev.ref}' and '${next.ref}'"))
case (prev, _: DeprecatedViewDef) =>
logger.info(s"View '${prev.ref}' has been deprecated, cleaning up the current one.") >> destroyAll(prev)
case (prev, nextActive: ActiveViewDef) if prev.indexingRev != nextActive.indexingRev =>
logger.info(s"View '${prev.ref}' sources have changed, cleaning up the current one..") >> destroyAll(prev)
case (prev, nextActive: ActiveViewDef) =>
checkProjections(prev, nextActive)
}

private def checkProjections(prev: ActiveViewDef, nextActive: ActiveViewDef) =
prev.projections.nonEmptyTraverse { prevProjection =>
nextActive.projection(prevProjection.id) match {
case Right(nextProjection) =>
Task.when(prevProjection.indexingRev != nextProjection.indexingRev)(
logger.info(
s"Projection ${prevProjection.id} of view '${prev.ref}' has changed, cleaning up the current one.."
) >>
destroyProjection(prev, prevProjection)
)
case Left(_) =>
logger.info(
s"Projection ${prevProjection.id} of view '${prev.ref}' was removed, cleaning up the current one.."
) >>
destroyProjection(prev, prevProjection)
}
}.void
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing

import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.BlazegraphClient
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.indexing.BlazegraphSink
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.CompositeSink
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.config.CompositeViewsConfig
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.CompositeViewDef.ActiveViewDef
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewProjection
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewProjection.{ElasticSearchProjection, SparqlProjection}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution
import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Sink

/**
* Defines the sinks for the indexing progress for a composite view
*/
trait CompositeSinks {

/**
* The sink for the current namespace
*/
def commonSink(view: ActiveViewDef): Sink

/**
* The sink for a given projection
*/
def projectionSink(view: ActiveViewDef, target: CompositeViewProjection): Sink
}

object CompositeSinks {

def apply(
prefix: String,
esClient: ElasticSearchClient,
blazeClient: BlazegraphClient,
cfg: CompositeViewsConfig
)(implicit base: BaseUri, rcr: RemoteContextResolution): CompositeSinks = new CompositeSinks {

/**
* The sink for the current namespace
*/
override def commonSink(view: ActiveViewDef): Sink = {
val common = commonNamespace(view.uuid, view.indexingRev, prefix)
BlazegraphSink(blazeClient, cfg.blazegraphBatch, common)
}

/**
* The sink for a given projection
*/
override def projectionSink(view: ActiveViewDef, target: CompositeViewProjection): Sink = {
val common = commonNamespace(view.uuid, view.indexingRev, prefix)
target match {
case e: ElasticSearchProjection =>
val index = projectionIndex(e, view.uuid, prefix)
CompositeSink.elasticSink(blazeClient, esClient, index, common, cfg).apply(e)
case s: SparqlProjection =>
val namespace = projectionNamespace(s, view.uuid, prefix)
CompositeSink.blazeSink(blazeClient, namespace, common, cfg).apply(s)
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,98 +1,77 @@
package ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing

import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.BlazegraphClient
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.indexing.BlazegraphSink
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.CompositeSink
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.config.CompositeViewsConfig
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.CompositeViewDef.ActiveViewDef
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewProjection
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewProjection.{ElasticSearchProjection, SparqlProjection}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.{ElasticSearchClient, IndexLabel}
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution
import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Sink
import com.typesafe.scalalogging.Logger
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient
import monix.bio.Task

/**
* Defines the pipes, sinks for the indexing progress as well as the init and destroy tasks for a composite view
* @param init
* the task to create the different namespaces and indices
* @param destroy
* the task to destroy the different namespaces and indices
* @param commonSink
* the sink for the common sparql space
* @param targetSink
* the function to create a sink for a [[CompositeViewProjection]]
* Defines the operations to create and destroy the namespaces of a composite view
*/
final case class CompositeSpaces(
init: Task[Unit],
destroy: Task[Unit],
commonSink: Sink,
targetSink: CompositeViewProjection => Sink
)
trait CompositeSpaces {

object CompositeSpaces {

private val logger: Logger = Logger[CompositeSpaces]
/**
* Creates all spaces for the given view
*/
def init(view: ActiveViewDef): Task[Unit]

trait Builder {
/**
* Destroys all spaces for the given view
*/
def destroyAll(view: ActiveViewDef): Task[Unit]

/**
* Compute the spaces for the given view
* @param view
* the active view
*/
def apply(view: ActiveViewDef): CompositeSpaces
}
/**
* Destroys space for the projection of the given view
*/
def destroyProjection(view: ActiveViewDef, projection: CompositeViewProjection): Task[Unit]

object Builder {
def apply(
prefix: String,
esClient: ElasticSearchClient,
blazeClient: BlazegraphClient,
cfg: CompositeViewsConfig
)(implicit base: BaseUri, rcr: RemoteContextResolution): CompositeSpaces.Builder = (view: ActiveViewDef) => {
}

// Operations and sinks for the common space
val common = commonNamespace(view.uuid, view.indexingRev, prefix)
val commonSink = BlazegraphSink(blazeClient, cfg.blazegraphBatch, common)
val createCommon = blazeClient.createNamespace(common)
val deleteCommon = blazeClient.deleteNamespace(common)
object CompositeSpaces {

// Create sinks
def createBlazeSink(namespace: String): SparqlProjection => Sink =
CompositeSink.blazeSink(blazeClient, namespace, common, cfg)
def createEsSink(index: IndexLabel): ElasticSearchProjection => Sink =
CompositeSink.elasticSink(blazeClient, esClient, index, common, cfg)
private val logger: Logger = Logger[CompositeSpaces]

// Compute the init and destroy operations as well as the sink for the different projections of the composite views
val start: (Task[Unit], Task[Unit], Map[Iri, Sink]) = (createCommon.void, deleteCommon.void, Map.empty[Iri, Sink])
val (init, destroy, sinkMap) = view.value.projections.foldLeft(start) {
case ((create, delete, sinkMap), p: ElasticSearchProjection) =>
val index = projectionIndex(p, view.uuid, prefix)
(
create >> esClient.createIndex(index, Some(p.mapping), p.settings).void,
delete >> esClient.deleteIndex(index).void,
sinkMap.updated(p.id, createEsSink(index)(p))
)
case ((create, delete, sinkMap), s: SparqlProjection) =>
def apply(
prefix: String,
esClient: ElasticSearchClient,
blazeClient: BlazegraphClient
): CompositeSpaces = new CompositeSpaces {
override def init(view: ActiveViewDef): Task[Unit] = {
val common = commonNamespace(view.uuid, view.indexingRev, prefix)
val createCommon = blazeClient.createNamespace(common).void
val result = view.value.projections.foldLeft[Task[Unit]](createCommon) {
case (acc, e: ElasticSearchProjection) =>
val index = projectionIndex(e, view.uuid, prefix)
acc >> esClient.createIndex(index, Some(e.mapping), e.settings).void
case (acc, s: SparqlProjection) =>
val namespace = projectionNamespace(s, view.uuid, prefix)
(
create >> blazeClient.createNamespace(namespace).void,
delete >> blazeClient.deleteNamespace(namespace).void,
sinkMap.updated(s.id, createBlazeSink(namespace)(s))
)
acc >> blazeClient.createNamespace(namespace).void
}
logger.debug(s"Creating namespaces and indices for composite view ${view.ref}") >> result
}

CompositeSpaces(
Task.delay(logger.debug("Creating namespaces and indices for composite view {}", view.ref)) >> init,
Task.delay(logger.debug("Deleting namespaces and indices for composite view {}", view.ref)) >> destroy,
commonSink,
(p: CompositeViewProjection) => sinkMap(p.id)
)
override def destroyAll(view: ActiveViewDef): Task[Unit] = {
val common = commonNamespace(view.uuid, view.indexingRev, prefix)
val deleteCommon = blazeClient.deleteNamespace(common).void
val result = view.value.projections.foldLeft[Task[Unit]](deleteCommon) { case (acc, p) =>
acc >> destroyProjection(view, p)
}
logger.debug(s"Deleting namespaces and indices for composite view ${view.ref}") >> result
}

override def destroyProjection(view: ActiveViewDef, projection: CompositeViewProjection): Task[Unit] =
logger.debug(s"Deleting namespace/index for projection ${projection.id} of composite view ${view.ref}") >> {
projection match {
case e: ElasticSearchProjection =>
val index = projectionIndex(e, view.uuid, prefix)
esClient.deleteIndex(index).void
case s: SparqlProjection =>
val namespace = projectionNamespace(s, view.uuid, prefix)
blazeClient.deleteNamespace(namespace).void
}
}
}
}
Loading

0 comments on commit fea3c31

Please sign in to comment.