This library provides an interoperability layer for reactive streams.
ZIO integrates with Reactive Streams by providing conversions from zio.stream.Stream
to org.reactivestreams.Publisher
and from zio.stream.Sink
to org.reactivestreams.Subscriber
and vice versa. Simply import import zio.interop.reactiveStreams._
to make the
conversions available.
First, let's get a few imports out of the way.
import org.reactivestreams.example.unicast._
import zio._
import zio.interop.reactiveStreams._
import zio.stream._
val runtime = new DefaultRuntime {}
We use the following Publisher
and Subscriber
for the examples:
val publisher = new RangePublisher(3, 10)
val subscriber = new SyncSubscriber[Int] {
override protected def whenNext(v: Int): Boolean = {
print(s"$v, ")
true
}
}
A Publisher
used as a Stream
buffers up to qSize
elements. If possible, qSize
should be
a power of two for best performance. The default is 16.
val streamFromPublisher = publisher.toStream(qSize = 16)
runtime.unsafeRun(
streamFromPublisher.run(Sink.collectAll[Integer])
)
When running a Stream
to a Subscriber
, a side channel is needed for signalling failures.
For this reason toSink
returns a tuple of Promise
and Sink
. The Promise
must be failed
on Stream
failure. The type parameter on toSink
is the error type of the Stream.
val asSink = subscriber.toSink[Throwable]
val failingStream = Stream.range(3, 13) ++ Stream.fail(new RuntimeException("boom!"))
runtime.unsafeRun(
asSink.flatMap { case (errorP, sink) =>
failingStream.run(sink).catchAll(errorP.fail)
}
)
val stream = Stream.range(3, 13)
runtime.unsafeRun(
stream.toPublisher.flatMap { publisher =>
UIO(publisher.subscribe(subscriber))
}
)
toSubscriber
returns a Subscriber
and an IO
which completes with the result of running the
Sink
or the error if the Publisher
fails.
A Sink
used as a Subscriber
buffers up to qSize
elements. If possible, qSize
should be
a power of two for best performance. The default is 16.
val sink = Sink.collectAll[Integer]
runtime.unsafeRun(
sink.toSubscriber(qSize = 16).flatMap { case (subscriber, result) =>
UIO(publisher.subscribe(subscriber)) *> result
}
)