Skip to content

Commit

Permalink
Filter by type in the SQL query for Composite Views (#4231)
Browse files Browse the repository at this point in the history
  • Loading branch information
olivergrabinski authored Aug 31, 2023
1 parent 89edef4 commit f7b24c6
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions.events
import ch.epfl.bluebrain.nexus.delta.sdk.sse.SseElemStream
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.{Latest, UserTag}
import ch.epfl.bluebrain.nexus.delta.sourcing.query.SelectFilter
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.RemainingElems
import io.circe.syntax.EncoderOps
Expand Down Expand Up @@ -54,30 +54,32 @@ class ElemRoutes(
pathPrefix("elems") {
resolveProjectRef { project =>
authorizeFor(project, events.read).apply {
concat(
(get & pathPrefix("continuous") & parameter("tag".as[UserTag].?)) { tag =>
operationName(s"$prefixSegment/$project/elems/continuous") {
emit(sseElemStream.continuous(project, SelectFilter.tagOrLatest(tag), offset))
(parameter("tag".as[UserTag].?) & types(project)) { (tag, types) =>
concat(
(get & pathPrefix("continuous")) {
operationName(s"$prefixSegment/$project/elems/continuous") {
emit(sseElemStream.continuous(project, SelectFilter(types, tag.getOrElse(Latest)), offset))
}
},
(get & pathPrefix("currents")) {
operationName(s"$prefixSegment/$project/elems/currents") {
emit(sseElemStream.currents(project, SelectFilter(types, tag.getOrElse(Latest)), offset))
}
},
(get & pathPrefix("remaining")) {
operationName(s"$prefixSegment/$project/elems/remaining") {
emit(
sseElemStream.remaining(project, SelectFilter(types, tag.getOrElse(Latest)), offset).map {
r => r.getOrElse(RemainingElems(0L, Instant.EPOCH))
}
)
}
},
head {
complete(OK)
}
},
(get & pathPrefix("currents") & parameter("tag".as[UserTag].?)) { tag =>
operationName(s"$prefixSegment/$project/elems/currents") {
emit(sseElemStream.currents(project, SelectFilter.tagOrLatest(tag), offset))
}
},
(get & pathPrefix("remaining") & parameter("tag".as[UserTag].?)) { tag =>
operationName(s"$prefixSegment/$project/elems/remaining") {
emit(
sseElemStream.remaining(project, SelectFilter.tagOrLatest(tag), offset).map { r =>
r.getOrElse(RemainingElems(0L, Instant.EPOCH))
}
)
}
},
head {
complete(OK)
}
)
)
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,14 @@ object DeltaClient {
}
}

private def typeQuery(types: Set[Iri]) =
if (types.isEmpty) Query.Empty
else Query(types.map(t => "type" -> t.toString).toList: _*)

private def elemAddress(source: RemoteProjectSource) =
source.endpoint / "elems" / source.project.organization.value / source.project.project.value
(source.endpoint / "elems" / source.project.organization.value / source.project.project.value)
.withQuery(Query("tag" -> source.selectFilter.tag.toString))
.withQuery(typeQuery(source.selectFilter.types))

override def resourceAsNQuads(source: RemoteProjectSource, id: Iri): HttpResult[Option[NQuads]] = {
implicit val cred: Option[AuthToken] = token(source)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.decoder.semiauto.deriveDefaultJs
import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import ch.epfl.bluebrain.nexus.delta.sdk.instances._
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Identity, ProjectRef}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.{Latest, UserTag}
import ch.epfl.bluebrain.nexus.delta.sourcing.query.SelectFilter
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.PipeChain
import io.circe.{Encoder, Json}

Expand Down Expand Up @@ -54,6 +55,13 @@ sealed trait CompositeViewSource extends Product with Serializable {
*/
def resourceTag: Option[UserTag]

/**
* @return
* the [[SelectFilter]] for the given view; used to filter the data that is indexed
*/
def selectFilter: SelectFilter =
SelectFilter(resourceTypes, resourceTag.getOrElse(Latest))

/**
* @return
* whether to consider deprecated resources for indexing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewS
import ch.epfl.bluebrain.nexus.delta.sdk.stream.GraphResourceStream
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemPipe, ProjectRef}
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.query.SelectFilter
import ch.epfl.bluebrain.nexus.delta.sourcing.state.GraphResource
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{RemainingElems, Source}
import io.circe.Json
Expand Down Expand Up @@ -56,27 +55,27 @@ object CompositeGraphStream {
override def main(source: CompositeViewSource, project: ProjectRef): Source = {
source match {
case p: ProjectSource =>
Source(local.continuous(project, SelectFilter.tagOrLatest(p.resourceTag), _).through(drainSource))
Source(local.continuous(project, p.selectFilter, _).through(drainSource))
case c: CrossProjectSource =>
Source(local.continuous(c.project, SelectFilter.tagOrLatest(c.resourceTag), _).through(drainSource))
Source(local.continuous(c.project, c.selectFilter, _).through(drainSource))
case r: RemoteProjectSource => remote.main(r)
}
}

override def rebuild(source: CompositeViewSource, project: ProjectRef): Source = {
source match {
case p: ProjectSource =>
Source(local.currents(project, SelectFilter.tagOrLatest(p.resourceTag), _).through(drainSource))
Source(local.currents(project, p.selectFilter, _).through(drainSource))
case c: CrossProjectSource =>
Source(local.currents(c.project, SelectFilter.tagOrLatest(c.resourceTag), _).through(drainSource))
Source(local.currents(c.project, c.selectFilter, _).through(drainSource))
case r: RemoteProjectSource => remote.rebuild(r)
}
}

override def remaining(source: CompositeViewSource, project: ProjectRef): Offset => UIO[Option[RemainingElems]] =
source match {
case p: ProjectSource => local.remaining(project, SelectFilter.tagOrLatest(p.resourceTag), _)
case c: CrossProjectSource => local.remaining(c.project, SelectFilter.tagOrLatest(c.resourceTag), _)
case p: ProjectSource => local.remaining(project, p.selectFilter, _)
case c: CrossProjectSource => local.remaining(c.project, c.selectFilter, _)
case r: RemoteProjectSource => remote.remaining(r, _).map(Some(_))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,4 @@ object SelectFilter {
/** All types with latest tag */
val latest: SelectFilter = SelectFilter(Set.empty, Tag.Latest)

/** All types with specified tag if it exists, otherwise latest */
val tagOrLatest: Option[Tag] => SelectFilter =
tag => SelectFilter(Set.empty, tag.getOrElse(Tag.Latest))

}
11 changes: 11 additions & 0 deletions docs/src/main/paradox/docs/releases/v1.9-release-notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,21 @@ It is now possible to aggregate resources by `@type` or `project`.

### Views

#### Indexing errors listing

Indexing errors can now be listed and filtered for a given view.

@ref:[More information](../delta/api/views/index.md#listing-indexing-failures)

#### Resource type filtering performance improvement

To improve indexing performance, the types defined in the
@ref:[FilterByType pipe](../delta/api/views/pipes.md#filter-by-type),
@ref:[Sparql View payload](../delta/api/views/sparql-view-api.md#payload), or the
@ref:[Composite View source payload](../delta/api/views/composite-view-api.md#sources)
are filtered in PostgreSQL rather than in Nexus Delta.
This avoids querying for data just to discard it straight away.

### Composite views

To enhance performance of indexing of composite views, Nexus Delta introduces the following features.
Expand Down

0 comments on commit f7b24c6

Please sign in to comment.