Skip to content

Commit

Permalink
event tweak
Browse files Browse the repository at this point in the history
  • Loading branch information
neonphog committed Jan 31, 2024
1 parent 122d0b2 commit 7a7fbd0
Showing 1 changed file with 203 additions and 155 deletions.
358 changes: 203 additions & 155 deletions crates/tx5-go-pion/src/evt.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::*;
use once_cell::sync::Lazy;
use std::collections::HashMap;
use std::sync::Mutex;
use std::future::Future;
use std::sync::{Arc, Mutex};
use tx5_go_pion_sys::Event as SysEvent;
use tx5_go_pion_sys::API;

Expand Down Expand Up @@ -154,6 +155,93 @@ pub(crate) fn unregister_data_chan(id: usize) {
MANAGER.lock().unwrap().unregister_data_chan(id);
}

/// Offloads go-pion event handling onto a dedicated tokio runtime thread.
/// It is not safe to invoke other go apis from within the same sync call
/// that delivered an event (https://github.com/pion/webrtc/issues/2404),
/// so event futures are shipped to this separate runtime instead.
struct EvtOffload {
    // Handle to the runtime living on the dedicated offload thread;
    // used to spawn each event-handling future there.
    handle: tokio::runtime::Handle,
    // Semaphore bounding the number of in-flight offloaded events
    // (sized in `Default::default`) so events don't pile up unbounded.
    limit: Arc<tokio::sync::Semaphore>,
    // Kept only so the paired receiver resolves when this struct is
    // dropped, letting the offload runtime/thread shut down.
    _drop_s: tokio::sync::oneshot::Sender<()>,
}

impl Default for EvtOffload {
    /// Spawn the dedicated offload thread, build a current-thread tokio
    /// runtime on it, and hand its `Handle` back to the constructing
    /// thread via a rendezvous channel before returning.
    fn default() -> Self {
        // Sentinel whose Drop fires if/when the offload event loop exits,
        // so an unexpected shutdown is loudly visible in the logs.
        struct D;

        impl Drop for D {
            fn drop(&mut self) {
                tracing::error!("tx5-go-pion offload EVENT LOOP EXITED");
            }
        }

        let drop_trace = D;

        // Zero-capacity (rendezvous) channel: `recv()` below blocks until
        // the offload thread's runtime is up and has sent its Handle.
        let (hand_s, hand_r) = std::sync::mpsc::sync_channel(0);

        let (_drop_s, drop_r) = tokio::sync::oneshot::channel();

        // we need to offload the event handling to another thread
        // because it's not safe to invoke other go apis within the
        // same sync call:
        // https://github.com/pion/webrtc/issues/2404
        std::thread::Builder::new()
            .name("tx5-evt-offload".to_string())
            .spawn(move || {
                // we need a custom runtime so tasks within it survive
                // the closing of e.g. a #tokio::test runtime
                tokio::runtime::Builder::new_current_thread()
                    .enable_time()
                    .build()
                    .expect("Failed to build tokio runtime")
                    .block_on(async move {
                        // Move the sentinel into the event loop so its
                        // Drop runs exactly when this block_on exits.
                        let _drop_trace = drop_trace;

                        let _ = hand_s.send(tokio::runtime::Handle::current());
                        // make sure this runtime/thread are dropped
                        // if self (and sender) are dropped.
                        let _ = drop_r.await;
                    });
            })
            .expect("Failed to spawn offload thread");

        // Blocks until the offload runtime is running; unwrap is safe
        // barring the spawned thread panicking before sending.
        let handle = hand_r.recv().unwrap();

        Self {
            // we will spawn offload tasks using this handle
            handle,
            // We don't want this "channel" to represent a significant
            // memory buffer, but we also don't want it to be so small
            // that it causes thread thrashing. Try 128 to start??
            // max msg size is 16 KiB, so 16 * 128 = 2 MiB.
            limit: Arc::new(tokio::sync::Semaphore::new(128)),
            // capture just so the receive side will error when this is dropped
            _drop_s,
        }
    }
}

