Skip to content

Commit

Permalink
add Observable#switchMapResource
Browse files Browse the repository at this point in the history
  • Loading branch information
cornerman committed Mar 29, 2022
1 parent 1155349 commit 701cb57
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 1 deletion.
42 changes: 41 additions & 1 deletion colibri/src/main/scala/colibri/Observable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,8 @@ object Observable {
RunEffect[F].unsafeRunSyncOrAsyncCancelable(resource.allocated) { either =>
either match {
case Right((value, finalizer)) =>
sink.unsafeOnNext(value)

cancelableBuilder.addExisting(Cancelable { () =>
// async and forget the finalizer, we do not need to cancel it.
// just pass the error to the sink.
Expand All @@ -464,8 +466,46 @@ object Observable {
}
})

case Left(error) =>
sink.unsafeOnError(error)
}

consecutive.switch()
},
)
},
sink.unsafeOnError,
),
)

Cancelable.composite(subscription, consecutive, cancelableBuilder)
}
}

def switchMapResource[F[_]: RunEffect: Sync, B](f: A => Resource[F, B]): Observable[B] = new Observable[B] {
def unsafeSubscribe(sink: Observer[B]): Cancelable = {
val consecutive = Cancelable.consecutive()
val cancelableVariable = Cancelable.variable()

val subscription = source.unsafeSubscribe(
Observer.create[A](
{ value =>
val resource = f(value)
consecutive += (() =>
RunEffect[F].unsafeRunSyncOrAsyncCancelable(resource.allocated) { either =>
either match {
case Right((value, finalizer)) =>
sink.unsafeOnNext(value)

cancelableVariable.updateExisting(Cancelable { () =>
// async and forget the finalizer, we do not need to cancel it.
// just pass the error to the sink.
val _ = RunEffect[F].unsafeRunSyncOrAsyncCancelable(finalizer) {
case Right(()) => ()
case Left(error) => sink.unsafeOnError(error)
}
})

case Left(error) =>
sink.unsafeOnError(error)
}
Expand All @@ -478,7 +518,7 @@ object Observable {
),
)

Cancelable.composite(subscription, consecutive, cancelableBuilder)
Cancelable.composite(subscription, consecutive, cancelableVariable)
}
}

Expand Down
58 changes: 58 additions & 0 deletions colibri/src/test/scala/colibri/ObservableSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,64 @@ class ObservableSpec extends AsyncFlatSpec with Matchers {
finalizeCalls shouldBe List(3, 2, 1)
}

it should "switchMapResource" in {
var received = List.empty[Int]
var acquireCalls = List.empty[Int]
var finalizeCalls = List.empty[Int]
var errors = 0

def resource(i: Int) = Resource.make(SyncIO { acquireCalls ::= i })(_ => SyncIO { finalizeCalls ::= i }).as(i)
val subject = Subject.publish[Int]()
val stream = subject.switchMapResource(resource)

val cancelable = stream.unsafeSubscribe(
Observer.create[Int](
received ::= _,
_ => errors += 1,
),
)

received shouldBe List.empty
errors shouldBe 0
acquireCalls shouldBe List.empty
finalizeCalls shouldBe List.empty

subject.unsafeOnNext(1)

received shouldBe List(1)
errors shouldBe 0
acquireCalls shouldBe List(1)
finalizeCalls shouldBe List.empty

subject.unsafeOnNext(2)

received shouldBe List(2, 1)
errors shouldBe 0
acquireCalls shouldBe List(2, 1)
finalizeCalls shouldBe List(1)

subject.unsafeOnNext(3)

received shouldBe List(3, 2, 1)
errors shouldBe 0
acquireCalls shouldBe List(3, 2, 1)
finalizeCalls shouldBe List(2, 1)

cancelable.unsafeCancel()

received shouldBe List(3, 2, 1)
errors shouldBe 0
acquireCalls shouldBe List(3, 2, 1)
finalizeCalls shouldBe List(3, 2, 1)

cancelable.unsafeCancel()

received shouldBe List(3, 2, 1)
errors shouldBe 0
acquireCalls shouldBe List(3, 2, 1)
finalizeCalls shouldBe List(3, 2, 1)
}

it should "fromEffect" in {
var received = List.empty[Int]
var errors = 0
Expand Down

0 comments on commit 701cb57

Please sign in to comment.