From 928db6199d91711b159d09a6a0c30ae6d8502e16 Mon Sep 17 00:00:00 2001 From: Arnout Engelen Date: Fri, 15 Jun 2018 12:07:07 +0200 Subject: [PATCH] Avoid eagerly failing connections when request closes (#2069) * Avoid eagerly failing connections when request closes * Test scalariform --- .../impl/engine/server/ServerTerminator.scala | 10 +++++++ .../akka/http/scaladsl/ClientServerSpec.scala | 28 +++++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/server/ServerTerminator.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/server/ServerTerminator.scala index 00a80750c9f..83da611d60c 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/server/ServerTerminator.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/server/ServerTerminator.scala @@ -228,6 +228,11 @@ private[http] final class GracefulTerminatorStage(settings: ServerSettings) pendingUserHandlerResponse = false push(toNet, response) } + + override def onUpstreamFinish(): Unit = { + // don't finish the whole bidi stage, just propagate the completion: + complete(toNet) + } }) setHandler(toUser, new OutHandler { override def onPull(): Unit = { @@ -241,6 +246,11 @@ private[http] final class GracefulTerminatorStage(settings: ServerSettings) pendingUserHandlerResponse = true push(toUser, request) } + + override def onUpstreamFinish(): Unit = { + // don't finish the whole bidi stage, just propagate the completion: + complete(toUser) + } }) setHandler(toNet, new OutHandler { override def onPull(): Unit = { diff --git a/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala b/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala index bdd8bbb6398..c65ac8da8fd 100644 --- a/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala @@ -514,6 +514,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll wit connSourceSub.cancel() } } + "complete a request/response when request has `Connection: close` set" in Utils.assertAllStagesStopped { // In akka/akka#19542 / akka/akka-http#459 it was observed that when an akka-http closes the connection after // a request, the TCP connection is sometimes aborted. Aborting means that `socket.close` is called with SO_LINGER = 0 @@ -557,6 +558,33 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll wit } finally server.foreach(_.unbind()) } + "complete a request/response when the request side immediately closes the connection after sending the request" in Utils.assertAllStagesStopped { + val (hostname, port) = ("localhost", 8080) + val responsePromise = Promise[HttpResponse]() + + // settings adapting network buffer sizes + val serverSettings = ServerSettings(system) + + val server = Http().bindAndHandleAsync(_ ⇒ responsePromise.future, hostname, port, settings = serverSettings) + + try { + val result = Source.single(ByteString( + """GET / HTTP/1.1 +Host: example.com + +""")) + .via(Tcp().outgoingConnection(hostname, port)) + .runWith(Sink.reduce[ByteString](_ ++ _)) + Try(Await.result(result, 2.seconds).utf8String) match { + case scala.util.Success(body) ⇒ fail(body) + case scala.util.Failure(_: TimeoutException) ⇒ // Expected + } + } finally { + responsePromise.failure(new TimeoutException()) + server.foreach(_.unbind()) + } + } + "complete a request/response over https when request has `Connection: close` set" in Utils.assertAllStagesStopped { // akka/akka-http#1219 val serverToClientNetworkBufferSize = 1000