Stream converters

Stream converters convert Akka Stream Sources, Flows and Sinks to FS2 Streams, Pipes and Sinks, respectively, and vice versa. They are provided by the

resolvers += Resolver.bintrayRepo("krasserm", "maven")

libraryDependencies += "com.github.krasserm" %% "streamz-converter" % "0.11-RC1"

artifact and can be imported with:

import streamz.converter._

They require the following implicits in scope:


import akka.actor.ActorSystem
import akka.stream.Materializer
import scala.concurrent.ExecutionContext

implicit val system: ActorSystem = ActorSystem("example")

implicit val executionContext: ExecutionContext = system.dispatcher
implicit val materializer: Materializer = Materializer.createMaterializer(system)

Conversions from Akka Stream to FS2


| From | With | To |
|------|------|----|
| Graph[SourceShape[A], NotUsed] | toStream[F] | Stream[F, A] |
| Graph[SourceShape[A], M] | toStreamMat[F] | F[(Stream[F, A], M)] |
| Graph[SinkShape[A], NotUsed] | toSink[F] | Sink[F, A] |
| Graph[SinkShape[A], M] | toSinkMat[F] | F[(Sink[F, A], M)] |
| Graph[FlowShape[A, B], NotUsed] | toPipe[F] | Pipe[F, A, B] |
| Graph[FlowShape[A, B], M] | toPipeMat[F] | F[(Pipe[F, A, B], M)] |
| Graph[FlowShape[A, B], Future[M]] | toPipeMatWithResult[F] | F[Pipe[F, A, Either[Throwable, M]]] |

Examples (source code):

import akka.stream.scaladsl.{ Flow => AkkaFlow, Sink => AkkaSink, Source => AkkaSource }
import akka.{ Done, NotUsed }

import cats.effect.IO
import fs2.{ Pipe, Stream }

import scala.collection.immutable.Seq
import scala.concurrent.Future

val numbers: Seq[Int] = 1 to 10
def f(i: Int) = List(s"$i-1", s"$i-2")

val aSink1: AkkaSink[Int, Future[Done]] = AkkaSink.foreach[Int](println)
val fSink1: Pipe[IO, Int, Unit] = aSink1.toPipe[IO]

val aSource1: AkkaSource[Int, NotUsed] = AkkaSource(numbers)
val fStream1: Stream[IO, Int] = aSource1.toStream[IO]

val aFlow1: AkkaFlow[Int, String, NotUsed] = AkkaFlow[Int].mapConcat(f)
val fPipe1: Pipe[IO, Int, String] = aFlow1.toPipe[IO]

fStream1.through(fSink1).compile.drain.unsafeRunSync() // prints numbers
assert(fStream1.compile.toVector.unsafeRunSync() == numbers)
assert(fStream1.through(fPipe1).compile.toVector.unsafeRunSync() == numbers.flatMap(f))

aSink1, aSource1 and aFlow1 are materialized when the IOs of the FS2 streams that compose fSink1, fStream1 and fPipe1 are run. Their materialized values can be obtained via the onMaterialization callback, which is a parameter of toStream(onMaterialization: M => Unit), toSink(onMaterialization: M => Unit) and toPipe(onMaterialization: M => Unit) (not shown in the examples).
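As a minimal sketch of obtaining a materialized value, the following uses the toStreamMat variant listed in the table above instead of a callback. The object name, the "mat-value" string and the ContextShift[IO] instance are assumptions made for illustration; the latter presumes a build against cats-effect 2 and may be unnecessary or unavailable with other versions.

```scala
import akka.actor.ActorSystem
import akka.stream.Materializer
import akka.stream.scaladsl.{ Source => AkkaSource }
import cats.effect.{ ContextShift, IO }
import streamz.converter._

import scala.concurrent.ExecutionContext

// Hypothetical name, used only for this sketch.
object MaterializedValueSketch {
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("mat-sketch")
    implicit val executionContext: ExecutionContext = system.dispatcher
    implicit val materializer: Materializer = Materializer.createMaterializer(system)
    // Assumption: cats-effect 2, where the converters may need a ContextShift.
    implicit val contextShift: ContextShift[IO] = IO.contextShift(executionContext)

    // An Akka Source whose materialized value is a String instead of NotUsed.
    val aSource: AkkaSource[Int, String] =
      AkkaSource(1 to 3).mapMaterializedValue(_ => "mat-value")

    // Per the table, toStreamMat[F] yields F[(Stream[F, A], M)]:
    // running the outer IO returns the stream paired with the materialized value.
    val (fStream, mat) = aSource.toStreamMat[IO].unsafeRunSync()

    assert(mat == "mat-value")
    assert(fStream.compile.toVector.unsafeRunSync() == Vector(1, 2, 3))

    system.terminate()
  }
}
```

The returned stream is tied to the materialization that produced it, so this sketch runs it only once.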

Conversions from FS2 to Akka Stream


| From | With | To |
|------|------|----|
| Stream[F[_], A] | toSource() | Graph[SourceShape[A], NotUsed] |
| Sink[F[_], A] | toSink() | Graph[SinkShape[A], Future[Done]] |
| Pipe[F[_], A, B] | toFlow() | Graph[FlowShape[A, B], NotUsed] |

Examples (source code):

import akka.stream.scaladsl.{ Flow => AkkaFlow, Sink => AkkaSink, Source => AkkaSource, Keep }
import akka.{ Done, NotUsed }

import cats.effect.IO
import fs2._

import scala.collection.immutable.Seq
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._

val numbers: Seq[Int] = 1 to 10
def g(i: Int) = i + 10

val fSink2: Pipe[IO, Int, Unit] = s => s.evalMap(i => IO(println(i)))
val aSink2: AkkaSink[Int, Future[Done]] = AkkaSink.fromGraph(fSink2.toSink)

val fStream2: Stream[IO, Int] = Stream.emits(numbers).covary[IO]
val aSource2: AkkaSource[Int, NotUsed] = AkkaSource.fromGraph(fStream2.toSource)

val fPipe2: Pipe[IO, Int, Int] = s => s.evalTap(i => IO(println(i)))
val aFlow2: AkkaFlow[Int, Int, NotUsed] = AkkaFlow.fromGraph(fPipe2.toFlow)

aSource2.toMat(aSink2)(Keep.right).run() // prints numbers
assert(Await.result(aSource2.toMat(AkkaSink.seq)(Keep.right).run(), 5.seconds) == numbers)
assert(Await.result(aSource2.via(aFlow2).toMat(AkkaSink.seq)(Keep.right).run(), 5.seconds) == numbers)

fSink2, fStream2 and fPipe2 are run when the Akka Streams that compose aSink2, aSource2 and aFlow2 are materialized.
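The deferred execution described above can be sketched with a counter that is incremented whenever the FS2 stream actually runs. The object name and the counter are hypothetical, and the ContextShift[IO] instance assumes a cats-effect 2 based build; the check that each materialization triggers exactly one run reflects our reading of the sentence above rather than a documented guarantee.

```scala
import java.util.concurrent.atomic.AtomicInteger

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.Materializer
import akka.stream.scaladsl.{ Sink => AkkaSink, Source => AkkaSource }
import cats.effect.{ ContextShift, IO }
import fs2.Stream
import streamz.converter._

import scala.concurrent.{ Await, ExecutionContext }
import scala.concurrent.duration._

// Hypothetical name, used only for this sketch.
object RunOnMaterializationSketch {
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("lazy-sketch")
    implicit val executionContext: ExecutionContext = system.dispatcher
    implicit val materializer: Materializer = Materializer.createMaterializer(system)
    // Assumption: cats-effect 2, where the converters may need a ContextShift.
    implicit val contextShift: ContextShift[IO] = IO.contextShift(executionContext)

    // Counts how often the FS2 stream's effect has been evaluated.
    val runs = new AtomicInteger(0)
    val fStream: Stream[IO, Int] = Stream.eval(IO(runs.incrementAndGet()))

    // Converting only describes the Akka Source; nothing runs yet.
    val aSource: AkkaSource[Int, NotUsed] = AkkaSource.fromGraph(fStream.toSource)
    assert(runs.get == 0)

    // Each materialization runs the FS2 stream once.
    Await.result(aSource.runWith(AkkaSink.seq), 5.seconds)
    assert(runs.get == 1)
    Await.result(aSource.runWith(AkkaSink.seq), 5.seconds)
    assert(runs.get == 2)

    system.terminate()
  }
}
```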

Backpressure, cancellation, completion and errors

Downstream demand and cancellation as well as upstream completion and error signals are properly mediated between Akka Stream and FS2 (see also ConverterSpec).
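Two of these signals can be illustrated with a short sketch: downstream cancellation of an unbounded Akka Source, and an upstream failure surfacing in the FS2 stream. The object name and the "boom" exception are hypothetical, and the ContextShift[IO] instance assumes a cats-effect 2 based build. This is not a substitute for ConverterSpec, only an illustration of the expected behaviour.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.Materializer
import akka.stream.scaladsl.{ Source => AkkaSource }
import cats.effect.{ ContextShift, IO }
import streamz.converter._

import scala.concurrent.ExecutionContext

// Hypothetical name, used only for this sketch.
object SignalMediationSketch {
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("signals-sketch")
    implicit val executionContext: ExecutionContext = system.dispatcher
    implicit val materializer: Materializer = Materializer.createMaterializer(system)
    // Assumption: cats-effect 2, where the converters may need a ContextShift.
    implicit val contextShift: ContextShift[IO] = IO.contextShift(executionContext)

    // Downstream cancellation: take(3) ends the FS2 stream after three
    // elements even though the Akka Source itself never completes.
    val unbounded: AkkaSource[Int, NotUsed] =
      AkkaSource.fromIterator(() => Iterator.from(1))
    val firstThree = unbounded.toStream[IO].take(3).compile.toVector.unsafeRunSync()
    assert(firstThree == Vector(1, 2, 3))

    // Upstream error: a failed Akka Source fails the FS2 stream, which
    // attempt turns into a Left instead of a thrown exception.
    val failing: AkkaSource[Int, NotUsed] =
      AkkaSource.failed[Int](new RuntimeException("boom"))
    val result = failing.toStream[IO].compile.toVector.attempt.unsafeRunSync()
    assert(result.isLeft)

    system.terminate()
  }
}
```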