Skip to content
This repository has been archived by the owner on Aug 25, 2021. It is now read-only.

Graceful shutdown #30

Merged
merged 113 commits into from
Jul 18, 2019
Merged
Show file tree
Hide file tree
Changes from 106 commits
Commits
Show all changes
113 commits
Select commit Hold shift + click to select a range
c6bff7f
fix server address
nWacky Jun 17, 2019
ac68159
add SIGINT handler to ws_connection
nWacky Jun 18, 2019
e9482c3
add exit signals handlers to Room
nWacky Jun 18, 2019
b1b46fe
fix warnings
nWacky Jun 18, 2019
cc932cb
fix tests
nWacky Jun 19, 2019
b5ceb0f
format
nWacky Jun 19, 2019
26fcfa1
minor refactor
alexlapa Jun 20, 2019
86ff2af
implement graceful_shutdown actor
nWacky Jun 21, 2019
a7141e0
GracefulShutdown prototype implementantion
nWacky Jun 21, 2019
b0d908f
GracefulShutdown use async actor context
nWacky Jun 24, 2019
6ae350c
implement then_all
nWacky Jun 24, 2019
68c3802
change actix server signals that they wait, encapsulate ServerWrapper
nWacky Jun 24, 2019
eac51b1
remove actix web graceful shutdown
nWacky Jun 24, 2019
5d89b3b
make then_all poll call recursive
nWacky Jun 25, 2019
74cdb77
fix warnings
nWacky Jun 25, 2019
08096fd
format
nWacky Jun 25, 2019
04eeb57
send message to subscribe to graceful shutdown
nWacky Jun 25, 2019
77c40eb
fix test warning
nWacky Jun 25, 2019
8b997a8
add arbiter, derive debug in room
nWacky Jun 26, 2019
07adfde
not use TimeoutShutdown
nWacky Jun 26, 2019
a356f7c
fix warnings
nWacky Jun 26, 2019
b64e99e
use standard tokio then
nWacky Jun 26, 2019
2585a77
fix server address
nWacky Jun 17, 2019
ce8a13e
add SIGINT handler to ws_connection
nWacky Jun 18, 2019
16fa33f
add exit signals handlers to Room
nWacky Jun 18, 2019
da86037
fix warnings
nWacky Jun 18, 2019
8a780e3
minor refactor
alexlapa Jun 20, 2019
598612a
implement graceful_shutdown actor
nWacky Jun 21, 2019
5911f37
GracefulShutdown prototype implementantion
nWacky Jun 21, 2019
3e7acaa
GracefulShutdown use async actor context
nWacky Jun 24, 2019
d5804de
implement then_all
nWacky Jun 24, 2019
7cd76d0
change actix server signals that they wait, encapsulate ServerWrapper
nWacky Jun 24, 2019
e45ee35
remove actix web graceful shutdown
nWacky Jun 24, 2019
7507733
make then_all poll call recursive
nWacky Jun 25, 2019
b9bbe70
fix warnings
nWacky Jun 25, 2019
2582dfa
format
nWacky Jun 25, 2019
ec75369
send message to subscribe to graceful shutdown
nWacky Jun 25, 2019
4ec3386
add arbiter, derive debug in room
nWacky Jun 26, 2019
9ed91a9
not use TimeoutShutdown
nWacky Jun 26, 2019
e1a3cb1
fix warnings
nWacky Jun 26, 2019
eaa5fce
use standard tokio then
nWacky Jun 26, 2019
4923bd9
rebase onto new master
nWacky Jun 26, 2019
b8fd13b
Merge remote-tracking branch 'origin/29-graceful-shutdown' into 29-gr…
nWacky Jun 26, 2019
7b5066e
finish rebase
nWacky Jun 27, 2019
a99a778
spawn arbiter on new thread
nWacky Jun 27, 2019
78d2338
format, fix warnings
nWacky Jun 27, 2019
761ae5c
use start_in_arbiter
nWacky Jun 27, 2019
6c091fb
change head
nWacky Jun 27, 2019
809c5b5
change head
nWacky Jun 27, 2019
e80d356
Merge branch '29-graceful-shutdown' of https://github.com/instrumenti…
nWacky Jun 27, 2019
127214f
fix error
nWacky Jun 28, 2019
cc6bbe5
fix logger panic when shutdown
nWacky Jul 1, 2019
95e6e82
fix logger panic when shutdown, use shutdown_timeout from config
nWacky Jul 1, 2019
48459b0
format, fix warnings
nWacky Jul 1, 2019
cf889a6
start implementing future-based graceful shutdown
nWacky Jul 1, 2019
6986de0
implement future based graceful shutdown
nWacky Jul 2, 2019
ff2ac57
fix warnings
nWacky Jul 2, 2019
5c06e70
format
nWacky Jul 2, 2019
b07c401
fix bugs
nWacky Jul 2, 2019
f8cd7f9
fix sigint, remove non-unix shutdown
nWacky Jul 3, 2019
3733b1f
add sigterm
nWacky Jul 3, 2019
5238542
start implementing futures in shutdown message
nWacky Jul 3, 2019
18c0a2c
fix
alexlapa Jul 3, 2019
9b6f9a1
use global logger struct, local lazy_static import
nWacky Jul 4, 2019
3beb989
subscribe server in server::run
nWacky Jul 4, 2019
6a4d32d
cleanup, exit process properly
nWacky Jul 4, 2019
397f09e
start fixing bug
nWacky Jul 4, 2019
b6bae60
fix bug
nWacky Jul 5, 2019
f5720b3
cleanup
nWacky Jul 5, 2019
237adb5
fix warnings
nWacky Jul 5, 2019
0fe61f7
start implementing shutdown actor
Jul 8, 2019
4240184
implement shutdown actor
nWacky Jul 8, 2019
4fca3b0
use actor context for main shutdown future, use tokio timeout
nWacky Jul 8, 2019
dc5efb8
implement unsubscribe
nWacky Jul 8, 2019
04ac8e7
add todos
nWacky Jul 8, 2019
6af4aa2
start implementing hashmap
nWacky Jul 8, 2019
61ace89
start constructing big future
nWacky Jul 8, 2019
2044548
use unit type as error
nWacky Jul 9, 2019
541ff71
replace hashmap with hashset
nWacky Jul 9, 2019
5156358
inject stream into actor context
nWacky Jul 9, 2019
cd6d5f0
add room state machine
nWacky Jul 9, 2019
25f5c33
return future from graceful shutdown
nWacky Jul 9, 2019
b474ea8
fix warnings
nWacky Jul 9, 2019
aaa5170
start implementing async CloseRoom{}
nWacky Jul 9, 2019
3560b5c
start implementing async drop_connections
nWacky Jul 9, 2019
5ce83c3
implement async drop_connections
nWacky Jul 10, 2019
67c26df
fix bug
nWacky Jul 10, 2019
bf9bf3f
fix warnings
nWacky Jul 10, 2019
0b9d86f
format imports, change room state
nWacky Jul 10, 2019
6f7190e
format imports
nWacky Jul 10, 2019
7eaf698
format imports
nWacky Jul 10, 2019
8161d35
fix
nWacky Jul 10, 2019
e913753
refactor shutdown and config
nWacky Jul 12, 2019
18e5e73
refactor messages subscribe/unsubscribe
nWacky Jul 12, 2019
60a9029
replace u8 with priority
nWacky Jul 12, 2019
ffc1f5b
cfg not unix is a warning
nWacky Jul 12, 2019
46a7278
cfg not unix is a warning
nWacky Jul 12, 2019
000f241
implement get_drop_fut
nWacky Jul 12, 2019
2863244
Merge remote-tracking branch 'origin/29-graceful-shutdown' into 29-gr…
nWacky Jul 12, 2019
959a4d4
add description
nWacky Jul 12, 2019
9a9ca13
start implementing better signals stream
nWacky Jul 12, 2019
bb3587e
move CloseRoom to a function in Room, fix warnings
nWacky Jul 15, 2019
ec2a4dc
start using async actor context
nWacky Jul 15, 2019
f29e47a
refactor
nWacky Jul 15, 2019
0af073f
format, lint
nWacky Jul 15, 2019
36fdcf7
format
nWacky Jul 15, 2019
8ada2fd
Merge branch 'master' into 29-graceful-shutdown
tyranron Jul 17, 2019
f060d62
Correct conf implementation
tyranron Jul 17, 2019
8bf9fec
Refactor shutdown service implementation
tyranron Jul 17, 2019
ffc0534
Refactor shutdown handling
tyranron Jul 17, 2019
bded4b5
fix docs
alexlapa Jul 17, 2019
e411747
Merge branch 'master' into 29-graceful-shutdown
alexlapa Jul 18, 2019
6c7cada
fmt && update locks
alexlapa Jul 18, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
718 changes: 390 additions & 328 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ lto = "thin"

