Skip to content

Commit

Permalink
Close http4s#7578: http4s 0.22: Support Jetty 12
Browse files Browse the repository at this point in the history
- Jetty versions from 7.0.0 up to 12.0.11 are affected by CVE-2024-6763 (Eclipse Jetty URI parsing of invalid authority).
- http4s 0.22's http4s-jetty uses Jetty 9.
- Jetty 9's community support ended in June 2022.
- Community support for Jetty 10 and Jetty 11 ended in January 2024.
- To solve the issue, http4s should use Jetty 12, the current stable version.
- Updating the 0.22 version is for those who cannot use 0.23 as they are inextricably bound to cats-effect 2.
- Jetty 12 requires Java 17, so dropping support for Java 8 and 11 is necessary.
- Jetty has multiple versions supporting different versions of Jakarta EE (Java EE), but support for only Jakarta EE 8 is added to minimize changes, as the API namespace moved from javax to jakarta starting with Jakarta EE 9.
  • Loading branch information
kevin-lee committed Nov 12, 2024
1 parent f446326 commit 33ce9a3
Show file tree
Hide file tree
Showing 20 changed files with 246 additions and 430 deletions.
347 changes: 63 additions & 284 deletions .github/workflows/ci.yml

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.http4s.util.StringWriter

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.ISO_8859_1
import scala.annotation.nowarn
import scala.collection.mutable.Buffer
import scala.concurrent._

