Skip to content
This repository has been archived by the owner on Aug 25, 2021. It is now read-only.

Graceful shutdown #30

Merged
merged 113 commits into from
Jul 18, 2019
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
113 commits
Select commit Hold shift + click to select a range
c6bff7f
fix server address
nWacky Jun 17, 2019
ac68159
add SIGINT handler to ws_connection
nWacky Jun 18, 2019
e9482c3
add exit signals handlers to Room
nWacky Jun 18, 2019
b1b46fe
fix warnings
nWacky Jun 18, 2019
cc932cb
fix tests
nWacky Jun 19, 2019
b5ceb0f
format
nWacky Jun 19, 2019
26fcfa1
minor refactor
alexlapa Jun 20, 2019
86ff2af
implement graceful_shutdown actor
nWacky Jun 21, 2019
a7141e0
GracefulShutdown prototype implementantion
nWacky Jun 21, 2019
b0d908f
GracefulShutdown use async actor context
nWacky Jun 24, 2019
6ae350c
implement then_all
nWacky Jun 24, 2019
68c3802
change actix server signals that they wait, encapsulate ServerWrapper
nWacky Jun 24, 2019
eac51b1
remove actix web graceful shutdown
nWacky Jun 24, 2019
5d89b3b
make then_all poll call recursive
nWacky Jun 25, 2019
74cdb77
fix warnings
nWacky Jun 25, 2019
08096fd
format
nWacky Jun 25, 2019
04eeb57
send message to subscribe to graceful shutdown
nWacky Jun 25, 2019
77c40eb
fix test warning
nWacky Jun 25, 2019
8b997a8
add arbiter, derive debug in room
nWacky Jun 26, 2019
07adfde
not use TimeoutShutdown
nWacky Jun 26, 2019
a356f7c
fix warnings
nWacky Jun 26, 2019
b64e99e
use standard tokio then
nWacky Jun 26, 2019
2585a77
fix server address
nWacky Jun 17, 2019
ce8a13e
add SIGINT handler to ws_connection
nWacky Jun 18, 2019
16fa33f
add exit signals handlers to Room
nWacky Jun 18, 2019
da86037
fix warnings
nWacky Jun 18, 2019
8a780e3
minor refactor
alexlapa Jun 20, 2019
598612a
implement graceful_shutdown actor
nWacky Jun 21, 2019
5911f37
GracefulShutdown prototype implementantion
nWacky Jun 21, 2019
3e7acaa
GracefulShutdown use async actor context
nWacky Jun 24, 2019
d5804de
implement then_all
nWacky Jun 24, 2019
7cd76d0
change actix server signals that they wait, encapsulate ServerWrapper
nWacky Jun 24, 2019
e45ee35
remove actix web graceful shutdown
nWacky Jun 24, 2019
7507733
make then_all poll call recursive
nWacky Jun 25, 2019
b9bbe70
fix warnings
nWacky Jun 25, 2019
2582dfa
format
nWacky Jun 25, 2019
ec75369
send message to subscribe to graceful shutdown
nWacky Jun 25, 2019
4ec3386
add arbiter, derive debug in room
nWacky Jun 26, 2019
9ed91a9
not use TimeoutShutdown
nWacky Jun 26, 2019
e1a3cb1
fix warnings
nWacky Jun 26, 2019
eaa5fce
use standard tokio then
nWacky Jun 26, 2019
4923bd9
rebase onto new master
nWacky Jun 26, 2019
b8fd13b
Merge remote-tracking branch 'origin/29-graceful-shutdown' into 29-gr…
nWacky Jun 26, 2019
7b5066e
finish rebase
nWacky Jun 27, 2019
a99a778
spawn arbiter on new thread
nWacky Jun 27, 2019
78d2338
format, fix warnings
nWacky Jun 27, 2019
761ae5c
use start_in_arbiter
nWacky Jun 27, 2019
6c091fb
change head
nWacky Jun 27, 2019
809c5b5
change head
nWacky Jun 27, 2019
e80d356
Merge branch '29-graceful-shutdown' of https://github.com/instrumenti…
nWacky Jun 27, 2019
127214f
fix error
nWacky Jun 28, 2019
cc6bbe5
fix logger panic when shutdown
nWacky Jul 1, 2019
95e6e82
fix logger panic when shutdown, use shutdown_timeout from config
nWacky Jul 1, 2019
48459b0
format, fix warnings
nWacky Jul 1, 2019
cf889a6
start implementing future-based graceful shutdown
nWacky Jul 1, 2019
6986de0
implement future based graceful shutdown
nWacky Jul 2, 2019
ff2ac57
fix warnings
nWacky Jul 2, 2019
5c06e70
format
nWacky Jul 2, 2019
b07c401
fix bugs
nWacky Jul 2, 2019
f8cd7f9
fix sigint, remove non-unix shutdown
nWacky Jul 3, 2019
3733b1f
add sigterm
nWacky Jul 3, 2019
5238542
start implementing futures in shutdown message
nWacky Jul 3, 2019
18c0a2c
fix
alexlapa Jul 3, 2019
9b6f9a1
use global logger struct, local lazy_static import
nWacky Jul 4, 2019
3beb989
subscribe server in server::run
nWacky Jul 4, 2019
6a4d32d
cleanup, exit process properly
nWacky Jul 4, 2019
397f09e
start fixing bug
nWacky Jul 4, 2019
b6bae60
fix bug
nWacky Jul 5, 2019
f5720b3
cleanup
nWacky Jul 5, 2019
237adb5
fix warnings
nWacky Jul 5, 2019
0fe61f7
start implementing shutdown actor
Jul 8, 2019
4240184
implement shutdown actor
nWacky Jul 8, 2019
4fca3b0
use actor context for main shutdown future, use tokio timeout
nWacky Jul 8, 2019
dc5efb8
implement unsubscribe
nWacky Jul 8, 2019
04ac8e7
add todos
nWacky Jul 8, 2019
6af4aa2
start implementing hashmap
nWacky Jul 8, 2019
61ace89
start constructing big future
nWacky Jul 8, 2019
2044548
use unit type as error
nWacky Jul 9, 2019
541ff71
replace hashmap with hashset
nWacky Jul 9, 2019
5156358
inject stream into actor context
nWacky Jul 9, 2019
cd6d5f0
add room state machine
nWacky Jul 9, 2019
25f5c33
return future from graceful shutdown
nWacky Jul 9, 2019
b474ea8
fix warnings
nWacky Jul 9, 2019
aaa5170
start implementing async CloseRoom{}
nWacky Jul 9, 2019
3560b5c
start implementing async drop_connections
nWacky Jul 9, 2019
5ce83c3
implement async drop_connections
nWacky Jul 10, 2019
67c26df
fix bug
nWacky Jul 10, 2019
bf9bf3f
fix warnings
nWacky Jul 10, 2019
0b9d86f
format imports, change room state
nWacky Jul 10, 2019
6f7190e
format imports
nWacky Jul 10, 2019
7eaf698
format imports
nWacky Jul 10, 2019
8161d35
fix
nWacky Jul 10, 2019
e913753
refactor shutdown and config
nWacky Jul 12, 2019
18e5e73
refactor messages subscribe/unsubscribe
nWacky Jul 12, 2019
60a9029
replace u8 with priority
nWacky Jul 12, 2019
ffc1f5b
cfg not unix is a warning
nWacky Jul 12, 2019
46a7278
cfg not unix is a warning
nWacky Jul 12, 2019
000f241
implement get_drop_fut
nWacky Jul 12, 2019
2863244
Merge remote-tracking branch 'origin/29-graceful-shutdown' into 29-gr…
nWacky Jul 12, 2019
959a4d4
add description
nWacky Jul 12, 2019
9a9ca13
start implementing better signals stream
nWacky Jul 12, 2019
bb3587e
move CloseRoom to a function in Room, fix warnings
nWacky Jul 15, 2019
ec2a4dc
start using async actor context
nWacky Jul 15, 2019
f29e47a
refactor
nWacky Jul 15, 2019
0af073f
format, lint
nWacky Jul 15, 2019
36fdcf7
format
nWacky Jul 15, 2019
8ada2fd
Merge branch 'master' into 29-graceful-shutdown
tyranron Jul 17, 2019
f060d62
Correct conf implementation
tyranron Jul 17, 2019
8bf9fec
Refactor shutdown service implementation
tyranron Jul 17, 2019
ffc0534
Refactor shutdown handling
tyranron Jul 17, 2019
bded4b5
fix docs
alexlapa Jul 17, 2019
e411747
Merge branch 'master' into 29-graceful-shutdown
alexlapa Jul 18, 2019
6c7cada
fmt && update locks
alexlapa Jul 18, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ actix = "0.7"
actix-web = "0.7"
bb8 = "0.3"
bb8-redis = "0.3"
config = "0.9"
chrono = "0.4"
config = "0.9"
dotenv = "0.13"
failure = "0.1"
futures = "0.1"
Expand All @@ -41,11 +41,11 @@ rust-crypto = "0.2"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
slog = "2.4"
slog-envlogger = "2.1"
slog-stdlog = "3.0"
slog-async = "2.3"
slog-envlogger = "2.1"
slog-json = "2.3"
slog-scope = "4.1"
slog-stdlog = "3.0"
smart-default = "0.5"
tokio = "0.1"
toml = "0.4"
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version: 2
version: "2"

