Skip to content

Commit

Permalink
Ignore publisher after cancelling subscription (#324)
Browse files Browse the repository at this point in the history
  • Loading branch information
zeal18 authored May 12, 2022
1 parent 076db69 commit ef13c84
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 6 deletions.
14 changes: 8 additions & 6 deletions src/main/scala/zio/interop/reactivestreams/Adapters.scala
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ object Adapters {

val subscriber =
new Subscriber[A] {

override def onSubscribe(s: Subscription): Unit =
if (s == null) {
val e = new NullPointerException("s was null in onSubscribe")
Expand All @@ -154,7 +153,7 @@ object Adapters {
override def onNext(t: A): Unit =
if (t == null) {
val e = new NullPointerException("t was null in onNext")
runtime.unsafeRun(q.offer(Exit.fail(Some(e))))
runtime.unsafeRunSync(q.offer(Exit.fail(Some(e))))
throw e
} else {
runtime.unsafeRunSync(q.offer(Exit.succeed(t)))
Expand All @@ -164,14 +163,17 @@ object Adapters {
override def onError(e: Throwable): Unit =
if (e == null) {
val e = new NullPointerException("t was null in onError")
runtime.unsafeRun(q.offer(Exit.fail(Some(e))))
runtime.unsafeRunSync(q.offer(Exit.fail(Some(e))))
throw e
} else {
runtime.unsafeRun(q.offer(Exit.fail(Some(e))).unit)
runtime.unsafeRunSync(q.offer(Exit.fail(Some(e))))
()
}

override def onComplete(): Unit =
runtime.unsafeRun(q.offer(Exit.fail(None)).unit)
override def onComplete(): Unit = {
runtime.unsafeRunSync(q.offer(Exit.fail(None)))
()
}
}
(subscriber, p)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,24 @@ object PublisherToStreamSpec extends DefaultRunnableSpec {
fails(anything)
)
)
},
testM("ignores publisher calls after stream ending") {
withProbe(probe =>
assertM((for {
fiber <- probe.toStream(bufferSize).runHead.fork
demand <- Task(probe.expectRequest())
_ <- Task(probe.sendNext(0))
_ <- Task(probe.expectCancelling())

_ <- Task((1 to demand.toInt).foreach(i => probe.sendNext(i)))
_ <- Task(probe.sendCompletion())
_ <- Task(probe.sendError(e))

_ <- fiber.join
} yield ()).run)(
succeeds(isUnit)
)
)
}
)

Expand Down

0 comments on commit ef13c84

Please sign in to comment.