Skip to content

Commit

Permalink
fix: run on a single thread without blocking
Browse files Browse the repository at this point in the history
  • Loading branch information
runtologist committed Oct 25, 2021
1 parent 0dffb69 commit 3e6271d
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 42 deletions.
2 changes: 1 addition & 1 deletion src/main/scala/zio/interop/reactivestreams/Adapters.scala
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ object Adapters {
override def onSubscribe(s: Subscription): Unit =
if (s == null) {
val e = new NullPointerException("s was null in onSubscribe")
runtime.unsafeRun(p.fail(e))
p.unsafeDone(IO.fail(e))
throw e
} else {
runtime.unsafeRun(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,13 @@ object PublisherToStreamSpec extends DefaultRunnableSpec {
for {
subscriberP <- Promise.make[Nothing, Subscriber[_]]
cancelledLatch <- Promise.make[Nothing, Unit]
runtime <- ZIO.runtime[Any]
subscription = new Subscription {
override def request(x$1: Long): Unit = ()
override def cancel(): Unit = runtime.unsafeRun(cancelledLatch.succeed(()).unit)
override def cancel(): Unit = cancelledLatch.unsafeDone(UIO.unit)
}
probe = new Publisher[Int] {
override def subscribe(subscriber: Subscriber[_ >: Int]): Unit =
runtime.unsafeRun(subscriberP.succeed(subscriber).unit)
subscriberP.unsafeDone(UIO.succeedNow(subscriber))
}
fiber <- probe.toStream(bufferSize).run(Sink.drain).fork
subscriber <- subscriberP.await
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,42 +49,40 @@ object SinkToSubscriberSpec extends DefaultRunnableSpec {
} yield assert(r)(succeeds(equalTo(List(1, 2, 3, 4, 5))))
),
test("cancels subscription on interruption after subscription")(
ZIO.blocking(
for {
(publisher, subscribed, _, canceled) <- makePublisherProbe
fiber <- Sink.drain
.toSubscriber()
.use { case (subscriber, _) => UIO(publisher.subscribe(subscriber)) *> UIO.never }
.fork
_ <-
Live.live(
assertM(subscribed.await.timeoutFail("timeout awaiting subscribe.")(500.millis).exit)(succeeds(isUnit))
)
_ <- fiber.interrupt
_ <- Live.live(
assertM(canceled.await.timeoutFail("timeout awaiting cancel.")(500.millis).exit)(succeeds(isUnit))
)
r <- fiber.join.exit
} yield assert(r)(isInterrupted)
)
for {
(publisher, subscribed, _, canceled) <- makePublisherProbe
fiber <- Sink
.foreachChunk[Any, Nothing, Int](_ => ZIO.yieldNow)
.toSubscriber()
.use { case (subscriber, _) => UIO(publisher.subscribe(subscriber)) *> UIO.never }
.fork
_ <-
Live.live(
assertM(subscribed.await.timeoutFail("timeout awaiting subscribe.")(500.millis).exit)(succeeds(isUnit))
)
_ <- fiber.interrupt
_ <- Live.live(
assertM(canceled.await.timeoutFail("timeout awaiting cancel.")(500.millis).exit)(succeeds(isUnit))
)
r <- fiber.join.exit
} yield assert(r)(isInterrupted)
),
test("cancels subscription on interruption during consuption")(
ZIO.blocking(
for {
(publisher, subscribed, requested, canceled) <- makePublisherProbe
fiber <- Sink.drain
.toSubscriber()
.use { case (subscriber, _) =>
Task.attemptBlockingInterrupt(publisher.subscribe(subscriber)) *> UIO.never
}
.fork
_ <- assertM(subscribed.await.exit)(succeeds(isUnit))
_ <- assertM(requested.await.exit)(succeeds(isUnit))
_ <- fiber.interrupt
_ <- assertM(canceled.await.exit)(succeeds(isUnit))
r <- fiber.join.exit
} yield assert(r)(isInterrupted)
)
for {
(publisher, subscribed, requested, canceled) <- makePublisherProbe
fiber <- Sink
.foreachChunk[Any, Nothing, Int](_ => ZIO.yieldNow)
.toSubscriber()
.use { case (subscriber, _) =>
Task.attemptBlockingInterrupt(publisher.subscribe(subscriber)) *> UIO.never
}
.fork
_ <- assertM(subscribed.await.exit)(succeeds(isUnit))
_ <- assertM(requested.await.exit)(succeeds(isUnit))
_ <- fiber.interrupt
_ <- assertM(canceled.await.exit)(succeeds(isUnit))
r <- fiber.join.exit
} yield assert(r)(isInterrupted)
),
suite("passes all required and optional TCK tests")(
tests: _*
Expand All @@ -93,7 +91,6 @@ object SinkToSubscriberSpec extends DefaultRunnableSpec {

val makePublisherProbe =
for {
runtime <- ZIO.runtime[Any]
subscribed <- Promise.make[Nothing, Unit]
requested <- Promise.make[Nothing, Unit]
canceled <- Promise.make[Nothing, Unit]
Expand All @@ -102,14 +99,14 @@ object SinkToSubscriberSpec extends DefaultRunnableSpec {
s.onSubscribe(
new Subscription {
override def request(n: Long): Unit = {
runtime.unsafeRun(requested.succeed(()).unit)
requested.unsafeDone(UIO.unit)
(1 to n.toInt).foreach(s.onNext(_))
}
override def cancel(): Unit =
runtime.unsafeRun(canceled.succeed(()).unit)
canceled.unsafeDone(UIO.unit)
}
)
runtime.unsafeRun(subscribed.succeed(()).unit)
subscribed.unsafeDone(UIO.unit)
}
}
} yield (publisher, subscribed, requested, canceled)
Expand Down

0 comments on commit 3e6271d

Please sign in to comment.