Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

abstract tx5 backend in prep for m̶o̶c̶k̶ mem backend #105

Merged
merged 4 commits into from
Oct 14, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/tx5/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ backend-webrtc-rs = [ "tx5-connection/backend-webrtc-rs" ]

[dependencies]
base64 = { workspace = true }
futures = { workspace = true }
influxive-otel-atomic-obs = { workspace = true }
serde = { workspace = true }
tokio = { workspace = true, features = [ "full" ] }
Expand Down
134 changes: 134 additions & 0 deletions crates/tx5/src/backend.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
//! Backend modules usable by tx5.

use std::io::Result;
use std::sync::Arc;

use futures::future::BoxFuture;

use crate::{Config, PubKey};
use tx5_core::deps::serde_json;
jost-s marked this conversation as resolved.
Show resolved Hide resolved

#[cfg(feature = "backend-go-pion")]
mod go_pion;

/// Backend modules usable by tx5.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum BackendModule {
#[cfg(feature = "backend-go-pion")]
/// The Go Pion-based backend.
GoPion,

#[cfg(feature = "backend-webrtc-rs")]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is this doing? Is it coming along at all or still not production ready?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wish I knew, but it doesn't seem to be the appropriate time to evaluate it. Once we have everything stable it'd be great to just go straight for the implementation so we can do a direct A/B comparison through our CI and manual usage.

/// The Webrtc-RS-based backend.
WebrtcRs,

/// The mock backend.
Mock,
}

impl Default for BackendModule {
#[allow(unreachable_code)]
fn default() -> Self {
#[cfg(feature = "backend-go-pion")]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these already prevented from being enabled together?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are currently, but when we get to implementing it, I'd actually like them to not be exclusive, hence the fallback pattern in this default constructor. Not that we'd compile very often with both of them, but it would fit better into rust's features being additive paradigm.

return Self::GoPion;
#[cfg(feature = "backend-webrtc-rs")]
return Self::WebrtcRs;
Self::Mock
}
}
Comment on lines +14 to +38
Copy link
Contributor

@cdunster cdunster Oct 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could also be achieved by deriving Default. The Mock variant becomes a bit ugly with listing all the features but I still think it's clearer (opinion) and removes the need for #[allow(unreachable_code)] 🙂

Suggested change
/// Backend modules usable by tx5.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum BackendModule {
#[cfg(feature = "backend-go-pion")]
/// The Go Pion-based backend.
GoPion,
#[cfg(feature = "backend-webrtc-rs")]
/// The Webrtc-RS-based backend.
WebrtcRs,
/// The mock backend.
Mock,
}
impl Default for BackendModule {
#[allow(unreachable_code)]
fn default() -> Self {
#[cfg(feature = "backend-go-pion")]
return Self::GoPion;
#[cfg(feature = "backend-webrtc-rs")]
return Self::WebrtcRs;
Self::Mock
}
}
/// Backend modules usable by tx5.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum BackendModule {
#[cfg(feature = "backend-go-pion")]
#[default]
/// The Go Pion-based backend.
GoPion,
#[cfg(feature = "backend-webrtc-rs")]
#[default]
/// The Webrtc-RS-based backend.
WebrtcRs,
#[cfg_attr(
not(any(feature = "backend-go-pion", feature = "backend-webrtc-rs")),
default
)]
/// The mock backend.
Mock,
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My hope was to make the backend features non-exclusive, hence the fallback returns.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens in Callums suggestion when you compile with both features? Compiler error?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah it would be a compiler error for multiple defined defaults. You could fix it with more cfg_attr like setting WebrtcRs to only default if go-pion isn't enabled:

#[cfg(feature = "backend-webrtc-rs")]
#[cfg_attr(not(feature = "backend-go-pion"), default)]
/// The Webrtc-RS-based backend.
WebrtcRs,

but I can imagine that this starts to look unruly as we possibly add more backends.


