Skip to content

Commit

Permalink
Avoid eagerly failing connections when request closes (#2069)
Browse files Browse the repository at this point in the history
* Avoid eagerly failing connections when request closes

* Test scalariform
  • Loading branch information
raboof authored and ktoso committed Jun 15, 2018
1 parent 37169eb commit 928db61
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,11 @@ private[http] final class GracefulTerminatorStage(settings: ServerSettings)
pendingUserHandlerResponse = false
push(toNet, response)
}

override def onUpstreamFinish(): Unit = {
// don't finish the whole bidi stage, just propagate the completion:
complete(toNet)
}
})
setHandler(toUser, new OutHandler {
override def onPull(): Unit = {
Expand All @@ -241,6 +246,11 @@ private[http] final class GracefulTerminatorStage(settings: ServerSettings)
pendingUserHandlerResponse = true
push(toUser, request)
}

override def onUpstreamFinish(): Unit = {
// don't finish the whole bidi stage, just propagate the completion:
complete(toUser)
}
})
setHandler(toNet, new OutHandler {
override def onPull(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll wit
connSourceSub.cancel()
}
}

"complete a request/response when request has `Connection: close` set" in Utils.assertAllStagesStopped {
// In akka/akka#19542 / akka/akka-http#459 it was observed that when an akka-http closes the connection after
// a request, the TCP connection is sometimes aborted. Aborting means that `socket.close` is called with SO_LINGER = 0
Expand Down Expand Up @@ -557,6 +558,33 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll wit
} finally server.foreach(_.unbind())
}

"complete a request/response when the request side immediately closes the connection after sending the request" in Utils.assertAllStagesStopped {
val (hostname, port) = ("localhost", 8080)
val responsePromise = Promise[HttpResponse]()

// settings adapting network buffer sizes
val serverSettings = ServerSettings(system)

val server = Http().bindAndHandleAsync(_ responsePromise.future, hostname, port, settings = serverSettings)

try {
val result = Source.single(ByteString(
"""GET / HTTP/1.1
Host: example.com
"""))
.via(Tcp().outgoingConnection(hostname, port))
.runWith(Sink.reduce[ByteString](_ ++ _))
Try(Await.result(result, 2.seconds).utf8String) match {
case scala.util.Success(body) fail(body)
case scala.util.Failure(_: TimeoutException) // Expected
}
} finally {
responsePromise.failure(new TimeoutException())
server.foreach(_.unbind())
}
}

"complete a request/response over https when request has `Connection: close` set" in Utils.assertAllStagesStopped {
// akka/akka-http#1219
val serverToClientNetworkBufferSize = 1000
Expand Down

0 comments on commit 928db61

Please sign in to comment.