diff --git a/Cargo.lock b/Cargo.lock index 43f02552..3015ea99 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3860,10 +3860,12 @@ dependencies = [ name = "tx5-go-pion" version = "0.0.2-alpha" dependencies = [ + "futures", "parking_lot", "tokio", "tracing", "tracing-subscriber", + "tx5-core", "tx5-go-pion-sys", "tx5-go-pion-turn", "url", diff --git a/crates/tx5-core/src/lib.rs b/crates/tx5-core/src/lib.rs index 51dd339b..fe6e8a1b 100644 --- a/crates/tx5-core/src/lib.rs +++ b/crates/tx5-core/src/lib.rs @@ -47,3 +47,45 @@ pub mod ws { /// Pinned, boxed, future type alias. pub type BoxFut<'lt, T> = std::pin::Pin + 'lt + Send>>; + +/// Initial configuration. If you would like to change this from the +/// default, please call [Tx5InitConfig::set_as_global_default] +/// before creating any peer connections. +#[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize)] +#[serde(crate = "deps::serde", rename_all = "camelCase")] +pub struct Tx5InitConfig { + /// The minimum ephemeral udp port to bind. Defaults to `1`. + pub ephemeral_udp_port_min: u16, + + /// The maximum ephemeral udp port to bind. Defaults to `65535`. + pub ephemeral_udp_port_max: u16, +} + +impl Default for Tx5InitConfig { + fn default() -> Self { + Self { + ephemeral_udp_port_min: 1, + ephemeral_udp_port_max: 65535, + } + } +} + +impl Tx5InitConfig { + /// Call this to set tx5_init defaults before creating any peer connections. + /// This will return an error if the settings have already been set. + pub fn set_as_global_default(&self) -> Result<()> { + TX5_INIT_CONFIG + .set(*self) + .map_err(|_| Error::id("Tx5InitAlreadySet")) + } + + /// Get the currently set Tx5InitConfig. WARNING! If it hasn't been + /// explicitly set, this get will trigger the config to be set + /// to default values. + pub fn get() -> Self { + *TX5_INIT_CONFIG.get_or_init(Tx5InitConfig::default) + } +} + +static TX5_INIT_CONFIG: once_cell::sync::OnceCell = + once_cell::sync::OnceCell::new(); diff --git a/crates/tx5-go-pion-sys/const.go b/crates/tx5-go-pion-sys/const.go index ab724dd7..9252be5a 100644 --- a/crates/tx5-go-pion-sys/const.go +++ b/crates/tx5-go-pion-sys/const.go @@ -16,6 +16,11 @@ const ( // - msg slot_c: utf8 info len TyOnTrace UintPtrT = 0xfffe + // Init MUST be called EXACTLY once before a peer con is created. + // - allowed contexts: Call, Response + // - msg slot_a: config buffer id + TyTx5Init UintPtrT = 0x7001 + // Request a go buffer be created / giving access to said buffer in resp. // - allowed contexts: Call, Response // - msg slot_a: buffer id diff --git a/crates/tx5-go-pion-sys/main.go b/crates/tx5-go-pion-sys/main.go index 47636d4c..af31f251 100644 --- a/crates/tx5-go-pion-sys/main.go +++ b/crates/tx5-go-pion-sys/main.go @@ -37,6 +37,7 @@ import "C" import ( "bytes" + "encoding/json" "fmt" "sync" "unsafe" @@ -110,12 +111,28 @@ func (c customLoggerFactory) NewLogger(subsystem string) logging.LeveledLogger { return customLogger{} } -var webrtc_api *webrtc.API +var webrtcApiMu sync.Mutex +var webrtcApi *webrtc.API -func init() { - webrtc_api = webrtc.NewAPI(webrtc.WithSettingEngine(webrtc.SettingEngine{ - LoggerFactory: customLoggerFactory{}, - })) +func setWebrtcApi(api *webrtc.API) { + if api == nil { + panic("CannotSetWebrtcApiToNil") + } + webrtcApiMu.Lock() + defer webrtcApiMu.Unlock() + if webrtcApi != nil { + panic("CannotSetWebrtcApiMultipleTimes") + } + webrtcApi = api +} + +func getWebrtcApi() *webrtc.API { + webrtcApiMu.Lock() + defer webrtcApiMu.Unlock() + if webrtcApi == nil { + panic("WebrtcApiIsUnset:CallTx5Init") + } + return webrtcApi } func MessageCbInvoke( @@ -167,6 +184,62 @@ func EmitEvent( ) } +type Tx5InitConfig struct { + EphemeralUdpPortMin *uint16 `json:"ephemeralUdpPortMin,omitempty"` + EphemeralUdpPortMax *uint16 `json:"ephemeralUdpPortMax,omitempty"` +} + +// Initialize the library with some optional configuration. +// You MUST call this exactly ONCE before opening any peer connections. +func CallTx5Init( + config_buf_id UintPtrT, + response_cb MessageCb, + response_usr unsafe.Pointer, +) { + buf := BufferFromPtr(config_buf_id) + buf.mu.Lock() + defer buf.mu.Unlock() + + if buf.closed { + panic("BufferClosed") + } + + var tmpConfig Tx5InitConfig + if err := json.Unmarshal(buf.buf.Bytes(), &tmpConfig); err != nil { + errStr := fmt.Sprintf("%s: %s", err, buf.buf.Bytes()) + panic(errStr) + } + + setting_engine := webrtc.SettingEngine{ + LoggerFactory: customLoggerFactory{}, + } + + var port_min uint16 = 1 + var port_max uint16 = 65535 + + if tmpConfig.EphemeralUdpPortMin != nil { + port_min = *tmpConfig.EphemeralUdpPortMin + } + + if tmpConfig.EphemeralUdpPortMax != nil { + port_max = *tmpConfig.EphemeralUdpPortMax + } + + setting_engine.SetEphemeralUDPPortRange(port_min, port_max) + + setWebrtcApi(webrtc.NewAPI(webrtc.WithSettingEngine(setting_engine))) + + MessageCbInvoke( + response_cb, + response_usr, + TyTx5Init, + 0, + 0, + 0, + 0, + ) +} + // register the MessageCb that will be invoked for events // //export OnEvent @@ -297,6 +370,8 @@ func callInner( } switch call_type { + case TyTx5Init: + CallTx5Init(slot_a, response_cb, response_usr) case TyBufferAlloc: CallBufferAlloc(response_cb, response_usr) case TyBufferAccess: diff --git a/crates/tx5-go-pion-sys/peerconnection.go b/crates/tx5-go-pion-sys/peerconnection.go index de1125c1..e334474b 100644 --- a/crates/tx5-go-pion-sys/peerconnection.go +++ b/crates/tx5-go-pion-sys/peerconnection.go @@ -87,7 +87,9 @@ func CallPeerConAlloc( config_parsed.Certificates = append(config_parsed.Certificates, *cert) } - con, err := webrtc_api.NewPeerConnection(config_parsed) + webrtcApi := getWebrtcApi() + + con, err := webrtcApi.NewPeerConnection(config_parsed) if err != nil { panic(err) } diff --git a/crates/tx5-go-pion-sys/src/lib.rs b/crates/tx5-go-pion-sys/src/lib.rs index 91ebdf8f..b5027f5f 100644 --- a/crates/tx5-go-pion-sys/src/lib.rs +++ b/crates/tx5-go-pion-sys/src/lib.rs @@ -361,6 +361,19 @@ impl Api { ); } + /// Initialize the library with some optional configuration. + /// You MUST call this exactly ONCE before opening any peer connections. + #[inline] + pub unsafe fn tx5_init( + &self, + config_buf_id: BufferId, + ) -> Result { + self.call(TY_TX_5_INIT, config_buf_id, 0, 0, 0, |r| match r { + Ok((_t, a, _b, _c, _d)) => Ok(a), + Err(e) => Err(e), + }) + } + /// Create a new buffer in go memory with given length, /// access the buffer's memory in the callback. #[inline] diff --git a/crates/tx5-go-pion/Cargo.toml b/crates/tx5-go-pion/Cargo.toml index df34af52..fccb91ac 100644 --- a/crates/tx5-go-pion/Cargo.toml +++ b/crates/tx5-go-pion/Cargo.toml @@ -11,13 +11,15 @@ keywords = ["holochain", "holo", "p2p", "webrtc", "networking"] categories = ["network-programming"] [dependencies] -tx5-go-pion-sys = { workspace = true } +futures = { workspace = true } parking_lot = { workspace = true } tokio = { workspace = true, features = [ "rt" ] } +tx5-go-pion-sys = { workspace = true } tracing = { workspace = true } url = { workspace = true } [dev-dependencies] +tx5-core = { workspace = true } tokio = { workspace = true, features = [ "full" ] } tracing-subscriber = { workspace = true } tx5-go-pion-turn = { workspace = true } diff --git a/crates/tx5-go-pion/src/lib.rs b/crates/tx5-go-pion/src/lib.rs index 2f99eb8e..606b0444 100644 --- a/crates/tx5-go-pion/src/lib.rs +++ b/crates/tx5-go-pion/src/lib.rs @@ -23,6 +23,39 @@ macro_rules! r2id { }; } +pub use tx5_core::Tx5InitConfig; + +#[allow(clippy::type_complexity)] +async fn tx5_init() -> std::result::Result<(), String> { + static SHARED: once_cell::sync::Lazy< + futures::future::Shared< + std::pin::Pin< + Box< + dyn std::future::Future< + Output = std::result::Result<(), String>, + > + + 'static + + Send, + >, + >, + >, + > = once_cell::sync::Lazy::new(|| { + futures::FutureExt::shared(Box::pin(async move { + let mut config = GoBufRef::json(Tx5InitConfig::get()); + let config = config.as_mut_ref().map_err(|e| format!("{e:?}"))?; + let config = config.0; + unsafe { + tx5_go_pion_sys::API + .tx5_init(config) + .map_err(|e| format!("{e:?}"))?; + } + >::Ok(()) + })) + }); + + SHARED.clone().await +} + use deps::*; pub use tx5_core::{Error, ErrorExt, Id, Result}; diff --git a/crates/tx5-go-pion/src/peer_con.rs b/crates/tx5-go-pion/src/peer_con.rs index c90bdf50..7c4114a8 100644 --- a/crates/tx5-go-pion/src/peer_con.rs +++ b/crates/tx5-go-pion/src/peer_con.rs @@ -113,6 +113,7 @@ impl PeerConnection { B: Into>, Cb: Fn(PeerConnectionEvent) + 'static + Send + Sync, { + tx5_init().await.map_err(Error::err)?; init_evt_manager(); r2id!(config); let cb: PeerConEvtCb = Arc::new(cb); diff --git a/crates/tx5-go-pion/tests/limit-ports.rs b/crates/tx5-go-pion/tests/limit-ports.rs new file mode 100644 index 00000000..2c849151 --- /dev/null +++ b/crates/tx5-go-pion/tests/limit-ports.rs @@ -0,0 +1,45 @@ +#[tokio::test(flavor = "multi_thread")] +async fn limit_ports() { + tx5_core::Tx5InitConfig { + ephemeral_udp_port_min: 40000, + ephemeral_udp_port_max: 40000, + } + .set_as_global_default() + .unwrap(); + + let (s, mut r) = tokio::sync::mpsc::unbounded_channel(); + + let mut con = tx5_go_pion::PeerConnection::new( + tx5_go_pion::PeerConnectionConfig::default(), + move |evt| { + let _ = s.send(evt); + }, + ) + .await + .unwrap(); + + let _dc = con + .create_data_channel(tx5_go_pion::DataChannelConfig::default()) + .await + .unwrap(); + let offer = con + .create_offer(tx5_go_pion::OfferConfig::default()) + .await + .unwrap(); + con.set_local_description(offer).await.unwrap(); + + tokio::time::timeout(std::time::Duration::from_secs(10), async { + while let Some(evt) = r.recv().await { + if let tx5_go_pion::PeerConnectionEvent::ICECandidate(mut ice) = evt + { + let ice = ice.to_vec().unwrap(); + let ice = String::from_utf8_lossy(&ice); + let ice = ice.split(' ').collect::>(); + assert_eq!("40000", ice[5]); + break; + } + } + }) + .await + .unwrap(); +} diff --git a/crates/tx5/src/lib.rs b/crates/tx5/src/lib.rs index 592112c6..1e899219 100644 --- a/crates/tx5/src/lib.rs +++ b/crates/tx5/src/lib.rs @@ -40,7 +40,7 @@ pub mod deps { use deps::{serde, serde_json}; use tx5_core::Uniq; -pub use tx5_core::{Error, ErrorExt, Id, Result, Tx5Url}; +pub use tx5_core::{Error, ErrorExt, Id, Result, Tx5InitConfig, Tx5Url}; pub mod actor;