From 0b639fca1fa7401a75009d1e35896ba0a1ad75c7 Mon Sep 17 00:00:00 2001 From: Valdemar Grange Date: Thu, 7 Sep 2023 14:08:57 +0200 Subject: [PATCH 1/9] initial work an aspects --- .../scala/fs2/grpc/client/ClientAspect.scala | 101 ++++++++++++++++ .../scala/fs2/grpc/server/ServiceAspect.scala | 114 ++++++++++++++++++ .../main/scala/fs2/grpc/shared/Trivial.scala | 9 ++ 3 files changed, 224 insertions(+) create mode 100644 runtime/src/main/scala/fs2/grpc/client/ClientAspect.scala create mode 100644 runtime/src/main/scala/fs2/grpc/server/ServiceAspect.scala create mode 100644 runtime/src/main/scala/fs2/grpc/shared/Trivial.scala diff --git a/runtime/src/main/scala/fs2/grpc/client/ClientAspect.scala b/runtime/src/main/scala/fs2/grpc/client/ClientAspect.scala new file mode 100644 index 00000000..b730973e --- /dev/null +++ b/runtime/src/main/scala/fs2/grpc/client/ClientAspect.scala @@ -0,0 +1,101 @@ +package fs2.grpc.server + +import io.grpc._ +import cats._ +import cats.syntax.all._ +import fs2.Stream + +final case class ClientCallContext[Req, Res, Dom[_], Cod[_], A]( + ctx: A, + methodDescriptor: MethodDescriptor[Req, Res], + dom: Dom[Req], + cod: Cod[Res] +) + +trait ClientAspect[F[_], Dom[_], Cod[_], A] { self => + def visitUnaryToUnary[Req, Res]( + callCtx: ClientCallContext[Req, Res, Dom, Cod, A], + req: Req, + request: (Req, Metadata) => F[Res] + ): F[Res] + + def visitUnaryToStreaming[Req, Res]( + callCtx: ClientCallContext[Req, Res, Dom, Cod, A], + req: Req, + request: (Req, Metadata) => Stream[F, Res] + ): Stream[F, Res] + + def visitStreamingToUnary[Req, Res]( + callCtx: ClientCallContext[Req, Res, Dom, Cod, A], + req: Stream[F, Req], + request: (Stream[F, Req], Metadata) => F[Res] + ): F[Res] + + def visitStreamingToStreaming[Req, Res]( + callCtx: ClientCallContext[Req, Res, Dom, Cod, A], + req: Stream[F, Req], + request: (Stream[F, Req], Metadata) => Stream[F, Res] + ): Stream[F, Res] + + def contraModify[B](f: B => F[A])(implicit F: Monad[F]): ClientAspect[F, Dom, Cod, B] = + new ClientAspect[F, Dom, Cod, B] { + def modCtx[Req, Res](ccc: ClientCallContext[Req, Res, Dom, Cod, B]): F[ClientCallContext[Req, Res, Dom, Cod, A]] = + f(ccc.ctx).map(a => ccc.copy(ctx = a)) + + override def visitUnaryToUnary[Req, Res]( + callCtx: ClientCallContext[Req, Res, Dom, Cod, B], + req: Req, + request: (Req, Metadata) => F[Res] + ): F[Res] = + modCtx(callCtx).flatMap(self.visitUnaryToUnary(_, req, request)) + + override def visitUnaryToStreaming[Req, Res]( + callCtx: ClientCallContext[Req, Res, Dom, Cod, B], + req: Req, + request: (Req, Metadata) => Stream[F, Res] + ): Stream[F, Res] = + Stream.eval(modCtx(callCtx)).flatMap(self.visitUnaryToStreaming(_, req, request)) + + override def visitStreamingToUnary[Req, Res]( + callCtx: ClientCallContext[Req, Res, Dom, Cod, B], + req: Stream[F, Req], + request: (Stream[F, Req], Metadata) => F[Res] + ): F[Res] = + modCtx(callCtx).flatMap(self.visitStreamingToUnary(_, req, request)) + + override def visitStreamingToStreaming[Req, Res]( + callCtx: ClientCallContext[Req, Res, Dom, Cod, B], + req: Stream[F, Req], + request: (Stream[F, Req], Metadata) => Stream[F, Res] + ): Stream[F, Res] = + Stream.eval(modCtx(callCtx)).flatMap(self.visitStreamingToStreaming(_, req, request)) + } +} + +object ClientAspect { + def default[F[_], Dom[_], Cod[_]] = new ClientAspect[F, Dom, Cod, Metadata] { + override def visitUnaryToUnary[Req, Res]( + callCtx: ClientCallContext[Req, Res, Dom, Cod, Metadata], + req: Req, + request: (Req, Metadata) => F[Res] + ): F[Res] = request(req, callCtx.ctx) + + override def visitUnaryToStreaming[Req, Res]( + callCtx: ClientCallContext[Req, Res, Dom, Cod, Metadata], + req: Req, + request: (Req, Metadata) => Stream[F, Res] + ): Stream[F, Res] = request(req, callCtx.ctx) + + override def visitStreamingToUnary[Req, Res]( + callCtx: ClientCallContext[Req, Res, Dom, Cod, Metadata], + req: Stream[F, Req], + request: (Stream[F, Req], Metadata) => F[Res] + ): F[Res] = request(req, callCtx.ctx) + + override def visitStreamingToStreaming[Req, Res]( + callCtx: ClientCallContext[Req, Res, Dom, Cod, Metadata], + req: Stream[F, Req], + request: (Stream[F, Req], Metadata) => Stream[F, Res] + ): Stream[F, Res] = request(req, callCtx.ctx) + } +} diff --git a/runtime/src/main/scala/fs2/grpc/server/ServiceAspect.scala b/runtime/src/main/scala/fs2/grpc/server/ServiceAspect.scala new file mode 100644 index 00000000..85aa98dd --- /dev/null +++ b/runtime/src/main/scala/fs2/grpc/server/ServiceAspect.scala @@ -0,0 +1,114 @@ +package fs2.grpc.server + +import io.grpc._ +import cats._ +import cats.syntax.all._ +import fs2.Stream + +final case class ServerCallContext[Req, Res, Dom[_], Cod[_]]( + metadata: Metadata, + methodDescriptor: MethodDescriptor[Req, Res], + dom: Dom[Req], + cod: Cod[Res] +) + +trait ServiceAspect[F[_], Dom[_], Cod[_], A] { self => + def visitUnaryToUnary[Req, Res]( + callCtx: ServerCallContext[Req, Res, Dom, Cod], + req: Req, + next: (Req, A) => F[Res] + ): F[Res] + + def visitUnaryToStreaming[Req, Res]( + callCtx: ServerCallContext[Req, Res, Dom, Cod], + req: Req, + next: (Req, A) => fs2.Stream[F, Res] + ): fs2.Stream[F, Res] + + def visitStreamingToUnary[Req, Res]( + callCtx: ServerCallContext[Req, Res, Dom, Cod], + req: fs2.Stream[F, Req], + next: (fs2.Stream[F, Req], A) => F[Res] + ): F[Res] + + def visitStreamingToStreaming[Req, Res]( + callCtx: ServerCallContext[Req, Res, Dom, Cod], + req: fs2.Stream[F, Req], + next: (fs2.Stream[F, Req], A) => fs2.Stream[F, Res] + ): fs2.Stream[F, Res] + + def modify[B](f: A => F[B])(implicit F: Monad[F]): ServiceAspect[F, Dom, Cod, B] = + new ServiceAspect[F, Dom, Cod, B] { + override def visitUnaryToUnary[Req, Res]( + callCtx: ServerCallContext[Req, Res, Dom, Cod], + req: Req, + request: (Req, B) => F[Res] + ): F[Res] = + self.visitUnaryToUnary[Req, Res]( + callCtx, + req, + (req, a) => f(a).flatMap(request(req, _)) + ) + + override def visitUnaryToStreaming[Req, Res]( + callCtx: ServerCallContext[Req, Res, Dom, Cod], + req: Req, + request: (Req, B) => Stream[F, Res] + ): Stream[F, Res] = + self.visitUnaryToStreaming[Req, Res]( + callCtx, + req, + (req, a) => fs2.Stream.eval(f(a)).flatMap(request(req, _)) + ) + + override def visitStreamingToUnary[Req, Res]( + callCtx: ServerCallContext[Req, Res, Dom, Cod], + req: fs2.Stream[F, Req], + request: (Stream[F, Req], B) => F[Res] + ): F[Res] = + self.visitStreamingToUnary[Req, Res]( + callCtx, + req, + (req, a) => f(a).flatMap(request(req, _)) + ) + + override def visitStreamingToStreaming[Req, Res]( + callCtx: ServerCallContext[Req, Res, Dom, Cod], + req: fs2.Stream[F, Req], + request: (Stream[F, Req], B) => Stream[F, Res] + ): Stream[F, Res] = + self.visitStreamingToStreaming[Req, Res]( + callCtx, + req, + (req, a) => fs2.Stream.eval(f(a)).flatMap(request(req, _)) + ) + } +} + +object ServiceAspect { + def default[F[_], Dom[_], Cod[_]] = new ServiceAspect[F, Dom, Cod, Metadata] { + override def visitUnaryToUnary[Req, Res]( + callCtx: ServerCallContext[Req, Res, Dom, Cod], + req: Req, + request: (Req, Metadata) => F[Res] + ): F[Res] = request(req, callCtx.metadata) + + override def visitUnaryToStreaming[Req, Res]( + callCtx: ServerCallContext[Req, Res, Dom, Cod], + req: Req, + request: (Req, Metadata) => Stream[F, Res] + ): Stream[F, Res] = request(req, callCtx.metadata) + + override def visitStreamingToUnary[Req, Res]( + callCtx: ServerCallContext[Req, Res, Dom, Cod], + req: fs2.Stream[F, Req], + request: (Stream[F, Req], Metadata) => F[Res] + ): F[Res] = request(req, callCtx.metadata) + + override def visitStreamingToStreaming[Req, Res]( + callCtx: ServerCallContext[Req, Res, Dom, Cod], + req: fs2.Stream[F, Req], + request: (Stream[F, Req], Metadata) => Stream[F, Res] + ): Stream[F, Res] = request(req, callCtx.metadata) + } +} diff --git a/runtime/src/main/scala/fs2/grpc/shared/Trivial.scala b/runtime/src/main/scala/fs2/grpc/shared/Trivial.scala new file mode 100644 index 00000000..01a0a64f --- /dev/null +++ b/runtime/src/main/scala/fs2/grpc/shared/Trivial.scala @@ -0,0 +1,9 @@ +package fs2.grpc.shared + +// A typeclass that exists for every type A +final class Trivial[A] private extends Serializable + +object Trivial { + private val any = new Trivial[Any] + implicit def instance[A]: Trivial[A] = any.asInstanceOf[Trivial[A]] +} From 2aa969e475aa307accdb8519a7d396c4eeea5d0b Mon Sep 17 00:00:00 2001 From: Valdemar Grange Date: Thu, 7 Sep 2023 15:10:59 +0200 Subject: [PATCH 2/9] more work --- .scalafmt.conf | 2 +- .../grpc/codegen/Fs2GrpcServicePrinter.scala | 137 +++++++++++++++--- .../scala/fs2/grpc/GeneratedCompanion.scala | 17 ++- .../scala/fs2/grpc/client/ClientAspect.scala | 2 +- 4 files changed, 131 insertions(+), 27 deletions(-) diff --git a/.scalafmt.conf b/.scalafmt.conf index 1e01bb04..8d384f41 100644 --- a/.scalafmt.conf +++ b/.scalafmt.conf @@ -10,4 +10,4 @@ fileOverride { "glob:**/scala-3/**" { runner.dialect = scala3 } -} \ No newline at end of file +} diff --git a/codegen/src/main/scala/fs2/grpc/codegen/Fs2GrpcServicePrinter.scala b/codegen/src/main/scala/fs2/grpc/codegen/Fs2GrpcServicePrinter.scala index 0f536c70..e38314d2 100644 --- a/codegen/src/main/scala/fs2/grpc/codegen/Fs2GrpcServicePrinter.scala +++ b/codegen/src/main/scala/fs2/grpc/codegen/Fs2GrpcServicePrinter.scala @@ -47,14 +47,19 @@ class Fs2GrpcServicePrinter(service: ServiceDescriptor, serviceSuffix: String, d }) } - private[this] def handleMethod(method: MethodDescriptor) = { + private[this] def methodName(method: MethodDescriptor) = method.streamType match { - case StreamType.Unary => "unaryToUnaryCall" - case StreamType.ClientStreaming => "streamingToUnaryCall" - case StreamType.ServerStreaming => "unaryToStreamingCall" - case StreamType.Bidirectional => "streamingToStreamingCall" + case StreamType.Unary => "unaryToUnary" + case StreamType.ClientStreaming => "streamingToUnary" + case StreamType.ServerStreaming => "unaryToStreaming" + case StreamType.Bidirectional => "streamingToStreaming" } - } + + private[this] def handleMethod(method: MethodDescriptor) = + methodName(method) + "Call" + + private[this] def visitMethod(method: MethodDescriptor) = + "visit" + methodName(method).capitalize private[this] def createClientCall(method: MethodDescriptor) = { val basicClientCall = @@ -66,17 +71,21 @@ class Fs2GrpcServicePrinter(service: ServiceDescriptor, serviceSuffix: String, d } private[this] def serviceMethodImplementation(method: MethodDescriptor): PrinterEndo = { p => - val mkMetadata = if (method.isServerStreaming) s"$Stream.eval(mkMetadata(ctx))" else "mkMetadata(ctx)" + val inType = method.inputType.scalaType + val outType = method.outputType.scalaType + val descriptor = method.grpcDescriptor.fullName - p.add(serviceMethodSignature(method) + " = {") - .indent - .add(s"$mkMetadata.flatMap { m =>") - .indent - .add(s"${createClientCall(method)}.flatMap(_.${handleMethod(method)}(request, m))") - .outdent - .add("}") - .outdent - .add("}") + p + .add(serviceMethodSignature(method) + " =") + .indented { + _.addStringMargin( + s"""|clientAspect.${visitMethod(method)}[$inType, $outType]( + | ${ClientCallContext}(ctx, $descriptor, implicitly[Dom[$inType]], implicitly[Cod[$outType]]), + | request, + | (req, m) => ${createClientCall(method)}.flatMap(_.${handleMethod(method)}(req, m)) + |)""".stripMargin + ) + } } private[this] def serviceBindingImplementation(method: MethodDescriptor): PrinterEndo = { p => @@ -86,9 +95,19 @@ class Fs2GrpcServicePrinter(service: ServiceDescriptor, serviceSuffix: String, d val handler = s"$Fs2ServerCallHandler[F](dispatcher, serverOptions).${handleMethod(method)}[$inType, $outType]" val serviceCall = s"serviceImpl.${method.name}" - val eval = if (method.isServerStreaming) s"$Stream.eval(mkCtx(m))" else "mkCtx(m)" - p.add(s".addMethod($descriptor, $handler((r, m) => $eval.flatMap($serviceCall(r, _))))") + p.addStringMargin( + s"""|.addMethod( + | $descriptor, + | $handler{ (r, m) => + | serviceAspect.${visitMethod(method)}[$inType, $outType]( + | ${ServerCallContext}(m, $descriptor, implicitly[Dom[$inType]], implicitly[Cod[$outType]]), + | r, + | (r, m) => $serviceCall(r, m) + | ) + | } + |)""" + ) } private[this] def serviceMethods: PrinterEndo = _.seq(service.methods.map(serviceMethodSignature)) @@ -115,23 +134,85 @@ class Fs2GrpcServicePrinter(service: ServiceDescriptor, serviceSuffix: String, d .newline .add("}") + private[this] def typeclasses: PrinterEndo = { p => + val doms = service.methods + .map(_.inputType.scalaType) + .distinct + .zipWithIndex + .map { case (n, i) => s"dom$i: Dom[$n]" } + + val cods = service.methods + .map(_.outputType.scalaType) + .distinct + .zipWithIndex + .map { case (n, i) => s"cod$i: Cod[$n]" } + + p.addWithDelimiter(",")(doms ++ cods) + } + private[this] def serviceClient: PrinterEndo = { - _.add( - s"def mkClient[F[_]: $Async, $Ctx](dispatcher: $Dispatcher[F], channel: $Channel, mkMetadata: $Ctx => F[$Metadata], clientOptions: $ClientOptions): $serviceNameFs2[F, $Ctx] = new $serviceNameFs2[F, $Ctx] {" - ).indent + _.addStringMargin( + s"""|def mkClientFull[F[_]: $Async, Dom[_], Cod[_], $Ctx]( + | dispatcher: $Dispatcher[F], + | channel: $Channel, + | clientAspect: ${ClientAspect}[F, Dom, Cod, $Ctx], + | clientOptions: $ClientOptions + |)(implicit""" + ) + .indented(typeclasses) + .add(") = {") + .indent .call(serviceMethodImplementations) .outdent .add("}") + .newline + .addStringMargin( + s"""|def mkClientTrivial[F[_]: $Async, $Ctx]( + | dispatcher: $Dispatcher[F], + | channel: $Channel, + | clientAspect: ${ClientAspect}[F, $Trivial, $Trivial, $Ctx], + | clientOptions: $ClientOptions + |) = + | mkClientFull[F, $Trivial, $Trivial, $Ctx]( + | dispatcher, + | channel, + | clientAspect, + | clientOptions + | )""" + ) } private[this] def serviceBinding: PrinterEndo = { - _.add( - s"protected def serviceBinding[F[_]: $Async, $Ctx](dispatcher: $Dispatcher[F], serviceImpl: $serviceNameFs2[F, $Ctx], mkCtx: $Metadata => F[$Ctx], serverOptions: $ServerOptions): $ServerServiceDefinition = {" - ).indent + _.addStringMargin( + s"""|protected def serviceBindingFull[F[_]: $Async, Dom[_], Cod[_], $Ctx]( + | dispatcher: $Dispatcher[F], + | serviceImpl: $serviceNameFs2[F, $Ctx], + | serviceAspect: ${ServiceAspect}[F, Dom, Cod, $Ctx], + | serverOptions: $ServerOptions + |)(implicit""" + ) + .indented(typeclasses) + .add(") = {") + .indent .add(s"$ServerServiceDefinition") .call(serviceBindingImplementations) .outdent .add("}") + .newline + .addStringMargin( + s"""|protected def serviceBindingTrivial[F[_]: $Async, $Ctx]( + | dispatcher: $Dispatcher[F], + | serviceImpl: $serviceNameFs2[F, $Ctx], + | serviceAspect: ${ServiceAspect}[F, $Trivial, $Trivial, $Ctx], + | serverOptions: $ServerOptions + |) = + | serviceBindingFull[F, $Trivial, $Trivial, $Ctx]( + | dispatcher, + | serviceImpl, + | serviceAspect, + | serverOptions + | )""" + ) } // / @@ -152,6 +233,9 @@ object Fs2GrpcServicePrinter { private val effPkg = "_root_.cats.effect" private val fs2Pkg = "_root_.fs2" private val fs2grpcPkg = "_root_.fs2.grpc" + private val fs2grpcServerPkg = "_root_.fs2.grpc.server" + private val fs2grpcClientPkg = "_root_.fs2.grpc.client" + private val fs2grpcSharedPkg = "_root_.fs2.grpc.shared" private val grpcPkg = "_root_.io.grpc" // / @@ -173,6 +257,11 @@ object Fs2GrpcServicePrinter { val Channel = s"$grpcPkg.Channel" val Metadata = s"$grpcPkg.Metadata" + val ServiceAspect = s"${fs2grpcServerPkg}.ServiceAspect" + val ServerCallContext = s"${fs2grpcServerPkg}.ServerCallContext" + val ClientAspect = s"${fs2grpcClientPkg}.ClientAspect" + val ClientCallContext = s"${fs2grpcClientPkg}.ClientCallContext" + val Trivial = s"${fs2grpcSharedPkg}.Trivial" } } diff --git a/runtime/src/main/scala/fs2/grpc/GeneratedCompanion.scala b/runtime/src/main/scala/fs2/grpc/GeneratedCompanion.scala index a083d623..d432cde5 100644 --- a/runtime/src/main/scala/fs2/grpc/GeneratedCompanion.scala +++ b/runtime/src/main/scala/fs2/grpc/GeneratedCompanion.scala @@ -28,6 +28,8 @@ import cats.effect.std.Dispatcher import io.grpc._ import fs2.grpc.client.ClientOptions import fs2.grpc.server.ServerOptions +import fs2.grpc.client.ClientAspect +import fs2.grpc.shared.Trivial trait GeneratedCompanion[Service[*[_], _]] { @@ -35,12 +37,25 @@ trait GeneratedCompanion[Service[*[_], _]] { ///=== Client ========================================================================================================== + def mkClientTrivial[F[_]: Async, A]( + dispatcher: Dispatcher[F], + channel: Channel, + clientAspect: ClientAspect[F, Trivial, Trivial, A], + clientOptions: ClientOptions + ): Service[F, A] + def mkClient[F[_]: Async, A]( dispatcher: Dispatcher[F], channel: Channel, mkMetadata: A => F[Metadata], clientOptions: ClientOptions - ): Service[F, A] + ): Service[F, A] = + mkClientTrivial[F, A]( + dispatcher, + channel, + ClientAspect.default[F, Trivial, Trivial].contraModify(mkMetadata), + clientOptions + ) final def mkClient[F[_]: Async, A]( dispatcher: Dispatcher[F], diff --git a/runtime/src/main/scala/fs2/grpc/client/ClientAspect.scala b/runtime/src/main/scala/fs2/grpc/client/ClientAspect.scala index b730973e..c0a2333b 100644 --- a/runtime/src/main/scala/fs2/grpc/client/ClientAspect.scala +++ b/runtime/src/main/scala/fs2/grpc/client/ClientAspect.scala @@ -1,4 +1,4 @@ -package fs2.grpc.server +package fs2.grpc.client import io.grpc._ import cats._ From e00ab12f521054cc3d3ca16c67cc89191df3976a Mon Sep 17 00:00:00 2001 From: Valdemar Grange Date: Thu, 7 Sep 2023 15:13:57 +0200 Subject: [PATCH 3/9] dsl --- .../fs2/grpc/codegen/Fs2GrpcServicePrinter.scala | 2 +- .../main/scala/fs2/grpc/GeneratedCompanion.scala | 16 +++++++++++++++- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/codegen/src/main/scala/fs2/grpc/codegen/Fs2GrpcServicePrinter.scala b/codegen/src/main/scala/fs2/grpc/codegen/Fs2GrpcServicePrinter.scala index e38314d2..6be93559 100644 --- a/codegen/src/main/scala/fs2/grpc/codegen/Fs2GrpcServicePrinter.scala +++ b/codegen/src/main/scala/fs2/grpc/codegen/Fs2GrpcServicePrinter.scala @@ -160,7 +160,7 @@ class Fs2GrpcServicePrinter(service: ServiceDescriptor, serviceSuffix: String, d |)(implicit""" ) .indented(typeclasses) - .add(") = {") + .add(s"): $serviceNameFs2[F, $Ctx] = new $serviceNameFs2[F, $Ctx] {") .indent .call(serviceMethodImplementations) .outdent diff --git a/runtime/src/main/scala/fs2/grpc/GeneratedCompanion.scala b/runtime/src/main/scala/fs2/grpc/GeneratedCompanion.scala index d432cde5..9a9d6598 100644 --- a/runtime/src/main/scala/fs2/grpc/GeneratedCompanion.scala +++ b/runtime/src/main/scala/fs2/grpc/GeneratedCompanion.scala @@ -30,6 +30,7 @@ import fs2.grpc.client.ClientOptions import fs2.grpc.server.ServerOptions import fs2.grpc.client.ClientAspect import fs2.grpc.shared.Trivial +import fs2.grpc.server.ServiceAspect trait GeneratedCompanion[Service[*[_], _]] { @@ -131,12 +132,25 @@ trait GeneratedCompanion[Service[*[_], _]] { ///=== Service ========================================================================================================= + protected def serviceBindingTrivial[F[_]: Async, A]( + dispatcher: Dispatcher[F], + serviceImpl: Service[F, A], + serviceAspect: ServiceAspect[F, Trivial, Trivial, A], + serverOptions: ServerOptions + ): ServerServiceDefinition + protected def serviceBinding[F[_]: Async, A]( dispatcher: Dispatcher[F], serviceImpl: Service[F, A], mkCtx: Metadata => F[A], serverOptions: ServerOptions - ): ServerServiceDefinition + ): ServerServiceDefinition = + serviceBindingTrivial[F, A]( + dispatcher, + serviceImpl, + ServiceAspect.default[F, Trivial, Trivial].modify(mkCtx), + serverOptions + ) final def service[F[_]: Async, A]( dispatcher: Dispatcher[F], From 8b954b25bab9d5e6c8f972e100af1ba1f12ff6a5 Mon Sep 17 00:00:00 2001 From: ValdemarGr Date: Wed, 13 Sep 2023 15:17:34 +0200 Subject: [PATCH 4/9] generalize effect type --- .../grpc/codegen/Fs2GrpcServicePrinter.scala | 24 +++++++------- .../scala/fs2/grpc/GeneratedCompanion.scala | 4 +-- .../scala/fs2/grpc/client/ClientAspect.scala | 24 +++++++------- .../scala/fs2/grpc/server/ServiceAspect.scala | 32 +++++++++---------- 4 files changed, 42 insertions(+), 42 deletions(-) diff --git a/codegen/src/main/scala/fs2/grpc/codegen/Fs2GrpcServicePrinter.scala b/codegen/src/main/scala/fs2/grpc/codegen/Fs2GrpcServicePrinter.scala index 6be93559..6c6d28b6 100644 --- a/codegen/src/main/scala/fs2/grpc/codegen/Fs2GrpcServicePrinter.scala +++ b/codegen/src/main/scala/fs2/grpc/codegen/Fs2GrpcServicePrinter.scala @@ -63,7 +63,7 @@ class Fs2GrpcServicePrinter(service: ServiceDescriptor, serviceSuffix: String, d private[this] def createClientCall(method: MethodDescriptor) = { val basicClientCall = - s"$Fs2ClientCall[F](channel, ${method.grpcDescriptor.fullName}, dispatcher, clientOptions)" + s"$Fs2ClientCall[G](channel, ${method.grpcDescriptor.fullName}, dispatcher, clientOptions)" if (method.isServerStreaming) s"$Stream.eval($basicClientCall)" else @@ -92,7 +92,7 @@ class Fs2GrpcServicePrinter(service: ServiceDescriptor, serviceSuffix: String, d val inType = method.inputType.scalaType val outType = method.outputType.scalaType val descriptor = method.grpcDescriptor.fullName - val handler = s"$Fs2ServerCallHandler[F](dispatcher, serverOptions).${handleMethod(method)}[$inType, $outType]" + val handler = s"$Fs2ServerCallHandler[G](dispatcher, serverOptions).${handleMethod(method)}[$inType, $outType]" val serviceCall = s"serviceImpl.${method.name}" @@ -152,10 +152,10 @@ class Fs2GrpcServicePrinter(service: ServiceDescriptor, serviceSuffix: String, d private[this] def serviceClient: PrinterEndo = { _.addStringMargin( - s"""|def mkClientFull[F[_]: $Async, Dom[_], Cod[_], $Ctx]( - | dispatcher: $Dispatcher[F], + s"""|def mkClientFull[F[_], G[_]: $Async, Dom[_], Cod[_], $Ctx]( + | dispatcher: $Dispatcher[G], | channel: $Channel, - | clientAspect: ${ClientAspect}[F, Dom, Cod, $Ctx], + | clientAspect: ${ClientAspect}[F, G, Dom, Cod, $Ctx], | clientOptions: $ClientOptions |)(implicit""" ) @@ -170,10 +170,10 @@ class Fs2GrpcServicePrinter(service: ServiceDescriptor, serviceSuffix: String, d s"""|def mkClientTrivial[F[_]: $Async, $Ctx]( | dispatcher: $Dispatcher[F], | channel: $Channel, - | clientAspect: ${ClientAspect}[F, $Trivial, $Trivial, $Ctx], + | clientAspect: ${ClientAspect}[F, F, $Trivial, $Trivial, $Ctx], | clientOptions: $ClientOptions |) = - | mkClientFull[F, $Trivial, $Trivial, $Ctx]( + | mkClientFull[F, F, $Trivial, $Trivial, $Ctx]( | dispatcher, | channel, | clientAspect, @@ -184,10 +184,10 @@ class Fs2GrpcServicePrinter(service: ServiceDescriptor, serviceSuffix: String, d private[this] def serviceBinding: PrinterEndo = { _.addStringMargin( - s"""|protected def serviceBindingFull[F[_]: $Async, Dom[_], Cod[_], $Ctx]( - | dispatcher: $Dispatcher[F], + s"""|protected def serviceBindingFull[F[_], G[_]: $Async, Dom[_], Cod[_], $Ctx]( + | dispatcher: $Dispatcher[G], | serviceImpl: $serviceNameFs2[F, $Ctx], - | serviceAspect: ${ServiceAspect}[F, Dom, Cod, $Ctx], + | serviceAspect: ${ServiceAspect}[F, G, Dom, Cod, $Ctx], | serverOptions: $ServerOptions |)(implicit""" ) @@ -203,10 +203,10 @@ class Fs2GrpcServicePrinter(service: ServiceDescriptor, serviceSuffix: String, d s"""|protected def serviceBindingTrivial[F[_]: $Async, $Ctx]( | dispatcher: $Dispatcher[F], | serviceImpl: $serviceNameFs2[F, $Ctx], - | serviceAspect: ${ServiceAspect}[F, $Trivial, $Trivial, $Ctx], + | serviceAspect: ${ServiceAspect}[F, F, $Trivial, $Trivial, $Ctx], | serverOptions: $ServerOptions |) = - | serviceBindingFull[F, $Trivial, $Trivial, $Ctx]( + | serviceBindingFull[F, F, $Trivial, $Trivial, $Ctx]( | dispatcher, | serviceImpl, | serviceAspect, diff --git a/runtime/src/main/scala/fs2/grpc/GeneratedCompanion.scala b/runtime/src/main/scala/fs2/grpc/GeneratedCompanion.scala index 9a9d6598..98e104f1 100644 --- a/runtime/src/main/scala/fs2/grpc/GeneratedCompanion.scala +++ b/runtime/src/main/scala/fs2/grpc/GeneratedCompanion.scala @@ -41,7 +41,7 @@ trait GeneratedCompanion[Service[*[_], _]] { def mkClientTrivial[F[_]: Async, A]( dispatcher: Dispatcher[F], channel: Channel, - clientAspect: ClientAspect[F, Trivial, Trivial, A], + clientAspect: ClientAspect[F, F, Trivial, Trivial, A], clientOptions: ClientOptions ): Service[F, A] @@ -135,7 +135,7 @@ trait GeneratedCompanion[Service[*[_], _]] { protected def serviceBindingTrivial[F[_]: Async, A]( dispatcher: Dispatcher[F], serviceImpl: Service[F, A], - serviceAspect: ServiceAspect[F, Trivial, Trivial, A], + serviceAspect: ServiceAspect[F, F, Trivial, Trivial, A], serverOptions: ServerOptions ): ServerServiceDefinition diff --git a/runtime/src/main/scala/fs2/grpc/client/ClientAspect.scala b/runtime/src/main/scala/fs2/grpc/client/ClientAspect.scala index c0a2333b..402db7f2 100644 --- a/runtime/src/main/scala/fs2/grpc/client/ClientAspect.scala +++ b/runtime/src/main/scala/fs2/grpc/client/ClientAspect.scala @@ -12,68 +12,68 @@ final case class ClientCallContext[Req, Res, Dom[_], Cod[_], A]( cod: Cod[Res] ) -trait ClientAspect[F[_], Dom[_], Cod[_], A] { self => +trait ClientAspect[F[_], G[_], Dom[_], Cod[_], A] { self => def visitUnaryToUnary[Req, Res]( callCtx: ClientCallContext[Req, Res, Dom, Cod, A], req: Req, - request: (Req, Metadata) => F[Res] + request: (Req, Metadata) => G[Res] ): F[Res] def visitUnaryToStreaming[Req, Res]( callCtx: ClientCallContext[Req, Res, Dom, Cod, A], req: Req, - request: (Req, Metadata) => Stream[F, Res] + request: (Req, Metadata) => Stream[G, Res] ): Stream[F, Res] def visitStreamingToUnary[Req, Res]( callCtx: ClientCallContext[Req, Res, Dom, Cod, A], req: Stream[F, Req], - request: (Stream[F, Req], Metadata) => F[Res] + request: (Stream[G, Req], Metadata) => G[Res] ): F[Res] def visitStreamingToStreaming[Req, Res]( callCtx: ClientCallContext[Req, Res, Dom, Cod, A], req: Stream[F, Req], - request: (Stream[F, Req], Metadata) => Stream[F, Res] + request: (Stream[G, Req], Metadata) => Stream[G, Res] ): Stream[F, Res] - def contraModify[B](f: B => F[A])(implicit F: Monad[F]): ClientAspect[F, Dom, Cod, B] = - new ClientAspect[F, Dom, Cod, B] { + def contraModify[B](f: B => F[A])(implicit F: Monad[F]): ClientAspect[F, G, Dom, Cod, B] = + new ClientAspect[F, G, Dom, Cod, B] { def modCtx[Req, Res](ccc: ClientCallContext[Req, Res, Dom, Cod, B]): F[ClientCallContext[Req, Res, Dom, Cod, A]] = f(ccc.ctx).map(a => ccc.copy(ctx = a)) override def visitUnaryToUnary[Req, Res]( callCtx: ClientCallContext[Req, Res, Dom, Cod, B], req: Req, - request: (Req, Metadata) => F[Res] + request: (Req, Metadata) => G[Res] ): F[Res] = modCtx(callCtx).flatMap(self.visitUnaryToUnary(_, req, request)) override def visitUnaryToStreaming[Req, Res]( callCtx: ClientCallContext[Req, Res, Dom, Cod, B], req: Req, - request: (Req, Metadata) => Stream[F, Res] + request: (Req, Metadata) => Stream[G, Res] ): Stream[F, Res] = Stream.eval(modCtx(callCtx)).flatMap(self.visitUnaryToStreaming(_, req, request)) override def visitStreamingToUnary[Req, Res]( callCtx: ClientCallContext[Req, Res, Dom, Cod, B], req: Stream[F, Req], - request: (Stream[F, Req], Metadata) => F[Res] + request: (Stream[G, Req], Metadata) => G[Res] ): F[Res] = modCtx(callCtx).flatMap(self.visitStreamingToUnary(_, req, request)) override def visitStreamingToStreaming[Req, Res]( callCtx: ClientCallContext[Req, Res, Dom, Cod, B], req: Stream[F, Req], - request: (Stream[F, Req], Metadata) => Stream[F, Res] + request: (Stream[G, Req], Metadata) => Stream[G, Res] ): Stream[F, Res] = Stream.eval(modCtx(callCtx)).flatMap(self.visitStreamingToStreaming(_, req, request)) } } object ClientAspect { - def default[F[_], Dom[_], Cod[_]] = new ClientAspect[F, Dom, Cod, Metadata] { + def default[F[_], Dom[_], Cod[_]] = new ClientAspect[F, F, Dom, Cod, Metadata] { override def visitUnaryToUnary[Req, Res]( callCtx: ClientCallContext[Req, Res, Dom, Cod, Metadata], req: Req, diff --git a/runtime/src/main/scala/fs2/grpc/server/ServiceAspect.scala b/runtime/src/main/scala/fs2/grpc/server/ServiceAspect.scala index 85aa98dd..9c61d6f5 100644 --- a/runtime/src/main/scala/fs2/grpc/server/ServiceAspect.scala +++ b/runtime/src/main/scala/fs2/grpc/server/ServiceAspect.scala @@ -12,38 +12,38 @@ final case class ServerCallContext[Req, Res, Dom[_], Cod[_]]( cod: Cod[Res] ) -trait ServiceAspect[F[_], Dom[_], Cod[_], A] { self => +trait ServiceAspect[F[_], G[_], Dom[_], Cod[_], A] { self => def visitUnaryToUnary[Req, Res]( callCtx: ServerCallContext[Req, Res, Dom, Cod], req: Req, next: (Req, A) => F[Res] - ): F[Res] + ): G[Res] def visitUnaryToStreaming[Req, Res]( callCtx: ServerCallContext[Req, Res, Dom, Cod], req: Req, next: (Req, A) => fs2.Stream[F, Res] - ): fs2.Stream[F, Res] + ): fs2.Stream[G, Res] def visitStreamingToUnary[Req, Res]( callCtx: ServerCallContext[Req, Res, Dom, Cod], - req: fs2.Stream[F, Req], + req: fs2.Stream[G, Req], next: (fs2.Stream[F, Req], A) => F[Res] - ): F[Res] + ): G[Res] def visitStreamingToStreaming[Req, Res]( callCtx: ServerCallContext[Req, Res, Dom, Cod], - req: fs2.Stream[F, Req], + req: fs2.Stream[G, Req], next: (fs2.Stream[F, Req], A) => fs2.Stream[F, Res] - ): fs2.Stream[F, Res] + ): fs2.Stream[G, Res] - def modify[B](f: A => F[B])(implicit F: Monad[F]): ServiceAspect[F, Dom, Cod, B] = - new ServiceAspect[F, Dom, Cod, B] { + def modify[B](f: A => F[B])(implicit F: Monad[F]): ServiceAspect[F, G, Dom, Cod, B] = + new ServiceAspect[F, G, Dom, Cod, B] { override def visitUnaryToUnary[Req, Res]( callCtx: ServerCallContext[Req, Res, Dom, Cod], req: Req, request: (Req, B) => F[Res] - ): F[Res] = + ): G[Res] = self.visitUnaryToUnary[Req, Res]( callCtx, req, @@ -54,7 +54,7 @@ trait ServiceAspect[F[_], Dom[_], Cod[_], A] { self => callCtx: ServerCallContext[Req, Res, Dom, Cod], req: Req, request: (Req, B) => Stream[F, Res] - ): Stream[F, Res] = + ): Stream[G, Res] = self.visitUnaryToStreaming[Req, Res]( callCtx, req, @@ -63,9 +63,9 @@ trait ServiceAspect[F[_], Dom[_], Cod[_], A] { self => override def visitStreamingToUnary[Req, Res]( callCtx: ServerCallContext[Req, Res, Dom, Cod], - req: fs2.Stream[F, Req], + req: fs2.Stream[G, Req], request: (Stream[F, Req], B) => F[Res] - ): F[Res] = + ): G[Res] = self.visitStreamingToUnary[Req, Res]( callCtx, req, @@ -74,9 +74,9 @@ trait ServiceAspect[F[_], Dom[_], Cod[_], A] { self => override def visitStreamingToStreaming[Req, Res]( callCtx: ServerCallContext[Req, Res, Dom, Cod], - req: fs2.Stream[F, Req], + req: fs2.Stream[G, Req], request: (Stream[F, Req], B) => Stream[F, Res] - ): Stream[F, Res] = + ): Stream[G, Res] = self.visitStreamingToStreaming[Req, Res]( callCtx, req, @@ -86,7 +86,7 @@ trait ServiceAspect[F[_], Dom[_], Cod[_], A] { self => } object ServiceAspect { - def default[F[_], Dom[_], Cod[_]] = new ServiceAspect[F, Dom, Cod, Metadata] { + def default[F[_], Dom[_], Cod[_]] = new ServiceAspect[F, F, Dom, Cod, Metadata] { override def visitUnaryToUnary[Req, Res]( callCtx: ServerCallContext[Req, Res, Dom, Cod], req: Req, From dc5e40959fb45a422e4a408710bba1ab04a880b2 Mon Sep 17 00:00:00 2001 From: ValdemarGr Date: Wed, 13 Sep 2023 16:36:33 +0200 Subject: [PATCH 5/9] work on api consistency --- .../grpc/codegen/Fs2GrpcServicePrinter.scala | 18 +++++++------- .../scala/fs2/grpc/GeneratedCompanion.scala | 24 ++++++++++++------- 2 files changed, 25 insertions(+), 17 deletions(-) diff --git a/codegen/src/main/scala/fs2/grpc/codegen/Fs2GrpcServicePrinter.scala b/codegen/src/main/scala/fs2/grpc/codegen/Fs2GrpcServicePrinter.scala index 6c6d28b6..9f4e6821 100644 --- a/codegen/src/main/scala/fs2/grpc/codegen/Fs2GrpcServicePrinter.scala +++ b/codegen/src/main/scala/fs2/grpc/codegen/Fs2GrpcServicePrinter.scala @@ -167,13 +167,13 @@ class Fs2GrpcServicePrinter(service: ServiceDescriptor, serviceSuffix: String, d .add("}") .newline .addStringMargin( - s"""|def mkClientTrivial[F[_]: $Async, $Ctx]( - | dispatcher: $Dispatcher[F], + s"""|def mkClientTrivial[F[_], G[_]: $Async, $Ctx]( + | dispatcher: $Dispatcher[G], | channel: $Channel, - | clientAspect: ${ClientAspect}[F, F, $Trivial, $Trivial, $Ctx], + | clientAspect: ${ClientAspect}[F, G, $Trivial, $Trivial, $Ctx], | clientOptions: $ClientOptions |) = - | mkClientFull[F, F, $Trivial, $Trivial, $Ctx]( + | mkClientFull[F, G, $Trivial, $Trivial, $Ctx]( | dispatcher, | channel, | clientAspect, @@ -184,7 +184,7 @@ class Fs2GrpcServicePrinter(service: ServiceDescriptor, serviceSuffix: String, d private[this] def serviceBinding: PrinterEndo = { _.addStringMargin( - s"""|protected def serviceBindingFull[F[_], G[_]: $Async, Dom[_], Cod[_], $Ctx]( + s"""|def serviceBindingFull[F[_], G[_]: $Async, Dom[_], Cod[_], $Ctx]( | dispatcher: $Dispatcher[G], | serviceImpl: $serviceNameFs2[F, $Ctx], | serviceAspect: ${ServiceAspect}[F, G, Dom, Cod, $Ctx], @@ -200,13 +200,13 @@ class Fs2GrpcServicePrinter(service: ServiceDescriptor, serviceSuffix: String, d .add("}") .newline .addStringMargin( - s"""|protected def serviceBindingTrivial[F[_]: $Async, $Ctx]( - | dispatcher: $Dispatcher[F], + s"""|def serviceBindingTrivial[F[_], G[_]: $Async, $Ctx]( + | dispatcher: $Dispatcher[G], | serviceImpl: $serviceNameFs2[F, $Ctx], - | serviceAspect: ${ServiceAspect}[F, F, $Trivial, $Trivial, $Ctx], + | serviceAspect: ${ServiceAspect}[F, G, $Trivial, $Trivial, $Ctx], | serverOptions: $ServerOptions |) = - | serviceBindingFull[F, F, $Trivial, $Trivial, $Ctx]( + | serviceBindingFull[F, G, $Trivial, $Trivial, $Ctx]( | dispatcher, | serviceImpl, | serviceAspect, diff --git a/runtime/src/main/scala/fs2/grpc/GeneratedCompanion.scala b/runtime/src/main/scala/fs2/grpc/GeneratedCompanion.scala index 98e104f1..a3fe908c 100644 --- a/runtime/src/main/scala/fs2/grpc/GeneratedCompanion.scala +++ b/runtime/src/main/scala/fs2/grpc/GeneratedCompanion.scala @@ -38,10 +38,10 @@ trait GeneratedCompanion[Service[*[_], _]] { ///=== Client ========================================================================================================== - def mkClientTrivial[F[_]: Async, A]( - dispatcher: Dispatcher[F], + def mkClientTrivial[F[_], G[_]: Async, A]( + dispatcher: Dispatcher[G], channel: Channel, - clientAspect: ClientAspect[F, F, Trivial, Trivial, A], + clientAspect: ClientAspect[F, G, Trivial, Trivial, A], clientOptions: ClientOptions ): Service[F, A] @@ -51,7 +51,7 @@ trait GeneratedCompanion[Service[*[_], _]] { mkMetadata: A => F[Metadata], clientOptions: ClientOptions ): Service[F, A] = - mkClientTrivial[F, A]( + mkClientTrivial[F, F, A]( dispatcher, channel, ClientAspect.default[F, Trivial, Trivial].contraModify(mkMetadata), @@ -132,10 +132,10 @@ trait GeneratedCompanion[Service[*[_], _]] { ///=== Service ========================================================================================================= - protected def serviceBindingTrivial[F[_]: Async, A]( - dispatcher: Dispatcher[F], + def serviceBindingTrivial[F[_], G[_]: Async, A]( + dispatcher: Dispatcher[G], serviceImpl: Service[F, A], - serviceAspect: ServiceAspect[F, F, Trivial, Trivial, A], + serviceAspect: ServiceAspect[F, G, Trivial, Trivial, A], serverOptions: ServerOptions ): ServerServiceDefinition @@ -145,13 +145,21 @@ trait GeneratedCompanion[Service[*[_], _]] { mkCtx: Metadata => F[A], serverOptions: ServerOptions ): ServerServiceDefinition = - serviceBindingTrivial[F, A]( + serviceBindingTrivial[F, F, A]( dispatcher, serviceImpl, ServiceAspect.default[F, Trivial, Trivial].modify(mkCtx), serverOptions ) + def serviceTrivial[F[_], G[_]: Async, A]( + dispatcher: Dispatcher[G], + serviceImpl: Service[F, A], + serviceAspect: ServiceAspect[F, G, Trivial, Trivial, A], + serverOptions: ServerOptions + ): ServerServiceDefinition = + serviceBindingTrivial[F, G, A](dispatcher, serviceImpl, serviceAspect, serverOptions) + final def service[F[_]: Async, A]( dispatcher: Dispatcher[F], serviceImpl: Service[F, A], From 7f574dbe02f635d379b7f9d3d97f286f7f2e4426 Mon Sep 17 00:00:00 2001 From: ValdemarGr Date: Fri, 15 Sep 2023 15:19:08 +0200 Subject: [PATCH 6/9] removed req/res typeclasses --- .../grpc/codegen/Fs2GrpcServicePrinter.scala | 74 +++---------------- .../scala/fs2/grpc/GeneratedCompanion.scala | 25 +++---- .../scala/fs2/grpc/client/ClientAspect.scala | 40 +++++----- .../scala/fs2/grpc/server/ServiceAspect.scala | 38 +++++----- .../main/scala/fs2/grpc/shared/Trivial.scala | 9 --- 5 files changed, 59 insertions(+), 127 deletions(-) delete mode 100644 runtime/src/main/scala/fs2/grpc/shared/Trivial.scala diff --git a/codegen/src/main/scala/fs2/grpc/codegen/Fs2GrpcServicePrinter.scala b/codegen/src/main/scala/fs2/grpc/codegen/Fs2GrpcServicePrinter.scala index 9f4e6821..eec85c6b 100644 --- a/codegen/src/main/scala/fs2/grpc/codegen/Fs2GrpcServicePrinter.scala +++ b/codegen/src/main/scala/fs2/grpc/codegen/Fs2GrpcServicePrinter.scala @@ -80,7 +80,7 @@ class Fs2GrpcServicePrinter(service: ServiceDescriptor, serviceSuffix: String, d .indented { _.addStringMargin( s"""|clientAspect.${visitMethod(method)}[$inType, $outType]( - | ${ClientCallContext}(ctx, $descriptor, implicitly[Dom[$inType]], implicitly[Cod[$outType]]), + | ${ClientCallContext}(ctx, $descriptor), | request, | (req, m) => ${createClientCall(method)}.flatMap(_.${handleMethod(method)}(req, m)) |)""".stripMargin @@ -101,7 +101,7 @@ class Fs2GrpcServicePrinter(service: ServiceDescriptor, serviceSuffix: String, d | $descriptor, | $handler{ (r, m) => | serviceAspect.${visitMethod(method)}[$inType, $outType]( - | ${ServerCallContext}(m, $descriptor, implicitly[Dom[$inType]], implicitly[Cod[$outType]]), + | ${ServerCallContext}(m, $descriptor), | r, | (r, m) => $serviceCall(r, m) | ) @@ -134,85 +134,33 @@ class Fs2GrpcServicePrinter(service: ServiceDescriptor, serviceSuffix: String, d .newline .add("}") - private[this] def typeclasses: PrinterEndo = { p => - val doms = service.methods - .map(_.inputType.scalaType) - .distinct - .zipWithIndex - .map { case (n, i) => s"dom$i: Dom[$n]" } - - val cods = service.methods - .map(_.outputType.scalaType) - .distinct - .zipWithIndex - .map { case (n, i) => s"cod$i: Cod[$n]" } - - p.addWithDelimiter(",")(doms ++ cods) - } - private[this] def serviceClient: PrinterEndo = { _.addStringMargin( - s"""|def mkClientFull[F[_], G[_]: $Async, Dom[_], Cod[_], $Ctx]( + s"""|def mkClientFull[F[_], G[_]: $Async, $Ctx]( | dispatcher: $Dispatcher[G], | channel: $Channel, - | clientAspect: ${ClientAspect}[F, G, Dom, Cod, $Ctx], + | clientAspect: ${ClientAspect}[F, G, $Ctx], | clientOptions: $ClientOptions - |)(implicit""" - ) - .indented(typeclasses) - .add(s"): $serviceNameFs2[F, $Ctx] = new $serviceNameFs2[F, $Ctx] {") - .indent + |): $serviceNameFs2[F, $Ctx] = new $serviceNameFs2[F, $Ctx] {""" + ).indent .call(serviceMethodImplementations) .outdent .add("}") - .newline - .addStringMargin( - s"""|def mkClientTrivial[F[_], G[_]: $Async, $Ctx]( - | dispatcher: $Dispatcher[G], - | channel: $Channel, - | clientAspect: ${ClientAspect}[F, G, $Trivial, $Trivial, $Ctx], - | clientOptions: $ClientOptions - |) = - | mkClientFull[F, G, $Trivial, $Trivial, $Ctx]( - | dispatcher, - | channel, - | clientAspect, - | clientOptions - | )""" - ) } private[this] def serviceBinding: PrinterEndo = { _.addStringMargin( - s"""|def serviceBindingFull[F[_], G[_]: $Async, Dom[_], Cod[_], $Ctx]( + s"""|protected def serviceBindingFull[F[_], G[_]: $Async, $Ctx]( | dispatcher: $Dispatcher[G], | serviceImpl: $serviceNameFs2[F, $Ctx], - | serviceAspect: ${ServiceAspect}[F, G, Dom, Cod, $Ctx], + | serviceAspect: ${ServiceAspect}[F, G, $Ctx], | serverOptions: $ServerOptions - |)(implicit""" - ) - .indented(typeclasses) - .add(") = {") - .indent + |) = {""" + ).indent .add(s"$ServerServiceDefinition") .call(serviceBindingImplementations) .outdent .add("}") - .newline - .addStringMargin( - s"""|def serviceBindingTrivial[F[_], G[_]: $Async, $Ctx]( - | dispatcher: $Dispatcher[G], - | serviceImpl: $serviceNameFs2[F, $Ctx], - | serviceAspect: ${ServiceAspect}[F, G, $Trivial, $Trivial, $Ctx], - | serverOptions: $ServerOptions - |) = - | serviceBindingFull[F, G, $Trivial, $Trivial, $Ctx]( - | dispatcher, - | serviceImpl, - | serviceAspect, - | serverOptions - | )""" - ) } // / @@ -235,7 +183,6 @@ object Fs2GrpcServicePrinter { private val fs2grpcPkg = "_root_.fs2.grpc" private val fs2grpcServerPkg = "_root_.fs2.grpc.server" private val fs2grpcClientPkg = "_root_.fs2.grpc.client" - private val fs2grpcSharedPkg = "_root_.fs2.grpc.shared" private val grpcPkg = "_root_.io.grpc" // / @@ -261,7 +208,6 @@ object Fs2GrpcServicePrinter { val ServerCallContext = s"${fs2grpcServerPkg}.ServerCallContext" val ClientAspect = s"${fs2grpcClientPkg}.ClientAspect" val ClientCallContext = s"${fs2grpcClientPkg}.ClientCallContext" - val Trivial = s"${fs2grpcSharedPkg}.Trivial" } } diff --git a/runtime/src/main/scala/fs2/grpc/GeneratedCompanion.scala b/runtime/src/main/scala/fs2/grpc/GeneratedCompanion.scala index a3fe908c..070a6398 100644 --- a/runtime/src/main/scala/fs2/grpc/GeneratedCompanion.scala +++ b/runtime/src/main/scala/fs2/grpc/GeneratedCompanion.scala @@ -27,9 +27,8 @@ import cats.effect.{Async, Resource} import cats.effect.std.Dispatcher import io.grpc._ import fs2.grpc.client.ClientOptions -import fs2.grpc.server.ServerOptions import fs2.grpc.client.ClientAspect -import fs2.grpc.shared.Trivial +import fs2.grpc.server.ServerOptions import fs2.grpc.server.ServiceAspect trait GeneratedCompanion[Service[*[_], _]] { @@ -38,10 +37,10 @@ trait GeneratedCompanion[Service[*[_], _]] { ///=== Client ========================================================================================================== - def mkClientTrivial[F[_], G[_]: Async, A]( + def mkClientFull[F[_], G[_]: Async, A]( dispatcher: Dispatcher[G], channel: Channel, - clientAspect: ClientAspect[F, G, Trivial, Trivial, A], + clientAspect: ClientAspect[F, G, A], clientOptions: ClientOptions ): Service[F, A] @@ -51,10 +50,10 @@ trait GeneratedCompanion[Service[*[_], _]] { mkMetadata: A => F[Metadata], clientOptions: ClientOptions ): Service[F, A] = - mkClientTrivial[F, F, A]( + mkClientFull[F, F, A]( dispatcher, channel, - ClientAspect.default[F, Trivial, Trivial].contraModify(mkMetadata), + ClientAspect.default[F].contraModify(mkMetadata), clientOptions ) @@ -132,10 +131,10 @@ trait GeneratedCompanion[Service[*[_], _]] { ///=== Service ========================================================================================================= - def serviceBindingTrivial[F[_], G[_]: Async, A]( + protected def serviceBindingFull[F[_], G[_]: Async, A]( dispatcher: Dispatcher[G], serviceImpl: Service[F, A], - serviceAspect: ServiceAspect[F, G, Trivial, Trivial, A], + serviceAspect: ServiceAspect[F, G, A], serverOptions: ServerOptions ): ServerServiceDefinition @@ -145,20 +144,20 @@ trait GeneratedCompanion[Service[*[_], _]] { mkCtx: Metadata => F[A], serverOptions: ServerOptions ): ServerServiceDefinition = - serviceBindingTrivial[F, F, A]( + serviceBindingFull[F, F, A]( dispatcher, serviceImpl, - ServiceAspect.default[F, Trivial, Trivial].modify(mkCtx), + ServiceAspect.default[F].modify(mkCtx), serverOptions ) - def serviceTrivial[F[_], G[_]: Async, A]( + final def serviceFull[F[_], G[_]: Async, A]( dispatcher: Dispatcher[G], serviceImpl: Service[F, A], - serviceAspect: ServiceAspect[F, G, Trivial, Trivial, A], + serviceAspect: ServiceAspect[F, G, A], serverOptions: ServerOptions ): ServerServiceDefinition = - serviceBindingTrivial[F, G, A](dispatcher, serviceImpl, serviceAspect, serverOptions) + serviceBindingFull[F, G, A](dispatcher, serviceImpl, serviceAspect, serverOptions) final def service[F[_]: Async, A]( dispatcher: Dispatcher[F], diff --git a/runtime/src/main/scala/fs2/grpc/client/ClientAspect.scala b/runtime/src/main/scala/fs2/grpc/client/ClientAspect.scala index 402db7f2..24938be9 100644 --- a/runtime/src/main/scala/fs2/grpc/client/ClientAspect.scala +++ b/runtime/src/main/scala/fs2/grpc/client/ClientAspect.scala @@ -5,66 +5,64 @@ import cats._ import cats.syntax.all._ import fs2.Stream -final case class ClientCallContext[Req, Res, Dom[_], Cod[_], A]( +final case class ClientCallContext[Req, Res, A]( ctx: A, - methodDescriptor: MethodDescriptor[Req, Res], - dom: Dom[Req], - cod: Cod[Res] + methodDescriptor: MethodDescriptor[Req, Res] ) -trait ClientAspect[F[_], G[_], Dom[_], Cod[_], A] { self => +trait ClientAspect[F[_], G[_], A] { self => def visitUnaryToUnary[Req, Res]( - callCtx: ClientCallContext[Req, Res, Dom, Cod, A], + callCtx: ClientCallContext[Req, Res, A], req: Req, request: (Req, Metadata) => G[Res] ): F[Res] def visitUnaryToStreaming[Req, Res]( - callCtx: ClientCallContext[Req, Res, Dom, Cod, A], + callCtx: ClientCallContext[Req, Res, A], req: Req, request: (Req, Metadata) => Stream[G, Res] ): Stream[F, Res] def visitStreamingToUnary[Req, Res]( - callCtx: ClientCallContext[Req, Res, Dom, Cod, A], + callCtx: ClientCallContext[Req, Res, A], req: Stream[F, Req], request: (Stream[G, Req], Metadata) => G[Res] ): F[Res] def visitStreamingToStreaming[Req, Res]( - callCtx: ClientCallContext[Req, Res, Dom, Cod, A], + callCtx: ClientCallContext[Req, Res, A], req: Stream[F, Req], request: (Stream[G, Req], Metadata) => Stream[G, Res] ): Stream[F, Res] - def contraModify[B](f: B => F[A])(implicit F: Monad[F]): ClientAspect[F, G, Dom, Cod, B] = - new ClientAspect[F, G, Dom, Cod, B] { - def modCtx[Req, Res](ccc: ClientCallContext[Req, Res, Dom, Cod, B]): F[ClientCallContext[Req, Res, Dom, Cod, A]] = + def contraModify[B](f: B => F[A])(implicit F: Monad[F]): ClientAspect[F, G, B] = + new ClientAspect[F, G, B] { + def modCtx[Req, Res](ccc: ClientCallContext[Req, Res, B]): F[ClientCallContext[Req, Res, A]] = f(ccc.ctx).map(a => ccc.copy(ctx = a)) override def visitUnaryToUnary[Req, Res]( - callCtx: ClientCallContext[Req, Res, Dom, Cod, B], + callCtx: ClientCallContext[Req, Res, B], req: Req, request: (Req, Metadata) => G[Res] ): F[Res] = modCtx(callCtx).flatMap(self.visitUnaryToUnary(_, req, request)) override def visitUnaryToStreaming[Req, Res]( - callCtx: ClientCallContext[Req, Res, Dom, Cod, B], + callCtx: ClientCallContext[Req, Res, B], req: Req, request: (Req, Metadata) => Stream[G, Res] ): Stream[F, Res] = Stream.eval(modCtx(callCtx)).flatMap(self.visitUnaryToStreaming(_, req, request)) override def visitStreamingToUnary[Req, Res]( - callCtx: ClientCallContext[Req, Res, Dom, Cod, B], + callCtx: ClientCallContext[Req, Res, B], req: Stream[F, Req], request: (Stream[G, Req], Metadata) => G[Res] ): F[Res] = modCtx(callCtx).flatMap(self.visitStreamingToUnary(_, req, request)) override def visitStreamingToStreaming[Req, Res]( - callCtx: ClientCallContext[Req, Res, Dom, Cod, B], + callCtx: ClientCallContext[Req, Res, B], req: Stream[F, Req], request: (Stream[G, Req], Metadata) => Stream[G, Res] ): Stream[F, Res] = @@ -73,27 +71,27 @@ trait ClientAspect[F[_], G[_], Dom[_], Cod[_], A] { self => } object ClientAspect { - def default[F[_], Dom[_], Cod[_]] = new ClientAspect[F, F, Dom, Cod, Metadata] { + def default[F[_]] = new ClientAspect[F, F, Metadata] { override def visitUnaryToUnary[Req, Res]( - callCtx: ClientCallContext[Req, Res, Dom, Cod, Metadata], + callCtx: ClientCallContext[Req, Res, Metadata], req: Req, request: (Req, Metadata) => F[Res] ): F[Res] = request(req, callCtx.ctx) override def visitUnaryToStreaming[Req, Res]( - callCtx: ClientCallContext[Req, Res, Dom, Cod, Metadata], + callCtx: ClientCallContext[Req, Res, Metadata], req: Req, request: (Req, Metadata) => Stream[F, Res] ): Stream[F, Res] = request(req, callCtx.ctx) override def visitStreamingToUnary[Req, Res]( - callCtx: ClientCallContext[Req, Res, Dom, Cod, Metadata], + callCtx: ClientCallContext[Req, Res, Metadata], req: Stream[F, Req], request: (Stream[F, Req], Metadata) => F[Res] ): F[Res] = request(req, callCtx.ctx) override def visitStreamingToStreaming[Req, Res]( - callCtx: ClientCallContext[Req, Res, Dom, Cod, Metadata], + callCtx: ClientCallContext[Req, Res, Metadata], req: Stream[F, Req], request: (Stream[F, Req], Metadata) => Stream[F, Res] ): Stream[F, Res] = request(req, callCtx.ctx) diff --git a/runtime/src/main/scala/fs2/grpc/server/ServiceAspect.scala b/runtime/src/main/scala/fs2/grpc/server/ServiceAspect.scala index 9c61d6f5..56c7365d 100644 --- a/runtime/src/main/scala/fs2/grpc/server/ServiceAspect.scala +++ b/runtime/src/main/scala/fs2/grpc/server/ServiceAspect.scala @@ -5,42 +5,40 @@ import cats._ import cats.syntax.all._ import fs2.Stream -final case class ServerCallContext[Req, Res, Dom[_], Cod[_]]( +final case class ServerCallContext[Req, Res]( metadata: Metadata, - methodDescriptor: MethodDescriptor[Req, Res], - dom: Dom[Req], - cod: Cod[Res] + methodDescriptor: MethodDescriptor[Req, Res] ) -trait ServiceAspect[F[_], G[_], Dom[_], Cod[_], A] { self => +trait ServiceAspect[F[_], G[_], A] { self => def visitUnaryToUnary[Req, Res]( - callCtx: ServerCallContext[Req, Res, Dom, Cod], + callCtx: ServerCallContext[Req, Res], req: Req, next: (Req, A) => F[Res] ): G[Res] def visitUnaryToStreaming[Req, Res]( - callCtx: ServerCallContext[Req, Res, Dom, Cod], + callCtx: ServerCallContext[Req, Res], req: Req, next: (Req, A) => fs2.Stream[F, Res] ): fs2.Stream[G, Res] def visitStreamingToUnary[Req, Res]( - callCtx: ServerCallContext[Req, Res, Dom, Cod], + callCtx: ServerCallContext[Req, Res], req: fs2.Stream[G, Req], next: (fs2.Stream[F, Req], A) => F[Res] ): G[Res] def visitStreamingToStreaming[Req, Res]( - callCtx: ServerCallContext[Req, Res, Dom, Cod], + callCtx: ServerCallContext[Req, Res], req: fs2.Stream[G, Req], next: (fs2.Stream[F, Req], A) => fs2.Stream[F, Res] ): fs2.Stream[G, Res] - def modify[B](f: A => F[B])(implicit F: Monad[F]): ServiceAspect[F, G, Dom, Cod, B] = - new ServiceAspect[F, G, Dom, Cod, B] { + def modify[B](f: A => F[B])(implicit F: Monad[F]): ServiceAspect[F, G, B] = + new ServiceAspect[F, G, B] { override def visitUnaryToUnary[Req, Res]( - callCtx: ServerCallContext[Req, Res, Dom, Cod], + callCtx: ServerCallContext[Req, Res], req: Req, request: (Req, B) => F[Res] ): G[Res] = @@ -51,7 +49,7 @@ trait ServiceAspect[F[_], G[_], Dom[_], Cod[_], A] { self => ) override def visitUnaryToStreaming[Req, Res]( - callCtx: ServerCallContext[Req, Res, Dom, Cod], + callCtx: ServerCallContext[Req, Res], req: Req, request: (Req, B) => Stream[F, Res] ): Stream[G, Res] = @@ -62,7 +60,7 @@ trait ServiceAspect[F[_], G[_], Dom[_], Cod[_], A] { self => ) override def visitStreamingToUnary[Req, Res]( - callCtx: ServerCallContext[Req, Res, Dom, Cod], + callCtx: ServerCallContext[Req, Res], req: fs2.Stream[G, Req], request: (Stream[F, Req], B) => F[Res] ): G[Res] = @@ -73,7 +71,7 @@ trait ServiceAspect[F[_], G[_], Dom[_], Cod[_], A] { self => ) override def visitStreamingToStreaming[Req, Res]( - callCtx: ServerCallContext[Req, Res, Dom, Cod], + callCtx: ServerCallContext[Req, Res], req: fs2.Stream[G, Req], request: (Stream[F, Req], B) => Stream[F, Res] ): Stream[G, Res] = @@ -86,27 +84,27 @@ trait ServiceAspect[F[_], G[_], Dom[_], Cod[_], A] { self => } object ServiceAspect { - def default[F[_], Dom[_], Cod[_]] = new ServiceAspect[F, F, Dom, Cod, Metadata] { + def default[F[_]] = new ServiceAspect[F, F, Metadata] { override def visitUnaryToUnary[Req, Res]( - callCtx: ServerCallContext[Req, Res, Dom, Cod], + callCtx: ServerCallContext[Req, Res], req: Req, request: (Req, Metadata) => F[Res] ): F[Res] = request(req, callCtx.metadata) override def visitUnaryToStreaming[Req, Res]( - callCtx: ServerCallContext[Req, Res, Dom, Cod], + callCtx: ServerCallContext[Req, Res], req: Req, request: (Req, Metadata) => Stream[F, Res] ): Stream[F, Res] = request(req, callCtx.metadata) override def visitStreamingToUnary[Req, Res]( - callCtx: ServerCallContext[Req, Res, Dom, Cod], + callCtx: ServerCallContext[Req, Res], req: fs2.Stream[F, Req], request: (Stream[F, Req], Metadata) => F[Res] ): F[Res] = request(req, callCtx.metadata) override def visitStreamingToStreaming[Req, Res]( - callCtx: ServerCallContext[Req, Res, Dom, Cod], + callCtx: ServerCallContext[Req, Res], req: fs2.Stream[F, Req], request: (Stream[F, Req], Metadata) => Stream[F, Res] ): Stream[F, Res] = request(req, callCtx.metadata) diff --git a/runtime/src/main/scala/fs2/grpc/shared/Trivial.scala b/runtime/src/main/scala/fs2/grpc/shared/Trivial.scala deleted file mode 100644 index 01a0a64f..00000000 --- a/runtime/src/main/scala/fs2/grpc/shared/Trivial.scala +++ /dev/null @@ -1,9 +0,0 @@ -package fs2.grpc.shared - -// A typeclass that exists for every type A -final class Trivial[A] private extends Serializable - -object Trivial { - private val any = new Trivial[Any] - implicit def instance[A]: Trivial[A] = any.asInstanceOf[Trivial[A]] -} From 113bab386c075ee3dc4042901e944acc73d10f43 Mon Sep 17 00:00:00 2001 From: Valdemar Grange Date: Sun, 17 Sep 2023 18:48:24 +0200 Subject: [PATCH 7/9] test resource --- .../resources/TestServiceFs2Grpc.scala.txt | 102 +++++++++++++----- 1 file changed, 76 insertions(+), 26 deletions(-) diff --git a/e2e/src/test/resources/TestServiceFs2Grpc.scala.txt b/e2e/src/test/resources/TestServiceFs2Grpc.scala.txt index 3c4b5add..c738c35e 100644 --- a/e2e/src/test/resources/TestServiceFs2Grpc.scala.txt +++ b/e2e/src/test/resources/TestServiceFs2Grpc.scala.txt @@ -11,36 +11,86 @@ trait TestServiceFs2Grpc[F[_], A] { object TestServiceFs2Grpc extends _root_.fs2.grpc.GeneratedCompanion[TestServiceFs2Grpc] { - def mkClient[F[_]: _root_.cats.effect.Async, A](dispatcher: _root_.cats.effect.std.Dispatcher[F], channel: _root_.io.grpc.Channel, mkMetadata: A => F[_root_.io.grpc.Metadata], clientOptions: _root_.fs2.grpc.client.ClientOptions): TestServiceFs2Grpc[F, A] = new TestServiceFs2Grpc[F, A] { - def noStreaming(request: hello.world.TestMessage, ctx: A): F[hello.world.TestMessage] = { - mkMetadata(ctx).flatMap { m => - _root_.fs2.grpc.client.Fs2ClientCall[F](channel, hello.world.TestServiceGrpc.METHOD_NO_STREAMING, dispatcher, clientOptions).flatMap(_.unaryToUnaryCall(request, m)) - } - } - def clientStreaming(request: _root_.fs2.Stream[F, hello.world.TestMessage], ctx: A): F[hello.world.TestMessage] = { - mkMetadata(ctx).flatMap { m => - _root_.fs2.grpc.client.Fs2ClientCall[F](channel, hello.world.TestServiceGrpc.METHOD_CLIENT_STREAMING, dispatcher, clientOptions).flatMap(_.streamingToUnaryCall(request, m)) - } - } - def serverStreaming(request: hello.world.TestMessage, ctx: A): _root_.fs2.Stream[F, hello.world.TestMessage] = { - _root_.fs2.Stream.eval(mkMetadata(ctx)).flatMap { m => - _root_.fs2.Stream.eval(_root_.fs2.grpc.client.Fs2ClientCall[F](channel, hello.world.TestServiceGrpc.METHOD_SERVER_STREAMING, dispatcher, clientOptions)).flatMap(_.unaryToStreamingCall(request, m)) - } - } - def bothStreaming(request: _root_.fs2.Stream[F, hello.world.TestMessage], ctx: A): _root_.fs2.Stream[F, hello.world.TestMessage] = { - _root_.fs2.Stream.eval(mkMetadata(ctx)).flatMap { m => - _root_.fs2.Stream.eval(_root_.fs2.grpc.client.Fs2ClientCall[F](channel, hello.world.TestServiceGrpc.METHOD_BOTH_STREAMING, dispatcher, clientOptions)).flatMap(_.streamingToStreamingCall(request, m)) - } - } + def mkClientFull[F[_], G[_]: _root_.cats.effect.Async, A]( + dispatcher: _root_.cats.effect.std.Dispatcher[G], + channel: _root_.io.grpc.Channel, + clientAspect: _root_.fs2.grpc.client.ClientAspect[F, G, A], + clientOptions: _root_.fs2.grpc.client.ClientOptions + ): TestServiceFs2Grpc[F, A] = new TestServiceFs2Grpc[F, A] { + def noStreaming(request: hello.world.TestMessage, ctx: A): F[hello.world.TestMessage] = + clientAspect.visitUnaryToUnary[hello.world.TestMessage, hello.world.TestMessage]( + _root_.fs2.grpc.client.ClientCallContext(ctx, hello.world.TestServiceGrpc.METHOD_NO_STREAMING), + request, + (req, m) => _root_.fs2.grpc.client.Fs2ClientCall[G](channel, hello.world.TestServiceGrpc.METHOD_NO_STREAMING, dispatcher, clientOptions).flatMap(_.unaryToUnaryCall(req, m)) + ) + def clientStreaming(request: _root_.fs2.Stream[F, hello.world.TestMessage], ctx: A): F[hello.world.TestMessage] = + clientAspect.visitStreamingToUnary[hello.world.TestMessage, hello.world.TestMessage]( + _root_.fs2.grpc.client.ClientCallContext(ctx, hello.world.TestServiceGrpc.METHOD_CLIENT_STREAMING), + request, + (req, m) => _root_.fs2.grpc.client.Fs2ClientCall[G](channel, hello.world.TestServiceGrpc.METHOD_CLIENT_STREAMING, dispatcher, clientOptions).flatMap(_.streamingToUnaryCall(req, m)) + ) + def serverStreaming(request: hello.world.TestMessage, ctx: A): _root_.fs2.Stream[F, hello.world.TestMessage] = + clientAspect.visitUnaryToStreaming[hello.world.TestMessage, hello.world.TestMessage]( + _root_.fs2.grpc.client.ClientCallContext(ctx, hello.world.TestServiceGrpc.METHOD_SERVER_STREAMING), + request, + (req, m) => _root_.fs2.Stream.eval(_root_.fs2.grpc.client.Fs2ClientCall[G](channel, hello.world.TestServiceGrpc.METHOD_SERVER_STREAMING, dispatcher, clientOptions)).flatMap(_.unaryToStreamingCall(req, m)) + ) + def bothStreaming(request: _root_.fs2.Stream[F, hello.world.TestMessage], ctx: A): _root_.fs2.Stream[F, hello.world.TestMessage] = + clientAspect.visitStreamingToStreaming[hello.world.TestMessage, hello.world.TestMessage]( + _root_.fs2.grpc.client.ClientCallContext(ctx, hello.world.TestServiceGrpc.METHOD_BOTH_STREAMING), + request, + (req, m) => _root_.fs2.Stream.eval(_root_.fs2.grpc.client.Fs2ClientCall[G](channel, hello.world.TestServiceGrpc.METHOD_BOTH_STREAMING, dispatcher, clientOptions)).flatMap(_.streamingToStreamingCall(req, m)) + ) } - protected def serviceBinding[F[_]: _root_.cats.effect.Async, A](dispatcher: _root_.cats.effect.std.Dispatcher[F], serviceImpl: TestServiceFs2Grpc[F, A], mkCtx: _root_.io.grpc.Metadata => F[A], serverOptions: _root_.fs2.grpc.server.ServerOptions): _root_.io.grpc.ServerServiceDefinition = { + protected def serviceBindingFull[F[_], G[_]: _root_.cats.effect.Async, A]( + dispatcher: _root_.cats.effect.std.Dispatcher[G], + serviceImpl: TestServiceFs2Grpc[F, A], + serviceAspect: _root_.fs2.grpc.server.ServiceAspect[F, G, A], + serverOptions: _root_.fs2.grpc.server.ServerOptions + ) = { _root_.io.grpc.ServerServiceDefinition .builder(hello.world.TestServiceGrpc.SERVICE) - .addMethod(hello.world.TestServiceGrpc.METHOD_NO_STREAMING, _root_.fs2.grpc.server.Fs2ServerCallHandler[F](dispatcher, serverOptions).unaryToUnaryCall[hello.world.TestMessage, hello.world.TestMessage]((r, m) => mkCtx(m).flatMap(serviceImpl.noStreaming(r, _)))) - .addMethod(hello.world.TestServiceGrpc.METHOD_CLIENT_STREAMING, _root_.fs2.grpc.server.Fs2ServerCallHandler[F](dispatcher, serverOptions).streamingToUnaryCall[hello.world.TestMessage, hello.world.TestMessage]((r, m) => mkCtx(m).flatMap(serviceImpl.clientStreaming(r, _)))) - .addMethod(hello.world.TestServiceGrpc.METHOD_SERVER_STREAMING, _root_.fs2.grpc.server.Fs2ServerCallHandler[F](dispatcher, serverOptions).unaryToStreamingCall[hello.world.TestMessage, hello.world.TestMessage]((r, m) => _root_.fs2.Stream.eval(mkCtx(m)).flatMap(serviceImpl.serverStreaming(r, _)))) - .addMethod(hello.world.TestServiceGrpc.METHOD_BOTH_STREAMING, _root_.fs2.grpc.server.Fs2ServerCallHandler[F](dispatcher, serverOptions).streamingToStreamingCall[hello.world.TestMessage, hello.world.TestMessage]((r, m) => _root_.fs2.Stream.eval(mkCtx(m)).flatMap(serviceImpl.bothStreaming(r, _)))) + .addMethod( + hello.world.TestServiceGrpc.METHOD_NO_STREAMING, + _root_.fs2.grpc.server.Fs2ServerCallHandler[G](dispatcher, serverOptions).unaryToUnaryCall[hello.world.TestMessage, hello.world.TestMessage]{ (r, m) => + serviceAspect.visitUnaryToUnary[hello.world.TestMessage, hello.world.TestMessage]( + _root_.fs2.grpc.server.ServerCallContext(m, hello.world.TestServiceGrpc.METHOD_NO_STREAMING), + r, + (r, m) => serviceImpl.noStreaming(r, m) + ) + } + ) + .addMethod( + hello.world.TestServiceGrpc.METHOD_CLIENT_STREAMING, + _root_.fs2.grpc.server.Fs2ServerCallHandler[G](dispatcher, serverOptions).streamingToUnaryCall[hello.world.TestMessage, hello.world.TestMessage]{ (r, m) => + serviceAspect.visitStreamingToUnary[hello.world.TestMessage, hello.world.TestMessage]( + _root_.fs2.grpc.server.ServerCallContext(m, hello.world.TestServiceGrpc.METHOD_CLIENT_STREAMING), + r, + (r, m) => serviceImpl.clientStreaming(r, m) + ) + } + ) + .addMethod( + hello.world.TestServiceGrpc.METHOD_SERVER_STREAMING, + _root_.fs2.grpc.server.Fs2ServerCallHandler[G](dispatcher, serverOptions).unaryToStreamingCall[hello.world.TestMessage, hello.world.TestMessage]{ (r, m) => + serviceAspect.visitUnaryToStreaming[hello.world.TestMessage, hello.world.TestMessage]( + _root_.fs2.grpc.server.ServerCallContext(m, hello.world.TestServiceGrpc.METHOD_SERVER_STREAMING), + r, + (r, m) => serviceImpl.serverStreaming(r, m) + ) + } + ) + .addMethod( + hello.world.TestServiceGrpc.METHOD_BOTH_STREAMING, + _root_.fs2.grpc.server.Fs2ServerCallHandler[G](dispatcher, serverOptions).streamingToStreamingCall[hello.world.TestMessage, hello.world.TestMessage]{ (r, m) => + serviceAspect.visitStreamingToStreaming[hello.world.TestMessage, hello.world.TestMessage]( + _root_.fs2.grpc.server.ServerCallContext(m, hello.world.TestServiceGrpc.METHOD_BOTH_STREAMING), + r, + (r, m) => serviceImpl.bothStreaming(r, m) + ) + } + ) .build() } From 584c51a658bafb860e73577f2522e30dc2f111a7 Mon Sep 17 00:00:00 2001 From: Valdemar Grange Date: Sun, 17 Sep 2023 23:05:39 +0200 Subject: [PATCH 8/9] very big test --- build.sbt | 5 +- .../test/scala/fs2/grpc/e2e/AspectSpec.scala | 301 ++++++++++++++++++ 2 files changed, 305 insertions(+), 1 deletion(-) create mode 100644 e2e/src/test/scala/fs2/grpc/e2e/AspectSpec.scala diff --git a/build.sbt b/build.sbt index e5028fc4..73fd4662 100644 --- a/build.sbt +++ b/build.sbt @@ -134,7 +134,10 @@ lazy val e2e = (projectMatrix in file("e2e")) .settings( codeGenClasspath := (codeGenJVM212 / Compile / fullClasspath).value, libraryDependencies := Nil, - libraryDependencies ++= List(scalaPbGrpcRuntime, scalaPbRuntime, scalaPbRuntime % "protobuf", ceMunit % Test), + libraryDependencies ++= List(grpcApi, scalaPbGrpcRuntime, scalaPbRuntime, scalaPbRuntime % "protobuf", ceMunit % Test), + libraryDependencies ++= Seq( + "io.grpc" % "grpc-services" % versions.grpc % Test, + ), Compile / PB.targets := Seq( scalapb.gen() -> (Compile / sourceManaged).value / "scalapb", genModule(codegenFullName + "$") -> (Compile / sourceManaged).value / "fs2-grpc" diff --git a/e2e/src/test/scala/fs2/grpc/e2e/AspectSpec.scala b/e2e/src/test/scala/fs2/grpc/e2e/AspectSpec.scala new file mode 100644 index 00000000..78f4ab77 --- /dev/null +++ b/e2e/src/test/scala/fs2/grpc/e2e/AspectSpec.scala @@ -0,0 +1,301 @@ +package fs2.grpc.e2e + +import hello.world._ +import io.grpc.inprocess._ +import cats.effect._ +import cats.implicits._ +import munit._ +import cats.effect.std.UUIDGen +import io.grpc._ +import scala.jdk.CollectionConverters._ +import cats.effect.std.Dispatcher +import fs2.grpc.server._ +import fs2.grpc.client._ +import fs2.grpc.syntax.all._ +import cats.data._ +import cats._ + +class AspectSpec extends CatsEffectSuite with CatsEffectFunFixtures { + def startServices[F[_]](id: String)(xs: ServerServiceDefinition*)(implicit F: Sync[F]): Resource[F, Server] = + InProcessServerBuilder + .forName(id.toString()) + .addServices(xs.toList.asJava) + .resource[F] + .evalTap(s => F.delay(s.start())) + + def testConnection[F[_]: Async, G[_], A, B]( + service: TestServiceFs2Grpc[G, A], + serviceAspect: ServiceAspect[G, F, A], + clientAspect: ClientAspect[G, F, B] + ): Resource[F, TestServiceFs2Grpc[G, B]] = + Dispatcher.parallel[F].flatMap { d => + Resource.eval(UUIDGen.randomUUID[F]).flatMap { id => + startServices[F](id.toString())( + TestServiceFs2Grpc.serviceFull[G, F, A]( + d, + service, + serviceAspect, + ServerOptions.default + ) + ) >> InProcessChannelBuilder.forName(id.toString()).usePlaintext().resource[F].map { conn => + TestServiceFs2Grpc.mkClientFull[G, F, B]( + d, + conn, + clientAspect, + ClientOptions.default + ) + } + } + } + + test("tracing requests should work as expected") { + case class TracingKey(value: String) + case class Span(name: String, parent: Either[Span, Option[TracingKey]]) { + def traceKey: Option[TracingKey] = parent.leftMap(_.traceKey).merge + } + case class SpanInfo(span: Span, messages: List[String]) + type WriteSpan[A] = WriterT[IO, List[SpanInfo], A] + type Traced[A] = Kleisli[WriteSpan, Span, A] + val Traced: Monad[Traced] = Monad[Traced] + val liftK: IO ~> Traced = WriterT.liftK[IO, List[SpanInfo]] andThen Kleisli.liftK[WriteSpan, Span] + def span[A](name: String)(fa: Traced[A]): Traced[A] = + fa.local[Span](parent => Span(name, Left(parent))) + + def spanStream[A](name: String)(fa: fs2.Stream[Traced, A]): fs2.Stream[Traced, A] = { + val current: Traced[Span] = Kleisli.ask + fs2.Stream.eval(current).flatMap { parent => + fa.translate(new (Traced ~> Traced) { + def apply[B](fa: Traced[B]): Traced[B] = + Kleisli.local((_: Span) => Span(name, Left(parent)))(fa) + }) + } + } + + def tell(spanInfos: List[SpanInfo]): Traced[Unit] = + Kleisli.liftF(WriterT.tell[IO, List[SpanInfo]](spanInfos)) + + def log(msgs: String*): Traced[Unit] = Kleisli.ask[WriteSpan, Span].flatMap { current => + tell(List(SpanInfo(current, msgs.toList))) + } + + val tracingHeaderKey = Metadata.Key.of("TRACE_KEY", Metadata.ASCII_STRING_MARSHALLER) + def getTraceHeader(ctx: Metadata): Option[TracingKey] = + Option(ctx.get(tracingHeaderKey)).map(TracingKey(_)) + + def serializeTraceHeader(key: TracingKey): Metadata = { + val m = new Metadata + m.put(tracingHeaderKey, key.value) + m + } + + def getTracingHeader: Traced[Metadata] = + Kleisli.ask[WriteSpan, Span].map { span => + span.traceKey.map(serializeTraceHeader).getOrElse(new Metadata) + } + + val service = new TestServiceFs2Grpc[Traced, Metadata] { + override def noStreaming(request: TestMessage, ctx: Metadata): Traced[TestMessage] = + span("noStreaming") { + log("noStreaming") >> + Traced.pure(TestMessage.defaultInstance) + } + + override def clientStreaming(request: fs2.Stream[Traced, TestMessage], ctx: Metadata): Traced[TestMessage] = + span("clientStreaming") { + log("clientStreaming") >> + request.compile.last.map(_.getOrElse(TestMessage.defaultInstance)) + } + + override def serverStreaming(request: TestMessage, ctx: Metadata): fs2.Stream[Traced, TestMessage] = + spanStream("serverStreaming") { + fs2.Stream.eval(log("serverStreaming")) >> + fs2.Stream(request).repeatN(2) + } + + override def bothStreaming( + request: fs2.Stream[Traced, TestMessage], + ctx: Metadata + ): fs2.Stream[Traced, TestMessage] = + spanStream("bothStreaming") { + fs2.Stream.eval(log("bothStreaming")) >> + request + } + } + + IO.ref(List.empty[SpanInfo]).map { state => + def runRootTrace[A](cctx: ServerCallContext[?, ?])(fa: Traced[A]): IO[A] = { + val root = Span(cctx.methodDescriptor.getFullMethodName(), Right(getTraceHeader(cctx.metadata))) + fa.run(root).run.flatMap { case (xs, a) => + state.update(_ ++ xs) as a + } + } + + def runRootTraceStreamed[A](cctx: ServerCallContext[?, ?])(fa: fs2.Stream[Traced, A]): fs2.Stream[IO, A] = + fa.translate(new (Traced ~> IO) { + def apply[B](fa: Traced[B]): IO[B] = runRootTrace(cctx)(fa) + }) + + val tracingServiceAspect = new ServiceAspect[Traced, IO, Metadata] { + override def visitUnaryToUnary[Req, Res]( + callCtx: ServerCallContext[Req, Res], + req: Req, + next: (Req, Metadata) => Traced[Res] + ): IO[Res] = runRootTrace(callCtx)(next(req, callCtx.metadata)) + + override def visitUnaryToStreaming[Req, Res]( + callCtx: ServerCallContext[Req, Res], + req: Req, + next: (Req, Metadata) => fs2.Stream[Traced, Res] + ): fs2.Stream[IO, Res] = runRootTraceStreamed(callCtx)(next(req, callCtx.metadata)) + + override def visitStreamingToUnary[Req, Res]( + callCtx: ServerCallContext[Req, Res], + req: fs2.Stream[IO, Req], + next: (fs2.Stream[Traced, Req], Metadata) => Traced[Res] + ): IO[Res] = runRootTrace(callCtx)(next(req.translate(liftK), callCtx.metadata)) + + override def visitStreamingToStreaming[Req, Res]( + callCtx: ServerCallContext[Req, Res], + req: fs2.Stream[IO, Req], + next: (fs2.Stream[Traced, Req], Metadata) => fs2.Stream[Traced, Res] + ): fs2.Stream[IO, Res] = runRootTraceStreamed(callCtx)(next(req.translate(liftK), callCtx.metadata)) + } + + val tracingClientAspect = new ClientAspect[Traced, IO, Unit] { + override def visitUnaryToUnary[Req, Res]( + callCtx: ClientCallContext[Req, Res, Unit], + req: Req, + request: (Req, Metadata) => IO[Res] + ): Traced[Res] = + getTracingHeader.flatMap(md => liftK(request(req, md))) + + override def visitUnaryToStreaming[Req, Res]( + callCtx: ClientCallContext[Req, Res, Unit], + req: Req, + request: (Req, Metadata) => fs2.Stream[IO, Res] + ): fs2.Stream[Traced, Res] = + fs2.Stream.eval(getTracingHeader).flatMap(md => request(req, md).translate(liftK)) + + override def visitStreamingToUnary[Req, Res]( + callCtx: ClientCallContext[Req, Res, Unit], + req: fs2.Stream[Traced, Req], + request: (fs2.Stream[IO, Req], Metadata) => IO[Res] + ): Traced[Res] = Kleisli.ask[WriteSpan, Span].flatMap { parent => + getTracingHeader.flatMap { md => + liftK(IO.ref(List.empty[SpanInfo])).flatMap { state => + val req2 = req.translate(new (Traced ~> IO) { + def apply[A](fa: Traced[A]): IO[A] = + fa.run(parent).run.flatMap { case (xs, a) => + state.update(_ ++ xs) as a + } + }) + liftK(request(req2, md)) <* (liftK(state.get) >>= tell) + } + } + } + + override def visitStreamingToStreaming[Req, Res]( + callCtx: ClientCallContext[Req, Res, Unit], + req: fs2.Stream[Traced, Req], + request: (fs2.Stream[IO, Req], Metadata) => fs2.Stream[IO, Res] + ): fs2.Stream[Traced, Res] = + fs2.Stream.eval(Kleisli.ask[WriteSpan, Span]).flatMap { parent => + fs2.Stream.eval(getTracingHeader).flatMap { md => + fs2.Stream.eval(liftK(IO.ref(List.empty[SpanInfo]))).flatMap { state => + val req2 = req.translate(new (Traced ~> IO) { + def apply[A](fa: Traced[A]): IO[A] = + fa.run(parent).run.flatMap { case (xs, a) => + state.update(_ ++ xs) as a + } + }) + request(req2, md).translate(liftK) ++ fs2.Stream.exec((liftK(state.get) >>= tell)) + } + } + } + } + + testConnection[IO, Traced, Metadata, Unit]( + service, + tracingServiceAspect, + tracingClientAspect + ).use{ (client: TestServiceFs2Grpc[Traced, Unit]) => + + def testWithKey(rootKey: Option[TracingKey] = None) = { + def trackServer[A](fa: IO[A]): IO[List[SpanInfo]] = + state.set(Nil) >> fa >> state.get + + def trackClient[A](traced: Traced[A]): IO[List[SpanInfo]] = + traced.run(Span("root", Right(rootKey))).written + + def trackAndAssertServer[A](name: String, n: Int)(fa: IO[A]): IO[Unit] = + trackServer(fa).map{ serverInfos => + assertEquals(serverInfos.size, n) + serverInfos.foreach{ si => + assertEquals(si.span.name, name) + assert(clue(si.span.parent).isRight, "is root") + assertEquals(si.span.parent.toOption.get, rootKey) + assertEquals(si.messages, List(name)) + } + } + + def trackAndAssertClient[A](name: String, n: Int)(fa: Traced[A]): IO[Unit] = + trackClient(fa).map{ clientInfos => + assertEquals(clientInfos.size, n) + clientInfos.foreach{ ci => + assertEquals(ci.span.name, s"client-${name}") + assert(clue(ci.span.parent).isRight, "is root") + assertEquals(ci.span.parent.toOption.get, rootKey) + assertEquals(ci.messages, List(s"client-${name}")) + } + } + + val noStreaming = trackAndAssertServer("noStreaming", 1) { + trackAndAssertClient("noStreaming", 1){ + span("client-noStreaming") { + log("client-noStreaming") >> + client.noStreaming(TestMessage.defaultInstance, ()) + } + } + } + + val clientStreaming = trackAndAssertServer("clientStreaming", 1) { + trackAndAssertClient("clientStreaming", 2){ + val req = fs2.Stream.eval{ + span("client-clientStreaming") { + log("client-clientStreaming").as(TestMessage.defaultInstance) + } + }.repeatN(2) + + client.clientStreaming(req, ()) + } + } + + val serverStreaming = trackAndAssertServer("serverStreaming", 2) { + trackAndAssertClient("serverStreaming", 2){ + span("client-serverStreaming") { + log("client-serverStreaming") >> + client.serverStreaming(TestMessage.defaultInstance, ()).compile.drain + } + } + } + + val bothStreaming = trackAndAssertServer("bothStreaming", 2) { + trackAndAssertClient("bothStreaming", 2){ + val req = fs2.Stream.eval{ + span("client-bothStreaming") { + log("client-bothStreaming").as(TestMessage.defaultInstance) + } + }.repeatN(2) + + client.bothStreaming(req, ()).compile.drain + } + } + + noStreaming >> clientStreaming >> serverStreaming >> bothStreaming + } + + testWithKey() >> testWithKey(Some(TracingKey("my_tracing_key"))) + } + } + } +} From 8f9b947d5ef1fe6fa979659b91d5a1d0161cad1a Mon Sep 17 00:00:00 2001 From: Valdemar Grange Date: Sun, 17 Sep 2023 23:18:41 +0200 Subject: [PATCH 9/9] tracing test done --- .../test/scala/fs2/grpc/e2e/AspectSpec.scala | 20 ++++++++----------- 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/e2e/src/test/scala/fs2/grpc/e2e/AspectSpec.scala b/e2e/src/test/scala/fs2/grpc/e2e/AspectSpec.scala index 78f4ab77..6cefd257 100644 --- a/e2e/src/test/scala/fs2/grpc/e2e/AspectSpec.scala +++ b/e2e/src/test/scala/fs2/grpc/e2e/AspectSpec.scala @@ -108,8 +108,7 @@ class AspectSpec extends CatsEffectSuite with CatsEffectFunFixtures { override def serverStreaming(request: TestMessage, ctx: Metadata): fs2.Stream[Traced, TestMessage] = spanStream("serverStreaming") { - fs2.Stream.eval(log("serverStreaming")) >> - fs2.Stream(request).repeatN(2) + fs2.Stream(request).repeatN(2).evalTap(_ => log("serverStreaming")) } override def bothStreaming( @@ -117,12 +116,11 @@ class AspectSpec extends CatsEffectSuite with CatsEffectFunFixtures { ctx: Metadata ): fs2.Stream[Traced, TestMessage] = spanStream("bothStreaming") { - fs2.Stream.eval(log("bothStreaming")) >> - request + request.evalTap(_ => log("bothStreaming")) } } - IO.ref(List.empty[SpanInfo]).map { state => + IO.ref(List.empty[SpanInfo]).flatMap { state => def runRootTrace[A](cctx: ServerCallContext[?, ?])(fa: Traced[A]): IO[A] = { val root = Span(cctx.methodDescriptor.getFullMethodName(), Right(getTraceHeader(cctx.metadata))) fa.run(root).run.flatMap { case (xs, a) => @@ -227,24 +225,22 @@ class AspectSpec extends CatsEffectSuite with CatsEffectFunFixtures { def trackClient[A](traced: Traced[A]): IO[List[SpanInfo]] = traced.run(Span("root", Right(rootKey))).written - def trackAndAssertServer[A](name: String, n: Int)(fa: IO[A]): IO[Unit] = + def trackAndAssertServer[A](name: String, n: Int)(fa: IO[A])(implicit loc: Location): IO[Unit] = trackServer(fa).map{ serverInfos => assertEquals(serverInfos.size, n) serverInfos.foreach{ si => assertEquals(si.span.name, name) - assert(clue(si.span.parent).isRight, "is root") - assertEquals(si.span.parent.toOption.get, rootKey) + assert(clue(si.span.parent).isLeft, "is child") assertEquals(si.messages, List(name)) } } - def trackAndAssertClient[A](name: String, n: Int)(fa: Traced[A]): IO[Unit] = + def trackAndAssertClient[A](name: String, n: Int)(fa: Traced[A])(implicit loc: Location): IO[Unit] = trackClient(fa).map{ clientInfos => assertEquals(clientInfos.size, n) clientInfos.foreach{ ci => assertEquals(ci.span.name, s"client-${name}") - assert(clue(ci.span.parent).isRight, "is root") - assertEquals(ci.span.parent.toOption.get, rootKey) + assert(clue(ci.span.parent).isLeft, "is child") assertEquals(ci.messages, List(s"client-${name}")) } } @@ -271,7 +267,7 @@ class AspectSpec extends CatsEffectSuite with CatsEffectFunFixtures { } val serverStreaming = trackAndAssertServer("serverStreaming", 2) { - trackAndAssertClient("serverStreaming", 2){ + trackAndAssertClient("serverStreaming", 1){ span("client-serverStreaming") { log("client-serverStreaming") >> client.serverStreaming(TestMessage.defaultInstance, ()).compile.drain