impl EvtOffload {
    /// Queue event-handling future `f` onto the offload runtime, blocking
    /// the calling (go) thread until a concurrency permit is acquired —
    /// this backpressures the go side when event handling falls behind.
    /// The future itself runs asynchronously after this returns.
    pub fn blocking_send<F>(&self, f: F)
    where
        F: Future<Output = ()> + 'static + Send,
    {
        // work around the lack of blocking_acquire in semaphores
        // by using a oneshot channel, and doing the acquire *in* the task
        let (s, r) = tokio::sync::oneshot::channel();
        let limit = self.limit.clone();
        // spawn using the offload thread runtime handle
        self.handle.spawn(async move {
            // get the permit (held for the duration of this task,
            // released when `_permit` drops after `f` completes)
            let _permit = limit.acquire().await;
            // notify the permit was acquired
            let _ = s.send(());
            // run the future on the offload thread
            f.await;
        });
        // we have to do this blocking outside the task
        // so that the go thread trying to send the event is blocked
        let _ = r.blocking_recv();
    }
}

/*
macro_rules! manager_access {
($id:ident, $rt:ident, $map:ident, $code:expr) => {
let $map = MANAGER.lock().unwrap().$map.get(&$id).cloned();
Expand Down Expand Up @@ -185,167 +273,127 @@ macro_rules! manager_access {
}
};
}
*/

/// Look up the entity registered under `$id` in the MANAGER `$map`
/// (e.g. `peer_con` or `data_chan`); if present, offload `$code`
/// (an async expression handling the event) via `$off.blocking_send`.
/// Slow handlers (> 18 ms) are traced, and a handler error closes the
/// looked-up entity.
macro_rules! manager_access {
    ($id:ident, $off:ident, $map:ident, $code:expr) => {
        // Clone out of the manager lock before awaiting anything, so the
        // MANAGER mutex is never held across the offloaded future.
        let $map = MANAGER.lock().unwrap().$map.get(&$id).cloned();
        if let Some($map) = $map {
            $off.blocking_send(async move {
                let start = std::time::Instant::now();
                let result = $code.await;
                let elapsed_s = start.elapsed().as_secs_f64();
                if elapsed_s > 0.018 {
                    tracing::error!(%elapsed_s, ?result, "SlowEvent");
                }
                // On handler error, tear down the peer con / data chan.
                if let Err(err) = result {
                    $map.close(err.into());
                }
            });
        }
    };
}

