Skip to content

Commit

Permalink
Merge pull request #428 from typelevel/wip-fix-stream-ingest-series0.x
Browse files Browse the repository at this point in the history
misc: fix stream ingest + update dependencies.
  • Loading branch information
ahjohannessen authored Oct 28, 2021
2 parents f65e636 + 6afd62e commit 9e248fc
Show file tree
Hide file tree
Showing 9 changed files with 35 additions and 42 deletions.
10 changes: 5 additions & 5 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import Dependencies._

inThisBuild(
List(
scalaVersion := "2.13.5",
scalaVersion := "2.13.6",
organization := "org.lyranthe.fs2-grpc",
git.useGitDescribe := true,
scmInfo := Some(ScmInfo(url("https://github.com/fiadliel/fs2-grpc"), "git@github.com:fiadliel/fs2-grpc.git"))
Expand Down Expand Up @@ -37,15 +37,15 @@ lazy val root = project
lazy val `java-gen` = project
.enablePlugins(GitVersioning)
.settings(
scalaVersion := "2.12.13",
scalaVersion := "2.12.15",
publishTo := sonatypePublishToBundle.value,
libraryDependencies += scalaPbCompiler
)

lazy val `sbt-java-gen` = project
.enablePlugins(GitVersioning, BuildInfoPlugin)
.settings(
scalaVersion := "2.12.13",
scalaVersion := "2.12.15",
publishTo := sonatypePublishToBundle.value,
sbtPlugin := true,
buildInfoPackage := "org.lyranthe.fs2_grpc.buildinfo",
Expand All @@ -65,8 +65,8 @@ lazy val `sbt-java-gen` = project
lazy val `java-runtime` = project
.enablePlugins(GitVersioning)
.settings(
scalaVersion := "2.13.5",
crossScalaVersions := List(scalaVersion.value, "2.12.13"),
scalaVersion := "2.13.6",
crossScalaVersions := List(scalaVersion.value, "2.12.15"),
publishTo := sonatypePublishToBundle.value,
libraryDependencies ++= List(fs2, catsEffect, grpcApi) ++ List(grpcNetty, catsEffectLaws, minitest).map(_ % Test),
mimaPreviousArtifacts := Set(organization.value %% name.value % "0.3.0"),
Expand Down
4 changes: 2 additions & 2 deletions java-runtime/src/main/scala/client/Fs2ClientCall.scala
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class Fs2ClientCall[F[_], Request, Response] private[client] (

def unaryToStreamingCall(message: Request, headers: Metadata)(implicit F: ConcurrentEffect[F]): Stream[F, Response] =
Stream
.bracketCase(startListener(Fs2StreamClientCallListener[F, Response](request, prefetchN), headers))(
.bracketCase(startListener(Fs2StreamClientCallListener[F, Response](request, prefetchN), headers) <* request(1))(
handleExitCase(cancelComplete = true)
)
.flatMap(Stream.eval_(sendSingleMessage(message)) ++ _.stream.adaptError(ea))
Expand All @@ -76,7 +76,7 @@ class Fs2ClientCall[F[_], Request, Response] private[client] (
F: ConcurrentEffect[F]
): Stream[F, Response] =
Stream
.bracketCase(startListener(Fs2StreamClientCallListener[F, Response](request, prefetchN), headers))(
.bracketCase(startListener(Fs2StreamClientCallListener[F, Response](request, prefetchN), headers) <* request(1))(
handleExitCase(cancelComplete = true)
)
.flatMap(_.stream.adaptError(ea).concurrently(sendStream(messages)))
Expand Down
30 changes: 11 additions & 19 deletions java-runtime/src/main/scala/client/StreamIngest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package client

import cats.syntax.all._
import cats.effect._
import cats.effect.concurrent.Ref
import fs2.Stream
import fs2.concurrent.InspectableQueue

Expand All @@ -20,40 +19,33 @@ private[client] object StreamIngest {
request: Int => F[Unit],
prefetchN: Int
): F[StreamIngest[F, T]] =
(Ref.of[F, Int](prefetchN), InspectableQueue.unbounded[F, Either[GrpcStatus, T]])
.mapN((d, q) => create[F, T](request, prefetchN, d, q)) <* request(prefetchN)
InspectableQueue
.unbounded[F, Either[GrpcStatus, T]]
.map(q => create[F, T](request, prefetchN, q))

def create[F[_], T](
request: Int => F[Unit],
prefetchN: Int,
demand: Ref[F, Int],
queue: InspectableQueue[F, Either[GrpcStatus, T]]
)(implicit F: ConcurrentEffect[F]): StreamIngest[F, T] = new StreamIngest[F, T] {

val limit: Int =
math.max(1, prefetchN)

val ensureMessages: F[Unit] =
queue.getSize.flatMap(qs => request(1).whenA(qs < limit))

def onMessage(msg: T): F[Unit] =
decreaseDemandBy(1) *> queue.enqueue1(msg.asRight)
queue.enqueue1(msg.asRight) *> ensureMessages

def onClose(status: GrpcStatus): F[Unit] =
queue.enqueue1(status.asLeft)

def ensureMessages(nextWhenEmpty: Int): F[Unit] = (demand.get, queue.getSize)
.mapN((cd, qs) => fetch(nextWhenEmpty).whenA((cd + qs) < 1))
.flatten

def decreaseDemandBy(n: Int): F[Unit] =
demand.update(d => math.max(d - n, 0))

def increaseDemandBy(n: Int): F[Unit] =
demand.update(_ + n)

def fetch(n: Int): F[Unit] =
request(n) *> increaseDemandBy(n)

val messages: Stream[F, T] = {

val run: F[Option[T]] =
queue.dequeue1.flatMap {
case Right(v) => v.some.pure[F] <* ensureMessages(prefetchN)
case Right(v) => ensureMessages *> v.some.pure[F]
case Left(GrpcStatus(status, trailers)) =>
if (!status.isOk) F.raiseError(status.asRuntimeException(trailers))
else none[T].pure[F]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class Fs2StreamServerCallListener[F[_], Request, Response] private (
val call: Fs2ServerCall[F, Request, Response]
)(implicit F: Effect[F])
extends ServerCall.Listener[Request]
with Fs2ServerCallListener[F, Stream[F, ?], Request, Response] {
with Fs2ServerCallListener[F, Stream[F, *], Request, Response] {

override def onCancel(): Unit = {
isCancelled.complete(()).unsafeRun()
Expand Down
4 changes: 2 additions & 2 deletions java-runtime/src/test/scala/client/ClientSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ object ClientSuite extends SimpleTestSuite {
ec.tick()
assertEquals(result.value, Some(Success(List(1, 2, 3))))
assertEquals(dummy.messagesSent.size, 1)
assertEquals(dummy.requested, 2)
assertEquals(dummy.requested, 3)
}

test("single message to unaryToStreaming - back pressure") {
Expand All @@ -214,7 +214,7 @@ object ClientSuite extends SimpleTestSuite {

assertEquals(result.value, Some(Success(List(1, 2))))
assertEquals(dummy.messagesSent.size, 1)
assertEquals(dummy.requested, 1)
assertEquals(dummy.requested, 2)
}

test("stream to streamingToStreaming") {
Expand Down
7 changes: 4 additions & 3 deletions java-runtime/src/test/scala/client/StreamIngestSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ object StreamIngestSuite extends SimpleTestSuite {
}

val test =
run(prefetchN = 1, takeN = 1, expectedReq = 2 /* queue becomes empty */, expectedCount = 1) *>
run(prefetchN = 1, takeN = 1, expectedReq = 1, expectedCount = 1) *>
run(prefetchN = 2, takeN = 1, expectedReq = 2, expectedCount = 1) *>
run(prefetchN = 1024, takeN = 1024, expectedReq = 2048 /* queue becomes empty */, expectedCount = 1024) *>
run(prefetchN = 1024, takeN = 1023, expectedReq = 1024, expectedCount = 1023)
run(prefetchN = 2, takeN = 2, expectedReq = 3, expectedCount = 2) *>
run(prefetchN = 1024, takeN = 1024, expectedReq = 2047, expectedCount = 1024) *>
run(prefetchN = 1024, takeN = 1023, expectedReq = 2046, expectedCount = 1023)

test.unsafeRunSync()

Expand Down
12 changes: 6 additions & 6 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ object Dependencies {

val grpc = scalapb.compiler.Version.grpcJavaVersion
val scalaPb = scalapb.compiler.Version.scalapbVersion
val fs2 = "2.5.5"
val catsEffect = "2.5.0"
val minitest = "2.9.5"
val fs2 = "2.5.10"
val catsEffect = "2.5.4"
val minitest = "2.9.6"

val kindProjector = "0.10.3"
val sbtProtoc = "1.0.3"
val kindProjector = "0.13.2"
val sbtProtoc = "1.0.4"

}

Expand All @@ -31,6 +31,6 @@ object Dependencies {

val sbtProtoc = "com.thesamet" % "sbt-protoc" % versions.sbtProtoc
val scalaPbCompiler = "com.thesamet.scalapb" %% "compilerplugin" % versions.scalaPb
val kindProjector = "org.typelevel" %% "kind-projector" % versions.kindProjector cross CrossVersion.binary
val kindProjector = "org.typelevel" %% "kind-projector" % versions.kindProjector cross CrossVersion.full

}
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=1.5.0
sbt.version=1.5.5
6 changes: 3 additions & 3 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ addSbtPlugin("com.typesafe.sbt" % "sbt-git" % "1.0.0")

addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.10.0")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.2")
addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.5.1")
addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.6.0")

addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "0.8.1")

addSbtPlugin("io.github.davidgregory084" % "sbt-tpolecat" % "0.1.16")
addSbtPlugin("io.github.davidgregory084" % "sbt-tpolecat" % "0.1.20")

libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.11.2"
libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.11.6"

0 comments on commit 9e248fc

Please sign in to comment.