From 6afd62e652171690212d4320d608c054abfda3a7 Mon Sep 17 00:00:00 2001 From: Alex Henning Johannessen Date: Thu, 28 Oct 2021 08:42:46 +0100 Subject: [PATCH] misc: fix stream ingest + update dependencies. - see https://github.com/typelevel/fs2-grpc/commit/48e08db4b1cd9daf6e82013a515768663ba068b2 and https://github.com/typelevel/fs2-grpc/commit/51d3496dca67ca648bac9a98beac7e332e71cadf - update various dependencies. --- build.sbt | 10 +++---- .../src/main/scala/client/Fs2ClientCall.scala | 4 +-- .../src/main/scala/client/StreamIngest.scala | 30 +++++++------------ .../server/Fs2StreamServerCallListener.scala | 2 +- .../src/test/scala/client/ClientSuite.scala | 4 +-- .../test/scala/client/StreamIngestSuite.scala | 7 +++-- project/Dependencies.scala | 12 ++++---- project/build.properties | 2 +- project/plugins.sbt | 6 ++-- 9 files changed, 35 insertions(+), 42 deletions(-) diff --git a/build.sbt b/build.sbt index 02d8811f..0fe32b6e 100644 --- a/build.sbt +++ b/build.sbt @@ -2,7 +2,7 @@ import Dependencies._ inThisBuild( List( - scalaVersion := "2.13.5", + scalaVersion := "2.13.6", organization := "org.lyranthe.fs2-grpc", git.useGitDescribe := true, scmInfo := Some(ScmInfo(url("https://github.com/fiadliel/fs2-grpc"), "git@github.com:fiadliel/fs2-grpc.git")) @@ -37,7 +37,7 @@ lazy val root = project lazy val `java-gen` = project .enablePlugins(GitVersioning) .settings( - scalaVersion := "2.12.13", + scalaVersion := "2.12.15", publishTo := sonatypePublishToBundle.value, libraryDependencies += scalaPbCompiler ) @@ -45,7 +45,7 @@ lazy val `java-gen` = project lazy val `sbt-java-gen` = project .enablePlugins(GitVersioning, BuildInfoPlugin) .settings( - scalaVersion := "2.12.13", + scalaVersion := "2.12.15", publishTo := sonatypePublishToBundle.value, sbtPlugin := true, buildInfoPackage := "org.lyranthe.fs2_grpc.buildinfo", @@ -65,8 +65,8 @@ lazy val `sbt-java-gen` = project lazy val `java-runtime` = project .enablePlugins(GitVersioning) .settings( - scalaVersion := "2.13.5", - crossScalaVersions := List(scalaVersion.value, "2.12.13"), + scalaVersion := "2.13.6", + crossScalaVersions := List(scalaVersion.value, "2.12.15"), publishTo := sonatypePublishToBundle.value, libraryDependencies ++= List(fs2, catsEffect, grpcApi) ++ List(grpcNetty, catsEffectLaws, minitest).map(_ % Test), mimaPreviousArtifacts := Set(organization.value %% name.value % "0.3.0"), diff --git a/java-runtime/src/main/scala/client/Fs2ClientCall.scala b/java-runtime/src/main/scala/client/Fs2ClientCall.scala index 87257ce2..1650aedf 100644 --- a/java-runtime/src/main/scala/client/Fs2ClientCall.scala +++ b/java-runtime/src/main/scala/client/Fs2ClientCall.scala @@ -67,7 +67,7 @@ class Fs2ClientCall[F[_], Request, Response] private[client] ( def unaryToStreamingCall(message: Request, headers: Metadata)(implicit F: ConcurrentEffect[F]): Stream[F, Response] = Stream - .bracketCase(startListener(Fs2StreamClientCallListener[F, Response](request, prefetchN), headers))( + .bracketCase(startListener(Fs2StreamClientCallListener[F, Response](request, prefetchN), headers) <* request(1))( handleExitCase(cancelComplete = true) ) .flatMap(Stream.eval_(sendSingleMessage(message)) ++ _.stream.adaptError(ea)) @@ -76,7 +76,7 @@ class Fs2ClientCall[F[_], Request, Response] private[client] ( F: ConcurrentEffect[F] ): Stream[F, Response] = Stream - .bracketCase(startListener(Fs2StreamClientCallListener[F, Response](request, prefetchN), headers))( + .bracketCase(startListener(Fs2StreamClientCallListener[F, Response](request, prefetchN), headers) <* request(1))( handleExitCase(cancelComplete = true) ) .flatMap(_.stream.adaptError(ea).concurrently(sendStream(messages))) diff --git a/java-runtime/src/main/scala/client/StreamIngest.scala b/java-runtime/src/main/scala/client/StreamIngest.scala index 7308c603..524f4964 100644 --- a/java-runtime/src/main/scala/client/StreamIngest.scala +++ b/java-runtime/src/main/scala/client/StreamIngest.scala @@ -4,7 +4,6 @@ package client import cats.syntax.all._ import cats.effect._ -import cats.effect.concurrent.Ref import fs2.Stream import fs2.concurrent.InspectableQueue @@ -20,40 +19,33 @@ private[client] object StreamIngest { request: Int => F[Unit], prefetchN: Int ): F[StreamIngest[F, T]] = - (Ref.of[F, Int](prefetchN), InspectableQueue.unbounded[F, Either[GrpcStatus, T]]) - .mapN((d, q) => create[F, T](request, prefetchN, d, q)) <* request(prefetchN) + InspectableQueue + .unbounded[F, Either[GrpcStatus, T]] + .map(q => create[F, T](request, prefetchN, q)) def create[F[_], T]( request: Int => F[Unit], prefetchN: Int, - demand: Ref[F, Int], queue: InspectableQueue[F, Either[GrpcStatus, T]] )(implicit F: ConcurrentEffect[F]): StreamIngest[F, T] = new StreamIngest[F, T] { + val limit: Int = + math.max(1, prefetchN) + + val ensureMessages: F[Unit] = + queue.getSize.flatMap(qs => request(1).whenA(qs < limit)) + def onMessage(msg: T): F[Unit] = - decreaseDemandBy(1) *> queue.enqueue1(msg.asRight) + queue.enqueue1(msg.asRight) *> ensureMessages def onClose(status: GrpcStatus): F[Unit] = queue.enqueue1(status.asLeft) - def ensureMessages(nextWhenEmpty: Int): F[Unit] = (demand.get, queue.getSize) - .mapN((cd, qs) => fetch(nextWhenEmpty).whenA((cd + qs) < 1)) - .flatten - - def decreaseDemandBy(n: Int): F[Unit] = - demand.update(d => math.max(d - n, 0)) - - def increaseDemandBy(n: Int): F[Unit] = - demand.update(_ + n) - - def fetch(n: Int): F[Unit] = - request(n) *> increaseDemandBy(n) - val messages: Stream[F, T] = { val run: F[Option[T]] = queue.dequeue1.flatMap { - case Right(v) => v.some.pure[F] <* ensureMessages(prefetchN) + case Right(v) => ensureMessages *> v.some.pure[F] case Left(GrpcStatus(status, trailers)) => if (!status.isOk) F.raiseError(status.asRuntimeException(trailers)) else none[T].pure[F] diff --git a/java-runtime/src/main/scala/server/Fs2StreamServerCallListener.scala b/java-runtime/src/main/scala/server/Fs2StreamServerCallListener.scala index 6add6bf5..50daf5ba 100644 --- a/java-runtime/src/main/scala/server/Fs2StreamServerCallListener.scala +++ b/java-runtime/src/main/scala/server/Fs2StreamServerCallListener.scala @@ -15,7 +15,7 @@ class Fs2StreamServerCallListener[F[_], Request, Response] private ( val call: Fs2ServerCall[F, Request, Response] )(implicit F: Effect[F]) extends ServerCall.Listener[Request] - with Fs2ServerCallListener[F, Stream[F, ?], Request, Response] { + with Fs2ServerCallListener[F, Stream[F, *], Request, Response] { override def onCancel(): Unit = { isCancelled.complete(()).unsafeRun() diff --git a/java-runtime/src/test/scala/client/ClientSuite.scala b/java-runtime/src/test/scala/client/ClientSuite.scala index 1fd2832c..e5b2c478 100644 --- a/java-runtime/src/test/scala/client/ClientSuite.scala +++ b/java-runtime/src/test/scala/client/ClientSuite.scala @@ -187,7 +187,7 @@ object ClientSuite extends SimpleTestSuite { ec.tick() assertEquals(result.value, Some(Success(List(1, 2, 3)))) assertEquals(dummy.messagesSent.size, 1) - assertEquals(dummy.requested, 2) + assertEquals(dummy.requested, 3) } test("single message to unaryToStreaming - back pressure") { @@ -214,7 +214,7 @@ object ClientSuite extends SimpleTestSuite { assertEquals(result.value, Some(Success(List(1, 2)))) assertEquals(dummy.messagesSent.size, 1) - assertEquals(dummy.requested, 1) + assertEquals(dummy.requested, 2) } test("stream to streamingToStreaming") { diff --git a/java-runtime/src/test/scala/client/StreamIngestSuite.scala b/java-runtime/src/test/scala/client/StreamIngestSuite.scala index b5995605..a8ce0829 100644 --- a/java-runtime/src/test/scala/client/StreamIngestSuite.scala +++ b/java-runtime/src/test/scala/client/StreamIngestSuite.scala @@ -28,10 +28,11 @@ object StreamIngestSuite extends SimpleTestSuite { } val test = - run(prefetchN = 1, takeN = 1, expectedReq = 2 /* queue becomes empty */, expectedCount = 1) *> + run(prefetchN = 1, takeN = 1, expectedReq = 1, expectedCount = 1) *> run(prefetchN = 2, takeN = 1, expectedReq = 2, expectedCount = 1) *> - run(prefetchN = 1024, takeN = 1024, expectedReq = 2048 /* queue becomes empty */, expectedCount = 1024) *> - run(prefetchN = 1024, takeN = 1023, expectedReq = 1024, expectedCount = 1023) + run(prefetchN = 2, takeN = 2, expectedReq = 3, expectedCount = 2) *> + run(prefetchN = 1024, takeN = 1024, expectedReq = 2047, expectedCount = 1024) *> + run(prefetchN = 1024, takeN = 1023, expectedReq = 2046, expectedCount = 1023) test.unsafeRunSync() diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 06290d1b..4c6bc20e 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -6,12 +6,12 @@ object Dependencies { val grpc = scalapb.compiler.Version.grpcJavaVersion val scalaPb = scalapb.compiler.Version.scalapbVersion - val fs2 = "2.5.5" - val catsEffect = "2.5.0" - val minitest = "2.9.5" + val fs2 = "2.5.10" + val catsEffect = "2.5.4" + val minitest = "2.9.6" - val kindProjector = "0.10.3" - val sbtProtoc = "1.0.3" + val kindProjector = "0.13.2" + val sbtProtoc = "1.0.4" } @@ -31,6 +31,6 @@ object Dependencies { val sbtProtoc = "com.thesamet" % "sbt-protoc" % versions.sbtProtoc val scalaPbCompiler = "com.thesamet.scalapb" %% "compilerplugin" % versions.scalaPb - val kindProjector = "org.typelevel" %% "kind-projector" % versions.kindProjector cross CrossVersion.binary + val kindProjector = "org.typelevel" %% "kind-projector" % versions.kindProjector cross CrossVersion.full } diff --git a/project/build.properties b/project/build.properties index 5b6d0735..bb5389da 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.5.0 \ No newline at end of file +sbt.version=1.5.5 \ No newline at end of file diff --git a/project/plugins.sbt b/project/plugins.sbt index 7d23559e..e47b3450 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -4,10 +4,10 @@ addSbtPlugin("com.typesafe.sbt" % "sbt-git" % "1.0.0") addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.10.0") addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.2") -addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.5.1") +addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.6.0") addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "0.8.1") -addSbtPlugin("io.github.davidgregory084" % "sbt-tpolecat" % "0.1.16") +addSbtPlugin("io.github.davidgregory084" % "sbt-tpolecat" % "0.1.20") -libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.11.2" +libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.11.6"