static MANAGER: Lazy<Mutex<Manager>> = Lazy::new(|| {
// We don't want this channel to represent a significant
// memory buffer, but we also don't want it to be so small
// that it causes thread thrashing. Try 128 to start??
// max msg size is 16 KiB, so 16 * 128 = 2 MiB.
let (send, mut recv) = tokio::sync::mpsc::channel(128);
let offload = EvtOffload::default();

unsafe {
API.on_event(move |sys_evt| {
let _ = send.blocking_send(sys_evt);
});
}

struct D;
API.on_event(move |sys_evt| match sys_evt {
SysEvent::Error(_error) => (),
SysEvent::PeerConICECandidate {
peer_con_id,
candidate,
} => {
manager_access!(peer_con_id, offload, peer_con, {
peer_con.send_evt(PeerConnectionEvent::ICECandidate(GoBuf(
candidate,
)))
});
}
SysEvent::PeerConStateChange {
peer_con_id,
peer_con_state,
} => {
manager_access!(peer_con_id, offload, peer_con, {
peer_con.send_evt(PeerConnectionEvent::State(
PeerConnectionState::from_raw(peer_con_state),
))
});
}
SysEvent::PeerConDataChan {
peer_con_id,
data_chan_id,
} => {
manager_access!(peer_con_id, offload, peer_con, async {
let recv_limit = match peer_con.get_recv_limit() {
Ok(recv_limit) => recv_limit,
Err(err) => {
API.data_chan_free(data_chan_id);
return Err(err);
}
};

impl Drop for D {
fn drop(&mut self) {
tracing::error!("tx5-go-pion EVENT LOOP EXITED");
}
}
let (chan, recv) =
DataChannel::new(data_chan_id, recv_limit);

std::thread::Builder::new()
.name("tx5-go-poin-evt".to_string())
.spawn(move || {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_time()
.build()
.expect("Failed to build tokio runtime");
runtime.block_on(async move {
let _d = D;
while let Some(sys_evt) = recv.recv().await {
match sys_evt {
SysEvent::Error(_error) => (),
SysEvent::PeerConICECandidate {
peer_con_id,
candidate,
} => {
manager_access!(peer_con_id, runtime, peer_con, {
peer_con.send_evt(
PeerConnectionEvent::ICECandidate(GoBuf(
candidate,
)),
)
});
}
SysEvent::PeerConStateChange {
peer_con_id,
peer_con_state,
} => {
manager_access!(peer_con_id, runtime, peer_con, {
peer_con.send_evt(PeerConnectionEvent::State(
PeerConnectionState::from_raw(
peer_con_state,
),
))
});
}
SysEvent::PeerConDataChan {
peer_con_id,
data_chan_id,
} => {
manager_access!(
peer_con_id,
runtime,
peer_con,
async {
let recv_limit =
match peer_con.get_recv_limit() {
Ok(recv_limit) => recv_limit,
Err(err) => {
unsafe {
API.data_chan_free(
data_chan_id,
);
}
return Err(err);
}
};

let (chan, recv) = DataChannel::new(
data_chan_id,
recv_limit,
);

peer_con
.send_evt(
PeerConnectionEvent::DataChannel(
chan, recv,
),
)
.await
}
);
}
SysEvent::DataChanClose(data_chan_id) => {
manager_access!(
data_chan_id,
runtime,
data_chan,
data_chan.send_evt(DataChannelEvent::Close)
);
}
SysEvent::DataChanOpen(data_chan_id) => {
manager_access!(
data_chan_id,
runtime,
data_chan,
data_chan.send_evt(DataChannelEvent::Open)
);
}
SysEvent::DataChanMessage {
data_chan_id,
buffer_id,
} => {
let mut buf = GoBuf(buffer_id);
manager_access!(
data_chan_id,
runtime,
data_chan,
async {
let len = buf.len()?;
if len > 16 * 1024 {
return Err(Error::id("MsgTooLarge"));
}

let recv_limit =
data_chan.get_recv_limit()?;

let permit = recv_limit
.acquire_many_owned(len as u32)
.await
.map_err(|_| {
Error::from(Error::id(
"DataChanMessageSemaphoreClosed",
))
})?;

data_chan
.send_evt(DataChannelEvent::Message(
buf, permit,
))
.await
}
);
}
SysEvent::DataChanBufferedAmountLow(data_chan_id) => {
manager_access!(
data_chan_id,
runtime,
data_chan,
data_chan.send_evt(
DataChannelEvent::BufferedAmountLow,
)
);
}
peer_con
.send_evt(PeerConnectionEvent::DataChannel(chan, recv))
.await
});
}
SysEvent::DataChanClose(data_chan_id) => {
manager_access!(
data_chan_id,
offload,
data_chan,
data_chan.send_evt(DataChannelEvent::Close)
);
}
SysEvent::DataChanOpen(data_chan_id) => {
manager_access!(
data_chan_id,
offload,
data_chan,
data_chan.send_evt(DataChannelEvent::Open)
);
}
SysEvent::DataChanMessage {
data_chan_id,
buffer_id,
} => {
let mut buf = GoBuf(buffer_id);
manager_access!(data_chan_id, offload, data_chan, async {
let len = buf.len()?;
if len > 16 * 1024 {
return Err(Error::id("MsgTooLarge"));
}
}
});
})
.expect("failed to spawn event thread");

let recv_limit = data_chan.get_recv_limit()?;

let permit = recv_limit
.acquire_many_owned(len as u32)
.await
.map_err(|_| {
Error::from(Error::id(
"DataChanMessageSemaphoreClosed",
))
})?;

data_chan
.send_evt(DataChannelEvent::Message(buf, permit))
.await
});
}
SysEvent::DataChanBufferedAmountLow(data_chan_id) => {
manager_access!(
data_chan_id,
offload,
data_chan,
data_chan.send_evt(DataChannelEvent::BufferedAmountLow,)
);
}
});
}

Manager::new()
});

0 comments on commit 7a7fbd0

Please sign in to comment.