Skip to content

Commit

Permalink
Add an enpoint to get the number of ntriples per blazegraph view (#5193)
Browse files Browse the repository at this point in the history
Co-authored-by: Simon Dumas <simon.dumas@epfl.ch>
  • Loading branch information
imsdu and Simon Dumas authored Oct 18, 2024
1 parent 1f93898 commit 36211a3
Show file tree
Hide file tree
Showing 14 changed files with 415 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,14 @@ final class BlazegraphViews(
IO.pure(toIndexViewDef(elem))
}

/**
* Return the existing indexing views in all projects in a finite stream
*/
def currentIndexingViews: SuccessElemStream[IndexingViewDef] =
log.currentStates(Scope.Root).evalMapFilter { elem =>
IO.pure(toIndexViewDef(elem))
}

/**
* Return the indexing views in a non-ending stream
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client
import akka.actor.ActorSystem
import akka.http.scaladsl.client.RequestBuilding.{Delete, Get, Post}
import akka.http.scaladsl.model.StatusCodes._
import akka.http.scaladsl.model.Uri.Query
import akka.http.scaladsl.model.headers.{BasicHttpCredentials, HttpCredentials, RawHeader}
import akka.http.scaladsl.model.{HttpEntity, HttpHeader, Uri}
import akka.http.scaladsl.unmarshalling.FromEntityUnmarshaller
Expand Down Expand Up @@ -137,6 +138,25 @@ class BlazegraphClient(
}
}

/**
* List all namespaces in the blazegraph instance
*/
def listNamespaces: IO[Vector[String]] = {
val namespacePredicate = "http://www.bigdata.com/rdf#/features/KB/Namespace"
val describeEndpoint = (endpoint / "namespace").withQuery(Query("describe-each-named-graph" -> "false"))
val request = Get(describeEndpoint).withHeaders(accept(SparqlResultsJson.mediaTypes.toList))
client.fromJsonTo[SparqlResults](request).map { response =>
response.results.bindings.foldLeft(Vector.empty[String]) { case (acc, binding) =>
val isNamespace = binding.get("predicate").exists(_.value == namespacePredicate)
val namespaceName = binding.get("object").map(_.value)
if (isNamespace)
acc ++ namespaceName
else
acc
}
}
}

implicit private val resolvedServiceDescriptionDecoder: FromEntityUnmarshaller[ResolvedServiceDescription] =
stringUnmarshaller.map {
serviceVersion.findFirstMatchIn(_).map(_.group(2)) match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,6 @@ class SparqlClient(client: HttpClient, endpoint: SparqlQueryEndpoint)(implicit
case None => SparqlRdfXmlResponse(NodeSeq.Empty)
}

private def accept(mediaType: Seq[MediaType]): Accept =
protected def accept(mediaType: Seq[MediaType]): Accept =
Accept(mediaType.map(MediaRange.One(_, 1f)))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.routes

import akka.http.scaladsl.server.Route
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.supervision.BlazegraphSupervision
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution
import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.AclAddress
import ch.epfl.bluebrain.nexus.delta.sdk.directives.AuthDirectives
import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaDirectives.emit
import ch.epfl.bluebrain.nexus.delta.sdk.directives.UriDirectives.baseUriPrefix
import ch.epfl.bluebrain.nexus.delta.sdk.identities.Identities
import ch.epfl.bluebrain.nexus.delta.sdk.marshalling.RdfMarshalling
import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions.supervision
import io.circe.syntax.EncoderOps

class BlazegraphSupervisionRoutes(
blazegraphSupervision: BlazegraphSupervision,
identities: Identities,
aclCheck: AclCheck
)(implicit baseUri: BaseUri, cr: RemoteContextResolution, ordering: JsonKeyOrdering)
extends AuthDirectives(identities, aclCheck)
with RdfMarshalling {

def routes: Route = baseUriPrefix(baseUri.prefix) {
pathPrefix("supervision") {
extractCaller { implicit caller =>
authorizeFor(AclAddress.Root, supervision.read).apply {
(pathPrefix("blazegraph") & get & pathEndOrSingleSlash) {
emit(blazegraphSupervision.get.map(_.asJson))
}
}
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.supervision

import cats.effect.IO
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.BlazegraphClient
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.supervision.BlazegraphSupervision.BlazegraphNamespaces
import ch.epfl.bluebrain.nexus.delta.sdk.views.ViewRef
import io.circe.syntax.KeyOps
import io.circe.{Encoder, Json, JsonObject}

trait BlazegraphSupervision {
def get: IO[BlazegraphNamespaces]
}

object BlazegraphSupervision {

final case class BlazegraphNamespaces(assigned: Map[ViewRef, Long], unassigned: Map[String, Long]) {
def +(view: ViewRef, count: Long): BlazegraphNamespaces = copy(assigned = assigned + (view -> count))
def +(namespace: String, count: Long): BlazegraphNamespaces = copy(unassigned = unassigned + (namespace -> count))
}

object BlazegraphNamespaces {
val empty: BlazegraphNamespaces = BlazegraphNamespaces(Map.empty, Map.empty)

implicit final val blazegraphNamespacesEncoder: Encoder[BlazegraphNamespaces] = Encoder.AsObject.instance { value =>
val assigned = value.assigned.toVector.sortBy(_._1.toString).map { case (view, count) =>
Json.obj("project" := view.project, "view" := view.viewId, "count" := count)
}

val unassigned = value.unassigned.toVector.sortBy(_._1).map { case (namespace, count) =>
Json.obj("namespace" := namespace, "count" := count)
}

JsonObject("assigned" := Json.arr(assigned: _*), "unassigned" := Json.arr(unassigned: _*))
}
}

def apply(client: BlazegraphClient, viewsByNamespace: ViewByNamespace): BlazegraphSupervision =
new BlazegraphSupervision {
override def get: IO[BlazegraphNamespaces] = {
for {
namespaces <- client.listNamespaces
viewsByNamespace <- viewsByNamespace.get
result <- namespaces.foldLeftM(BlazegraphNamespaces.empty) { case (acc, namespace) =>
client.count(namespace).map { count =>
viewsByNamespace.get(namespace) match {
case Some(view) => acc + (view, count)
case None => acc + (namespace, count)
}
}
}
} yield result
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.supervision

import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.BlazegraphViews
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.indexing.IndexingViewDef
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.indexing.IndexingViewDef.ActiveViewDef
import ch.epfl.bluebrain.nexus.delta.sdk.views.ViewRef
import fs2.Stream

/**
* Allows to get a mapping for the active blazegraph views between their namespace and their view reference
*/
object BlazegraphViewByNamespace {

def apply(blazegraphViews: BlazegraphViews): ViewByNamespace = apply(
blazegraphViews.currentIndexingViews.map(_.value)
)

def apply(stream: Stream[IO, IndexingViewDef]): ViewByNamespace = new ViewByNamespace {
override def get: IO[Map[String, ViewRef]] = stream
.fold(Map.empty[String, ViewRef]) {
case (acc, view: ActiveViewDef) => acc + (view.namespace -> view.ref)
case (acc, _) => acc
}
.compile
.last
.map(_.getOrElse(Map.empty))
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.supervision

import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.sdk.views.ViewRef

trait ViewByNamespace {
def get: IO[Map[String, ViewRef]]
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ class BlazegraphClientSpec(docker: BlazegraphDocker)
client.createNamespace("some").accepted shouldEqual true
}

"list namespaces" in {
client.listNamespaces.accepted shouldEqual Vector("kb", "some")
}

"attempt to create namespace a second time" in {
client.createNamespace("some", Map.empty).accepted shouldEqual false
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.routes

import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.headers.OAuth2BearerToken
import akka.http.scaladsl.server.Route
import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.supervision.BlazegraphSupervision
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.supervision.BlazegraphSupervision.BlazegraphNamespaces
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclSimpleCheck
import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.AclAddress
import ch.epfl.bluebrain.nexus.delta.sdk.identities.IdentitiesDummy
import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions.supervision
import ch.epfl.bluebrain.nexus.delta.sdk.utils.BaseRouteSpec
import ch.epfl.bluebrain.nexus.delta.sdk.views.ViewRef
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Anonymous, Authenticated, Group, User}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef

class BlazegraphSupervisionRoutesSpec extends BaseRouteSpec {

private val supervisor = User("supervisor", realm)

implicit private val callerSupervisor: Caller =
Caller(supervisor, Set(supervisor, Anonymous, Authenticated(realm), Group("group", realm)))

private val asSupervisor = addCredentials(OAuth2BearerToken("supervisor"))

private val identities = IdentitiesDummy(callerSupervisor)
private val aclCheck = AclSimpleCheck(
(supervisor, AclAddress.Root, Set(supervision.read))
).accepted

private val project = ProjectRef.unsafe("org", "project")
private val first = ViewRef(project, nxv + "first")
private val second = ViewRef(project, nxv + "second")

private val blazegraphSupervision = new BlazegraphSupervision {
override def get: IO[BlazegraphSupervision.BlazegraphNamespaces] = IO.pure(
BlazegraphNamespaces(
Map(first -> 42L, second -> 99L),
Map("kb" -> 0L, "unknown" -> 12L)
)
)
}

private val routes = Route.seal(new BlazegraphSupervisionRoutes(blazegraphSupervision, identities, aclCheck).routes)

"The blazegraph supervision endpoint" should {
"be forbidden without supervision/read permission" in {
Get("/v1/supervision/blazegraph") ~> routes ~> check {
response.shouldBeForbidden
}
}

"be accessible with supervision/read permission and return expected payload" in {
val expected =
json"""
{
"assigned" : [
{
"count" : 42,
"project" : "org/project",
"view" : "https://bluebrain.github.io/nexus/vocabulary/first"
},
{
"count" : 99,
"project" : "org/project",
"view" : "https://bluebrain.github.io/nexus/vocabulary/second"
}
],
"unassigned" : [
{
"count" : 0,
"namespace" : "kb"
},
{
"count" : 12,
"namespace" : "unknown"
}
]
}"""

Get("/v1/supervision/blazegraph") ~> asSupervisor ~> routes ~> check {
response.status shouldEqual StatusCodes.OK
response.asJson shouldEqual expected
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.supervision

import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.BlazegraphClientSetup
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.supervision.BlazegraphSupervision.BlazegraphNamespaces
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv
import ch.epfl.bluebrain.nexus.delta.sdk.views.ViewRef
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite
import munit.AnyFixture

class BlazegraphSupervisionSuite extends NexusSuite with BlazegraphClientSetup.Fixture {

override def munitFixtures: Seq[AnyFixture[_]] = List(blazegraphClient)

private val project = ProjectRef.unsafe("org", "project")
private val first = ViewRef(project, nxv + "first")
private val second = ViewRef(project, nxv + "second")

private lazy val client = blazegraphClient()
private val viewsByNamespace: ViewByNamespace = new ViewByNamespace {
override def get: IO[Map[String, ViewRef]] = IO.pure(Map("first" -> first, "second" -> second))
}

private lazy val supervision = BlazegraphSupervision(client, viewsByNamespace)

test("Return the supervision for the different namespaces") {
val expected = BlazegraphNamespaces(
Map(first -> 0L, second -> 0L),
Map("kb" -> 0L, "unknown" -> 0L)
)

client.createNamespace("first") >>
client.createNamespace("second") >>
client.createNamespace("unknown") >> supervision.get.assertEquals(expected)

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.supervision

import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.indexing.IndexingViewDef.{ActiveViewDef, DeprecatedViewDef}
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv
import ch.epfl.bluebrain.nexus.delta.sdk.views.ViewRef
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import ch.epfl.bluebrain.nexus.delta.sourcing.query.SelectFilter
import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite
import fs2.Stream

class BlazegraphViewByNamespaceSuite extends NexusSuite {

test("Get the different views by their namespace value") {
val indexingRev = 1
val rev = 2
val project = ProjectRef.unsafe("org", "proj")

def activeView(suffix: String) = {
val id = nxv + suffix
ActiveViewDef(
ViewRef(project, id),
projection = id.toString,
SelectFilter.latest,
None,
namespace = suffix,
indexingRev,
rev
)
}

val view1 = activeView("view1")
val view2 = activeView("view2")

val id3 = nxv + "view3"
val deprecatedView = DeprecatedViewDef(ViewRef(project, id3))

val stream = Stream(view1, view2, deprecatedView)
val expected = Map(view1.namespace -> view1.ref, view2.namespace -> view2.ref)
BlazegraphViewByNamespace(stream).get.assertEquals(expected)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,6 @@ object Identity extends Generators {
val Radar = UserCredentials(genString(), genString(), testRealm)
}

object supervision {
val Mickey = UserCredentials(genString(), genString(), testRealm)
}

object files {
val Writer = UserCredentials(genString(), genString(), testRealm)
}
Expand All @@ -109,6 +105,6 @@ object Identity extends Generators {
}

lazy val allUsers =
userPermissions.UserWithNoPermissions :: userPermissions.UserWithPermissions :: acls.Marge :: archives.Tweety :: compositeviews.Jerry :: events.BugsBunny :: listings.Bob :: listings.Alice :: aggregations.Charlie :: aggregations.Rose :: orgs.Fry :: orgs.Leela :: projects.Bojack :: projects.PrincessCarolyn :: resources.Rick :: resources.Morty :: storages.Coyote :: views.ScoobyDoo :: mash.Radar :: supervision.Mickey :: files.Writer :: typehierarchy.Writer :: writer :: Nil
userPermissions.UserWithNoPermissions :: userPermissions.UserWithPermissions :: acls.Marge :: archives.Tweety :: compositeviews.Jerry :: events.BugsBunny :: listings.Bob :: listings.Alice :: aggregations.Charlie :: aggregations.Rose :: orgs.Fry :: orgs.Leela :: projects.Bojack :: projects.PrincessCarolyn :: resources.Rick :: resources.Morty :: storages.Coyote :: views.ScoobyDoo :: mash.Radar :: files.Writer :: typehierarchy.Writer :: writer :: Nil

}
Loading

0 comments on commit 36211a3

Please sign in to comment.