impl BackendModule {
/// Get a default version of the module-specific config.
pub fn default_config(&self) -> serde_json::Value {
match self {
#[cfg(feature = "backend-go-pion")]
Self::GoPion => go_pion::default_config(),
#[cfg(feature = "backend-webrtc-rs")]
Self::WebrtcRs => todo!(),
Self::Mock => serde_json::json!({}),
}
}

/// Connect a new backend module endpoint.
pub async fn connect(
&self,
url: &str,
listener: bool,
config: &Arc<Config>,
) -> Result<(DynBackEp, DynBackEpRecv)> {
match self {
#[cfg(feature = "backend-go-pion")]
Self::GoPion => go_pion::connect(config, url, listener).await,
#[cfg(feature = "backend-webrtc-rs")]
Self::WebrtcRs => todo!(),
Self::Mock => todo!(),
}
}
}

/// Backend connection.
pub trait BackCon: 'static + Send + Sync {
/// Send data over this backend connection.
fn send(&self, data: Vec<u8>) -> BoxFuture<'_, Result<()>>;

/// Get the pub_key identifying this connection.
fn pub_key(&self) -> &PubKey;

/// Returns `true` if we successfully connected over webrtc.
// TODO - this isn't good encapsulation
fn is_using_webrtc(&self) -> bool;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would the right interface look like here? Something like an inner that let's you get at the real connection and you have to know which ones it makes sense to ask about the use of webrtc as the caller?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this function for? Is it to check if it's connected or to check if it's using WebRTC? If it's the former then wouldn't is_connected be better?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's the latter. "Connections" can communicate directly through sbd, then they attempt to upgrade to a webrtc connection. So is_using_webrtc() returns true once/if that upgrade is successful.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would the right interface look like here?

Yeah, that's still TBD. Does it even make sense to have this call be separate? Maybe it should just be part of the stats call, although that would just defer the problem, because there's an encapsulation question about that too : )

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe that's a different topic, but why does tx5 need to know whether it's connected indirectly through sbd or directly through webrtc?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I remember, it's informational or for unit tests. But I still submit this is something that doesn't have to be sorted out to merge this PR, since I'm not changing any functionality here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I asked out of curiosity, it wasn't meant as a subtle push to change it. I prefer being explicit about pushes ;-) Fine by me to keep as is.


/// Get connection statistics.
// TODO - this isn't good encapsulation
fn get_stats(&self) -> tx5_connection::ConnStats;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like it if we got a structure back based on the currently enabled back end. I was fine with tx5 vs tx2 just returning different data and I think the same is still true with go-pion vs webrtc-rs vs mock. Even if it is different based on the enabled back end, that's what I'd like to see presented up at the conductor level.

Maybe an enum could work and then the encapsulation is reasonable

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For sure that could work. Alternately, we could have this return serde_json::Value and have concrete types exposed that can be transcoded into if you know the backend you're using.

For now, I was just going to use this and the mock backend will return a blank ConnStats...

}

/// Trait-object version of backend connection.
pub type DynBackCon = Arc<dyn BackCon + 'static + Send + Sync>;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason to use trait objects instead of generics?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Type-erasure. The associated types (or sub-generics) quickly get ridiculous with the stack of traits like this.


/// Backend connection receiver.
pub trait BackConRecv: 'static + Send {
/// Receive data from this backend connection.
fn recv(&mut self) -> BoxFuture<'_, Option<Vec<u8>>>;
}

/// Trait-object version of backend connection receiver.
pub type DynBackConRecv = Box<dyn BackConRecv + 'static + Send>;

/// Pending connection.
pub trait BackWaitCon: 'static + Send {
/// Wait for the connection
fn wait(
&mut self,
// TODO - this isn't good encapsulation
recv_limit: Arc<tokio::sync::Semaphore>,
) -> BoxFuture<'static, Result<(DynBackCon, DynBackConRecv)>>;

/// Get the pub_key identifying this connection.
fn pub_key(&self) -> &PubKey;
}

/// Trait-object version of backend wait con.
pub type DynBackWaitCon = Box<dyn BackWaitCon + 'static + Send>;

/// Backend endpoint.
pub trait BackEp: 'static + Send + Sync {
/// Establish an outgoing connection from this backend endpoint.
fn connect(&self, pub_key: PubKey)
-> BoxFuture<'_, Result<DynBackWaitCon>>;

