Skip to content

Commit

Permalink
allow config of pion udp ephemeral port range (#55)
Browse files Browse the repository at this point in the history
* allow config of pion udp ephemeral port range

* fix thread-unsafe global set/get

* generalize init config so it will work with other backends

* add limit-ports test and fix init race condition

* faster limit-ports test
  • Loading branch information
neonphog authored Sep 29, 2023
1 parent f513591 commit 6ebef8d
Show file tree
Hide file tree
Showing 11 changed files with 228 additions and 8 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 42 additions & 0 deletions crates/tx5-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,45 @@ pub mod ws {
/// Pinned, boxed, future type alias.
pub type BoxFut<'lt, T> =
std::pin::Pin<Box<dyn std::future::Future<Output = T> + 'lt + Send>>;

/// Initial configuration. If you would like to change this from the
/// default, please call [Tx5InitConfig::set_as_global_default]
/// before creating any peer connections.
#[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize)]
#[serde(crate = "deps::serde", rename_all = "camelCase")]
pub struct Tx5InitConfig {
/// The minimum ephemeral udp port to bind. Defaults to `1`.
pub ephemeral_udp_port_min: u16,

/// The maximum ephemeral udp port to bind. Defaults to `65535`.
pub ephemeral_udp_port_max: u16,
}

impl Default for Tx5InitConfig {
fn default() -> Self {
Self {
ephemeral_udp_port_min: 1,
ephemeral_udp_port_max: 65535,
}
}
}

impl Tx5InitConfig {
/// Call this to set tx5_init defaults before creating any peer connections.
/// This will return an error if the settings have already been set.
pub fn set_as_global_default(&self) -> Result<()> {
TX5_INIT_CONFIG
.set(*self)
.map_err(|_| Error::id("Tx5InitAlreadySet"))
}

/// Get the currently set Tx5InitConfig. WARNING! If it hasn't been
/// explicitly set, this get will trigger the config to be set
/// to default values.
pub fn get() -> Self {
*TX5_INIT_CONFIG.get_or_init(Tx5InitConfig::default)
}
}

static TX5_INIT_CONFIG: once_cell::sync::OnceCell<Tx5InitConfig> =
once_cell::sync::OnceCell::new();
5 changes: 5 additions & 0 deletions crates/tx5-go-pion-sys/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ const (
// - msg slot_c: utf8 info len
TyOnTrace UintPtrT = 0xfffe

// Init MUST be called EXACTLY once before a peer con is created.
// - allowed contexts: Call, Response
// - msg slot_a: config buffer id
TyTx5Init UintPtrT = 0x7001

// Request a go buffer be created / giving access to said buffer in resp.
// - allowed contexts: Call, Response
// - msg slot_a: buffer id
Expand Down
85 changes: 80 additions & 5 deletions crates/tx5-go-pion-sys/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import "C"

import (
"bytes"
"encoding/json"
"fmt"
"sync"
"unsafe"
Expand Down Expand Up @@ -110,12 +111,28 @@ func (c customLoggerFactory) NewLogger(subsystem string) logging.LeveledLogger {
return customLogger{}
}

var webrtc_api *webrtc.API
var webrtcApiMu sync.Mutex
var webrtcApi *webrtc.API

func init() {
webrtc_api = webrtc.NewAPI(webrtc.WithSettingEngine(webrtc.SettingEngine{
LoggerFactory: customLoggerFactory{},
}))
func setWebrtcApi(api *webrtc.API) {
if api == nil {
panic("CannotSetWebrtcApiToNil")
}
webrtcApiMu.Lock()
defer webrtcApiMu.Unlock()
if webrtcApi != nil {
panic("CannotSetWebrtcApiMultipleTimes")
}
webrtcApi = api
}

func getWebrtcApi() *webrtc.API {
webrtcApiMu.Lock()
defer webrtcApiMu.Unlock()
if webrtcApi == nil {
panic("WebrtcApiIsUnset:CallTx5Init")
}
return webrtcApi
}

func MessageCbInvoke(
Expand Down Expand Up @@ -167,6 +184,62 @@ func EmitEvent(
)
}

type Tx5InitConfig struct {
EphemeralUdpPortMin *uint16 `json:"ephemeralUdpPortMin,omitempty"`
EphemeralUdpPortMax *uint16 `json:"ephemeralUdpPortMax,omitempty"`
}

// Initialize the library with some optional configuration.
// You MUST call this exactly ONCE before opening any peer connections.
func CallTx5Init(
config_buf_id UintPtrT,
response_cb MessageCb,
response_usr unsafe.Pointer,
) {
buf := BufferFromPtr(config_buf_id)
buf.mu.Lock()
defer buf.mu.Unlock()

if buf.closed {
panic("BufferClosed")
}

var tmpConfig Tx5InitConfig
if err := json.Unmarshal(buf.buf.Bytes(), &tmpConfig); err != nil {
errStr := fmt.Sprintf("%s: %s", err, buf.buf.Bytes())
panic(errStr)
}

setting_engine := webrtc.SettingEngine{
LoggerFactory: customLoggerFactory{},
}

var port_min uint16 = 1
var port_max uint16 = 65535

if tmpConfig.EphemeralUdpPortMin != nil {
port_min = *tmpConfig.EphemeralUdpPortMin
}

if tmpConfig.EphemeralUdpPortMax != nil {
port_max = *tmpConfig.EphemeralUdpPortMax
}

setting_engine.SetEphemeralUDPPortRange(port_min, port_max)

setWebrtcApi(webrtc.NewAPI(webrtc.WithSettingEngine(setting_engine)))

MessageCbInvoke(
response_cb,
response_usr,
TyTx5Init,
0,
0,
0,
0,
)
}

// register the MessageCb that will be invoked for events
//
//export OnEvent
Expand Down Expand Up @@ -297,6 +370,8 @@ func callInner(
}

switch call_type {
case TyTx5Init:
CallTx5Init(slot_a, response_cb, response_usr)
case TyBufferAlloc:
CallBufferAlloc(response_cb, response_usr)
case TyBufferAccess:
Expand Down
4 changes: 3 additions & 1 deletion crates/tx5-go-pion-sys/peerconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ func CallPeerConAlloc(
config_parsed.Certificates = append(config_parsed.Certificates, *cert)
}

con, err := webrtc_api.NewPeerConnection(config_parsed)
webrtcApi := getWebrtcApi()

con, err := webrtcApi.NewPeerConnection(config_parsed)
if err != nil {
panic(err)
}
Expand Down
13 changes: 13 additions & 0 deletions crates/tx5-go-pion-sys/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,19 @@ impl Api {
);
}

/// Initialize the library with some optional configuration.
/// You MUST call this exactly ONCE before opening any peer connections.
#[inline]
pub unsafe fn tx5_init(
&self,
config_buf_id: BufferId,
) -> Result<PeerConId> {
self.call(TY_TX_5_INIT, config_buf_id, 0, 0, 0, |r| match r {
Ok((_t, a, _b, _c, _d)) => Ok(a),
Err(e) => Err(e),
})
}

/// Create a new buffer in go memory with given length,
/// access the buffer's memory in the callback.
#[inline]
Expand Down
4 changes: 3 additions & 1 deletion crates/tx5-go-pion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ keywords = ["holochain", "holo", "p2p", "webrtc", "networking"]
categories = ["network-programming"]

[dependencies]
tx5-go-pion-sys = { workspace = true }
futures = { workspace = true }
parking_lot = { workspace = true }
tokio = { workspace = true, features = [ "rt" ] }
tx5-go-pion-sys = { workspace = true }
tracing = { workspace = true }
url = { workspace = true }

[dev-dependencies]
tx5-core = { workspace = true }
tokio = { workspace = true, features = [ "full" ] }
tracing-subscriber = { workspace = true }
tx5-go-pion-turn = { workspace = true }
33 changes: 33 additions & 0 deletions crates/tx5-go-pion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,39 @@ macro_rules! r2id {
};
}

pub use tx5_core::Tx5InitConfig;

#[allow(clippy::type_complexity)]
async fn tx5_init() -> std::result::Result<(), String> {
static SHARED: once_cell::sync::Lazy<
futures::future::Shared<
std::pin::Pin<
Box<
dyn std::future::Future<
Output = std::result::Result<(), String>,
>
+ 'static
+ Send,
>,
>,
>,
> = once_cell::sync::Lazy::new(|| {
futures::FutureExt::shared(Box::pin(async move {
let mut config = GoBufRef::json(Tx5InitConfig::get());
let config = config.as_mut_ref().map_err(|e| format!("{e:?}"))?;
let config = config.0;
unsafe {
tx5_go_pion_sys::API
.tx5_init(config)
.map_err(|e| format!("{e:?}"))?;
}
<std::result::Result<(), String>>::Ok(())
}))
});

SHARED.clone().await
}

use deps::*;

pub use tx5_core::{Error, ErrorExt, Id, Result};
Expand Down
1 change: 1 addition & 0 deletions crates/tx5-go-pion/src/peer_con.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ impl PeerConnection {
B: Into<GoBufRef<'a>>,
Cb: Fn(PeerConnectionEvent) + 'static + Send + Sync,
{
tx5_init().await.map_err(Error::err)?;
init_evt_manager();
r2id!(config);
let cb: PeerConEvtCb = Arc::new(cb);
Expand Down
45 changes: 45 additions & 0 deletions crates/tx5-go-pion/tests/limit-ports.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#[tokio::test(flavor = "multi_thread")]
async fn limit_ports() {
tx5_core::Tx5InitConfig {
ephemeral_udp_port_min: 40000,
ephemeral_udp_port_max: 40000,
}
.set_as_global_default()
.unwrap();

let (s, mut r) = tokio::sync::mpsc::unbounded_channel();

let mut con = tx5_go_pion::PeerConnection::new(
tx5_go_pion::PeerConnectionConfig::default(),
move |evt| {
let _ = s.send(evt);
},
)
.await
.unwrap();

let _dc = con
.create_data_channel(tx5_go_pion::DataChannelConfig::default())
.await
.unwrap();
let offer = con
.create_offer(tx5_go_pion::OfferConfig::default())
.await
.unwrap();
con.set_local_description(offer).await.unwrap();

tokio::time::timeout(std::time::Duration::from_secs(10), async {
while let Some(evt) = r.recv().await {
if let tx5_go_pion::PeerConnectionEvent::ICECandidate(mut ice) = evt
{
let ice = ice.to_vec().unwrap();
let ice = String::from_utf8_lossy(&ice);
let ice = ice.split(' ').collect::<Vec<_>>();
assert_eq!("40000", ice[5]);
break;
}
}
})
.await
.unwrap();
}
2 changes: 1 addition & 1 deletion crates/tx5/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub mod deps {
use deps::{serde, serde_json};

use tx5_core::Uniq;
pub use tx5_core::{Error, ErrorExt, Id, Result, Tx5Url};
pub use tx5_core::{Error, ErrorExt, Id, Result, Tx5InitConfig, Tx5Url};

pub mod actor;

Expand Down

0 comments on commit 6ebef8d

Please sign in to comment.