Expand All @@ -38,6 +39,7 @@ private[http4s] class CachingChunkWriter[F[_]](
import ChunkWriter._

private[this] var pendingHeaders: StringWriter = _
@nowarn("msg=local var .* is never updated")
private[this] var bodyBuffer: Buffer[Chunk[Byte]] = Buffer()
private[this] var size: Int = 0

Expand Down
12 changes: 11 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,9 @@ lazy val server = libraryProject("server")
.settings(
description := "Base library for building http4s servers",
startYear := Some(2014),
libraryDependencies ++= Seq(
scalacCompatAnnotation
),
mimaBinaryIssueFilters ++= Seq(
ProblemFilters.exclude[IncompatibleMethTypeProblem](
"org.http4s.server.middleware.CSRF.this"
Expand Down Expand Up @@ -581,6 +584,7 @@ lazy val jettyClient = libraryProject("jetty-client")
Http4sPlugin.jettyClient,
jettyHttp,
jettyUtil,
scalaJava8Compat,
),
)
.dependsOn(core, testing % "test->test", client % "compile;test->test")
Expand Down Expand Up @@ -888,7 +892,7 @@ lazy val scalafixInternalRules = project
.settings(
startYear := Some(2021),
libraryDependencies ++= Seq(
"ch.epfl.scala" %% "scalafix-core" % _root_.scalafix.sbt.BuildInfo.scalafixVersion
"ch.epfl.scala" %% "scalafix-core" % V.scalafix
).filter(_ => !tlIsScala3.value),
)

Expand Down Expand Up @@ -931,6 +935,12 @@ lazy val scalafixInternalTests = project
.settings(headerSources / excludeFilter := AllPassFilter)
.disablePlugins(ScalafixPlugin)
.dependsOn(scalafixInternalRules)
.settings(
dependencyOverrides ++= Seq(
"ch.epfl.scala" %% "scalafix-core" % V.scalafix,
"ch.epfl.scala" %% "scalafix-testkit" % V.scalafix % Test cross CrossVersion.full,
)
)

def http4sProject(name: String) =
Project(name, file(name))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ import cats.effect._
import cats.syntax.all._
import fs2._
import org.eclipse.jetty.client.HttpClient
import org.eclipse.jetty.client.api.{Request => JettyRequest}
import org.eclipse.jetty.client.{Request => JettyRequest}
import org.eclipse.jetty.http.{HttpVersion => JHttpVersion}
import org.eclipse.jetty.util.ssl.SslContextFactory
import org.http4s.client.Client
import org.log4s.Logger
import org.log4s.getLogger
Expand All @@ -34,19 +33,19 @@ object JettyClient {

def allocate[F[_]](
client: HttpClient = defaultHttpClient()
)(implicit F: ConcurrentEffect[F]): F[(Client[F], F[Unit])] = {
)(implicit F: ConcurrentEffect[F], CS: ContextShift[F]): F[(Client[F], F[Unit])] = {
val acquire = F
.pure(client)
.flatTap(client => F.delay(client.start()))
.map(client =>
Client[F] { req =>
Resource.suspend(F.asyncF[Resource[F, Response[F]]] { cb =>
F.bracket(StreamRequestContentProvider()) { dcp =>
F.bracket(StreamRequestContent()) { dcp =>
(for {
jReq <- F.catchNonFatal(toJettyRequest(client, req, dcp))
rl <- ResponseListener(cb)
_ <- F.delay(jReq.send(rl))
_ <- dcp.write(req)
_ <- dcp.write(req.body)
} yield ()).recover { case e =>
cb(Left(e))
}
Expand All @@ -62,19 +61,18 @@ object JettyClient {
acquire.map((_, dispose))
}

def resource[F[_]](client: HttpClient = defaultHttpClient())(implicit
F: ConcurrentEffect[F]
def resource[F[_]: ConcurrentEffect: ContextShift](
client: HttpClient = defaultHttpClient()
): Resource[F, Client[F]] =
Resource(allocate[F](client))

def stream[F[_]](client: HttpClient = defaultHttpClient())(implicit
F: ConcurrentEffect[F]
def stream[F[_]: ConcurrentEffect: ContextShift](
client: HttpClient = defaultHttpClient()
): Stream[F, Client[F]] =
Stream.resource(resource(client))

def defaultHttpClient(): HttpClient = {
val sslCtxFactory = new SslContextFactory.Client();
val c = new HttpClient(sslCtxFactory)
val c = new HttpClient()
c.setFollowRedirects(false)
c.setDefaultRequestContentType(null)
c
Expand All @@ -83,7 +81,7 @@ object JettyClient {
private def toJettyRequest[F[_]](
client: HttpClient,
request: Request[F],
dcp: StreamRequestContentProvider[F],
dcp: StreamRequestContent[F],
): JettyRequest = {
val jReq = client
.newRequest(request.uri.toString)
Expand All @@ -96,9 +94,10 @@ object JettyClient {
case _ => JHttpVersion.HTTP_1_1
}
)

for (h <- request.headers.headers if h.isNameValid)
jReq.header(h.name.toString, h.value)
jReq.content(dcp)
jReq.headers { jettyHeaders =>
for (h <- request.headers.headers if h.isNameValid)
jettyHeaders.add(h.name.toString, h.value)
}
jReq.body(dcp)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,10 @@ import cats.syntax.all._
import fs2.Stream._
import fs2._
import fs2.concurrent.Queue
import org.eclipse.jetty.client.api.Result
import org.eclipse.jetty.client.api.{Response => JettyResponse}
import org.eclipse.jetty.client.Result
import org.eclipse.jetty.client.{Response => JettyResponse}
import org.eclipse.jetty.http.HttpFields
import org.eclipse.jetty.http.{HttpVersion => JHttpVersion}
import org.eclipse.jetty.util.{Callback => JettyCallback}
import org.http4s.internal.CollectionCompat.CollectionConverters._
import org.http4s.internal.invokeCallback
import org.http4s.internal.loggingAsyncCallback
Expand All @@ -40,8 +39,8 @@ import java.nio.ByteBuffer
private[jetty] final case class ResponseListener[F[_]](
queue: Queue[F, Item],
cb: Callback[Resource[F, Response[F]]],
)(implicit F: ConcurrentEffect[F])
extends JettyResponse.Listener.Adapter {
)(implicit F: ConcurrentEffect[F], CS: ContextShift[F])
extends JettyResponse.Listener {
import ResponseListener.logger

/* Needed to properly propagate client errors */
Expand Down Expand Up @@ -87,14 +86,13 @@ private[jetty] final case class ResponseListener[F[_]](
override def onContent(
response: JettyResponse,
content: ByteBuffer,
callback: JettyCallback,
): Unit = {
val copy = ByteBuffer.allocate(content.remaining())
copy.put(content).flip()
enqueue(Item.Buf(copy)) {
case Right(_) => IO(callback.succeeded())
case Right(_) => IO.unit
case Left(e) =>
IO(logger.error(e)("Error in asynchronous callback")) >> IO(callback.failed(e))
IO(logger.error(e)("Error in asynchronous callback"))
}
}

Expand All @@ -110,11 +108,21 @@ private[jetty] final case class ResponseListener[F[_]](
// (the request might complete after the response has been entirely received)
override def onComplete(result: Result): Unit = ()

private def abort(t: Throwable, response: JettyResponse): Unit =
if (!response.abort(t)) // this also aborts the request
logger.error(t)("Failed to abort the response")
else
closeStream()
private def abort(t: Throwable, response: JettyResponse): Unit = {
import scala.compat.java8.FutureConverters._

Async
.fromFuture(F.delay(response.abort(t).toScala))
.runAsync { attempt =>
loggingAsyncCallback(logger)(attempt.map { aborted =>
if (!aborted)
logger.error(t)("Failed to abort the response")
else
closeStream()
})
}
.unsafeRunSync()
}

private def closeStream(): Unit =
enqueue(Item.Done)(loggingAsyncCallback(logger))
Expand All @@ -135,9 +143,9 @@ private[jetty] object ResponseListener {

private val logger = getLogger

def apply[F[_]](
def apply[F[_]: ConcurrentEffect: ContextShift](
cb: Callback[Resource[F, Response[F]]]
)(implicit F: ConcurrentEffect[F]): F[ResponseListener[F]] =
): F[ResponseListener[F]] =
Queue
.synchronous[F, Item]
.map(q => ResponseListener(q, cb))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,18 @@ import cats.effect.concurrent.Semaphore
import cats.effect.implicits._
import cats.syntax.all._
import fs2._
import org.eclipse.jetty.client.util.DeferredContentProvider
import org.eclipse.jetty.client.AsyncRequestContent
import org.eclipse.jetty.util.{Callback => JettyCallback}
import org.http4s.internal.loggingAsyncCallback
import org.log4s.getLogger

private[jetty] final case class StreamRequestContentProvider[F[_]](s: Semaphore[F])(implicit
private[jetty] final case class StreamRequestContent[F[_]](s: Semaphore[F])(implicit
F: Effect[F]
) extends DeferredContentProvider {
import StreamRequestContentProvider.logger
) extends AsyncRequestContent {
import StreamRequestContent.logger

def write(req: Request[F]): F[Unit] =
req.body.chunks
def write(body: Stream[F, Byte]): F[Unit] =
body.chunks
.through(pipe)
.compile
.drain
Expand All @@ -43,23 +43,22 @@ private[jetty] final case class StreamRequestContentProvider[F[_]](s: Semaphore[
private val pipe: Pipe[F, Chunk[Byte], Unit] =
_.evalMap { c =>
write(c)
.ensure(new Exception("something terrible has happened"))(res => res)
.map(_ => ())
}

private def write(chunk: Chunk[Byte]): F[Boolean] =
private def write(chunk: Chunk[Byte]): F[Unit] =
s.acquire
.map(_ => super.offer(chunk.toByteBuffer, callback))
.map(_ => super.write(chunk.toByteBuffer, callback))

private val callback: JettyCallback = new JettyCallback {
override def succeeded(): Unit =
s.release.runAsync(loggingAsyncCallback(logger)).unsafeRunSync()
}
}

private[jetty] object StreamRequestContentProvider {
private[jetty] object StreamRequestContent {
private val logger = getLogger

def apply[F[_]]()(implicit F: ConcurrentEffect[F]): F[StreamRequestContentProvider[F]] =
Semaphore[F](1).map(StreamRequestContentProvider(_))
def apply[F[_]]()(implicit F: ConcurrentEffect[F]): F[StreamRequestContent[F]] =
Semaphore[F](1).map(new StreamRequestContent(_))

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@ package server

import cats.effect._
import cats.syntax.all._
import org.eclipse.jetty.ee8.servlet.FilterHolder
import org.eclipse.jetty.ee8.servlet.ServletContextHandler
import org.eclipse.jetty.ee8.servlet.ServletHolder
import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory
import org.eclipse.jetty.server.HttpConfiguration
import org.eclipse.jetty.server.HttpConnectionFactory
import org.eclipse.jetty.server.ServerConnector
import org.eclipse.jetty.server.handler.StatisticsHandler
import org.eclipse.jetty.server.{Server => JServer}
import org.eclipse.jetty.servlet.FilterHolder
import org.eclipse.jetty.servlet.ServletContextHandler
import org.eclipse.jetty.servlet.ServletHolder
import org.eclipse.jetty.util.ssl.SslContextFactory
import org.eclipse.jetty.util.thread.ThreadPool
import org.http4s.jetty.server.JettyBuilder._
Expand Down
Loading

0 comments on commit 33ce9a3

Please sign in to comment.