From 1c2a3956805982421cebfdc280e32b9bb55578bf Mon Sep 17 00:00:00 2001 From: neonphog Date: Tue, 7 May 2024 11:10:08 -0600 Subject: [PATCH] checkpoint --- Cargo.lock | 214 +++++++++++++++++++++++++++++++ Makefile | 63 ++------- crates/tx5-demo/src/main.rs | 112 ++++++++++------ crates/tx5/Cargo.toml | 3 + crates/tx5/benches/throughput.rs | 97 +++++++++----- crates/tx5/src/ep.rs | 85 +++++++++--- crates/tx5/src/lib.rs | 2 + crates/tx5/src/sig.rs | 9 ++ crates/tx5/src/stats.rs | 55 ++++++++ crates/tx5/src/test.rs | 53 ++++---- crates/tx5/src/url.rs | 24 +++- 11 files changed, 543 insertions(+), 174 deletions(-) create mode 100644 crates/tx5/src/stats.rs diff --git a/Cargo.lock b/Cargo.lock index 080ea2a2..33302092 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -71,6 +71,12 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "250f629c0161ad8107cf89319e990051fae62832fd343083bea452d93e2205fd" +[[package]] +name = "anes" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" + [[package]] name = "anstream" version = "0.6.13" @@ -256,6 +262,12 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "cast" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" + [[package]] name = "cc" version = "1.0.95" @@ -289,6 +301,33 @@ dependencies = [ "serde", ] +[[package]] +name = "ciborium" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42e69ffd6f0917f5c029256a24d0161db17cea3997d185db0d35926308770f0e" +dependencies = [ + "ciborium-io", + "ciborium-ll", + "serde", +] + +[[package]] +name = "ciborium-io" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05afea1e0a06c9be33d539b876f1ce3692f4afea2cb41f740e7743225ed1c757" + +[[package]] +name = "ciborium-ll" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57663b653d948a338bfb3eeba9bb2fd5fcfaecb9e199e87e1eda4d9e8b240fd9" +dependencies = [ + "ciborium-io", + "half", +] + [[package]] name = "cipher" version = "0.4.4" @@ -405,6 +444,44 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "criterion" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2b12d017a929603d80db1831cd3a24082f8137ce19c69e6447f54f5fc8d692f" +dependencies = [ + "anes", + "cast", + "ciborium", + "clap", + "criterion-plot", + "futures", + "is-terminal", + "itertools", + "num-traits", + "once_cell", + "oorandom", + "plotters", + "rayon", + "regex", + "serde", + "serde_derive", + "serde_json", + "tinytemplate", + "tokio", + "walkdir", +] + +[[package]] +name = "criterion-plot" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b50826342786a51a89e2da3a28f1c32b06e387201bc2d19791f622c673706b1" +dependencies = [ + "cast", + "itertools", +] + [[package]] name = "crossbeam-channel" version = "0.5.12" @@ -414,12 +491,37 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-deque" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345" +[[package]] +name = "crunchy" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7" + [[package]] name = "crypto-common" version = "0.1.6" @@ -549,6 +651,12 @@ dependencies = [ "subtle", ] +[[package]] +name = "either" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a47c1c47d2f5964e29c61246e81db715514cd532db6b5116a25ea3c03d6780a2" + [[package]] name = "encoding_rs" version = "0.8.34" @@ -785,6 +893,16 @@ dependencies = [ "tracing", ] +[[package]] +name = "half" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6dd08c532ae367adf81c312a4580bc67f1d0fe8bc9c460520283f4c0ff277888" +dependencies = [ + "cfg-if", + "crunchy", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ -1085,6 +1203,15 @@ dependencies = [ "tokio", ] +[[package]] +name = "influxive-otel-atomic-obs" +version = "0.0.2-alpha.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ac0ec101d28862a46c15d6140cec376b02725160dfcf57282952898a94cf35e" +dependencies = [ + "opentelemetry_api", +] + [[package]] name = "influxive-writer" version = "0.0.2-alpha.1" @@ -1124,6 +1251,26 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" +[[package]] +name = "is-terminal" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f23ff5ef2b80d608d61efee834934d862cd92461afc0560dedf493e4c033738b" +dependencies = [ + "hermit-abi", + "libc", + "windows-sys 0.52.0", +] + +[[package]] +name = "itertools" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.11" @@ -1414,6 +1561,12 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +[[package]] +name = "oorandom" +version = "11.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" + [[package]] name = "openssl" version = "0.10.64" @@ -1595,6 +1748,34 @@ version = "3.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "db23d408679286588f4d4644f965003d056e3dd5abcaaa938116871d7ce2fee7" +[[package]] +name = "plotters" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2c224ba00d7cadd4d5c660deaf2098e5e80e07846537c51f9cfa4be50c1fd45" +dependencies = [ + "num-traits", + "plotters-backend", + "plotters-svg", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "plotters-backend" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e76628b4d3a7581389a35d5b6e2139607ad7c75b17aed325f210aa91f4a9609" + +[[package]] +name = "plotters-svg" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38f6d39893cca0701371e3c27294f09797214b86f1fb951b89ade8ec04e2abab" +dependencies = [ + "plotters-backend", +] + [[package]] name = "powerfmt" version = "0.2.0" @@ -1685,6 +1866,26 @@ dependencies = [ "getrandom", ] +[[package]] +name = "rayon" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2" +dependencies = [ + "crossbeam-deque", + "crossbeam-utils", +] + [[package]] name = "redox_syscall" version = "0.4.1" @@ -2347,6 +2548,16 @@ dependencies = [ "time-core", ] +[[package]] +name = "tinytemplate" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be4d6b5f19ff7664e8c98d03e2139cb510db9b0a60b55f8e8709b689d939b6bc" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "tinyvec" version = "1.6.0" @@ -2608,8 +2819,11 @@ name = "tx5" version = "0.0.9-alpha" dependencies = [ "base64 0.22.1", + "criterion", "futures", + "influxive-otel-atomic-obs", "sbd-server", + "serde", "tokio", "tracing", "tracing-subscriber", diff --git a/Makefile b/Makefile index 28f8c15b..0a028605 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ # tx5 Makefile -.PHONY: all publish-all publish bump test static docs tools tool_rust tool_fmt tool_readme +.PHONY: all publish-all publish bump test unit static lint dep fmt docs SHELL = /usr/bin/env sh -eu @@ -73,20 +73,28 @@ bump: fi sed -i 's/^\(tx5[^=]*= { \|\)version = "[^"]*"/\1version = "$(ver)"/g' $$(find . -name Cargo.toml) -test: static tools +test: static unit + +unit: cargo build --all-targets RUST_BACKTRACE=1 RUST_LOG=error cargo test -- --nocapture -static: docs tools +static: dep fmt lint docs + @if [ "${CI}x" != "x" ]; then git diff --exit-code; fi + +lint: + cargo clippy -- -Dwarnings + +dep: @#uhhh... better way to do this? depend on cargo-tree? @if [ $$(grep 'name = "sodoken"' Cargo.lock | wc -l) != "1" ]; then echo "ERROR: multiple sodokens"; exit 127; fi + +fmt: cargo fmt -- --check - cargo clippy -- -Dwarnings (cd crates/tx5-go-pion-sys; go fmt) (cd crates/tx5-go-pion-turn; go fmt) - @if [ "${CI}x" != "x" ]; then git diff --exit-code; fi -docs: tools +docs: cp -f crates/tx5-core/src/README.tpl README.md cp -f crates/tx5-core/src/README.tpl crates/tx5-core/README.md cargo rdme --force -w tx5-core @@ -106,46 +114,3 @@ docs: tools cargo rdme --force -w tx5 cp -f crates/tx5-core/src/README.tpl crates/tx5-demo/README.md cargo rdme --force -w tx5-demo - -tools: tool_rust tool_fmt tool_clippy tool_readme - -tool_rust: - @if ! rustup --version >/dev/null 2>&1; then \ - echo "# Makefile # rustup not found, hopefully we're on stable"; \ - fi; - -tool_fmt: tool_rust - @if ! (cargo fmt --version); \ - then \ - if rustup --version >/dev/null 2>&1; then \ - echo "# Makefile # installing rustfmt with rustup"; \ - rustup component add rustfmt; \ - else \ - echo "# Makefile # rustup not found, cannot install rustfmt"; \ - exit 1; \ - fi; \ - else \ - echo "# Makefile # rustfmt ok"; \ - fi; - -tool_clippy: tool_rust - @if ! (cargo clippy --version); \ - then \ - if rustup --version >/dev/null 2>&1; then \ - echo "# Makefile # installing clippy with rustup"; \ - rustup component add clippy; \ - else \ - echo "# Makefile # rustup not found, cannot install clippy"; \ - exit 1; \ - fi; \ - else \ - echo "# Makefile # clippy ok"; \ - fi; - -tool_readme: tool_rust - @if ! (cargo rdme --version); \ - then \ - cargo install cargo-rdme --version 1.4.0 --locked; \ - else \ - echo "# Makefile # readme ok"; \ - fi; diff --git a/crates/tx5-demo/src/main.rs b/crates/tx5-demo/src/main.rs index 327fd829..945465ec 100644 --- a/crates/tx5-demo/src/main.rs +++ b/crates/tx5-demo/src/main.rs @@ -1,14 +1,15 @@ -#![deny(warnings)] #![deny(unsafe_code)] #![allow(clippy::needless_range_loop)] //! tx5-demo -const DASH_TX5: &[u8] = include_bytes!("influxive-dashboards/tx5.json"); +//const DASH_TX5: &[u8] = include_bytes!("influxive-dashboards/tx5.json"); + +use std::io::{Error, Result}; use clap::Parser; use std::collections::HashMap; use std::sync::Arc; -use tx5::{Config3, Ep3, Ep3Event, Result, Tx5Url}; +use tx5::{Config, Endpoint, EndpointEvent}; #[derive(Debug, Parser)] #[clap(name = "tx5-demo", version, about = "Holochain Tx5 WebRTC Demo Cli")] @@ -48,24 +49,27 @@ enum Message { impl Message { pub fn encode(&self) -> Result> { - serde_json::to_vec(&self).map_err(tx5::Error::err) + serde_json::to_vec(&self).map_err(Error::other) } pub fn decode(data: &[u8]) -> Result { Ok(serde_json::from_slice(data)?) } - pub fn hello(known_peers: &HashMap) -> Self { + pub fn hello(known_peers: &HashMap) -> Self { Message::Hello { known_peers: known_peers.keys().map(|u| u.to_string()).collect(), } } pub fn big() -> Self { + use base64::Engine; use rand::Rng; + let mut big = vec![0; (1024 * 1024 * 15 * 3) / 4]; // 15 MiB but base64 rand::thread_rng().fill(&mut big[..]); - let big = base64::encode(&big); + + let big = base64::engine::general_purpose::STANDARD.encode(&big); Message::Big(big) } } @@ -138,26 +142,26 @@ impl PeerInfo { } struct Node { - this_url: Tx5Url, - known_peers: HashMap, + this_url: tx5::PeerUrl, + known_peers: HashMap, } impl std::fmt::Debug for Node { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let mut dbg = f.debug_struct("Node"); - dbg.field("this_id", &self.this_url.id().unwrap()); + dbg.field("this_id", &self.this_url.pub_key()); for (url, i) in self.known_peers.iter() { - dbg.field(&format!("{:?}", url.id().unwrap()), i); + dbg.field(&format!("{:?}", url.pub_key()), i); } dbg.finish() } } impl Node { - pub fn new(this_url: Tx5Url, peer_urls: Vec) -> Self { + pub fn new(this_url: tx5::PeerUrl, peer_urls: Vec) -> Self { let known_peers = peer_urls .into_iter() - .map(|u| (Tx5Url::new(u).unwrap(), PeerInfo::new())) + .map(|u| (tx5::PeerUrl::parse(u).unwrap(), PeerInfo::new())) .collect::>(); Self { this_url, @@ -165,7 +169,7 @@ impl Node { } } - pub fn add_known_peer(&mut self, url: Tx5Url) { + pub fn add_known_peer(&mut self, url: tx5::PeerUrl) { if let std::collections::hash_map::Entry::Vacant(e) = self.known_peers.entry(url.clone()) { @@ -174,15 +178,20 @@ impl Node { } } - pub fn send(&self, ep: &Arc, rem_url: &Tx5Url, data: Vec) { + pub fn send( + &self, + ep: &Arc, + rem_url: &tx5::PeerUrl, + data: Vec, + ) { let ep = ep.clone(); let rem_url = rem_url.clone(); tokio::task::spawn(async move { let len = data.len(); - let id = rem_url.id().unwrap(); + let id = rem_url.pub_key(); - if let Err(err) = ep.send(rem_url, &data).await { + if let Err(err) = ep.send(rem_url.clone(), data).await { d!( error, "SEND_ERROR", @@ -194,7 +203,7 @@ impl Node { }); } - pub fn broadcast_hello(&self, ep: &Arc) -> Result<()> { + pub fn broadcast_hello(&self, ep: &Arc) -> Result<()> { let hello = Message::hello(&self.known_peers).encode()?; for url in self.known_peers.keys() { if url == &self.this_url { @@ -205,14 +214,14 @@ impl Node { Ok(()) } - pub fn recv_hello(&mut self, url: Tx5Url) -> Result<()> { + pub fn recv_hello(&mut self, url: tx5::PeerUrl) -> Result<()> { self.known_peers.get_mut(&url).unwrap().last_seen = std::time::Instant::now(); - d!(info, "RECV_HELLO", "{:?}", url.id().unwrap()); + d!(info, "RECV_HELLO", "{:?}", url.pub_key()); Ok(()) } - pub fn five_sec(&mut self, ep: &Arc) -> Result<()> { + pub fn five_sec(&mut self, ep: &Arc) -> Result<()> { { let this = self.known_peers.get_mut(&self.this_url).unwrap(); this.last_seen = std::time::Instant::now(); @@ -237,7 +246,7 @@ impl Node { Ok(()) } - pub fn thirty_sec(&mut self, ep: &Arc) -> Result<()> { + pub fn thirty_sec(&mut self, ep: &Arc) -> Result<()> { let mut v = Vec::new(); for peer in self.known_peers.keys() { @@ -266,6 +275,8 @@ async fn main_err() -> Result<()> { peer_urls, } = Args::parse(); + let mut _app_guard = None; + if let Some(trace_file) = trace_file { let _ = std::fs::remove_file(&trace_file); @@ -278,11 +289,13 @@ async fn main_err() -> Result<()> { .file_name() .expect("failed to get filename from trace_file"), ); - let (app, _app_guard) = + let (app, g) = tracing_appender::non_blocking::NonBlockingBuilder::default() .lossy(false) .finish(app); + _app_guard = Some(g); + tracing_subscriber::FmtSubscriber::builder() .with_env_filter( tracing_subscriber::filter::EnvFilter::builder() @@ -297,6 +310,7 @@ async fn main_err() -> Result<()> { .init(); } + /* let tmp = tempfile::tempdir()?; let (i, meter_provider) = @@ -313,12 +327,30 @@ async fn main_err() -> Result<()> { } opentelemetry_api::global::set_meter_provider(meter_provider); d!(info, "METRICS", "{}", i.get_host()); + */ - let sig_url = Tx5Url::new(sig_url)?; + let sig_url = tx5::SigUrl::parse(sig_url)?; - let (ep, mut evt) = tx5::Ep3::new(Arc::new(Config3::default())).await; + let (ep, mut evt) = tx5::Endpoint::new(Arc::new(Config { + signal_allow_plain_text: true, + ..Default::default() + })); let ep = Arc::new(ep); - let this_addr = ep.listen(sig_url.clone()).await?; + + d!(info, "LISTEN", "{sig_url}"); + + ep.listen(sig_url.clone()); + + let this_addr = loop { + if let Some(evt) = evt.recv().await { + d!(info, "PRE_EVENT", "{evt:?}"); + if let EndpointEvent::ListeningAddressOpen { local_url } = evt { + break local_url; + } + } else { + unreachable!(); + } + }; let mut node = Node::new(this_addr.clone(), peer_urls); @@ -337,7 +369,7 @@ async fn main_err() -> Result<()> { enum Cmd { FiveSec, ThirtySec, - EpEvt(Ep3Event), + EpEvt(EndpointEvent), } let (cmd_s, mut cmd_r) = tokio::sync::mpsc::unbounded_channel(); @@ -383,19 +415,21 @@ async fn main_err() -> Result<()> { node.five_sec(&ep)?; d!(info, "FIVE_SEC", "{node:?}"); tracing::info!( - "{}", - serde_json::to_string(&ep.get_stats().await).unwrap() + stats = %serde_json::to_string(&ep.get_stats()).unwrap() ); } Cmd::ThirtySec => node.thirty_sec(&ep)?, - Cmd::EpEvt(Ep3Event::Error(err)) => panic!("{err:?}"), - Cmd::EpEvt(Ep3Event::Connected { peer_url }) => { + Cmd::EpEvt(EndpointEvent::ListeningAddressOpen { .. }) + | Cmd::EpEvt(EndpointEvent::ListeningAddressClosed { .. }) => { + panic!("demo can't handle re-homing") + } + Cmd::EpEvt(EndpointEvent::Connected { peer_url }) => { node.add_known_peer(peer_url); } - Cmd::EpEvt(Ep3Event::Disconnected { peer_url }) => { - d!(info, "DISCONNECTED", "{:?}", peer_url.id().unwrap()); + Cmd::EpEvt(EndpointEvent::Disconnected { peer_url }) => { + d!(info, "DISCONNECTED", "{:?}", peer_url.pub_key()); } - Cmd::EpEvt(Ep3Event::Message { + Cmd::EpEvt(EndpointEvent::Message { peer_url, message, .. }) => { node.add_known_peer(peer_url.clone()); @@ -403,7 +437,9 @@ async fn main_err() -> Result<()> { Err(err) => d!(error, "RECV_ERROR", "{err:?}"), Ok(Message::Hello { known_peers: kp }) => { for peer in kp { - node.add_known_peer(Tx5Url::new(peer).unwrap()); + node.add_known_peer( + tx5::PeerUrl::parse(peer).unwrap(), + ); } node.recv_hello(peer_url)?; } @@ -413,7 +449,7 @@ async fn main_err() -> Result<()> { "RECV_BIG", "len:{} {:?}", d.as_bytes().len(), - peer_url.id().unwrap() + peer_url.pub_key(), ); } } @@ -431,7 +467,7 @@ impl AddrFile { Ok(Self(tokio::fs::File::create(path).await?)) } - pub async fn write(&mut self, this_addr: &Tx5Url) -> Result<()> { + pub async fn write(&mut self, this_addr: &tx5::PeerUrl) -> Result<()> { use tokio::io::AsyncSeekExt; use tokio::io::AsyncWriteExt; @@ -439,7 +475,9 @@ impl AddrFile { self.0.set_len(0).await?; self.0 - .write_all(>::as_ref(this_addr).as_bytes()) + .write_all( + >::as_ref(this_addr).as_bytes(), + ) .await?; self.0.write_all(b"\n").await?; self.0.sync_all().await?; diff --git a/crates/tx5/Cargo.toml b/crates/tx5/Cargo.toml index 1b0b0c88..acc9ba78 100644 --- a/crates/tx5/Cargo.toml +++ b/crates/tx5/Cargo.toml @@ -21,6 +21,8 @@ backend-webrtc-rs = [ "tx5-connection/backend-webrtc-rs" ] [dependencies] base64 = { workspace = true } +influxive-otel-atomic-obs = { workspace = true } +serde = { workspace = true } tokio = { workspace = true, features = [ "full" ] } tracing = { workspace = true } tx5-connection = { workspace = true, default-features = false } @@ -28,6 +30,7 @@ tx5-core = { workspace = true } url = { workspace = true } [dev-dependencies] +criterion = { workspace = true } futures = { workspace = true } sbd-server = { workspace = true } tracing-subscriber = { workspace = true } diff --git a/crates/tx5/benches/throughput.rs b/crates/tx5/benches/throughput.rs index 7d3d448a..9a568aeb 100644 --- a/crates/tx5/benches/throughput.rs +++ b/crates/tx5/benches/throughput.rs @@ -1,66 +1,94 @@ use criterion::{criterion_group, criterion_main, Criterion}; +use std::io::{Error, Result}; use std::sync::Arc; use tokio::sync::Mutex; use tx5::*; -use tx5_core::EventRecv; const DATA: &[u8] = &[0xdb; 4096]; struct Test { - _sig_srv_hnd: tx5_signal_srv::SrvHnd, + _server: sbd_server::SbdServer, cli_url1: PeerUrl, - ep1: Ep3, - ep_rcv1: EventRecv, + ep1: Endpoint, + ep_rcv1: EndpointRecv, cli_url2: PeerUrl, - ep2: Ep3, - ep_rcv2: EventRecv, + ep2: Endpoint, + ep_rcv2: EndpointRecv, } impl Test { pub async fn new() -> Self { - let mut srv_config = tx5_signal_srv::Config::default(); - srv_config.port = 0; - srv_config.demo = true; + let bind = vec![format!("127.0.0.1:0"), format!("[::1]:0")]; - let (_sig_srv_hnd, addr_list, _) = - tx5_signal_srv::exec_tx5_signal_srv(srv_config) - .await - .unwrap(); - - let sig_port = addr_list.get(0).unwrap().port(); + let config = Arc::new(sbd_server::Config { + bind, + disable_rate_limiting: true, + ..Default::default() + }); - let sig_url = - Tx5Url::new(format!("ws://localhost:{sig_port}")).unwrap(); + let server = sbd_server::SbdServer::new(config).await.unwrap(); - let config = Arc::new(Config3::default()); + let sig_url = SigUrl::parse(&format!( + "ws://{}", + server.bind_addrs().get(0).unwrap() + )) + .unwrap(); - let (ep1, mut ep_rcv1) = Ep3::new(config.clone()).await; - let cli_url1 = ep1.listen(sig_url.clone()).await.unwrap(); - let (ep2, mut ep_rcv2) = Ep3::new(config).await; - let cli_url2 = ep2.listen(sig_url).await.unwrap(); + let config = Arc::new(Config { + signal_allow_plain_text: true, + ..Default::default() + }); - ep1.send(cli_url2.clone(), b"hello").await.unwrap(); + let (ep1, mut ep_rcv1) = Endpoint::new(config.clone()); + ep1.listen(sig_url.clone()); + let (ep2, mut ep_rcv2) = Endpoint::new(config); + ep2.listen(sig_url); + + let (cli_url1, cli_url2) = tokio::join!( + async { + loop { + if let Some(EndpointEvent::ListeningAddressOpen { + local_url, + }) = ep_rcv1.recv().await + { + break local_url; + } + } + }, + async { + loop { + if let Some(EndpointEvent::ListeningAddressOpen { + local_url, + }) = ep_rcv2.recv().await + { + break local_url; + } + } + }, + ); + + ep1.send(cli_url2.clone(), b"hello".to_vec()).await.unwrap(); match ep_rcv2.recv().await { - Some(Ep3Event::Connected { .. }) => (), + Some(EndpointEvent::Connected { .. }) => (), oth => panic!("unexpected: {oth:?}"), } match ep_rcv2.recv().await { - Some(Ep3Event::Message { .. }) => (), + Some(EndpointEvent::Message { .. }) => (), oth => panic!("unexpected: {oth:?}"), } - ep2.send(cli_url1.clone(), b"world").await.unwrap(); + ep2.send(cli_url1.clone(), b"world".to_vec()).await.unwrap(); match ep_rcv1.recv().await { - Some(Ep3Event::Connected { .. }) => (), + Some(EndpointEvent::Connected { .. }) => (), oth => panic!("unexpected: {oth:?}"), } match ep_rcv1.recv().await { - Some(Ep3Event::Message { .. }) => (), + Some(EndpointEvent::Message { .. }) => (), oth => panic!("unexpected: {oth:?}"), } Self { - _sig_srv_hnd, + _server: server, cli_url1, ep1, ep_rcv1, @@ -72,18 +100,18 @@ impl Test { pub async fn test(&mut self) { let Test { - _sig_srv_hnd, cli_url1, ep1, ep_rcv1, cli_url2, ep2, ep_rcv2, + .. } = self; let _ = tokio::try_join!( - ep1.send(cli_url2.clone(), DATA,), - ep2.send(cli_url1.clone(), DATA,), + ep1.send(cli_url2.clone(), DATA.to_vec()), + ep2.send(cli_url1.clone(), DATA.to_vec()), async { txerr(ep_rcv1.recv().await) }, async { txerr(ep_rcv2.recv().await) }, ) @@ -91,10 +119,9 @@ impl Test { } } -fn txerr(v: Option) -> Result<()> { +fn txerr(v: Option) -> Result<()> { match v { - None => Err(Error::id("end")), - Some(Ep3Event::Error(err)) => Err(err.into()), + None => Err(Error::other("end")), _ => Ok(()), } } diff --git a/crates/tx5/src/ep.rs b/crates/tx5/src/ep.rs index a9c84fc9..ec323101 100644 --- a/crates/tx5/src/ep.rs +++ b/crates/tx5/src/ep.rs @@ -63,6 +63,16 @@ impl std::fmt::Debug for EndpointEvent { } } +/// Receiver for endpoint events. +pub struct EndpointRecv(tokio::sync::mpsc::Receiver); + +impl EndpointRecv { + /// Receive an endpoint event. + pub async fn recv(&mut self) -> Option { + self.0.recv().await + } +} + pub(crate) struct EpInner { this: Weak>, config: Arc, @@ -157,37 +167,33 @@ impl EpInner { pub struct Endpoint { config: Arc, inner: Arc>, - evt_recv: tokio::sync::Mutex>, } impl Endpoint { /// Construct a new tx5 endpoint. - pub fn new(config: Arc) -> Self { + pub fn new(config: Arc) -> (Self, EndpointRecv) { let recv_limit = Arc::new(tokio::sync::Semaphore::new( config.incoming_message_bytes_max as usize, )); let (evt_send, evt_recv) = tokio::sync::mpsc::channel(32); - Self { - config: config.clone(), - inner: Arc::new_cyclic(|this| { - Mutex::new(EpInner { - this: this.clone(), - config, - recv_limit, - evt_send, - sig_map: HashMap::default(), - peer_map: HashMap::default(), - }) - }), - evt_recv: tokio::sync::Mutex::new(evt_recv), - } - } - - /// Receive an event from this tx5 endpoint. - pub async fn recv(&self) -> Option { - self.evt_recv.lock().await.recv().await + ( + Self { + config: config.clone(), + inner: Arc::new_cyclic(|this| { + Mutex::new(EpInner { + this: this.clone(), + config, + recv_limit, + evt_send, + sig_map: HashMap::default(), + peer_map: HashMap::default(), + }) + }), + }, + EndpointRecv(evt_recv), + ) } /// Connect to a signal server as a listener, allowing incoming connections. @@ -242,4 +248,41 @@ impl Endpoint { let _ = task.await; } } + + /// Get a list of listening addresses (PeerUrls) at which this endpoint + /// is currently reachable. + pub fn get_listening_addresses(&self) -> Vec { + let all_sigs = self + .inner + .lock() + .unwrap() + .sig_map + .values() + .cloned() + .collect::>(); + + all_sigs + .into_iter() + .filter_map(|sig| { + if !sig.listener { + return None; + } + sig.get_peer_url() + }) + .collect() + } + + /// Get stats. + pub fn get_stats(&self) -> stats::Stats { + #[cfg(feature = "backend-go-pion")] + let backend = stats::StatsBackend::BackendGoPion; + #[cfg(feature = "backend-webrtc-rs")] + let backend = stats::StatsBackend::BackendWebrtcRs; + + stats::Stats { + backend, + peer_url_list: self.get_listening_addresses(), + connection_list: Vec::new(), + } + } } diff --git a/crates/tx5/src/lib.rs b/crates/tx5/src/lib.rs index 70b2798c..6d6ff669 100644 --- a/crates/tx5/src/lib.rs +++ b/crates/tx5/src/lib.rs @@ -63,6 +63,8 @@ pub(crate) use peer::*; mod ep; pub use ep::*; +pub mod stats; + #[cfg(test)] mod test; diff --git a/crates/tx5/src/sig.rs b/crates/tx5/src/sig.rs index 83be6b77..21301442 100644 --- a/crates/tx5/src/sig.rs +++ b/crates/tx5/src/sig.rs @@ -61,6 +61,15 @@ impl Sig { let _ = w.acquire().await; } + pub fn get_peer_url(&self) -> Option { + match &*self.ready.lock().unwrap() { + MaybeReady::Wait(_) => None, + MaybeReady::Ready(hub) => { + Some(self.sig_url.to_peer(hub.pub_key().clone())) + } + } + } + pub async fn connect( &self, pub_key: PubKey, diff --git a/crates/tx5/src/stats.rs b/crates/tx5/src/stats.rs new file mode 100644 index 00000000..b30e6924 --- /dev/null +++ b/crates/tx5/src/stats.rs @@ -0,0 +1,55 @@ +//! Return types for get_stats() call. + +/// Backend type. +#[derive(Debug, serde::Serialize, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +#[non_exhaustive] +pub enum StatsBackend { + /// The rust ffi bindings to the golang pion webrtc library. + BackendGoPion, + + /// The rust webrtc library. + BackendWebrtcRs, +} + +/// Data for an individual connection. +#[derive(Debug, serde::Serialize, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +#[non_exhaustive] +pub struct StatsConnection { + /// The public key of the remote peer. + pub pub_key: [u8; 32], + + /// The message count sent on this connection. + pub send_message_count: u64, + + /// The bytes sent on this connection. + pub send_bytes: u64, + + /// The message count received on this connection. + pub recv_message_count: u64, + + /// The bytes received on this connection + pub recv_bytes: u64, + + /// UNIX epoch timestamp in seconds when this connection was opened. + pub opened_at_s: u64, + + /// True if this connection has successfully upgraded to webrtc. + pub is_webrtc: bool, +} + +/// Stats type. +#[derive(Debug, serde::Serialize, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +#[non_exhaustive] +pub struct Stats { + /// Backend type. + pub backend: StatsBackend, + + /// List of PeerUrls this endpoint can currently be reached at. + pub peer_url_list: Vec, + + /// List of current connections. + pub connection_list: Vec, +} diff --git a/crates/tx5/src/test.rs b/crates/tx5/src/test.rs index 9ba9455c..045234dc 100644 --- a/crates/tx5/src/test.rs +++ b/crates/tx5/src/test.rs @@ -1,7 +1,7 @@ use crate::*; struct TestEp { - ep: Arc, + ep: Endpoint, task: tokio::task::JoinHandle<()>, recv: Option)>>, peer_url: Arc>, @@ -22,8 +22,7 @@ impl Drop for TestEp { } impl TestEp { - pub async fn new(ep: Endpoint) -> Self { - let ep = Arc::new(ep); + pub async fn new(ep: Endpoint, mut ep_recv: EndpointRecv) -> Self { let (send, recv) = tokio::sync::mpsc::unbounded_channel(); let peer_url = Arc::new(Mutex::new( @@ -35,38 +34,30 @@ impl TestEp { let (s, r) = tokio::sync::oneshot::channel(); let mut s = Some(s); - let weak = Arc::downgrade(&ep); let peer_url2 = peer_url.clone(); let task = tokio::task::spawn(async move { - while let Some(ep) = weak.upgrade() { - if let Some(evt) = ep.recv().await { - match evt { - EndpointEvent::ListeningAddressOpen { local_url } => { - *peer_url2.lock().unwrap() = local_url; - if let Some(s) = s.take() { - let _ = s.send(()); - } + while let Some(evt) = ep_recv.recv().await { + match evt { + EndpointEvent::ListeningAddressOpen { local_url } => { + *peer_url2.lock().unwrap() = local_url; + if let Some(s) = s.take() { + let _ = s.send(()); } - EndpointEvent::Disconnected { peer_url } => { - if send - .send(( - peer_url, - b"<<>>".to_vec(), - )) - .is_err() - { - break; - } + } + EndpointEvent::Disconnected { peer_url } => { + if send + .send((peer_url, b"<<>>".to_vec())) + .is_err() + { + break; } - EndpointEvent::Message { peer_url, message } => { - if send.send((peer_url, message)).is_err() { - break; - } + } + EndpointEvent::Message { peer_url, message } => { + if send.send((peer_url, message)).is_err() { + break; } - _ => (), } - } else { - break; + _ => (), } } }); @@ -131,10 +122,10 @@ impl Test { pub async fn ep(&self, config: Arc) -> TestEp { let sig_url = self.sig_url.clone().unwrap(); - let ep = Endpoint::new(config); + let (ep, ep_recv) = Endpoint::new(config); ep.listen(sig_url); - TestEp::new(ep).await + TestEp::new(ep, ep_recv).await } pub fn drop_sig(&mut self) { diff --git a/crates/tx5/src/url.rs b/crates/tx5/src/url.rs index 8961f977..350aa903 100644 --- a/crates/tx5/src/url.rs +++ b/crates/tx5/src/url.rs @@ -46,7 +46,7 @@ fn parse_url>(r: R) -> Result<(String, Option)> { } /// A signal server url. -#[derive(Clone, PartialEq, Eq, Hash)] +#[derive(Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)] pub struct SigUrl(Arc); impl std::ops::Deref for SigUrl { @@ -95,6 +95,28 @@ impl SigUrl { #[derive(Clone)] pub struct PeerUrl(Arc<(String, PubKey)>); +impl serde::Serialize for PeerUrl { + fn serialize( + &self, + serializer: S, + ) -> std::result::Result + where + S: serde::Serializer, + { + self.0 .0.serialize(serializer) + } +} + +impl<'de> serde::Deserialize<'de> for PeerUrl { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + let url: &'de str = serde::Deserialize::deserialize(deserializer)?; + PeerUrl::parse(url).map_err(serde::de::Error::custom) + } +} + impl std::ops::Deref for PeerUrl { type Target = str;