Skip to content

Commit

Permalink
Merge branch 'master' into ce-migrate-search
Browse files Browse the repository at this point in the history
  • Loading branch information
olivergrabinski authored Oct 3, 2023
2 parents 90631e1 + 523ff52 commit f8a05ff
Show file tree
Hide file tree
Showing 23 changed files with 517 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,16 @@ package ch.epfl.bluebrain.nexus.delta.routes
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.MigrateEffectSyntax
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
import ch.epfl.bluebrain.nexus.delta.sdk.acls.model.AclAddress
import ch.epfl.bluebrain.nexus.delta.sdk.circe.CirceUnmarshalling
import ch.epfl.bluebrain.nexus.delta.sdk.directives.AuthDirectives
import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaDirectives._
import ch.epfl.bluebrain.nexus.delta.sdk.identities.Identities
import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import ch.epfl.bluebrain.nexus.delta.sdk.model.{BaseUri, IdSegment, IdSegmentRef}
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.StoragePermissionProvider
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.StoragePermissionProvider.AccessType
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.model.Permission

/**
Expand All @@ -19,10 +23,11 @@ import ch.epfl.bluebrain.nexus.delta.sdk.permissions.model.Permission
* @param aclCheck
* verify the acls for users
*/
final class UserPermissionsRoutes(identities: Identities, aclCheck: AclCheck)(implicit
baseUri: BaseUri
final class UserPermissionsRoutes(identities: Identities, aclCheck: AclCheck, storages: StoragePermissionProvider)(
implicit baseUri: BaseUri
) extends AuthDirectives(identities, aclCheck)
with CirceUnmarshalling {
with CirceUnmarshalling
with MigrateEffectSyntax {

def routes: Route =
baseUriPrefix(baseUri.prefix) {
Expand All @@ -31,11 +36,21 @@ final class UserPermissionsRoutes(identities: Identities, aclCheck: AclCheck)(im
projectRef { project =>
extractCaller { implicit caller =>
head {
parameter("permission".as[Permission]) { permission =>
authorizeFor(project, permission)(caller) {
complete(StatusCodes.NoContent)
concat(
parameter("permission".as[Permission]) { permission =>
authorizeFor(project, permission)(caller) {
complete(StatusCodes.NoContent)
}
},
parameters("storage".as[IdSegment], "type".as[AccessType]) { (storageId, `type`) =>
authorizeForIO(
AclAddress.fromProject(project),
storages.permissionFor(IdSegmentRef(storageId), project, `type`)
)(caller) {
complete(StatusCodes.NoContent)
}
}
}
)
}
}
}
Expand All @@ -45,8 +60,8 @@ final class UserPermissionsRoutes(identities: Identities, aclCheck: AclCheck)(im
}

object UserPermissionsRoutes {
def apply(identities: Identities, aclCheck: AclCheck)(implicit
def apply(identities: Identities, aclCheck: AclCheck, storagePermissionProvider: StoragePermissionProvider)(implicit
baseUri: BaseUri
): Route =
new UserPermissionsRoutes(identities, aclCheck: AclCheck).routes
new UserPermissionsRoutes(identities, aclCheck: AclCheck, storagePermissionProvider).routes
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.acls.{AclCheck, Acls, AclsImpl}
import ch.epfl.bluebrain.nexus.delta.sdk.deletion.ProjectDeletionTask
import ch.epfl.bluebrain.nexus.delta.sdk.identities.Identities
import ch.epfl.bluebrain.nexus.delta.sdk.model.{BaseUri, MetadataContextValue}
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.{Permissions, StoragePermissionProvider}
import ch.epfl.bluebrain.nexus.delta.sdk.sse.SseEncoder
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import izumi.distage.model.definition.{Id, ModuleDef}
Expand Down Expand Up @@ -72,8 +72,14 @@ object AclsModule extends ModuleDef {
} yield RemoteContextResolution.fixed(contexts.acls -> aclsCtx, contexts.aclsMetadata -> aclsMetaCtx)
)

make[UserPermissionsRoutes].from { (identities: Identities, aclCheck: AclCheck, baseUri: BaseUri) =>
new UserPermissionsRoutes(identities, aclCheck)(baseUri)
make[UserPermissionsRoutes].from {
(
identities: Identities,
aclCheck: AclCheck,
baseUri: BaseUri,
storagePermissionProvider: StoragePermissionProvider
) =>
new UserPermissionsRoutes(identities, aclCheck, storagePermissionProvider)(baseUri)
}

many[PriorityRoute].add { (alcs: AclsRoutes, userPermissions: UserPermissionsRoutes) =>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package ch.epfl.bluebrain.nexus.delta.wiring

import cats.effect.{Clock, IO}
import ch.epfl.bluebrain.nexus.delta.Main.pluginsMaxPriority
import ch.epfl.bluebrain.nexus.delta.config.AppConfig
import ch.epfl.bluebrain.nexus.delta.kernel.cache.CacheConfig
Expand Down Expand Up @@ -34,8 +35,8 @@ object IdentitiesModule extends ModuleDef {
new OpenIdAuthService(httpClient, realms)
}

make[AuthTokenProvider].fromEffect { (authService: OpenIdAuthService) =>
AuthTokenProvider(authService)
make[AuthTokenProvider].fromEffect { (authService: OpenIdAuthService, clock: Clock[IO]) =>
AuthTokenProvider(authService)(clock)
}

many[RemoteContextResolution].addEffect(ContextValue.fromFile("contexts/identities.json").map { ctx =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import akka.http.scaladsl.model.Uri.Query
import akka.http.scaladsl.model.headers.{`Last-Event-ID`, Accept}
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, StatusCodes}
import akka.stream.alpakka.sse.scaladsl.EventSource
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.MigrateEffectSyntax
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewSource.RemoteProjectSource
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.stream.CompositeBranch
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
Expand Down Expand Up @@ -87,11 +88,12 @@ object DeltaClient {
)(implicit
as: ActorSystem[Nothing],
scheduler: Scheduler
) extends DeltaClient {
) extends DeltaClient
with MigrateEffectSyntax {

override def projectStatistics(source: RemoteProjectSource): HttpResult[ProjectStatistics] = {
for {
authToken <- authTokenProvider(credentials)
authToken <- authTokenProvider(credentials).toBIO
request =
Get(
source.endpoint / "projects" / source.project.organization.value / source.project.project.value / "statistics"
Expand All @@ -104,7 +106,7 @@ object DeltaClient {

override def remaining(source: RemoteProjectSource, offset: Offset): HttpResult[RemainingElems] = {
for {
authToken <- authTokenProvider(credentials)
authToken <- authTokenProvider(credentials).toBIO
request = Get(elemAddress(source) / "remaining")
.addHeader(accept)
.addHeader(`Last-Event-ID`(offset.value.toString))
Expand All @@ -115,7 +117,7 @@ object DeltaClient {

override def checkElems(source: RemoteProjectSource): HttpResult[Unit] = {
for {
authToken <- authTokenProvider(credentials)
authToken <- authTokenProvider(credentials).toBIO
result <- client(Head(elemAddress(source)).withCredentials(authToken)) {
case resp if resp.status.isSuccess() => UIO.delay(resp.discardEntityBytes()) >> IO.unit
}
Expand All @@ -130,7 +132,7 @@ object DeltaClient {

def send(request: HttpRequest): Future[HttpResponse] = {
(for {
authToken <- authTokenProvider(credentials)
authToken <- authTokenProvider(credentials).toBIO
result <- client[HttpResponse](request.withCredentials(authToken))(IO.pure(_))
} yield result).runToFuture
}
Expand Down Expand Up @@ -164,7 +166,7 @@ object DeltaClient {
val resourceUrl =
source.endpoint / "resources" / source.project.organization.value / source.project.project.value / "_" / id.toString
for {
authToken <- authTokenProvider(credentials)
authToken <- authTokenProvider(credentials).toBIO
req = Get(
source.resourceTag.fold(resourceUrl)(t => resourceUrl.withQuery(Query("tag" -> t.value)))
).addHeader(Accept(RdfMediaTypes.`application/n-quads`)).withCredentials(authToken)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.Storage
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote.client.RemoteDiskStorageClient
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.routes.StoragesRoutes
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.schemas.{storage => storagesSchemaId}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.{StorageDeletionTask, Storages, StoragesStatistics}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.{StorageDeletionTask, StoragePermissionProviderImpl, Storages, StoragesStatistics}
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution}
import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering
Expand All @@ -33,7 +33,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.identities.Identities
import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.ServiceAccount
import ch.epfl.bluebrain.nexus.delta.sdk.model._
import ch.epfl.bluebrain.nexus.delta.sdk.model.metrics.ScopedEventMetricEncoder
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.{Permissions, StoragePermissionProvider}
import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext
import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext.ContextRejection
import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ApiMappings
Expand Down Expand Up @@ -94,6 +94,10 @@ class StoragePluginModule(priority: Int) extends ModuleDef {
)
}

make[StoragePermissionProvider].from { (storages: Storages) =>
new StoragePermissionProviderImpl(storages)
}

make[StoragesStatistics].from {
(
client: ElasticSearchClient,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages

import ch.epfl.bluebrain.nexus.delta.sdk.model.IdSegmentRef
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.StoragePermissionProvider
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.StoragePermissionProvider.AccessType.{Read, Write}
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.model.Permission
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import monix.bio.UIO

class StoragePermissionProviderImpl(storages: Storages) extends StoragePermissionProvider {
override def permissionFor(
id: IdSegmentRef,
project: ProjectRef,
accessType: StoragePermissionProvider.AccessType
): UIO[Permission] = {
storages
.fetch(id, project)
.map(storage => storage.value.storageValue)
.map(storage =>
accessType match {
case Read => storage.readPermission
case Write => storage.writePermission
}
)
.hideErrors
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import akka.http.scaladsl.model.Multipart.FormData
import akka.http.scaladsl.model.Multipart.FormData.BodyPart
import akka.http.scaladsl.model.StatusCodes._
import akka.http.scaladsl.model.Uri.Path
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.MigrateEffectSyntax
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.FetchFileRejection.UnexpectedFetchError
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.MoveFileRejection.UnexpectedMoveError
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.{FetchFileRejection, MoveFileRejection, SaveFileRejection}
Expand Down Expand Up @@ -34,7 +35,7 @@ import scala.concurrent.duration._
*/
final class RemoteDiskStorageClient(client: HttpClient, getAuthToken: AuthTokenProvider, credentials: Credentials)(
implicit as: ActorSystem
) {
) extends MigrateEffectSyntax {
import as.dispatcher

private val serviceName = Name.unsafe("remoteStorage")
Expand All @@ -58,7 +59,7 @@ final class RemoteDiskStorageClient(client: HttpClient, getAuthToken: AuthTokenP
* the storage bucket name
*/
def exists(bucket: Label)(implicit baseUri: BaseUri): IO[HttpClientError, Unit] = {
getAuthToken(credentials).flatMap { authToken =>
getAuthToken(credentials).toBIO.flatMap { authToken =>
val endpoint = baseUri.endpoint / "buckets" / bucket.value
val req = Head(endpoint).withCredentials(authToken)
client(req) {
Expand All @@ -82,7 +83,7 @@ final class RemoteDiskStorageClient(client: HttpClient, getAuthToken: AuthTokenP
relativePath: Path,
entity: BodyPartEntity
)(implicit baseUri: BaseUri): IO[SaveFileRejection, RemoteDiskStorageFileAttributes] = {
getAuthToken(credentials).flatMap { authToken =>
getAuthToken(credentials).toBIO.flatMap { authToken =>
val endpoint = baseUri.endpoint / "buckets" / bucket.value / "files" / relativePath
val filename = relativePath.lastSegment.getOrElse("filename")
val multipartForm = FormData(BodyPart("file", entity, Map("filename" -> filename))).toEntity()
Expand All @@ -106,7 +107,7 @@ final class RemoteDiskStorageClient(client: HttpClient, getAuthToken: AuthTokenP
* the relative path to the file location
*/
def getFile(bucket: Label, relativePath: Path)(implicit baseUri: BaseUri): IO[FetchFileRejection, AkkaSource] = {
getAuthToken(credentials).flatMap { authToken =>
getAuthToken(credentials).toBIO.flatMap { authToken =>
val endpoint = baseUri.endpoint / "buckets" / bucket.value / "files" / relativePath
client.toDataBytes(Get(endpoint).withCredentials(authToken)).mapError {
case error @ HttpClientStatusError(_, `NotFound`, _) if !bucketNotFoundType(error) =>
Expand All @@ -129,7 +130,7 @@ final class RemoteDiskStorageClient(client: HttpClient, getAuthToken: AuthTokenP
bucket: Label,
relativePath: Path
)(implicit baseUri: BaseUri): IO[FetchFileRejection, RemoteDiskStorageFileAttributes] = {
getAuthToken(credentials).flatMap { authToken =>
getAuthToken(credentials).toBIO.flatMap { authToken =>
val endpoint = baseUri.endpoint / "buckets" / bucket.value / "attributes" / relativePath
client.fromJsonTo[RemoteDiskStorageFileAttributes](Get(endpoint).withCredentials(authToken)).mapError {
case error @ HttpClientStatusError(_, `NotFound`, _) if !bucketNotFoundType(error) =>
Expand All @@ -156,7 +157,7 @@ final class RemoteDiskStorageClient(client: HttpClient, getAuthToken: AuthTokenP
sourceRelativePath: Path,
destRelativePath: Path
)(implicit baseUri: BaseUri): IO[MoveFileRejection, RemoteDiskStorageFileAttributes] = {
getAuthToken(credentials).flatMap { authToken =>
getAuthToken(credentials).toBIO.flatMap { authToken =>
val endpoint = baseUri.endpoint / "buckets" / bucket.value / "files" / destRelativePath
val payload = Json.obj("source" -> sourceRelativePath.toString.asJson)
client.fromJsonTo[RemoteDiskStorageFileAttributes](Put(endpoint, payload).withCredentials(authToken)).mapError {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,36 +1,38 @@
package ch.epfl.bluebrain.nexus.delta.sdk.auth

import cats.effect.Clock
import cats.effect.{Clock, IO}
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.kernel.cache.KeyValueStore
import ch.epfl.bluebrain.nexus.delta.kernel.cache.LocalCache
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.MigrateEffectSyntax
import ch.epfl.bluebrain.nexus.delta.kernel.utils.IOUtils
import ch.epfl.bluebrain.nexus.delta.kernel.utils.IOInstant
import ch.epfl.bluebrain.nexus.delta.sdk.auth.Credentials.ClientCredentials
import ch.epfl.bluebrain.nexus.delta.sdk.identities.ParsedToken
import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.AuthToken
import monix.bio.UIO
import monix.bio

import java.time.{Duration, Instant}

/**
* Provides an auth token for the service account, for use when comunicating with remote storage
*/
trait AuthTokenProvider {
def apply(credentials: Credentials): UIO[Option[AuthToken]]
def apply(credentials: Credentials): IO[Option[AuthToken]]
}

object AuthTokenProvider {
def apply(authService: OpenIdAuthService): UIO[AuthTokenProvider] = {
KeyValueStore[ClientCredentials, ParsedToken]().map(cache => new CachingOpenIdAuthTokenProvider(authService, cache))
def apply(authService: OpenIdAuthService)(implicit clock: Clock[IO]): bio.UIO[AuthTokenProvider] = {
LocalCache[ClientCredentials, ParsedToken]()
.map(cache => new CachingOpenIdAuthTokenProvider(authService, cache))
.toBIO
}
def anonymousForTest: AuthTokenProvider = new AnonymousAuthTokenProvider
def fixedForTest(token: String): AuthTokenProvider = new AuthTokenProvider {
override def apply(credentials: Credentials): UIO[Option[AuthToken]] = UIO.pure(Some(AuthToken(token)))
override def apply(credentials: Credentials): IO[Option[AuthToken]] = IO.pure(Some(AuthToken(token)))
}
}

private class AnonymousAuthTokenProvider extends AuthTokenProvider {
override def apply(credentials: Credentials): UIO[Option[AuthToken]] = UIO.pure(None)
override def apply(credentials: Credentials): IO[Option[AuthToken]] = IO.pure(None)
}

/**
Expand All @@ -39,42 +41,42 @@ private class AnonymousAuthTokenProvider extends AuthTokenProvider {
*/
private class CachingOpenIdAuthTokenProvider(
service: OpenIdAuthService,
cache: KeyValueStore[ClientCredentials, ParsedToken]
cache: LocalCache[ClientCredentials, ParsedToken]
)(implicit
clock: Clock[UIO]
clock: Clock[IO]
) extends AuthTokenProvider
with MigrateEffectSyntax {

private val logger = Logger.cats[CachingOpenIdAuthTokenProvider]

override def apply(credentials: Credentials): UIO[Option[AuthToken]] = {
override def apply(credentials: Credentials): IO[Option[AuthToken]] = {

credentials match {
case Credentials.Anonymous => UIO.pure(None)
case Credentials.JWTToken(token) => UIO.pure(Some(AuthToken(token)))
case Credentials.Anonymous => IO.pure(None)
case Credentials.JWTToken(token) => IO.pure(Some(AuthToken(token)))
case credentials: ClientCredentials => clientCredentialsFlow(credentials)
}
}

private def clientCredentialsFlow(credentials: ClientCredentials) = {
private def clientCredentialsFlow(credentials: ClientCredentials): IO[Some[AuthToken]] = {
for {
existingValue <- cache.get(credentials)
now <- IOUtils.instant
now <- IOInstant.now
finalValue <- existingValue match {
case None =>
logger.info("Fetching auth token, no initial value.").toUIO >>
logger.info("Fetching auth token, no initial value.") *>
fetchValue(credentials)
case Some(value) if isExpired(value, now) =>
logger.info("Fetching new auth token, current value near expiry.").toUIO >>
logger.info("Fetching new auth token, current value near expiry.") *>
fetchValue(credentials)
case Some(value) => UIO.pure(value)
case Some(value) => IO.pure(value)
}
} yield {
Some(AuthToken(finalValue.rawToken))
}
}

private def fetchValue(credentials: ClientCredentials) = {
private def fetchValue(credentials: ClientCredentials): IO[ParsedToken] = {
cache.getOrElseUpdate(credentials, service.auth(credentials))
}

Expand Down
Loading

0 comments on commit f8a05ff

Please sign in to comment.