Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mem backend for tx5 #106

Merged
merged 11 commits into from
Oct 16, 2024
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ sbd-server = "0.0.6-alpha"
serde = { version = "1.0.160", features = [ "derive", "rc" ] }
serde_json = { version = "1.0.96", features = [ "preserve_order" ] }
sha2 = "0.10.6"
slab = "0.4.9"
socket2 = { version = "0.5.2", features = [ "all" ] }
sodoken = "=0.0.901-alpha"
tempfile = "3.8.0"
Expand Down
1 change: 1 addition & 0 deletions crates/tx5/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ base64 = { workspace = true }
futures = { workspace = true }
influxive-otel-atomic-obs = { workspace = true }
serde = { workspace = true }
slab = { workspace = true }
tokio = { workspace = true, features = [ "full" ] }
tracing = { workspace = true }
tx5-connection = { workspace = true, default-features = false }
Expand Down
92 changes: 92 additions & 0 deletions crates/tx5/examples/mem-echo-stress.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
//! Mem backend timings. The hope is to have near ideal zero overhead.
//! At time of writing, you can see we're pretty close to exactly
//! proportional to node count for a given machine's resources:
//!
//! - 1 nodes, 33941 ops, 0.000028 sec/op, in 1.004478s
//! - 10 nodes, 174041 ops, 0.000056 sec/op, in 1.016977s
//! - 100 nodes, 175111 ops, 0.000580 sec/op, in 1.018943s
//! - 1000 nodes, 160266 ops, 0.006361 sec/op, in 1.019859s

use std::sync::Arc;
use tx5::{backend::*, *};

/// Stress the mem backend: stand up one echo listener, then add one
/// client node per measurement window, printing aggregate round-trip
/// timings for each node count. Runs until killed.
#[tokio::main(flavor = "multi_thread")]
async fn main() {
    // How long each node-count measurement window lasts.
    const WINDOW: std::time::Duration = std::time::Duration::from_secs(1);

    // The mem backend never actually dials a signal server, so any
    // syntactically valid url will do here.
    let sig = SigUrl::parse("wss://fake.fake").unwrap();

    let config = Arc::new(Config {
        backend_module: BackendModule::Mem,
        ..Default::default()
    });

    // Echo side: every incoming message gets an empty response.
    let (listen_ep, mut listen_recv) = Endpoint::new(config.clone());
    let listen_ep = Arc::new(listen_ep);
    let listen_url = listen_ep.listen(sig.clone()).await.unwrap();

    tokio::task::spawn(async move {
        loop {
            let Some(evt) = listen_recv.recv().await else {
                panic!("listener task ended");
            };
            match evt {
                EndpointEvent::ListeningAddressOpen { .. }
                | EndpointEvent::Connected { .. } => (),
                EndpointEvent::Message { peer_url, .. } => {
                    // Don't hold up our recv loop on the response
                    let listen_ep = listen_ep.clone();
                    tokio::task::spawn(async move {
                        listen_ep.send(peer_url, Vec::new()).await.unwrap();
                    });
                }
                _ => panic!("unexpected: {evt:?}"),
            }
        }
    });

    println!("listening at: {listen_url}");

    // Clients report each round-trip duration (seconds) over this channel.
    let (timing_send, mut timing_recv) = tokio::sync::mpsc::unbounded_channel();
    let mut samples = Vec::new();
    let mut node_count = 0;

    loop {
        let window_start = std::time::Instant::now();
        node_count += 1;

        // Add one more client node that ping-pongs with the listener as
        // fast as it can for the remainder of the program's life.
        let config = config.clone();
        let timing_send = timing_send.clone();
        let listen_url = listen_url.clone();
        tokio::task::spawn(async move {
            let (cli_ep, mut cli_recv) = Endpoint::new(config);
            let _ = cli_ep.listen(listen_url.to_sig()).await.unwrap();

            loop {
                let op_start = std::time::Instant::now();
                cli_ep.send(listen_url.clone(), Vec::new()).await.unwrap();

                // Drain endpoint events until the echo response arrives.
                loop {
                    match cli_recv.recv().await.unwrap() {
                        EndpointEvent::ListeningAddressOpen { .. }
                        | EndpointEvent::Connected { .. } => (),
                        EndpointEvent::Message { .. } => break,
                        evt => panic!("unexpected: {evt:?}"),
                    }
                }

                timing_send.send(op_start.elapsed().as_secs_f64()).unwrap();
            }
        });

        // Let the current node set run for about a second, then collect
        // every sample reported in that window.
        tokio::time::sleep(WINDOW).await;
        timing_recv.recv_many(&mut samples, usize::MAX).await;

        let count = samples.len();
        let sum: f64 = samples.drain(..).sum();

        println!(
            "{node_count} nodes, {count} ops, {:0.6} sec/op, in {:0.6}s",
            sum / count as f64,
            window_start.elapsed().as_secs_f64(),
        );
    }
}
12 changes: 7 additions & 5 deletions crates/tx5/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use tx5_core::deps::serde_json;
#[cfg(feature = "backend-go-pion")]
mod go_pion;

mod mem;

/// Backend modules usable by tx5.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum BackendModule {
Expand All @@ -22,8 +24,8 @@ pub enum BackendModule {
/// The Webrtc-RS-based backend.
WebrtcRs,

/// The mock backend.
Mock,
/// The mem backend.
Mem,
}

impl Default for BackendModule {
Expand All @@ -33,7 +35,7 @@ impl Default for BackendModule {
return Self::GoPion;
#[cfg(feature = "backend-webrtc-rs")]
return Self::WebrtcRs;
Self::Mock
Self::Mem
}
}

Expand All @@ -45,7 +47,7 @@ impl BackendModule {
Self::GoPion => go_pion::default_config(),
#[cfg(feature = "backend-webrtc-rs")]
Self::WebrtcRs => todo!(),
Self::Mock => serde_json::json!({}),
Self::Mem => mem::default_config(),
}
}

Expand All @@ -61,7 +63,7 @@ impl BackendModule {
Self::GoPion => go_pion::connect(config, url, listener).await,
#[cfg(feature = "backend-webrtc-rs")]
Self::WebrtcRs => todo!(),
Self::Mock => todo!(),
Self::Mem => mem::connect(config, url, listener).await,
}
}
}
Expand Down
Loading
Loading