From 3e6271d0371edd46043f237a5a8e8e78be4e909e Mon Sep 17 00:00:00 2001 From: Simon Schenk Date: Mon, 25 Oct 2021 15:42:17 +0200 Subject: [PATCH] fix: run on a single thread without blocking --- .../interop/reactivestreams/Adapters.scala | 2 +- .../PublisherToStreamSpec.scala | 5 +- .../SinkToSubscriberSpec.scala | 73 +++++++++---------- 3 files changed, 38 insertions(+), 42 deletions(-) diff --git a/src/main/scala/zio/interop/reactivestreams/Adapters.scala b/src/main/scala/zio/interop/reactivestreams/Adapters.scala index 22d355c..4979856 100644 --- a/src/main/scala/zio/interop/reactivestreams/Adapters.scala +++ b/src/main/scala/zio/interop/reactivestreams/Adapters.scala @@ -137,7 +137,7 @@ object Adapters { override def onSubscribe(s: Subscription): Unit = if (s == null) { val e = new NullPointerException("s was null in onSubscribe") - runtime.unsafeRun(p.fail(e)) + p.unsafeDone(IO.fail(e)) throw e } else { runtime.unsafeRun( diff --git a/src/test/scala/zio/interop/reactivestreams/PublisherToStreamSpec.scala b/src/test/scala/zio/interop/reactivestreams/PublisherToStreamSpec.scala index 6c1e897..6ecc0a2 100644 --- a/src/test/scala/zio/interop/reactivestreams/PublisherToStreamSpec.scala +++ b/src/test/scala/zio/interop/reactivestreams/PublisherToStreamSpec.scala @@ -74,14 +74,13 @@ object PublisherToStreamSpec extends DefaultRunnableSpec { for { subscriberP <- Promise.make[Nothing, Subscriber[_]] cancelledLatch <- Promise.make[Nothing, Unit] - runtime <- ZIO.runtime[Any] subscription = new Subscription { override def request(x$1: Long): Unit = () - override def cancel(): Unit = runtime.unsafeRun(cancelledLatch.succeed(()).unit) + override def cancel(): Unit = cancelledLatch.unsafeDone(UIO.unit) } probe = new Publisher[Int] { override def subscribe(subscriber: Subscriber[_ >: Int]): Unit = - runtime.unsafeRun(subscriberP.succeed(subscriber).unit) + subscriberP.unsafeDone(UIO.succeedNow(subscriber)) } fiber <- probe.toStream(bufferSize).run(Sink.drain).fork subscriber <- subscriberP.await diff --git a/src/test/scala/zio/interop/reactivestreams/SinkToSubscriberSpec.scala b/src/test/scala/zio/interop/reactivestreams/SinkToSubscriberSpec.scala index 7c52b3d..0481e99 100644 --- a/src/test/scala/zio/interop/reactivestreams/SinkToSubscriberSpec.scala +++ b/src/test/scala/zio/interop/reactivestreams/SinkToSubscriberSpec.scala @@ -49,42 +49,40 @@ object SinkToSubscriberSpec extends DefaultRunnableSpec { } yield assert(r)(succeeds(equalTo(List(1, 2, 3, 4, 5)))) ), test("cancels subscription on interruption after subscription")( - ZIO.blocking( - for { - (publisher, subscribed, _, canceled) <- makePublisherProbe - fiber <- Sink.drain - .toSubscriber() - .use { case (subscriber, _) => UIO(publisher.subscribe(subscriber)) *> UIO.never } - .fork - _ <- - Live.live( - assertM(subscribed.await.timeoutFail("timeout awaiting subscribe.")(500.millis).exit)(succeeds(isUnit)) - ) - _ <- fiber.interrupt - _ <- Live.live( - assertM(canceled.await.timeoutFail("timeout awaiting cancel.")(500.millis).exit)(succeeds(isUnit)) - ) - r <- fiber.join.exit - } yield assert(r)(isInterrupted) - ) + for { + (publisher, subscribed, _, canceled) <- makePublisherProbe + fiber <- Sink + .foreachChunk[Any, Nothing, Int](_ => ZIO.yieldNow) + .toSubscriber() + .use { case (subscriber, _) => UIO(publisher.subscribe(subscriber)) *> UIO.never } + .fork + _ <- + Live.live( + assertM(subscribed.await.timeoutFail("timeout awaiting subscribe.")(500.millis).exit)(succeeds(isUnit)) + ) + _ <- fiber.interrupt + _ <- Live.live( + assertM(canceled.await.timeoutFail("timeout awaiting cancel.")(500.millis).exit)(succeeds(isUnit)) + ) + r <- fiber.join.exit + } yield assert(r)(isInterrupted) ), test("cancels subscription on interruption during consuption")( - ZIO.blocking( - for { - (publisher, subscribed, requested, canceled) <- makePublisherProbe - fiber <- Sink.drain - .toSubscriber() - .use { case (subscriber, _) => - Task.attemptBlockingInterrupt(publisher.subscribe(subscriber)) *> UIO.never - } - .fork - _ <- assertM(subscribed.await.exit)(succeeds(isUnit)) - _ <- assertM(requested.await.exit)(succeeds(isUnit)) - _ <- fiber.interrupt - _ <- assertM(canceled.await.exit)(succeeds(isUnit)) - r <- fiber.join.exit - } yield assert(r)(isInterrupted) - ) + for { + (publisher, subscribed, requested, canceled) <- makePublisherProbe + fiber <- Sink + .foreachChunk[Any, Nothing, Int](_ => ZIO.yieldNow) + .toSubscriber() + .use { case (subscriber, _) => + Task.attemptBlockingInterrupt(publisher.subscribe(subscriber)) *> UIO.never + } + .fork + _ <- assertM(subscribed.await.exit)(succeeds(isUnit)) + _ <- assertM(requested.await.exit)(succeeds(isUnit)) + _ <- fiber.interrupt + _ <- assertM(canceled.await.exit)(succeeds(isUnit)) + r <- fiber.join.exit + } yield assert(r)(isInterrupted) ), suite("passes all required and optional TCK tests")( tests: _* @@ -93,7 +91,6 @@ object SinkToSubscriberSpec extends DefaultRunnableSpec { val makePublisherProbe = for { - runtime <- ZIO.runtime[Any] subscribed <- Promise.make[Nothing, Unit] requested <- Promise.make[Nothing, Unit] canceled <- Promise.make[Nothing, Unit] @@ -102,14 +99,14 @@ object SinkToSubscriberSpec extends DefaultRunnableSpec { s.onSubscribe( new Subscription { override def request(n: Long): Unit = { - runtime.unsafeRun(requested.succeed(()).unit) + requested.unsafeDone(UIO.unit) (1 to n.toInt).foreach(s.onNext(_)) } override def cancel(): Unit = - runtime.unsafeRun(canceled.succeed(()).unit) + canceled.unsafeDone(UIO.unit) } ) - runtime.unsafeRun(subscribed.succeed(()).unit) + subscribed.unsafeDone(UIO.unit) } } } yield (publisher, subscribed, requested, canceled)