Skip to content
This repository has been archived by the owner on Aug 25, 2021. It is now read-only.

Graceful shutdown #30

Merged
merged 113 commits into from
Jul 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
113 commits
Select commit Hold shift + click to select a range
c6bff7f
fix server address
nWacky Jun 17, 2019
ac68159
add SIGINT handler to ws_connection
nWacky Jun 18, 2019
e9482c3
add exit signals handlers to Room
nWacky Jun 18, 2019
b1b46fe
fix warnings
nWacky Jun 18, 2019
cc932cb
fix tests
nWacky Jun 19, 2019
b5ceb0f
format
nWacky Jun 19, 2019
26fcfa1
minor refactor
alexlapa Jun 20, 2019
86ff2af
implement graceful_shutdown actor
nWacky Jun 21, 2019
a7141e0
GracefulShutdown prototype implementantion
nWacky Jun 21, 2019
b0d908f
GracefulShutdown use async actor context
nWacky Jun 24, 2019
6ae350c
implement then_all
nWacky Jun 24, 2019
68c3802
change actix server signals that they wait, encapsulate ServerWrapper
nWacky Jun 24, 2019
eac51b1
remove actix web graceful shutdown
nWacky Jun 24, 2019
5d89b3b
make then_all poll call recursive
nWacky Jun 25, 2019
74cdb77
fix warnings
nWacky Jun 25, 2019
08096fd
format
nWacky Jun 25, 2019
04eeb57
send message to subscribe to graceful shutdown
nWacky Jun 25, 2019
77c40eb
fix test warning
nWacky Jun 25, 2019
8b997a8
add arbiter, derive debug in room
nWacky Jun 26, 2019
07adfde
not use TimeoutShutdown
nWacky Jun 26, 2019
a356f7c
fix warnings
nWacky Jun 26, 2019
b64e99e
use standard tokio then
nWacky Jun 26, 2019
2585a77
fix server address
nWacky Jun 17, 2019
ce8a13e
add SIGINT handler to ws_connection
nWacky Jun 18, 2019
16fa33f
add exit signals handlers to Room
nWacky Jun 18, 2019
da86037
fix warnings
nWacky Jun 18, 2019
8a780e3
minor refactor
alexlapa Jun 20, 2019
598612a
implement graceful_shutdown actor
nWacky Jun 21, 2019
5911f37
GracefulShutdown prototype implementantion
nWacky Jun 21, 2019
3e7acaa
GracefulShutdown use async actor context
nWacky Jun 24, 2019
d5804de
implement then_all
nWacky Jun 24, 2019
7cd76d0
change actix server signals that they wait, encapsulate ServerWrapper
nWacky Jun 24, 2019
e45ee35
remove actix web graceful shutdown
nWacky Jun 24, 2019
7507733
make then_all poll call recursive
nWacky Jun 25, 2019
b9bbe70
fix warnings
nWacky Jun 25, 2019
2582dfa
format
nWacky Jun 25, 2019
ec75369
send message to subscribe to graceful shutdown
nWacky Jun 25, 2019
4ec3386
add arbiter, derive debug in room
nWacky Jun 26, 2019
9ed91a9
not use TimeoutShutdown
nWacky Jun 26, 2019
e1a3cb1
fix warnings
nWacky Jun 26, 2019
eaa5fce
use standard tokio then
nWacky Jun 26, 2019
4923bd9
rebase onto new master
nWacky Jun 26, 2019
b8fd13b
Merge remote-tracking branch 'origin/29-graceful-shutdown' into 29-gr…
nWacky Jun 26, 2019
7b5066e
finish rebase
nWacky Jun 27, 2019
a99a778
spawn arbiter on new thread
nWacky Jun 27, 2019
78d2338
format, fix warnings
nWacky Jun 27, 2019
761ae5c
use start_in_arbiter
nWacky Jun 27, 2019
6c091fb
change head
nWacky Jun 27, 2019
809c5b5
change head
nWacky Jun 27, 2019
e80d356
Merge branch '29-graceful-shutdown' of https://github.com/instrumenti…
nWacky Jun 27, 2019
127214f
fix error
nWacky Jun 28, 2019
cc6bbe5
fix logger panic when shutdown
nWacky Jul 1, 2019
95e6e82
fix logger panic when shutdown, use shutdown_timeout from config
nWacky Jul 1, 2019
48459b0
format, fix warnings
nWacky Jul 1, 2019
cf889a6
start implementing future-based graceful shutdown
nWacky Jul 1, 2019
6986de0
implement future based graceful shutdown
nWacky Jul 2, 2019
ff2ac57
fix warnings
nWacky Jul 2, 2019
5c06e70
format
nWacky Jul 2, 2019
b07c401
fix bugs
nWacky Jul 2, 2019
f8cd7f9
fix sigint, remove non-unix shutdown
nWacky Jul 3, 2019
3733b1f
add sigterm
nWacky Jul 3, 2019
5238542
start implementing futures in shutdown message
nWacky Jul 3, 2019
18c0a2c
fix
alexlapa Jul 3, 2019
9b6f9a1
use global logger struct, local lazy_static import
nWacky Jul 4, 2019
3beb989
subscribe server in server::run
nWacky Jul 4, 2019
6a4d32d
cleanup, exit process properly
nWacky Jul 4, 2019
397f09e
start fixing bug
nWacky Jul 4, 2019
b6bae60
fix bug
nWacky Jul 5, 2019
f5720b3
cleanup
nWacky Jul 5, 2019
237adb5
fix warnings
nWacky Jul 5, 2019
0fe61f7
start implementing shutdown actor
Jul 8, 2019
4240184
implement shutdown actor
nWacky Jul 8, 2019
4fca3b0
use actor context for main shutdown future, use tokio timeout
nWacky Jul 8, 2019
dc5efb8
implement unsubscribe
nWacky Jul 8, 2019
04ac8e7
add todos
nWacky Jul 8, 2019
6af4aa2
start implementing hashmap
nWacky Jul 8, 2019
61ace89
start constructing big future
nWacky Jul 8, 2019
2044548
use unit type as error
nWacky Jul 9, 2019
541ff71
replace hashmap with hashset
nWacky Jul 9, 2019
5156358
inject stream into actor context
nWacky Jul 9, 2019
cd6d5f0
add room state machine
nWacky Jul 9, 2019
25f5c33
return future from graceful shutdown
nWacky Jul 9, 2019
b474ea8
fix warnings
nWacky Jul 9, 2019
aaa5170
start implementing async CloseRoom{}
nWacky Jul 9, 2019
3560b5c
start implementing async drop_connections
nWacky Jul 9, 2019
5ce83c3
implement async drop_connections
nWacky Jul 10, 2019
67c26df
fix bug
nWacky Jul 10, 2019
bf9bf3f
fix warnings
nWacky Jul 10, 2019
0b9d86f
format imports, change room state
nWacky Jul 10, 2019
6f7190e
format imports
nWacky Jul 10, 2019
7eaf698
format imports
nWacky Jul 10, 2019
8161d35
fix
nWacky Jul 10, 2019
e913753
refactor shutdown and config
nWacky Jul 12, 2019
18e5e73
refactor messages subscribe/unsubscribe
nWacky Jul 12, 2019
60a9029
replace u8 with priority
nWacky Jul 12, 2019
ffc1f5b
cfg not unix is a warning
nWacky Jul 12, 2019
46a7278
cfg not unix is a warning
nWacky Jul 12, 2019
000f241
implement get_drop_fut
nWacky Jul 12, 2019
2863244
Merge remote-tracking branch 'origin/29-graceful-shutdown' into 29-gr…
nWacky Jul 12, 2019
959a4d4
add description
nWacky Jul 12, 2019
9a9ca13
start implementing better signals stream
nWacky Jul 12, 2019
bb3587e
move CloseRoom to a function in Room, fix warnings
nWacky Jul 15, 2019
ec2a4dc
start using async actor context
nWacky Jul 15, 2019
f29e47a
refactor
nWacky Jul 15, 2019
0af073f
format, lint
nWacky Jul 15, 2019
36fdcf7
format
nWacky Jul 15, 2019
8ada2fd
Merge branch 'master' into 29-graceful-shutdown
tyranron Jul 17, 2019
f060d62
Correct conf implementation
tyranron Jul 17, 2019
8bf9fec
Refactor shutdown service implementation
tyranron Jul 17, 2019
ffc0534
Refactor shutdown handling
tyranron Jul 17, 2019
bded4b5
fix docs
alexlapa Jul 17, 2019
e411747
Merge branch 'master' into 29-graceful-shutdown
alexlapa Jul 18, 2019
6c7cada
fmt && update locks
alexlapa Jul 18, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 95 additions & 58 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ slog-scope = "4.1"
slog-stdlog = "3.0"
smart-default = "0.5"
tokio = "0.1"
tokio-signal = "0.2"
toml = "0.4"
[dependencies.serde-humantime]
git = "https://github.com/tailhook/serde-humantime"
Expand Down
9 changes: 9 additions & 0 deletions config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,12 @@
#
# Default:
# level = "INFO"




