Skip to content

Commit

Permalink
+doc docs for graceful termination (#2072)
Browse files Browse the repository at this point in the history
* +doc docs for graceful termination

* Tweak some wording and links
  • Loading branch information
ktoso authored Jun 15, 2018
1 parent 04ece9f commit 37169eb
Show file tree
Hide file tree
Showing 6 changed files with 165 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class ServerBinding private[http] (delegate: akka.http.scaladsl.Http.ServerBindi
* which could trap the server in a situation where it could not terminate if it were to wait for a response to "finish")
* - existing streaming responses must complete before the deadline as well.
* When the deadline is reached the connection will be terminated regardless of status of the streaming responses.
* - if user code does not reply with a response within the deadline, we produce a special [[akka.http.javadsl.settings.ServerSettings.getTerminationDeadlineExceededResponse]]
* - if user code does not reply with a response within the deadline we produce a special [[akka.http.javadsl.settings.ServerSettings.getTerminationDeadlineExceededResponse]]
* HTTP response (e.g. 503 Service Unavailable)
*
* 3) Keep draining incoming requests on existing connection:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -966,7 +966,7 @@ object Http extends ExtensionId[HttpExt] with ExtensionIdProvider {
* which could trap the server in a situation where it could not terminate if it were to wait for a response to "finish")
* - existing streaming responses must complete before the deadline as well.
* When the deadline is reached the connection will be terminated regardless of status of the streaming responses.
* - if user code does not reply with a response within the deadline, we produce a special [[ServerSettings.terminationDeadlineExceededResponse]]
* - if user code does not reply with a response within the deadline we produce a special [[ServerSettings.terminationDeadlineExceededResponse]]
* HTTP response (e.g. 503 Service Unavailable)
*
* 3) Keep draining incoming requests on existing connection:
Expand Down
76 changes: 76 additions & 0 deletions docs/src/main/paradox/server-side/graceful-termination.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Graceful termination

## Graceful termination using `ServerTerminator`

Akka HTTP provides two APIs to "stop" the server, either of them are available via the
@java[@javadoc[ServerBinding](akka.http.javadsl.ServerBinding)]
@scala[@scaladoc[ServerBinding](akka.http.scaladsl.Http$$ServerBinding)]
obtained from starting the server (by using any of the `bind...` methods on the
@java[@javadoc[Http](akka.http.javadsl.Http)]@scala[@scaladoc[Http](akka.http.scaladsl.HttpExt)] extension).

The first method, called `unbind()` causes the server to *stop accepting new connections*, however any existing
connections that are still being used will remain active until the client chooses to close them.
It only unbinds the port on which the http server has been listening. This allows HTTP server to finish streaming any
responses that might be still in flight and eventually terminate the entire system. If your application uses long-lived
connections, this does mean that these can delay the termination of your system indefinitely.

