diff --git a/Cargo.lock b/Cargo.lock index f91e9f6..2b21de1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2833,6 +2833,7 @@ dependencies = [ "sbd-server", "serde", "serde_json", + "slab", "tokio", "tracing", "tracing-subscriber", diff --git a/Cargo.toml b/Cargo.toml index 4029a58..de42e7a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,6 +50,7 @@ sbd-server = "0.0.6-alpha" serde = { version = "1.0.160", features = [ "derive", "rc" ] } serde_json = { version = "1.0.96", features = [ "preserve_order" ] } sha2 = "0.10.6" +slab = "0.4.9" socket2 = { version = "0.5.2", features = [ "all" ] } sodoken = "=0.0.901-alpha" tempfile = "3.8.0" diff --git a/crates/tx5/Cargo.toml b/crates/tx5/Cargo.toml index 6613c7f..6496d2d 100644 --- a/crates/tx5/Cargo.toml +++ b/crates/tx5/Cargo.toml @@ -24,6 +24,7 @@ base64 = { workspace = true } futures = { workspace = true } influxive-otel-atomic-obs = { workspace = true } serde = { workspace = true } +slab = { workspace = true } tokio = { workspace = true, features = [ "full" ] } tracing = { workspace = true } tx5-connection = { workspace = true, default-features = false } diff --git a/crates/tx5/examples/mem-echo-stress.rs b/crates/tx5/examples/mem-echo-stress.rs new file mode 100644 index 0000000..a61d4cc --- /dev/null +++ b/crates/tx5/examples/mem-echo-stress.rs @@ -0,0 +1,92 @@ +//! Mem backend timings. The hope is to have near ideal zero overhead. +//! At time of writing, you can see we're pretty close to exactly +//! proportional to node count for a given machine's resources: +//! +//! - 1 nodes, 33941 ops, 0.000028 sec/op, in 1.004478s +//! - 10 nodes, 174041 ops, 0.000056 sec/op, in 1.016977s +//! - 100 nodes, 175111 ops, 0.000580 sec/op, in 1.018943s +//! 
- 1000 nodes, 160266 ops, 0.006361 sec/op, in 1.019859s + +use std::sync::Arc; +use tx5::{backend::*, *}; + +#[tokio::main(flavor = "multi_thread")] +async fn main() { + let fake_sig = SigUrl::parse("wss://fake.fake").unwrap(); + + let config = Arc::new(Config { + backend_module: BackendModule::Mem, + ..Default::default() + }); + + let (listen_ep, mut listen_recv) = Endpoint::new(config.clone()); + let listen_ep = Arc::new(listen_ep); + let listen_url = listen_ep.listen(fake_sig.clone()).await.unwrap(); + + tokio::task::spawn(async move { + while let Some(evt) = listen_recv.recv().await { + match evt { + EndpointEvent::ListeningAddressOpen { .. } => (), + EndpointEvent::Connected { .. } => (), + EndpointEvent::Message { peer_url, .. } => { + let listen_ep = listen_ep.clone(); + // Don't hold up our recv loop on the response + tokio::task::spawn(async move { + listen_ep.send(peer_url, Vec::new()).await.unwrap(); + }); + } + _ => panic!("unexpected: {evt:?}"), + } + } + panic!("listener task ended"); + }); + + println!("listening at: {listen_url}"); + + let (timing_send, mut timing_recv) = tokio::sync::mpsc::unbounded_channel(); + let mut timing_buf = Vec::new(); + let mut node_count = 0; + + loop { + let start = std::time::Instant::now(); + + node_count += 1; + + let config = config.clone(); + let timing_send = timing_send.clone(); + let listen_url = listen_url.clone(); + tokio::task::spawn(async move { + let (cli_ep, mut cli_recv) = Endpoint::new(config); + let _ = cli_ep.listen(listen_url.to_sig()).await.unwrap(); + + loop { + let start = std::time::Instant::now(); + cli_ep.send(listen_url.clone(), Vec::new()).await.unwrap(); + loop { + let evt = cli_recv.recv().await.unwrap(); + match evt { + EndpointEvent::ListeningAddressOpen { .. } => (), + EndpointEvent::Connected { .. } => (), + EndpointEvent::Message { .. 
} => break, + _ => panic!("unexpected: {evt:?}"), + } + } + timing_send.send(start.elapsed().as_secs_f64()).unwrap(); + } + }); + + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + + timing_recv.recv_many(&mut timing_buf, usize::MAX).await; + + let count = timing_buf.len(); + let sum: f64 = timing_buf.iter().sum(); + timing_buf.clear(); + + println!( + "{node_count} nodes, {count} ops, {:0.6} sec/op, in {:0.6}s", + sum / count as f64, + start.elapsed().as_secs_f64(), + ); + } +} diff --git a/crates/tx5/src/backend.rs b/crates/tx5/src/backend.rs index b7e2506..2cc79b3 100644 --- a/crates/tx5/src/backend.rs +++ b/crates/tx5/src/backend.rs @@ -11,6 +11,8 @@ use tx5_core::deps::serde_json; #[cfg(feature = "backend-go-pion")] mod go_pion; +mod mem; + /// Backend modules usable by tx5. #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] pub enum BackendModule { @@ -22,8 +24,8 @@ pub enum BackendModule { /// The Webrtc-RS-based backend. WebrtcRs, - /// The mock backend. - Mock, + /// The mem backend. 
+    Mem,
 }
 
 impl Default for BackendModule {
@@ -33,7 +35,7 @@ impl Default for BackendModule {
         return Self::GoPion;
         #[cfg(feature = "backend-webrtc-rs")]
         return Self::WebrtcRs;
-        Self::Mock
+        Self::Mem
     }
 }
 