/// Get the pub_key identifying this endpoint.
fn pub_key(&self) -> &PubKey;
}

/// Trait-object version of backend endpoint.
pub type DynBackEp = Arc<dyn BackEp + 'static + Send + Sync>;

/// Backend endpoint receiver.
pub trait BackEpRecv: 'static + Send {
/// Receive incoming connection from this backend endpoint.
fn recv(&mut self) -> BoxFuture<'_, Option<DynBackWaitCon>>;
}

/// Trait-object version of backend endpoint receiver.
jost-s marked this conversation as resolved.
Show resolved Hide resolved
pub type DynBackEpRecv = Box<dyn BackEpRecv + 'static + Send>;
137 changes: 137 additions & 0 deletions crates/tx5/src/backend/go_pion.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
//! go pion backend

use super::*;
use crate::Config;

struct GoCon(tx5_connection::FramedConn);

impl BackCon for GoCon {
fn send(&self, data: Vec<u8>) -> BoxFuture<'_, Result<()>> {
Box::pin(async { self.0.send(data).await })
}

fn pub_key(&self) -> &PubKey {
self.0.pub_key()
}

fn is_using_webrtc(&self) -> bool {
self.0.is_using_webrtc()
}

fn get_stats(&self) -> tx5_connection::ConnStats {
self.0.get_stats()
}
}

struct GoConRecv(tx5_connection::FramedConnRecv);

impl BackConRecv for GoConRecv {
fn recv(&mut self) -> BoxFuture<'_, Option<Vec<u8>>> {
Box::pin(async { self.0.recv().await })
}
}

struct GoWaitCon {
pub_key: PubKey,
con: Option<Arc<tx5_connection::Conn>>,
con_recv: Option<tx5_connection::ConnRecv>,
}

impl BackWaitCon for GoWaitCon {
fn wait(
&mut self,
recv_limit: Arc<tokio::sync::Semaphore>,
) -> BoxFuture<'static, Result<(DynBackCon, DynBackConRecv)>> {
let con = self.con.take();
let con_recv = self.con_recv.take();
Box::pin(async move {
let (con, con_recv) = match (con, con_recv) {
(Some(con), Some(con_recv)) => (con, con_recv),
_ => return Err(std::io::Error::other("already awaited")),
};

con.ready().await;

let (con, con_recv) =
tx5_connection::FramedConn::new(con, con_recv, recv_limit)
.await?;

let con: DynBackCon = Arc::new(GoCon(con));
let con_recv: DynBackConRecv = Box::new(GoConRecv(con_recv));

Ok((con, con_recv))
})
}

fn pub_key(&self) -> &PubKey {
&self.pub_key
}
}

struct GoEp(tx5_connection::Hub);

impl BackEp for GoEp {
fn connect(
&self,
pub_key: PubKey,
) -> BoxFuture<'_, Result<DynBackWaitCon>> {
Box::pin(async {
let (con, con_recv) = self.0.connect(pub_key).await?;
let pub_key = con.pub_key().clone();
let wc: DynBackWaitCon = Box::new(GoWaitCon {
pub_key,
con: Some(con),
con_recv: Some(con_recv),
});
Ok(wc)
})
}

fn pub_key(&self) -> &PubKey {
self.0.pub_key()
}
}

struct GoEpRecv(tx5_connection::HubRecv);

impl BackEpRecv for GoEpRecv {
fn recv(&mut self) -> BoxFuture<'_, Option<DynBackWaitCon>> {
Box::pin(async {
let (con, con_recv) = self.0.accept().await?;
let pub_key = con.pub_key().clone();
let wc: DynBackWaitCon = Box::new(GoWaitCon {
pub_key,
con: Some(con),
con_recv: Some(con_recv),
});
Some(wc)
})
}
}

/// Get a default version of the module-specific config.
pub fn default_config() -> serde_json::Value {
serde_json::json!({})
}

