Skip to content

Commit

Permalink
initial checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
neonphog committed Mar 29, 2024
1 parent d36e4bb commit 79cf335
Show file tree
Hide file tree
Showing 12 changed files with 167 additions and 6 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ rustls-native-certs = "0.6.2"
rustls-pemfile = "1.0.2"
serde = { version = "1.0.160", features = [ "derive", "rc" ] }
serde_json = { version = "1.0.96", features = [ "preserve_order" ] }
serde_repr = "0.1.18"
sha2 = "0.10.6"
socket2 = { version = "0.5.2", features = [ "all" ] }
sodoken = "=0.0.11"
Expand Down
6 changes: 4 additions & 2 deletions crates/tx5-go-pion-sys/peerconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ func (peerCon *PeerCon) Free() {
}

type PeerConConfig struct {
ICEServers []webrtc.ICEServer `json:"iceServers,omitempty"`
Certificates []string `json:"certificates,omitempty"`
ICEServers []webrtc.ICEServer `json:"iceServers,omitempty"`
Certificates []string `json:"certificates,omitempty"`
ICETransportPolicy webrtc.ICETransportPolicy `json:"iceTransportPolicy,omitempty"`
}

func CallPeerConAlloc(
Expand Down Expand Up @@ -78,6 +79,7 @@ func CallPeerConAlloc(

var config_parsed webrtc.Configuration
config_parsed.ICEServers = tmpConfig.ICEServers
config_parsed.ICETransportPolicy = tmpConfig.ICETransportPolicy

for _, certPem := range tmpConfig.Certificates {
cert, err := webrtc.CertificateFromPEM(certPem)
Expand Down
1 change: 1 addition & 0 deletions crates/tx5-go-pion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ categories = ["network-programming"]
[dependencies]
futures = { workspace = true }
parking_lot = { workspace = true }
serde = { workspace = true }
tokio = { workspace = true, features = [ "full" ] }
tx5-go-pion-sys = { workspace = true }
tracing = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions crates/tx5-go-pion/examples/turn-stress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ async fn main() {

let config = PeerConnectionConfig {
ice_servers: vec![ice],
..Default::default()
};

println!("running with: {config:?}");
Expand Down
39 changes: 39 additions & 0 deletions crates/tx5-go-pion/src/peer_con.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,51 @@ pub struct IceServer {
pub credential: Option<String>,
}

/// ICETransportPolicy events.
#[derive(
Debug, Clone, Copy, serde::Serialize, serde::Deserialize, PartialEq,
)]
#[serde(rename_all = "camelCase")]
pub enum ICETransportPolicy {
/// <https://pkg.go.dev/github.com/pion/webrtc/v3#ICETransportPolicy>
All = 0x00,

/// <https://pkg.go.dev/github.com/pion/webrtc/v3#ICETransportPolicy>
Relay = 0x01,
}

impl Default for ICETransportPolicy {
fn default() -> Self {
Self::All
}
}

impl ICETransportPolicy {
/// Construct from a raw integer value.
pub fn from_raw(raw: usize) -> Self {
match raw {
0x00 => ICETransportPolicy::All,
0x01 => ICETransportPolicy::Relay,
_ => panic!("invalid raw ICETransportPolicy value: {raw}"),
}
}

/// Returns true if this ICETransportPolicy instance is "All".
pub fn is_default(&self) -> bool {
*self == ICETransportPolicy::All
}
}

/// Configuration for a go pion webrtc PeerConnection.
#[derive(Default, Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(crate = "tx5_core::deps::serde", rename_all = "camelCase")]
pub struct PeerConnectionConfig {
/// ICE server list.
pub ice_servers: Vec<IceServer>,

/// ICE Transport Policy.
#[serde(default, skip_serializing_if = "ICETransportPolicy::is_default")]
pub ice_transport_policy: ICETransportPolicy,
}

impl From<PeerConnectionConfig> for GoBufRef<'static> {
Expand Down
1 change: 1 addition & 0 deletions crates/tx5/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ openssl-sys = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true, features = [ "macros", "rt", "rt-multi-thread", "sync" ] }
tracing-subscriber = { workspace = true }
tx5-go-pion-turn = { workspace = true }
tx5-signal-srv = { workspace = true }
rand = { workspace = true }
rand-utf8 = { workspace = true }
Expand Down
4 changes: 4 additions & 0 deletions crates/tx5/examples/turn_doctor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,7 @@ async fn stun_check(check: &TurnCheck) {
username: None,
credential: None,
}],
..Default::default()
};
println!("CHECK_STUN: {config:?}");
let ice = gather_ice(config).await;
Expand All @@ -421,6 +422,7 @@ async fn turn_udp_check(check: &TurnCheck) {
username: Some(check.user.clone()),
credential: Some(check.cred.clone()),
}],
..Default::default()
},
)
.await;
Expand All @@ -438,6 +440,7 @@ async fn turn_tcp_plain_check(check: &TurnCheck) {
username: Some(check.user.clone()),
credential: Some(check.cred.clone()),
}],
..Default::default()
},
)
.await;
Expand All @@ -455,6 +458,7 @@ async fn turn_tcp_tls_check(check: &TurnCheck) {
username: Some(check.user.clone()),
credential: Some(check.cred.clone()),
}],
..Default::default()
},
)
.await;
Expand Down
26 changes: 23 additions & 3 deletions crates/tx5/src/ep3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -482,9 +482,29 @@ impl Ep3 {

let sig = assert_sig(&self.ep, &sig_url).await?;

let peer = sig
.assert_peer(peer_url, peer_id, PeerDir::ActiveOrOutgoing)
.await?;
let peer = match sig
.assert_peer(
peer_url.clone(),
peer_id,
PeerDir::ActiveOrOutgoing,
IceFilter::StunOnly,
)
.await
{
Ok(peer) => peer,
Err(err) => {
tracing::error!(?err, "@@@@-test-@@@@");
eprintln!("{err:?} @@@@-test-@@@@");

sig.assert_peer(
peer_url,
peer_id,
PeerDir::ActiveOrOutgoing,
IceFilter::TurnOkay,
)
.await?
}
};

peer.send(data).await
}
Expand Down
21 changes: 20 additions & 1 deletion crates/tx5/src/ep3/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,20 @@ impl Drop for Peer {
}
}

