From 60c2321c353a256100f3981fdc8062b47d8a022b Mon Sep 17 00:00:00 2001 From: David Braden Date: Mon, 14 Oct 2024 13:54:16 -0600 Subject: [PATCH] =?UTF-8?q?abstract=20tx5=20backend=20in=20prep=20for=20m?= =?UTF-8?q?=CC=B6o=CC=B6c=CC=B6k=CC=B6=20=20mem=20backend=20(#105)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * abstract tx5 backend in prep for mock backend * Update crates/tx5/src/backend/go_pion.rs Co-authored-by: Callum Dunster * address code review comments * address code review comment --------- Co-authored-by: Callum Dunster --- crates/tx5/Cargo.toml | 1 + crates/tx5/src/backend.rs | 134 +++++++++++++++++++++++++++++ crates/tx5/src/backend/go_pion.rs | 138 ++++++++++++++++++++++++++++++ crates/tx5/src/config.rs | 12 +++ crates/tx5/src/ep.rs | 15 +--- crates/tx5/src/lib.rs | 3 + crates/tx5/src/peer.rs | 27 +++--- crates/tx5/src/sig.rs | 63 ++++---------- 8 files changed, 322 insertions(+), 71 deletions(-) create mode 100644 crates/tx5/src/backend.rs create mode 100644 crates/tx5/src/backend/go_pion.rs diff --git a/crates/tx5/Cargo.toml b/crates/tx5/Cargo.toml index 3c13e8aa..6613c7f4 100644 --- a/crates/tx5/Cargo.toml +++ b/crates/tx5/Cargo.toml @@ -21,6 +21,7 @@ backend-webrtc-rs = [ "tx5-connection/backend-webrtc-rs" ] [dependencies] base64 = { workspace = true } +futures = { workspace = true } influxive-otel-atomic-obs = { workspace = true } serde = { workspace = true } tokio = { workspace = true, features = [ "full" ] } diff --git a/crates/tx5/src/backend.rs b/crates/tx5/src/backend.rs new file mode 100644 index 00000000..b7e25060 --- /dev/null +++ b/crates/tx5/src/backend.rs @@ -0,0 +1,134 @@ +//! Backend modules usable by tx5. + +use std::io::Result; +use std::sync::Arc; + +use futures::future::BoxFuture; + +use crate::{Config, PubKey}; +use tx5_core::deps::serde_json; + +#[cfg(feature = "backend-go-pion")] +mod go_pion; + +/// Backend modules usable by tx5. +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum BackendModule { + #[cfg(feature = "backend-go-pion")] + /// The Go Pion-based backend. + GoPion, + + #[cfg(feature = "backend-webrtc-rs")] + /// The Webrtc-RS-based backend. + WebrtcRs, + + /// The mock backend. + Mock, +} + +impl Default for BackendModule { + #[allow(unreachable_code)] + fn default() -> Self { + #[cfg(feature = "backend-go-pion")] + return Self::GoPion; + #[cfg(feature = "backend-webrtc-rs")] + return Self::WebrtcRs; + Self::Mock + } +} + +impl BackendModule { + /// Get a default version of the module-specific config. + pub fn default_config(&self) -> serde_json::Value { + match self { + #[cfg(feature = "backend-go-pion")] + Self::GoPion => go_pion::default_config(), + #[cfg(feature = "backend-webrtc-rs")] + Self::WebrtcRs => todo!(), + Self::Mock => serde_json::json!({}), + } + } + + /// Connect a new backend module endpoint. + pub async fn connect( + &self, + url: &str, + listener: bool, + config: &Arc, + ) -> Result<(DynBackEp, DynBackEpRecvCon)> { + match self { + #[cfg(feature = "backend-go-pion")] + Self::GoPion => go_pion::connect(config, url, listener).await, + #[cfg(feature = "backend-webrtc-rs")] + Self::WebrtcRs => todo!(), + Self::Mock => todo!(), + } + } +} + +/// Backend connection. +pub trait BackCon: 'static + Send + Sync { + /// Send data over this backend connection. + fn send(&self, data: Vec) -> BoxFuture<'_, Result<()>>; + + /// Get the pub_key identifying this connection. + fn pub_key(&self) -> &PubKey; + + /// Returns `true` if we successfully connected over webrtc. + // TODO - this isn't good encapsulation + fn is_using_webrtc(&self) -> bool; + + /// Get connection statistics. + // TODO - this isn't good encapsulation + fn get_stats(&self) -> tx5_connection::ConnStats; +} + +/// Trait-object version of backend connection. +pub type DynBackCon = Arc; + +/// Backend connection receiver. +pub trait BackConRecvData: 'static + Send { + /// Receive data from this backend connection. + fn recv(&mut self) -> BoxFuture<'_, Option>>; +} + +/// Trait-object version of backend connection receiver. +pub type DynBackConRecvData = Box; + +/// Pending connection. +pub trait BackWaitCon: 'static + Send { + /// Wait for the connection + fn wait( + &mut self, + // TODO - this isn't good encapsulation + recv_limit: Arc, + ) -> BoxFuture<'static, Result<(DynBackCon, DynBackConRecvData)>>; + + /// Get the pub_key identifying this connection. + fn pub_key(&self) -> &PubKey; +} + +/// Trait-object version of backend wait con. +pub type DynBackWaitCon = Box; + +/// Backend endpoint. +pub trait BackEp: 'static + Send + Sync { + /// Establish an outgoing connection from this backend endpoint. + fn connect(&self, pub_key: PubKey) + -> BoxFuture<'_, Result>; + + /// Get the pub_key identifying this endpoint. + fn pub_key(&self) -> &PubKey; +} + +/// Trait-object version of backend endpoint. +pub type DynBackEp = Arc; + +/// Backend endpoint receiver. +pub trait BackEpRecvCon: 'static + Send { + /// Receive incoming connection from this backend endpoint. + fn recv(&mut self) -> BoxFuture<'_, Option>; +} + +/// Trait-object version of backend endpoint receiver. +pub type DynBackEpRecvCon = Box; diff --git a/crates/tx5/src/backend/go_pion.rs b/crates/tx5/src/backend/go_pion.rs new file mode 100644 index 00000000..a7e8eea6 --- /dev/null +++ b/crates/tx5/src/backend/go_pion.rs @@ -0,0 +1,138 @@ +//! go pion backend + +use super::*; +use crate::Config; + +struct GoCon(tx5_connection::FramedConn); + +impl BackCon for GoCon { + fn send(&self, data: Vec) -> BoxFuture<'_, Result<()>> { + Box::pin(async { self.0.send(data).await }) + } + + fn pub_key(&self) -> &PubKey { + self.0.pub_key() + } + + fn is_using_webrtc(&self) -> bool { + self.0.is_using_webrtc() + } + + fn get_stats(&self) -> tx5_connection::ConnStats { + self.0.get_stats() + } +} + +struct GoConRecvData(tx5_connection::FramedConnRecv); + +impl BackConRecvData for GoConRecvData { + fn recv(&mut self) -> BoxFuture<'_, Option>> { + Box::pin(async { self.0.recv().await }) + } +} + +struct GoWaitCon { + pub_key: PubKey, + con: Option>, + con_recv: Option, +} + +impl BackWaitCon for GoWaitCon { + fn wait( + &mut self, + recv_limit: Arc, + ) -> BoxFuture<'static, Result<(DynBackCon, DynBackConRecvData)>> { + let con = self.con.take(); + let con_recv = self.con_recv.take(); + Box::pin(async move { + let (con, con_recv) = match (con, con_recv) { + (Some(con), Some(con_recv)) => (con, con_recv), + _ => return Err(std::io::Error::other("already awaited")), + }; + + con.ready().await; + + let (con, con_recv) = + tx5_connection::FramedConn::new(con, con_recv, recv_limit) + .await?; + + let con: DynBackCon = Arc::new(GoCon(con)); + let con_recv: DynBackConRecvData = + Box::new(GoConRecvData(con_recv)); + + Ok((con, con_recv)) + }) + } + + fn pub_key(&self) -> &PubKey { + &self.pub_key + } +} + +struct GoEp(tx5_connection::Hub); + +impl BackEp for GoEp { + fn connect( + &self, + pub_key: PubKey, + ) -> BoxFuture<'_, Result> { + Box::pin(async { + let (con, con_recv) = self.0.connect(pub_key).await?; + let pub_key = con.pub_key().clone(); + let wc: DynBackWaitCon = Box::new(GoWaitCon { + pub_key, + con: Some(con), + con_recv: Some(con_recv), + }); + Ok(wc) + }) + } + + fn pub_key(&self) -> &PubKey { + self.0.pub_key() + } +} + +struct GoEpRecvCon(tx5_connection::HubRecv); + +impl BackEpRecvCon for GoEpRecvCon { + fn recv(&mut self) -> BoxFuture<'_, Option> { + Box::pin(async { + let (con, con_recv) = self.0.accept().await?; + let pub_key = con.pub_key().clone(); + let wc: DynBackWaitCon = Box::new(GoWaitCon { + pub_key, + con: Some(con), + con_recv: Some(con_recv), + }); + Some(wc) + }) + } +} + +/// Get a default version of the module-specific config. +pub fn default_config() -> serde_json::Value { + serde_json::json!({}) +} + +/// Connect a new backend based on the tx5-go-pion backend. +pub async fn connect( + config: &Arc, + url: &str, + listener: bool, +) -> Result<(DynBackEp, DynBackEpRecvCon)> { + let webrtc_config = config.initial_webrtc_config.clone().into_bytes(); + let sig_config = tx5_connection::tx5_signal::SignalConfig { + listener, + allow_plain_text: config.signal_allow_plain_text, + //max_connections: config.connection_count_max as usize, + max_idle: config.timeout, + ..Default::default() + }; + let (hub, hub_recv) = + tx5_connection::Hub::new(webrtc_config, url, Arc::new(sig_config)) + .await?; + let ep: DynBackEp = Arc::new(GoEp(hub)); + let ep_recv: DynBackEpRecvCon = Box::new(GoEpRecvCon(hub_recv)); + Ok((ep, ep_recv)) +} diff --git a/crates/tx5/src/config.rs b/crates/tx5/src/config.rs index 249756d6..58813d6a 100644 --- a/crates/tx5/src/config.rs +++ b/crates/tx5/src/config.rs @@ -1,4 +1,5 @@ use crate::*; +use tx5_core::deps::serde_json; /// Tx5 endpoint configuration. pub struct Config { @@ -40,6 +41,15 @@ pub struct Config { /// set the callbacks here, otherwise no preflight will /// be sent nor validated. Default: None. pub preflight: Option<(PreflightSendCb, PreflightCheckCb)>, + + /// The backend connection module to use. + /// For the most part you should just leave this at the default. + pub backend_module: crate::backend::BackendModule, + + /// The backend module config to use. + /// For the most part you should just leave this set at `None`, + /// to get the default backend config. + pub backend_module_config: Option, } impl std::fmt::Debug for Config { @@ -81,6 +91,8 @@ impl Default for Config { backoff_start: std::time::Duration::from_secs(5), backoff_max: std::time::Duration::from_secs(60), preflight: None, + backend_module: crate::backend::BackendModule::default(), + backend_module_config: None, } } } diff --git a/crates/tx5/src/ep.rs b/crates/tx5/src/ep.rs index 7b773677..f5f9fe95 100644 --- a/crates/tx5/src/ep.rs +++ b/crates/tx5/src/ep.rs @@ -79,7 +79,6 @@ impl EndpointRecv { pub(crate) struct EpInner { this: Weak>, config: Arc, - webrtc_config: Vec, recv_limit: Arc, evt_send: tokio::sync::mpsc::Sender, sig_map: HashMap>, @@ -117,7 +116,6 @@ impl EpInner { Sig::new( self.this.clone(), self.config.clone(), - self.webrtc_config.clone(), sig_url, listener, self.evt_send.clone(), @@ -150,20 +148,14 @@ impl EpInner { .clone() } - pub fn accept_peer( - &mut self, - peer_url: PeerUrl, - conn: Arc, - conn_recv: tx5_connection::ConnRecv, - ) { + pub fn accept_peer(&mut self, peer_url: PeerUrl, wc: DynBackWaitCon) { self.peer_map.entry(peer_url.clone()).or_insert_with(|| { Peer::new_accept( self.config.clone(), self.recv_limit.clone(), self.this.clone(), peer_url, - conn, - conn_recv, + wc, self.evt_send.clone(), ) }); @@ -201,12 +193,9 @@ impl Endpoint { Self { config: config.clone(), inner: Arc::new_cyclic(|this| { - let webrtc_config = - config.initial_webrtc_config.as_bytes().to_vec(); Mutex::new(EpInner { this: this.clone(), config, - webrtc_config, recv_limit, evt_send, sig_map: HashMap::default(), diff --git a/crates/tx5/src/lib.rs b/crates/tx5/src/lib.rs index 0d523829..8e0dd700 100644 --- a/crates/tx5/src/lib.rs +++ b/crates/tx5/src/lib.rs @@ -54,6 +54,9 @@ pub type PreflightCheckCb = Arc< + Sync, >; +pub mod backend; +use backend::*; + mod config; pub use config::*; diff --git a/crates/tx5/src/peer.rs b/crates/tx5/src/peer.rs index 346ec670..36ece45b 100644 --- a/crates/tx5/src/peer.rs +++ b/crates/tx5/src/peer.rs @@ -3,7 +3,7 @@ use crate::*; use tx5_connection::*; enum MaybeReady { - Ready(Arc), + Ready(Arc), Wait(Arc), } @@ -65,8 +65,7 @@ impl Peer { recv_limit: Arc, ep: Weak>, peer_url: PeerUrl, - conn: Arc, - conn_recv: ConnRecv, + wc: DynBackWaitCon, evt_send: tokio::sync::mpsc::Sender, ) -> Arc { Arc::new_cyclic(|_this| { @@ -78,7 +77,7 @@ impl Peer { config, recv_limit, ep, - Some((conn, conn_recv)), + Some(wc), peer_url, evt_send, ready.clone(), @@ -195,7 +194,7 @@ async fn task( config: Arc, recv_limit: Arc, ep: Weak>, - conn: Option<(Arc, ConnRecv)>, + conn: Option, peer_url: PeerUrl, evt_send: tokio::sync::mpsc::Sender, ready: Arc>, @@ -206,18 +205,18 @@ async fn task( evt_send: evt_send.clone(), }; - let (conn, conn_recv) = match conn { + let mut wc = match conn { None => return, - Some(conn) => conn, + Some(wc) => wc, }; - conn.ready().await; - - let (conn, mut conn_recv) = - match FramedConn::new(conn, conn_recv, recv_limit).await { - Ok(conn) => conn, - Err(_) => return, - }; + let (conn, mut conn_recv) = match wc.wait(recv_limit).await { + Ok((conn, conn_recv)) => (conn, conn_recv), + Err(err) => { + tracing::debug!(?err, "connection wait error"); + return; + } + }; if let Some((pf_send, pf_check)) = &config.preflight { let pf_data = match pf_send(&peer_url).await { diff --git a/crates/tx5/src/sig.rs b/crates/tx5/src/sig.rs index 61969e12..81ac2695 100644 --- a/crates/tx5/src/sig.rs +++ b/crates/tx5/src/sig.rs @@ -1,10 +1,7 @@ use crate::*; -use tx5_connection::tx5_signal::*; -use tx5_connection::*; - enum MaybeReady { - Ready(Arc), + Ready(DynBackEp), Wait(Arc), } @@ -25,7 +22,6 @@ impl Sig { pub fn new( ep: Weak>, config: Arc, - webrtc_config: Vec, sig_url: SigUrl, listener: bool, evt_send: tokio::sync::mpsc::Sender, @@ -39,7 +35,6 @@ impl Sig { ep, this.clone(), config, - webrtc_config, sig_url.clone(), listener, evt_send, @@ -74,25 +69,21 @@ impl Sig { } } - pub async fn connect( - &self, - pub_key: PubKey, - ) -> Result<(Arc, ConnRecv)> { - let hub = match &*self.ready.lock().unwrap() { + pub async fn connect(&self, pub_key: PubKey) -> Result { + let ep = match &*self.ready.lock().unwrap() { MaybeReady::Ready(h) => h.clone(), _ => return Err(Error::other("not ready")), }; - hub.connect(pub_key).await + ep.connect(pub_key).await } } async fn connect_loop( config: Arc, - webrtc_config: Vec, sig_url: SigUrl, listener: bool, mut resp_url: Option>, -) -> (Hub, HubRecv) { +) -> (DynBackEp, DynBackEpRecvCon) { tracing::debug!( target: "NETAUDIT", ?config, @@ -105,17 +96,10 @@ async fn connect_loop( let mut wait = config.backoff_start; - let signal_config = Arc::new(SignalConfig { - listener, - allow_plain_text: config.signal_allow_plain_text, - max_idle: config.timeout, - ..Default::default() - }); - loop { match tokio::time::timeout( config.timeout, - Hub::new(webrtc_config.clone(), &sig_url, signal_config.clone()), + config.backend_module.connect(&sig_url, listener, &config), ) .await .map_err(Error::other) @@ -149,7 +133,7 @@ async fn connect_loop( } struct DropSig { - ep: Weak>, + inner: Weak>, sig_url: SigUrl, local_url: Option, sig: Weak, @@ -166,9 +150,9 @@ impl Drop for DropSig { a = "drop", ); - if let Some(ep_inner) = self.ep.upgrade() { + if let Some(inner) = self.inner.upgrade() { if let Some(sig) = self.sig.upgrade() { - ep_inner.lock().unwrap().drop_sig(sig); + inner.lock().unwrap().drop_sig(sig); } } } @@ -176,10 +160,9 @@ impl Drop for DropSig { #[allow(clippy::too_many_arguments)] async fn task( - ep: Weak>, + inner: Weak>, this: Weak, config: Arc, - webrtc_config: Vec, sig_url: SigUrl, listener: bool, evt_send: tokio::sync::mpsc::Sender, @@ -187,32 +170,24 @@ async fn task( resp_url: Option>, ) { let mut drop_g = DropSig { - ep: ep.clone(), + inner: inner.clone(), sig_url: sig_url.clone(), local_url: None, sig: this, }; - let (hub, mut hub_recv) = connect_loop( - config.clone(), - webrtc_config, - sig_url.clone(), - listener, - resp_url, - ) - .await; + let (ep, mut ep_recv) = + connect_loop(config.clone(), sig_url.clone(), listener, resp_url).await; - let local_url = sig_url.to_peer(hub.pub_key().clone()); + let local_url = sig_url.to_peer(ep.pub_key().clone()); drop_g.local_url = Some(local_url.clone()); - let hub = Arc::new(hub); - { let mut lock = ready.lock().unwrap(); if let MaybeReady::Wait(w) = &*lock { w.close(); } - *lock = MaybeReady::Ready(hub); + *lock = MaybeReady::Ready(ep); } drop(ready); @@ -233,10 +208,10 @@ async fn task( a = "connected", ); - while let Some((conn, conn_recv)) = hub_recv.accept().await { - if let Some(ep) = ep.upgrade() { - let peer_url = sig_url.to_peer(conn.pub_key().clone()); - ep.lock().unwrap().accept_peer(peer_url, conn, conn_recv); + while let Some(wc) = ep_recv.recv().await { + if let Some(inner) = inner.upgrade() { + let peer_url = sig_url.to_peer(wc.pub_key().clone()); + inner.lock().unwrap().accept_peer(peer_url, wc); } }