Skip to content

Commit

Permalink
Fix: publisherToStream freezes on stream end inside Stream.flatten (#96)
Browse files Browse the repository at this point in the history
Co-authored-by: Simon Schenk <simon.schenk@risk42.com>
  • Loading branch information
runtologist and Simon Schenk authored Mar 6, 2020
1 parent 8eccb43 commit fc7b16b
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 6 deletions.
16 changes: 11 additions & 5 deletions src/main/scala/zio/interop/reactivestreams/Adapters.scala
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,17 @@ object Adapters {
case false => demand.get.flatMap(d => if (d < capacity) requestAndTake else take)
}
case _ => take
}.orElse(
completion.poll.flatMap {
case None => Pull.end
case Some(io) => io.foldM(e => Pull.fail(e), _ => Pull.end)
}
}.foldCauseM(
cause =>
if (cause.interruptedOnly) {
completion.poll.flatMap {
case None => Pull.end
case Some(io) => io.foldM(e => Pull.fail(e), _ => Pull.end)
}
} else {
ZIO.halt(cause)
},
UIO.succeed(_)
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import org.reactivestreams.Subscription
import org.reactivestreams.tck.TestEnvironment
import org.reactivestreams.tck.TestEnvironment.ManualPublisher
import zio.{ Exit, Task, UIO, ZIO }
import zio.duration._
import zio.stream.Sink
import zio.stream.Stream
import zio.test._
import zio.test.Assertion._

Expand All @@ -23,7 +25,7 @@ object PublisherToStreamSpec extends DefaultRunnableSpec {
testM("fails with an eventually failing `Publisher`") {
assertM(publish(seq, Some(e)))(fails(equalTo(e)))
},
testM("Does not fail a fiber on failing `Publisher`") {
testM("does not fail a fiber on failing `Publisher`") {
val publisher = new Publisher[Int] {
override def subscribe(s: Subscriber[_ >: Int]): Unit =
s.onSubscribe(
Expand All @@ -40,6 +42,24 @@ object PublisherToStreamSpec extends DefaultRunnableSpec {
assert(exit)(fails(anything)) && assert(fibersFailed)(equalTo(0))
}
},
testM("does not freeze on stream end") {
for {
probe <- makeProbe
fiber <- Stream
.fromEffect(
UIO(
probe.toStream()
)
)
.flatMap(identity)
.run(Sink.collectAll[Int])
.fork
_ <- Task(probe.expectRequest())
_ <- UIO(probe.sendNext(1))
_ <- UIO(probe.sendCompletion)
r <- fiber.join
} yield assert(r)(equalTo(List(1)))
} @@ TestAspect.timeout(1000.millis),
testM("cancels subscription when interrupted before subscription") {
for {
probe <- makeProbe
Expand Down

0 comments on commit fc7b16b

Please sign in to comment.