A better and more graceful solution to terminate an Akka HTTP server is to use the
@java[@javadoc[ServerBinding.terminate(Duration)](akka.http.javadsl.ServerBinding#terminate-java.time.Duration-)]
@scala[@scaladoc[ServerBinding.terminate(FiniteDuration)](akka.http.scaladsl.Http$$ServerBinding#terminate%28FiniteDuration%29:Future[HttpTerminated])]
method, which not only performs the unbinding, but also
handles replying to new incoming requests with (configurable) "terminating" HTTP responses.
It also allows setting a deadline after which any connections that are still alive will be shut down forcefully.
More precisely, termination works by following these steps:

First, the server port is unbound and no new connections will be accepted (same as invoking `unbind()`).
Immediately the
@java[@javadoc[ServerBinding#whenTerminationSignalIssued](akka.http.javadsl.ServerBinding#whenTerminationSignalIssued--) `CompletionStage`]
@scala[@scaladoc[ServerBinding#whenTerminationSignalIssued](akka.http.scaladsl.Http$$ServerBinding#whenTerminationSignalIssued:Future[Deadline]) `Future`]
is completed.
This can be used to signal parts of the application that the HTTP server is shutting down and they should clean up as well.
Note also that for more advanced shut down scenarios you may want to use the @extref[Coordinated Shutdown](akka-docs:/actors.html#coordinated-shutdown) capabilities of Akka.

Next, all in flight requests will be handled. If a request is "in-flight" (being handled by user code), it is given `hardDeadline` time to complete.

- if user code emits a response within the timeout, then this response is sent to the client
- if it is a streaming response, it is also mandated that it shall complete within the deadline, and if it does not
the connection will be terminated regardless of status of the streaming response. This is because such response could be infinite,
which could trap the server in a situation where it could not terminate if it were to wait for a response to "finish".
- existing streaming responses must complete before the deadline as well.
When the deadline is reached the connection will be terminated regardless of status of the streaming responses.
- if user code does not reply with a response within the deadline we produce a special @java[`akka.http.javadsl.settings.ServerSettings.getTerminationDeadlineExceededResponse`]@scala[`akka.http.scaladsl.settings.ServerSettings.terminationDeadlineExceededResponse`]
HTTP response (e.g. `503 Service Unavailable`)

During that time incoming requests are continue to be served. The existing connections will remain alive for until the
`hardDeadline` is exceeded, yet no new requests will be delivered to the user handler. All such drained responses will be replied to with an termination response (as explained in step 2).

Finally, all remaining alive connections are forcefully terminated once the `hardDeadline` is exceeded.
The `whenTerminated` (exposed by `ServerBinding`) @java[CompletionStage]@scala[future] is completed as well, so the
graceful termination (of the `ActorSystem` or entire JVM itself can be safely performed, as by then it is known that no
connections remain alive to this server).

Note that the termination response is configurable in `ServerSettings`, and by default is an `503 Service Unavailable`,
with an empty response entity.

Starting a graceful termination is as simple as invoking the terminate() method on the server binding:

Scala
: @@snip [HttpServerExampleSpec.scala]($test$/scala/docs/http/scaladsl/HttpServerExampleSpec.scala) { #graceful-termination }

Java
: @@snip [HttpServerExampleDocTest.java]($test$/java/docs/http/javadsl/server/HttpServerExampleDocTest.java) { #graceful-termination }

## Akka Coordinated Shutdown

@@@ note

NOT IMPLEMENTED YET.

Coordinated shutdown support is not yet implemented in Akka HTTP;
The goal is for it to invoke the graceful termination process as described above automatically when shutdown is requested.
See the issue [#1210](https://github.com/akka/akka-http/issues/1210) for more details.

@@@

Coordinated shutdown is Akka's managed way of shutting down multiple modules / sub-systems (persistence, cluster, http etc)
in a predictable and ordered fashion. For example, in a typical Akka application you will want to stop accepting new HTTP connections, and then shut down the cluster etc.
1 change: 1 addition & 0 deletions docs/src/main/paradox/server-side/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ from a background with non-"streaming first" HTTP Servers.
* [routing-dsl/index](../routing-dsl/index.md)
* [websocket-support](websocket-support.md)
* [server-https-support](server-https-support.md)
* [graceful-termination](graceful-termination.md)
* [http2](http2.md)

@@@
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@
import akka.Done;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.http.javadsl.ConnectHttp;
import akka.http.javadsl.Http;
import akka.http.javadsl.IncomingConnection;
import akka.http.javadsl.ServerBinding;
import akka.actor.CoordinatedShutdown;
import akka.http.javadsl.*;
import akka.http.javadsl.marshallers.jackson.Jackson;
import akka.http.javadsl.model.*;
import akka.http.javadsl.model.headers.Connection;
import akka.http.javadsl.server.AllDirectives;
import akka.http.javadsl.server.Directives;
import akka.http.javadsl.server.Route;
import akka.http.javadsl.unmarshalling.Unmarshaller;
import akka.japi.function.Function;
Expand All @@ -30,6 +30,7 @@
import java.io.BufferedReader;
import java.io.File;
import java.io.InputStreamReader;
import java.time.Duration;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -216,24 +217,24 @@ public HttpResponse apply(HttpRequest request) throws Exception {
public static void main(String[] args) throws Exception {
fullServerExample();
}


static class ConsumeEntityUsingEntityDirective {
//#consume-entity-directive
class Bid {
final String userId;
final int bid;

Bid(String userId, int bid) {
this.userId = userId;
this.bid = bid;
this.bid = bid;
}
}

final ActorSystem system = ActorSystem.create();
final ExecutionContextExecutor dispatcher = system.dispatcher();
final ActorMaterializer materializer = ActorMaterializer.create(system);

final Unmarshaller<HttpEntity, Bid> asBid = Jackson.unmarshaller(Bid.class);

final Route s = path("bid", () ->
Expand All @@ -246,7 +247,7 @@ class Bid {
);
//#consume-entity-directive
}

void consumeEntityUsingRawDataBytes() {
//#consume-raw-dataBytes
final ActorSystem system = ActorSystem.create();
Expand All @@ -270,7 +271,7 @@ void consumeEntityUsingRawDataBytes() {

//#consume-raw-dataBytes
}

void discardEntityUsingRawBytes() {
//#discard-discardEntityBytes
final ActorSystem system = ActorSystem.create();
Expand All @@ -293,7 +294,7 @@ void discardEntityUsingRawBytes() {
);
//#discard-discardEntityBytes
}

void discardEntityManuallyCloseConnections() {
//#discard-close-connections
final ActorSystem system = ActorSystem.create();
Expand All @@ -312,7 +313,7 @@ void discardEntityManuallyCloseConnections() {
// Closing connections, method 2 (graceful):
// consider draining connection and replying with `Connection: Close` header
// if you want the client to close after this request/reply cycle instead:
return respondWithHeader(Connection.create("close"), () ->
return respondWithHeader(Connection.create("close"), () ->
complete(StatusCodes.FORBIDDEN, "Not allowed!")
);
})
Expand All @@ -321,6 +322,35 @@ void discardEntityManuallyCloseConnections() {
);
//#discard-close-connections
}



public static void gracefulTerminationExample() throws Exception {
//#graceful-termination
ActorSystem system = ActorSystem.create();
Materializer materializer = ActorMaterializer.create(system);


CompletionStage<ServerBinding> binding = Http.get(system).bindAndHandle(
Directives.complete("Hello world!").flow(system, materializer),
ConnectHttp.toHost("localhost", 8080), materializer);

ServerBinding serverBinding = binding.toCompletableFuture().get(3, TimeUnit.SECONDS);

// ...
// once ready to terminate the server, invoke terminate:
CompletionStage<HttpTerminated> onceAllConnectionsTerminated =
serverBinding.terminate(Duration.ofSeconds(3));

// once all connections are terminated,
HttpTerminated httpTerminated = onceAllConnectionsTerminated.toCompletableFuture().get();

// - you can invoke coordinated shutdown to tear down the rest of the system:
CoordinatedShutdown.get(system).run();

// - or terminate the system explicitly:
// system.terminate();

//#graceful-termination
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@

package docs.http.scaladsl

import akka.actor.CoordinatedShutdown
import akka.actor.CoordinatedShutdown.UnknownReason
import akka.event.LoggingAdapter
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.StatusCodes
import akka.testkit.TestActors
import docs.CompileOnlySpec
import org.scalatest.{ Matchers, WordSpec }

import scala.language.postfixOps
import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.{ Await, ExecutionContext, Future }

class HttpServerExampleSpec extends WordSpec with Matchers
with CompileOnlySpec {
Expand Down Expand Up @@ -745,4 +748,42 @@ class HttpServerExampleSpec extends WordSpec with Matchers
val route = fixedRoute ~ dynamicRoute
//#dynamic-routing-example
}

"graceful termination" in compileOnlySpec {
//#graceful-termination
import akka.actor.ActorSystem
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.ActorMaterializer
import scala.concurrent.duration._

implicit val system = ActorSystem()
implicit val dispatcher = system.dispatcher
implicit val materializer = ActorMaterializer()

val routes = get {
complete("Hello world!")
}

val binding: Future[Http.ServerBinding] =
Http().bindAndHandle(routes, "127.0.0.1", 8080)

// ...
// once ready to terminate the server, invoke terminate:
val onceAllConnectionsTerminated: Future[Http.HttpTerminated] =
Await.result(binding, 10.seconds)
.terminate(hardDeadline = 3.seconds)

// once all connections are terminated,
// - you can invoke coordinated shutdown to tear down the rest of the system:
onceAllConnectionsTerminated.flatMap { _
CoordinatedShutdown(system).run(UnknownReason)
}
// - or terminate the system explicitly:
// onceAllConnectionsTerminated.flatMap { _ ⇒
// system.terminate()
// }

//#graceful-termination
}
}

0 comments on commit 37169eb

Please sign in to comment.