Skip to content

Commit

Permalink
Filter by type in the SQL query for Elasticsearch views (#4201)
Browse files Browse the repository at this point in the history
  • Loading branch information
olivergrabinski authored Aug 30, 2023
1 parent 18424e6 commit 1bf093f
Show file tree
Hide file tree
Showing 25 changed files with 272 additions and 129 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ import ch.epfl.bluebrain.nexus.delta.sdk.marshalling.HttpResponseFields
import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions.events
import ch.epfl.bluebrain.nexus.delta.sdk.sse.SseElemStream
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ProjectRef, Tag}
import ch.epfl.bluebrain.nexus.delta.sourcing.query.SelectFilter
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.RemainingElems
import io.circe.syntax.EncoderOps
import io.circe.{Encoder, JsonObject}
Expand Down Expand Up @@ -56,18 +57,18 @@ class ElemRoutes(
concat(
(get & pathPrefix("continuous") & parameter("tag".as[UserTag].?)) { tag =>
operationName(s"$prefixSegment/$project/elems/continuous") {
emit(sseElemStream.continuous(project, tag.getOrElse(Tag.latest), offset))
emit(sseElemStream.continuous(project, SelectFilter.tagOrLatest(tag), offset))
}
},
(get & pathPrefix("currents") & parameter("tag".as[UserTag].?)) { tag =>
operationName(s"$prefixSegment/$project/elems/currents") {
emit(sseElemStream.currents(project, tag.getOrElse(Tag.latest), offset))
emit(sseElemStream.currents(project, SelectFilter.tagOrLatest(tag), offset))
}
},
(get & pathPrefix("remaining") & parameter("tag".as[UserTag].?)) { tag =>
operationName(s"$prefixSegment/$project/elems/remaining") {
emit(
sseElemStream.remaining(project, tag.getOrElse(Tag.latest), offset).map { r =>
sseElemStream.remaining(project, SelectFilter.tagOrLatest(tag), offset).map { r =>
r.getOrElse(RemainingElems(0L, Instant.EPOCH))
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContextDummy
import ch.epfl.bluebrain.nexus.delta.sdk.sse.{ServerSentEventStream, SseElemStream}
import ch.epfl.bluebrain.nexus.delta.sdk.utils.BaseRouteSpec
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Anonymous, Authenticated, Group}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ProjectRef, Tag}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.query.SelectFilter
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.RemainingElems
import ch.epfl.bluebrain.nexus.testkit.CirceLiteral
import fs2.Stream
Expand Down Expand Up @@ -45,10 +46,16 @@ class ElemRoutesSpec extends BaseRouteSpec with CirceLiteral {

private val stream = Stream.emits(List(elem1, elem2, elem3)).covary[Task]

override def continuous(project: ProjectRef, tag: Tag, start: Offset): ServerSentEventStream = stream
// Test stub: ignores the project, select filter and offset and always returns the fixed `stream`
// built from elem1, elem2 and elem3.
override def continuous(project: ProjectRef, selectFilter: SelectFilter, start: Offset): ServerSentEventStream =
stream

override def currents(project: ProjectRef, tag: Tag, start: Offset): ServerSentEventStream = stream
override def remaining(project: ProjectRef, tag: Tag, start: Offset): UIO[Option[RemainingElems]] =
// Test stub: ignores the project, select filter and offset and always returns the fixed `stream`
// built from elem1, elem2 and elem3.
override def currents(project: ProjectRef, selectFilter: SelectFilter, start: Offset): ServerSentEventStream =
stream
// Test stub: ignores all of its arguments and always reports 999 remaining elems with an
// instant set to the epoch.
override def remaining(
project: ProjectRef,
selectFilter: SelectFilter,
start: Offset
): UIO[Option[RemainingElems]] =
UIO.some(RemainingElems(999L, Instant.EPOCH))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.BlazegraphViews
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.BlazegraphViewState
import ch.epfl.bluebrain.nexus.delta.sdk.stream.GraphResourceStream
import ch.epfl.bluebrain.nexus.delta.sdk.views.ViewRef
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ElemStream
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, Tag}
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.query.SelectFilter
import ch.epfl.bluebrain.nexus.delta.sourcing.state.GraphResource
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Sink
import ch.epfl.bluebrain.nexus.delta.sourcing.stream._
Expand Down Expand Up @@ -89,7 +90,12 @@ object IndexingViewDef {
graphStream: GraphResourceStream,
sink: Sink
): Task[CompiledProjection] =
compile(v, compilePipeChain, graphStream.continuous(v.ref.project, v.resourceTag.getOrElse(Tag.latest), _), sink)
compile(
v,
compilePipeChain,
graphStream.continuous(v.ref.project, SelectFilter.tagOrLatest(v.resourceTag), _),
sink
)

private def compile(
v: ActiveViewDef,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.FailedElemLogRow.FailedElemD
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{FailedElemLogRow, ProjectRef}
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.projections.{ProjectionErrors, Projections}
import ch.epfl.bluebrain.nexus.delta.sourcing.query.SelectFilter
import io.circe.Encoder
import io.circe.generic.semiauto.deriveEncoder
import io.circe.syntax._
Expand Down Expand Up @@ -69,7 +70,9 @@ class BlazegraphViewsIndexingRoutes(
authorizeFor(ref, permissions.read).apply {
emit(
fetch(id, ref)
.flatMap(v => projections.statistics(ref, v.resourceTag, v.projection))
.flatMap(v =>
projections.statistics(ref, SelectFilter.tagOrLatest(v.resourceTag), v.projection)
) // TODO: Fix when introducing BG selectFilter
.rejectOn[ViewNotFound]
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.stream
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewSource
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewSource.{CrossProjectSource, ProjectSource, RemoteProjectSource}
import ch.epfl.bluebrain.nexus.delta.sdk.stream.GraphResourceStream
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemPipe, ProjectRef, Tag}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemPipe, ProjectRef}
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.query.SelectFilter
import ch.epfl.bluebrain.nexus.delta.sourcing.state.GraphResource
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{RemainingElems, Source}
import io.circe.Json
Expand Down Expand Up @@ -55,27 +56,27 @@ object CompositeGraphStream {
override def main(source: CompositeViewSource, project: ProjectRef): Source = {
source match {
case p: ProjectSource =>
Source(local.continuous(project, p.resourceTag.getOrElse(Tag.Latest), _).through(drainSource))
Source(local.continuous(project, SelectFilter.tagOrLatest(p.resourceTag), _).through(drainSource))
case c: CrossProjectSource =>
Source(local.continuous(c.project, c.resourceTag.getOrElse(Tag.Latest), _).through(drainSource))
Source(local.continuous(c.project, SelectFilter.tagOrLatest(c.resourceTag), _).through(drainSource))
case r: RemoteProjectSource => remote.main(r)
}
}

override def rebuild(source: CompositeViewSource, project: ProjectRef): Source = {
source match {
case p: ProjectSource =>
Source(local.currents(project, p.resourceTag.getOrElse(Tag.Latest), _).through(drainSource))
Source(local.currents(project, SelectFilter.tagOrLatest(p.resourceTag), _).through(drainSource))
case c: CrossProjectSource =>
Source(local.currents(c.project, c.resourceTag.getOrElse(Tag.Latest), _).through(drainSource))
Source(local.currents(c.project, SelectFilter.tagOrLatest(c.resourceTag), _).through(drainSource))
case r: RemoteProjectSource => remote.rebuild(r)
}
}

override def remaining(source: CompositeViewSource, project: ProjectRef): Offset => UIO[Option[RemainingElems]] =
source match {
case p: ProjectSource => local.remaining(project, p.resourceTag.getOrElse(Tag.Latest), _)
case c: CrossProjectSource => local.remaining(c.project, c.resourceTag.getOrElse(Tag.Latest), _)
case p: ProjectSource => local.remaining(project, SelectFilter.tagOrLatest(p.resourceTag), _)
case c: CrossProjectSource => local.remaining(c.project, SelectFilter.tagOrLatest(c.resourceTag), _)
case r: RemoteProjectSource => remote.remaining(r, _).map(Some(_))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.indexing.{ElasticSear
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution
import ch.epfl.bluebrain.nexus.delta.sdk.IndexingAction
import ch.epfl.bluebrain.nexus.delta.sourcing.config.BatchConfig
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, ProjectRef}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, ProjectRef, Tag}
import ch.epfl.bluebrain.nexus.delta.sourcing.state.GraphResource
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Sink
import ch.epfl.bluebrain.nexus.delta.sourcing.stream._
Expand Down Expand Up @@ -38,7 +38,7 @@ final class ElasticSearchIndexingAction(
cr: RemoteContextResolution
): Task[Option[CompiledProjection]] = view match {
// Synchronous indexing only applies to views that index the latest version
case active: ActiveViewDef if active.resourceTag.isEmpty =>
case active: ActiveViewDef if active.selectFilter.tag == Tag.latest =>
IndexingViewDef
.compile(
active,
Expand All @@ -47,8 +47,8 @@ final class ElasticSearchIndexingAction(
sink(active)
)
.map(Some(_))
case _: ActiveViewDef => UIO.none
case _: DeprecatedViewDef => UIO.none
case _: ActiveViewDef => UIO.none
case _: DeprecatedViewDef => UIO.none
}

def projections(project: ProjectRef, elem: Elem[GraphResource])(implicit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.ContextValue.ContextObje
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution}
import ch.epfl.bluebrain.nexus.delta.sdk.stream.GraphResourceStream
import ch.epfl.bluebrain.nexus.delta.sdk.views.{IndexingRev, ViewRef}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, Tag}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ElemStream
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.query.SelectFilter
import ch.epfl.bluebrain.nexus.delta.sourcing.state.GraphResource
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Sink
import ch.epfl.bluebrain.nexus.delta.sourcing.stream._
Expand Down Expand Up @@ -42,8 +42,8 @@ object IndexingViewDef {
final case class ActiveViewDef(
ref: ViewRef,
projection: String,
resourceTag: Option[UserTag],
pipeChain: Option[PipeChain],
selectFilter: SelectFilter,
index: IndexLabel,
mapping: JsonObject,
settings: JsonObject,
Expand Down Expand Up @@ -81,8 +81,8 @@ object IndexingViewDef {
ActiveViewDef(
ViewRef(state.project, state.id),
ElasticSearchViews.projectionName(state),
indexing.resourceTag,
indexing.pipeChain,
indexing.selectFilter,
ElasticSearchViews.index(state.uuid, state.indexingRev, prefix),
indexing.mapping.getOrElse(defaultMapping),
indexing.settings.getOrElse(defaultSettings),
Expand All @@ -107,7 +107,7 @@ object IndexingViewDef {
graphStream: GraphResourceStream,
sink: Sink
)(implicit cr: RemoteContextResolution): Task[CompiledProjection] =
compile(v, compilePipeChain, graphStream.continuous(v.ref.project, v.resourceTag.getOrElse(Tag.latest), _), sink)
compile(v, compilePipeChain, graphStream.continuous(v.ref.project, v.selectFilter, _), sink)

private def compile(
v: ActiveViewDef,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model

import cats.syntax.all._
import cats.data.{NonEmptyChain, NonEmptySet}
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.ElasticSearchViewValue.IndexingElasticSearchViewValue
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.ElasticSearchViewValue.IndexingElasticSearchViewValue.defaultPipeline
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.ExpandedJsonLd
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.ContextValue.ContextObject
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.JsonLdContext.keywords
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.decoder.JsonLdDecoder
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.model.Permission
import ch.epfl.bluebrain.nexus.delta.sdk.views.{IndexingRev, PipeStep, ViewRef}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.pipes.{DefaultLabelPredicates, DiscardMetadata, FilterDeprecated}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.{Latest, UserTag}
import ch.epfl.bluebrain.nexus.delta.sourcing.query.SelectFilter
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.pipes.FilterByType.FilterByTypeConfig
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.pipes.{DefaultLabelPredicates, DiscardMetadata, FilterByType, FilterDeprecated}
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{PipeChain, PipeRef}
import io.circe.syntax._
import io.circe.{Encoder, Json, JsonObject}
Expand Down Expand Up @@ -94,6 +97,20 @@ object ElasticSearchViewValue {
PipeChain(pipes)
}

/**
  * Builds the [[SelectFilter]] for this view from its pipeline and its resource tag.
  *
  * The types are taken from the first pipeline step labelled as [[FilterByType]] that carries a configuration.
  * When there is no such step, or when its configuration cannot be decoded as a [[FilterByTypeConfig]], an empty
  * set of types is used (presumably meaning "no restriction on types" -- confirm against [[SelectFilter]]).
  * The tag is the view's resource tag, defaulting to [[Latest]] when none is defined.
  */
def selectFilter: SelectFilter = {
  // Decoding outcome of the first configured `filterByType` step, if the pipeline has one
  val decodedTypes = pipeline.collectFirst {
    case PipeStep(label, _, Some(config)) if label == FilterByType.ref.label =>
      JsonLdDecoder[FilterByTypeConfig].apply(config).map(_.types)
  }
  // A missing step and a decoding failure are deliberately treated the same way: no types
  val types        = decodedTypes.flatMap(_.toOption).getOrElse(Set.empty)
  SelectFilter(types, resourceTag.getOrElse(Latest))
}

/**
* Returns true if this [[IndexingElasticSearchViewValue]] is equal to the provided
* [[IndexingElasticSearchViewValue]] on the fields which should trigger a reindexing of the view when modified.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ final class ElasticSearchIndexingRoutes(
authorizeFor(ref, Read).apply {
emit(
fetch(id, ref)
.flatMap(v => projections.statistics(ref, v.resourceTag, v.projection))
.flatMap(v => projections.statistics(ref, v.selectFilter, v.projection))
.rejectOn[ViewNotFound]
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Anonymous
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, ProjectRef}
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.query.SelectFilter
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{DroppedElem, FailedElem, SuccessElem}
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ProjectionErr.CouldNotFindPipeErr
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{NoopSink, PipeChain, PipeRef}
Expand All @@ -42,7 +43,7 @@ class ElasticSearchIndexingActionSuite extends BioSuite with CirceLiteral with F
ViewRef(project, id1),
projection = id1.toString,
None,
None,
SelectFilter.latest,
index = IndexLabel.unsafe("view1"),
mapping = jobj"""{"properties": { }}""",
settings = jobj"""{"analysis": { }}""",
Expand All @@ -55,8 +56,8 @@ class ElasticSearchIndexingActionSuite extends BioSuite with CirceLiteral with F
private val view2 = ActiveViewDef(
ViewRef(project, id2),
projection = id2.toString,
Some(UserTag.unsafe("tag")),
None,
SelectFilter.tag(UserTag.unsafe("tag")),
index = IndexLabel.unsafe("view2"),
mapping = jobj"""{"properties": { }}""",
settings = jobj"""{"analysis": { }}""",
Expand All @@ -70,8 +71,8 @@ class ElasticSearchIndexingActionSuite extends BioSuite with CirceLiteral with F
private val view3 = ActiveViewDef(
ViewRef(project, id3),
projection = id3.toString,
None,
Some(PipeChain(PipeRef.unsafe("xxx") -> ExpandedJsonLd.empty)),
SelectFilter.latest,
index = IndexLabel.unsafe("view3"),
mapping = jobj"""{"properties": { }}""",
settings = jobj"""{"analysis": { }}""",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv
import ch.epfl.bluebrain.nexus.delta.sdk.views.{IndexingRev, ViewRef}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Anonymous, Subject}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import ch.epfl.bluebrain.nexus.delta.sourcing.query.SelectFilter
import ch.epfl.bluebrain.nexus.testkit.CirceLiteral
import ch.epfl.bluebrain.nexus.testkit.bio.BioSuite
import fs2.Stream
Expand All @@ -29,7 +30,7 @@ class ElasticSearchDeletionTaskSuite extends BioSuite with CirceLiteral {
ref,
projection = ref.viewId.toString,
None,
None,
SelectFilter.latest,
index = IndexLabel.unsafe("view1"),
mapping = jobj"""{"properties": { }}""",
settings = jobj"""{"analysis": { }}""",
Expand Down
Loading

0 comments on commit 1bf093f

Please sign in to comment.