[shutdown]
# Maximum duration given to shutdown the whole application gracefully.
#
# Default:
# timeout = "1s"
69 changes: 47 additions & 22 deletions src/api/client/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

use std::io;

use actix::{Actor, Addr, Handler, ResponseActFuture, WrapFuture as _};
use actix_web::{
dev::Server as ActixServer,
middleware,
web::{resource, Data, Path, Payload},
App, HttpRequest, HttpResponse, HttpServer,
Expand All @@ -24,6 +26,7 @@ use crate::{
},
conf::{Conf, Rpc},
log::prelude::*,
shutdown::ShutdownGracefully,
signalling::{RoomId, RoomsRepository},
};

Expand Down Expand Up @@ -86,42 +89,64 @@ pub struct Context {
pub config: Rpc,
}

/// Starts HTTP server for handling WebSocket connections of Client API.
pub fn run(rooms: RoomsRepository, config: Conf) -> io::Result<()> {
let server_addr = config.server.bind_addr();
/// HTTP server that handles WebSocket connections of Client API.
pub struct Server(ActixServer);

impl Server {
/// Starts Client API HTTP server.
pub fn run(rooms: RoomsRepository, config: Conf) -> io::Result<Addr<Self>> {
let server_addr = config.server.bind_addr();

let server = HttpServer::new(move || {
App::new()
.data(Context {
rooms: rooms.clone(),
config: config.rpc.clone(),
})
.wrap(middleware::Logger::default())
.service(
resource("/ws/{room_id}/{member_id}/{credentials}")
.route(actix_web::web::get().to_async(ws_index)),
)
})
.disable_signals()
.bind(server_addr)?
.start();

HttpServer::new(move || {
App::new()
.data(Context {
rooms: rooms.clone(),
config: config.rpc.clone(),
})
.wrap(middleware::Logger::default())
.service(
resource("/ws/{room_id}/{member_id}/{credentials}")
.route(actix_web::web::get().to_async(ws_index)),
)
})
.bind(server_addr)?
.start();
info!("Started Client API HTTP server on {}", server_addr);

Ok(Self(server).start())
}
}

info!("Started HTTP server on {}", server_addr);
impl Actor for Server {
type Context = actix::Context<Self>;
}

Ok(())
impl Handler<ShutdownGracefully> for Server {
type Result = ResponseActFuture<Self, (), ()>;

fn handle(
&mut self,
_: ShutdownGracefully,
_: &mut Self::Context,
) -> Self::Result {
info!("Shutting down Client API HTTP server");
Box::new(self.0.stop(true).into_actor(self))
}
}

#[cfg(test)]
mod test {
use std::{ops::Add, thread, time::Duration};

use actix::Actor as _;
use actix_http::{ws::Message, HttpService};
use actix_http_test::{TestServer, TestServerRuntime};
use futures::{future::IntoFuture as _, sink::Sink as _, Stream as _};

use crate::{
api::control::Member, media::create_peers, signalling::Room,
turn::new_turn_auth_service_mock,
api::control::Member, conf::Conf, media::create_peers,
signalling::Room, turn::new_turn_auth_service_mock,
};

use super::*;
Expand Down
17 changes: 17 additions & 0 deletions src/conf/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
pub mod log;
pub mod rpc;
pub mod server;
pub mod shutdown;
pub mod turn;

use std::env;
Expand All @@ -15,6 +16,7 @@ pub use self::{
log::Log,
rpc::Rpc,
server::Server,
shutdown::Shutdown,
turn::{Redis, Turn},
};

Expand All @@ -37,6 +39,8 @@ pub struct Conf {
pub turn: Turn,
/// Logging settings.
pub log: Log,
/// Application shutdown settings.
pub shutdown: Shutdown,
}

impl Conf {
Expand Down Expand Up @@ -262,4 +266,17 @@ mod tests {

assert_eq!(Conf::parse().unwrap().log.level(), None);
}

#[test]
#[serial]
fn shutdown_conf_test() {
let default_conf = Conf::default();

env::set_var("MEDEA_SHUTDOWN.TIMEOUT", "700ms");

let env_conf = Conf::parse().unwrap();

assert_ne!(default_conf.shutdown.timeout, env_conf.shutdown.timeout);
assert_eq!(env_conf.shutdown.timeout, Duration::from_millis(700));
}
}
16 changes: 16 additions & 0 deletions src/conf/shutdown.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
//! Application shutdown settings.

use std::time::Duration;

use serde::{Deserialize, Serialize};
use smart_default::SmartDefault;

/// Application shutdown settings.
#[derive(Clone, Debug, Deserialize, Serialize, SmartDefault)]
#[serde(default)]
pub struct Shutdown {
/// Maximum duration given to shutdown the whole application gracefully.
#[default(Duration::from_secs(5))]
#[serde(with = "serde_humantime")]
pub timeout: Duration,
}
47 changes: 41 additions & 6 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub mod api;
pub mod conf;
pub mod log;
pub mod media;
pub mod shutdown;
pub mod signalling;
pub mod turn;

Expand All @@ -17,9 +18,10 @@ use futures::IntoFuture as _;
use log::prelude::*;

use crate::{
api::{client::server, control::Member},
api::{client::server::Server, control::Member},
conf::Conf,
media::create_peers,
shutdown::GracefulShutdown,
signalling::{Room, RoomsRepository},
turn::new_turn_auth_service,
};
Expand Down Expand Up @@ -53,15 +55,48 @@ fn main() -> io::Result<()> {
turn_auth_service,
)
.start();

Ok((room, config))
})
.and_then(|(room, config)| {
let graceful_shutdown =
GracefulShutdown::new(config.shutdown.timeout).start();
graceful_shutdown
.send(shutdown::Subscribe(shutdown::Subscriber {
addr: room.clone().recipient(),
priority: shutdown::Priority(2),
}))
.map_err(|e| {
error!("Shutdown subscription failed for Room: {}", e)
})
.map(move |_| (room, graceful_shutdown, config))
})
.map(|(room, graceful_shutdown, config)| {
let rooms = hashmap! {1 => room};
let rooms_repo = RoomsRepository::new(rooms);

server::run(rooms_repo, config)
.map_err(|err| {
error!("Error starting application {:?}", err)
(rooms_repo, graceful_shutdown, config)
})
.and_then(|(rooms_repo, graceful_shutdown, config)| {
Server::run(rooms_repo, config)
.map_err(|e| {
error!("Error starting Client API HTTP server {:?}", e)
})
.map(|server| {
graceful_shutdown
.send(shutdown::Subscribe(shutdown::Subscriber {
addr: server.recipient(),
priority: shutdown::Priority(1),
}))
.map_err(|e| {
error!(
"Shutdown subscription failed for Client \
API HTTP server: {}",
e
)
})
Copy link
Collaborator

@alexlapa alexlapa Jul 17, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tyranron,

Напрягает толстый main. main, в моем представлении, должен просто собирать контекст. Предлагаю инжектить
graceful_shutdown в new всех интересующих нас компонентов и там уже подписываться.

Тут, теоретическим минусом может быть то, что тогда Room::new будет рурму не только собирать, но и стартовать. Но я не думаю что это будет проблемой.

Если не так, то тогда хотя бы какой-нибудь helper запилю, дабы кода в main было чуть меньше.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alexlapa let's refactor it later. Things will be much simple with async/.await usage. And the subscribing itself will be moved to Actor initialization for each kind of long-living thing. But for now just left it as is...

.map(|_| ())
})
.into_future()
.flatten()
})
})
}
Loading