Skip to content

Commit

Permalink
mem backend for tx5 (#106)
Browse files Browse the repository at this point in the history
* abstract tx5 backend in prep for mock backend

* Update crates/tx5/src/backend/go_pion.rs

Co-authored-by: Callum Dunster <cdunster@users.noreply.github.com>

* address code review comments

* add functional mem backend for tx5

* address code review comment

* merge upstream

* test and fixes

* Apply suggestions from code review

Co-authored-by: Callum Dunster <cdunster@users.noreply.github.com>

* address code review comment

---------

Co-authored-by: Callum Dunster <cdunster@users.noreply.github.com>
  • Loading branch information
neonphog and cdunster authored Oct 16, 2024
1 parent 60c2321 commit b20d2cc
Show file tree
Hide file tree
Showing 6 changed files with 435 additions and 5 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ sbd-server = "0.0.6-alpha"
serde = { version = "1.0.160", features = [ "derive", "rc" ] }
serde_json = { version = "1.0.96", features = [ "preserve_order" ] }
sha2 = "0.10.6"
slab = "0.4.9"
socket2 = { version = "0.5.2", features = [ "all" ] }
sodoken = "=0.0.901-alpha"
tempfile = "3.8.0"
Expand Down
1 change: 1 addition & 0 deletions crates/tx5/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ base64 = { workspace = true }
futures = { workspace = true }
influxive-otel-atomic-obs = { workspace = true }
serde = { workspace = true }
slab = { workspace = true }
tokio = { workspace = true, features = [ "full" ] }
tracing = { workspace = true }
tx5-connection = { workspace = true, default-features = false }
Expand Down
92 changes: 92 additions & 0 deletions crates/tx5/examples/mem-echo-stress.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
//! Mem backend timings. The hope is to have near ideal zero overhead.
//! At time of writing, you can see we're pretty close to exactly
//! proportional to node count for a given machine's resources:
//!
//! - 1 nodes, 33941 ops, 0.000028 sec/op, in 1.004478s
//! - 10 nodes, 174041 ops, 0.000056 sec/op, in 1.016977s
//! - 100 nodes, 175111 ops, 0.000580 sec/op, in 1.018943s
//! - 1000 nodes, 160266 ops, 0.006361 sec/op, in 1.019859s

use std::sync::Arc;
use tx5::{backend::*, *};

#[tokio::main(flavor = "multi_thread")]
async fn main() {
let fake_sig = SigUrl::parse("wss://fake.fake").unwrap();

let config = Arc::new(Config {
backend_module: BackendModule::Mem,
..Default::default()
});

let (listen_ep, mut listen_recv) = Endpoint::new(config.clone());
let listen_ep = Arc::new(listen_ep);
let listen_url = listen_ep.listen(fake_sig.clone()).await.unwrap();

tokio::task::spawn(async move {
while let Some(evt) = listen_recv.recv().await {
match evt {
EndpointEvent::ListeningAddressOpen { .. } => (),
EndpointEvent::Connected { .. } => (),
EndpointEvent::Message { peer_url, .. } => {
let listen_ep = listen_ep.clone();
// Don't hold up our recv loop on the response
tokio::task::spawn(async move {
listen_ep.send(peer_url, Vec::new()).await.unwrap();
});
}
_ => panic!("unexpected: {evt:?}"),
}
}
panic!("listener task ended");
});

println!("listening at: {listen_url}");

let (timing_send, mut timing_recv) = tokio::sync::mpsc::unbounded_channel();
let mut timing_buf = Vec::new();
let mut node_count = 0;

loop {
let start = std::time::Instant::now();

node_count += 1;

let config = config.clone();
let timing_send = timing_send.clone();
let listen_url = listen_url.clone();
tokio::task::spawn(async move {
let (cli_ep, mut cli_recv) = Endpoint::new(config);
let _ = cli_ep.listen(listen_url.to_sig()).await.unwrap();

loop {
let start = std::time::Instant::now();
cli_ep.send(listen_url.clone(), Vec::new()).await.unwrap();
loop {
let evt = cli_recv.recv().await.unwrap();
match evt {
EndpointEvent::ListeningAddressOpen { .. } => (),
EndpointEvent::Connected { .. } => (),
EndpointEvent::Message { .. } => break,
_ => panic!("unexpected: {evt:?}"),
}
}
timing_send.send(start.elapsed().as_secs_f64()).unwrap();
}
});

tokio::time::sleep(std::time::Duration::from_secs(1)).await;

timing_recv.recv_many(&mut timing_buf, usize::MAX).await;

let count = timing_buf.len();
let sum: f64 = timing_buf.iter().sum();
timing_buf.clear();

println!(
"{node_count} nodes, {count} ops, {:0.6} sec/op, in {:0.6}s",
sum / count as f64,
start.elapsed().as_secs_f64(),
);
}
}
12 changes: 7 additions & 5 deletions crates/tx5/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use tx5_core::deps::serde_json;
#[cfg(feature = "backend-go-pion")]
mod go_pion;

mod mem;

/// Backend modules usable by tx5.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum BackendModule {
Expand All @@ -22,8 +24,8 @@ pub enum BackendModule {
/// The Webrtc-RS-based backend.
WebrtcRs,

/// The mock backend.
Mock,
/// The mem backend.
Mem,
}

impl Default for BackendModule {
Expand All @@ -33,7 +35,7 @@ impl Default for BackendModule {
return Self::GoPion;
#[cfg(feature = "backend-webrtc-rs")]
return Self::WebrtcRs;
Self::Mock
Self::Mem
}
}

Expand All @@ -45,7 +47,7 @@ impl BackendModule {
Self::GoPion => go_pion::default_config(),
#[cfg(feature = "backend-webrtc-rs")]
Self::WebrtcRs => todo!(),
Self::Mock => serde_json::json!({}),
Self::Mem => mem::default_config(),
}
}

Expand All @@ -61,7 +63,7 @@ impl BackendModule {
Self::GoPion => go_pion::connect(config, url, listener).await,
#[cfg(feature = "backend-webrtc-rs")]
Self::WebrtcRs => todo!(),
Self::Mock => todo!(),
Self::Mem => mem::connect(config, url, listener).await,
}
}
}
Expand Down
Loading

0 comments on commit b20d2cc

Please sign in to comment.