Optimize netty integration and default config (#3114)
kyri-petrou authored Sep 10, 2024
1 parent b900d12 commit 188ff2e
Showing 12 changed files with 223 additions and 66 deletions.
18 changes: 13 additions & 5 deletions .devcontainer/Dockerfile
@@ -1,21 +1,20 @@
# See here for image contents: https://github.com/microsoft/vscode-dev-containers/tree/v0.238.1/containers/java/.devcontainer/base.Dockerfile

# [Choice] Java version (use -bullseye variants on local arm64/Apple Silicon): 11, 17, 11-bullseye, 17-bullseye, 11-buster, 17-buster
ARG VARIANT="11"
FROM mcr.microsoft.com/vscode/devcontainers/java:0-${VARIANT}
ARG VARIANT="17"
FROM mcr.microsoft.com/vscode/devcontainers/java:${VARIANT}


RUN curl -s "https://get.sdkman.io" | bash

# Install Scala Lang
ARG SBT_VERSION="1.7.1"
ARG SBT_VERSION="1.10.1"
RUN \
curl -L "https://github.com/sbt/sbt/releases/download/v$SBT_VERSION/sbt-$SBT_VERSION.tgz" | tar zxf - -C /usr/share && \
cd /usr/share/sbt/bin && \
rm sbt.bat sbtn-x86_64-apple-darwin sbtn-x86_64-pc-linux sbtn-x86_64-pc-win32.exe && \
ln -s /usr/share/sbt/bin/sbt /usr/local/bin/sbt

ARG SCALA_VERSION="3.1.3"
ARG SCALA_VERSION="3.3.3"
RUN \
mkdir /setup-project && \
cd /setup-project && \
@@ -24,4 +23,13 @@ RUN \
sbt compile && \
rm -rf /setup-project

RUN \
mkdir /setup-wrk && \
sudo apt-get update -y && sudo apt-get install build-essential libssl-dev git -y && \
git clone https://github.com/wg/wrk.git wrk && \
cd wrk && \
make && \
cp wrk /usr/local/bin && \
rm -rf /setup-wrk

CMD ["sbt"]
4 changes: 2 additions & 2 deletions .devcontainer/devcontainer.json
@@ -8,8 +8,8 @@
// Update the VARIANT arg to pick a Java version: 11, 17
// Append -bullseye or -buster to pin to an OS version.
// Use the -bullseye variants on local arm64/Apple Silicon.
"VARIANT": "11",
"SCALA_VERSION": "3.1.3"
"VARIANT": "17",
"SCALA_VERSION": "3.3.3"
}
},

4 changes: 3 additions & 1 deletion build.sbt
@@ -281,6 +281,8 @@ lazy val zioHttpExample = (project in file("zio-http-example"))
.settings(runSettings(Debug.Main))
.settings(libraryDependencies ++= Seq(`jwt-core`, `zio-schema-json`))
.settings(
run / fork := true,
run / javaOptions ++= Seq("-Xms4G", "-Xmx4G", "-XX:+UseG1GC"),
libraryDependencies ++= Seq(
`zio-config`,
`zio-config-magnolia`,
@@ -404,7 +406,7 @@ lazy val docs = project
testFrameworks += new TestFramework("zio.test.sbt.ZTestFramework"),
libraryDependencies ++= Seq(
`jwt-core`,
"dev.zio" %% "zio-test" % ZioVersion,
"dev.zio" %% "zio-test" % ZioVersion,
`zio-config`,
`zio-config-magnolia`,
`zio-config-typesafe`,
18 changes: 9 additions & 9 deletions project/Dependencies.scala
@@ -21,15 +21,15 @@ object Dependencies {

val netty =
Seq(
"io.netty" % "netty-codec-http" % NettyVersion,
"io.netty" % "netty-handler-proxy" % NettyVersion,
"io.netty" % "netty-transport-native-epoll" % NettyVersion,
"io.netty" % "netty-transport-native-epoll" % NettyVersion % Runtime classifier "linux-x86_64",
"io.netty" % "netty-transport-native-epoll" % NettyVersion % Runtime classifier "linux-aarch_64",
"io.netty" % "netty-transport-native-kqueue" % NettyVersion,
"io.netty" % "netty-transport-native-kqueue" % NettyVersion % Runtime classifier "osx-x86_64",
"io.netty" % "netty-transport-native-kqueue" % NettyVersion % Runtime classifier "osx-aarch_64",
"com.aayushatharva.brotli4j" % "brotli4j" % "1.16.0" % "provided",
"io.netty" % "netty-codec-http" % NettyVersion,
"io.netty" % "netty-handler-proxy" % NettyVersion,
"io.netty" % "netty-transport-native-epoll" % NettyVersion,
"io.netty" % "netty-transport-native-epoll" % NettyVersion classifier "linux-x86_64",
"io.netty" % "netty-transport-native-epoll" % NettyVersion classifier "linux-aarch_64",
"io.netty" % "netty-transport-native-kqueue" % NettyVersion,
"io.netty" % "netty-transport-native-kqueue" % NettyVersion classifier "osx-x86_64",
"io.netty" % "netty-transport-native-kqueue" % NettyVersion classifier "osx-aarch_64",
"com.aayushatharva.brotli4j" % "brotli4j" % "1.16.0" % "provided",
)

val `netty-incubator` =
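The native epoll and kqueue artifacts above are now plain (non-Runtime-scoped) dependencies with per-platform classifiers; Netty only uses a native transport when the corresponding library can actually be loaded on the host. As a rough illustration of the kind of availability check an auto-detecting channel type relies on (a sketch only, not zio-http's actual selection code):

```scala
import io.netty.channel.epoll.Epoll
import io.netty.channel.kqueue.KQueue

// Hypothetical helper: prefer a native transport when its library loads, else fall back to NIO.
def preferredTransport(): String =
  if (Epoll.isAvailable) "epoll"        // Linux native transport
  else if (KQueue.isAvailable) "kqueue" // macOS/BSD native transport
  else "nio"                            // portable JDK fallback
```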
@@ -36,11 +36,9 @@ object PlainTextBenchmarkServer extends ZIOAppDefault {

private val config = Server.Config.default
.port(8080)
.enableRequestStreaming

private val nettyConfig = NettyConfig.default
.leakDetection(LeakDetectionLevel.DISABLED)
.maxThreads(8)

private val configLayer = ZLayer.succeed(config)
private val nettyConfigLayer = ZLayer.succeed(nettyConfig)
@@ -33,11 +33,9 @@ object SimpleEffectBenchmarkServer extends ZIOAppDefault {

private val config = Server.Config.default
.port(8080)
.enableRequestStreaming

private val nettyConfig = NettyConfig.default
.leakDetection(LeakDetectionLevel.DISABLED)
.maxThreads(8)

private val configLayer = ZLayer.succeed(config)
private val nettyConfigLayer = ZLayer.succeed(nettyConfig)
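Both benchmark servers above drop `.enableRequestStreaming` and the explicit `.maxThreads(8)` and rely on the new defaults instead. A minimal sketch of how such a server wires these layers together, assuming zio-http's public `Routes` DSL and `Server.customized` layer (everything outside this diff reflects my reading of the API, not part of this commit):

```scala
import zio._
import zio.http._
import zio.http.netty.NettyConfig
import zio.http.netty.NettyConfig.LeakDetectionLevel

object BenchmarkServerSketch extends ZIOAppDefault {

  private val routes =
    Routes(Method.GET / "plaintext" -> handler(Response.text("Hello, World!")))

  private val config      = Server.Config.default.port(8080)
  private val nettyConfig = NettyConfig.default.leakDetection(LeakDetectionLevel.DISABLED)

  // Server.customized reads both Server.Config and NettyConfig from the environment.
  val run =
    Server
      .serve(routes)
      .provide(
        ZLayer.succeed(config),
        ZLayer.succeed(nettyConfig),
        Server.customized,
      )
}
```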
129 changes: 95 additions & 34 deletions zio-http/jvm/src/main/scala/zio/http/netty/NettyConfig.scala
@@ -30,12 +30,19 @@ final case class NettyConfig(
nThreads: Int,
shutdownQuietPeriodDuration: Duration,
shutdownTimeoutDuration: Duration,
bossGroup: NettyConfig.BossGroup,
) extends EventLoopGroups.Config { self =>

/**
* Configure Netty's boss event-loop group. This only applies to server
* applications and is ignored for the Client
*/
def bossGroup(cfg: NettyConfig.BossGroup): NettyConfig = self.copy(bossGroup = cfg)

def channelType(channelType: ChannelType): NettyConfig = self.copy(channelType = channelType)

/**
* Configure the server to use the leak detection level provided.
* Configure Netty to use the leak detection level provided.
*
* @see
* <a
@@ -44,50 +51,104 @@
def leakDetection(level: LeakDetectionLevel): NettyConfig = self.copy(leakDetectionLevel = level)

/**
* Configure the server to use a maximum of nThreads to process requests.
* Configure Netty to use a maximum of `nThreads` for the worker event-loop
* group.
*/
def maxThreads(nThreads: Int): NettyConfig = self.copy(nThreads = nThreads)

val shutdownTimeUnit: TimeUnit = TimeUnit.MILLISECONDS

val shutdownQuietPeriod: Long = shutdownQuietPeriodDuration.toMillis
val shutdownTimeOut: Long = shutdownTimeoutDuration.toMillis
def shutdownTimeUnit: TimeUnit = TimeUnit.MILLISECONDS
def shutdownQuietPeriod: Long = shutdownQuietPeriodDuration.toMillis
def shutdownTimeOut: Long = shutdownTimeoutDuration.toMillis
}

object NettyConfig {
def config: Config[NettyConfig] =
(LeakDetectionLevel.config.nested("leak-detection-level").withDefault(NettyConfig.default.leakDetectionLevel) ++
Config
.string("channel-type")
.mapOrFail {
case "auto" => Right(ChannelType.AUTO)
case "nio" => Right(ChannelType.NIO)
case "epoll" => Right(ChannelType.EPOLL)
case "kqueue" => Right(ChannelType.KQUEUE)
case "uring" => Right(ChannelType.URING)
case other => Left(Config.Error.InvalidData(message = s"Invalid channel type: $other"))
}
.withDefault(NettyConfig.default.channelType) ++
final case class BossGroup(
channelType: ChannelType,
nThreads: Int,
shutdownQuietPeriodDuration: Duration,
shutdownTimeOutDuration: Duration,
) extends EventLoopGroups.Config {
def shutdownTimeUnit: TimeUnit = TimeUnit.MILLISECONDS
def shutdownQuietPeriod: Long = shutdownQuietPeriodDuration.toMillis
def shutdownTimeOut: Long = shutdownTimeOutDuration.toMillis
}

private def baseConfig: Config[EventLoopGroups.Config] =
(Config
.string("channel-type")
.mapOrFail {
case "auto" => Right(ChannelType.AUTO)
case "nio" => Right(ChannelType.NIO)
case "epoll" => Right(ChannelType.EPOLL)
case "kqueue" => Right(ChannelType.KQUEUE)
case "uring" => Right(ChannelType.URING)
case other => Left(Config.Error.InvalidData(message = s"Invalid channel type: $other"))
}
.withDefault(NettyConfig.default.channelType) ++
Config.int("max-threads").withDefault(NettyConfig.default.nThreads) ++
Config.duration("shutdown-quiet-period").withDefault(NettyConfig.default.shutdownQuietPeriodDuration) ++
Config.duration("shutdown-timeout").withDefault(NettyConfig.default.shutdownTimeoutDuration)).map {
case (leakDetectionLevel, channelType, maxThreads, quietPeriod, timeout) =>
NettyConfig(leakDetectionLevel, channelType, maxThreads, quietPeriod, timeout)
case (channelT, maxThreads, quietPeriod, timeout) =>
new EventLoopGroups.Config {
override val channelType: ChannelType = channelT
override val nThreads: Int = maxThreads
override val shutdownQuietPeriod: Long = quietPeriod.toMillis
override val shutdownTimeOut: Long = timeout.toMillis
override val shutdownTimeUnit: TimeUnit = TimeUnit.MILLISECONDS
}
}

val default: NettyConfig = NettyConfig(
LeakDetectionLevel.SIMPLE,
ChannelType.AUTO,
0,
// Defaults taken from io.netty.util.concurrent.AbstractEventExecutor
Duration.fromSeconds(2),
Duration.fromSeconds(15),
)

val defaultWithFastShutdown: NettyConfig = default.copy(
shutdownQuietPeriodDuration = Duration.fromMillis(50),
shutdownTimeoutDuration = Duration.fromMillis(250),
)
def config: Config[NettyConfig] =
(LeakDetectionLevel.config.nested("leak-detection-level").withDefault(NettyConfig.default.leakDetectionLevel) ++
baseConfig.nested("worker-group").orElse(baseConfig) ++
baseConfig.nested("boss-group")).map { case (leakDetectionLevel, worker, boss) =>
def toDuration(n: Long, timeUnit: TimeUnit) = Duration.fromJava(java.time.Duration.of(n, timeUnit.toChronoUnit))
NettyConfig(
leakDetectionLevel,
worker.channelType,
worker.nThreads,
shutdownQuietPeriodDuration = toDuration(worker.shutdownQuietPeriod, worker.shutdownTimeUnit),
shutdownTimeoutDuration = toDuration(worker.shutdownTimeOut, worker.shutdownTimeUnit),
NettyConfig.BossGroup(
boss.channelType,
boss.nThreads,
shutdownQuietPeriodDuration = toDuration(boss.shutdownQuietPeriod, boss.shutdownTimeUnit),
shutdownTimeOutDuration = toDuration(boss.shutdownTimeOut, boss.shutdownTimeUnit),
),
)
}

val default: NettyConfig = {
val quietPeriod = Duration.fromSeconds(2)
val timeout = Duration.fromSeconds(15)
NettyConfig(
LeakDetectionLevel.SIMPLE,
ChannelType.AUTO,
java.lang.Runtime.getRuntime.availableProcessors(),
// Defaults taken from io.netty.util.concurrent.AbstractEventExecutor
shutdownQuietPeriodDuration = quietPeriod,
shutdownTimeoutDuration = timeout,
NettyConfig.BossGroup(
ChannelType.AUTO,
1,
shutdownQuietPeriodDuration = quietPeriod,
shutdownTimeOutDuration = timeout,
),
)
}

val defaultWithFastShutdown: NettyConfig = {
val quietPeriod = Duration.fromMillis(50)
val timeout = Duration.fromMillis(250)
default.copy(
shutdownQuietPeriodDuration = quietPeriod,
shutdownTimeoutDuration = timeout,
bossGroup = default.bossGroup.copy(
shutdownQuietPeriodDuration = quietPeriod,
shutdownTimeOutDuration = timeout,
),
)
}

sealed trait LeakDetectionLevel {
self =>
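With the new `bossGroup` field, the defaults use `availableProcessors()` worker threads and a single boss thread, and both groups can be tuned programmatically or from configuration. A small sketch of both styles, assuming the key names derived from `config` above (duration and leak-detection values are omitted since their accepted string formats aren't shown in this diff):

```scala
import zio._
import zio.http.netty.NettyConfig

// Programmatic: eight worker threads, one boss thread (matching the new default boss size).
val tuned: NettyConfig =
  NettyConfig.default
    .maxThreads(8)
    .bossGroup(NettyConfig.default.bossGroup.copy(nThreads = 1))

// From configuration: worker settings live under "worker-group" (or at the top level via the
// orElse fallback) and boss settings under "boss-group".
val provider: ConfigProvider =
  ConfigProvider.fromMap(
    Map(
      "worker-group.channel-type" -> "epoll",
      "worker-group.max-threads"  -> "8",
      "boss-group.channel-type"   -> "epoll",
      "boss-group.max-threads"    -> "1",
    )
  )

val loaded: IO[Config.Error, NettyConfig] = provider.load(NettyConfig.config)
```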
@@ -35,7 +35,7 @@ private[zio] final case class NettyDriver(
channelFactory: ChannelFactory[ServerChannel],
channelInitializer: ChannelInitializer[Channel],
serverInboundHandler: ServerInboundHandler,
eventLoopGroup: EventLoopGroup,
eventLoopGroups: ServerEventLoopGroups,
serverConfig: Server.Config,
nettyConfig: NettyConfig,
) extends Driver { self =>
@@ -44,7 +44,7 @@
for {
chf <- ZIO.attempt {
new ServerBootstrap()
.group(eventLoopGroup)
.group(eventLoopGroups.boss, eventLoopGroups.worker)
.channelFactory(channelFactory)
.childHandler(channelInitializer)
.option[Integer](ChannelOption.SO_BACKLOG, serverConfig.soBacklog)
@@ -84,7 +84,7 @@
channelFactory <- ChannelFactories.Client.live.build
.provideSomeEnvironment[Scope](_ ++ ZEnvironment[ChannelType.Config](nettyConfig))
nettyRuntime <- NettyRuntime.live.build
} yield NettyClientDriver(channelFactory.get, eventLoopGroup, nettyRuntime.get)
} yield NettyClientDriver(channelFactory.get, eventLoopGroups.worker, nettyRuntime.get)

override def toString: String = s"NettyDriver($serverConfig)"
}
@@ -97,7 +97,7 @@ object NettyDriver {
RoutesRef
& ChannelFactory[ServerChannel]
& ChannelInitializer[Channel]
& EventLoopGroup
& ServerEventLoopGroups
& Server.Config
& NettyConfig
& ServerInboundHandler,
@@ -108,7 +108,7 @@
app <- ZIO.service[RoutesRef]
cf <- ZIO.service[ChannelFactory[ServerChannel]]
cInit <- ZIO.service[ChannelInitializer[Channel]]
elg <- ZIO.service[EventLoopGroup]
elg <- ZIO.service[ServerEventLoopGroups]
sc <- ZIO.service[Server.Config]
nsc <- ZIO.service[NettyConfig]
sih <- ZIO.service[ServerInboundHandler]
@@ -117,14 +117,15 @@
channelFactory = cf,
channelInitializer = cInit,
serverInboundHandler = sih,
eventLoopGroup = elg,
eventLoopGroups = elg,
serverConfig = sc,
nettyConfig = nsc,
)

val manual: ZLayer[EventLoopGroup & ChannelFactory[ServerChannel] & Server.Config & NettyConfig, Nothing, Driver] = {
val manual
: ZLayer[ServerEventLoopGroups & ChannelFactory[ServerChannel] & Server.Config & NettyConfig, Nothing, Driver] = {
implicit val trace: Trace = Trace.empty
ZLayer.makeSome[EventLoopGroup & ChannelFactory[ServerChannel] & Server.Config & NettyConfig, Driver](
ZLayer.makeSome[ServerEventLoopGroups & ChannelFactory[ServerChannel] & Server.Config & NettyConfig, Driver](
ZLayer(AppRef.empty),
ServerChannelInitializer.layer,
ServerInboundHandler.live,
@@ -135,7 +136,7 @@
val customized: ZLayer[Server.Config & NettyConfig, Throwable, Driver] = {
val serverChannelFactory: ZLayer[NettyConfig, Nothing, ChannelFactory[ServerChannel]] =
ChannelFactories.Server.fromConfig
val eventLoopGroup: ZLayer[NettyConfig, Nothing, EventLoopGroup] = EventLoopGroups.live
val eventLoopGroup: ZLayer[NettyConfig, Nothing, ServerEventLoopGroups] = ServerEventLoopGroups.live

ZLayer.makeSome[Server.Config & NettyConfig, Driver](
eventLoopGroup,
@@ -0,0 +1,30 @@
package zio.http.netty.server

import zio._
import zio.stacktracer.TracingImplicits.disableAutoTrace

import zio.http.netty.{EventLoopGroups, NettyConfig}

import io.netty.channel.EventLoopGroup

final case class ServerEventLoopGroups(
boss: EventLoopGroup,
worker: EventLoopGroup,
)

object ServerEventLoopGroups {
private implicit val trace: Trace = Trace.empty

private def groupLayer(cfg: EventLoopGroups.Config): ULayer[EventLoopGroup] =
(ZLayer.succeed(cfg) >>> EventLoopGroups.live).fresh

val live: ZLayer[NettyConfig, Nothing, ServerEventLoopGroups] = ZLayer.fromZIO {
ZIO.serviceWith[NettyConfig] { cfg =>
val boss = groupLayer(cfg.bossGroup)
val worker = groupLayer(cfg)
boss.zipWithPar(worker) { (boss, worker) =>
ZEnvironment(ServerEventLoopGroups(boss.get, worker.get))
}
}
}.flatten
}
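When using `NettyDriver.manual`, the groups can also be supplied by hand instead of via `ServerEventLoopGroups.live`. A rough sketch using plain NIO groups, assuming `ServerEventLoopGroups` is visible from the calling code (thread counts here are illustrative):

```scala
import zio._
import io.netty.channel.nio.NioEventLoopGroup
import zio.http.netty.server.ServerEventLoopGroups

// One boss thread to accept connections, a small fixed worker pool for I/O.
val manualGroups: ULayer[ServerEventLoopGroups] =
  ZLayer.scoped {
    ZIO.acquireRelease(
      ZIO.succeed(ServerEventLoopGroups(boss = new NioEventLoopGroup(1), worker = new NioEventLoopGroup(4)))
    ) { groups =>
      // shutdownGracefully is asynchronous; waiting for it to complete is omitted in this sketch.
      ZIO.succeed {
        groups.boss.shutdownGracefully()
        groups.worker.shutdownGracefully()
        ()
      }
    }
  }
```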
@@ -148,7 +148,13 @@ private[zio] final case class ServerInboundHandler(
def fastEncode(response: Response, bytes: Array[Byte]) = {
val jResponse = NettyResponseEncoder.fastEncode(method, response, bytes)
val djResponse = jResponse.retainedDuplicate()
ctx.writeAndFlush(djResponse, ctx.voidPromise())

// This handler sits at the tail of the pipeline, so using ctx.channel.writeAndFlush doesn't add the
// overhead of passing back through the pipeline. It's also better to use ctx.channel.writeAndFlush in
// cases where we're writing to the channel from a different thread (which is most of the time, since
// responses are created on ZIO's executor).
val ch = ctx.channel()
ch.writeAndFlush(djResponse, ch.voidPromise())
true
}
