From ba9109083da4e74e1fd24e7e8d4c864e8c009671 Mon Sep 17 00:00:00 2001 From: evdokimovs <49490279+evdokimovs@users.noreply.github.com> Date: Tue, 23 Jun 2020 15:52:05 +0300 Subject: [PATCH] Refactor PeersService with more idiomatic async/.await (#110, #27) - add unit tests for PeersService - fix Peer might not be registered in PeerTrafficWatcher --- Cargo.lock | 62 +- src/media/peer.rs | 4 +- src/signalling/elements/endpoints/mod.rs | 11 +- .../endpoints/webrtc/play_endpoint.rs | 9 + .../endpoints/webrtc/publish_endpoint.rs | 9 + src/signalling/peers/metrics.rs | 35 +- src/signalling/peers/mod.rs | 795 +++++++++++++----- src/signalling/room/command_handler.rs | 38 +- src/signalling/room/mod.rs | 9 +- src/signalling/room/rpc_server.rs | 9 +- 10 files changed, 670 insertions(+), 311 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5e1e26651..c8e707928 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -120,7 +120,7 @@ dependencies = [ "pin-project", "rand 0.7.3", "regex", - "serde 1.0.112", + "serde 1.0.114", "serde_json", "serde_urlencoded", "sha1", @@ -148,7 +148,7 @@ dependencies = [ "http", "log", "regex", - "serde 1.0.112", + "serde 1.0.114", ] [[package]] @@ -288,7 +288,7 @@ dependencies = [ "net2", "pin-project", "regex", - "serde 1.0.112", + "serde 1.0.114", "serde_json", "serde_urlencoded", "time", @@ -334,9 +334,9 @@ dependencies = [ [[package]] name = "addr2line" -version = "0.12.1" +version = "0.12.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a49806b9dadc843c61e7c97e72490ad7f7220ae249012fbda9ad0609457c0543" +checksum = "602d785912f476e480434627e8732e6766b760c045bbf897d9dfaa9f4fbd399c" dependencies = [ "gimli", ] @@ -349,9 +349,9 @@ checksum = "567b077b825e468cc974f0020d4082ee6e03132512f207ef1a02fd5d00d1f32d" [[package]] name = "aho-corasick" -version = "0.7.10" +version = "0.7.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8716408b8bc624ed7f65d223ddb9ac2d044c0547b6fa4b0d554f3a9540496ada" +checksum = "c259a748ac706ba73d609b73fc13469e128337f9a6b2fb3cc82d100f8dd8d511" dependencies = [ "memchr", ] @@ -462,7 +462,7 @@ dependencies = [ "mime", "percent-encoding", "rand 0.7.3", - "serde 1.0.112", + "serde 1.0.114", "serde_json", "serde_urlencoded", ] @@ -636,7 +636,7 @@ dependencies = [ "lazy_static", "nom", "rust-ini", - "serde 1.0.112", + "serde 1.0.114", "serde-hjson", "serde_json", "toml", @@ -791,7 +791,7 @@ dependencies = [ "config", "crossbeam-queue", "num_cpus", - "serde 1.0.112", + "serde 1.0.114", "tokio", ] @@ -807,7 +807,7 @@ dependencies = [ "futures 0.3.5", "log", "redis 0.15.1", - "serde 1.0.112", + "serde 1.0.114", ] [[package]] @@ -1236,7 +1236,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f1c57351e2c81b7a03e82a6c8f4198b309c158f6d67c4adb707af6af7d91465a" dependencies = [ "humantime", - "serde 1.0.112", + "serde 1.0.114", ] [[package]] @@ -1480,7 +1480,7 @@ dependencies = [ "redis 0.15.1", "redis 0.15.2-alpha.0", "rust-crypto", - "serde 1.0.112", + "serde 1.0.114", "serde_json", "serde_yaml", "serial_test", @@ -1505,7 +1505,7 @@ version = "0.2.0-dev" dependencies = [ "derive_more", "medea-macro", - "serde 1.0.112", + "serde 1.0.114", "serde_json", "serde_with", ] @@ -1523,7 +1523,7 @@ dependencies = [ "humantime-serde", "medea-control-api-proto", "protobuf", - "serde 1.0.112", + "serde 1.0.114", "slog", "slog-async", "slog-envlogger", @@ -1574,7 +1574,7 @@ dependencies = [ "medea-reactive", "mockall", "predicates-tree", - "serde 1.0.112", + "serde 1.0.114", 
"serde_json", "tracerr", "wasm-bindgen", @@ -1974,9 +1974,9 @@ dependencies = [ [[package]] name = "protobuf" -version = "2.14.0" +version = "2.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e86d370532557ae7573551a1ec8235a0f8d6cb276c7c9e6aa490b511c447485" +checksum = "3e2ccb6b8f7e175f2d2401e7a5988b0630000164d221262c4fe50ae729513202" [[package]] name = "quick-error" @@ -2282,9 +2282,9 @@ checksum = "9dad3f759919b92c3068c696c15c3d17238234498bbdcc80f2c469606f948ac8" [[package]] name = "serde" -version = "1.0.112" +version = "1.0.114" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "736aac72d1eafe8e5962d1d1c3d99b0df526015ba40915cb3c49d042e92ec243" +checksum = "5317f7588f0a5078ee60ef675ef96735a1442132dc645eb1d12c018620ed8cd3" dependencies = [ "serde_derive", ] @@ -2304,9 +2304,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.112" +version = "1.0.114" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf0343ce212ac0d3d6afd9391ac8e9c9efe06b533c8d33f660f6390cc4093f57" +checksum = "2a0be94b04690fbaed37cddffc5c134bf537c8e3329d53e982fe04c374978f8e" dependencies = [ "proc-macro2", "quote", @@ -2321,7 +2321,7 @@ checksum = "ec2c5d7e739bc07a3e73381a39d61fdb5f671c60c1df26a130690665803d8226" dependencies = [ "itoa", "ryu", - "serde 1.0.112", + "serde 1.0.114", ] [[package]] @@ -2341,7 +2341,7 @@ checksum = "9ec5d77e2d4c73717816afac02670d5c4f534ea95ed430442cad02e7a6e32c97" dependencies = [ "dtoa", "itoa", - "serde 1.0.112", + "serde 1.0.114", "url", ] @@ -2351,7 +2351,7 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "89d3d595d64120bbbc70b7f6d5ae63298b62a3d9f373ec2f56acf5365ca8a444" dependencies = [ - "serde 1.0.112", + "serde 1.0.114", "serde_with_macros", ] @@ -2374,7 +2374,7 @@ checksum = "ae3e2dd40a7cdc18ca80db804b7f461a39bb721160a85c9a1fa30134bf3c02a5" dependencies = [ "dtoa", "linked-hash-map 0.5.3", - "serde 1.0.112", + "serde 1.0.114", "yaml-rust", ] @@ -2462,7 +2462,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ddc0d2aff1f8f325ef660d9a0eb6e6dcd20b30b3f581a5897f58bf42d061c37a" dependencies = [ "chrono", - "serde 1.0.112", + "serde 1.0.114", "serde_json", "slog", ] @@ -2552,9 +2552,9 @@ checksum = "6446ced80d6c486436db5c078dde11a9f73d42b57fb273121e160b84f63d894c" [[package]] name = "syn" -version = "1.0.31" +version = "1.0.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5304cfdf27365b7585c25d4af91b35016ed21ef88f17ced89c7093b43dba8b6" +checksum = "e8d5d96e8cbb005d6959f119f773bfaebb5684296108fb32600c00cde305b2cd" dependencies = [ "proc-macro2", "quote", @@ -2713,7 +2713,7 @@ version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ffc92d160b1eef40665be3a05630d003936a3bc7da7421277846c2613e92c71a" dependencies = [ - "serde 1.0.112", + "serde 1.0.114", ] [[package]] @@ -3135,7 +3135,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4c2dc4aa152834bc334f506c1a06b866416a8b6697d5c9f75b9a689c8486def0" dependencies = [ "cfg-if", - "serde 1.0.112", + "serde 1.0.114", "serde_json", "wasm-bindgen-macro", ] diff --git a/src/media/peer.rs b/src/media/peer.rs index b74675da6..c4f529903 100644 --- a/src/media/peer.rs +++ b/src/media/peer.rs @@ -350,7 +350,7 @@ impl Peer { &mut self, src: &WebRtcPublishEndpoint, publisher_peer: &mut Peer, - tracks_counter: &mut Counter, + tracks_counter: &Counter, ) { let 
audio_settings = src.audio_settings(); if audio_settings.publish_policy != PublishPolicy::Disabled { @@ -528,7 +528,7 @@ pub mod tests { }, }; - let mut track_id_counter = Counter::default(); + let track_id_counter = Counter::default(); for _ in 0..send_audio { let track_id = track_id_counter.next_id(); diff --git a/src/signalling/elements/endpoints/mod.rs b/src/signalling/elements/endpoints/mod.rs index cdb9f9423..71cec84d2 100644 --- a/src/signalling/elements/endpoints/mod.rs +++ b/src/signalling/elements/endpoints/mod.rs @@ -17,6 +17,7 @@ use crate::signalling::elements::endpoints::webrtc::{ /// /// [Medea]: https://github.com/instrumentisto/medea #[enum_delegate(pub fn is_force_relayed(&self) -> bool)] +#[enum_delegate(pub fn has_traffic_callback(&self) -> bool)] #[derive(Clone, Debug, From)] pub enum Endpoint { WebRtcPublishEndpoint(webrtc::WebRtcPublishEndpoint), @@ -24,16 +25,6 @@ pub enum Endpoint { } impl Endpoint { - /// Returns `true` if `on_start` or `on_stop` callback is set. - #[allow(clippy::unused_self)] - #[inline] - pub fn has_traffic_callback(&self) -> bool { - // TODO: Delegate this call to - // `WebRtcPublishEndpoint`/`WebRtcPlayEndpoint`. - - false - } - /// Returns [`Weak`] reference to this [`Endpoint`]. pub fn downgrade(&self) -> WeakEndpoint { match self { diff --git a/src/signalling/elements/endpoints/webrtc/play_endpoint.rs b/src/signalling/elements/endpoints/webrtc/play_endpoint.rs index 365a6f9ee..275d3f4bc 100644 --- a/src/signalling/elements/endpoints/webrtc/play_endpoint.rs +++ b/src/signalling/elements/endpoints/webrtc/play_endpoint.rs @@ -168,6 +168,15 @@ impl WebRtcPlayEndpoint { self.0.borrow().is_force_relayed } + /// Returns `true` if `on_start` or `on_stop` callback is set. + #[allow(clippy::unused_self)] + #[inline] + pub fn has_traffic_callback(&self) -> bool { + // TODO: Must depend on on_start/on_stop endpoint callbacks, when those + // will be added (#91). + true + } + /// Downgrades [`WebRtcPlayEndpoint`] to [`WeakWebRtcPlayEndpoint`] weak /// pointer. pub fn downgrade(&self) -> WeakWebRtcPlayEndpoint { diff --git a/src/signalling/elements/endpoints/webrtc/publish_endpoint.rs b/src/signalling/elements/endpoints/webrtc/publish_endpoint.rs index 935e17975..c6f087cf5 100644 --- a/src/signalling/elements/endpoints/webrtc/publish_endpoint.rs +++ b/src/signalling/elements/endpoints/webrtc/publish_endpoint.rs @@ -213,6 +213,15 @@ impl WebRtcPublishEndpoint { self.0.borrow().is_force_relayed } + /// Returns `true` if `on_start` or `on_stop` callback is set. + #[allow(clippy::unused_self)] + #[inline] + pub fn has_traffic_callback(&self) -> bool { + // TODO: Must depend on on_start/on_stop endpoint callbacks, when those + // will be added (#91). + true + } + /// Returns [`AudioSettings`] of this [`WebRtcPublishEndpoint`]. pub fn audio_settings(&self) -> AudioSettings { self.0.borrow().audio_settings diff --git a/src/signalling/peers/metrics.rs b/src/signalling/peers/metrics.rs index e676a89ef..129e45abb 100644 --- a/src/signalling/peers/metrics.rs +++ b/src/signalling/peers/metrics.rs @@ -104,7 +104,7 @@ impl PeersMetricsService { /// /// Sends [`PeersMetricsEvent::NoTrafficFlow`] message if it determines that /// some track is not flowing. - pub fn check_peers(&mut self) { + pub fn check_peers(&self) { for peer in self .peers .values() @@ -202,7 +202,7 @@ impl PeersMetricsService { /// [`PeersMetricsEvent::WrongTrafficFlowing`] or [`PeersMetricsEvent:: /// TrackTrafficStarted`] to the [`Room`] if some /// [`MediaType`]/[`Direction`] was stopped. 
- pub fn add_stats(&mut self, peer_id: PeerId, stats: Vec) { + pub fn add_stats(&self, peer_id: PeerId, stats: Vec) { if let Some(peer) = self.peers.get(&peer_id) { let mut peer_ref = peer.borrow_mut(); @@ -409,7 +409,7 @@ pub enum PeersMetricsEvent { /// /// This spec is compared with [`Peer`]s actual stats, to calculate difference /// between expected and actual [`Peer`] state. -#[derive(Debug)] +#[derive(Clone, Copy, Debug)] struct PeerTracks { /// Count of the [`MediaTrack`]s with the [`Direction::Publish`] and /// [`MediaType::Audio`]. @@ -860,6 +860,31 @@ mod tests { use super::PeersMetricsService; + impl PeersMetricsService { + /// Returns `true` if [`Peer`] with a provided [`PeerId`] isn't + /// registered in the [`PeersMetricsService`]. + #[inline] + pub fn is_peer_registered(&self, peer_id: PeerId) -> bool { + self.peers.contains_key(&peer_id) + } + + /// Returns count of the [`MediaTrack`] which are registered in the + /// [`PeersMetricsService`]. + pub fn peer_tracks_count(&self, peer_id: PeerId) -> usize { + if let Some(peer) = self.peers.get(&peer_id) { + let peer_tracks = peer.borrow().tracks_spec; + let mut tracks_count = 0; + tracks_count += peer_tracks.audio_recv; + tracks_count += peer_tracks.video_recv; + tracks_count += peer_tracks.audio_send; + tracks_count += peer_tracks.video_send; + tracks_count + } else { + 0 + } + } + } + /// Returns [`RtcOutboundRtpStreamStats`] with a provided number of /// `packets_sent` and [`RtcOutboundRtpStreamMediaType`] based on /// `is_audio`. @@ -1009,7 +1034,7 @@ mod tests { /// Generates [`RtcStats`] and adds them to inner /// [`PeersMetricsService`] for `PeerId(1)`. pub fn add_stats( - &mut self, + &self, send_audio: u32, send_video: u32, recv_audio: u32, @@ -1112,7 +1137,7 @@ mod tests { } /// Calls [`PeerMetricsService::check_peers`]. - pub fn check_peers(&mut self) { + pub fn check_peers(&self) { self.metrics.check_peers(); } diff --git a/src/signalling/peers/mod.rs b/src/signalling/peers/mod.rs index 16863dc78..3f180672e 100644 --- a/src/signalling/peers/mod.rs +++ b/src/signalling/peers/mod.rs @@ -5,28 +5,29 @@ mod metrics; mod traffic_watcher; use std::{ + cell::{Cell, RefCell}, collections::HashMap, convert::{TryFrom, TryInto}, + rc::Rc, sync::Arc, time::Duration, }; -use actix::{fut::wrap_future, ActorFuture, WrapFuture as _}; use derive_more::Display; +use futures::future; use medea_client_api_proto::{Incrementable, PeerId, TrackId}; use crate::{ api::control::{MemberId, RoomId}, conf, log::prelude::*, - media::{IceUser, New, Peer, PeerStateMachine}, + media::{New, Peer, PeerStateMachine}, signalling::{ elements::endpoints::{ webrtc::{WebRtcPlayEndpoint, WebRtcPublishEndpoint}, Endpoint, }, - room::{ActFuture, RoomError}, - Room, + room::RoomError, }, turn::{TurnAuthService, UnreachablePolicy}, }; @@ -54,7 +55,7 @@ pub struct PeersService { /// /// [`Member`]: crate::signalling::elements::member::Member /// [`Room`]: crate::signalling::Room - peers: HashMap, + peers: PeerRepository, /// Count of [`Peer`]s in this [`Room`]. /// @@ -71,7 +72,7 @@ pub struct PeersService { peers_traffic_watcher: Arc, /// Service which responsible for this [`Room`]'s [`RtcStat`]s processing. - peer_metrics_service: PeersMetricsService, + peer_metrics_service: RefCell, /// Duration, after which [`Peer`]s stats will be considered as stale. /// Passed to [`PeersMetricsService`] when registering new [`Peer`]s. @@ -79,20 +80,30 @@ pub struct PeersService { } /// Simple ID counter. 
-#[derive(Default, Debug, Clone, Copy, Display)] -pub struct Counter { - count: T, +#[derive(Default, Debug, Clone, Display)] +pub struct Counter { + count: Cell, } impl Counter { /// Returns id and increase counter. - pub fn next_id(&mut self) -> T { - let id = self.count; - self.count = self.count.incr(); + pub fn next_id(&self) -> T { + let id = self.count.get(); + self.count.set(id.incr()); id } } +/// Result of the [`PeersService::get_or_create_peers`] function. +#[derive(Debug, Clone, Copy)] +enum GetOrCreatePeersResult { + /// Requested [`Peer`] pair was created. + Created(PeerId, PeerId), + + /// Requested [`Peer`] pair already existed. + AlreadyExisted(PeerId, PeerId), +} + impl PeersService { /// Returns new [`PeerRepository`] for a [`Room`] with the provided /// [`RoomId`]. @@ -101,58 +112,44 @@ impl PeersService { turn_service: Arc, peers_traffic_watcher: Arc, media_conf: &conf::Media, - ) -> Self { - Self { + ) -> Rc { + Rc::new(Self { room_id: room_id.clone(), turn_service, - peers: HashMap::new(), + peers: PeerRepository::new(), peers_count: Counter::default(), tracks_count: Counter::default(), peers_traffic_watcher: peers_traffic_watcher.clone(), - peer_metrics_service: PeersMetricsService::new( + peer_metrics_service: RefCell::new(PeersMetricsService::new( room_id, peers_traffic_watcher, - ), + )), peer_stats_ttl: media_conf.max_lag, - } + }) } /// Store [`Peer`] in [`Room`]. /// /// [`Room`]: crate::signalling::Room - pub fn add_peer>(&mut self, peer: S) { - let peer = peer.into(); - self.peers.insert(peer.id(), peer); + #[inline] + pub fn add_peer>(&self, peer: S) { + self.peers.add_peer(peer) } - /// Returns borrowed [`PeerStateMachine`] by its ID. + /// Applies a function to the [`PeerStateMachine`] reference with provided + /// [`PeerId`] (if any found). /// /// # Errors /// /// Errors with [`RoomError::PeerNotFound`] if requested [`PeerId`] doesn't /// exist in [`PeerRepository`]. - pub fn get_peer_by_id( + #[inline] + pub fn map_peer_by_id( &self, peer_id: PeerId, - ) -> Result<&PeerStateMachine, RoomError> { - self.peers - .get(&peer_id) - .ok_or_else(|| RoomError::PeerNotFound(peer_id)) - } - - /// Returns mutably borrowed [`PeerStateMachine`] by its ID. - /// - /// # Errors - /// - /// Errors with [`RoomError::PeerNotFound`] if requested [`PeerId`] doesn't - /// exist in [`PeerRepository`]. - pub fn get_mut_peer_by_id( - &mut self, - peer_id: PeerId, - ) -> Result<&mut PeerStateMachine, RoomError> { - self.peers - .get_mut(&peer_id) - .ok_or_else(|| RoomError::PeerNotFound(peer_id)) + f: impl FnOnce(&PeerStateMachine) -> T, + ) -> Result { + self.peers.map_peer_by_id(peer_id, f) } /// Creates interconnected [`Peer`]s for provided endpoints and saves them @@ -160,7 +157,7 @@ impl PeersService { /// /// Returns [`PeerId`]s of the created [`Peer`]s. 
fn create_peers( - &mut self, + &self, src: &WebRtcPublishEndpoint, sink: &WebRtcPlayEndpoint, ) -> (PeerId, PeerId) { @@ -193,7 +190,17 @@ impl PeersService { ); sink_peer.add_endpoint(&sink.clone().into()); - src_peer.add_publisher(&src, &mut sink_peer, self.get_tracks_counter()); + src_peer.add_publisher(&src, &mut sink_peer, &self.tracks_count); + + let src_peer = PeerStateMachine::from(src_peer); + let sink_peer = PeerStateMachine::from(sink_peer); + + self.peer_metrics_service + .borrow_mut() + .register_peer(&src_peer, self.peer_stats_ttl); + self.peer_metrics_service + .borrow_mut() + .register_peer(&sink_peer, self.peer_stats_ttl); self.add_peer(src_peer); self.add_peer(sink_peer); @@ -201,62 +208,19 @@ impl PeersService { (src_peer_id, sink_peer_id) } - /// Returns mutable reference to track counter. - pub fn get_tracks_counter(&mut self) -> &mut Counter { - &mut self.tracks_count - } - /// Lookups [`Peer`] of [`Member`] with ID `member_id` which /// connected with `partner_member_id`. /// /// Returns `Some(peer_id, partner_peer_id)` if [`Peer`] has been found, /// otherwise returns `None`. - pub fn get_peer_by_members_ids( + #[inline] + pub fn get_peers_between_members( &self, member_id: &MemberId, partner_member_id: &MemberId, ) -> Option<(PeerId, PeerId)> { - for peer in self.peers.values() { - if &peer.member_id() == member_id - && &peer.partner_member_id() == partner_member_id - { - return Some((peer.id(), peer.partner_peer_id())); - } - } - - None - } - - /// Returns borrowed [`Peer`] by its ID. - /// - /// # Errors - /// - /// Errors with [`RoomError::PeerNotFound`] if requested [`PeerId`] doesn't - /// exist in [`PeerRepository`]. - pub fn get_inner_peer_by_id<'a, S>( - &'a self, - peer_id: PeerId, - ) -> Result<&'a Peer, RoomError> - where - &'a Peer: std::convert::TryFrom<&'a PeerStateMachine>, - <&'a Peer as TryFrom<&'a PeerStateMachine>>::Error: Into, - { - match self.peers.get(&peer_id) { - Some(peer) => peer.try_into().map_err(Into::into), - None => Err(RoomError::PeerNotFound(peer_id)), - } - } - - /// Returns all [`Peer`]s of specified [`Member`]. - /// - /// [`Member`]: crate::signalling::elements::member::Member - pub fn get_peers_by_member_id<'a>( - &'a self, - member_id: &'a MemberId, - ) -> impl Iterator { self.peers - .values() - .filter(move |peer| &peer.member_id() == member_id) + .get_peers_between_members(member_id, partner_member_id) } /// Returns owned [`Peer`] by its ID. @@ -266,17 +230,14 @@ impl PeersService { /// Errors with [`RoomError::PeerNotFound`] if requested [`PeerId`] doesn't /// exist in [`PeerRepository`]. 
pub fn take_inner_peer( - &mut self, + &self, peer_id: PeerId, ) -> Result, RoomError> where Peer: TryFrom, as TryFrom>::Error: Into, { - match self.peers.remove(&peer_id) { - Some(peer) => peer.try_into().map_err(Into::into), - None => Err(RoomError::PeerNotFound(peer_id)), - } + self.peers.take_inner_peer(peer_id) } /// Deletes [`PeerStateMachine`]s from this [`PeerRepository`] and send @@ -286,17 +247,16 @@ impl PeersService { /// /// [`Event::PeersRemoved`]: medea_client_api_proto::Event::PeersRemoved pub fn remove_peers<'a, Peers: IntoIterator>( - &mut self, + &self, member_id: &MemberId, peer_ids: Peers, ) -> HashMap> { let mut removed_peers = HashMap::new(); for peer_id in peer_ids { - if let Some(peer) = self.peers.remove(&peer_id) { + if let Some(peer) = self.peers.remove(*peer_id) { let partner_peer_id = peer.partner_peer_id(); let partner_member_id = peer.partner_member_id(); - if let Some(partner_peer) = self.peers.remove(&partner_peer_id) - { + if let Some(partner_peer) = self.peers.remove(partner_peer_id) { removed_peers .entry(partner_member_id) .or_insert_with(Vec::new) @@ -314,6 +274,7 @@ impl PeersService { .flat_map(|peer| peer.iter().map(PeerStateMachine::id)) .collect(); self.peer_metrics_service + .borrow_mut() .unregister_peers(&peers_to_unregister); self.peers_traffic_watcher .unregister_peers(self.room_id.clone(), peers_to_unregister); @@ -321,6 +282,35 @@ impl PeersService { removed_peers } + /// Returns already created [`Peer`] pair's [`PeerId`]s as + /// [`CreatedOrGottenPeer::Gotten`] variant. + /// + /// Returns newly created [`Peer`] pair's [`PeerId`]s as + /// [`CreatedOrGottenPeer::Created`] variant. + async fn get_or_create_peers( + &self, + src: &WebRtcPublishEndpoint, + sink: &WebRtcPlayEndpoint, + ) -> Result { + if let Some((first_peer_id, second_peer_id)) = self + .get_peers_between_members(&src.owner().id(), &sink.owner().id()) + { + Ok(GetOrCreatePeersResult::AlreadyExisted( + first_peer_id, + second_peer_id, + )) + } else { + let (src_peer_id, sink_peer_id) = self.create_peers(&src, &sink); + + self.peer_post_construct(src_peer_id, &src.clone().into()) + .await?; + self.peer_post_construct(sink_peer_id, &sink.clone().into()) + .await?; + + Ok(GetOrCreatePeersResult::Created(src_peer_id, sink_peer_id)) + } + } + /// Creates [`Peer`] for endpoints if [`Peer`] between endpoint's members /// doesn't exist. /// @@ -337,126 +327,118 @@ impl PeersService { /// # Panics /// /// Panics if provided endpoints already have interconnected [`Peer`]s. - pub fn connect_endpoints( - &mut self, + pub async fn connect_endpoints( + self: Rc, src: WebRtcPublishEndpoint, sink: WebRtcPlayEndpoint, - ) -> ActFuture, RoomError>> { + ) -> Result, RoomError> { debug!( "Connecting endpoints of Member [id = {}] with Member [id = {}]", src.owner().id(), sink.owner().id(), ); - let src_owner = src.owner(); - let sink_owner = sink.owner(); + match self.get_or_create_peers(&src, &sink).await? { + GetOrCreatePeersResult::Created(src_peer_id, sink_peer_id) => { + Ok(Some((src_peer_id, sink_peer_id))) + } + GetOrCreatePeersResult::AlreadyExisted( + src_peer_id, + sink_peer_id, + ) => { + if sink.peer_id().is_some() + || src.peer_ids().contains(&src_peer_id) + { + // already connected, so no-op + Ok(None) + } else { + // TODO: here we assume that peers are stable, + // which might not be the case, e.g. Control + // Service creates multiple endpoints in quick + // succession. 
+ let mut src_peer: Peer = + self.peers.take_inner_peer(src_peer_id).unwrap(); + let mut sink_peer: Peer = + self.peers.take_inner_peer(sink_peer_id).unwrap(); + + src_peer.add_publisher( + &src, + &mut sink_peer, + &self.tracks_count, + ); + + let mut register_peer_tasks = Vec::new(); + if src.has_traffic_callback() { + register_peer_tasks.push( + self.peers_traffic_watcher.register_peer( + self.room_id.clone(), + src_peer_id, + src.is_force_relayed(), + ), + ); + } + if sink.has_traffic_callback() { + register_peer_tasks.push( + self.peers_traffic_watcher.register_peer( + self.room_id.clone(), + sink_peer_id, + sink.is_force_relayed(), + ), + ); + } - if let Some((src_peer_id, sink_peer_id)) = - self.get_peer_by_members_ids(&src_owner.id(), &sink_owner.id()) - { - // TODO: when dynamic patching of [`Room`] will be done then we need - // rewrite this code to updating [`Peer`]s in not - // [`Peer`] state. - // Also, don't forget to update `PeerSpec` in the - // [`PeerMetricsService`]. - let mut src_peer: Peer = - self.take_inner_peer(src_peer_id).unwrap(); - let mut sink_peer: Peer = - self.take_inner_peer(sink_peer_id).unwrap(); - - src_peer.add_publisher( - &src, - &mut sink_peer, - self.get_tracks_counter(), - ); - - sink_peer.add_endpoint(&sink.into()); - src_peer.add_endpoint(&src.into()); - - let src_peer = PeerStateMachine::from(src_peer); - let sink_peer = PeerStateMachine::from(sink_peer); + sink_peer.add_endpoint(&sink.into()); + src_peer.add_endpoint(&src.into()); - self.peer_metrics_service - .register_peer(&src_peer, self.peer_stats_ttl); - self.peer_metrics_service - .register_peer(&sink_peer, self.peer_stats_ttl); + let src_peer = PeerStateMachine::from(src_peer); + let sink_peer = PeerStateMachine::from(sink_peer); - self.add_peer(src_peer); - self.add_peer(sink_peer); + self.peer_metrics_service + .borrow_mut() + .update_peer_tracks(&src_peer); + self.peer_metrics_service + .borrow_mut() + .update_peer_tracks(&sink_peer); - Box::new(actix::fut::ok(None)) - } else { - let (src_peer_id, sink_peer_id) = self.create_peers(&src, &sink); + self.peers.add_peer(src_peer); + self.peers.add_peer(sink_peer); - Box::new(self.peer_post_construct(src_peer_id, src.into()).then( - move |res, room, _| { - match res { - Ok(_) => Box::new( - room.peers - .peer_post_construct(sink_peer_id, sink.into()) - .map(move |res, _, _| { - res.map(|_| { - Some((src_peer_id, sink_peer_id)) - }) - }), - ), - Err(err) => { - Box::new(actix::fut::err(err)) as ActFuture<_> - } - } - }, - )) + future::try_join_all(register_peer_tasks) + .await + .map_err(RoomError::PeerTrafficWatcherMailbox)?; + + Ok(None) + } + } } } /// Creates and sets [`IceUser`], registers [`Peer`] in /// [`PeerTrafficWatcher`]. - fn peer_post_construct( + async fn peer_post_construct( &self, peer_id: PeerId, - endpoint: Endpoint, - ) -> ActFuture> { - let room_id = self.room_id.clone(); - let turn_service = self.turn_service.clone(); - Box::new( - wrap_future(async move { - Ok(turn_service - .create(room_id, peer_id, UnreachablePolicy::ReturnErr) - .await?) 
- }) - .map(move |res: Result, room: &mut Room, _| { - res.map(|ice_user| { - if let Ok(peer) = room.peers.get_mut_peer_by_id(peer_id) { - peer.set_ice_user(ice_user) - } - }) - }) - .then(move |res, room: &mut Room, _| { - let room_id = room.id().clone(); - let traffic_watcher = room.peers.peers_traffic_watcher.clone(); - async move { - match res { - Ok(_) => { - if endpoint.has_traffic_callback() { - traffic_watcher - .register_peer( - room_id, - peer_id, - endpoint.is_force_relayed(), - ) - .await - .map_err( - RoomError::PeerTrafficWatcherMailbox, - ) - } else { - Ok(()) - } - } - Err(err) => Err(err), - } - } - .into_actor(room) - }), - ) + endpoint: &Endpoint, + ) -> Result<(), RoomError> { + let ice_user = self + .turn_service + .create(self.room_id.clone(), peer_id, UnreachablePolicy::ReturnErr) + .await?; + + self.peers + .map_peer_by_id_mut(peer_id, move |p| p.set_ice_user(ice_user))?; + + if endpoint.has_traffic_callback() { + self.peers_traffic_watcher + .register_peer( + self.room_id.clone(), + peer_id, + endpoint.is_force_relayed(), + ) + .await + .map_err(RoomError::PeerTrafficWatcherMailbox) + } else { + Ok(()) + } } /// Removes all [`Peer`]s related to given [`Member`]. @@ -466,51 +448,404 @@ impl PeersService { /// key - [`Peer`]'s owner [`MemberId`], /// value - removed [`Peer`]'s [`PeerId`]. // TODO: remove in #91. + #[inline] pub fn remove_peers_related_to_member( - &mut self, + &self, + member_id: &MemberId, + ) -> HashMap> { + self.peers.remove_peers_related_to_member(member_id) + } + + /// Updates [`PeerTracks`] of the [`Peer`] with provided [`PeerId`] in the + /// [`PeerMetricsService`]. + /// + /// # Errors + /// + /// Errors with [`RoomError::PeerNotFound`] if requested [`PeerId`] doesn't + /// exist in [`PeerRepository`]. + pub fn sync_peer_spec(&self, peer_id: PeerId) -> Result<(), RoomError> { + self.peers.map_peer_by_id(peer_id, |peer| { + self.peer_metrics_service + .borrow_mut() + .update_peer_tracks(&peer); + })?; + Ok(()) + } +} + +/// Repository which stores all [`PeerStateMachine`]s of the [`PeersService`]. +#[derive(Debug)] +struct PeerRepository(RefCell>); + +impl PeerRepository { + /// Returns empty [`PeerRepository`]. + pub fn new() -> Self { + Self(RefCell::new(HashMap::new())) + } + + /// Applies a function to the [`PeerStateMachine`] reference with provided + /// [`PeerId`] (if any found). + /// + /// # Errors + /// + /// Errors with [`RoomError::PeerNotFound`] if requested [`PeerId`] doesn't + /// exist in [`PeerRepository`]. + pub fn map_peer_by_id( + &self, + peer_id: PeerId, + f: impl FnOnce(&PeerStateMachine) -> T, + ) -> Result { + Ok(f(self + .0 + .borrow() + .get(&peer_id) + .ok_or_else(|| RoomError::PeerNotFound(peer_id))?)) + } + + /// Applies a function to the mutable [`PeerStateMachine`] reference with + /// provided [`PeerId`] (if any found). + /// + /// # Errors + /// + /// Errors with [`RoomError::PeerNotFound`] if requested [`PeerId`] doesn't + /// exist in [`PeerRepository`]. + pub fn map_peer_by_id_mut( + &self, + peer_id: PeerId, + f: impl FnOnce(&mut PeerStateMachine) -> T, + ) -> Result { + Ok(f(self + .0 + .borrow_mut() + .get_mut(&peer_id) + .ok_or_else(|| RoomError::PeerNotFound(peer_id))?)) + } + + /// Removes [`PeerStateMachine`] with a provided [`PeerId`]. + /// + /// Returns removed [`PeerStateMachine`] if it existed. 
+ pub fn remove(&self, peer_id: PeerId) -> Option { + self.0.borrow_mut().remove(&peer_id) + } + + /// Removes [`PeerStateMachine`] with a provided [`PeerId`] and returns + /// removed [`PeerStateMachine`] if it existed. + /// + /// # Errors + /// + /// Errors with [`RoomError::PeerNotFound`] if requested [`PeerId`] doesn't + /// exist in [`PeerRepository`]. + pub fn take(&self, peer_id: PeerId) -> Result { + self.remove(peer_id) + .ok_or_else(|| RoomError::PeerNotFound(peer_id)) + } + + /// Returns owned [`Peer`] by its ID. + /// + /// # Errors + /// + /// Errors with [`RoomError::PeerNotFound`] if requested [`PeerId`] doesn't + /// exist in [`PeerRepository`]. + /// + /// Errors with [`RoomError::PeerError`] if [`Peer`] is found, but not in + /// requested state. + pub fn take_inner_peer( + &self, + peer_id: PeerId, + ) -> Result, RoomError> + where + Peer: TryFrom, + as TryFrom>::Error: Into, + { + self.take(peer_id)?.try_into().map_err(Into::into) + } + + /// Stores [`Peer`] in [`Room`]. + /// + /// [`Room`]: crate::signalling::Room + pub fn add_peer>(&self, peer: S) { + let peer = peer.into(); + self.0.borrow_mut().insert(peer.id(), peer); + } + + /// Lookups [`Peer`] of [`Member`] with ID `member_id` which + /// connected with `partner_member_id`. + /// + /// Returns `Some(peer_id, partner_peer_id)` if [`Peer`] has been found, + /// otherwise returns `None`. + pub fn get_peers_between_members( + &self, + member_id: &MemberId, + partner_member_id: &MemberId, + ) -> Option<(PeerId, PeerId)> { + for peer in self.0.borrow().values() { + if &peer.member_id() == member_id + && &peer.partner_member_id() == partner_member_id + { + return Some((peer.id(), peer.partner_peer_id())); + } + } + + None + } + + /// Removes all [`Peer`]s related to given [`Member`]. + /// Note, that this function will also remove all partners [`Peer`]s. + /// + /// Returns [`HashMap`] with all removed [`Peer`]s: + /// key - [`Peer`]'s owner [`MemberId`], + /// value - removed [`Peer`]'s [`PeerId`]. + // TODO: remove in #91. + pub fn remove_peers_related_to_member( + &self, member_id: &MemberId, ) -> HashMap> { let mut peers_to_remove: HashMap> = HashMap::new(); - self.get_peers_by_member_id(member_id).for_each(|peer| { - self.get_peers_by_member_id(&peer.partner_member_id()) - .filter(|partner_peer| { - &partner_peer.partner_member_id() == member_id - }) - .for_each(|partner_peer| { - peers_to_remove - .entry(partner_peer.member_id()) - .or_insert_with(Vec::new) - .push(partner_peer.id()); - }); - - peers_to_remove - .entry(peer.member_id()) - .or_insert_with(Vec::new) - .push(peer.id()); - }); + self.0 + .borrow() + .values() + .filter(|p| &p.member_id() == member_id) + .for_each(|peer| { + self.0 + .borrow() + .values() + .filter(|p| p.member_id() == peer.partner_member_id()) + .filter(|partner_peer| { + &partner_peer.partner_member_id() == member_id + }) + .for_each(|partner_peer| { + peers_to_remove + .entry(partner_peer.member_id()) + .or_insert_with(Vec::new) + .push(partner_peer.id()); + }); + + peers_to_remove + .entry(peer.member_id()) + .or_insert_with(Vec::new) + .push(peer.id()); + }); peers_to_remove .values() .flat_map(|peer_ids| peer_ids.iter()) .for_each(|id| { - self.peers.remove(id); + self.0.borrow_mut().remove(id); }); peers_to_remove } +} - /// Updates [`PeerTracks`] of the [`Peer`] with provided [`PeerId`] in the - /// [`PeerMetricsService`]. - /// - /// # Errors - /// - /// Errors with [`RoomError::PeerNotFound`] if requested [`PeerId`] doesn't - /// exist in [`PeerRepository`]. 
- pub fn sync_peer_spec(&mut self, peer_id: PeerId) -> Result<(), RoomError> { - let peer = self.get_peer_by_id(peer_id)?; - self.peer_metrics_service.update_peer_tracks(&peer); - Ok(()) +#[cfg(test)] +mod tests { + use futures::{channel::mpsc, future, StreamExt as _}; + use tokio::time::timeout; + + use crate::{ + api::control::{ + endpoints::webrtc_publish_endpoint::{ + AudioSettings, P2pMode, VideoSettings, + }, + refs::SrcUri, + }, + signalling::{ + elements::Member, peers::traffic_watcher::MockPeerTrafficWatcher, + }, + turn::service::test::new_turn_auth_service_mock, + }; + + use super::*; + + /// Checks that newly created [`Peer`] will be created in the + /// [`PeerMetricsService`] and [`PeerTrafficWatcher`]. + #[actix_rt::test] + async fn peer_is_registered_in_metrics_service() { + let mut mock = MockPeerTrafficWatcher::new(); + mock.expect_register_room() + .returning(|_, _| Box::pin(future::ok(()))); + mock.expect_unregister_room().returning(|_| {}); + let (register_peer_tx, mut register_peer_rx) = mpsc::unbounded(); + let register_peer_done = + timeout(Duration::from_secs(1), register_peer_rx.next()); + mock.expect_register_peer().returning(move |_, _, _| { + register_peer_tx.unbounded_send(()).unwrap(); + Box::pin(future::ok(())) + }); + mock.expect_traffic_flows().returning(|_, _, _| {}); + mock.expect_traffic_stopped().returning(|_, _, _| {}); + + let peers_service = PeersService::new( + "test".into(), + new_turn_auth_service_mock(), + Arc::new(mock), + &conf::Media::default(), + ); + + let publisher = Member::new( + "publisher".into(), + "test".to_string(), + "test".into(), + Duration::from_secs(10), + Duration::from_secs(10), + Duration::from_secs(5), + ); + let receiver = Member::new( + "receiver".into(), + "test".to_string(), + "test".into(), + Duration::from_secs(10), + Duration::from_secs(10), + Duration::from_secs(5), + ); + let publish = WebRtcPublishEndpoint::new( + "publish".to_string().into(), + P2pMode::Always, + publisher.downgrade(), + false, + AudioSettings::default(), + VideoSettings::default(), + ); + let play = WebRtcPlayEndpoint::new( + "play-publisher".to_string().into(), + SrcUri::try_from("local://test/publisher/publish".to_string()) + .unwrap(), + publish.downgrade(), + receiver.downgrade(), + false, + ); + + peers_service + .clone() + .connect_endpoints(publish, play) + .await + .unwrap(); + + register_peer_done.await.unwrap().unwrap(); + + assert!(peers_service + .peer_metrics_service + .borrow() + .is_peer_registered(PeerId(0))); + assert!(peers_service + .peer_metrics_service + .borrow() + .is_peer_registered(PeerId(1))); + } + + /// Check that when new `Endpoint`s added to the [`PeerService`], tracks + /// count will be updated in the [`PeerMetricsService`]. 
+ #[actix_rt::test] + async fn adding_new_endpoint_updates_peer_metrics() { + let mut mock = MockPeerTrafficWatcher::new(); + mock.expect_register_room() + .returning(|_, _| Box::pin(future::ok(()))); + mock.expect_unregister_room().returning(|_| {}); + let (register_peer_tx, register_peer_rx) = mpsc::unbounded(); + let register_peer_done = timeout( + Duration::from_secs(1), + register_peer_rx.take(4).collect::>(), + ); + mock.expect_register_peer().returning(move |_, _, _| { + register_peer_tx.unbounded_send(()).unwrap(); + Box::pin(future::ok(())) + }); + mock.expect_traffic_flows().returning(|_, _, _| {}); + mock.expect_traffic_stopped().returning(|_, _, _| {}); + + let peers_service = PeersService::new( + "test".into(), + new_turn_auth_service_mock(), + Arc::new(mock), + &conf::Media::default(), + ); + + let publisher = Member::new( + "publisher".into(), + "test".to_string(), + "test".into(), + Duration::from_secs(10), + Duration::from_secs(10), + Duration::from_secs(5), + ); + let receiver = Member::new( + "receiver".into(), + "test".to_string(), + "test".into(), + Duration::from_secs(10), + Duration::from_secs(10), + Duration::from_secs(5), + ); + let publish = WebRtcPublishEndpoint::new( + "publish".to_string().into(), + P2pMode::Always, + publisher.downgrade(), + false, + AudioSettings::default(), + VideoSettings::default(), + ); + let play = WebRtcPlayEndpoint::new( + "play-publisher".to_string().into(), + SrcUri::try_from("local://test/publisher/publish".to_string()) + .unwrap(), + publish.downgrade(), + receiver.downgrade(), + false, + ); + + peers_service + .clone() + .connect_endpoints(publish, play) + .await + .unwrap(); + + let first_peer_tracks_count = peers_service + .peer_metrics_service + .borrow() + .peer_tracks_count(PeerId(0)); + assert_eq!(first_peer_tracks_count, 2); + let second_peer_tracks_count = peers_service + .peer_metrics_service + .borrow() + .peer_tracks_count(PeerId(1)); + assert_eq!(second_peer_tracks_count, 2); + + let publish = WebRtcPublishEndpoint::new( + "publish".to_string().into(), + P2pMode::Always, + receiver.downgrade(), + false, + AudioSettings::default(), + VideoSettings::default(), + ); + let play = WebRtcPlayEndpoint::new( + "play-publisher".to_string().into(), + SrcUri::try_from("local://test/publisher/publish".to_string()) + .unwrap(), + publish.downgrade(), + publisher.downgrade(), + false, + ); + + peers_service + .clone() + .connect_endpoints(publish, play) + .await + .unwrap(); + + let first_peer_tracks_count = peers_service + .peer_metrics_service + .borrow() + .peer_tracks_count(PeerId(0)); + assert_eq!(first_peer_tracks_count, 4); + let second_peer_tracks_count = peers_service + .peer_metrics_service + .borrow() + .peer_tracks_count(PeerId(1)); + assert_eq!(second_peer_tracks_count, 4); + + register_peer_done.await.unwrap(); } } diff --git a/src/signalling/room/command_handler.rs b/src/signalling/room/command_handler.rs index 6d6c9a6d9..ee9fa8335 100644 --- a/src/signalling/room/command_handler.rs +++ b/src/signalling/room/command_handler.rs @@ -12,8 +12,8 @@ use medea_client_api_proto::{ use crate::{ log::prelude::*, media::{ - New, Peer, PeerError, PeerStateMachine, WaitLocalHaveRemote, - WaitLocalSdp, WaitRemoteSdp, + New, Peer, PeerStateMachine, WaitLocalHaveRemote, WaitLocalSdp, + WaitRemoteSdp, }, }; @@ -123,28 +123,12 @@ impl CommandHandler for Room { return Ok(Box::new(actix::fut::ok(()))); } - let from_peer = self.peers.get_peer_by_id(from_peer_id)?; - if let PeerStateMachine::New(_) = from_peer { - return 
Err(PeerError::WrongState( - from_peer_id, - "Not New", - format!("{}", from_peer), - ) - .into()); - } - - let to_peer_id = from_peer.partner_peer_id(); - let to_peer = self.peers.get_peer_by_id(to_peer_id)?; - if let PeerStateMachine::New(_) = to_peer { - return Err(PeerError::WrongState( - to_peer_id, - "Not New", - format!("{}", to_peer), - ) - .into()); - } - - let to_member_id = to_peer.member_id(); + let to_peer_id = self + .peers + .map_peer_by_id(from_peer_id, PeerStateMachine::partner_peer_id)?; + let to_member_id = self + .peers + .map_peer_by_id(to_peer_id, PeerStateMachine::member_id)?; let event = Event::IceCandidateDiscovered { peer_id: to_peer_id, candidate, @@ -175,8 +159,10 @@ impl CommandHandler for Room { peer_id: PeerId, tracks_patches: Vec, ) -> Self::Output { - if let Ok(p) = self.peers.get_peer_by_id(peer_id) { - let member_id = p.member_id(); + if let Ok(member_id) = self + .peers + .map_peer_by_id(peer_id, PeerStateMachine::member_id) + { Ok(Box::new( self.members .send_event_to_member( diff --git a/src/signalling/room/mod.rs b/src/signalling/room/mod.rs index db6120b8f..30d49a81f 100644 --- a/src/signalling/room/mod.rs +++ b/src/signalling/room/mod.rs @@ -6,7 +6,7 @@ mod dynamic_api; mod peer_events_handler; mod rpc_server; -use std::sync::Arc; +use std::{rc::Rc, sync::Arc}; use actix::{ Actor, ActorFuture, Context, ContextFutureSpawner as _, Handler, @@ -132,10 +132,10 @@ pub struct Room { /// [`RpcConnection`] authorization, establishment, message sending. /// /// [`RpcConnection`]: crate::api::client::rpc_connection::RpcConnection - pub members: ParticipantService, + members: ParticipantService, /// [`Peer`]s of [`Member`]s in this [`Room`]. - pub peers: PeersService, + peers: Rc, /// Current state of this [`Room`]. state: State, @@ -261,6 +261,7 @@ impl Room { { connect_endpoints_tasks.push( self.peers + .clone() .connect_endpoints(publisher.clone(), receiver), ); } @@ -275,6 +276,7 @@ impl Room { { connect_endpoints_tasks.push( self.peers + .clone() .connect_endpoints(publisher.clone(), receiver.clone()), ) } @@ -282,6 +284,7 @@ impl Room { for connect_endpoints_task in connect_endpoints_tasks { connect_endpoints_task + .into_actor(self) .then(|result, this, _| match result { Ok(Some((peer1, peer2))) => { match this.send_peer_created(peer1, peer2) { diff --git a/src/signalling/room/rpc_server.rs b/src/signalling/room/rpc_server.rs index 0fc9c5e67..73dfd4ab8 100644 --- a/src/signalling/room/rpc_server.rs +++ b/src/signalling/room/rpc_server.rs @@ -20,6 +20,7 @@ use crate::{ RpcServer, }, log::prelude::*, + media::PeerStateMachine, utils::ResponseActAnyFuture, }; @@ -65,12 +66,12 @@ impl Room { | C::UpdateTracks { peer_id, .. } => peer_id, }; - let peer = self + let peer_member_id = self .peers - .get_peer_by_id(peer_id) + .map_peer_by_id(peer_id, PeerStateMachine::member_id) .map_err(|_| PeerNotFound(peer_id))?; - if peer.member_id() != command.member_id { - return Err(PeerBelongsToAnotherMember(peer_id, peer.member_id())); + if peer_member_id != command.member_id { + return Err(PeerBelongsToAnotherMember(peer_id, peer_member_id)); } Ok(()) }
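
A note for reviewers skimming the refactor above: the recurring change is that `&mut self` methods on `PeersService` become `&self` methods backed by interior mutability (`Cell` for the ID counters, `RefCell` for the peer map and the metrics service), and lookups hand out access through closures (`map_peer_by_id`) instead of returning references, so the whole service can be shared as `Rc<PeersService>` across `async`/`.await` points. Below is a minimal standalone sketch of that pattern; the names (`Counter`, `Repository`, `map_by_id`) are illustrative stand-ins, not the actual Medea types, and the real service additionally holds the traffic watcher and TURN service behind `Arc`s.

// Minimal sketch of the interior-mutability pattern this refactor leans on.
// `Counter` and `Repository` are hypothetical stand-ins for `Counter<T>` and
// `PeerRepository` from the patch above.

use std::{
    cell::{Cell, RefCell},
    collections::HashMap,
    rc::Rc,
};

/// Monotonic ID counter usable through a shared `&self` reference.
#[derive(Default)]
struct Counter {
    count: Cell<u64>,
}

impl Counter {
    /// Returns the current ID and advances the counter.
    fn next_id(&self) -> u64 {
        let id = self.count.get();
        self.count.set(id + 1);
        id
    }
}

/// Repository that exposes its entries via closures instead of returning
/// references, so no borrow outlives the `RefCell` guard.
#[derive(Default)]
struct Repository {
    entries: RefCell<HashMap<u64, String>>,
}

impl Repository {
    /// Inserts an entry through a shared reference.
    fn add(&self, id: u64, value: String) {
        self.entries.borrow_mut().insert(id, value);
    }

    /// Applies `f` to the entry with the given `id`, if it exists.
    fn map_by_id<T>(&self, id: u64, f: impl FnOnce(&str) -> T) -> Option<T> {
        self.entries.borrow().get(&id).map(|v| f(v.as_str()))
    }
}

fn main() {
    // `Rc<Service>`-style sharing: every handle mutates through `&self`.
    let repo = Rc::new(Repository::default());
    let ids = Counter::default();

    let id = ids.next_id();
    repo.add(id, "peer".to_owned());

    assert_eq!(repo.map_by_id(id, str::len), Some(4));
    assert_eq!(ids.next_id(), 1);
}

This is also why `get_peer_by_id`/`get_mut_peer_by_id` are dropped in favor of `map_peer_by_id`: a reference obtained from a `RefCell` borrow cannot escape the borrow guard, whereas a closure confines the borrow to a single call, which composes cleanly with the `.await` points in the now-async `connect_endpoints` and `peer_post_construct`.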