From 55cb22787d6d6aceeb05eaa35f75e6a84ec4002b Mon Sep 17 00:00:00 2001 From: evdokimovs <49490279+evdokimovs@users.noreply.github.com> Date: Fri, 26 Jun 2020 12:48:48 +0300 Subject: [PATCH] Implement dynamic tracks addition with renegotiation (#105, #27) - support adding new endpoints to the already interconnected Members - implement PeerConnection renegotiation - add TracksApplied event --- CHANGELOG.md | 7 +- Cargo.lock | 8 +- ...lient-api.md => 0002-webrtc-client-api.md} | 15 +- jason/CHANGELOG.md | 5 +- jason/src/api/room.rs | 118 ++++- jason/src/peer/media/mod.rs | 8 +- jason/src/peer/mod.rs | 25 +- jason/tests/api/room.rs | 128 ++---- jason/tests/peer/media.rs | 6 +- proto/client-api/CHANGELOG.md | 8 +- proto/client-api/src/lib.rs | 56 ++- src/api/client/rpc_connection.rs | 4 +- src/media/mod.rs | 2 +- src/media/peer.rs | 426 +++++++++++------- src/signalling/elements/member.rs | 14 + src/signalling/peers/mod.rs | 45 +- src/signalling/peers/traffic_watcher.rs | 43 +- src/signalling/room/command_handler.rs | 46 +- src/signalling/room/dynamic_api.rs | 44 +- src/signalling/room/mod.rs | 266 +++++++---- src/signalling/room/rpc_server.rs | 58 +-- src/utils/actix_try_join_all.rs | 140 ++++++ src/utils/mod.rs | 18 + tests/e2e/grpc_control_api/create.rs | 100 +++- tests/e2e/main.rs | 15 + tests/e2e/signalling/mod.rs | 146 +++++- tests/e2e/signalling/pub_sub_signallng.rs | 10 +- 27 files changed, 1228 insertions(+), 533 deletions(-) rename docs/rfc/{0002-webrc-client-api.md => 0002-webrtc-client-api.md} (99%) create mode 100644 src/utils/actix_try_join_all.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 6a65d2cab..2331af73e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -37,7 +37,8 @@ All user visible changes to this project will be documented in this file. This p - Send reason of closing WebSocket connection as [Close](https://tools.ietf.org/html/rfc4566#section-5.14) frame's description ([#58]); - Send `Event::RpcSettingsUpdated` when `Member` connects ([#75]); - Send relay mode in `Event::PeerCreated` which is used for configuring client's `RtcIceTransportPolicy` ([#79]); - - Send `Command::UpdateTracks` on `Event::TracksUpdated` ([#81]). + - Emit `TracksApplied` event to create new and update existing tracks ([#105]); + - `PeerConnection` renegotiation functionality ([#105]). - [Coturn] integration: - [Coturn] sessions destroying ([#84]); - [Coturn] stats processing ([#94]). @@ -55,7 +56,8 @@ All user visible changes to this project will be documented in this file. This p ### Fixed - Signalling: - - Room crashing when handling commands with non-existent `peer_id` ([#86]). + - Room crashing when handling commands with non-existent `peer_id` ([#86]); + - Adding new endpoints to the already interconnected `Member`s ([#105]). [#28]: /../../pull/28 [#33]: /../../pull/33 @@ -70,6 +72,7 @@ All user visible changes to this project will be documented in this file. 
This p [#94]: /../../pull/94 [#95]: /../../pull/95 [#98]: /../../pull/98 +[#105]: /../../pull/105 diff --git a/Cargo.lock b/Cargo.lock index c8e707928..ebd5e7bd0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -349,9 +349,9 @@ checksum = "567b077b825e468cc974f0020d4082ee6e03132512f207ef1a02fd5d00d1f32d" [[package]] name = "aho-corasick" -version = "0.7.12" +version = "0.7.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c259a748ac706ba73d609b73fc13469e128337f9a6b2fb3cc82d100f8dd8d511" +checksum = "043164d8ba5c4c3035fec9bbee8647c0261d788f3474306f93bb65901cae0e86" dependencies = [ "memchr", ] @@ -3070,9 +3070,9 @@ checksum = "caaa9d531767d1ff2150b9332433f32a24622147e5ebb1f26409d5da67afd479" [[package]] name = "unicode-xid" -version = "0.2.0" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "826e7639553986605ec5979c7dd957c7895e93eabed50ab2ffa7f6128a75097c" +checksum = "f7fe0bb3479651439c9112f72b6c505038574c9fbb575ed1bf3b797fa39dd564" [[package]] name = "unreachable" diff --git a/docs/rfc/0002-webrc-client-api.md b/docs/rfc/0002-webrtc-client-api.md similarity index 99% rename from docs/rfc/0002-webrc-client-api.md rename to docs/rfc/0002-webrtc-client-api.md index 035200ea8..65e4c45f6 100644 --- a/docs/rfc/0002-webrc-client-api.md +++ b/docs/rfc/0002-webrtc-client-api.md @@ -703,19 +703,6 @@ It's recommended to cache `Peer` ID and `Member` ID relations in `Web Client`'s ``` -#### 10. TracksUpdated - -`Media Server` notifies about necessity to update `Track`s in specified `Peer`. - -Can be used to update existing `Track` settings (e.g. change to lower video resolution, mute audio). - -```rust -struct TracksUpdated { - peer_id: PeerId, - tracks_patches: Vec, -} -``` - ### Commands @@ -1102,7 +1089,7 @@ Metrics list will be extended as needed. #### 10. UpdateTracks -`Web Client` asks permission to update `Track`s in specified `Peer`. `Media Server` gives permission by sending `Event::TracksUpdated`. +`Web Client` asks permission to update `Track`s in specified `Peer`. `Media Server` gives permission by sending `Event::TracksApplied`. ```rust struct UpdateTracks { diff --git a/jason/CHANGELOG.md b/jason/CHANGELOG.md index 0074e3c14..e25c078eb 100644 --- a/jason/CHANGELOG.md +++ b/jason/CHANGELOG.md @@ -57,7 +57,9 @@ All user visible changes to this project will be documented in this file. This p - Emitting of RPC commands: - `AddPeerConnectionMetrics` with `IceConnectionState` and `PeerConnectionState` ([#71], [#87]); - `ApplyTracks` for muting/unmuting ([#81]); - - `AddPeerConnectionStats` with `RtcStats` ([#90]). + - `AddPeerConnectionStats` with `RtcStats` ([#90]); + - Handling of RPC events: + - `TracksApplied` ([#105]). - Error handling: - Library API: - `JasonError` as library error with trace information and underlying JS error if it is the cause ([#55]) @@ -80,6 +82,7 @@ All user visible changes to this project will be documented in this file. 
This p [#87]: /../../pull/87 [#90]: /../../pull/90 [#97]: /../../pull/97 +[#105]: /../../pull/105 [#106]: /../../pull/106 diff --git a/jason/src/api/room.rs b/jason/src/api/room.rs index 5186d0e7e..8a53c5462 100644 --- a/jason/src/api/room.rs +++ b/jason/src/api/room.rs @@ -15,8 +15,8 @@ use futures::{ use js_sys::Promise; use medea_client_api_proto::{ Command, Direction, Event as RpcEvent, EventHandler, IceCandidate, - IceConnectionState, IceServer, PeerConnectionState, PeerId, PeerMetrics, - Track, TrackId, TrackPatch, + IceConnectionState, IceServer, NegotiationRole, PeerConnectionState, + PeerId, PeerMetrics, Track, TrackId, TrackPatch, TrackUpdate, }; use tracerr::Traced; use wasm_bindgen::{prelude::*, JsValue}; @@ -780,7 +780,7 @@ impl EventHandler for InnerRoom { fn on_peer_created( &mut self, peer_id: PeerId, - sdp_offer: Option, + negotiation_role: NegotiationRole, tracks: Vec, ice_servers: Vec, is_force_relayed: bool, @@ -804,13 +804,16 @@ impl EventHandler for InnerRoom { self.create_connections_from_tracks(&tracks); + // TODO(alexlapa): Eliminate code duplication (on_tracks_applied). + // Doing Room refactoring in another PR, it`ll allow + // to fix this smoothly. let local_stream_constraints = self.local_stream_settings.clone(); let rpc = Rc::clone(&self.rpc); let error_callback = Rc::clone(&self.on_failed_local_stream); spawn_local( async move { - match sdp_offer { - None => { + match negotiation_role { + NegotiationRole::Offerer => { let sdp_offer = peer .get_offer(tracks, local_stream_constraints) .await @@ -826,7 +829,7 @@ impl EventHandler for InnerRoom { mids, }); } - Some(offer) => { + NegotiationRole::Answerer(offer) => { let sdp_answer = peer .process_offer( offer, @@ -916,16 +919,107 @@ impl EventHandler for InnerRoom { }); } - /// Updates [`Track`]s of this [`Room`]. - fn on_tracks_updated(&mut self, peer_id: PeerId, tracks: Vec) { - if let Some(peer) = self.peers.get(peer_id) { - if let Err(err) = peer.update_senders(tracks) { - JasonError::from(err).print(); - } + /// Creates new `Track`s, updates existing [`Sender`]s/[`Receiver`]s with + /// [`TrackUpdate`]s. + /// + /// Will start renegotiation process if `Some` [`NegotiationRole`] is + /// provided. + fn on_tracks_applied( + &mut self, + peer_id: PeerId, + updates: Vec, + negotiation_role: Option, + ) { + let peer = if let Some(peer) = self.peers.get(peer_id) { + peer } else { JasonError::from(tracerr::new!(RoomError::NoSuchPeer(peer_id))) .print(); + return; + }; + + let mut new_tracks = Vec::new(); + let mut patches = Vec::new(); + + for update in updates { + match update { + TrackUpdate::Added(track) => { + new_tracks.push(track); + } + TrackUpdate::Updated(track_patch) => { + patches.push(track_patch); + } + } } + if let Err(err) = peer.update_senders(patches) { + JasonError::from(err).print(); + return; + } + + // TODO(alexlapa): Eliminate code duplication (on_peer_created). + // Doing Room refactoring in another PR, it`ll allow + // to fix this smoothly. 
+ let local_stream_constraints = self.local_stream_settings.clone(); + let rpc = Rc::clone(&self.rpc); + let error_callback = Rc::clone(&self.on_failed_local_stream); + spawn_local( + async move { + match negotiation_role { + None => { + peer.create_tracks(new_tracks) + .await + .map_err(tracerr::map_from_and_wrap!())?; + } + Some(NegotiationRole::Offerer) => { + let sdp_offer = peer + .get_offer(new_tracks, local_stream_constraints) + .await + .map_err(tracerr::map_from_and_wrap!())?; + let mids = peer + .get_mids() + .map_err(tracerr::map_from_and_wrap!())?; + let senders_statuses = peer.get_senders_statuses(); + rpc.send_command(Command::MakeSdpOffer { + peer_id, + sdp_offer, + senders_statuses, + mids, + }); + } + Some(NegotiationRole::Answerer(offer)) => { + let sdp_answer = peer + .process_offer( + offer, + new_tracks, + local_stream_constraints, + ) + .await + .map_err(tracerr::map_from_and_wrap!())?; + let senders_statuses = peer.get_senders_statuses(); + rpc.send_command(Command::MakeSdpAnswer { + peer_id, + sdp_answer, + senders_statuses, + }); + } + }; + Result::<_, Traced>::Ok(()) + } + .then(|result| async move { + if let Err(err) = result { + let (err, trace) = err.into_parts(); + match err { + RoomError::InvalidLocalStream(_) + | RoomError::CouldNotGetLocalMedia(_) => { + let e = JasonError::from((err, trace)); + e.print(); + error_callback.call(e); + } + _ => JasonError::from((err, trace)).print(), + }; + }; + }), + ) } } diff --git a/jason/src/peer/media/mod.rs b/jason/src/peer/media/mod.rs index 90c7032d0..dff13404a 100644 --- a/jason/src/peer/media/mod.rs +++ b/jason/src/peer/media/mod.rs @@ -243,18 +243,14 @@ impl MediaConnections { out } - /// Synchronizes local state with provided tracks. Creates new [`Sender`]s - /// and [`Receiver`]s for each new [`Track`], and updates [`Track`] if - /// its settings has been changed. + /// Creates new [`Sender`]s and [`Receiver`]s for each new [`Track`]. /// /// # Errors /// /// With [`MediaConnectionsError::TransceiverNotFound`] if could not create /// new [`Sender`] cause transceiver with specified `mid` does not /// exist. - // TODO: Doesnt really updates anything, but only generates new senders - // and receivers atm. - pub fn update_tracks>( + pub fn create_tracks>( &self, tracks: I, ) -> Result<()> { diff --git a/jason/src/peer/mod.rs b/jason/src/peer/mod.rs index 2cc8660af..0c6185d43 100644 --- a/jason/src/peer/mod.rs +++ b/jason/src/peer/mod.rs @@ -576,7 +576,7 @@ impl PeerConnection { local_stream: Option, ) -> Result { self.media_connections - .update_tracks(tracks) + .create_tracks(tracks) .map_err(tracerr::map_from_and_wrap!())?; self.update_local_stream(local_stream) @@ -588,10 +588,22 @@ impl PeerConnection { .create_and_set_offer() .await .map_err(tracerr::map_from_and_wrap!())?; - Ok(offer) } + /// Creates new [`Sender`]s and [`Receiver`]s for each new [`Track`]. + /// + /// # Errors + /// + /// With [`MediaConnectionsError::TransceiverNotFound`] if could not create + /// new [`Sender`] because transceiver with specified `mid` doesn't exist. + pub async fn create_tracks(&self, tracks: Vec) -> Result<()> { + self.media_connections + .create_tracks(tracks) + .map_err(tracerr::map_from_and_wrap!())?; + Ok(()) + } + /// Inserts provided [MediaStream][1] into underlying [RTCPeerConnection][2] /// if it has all required tracks. /// Requests local stream from [`MediaManager`] if no stream was provided. @@ -775,17 +787,20 @@ impl PeerConnection { Direction::Recv { .. 
} => true, }); - // update receivers + // create receivers self.media_connections - .update_tracks(recv) + .create_tracks(recv) .map_err(tracerr::map_from_and_wrap!())?; + // set offer, which will create transceivers and discover remote tracks + // in receivers self.set_remote_offer(offer) .await .map_err(tracerr::wrap!())?; + // create senders self.media_connections - .update_tracks(send) + .create_tracks(send) .map_err(tracerr::map_from_and_wrap!())?; self.update_local_stream(local_constraints) diff --git a/jason/tests/api/room.rs b/jason/tests/api/room.rs index c28c3ce19..4d80f96e4 100644 --- a/jason/tests/api/room.rs +++ b/jason/tests/api/room.rs @@ -1,17 +1,19 @@ #![cfg(target_arch = "wasm32")] -use std::rc::Rc; +use std::{collections::HashMap, rc::Rc}; use futures::{ channel::{mpsc, oneshot}, - stream::{self, StreamExt as _}, + stream::{self, BoxStream, StreamExt as _}, +}; +use medea_client_api_proto::{ + Command, Event, NegotiationRole, PeerId, TrackUpdate, }; -use medea_client_api_proto::{Command, Event, IceServer, PeerId}; use medea_jason::{ api::Room, media::{AudioTrackConstraints, MediaManager, MediaStreamSettings}, peer::{ - MockPeerRepository, PeerConnection, PeerEvent, StableMuteState, + MockPeerRepository, PeerConnection, Repository, StableMuteState, TransceiverKind, }, rpc::MockRpcClient, @@ -65,9 +67,13 @@ fn get_test_room_and_exist_peer( tracks_patches, } => { event_tx - .unbounded_send(Event::TracksUpdated { + .unbounded_send(Event::TracksApplied { peer_id, - tracks_patches, + updates: tracks_patches + .into_iter() + .map(TrackUpdate::Updated) + .collect(), + negotiation_role: None, }) .unwrap(); } @@ -312,34 +318,10 @@ async fn join_unmute_and_mute_audio() { )); } -fn get_test_room_and_new_peer( - event_rx: mpsc::UnboundedReceiver, -) -> (Room, Rc) { +fn get_test_room(events: BoxStream<'static, Event>) -> Room { let mut rpc = MockRpcClient::new(); - let mut repo = Box::new(MockPeerRepository::new()); - rpc.expect_subscribe() - .return_once(move || Box::pin(event_rx)); - repo.expect_get_all().returning(|| Vec::new()); - let (tx, _rx) = mpsc::unbounded(); - let peer = PeerConnection::new( - PeerId(1), - tx, - Vec::new(), - Rc::new(MediaManager::default()), - false, - ) - .unwrap(); - let peer_clone = Rc::clone(&peer); - repo.expect_create_peer() - .withf( - move |id: &PeerId, - _ice_servers: &Vec, - _peer_events_sender: &mpsc::UnboundedSender, - _is_force_relay: &bool| { *id == PeerId(1) }, - ) - .return_once_st(move |_, _, _, _| Ok(peer_clone)); - rpc.expect_send_command().return_const(()); + rpc.expect_subscribe().return_once(move || events); rpc.expect_unsub().return_const(()); rpc.expect_set_close_reason().return_const(()); rpc.expect_on_connection_loss() @@ -347,8 +329,7 @@ fn get_test_room_and_new_peer( rpc.expect_on_reconnected() .return_once(|| stream::pending().boxed_local()); - let room = Room::new(Rc::new(rpc), repo); - (room, peer) + Room::new(Rc::new(rpc), Box::new(Repository::new(Rc::default()))) } // TODO: Allow muting before Peer init (instrumentisto/medea#85). 
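The tests around this point exercise the new `Event::TracksApplied` shape: a list of `TrackUpdate`s plus an optional `NegotiationRole`. As a reference for reviewers, a minimal sketch of how a client dispatches such an event (the `dispatch` helper here is hypothetical; the real handling is `InnerRoom::on_tracks_applied` above):

```rust
use medea_client_api_proto::{Event, NegotiationRole, TrackUpdate};

// Hypothetical helper mirroring what `on_tracks_applied` does.
fn dispatch(event: Event) {
    if let Event::TracksApplied { updates, negotiation_role, .. } = event {
        for update in updates {
            match update {
                // A brand new track: a Sender/Receiver must be created.
                TrackUpdate::Added(_track) => { /* peer.create_tracks(...) */ }
                // A patch for an already known track (e.g. mute state).
                TrackUpdate::Updated(_patch) => { /* peer.update_senders(...) */ }
            }
        }
        match negotiation_role {
            // This event also (re)starts SDP negotiation.
            Some(NegotiationRole::Offerer) => { /* send Command::MakeSdpOffer */ }
            Some(NegotiationRole::Answerer(_offer)) => { /* send Command::MakeSdpAnswer */ }
            // `None` means only the updates are applied, no renegotiation.
            None => {}
        }
    }
}
```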
@@ -414,9 +395,9 @@ fn get_test_room_and_new_peer( #[wasm_bindgen_test] async fn error_inject_invalid_local_stream_into_new_peer() { let (event_tx, event_rx) = mpsc::unbounded(); - let (room, _peer) = get_test_room_and_new_peer(event_rx); - + let room = get_test_room(Box::pin(event_rx)); let room_handle = room.new_handle(); + let (cb, test_result) = js_callback!(|err: JasonError| { cb_assert_eq!(&err.name(), "InvalidLocalStream"); cb_assert_eq!( @@ -439,7 +420,7 @@ async fn error_inject_invalid_local_stream_into_new_peer() { event_tx .unbounded_send(Event::PeerCreated { peer_id: PeerId(1), - sdp_offer: None, + negotiation_role: NegotiationRole::Offerer, tracks: vec![audio_track, video_track], ice_servers: Vec::new(), force_relay: false, @@ -537,8 +518,7 @@ async fn no_errors_if_track_not_provided_when_its_optional() { #[wasm_bindgen_test] async fn error_get_local_stream_on_new_peer() { let (event_tx, event_rx) = mpsc::unbounded(); - let (room, _peer) = get_test_room_and_new_peer(event_rx); - + let room = get_test_room(Box::pin(event_rx)); let room_handle = room.new_handle(); let (cb, test_result) = js_callback!(|err: JasonError| { @@ -560,7 +540,7 @@ async fn error_get_local_stream_on_new_peer() { event_tx .unbounded_send(Event::PeerCreated { peer_id: PeerId(1), - sdp_offer: None, + negotiation_role: NegotiationRole::Offerer, tracks: vec![audio_track, video_track], ice_servers: Vec::new(), force_relay: false, @@ -580,20 +560,9 @@ async fn error_get_local_stream_on_new_peer() { /// 1. Room::join returns error. #[wasm_bindgen_test] async fn error_join_room_without_on_failed_stream_callback() { - let (_, event_rx) = mpsc::unbounded(); - let mut rpc = MockRpcClient::new(); - rpc.expect_subscribe() - .return_once(move || Box::pin(event_rx)); - rpc.expect_unsub().return_const(()); - rpc.expect_set_close_reason().return_const(()); - rpc.expect_on_connection_loss() - .return_once(|| stream::pending().boxed_local()); - rpc.expect_on_reconnected() - .return_once(|| stream::pending().boxed_local()); - let repo = Box::new(MockPeerRepository::new()); - let room = Room::new(Rc::new(rpc), repo); - + let room = get_test_room(stream::pending().boxed()); let room_handle = room.new_handle(); + room_handle .on_connection_loss(js_sys::Function::new_no_args("")) .unwrap(); @@ -620,20 +589,9 @@ async fn error_join_room_without_on_failed_stream_callback() { /// 1. Room::join returns error. #[wasm_bindgen_test] async fn error_join_room_without_on_connection_loss_callback() { - let (_, event_rx) = mpsc::unbounded(); - let mut rpc = MockRpcClient::new(); - rpc.expect_subscribe() - .return_once(move || Box::pin(event_rx)); - rpc.expect_unsub().return_const(()); - rpc.expect_set_close_reason().return_const(()); - rpc.expect_on_connection_loss() - .return_once(|| stream::pending().boxed_local()); - rpc.expect_on_reconnected() - .return_once(|| stream::pending().boxed_local()); - let repo = Box::new(MockPeerRepository::new()); - let room = Room::new(Rc::new(rpc), repo); - + let room = get_test_room(stream::pending().boxed()); let room_handle = room.new_handle(); + room_handle .on_failed_local_stream(js_sys::Function::new_no_args("")) .unwrap(); @@ -653,15 +611,8 @@ async fn error_join_room_without_on_connection_loss_callback() { /// Tests for `RoomHandle.on_close` JS side callback. 
mod on_close_callback { - use std::rc::Rc; - - use futures::channel::mpsc; use medea_client_api_proto::CloseReason as CloseByServerReason; - use medea_jason::{ - api::Room, - peer::MockPeerRepository, - rpc::{ClientDisconnect, CloseReason, MockRpcClient}, - }; + use medea_jason::rpc::{ClientDisconnect, CloseReason}; use wasm_bindgen::{prelude::*, JsValue}; use wasm_bindgen_test::*; @@ -684,25 +635,6 @@ mod on_close_callback { fn get_is_err(reason: &JsValue) -> bool; } - /// Returns empty [`Room`] with mocks inside. - fn get_room() -> Room { - let mut rpc = MockRpcClient::new(); - let repo = Box::new(MockPeerRepository::new()); - - let (_event_tx, event_rx) = mpsc::unbounded(); - rpc.expect_subscribe() - .return_once(move || Box::pin(event_rx)); - rpc.expect_send_command().return_const(()); - rpc.expect_unsub().return_const(()); - rpc.expect_set_close_reason().return_const(()); - rpc.expect_on_connection_loss() - .return_once(|| stream::pending().boxed_local()); - rpc.expect_on_reconnected() - .return_once(|| stream::pending().boxed_local()); - - Room::new(Rc::new(rpc), repo) - } - /// Tests that JS side [`RoomHandle::on_close`] works. /// /// # Algorithm @@ -714,7 +646,7 @@ mod on_close_callback { /// 3. Check that JS callback was called with this reason. #[wasm_bindgen_test] async fn closed_by_server() { - let room = get_room(); + let room = get_test_room(stream::pending().boxed()); let mut room_handle = room.new_handle(); let (cb, test_result) = js_callback!(|closed: JsValue| { @@ -742,7 +674,7 @@ mod on_close_callback { /// RoomUnexpectedlyDropped`. #[wasm_bindgen_test] async fn unexpected_room_drop() { - let room = get_room(); + let room = get_test_room(stream::pending().boxed()); let mut room_handle = room.new_handle(); let (cb, test_result) = js_callback!(|closed: JsValue| { @@ -767,7 +699,7 @@ mod on_close_callback { /// 3. Check that JS callback was called with this [`CloseReason`]. #[wasm_bindgen_test] async fn normal_close_by_client() { - let room = get_room(); + let room = get_test_room(stream::pending().boxed()); let mut room_handle = room.new_handle(); let (cb, test_result) = js_callback!(|closed: JsValue| { @@ -798,7 +730,6 @@ mod rpc_close_reason_on_room_drop { /// with [`RpcClient`]'s close reason ([`ClientDisconnect`]). async fn get_client() -> (Room, oneshot::Receiver) { let mut rpc = MockRpcClient::new(); - let repo = Box::new(MockPeerRepository::new()); let (_event_tx, event_rx) = mpsc::unbounded(); rpc.expect_subscribe() @@ -813,7 +744,7 @@ mod rpc_close_reason_on_room_drop { rpc.expect_set_close_reason().return_once(move |reason| { test_tx.send(reason).unwrap(); }); - let room = Room::new(Rc::new(rpc), repo); + let room = Room::new(Rc::new(rpc), Box::new(MockPeerRepository::new())); (room, test_rx) } @@ -876,7 +807,6 @@ mod rpc_close_reason_on_room_drop { /// Tests for [`TrackPatch`] generation in [`Room`]. 
mod patches_generation { - use std::collections::HashMap; use futures::StreamExt; use medea_client_api_proto::{ diff --git a/jason/tests/peer/media.rs b/jason/tests/peer/media.rs index 31f88b276..136815d4f 100644 --- a/jason/tests/peer/media.rs +++ b/jason/tests/peer/media.rs @@ -33,7 +33,7 @@ async fn get_test_media_connections( let audio_track_id = audio_track.id; let video_track_id = video_track.id; media_connections - .update_tracks(vec![audio_track, video_track]) + .create_tracks(vec![audio_track, video_track]) .unwrap(); let request = media_connections.get_stream_request().unwrap(); let caps = SimpleStreamRequest::try_from(request).unwrap(); @@ -70,7 +70,7 @@ fn get_stream_request1() { ); let (audio_track, video_track) = get_test_unrequired_tracks(false, false); media_connections - .update_tracks(vec![audio_track, video_track]) + .create_tracks(vec![audio_track, video_track]) .unwrap(); let request = media_connections.get_stream_request(); assert!(request.is_some()); @@ -85,7 +85,7 @@ fn get_stream_request2() { Rc::new(RtcPeerConnection::new(Vec::new(), false).unwrap()), tx, ); - media_connections.update_tracks(Vec::new()).unwrap(); + media_connections.create_tracks(Vec::new()).unwrap(); let request = media_connections.get_stream_request(); assert!(request.is_none()); } diff --git a/proto/client-api/CHANGELOG.md b/proto/client-api/CHANGELOG.md index 16c614c6f..44a0c321f 100644 --- a/proto/client-api/CHANGELOG.md +++ b/proto/client-api/CHANGELOG.md @@ -25,8 +25,8 @@ All user visible changes to this project will be documented in this file. This p - `AddPeerConnectionMetrics` client command with `IceConnectionState` and `PeerConnectionState` metrics ([#71], [#87]); - `RpcSettings` server message ([#75]); - `force_relay` field to `PeerCreated` event ([#79]); -- `UpdateTracks` command and `TracksUpdated` event ([#81]); -- `StatsUpdate` metric into `AddPeerConnectionMetrics` command ([#90]). +- `UpdateTracks` command ([#81]); +- `StatsUpdate` metric into `AddPeerConnectionMetrics` command ([#90]); - `RTCPeerConnection` stats ([#90]): - `RtcCodecStats`; - `RtcInboundRtpStreamStats`; @@ -49,7 +49,8 @@ All user visible changes to this project will be documented in this file. This p - `RtcCertificateStats`; - `RtcIceServerStats`. - `Cancelled` state to the `KnownIceCandidatePairState` ([#102]); -- `is_required` field to `AudioSettings` and `VideoSettings` ([#106]). +- `is_required` field to `AudioSettings` and `VideoSettings` ([#106]); +- `TracksApplied` event with `TrackUpdate::Updated` and `TrackUpdate::Added` variants ([#81], [#105]). [#28]: /../../pull/28 [#58]: /../../pull/58 @@ -60,6 +61,7 @@ All user visible changes to this project will be documented in this file. This p [#87]: /../../pull/87 [#90]: /../../pull/90 [#102]: /../../pull/102 +[#105]: /../../pull/105 [#106]: /../../pull/106 diff --git a/proto/client-api/src/lib.rs b/proto/client-api/src/lib.rs index c10261e66..2bc463f07 100644 --- a/proto/client-api/src/lib.rs +++ b/proto/client-api/src/lib.rs @@ -130,6 +130,7 @@ pub enum Command { /// Publishing statuses of the senders from this Peer. senders_statuses: HashMap, }, + /// Web Client sends SDP Answer. MakeSdpAnswer { peer_id: PeerId, @@ -137,18 +138,21 @@ pub enum Command { /// Publishing statuses of the senders from this Peer. senders_statuses: HashMap, }, + /// Web Client sends Ice Candidate. SetIceCandidate { peer_id: PeerId, candidate: IceCandidate, }, + /// Web Client sends Peer Connection metrics. 
AddPeerConnectionMetrics { peer_id: PeerId, metrics: PeerMetrics, }, + /// Web Client asks permission to update [`Track`]s in specified Peer. - /// Media Server gives permission by sending [`Event::TracksUpdated`]. + /// Media Server gives permission by sending [`Event::TracksApplied`]. UpdateTracks { peer_id: PeerId, tracks_patches: Vec, @@ -244,7 +248,7 @@ pub enum Event { /// creation. PeerCreated { peer_id: PeerId, - sdp_offer: Option, + negotiation_role: NegotiationRole, tracks: Vec, ice_servers: Vec, force_relay: bool, @@ -266,16 +270,52 @@ pub enum Event { PeersRemoved { peer_ids: Vec }, /// Media Server notifies about necessity to update [`Track`]s in specified - /// Peer. - /// - /// Can be used to update existing [`Track`] settings (e.g. change to lower - /// video resolution, mute audio). - TracksUpdated { + /// `Peer`. + TracksApplied { + /// [`PeerId`] of `Peer` where [`Track`]s should be updated. peer_id: PeerId, - tracks_patches: Vec, + + /// List of [`TrackUpdate`]s which should be applied. + updates: Vec, + + /// Negotiation role basing on which should be sent + /// [`Command::MakeSdpOffer`] or [`Command::MakeSdpAnswer`]. + /// + /// If `None` then no renegotiation should be done. + negotiation_role: Option, }, } +/// `Peer`'s negotiation role. +/// +/// Some [`Event`]s can trigger SDP negotiation. +/// - If [`Event`] contains [`NegotiationRole::Offerer`], then `Peer` is +/// expected to create SDP Offer and send it via [`Command::MakeSdpOffer`]. +/// - If [`Event`] contains [`NegotiationRole::Answerer`], then `Peer` is +/// expected to apply provided SDP Offer and provide its SDP Answer in a +/// [`Command::MakeSdpAnswer`]. +#[cfg_attr(feature = "medea", derive(Clone, Debug, Eq, PartialEq, Serialize))] +#[cfg_attr(feature = "jason", derive(Deserialize))] +pub enum NegotiationRole { + /// [`Command::MakeSdpOffer`] should be sent by client. + Offerer, + + /// [`Command::MakeSdpAnswer`] should be sent by client. + Answerer(String), +} + +/// [`Track`] update which should be applied to the `Peer`. +#[cfg_attr(feature = "medea", derive(Clone, Debug, Eq, PartialEq, Serialize))] +#[cfg_attr(feature = "jason", derive(Deserialize))] +pub enum TrackUpdate { + /// New [`Track`] should be added to the `Peer`. + Added(Track), + + /// [`Track`] should be updated by this [`TrackPatch`] in the `Peer`. + /// Can only refer tracks already known to the `Peer`. + Updated(TrackPatch), +} + /// Represents [RTCIceCandidateInit][1] object. /// /// [1]: https://www.w3.org/TR/webrtc/#dom-rtcicecandidateinit diff --git a/src/api/client/rpc_connection.rs b/src/api/client/rpc_connection.rs index 54bb51b89..ad29bb68b 100644 --- a/src/api/client/rpc_connection.rs +++ b/src/api/client/rpc_connection.rs @@ -9,7 +9,7 @@ use derive_more::{From, Into}; use futures::future::LocalBoxFuture; use medea_client_api_proto::{CloseDescription, Command, Event}; -use crate::api::control::MemberId; +use crate::{api::control::MemberId, signalling::room::RoomError}; /// Newtype for [`Command`] with actix [`Message`] implementation. #[derive(Message)] @@ -106,7 +106,7 @@ pub enum AuthorizationError { /// /// [`Member`]: crate::signalling::elements::member::Member #[derive(Debug, Message)] -#[rtype(result = "Result<(), ()>")] +#[rtype(result = "Result<(), RoomError>")] pub struct RpcConnectionEstablished { /// ID of [`Member`] that establishes [`RpcConnection`]. 
/// diff --git a/src/media/mod.rs b/src/media/mod.rs index a362ee1f5..4006107ab 100644 --- a/src/media/mod.rs +++ b/src/media/mod.rs @@ -8,7 +8,7 @@ pub mod track; pub use self::{ ice_user::{IceUser, IceUsername}, peer::{ - New, Peer, PeerError, PeerStateMachine, WaitLocalHaveRemote, + Peer, PeerError, PeerStateMachine, Stable, WaitLocalHaveRemote, WaitLocalSdp, WaitRemoteSdp, }, track::MediaTrack, diff --git a/src/media/peer.rs b/src/media/peer.rs index c4f529903..de182144b 100644 --- a/src/media/peer.rs +++ b/src/media/peer.rs @@ -10,7 +10,7 @@ use derive_more::Display; use failure::Fail; use medea_client_api_proto::{ AudioSettings, Direction, IceServer, MediaType, PeerId as Id, Track, - TrackId, VideoSettings, + TrackId, TrackUpdate, VideoSettings, }; use medea_macro::enum_delegate; @@ -27,25 +27,27 @@ use crate::{ }, }; -/// Newly initialized [`Peer`] ready to signalling. -#[derive(Debug, PartialEq)] -pub struct New {} - -/// [`Peer`] doesnt have remote SDP and is waiting for local SDP. +/// [`Peer`] doesn't have remote [SDP] and is waiting for local [SDP]. +/// +/// [SDP]: https://tools.ietf.org/html/rfc4317 #[derive(Debug, PartialEq)] -pub struct WaitLocalSdp {} +pub struct WaitLocalSdp; -/// [`Peer`] has remote SDP and is waiting for local SDP. +/// [`Peer`] has remote [SDP] and is waiting for local [SDP]. +/// +/// [SDP]: https://tools.ietf.org/html/rfc4317 #[derive(Debug, PartialEq)] -pub struct WaitLocalHaveRemote {} +pub struct WaitLocalHaveRemote; -/// [`Peer`] has local SDP and is waiting for remote SDP. +/// [`Peer`] has local [SDP] and is waiting for remote [SDP]. +/// +/// [SDP]: https://tools.ietf.org/html/rfc4317 #[derive(Debug, PartialEq)] -pub struct WaitRemoteSdp {} +pub struct WaitRemoteSdp; -/// SDP exchange ended. +/// No negotiation happening atm. It may have been ended or haven't yet started. #[derive(Debug, PartialEq)] -pub struct Stable {} +pub struct Stable; /// Produced when unwrapping [`PeerStateMachine`] to [`Peer`] with wrong state. #[derive(Debug, Display, Fail)] @@ -80,15 +82,17 @@ impl PeerError { #[enum_delegate(pub fn partner_peer_id(&self) -> Id)] #[enum_delegate(pub fn partner_member_id(&self) -> MemberId)] #[enum_delegate(pub fn is_force_relayed(&self) -> bool)] -#[enum_delegate(pub fn tracks(&self) -> Vec)] #[enum_delegate(pub fn ice_servers_list(&self) -> Option>)] #[enum_delegate(pub fn set_ice_user(&mut self, ice_user: IceUser))] #[enum_delegate(pub fn endpoints(&self) -> Vec)] #[enum_delegate(pub fn add_endpoint(&mut self, endpoint: &Endpoint))] #[enum_delegate( - pub fn receivers(&self) -> HashMap> + pub fn receivers(&self) -> &HashMap> +)] +#[enum_delegate(pub fn senders(&self) -> &HashMap>)] +#[enum_delegate( + pub fn get_updates(&self) -> Vec )] -#[enum_delegate(pub fn senders(&self) -> HashMap>)] #[enum_delegate( pub fn update_senders_statuses( &self, @@ -97,7 +101,6 @@ impl PeerError { )] #[derive(Debug)] pub enum PeerStateMachine { - New(Peer), WaitLocalSdp(Peer), WaitLocalHaveRemote(Peer), WaitRemoteSdp(Peer), @@ -108,7 +111,6 @@ impl fmt::Display for PeerStateMachine { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { PeerStateMachine::WaitRemoteSdp(_) => write!(f, "WaitRemoteSdp"), - PeerStateMachine::New(_) => write!(f, "New"), PeerStateMachine::WaitLocalSdp(_) => write!(f, "WaitLocalSdp"), PeerStateMachine::WaitLocalHaveRemote(_) => { write!(f, "WaitLocalHaveRemote") @@ -138,15 +140,18 @@ macro_rules! 
impl_peer_converts {
 }

 impl TryFrom for Peer<$peer_type> {
- type Error = PeerError;
+ type Error = (PeerError, PeerStateMachine);

 fn try_from(peer: PeerStateMachine) -> Result {
 match peer {
 PeerStateMachine::$peer_type(peer) => Ok(peer),
- _ => Err(PeerError::WrongState(
- peer.id(),
- stringify!($peer_type),
- format!("{}", peer),
+ _ => Err((
+ PeerError::WrongState(
+ peer.id(),
+ stringify!($peer_type),
+ format!("{}", peer),
+ ),
+ peer,
 )),
 }
 }
@@ -160,7 +165,6 @@ macro_rules! impl_peer_converts {
 };
}

-impl_peer_converts!(New);
impl_peer_converts!(WaitLocalSdp);
impl_peer_converts!(WaitLocalHaveRemote);
impl_peer_converts!(WaitRemoteSdp);
@@ -168,18 +172,93 @@ impl_peer_converts!(Stable);

#[derive(Debug)]
pub struct Context {
+ /// [`PeerId`] of this [`Peer`].
 id: Id,
+
+ /// [`MemberId`] of a [`Member`] which owns this [`Peer`].
 member_id: MemberId,
+
+ /// [`PeerId`] of a partner [`Peer`].
 partner_peer: Id,
+
+ /// [`MemberId`] of a partner [`Peer`]'s owner.
 partner_member: MemberId,
+
+ /// [`IceUser`] created for this [`Peer`].
 ice_user: Option,
+
+ /// [SDP] offer of this [`Peer`].
+ ///
+ /// [SDP]: https://tools.ietf.org/html/rfc4317
 sdp_offer: Option,
+
+ /// [SDP] answer of this [`Peer`].
+ ///
+ /// [SDP]: https://tools.ietf.org/html/rfc4317
 sdp_answer: Option,
+
+ /// All [`MediaTrack`]s with a `Recv` direction.
 receivers: HashMap>,
+
+ /// All [`MediaTrack`]s with a `Send` direction.
 senders: HashMap>,
+
+ /// Indicator whether this [`Peer`] must be forcibly connected through
+ /// TURN.
 is_force_relayed: bool,

+ /// Weak references to the [`Endpoint`]s related to this [`Peer`].
 endpoints: Vec,
+
+ /// Indicator whether this [`Peer`] is already known to the remote.
+ is_known_to_remote: bool,
+
+ /// [`Track`] changes that the remote [`Peer`] is not aware of.
+ pending_track_updates: Vec,
+}
+
+/// [`Track`] change that the remote [`Peer`] is not aware of.
+#[derive(Debug)]
+enum TrackChange {
+ /// [`MediaTrack`] with [`Direction::Send`] of this [`Peer`] that the
+ /// remote `Peer` is not aware of.
+ AddSendTrack(Rc),
+
+ /// [`MediaTrack`] with [`Direction::Recv`] of this [`Peer`] that the
+ /// remote `Peer` is not aware of.
+ AddRecvTrack(Rc),
+}
+
+impl TrackChange {
+ /// Tries to return a [`Track`] based on this [`TrackChange`].
+ ///
+ /// Returns `None` if this [`TrackChange`] doesn't indicate new [`Track`]
+ /// creation.
+ fn try_as_track(&self, partner_peer_id: Id) -> Option {
+ let (direction, track) = match self {
+ TrackChange::AddSendTrack(track) => (
+ Direction::Send {
+ receivers: vec![partner_peer_id],
+ mid: track.mid(),
+ },
+ track,
+ ),
+ TrackChange::AddRecvTrack(track) => (
+ Direction::Recv {
+ sender: partner_peer_id,
+ mid: track.mid(),
+ },
+ track,
+ ),
+ };
+
+ Some(Track {
+ id: track.id,
+ is_muted: false,
+ media_type: track.media_type.clone(),
+ direction,
+ })
+ }
}

/// [RTCPeerConnection] representation.
@@ -216,41 +295,31 @@ impl Peer {
 self.context.partner_member.clone()
 }

- /// Returns [`Track`]s of this [`Peer`].
- pub fn tracks(&self) -> Vec {
- let tracks = self.context.senders.iter().fold(
- Vec::new(),
- |mut tracks, (_, track)| {
- tracks.push(Track {
- id: track.id,
- media_type: track.media_type.clone(),
- direction: Direction::Send {
- receivers: vec![self.context.partner_peer],
- mid: track.mid(),
- },
- is_muted: false,
- });
- tracks
- },
- );
+ /// Returns [`TrackUpdate`]s of this [`Peer`] which should be sent to the
+ /// client in the [`Event::TracksApplied`].
+ pub fn get_updates(&self) -> Vec { self.context - .receivers + .pending_track_updates .iter() - .fold(tracks, |mut tracks, (_, track)| { - tracks.push(Track { - id: track.id, - media_type: track.media_type.clone(), - direction: Direction::Recv { - sender: self.context.partner_peer, - mid: track.mid(), - }, - is_muted: false, - }); - tracks + .map(|change| { + // TODO: remove this unwrap when new TrackChanges will be + // implemented. + change.try_as_track(self.partner_peer_id()).unwrap() }) + .map(TrackUpdate::Added) + .collect() } - /// Checks if this [`Peer`] has any send tracks. + /// Returns [`Track`]s that remote [`Peer`] is not aware of. + pub fn new_tracks(&self) -> Vec { + self.context + .pending_track_updates + .iter() + .filter_map(|update| update.try_as_track(self.partner_peer_id())) + .collect() + } + + /// Indicates whether this [`Peer`] has any send tracks. pub fn is_sender(&self) -> bool { !self.context.senders.is_empty() } @@ -301,17 +370,103 @@ impl Peer { } /// Returns all receiving [`MediaTrack`]s of this [`Peer`]. - pub fn receivers(&self) -> HashMap> { - self.context.receivers.clone() + pub fn receivers(&self) -> &HashMap> { + &self.context.receivers } /// Returns all sending [`MediaTrack`]s of this [`Peer`]. - pub fn senders(&self) -> HashMap> { - self.context.senders.clone() + pub fn senders(&self) -> &HashMap> { + &self.context.senders + } + + /// Indicates whether this [`Peer`] is known to client (`Event::PeerCreated` + /// for this [`Peer`] was sent to the client). + pub fn is_known_to_remote(&self) -> bool { + self.context.is_known_to_remote + } + + /// Sets [`Self::is_known_to_remote`] to `true`. + /// + /// Resets `pending_changes` buffer. + /// + /// Should be called when renegotiation was finished. + fn negotiation_finished(&mut self) { + self.context.is_known_to_remote = true; + self.context.pending_track_updates.clear(); + } +} + +impl Peer { + /// Sets local description and transition [`Peer`] to [`WaitRemoteSdp`] + /// state. + #[inline] + pub fn set_local_sdp(self, sdp_offer: String) -> Peer { + let mut context = self.context; + context.sdp_offer = Some(sdp_offer); + Peer { + context, + state: WaitRemoteSdp {}, + } + } + + /// Sets tracks [mid]s. + /// + /// Provided [mid]s must have entries for all [`Peer`]s tracks. + /// + /// # Errors + /// + /// Errors with [`PeerError::MidsMismatch`] if [`Peer`] is sending + /// [`MediaTrack`] without providing its [mid]. + /// + /// [mid]: https://developer.mozilla.org/docs/Web/API/RTCRtpTransceiver/mid + pub fn set_mids( + &mut self, + mut mids: HashMap, + ) -> Result<(), PeerError> { + let tracks = self + .context + .senders + .iter_mut() + .chain(self.context.receivers.iter_mut()); + + for (id, track) in tracks { + let mid = mids + .remove(&id) + .ok_or_else(|| PeerError::MidsMismatch(track.id))?; + track.set_mid(mid) + } + + Ok(()) + } +} + +impl Peer { + /// Sets remote description and transitions [`Peer`] to [`Stable`] state. + pub fn set_remote_sdp(mut self, sdp_answer: &str) -> Peer { + self.negotiation_finished(); + self.context.sdp_answer = Some(sdp_answer.to_string()); + + Peer { + context: self.context, + state: Stable {}, + } } } -impl Peer { +impl Peer { + /// Sets local description and transitions [`Peer`] to [`Stable`] state. + pub fn set_local_sdp(mut self, sdp_answer: String) -> Peer { + self.negotiation_finished(); + self.context.sdp_answer = Some(sdp_answer); + + Peer { + context: self.context, + state: Stable {}, + } + } +} + +impl Peer { /// Creates new [`Peer`] for [`Member`]. 
/// /// [`Member`]: crate::signalling::elements::member::Member @@ -334,10 +489,13 @@ impl Peer { senders: HashMap::new(), is_force_relayed, endpoints: Vec::new(), + is_known_to_remote: false, + pending_track_updates: Vec::new(), }; + Self { context, - state: New {}, + state: Stable {}, } } @@ -349,7 +507,7 @@ impl Peer { pub fn add_publisher( &mut self, src: &WebRtcPublishEndpoint, - publisher_peer: &mut Peer, + partner_peer: &mut Peer, tracks_counter: &Counter, ) { let audio_settings = src.audio_settings(); @@ -361,7 +519,7 @@ impl Peer { }), )); self.add_sender(Rc::clone(&track_audio)); - publisher_peer.add_receiver(track_audio); + partner_peer.add_receiver(track_audio); } let video_settings = src.video_settings(); @@ -373,7 +531,7 @@ impl Peer { }), )); self.add_sender(Rc::clone(&track_video)); - publisher_peer.add_receiver(track_video); + partner_peer.add_receiver(track_video); } } @@ -398,84 +556,6 @@ impl Peer { } } - /// Adds [`Track`] to [`Peer`] for send. - pub fn add_sender(&mut self, track: Rc) { - self.context.senders.insert(track.id, track); - } - - /// Adds [`Track`] to [`Peer`] for receive. - pub fn add_receiver(&mut self, track: Rc) { - self.context.receivers.insert(track.id, track); - } -} - -impl Peer { - /// Sets local description and transition [`Peer`] - /// to [`WaitRemoteSdp`] state. - pub fn set_local_sdp(self, sdp_offer: String) -> Peer { - let mut context = self.context; - context.sdp_offer = Some(sdp_offer); - Peer { - context, - state: WaitRemoteSdp {}, - } - } - - /// Sets tracks [mid]s. - /// - /// Provided [mid]s must have entries for all [`Peer`]s tracks. - /// - /// # Errors - /// - /// Errors with [`PeerError::MidsMismatch`] if [`Peer`] is sending - /// [`MediaTrack`] without providing its [mid]. - /// - /// [mid]: - /// https://developer.mozilla.org/en-US/docs/Web/API/RTCRtpTransceiver/mid - pub fn set_mids( - &mut self, - mut mids: HashMap, - ) -> Result<(), PeerError> { - for (id, track) in self - .context - .senders - .iter_mut() - .chain(self.context.receivers.iter_mut()) - { - let mid = mids - .remove(&id) - .ok_or_else(|| PeerError::MidsMismatch(track.id))?; - track.set_mid(mid) - } - Ok(()) - } -} - -impl Peer { - /// Sets remote description and transition [`Peer`] to [`Stable`] state. - pub fn set_remote_sdp(self, sdp_answer: &str) -> Peer { - let mut context = self.context; - context.sdp_answer = Some(sdp_answer.to_string()); - Peer { - context, - state: Stable {}, - } - } -} - -impl Peer { - /// Sets local description and transition [`Peer`] to [`Stable`] state. - pub fn set_local_sdp(self, sdp_answer: String) -> Peer { - let mut context = self.context; - context.sdp_answer = Some(sdp_answer); - Peer { - context, - state: Stable {}, - } - } -} - -impl Peer { /// Returns [mid]s of this [`Peer`]. /// /// # Errors @@ -497,6 +577,47 @@ impl Peer { } Ok(mids) } + + /// Changes [`Peer`] state to [`WaitLocalSdp`] and discards previously saved + /// [SDP] Offer and Answer. + /// + /// Sets [`Context::is_renegotiate`] to `true`. + /// + /// Resets [`Context::sdp_offer`] and [`Context::sdp_answer`]. + /// + /// [SDP]: https://tools.ietf.org/html/rfc4317 + pub fn start_renegotiation(self) -> Peer { + let mut context = self.context; + context.sdp_answer = None; + context.sdp_offer = None; + + Peer { + context, + state: WaitLocalSdp {}, + } + } + + /// Adds [`Track`] to [`Peer`] send tracks list. + /// + /// This [`Track`] is considered new (not known to remote) and may be + /// obtained by calling `Peer.new_tracks`. 
+ fn add_sender(&mut self, track: Rc) { + self.context + .pending_track_updates + .push(TrackChange::AddSendTrack(Rc::clone(&track))); + self.context.senders.insert(track.id, track); + } + + /// Adds [`Track`] to [`Peer`] receive tracks list. + /// + /// This [`Track`] is considered new (not known to remote) and may be + /// obtained by calling `Peer.new_tracks`. + fn add_receiver(&mut self, track: Rc) { + self.context + .pending_track_updates + .push(TrackChange::AddRecvTrack(Rc::clone(&track))); + self.context.receivers.insert(track.id, track); + } } #[cfg(test)] @@ -511,22 +632,13 @@ pub mod tests { recv_audio: u32, recv_video: u32, ) -> PeerStateMachine { - let mut peer = Peer { - state: Stable {}, - context: Context { - id: Id(1), - sdp_offer: None, - sdp_answer: None, - senders: HashMap::new(), - receivers: HashMap::new(), - member_id: MemberId::from("test-member"), - is_force_relayed: false, - partner_peer: Id(2), - ice_user: None, - endpoints: Vec::new(), - partner_member: MemberId::from("partner-member"), - }, - }; + let mut peer = Peer::new( + Id(1), + MemberId::from("test-member"), + Id(2), + MemberId::from("partner-member"), + false, + ); let track_id_counter = Counter::default(); diff --git a/src/signalling/elements/member.rs b/src/signalling/elements/member.rs index 4d1dc79ca..4623bddea 100644 --- a/src/signalling/elements/member.rs +++ b/src/signalling/elements/member.rs @@ -315,6 +315,20 @@ impl Member { self.0.borrow().sinks.clone() } + /// Returns partner [`Member`]s of this [`Member`]. + pub fn partners(&self) -> Vec { + let this = self.0.borrow(); + this.srcs + .values() + .flat_map(|src| src.sinks().into_iter().map(|s| s.owner())) + .chain(this.sinks.values().map(|s| s.src().owner())) + .map(|member| (member.id(), member)) + .collect::>() + .into_iter() + .map(|(_, member)| member) + .collect() + } + /// Inserts sink endpoint into this [`Member`]. pub fn insert_sink(&self, endpoint: WebRtcPlayEndpoint) { self.0.borrow_mut().sinks.insert(endpoint.id(), endpoint); diff --git a/src/signalling/peers/mod.rs b/src/signalling/peers/mod.rs index 3f180672e..f6cb75473 100644 --- a/src/signalling/peers/mod.rs +++ b/src/signalling/peers/mod.rs @@ -21,7 +21,7 @@ use crate::{ api::control::{MemberId, RoomId}, conf, log::prelude::*, - media::{New, Peer, PeerStateMachine}, + media::{Peer, PeerError, PeerStateMachine, Stable}, signalling::{ elements::endpoints::{ webrtc::{WebRtcPlayEndpoint, WebRtcPublishEndpoint}, @@ -80,7 +80,7 @@ pub struct PeersService { } /// Simple ID counter. -#[derive(Default, Debug, Clone, Display)] +#[derive(Clone, Debug, Default, Display)] pub struct Counter { count: Cell, } @@ -95,7 +95,7 @@ impl Counter { } /// Result of the [`PeersService::get_or_create_peers`] function. -#[derive(Debug, Clone, Copy)] +#[derive(Clone, Copy, Debug)] enum GetOrCreatePeersResult { /// Requested [`Peer`] pair was created. Created(PeerId, PeerId), @@ -104,6 +104,16 @@ enum GetOrCreatePeersResult { AlreadyExisted(PeerId, PeerId), } +/// Result of the [`PeersService::connect_endpoints`] function. +#[derive(Clone, Copy, Debug)] +pub enum ConnectEndpointsResult { + /// New [`Peer`] pair was created. + Created(PeerId, PeerId), + + /// [`Peer`] pair was updated. + Updated(PeerId, PeerId), +} + impl PeersService { /// Returns new [`PeerRepository`] for a [`Room`] with the provided /// [`RoomId`]. @@ -229,13 +239,17 @@ impl PeersService { /// /// Errors with [`RoomError::PeerNotFound`] if requested [`PeerId`] doesn't /// exist in [`PeerRepository`]. 
+ /// + /// Errors with [`RoomError::PeerError`] if [`Peer`] is found, but not in + /// requested state. pub fn take_inner_peer( &self, peer_id: PeerId, ) -> Result, RoomError> where Peer: TryFrom, - as TryFrom>::Error: Into, + as TryFrom>::Error: + Into<(PeerError, PeerStateMachine)>, { self.peers.take_inner_peer(peer_id) } @@ -331,7 +345,9 @@ impl PeersService { self: Rc, src: WebRtcPublishEndpoint, sink: WebRtcPlayEndpoint, - ) -> Result, RoomError> { + ) -> Result, RoomError> { + use ConnectEndpointsResult::{Created, Updated}; + debug!( "Connecting endpoints of Member [id = {}] with Member [id = {}]", src.owner().id(), @@ -339,7 +355,7 @@ impl PeersService { ); match self.get_or_create_peers(&src, &sink).await? { GetOrCreatePeersResult::Created(src_peer_id, sink_peer_id) => { - Ok(Some((src_peer_id, sink_peer_id))) + Ok(Some(Created(src_peer_id, sink_peer_id))) } GetOrCreatePeersResult::AlreadyExisted( src_peer_id, @@ -355,9 +371,9 @@ impl PeersService { // which might not be the case, e.g. Control // Service creates multiple endpoints in quick // succession. - let mut src_peer: Peer = + let mut src_peer: Peer = self.peers.take_inner_peer(src_peer_id).unwrap(); - let mut sink_peer: Peer = + let mut sink_peer: Peer = self.peers.take_inner_peer(sink_peer_id).unwrap(); src_peer.add_publisher( @@ -406,7 +422,7 @@ impl PeersService { .await .map_err(RoomError::PeerTrafficWatcherMailbox)?; - Ok(None) + Ok(Some(Updated(src_peer_id, sink_peer_id))) } } } @@ -555,9 +571,16 @@ impl PeerRepository { ) -> Result, RoomError> where Peer: TryFrom, - as TryFrom>::Error: Into, + as TryFrom>::Error: + Into<(PeerError, PeerStateMachine)>, { - self.take(peer_id)?.try_into().map_err(Into::into) + type Err = as TryFrom>::Error; + + self.take(peer_id)?.try_into().map_err(|e: Err| { + let (err, peer) = e.into(); + self.add_peer(peer); + RoomError::from(err) + }) } /// Stores [`Peer`] in [`Room`]. 
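The reworked `take_inner_peer` above returns the mismatched `PeerStateMachine` together with the error, so the caller can put the peer back into the repository instead of dropping it. A self-contained sketch of this typestate-with-ownership-recovery pattern (toy types; not the crate's actual API):

```rust
use std::convert::TryFrom;

struct Stable;
struct WaitLocalSdp;

// Toy `Peer<S>`: the current state is encoded in the type parameter.
struct Peer<S> {
    id: u32,
    state: S,
}

enum PeerStateMachine {
    Stable(Peer<Stable>),
    WaitLocalSdp(Peer<WaitLocalSdp>),
}

impl TryFrom<PeerStateMachine> for Peer<Stable> {
    // Returning the machine alongside the error lets the caller
    // re-insert the peer instead of losing it.
    type Error = (String, PeerStateMachine);

    fn try_from(peer: PeerStateMachine) -> Result<Self, Self::Error> {
        match peer {
            PeerStateMachine::Stable(p) => Ok(p),
            other => Err(("wrong state".to_owned(), other)),
        }
    }
}

impl Peer<Stable> {
    // Consuming `self` makes an invalid transition a compile-time error.
    fn start(self) -> Peer<WaitLocalSdp> {
        Peer { id: self.id, state: WaitLocalSdp }
    }
}
```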
diff --git a/src/signalling/peers/traffic_watcher.rs b/src/signalling/peers/traffic_watcher.rs index da2b185eb..ef53a4707 100644 --- a/src/signalling/peers/traffic_watcher.rs +++ b/src/signalling/peers/traffic_watcher.rs @@ -617,23 +617,27 @@ impl Handler for PeersTrafficWatcherImpl { _: &mut Self::Context, ) -> Self::Result { if let Some(room) = self.stats.get_mut(&msg.room_id) { - debug!( - "Peer [id = {}] from a Room [id = {}] was registered in the \ - PeersTrafficWatcher with {:?} sources.", - msg.peer_id, msg.room_id, msg.flow_metrics_sources - ); - room.peers.insert( - msg.peer_id, - PeerStat { - peer_id: msg.peer_id, - state: PeerState::New, - init_task_handler: None, - tracked_sources: msg.flow_metrics_sources, - started_at: None, - received_sources: HashMap::new(), - traffic_flowing_timeout: self.traffic_report_ttl, - }, - ); + if let Some(peer) = room.peers.get_mut(&msg.peer_id) { + peer.tracked_sources.extend(msg.flow_metrics_sources); + } else { + debug!( + "Peer [id = {}] from a Room [id = {}] was registered in \ + the PeersTrafficWatcher with {:?} sources.", + msg.peer_id, msg.room_id, msg.flow_metrics_sources + ); + room.peers.insert( + msg.peer_id, + PeerStat { + peer_id: msg.peer_id, + state: PeerState::New, + init_task_handler: None, + tracked_sources: msg.flow_metrics_sources, + started_at: None, + received_sources: HashMap::new(), + traffic_flowing_timeout: self.traffic_report_ttl, + }, + ); + } } } } @@ -750,7 +754,7 @@ mod tests { async fn correct_stopped_at_when_init_timeout_stop() { let mut helper = Helper::new(&conf::Media { init_timeout: Duration::from_millis(100), - ..Default::default() + max_lag: Duration::from_secs(999), }) .await; helper @@ -776,7 +780,6 @@ mod tests { let mut helper = Helper::new(&conf::Media { init_timeout: Duration::from_secs(999), max_lag: Duration::from_millis(50), - ..Default::default() }) .await; helper @@ -848,7 +851,6 @@ mod tests { let mut helper = Helper::new(&conf::Media { init_timeout: Duration::from_millis(30), max_lag: Duration::from_secs(999), - ..Default::default() }) .await; helper @@ -962,7 +964,6 @@ mod tests { let mut helper = Helper::new(&conf::Media { init_timeout: Duration::from_secs(999), max_lag: Duration::from_secs(999), - ..Default::default() }) .await; helper diff --git a/src/signalling/room/command_handler.rs b/src/signalling/room/command_handler.rs index ee9fa8335..fff817a97 100644 --- a/src/signalling/room/command_handler.rs +++ b/src/signalling/room/command_handler.rs @@ -5,14 +5,14 @@ use std::collections::HashMap; use actix::WrapFuture as _; use medea_client_api_proto::{ - CommandHandler, Event, IceCandidate, PeerId, PeerMetrics, TrackId, - TrackPatch, + CommandHandler, Event, IceCandidate, NegotiationRole, PeerId, PeerMetrics, + TrackId, TrackPatch, TrackUpdate, }; use crate::{ log::prelude::*, media::{ - New, Peer, PeerStateMachine, WaitLocalHaveRemote, WaitLocalSdp, + Peer, PeerStateMachine, Stable, WaitLocalHaveRemote, WaitLocalSdp, WaitRemoteSdp, }, }; @@ -24,8 +24,8 @@ impl CommandHandler for Room { /// Sends [`Event::PeerCreated`] to provided [`Peer`] partner. Provided /// [`Peer`] state must be [`WaitLocalSdp`] and will be changed to - /// [`WaitRemoteSdp`], partners [`Peer`] state must be [`New`] and will be - /// changed to [`WaitLocalHaveRemote`]. + /// [`WaitRemoteSdp`], partners [`Peer`] state must be [`Stable`] and will + /// be changed to [`WaitLocalHaveRemote`]. 
fn on_make_sdp_offer( &mut self, from_peer_id: PeerId, @@ -39,7 +39,7 @@ impl CommandHandler for Room { from_peer.update_senders_statuses(senders_statuses); let to_peer_id = from_peer.partner_peer_id(); - let to_peer: Peer = self.peers.take_inner_peer(to_peer_id)?; + let to_peer: Peer = self.peers.take_inner_peer(to_peer_id)?; let from_peer = from_peer.set_local_sdp(sdp_offer.clone()); let to_peer = to_peer.set_remote_sdp(sdp_offer.clone()); @@ -49,12 +49,20 @@ impl CommandHandler for Room { RoomError::NoTurnCredentials(to_member_id.clone()) })?; - let event = Event::PeerCreated { - peer_id: to_peer.id(), - sdp_offer: Some(sdp_offer), - tracks: to_peer.tracks(), - ice_servers, - force_relay: to_peer.is_force_relayed(), + let event = if from_peer.is_known_to_remote() { + Event::TracksApplied { + peer_id: to_peer_id, + negotiation_role: Some(NegotiationRole::Answerer(sdp_offer)), + updates: to_peer.get_updates(), + } + } else { + Event::PeerCreated { + peer_id: to_peer.id(), + negotiation_role: NegotiationRole::Answerer(sdp_offer), + tracks: to_peer.new_tracks(), + ice_servers, + force_relay: to_peer.is_force_relayed(), + } }; self.peers.add_peer(from_peer); @@ -73,8 +81,6 @@ impl CommandHandler for Room { /// [`Peer`] state must be [`WaitLocalHaveRemote`] and will be changed to /// [`Stable`], partners [`Peer`] state must be [`WaitRemoteSdp`] and will /// be changed to [`Stable`]. - /// - /// [`Stable`]: crate::media::peer::Stable fn on_make_sdp_answer( &mut self, from_peer_id: PeerId, @@ -111,7 +117,7 @@ impl CommandHandler for Room { } /// Sends [`Event::IceCandidateDiscovered`] to provided [`Peer`] partner. - /// Both [`Peer`]s may have any state except [`New`]. + /// Both [`Peer`]s may have any state except [`Stable`]. fn on_set_ice_candidate( &mut self, from_peer_id: PeerId, @@ -150,7 +156,7 @@ impl CommandHandler for Room { Ok(Box::new(actix::fut::ok(()))) } - /// Sends [`Event::TracksUpdated`] with data from the received + /// Sends [`Event::TracksApplied`] with data from the received /// [`Command::UpdateTracks`]. 
/// /// [`Command::UpdateTracks`]: medea_client_api_proto::Command::UpdateTracks @@ -167,9 +173,13 @@ impl CommandHandler for Room { self.members .send_event_to_member( member_id, - Event::TracksUpdated { + Event::TracksApplied { peer_id, - tracks_patches, + negotiation_role: None, + updates: tracks_patches + .into_iter() + .map(TrackUpdate::Updated) + .collect(), }, ) .into_actor(self), diff --git a/src/signalling/room/dynamic_api.rs b/src/signalling/room/dynamic_api.rs index bb9b6abdf..3a897edcc 100644 --- a/src/signalling/room/dynamic_api.rs +++ b/src/signalling/room/dynamic_api.rs @@ -6,8 +6,8 @@ use std::collections::{HashMap, HashSet}; use actix::{ - ActorFuture as _, Context, ContextFutureSpawner as _, Handler, Message, - WrapFuture as _, + fut, ActorFuture as _, Context, ContextFutureSpawner as _, Handler, + Message, WrapFuture as _, }; use medea_client_api_proto::PeerId; use medea_control_api_proto::grpc::api as proto; @@ -23,9 +23,12 @@ use crate::{ WebRtcPublishId, }, log::prelude::*, - signalling::elements::{ - endpoints::webrtc::{WebRtcPlayEndpoint, WebRtcPublishEndpoint}, - member::MemberError, + signalling::{ + elements::{ + endpoints::webrtc::{WebRtcPlayEndpoint, WebRtcPublishEndpoint}, + member::MemberError, + }, + room::ActFuture, }, }; @@ -180,8 +183,7 @@ impl Room { member_id: &MemberId, endpoint_id: WebRtcPlayId, spec: WebRtcPlayEndpointSpec, - ctx: &mut Context, - ) -> Result<(), RoomError> { + ) -> Result>, RoomError> { let member = self.members.get_member(&member_id)?; let is_member_have_this_sink_id = @@ -228,10 +230,10 @@ impl Room { member.insert_sink(sink); if self.members.member_has_connection(member_id) { - self.init_member_connections(&member, ctx); + Ok(Box::new(self.init_member_connections(&member))) + } else { + Ok(Box::new(actix::fut::ok(()))) } - - Ok(()) } /// Removes [`Peer`]s and call [`Room::member_peers_removed`] for every @@ -404,32 +406,36 @@ pub struct CreateEndpoint { } impl Handler for Room { - type Result = Result<(), RoomError>; + type Result = ActFuture>; fn handle( &mut self, msg: CreateEndpoint, - ctx: &mut Self::Context, + _: &mut Self::Context, ) -> Self::Result { match msg.spec { EndpointSpec::WebRtcPlay(endpoint) => { - self.create_sink_endpoint( + match self.create_sink_endpoint( &msg.member_id, msg.endpoint_id.into(), endpoint, - ctx, - )?; + ) { + Ok(fut) => Box::new(fut), + Err(e) => Box::new(fut::err(e)), + } } EndpointSpec::WebRtcPublish(endpoint) => { - self.create_src_endpoint( + if let Err(e) = self.create_src_endpoint( &msg.member_id, msg.endpoint_id.into(), &endpoint, - )?; + ) { + Box::new(fut::err(e)) + } else { + Box::new(fut::ok(())) + } } } - - Ok(()) } } diff --git a/src/signalling/room/mod.rs b/src/signalling/room/mod.rs index 30d49a81f..d38c8e39e 100644 --- a/src/signalling/room/mod.rs +++ b/src/signalling/room/mod.rs @@ -6,15 +6,16 @@ mod dynamic_api; mod peer_events_handler; mod rpc_server; -use std::{rc::Rc, sync::Arc}; +use std::{collections::HashMap, rc::Rc, sync::Arc}; use actix::{ - Actor, ActorFuture, Context, ContextFutureSpawner as _, Handler, - MailboxError, WrapFuture as _, + Actor, ActorFuture, AsyncContext as _, Context, Handler, MailboxError, + WrapFuture as _, }; use derive_more::{Display, From}; use failure::Fail; -use medea_client_api_proto::{Event, PeerId}; +use futures::future; +use medea_client_api_proto::{Event, NegotiationRole, PeerId}; use crate::{ api::control::{ @@ -27,15 +28,15 @@ use crate::{ MemberId, RoomId, }, log::prelude::*, - media::{New, Peer, PeerError}, + media::{Peer, PeerError, 
diff --git a/src/signalling/room/mod.rs b/src/signalling/room/mod.rs
index 30d49a81f..d38c8e39e 100644
--- a/src/signalling/room/mod.rs
+++ b/src/signalling/room/mod.rs
@@ -6,15 +6,16 @@ mod dynamic_api;
 mod peer_events_handler;
 mod rpc_server;
 
-use std::{rc::Rc, sync::Arc};
+use std::{collections::HashMap, rc::Rc, sync::Arc};
 
 use actix::{
-    Actor, ActorFuture, Context, ContextFutureSpawner as _, Handler,
-    MailboxError, WrapFuture as _,
+    Actor, ActorFuture, AsyncContext as _, Context, Handler, MailboxError,
+    WrapFuture as _,
 };
 use derive_more::{Display, From};
 use failure::Fail;
-use medea_client_api_proto::{Event, PeerId};
+use futures::future;
+use medea_client_api_proto::{Event, NegotiationRole, PeerId};
 
 use crate::{
     api::control::{
@@ -27,15 +28,15 @@ use crate::{
         MemberId, RoomId,
     },
     log::prelude::*,
-    media::{New, Peer, PeerError},
+    media::{Peer, PeerError, Stable},
     shutdown::ShutdownGracefully,
     signalling::{
         elements::{member::MemberError, Member, MembersLoadError},
         participants::{ParticipantService, ParticipantServiceErr},
-        peers::{PeerTrafficWatcher, PeersService},
+        peers::{ConnectEndpointsResult, PeerTrafficWatcher, PeersService},
     },
     turn::TurnServiceErr,
-    utils::ResponseActAnyFuture,
+    utils::{actix_try_join_all, ResponseActAnyFuture},
     AppContext,
 };
 
@@ -172,53 +173,66 @@ impl Room {
         &self.id
     }
 
-    /// Sends [`Event::PeerCreated`] to one of specified [`Peer`]s based on
-    /// which of them has any outbound tracks. That [`Peer`] state will be
-    /// changed to [`WaitLocalSdp`] state. Both provided peers must be in
-    /// [`New`] state. At least one of provided peers must have outbound
-    /// tracks.
+    /// Sends [`Event::PeerCreated`] to the specified [`Peer`]. That [`Peer`]
+    /// state will be changed to the [`WaitLocalSdp`] state.
     fn send_peer_created(
         &mut self,
-        peer1_id: PeerId,
-        peer2_id: PeerId,
-    ) -> Result<ActFuture<Result<(), RoomError>>, RoomError> {
-        let peer1: Peer<New> = self.peers.take_inner_peer(peer1_id)?;
-        let peer2: Peer<New> = self.peers.take_inner_peer(peer2_id)?;
-
-        // decide which peer is sender
-        let (sender, receiver) = if peer1.is_sender() {
-            (peer1, peer2)
-        } else if peer2.is_sender() {
-            (peer2, peer1)
-        } else {
-            self.peers.add_peer(peer1);
-            self.peers.add_peer(peer2);
-            return Err(RoomError::BadRoomSpec(format!(
-                "Error while trying to connect Peer [id = {}] and Peer [id = \
-                 {}] cause neither of peers are senders",
-                peer1_id, peer2_id
-            )));
-        };
-        self.peers.add_peer(receiver);
+        peer_id: PeerId,
+    ) -> ActFuture<Result<(), RoomError>> {
+        let peer: Peer<Stable> =
+            actix_try!(self.peers.take_inner_peer(peer_id));
 
-        let sender = sender.start();
-        let member_id = sender.member_id();
-        let ice_servers = sender
+        let peer = peer.start();
+        let member_id = peer.member_id();
+        let ice_servers = peer
             .ice_servers_list()
-            .ok_or_else(|| RoomError::NoTurnCredentials(member_id.clone()))?;
+            .ok_or_else(|| RoomError::NoTurnCredentials(member_id.clone()));
+        let ice_servers = actix_try!(ice_servers);
         let peer_created = Event::PeerCreated {
-            peer_id: sender.id(),
-            sdp_offer: None,
-            tracks: sender.tracks(),
+            peer_id: peer.id(),
+            negotiation_role: NegotiationRole::Offerer,
+            tracks: peer.new_tracks(),
             ice_servers,
-            force_relay: sender.is_force_relayed(),
+            force_relay: peer.is_force_relayed(),
         };
-        self.peers.add_peer(sender);
-        Ok(Box::new(
+        self.peers.add_peer(peer);
+        Box::new(
             self.members
                 .send_event_to_member(member_id, peer_created)
                 .into_actor(self),
-        ))
+        )
+    }
+
+    /// Sends [`Event::TracksApplied`] with the latest [`Peer`] changes to
+    /// the specified [`Member`]. Starts renegotiation, marking the provided
+    /// [`Peer`] as [`NegotiationRole::Offerer`].
+    ///
+    /// # Errors
+    ///
+    /// Errors if the [`Peer`] lookup fails, or it is not in a [`Stable`]
+    /// state.
+    fn send_tracks_applied(
+        &mut self,
+        peer_id: PeerId,
+    ) -> ActFuture<Result<(), RoomError>> {
+        let peer: Peer<Stable> =
+            actix_try!(self.peers.take_inner_peer(peer_id));
+        let peer = peer.start_renegotiation();
+        let updates = peer.get_updates();
+        let member_id = peer.member_id();
+        self.peers.add_peer(peer);
+
+        Box::new(
+            self.members
                .send_event_to_member(
+                    member_id,
+                    Event::TracksApplied {
+                        updates,
+                        negotiation_role: Some(NegotiationRole::Offerer),
+                        peer_id,
+                    },
+                )
+                .into_actor(self),
+        )
     }
 
     /// Sends [`Event::PeersRemoved`] to [`Member`].
@@ -239,69 +253,141 @@
         )
     }
 
-    /// Creates and interconnects all [`Peer`]s between connected [`Member`]
-    /// and all available at this moment other [`Member`]s.
-    ///
-    /// Availability is determined by checking [`RpcConnection`] of all
-    /// [`Member`]s from [`WebRtcPlayEndpoint`]s and from receivers of
-    /// the connected [`Member`].
-    fn init_member_connections(
+    /// Connects [`Endpoint`]s between the provided [`Member`]s.
+    fn connect_members(
         &mut self,
-        member: &Member,
-        ctx: &mut <Self as Actor>::Context,
-    ) {
+        member1: &Member,
+        member2: &Member,
+    ) -> ActFuture<Result<(), RoomError>> {
+        let member2_id = member2.id();
         let mut connect_endpoints_tasks = Vec::new();
-        for publisher in member.srcs().values() {
-            for receiver in publisher.sinks() {
-                let receiver_owner = receiver.owner();
-
-                if receiver.peer_id().is_none()
-                    && self.members.member_has_connection(&receiver_owner.id())
-                {
+        for src in member1.srcs().values() {
+            for sink in src.sinks() {
+                if sink.owner().id() == member2_id {
                     connect_endpoints_tasks.push(
-                        self.peers
-                            .clone()
-                            .connect_endpoints(publisher.clone(), receiver),
+                        self.peers.clone().connect_endpoints(src.clone(), sink),
                     );
                 }
            }
        }
 
-        for receiver in member.sinks().values() {
-            let publisher = receiver.src();
-
-            if receiver.peer_id().is_none()
-                && self.members.member_has_connection(&publisher.owner().id())
-            {
+        for sink in member1.sinks().values() {
+            let src = sink.src();
+            if src.owner().id() == member2_id {
                 connect_endpoints_tasks.push(
-                    self.peers
-                        .clone()
-                        .connect_endpoints(publisher.clone(), receiver.clone()),
+                    self.peers.clone().connect_endpoints(src, sink.clone()),
                 )
             }
         }
 
-        for connect_endpoints_task in connect_endpoints_tasks {
-            connect_endpoints_task
+        Box::new(
+            future::try_join_all(connect_endpoints_tasks)
                 .into_actor(self)
-                .then(|result, this, _| match result {
-                    Ok(Some((peer1, peer2))) => {
-                        match this.send_peer_created(peer1, peer2) {
-                            Ok(fut) => fut,
-                            Err(err) => Box::new(actix::fut::err(err)),
+                .then(move |result, room: &mut Room, ctx| {
+                    let connected_peers = actix_try!(result);
+
+                    // If there is at least one
+                    // ConnectEndpointsResult::Created for a Peers pair, then
+                    // Peers should be created on the remote side.
+                    //
+                    // If there are only ConnectEndpointsResult::Updated for
+                    // a Peers pair, then Peers should be renegotiated.
+                    let mut peer_pairs_actions: HashMap<
+                        (PeerId, PeerId),
+                        ConnectEndpointsResult,
+                    > = HashMap::new();
+
+                    // PeerId pairs should be kept order-agnostic for correct
+                    // Hash and Eq.
+                    let sort_pair =
+                        |(id1, id2): (PeerId, PeerId)| -> (PeerId, PeerId) {
+                            if id1.0.lt(&id2.0) {
+                                (id1, id2)
+                            } else {
+                                (id2, id1)
+                            }
+                        };
+
+                    // Fill `peer_pairs_actions` map.
+                    connected_peers
+                        .into_iter()
+                        .filter_map(|item| item)
+                        .for_each(|item| match item {
+                            ConnectEndpointsResult::Created(id1, id2) => {
+                                peer_pairs_actions
+                                    .insert(sort_pair((id1, id2)), item);
+                            }
+                            ConnectEndpointsResult::Updated(id1, id2) => {
+                                peer_pairs_actions
+                                    .entry(sort_pair((id1, id2)))
+                                    .or_insert_with(|| item);
+                            }
+                        });
+
+                    let mut peer_updates = Vec::new();
+                    let mut peer_creates = Vec::new();
+                    for (_, action) in peer_pairs_actions.drain() {
+                        match action {
+                            ConnectEndpointsResult::Created(id1, _) => {
+                                peer_creates
+                                    .push(room.send_peer_created(id1));
+                            }
+                            ConnectEndpointsResult::Updated(id1, _) => {
+                                peer_updates
+                                    .push(room.send_tracks_applied(id1));
+                            }
                         }
                     }
-                    Err(err) => Box::new(actix::fut::err(err)),
-                    Ok(_) => Box::new(actix::fut::ok(())),
-                })
-                .map(|res, _, _| {
-                    if let Err(err) = res {
-                        error!("Failed connect peers, because {}.", err);
-                    }
-                })
-                .spawn(ctx);
-        }
+
+                    // TODO: peer_creates are spawned to avoid a deadlock.
+                    //       Fixed in #111.
+                    ctx.spawn(actix_try_join_all(peer_creates).map(
+                        |res, _, _| {
+                            if let Err(e) = res {
+                                error!(
+                                    "Failed to connect Endpoints because: {:?}",
+                                    e,
+                                );
+                            }
+                        },
+                    ));
+
+                    Box::new(
+                        actix_try_join_all(peer_updates)
+                            .map(|res, _, _| res.map(|_| ())),
+                    )
+                }),
+        )
+    }
+
+    /// Creates and interconnects all [`Peer`]s between the connected
+    /// [`Member`] and all other [`Member`]s available at this moment.
+    /// Expects that the provided [`Member`] has an active [`RpcConnection`].
+    ///
+    /// Availability is determined by checking [`RpcConnection`] of all
+    /// [`Member`]s from [`WebRtcPlayEndpoint`]s and from receivers of
+    /// the connected [`Member`].
+    ///
+    /// Starts renegotiation with `MediaTrack`s addition if some not yet
+    /// interconnected `Endpoint`s are found and the [`Peer`]s pair already
+    /// exists.
+    fn init_member_connections(
+        &mut self,
+        member: &Member,
+    ) -> ActFuture<Result<(), RoomError>> {
+        let connect_members_tasks =
+            member.partners().into_iter().filter_map(|partner| {
+                if self.members.member_has_connection(&partner.id()) {
+                    Some(self.connect_members(&partner, member))
+                } else {
+                    None
+                }
+            });
+
+        Box::new(
+            actix_try_join_all(connect_members_tasks)
+                .map(|result, _, _| result.map(|_| ())),
+        )
     }
 
     /// Closes [`Room`] gracefully, by dropping all the connections and moving
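`connect_members` folds the per-`Endpoint` results into a single action per unordered `Peer` pair: one `Created` result forces `Event::PeerCreated` for the pair, while a lone `Updated` leads to renegotiation via `Event::TracksApplied`. The self-contained sketch below reproduces just that folding rule with plain `u32` ids; `PairAction` and `fold` are illustrative stand-ins, not code from this patch:

```rust
use std::collections::HashMap;

// Illustrative stand-in for `ConnectEndpointsResult` over plain ids.
#[derive(Clone, Copy, Debug, PartialEq)]
enum PairAction {
    Created(u32, u32),
    Updated(u32, u32),
}

fn fold(results: Vec<PairAction>) -> HashMap<(u32, u32), PairAction> {
    // Order-agnostic key, mirroring `sort_pair` above.
    let sort_pair = |(a, b): (u32, u32)| if a < b { (a, b) } else { (b, a) };
    let mut actions = HashMap::new();
    for r in results {
        match r {
            // `Created` overwrites any previous action for the pair.
            PairAction::Created(a, b) => {
                actions.insert(sort_pair((a, b)), r);
            }
            // `Updated` only fills a vacant entry.
            PairAction::Updated(a, b) => {
                actions.entry(sort_pair((a, b))).or_insert(r);
            }
        }
    }
    actions
}

fn main() {
    let folded = fold(vec![
        PairAction::Updated(2, 1),
        PairAction::Created(1, 2),
    ]);
    // One action per unordered pair, and `Created` has won.
    assert_eq!(folded[&(1, 2)], PairAction::Created(1, 2));
}
```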
diff --git a/src/signalling/room/rpc_server.rs b/src/signalling/room/rpc_server.rs
index 73dfd4ab8..47f78a82d 100644
--- a/src/signalling/room/rpc_server.rs
+++ b/src/signalling/room/rpc_server.rs
@@ -21,6 +21,7 @@ use crate::{
     },
     log::prelude::*,
     media::PeerStateMachine,
+    signalling::room::RoomError,
     utils::ResponseActAnyFuture,
 };
 
@@ -70,9 +71,11 @@ impl Room {
             .peers
             .map_peer_by_id(peer_id, PeerStateMachine::member_id)
             .map_err(|_| PeerNotFound(peer_id))?;
+
         if peer_member_id != command.member_id {
             return Err(PeerBelongsToAnotherMember(peer_id, peer_member_id));
         }
+
         Ok(())
     }
 }
@@ -89,17 +92,17 @@ impl RpcServer for Addr<Room> {
             member_id,
             connection,
         })
-        .map(|res| match res {
-            Ok(_) => Ok(()),
-            Err(e) => {
-                error!(
-                    "Failed to send RpcConnectionEstablished cause {:?}",
-                    e,
-                );
-                Err(())
-            }
+        .map(|r| {
+            r.map_err(|e| {
+                error!("Failed to send RpcConnectionEstablished cause {:?}", e)
             })
-        .boxed_local()
+            .and_then(|r| {
+                r.map_err(|e| {
+                    error!("RpcConnectionEstablished failed cause: {:?}", e)
+                })
+            })
+        })
+        .boxed_local()
     }
 
     /// Sends [`RpcConnectionClosed`] message to [`Room`] actor ignoring any
@@ -201,7 +204,7 @@ impl Handler<CommandMessage> for Room {
 }
 
 impl Handler<RpcConnectionEstablished> for Room {
-    type Result = ActFuture<Result<(), ()>>;
+    type Result = ActFuture<Result<(), RoomError>>;
 
     /// Saves new [`RpcConnection`] in [`ParticipantService`][1], initiates
     /// media establishment between members.
@@ -222,22 +225,23 @@ impl Handler<RpcConnectionEstablished> for Room {
         let fut = self
             .members
             .connection_established(ctx, msg.member_id, msg.connection)
-            .map(|res, room, ctx| match res {
-                Ok(member) => {
-                    room.init_member_connections(&member, ctx);
-                    if let Some(callback_url) = member.get_on_join() {
-                        room.callbacks.send_callback(
-                            callback_url,
-                            member.get_fid().into(),
-                            OnJoinEvent,
-                        );
-                    };
-                    Ok(())
-                }
-                Err(e) => {
-                    error!("RpcConnectionEstablished error {:?}", e);
-                    Err(())
-                }
+            .then(|res, room, _| {
+                let member = actix_try!(res);
+                Box::new(
+                    room.init_member_connections(&member)
+                        .map(|res, _, _| res.map(|_| member)),
+                )
+            })
+            .map(|result, room, _| {
+                let member = result?;
+                if let Some(callback_url) = member.get_on_join() {
+                    room.callbacks.send_callback(
+                        callback_url,
+                        member.get_fid().into(),
+                        OnJoinEvent,
+                    );
+                };
+                Ok(())
             });
         Box::new(fut)
     }
diff --git a/src/utils/actix_try_join_all.rs b/src/utils/actix_try_join_all.rs
new file mode 100644
index 000000000..53bd5aab2
--- /dev/null
+++ b/src/utils/actix_try_join_all.rs
@@ -0,0 +1,140 @@
+//! [`TryJoinAll`] for [`ActorFuture`].
+//!
+//! [`ActorFuture`]: actix::ActorFuture
+//! [`TryJoinAll`]: futures::future::TryJoinAll
+
+use std::{
+    mem,
+    pin::Pin,
+    task::{Context, Poll},
+};
+
+use actix::{fut::ActorFuture, Actor};
+
+/// Creates a future which represents either a collection of the results of
+/// the futures given or an error.
+///
+/// The returned future will drive execution for all of its underlying
+/// futures, collecting the results into a destination `Vec` in the same
+/// order as they were provided.
+///
+/// If any future returns an error then all other futures will be canceled
+/// and an error will be returned immediately. If all futures complete
+/// successfully, however, then the returned future will succeed with a
+/// [`Vec`] of all the successful results.
+///
+/// This function is an analog of [`try_join_all`], but for
+/// [`ActorFuture`]s.
+///
+/// [`ActorFuture`]: actix::ActorFuture
+/// [`try_join_all`]: futures::future::try_join_all
+pub fn actix_try_join_all<I, F, T, E>(i: I) -> ActixTryJoinAll<F, T>
+where
+    I: IntoIterator<Item = F>,
+    F: ActorFuture<Output = Result<T, E>> + Unpin,
+{
+    let elems: Box<[_]> = i.into_iter().map(ElemState::Pending).collect();
+    ActixTryJoinAll {
+        elems: elems.into(),
+    }
+}
+
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct ActixTryJoinAll<F, T>
+where
+    F: ActorFuture + Unpin,
+{
+    elems: Pin<Box<[ElemState<F, T>]>>,
+}
+
+impl<F, T, E> ActorFuture for ActixTryJoinAll<F, T>
+where
+    F: ActorFuture<Output = Result<T, E>> + Unpin,
+{
+    type Actor = F::Actor;
+    type Output = Result<Vec<T>, E>;
+
+    fn poll(
+        mut self: Pin<&mut Self>,
+        srv: &mut Self::Actor,
+        ctx: &mut <Self::Actor as Actor>::Context,
+        task: &mut Context<'_>,
+    ) -> Poll<Self::Output> {
+        let mut state = FinalState::AllDone;
+
+        for mut elem in iter_pin_mut(self.elems.as_mut()) {
+            if let Some(pending) = elem.as_mut().pending_pin_mut() {
+                match pending.poll(srv, ctx, task) {
+                    Poll::Pending => state = FinalState::Pending,
+                    Poll::Ready(output) => match output {
+                        Ok(item) => elem.set(ElemState::Done(Some(item))),
+                        Err(e) => {
+                            state = FinalState::Error(e);
+                            break;
+                        }
+                    },
+                }
+            }
+        }
+
+        match state {
+            FinalState::Pending => Poll::Pending,
+            FinalState::AllDone => {
+                let mut elems = mem::replace(&mut self.elems, Box::pin([]));
+                let results = iter_pin_mut(elems.as_mut())
+                    .map(|e| e.take_done().unwrap())
+                    .collect();
+                Poll::Ready(Ok(results))
+            }
+            FinalState::Error(e) => {
+                let _ = mem::replace(&mut self.elems, Box::pin([]));
+                Poll::Ready(Err(e))
+            }
+        }
+    }
+}
+
+#[derive(Debug)]
+enum ElemState<F, T>
+where
+    F: ActorFuture + Unpin,
+{
+    Pending(F),
+    Done(Option<T>),
+}
+
+impl<F, T> ElemState<F, T>
+where
+    F: ActorFuture + Unpin,
+{
+    fn pending_pin_mut(self: Pin<&mut Self>) -> Option<Pin<&mut F>> {
+        match self.get_mut() {
+            ElemState::Pending(f) => Some(Pin::new(f)),
+            ElemState::Done(_) => None,
+        }
+    }
+
+    fn take_done(self: Pin<&mut Self>) -> Option<T> {
+        match self.get_mut() {
+            ElemState::Pending(_) => None,
+            ElemState::Done(output) => output.take(),
+        }
+    }
+}
+
+impl<F, T> Unpin for ElemState<F, T> where
+    F: ActorFuture + Unpin
+{
+}
+
+fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>>
+where
+    T: Unpin,
+{
+    slice.get_mut().iter_mut().map(Pin::new)
+}
+
+enum FinalState<E> {
+    Pending,
+    AllDone,
+    Error(E),
+}
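A hedged usage sketch for `actix_try_join_all` follows. `MyActor`, the `String` error type and `join_tasks` are invented for the example; the boxed-future shape mirrors how `Room` joins `peer_creates` and `peer_updates` above, and the module path assumes this patch's crate layout:

```rust
use actix::{fut, Actor, ActorFuture, Context};

use crate::utils::actix_try_join_all;

struct MyActor;

impl Actor for MyActor {
    type Context = Context<Self>;
}

// The same shape `Room` uses: homogeneous boxed `ActorFuture`s.
type Task = Box<dyn ActorFuture<Actor = MyActor, Output = Result<u32, String>>>;

fn join_tasks() -> impl ActorFuture<Actor = MyActor, Output = Result<Vec<u32>, String>> {
    let tasks: Vec<Task> = vec![
        Box::new(fut::ok(1)),
        Box::new(fut::ok(2)),
        // Adding `Box::new(fut::err("boom".into()))` here would make the
        // joined future resolve to `Err("boom")` without awaiting the rest.
    ];
    // Resolves to `Ok(vec![1, 2])`, preserving the input order.
    actix_try_join_all(tasks)
}
```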
diff --git a/src/utils/mod.rs b/src/utils/mod.rs
index 616070caa..eee1e9d9c 100644
--- a/src/utils/mod.rs
+++ b/src/utils/mod.rs
@@ -1,5 +1,7 @@
 //! Helper utils used in project.
 
+mod actix_try_join_all;
+
 use std::{future::Future, pin::Pin, time::Instant};
 
 use actix::prelude::dev::{
@@ -9,6 +11,8 @@ use actix::prelude::dev::{
 use chrono::{DateTime, Utc};
 use futures::future;
 
+pub use self::actix_try_join_all::actix_try_join_all;
+
 /// Creates new [`HashMap`] from a list of key-value pairs.
 ///
 /// # Example
@@ -105,6 +109,20 @@ macro_rules! impl_debug_by_struct_name {
     };
 }
 
+/// Analog of `?` for functions that return a boxed [`ActorFuture`].
+#[macro_export]
+macro_rules! actix_try {
+    ($e:expr) => {
+        match $e {
+            Ok(p) => p,
+            Err(e) => {
+                return Box::new(actix::fut::err(e.into()))
+                    as Box<dyn actix::ActorFuture<Actor = Self, Output = _>>;
+            }
+        }
+    };
+}
+
 // TODO: remove after https://github.com/actix/actix/pull/313
 
 /// Specialized future for asynchronous message handling. Exists because
 /// [`actix::ResponseFuture`] implements [`actix::dev::MessageResponse`] only
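`actix_try!` is the early-return counterpart of `?` for methods whose result is a boxed `ActorFuture`. A sketch of the pattern under assumed names (`MyActor` and `MyError` are hypothetical; the `e.into()` in the expansion works here because `From<MyError> for MyError` is the identity impl):

```rust
use actix::{fut, Actor, ActorFuture, Context};

#[derive(Debug)]
struct MyError;

struct MyActor;

impl Actor for MyActor {
    type Context = Context<Self>;
}

impl MyActor {
    // On `Err`, `actix_try!` early-returns an already-failed boxed future,
    // so the rest of the method only deals with the happy path.
    fn double(
        &mut self,
        input: Result<i32, MyError>,
    ) -> Box<dyn ActorFuture<Actor = Self, Output = Result<i32, MyError>>> {
        let n = actix_try!(input);
        Box::new(fut::ok(n * 2))
    }
}
```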
diff --git a/tests/e2e/grpc_control_api/create.rs b/tests/e2e/grpc_control_api/create.rs
index a8b482929..af7bf511c 100644
--- a/tests/e2e/grpc_control_api/create.rs
+++ b/tests/e2e/grpc_control_api/create.rs
@@ -8,7 +8,11 @@
 use medea::api::control::error_codes::ErrorCode;
 use medea_control_api_proto::grpc::api as proto;
 
-use crate::grpc_control_api::{take_member, take_room, take_webrtc_pub};
+use crate::{
+    enum_eq,
+    grpc_control_api::{take_member, take_room, take_webrtc_pub},
+    signalling::TestMember,
+};
 
 use super::{
     create_room_req, ControlClient, MemberBuilder, RoomBuilder,
@@ -208,6 +212,11 @@ mod member {
 }
 
 mod endpoint {
+    use std::time::Duration;
+
+    use futures::{channel::mpsc, StreamExt as _};
+    use medea_client_api_proto::{Event, TrackUpdate};
+    use tokio::time::timeout;
 
     use super::*;
 
@@ -364,4 +373,93 @@ mod endpoint {
             panic!("should err")
         }
     }
+
+    /// Checks that all needed [`Event`]s are sent when Control API adds an
+    /// `Endpoint` to the already interconnected `Member`s.
+    #[actix_rt::test]
+    async fn create_endpoint_in_the_interconnected_members() {
+        const TEST_NAME: &str =
+            "create_endpoint_in_the_interconnected_members";
+
+        let mut client = ControlClient::new().await;
+        let credentials = client.create(create_room_req(TEST_NAME)).await;
+
+        let (publisher_tx, mut rx) = mpsc::unbounded::<()>();
+        let publisher_done = timeout(Duration::from_secs(5), rx.next());
+        let (responder_tx, mut rx) = mpsc::unbounded::<()>();
+        let responder_done = timeout(Duration::from_secs(5), rx.next());
+        let (negotiation_finished_tx, mut rx) = mpsc::unbounded::<()>();
+        let negotiation_finished = timeout(Duration::from_secs(5), rx.next());
+
+        let _publisher = TestMember::connect(
+            credentials.get("publisher").unwrap(),
+            Some(Box::new(move |event, _, _| {
+                match event {
+                    Event::TracksApplied { updates, .. } => {
+                        if updates
+                            .iter()
+                            .any(|u| enum_eq!(TrackUpdate::Added, u))
+                        {
+                            publisher_tx.unbounded_send(()).unwrap();
+                        }
+                    }
+                    Event::SdpAnswerMade { .. } => {
+                        negotiation_finished_tx.unbounded_send(()).unwrap();
+                    }
+                    _ => (),
+                };
+            })),
+            None,
+            None,
+        )
+        .await;
+        let _responder = TestMember::connect(
+            credentials.get("responder").unwrap(),
+            Some(Box::new(move |event, _, events| {
+                if let Event::TracksApplied { updates, .. } = event {
+                    if updates.iter().any(|u| enum_eq!(TrackUpdate::Added, u))
+                    {
+                        responder_tx.unbounded_send(()).unwrap();
+                        let sdp_answer_mades_count = events
+                            .iter()
+                            .filter(|e| enum_eq!(Event::SdpAnswerMade, e))
+                            .count();
+                        if sdp_answer_mades_count == 2 {
+                            responder_tx.unbounded_send(()).unwrap();
+                        }
+                    }
+                };
+            })),
+            None,
+            None,
+        )
+        .await;
+
+        negotiation_finished.await.unwrap();
+
+        let create_publish_endpoint = WebRtcPublishEndpointBuilder::default()
+            .id("publish")
+            .p2p_mode(proto::web_rtc_publish_endpoint::P2p::Always)
+            .build()
+            .unwrap()
+            .build_request(format!("{}/responder", TEST_NAME));
+        client.create(create_publish_endpoint).await;
+
+        let create_play_endpoint = WebRtcPlayEndpointBuilder::default()
+            .id("play-receiver")
+            .src(format!("local://{}/responder/publish", TEST_NAME))
+            .build()
+            .unwrap()
+            .build_request(format!("{}/publisher", TEST_NAME));
+        client.create(create_play_endpoint).await;
+
+        let (publisher_res, responder_res) =
+            futures::future::join(publisher_done, responder_done).await;
+        publisher_res.unwrap().unwrap();
+        responder_res.unwrap().unwrap();
+    }
 }
diff --git a/tests/e2e/main.rs b/tests/e2e/main.rs
index 9bcca7c6f..ed68d9e3d 100644
--- a/tests/e2e/main.rs
+++ b/tests/e2e/main.rs
@@ -3,3 +3,18 @@
 mod callbacks;
 mod grpc_control_api;
 pub mod signalling;
+
+/// Equality comparison of enum variants.
+///
+/// This macro ignores all the content of an enum variant: it compares only
+/// the variants themselves, not their data.
+#[macro_export]
+macro_rules! enum_eq {
+    ($e:path, $val:ident) => {
+        if let $e { .. } = $val {
+            true
+        } else {
+            false
+        }
+    };
+}
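A quick illustration of `enum_eq!` with a made-up enum (assuming the macro is in scope, e.g. via `crate::enum_eq`). `Path { .. }` patterns match tuple and struct variants alike, so the payload never takes part in the comparison:

```rust
enum Shape {
    Circle(f64),
    Square { side: f64 },
}

fn main() {
    let s = Shape::Circle(1.0);
    // Only the variant is compared; the `1.0` payload is ignored.
    assert!(enum_eq!(Shape::Circle, s));
    assert!(!enum_eq!(Shape::Square, s));
}
```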
diff --git a/tests/e2e/signalling/mod.rs b/tests/e2e/signalling/mod.rs
index 2cd51d58a..3d8ba0aa6 100644
--- a/tests/e2e/signalling/mod.rs
+++ b/tests/e2e/signalling/mod.rs
@@ -5,7 +5,10 @@ mod pub_sub_signallng;
 mod rpc_settings;
 mod three_pubs;
 
-use std::{collections::HashMap, time::Duration};
+use std::{
+    collections::{HashMap, HashSet},
+    time::Duration,
+};
 
 use actix::{
     Actor, ActorContext, Addr, Arbiter, AsyncContext, Context, Handler,
@@ -20,7 +23,8 @@ use awc::{
 };
 use futures::{executor, stream::SplitSink, SinkExt as _, StreamExt as _};
 use medea_client_api_proto::{
-    ClientMsg, Command, Event, IceCandidate, PeerId, RpcSettings, ServerMsg,
+    ClientMsg, Command, Event, IceCandidate, NegotiationRole, PeerId,
+    RpcSettings, ServerMsg, TrackId, TrackUpdate,
 };
 
 pub type MessageHandler =
@@ -53,7 +57,14 @@ pub struct TestMember {
     events: Vec<Event>,
 
     /// List of peers created on this client.
-    known_peers: Vec<PeerId>,
+    known_peers: HashSet<PeerId>,
+
+    /// List of the mids which were already generated and sent to the
+    /// media server.
+    known_tracks_mids: HashMap<TrackId, String>,
+
+    /// Number of the last generated mid.
+    last_mid: u64,
 
     /// Max test lifetime, will panic when it will be exceeded.
     deadline: Option<Duration>,
@@ -103,7 +114,9 @@ impl TestMember {
         Self {
             sink,
            events: Vec::new(),
-            known_peers: Vec::new(),
+            known_peers: HashSet::new(),
+            known_tracks_mids: HashMap::new(),
+            last_mid: 0,
             deadline,
             on_message,
             on_connection_event,
@@ -129,6 +142,34 @@ impl TestMember {
                 .await;
         })
     }
+
+    /// Returns the mid for the `MediaTrack` with the provided [`TrackId`].
+    ///
+    /// This function will generate a new mid if no mid for the provided
+    /// [`TrackId`] was found.
+    pub fn get_mid(&mut self, track_id: TrackId) -> String {
+        if let Some(mid) = self.known_tracks_mids.get(&track_id) {
+            mid.to_string()
+        } else {
+            self.last_mid += 1;
+            let last_mid = self.last_mid;
+            let new_mid = format!("test-mid-{}", last_mid);
+            self.known_tracks_mids.insert(track_id, new_mid.clone());
+            new_mid
+        }
+    }
+
+    /// Adds the provided mid to the `MediaTrack` with the provided
+    /// [`TrackId`].
+    fn add_mid(&mut self, track_id: TrackId, mid: String) {
+        self.known_tracks_mids.insert(track_id, mid);
+    }
+
+    /// Generates and sets a mid for the provided [`TrackId`].
+    fn generate_mid(&mut self, track_id: TrackId) {
+        self.last_mid += 1;
+        let mid = self.last_mid.to_string();
+        self.add_mid(track_id, mid);
+    }
 }
 
 impl Actor for TestMember {
@@ -203,35 +244,44 @@ impl StreamHandler<Result<Frame, WsProtocolError>> for TestMember {
             match &event {
                 Event::PeerCreated {
                     peer_id,
-                    sdp_offer,
+                    negotiation_role,
                     tracks,
                     ..
                 } => {
-                    self.known_peers.push(*peer_id);
-                    match sdp_offer {
-                        Some(_) => {
-                            self.send_command(Command::MakeSdpAnswer {
+                    self.known_peers.insert(*peer_id);
+                    tracks.iter().for_each(|t| {
+                        use medea_client_api_proto::Direction;
+                        let mid = match &t.direction {
+                            Direction::Send { mid, .. }
+                            | Direction::Recv { mid, .. } => {
+                                mid.clone()
+                            }
+                        };
+                        if let Some(mid) = mid {
+                            self.add_mid(t.id, mid);
+                        } else {
+                            self.generate_mid(t.id);
+                        }
+                    });
+
+                    match negotiation_role {
+                        NegotiationRole::Offerer => {
+                            self.send_command(Command::MakeSdpOffer {
                                 peer_id: *peer_id,
-                                sdp_answer: "responder_answer".into(),
+                                sdp_offer: "caller_offer".into(),
+                                mids: self.known_tracks_mids.clone(),
                                 senders_statuses: HashMap::new(),
                             })
                         }
-                        None => {
-                            self.send_command(Command::MakeSdpOffer {
+                        NegotiationRole::Answerer(sdp_offer) => {
+                            assert_eq!(sdp_offer, "caller_offer");
+                            self.send_command(Command::MakeSdpAnswer {
                                 peer_id: *peer_id,
-                                sdp_offer: "caller_offer".into(),
-                                mids: tracks
-                                    .iter()
-                                    .map(|t| t.id)
-                                    .enumerate()
-                                    .map(|(mid, id)| {
-                                        (id, mid.to_string())
-                                    })
-                                    .collect(),
+                                sdp_answer: "responder_answer".into(),
                                 senders_statuses: HashMap::new(),
                             })
                         }
-                    };
+                    }
 
                     self.send_command(Command::SetIceCandidate {
                         peer_id: *peer_id,
@@ -242,9 +292,57 @@ impl StreamHandler<Result<Frame, WsProtocolError>> for TestMember {
                         },
                     });
                 }
+                Event::TracksApplied {
+                    peer_id,
+                    negotiation_role,
+                    updates,
+                } => {
+                    assert!(self.known_peers.contains(peer_id));
+                    updates.iter().for_each(|t| {
+                        use medea_client_api_proto::Direction;
+                        if let TrackUpdate::Added(track) = t {
+                            let mid = match &track.direction {
+                                Direction::Send { mid, .. }
+                                | Direction::Recv { mid, .. } => {
+                                    mid.clone()
+                                }
+                            };
+                            if let Some(mid) = mid {
+                                self.add_mid(track.id, mid);
+                            } else {
+                                self.generate_mid(track.id);
+                            }
+                        }
+                    });
+
+                    if let Some(negotiation_role) = negotiation_role {
+                        match negotiation_role {
+                            NegotiationRole::Answerer(sdp_offer) => {
+                                assert_eq!(sdp_offer, "caller_offer");
+                                self.send_command(
+                                    Command::MakeSdpAnswer {
+                                        peer_id: *peer_id,
+                                        sdp_answer: "responder_answer"
+                                            .into(),
+                                        senders_statuses: HashMap::new(),
+                                    },
+                                )
+                            }
+                            NegotiationRole::Offerer => self
+                                .send_command(Command::MakeSdpOffer {
+                                    peer_id: *peer_id,
+                                    sdp_offer: "caller_offer".into(),
+                                    mids: self
+                                        .known_tracks_mids
+                                        .clone(),
+                                    senders_statuses: HashMap::new(),
+                                }),
+                        }
+                    }
+                }
                 Event::SdpAnswerMade { peer_id, .. }
-                | Event::IceCandidateDiscovered { peer_id, .. }
-                | Event::TracksUpdated { peer_id, .. } => {
+                | Event::IceCandidateDiscovered { peer_id, .. } => {
                     assert!(self.known_peers.contains(peer_id))
                 }
                 Event::PeersRemoved { .. } => {}
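`TestMember`'s mid bookkeeping reduces to a lazily numbered, stable cache. The standalone model below (plain `u32` in place of `TrackId`; `MidCache` is an illustrative stand-in, not code from this patch) demonstrates the invariant the signalling tests rely on, namely that repeated `get_mid` calls for one track yield one mid:

```rust
use std::collections::HashMap;

// Stand-in for TestMember's `known_tracks_mids` + `last_mid` pair.
struct MidCache {
    known: HashMap<u32, String>,
    last: u64,
}

impl MidCache {
    fn get_mid(&mut self, track_id: u32) -> String {
        if let Some(mid) = self.known.get(&track_id) {
            mid.clone()
        } else {
            // First request for this track: mint a fresh, numbered mid.
            self.last += 1;
            let mid = format!("test-mid-{}", self.last);
            self.known.insert(track_id, mid.clone());
            mid
        }
    }
}

fn main() {
    let mut cache = MidCache { known: HashMap::new(), last: 0 };
    let first = cache.get_mid(1);
    assert_eq!(first, cache.get_mid(1)); // cached, not regenerated
    assert_eq!(cache.get_mid(2), "test-mid-2"); // fresh id for a new track
}
```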
diff --git a/tests/e2e/signalling/pub_sub_signallng.rs b/tests/e2e/signalling/pub_sub_signallng.rs
index 2b6161937..10204fb28 100644
--- a/tests/e2e/signalling/pub_sub_signallng.rs
+++ b/tests/e2e/signalling/pub_sub_signallng.rs
@@ -1,5 +1,5 @@
 use actix::{Context, System};
-use medea_client_api_proto::{Direction, Event};
+use medea_client_api_proto::{Direction, Event, NegotiationRole};
 
 use crate::signalling::TestMember;
 
@@ -30,7 +30,7 @@ fn pub_sub_video_call() {
         let is_caller;
         if let Event::PeerCreated {
             peer_id,
-            sdp_offer,
+            negotiation_role,
             tracks,
             ice_servers,
             force_relay,
@@ -51,7 +51,7 @@ fn pub_sub_video_call() {
             );
             assert_eq!(force_relay, &true);
 
-            if sdp_offer.is_some() {
+            if let NegotiationRole::Answerer(_) = negotiation_role {
                 is_caller = false;
             } else {
                 is_caller = true;
@@ -97,13 +97,13 @@ fn pub_sub_video_call() {
     let deadline = Some(std::time::Duration::from_secs(5));
     TestMember::start(
-        format!("{}/caller/test", base_url),
+        format!("{}/responder/test", base_url),
         Some(Box::new(test_fn)),
         None,
         deadline,
     );
     TestMember::start(
-        format!("{}/responder/test", base_url),
+        format!("{}/caller/test", base_url),
         Some(Box::new(test_fn)),
         None,
         deadline,