Skip to content

Commit

Permalink
Migrate files to Cats Effect (#4392)
Browse files Browse the repository at this point in the history
* Migrate Files to Cats Effect

* Add syntax for indexing / narrowing errors

* Refactor file Id passing and Iri generation

* Fix after merge

* Inject global EC in storage module

* Suppress scapegoat warning

* Use actor EC in FormDataExtractor

* Reject on FileNotFound + renaming + use CatsEffectSpec
  • Loading branch information
dantb authored Oct 20, 2023
1 parent 72dc3ba commit ee92a35
Show file tree
Hide file tree
Showing 22 changed files with 399 additions and 342 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ object RetryStrategy {
RetryStrategy(
config,
(t: Throwable) => NonFatal(t),
(t: Throwable, d: RetryDetails) => logError[Throwable](logger, action)(t, d).toBIO
(t: Throwable, d: RetryDetails) => logError[Throwable](logger, action)(t, d).toBIOThrowable
)

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import cats.effect.IO
import cats.~>
import monix.bio.{IO => BIO, Task, UIO}
import monix.execution.Scheduler.Implicits.global
import shapeless.=:!=

import scala.annotation.nowarn

import scala.reflect.ClassTag

Expand Down Expand Up @@ -34,7 +37,20 @@ final class MonixBioToCatsIOEitherOps[E, A](private val io: BIO[E, A]) extends A
}

final class CatsIOToBioOps[A](private val io: IO[A]) extends AnyVal {
def toBIO[E <: Throwable](implicit E: ClassTag[E]): BIO[E, A] =

/**
* Safe conversion between CE and Monix, forcing the user to specify a strict subtype of [[Throwable]]. If omitted,
* the compiler may infer [[Throwable]] and bypass any custom error handling.
*/
@SuppressWarnings(Array("UnusedMethodParameter"))
@nowarn
def toBIO[E <: Throwable](implicit E: ClassTag[E], ev: E =:!= Throwable): BIO[E, A] =
toBIOThrowable[E]

/**
* Prefer [[toBIO]]. Only use this when we are sure there's no custom error handling logic.
*/
def toBIOThrowable[E <: Throwable](implicit E: ClassTag[E]): BIO[E, A] =
BIO.from(io).mapErrorPartialWith {
case E(e) => monix.bio.IO.raiseError(e)
case other => BIO.terminate(other)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.archive.model.ArchiveReference.{Fil
import ch.epfl.bluebrain.nexus.delta.plugins.archive.model.ArchiveRejection._
import ch.epfl.bluebrain.nexus.delta.plugins.archive.model._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.Files
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileRejection
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.{FileId, FileRejection}
import ch.epfl.bluebrain.nexus.delta.rdf.implicits._
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.{JsonLdApi, JsonLdJavaApi}
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution
Expand All @@ -27,7 +27,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller
import ch.epfl.bluebrain.nexus.delta.sdk.jsonld.JsonLdContent
import ch.epfl.bluebrain.nexus.delta.sdk.marshalling.AnnotatedSource
import ch.epfl.bluebrain.nexus.delta.sdk.model.ResourceRepresentation._
import ch.epfl.bluebrain.nexus.delta.sdk.model.{BaseUri, IdSegmentRef, ResourceRepresentation}
import ch.epfl.bluebrain.nexus.delta.sdk.model.{BaseUri, ResourceRepresentation}
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions.resources
import ch.epfl.bluebrain.nexus.delta.sdk.stream.StreamConverter
import ch.epfl.bluebrain.nexus.delta.sdk.{AkkaSource, JsonLdValue, ResourceShifts}
Expand Down Expand Up @@ -282,7 +282,7 @@ object ArchiveDownload {
ArchiveDownload(
aclCheck,
shifts.fetch,
(id: ResourceRef, project: ProjectRef, caller: Caller) => files.fetchContent(IdSegmentRef(id), project)(caller),
(id: ResourceRef, project: ProjectRef, caller: Caller) => files.fetchContent(FileId(id, project))(caller),
fileSelf
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ object DeltaClient {

override def projectStatistics(source: RemoteProjectSource): HttpResult[ProjectStatistics] = {
for {
authToken <- authTokenProvider(credentials).toBIO
authToken <- authTokenProvider(credentials).toBIOThrowable
request =
Get(
source.endpoint / "projects" / source.project.organization.value / source.project.project.value / "statistics"
Expand All @@ -108,7 +108,7 @@ object DeltaClient {

override def remaining(source: RemoteProjectSource, offset: Offset): HttpResult[RemainingElems] = {
for {
authToken <- authTokenProvider(credentials).toBIO
authToken <- authTokenProvider(credentials).toBIOThrowable
request = Get(elemAddress(source) / "remaining")
.addHeader(accept)
.addHeader(`Last-Event-ID`(offset.value.toString))
Expand All @@ -119,7 +119,7 @@ object DeltaClient {

override def checkElems(source: RemoteProjectSource): HttpResult[Unit] = {
for {
authToken <- authTokenProvider(credentials).toBIO
authToken <- authTokenProvider(credentials).toBIOThrowable
result <- client(Head(elemAddress(source)).withCredentials(authToken)) {
case resp if resp.status.isSuccess() => UIO.delay(resp.discardEntityBytes()) >> BIO.unit
}
Expand All @@ -134,7 +134,7 @@ object DeltaClient {

def send(request: HttpRequest): Future[HttpResponse] = {
(for {
authToken <- authTokenProvider(credentials).toBIO
authToken <- authTokenProvider(credentials).toBIOThrowable
result <- client[HttpResponse](request.withCredentials(authToken))(BIO.pure(_))
} yield result).runToFuture
}
Expand Down Expand Up @@ -169,7 +169,7 @@ object DeltaClient {
val resourceUrl =
source.endpoint / "resources" / source.project.organization.value / source.project.project.value / "_" / id.toString
for {
authToken <- authTokenProvider(credentials).toBIO
authToken <- authTokenProvider(credentials).toBIOThrowable
req = Get(
source.resourceTag.fold(resourceUrl)(t => resourceUrl.withQuery(Query("tag" -> t.value)))
).addHeader(Accept(RdfMediaTypes.`application/n-quads`)).withCredentials(authToken)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.storage

import akka.actor
import akka.actor.typed.ActorSystem
import cats.effect.{Clock, IO}
import cats.effect.{Clock, ContextShift, IO}
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient
Expand Down Expand Up @@ -166,11 +166,11 @@ class StoragePluginModule(priority: Int) extends ModuleDef {
supervisor: Supervisor,
storagesStatistics: StoragesStatistics,
xas: Transactors,
clock: Clock[UIO],
clock: Clock[IO],
uuidF: UUIDF,
as: ActorSystem[Nothing],
remoteDiskStorageClient: RemoteDiskStorageClient,
scheduler: Scheduler
cs: ContextShift[IO]
) =>
IO
.delay(
Expand All @@ -186,7 +186,7 @@ class StoragePluginModule(priority: Int) extends ModuleDef {
)(
clock,
uuidF,
scheduler,
cs,
as
)
)
Expand All @@ -205,7 +205,6 @@ class StoragePluginModule(priority: Int) extends ModuleDef {
indexingAction: AggregateIndexingAction,
shift: File.Shift,
baseUri: BaseUri,
s: Scheduler,
cr: RemoteContextResolution @Id("aggregate"),
ordering: JsonKeyOrdering,
fusionConfig: FusionConfig
Expand All @@ -214,7 +213,6 @@ class StoragePluginModule(priority: Int) extends ModuleDef {
new FilesRoutes(identities, aclCheck, files, schemeDirectives, indexingAction(_, _, _)(shift))(
baseUri,
storageConfig,
s,
cr,
ordering,
fusionConfig
Expand Down
Loading

0 comments on commit ee92a35

Please sign in to comment.