From 7a7fbd03626c923ac176cd9999a49c9614f57294 Mon Sep 17 00:00:00 2001 From: neonphog Date: Wed, 31 Jan 2024 15:12:50 -0700 Subject: [PATCH] event tweak --- crates/tx5-go-pion/src/evt.rs | 358 +++++++++++++++++++--------------- 1 file changed, 203 insertions(+), 155 deletions(-) diff --git a/crates/tx5-go-pion/src/evt.rs b/crates/tx5-go-pion/src/evt.rs index 65d04a39..22832dc7 100644 --- a/crates/tx5-go-pion/src/evt.rs +++ b/crates/tx5-go-pion/src/evt.rs @@ -1,7 +1,8 @@ use crate::*; use once_cell::sync::Lazy; use std::collections::HashMap; -use std::sync::Mutex; +use std::future::Future; +use std::sync::{Arc, Mutex}; use tx5_go_pion_sys::Event as SysEvent; use tx5_go_pion_sys::API; @@ -154,6 +155,93 @@ pub(crate) fn unregister_data_chan(id: usize) { MANAGER.lock().unwrap().unregister_data_chan(id); } +struct EvtOffload { + handle: tokio::runtime::Handle, + limit: Arc, + _drop_s: tokio::sync::oneshot::Sender<()>, +} + +impl Default for EvtOffload { + fn default() -> Self { + struct D; + + impl Drop for D { + fn drop(&mut self) { + tracing::error!("tx5-go-pion offload EVENT LOOP EXITED"); + } + } + + let drop_trace = D; + + let (hand_s, hand_r) = std::sync::mpsc::sync_channel(0); + + let (_drop_s, drop_r) = tokio::sync::oneshot::channel(); + + // we need to offload the event handling to another thread + // because it's not safe to invoke other go apis within the + // same sync call: + // https://github.com/pion/webrtc/issues/2404 + std::thread::Builder::new() + .name("tx5-evt-offload".to_string()) + .spawn(move || { + // we need a custom runtime so tasks within it survive + // the closing of e.g. a #tokio::test runtime + tokio::runtime::Builder::new_current_thread() + .enable_time() + .build() + .expect("Failed to build tokio runtime") + .block_on(async move { + let _drop_trace = drop_trace; + + let _ = hand_s.send(tokio::runtime::Handle::current()); + // make sure this runtime/thread are dropped + // if self (and sender) are dropped. + let _ = drop_r.await; + }); + }) + .expect("Failed to spawn offload thread"); + + let handle = hand_r.recv().unwrap(); + + Self { + // we will spawn offload tasks using this handle + handle, + // We don't want this "channel" to represent a significant + // memory buffer, but we also don't want it to be so small + // that it causes thread thrashing. Try 128 to start?? + // max msg size is 16 KiB, so 16 * 128 = 2 MiB. + limit: Arc::new(tokio::sync::Semaphore::new(128)), + // capture just so the receive side will error when this is dropped + _drop_s, + } + } +} + +impl EvtOffload { + pub fn blocking_send(&self, f: F) + where + F: Future + 'static + Send, + { + // work around the lack of blocking_aquire in semaphores + // by using a oneshot channel, and doing the acquire *in* the task + let (s, r) = tokio::sync::oneshot::channel(); + let limit = self.limit.clone(); + // spawn using the offload thread runtime handle + self.handle.spawn(async move { + // get the permit + let _permit = limit.acquire().await; + // notify the permit was acquired + let _ = s.send(()); + // run the future on the offload thread + f.await; + }); + // we have to do this blocking outside the task + // so that the go thread trying to send the event is blocked + let _ = r.blocking_recv(); + } +} + +/* macro_rules! manager_access { ($id:ident, $rt:ident, $map:ident, $code:expr) => { let $map = MANAGER.lock().unwrap().$map.get(&$id).cloned(); @@ -185,167 +273,127 @@ macro_rules! manager_access { } }; } +*/ + +macro_rules! manager_access { + ($id:ident, $off:ident, $map:ident, $code:expr) => { + let $map = MANAGER.lock().unwrap().$map.get(&$id).cloned(); + if let Some($map) = $map { + $off.blocking_send(async move { + let start = std::time::Instant::now(); + let result = $code.await; + let elapsed_s = start.elapsed().as_secs_f64(); + if elapsed_s > 0.018 { + tracing::error!(%elapsed_s, ?result, "SlowEvent"); + } + if let Err(err) = result { + $map.close(err.into()); + } + }); + } + }; +} static MANAGER: Lazy> = Lazy::new(|| { - // We don't want this channel to represent a significant - // memory buffer, but we also don't want it to be so small - // that it causes thread thrashing. Try 128 to start?? - // max msg size is 16 KiB, so 16 * 128 = 2 MiB. - let (send, mut recv) = tokio::sync::mpsc::channel(128); + let offload = EvtOffload::default(); unsafe { - API.on_event(move |sys_evt| { - let _ = send.blocking_send(sys_evt); - }); - } - - struct D; + API.on_event(move |sys_evt| match sys_evt { + SysEvent::Error(_error) => (), + SysEvent::PeerConICECandidate { + peer_con_id, + candidate, + } => { + manager_access!(peer_con_id, offload, peer_con, { + peer_con.send_evt(PeerConnectionEvent::ICECandidate(GoBuf( + candidate, + ))) + }); + } + SysEvent::PeerConStateChange { + peer_con_id, + peer_con_state, + } => { + manager_access!(peer_con_id, offload, peer_con, { + peer_con.send_evt(PeerConnectionEvent::State( + PeerConnectionState::from_raw(peer_con_state), + )) + }); + } + SysEvent::PeerConDataChan { + peer_con_id, + data_chan_id, + } => { + manager_access!(peer_con_id, offload, peer_con, async { + let recv_limit = match peer_con.get_recv_limit() { + Ok(recv_limit) => recv_limit, + Err(err) => { + API.data_chan_free(data_chan_id); + return Err(err); + } + }; - impl Drop for D { - fn drop(&mut self) { - tracing::error!("tx5-go-pion EVENT LOOP EXITED"); - } - } + let (chan, recv) = + DataChannel::new(data_chan_id, recv_limit); - std::thread::Builder::new() - .name("tx5-go-poin-evt".to_string()) - .spawn(move || { - let runtime = tokio::runtime::Builder::new_current_thread() - .enable_time() - .build() - .expect("Failed to build tokio runtime"); - runtime.block_on(async move { - let _d = D; - while let Some(sys_evt) = recv.recv().await { - match sys_evt { - SysEvent::Error(_error) => (), - SysEvent::PeerConICECandidate { - peer_con_id, - candidate, - } => { - manager_access!(peer_con_id, runtime, peer_con, { - peer_con.send_evt( - PeerConnectionEvent::ICECandidate(GoBuf( - candidate, - )), - ) - }); - } - SysEvent::PeerConStateChange { - peer_con_id, - peer_con_state, - } => { - manager_access!(peer_con_id, runtime, peer_con, { - peer_con.send_evt(PeerConnectionEvent::State( - PeerConnectionState::from_raw( - peer_con_state, - ), - )) - }); - } - SysEvent::PeerConDataChan { - peer_con_id, - data_chan_id, - } => { - manager_access!( - peer_con_id, - runtime, - peer_con, - async { - let recv_limit = - match peer_con.get_recv_limit() { - Ok(recv_limit) => recv_limit, - Err(err) => { - unsafe { - API.data_chan_free( - data_chan_id, - ); - } - return Err(err); - } - }; - - let (chan, recv) = DataChannel::new( - data_chan_id, - recv_limit, - ); - - peer_con - .send_evt( - PeerConnectionEvent::DataChannel( - chan, recv, - ), - ) - .await - } - ); - } - SysEvent::DataChanClose(data_chan_id) => { - manager_access!( - data_chan_id, - runtime, - data_chan, - data_chan.send_evt(DataChannelEvent::Close) - ); - } - SysEvent::DataChanOpen(data_chan_id) => { - manager_access!( - data_chan_id, - runtime, - data_chan, - data_chan.send_evt(DataChannelEvent::Open) - ); - } - SysEvent::DataChanMessage { - data_chan_id, - buffer_id, - } => { - let mut buf = GoBuf(buffer_id); - manager_access!( - data_chan_id, - runtime, - data_chan, - async { - let len = buf.len()?; - if len > 16 * 1024 { - return Err(Error::id("MsgTooLarge")); - } - - let recv_limit = - data_chan.get_recv_limit()?; - - let permit = recv_limit - .acquire_many_owned(len as u32) - .await - .map_err(|_| { - Error::from(Error::id( - "DataChanMessageSemaphoreClosed", - )) - })?; - - data_chan - .send_evt(DataChannelEvent::Message( - buf, permit, - )) - .await - } - ); - } - SysEvent::DataChanBufferedAmountLow(data_chan_id) => { - manager_access!( - data_chan_id, - runtime, - data_chan, - data_chan.send_evt( - DataChannelEvent::BufferedAmountLow, - ) - ); - } + peer_con + .send_evt(PeerConnectionEvent::DataChannel(chan, recv)) + .await + }); + } + SysEvent::DataChanClose(data_chan_id) => { + manager_access!( + data_chan_id, + offload, + data_chan, + data_chan.send_evt(DataChannelEvent::Close) + ); + } + SysEvent::DataChanOpen(data_chan_id) => { + manager_access!( + data_chan_id, + offload, + data_chan, + data_chan.send_evt(DataChannelEvent::Open) + ); + } + SysEvent::DataChanMessage { + data_chan_id, + buffer_id, + } => { + let mut buf = GoBuf(buffer_id); + manager_access!(data_chan_id, offload, data_chan, async { + let len = buf.len()?; + if len > 16 * 1024 { + return Err(Error::id("MsgTooLarge")); } - } - }); - }) - .expect("failed to spawn event thread"); + + let recv_limit = data_chan.get_recv_limit()?; + + let permit = recv_limit + .acquire_many_owned(len as u32) + .await + .map_err(|_| { + Error::from(Error::id( + "DataChanMessageSemaphoreClosed", + )) + })?; + + data_chan + .send_evt(DataChannelEvent::Message(buf, permit)) + .await + }); + } + SysEvent::DataChanBufferedAmountLow(data_chan_id) => { + manager_access!( + data_chan_id, + offload, + data_chan, + data_chan.send_evt(DataChannelEvent::BufferedAmountLow,) + ); + } + }); + } Manager::new() });