[dependencies]
actix = "0.8"
actix-rt = "0.2.3"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you specify the patch version when all the other deps specify minor version only?

actix-web = "1.0"
actix-web-actors = "1.0"
bb8 = "0.3"
Expand Down Expand Up @@ -49,7 +50,9 @@ slog-scope = "4.1"
slog-stdlog = "3.0"
smart-default = "0.5"
tokio = "0.1"
tokio-signal = "0.2.7"
toml = "0.4"

[dependencies.serde-humantime]
git = "https://github.com/tailhook/serde-humantime"
branch = "serde_wrapper"
Expand Down
6 changes: 6 additions & 0 deletions config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,9 @@
#
# Default:
# connection_timeout = "5s"


[shutdown]
# Number of milliseconds given to shutdown gracefully
# Default:
# timeout = 5000
6 changes: 3 additions & 3 deletions src/api/client/rpc_connection.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! [`RpcConnection`] with related messages.

use core::fmt;
use std::fmt;

use actix::Message;
use futures::Future;
Expand All @@ -27,7 +27,7 @@ pub trait RpcConnection: fmt::Debug + Send {
/// Closes [`RpcConnection`].
/// No [`RpcConnectionClosed`] signals should be emitted.
/// Always returns success.
fn close(&mut self) -> Box<dyn Future<Item = (), Error = ()>>;
fn close(&mut self) -> Box<dyn Future<Item = (), Error = ()> + Send>;

/// Sends [`Event`] to remote [`Member`].
fn send_event(
Expand Down Expand Up @@ -208,7 +208,7 @@ pub mod test {
}

impl RpcConnection for Addr<TestConnection> {
fn close(&mut self) -> Box<dyn Future<Item = (), Error = ()>> {
fn close(&mut self) -> Box<dyn Future<Item = (), Error = ()> + Send> {
let fut = self.send(Close {}).map_err(|_| ());
Box::new(fut)
}
Expand Down
51 changes: 44 additions & 7 deletions src/api/client/server.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
//! HTTP server for handling WebSocket connections of Client API.

use actix::{Actor, Addr};
use actix_web::{
middleware,
web::{resource, Data, Path, Payload},
App, HttpRequest, HttpResponse, HttpServer,
middleware, web::{Data, Path, Payload, resource},
};
use actix_web_actors::ws;
use futures::{
Expand All @@ -25,6 +25,8 @@ use crate::{
signalling::{RoomId, RoomsRepository},
};

use self::actors::ServerWrapper;

/// Parameters of new WebSocket connection creation HTTP request.
#[derive(Debug, Deserialize)]
struct RequestParams {
Expand Down Expand Up @@ -85,10 +87,10 @@ pub struct Context {
}

/// Starts HTTP server for handling WebSocket connections of Client API.
pub fn run(rooms: RoomsRepository, config: Conf) {
pub fn run(rooms: RoomsRepository, config: Conf) -> Addr<ServerWrapper> {
let server_addr = config.server.bind_addr();

HttpServer::new(move || {
let actix_server = HttpServer::new(move || {
App::new()
.data(Context {
rooms: rooms.clone(),
Expand All @@ -100,19 +102,52 @@ pub fn run(rooms: RoomsRepository, config: Conf) {
.route(actix_web::web::get().to_async(ws_index)),
)
})
.disable_signals()
.bind(server_addr)
.unwrap()
.start();

info!("Started HTTP server on 0.0.0.0:8080");
info!("Started HTTP server on {:?}", server_addr);

actors::ServerWrapper(actix_server).start()
}

pub mod actors {
use actix::{Actor, ActorFuture, Context, Handler};
use actix::fut::wrap_future;
use actix_web::dev::Server;
use tokio::prelude::Future;

use crate::{log::prelude::*, shutdown::ShutdownMessage};

pub struct ServerWrapper(pub Server);

impl Actor for ServerWrapper {
type Context = Context<Self>;
}

impl Handler<ShutdownMessage> for ServerWrapper {
type Result = Box<dyn ActorFuture<Actor = Self, Item = (), Error = ()>>;

fn handle(
&mut self,
_: ShutdownMessage,
_: &mut Self::Context,
) -> Self::Result {
info!("Shutting down Actix Web Server");

Box::new(wrap_future(
self.0.stop(true).then(move |_| futures::future::ok(())),
))
}
}
}

#[cfg(test)]
mod test {
use std::{ops::Add, thread, time::Duration};

use actix::Actor as _;
use actix_http::{ws::Message, HttpService};
use actix_http::{HttpService, ws::Message};
use actix_http_test::{TestServer, TestServerRuntime};
use futures::{future::IntoFuture as _, sink::Sink as _, Stream as _};

Expand Down Expand Up @@ -171,13 +206,15 @@ mod test {

#[test]
fn ping_pong_and_disconnects_on_idle() {
let config = Conf::parse().unwrap();
let conf = Conf {
rpc: Rpc {
idle_timeout: Duration::new(2, 0),
reconnect_timeout: Default::default(),
},
turn: Turn::default(),
server: Server::default(),
system_config: config.system_config,
};

let mut server = ws_server(conf.clone());
Expand Down
2 changes: 1 addition & 1 deletion src/api/client/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ impl RpcConnection for Addr<WsSession> {
/// Closes [`WsSession`] by sending itself "normal closure" close message.
///
/// Never returns error.
fn close(&mut self) -> Box<dyn Future<Item = (), Error = ()>> {
fn close(&mut self) -> Box<dyn Future<Item = (), Error = ()> + Send> {
let fut = self
.send(Close {
reason: Some(ws::CloseCode::Normal.into()),
Expand Down
4 changes: 4 additions & 0 deletions src/conf/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

pub mod rpc;
pub mod server;
pub mod shutdown_config;
pub mod turn;

use std::env;
Expand All @@ -13,6 +14,7 @@ use serde::{Deserialize, Serialize};
pub use self::{
rpc::Rpc,
server::Server,
shutdown_config::ShutdownConfiguration,
turn::{Redis, Turn},
};

Expand All @@ -33,6 +35,8 @@ pub struct Conf {
pub server: Server,
/// TURN server settings.
pub turn: Turn,
// More settings
pub system_config: ShutdownConfiguration,
}

impl Conf {
Expand Down
11 changes: 11 additions & 0 deletions src/conf/shutdown_config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
//! More system settings

use serde::{Deserialize, Serialize};
use smart_default::*;

#[derive(Clone, Debug, Deserialize, Serialize, SmartDefault)]
#[serde(default)]
pub struct ShutdownConfiguration {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like was said earlier: just Shutdown is enough here. No sense to specify Configuration term as you are in conf module already. You had the similar declarations nearby, don't the difference confuse you?

#[default(5000)]
pub timeout: u64,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Documentation is mandatory.

}
39 changes: 26 additions & 13 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,7 @@
//! Medea media server application.

#[macro_use]
pub mod utils;
pub mod api;
pub mod conf;
pub mod log;
pub mod media;
pub mod signalling;
pub mod turn;

use actix::prelude::*;

use dotenv::dotenv;
use log::prelude::*;

Expand All @@ -21,24 +13,36 @@ use crate::{
turn::new_turn_auth_service,
};

#[macro_use]
pub mod utils;
pub mod api;
pub mod conf;
pub mod log;
pub mod media;
pub mod shutdown;
pub mod signalling;
pub mod turn;

fn main() {
dotenv().ok();
let logger = log::new_dual_logger(std::io::stdout(), std::io::stderr());
let _scope_guard = slog_scope::set_global_logger(logger);
slog_stdlog::init().unwrap();

let sys = System::new("medea");

let config = Conf::parse().unwrap();

info!("{:?}", config);

let sys = System::new("medea");

let members = hashmap! {
1 => Member::new(1, "caller_credentials".to_owned()),
2 => Member::new(2, "responder_credentials".to_owned()),
};

let peers = create_peers(1, 2);

let graceful_shutdown_addr = shutdown::create(config.system_config.timeout);

let turn_auth_service =
new_turn_auth_service(&config).expect("Unable to start turn service");

Expand All @@ -47,10 +51,19 @@ fn main() {
let room = Room::start_in_arbiter(&Arbiter::new(), move |_| {
Room::new(1, members, peers, rpc_reconnect_timeout, turn_auth_service)
});
graceful_shutdown_addr.do_send(shutdown::Subscribe(shutdown::Subscriber {
addr: room.clone().recipient(),
priority: shutdown::Priority(1),
}));

let rooms = hashmap! {1 => room};
let rooms_repo = RoomsRepository::new(rooms);

server::run(rooms_repo, config);
let server_addr = server::run(rooms_repo, config);
graceful_shutdown_addr.do_send(shutdown::Subscribe(shutdown::Subscriber {
addr: server_addr.recipient(),
priority: shutdown::Priority(5),
}));

let _ = sys.run();
}
Loading