/// Removes all iceServer urls that use the turn protocol.
/// Then removes all blocks that have no iceServer urls.
fn filter_turn(
ice: &serde_json::Value,
) -> Result<tx5_go_pion::PeerConnectionConfig> {
let mut ice: tx5_go_pion::PeerConnectionConfig =
serde_json::from_str(&serde_json::to_string(ice)?)?;
ice.ice_servers.retain_mut(|x| {
x.urls.retain_mut(|x| !x.starts_with("turn"));
!x.urls.is_empty()
});
Ok(ice)
}

impl Peer {
#[allow(clippy::too_many_arguments)]
pub async fn new(
Expand All @@ -97,6 +111,7 @@ impl Peer {
ice_servers: Arc<serde_json::Value>,
new_peer_dir: NewPeerDir,
mut peer_cmd_recv: EventRecv<PeerCmd>,
ice_filter: IceFilter,
) -> CRes<Arc<Self>> {
use influxive_otel_atomic_obs::MeterExt;
use opentelemetry_api::metrics::MeterProvider;
Expand Down Expand Up @@ -155,7 +170,11 @@ impl Peer {
Some(sig_hnd) => sig_hnd,
};

let peer_config = BackBuf::from_json(ice_servers)?;
let peer_config = if ice_filter == IceFilter::StunOnly {
BackBuf::from_json(filter_turn(&ice_servers)?)?
} else {
BackBuf::from_json(&ice_servers)?
};

let (peer, mut peer_recv) =
tx5_go_pion::PeerConnection::new(peer_config.imp.buf).await?;
Expand Down
9 changes: 9 additions & 0 deletions crates/tx5/src/ep3/sig.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
use super::*;

#[derive(Clone, Copy, Debug, PartialEq)]
pub(crate) enum IceFilter {
StunOnly,
TurnOkay,
}

type PeerCmdSend = EventSend<PeerCmd>;
type AnswerSend =
Arc<Mutex<Option<tokio::sync::oneshot::Sender<serde_json::Value>>>>;
Expand Down Expand Up @@ -125,6 +131,7 @@ impl Sig {
peer_url,
rem_pub,
PeerDir::Incoming { offer },
IceFilter::StunOnly,
)
.await;
});
Expand Down Expand Up @@ -215,6 +222,7 @@ impl Sig {
peer_url: PeerUrl,
peer_id: Id,
peer_dir: PeerDir,
ice_filter: IceFilter,
) -> CRes<Arc<Peer>> {
if peer_id == self.sig.this_id {
return Err(Error::str("Cannot establish connection with remote peer id matching this id").into());
Expand Down Expand Up @@ -281,6 +289,7 @@ impl Sig {
ice_servers,
new_peer_dir,
peer_cmd_recv,
ice_filter,
),
),
)
Expand Down
62 changes: 62 additions & 0 deletions crates/tx5/src/ep3/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,16 @@ struct Test {
sig_srv_hnd: Option<tx5_signal_srv::SrvHnd>,
sig_port: Option<u16>,
sig_url: Option<SigUrl>,
ice: tx5_go_pion::PeerConnectionConfig,
_turn: tx5_go_pion_turn::Tx5TurnServer,
}

impl Test {
pub async fn new() -> Self {
Self::with_config(false).await
}

pub async fn with_config(relay: bool) -> Self {
let subscriber = tracing_subscriber::FmtSubscriber::builder()
.with_env_filter(
tracing_subscriber::filter::EnvFilter::from_default_env(),
Expand All @@ -18,10 +24,36 @@ impl Test {

let _ = tracing::subscriber::set_global_default(subscriber);

let (ice, _turn) = tx5_go_pion_turn::test_turn_server().await.unwrap();
let mut ice: tx5_go_pion::PeerConnectionConfig =
serde_json::from_str(&format!("{{\"iceServers\":[{ice}]}}"))
.unwrap();
let url = ice.ice_servers.get(0).unwrap().urls.get(0).unwrap().clone();
let mut url = url.split("?");
let url = url.next().unwrap();
let mut url = url.split(":");
url.next().unwrap();
let a = url.next().unwrap();
let p = url.next().unwrap();
ice.ice_servers.insert(
0,
tx5_go_pion::IceServer {
urls: vec![format!("stun:{a}:{p}"), format!("stun:{a}:{p}")],
username: None,
credential: None,
},
);
if relay {
ice.ice_transport_policy = tx5_go_pion::ICETransportPolicy::Relay;
}
//println!("iceServers: {ice:#?}");

let mut this = Test {
sig_srv_hnd: None,
sig_port: None,
sig_url: None,
ice,
_turn,
};

this.restart_sig().await;
Expand Down Expand Up @@ -51,6 +83,9 @@ impl Test {
tokio::time::sleep(std::time::Duration::from_millis(20)).await;

let mut srv_config = tx5_signal_srv::Config::default();
srv_config.ice_servers =
serde_json::from_str(&serde_json::to_string(&self.ice).unwrap())
.unwrap();
srv_config.port = self.sig_port.unwrap_or(0);

let (sig_srv_hnd, addr_list, _) =
Expand All @@ -74,6 +109,33 @@ impl Test {
}
}

#[tokio::test(flavor = "multi_thread")]
async fn ep3_turn_fallback_works() {
let mut config = Config3::default();
config.timeout = std::time::Duration::from_secs(5);
let config = Arc::new(config);
let test = Test::with_config(true).await;

let (_cli_url1, ep1, _ep1_recv) = test.ep(config.clone()).await;
let (cli_url2, _ep2, mut ep2_recv) = test.ep(config).await;

ep1.send(cli_url2, b"hello").await.unwrap();

let res = ep2_recv.recv().await.unwrap();
match res {
Ep3Event::Connected { .. } => (),
_ => panic!(),
}

let res = ep2_recv.recv().await.unwrap();
match res {
Ep3Event::Message { message, .. } => {
assert_eq!(&b"hello"[..], &message);
}
oth => panic!("{oth:?}"),
}
}

#[tokio::test(flavor = "multi_thread")]
async fn ep3_sanity() {
let config = Arc::new(Config3::default());
Expand Down

0 comments on commit 79cf335

Please sign in to comment.