From 403f2ffc23838bf9a80eb29bde71050039468aec Mon Sep 17 00:00:00 2001 From: "John A. De Goes" Date: Thu, 20 Jul 2023 18:05:24 +0100 Subject: [PATCH] revert netty changes for now --- .../src/main/scala/zio/http/Handler.scala | 34 +- zio-http/src/main/scala/zio/http/Route.scala | 19 +- .../netty/server/ServerInboundHandler.scala | 530 ++++++++++++++++-- .../src/test/scala/zio/http/HandlerSpec.scala | 11 +- .../scala/zio/http/StaticFileServerSpec.scala | 16 +- .../zio/http/internal/HttpRunnableSpec.scala | 28 - 6 files changed, 521 insertions(+), 117 deletions(-) diff --git a/zio-http/src/main/scala/zio/http/Handler.scala b/zio-http/src/main/scala/zio/http/Handler.scala index 615275eb1b..9ee6f8f334 100644 --- a/zio-http/src/main/scala/zio/http/Handler.scala +++ b/zio-http/src/main/scala/zio/http/Handler.scala @@ -596,6 +596,12 @@ sealed trait Handler[-R, +Err, -In, +Out] { self => self(request).timeout(duration).map(_.getOrElse(out)) } + /** + * Converts the request handler into an HTTP application. Note that the + * handler of the HTTP application is not identical to this handler, because + * the handler has been appropriately sandboxed, turning all possible failures + * into well-formed HTTP responses. + */ def toHttpApp(implicit err: Err <:< Response, in: Request <:< In, out: Out <:< Response): HttpApp[R] = { val handler: Handler[R, Response, Request, Response] = self.asInstanceOf[Handler[R, Response, Request, Response]] @@ -603,14 +609,8 @@ sealed trait Handler[-R, +Err, -In, +Out] { self => HttpApp(Routes.singleton(handler.contramap[(Path, Request)](_._2))) } - def toHttpAppWS(implicit err: Err <:< Throwable, in: WebSocketChannel <:< In): HttpApp[R] = { - val handler1: Handler[R, Throwable, WebSocketChannel, Any] = - self.asInstanceOf[Handler[R, Throwable, WebSocketChannel, Any]] - - val handler2 = Handler.fromZIO(handler1.toResponse) - - HttpApp(Routes.singleton(handler2.contramap[(Path, Request)](_._2))) - } + def toHttpAppWS(implicit err: Err <:< Throwable, in: WebSocketChannel <:< In): HttpApp[R] = + Handler.fromZIO(self.toResponse).toHttpApp /** * Creates a new response from a socket handler. @@ -815,11 +815,11 @@ object Handler { } } - def fromFile[R](makeFile: => File)(implicit trace: Trace): Handler[R, Response, Any, Response] = + def fromFile[R](makeFile: => File)(implicit trace: Trace): Handler[R, Throwable, Any, Response] = fromFileZIO(ZIO.attempt(makeFile)) - def fromFileZIO[R](getFile: ZIO[R, Throwable, File])(implicit trace: Trace): Handler[R, Response, Any, Response] = { - Handler.fromZIO[R, Response, Response]( + def fromFileZIO[R](getFile: ZIO[R, Throwable, File])(implicit trace: Trace): Handler[R, Throwable, Any, Response] = { + Handler.fromZIO[R, Throwable, Response]( getFile.flatMap { file => ZIO.suspend { if (!file.exists()) { @@ -845,27 +845,26 @@ object Handler { } } } - }.mapError(Response.fromThrowable(_)), + }, ) } /** * Creates a handler from a resource path */ - def fromResource(path: String)(implicit trace: Trace): Handler[Any, Response, Any, Response] = + def fromResource(path: String)(implicit trace: Trace): Handler[Any, Throwable, Any, Response] = Handler.fromZIO { ZIO .attemptBlocking(getClass.getClassLoader.getResource(path)) .map { resource => - if (resource == null) Handler.fail(Response(status = Status.NotFound)) + if (resource == null) Handler.fail(new FileNotFoundException(s"Resource $path not found")) else fromResourceWithURL(resource) } - .mapError(Response.fromThrowable(_)) }.flatten private[zio] def fromResourceWithURL( url: java.net.URL, - )(implicit trace: Trace): Handler[Any, Response, Any, Response] = { + )(implicit trace: Trace): Handler[Any, Throwable, Any, Response] = { url.getProtocol match { case "file" => Handler.fromFile(new File(url.getPath)) case "jar" => @@ -901,11 +900,10 @@ object Handler { .addHeader(Header.ContentLength(contentLength)) } } - .mapError(Response.fromThrowable(_)) } case proto => - Handler.fail(Response.badRequest(s"Unsupported protocol: $proto")) + Handler.fail(new IllegalArgumentException(s"Unsupported protocol: $proto")) } } diff --git a/zio-http/src/main/scala/zio/http/Route.scala b/zio-http/src/main/scala/zio/http/Route.scala index f9854de81c..67dd6bc1ae 100644 --- a/zio-http/src/main/scala/zio/http/Route.scala +++ b/zio-http/src/main/scala/zio/http/Route.scala @@ -2,15 +2,22 @@ package zio.http import zio._ -import zio.http.Route.Provide +import zio.http.Route.Provided /** * Represents a single route, which has either handled its errors by converting * them into responses, or which has polymorphic errors, which must later be * converted into responses before the route can be executed. + * + * Routes have the property that, before conversion into handlers, they will + * fully handle all errors, including defects, translating them appropriately + * into responses that can be delivered to clients. Thus, the handlers returned + * by `toHandler` will never fail, and will always produce a valid response. + * + * Individual routes can be aggregated using [[ziop.http.Routes]]. */ sealed trait Route[-Env, +Err] { self => - import Route.{Augmented, Handled, Provide, Unhandled} + import Route.{Augmented, Handled, Provided, Unhandled} /** * Augments this route with the specified middleware. @@ -42,7 +49,7 @@ sealed trait Route[-Env, +Err] { self => */ final def handleErrorCause(f: Cause[Err] => Response): Route[Env, Nothing] = self match { - case Provide(route, env) => Provide(route.handleErrorCause(f), env) + case Provided(route, env) => Provided(route.handleErrorCause(f), env) case Augmented(route, aspect) => Augmented(route.handleErrorCause(f), aspect) case Handled(routePattern, handler, location) => Handled(routePattern, handler, location) @@ -78,7 +85,7 @@ sealed trait Route[-Env, +Err] { self => def location: Trace final def provideEnvironment(env: ZEnvironment[Env]): Route[Any, Err] = - Route.Provide(self, env) + Route.Provided(self, env) /** * The route pattern over which the route is defined. The route can only @@ -147,7 +154,7 @@ object Route { Unhandled(rpm, handler, zippable, trace) } - private final case class Provide[Env, +Err]( + private final case class Provided[Env, +Err]( route: Route[Env, Err], env: ZEnvironment[Env], ) extends Route[Any, Err] { @@ -158,7 +165,7 @@ object Route { override def toHandler(implicit ev: Err <:< Response): Handler[Any, Response, Request, Response] = route.toHandler.provideEnvironment(env) - override def toString() = s"Route.Provide(${route}, ${env})" + override def toString() = s"Route.Provided(${route}, ${env})" } private final case class Augmented[-Env, +Err]( diff --git a/zio-http/src/main/scala/zio/http/netty/server/ServerInboundHandler.scala b/zio-http/src/main/scala/zio/http/netty/server/ServerInboundHandler.scala index dbebb078fb..999a970fc5 100644 --- a/zio-http/src/main/scala/zio/http/netty/server/ServerInboundHandler.scala +++ b/zio-http/src/main/scala/zio/http/netty/server/ServerInboundHandler.scala @@ -82,9 +82,10 @@ private[zio] final case class ServerInboundHandler( ensureHasApp() val exit = if (jReq.decoderResult().isFailure) { - // Fatal server error: - Exit.fail(Response.internalServerError(jReq.decoderResult().cause().getMessage())) - } else app(req) + val throwable = jReq.decoderResult().cause() + Exit.succeed(Response.fromThrowable(throwable)) + } else + app(req) if (!attemptImmediateWrite(ctx, exit, time)) writeResponse(ctx, env, exit, jReq)(releaseRequest) else @@ -102,8 +103,8 @@ private[zio] final case class ServerInboundHandler( ensureHasApp() val exit = if (jReq.decoderResult().isFailure) { - // Fatal server error: - Exit.fail(Response.internalServerError(jReq.decoderResult().cause().getMessage())) + val throwable = jReq.decoderResult().cause() + Exit.succeed(Response.fromThrowable(throwable)) } else app(req) if (!attemptImmediateWrite(ctx, exit, time)) { @@ -123,21 +124,20 @@ private[zio] final case class ServerInboundHandler( override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = cause match { - case ioe: IOException if ioe.getMessage != null && ioe.getMessage.startsWith("Connection reset") => - // Why are we writing when the other side closed already? - case cause => - if (config.logWarningOnFatalError) { + case ioe: IOException if ioe.getMessage.contentEquals("Connection reset by peer") => + case t => + if (app ne null) { runtime.run(ctx, () => {}) { - // TODO: Fatal error - ZIO.logWarningCause(s"Fatal exception in Netty", Cause.die(cause)) + // We cannot return the generated response from here, but still calling the handler for its side effect + // for example logging. + ZIO.logWarningCause(s"Fatal exception in Netty", Cause.die(t)).when(config.logWarningOnFatalError) } } - cause match { case _: ReadTimeoutException => ctx.close() case _ => - super.exceptionCaught(ctx, cause) + super.exceptionCaught(ctx, t) } } @@ -185,19 +185,25 @@ private[zio] final case class ServerInboundHandler( jRequest: HttpRequest, time: ServerTime, ): Task[Unit] = { - if (response.isWebSocket) ZIO.attempt(upgradeToWebSocket(ctx, jRequest, response, runtime)) - else - NettyResponseEncoder.encode(response).flatMap { jResponse => - setServerTime(time, response, jResponse) - ctx.writeAndFlush(jResponse) - - (if (!jResponse.isInstanceOf[FullHttpResponse]) { - NettyBodyWriter.write(response.body, ctx) - } else Exit.succeed(true)).map(flushed => - if (!flushed) ctx.flush() - else (), - ) - } + for { + _ <- + if (response.isWebSocket) ZIO.attempt(upgradeToWebSocket(ctx, jRequest, response, runtime)) + else + for { + jResponse <- NettyResponseEncoder.encode(response) + _ <- ZIO.attempt { + setServerTime(time, response, jResponse) + ctx.writeAndFlush(jResponse) + } + flushed <- + if (!jResponse.isInstanceOf[FullHttpResponse]) + NettyBodyWriter + .write(response.body, ctx) + else + ZIO.succeed(true) + _ <- ZIO.attempt(ctx.flush()).when(!flushed) + } yield () + } yield () } private def attemptImmediateWrite( @@ -309,11 +315,14 @@ private[zio] final case class ServerInboundHandler( } } - private def writeNotFound(ctx: ChannelHandlerContext, jReq: HttpRequest): Unit = { - val response = Response.notFound(jReq.uri()) - val done = attemptFastWrite(ctx, response, time) - if (!done) { - attemptFullWrite(ctx, response, jReq, time) + private def writeNotFound(ctx: ChannelHandlerContext, jReq: HttpRequest)(ensured: () => Unit): Unit = { + // TODO: this can be done without ZIO + runtime.run(ctx, ensured) { + for { + response <- ZIO.succeed(HttpError.NotFound(jReq.uri()).toResponse) + done <- ZIO.attempt(attemptFastWrite(ctx, response, time)) + _ <- attemptFullWrite(ctx, response, jReq, time).unless(done) + } yield () } } @@ -324,34 +333,45 @@ private[zio] final case class ServerInboundHandler( jReq: HttpRequest, )(ensured: () => Unit): Unit = { runtime.run(ctx, ensured) { - exit.catchAllCause { error => - error.failureOrCause - .fold[UIO[Response]]( - response => ZIO.succeed(response), - cause => - if (cause.isInterruptedOnly) { - interrupted(ctx).as(null) - } else { - ZIO.succeed(Response.fromThrowable(FiberFailure(cause))) + val pgm = for { + response <- exit.sandbox.catchAll { error => + error.failureOrCause + .fold[UIO[Response]]( + response => ZIO.succeed(response), + cause => + if (cause.isInterruptedOnly) { + interrupted(ctx).as(null) + } else { + ZIO.succeed(withDefaultErrorResponse(null, Some(FiberFailure(cause)))) + }, + ) + } + _ <- + if (response ne null) { + for { + done <- ZIO.attempt(attemptFastWrite(ctx, response, time)).catchSomeCause { case cause => + ZIO.attempt( + attemptFastWrite(ctx, withDefaultErrorResponse(null, Some(cause.squash)).freeze, time), + ) + } + _ <- attemptFullWrite(ctx, response, jReq, time).catchSomeCause { case cause => + ZIO.attempt( + attemptFastWrite(ctx, withDefaultErrorResponse(null, Some(cause.squash)).freeze, time), + ) + + } + .unless(done) + } yield () + } else { + ZIO.attempt( + if (ctx.channel().isOpen) { + writeNotFound(ctx, jReq)(() => ()) }, - ) - }.map { response => - if (response ne null) { - try { - val done = attemptFastWrite(ctx, response, time) - - if (!done) { - attemptFullWrite(ctx, response, jReq, time) - } - } catch { - case NonFatal(cause) => attemptFastWrite(ctx, Response.fromThrowable(cause).freeze, time) + ) } - } else { - if (ctx.channel().isOpen) { - writeNotFound(ctx, jReq) - } - } - }.provideEnvironment(env) + } yield () + + pgm.provideEnvironment(env) } } @@ -384,3 +404,399 @@ object ServerInboundHandler { } } + +// package zio.http.netty.server + +// import java.io.IOException +// import java.net.InetSocketAddress +// import java.util.concurrent.atomic.LongAdder + +// import scala.annotation.tailrec +// import scala.util.control.NonFatal + +// import zio._ + +// import zio.http._ +// import zio.http.netty._ +// import zio.http.netty.model.Conversions +// import zio.http.netty.socket.NettySocketProtocol + +// import io.netty.channel.ChannelHandler.Sharable +// import io.netty.channel._ +// import io.netty.handler.codec.http._ +// import io.netty.handler.codec.http.websocketx.{WebSocketFrame => JWebSocketFrame, WebSocketServerProtocolHandler} +// import io.netty.handler.timeout.ReadTimeoutException + +// @Sharable +// private[zio] final case class ServerInboundHandler( +// appRef: AppRef, +// config: Server.Config, +// runtime: NettyRuntime, +// time: ServerTime, +// )(implicit trace: Trace) +// extends SimpleChannelInboundHandler[HttpObject](false) { self => + +// implicit private val unsafe: Unsafe = Unsafe.unsafe + +// private var app: App[Any] = _ +// private var env: ZEnvironment[Any] = _ + +// val inFlightRequests: LongAdder = new LongAdder() + +// def refreshApp(): Unit = { +// val pair = appRef.get() + +// this.app = pair._1 +// this.env = pair._2 +// } + +// private def ensureHasApp(): Unit = { +// if (app eq null) { +// refreshApp() +// } +// } + +// override def channelRead0(ctx: ChannelHandlerContext, msg: HttpObject): Unit = { + +// msg match { +// case jReq: FullHttpRequest => +// val req = makeZioRequest(ctx, jReq) +// inFlightRequests.increment() + +// val releaseRequest = { () => +// inFlightRequests.decrement() +// if (jReq.refCnt() > 0) { +// val _ = jReq.release() +// } +// } + +// ensureHasApp() +// val exit = +// if (jReq.decoderResult().isFailure) { +// val throwable = jReq.decoderResult().cause() +// app.runServerErrorOrNull(Cause.die(throwable)).as(withDefaultErrorResponse(null, Some(throwable))) +// } else +// app.runZIOOrNull(req) +// if (!attemptImmediateWrite(ctx, exit, time)) +// writeResponse(ctx, env, exit, jReq)(releaseRequest) +// else +// releaseRequest() + +// case jReq: HttpRequest => +// val req = makeZioRequest(ctx, jReq) +// inFlightRequests.increment() + +// val releaseRequest = { () => +// inFlightRequests.decrement() +// () +// } + +// ensureHasApp() +// val exit = +// if (jReq.decoderResult().isFailure) { +// val throwable = jReq.decoderResult().cause() +// app.runServerErrorOrNull(Cause.die(throwable)).as(withDefaultErrorResponse(null, Some(throwable))) +// } else +// app.runZIOOrNull(req) +// if (!attemptImmediateWrite(ctx, exit, time)) { +// writeResponse(ctx, env, exit, jReq)(releaseRequest) +// } else { +// releaseRequest() +// } + +// case msg: HttpContent => +// val _ = ctx.fireChannelRead(msg) + +// case _ => +// throw new IllegalStateException(s"Unexpected message type: ${msg.getClass.getName}") +// } + +// } + +// override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = +// cause match { +// case ioe: IOException if ioe.getMessage.contentEquals("Connection reset by peer") => +// case t => +// if (app ne null) { +// runtime.run(ctx, () => {}) { +// // We cannot return the generated response from here, but still calling the handler for its side effect +// // for example logging. +// app +// .runServerErrorOrNull(Cause.die(t)) +// .zipLeft( +// ZIO.logWarningCause(s"Fatal exception in Netty", Cause.die(t)).when(config.logWarningOnFatalError), +// ) +// } +// } +// cause match { +// case _: ReadTimeoutException => +// ctx.close() +// case _ => +// super.exceptionCaught(ctx, t) +// } +// } + +// private def addAsyncBodyHandler(ctx: ChannelHandlerContext): AsyncBodyReader = { +// val handler = new ServerAsyncBodyHandler +// ctx +// .channel() +// .pipeline() +// .addAfter(Names.HttpRequestHandler, Names.HttpContentHandler, handler) +// handler +// } + +// private def attemptFastWrite( +// ctx: ChannelHandlerContext, +// response: Response, +// time: ServerTime, +// ): Boolean = { + +// def fastEncode(response: Response, bytes: Array[Byte]) = { +// NettyResponseEncoder.fastEncode(response, bytes) match { +// case jResponse: FullHttpResponse if response.frozen => +// val djResponse = jResponse.retainedDuplicate() +// setServerTime(time, response, djResponse) +// ctx.writeAndFlush(djResponse, ctx.voidPromise()) +// true +// case _ => false +// } +// } + +// response.body match { +// case body: Body.UnsafeBytes => +// try { +// fastEncode(response, body.unsafeAsArray) +// } catch { +// case NonFatal(e) => fastEncode(withDefaultErrorResponse(null, Some(e)).freeze, Array.emptyByteArray) +// } +// case _ => false +// } + +// } + +// private def attemptFullWrite( +// ctx: ChannelHandlerContext, +// response: Response, +// jRequest: HttpRequest, +// time: ServerTime, +// ): Task[Unit] = { + +// for { +// _ <- +// if (response.isWebSocket) ZIO.attempt(upgradeToWebSocket(ctx, jRequest, response, runtime)) +// else +// for { +// jResponse <- NettyResponseEncoder.encode(response) +// _ <- ZIO.attempt { +// setServerTime(time, response, jResponse) +// ctx.writeAndFlush(jResponse) +// } +// flushed <- +// if (!jResponse.isInstanceOf[FullHttpResponse]) +// NettyBodyWriter +// .write(response.body, ctx) +// else +// ZIO.succeed(true) +// _ <- ZIO.attempt(ctx.flush()).when(!flushed) +// } yield () +// } yield () +// } + +// private def attemptImmediateWrite( +// ctx: ChannelHandlerContext, +// exit: ZIO[Any, Response, Response], +// time: ServerTime, +// ): Boolean = { +// exit match { +// case Exit.Success(response) if response ne null => +// attemptFastWrite(ctx, response, time) +// case _ => false +// } +// } + +// private def makeZioRequest(ctx: ChannelHandlerContext, nettyReq: HttpRequest): Request = { +// val nettyHttpVersion = nettyReq.protocolVersion() +// val protocolVersion = nettyHttpVersion match { +// case HttpVersion.HTTP_1_0 => Version.Http_1_0 +// case HttpVersion.HTTP_1_1 => Version.Http_1_1 +// case _ => throw new IllegalArgumentException(s"Unsupported HTTP version: $nettyHttpVersion") +// } + +// val remoteAddress = ctx.channel().remoteAddress() match { +// case m: InetSocketAddress => Option(m.getAddress) +// case _ => None +// } + +// val headers = Conversions.headersFromNetty(nettyReq.headers()) +// val contentType = headers.header(Header.ContentType) + +// nettyReq match { +// case nettyReq: FullHttpRequest => +// // println(s"Got ready http request") +// Request( +// body = NettyBody.fromByteBuf( +// nettyReq.content(), +// contentType, +// ), +// headers = headers, +// method = Conversions.methodFromNetty(nettyReq.method()), +// url = URL.decode(nettyReq.uri()).getOrElse(URL.empty), +// version = protocolVersion, +// remoteAddress = remoteAddress, +// ) +// case nettyReq: HttpRequest => +// // println(s"Got streaming http request") +// val handler = addAsyncBodyHandler(ctx) +// val body = NettyBody.fromAsync( +// { async => +// handler.connect(async) +// }, +// contentType, +// ) + +// Request( +// body = body, +// headers = headers, +// method = Conversions.methodFromNetty(nettyReq.method()), +// url = URL.decode(nettyReq.uri()).getOrElse(URL.empty), +// version = protocolVersion, +// remoteAddress = remoteAddress, +// ) +// } + +// } + +// private def setServerTime(time: ServerTime, response: Response, jResponse: HttpResponse): Unit = { +// val _ = +// if (response.addServerTime) +// jResponse.headers().set(HttpHeaderNames.DATE, time.refreshAndGet()) +// } + +// /* +// * Checks if the response requires to switch protocol to websocket. Returns +// * true if it can, otherwise returns false +// */ +// @tailrec +// private def upgradeToWebSocket( +// ctx: ChannelHandlerContext, +// jReq: HttpRequest, +// res: Response, +// runtime: NettyRuntime, +// ): Unit = { +// val app = res.socketApp +// jReq match { +// case jReq: FullHttpRequest => +// val queue = runtime.runtime(ctx).unsafe.run(Queue.unbounded[WebSocketChannelEvent]).getOrThrowFiberFailure() +// runtime.runtime(ctx).unsafe.run { +// val nettyChannel = NettyChannel.make[JWebSocketFrame](ctx.channel()) +// val webSocketChannel = WebSocketChannel.make(nettyChannel, queue) +// val webSocketApp = app.getOrElse(Handler.unit) +// webSocketApp.runZIO(webSocketChannel).ignoreLogged.forkDaemon +// } +// ctx +// .channel() +// .pipeline() +// .addLast( +// new WebSocketServerProtocolHandler(NettySocketProtocol.serverBuilder(config.webSocketConfig).build()), +// ) +// .addLast(Names.WebSocketHandler, new WebSocketAppHandler(runtime, queue, None)) + +// val retained = jReq.retainedDuplicate() +// val _ = ctx.channel().eventLoop().submit { () => ctx.fireChannelRead(retained) } + +// case jReq: HttpRequest => +// val fullRequest = new DefaultFullHttpRequest(jReq.protocolVersion(), jReq.method(), jReq.uri()) +// fullRequest.headers().setAll(jReq.headers()) +// upgradeToWebSocket(ctx: ChannelHandlerContext, fullRequest, res, runtime) +// } +// } + +// private def writeNotFound(ctx: ChannelHandlerContext, jReq: HttpRequest)(ensured: () => Unit): Unit = { +// // TODO: this can be done without ZIO +// runtime.run(ctx, ensured) { +// for { +// response <- ZIO.succeed(HttpError.NotFound(jReq.uri()).toResponse) +// done <- ZIO.attempt(attemptFastWrite(ctx, response, time)) +// _ <- attemptFullWrite(ctx, response, jReq, time).unless(done) +// } yield () +// } +// } + +// private def writeResponse( +// ctx: ChannelHandlerContext, +// env: ZEnvironment[Any], +// exit: ZIO[Any, Response, Response], +// jReq: HttpRequest, +// )(ensured: () => Unit): Unit = { +// runtime.run(ctx, ensured) { +// val pgm = for { +// response <- exit.sandbox.catchAll { error => +// error.failureOrCause +// .fold[UIO[Response]]( +// response => ZIO.succeed(response), +// cause => +// if (cause.isInterruptedOnly) { +// interrupted(ctx).as(null) +// } else { +// ZIO.succeed(withDefaultErrorResponse(null, Some(FiberFailure(cause)))) +// }, +// ) +// } +// _ <- +// if (response ne null) { +// for { +// done <- ZIO.attempt(attemptFastWrite(ctx, response, time)).catchSomeCause { case cause => +// ZIO.attempt( +// attemptFastWrite(ctx, withDefaultErrorResponse(null, Some(cause.squash)).freeze, time), +// ) +// } +// _ <- attemptFullWrite(ctx, response, jReq, time).catchSomeCause { case cause => +// ZIO.attempt( +// attemptFastWrite(ctx, withDefaultErrorResponse(null, Some(cause.squash)).freeze, time), +// ) + +// } +// .unless(done) +// } yield () +// } else { +// ZIO.attempt( +// if (ctx.channel().isOpen) { +// writeNotFound(ctx, jReq)(() => ()) +// }, +// ) +// } +// } yield () + +// pgm.provideEnvironment(env) +// } +// } + +// private def interrupted(ctx: ChannelHandlerContext): ZIO[Any, Nothing, Unit] = +// ZIO.attempt { +// ctx.channel().close() +// }.unit.orDie + +// private def withDefaultErrorResponse(responseOrNull: Response, cause: Option[Throwable]): Response = +// if (responseOrNull ne null) responseOrNull else HttpError.InternalServerError(cause = cause).toResponse +// } + +// object ServerInboundHandler { + +// val live: ZLayer[ +// ServerTime with Server.Config with NettyRuntime with AppRef, +// Nothing, +// ServerInboundHandler, +// ] = { +// implicit val trace: Trace = Trace.empty +// ZLayer.fromZIO { +// for { +// appRef <- ZIO.service[AppRef] +// rtm <- ZIO.service[NettyRuntime] +// config <- ZIO.service[Server.Config] +// time <- ZIO.service[ServerTime] + +// } yield ServerInboundHandler(appRef, config, rtm, time) +// } +// } + +// } diff --git a/zio-http/src/test/scala/zio/http/HandlerSpec.scala b/zio-http/src/test/scala/zio/http/HandlerSpec.scala index 995bc9e976..cb262067c3 100644 --- a/zio-http/src/test/scala/zio/http/HandlerSpec.scala +++ b/zio-http/src/test/scala/zio/http/HandlerSpec.scala @@ -399,7 +399,7 @@ object HandlerSpec extends ZIOSpecDefault with ExitAssertion { val http = Handler.fromFileZIO(ZIO.succeed(new java.io.File("does-not-exist"))) for { - status <- http.merge.status.run() + status <- http.sandbox.merge.status.run() } yield assertTrue(status == Status.NotFound) }, test("must fail if given file is a directory") { @@ -410,10 +410,17 @@ object HandlerSpec extends ZIOSpecDefault with ExitAssertion { val http = Handler.fromFileZIO(ZIO.succeed(tempFile)) for { - status <- http.merge.status.run() + status <- http.sandbox.merge.status.run() } yield assertTrue(status == Status.BadRequest) } }, + test("resource regression") { + val handler = Handler.fromResource("TestFile.txt").sandbox + + for { + status <- handler.merge.status.run() + } yield assertTrue(status == Status.Ok) + }, ), ) @@ timeout(10 seconds) } diff --git a/zio-http/src/test/scala/zio/http/StaticFileServerSpec.scala b/zio-http/src/test/scala/zio/http/StaticFileServerSpec.scala index d11c336894..65842259e6 100644 --- a/zio-http/src/test/scala/zio/http/StaticFileServerSpec.scala +++ b/zio-http/src/test/scala/zio/http/StaticFileServerSpec.scala @@ -27,16 +27,20 @@ import zio.http.internal.{DynamicServer, HttpRunnableSpec, severTestLayer} object StaticFileServerSpec extends HttpRunnableSpec { - private val fileOk = Handler.fromResource("TestFile.txt").toHttpApp.deploy - private val fileNotFound = Handler.fromResource("Nothing").toHttpApp.deploy + private val fileOk = Handler.fromResource("TestFile.txt").sandbox.toHttpApp.deploy + private val fileNotFound = Handler.fromResource("Nothing").sandbox.toHttpApp.deploy private val testArchivePath = getClass.getResource("/TestArchive.jar").getPath private val resourceOk = - Handler.fromResourceWithURL(new java.net.URL(s"jar:file:$testArchivePath!/TestFile.txt")).toHttpApp.deploy + Handler.fromResourceWithURL(new java.net.URL(s"jar:file:$testArchivePath!/TestFile.txt")).sandbox.toHttpApp.deploy private val resourceNotFound = - Handler.fromResourceWithURL(new java.net.URL(s"jar:file:$testArchivePath!/NonExistent.txt")).toHttpApp.deploy + Handler + .fromResourceWithURL(new java.net.URL(s"jar:file:$testArchivePath!/NonExistent.txt")) + .sandbox + .toHttpApp + .deploy - override def spec = suite("StaticFileServer") { + override def spec = suite("StaticFileServerSpec") { ZIO.scoped(serve.as(List(staticSpec))) }.provideShared(DynamicServer.live, severTestLayer, Client.default, Scope.default) @@ timeout(5 seconds) @@ withLiveClock @@ -86,7 +90,7 @@ object StaticFileServerSpec extends HttpRunnableSpec { final class BadFile(name: String) extends File(name) { override def exists(): Boolean = throw new Error("Haha") } - val res = Handler.fromFile(new BadFile("Length Failure")).toHttpApp.deploy.run().map(_.status) + val res = Handler.fromFile(new BadFile("Length Failure")).sandbox.toHttpApp.deploy.run().map(_.status) assertZIO(res)(equalTo(Status.InternalServerError)) }, ), diff --git a/zio-http/src/test/scala/zio/http/internal/HttpRunnableSpec.scala b/zio-http/src/test/scala/zio/http/internal/HttpRunnableSpec.scala index 609d97eca3..c4640f854b 100644 --- a/zio-http/src/test/scala/zio/http/internal/HttpRunnableSpec.scala +++ b/zio-http/src/test/scala/zio/http/internal/HttpRunnableSpec.scala @@ -30,34 +30,6 @@ import zio.http._ * requests. */ abstract class HttpRunnableSpec extends ZIOSpecDefault { self => - - implicit class RunnableClientHttpSyntax[R, A](app: Handler[R, Response, Request, A]) { - - /** - * Runs the deployed Http app by making a real http request to it. The - * method allows us to configure individual constituents of a ClientRequest. - */ - def run( - version: Version = Version.Default, - method: Method = Method.ANY, - path: Path = Root, - headers: Headers = Headers.empty, - body: Body = Body.empty, - addZioUserAgentHeader: Boolean = false, - ): ZIO[R, Response, A] = - app - .runZIO( - Request( - body = body, - headers = headers.combineIf(addZioUserAgentHeader)(Headers(Client.defaultUAHeader)), - method = method, - url = URL(path), // url set here is overridden later via `deploy` method - version = version, - remoteAddress = None, - ), - ) - } - implicit class RunnableHttpClientAppSyntax[R](route: HttpApp[R]) { def app: HttpApp[R] = route