Skip to content

Commit

Permalink
add event dispatcher unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
neonphog committed Feb 1, 2024
1 parent 28614cd commit 5a242a0
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 43 deletions.
4 changes: 2 additions & 2 deletions crates/tx5-go-pion/src/data_chan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::*;
use std::sync::{Arc, Mutex, Weak};
use tx5_go_pion_sys::API;

struct DataChanCore {
pub(crate) struct DataChanCore {
data_chan_id: usize,
recv_limit: Arc<tokio::sync::Semaphore>,
evt_send: EventSend<DataChannelEvent>,
Expand Down Expand Up @@ -41,7 +41,7 @@ impl DataChanCore {

#[derive(Clone)]
pub(crate) struct WeakDataChan(
Weak<Mutex<std::result::Result<DataChanCore, Error>>>,
pub(crate) Weak<Mutex<std::result::Result<DataChanCore, Error>>>,
);

macro_rules! data_chan_strong_core {
Expand Down
204 changes: 165 additions & 39 deletions crates/tx5-go-pion/src/evt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,8 @@ impl EvtOffload {
}

macro_rules! manager_access {
($id:ident, $off:ident, $map:ident, $code:expr) => {
let $map = MANAGER.lock().unwrap().$map.get(&$id).cloned();
($man:ident, $id:ident, $off:ident, $map:ident, $code:expr) => {
let $map = $man.lock().unwrap().$map.get(&$id).cloned();
if let Some($map) = $map {
$off.blocking_send(async move {
let start = std::time::Instant::now();
Expand All @@ -258,8 +258,12 @@ macro_rules! manager_access {
let elapsed_s = start.elapsed().as_secs_f64();
tracing::error!(%elapsed_s, "EventTimeout");
$map.close(Error::id("EventTimeout").into());
$man.lock().unwrap().$map.remove(&$id);
}
Ok(Err(err)) => {
$map.close(err.into());
$man.lock().unwrap().$map.remove(&$id);
}
Ok(Err(err)) => $map.close(err.into()),
Ok(_) => (),
}
let elapsed_s = start.elapsed().as_secs_f64();
Expand All @@ -281,7 +285,7 @@ static MANAGER: Lazy<Mutex<Manager>> = Lazy::new(|| {
peer_con_id,
candidate,
} => {
manager_access!(peer_con_id, offload, peer_con, {
manager_access!(MANAGER, peer_con_id, offload, peer_con, {
peer_con.send_evt(PeerConnectionEvent::ICECandidate(GoBuf(
candidate,
)))
Expand All @@ -291,7 +295,7 @@ static MANAGER: Lazy<Mutex<Manager>> = Lazy::new(|| {
peer_con_id,
peer_con_state,
} => {
manager_access!(peer_con_id, offload, peer_con, {
manager_access!(MANAGER, peer_con_id, offload, peer_con, {
peer_con.send_evt(PeerConnectionEvent::State(
PeerConnectionState::from_raw(peer_con_state),
))
Expand All @@ -301,25 +305,34 @@ static MANAGER: Lazy<Mutex<Manager>> = Lazy::new(|| {
peer_con_id,
data_chan_id,
} => {
manager_access!(peer_con_id, offload, peer_con, async {
let recv_limit = match peer_con.get_recv_limit() {
Ok(recv_limit) => recv_limit,
Err(err) => {
API.data_chan_free(data_chan_id);
return Err(err);
}
};

let (chan, recv) =
DataChannel::new(data_chan_id, recv_limit);

peer_con
.send_evt(PeerConnectionEvent::DataChannel(chan, recv))
.await
});
manager_access!(
MANAGER,
peer_con_id,
offload,
peer_con,
async {
let recv_limit = match peer_con.get_recv_limit() {
Ok(recv_limit) => recv_limit,
Err(err) => {
API.data_chan_free(data_chan_id);
return Err(err);
}
};

let (chan, recv) =
DataChannel::new(data_chan_id, recv_limit);

peer_con
.send_evt(PeerConnectionEvent::DataChannel(
chan, recv,
))
.await
}
);
}
SysEvent::DataChanClose(data_chan_id) => {
manager_access!(
MANAGER,
data_chan_id,
offload,
data_chan,
Expand All @@ -328,6 +341,7 @@ static MANAGER: Lazy<Mutex<Manager>> = Lazy::new(|| {
}
SysEvent::DataChanOpen(data_chan_id) => {
manager_access!(
MANAGER,
data_chan_id,
offload,
data_chan,
Expand All @@ -339,30 +353,37 @@ static MANAGER: Lazy<Mutex<Manager>> = Lazy::new(|| {
buffer_id,
} => {
let mut buf = GoBuf(buffer_id);
manager_access!(data_chan_id, offload, data_chan, async {
let len = buf.len()?;
if len > 16 * 1024 {
return Err(Error::id("MsgTooLarge"));
}
manager_access!(
MANAGER,
data_chan_id,
offload,
data_chan,
async {
let len = buf.len()?;
if len > 16 * 1024 {
return Err(Error::id("MsgTooLarge"));
}

let recv_limit = data_chan.get_recv_limit()?;
let recv_limit = data_chan.get_recv_limit()?;

let permit = recv_limit
.acquire_many_owned(len as u32)
.await
.map_err(|_| {
Error::from(Error::id(
"DataChanMessageSemaphoreClosed",
))
})?;
let permit = recv_limit
.acquire_many_owned(len as u32)
.await
.map_err(|_| {
Error::from(Error::id(
"DataChanMessageSemaphoreClosed",
))
})?;

data_chan
.send_evt(DataChannelEvent::Message(buf, permit))
.await
});
data_chan
.send_evt(DataChannelEvent::Message(buf, permit))
.await
}
);
}
SysEvent::DataChanBufferedAmountLow(data_chan_id) => {
manager_access!(
MANAGER,
data_chan_id,
offload,
data_chan,
Expand All @@ -374,3 +395,108 @@ static MANAGER: Lazy<Mutex<Manager>> = Lazy::new(|| {

Manager::new()
});

#[cfg(test)]
mod tests {
use super::*;

#[derive(Clone)]
struct Test {
pub man: Arc<Mutex<Manager>>,
pub off: Arc<EvtOffload>,
}

impl Test {
pub fn new(id_count: usize) -> Self {
let subscriber = tracing_subscriber::FmtSubscriber::builder()
.with_env_filter(
tracing_subscriber::filter::EnvFilter::from_default_env(),
)
.with_file(true)
.with_line_number(true)
.finish();
let _ = tracing::subscriber::set_global_default(subscriber);

use std::sync::Weak;

let man = Arc::new(Manager::new());

{
let mut lock = man.lock().unwrap();
for id in 0..id_count {
lock.register_peer_con(id, WeakPeerCon(Weak::new()));
}
}

Self {
man,
off: Arc::new(EvtOffload::default()),
}
}

pub fn run<F>(&self, id: usize, f: F)
where
F: Future<Output = ()> + 'static + Send,
{
let this = self.clone();
tokio::task::spawn_blocking(move || {
let Test { man, off } = this;
manager_access!(man, id, off, peer_con, async {
f.await;
Result::Ok(())
});
});
}
}

#[tokio::test(flavor = "multi_thread")]
async fn evt_manager_stress() {
tokio::time::timeout(std::time::Duration::from_secs(2), async {
let test = Test::new(1);

let (s, mut r) = tokio::sync::mpsc::unbounded_channel();

const COUNT: usize = 512;

let start = std::time::Instant::now();

let barrier = Arc::new(tokio::sync::Barrier::new(COUNT));

for _ in 0..COUNT {
let barrier = barrier.clone();
let test = test.clone();
let s = s.clone();
tokio::task::spawn(async move {
barrier.wait().await;
test.run(0, async move {
tokio::time::sleep(std::time::Duration::from_millis(1))
.await;
let _ = s.send(());
});
});
}

for _ in 0..COUNT {
let _ = r.recv().await.unwrap();
}

println!("elapsed: {}", start.elapsed().as_secs_f64());
})
.await
.unwrap();
}

#[tokio::test(flavor = "multi_thread")]
async fn evt_manager_timeouts_kill() {
let test = Test::new(1);
test.run(0, async move {
tokio::time::sleep(std::time::Duration::from_secs(3)).await;
});
tokio::time::sleep(std::time::Duration::from_secs(4)).await;
let (s, r) = tokio::sync::oneshot::channel();
test.run(0, async move {
let _ = s.send(());
});
assert!(r.await.is_err());
}
}
4 changes: 2 additions & 2 deletions crates/tx5-go-pion/src/peer_con.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl From<&AnswerConfig> for GoBufRef<'static> {
}
}

struct PeerConCore {
pub(crate) struct PeerConCore {
peer_con_id: usize,
recv_limit: Arc<tokio::sync::Semaphore>,
evt_send: EventSend<PeerConnectionEvent>,
Expand Down Expand Up @@ -132,7 +132,7 @@ impl PeerConCore {

#[derive(Clone)]
pub(crate) struct WeakPeerCon(
Weak<Mutex<std::result::Result<PeerConCore, Error>>>,
pub(crate) Weak<Mutex<std::result::Result<PeerConCore, Error>>>,
);

macro_rules! peer_con_strong_core {
Expand Down

0 comments on commit 5a242a0

Please sign in to comment.