services:
coturn:
Expand Down
2 changes: 1 addition & 1 deletion jason/src/utils/event_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl<T: Deref<Target = EventTarget>, A: FromWasmAbi + 'static>
where
F: FnOnce(A) + 'static,
{
let closure: Closure<FnMut(A)> = Closure::once(closure);
let closure: Closure<dyn FnMut(A)> = Closure::once(closure);

target.add_event_listener_with_callback(
event_name,
Expand Down
4 changes: 2 additions & 2 deletions proto/client-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ pub struct Track {
pub media_type: MediaType,
}

/// Representation of [RTCIceServer][1] (item of `iceServers` field
/// from [RTCConfiguration][2]).
/// Representation of [`RTCIceServer`][1] (item of `iceServers` field
/// from [`RTCConfiguration`][2]).
///
/// [1]: https://developer.mozilla.org/en-US/docs/Web/API/RTCIceServer
/// [2]: https://developer.mozilla.org/en-US/docs/Web/API/RTCConfiguration
Expand Down
2 changes: 1 addition & 1 deletion src/api/client/rpc_connection.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! [`RpcConnection`] with related messages.

use core::fmt;
use std::fmt;

use actix::Message;
use futures::Future;
Expand Down
12 changes: 10 additions & 2 deletions src/api/client/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,15 +97,18 @@ pub fn run(rooms: RoomsRepository, config: Conf) {
.unwrap()
.start();

info!("Started HTTP server on 0.0.0.0:8080");
info!("Started HTTP server on {:?}", server_addr);
}

#[cfg(test)]
mod test {
use std::{ops::Add, thread, time::Duration};

use actix::Arbiter;
use actix_web::{http, test, App};
use actix_web::{
actix::{actors::signal, System},
http, test, App,
};
use futures::Stream;

use crate::{
Expand All @@ -132,13 +135,18 @@ mod test {
ice_user: None
},
};

let process_signals =
System::current().registry().get::<signal::ProcessSignals>();

let room = Arbiter::start(move |_| {
Room::new(
1,
members,
create_peers(1, 2),
conf.reconnect_timeout,
new_turn_auth_service_mock(),
process_signals,
)
});
let rooms = hashmap! {1 => room};
Expand Down
8 changes: 5 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub mod media;
pub mod signalling;
pub mod turn;

use actix::prelude::*;
use actix::{actors::signal, prelude::Arbiter, System};
use dotenv::dotenv;
use log::prelude::*;

Expand All @@ -26,9 +26,7 @@ fn main() {
let logger = log::new_dual_logger(std::io::stdout(), std::io::stderr());
let _scope_guard = slog_scope::set_global_logger(logger);
slog_stdlog::init().unwrap();

let sys = System::new("medea");

let config = Conf::parse().unwrap();

info!("{:?}", config);
Expand All @@ -39,6 +37,9 @@ fn main() {
};
let peers = create_peers(1, 2);

let process_signals =
System::current().registry().get::<signal::ProcessSignals>();

let turn_auth_service =
new_turn_auth_service(&config).expect("Unable to start turn service");
let room = Room::new(
Expand All @@ -47,6 +48,7 @@ fn main() {
peers,
config.rpc.reconnect_timeout,
turn_auth_service,
process_signals,
);
let room = Arbiter::start(move |_| room);
let rooms = hashmap! {1 => room};
Expand Down
68 changes: 63 additions & 5 deletions src/signalling/room.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
//! Room definitions and implementations. Room is responsible for media
//! connection establishment between concrete [`Member`]s.

use std::time::Duration;
use std::{fmt, time::Duration};

use actix::{
fut::wrap_future, Actor, ActorFuture, AsyncContext, Context, Handler,
Message,
actors::signal::{self, ProcessSignals, Subscribe},
fut::wrap_future,
Actor, ActorFuture, Addr, AsyncContext, Context, Handler, Message,
};
use failure::Fail;
use futures::future;
Expand Down Expand Up @@ -37,7 +38,31 @@ pub type Id = u64;
pub type ActFuture<I, E> =
Box<dyn ActorFuture<Actor = Room, Item = I, Error = E>>;

#[derive(Fail, Debug)]
macro_rules! important_quit_signals {
($msg: expr, $action: expr) => {
match $msg {
signal::SignalType::Int => {
error!("SIGINT received by Room, exiting");
$action();
}
signal::SignalType::Hup => {
error!("SIGHUP received by Room, reloading");
$action();
}
signal::SignalType::Term => {
error!("SIGTERM received by Room, stopping");
$action();
}
signal::SignalType::Quit => {
error!("SIGQUIT received by Room, exiting");
$action();
}
_ => (),
}
};
}

#[derive(Debug, Fail)]
#[allow(clippy::module_name_repetitions)]
pub enum RoomError {
#[fail(display = "Couldn't find Peer with [id = {}]", _0)]
Expand Down Expand Up @@ -65,7 +90,6 @@ impl From<PeerStateError> for RoomError {
}

/// Media server room with its [`Member`]s.
#[derive(Debug)]
pub struct Room {
id: Id,

Expand All @@ -74,6 +98,9 @@ pub struct Room {

/// [`Peer`]s of [`Member`]s in this [`Room`].
peers: PeerRepository,

/// Actix addr of [`ProcessSignals`]
process_signals: Addr<ProcessSignals>,
}

impl Room {
Expand All @@ -84,6 +111,7 @@ impl Room {
peers: HashMap<PeerId, PeerStateMachine>,
reconnect_timeout: Duration,
turn: Box<dyn TurnAuthService>,
process_signals: Addr<ProcessSignals>,
) -> Self {
Self {
id,
Expand All @@ -94,6 +122,7 @@ impl Room {
turn,
reconnect_timeout,
),
process_signals,
}
}

Expand Down Expand Up @@ -261,11 +290,26 @@ impl Room {
}
}

impl fmt::Debug for Room {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nWacky ,

Тут кастомный Debug не нужен, derive нормально отрабатывает.

fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"Room {{ id: {:?}, participants: {:?}, peers: {:?} }}",
self.id, self.participants, self.peers
)
}
}

/// [`Actor`] implementation that provides an ergonomic way
/// to interact with [`Room`].
// TODO: close connections on signal (gracefull shutdown)
impl Actor for Room {
type Context = Context<Self>;

fn started(&mut self, ctx: &mut Self::Context) {
self.process_signals
.do_send(Subscribe(ctx.address().recipient()));
}
}

impl Handler<Authorize> for Room {
Expand Down Expand Up @@ -429,6 +473,15 @@ impl Handler<CloseRoom> for Room {
}
}

// Close room on `SIGINT`, `SIGTERM`, `SIGQUIT` signals.
impl Handler<signal::Signal> for Room {
type Result = ();

fn handle(&mut self, msg: signal::Signal, ctx: &mut Self::Context) {
important_quit_signals!(msg.0, || { ctx.notify(CloseRoom {}) });
}
}

impl Handler<RpcConnectionClosed> for Room {
type Result = ();

Expand Down Expand Up @@ -474,13 +527,18 @@ mod test {
ice_user: None
},
};

let process_signals =
System::current().registry().get::<signal::ProcessSignals>();

Arbiter::start(move |_| {
Room::new(
1,
members,
create_peers(1, 2),
Duration::from_secs(10),
new_turn_auth_service_mock(),
process_signals,
)
})
}
Expand Down
4 changes: 2 additions & 2 deletions src/signalling/room_repo.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
//! Repository that stores [`Room`]s addresses.

use std::sync::{Arc, Mutex};

use actix::Addr;
use hashbrown::HashMap;

use std::sync::{Arc, Mutex};

use crate::signalling::{Room, RoomId};

/// Repository that stores [`Room`]s addresses.
Expand Down
4 changes: 2 additions & 2 deletions src/turn/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl TurnAuthService for Addr<Service> {
member_id: u64,
room_id: RoomId,
policy: UnreachablePolicy,
) -> Box<Future<Item = IceUser, Error = TurnServiceErr>> {
) -> Box<dyn Future<Item = IceUser, Error = TurnServiceErr>> {
Box::new(
self.send(CreateIceUser {
member_id,
Expand All @@ -69,7 +69,7 @@ impl TurnAuthService for Addr<Service> {
fn delete(
&self,
users: Vec<IceUser>,
) -> Box<Future<Item = (), Error = TurnServiceErr>> {
) -> Box<dyn Future<Item = (), Error = TurnServiceErr>> {
// leave only non static users
let users: Vec<IceUser> =
users.into_iter().filter(|u| !u.is_static()).collect();
Expand Down