From 5a242a03677d1e546796d52095107a0a330ccf1e Mon Sep 17 00:00:00 2001 From: neonphog Date: Thu, 1 Feb 2024 10:10:41 -0700 Subject: [PATCH] add event dispatcher unit tests --- crates/tx5-go-pion/src/data_chan.rs | 4 +- crates/tx5-go-pion/src/evt.rs | 204 ++++++++++++++++++++++------ crates/tx5-go-pion/src/peer_con.rs | 4 +- 3 files changed, 169 insertions(+), 43 deletions(-) diff --git a/crates/tx5-go-pion/src/data_chan.rs b/crates/tx5-go-pion/src/data_chan.rs index 8e8d06b2..cb79f5b0 100644 --- a/crates/tx5-go-pion/src/data_chan.rs +++ b/crates/tx5-go-pion/src/data_chan.rs @@ -2,7 +2,7 @@ use crate::*; use std::sync::{Arc, Mutex, Weak}; use tx5_go_pion_sys::API; -struct DataChanCore { +pub(crate) struct DataChanCore { data_chan_id: usize, recv_limit: Arc, evt_send: EventSend, @@ -41,7 +41,7 @@ impl DataChanCore { #[derive(Clone)] pub(crate) struct WeakDataChan( - Weak>>, + pub(crate) Weak>>, ); macro_rules! data_chan_strong_core { diff --git a/crates/tx5-go-pion/src/evt.rs b/crates/tx5-go-pion/src/evt.rs index 0bfc38fb..ee3e9475 100644 --- a/crates/tx5-go-pion/src/evt.rs +++ b/crates/tx5-go-pion/src/evt.rs @@ -242,8 +242,8 @@ impl EvtOffload { } macro_rules! manager_access { - ($id:ident, $off:ident, $map:ident, $code:expr) => { - let $map = MANAGER.lock().unwrap().$map.get(&$id).cloned(); + ($man:ident, $id:ident, $off:ident, $map:ident, $code:expr) => { + let $map = $man.lock().unwrap().$map.get(&$id).cloned(); if let Some($map) = $map { $off.blocking_send(async move { let start = std::time::Instant::now(); @@ -258,8 +258,12 @@ macro_rules! manager_access { let elapsed_s = start.elapsed().as_secs_f64(); tracing::error!(%elapsed_s, "EventTimeout"); $map.close(Error::id("EventTimeout").into()); + $man.lock().unwrap().$map.remove(&$id); + } + Ok(Err(err)) => { + $map.close(err.into()); + $man.lock().unwrap().$map.remove(&$id); } - Ok(Err(err)) => $map.close(err.into()), Ok(_) => (), } let elapsed_s = start.elapsed().as_secs_f64(); @@ -281,7 +285,7 @@ static MANAGER: Lazy> = Lazy::new(|| { peer_con_id, candidate, } => { - manager_access!(peer_con_id, offload, peer_con, { + manager_access!(MANAGER, peer_con_id, offload, peer_con, { peer_con.send_evt(PeerConnectionEvent::ICECandidate(GoBuf( candidate, ))) @@ -291,7 +295,7 @@ static MANAGER: Lazy> = Lazy::new(|| { peer_con_id, peer_con_state, } => { - manager_access!(peer_con_id, offload, peer_con, { + manager_access!(MANAGER, peer_con_id, offload, peer_con, { peer_con.send_evt(PeerConnectionEvent::State( PeerConnectionState::from_raw(peer_con_state), )) @@ -301,25 +305,34 @@ static MANAGER: Lazy> = Lazy::new(|| { peer_con_id, data_chan_id, } => { - manager_access!(peer_con_id, offload, peer_con, async { - let recv_limit = match peer_con.get_recv_limit() { - Ok(recv_limit) => recv_limit, - Err(err) => { - API.data_chan_free(data_chan_id); - return Err(err); - } - }; - - let (chan, recv) = - DataChannel::new(data_chan_id, recv_limit); - - peer_con - .send_evt(PeerConnectionEvent::DataChannel(chan, recv)) - .await - }); + manager_access!( + MANAGER, + peer_con_id, + offload, + peer_con, + async { + let recv_limit = match peer_con.get_recv_limit() { + Ok(recv_limit) => recv_limit, + Err(err) => { + API.data_chan_free(data_chan_id); + return Err(err); + } + }; + + let (chan, recv) = + DataChannel::new(data_chan_id, recv_limit); + + peer_con + .send_evt(PeerConnectionEvent::DataChannel( + chan, recv, + )) + .await + } + ); } SysEvent::DataChanClose(data_chan_id) => { manager_access!( + MANAGER, data_chan_id, offload, data_chan, @@ -328,6 +341,7 @@ static MANAGER: Lazy> = Lazy::new(|| { } SysEvent::DataChanOpen(data_chan_id) => { manager_access!( + MANAGER, data_chan_id, offload, data_chan, @@ -339,30 +353,37 @@ static MANAGER: Lazy> = Lazy::new(|| { buffer_id, } => { let mut buf = GoBuf(buffer_id); - manager_access!(data_chan_id, offload, data_chan, async { - let len = buf.len()?; - if len > 16 * 1024 { - return Err(Error::id("MsgTooLarge")); - } + manager_access!( + MANAGER, + data_chan_id, + offload, + data_chan, + async { + let len = buf.len()?; + if len > 16 * 1024 { + return Err(Error::id("MsgTooLarge")); + } - let recv_limit = data_chan.get_recv_limit()?; + let recv_limit = data_chan.get_recv_limit()?; - let permit = recv_limit - .acquire_many_owned(len as u32) - .await - .map_err(|_| { - Error::from(Error::id( - "DataChanMessageSemaphoreClosed", - )) - })?; + let permit = recv_limit + .acquire_many_owned(len as u32) + .await + .map_err(|_| { + Error::from(Error::id( + "DataChanMessageSemaphoreClosed", + )) + })?; - data_chan - .send_evt(DataChannelEvent::Message(buf, permit)) - .await - }); + data_chan + .send_evt(DataChannelEvent::Message(buf, permit)) + .await + } + ); } SysEvent::DataChanBufferedAmountLow(data_chan_id) => { manager_access!( + MANAGER, data_chan_id, offload, data_chan, @@ -374,3 +395,108 @@ static MANAGER: Lazy> = Lazy::new(|| { Manager::new() }); + +#[cfg(test)] +mod tests { + use super::*; + + #[derive(Clone)] + struct Test { + pub man: Arc>, + pub off: Arc, + } + + impl Test { + pub fn new(id_count: usize) -> Self { + let subscriber = tracing_subscriber::FmtSubscriber::builder() + .with_env_filter( + tracing_subscriber::filter::EnvFilter::from_default_env(), + ) + .with_file(true) + .with_line_number(true) + .finish(); + let _ = tracing::subscriber::set_global_default(subscriber); + + use std::sync::Weak; + + let man = Arc::new(Manager::new()); + + { + let mut lock = man.lock().unwrap(); + for id in 0..id_count { + lock.register_peer_con(id, WeakPeerCon(Weak::new())); + } + } + + Self { + man, + off: Arc::new(EvtOffload::default()), + } + } + + pub fn run(&self, id: usize, f: F) + where + F: Future + 'static + Send, + { + let this = self.clone(); + tokio::task::spawn_blocking(move || { + let Test { man, off } = this; + manager_access!(man, id, off, peer_con, async { + f.await; + Result::Ok(()) + }); + }); + } + } + + #[tokio::test(flavor = "multi_thread")] + async fn evt_manager_stress() { + tokio::time::timeout(std::time::Duration::from_secs(2), async { + let test = Test::new(1); + + let (s, mut r) = tokio::sync::mpsc::unbounded_channel(); + + const COUNT: usize = 512; + + let start = std::time::Instant::now(); + + let barrier = Arc::new(tokio::sync::Barrier::new(COUNT)); + + for _ in 0..COUNT { + let barrier = barrier.clone(); + let test = test.clone(); + let s = s.clone(); + tokio::task::spawn(async move { + barrier.wait().await; + test.run(0, async move { + tokio::time::sleep(std::time::Duration::from_millis(1)) + .await; + let _ = s.send(()); + }); + }); + } + + for _ in 0..COUNT { + let _ = r.recv().await.unwrap(); + } + + println!("elapsed: {}", start.elapsed().as_secs_f64()); + }) + .await + .unwrap(); + } + + #[tokio::test(flavor = "multi_thread")] + async fn evt_manager_timeouts_kill() { + let test = Test::new(1); + test.run(0, async move { + tokio::time::sleep(std::time::Duration::from_secs(3)).await; + }); + tokio::time::sleep(std::time::Duration::from_secs(4)).await; + let (s, r) = tokio::sync::oneshot::channel(); + test.run(0, async move { + let _ = s.send(()); + }); + assert!(r.await.is_err()); + } +} diff --git a/crates/tx5-go-pion/src/peer_con.rs b/crates/tx5-go-pion/src/peer_con.rs index d78e3704..c520f49e 100644 --- a/crates/tx5-go-pion/src/peer_con.rs +++ b/crates/tx5-go-pion/src/peer_con.rs @@ -93,7 +93,7 @@ impl From<&AnswerConfig> for GoBufRef<'static> { } } -struct PeerConCore { +pub(crate) struct PeerConCore { peer_con_id: usize, recv_limit: Arc, evt_send: EventSend, @@ -132,7 +132,7 @@ impl PeerConCore { #[derive(Clone)] pub(crate) struct WeakPeerCon( - Weak>>, + pub(crate) Weak>>, ); macro_rules! peer_con_strong_core {