diff --git a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/IdentitiesModule.scala b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/IdentitiesModule.scala index a7674443ec..489e0387e8 100644 --- a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/IdentitiesModule.scala +++ b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/IdentitiesModule.scala @@ -1,5 +1,6 @@ package ch.epfl.bluebrain.nexus.delta.wiring +import cats.effect.{Clock, IO} import ch.epfl.bluebrain.nexus.delta.Main.pluginsMaxPriority import ch.epfl.bluebrain.nexus.delta.config.AppConfig import ch.epfl.bluebrain.nexus.delta.kernel.cache.CacheConfig @@ -34,8 +35,8 @@ object IdentitiesModule extends ModuleDef { new OpenIdAuthService(httpClient, realms) } - make[AuthTokenProvider].fromEffect { (authService: OpenIdAuthService) => - AuthTokenProvider(authService) + make[AuthTokenProvider].fromEffect { (authService: OpenIdAuthService, clock: Clock[IO]) => + AuthTokenProvider(authService)(clock) } many[RemoteContextResolution].addEffect(ContextValue.fromFile("contexts/identities.json").map { ctx => diff --git a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/client/DeltaClient.scala b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/client/DeltaClient.scala index abce4fdd32..27fbd59de6 100644 --- a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/client/DeltaClient.scala +++ b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/client/DeltaClient.scala @@ -7,6 +7,7 @@ import akka.http.scaladsl.model.Uri.Query import akka.http.scaladsl.model.headers.{`Last-Event-ID`, Accept} import akka.http.scaladsl.model.{HttpRequest, HttpResponse, StatusCodes} import akka.stream.alpakka.sse.scaladsl.EventSource +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.MigrateEffectSyntax import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewSource.RemoteProjectSource import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.stream.CompositeBranch import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri @@ -87,11 +88,12 @@ object DeltaClient { )(implicit as: ActorSystem[Nothing], scheduler: Scheduler - ) extends DeltaClient { + ) extends DeltaClient + with MigrateEffectSyntax { override def projectStatistics(source: RemoteProjectSource): HttpResult[ProjectStatistics] = { for { - authToken <- authTokenProvider(credentials) + authToken <- authTokenProvider(credentials).toBIO request = Get( source.endpoint / "projects" / source.project.organization.value / source.project.project.value / "statistics" @@ -104,7 +106,7 @@ object DeltaClient { override def remaining(source: RemoteProjectSource, offset: Offset): HttpResult[RemainingElems] = { for { - authToken <- authTokenProvider(credentials) + authToken <- authTokenProvider(credentials).toBIO request = Get(elemAddress(source) / "remaining") .addHeader(accept) .addHeader(`Last-Event-ID`(offset.value.toString)) @@ -115,7 +117,7 @@ object DeltaClient { override def checkElems(source: RemoteProjectSource): HttpResult[Unit] = { for { - authToken <- authTokenProvider(credentials) + authToken <- authTokenProvider(credentials).toBIO result <- client(Head(elemAddress(source)).withCredentials(authToken)) { case resp if resp.status.isSuccess() => UIO.delay(resp.discardEntityBytes()) >> IO.unit } @@ -130,7 +132,7 @@ object DeltaClient { def send(request: HttpRequest): Future[HttpResponse] = { (for { - authToken <- authTokenProvider(credentials) + authToken <- authTokenProvider(credentials).toBIO result <- client[HttpResponse](request.withCredentials(authToken))(IO.pure(_)) } yield result).runToFuture } @@ -164,7 +166,7 @@ object DeltaClient { val resourceUrl = source.endpoint / "resources" / source.project.organization.value / source.project.project.value / "_" / id.toString for { - authToken <- authTokenProvider(credentials) + authToken <- authTokenProvider(credentials).toBIO req = Get( source.resourceTag.fold(resourceUrl)(t => resourceUrl.withQuery(Query("tag" -> t.value))) ).addHeader(Accept(RdfMediaTypes.`application/n-quads`)).withCredentials(authToken) diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/remote/client/RemoteDiskStorageClient.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/remote/client/RemoteDiskStorageClient.scala index 02f3faa8a1..adf9f2bc37 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/remote/client/RemoteDiskStorageClient.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/remote/client/RemoteDiskStorageClient.scala @@ -7,6 +7,7 @@ import akka.http.scaladsl.model.Multipart.FormData import akka.http.scaladsl.model.Multipart.FormData.BodyPart import akka.http.scaladsl.model.StatusCodes._ import akka.http.scaladsl.model.Uri.Path +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.MigrateEffectSyntax import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.FetchFileRejection.UnexpectedFetchError import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.MoveFileRejection.UnexpectedMoveError import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.{FetchFileRejection, MoveFileRejection, SaveFileRejection} @@ -34,7 +35,7 @@ import scala.concurrent.duration._ */ final class RemoteDiskStorageClient(client: HttpClient, getAuthToken: AuthTokenProvider, credentials: Credentials)( implicit as: ActorSystem -) { +) extends MigrateEffectSyntax { import as.dispatcher private val serviceName = Name.unsafe("remoteStorage") @@ -58,7 +59,7 @@ final class RemoteDiskStorageClient(client: HttpClient, getAuthToken: AuthTokenP * the storage bucket name */ def exists(bucket: Label)(implicit baseUri: BaseUri): IO[HttpClientError, Unit] = { - getAuthToken(credentials).flatMap { authToken => + getAuthToken(credentials).toBIO.flatMap { authToken => val endpoint = baseUri.endpoint / "buckets" / bucket.value val req = Head(endpoint).withCredentials(authToken) client(req) { @@ -82,7 +83,7 @@ final class RemoteDiskStorageClient(client: HttpClient, getAuthToken: AuthTokenP relativePath: Path, entity: BodyPartEntity )(implicit baseUri: BaseUri): IO[SaveFileRejection, RemoteDiskStorageFileAttributes] = { - getAuthToken(credentials).flatMap { authToken => + getAuthToken(credentials).toBIO.flatMap { authToken => val endpoint = baseUri.endpoint / "buckets" / bucket.value / "files" / relativePath val filename = relativePath.lastSegment.getOrElse("filename") val multipartForm = FormData(BodyPart("file", entity, Map("filename" -> filename))).toEntity() @@ -106,7 +107,7 @@ final class RemoteDiskStorageClient(client: HttpClient, getAuthToken: AuthTokenP * the relative path to the file location */ def getFile(bucket: Label, relativePath: Path)(implicit baseUri: BaseUri): IO[FetchFileRejection, AkkaSource] = { - getAuthToken(credentials).flatMap { authToken => + getAuthToken(credentials).toBIO.flatMap { authToken => val endpoint = baseUri.endpoint / "buckets" / bucket.value / "files" / relativePath client.toDataBytes(Get(endpoint).withCredentials(authToken)).mapError { case error @ HttpClientStatusError(_, `NotFound`, _) if !bucketNotFoundType(error) => @@ -129,7 +130,7 @@ final class RemoteDiskStorageClient(client: HttpClient, getAuthToken: AuthTokenP bucket: Label, relativePath: Path )(implicit baseUri: BaseUri): IO[FetchFileRejection, RemoteDiskStorageFileAttributes] = { - getAuthToken(credentials).flatMap { authToken => + getAuthToken(credentials).toBIO.flatMap { authToken => val endpoint = baseUri.endpoint / "buckets" / bucket.value / "attributes" / relativePath client.fromJsonTo[RemoteDiskStorageFileAttributes](Get(endpoint).withCredentials(authToken)).mapError { case error @ HttpClientStatusError(_, `NotFound`, _) if !bucketNotFoundType(error) => @@ -156,7 +157,7 @@ final class RemoteDiskStorageClient(client: HttpClient, getAuthToken: AuthTokenP sourceRelativePath: Path, destRelativePath: Path )(implicit baseUri: BaseUri): IO[MoveFileRejection, RemoteDiskStorageFileAttributes] = { - getAuthToken(credentials).flatMap { authToken => + getAuthToken(credentials).toBIO.flatMap { authToken => val endpoint = baseUri.endpoint / "buckets" / bucket.value / "files" / destRelativePath val payload = Json.obj("source" -> sourceRelativePath.toString.asJson) client.fromJsonTo[RemoteDiskStorageFileAttributes](Put(endpoint, payload).withCredentials(authToken)).mapError { diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/auth/AuthTokenProvider.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/auth/AuthTokenProvider.scala index 8d3057a074..31f9ed3691 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/auth/AuthTokenProvider.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/auth/AuthTokenProvider.scala @@ -1,14 +1,14 @@ package ch.epfl.bluebrain.nexus.delta.sdk.auth -import cats.effect.Clock +import cats.effect.{Clock, IO} import ch.epfl.bluebrain.nexus.delta.kernel.Logger -import ch.epfl.bluebrain.nexus.delta.kernel.cache.KeyValueStore +import ch.epfl.bluebrain.nexus.delta.kernel.cache.LocalCache import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.MigrateEffectSyntax -import ch.epfl.bluebrain.nexus.delta.kernel.utils.IOUtils +import ch.epfl.bluebrain.nexus.delta.kernel.utils.IOInstant import ch.epfl.bluebrain.nexus.delta.sdk.auth.Credentials.ClientCredentials import ch.epfl.bluebrain.nexus.delta.sdk.identities.ParsedToken import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.AuthToken -import monix.bio.UIO +import monix.bio import java.time.{Duration, Instant} @@ -16,21 +16,23 @@ import java.time.{Duration, Instant} * Provides an auth token for the service account, for use when comunicating with remote storage */ trait AuthTokenProvider { - def apply(credentials: Credentials): UIO[Option[AuthToken]] + def apply(credentials: Credentials): IO[Option[AuthToken]] } object AuthTokenProvider { - def apply(authService: OpenIdAuthService): UIO[AuthTokenProvider] = { - KeyValueStore[ClientCredentials, ParsedToken]().map(cache => new CachingOpenIdAuthTokenProvider(authService, cache)) + def apply(authService: OpenIdAuthService)(implicit clock: Clock[IO]): bio.UIO[AuthTokenProvider] = { + LocalCache[ClientCredentials, ParsedToken]() + .map(cache => new CachingOpenIdAuthTokenProvider(authService, cache)) + .toBIO } def anonymousForTest: AuthTokenProvider = new AnonymousAuthTokenProvider def fixedForTest(token: String): AuthTokenProvider = new AuthTokenProvider { - override def apply(credentials: Credentials): UIO[Option[AuthToken]] = UIO.pure(Some(AuthToken(token))) + override def apply(credentials: Credentials): IO[Option[AuthToken]] = IO.pure(Some(AuthToken(token))) } } private class AnonymousAuthTokenProvider extends AuthTokenProvider { - override def apply(credentials: Credentials): UIO[Option[AuthToken]] = UIO.pure(None) + override def apply(credentials: Credentials): IO[Option[AuthToken]] = IO.pure(None) } /** @@ -39,42 +41,42 @@ private class AnonymousAuthTokenProvider extends AuthTokenProvider { */ private class CachingOpenIdAuthTokenProvider( service: OpenIdAuthService, - cache: KeyValueStore[ClientCredentials, ParsedToken] + cache: LocalCache[ClientCredentials, ParsedToken] )(implicit - clock: Clock[UIO] + clock: Clock[IO] ) extends AuthTokenProvider with MigrateEffectSyntax { private val logger = Logger.cats[CachingOpenIdAuthTokenProvider] - override def apply(credentials: Credentials): UIO[Option[AuthToken]] = { + override def apply(credentials: Credentials): IO[Option[AuthToken]] = { credentials match { - case Credentials.Anonymous => UIO.pure(None) - case Credentials.JWTToken(token) => UIO.pure(Some(AuthToken(token))) + case Credentials.Anonymous => IO.pure(None) + case Credentials.JWTToken(token) => IO.pure(Some(AuthToken(token))) case credentials: ClientCredentials => clientCredentialsFlow(credentials) } } - private def clientCredentialsFlow(credentials: ClientCredentials) = { + private def clientCredentialsFlow(credentials: ClientCredentials): IO[Some[AuthToken]] = { for { existingValue <- cache.get(credentials) - now <- IOUtils.instant + now <- IOInstant.now finalValue <- existingValue match { case None => - logger.info("Fetching auth token, no initial value.").toUIO >> + logger.info("Fetching auth token, no initial value.") *> fetchValue(credentials) case Some(value) if isExpired(value, now) => - logger.info("Fetching new auth token, current value near expiry.").toUIO >> + logger.info("Fetching new auth token, current value near expiry.") *> fetchValue(credentials) - case Some(value) => UIO.pure(value) + case Some(value) => IO.pure(value) } } yield { Some(AuthToken(finalValue.rawToken)) } } - private def fetchValue(credentials: ClientCredentials) = { + private def fetchValue(credentials: ClientCredentials): IO[ParsedToken] = { cache.getOrElseUpdate(credentials, service.auth(credentials)) } diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/auth/OpenIdAuthService.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/auth/OpenIdAuthService.scala index e1dc547bb4..38685f73e2 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/auth/OpenIdAuthService.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/auth/OpenIdAuthService.scala @@ -4,6 +4,7 @@ import akka.http.javadsl.model.headers.HttpCredentials import akka.http.scaladsl.model.HttpMethods.POST import akka.http.scaladsl.model.headers.Authorization import akka.http.scaladsl.model.{HttpRequest, Uri} +import cats.effect.IO import ch.epfl.bluebrain.nexus.delta.kernel.Secret import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.MigrateEffectSyntax import ch.epfl.bluebrain.nexus.delta.sdk.auth.Credentials.ClientCredentials @@ -15,7 +16,6 @@ import ch.epfl.bluebrain.nexus.delta.sdk.realms.Realms import ch.epfl.bluebrain.nexus.delta.sdk.realms.model.Realm import ch.epfl.bluebrain.nexus.delta.sourcing.model.Label import io.circe.Json -import monix.bio.{IO, UIO} /** * Exchanges client credentials for an auth token with a remote OpenId service, as defined in the specified realm @@ -25,7 +25,7 @@ class OpenIdAuthService(httpClient: HttpClient, realms: Realms) extends MigrateE /** * Exchanges client credentials for an auth token with a remote OpenId service, as defined in the specified realm */ - def auth(credentials: ClientCredentials): UIO[ParsedToken] = { + def auth(credentials: ClientCredentials): IO[ParsedToken] = { for { realm <- findRealm(credentials.realm) response <- requestToken(realm.tokenEndpoint, credentials.user, credentials.password) @@ -35,14 +35,14 @@ class OpenIdAuthService(httpClient: HttpClient, realms: Realms) extends MigrateE } } - private def findRealm(id: Label): UIO[Realm] = { + private def findRealm(id: Label): IO[Realm] = { for { - realm <- realms.fetch(id).toUIO - _ <- UIO.when(realm.deprecated)(UIO.terminate(RealmIsDeprecated(realm.value))) + realm <- realms.fetch(id) + _ <- IO.raiseWhen(realm.deprecated)(RealmIsDeprecated(realm.value)) } yield realm.value } - private def requestToken(tokenEndpoint: Uri, user: String, password: Secret[String]): UIO[Json] = { + private def requestToken(tokenEndpoint: Uri, user: String, password: Secret[String]): IO[Json] = { httpClient .toJson( HttpRequest( @@ -62,13 +62,13 @@ class OpenIdAuthService(httpClient: HttpClient, realms: Realms) extends MigrateE .hideErrorsWith(AuthTokenHttpError) } - private def parseResponse(json: Json): UIO[ParsedToken] = { + private def parseResponse(json: Json): IO[ParsedToken] = { for { rawToken <- json.hcursor.get[String]("access_token") match { - case Left(failure) => IO.terminate(AuthTokenNotFoundInResponse(failure)) - case Right(value) => UIO.pure(value) + case Left(failure) => IO.raiseError(AuthTokenNotFoundInResponse(failure)) + case Right(value) => IO.pure(value) } - parsedToken <- IO.fromEither(ParsedToken.fromToken(AuthToken(rawToken))).hideErrors + parsedToken <- IO.fromEither(ParsedToken.fromToken(AuthToken(rawToken))) } yield { parsedToken }