Skip to content
This repository has been archived by the owner on Aug 25, 2021. It is now read-only.

Commit

Permalink
Refactor PeersService with more idiomatic async/.await (#110, #27)
Browse files Browse the repository at this point in the history
- add unit tests for PeersService
- fix Peer might not be registered in PeerTrafficWatcher
  • Loading branch information
evdokimovs authored Jun 23, 2020
1 parent 7ed8f6d commit ba91090
Show file tree
Hide file tree
Showing 10 changed files with 670 additions and 311 deletions.
62 changes: 31 additions & 31 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions src/media/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ impl Peer<New> {
&mut self,
src: &WebRtcPublishEndpoint,
publisher_peer: &mut Peer<New>,
tracks_counter: &mut Counter<TrackId>,
tracks_counter: &Counter<TrackId>,
) {
let audio_settings = src.audio_settings();
if audio_settings.publish_policy != PublishPolicy::Disabled {
Expand Down Expand Up @@ -528,7 +528,7 @@ pub mod tests {
},
};

let mut track_id_counter = Counter::default();
let track_id_counter = Counter::default();

for _ in 0..send_audio {
let track_id = track_id_counter.next_id();
Expand Down
11 changes: 1 addition & 10 deletions src/signalling/elements/endpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,14 @@ use crate::signalling::elements::endpoints::webrtc::{
///
/// [Medea]: https://github.com/instrumentisto/medea
#[enum_delegate(pub fn is_force_relayed(&self) -> bool)]
#[enum_delegate(pub fn has_traffic_callback(&self) -> bool)]
#[derive(Clone, Debug, From)]
pub enum Endpoint {
WebRtcPublishEndpoint(webrtc::WebRtcPublishEndpoint),
WebRtcPlayEndpoint(webrtc::WebRtcPlayEndpoint),
}

impl Endpoint {
/// Returns `true` if `on_start` or `on_stop` callback is set.
#[allow(clippy::unused_self)]
#[inline]
pub fn has_traffic_callback(&self) -> bool {
// TODO: Delegate this call to
// `WebRtcPublishEndpoint`/`WebRtcPlayEndpoint`.

false
}

/// Returns [`Weak`] reference to this [`Endpoint`].
pub fn downgrade(&self) -> WeakEndpoint {
match self {
Expand Down
9 changes: 9 additions & 0 deletions src/signalling/elements/endpoints/webrtc/play_endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,15 @@ impl WebRtcPlayEndpoint {
self.0.borrow().is_force_relayed
}

/// Returns `true` if `on_start` or `on_stop` callback is set.
#[allow(clippy::unused_self)]
#[inline]
pub fn has_traffic_callback(&self) -> bool {
// TODO: Must depend on on_start/on_stop endpoint callbacks, when those
// will be added (#91).
true
}

/// Downgrades [`WebRtcPlayEndpoint`] to [`WeakWebRtcPlayEndpoint`] weak
/// pointer.
pub fn downgrade(&self) -> WeakWebRtcPlayEndpoint {
Expand Down
9 changes: 9 additions & 0 deletions src/signalling/elements/endpoints/webrtc/publish_endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,15 @@ impl WebRtcPublishEndpoint {
self.0.borrow().is_force_relayed
}

/// Returns `true` if `on_start` or `on_stop` callback is set.
#[allow(clippy::unused_self)]
#[inline]
pub fn has_traffic_callback(&self) -> bool {
// TODO: Must depend on on_start/on_stop endpoint callbacks, when those
// will be added (#91).
true
}

/// Returns [`AudioSettings`] of this [`WebRtcPublishEndpoint`].
pub fn audio_settings(&self) -> AudioSettings {
self.0.borrow().audio_settings
Expand Down
35 changes: 30 additions & 5 deletions src/signalling/peers/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ impl PeersMetricsService {
///
/// Sends [`PeersMetricsEvent::NoTrafficFlow`] message if it determines that
/// some track is not flowing.
pub fn check_peers(&mut self) {
pub fn check_peers(&self) {
for peer in self
.peers
.values()
Expand Down Expand Up @@ -202,7 +202,7 @@ impl PeersMetricsService {
/// [`PeersMetricsEvent::WrongTrafficFlowing`] or [`PeersMetricsEvent::
/// TrackTrafficStarted`] to the [`Room`] if some
/// [`MediaType`]/[`Direction`] was stopped.
pub fn add_stats(&mut self, peer_id: PeerId, stats: Vec<RtcStat>) {
pub fn add_stats(&self, peer_id: PeerId, stats: Vec<RtcStat>) {
if let Some(peer) = self.peers.get(&peer_id) {
let mut peer_ref = peer.borrow_mut();

Expand Down Expand Up @@ -409,7 +409,7 @@ pub enum PeersMetricsEvent {
///
/// This spec is compared with [`Peer`]s actual stats, to calculate difference
/// between expected and actual [`Peer`] state.
#[derive(Debug)]
#[derive(Clone, Copy, Debug)]
struct PeerTracks {
/// Count of the [`MediaTrack`]s with the [`Direction::Publish`] and
/// [`MediaType::Audio`].
Expand Down Expand Up @@ -860,6 +860,31 @@ mod tests {

use super::PeersMetricsService;

impl PeersMetricsService {
/// Returns `true` if [`Peer`] with a provided [`PeerId`] isn't
/// registered in the [`PeersMetricsService`].
#[inline]
pub fn is_peer_registered(&self, peer_id: PeerId) -> bool {
self.peers.contains_key(&peer_id)
}

/// Returns count of the [`MediaTrack`] which are registered in the
/// [`PeersMetricsService`].
pub fn peer_tracks_count(&self, peer_id: PeerId) -> usize {
if let Some(peer) = self.peers.get(&peer_id) {
let peer_tracks = peer.borrow().tracks_spec;
let mut tracks_count = 0;
tracks_count += peer_tracks.audio_recv;
tracks_count += peer_tracks.video_recv;
tracks_count += peer_tracks.audio_send;
tracks_count += peer_tracks.video_send;
tracks_count
} else {
0
}
}
}

/// Returns [`RtcOutboundRtpStreamStats`] with a provided number of
/// `packets_sent` and [`RtcOutboundRtpStreamMediaType`] based on
/// `is_audio`.
Expand Down Expand Up @@ -1009,7 +1034,7 @@ mod tests {
/// Generates [`RtcStats`] and adds them to inner
/// [`PeersMetricsService`] for `PeerId(1)`.
pub fn add_stats(
&mut self,
&self,
send_audio: u32,
send_video: u32,
recv_audio: u32,
Expand Down Expand Up @@ -1112,7 +1137,7 @@ mod tests {
}

/// Calls [`PeerMetricsService::check_peers`].
pub fn check_peers(&mut self) {
pub fn check_peers(&self) {
self.metrics.check_peers();
}

Expand Down
Loading

0 comments on commit ba91090

Please sign in to comment.