Skip to content

Commit

Permalink
Migrate resources to Cats Effect (#4370)
Browse files Browse the repository at this point in the history
* Migrate ResourceRoutes to Cats Effect

* Migrate ResourcesTrialRoutes to Cats Effect

* Migrate ResourcesTrialRoutes to Cats Effect

* Migrate Resources to Cats Effect

* Migrate ResourceGenerationResult to Cats Effect

* Migrate ValidateResource to Cats Effect

* Use .rethrow

* Migrate ResourceShift to Cats Effect

* Migrate Expanded and CompactedJsonLd to Cats Effect

* use ioJsonContentOf to fix test
  • Loading branch information
shinyhappydan authored Oct 18, 2023
1 parent 9136b71 commit 7ef5876
Show file tree
Hide file tree
Showing 48 changed files with 349 additions and 287 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ch.epfl.bluebrain.nexus.delta.routes
import akka.http.scaladsl.model.StatusCodes.{Created, OK}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server._
import cats.effect.IO
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.schemas
Expand All @@ -13,7 +14,7 @@ import ch.epfl.bluebrain.nexus.delta.routes.ResourcesRoutes.asSourceWithMetadata
import ch.epfl.bluebrain.nexus.delta.sdk._
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
import ch.epfl.bluebrain.nexus.delta.sdk.circe.CirceUnmarshalling
import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaDirectives._
import ch.epfl.bluebrain.nexus.delta.sdk.ce.DeltaDirectives._
import ch.epfl.bluebrain.nexus.delta.sdk.directives.{AuthDirectives, DeltaSchemeDirectives}
import ch.epfl.bluebrain.nexus.delta.sdk.fusion.FusionConfig
import ch.epfl.bluebrain.nexus.delta.sdk.identities.Identities
Expand All @@ -28,8 +29,6 @@ import ch.epfl.bluebrain.nexus.delta.sdk.resources.model.{Resource, ResourceReje
import ch.epfl.bluebrain.nexus.delta.sdk.resources.{NexusSource, Resources}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import io.circe.{Json, Printer}
import monix.bio.IO
import monix.execution.Scheduler

/**
* The resource routes
Expand All @@ -53,7 +52,6 @@ final class ResourcesRoutes(
index: IndexingAction.Execute[Resource]
)(implicit
baseUri: BaseUri,
s: Scheduler,
cr: RemoteContextResolution,
ordering: JsonKeyOrdering,
fusionConfig: FusionConfig,
Expand Down Expand Up @@ -86,8 +84,9 @@ final class ResourcesRoutes(
Created,
resources
.create(ref, resourceSchema, source.value, tag)
.tapEval(indexUIO(ref, _, mode))
.flatTap(indexUIO(ref, _, mode))
.map(_.void)
.attemptNarrow[ResourceRejection]
)
}
},
Expand All @@ -102,8 +101,9 @@ final class ResourcesRoutes(
Created,
resources
.create(ref, schema, source.value, tag)
.tapEval(indexUIO(ref, _, mode))
.flatTap(indexUIO(ref, _, mode))
.map(_.void)
.attemptNarrow[ResourceRejection]
.rejectWhen(wrongJsonOrNotFound)
)
}
Expand All @@ -123,17 +123,19 @@ final class ResourcesRoutes(
Created,
resources
.create(id, ref, schema, source.value, tag)
.tapEval(indexUIO(ref, _, mode))
.flatTap(indexUIO(ref, _, mode))
.map(_.void)
.attemptNarrow[ResourceRejection]
.rejectWhen(wrongJsonOrNotFound)
)
case (Some(rev), source, _) =>
// Update a resource
emit(
resources
.update(id, ref, schemaOpt, rev, source.value)
.tapEval(indexUIO(ref, _, mode))
.flatTap(indexUIO(ref, _, mode))
.map(_.void)
.attemptNarrow[ResourceRejection]
.rejectWhen(wrongJsonOrNotFound)
)
}
Expand All @@ -145,8 +147,9 @@ final class ResourcesRoutes(
emit(
resources
.deprecate(id, ref, schemaOpt, rev)
.tapEval(indexUIO(ref, _, mode))
.flatTap(indexUIO(ref, _, mode))
.map(_.void)
.attemptNarrow[ResourceRejection]
.rejectWhen(wrongJsonOrNotFound)
)
}
Expand All @@ -160,7 +163,7 @@ final class ResourcesRoutes(
emit(
resources
.fetch(id, ref, schemaOpt)
.leftWiden[ResourceRejection]
.attemptNarrow[ResourceRejection]
.rejectWhen(wrongJsonOrNotFound)
)
}
Expand All @@ -174,8 +177,9 @@ final class ResourcesRoutes(
OK,
resources
.refresh(id, ref, schemaOpt)
.tapEval(indexUIO(ref, _, mode))
.flatTap(indexUIO(ref, _, mode))
.map(_.void)
.attemptNarrow[ResourceRejection]
.rejectWhen(wrongJsonOrNotFound)
)
}
Expand All @@ -190,29 +194,43 @@ final class ResourcesRoutes(
resources
.fetch(id, ref, schemaOpt)
.flatMap(asSourceWithMetadata)
.attemptNarrow[ResourceRejection]
)
} else {
val sourceIO = resources.fetch(id, ref, schemaOpt).map(_.value.source)
val value = sourceIO.leftWiden[ResourceRejection]
emit(value.rejectWhen(wrongJsonOrNotFound))
emit(
resources
.fetch(id, ref, schemaOpt)
.map(_.value.source)
.attemptNarrow[ResourceRejection]
.rejectWhen(wrongJsonOrNotFound)
)
}
}
}
},
// Get remote contexts
pathPrefix("remote-contexts") {
(get & idSegmentRef(id) & pathEndOrSingleSlash & authorizeFor(ref, Read)) { id =>
val remoteContextsIO = resources.fetchState(id, ref, schemaOpt).map(_.remoteContexts)
emit(remoteContextsIO.leftWiden[ResourceRejection])
emit(
resources
.fetchState(id, ref, schemaOpt)
.map(_.remoteContexts)
.attemptNarrow[ResourceRejection]
)
}
},
// Tag a resource
pathPrefix("tags") {
concat(
// Fetch a resource tags
(get & idSegmentRef(id) & pathEndOrSingleSlash & authorizeFor(ref, Read)) { id =>
val tagsIO = resources.fetch(id, ref, schemaOpt).map(_.value.tags)
emit(tagsIO.leftWiden[ResourceRejection].rejectWhen(wrongJsonOrNotFound))
emit(
resources
.fetch(id, ref, schemaOpt)
.map(_.value.tags)
.attemptNarrow[ResourceRejection]
.rejectWhen(wrongJsonOrNotFound)
)
},
// Tag a resource
(post & parameter("rev".as[Int]) & pathEndOrSingleSlash) { rev =>
Expand All @@ -222,8 +240,9 @@ final class ResourcesRoutes(
Created,
resources
.tag(id, ref, schemaOpt, tag, tagRev, rev)
.tapEval(indexUIO(ref, _, mode))
.flatTap(indexUIO(ref, _, mode))
.map(_.void)
.attemptNarrow[ResourceRejection]
.rejectWhen(wrongJsonOrNotFound)
)
}
Expand All @@ -237,8 +256,9 @@ final class ResourcesRoutes(
emit(
resources
.deleteTag(id, ref, schemaOpt, tag, rev)
.tapEval(indexUIO(ref, _, mode))
.flatTap(indexUIO(ref, _, mode))
.map(_.void)
.attemptNarrow[ResourceRejection]
.rejectOn[ResourceNotFound]
)
}
Expand Down Expand Up @@ -274,7 +294,6 @@ object ResourcesRoutes {
index: IndexingAction.Execute[Resource]
)(implicit
baseUri: BaseUri,
s: Scheduler,
cr: RemoteContextResolution,
ordering: JsonKeyOrdering,
fusionConfig: FusionConfig,
Expand All @@ -283,7 +302,7 @@ object ResourcesRoutes {

def asSourceWithMetadata(
resource: ResourceF[Resource]
)(implicit baseUri: BaseUri, cr: RemoteContextResolution): IO[ResourceRejection, Json] =
)(implicit baseUri: BaseUri, cr: RemoteContextResolution): IO[Json] =
AnnotatedSource(resource, resource.value.source).mapError(e => InvalidJsonLdFormat(Some(resource.id), e))

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,17 @@ package ch.epfl.bluebrain.nexus.delta.routes

import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import cats.effect.IO
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.schemas
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution
import ch.epfl.bluebrain.nexus.delta.rdf.utils.JsonKeyOrdering
import ch.epfl.bluebrain.nexus.delta.routes.ResourcesTrialRoutes.SchemaInput._
import ch.epfl.bluebrain.nexus.delta.routes.ResourcesTrialRoutes.{GenerateSchema, GenerationInput}
import ch.epfl.bluebrain.nexus.delta.sdk.SchemaResource
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
import ch.epfl.bluebrain.nexus.delta.sdk.ce.DeltaDirectives._
import ch.epfl.bluebrain.nexus.delta.sdk.circe.CirceUnmarshalling
import ch.epfl.bluebrain.nexus.delta.sdk.directives.DeltaDirectives._
import ch.epfl.bluebrain.nexus.delta.sdk.directives.{AuthDirectives, DeltaSchemeDirectives}
import ch.epfl.bluebrain.nexus.delta.sdk.identities.Identities
import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller
Expand All @@ -29,8 +29,6 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import io.circe.generic.extras.Configuration
import io.circe.generic.extras.semiauto.deriveConfiguredDecoder
import io.circe.{Decoder, Json}
import monix.bio.IO
import monix.execution.Scheduler

import scala.annotation.nowarn

Expand All @@ -45,7 +43,6 @@ final class ResourcesTrialRoutes(
schemeDirectives: DeltaSchemeDirectives
)(implicit
baseUri: BaseUri,
s: Scheduler,
cr: RemoteContextResolution,
ordering: JsonKeyOrdering,
decodingOption: DecodingOption
Expand All @@ -67,7 +64,9 @@ final class ResourcesTrialRoutes(
authorizeFor(project, Write).apply {
val schemaOpt = underscoreToOption(schema)
emit(
resourcesTrial.validate(id, project, schemaOpt).leftWiden[ResourceRejection]
resourcesTrial
.validate(id, project, schemaOpt)
.attemptNarrow[ResourceRejection]
)
}
}
Expand All @@ -92,19 +91,27 @@ final class ResourcesTrialRoutes(
private def generate(project: ProjectRef, input: GenerationInput)(implicit caller: Caller) =
input.schema match {
case ExistingSchema(schemaId) =>
emit(resourcesTrial.generate(project, schemaId, input.resource).flatMap(_.asJson))
emit(
resourcesTrial
.generate(project, schemaId, input.resource)
.flatMap(_.asJson)
)
case NewSchema(schemaSource) =>
emit(
generateSchema(project, schemaSource, caller).flatMap { schema =>
resourcesTrial.generate(project, schema, input.resource).flatMap(_.asJson)
}
generateSchema(project, schemaSource, caller)
.flatMap { schema =>
resourcesTrial
.generate(project, schema, input.resource)
.flatMap(_.asJson)
}
.attemptNarrow[SchemaRejection]
)
}
}

object ResourcesTrialRoutes {

type GenerateSchema = (ProjectRef, Json, Caller) => IO[SchemaRejection, SchemaResource]
type GenerateSchema = (ProjectRef, Json, Caller) => IO[SchemaResource]

sealed private[routes] trait SchemaInput extends Product

Expand Down Expand Up @@ -146,15 +153,14 @@ object ResourcesTrialRoutes {
schemeDirectives: DeltaSchemeDirectives
)(implicit
baseUri: BaseUri,
s: Scheduler,
cr: RemoteContextResolution,
ordering: JsonKeyOrdering,
decodingOption: DecodingOption
): ResourcesTrialRoutes =
new ResourcesTrialRoutes(
identities,
aclCheck,
(project, source, caller) => schemas.createDryRun(project, source)(caller).toBIO[SchemaRejection],
(project, source, caller) => schemas.createDryRun(project, source)(caller),
resourcesTrial,
schemeDirectives
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import ch.epfl.bluebrain.nexus.delta.sdk.multifetch.MultiFetch
import ch.epfl.bluebrain.nexus.delta.sdk.multifetch.model.MultiFetchRequest
import ch.epfl.bluebrain.nexus.delta.sdk.{PriorityRoute, ResourceShifts}
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import distage.ModuleDef
import izumi.distage.model.definition.Id
import monix.execution.Scheduler
Expand All @@ -23,7 +24,7 @@ object MultiFetchModule extends ModuleDef {
) =>
MultiFetch(
aclCheck,
(input: MultiFetchRequest.Input) => shifts.fetch(input.id, input.project)
(input: MultiFetchRequest.Input) => shifts.fetch(input.id, input.project).toUIO
)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package ch.epfl.bluebrain.nexus.delta.wiring

import cats.effect.Clock
import cats.effect.{Clock, IO}
import ch.epfl.bluebrain.nexus.delta.Main.pluginsMinPriority
import ch.epfl.bluebrain.nexus.delta.config.AppConfig
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF
Expand Down Expand Up @@ -29,8 +29,6 @@ import ch.epfl.bluebrain.nexus.delta.sdk.schemas.model.Schema
import ch.epfl.bluebrain.nexus.delta.sdk.sse.SseEncoder
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import izumi.distage.model.definition.{Id, ModuleDef}
import monix.bio.UIO
import monix.execution.Scheduler

/**
* Resources wiring
Expand All @@ -51,7 +49,7 @@ object ResourcesModule extends ModuleDef {
resolverContextResolution: ResolverContextResolution,
api: JsonLdApi,
xas: Transactors,
clock: Clock[UIO],
clock: Clock[IO],
uuidF: UUIDF
) =>
ResourcesImpl(
Expand Down Expand Up @@ -85,7 +83,6 @@ object ResourcesModule extends ModuleDef {
indexingAction: AggregateIndexingAction,
shift: Resource.Shift,
baseUri: BaseUri,
s: Scheduler,
cr: RemoteContextResolution @Id("aggregate"),
ordering: JsonKeyOrdering,
fusionConfig: FusionConfig,
Expand All @@ -99,7 +96,6 @@ object ResourcesModule extends ModuleDef {
indexingAction(_, _, _)(shift)
)(
baseUri,
s,
cr,
ordering,
fusionConfig,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package ch.epfl.bluebrain.nexus.delta.wiring

import cats.effect.Clock
import cats.effect.{Clock, IO}
import ch.epfl.bluebrain.nexus.delta.Main.pluginsMinPriority
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi
Expand All @@ -20,8 +20,6 @@ import ch.epfl.bluebrain.nexus.delta.sdk.resources.{Resources, ResourcesConfig,
import ch.epfl.bluebrain.nexus.delta.sdk.schemas.Schemas
import distage.ModuleDef
import izumi.distage.model.definition.Id
import monix.bio.UIO
import monix.execution.Scheduler

/**
* Resources trial wiring
Expand All @@ -35,7 +33,7 @@ object ResourcesTrialModule extends ModuleDef {
fetchContext: FetchContext[ContextRejection],
contextResolution: ResolverContextResolution,
api: JsonLdApi,
clock: Clock[UIO],
clock: Clock[IO],
uuidF: UUIDF
) =>
ResourcesTrial(
Expand All @@ -54,7 +52,6 @@ object ResourcesTrialModule extends ModuleDef {
resourcesTrial: ResourcesTrial,
schemeDirectives: DeltaSchemeDirectives,
baseUri: BaseUri,
s: Scheduler,
cr: RemoteContextResolution @Id("aggregate"),
ordering: JsonKeyOrdering,
config: ResourcesConfig
Expand All @@ -67,7 +64,6 @@ object ResourcesTrialModule extends ModuleDef {
schemeDirectives
)(
baseUri,
s,
cr,
ordering,
config.decodingOption
Expand Down
Loading

0 comments on commit 7ef5876

Please sign in to comment.