@@ -45,7 +47,7 @@ impl BackendModule {
             Self::GoPion => go_pion::default_config(),
             #[cfg(feature = "backend-webrtc-rs")]
             Self::WebrtcRs => todo!(),
-            Self::Mock => serde_json::json!({}),
+            Self::Mem => mem::default_config(),
         }
     }
 
@@ -61,7 +63,7 @@ impl BackendModule {
             Self::GoPion => go_pion::connect(config, url, listener).await,
             #[cfg(feature = "backend-webrtc-rs")]
             Self::WebrtcRs => todo!(),
-            Self::Mock => todo!(),
+            Self::Mem => mem::connect(config, url, listener).await,
         }
     }
 }
diff --git a/crates/tx5/src/backend/mem.rs b/crates/tx5/src/backend/mem.rs
new file mode 100644
index 0000000..2512565
--- /dev/null
+++ b/crates/tx5/src/backend/mem.rs
@@ -0,0 +1,333 @@
+use super::*;
+
+use std::sync::{Arc, Mutex, Weak};
+
+pub fn default_config() -> serde_json::Value {
+    serde_json::json!({})
+}
+
+pub async fn connect(
+    _config: &Arc<Config>,
+    _url: &str,
+    _listener: bool,
+) -> Result<(DynBackEp, DynBackEpRecvCon)> {
+    let (ep, ep_recv) = STAT.listen();
+    let ep: DynBackEp = ep;
+    let ep_recv: DynBackEpRecvCon = Box::new(ep_recv);
+
+    Ok((ep, ep_recv))
+}
+
+struct ConRecvData(tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>);
+
+impl BackConRecvData for ConRecvData {
+    fn recv(&mut self) -> BoxFuture<'_, Option<Vec<u8>>> {
+        Box::pin(async { self.0.recv().await })
+    }
+}
+
+struct Con {
+    pub_key: PubKey,
+    send: tokio::sync::mpsc::UnboundedSender<Vec<u8>>,
+}
+
+impl Con {
+    #[allow(clippy::new_ret_no_self)]
+    pub fn new(pub_key: PubKey) -> (DynBackCon, DynBackConRecvData) {
+        let (send, recv) = tokio::sync::mpsc::unbounded_channel();
+
+        let con: DynBackCon = Arc::new(Self { pub_key, send });
+
+        let con_recv: DynBackConRecvData = Box::new(ConRecvData(recv));
+
+        (con, con_recv)
+    }
+}
+
+impl BackCon for Con {
+    fn send(&self, data: Vec<u8>) -> BoxFuture<'_, Result<()>> {
+        let r =
self.send.send(data).map_err(|err| {
+            std::io::Error::other(format!("send failure ({err:?})"))
+        });
+        Box::pin(async { r })
+    }
+
+    fn pub_key(&self) -> &PubKey {
+        &self.pub_key
+    }
+
+    fn is_using_webrtc(&self) -> bool {
+        true
+    }
+
+    fn get_stats(&self) -> tx5_connection::ConnStats {
+        tx5_connection::ConnStats {
+            send_msg_count: 0,
+            send_byte_count: 0,
+            recv_msg_count: 0,
+            recv_byte_count: 0,
+        }
+    }
+}
+
+struct WaitCon {
+    pub_key: PubKey,
+    con: Option<DynBackCon>,
+    con_recv: Option<DynBackConRecvData>,
+}
+
+impl BackWaitCon for WaitCon {
+    fn wait(
+        &mut self,
+        _recv_limit: Arc<tokio::sync::Semaphore>,
+    ) -> BoxFuture<'static, Result<(DynBackCon, DynBackConRecvData)>> {
+        let con = self.con.take().unwrap();
+        let con_recv = self.con_recv.take().unwrap();
+        Box::pin(async { Ok((con, con_recv)) })
+    }
+
+    fn pub_key(&self) -> &PubKey {
+        &self.pub_key
+    }
+}
+
+struct EpRecvCon(tokio::sync::mpsc::UnboundedReceiver<DynBackWaitCon>);
+
+impl BackEpRecvCon for EpRecvCon {
+    fn recv(&mut self) -> BoxFuture<'_, Option<DynBackWaitCon>> {
+        Box::pin(async { self.0.recv().await })
+    }
+}
+
+fn gen_pub_key(uniq: u64, loc: usize) -> PubKey {
+    // in base64 a string of 0xb7 renders as `t7e3t7e3t...`
+    // which sort-of looks like "test" if you squint : )
+    let mut pub_key = [0xb7; 32];
+    pub_key[16..24].copy_from_slice(&uniq.to_be_bytes());
+    pub_key[24..32].copy_from_slice(&(loc as u64).to_be_bytes());
+
+    PubKey(Arc::new(pub_key))
+}
+
+fn parse_pub_key(pub_key: &PubKey) -> (u64, usize) {
+    let mut uniq = [0; 8];
+    uniq.copy_from_slice(&pub_key[16..24]);
+    let uniq = u64::from_be_bytes(uniq);
+
+    let mut loc = [0; 8];
+    loc.copy_from_slice(&pub_key[24..32]);
+    let loc = u64::from_be_bytes(loc) as usize;
+
+    (uniq, loc)
+}
+
+struct Ep {
+    uniq: u64,
+    loc: usize,
+    pub_key: PubKey,
+    send: tokio::sync::mpsc::UnboundedSender<DynBackWaitCon>,
+}
+
+impl Drop for Ep {
+    fn drop(&mut self) {
+        STAT.remove(self.uniq, self.loc);
+    }
+}
+
+impl Ep {
+    pub fn new(uniq: u64, loc: usize) -> (Arc<Self>, EpRecvCon) {
+        let pub_key = gen_pub_key(uniq, loc);
+
+        let (send, recv)
= tokio::sync::mpsc::unbounded_channel();
+
+        (
+            Arc::new(Self {
+                uniq,
+                loc,
+                pub_key,
+                send,
+            }),
+            EpRecvCon(recv),
+        )
+    }
+}
+
+impl BackEp for Ep {
+    fn connect(
+        &self,
+        pub_key: PubKey,
+    ) -> BoxFuture<'_, Result<DynBackWaitCon>> {
+        Box::pin(async {
+            if self.pub_key == pub_key {
+                return Err(std::io::Error::other("cannot connect to self"));
+            }
+            STAT.connect(self.pub_key.clone(), pub_key)
+        })
+    }
+
+    fn pub_key(&self) -> &PubKey {
+        &self.pub_key
+    }
+}
+
+struct Stat {
+    store: Mutex<slab::Slab<Weak<Ep>>>,
+}
+
+impl Stat {
+    pub const fn new() -> Self {
+        Self {
+            store: Mutex::new(slab::Slab::new()),
+        }
+    }
+
+    pub fn remove(&self, uniq: u64, loc: usize) {
+        let mut lock = self.store.lock().unwrap();
+        if let Some(ep) = lock.get(loc) {
+            if let Some(ep) = ep.upgrade() {
+                if ep.uniq == uniq {
+                    lock.remove(loc);
+                }
+            } else {
+                lock.remove(loc);
+            }
+        }
+    }
+
+    pub fn listen(&self) -> (Arc<Ep>, EpRecvCon) {
+        static UNIQ: std::sync::atomic::AtomicU64 =
+            std::sync::atomic::AtomicU64::new(1);
+        let uniq = UNIQ.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+
+        let mut lock = self.store.lock().unwrap();
+        let entry = lock.vacant_entry();
+        let loc = entry.key();
+        let (ep, ep_recv) = Ep::new(uniq, loc);
+        entry.insert(Arc::downgrade(&ep));
+
+        (ep, ep_recv)
+    }
+
+    pub fn connect(
+        &self,
+        src_pub_key: PubKey,
+        dst_pub_key: PubKey,
+    ) -> Result<DynBackWaitCon> {
+        let (dst_uniq, dst_loc) = parse_pub_key(&dst_pub_key);
+
+        let ep = self.store.lock().unwrap().get(dst_loc).cloned();
+
+        let ep = match ep {
+            None => {
+                return Err(std::io::Error::other(
+                    "failed to connect (no peer)",
+                ))
+            }
+            Some(ep) => ep,
+        };
+
+        let ep = match ep.upgrade() {
+            None => {
+                return Err(std::io::Error::other(
+                    "failed to connect (peer closed)",
+                ))
+            }
+            Some(ep) => ep,
+        };
+
+        if ep.uniq != dst_uniq {
+            return Err(std::io::Error::other(
+                "failed to connect (no peer/uniq)",
+            ));
+        }
+
+        let (dst_con, src_con_recv) = Con::new(dst_pub_key.clone());
+        let (src_con, dst_con_recv) =
Con::new(src_pub_key.clone()); + + let dst_wait: DynBackWaitCon = Box::new(WaitCon { + pub_key: dst_pub_key, + con: Some(dst_con), + con_recv: Some(dst_con_recv), + }); + + let src_wait: DynBackWaitCon = Box::new(WaitCon { + pub_key: src_pub_key, + con: Some(src_con), + con_recv: Some(src_con_recv), + }); + + if ep.send.send(src_wait).is_err() { + return Err(std::io::Error::other( + "failed to connect (chan closed)", + )); + } + + Ok(dst_wait) + } +} + +static STAT: Stat = Stat::new(); + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test(flavor = "multi_thread")] + async fn ensure_dropped_from_slab() { + let (ep, _recv) = STAT.listen(); + let (uniq, loc) = parse_pub_key(&ep.pub_key); + drop(ep); + if let Some(ep) = STAT.store.lock().unwrap().get(loc) { + if let Some(ep) = ep.upgrade() { + if ep.uniq == uniq { + panic!("failed to delete from slab"); + } + } + } + } + + #[tokio::test(flavor = "multi_thread")] + async fn mem_backend_e2e() { + let config = Arc::new(Config { + backend_module: BackendModule::Mem, + ..Default::default() + }); + + let (e1, _er1) = connect(&config, "", true).await.unwrap(); + let (e2, mut er2) = connect(&config, "", true).await.unwrap(); + + assert_ne!(e1.pub_key(), e2.pub_key()); + + let mut w1 = e1.connect(e2.pub_key().clone()).await.unwrap(); + + assert_eq!(w1.pub_key(), e2.pub_key()); + + let (c1, _cr1) = w1 + .wait(Arc::new(tokio::sync::Semaphore::new(1))) + .await + .unwrap(); + + assert_eq!(c1.pub_key(), e2.pub_key()); + + let mut w2 = er2.recv().await.unwrap(); + + assert_eq!(w2.pub_key(), e1.pub_key()); + + let (c2, mut cr2) = w2 + .wait(Arc::new(tokio::sync::Semaphore::new(1))) + .await + .unwrap(); + + assert_eq!(c2.pub_key(), e1.pub_key()); + + c1.send(vec![1]).await.unwrap(); + + let r = cr2.recv().await.unwrap(); + + assert_eq!(vec![1], r); + + drop(c1); + + assert_eq!(None, cr2.recv().await); + } +}