Skip to content

Commit

Permalink
Introduce pit endpoint for Elasticsearch views (#5084)
Browse files Browse the repository at this point in the history
Co-authored-by: Simon Dumas <simon.dumas@epfl.ch>
  • Loading branch information
imsdu and Simon Dumas authored Aug 8, 2024
1 parent bfd2cff commit 6172316
Show file tree
Hide file tree
Showing 13 changed files with 231 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch
import akka.http.scaladsl.model.Uri
import cats.effect.IO
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.{ElasticSearchClient, IndexLabel}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.{ElasticSearchClient, IndexLabel, PointInTime}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.ElasticSearchViewRejection.{DifferentElasticSearchViewType, ViewIsDeprecated, WrappedElasticSearchClientError}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.ElasticSearchViewValue.{AggregateElasticSearchViewValue, IndexingElasticSearchViewValue}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model._
Expand All @@ -21,6 +21,8 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import io.circe.{Json, JsonObject}

import scala.concurrent.duration.FiniteDuration

/**
* Allows operations on Elasticsearch views
*/
Expand Down Expand Up @@ -61,6 +63,30 @@ trait ElasticSearchViewsQuery {
): IO[Json] =
this.query(view.viewId, view.project, query, qp)

/**
* Creates a point-in-time to be used in further searches
*
* @see
* https://www.elastic.co/guide/en/elasticsearch/reference/current/point-in-time-api.html
* @param id
* the target view
* @param project
* project reference in which the view is
* @param keepAlive
* extends the time to live of the corresponding point in time
*/
def createPointInTime(id: IdSegment, project: ProjectRef, keepAlive: FiniteDuration)(implicit
caller: Caller
): IO[PointInTime]

/**
* Deletes the given point-in-time
*
* @see
* https://www.elastic.co/guide/en/elasticsearch/reference/current/point-in-time-api.html
*/
def deletePointInTime(pointInTime: PointInTime)(implicit caller: Caller): IO[Unit]

/**
* Fetch the elasticsearch mapping of the provided view
* @param id
Expand Down Expand Up @@ -117,22 +143,41 @@ final class ElasticSearchViewsQueryImpl private[elasticsearch] (
project: ProjectRef
)(implicit caller: Caller): IO[Json] =
for {
_ <- aclCheck.authorizeForOr(project, permissions.write)(AuthorizationFailed(project, permissions.write))
view <- viewStore.fetch(id, project)
idx <- indexOrError(view, id)
search <- client.mapping(IndexLabel.unsafe(idx)).adaptError { case e: HttpClientError =>
WrappedElasticSearchClientError(e)
}
} yield search
_ <- aclCheck.authorizeForOr(project, permissions.write)(AuthorizationFailed(project, permissions.write))
view <- viewStore.fetch(id, project)
index <- indexOrError(view, id)
mapping <- client.mapping(index).adaptError { case e: HttpClientError =>
WrappedElasticSearchClientError(e)
}
} yield mapping

private def indexOrError(view: View, id: IdSegment): IO[String] = view match {
case IndexingView(_, index, _) => index.pure[IO]
override def createPointInTime(id: IdSegment, project: ProjectRef, keepAlive: FiniteDuration)(implicit
caller: Caller
): IO[PointInTime] =
for {
_ <- aclCheck.authorizeForOr(project, permissions.write)(AuthorizationFailed(project, permissions.write))
view <- viewStore.fetch(id, project)
index <- indexOrError(view, id)
pit <- client.createPointInTime(index, keepAlive).adaptError { case e: HttpClientError =>
WrappedElasticSearchClientError(e)
}
} yield pit

override def deletePointInTime(pointInTime: PointInTime)(implicit caller: Caller): IO[Unit] =
client.deletePointInTime(pointInTime).adaptError { case e: HttpClientError =>
WrappedElasticSearchClientError(e)
}

private def indexOrError(view: View, id: IdSegment): IO[IndexLabel] = view match {
case IndexingView(_, index, _) => IO.fromEither(IndexLabel(index))
case _: AggregateView =>
DifferentElasticSearchViewType(
id.toString,
ElasticSearchViewType.AggregateElasticSearch,
ElasticSearchViewType.ElasticSearch
).raiseError[IO, String]
IO.raiseError(
DifferentElasticSearchViewType(
id.toString,
ElasticSearchViewType.AggregateElasticSearch,
ElasticSearchViewType.ElasticSearch
)
)
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class ElasticSearchClient(client: HttpClient, endpoint: Uri, maxIndexPathLength:
private val searchPath = "_search"
private val source = "_source"
private val mapping = "_mapping"
private val pit = "_pit"
private val newLine = System.lineSeparator()
private val `application/x-ndjson`: MediaType.WithFixedCharset =
MediaType.applicationWithFixedCharset("x-ndjson", HttpCharsets.`UTF-8`, "json")
Expand Down Expand Up @@ -516,6 +517,32 @@ class ElasticSearchClient(client: HttpClient, endpoint: Uri, maxIndexPathLength:
def mapping(index: IndexLabel): IO[Json] =
client.toJson(Get(endpoint / index.value / mapping).withHttpCredentials)

/**
* Creates a point-in-time to be used in further searches
*
* @see
* https://www.elastic.co/guide/en/elasticsearch/reference/current/point-in-time-api.html
* @param index
* the target index
* @param keepAlive
* extends the time to live of the corresponding point in time
*/
def createPointInTime(index: IndexLabel, keepAlive: FiniteDuration): IO[PointInTime] = {
val pitEndpoint = (endpoint / index.value / pit).withQuery(Uri.Query("keep_alive" -> s"${keepAlive.toSeconds}s"))
client.fromJsonTo[PointInTime](Post(pitEndpoint).withHttpCredentials)
}

/**
* Deletes the given point-in-time
*
* @see
* https://www.elastic.co/guide/en/elasticsearch/reference/current/point-in-time-api.html
*/
def deletePointInTime(pointInTime: PointInTime): IO[Unit] =
client.run(Delete(endpoint / pit, pointInTime.asJson).withHttpCredentials) {
case resp if resp.status.isSuccess() => discardEntity(resp)
}

private def discardEntity(resp: HttpResponse) =
IO.delay(resp.discardEntityBytes()) >> IO.unit

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client

import io.circe.Codec
import io.circe.generic.extras.Configuration
import io.circe.generic.extras.semiauto.deriveConfiguredCodec

final case class PointInTime(id: String) extends AnyVal with Serializable

object PointInTime {
implicit private val config: Configuration = Configuration.default
implicit val pointInTimeDecoder: Codec[PointInTime] = deriveConfiguredCodec[PointInTime]
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import akka.http.scaladsl.model.{StatusCode, StatusCodes}
import akka.http.scaladsl.server._
import cats.effect.IO
import cats.implicits._
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.PointInTime
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.ElasticSearchViewRejection._
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model._
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.permissions.{read => Read, write => Write}
Expand All @@ -22,8 +23,12 @@ import ch.epfl.bluebrain.nexus.delta.sdk.implicits._
import ch.epfl.bluebrain.nexus.delta.sdk.jsonld.JsonLdRejection.{DecodingFailed, InvalidJsonLdFormat}
import ch.epfl.bluebrain.nexus.delta.sdk.marshalling.RdfMarshalling
import ch.epfl.bluebrain.nexus.delta.sdk.model._
import io.circe.syntax.EncoderOps
import io.circe.{Json, JsonObject, Printer}

import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration

/**
* The elasticsearch views routes
*
Expand Down Expand Up @@ -154,6 +159,23 @@ final class ElasticSearchViewsRoutes(
emit(viewsQuery.query(id, project, query, qp).attemptNarrow[ElasticSearchViewRejection])
}
},
// Create a point in time for the given view
(pathPrefix("_pit") & parameter("keep_alive".as[Long]) & post & pathEndOrSingleSlash) { keepAlive =>
val keepAliveDuration = Duration(keepAlive, TimeUnit.SECONDS)
emit(
viewsQuery
.createPointInTime(id, project, keepAliveDuration)
.map(_.asJson)
.attemptNarrow[ElasticSearchViewRejection]
)
},
// Delete a point in time
(pathPrefix("_pit") & entity(as[PointInTime]) & delete & pathEndOrSingleSlash) { pit =>
emit(
StatusCodes.NoContent,
viewsQuery.deletePointInTime(pit).attemptNarrow[ElasticSearchViewRejection]
)
},
// Fetch an elasticsearch view original source
(pathPrefix("source") & get & pathEndOrSingleSlash & idSegmentRef(id)) { id =>
authorizeFor(project, Read).apply {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import io.circe.{Decoder, Json, JsonObject}
import munit.{AnyFixture, Location}

import java.time.Instant
import scala.concurrent.duration._

class ElasticSearchViewsQuerySuite
extends NexusSuite
Expand Down Expand Up @@ -379,6 +380,29 @@ class ElasticSearchViewsQuerySuite
implicit val caller: Caller = alice
viewsQuery.mapping(view1Proj1.viewId, project1.ref)
}

test("Creating a point in time without permission should fail") {
implicit val caller: Caller = anon
viewsQuery
.createPointInTime(view1Proj1.viewId, project1.ref, 30.seconds)
.intercept[AuthorizationFailed]
}

test("Creating a point in time for a view that doesn't exist in the project should fail") {
implicit val caller: Caller = alice
viewsQuery
.createPointInTime(view1Proj2.viewId, project1.ref, 30.seconds)
.interceptEquals(ViewNotFound(view1Proj2.viewId, project1.ref))
}

test("Creating and deleting a point in time with the right access should succeed") {
implicit val caller: Caller = alice
viewsQuery
.createPointInTime(view1Proj1.viewId, project1.ref, 30.seconds)
.flatMap { pit =>
viewsQuery.deletePointInTime(pit)
}
}
}

object ElasticSearchViewsQuerySuite {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,21 @@ import ch.epfl.bluebrain.nexus.testkit.CirceLiteral
import ch.epfl.bluebrain.nexus.testkit.elasticsearch.ElasticSearchDocker
import ch.epfl.bluebrain.nexus.testkit.scalatest.ce.CatsEffectSpec
import io.circe.{Json, JsonObject}
import org.scalatest.{Assertion, DoNotDiscover}
import org.scalatest.Assertion
import org.scalatest.concurrent.Eventually

import scala.concurrent.duration._

@DoNotDiscover
class ElasticSearchClientSpec(override val docker: ElasticSearchDocker)
class ElasticSearchClientSpec
extends TestKit(ActorSystem("ElasticSearchClientSpec"))
with CatsEffectSpec
with ElasticSearchDocker
with ScalaTestElasticSearchClientSetup
with CirceLiteral
with Eventually {

override val docker: ElasticSearchDocker = this

implicit override def patienceConfig: PatienceConfig = PatienceConfig(6.seconds, 100.millis)
implicit val baseUri: BaseUri = BaseUri("http://localhost", Label.unsafe("v1"))

Expand Down Expand Up @@ -283,5 +285,14 @@ class ElasticSearchClientSpec(override val docker: ElasticSearchDocker)
} yield ()
}.accepted
}

"create a point in time for the given index" in {
val index = IndexLabel.unsafe(genString())
for {
_ <- esClient.createIndex(index)
pit <- esClient.createPointInTime(index, 30.seconds)
_ <- esClient.deletePointInTime(pit)
} yield ()
}.accepted
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.routes

import akka.http.scaladsl.model.Uri
import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.PointInTime
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.ElasticSearchViewRejection.ViewIsDeprecated
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.{ElasticSearchViews, ElasticSearchViewsQuery}
import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller
Expand All @@ -11,6 +12,8 @@ import ch.epfl.bluebrain.nexus.testkit.CirceLiteral._
import io.circe.syntax._
import io.circe.{Json, JsonObject}

import scala.concurrent.duration.FiniteDuration

private[routes] class DummyElasticSearchViewsQuery(views: ElasticSearchViews) extends ElasticSearchViewsQuery {

private def toJsonObject(value: Map[String, String]) =
Expand All @@ -30,7 +33,14 @@ private[routes] class DummyElasticSearchViewsQuery(views: ElasticSearchViews) ex
).asJson deepMerge query.asJson
}

def mapping(id: IdSegment, project: ProjectRef)(implicit caller: Caller): IO[Json] =
override def mapping(id: IdSegment, project: ProjectRef)(implicit caller: Caller): IO[Json] =
IO.pure(json"""{"mappings": "mapping"}""")

override def createPointInTime(id: IdSegment, project: ProjectRef, keepAlive: FiniteDuration)(implicit
caller: Caller
): IO[PointInTime] =
IO.pure(PointInTime("xxx"))

override def deletePointInTime(pointInTime: PointInTime)(implicit caller: Caller): IO[Unit] =
IO.unit
}
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,20 @@ class ElasticSearchViewsRoutesSpec extends ElasticSearchViewsRoutesFixtures {
}
}

"create a point in time" in {
Post("/v1/views/myorg/myproject/myid2/_pit?keep_alive=30") ~> routes ~> check {
response.status shouldEqual StatusCodes.OK
response.asJson shouldEqual json"""{"id" : "xxx"}"""
}
}

"delete a point in time" in {
val pit = json"""{"id" : "xxx"}"""
Delete("/v1/views/myorg/myproject/myid2/_pit", pit) ~> routes ~> check {
response.status shouldEqual StatusCodes.NoContent
}
}

"redirect to fusion for the latest version if the Accept header is set to text/html" in {
Get("/v1/views/myorg/myproject/myid") ~> Accept(`text/html`) ~> routes ~> check {
response.status shouldEqual StatusCodes.SeeOther
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
curl -XDELETE \
-H "Content-Type: application/json" \
"http://localhost:8080/v1/views/myorg/myproj/myview/_pit" -d \
'{
"id": "xxx"
}'
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"id": "xxx"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
curl -XPOST \
-H "Content-Type: application/json" \
"http://localhost:8080/v1/views/myorg/myproj/myview/_pit?keep_alive=300"
Loading

0 comments on commit 6172316

Please sign in to comment.