/// Connect a new backend based on the tx5-go-pion backend.
pub async fn connect(
config: &Arc<Config>,
url: &str,
listener: bool,
) -> Result<(DynBackEp, DynBackEpRecv)> {
let webrtc_config = config.initial_webrtc_config.clone().into_bytes();
let sig_config = tx5_connection::tx5_signal::SignalConfig {
listener,
allow_plain_text: config.signal_allow_plain_text,
//max_connections: config.connection_count_max as usize,
max_idle: config.timeout,
..Default::default()
};
let (hub, hub_recv) =
tx5_connection::Hub::new(webrtc_config, url, Arc::new(sig_config))
.await?;
let ep: DynBackEp = Arc::new(GoEp(hub));
let ep_recv: DynBackEpRecv = Box::new(GoEpRecv(hub_recv));
Ok((ep, ep_recv))
}
12 changes: 12 additions & 0 deletions crates/tx5/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::*;
use tx5_core::deps::serde_json;

/// Tx5 endpoint configuration.
pub struct Config {
Expand Down Expand Up @@ -40,6 +41,15 @@ pub struct Config {
/// set the callbacks here, otherwise no preflight will
/// be sent nor validated. Default: None.
pub preflight: Option<(PreflightSendCb, PreflightCheckCb)>,

/// The backend connection module to use.
/// For the most part you should just leave this at the default.
pub backend_module: crate::backend::BackendModule,

/// The backend module config to use.
/// For the most part you should just leave this set at `None`,
/// to get the default backend config.
pub backend_module_config: Option<serde_json::Value>,
}

impl std::fmt::Debug for Config {
Expand Down Expand Up @@ -81,6 +91,8 @@ impl Default for Config {
backoff_start: std::time::Duration::from_secs(5),
backoff_max: std::time::Duration::from_secs(60),
preflight: None,
backend_module: crate::backend::BackendModule::default(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this only be compile time configurable? Just thinking about writing tests against this, we'd have to build and test one way to run most tests and then build and test again with different features to use mocking

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your question seems to answer why I wouldn't want to switch this to compile-time configuration. This way, most of the tests can use the true backend, but I can also have some which exercise the mock without having to have multiple top-level calls to cargo test with different features.

backend_module_config: None,
}
}
}
15 changes: 2 additions & 13 deletions crates/tx5/src/ep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ impl EndpointRecv {
pub(crate) struct EpInner {
this: Weak<Mutex<EpInner>>,
config: Arc<Config>,
webrtc_config: Vec<u8>,
recv_limit: Arc<tokio::sync::Semaphore>,
evt_send: tokio::sync::mpsc::Sender<EndpointEvent>,
sig_map: HashMap<SigUrl, Arc<Sig>>,
Expand Down Expand Up @@ -117,7 +116,6 @@ impl EpInner {
Sig::new(
self.this.clone(),
self.config.clone(),
self.webrtc_config.clone(),
sig_url,
listener,
self.evt_send.clone(),
Expand Down Expand Up @@ -150,20 +148,14 @@ impl EpInner {
.clone()
}

pub fn accept_peer(
&mut self,
peer_url: PeerUrl,
conn: Arc<tx5_connection::Conn>,
conn_recv: tx5_connection::ConnRecv,
) {
pub fn accept_peer(&mut self, peer_url: PeerUrl, wc: DynBackWaitCon) {
self.peer_map.entry(peer_url.clone()).or_insert_with(|| {
Peer::new_accept(
self.config.clone(),
self.recv_limit.clone(),
self.this.clone(),
peer_url,
conn,
conn_recv,
wc,
self.evt_send.clone(),
)
});
Expand Down Expand Up @@ -201,12 +193,9 @@ impl Endpoint {
Self {
config: config.clone(),
inner: Arc::new_cyclic(|this| {
let webrtc_config =
config.initial_webrtc_config.as_bytes().to_vec();
Mutex::new(EpInner {
this: this.clone(),
config,
webrtc_config,
recv_limit,
evt_send,
sig_map: HashMap::default(),
Expand Down
3 changes: 3 additions & 0 deletions crates/tx5/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ pub type PreflightCheckCb = Arc<
+ Sync,
>;

pub mod backend;
use backend::*;

mod config;
pub use config::*;

Expand Down
Loading
Loading