diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile index 42d7a84fc6..8f3d0434dc 100644 --- a/.devcontainer/Dockerfile +++ b/.devcontainer/Dockerfile @@ -1,21 +1,20 @@ # See here for image contents: https://github.com/microsoft/vscode-dev-containers/tree/v0.238.1/containers/java/.devcontainer/base.Dockerfile # [Choice] Java version (use -bullseye variants on local arm64/Apple Silicon): 11, 17, 11-bullseye, 17-bullseye, 11-buster, 17-buster -ARG VARIANT="11" -FROM mcr.microsoft.com/vscode/devcontainers/java:0-${VARIANT} +ARG VARIANT="17" +FROM mcr.microsoft.com/vscode/devcontainers/java:${VARIANT} RUN curl -s "https://get.sdkman.io" | bash # Install Scala Lang -ARG SBT_VERSION="1.7.1" +ARG SBT_VERSION="1.10.1" RUN \ curl -L "https://github.com/sbt/sbt/releases/download/v$SBT_VERSION/sbt-$SBT_VERSION.tgz" | tar zxf - -C /usr/share && \ cd /usr/share/sbt/bin && \ - rm sbt.bat sbtn-x86_64-apple-darwin sbtn-x86_64-pc-linux sbtn-x86_64-pc-win32.exe && \ ln -s /usr/share/sbt/bin/sbt /usr/local/bin/sbt -ARG SCALA_VERSION="3.1.3" +ARG SCALA_VERSION="3.3.3" RUN \ mkdir /setup-project && \ cd /setup-project && \ @@ -24,4 +23,13 @@ RUN \ sbt compile && \ rm -rf /setup-project +RUN \ + mkdir /setup-wrk && \ + sudo apt-get update -y && sudo apt-get install build-essential libssl-dev git -y && \ + git clone https://github.com/wg/wrk.git wrk && \ + cd wrk && \ + make && \ + cp wrk /usr/local/bin && \ + rm -rf /setup-wrk + CMD ["sbt"] diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index ee9972a020..08305f0851 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -8,8 +8,8 @@ // Update the VARIANT arg to pick a Java version: 11, 17 // Append -bullseye or -buster to pin to an OS version. // Use the -bullseye variants on local arm64/Apple Silicon. - "VARIANT": "11", - "SCALA_VERSION": "3.1.3" + "VARIANT": "17", + "SCALA_VERSION": "3.3.3" } }, diff --git a/build.sbt b/build.sbt index fd13e006d5..51eca7c909 100644 --- a/build.sbt +++ b/build.sbt @@ -281,6 +281,8 @@ lazy val zioHttpExample = (project in file("zio-http-example")) .settings(runSettings(Debug.Main)) .settings(libraryDependencies ++= Seq(`jwt-core`, `zio-schema-json`)) .settings( + run / fork := true, + run / javaOptions ++= Seq("-Xms4G", "-Xmx4G", "-XX:+UseG1GC"), libraryDependencies ++= Seq( `zio-config`, `zio-config-magnolia`, @@ -404,7 +406,7 @@ lazy val docs = project testFrameworks += new TestFramework("zio.test.sbt.ZTestFramework"), libraryDependencies ++= Seq( `jwt-core`, - "dev.zio" %% "zio-test" % ZioVersion, + "dev.zio" %% "zio-test" % ZioVersion, `zio-config`, `zio-config-magnolia`, `zio-config-typesafe`, diff --git a/project/Dependencies.scala b/project/Dependencies.scala index a8c6e1c563..abca59886e 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -21,15 +21,15 @@ object Dependencies { val netty = Seq( - "io.netty" % "netty-codec-http" % NettyVersion, - "io.netty" % "netty-handler-proxy" % NettyVersion, - "io.netty" % "netty-transport-native-epoll" % NettyVersion, - "io.netty" % "netty-transport-native-epoll" % NettyVersion % Runtime classifier "linux-x86_64", - "io.netty" % "netty-transport-native-epoll" % NettyVersion % Runtime classifier "linux-aarch_64", - "io.netty" % "netty-transport-native-kqueue" % NettyVersion, - "io.netty" % "netty-transport-native-kqueue" % NettyVersion % Runtime classifier "osx-x86_64", - "io.netty" % "netty-transport-native-kqueue" % NettyVersion % Runtime classifier "osx-aarch_64", - "com.aayushatharva.brotli4j" % "brotli4j" % "1.16.0" % "provided", + "io.netty" % "netty-codec-http" % NettyVersion, + "io.netty" % "netty-handler-proxy" % NettyVersion, + "io.netty" % "netty-transport-native-epoll" % NettyVersion, + "io.netty" % "netty-transport-native-epoll" % NettyVersion classifier "linux-x86_64", + "io.netty" % "netty-transport-native-epoll" % NettyVersion classifier "linux-aarch_64", + "io.netty" % "netty-transport-native-kqueue" % NettyVersion, + "io.netty" % "netty-transport-native-kqueue" % NettyVersion classifier "osx-x86_64", + "io.netty" % "netty-transport-native-kqueue" % NettyVersion classifier "osx-aarch_64", + "com.aayushatharva.brotli4j" % "brotli4j" % "1.16.0" % "provided", ) val `netty-incubator` = diff --git a/zio-http-example/src/main/scala/example/PlainTextBenchmarkServer.scala b/zio-http-example/src/main/scala/example/PlainTextBenchmarkServer.scala index e0e33d7bed..f5f61b1ece 100644 --- a/zio-http-example/src/main/scala/example/PlainTextBenchmarkServer.scala +++ b/zio-http-example/src/main/scala/example/PlainTextBenchmarkServer.scala @@ -36,11 +36,9 @@ object PlainTextBenchmarkServer extends ZIOAppDefault { private val config = Server.Config.default .port(8080) - .enableRequestStreaming private val nettyConfig = NettyConfig.default .leakDetection(LeakDetectionLevel.DISABLED) - .maxThreads(8) private val configLayer = ZLayer.succeed(config) private val nettyConfigLayer = ZLayer.succeed(nettyConfig) diff --git a/zio-http-example/src/main/scala/example/SimpleEffectBenchmarkServer.scala b/zio-http-example/src/main/scala/example/SimpleEffectBenchmarkServer.scala index 60a93aa8b6..0f54036120 100644 --- a/zio-http-example/src/main/scala/example/SimpleEffectBenchmarkServer.scala +++ b/zio-http-example/src/main/scala/example/SimpleEffectBenchmarkServer.scala @@ -33,11 +33,9 @@ object SimpleEffectBenchmarkServer extends ZIOAppDefault { private val config = Server.Config.default .port(8080) - .enableRequestStreaming private val nettyConfig = NettyConfig.default .leakDetection(LeakDetectionLevel.DISABLED) - .maxThreads(8) private val configLayer = ZLayer.succeed(config) private val nettyConfigLayer = ZLayer.succeed(nettyConfig) diff --git a/zio-http/jvm/src/main/scala/zio/http/netty/NettyConfig.scala b/zio-http/jvm/src/main/scala/zio/http/netty/NettyConfig.scala index b9b96e83d2..fd7941b621 100644 --- a/zio-http/jvm/src/main/scala/zio/http/netty/NettyConfig.scala +++ b/zio-http/jvm/src/main/scala/zio/http/netty/NettyConfig.scala @@ -30,12 +30,19 @@ final case class NettyConfig( nThreads: Int, shutdownQuietPeriodDuration: Duration, shutdownTimeoutDuration: Duration, + bossGroup: NettyConfig.BossGroup, ) extends EventLoopGroups.Config { self => + /** + * Configure Netty's boss event-loop group. This only applies to server + * applications and is ignored for the Client + */ + def bossGroup(cfg: NettyConfig.BossGroup): NettyConfig = self.copy(bossGroup = cfg) + def channelType(channelType: ChannelType): NettyConfig = self.copy(channelType = channelType) /** - * Configure the server to use the leak detection level provided. + * Configure Netty to use the leak detection level provided. * * @see * Right(ChannelType.AUTO) - case "nio" => Right(ChannelType.NIO) - case "epoll" => Right(ChannelType.EPOLL) - case "kqueue" => Right(ChannelType.KQUEUE) - case "uring" => Right(ChannelType.URING) - case other => Left(Config.Error.InvalidData(message = s"Invalid channel type: $other")) - } - .withDefault(NettyConfig.default.channelType) ++ + final case class BossGroup( + channelType: ChannelType, + nThreads: Int, + shutdownQuietPeriodDuration: Duration, + shutdownTimeOutDuration: Duration, + ) extends EventLoopGroups.Config { + def shutdownTimeUnit: TimeUnit = TimeUnit.MILLISECONDS + def shutdownQuietPeriod: Long = shutdownQuietPeriodDuration.toMillis + def shutdownTimeOut: Long = shutdownTimeOutDuration.toMillis + } + + private def baseConfig: Config[EventLoopGroups.Config] = + (Config + .string("channel-type") + .mapOrFail { + case "auto" => Right(ChannelType.AUTO) + case "nio" => Right(ChannelType.NIO) + case "epoll" => Right(ChannelType.EPOLL) + case "kqueue" => Right(ChannelType.KQUEUE) + case "uring" => Right(ChannelType.URING) + case other => Left(Config.Error.InvalidData(message = s"Invalid channel type: $other")) + } + .withDefault(NettyConfig.default.channelType) ++ Config.int("max-threads").withDefault(NettyConfig.default.nThreads) ++ Config.duration("shutdown-quiet-period").withDefault(NettyConfig.default.shutdownQuietPeriodDuration) ++ Config.duration("shutdown-timeout").withDefault(NettyConfig.default.shutdownTimeoutDuration)).map { - case (leakDetectionLevel, channelType, maxThreads, quietPeriod, timeout) => - NettyConfig(leakDetectionLevel, channelType, maxThreads, quietPeriod, timeout) + case (channelT, maxThreads, quietPeriod, timeout) => + new EventLoopGroups.Config { + override val channelType: ChannelType = channelT + override val nThreads: Int = maxThreads + override val shutdownQuietPeriod: Long = quietPeriod.toMillis + override val shutdownTimeOut: Long = timeout.toMillis + override val shutdownTimeUnit: TimeUnit = TimeUnit.MILLISECONDS + } } - val default: NettyConfig = NettyConfig( - LeakDetectionLevel.SIMPLE, - ChannelType.AUTO, - 0, - // Defaults taken from io.netty.util.concurrent.AbstractEventExecutor - Duration.fromSeconds(2), - Duration.fromSeconds(15), - ) - - val defaultWithFastShutdown: NettyConfig = default.copy( - shutdownQuietPeriodDuration = Duration.fromMillis(50), - shutdownTimeoutDuration = Duration.fromMillis(250), - ) + def config: Config[NettyConfig] = + (LeakDetectionLevel.config.nested("leak-detection-level").withDefault(NettyConfig.default.leakDetectionLevel) ++ + baseConfig.nested("worker-group").orElse(baseConfig) ++ + baseConfig.nested("boss-group")).map { case (leakDetectionLevel, worker, boss) => + def toDuration(n: Long, timeUnit: TimeUnit) = Duration.fromJava(java.time.Duration.of(n, timeUnit.toChronoUnit)) + NettyConfig( + leakDetectionLevel, + worker.channelType, + worker.nThreads, + shutdownQuietPeriodDuration = toDuration(worker.shutdownQuietPeriod, worker.shutdownTimeUnit), + shutdownTimeoutDuration = toDuration(worker.shutdownTimeOut, worker.shutdownTimeUnit), + NettyConfig.BossGroup( + boss.channelType, + boss.nThreads, + shutdownQuietPeriodDuration = toDuration(boss.shutdownQuietPeriod, boss.shutdownTimeUnit), + shutdownTimeOutDuration = toDuration(boss.shutdownTimeOut, boss.shutdownTimeUnit), + ), + ) + } + + val default: NettyConfig = { + val quietPeriod = Duration.fromSeconds(2) + val timeout = Duration.fromSeconds(15) + NettyConfig( + LeakDetectionLevel.SIMPLE, + ChannelType.AUTO, + java.lang.Runtime.getRuntime.availableProcessors(), + // Defaults taken from io.netty.util.concurrent.AbstractEventExecutor + shutdownQuietPeriodDuration = quietPeriod, + shutdownTimeoutDuration = timeout, + NettyConfig.BossGroup( + ChannelType.AUTO, + 1, + shutdownQuietPeriodDuration = quietPeriod, + shutdownTimeOutDuration = timeout, + ), + ) + } + + val defaultWithFastShutdown: NettyConfig = { + val quietPeriod = Duration.fromMillis(50) + val timeout = Duration.fromMillis(250) + default.copy( + shutdownQuietPeriodDuration = quietPeriod, + shutdownTimeoutDuration = timeout, + bossGroup = default.bossGroup.copy( + shutdownQuietPeriodDuration = quietPeriod, + shutdownTimeOutDuration = timeout, + ), + ) + } sealed trait LeakDetectionLevel { self => diff --git a/zio-http/jvm/src/main/scala/zio/http/netty/server/NettyDriver.scala b/zio-http/jvm/src/main/scala/zio/http/netty/server/NettyDriver.scala index 918be5d148..334b186711 100644 --- a/zio-http/jvm/src/main/scala/zio/http/netty/server/NettyDriver.scala +++ b/zio-http/jvm/src/main/scala/zio/http/netty/server/NettyDriver.scala @@ -35,7 +35,7 @@ private[zio] final case class NettyDriver( channelFactory: ChannelFactory[ServerChannel], channelInitializer: ChannelInitializer[Channel], serverInboundHandler: ServerInboundHandler, - eventLoopGroup: EventLoopGroup, + eventLoopGroups: ServerEventLoopGroups, serverConfig: Server.Config, nettyConfig: NettyConfig, ) extends Driver { self => @@ -44,7 +44,7 @@ private[zio] final case class NettyDriver( for { chf <- ZIO.attempt { new ServerBootstrap() - .group(eventLoopGroup) + .group(eventLoopGroups.boss, eventLoopGroups.worker) .channelFactory(channelFactory) .childHandler(channelInitializer) .option[Integer](ChannelOption.SO_BACKLOG, serverConfig.soBacklog) @@ -84,7 +84,7 @@ private[zio] final case class NettyDriver( channelFactory <- ChannelFactories.Client.live.build .provideSomeEnvironment[Scope](_ ++ ZEnvironment[ChannelType.Config](nettyConfig)) nettyRuntime <- NettyRuntime.live.build - } yield NettyClientDriver(channelFactory.get, eventLoopGroup, nettyRuntime.get) + } yield NettyClientDriver(channelFactory.get, eventLoopGroups.worker, nettyRuntime.get) override def toString: String = s"NettyDriver($serverConfig)" } @@ -97,7 +97,7 @@ object NettyDriver { RoutesRef & ChannelFactory[ServerChannel] & ChannelInitializer[Channel] - & EventLoopGroup + & ServerEventLoopGroups & Server.Config & NettyConfig & ServerInboundHandler, @@ -108,7 +108,7 @@ object NettyDriver { app <- ZIO.service[RoutesRef] cf <- ZIO.service[ChannelFactory[ServerChannel]] cInit <- ZIO.service[ChannelInitializer[Channel]] - elg <- ZIO.service[EventLoopGroup] + elg <- ZIO.service[ServerEventLoopGroups] sc <- ZIO.service[Server.Config] nsc <- ZIO.service[NettyConfig] sih <- ZIO.service[ServerInboundHandler] @@ -117,14 +117,15 @@ object NettyDriver { channelFactory = cf, channelInitializer = cInit, serverInboundHandler = sih, - eventLoopGroup = elg, + eventLoopGroups = elg, serverConfig = sc, nettyConfig = nsc, ) - val manual: ZLayer[EventLoopGroup & ChannelFactory[ServerChannel] & Server.Config & NettyConfig, Nothing, Driver] = { + val manual + : ZLayer[ServerEventLoopGroups & ChannelFactory[ServerChannel] & Server.Config & NettyConfig, Nothing, Driver] = { implicit val trace: Trace = Trace.empty - ZLayer.makeSome[EventLoopGroup & ChannelFactory[ServerChannel] & Server.Config & NettyConfig, Driver]( + ZLayer.makeSome[ServerEventLoopGroups & ChannelFactory[ServerChannel] & Server.Config & NettyConfig, Driver]( ZLayer(AppRef.empty), ServerChannelInitializer.layer, ServerInboundHandler.live, @@ -135,7 +136,7 @@ object NettyDriver { val customized: ZLayer[Server.Config & NettyConfig, Throwable, Driver] = { val serverChannelFactory: ZLayer[NettyConfig, Nothing, ChannelFactory[ServerChannel]] = ChannelFactories.Server.fromConfig - val eventLoopGroup: ZLayer[NettyConfig, Nothing, EventLoopGroup] = EventLoopGroups.live + val eventLoopGroup: ZLayer[NettyConfig, Nothing, ServerEventLoopGroups] = ServerEventLoopGroups.live ZLayer.makeSome[Server.Config & NettyConfig, Driver]( eventLoopGroup, diff --git a/zio-http/jvm/src/main/scala/zio/http/netty/server/ServerEventLoopGroups.scala b/zio-http/jvm/src/main/scala/zio/http/netty/server/ServerEventLoopGroups.scala new file mode 100644 index 0000000000..0763b84669 --- /dev/null +++ b/zio-http/jvm/src/main/scala/zio/http/netty/server/ServerEventLoopGroups.scala @@ -0,0 +1,30 @@ +package zio.http.netty.server + +import zio._ +import zio.stacktracer.TracingImplicits.disableAutoTrace + +import zio.http.netty.{EventLoopGroups, NettyConfig} + +import io.netty.channel.EventLoopGroup + +final case class ServerEventLoopGroups( + boss: EventLoopGroup, + worker: EventLoopGroup, +) + +object ServerEventLoopGroups { + private implicit val trace: Trace = Trace.empty + + private def groupLayer(cfg: EventLoopGroups.Config): ULayer[EventLoopGroup] = + (ZLayer.succeed(cfg) >>> EventLoopGroups.live).fresh + + val live: ZLayer[NettyConfig, Nothing, ServerEventLoopGroups] = ZLayer.fromZIO { + ZIO.serviceWith[NettyConfig] { cfg => + val boss = groupLayer(cfg.bossGroup) + val worker = groupLayer(cfg) + boss.zipWithPar(worker) { (boss, worker) => + ZEnvironment(ServerEventLoopGroups(boss.get, worker.get)) + } + } + }.flatten +} diff --git a/zio-http/jvm/src/main/scala/zio/http/netty/server/ServerInboundHandler.scala b/zio-http/jvm/src/main/scala/zio/http/netty/server/ServerInboundHandler.scala index 0c6f5354dd..5c11953cbf 100644 --- a/zio-http/jvm/src/main/scala/zio/http/netty/server/ServerInboundHandler.scala +++ b/zio-http/jvm/src/main/scala/zio/http/netty/server/ServerInboundHandler.scala @@ -148,7 +148,13 @@ private[zio] final case class ServerInboundHandler( def fastEncode(response: Response, bytes: Array[Byte]) = { val jResponse = NettyResponseEncoder.fastEncode(method, response, bytes) val djResponse = jResponse.retainedDuplicate() - ctx.writeAndFlush(djResponse, ctx.voidPromise()) + + // This handler sits at the tail of the pipeline, so using ctx.channel.writeAndFlush won't add any + // overhead of passing through the pipeline. It's also better to use ctx.channel.writeAndFlush in + // cases that we're writing to the channel from a different thread (which is most of the time as we're + // creating responses in ZIO's executor). + val ch = ctx.channel() + ch.writeAndFlush(djResponse, ch.voidPromise()) true } diff --git a/zio-http/jvm/src/main/scala/zio/http/netty/server/package.scala b/zio-http/jvm/src/main/scala/zio/http/netty/server/package.scala index 9df69d0b62..11da4dc374 100644 --- a/zio-http/jvm/src/main/scala/zio/http/netty/server/package.scala +++ b/zio-http/jvm/src/main/scala/zio/http/netty/server/package.scala @@ -37,6 +37,7 @@ package object server { val live: ZLayer[Server.Config, Throwable, Driver] = NettyDriver.live - val manual: ZLayer[EventLoopGroup & ChannelFactory[ServerChannel] & Server.Config & NettyConfig, Nothing, Driver] = + val manual + : ZLayer[ServerEventLoopGroups & ChannelFactory[ServerChannel] & Server.Config & NettyConfig, Nothing, Driver] = NettyDriver.manual } diff --git a/zio-http/jvm/src/test/scala/zio/http/netty/server/ServerEventLoopGroupsSpec.scala b/zio-http/jvm/src/test/scala/zio/http/netty/server/ServerEventLoopGroupsSpec.scala new file mode 100644 index 0000000000..315a728179 --- /dev/null +++ b/zio-http/jvm/src/test/scala/zio/http/netty/server/ServerEventLoopGroupsSpec.scala @@ -0,0 +1,52 @@ +package zio.http.netty.server + +import zio._ +import zio.test.TestAspect.{sequential, withLiveClock} +import zio.test.{Spec, TestEnvironment, assertTrue} + +import zio.http.ZIOHttpSpec +import zio.http.netty.NettyConfig + +object ServerEventLoopGroupsSpec extends ZIOHttpSpec { + + override def spec: Spec[TestEnvironment with Scope, Any] = + suite("ServerEventLoopGroups")( + test("group sizes are as configured") { + val nBoss = 2 + val nWorker = 5 + val config = ZLayer.succeed { + val c = NettyConfig.defaultWithFastShutdown + c.copy(nThreads = nWorker, bossGroup = c.bossGroup.copy(nThreads = nBoss)) + } + + ZIO.scoped { + ServerEventLoopGroups.live.build.map { env => + val groups = env.get[ServerEventLoopGroups] + var bossThreads = 0 + var workerThreads = 0 + groups.boss.forEach(_ => bossThreads += 1) + groups.worker.forEach(_ => workerThreads += 1) + assertTrue(bossThreads == nBoss, workerThreads == nWorker) + } + }.provide(config) + }, + test("finalizers are run in parallel") { + val configLayer = ZLayer.succeed { + val c = NettyConfig.default + c.copy( + shutdownQuietPeriodDuration = 500.millis, + bossGroup = c.bossGroup.copy(shutdownQuietPeriodDuration = 500.millis), + ) + } + + val st = Ref.unsafe.make(0L)(Unsafe) + + (for { + _ <- ZIO.scoped(ServerEventLoopGroups.live.build <* Clock.nanoTime.flatMap(st.set)) + et <- Clock.nanoTime + st <- st.get + d = Duration.fromNanos(et - st).toMillis + } yield assertTrue(d >= 500L, d < 700L)).provide(configLayer) + } @@ withLiveClock, + ) @@ sequential +}