Skip to content

Commit

Permalink
feat: one less daemon fiber (#209)
Browse files Browse the repository at this point in the history
  • Loading branch information
runtologist authored Oct 25, 2021
1 parent 3e6271d commit 47d92fa
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 45 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ project/secrets.tar.xz
project/travis-deploy-key
project/zecret
target
test-output
test-output/
metals.sbt
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ on `Stream` failure. The type parameter on `toSink` is the error type of *the St
val asSink = subscriber.toSink[Throwable]
val failingStream = Stream.range(3, 13) ++ Stream.fail(new RuntimeException("boom!"))
runtime.unsafeRun(
asSink.flatMap { case (errorP, sink) =>
asSink.use { case (errorP, sink) =>
failingStream.run(sink).catchAll(errorP.fail)
}
)
Expand Down
12 changes: 6 additions & 6 deletions src/main/scala/zio/interop/reactivestreams/Adapters.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ object Adapters {

def subscriberToSink[E <: Throwable, I](
subscriber: Subscriber[I]
): UIO[(Promise[E, Nothing], ZSink[Any, Nothing, I, I, Unit])] =
): ZManaged[Any, Nothing, (Promise[E, Nothing], ZSink[Any, Nothing, I, I, Unit])] =
for {
runtime <- ZIO.runtime[Any]
demand <- Queue.unbounded[Long]
error <- Promise.make[E, Nothing]
runtime <- ZIO.runtime[Any].toManaged
demand <- Queue.unbounded[Long].toManaged
error <- Promise.make[E, Nothing].toManaged
subscription = createSubscription(subscriber, demand, runtime)
_ <- UIO(subscriber.onSubscribe(subscription))
_ <- error.await.catchAll(t => UIO(subscriber.onError(t)) *> demand.shutdown).forkDaemon
_ <- UIO(subscriber.onSubscribe(subscription)).toManaged
_ <- error.await.catchAll(t => UIO(subscriber.onError(t)) *> demand.shutdown).toManaged.fork
} yield (error, demandUnfoldSink(subscriber, demand))

def publisherToStream[O](publisher: Publisher[O], bufferSize: Int): ZStream[Any, Throwable, O] = {
Expand Down
21 changes: 12 additions & 9 deletions src/main/scala/zio/interop/reactivestreams/package.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
package zio.interop

import org.reactivestreams.{ Publisher, Subscriber }
import zio.stream.{ ZSink, ZStream }
import zio.{ IO, Promise, UIO, ZIO, ZManaged }
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import zio.IO
import zio.Promise
import zio.ZIO
import zio.ZManaged
import zio.stream.ZSink
import zio.stream.ZStream

package object reactivestreams {

Expand Down Expand Up @@ -46,14 +51,12 @@ package object reactivestreams {
* ```
* val subscriber: Subscriber[Int] = ???
* val stream: Stream[Any, Throwable, Int] = ???
* for {
* sinkError <- subscriberToSink(subscriber)
* (error, sink) = sinkError
* _ <- stream.run(sink).catchAll(e => error.fail(e)).fork
* } yield ()
* subscriber.toSink.use { case (error, sink) =>
* stream.run(sink).catchAll(e => error.fail(e))
* }
* ```
*/
def toSink[E <: Throwable]: UIO[(Promise[E, Nothing], ZSink[Any, Nothing, I, I, Unit])] =
def toSink[E <: Throwable]: ZManaged[Any, Nothing, (Promise[E, Nothing], ZSink[Any, Nothing, I, I, Unit])] =
Adapters.subscriberToSink(subscriber)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
package zio.interop.reactivestreams

import java.lang.reflect.InvocationTargetException
import org.reactivestreams.Publisher
import org.reactivestreams.tck.{ PublisherVerification, TestEnvironment }
import org.reactivestreams.tck.PublisherVerification
import org.reactivestreams.tck.TestEnvironment
import org.testng.annotations.Test
import zio.Task
import zio.UIO
import zio.ZIO
import zio.stream.Stream
import zio.test._
import zio.test.Assertion._
import zio.test._

import java.lang.reflect.InvocationTargetException

object StreamToPublisherSpec extends DefaultRunnableSpec {
override def spec =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,47 @@ package zio.interop.reactivestreams

import org.reactivestreams.tck.TestEnvironment
import org.reactivestreams.tck.TestEnvironment.ManualSubscriberWithSubscriptionSupport
import scala.jdk.CollectionConverters._
import zio.{ Task, UIO }
import zio.IO
import zio.Task
import zio.UIO
import zio.stream.Stream
import zio.test._
import zio.test.Assertion._
import zio.IO
import zio.test._

import scala.jdk.CollectionConverters._

object SubscriberToSinkSpec extends DefaultRunnableSpec {
override def spec =
suite("Converting a `Subscriber` to a `Sink`")(
test("works on the happy path") {
for {
probe <- makeSubscriber
errorSink <- probe.underlying.toSink[Throwable]
(error, sink) = errorSink
fiber <- Stream.fromIterable(seq).run(sink).fork
_ <- probe.request(length + 1)
elements <- probe.nextElements(length).exit
completion <- probe.expectCompletion.exit
_ <- fiber.join
} yield assert(elements)(succeeds(equalTo(seq))) && assert(completion)(succeeds(isUnit))
makeSubscriber.flatMap(probe =>
probe.underlying
.toSink[Throwable]
.use { case (_, sink) =>
for {
fiber <- Stream.fromIterable(seq).run(sink).fork
_ <- probe.request(length + 1)
elements <- probe.nextElements(length).exit
completion <- probe.expectCompletion.exit
_ <- fiber.join
} yield assert(elements)(succeeds(equalTo(seq))) && assert(completion)(succeeds(isUnit))
}
)
},
test("transports errors") {
for {
probe <- makeSubscriber
errorSink <- probe.underlying.toSink[Throwable]
(error, sink) = errorSink
fiber <- (Stream.fromIterable(seq) ++
Stream.fail(e)).run(sink).catchAll(t => error.fail(t)).fork
_ <- probe.request(length + 1)
elements <- probe.nextElements(length).exit
err <- probe.expectError.exit
_ <- fiber.join
} yield assert(elements)(succeeds(equalTo(seq))) && assert(err)(succeeds(equalTo(e)))
makeSubscriber.flatMap(probe =>
probe.underlying
.toSink[Throwable]
.use { case (error, sink) =>
for {
fiber <- (Stream.fromIterable(seq) ++ Stream.fail(e)).run(sink).catchAll(t => error.fail(t)).fork
_ <- probe.request(length + 1)
elements <- probe.nextElements(length).exit
err <- probe.expectError.exit
_ <- fiber.join
} yield assert(elements)(succeeds(equalTo(seq))) && assert(err)(succeeds(equalTo(e)))
}
)
}
)

Expand Down

0 comments on commit 47d92fa

Please sign in to comment.