Skip to content

Commit

Permalink
Migrate Identities to Cats-effect (#4288)
Browse files Browse the repository at this point in the history
* Migrate Identities to Cats-effect

---------

Co-authored-by: Simon Dumas <simon.dumas@epfl.ch>
  • Loading branch information
imsdu and Simon Dumas authored Sep 23, 2023
1 parent f3cadd6 commit 154af46
Show file tree
Hide file tree
Showing 17 changed files with 614 additions and 455 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,21 @@ package ch.epfl.bluebrain.nexus.delta.routes

import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution
import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
import ch.epfl.bluebrain.nexus.delta.sdk.directives.AuthDirectives
import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaDirectives._
import ch.epfl.bluebrain.nexus.delta.sdk.ce.DeltaDirectives._
import ch.epfl.bluebrain.nexus.delta.sdk.identities.Identities
import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller._
import kamon.instrumentation.akka.http.TracingDirectives.operationName
import monix.bio.IO
import monix.execution.Scheduler

/**
* The identities routes
*/
class IdentitiesRoutes(identities: Identities, aclCheck: AclCheck)(implicit
override val s: Scheduler,
baseUri: BaseUri,
cr: RemoteContextResolution,
ordering: JsonKeyOrdering
Expand Down Expand Up @@ -48,6 +46,6 @@ object IdentitiesRoutes {
def apply(
identities: Identities,
aclCheck: AclCheck
)(implicit baseUri: BaseUri, s: Scheduler, cr: RemoteContextResolution, ordering: JsonKeyOrdering): Route =
)(implicit baseUri: BaseUri, cr: RemoteContextResolution, ordering: JsonKeyOrdering): Route =
new IdentitiesRoutes(identities, aclCheck).routes
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,12 @@ import ch.epfl.bluebrain.nexus.delta.sdk.realms.model.{Realm, RealmRejection}
import io.circe.Decoder
import io.circe.generic.extras.Configuration
import io.circe.generic.extras.semiauto.deriveConfiguredDecoder
import monix.execution.Scheduler

import scala.annotation.nowarn

class RealmsRoutes(identities: Identities, realms: Realms, aclCheck: AclCheck)(implicit
baseUri: BaseUri,
paginationConfig: PaginationConfig,
s: Scheduler,
cr: RemoteContextResolution,
ordering: JsonKeyOrdering
) extends AuthDirectives(identities, aclCheck)
Expand Down Expand Up @@ -142,7 +140,6 @@ object RealmsRoutes {
def apply(identities: Identities, realms: Realms, aclCheck: AclCheck)(implicit
baseUri: BaseUri,
paginationConfig: PaginationConfig,
s: Scheduler,
cr: RemoteContextResolution,
ordering: JsonKeyOrdering
): Route =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import ch.epfl.bluebrain.nexus.delta.sdk.schemas.Schemas
import ch.epfl.bluebrain.nexus.delta.sdk.schemas.model.SchemaRejection.SchemaNotFound
import ch.epfl.bluebrain.nexus.delta.sdk.schemas.model.{Schema, SchemaRejection}
import io.circe.{Json, Printer}
import monix.execution.Scheduler

/**
* The schemas routes
Expand All @@ -52,7 +51,6 @@ final class SchemasRoutes(
indexAction: IndexingAction.Execute[Schema]
)(implicit
baseUri: BaseUri,
s: Scheduler,
cr: RemoteContextResolution,
ordering: JsonKeyOrdering,
fusionConfig: FusionConfig
Expand Down Expand Up @@ -192,7 +190,6 @@ object SchemasRoutes {
index: IndexingAction.Execute[Schema]
)(implicit
baseUri: BaseUri,
s: Scheduler,
cr: RemoteContextResolution,
ordering: JsonKeyOrdering,
fusionConfig: FusionConfig
Expand Down
Original file line number Diff line number Diff line change
@@ -1,29 +1,21 @@
package ch.epfl.bluebrain.nexus.delta.wiring

import akka.http.scaladsl.model.headers.{Authorization, OAuth2BearerToken}
import akka.http.scaladsl.model.{HttpRequest, Uri}
import ch.epfl.bluebrain.nexus.delta.Main.pluginsMaxPriority
import ch.epfl.bluebrain.nexus.delta.config.AppConfig
import ch.epfl.bluebrain.nexus.delta.kernel.cache.CacheConfig
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.kernel.search.Pagination.FromPagination
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.contexts
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution}
import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering
import ch.epfl.bluebrain.nexus.delta.routes.IdentitiesRoutes
import ch.epfl.bluebrain.nexus.delta.sdk.PriorityRoute
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
import ch.epfl.bluebrain.nexus.delta.sdk.auth.{AuthTokenProvider, OpenIdAuthService}
import ch.epfl.bluebrain.nexus.delta.sdk.http.{HttpClient, HttpClientError}
import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClient
import ch.epfl.bluebrain.nexus.delta.sdk.identities.{Identities, IdentitiesImpl}
import ch.epfl.bluebrain.nexus.delta.sdk.model.search.SearchParams.RealmSearchParams
import ch.epfl.bluebrain.nexus.delta.sdk.model.{BaseUri, ResourceF}
import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import ch.epfl.bluebrain.nexus.delta.sdk.realms.Realms
import ch.epfl.bluebrain.nexus.delta.sdk.realms.model.Realm
import io.circe.Json
import izumi.distage.model.definition.{Id, ModuleDef}
import monix.bio.{IO, UIO}
import monix.execution.Scheduler
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._

/**
* Identities module wiring config.
Expand All @@ -35,16 +27,7 @@ object IdentitiesModule extends ModuleDef {
make[CacheConfig].from((cfg: AppConfig) => cfg.identities)

make[Identities].fromEffect { (realms: Realms, hc: HttpClient @Id("realm"), config: CacheConfig) =>
val findActiveRealm: String => UIO[Option[Realm]] = { (issuer: String) =>
val pagination = FromPagination(0, 1000)
val params = RealmSearchParams(issuer = Some(issuer), deprecated = Some(false))
val sort = ResourceF.defaultSort[Realm]
realms.list(pagination, params, sort).map { _.results.map(entry => entry.source.value).headOption }.toUIO
}
val getUserInfo: (Uri, OAuth2BearerToken) => IO[HttpClientError, Json] = { (uri: Uri, token: OAuth2BearerToken) =>
hc.toJson(HttpRequest(uri = uri, headers = List(Authorization(token))))
}
IdentitiesImpl(findActiveRealm, getUserInfo, config)
IdentitiesImpl(realms, hc, config).toUIO
}

make[OpenIdAuthService].from { (httpClient: HttpClient @Id("realm"), realms: Realms) =>
Expand All @@ -63,11 +46,10 @@ object IdentitiesModule extends ModuleDef {
(
identities: Identities,
aclCheck: AclCheck,
s: Scheduler,
baseUri: BaseUri,
cr: RemoteContextResolution @Id("aggregate"),
ordering: JsonKeyOrdering
) => new IdentitiesRoutes(identities, aclCheck)(s, baseUri, cr, ordering)
) => new IdentitiesRoutes(identities, aclCheck)(baseUri, cr, ordering)

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import akka.http.scaladsl.model.{HttpRequest, Uri}
import cats.effect.{Clock, IO}
import ch.epfl.bluebrain.nexus.delta.Main.pluginsMaxPriority
import ch.epfl.bluebrain.nexus.delta.config.AppConfig
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.contexts
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution}
import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering
Expand All @@ -20,7 +21,6 @@ import ch.epfl.bluebrain.nexus.delta.sdk.sse.SseEncoder
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import izumi.distage.model.definition.{Id, ModuleDef}
import monix.execution.Scheduler
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._

/**
* Realms module wiring config.
Expand All @@ -46,11 +46,10 @@ object RealmsModule extends ModuleDef {
realms: Realms,
cfg: AppConfig,
aclCheck: AclCheck,
s: Scheduler,
cr: RemoteContextResolution @Id("aggregate"),
ordering: JsonKeyOrdering
) =>
new RealmsRoutes(identities, realms, aclCheck)(cfg.http.baseUri, cfg.realms.pagination, s, cr, ordering)
new RealmsRoutes(identities, realms, aclCheck)(cfg.http.baseUri, cfg.realms.pagination, cr, ordering)
}

make[HttpClient].named("realm").from { (as: ActorSystem[Nothing], sc: Scheduler) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import ch.epfl.bluebrain.nexus.delta.sdk.schemas.{SchemaImports, Schemas, Schema
import ch.epfl.bluebrain.nexus.delta.sdk.sse.SseEncoder
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import izumi.distage.model.definition.{Id, ModuleDef}
import monix.execution.Scheduler

/**
* Schemas wiring
Expand Down Expand Up @@ -74,14 +73,12 @@ object SchemasModule extends ModuleDef {
indexingAction: IndexingAction @Id("aggregate"),
shift: Schema.Shift,
baseUri: BaseUri,
s: Scheduler,
cr: RemoteContextResolution @Id("aggregate"),
ordering: JsonKeyOrdering,
fusionConfig: FusionConfig
) =>
new SchemasRoutes(identities, aclCheck, schemas, schemeDirectives, indexingAction(_, _, _)(shift, cr))(
baseUri,
s,
cr,
ordering,
fusionConfig
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package ch.epfl.bluebrain.nexus.delta.kernel.cache

import cats.effect.IO
import com.github.benmanes.caffeine.cache.{Cache, Caffeine}

import scala.concurrent.duration._
import scala.jdk.CollectionConverters._
import scala.jdk.DurationConverters._

/**
* An arbitrary key value store.
*
* @tparam K
* the key type
* @tparam V
* the value type
*/
trait LocalCache[K, V] {

/**
* Adds the (key, value) to the store, replacing the current value if the key already exists.
*
* @param key
* the key under which the value is stored
* @param value
* the value stored
*/
def put(key: K, value: V): IO[Unit]

/**
* Deletes a key from the store.
*
* @param key
* the key to be deleted from the store
*/
def remove(key: K): IO[Unit]

/**
* @return
* all the entries in the store
*/
def entries: IO[Map[K, V]]

/**
* @return
* a vector of all the values in the store
*/
def values: IO[Vector[V]] = entries.map(_.values.toVector)

/**
* @param key
* the key
* @return
* an optional value for the provided key
*/
def get(key: K): IO[Option[V]]

/**
* Fetch the value for the given key and if not, compute the new value, insert it in the store and return it This
* operation is not atomic.
* @param key
* the key
* @param op
* the computation yielding the value to associate with `key`, if `key` is previously unbound.
*/
def getOrElseUpdate(key: K, op: => IO[V]): IO[V] =
get(key).flatMap {
case Some(value) => IO.pure(value)
case None =>
op.flatMap { newValue =>
put(key, newValue).as(newValue)
}
}

/**
* Fetch the value for the given key and if not, compute the new value, insert it in the store if defined and return
* it This operation is not atomic.
* @param key
* the key
* @param op
* the computation yielding the value to associate with `key`, if `key` is previously unbound.
*/
def getOrElseAttemptUpdate(key: K, op: => IO[Option[V]]): IO[Option[V]] =
get(key).flatMap {
case Some(value) => IO.pure(Some(value))
case None =>
op.flatMap {
case Some(newValue) => put(key, newValue).as(Some(newValue))
case None => IO.none
}
}

/**
* Tests whether the cache contains the given key.
* @param key
* the key to be tested
*/
def containsKey(key: K): IO[Boolean] = get(key).map(_.isDefined)

}

object LocalCache {

/**
* Constructs a local key-value store
*/
final def apply[K, V](): IO[LocalCache[K, V]] =
IO.delay {
val cache: Cache[K, V] =
Caffeine
.newBuilder()
.build[K, V]()
new LocalCacheImpl(cache)
}

/**
* Constructs a local key-value store following a LRU policy
*
* @param config
* the cache configuration
*/
final def lru[K, V](config: CacheConfig): IO[LocalCache[K, V]] =
lru(config.maxSize.toLong, config.expireAfter)

/**
* Constructs a local key-value store following a LRU policy
*
* @param maxSize
* the max number of entries
* @param expireAfterAccess
* Entries will be removed one the givenduration has elapsed after the entry's creation, the most recent
* replacement of its value, or its last access.
*/
final def lru[K, V](maxSize: Long, expireAfterAccess: FiniteDuration = 1.hour): IO[LocalCache[K, V]] =
IO.delay {
val cache: Cache[K, V] =
Caffeine
.newBuilder()
.expireAfterAccess(expireAfterAccess.toJava)
.maximumSize(maxSize)
.build[K, V]()
new LocalCacheImpl(cache)
}

/**
* Constructs a local key-value store
*
* @param config
* the cache configuration
*/
final def apply[K, V](config: CacheConfig): IO[LocalCache[K, V]] =
apply(config.maxSize.toLong, config.expireAfter)

/**
* Constructs a local key-value store
* @param maxSize
* the max number of entries
* @param expireAfterWrite
* Entries will be removed one the givenduration has elapsed after the entry's creation or the most recent
* replacement of its value.
*/
final def apply[K, V](maxSize: Long, expireAfterWrite: FiniteDuration = 1.hour): IO[LocalCache[K, V]] =
IO.delay {
val cache: Cache[K, V] =
Caffeine
.newBuilder()
.expireAfterWrite(expireAfterWrite.toJava)
.maximumSize(maxSize)
.build[K, V]()
new LocalCacheImpl(cache)
}

private class LocalCacheImpl[K, V](cache: Cache[K, V]) extends LocalCache[K, V] {

override def put(key: K, value: V): IO[Unit] = IO.delay(cache.put(key, value))

override def get(key: K): IO[Option[V]] = IO.delay(Option(cache.getIfPresent(key)))

override def remove(key: K): IO[Unit] = IO.delay(cache.invalidate(key))

override def entries: IO[Map[K, V]] = IO.delay(cache.asMap().asScala.toMap)
}
}
Loading

0 comments on commit 154af46

Please sign in to comment.