Skip to content
This repository has been archived by the owner on Aug 25, 2021. It is now read-only.

Commit

Permalink
Implement graceful shutdown (#30, #29)
Browse files Browse the repository at this point in the history
- impl GracefulShutdown service which listens to OS signals and shutdowns each component gracefully
- impl graceful shutdown for Room and HTTP server
- add [shutdown] config section

Additionally:
- provide explicit states for Room
  • Loading branch information
nWacky authored and alexlapa committed Jul 18, 2019
1 parent 0721706 commit 9fa4b5b
Show file tree
Hide file tree
Showing 11 changed files with 518 additions and 120 deletions.
153 changes: 95 additions & 58 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ slog-scope = "4.1"
slog-stdlog = "3.0"
smart-default = "0.5"
tokio = "0.1"
tokio-signal = "0.2"
toml = "0.4"
[dependencies.serde-humantime]
git = "https://github.com/tailhook/serde-humantime"
Expand Down
9 changes: 9 additions & 0 deletions config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,12 @@
#
# Default:
# level = "INFO"




[shutdown]
# Maximum duration given to shutdown the whole application gracefully.
#
# Default:
# timeout = "1s"
69 changes: 47 additions & 22 deletions src/api/client/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
use std::io;

use actix::{Actor, Addr, Handler, ResponseActFuture, WrapFuture as _};
use actix_web::{
dev::Server as ActixServer,
middleware,
web::{resource, Data, Path, Payload},
App, HttpRequest, HttpResponse, HttpServer,
Expand All @@ -24,6 +26,7 @@ use crate::{
},
conf::{Conf, Rpc},
log::prelude::*,
shutdown::ShutdownGracefully,
signalling::{RoomId, RoomsRepository},
};

Expand Down Expand Up @@ -86,42 +89,64 @@ pub struct Context {
pub config: Rpc,
}

/// Starts HTTP server for handling WebSocket connections of Client API.
pub fn run(rooms: RoomsRepository, config: Conf) -> io::Result<()> {
let server_addr = config.server.bind_addr();
/// HTTP server that handles WebSocket connections of Client API.
pub struct Server(ActixServer);

impl Server {
/// Starts Client API HTTP server.
pub fn run(rooms: RoomsRepository, config: Conf) -> io::Result<Addr<Self>> {
let server_addr = config.server.bind_addr();

let server = HttpServer::new(move || {
App::new()
.data(Context {
rooms: rooms.clone(),
config: config.rpc.clone(),
})
.wrap(middleware::Logger::default())
.service(
resource("/ws/{room_id}/{member_id}/{credentials}")
.route(actix_web::web::get().to_async(ws_index)),
)
})
.disable_signals()
.bind(server_addr)?
.start();

HttpServer::new(move || {
App::new()
.data(Context {
rooms: rooms.clone(),
config: config.rpc.clone(),
})
.wrap(middleware::Logger::default())
.service(
resource("/ws/{room_id}/{member_id}/{credentials}")
.route(actix_web::web::get().to_async(ws_index)),
)
})
.bind(server_addr)?
.start();
info!("Started Client API HTTP server on {}", server_addr);

Ok(Self(server).start())
}
}

info!("Started HTTP server on {}", server_addr);
impl Actor for Server {
type Context = actix::Context<Self>;
}

Ok(())
impl Handler<ShutdownGracefully> for Server {
type Result = ResponseActFuture<Self, (), ()>;

fn handle(
&mut self,
_: ShutdownGracefully,
_: &mut Self::Context,
) -> Self::Result {
info!("Shutting down Client API HTTP server");
Box::new(self.0.stop(true).into_actor(self))
}
}

#[cfg(test)]
mod test {
use std::{ops::Add, thread, time::Duration};

use actix::Actor as _;
use actix_http::{ws::Message, HttpService};
use actix_http_test::{TestServer, TestServerRuntime};
use futures::{future::IntoFuture as _, sink::Sink as _, Stream as _};

use crate::{
api::control::Member, media::create_peers, signalling::Room,
turn::new_turn_auth_service_mock,
api::control::Member, conf::Conf, media::create_peers,
signalling::Room, turn::new_turn_auth_service_mock,
};

use super::*;
Expand Down
17 changes: 17 additions & 0 deletions src/conf/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
pub mod log;
pub mod rpc;
pub mod server;
pub mod shutdown;
pub mod turn;

use std::env;
Expand All @@ -15,6 +16,7 @@ pub use self::{
log::Log,
rpc::Rpc,
server::Server,
shutdown::Shutdown,
turn::{Redis, Turn},
};

Expand All @@ -37,6 +39,8 @@ pub struct Conf {
pub turn: Turn,
/// Logging settings.
pub log: Log,
/// Application shutdown settings.
pub shutdown: Shutdown,
}

impl Conf {
Expand Down Expand Up @@ -262,4 +266,17 @@ mod tests {

assert_eq!(Conf::parse().unwrap().log.level(), None);
}

#[test]
#[serial]
fn shutdown_conf_test() {
let default_conf = Conf::default();

env::set_var("MEDEA_SHUTDOWN.TIMEOUT", "700ms");

let env_conf = Conf::parse().unwrap();

assert_ne!(default_conf.shutdown.timeout, env_conf.shutdown.timeout);
assert_eq!(env_conf.shutdown.timeout, Duration::from_millis(700));
}
}
16 changes: 16 additions & 0 deletions src/conf/shutdown.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
//! Application shutdown settings.
use std::time::Duration;

use serde::{Deserialize, Serialize};
use smart_default::SmartDefault;

/// Application shutdown settings.
#[derive(Clone, Debug, Deserialize, Serialize, SmartDefault)]
#[serde(default)]
pub struct Shutdown {
/// Maximum duration given to shutdown the whole application gracefully.
#[default(Duration::from_secs(5))]
#[serde(with = "serde_humantime")]
pub timeout: Duration,
}
47 changes: 41 additions & 6 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub mod api;
pub mod conf;
pub mod log;
pub mod media;
pub mod shutdown;
pub mod signalling;
pub mod turn;

Expand All @@ -17,9 +18,10 @@ use futures::IntoFuture as _;
use log::prelude::*;

use crate::{
api::{client::server, control::Member},
api::{client::server::Server, control::Member},
conf::Conf,
media::create_peers,
shutdown::GracefulShutdown,
signalling::{Room, RoomsRepository},
turn::new_turn_auth_service,
};
Expand Down Expand Up @@ -53,15 +55,48 @@ fn main() -> io::Result<()> {
turn_auth_service,
)
.start();

Ok((room, config))
})
.and_then(|(room, config)| {
let graceful_shutdown =
GracefulShutdown::new(config.shutdown.timeout).start();
graceful_shutdown
.send(shutdown::Subscribe(shutdown::Subscriber {
addr: room.clone().recipient(),
priority: shutdown::Priority(2),
}))
.map_err(|e| {
error!("Shutdown subscription failed for Room: {}", e)
})
.map(move |_| (room, graceful_shutdown, config))
})
.map(|(room, graceful_shutdown, config)| {
let rooms = hashmap! {1 => room};
let rooms_repo = RoomsRepository::new(rooms);

server::run(rooms_repo, config)
.map_err(|err| {
error!("Error starting application {:?}", err)
(rooms_repo, graceful_shutdown, config)
})
.and_then(|(rooms_repo, graceful_shutdown, config)| {
Server::run(rooms_repo, config)
.map_err(|e| {
error!("Error starting Client API HTTP server {:?}", e)
})
.map(|server| {
graceful_shutdown
.send(shutdown::Subscribe(shutdown::Subscriber {
addr: server.recipient(),
priority: shutdown::Priority(1),
}))
.map_err(|e| {
error!(
"Shutdown subscription failed for Client \
API HTTP server: {}",
e
)
})
.map(|_| ())
})
.into_future()
.flatten()
})
})
}
Loading

0 comments on commit 9fa4b5b

Please sign in to comment.