From 659212a28d07c275b9e962fa304ba9d8900c691b Mon Sep 17 00:00:00 2001 From: yngrtc Date: Sun, 10 Mar 2024 11:55:35 -0700 Subject: [PATCH] fix agent_test.rs (WIP) --- rtc-ice/src/agent/agent_config.rs | 4 +- rtc-ice/src/agent/agent_test.rs | 842 +++++++++++------------- rtc-ice/src/agent/mod.rs | 7 +- rtc-ice/src/candidate/candidate_pair.rs | 9 +- 4 files changed, 402 insertions(+), 460 deletions(-) diff --git a/rtc-ice/src/agent/agent_config.rs b/rtc-ice/src/agent/agent_config.rs index 6a4ac09..f26ac3e 100644 --- a/rtc-ice/src/agent/agent_config.rs +++ b/rtc-ice/src/agent/agent_config.rs @@ -39,8 +39,8 @@ pub(crate) const MAX_BINDING_REQUEST_TIMEOUT: Duration = Duration::from_millis(4 pub(crate) fn default_candidate_types() -> Vec { vec![ CandidateType::Host, - //CandidateType::ServerReflexive, - //CandidateType::Relay, + CandidateType::ServerReflexive, + CandidateType::Relay, ] } diff --git a/rtc-ice/src/agent/agent_test.rs b/rtc-ice/src/agent/agent_test.rs index 5eaa184..f1a8730 100644 --- a/rtc-ice/src/agent/agent_test.rs +++ b/rtc-ice/src/agent/agent_test.rs @@ -1,58 +1,47 @@ use std::net::Ipv4Addr; use std::ops::Sub; use std::str::FromStr; +use std::sync::Arc; -use async_trait::async_trait; use stun::message::*; use stun::textattrs::Username; -use util::vnet::*; -use util::Conn; use waitgroup::{WaitGroup, Worker}; -use super::agent_vnet_test::*; use super::*; -use crate::agent::agent_transport_test::pipe; -use crate::candidate::candidate_base::*; +use crate::attributes::{ + control::AttrControlling, priority::PriorityAttr, use_candidate::UseCandidateAttr, +}; use crate::candidate::candidate_host::*; use crate::candidate::candidate_peer_reflexive::*; +use crate::candidate::candidate_relay::CandidateRelayConfig; use crate::candidate::candidate_server_reflexive::*; -use crate::control::AttrControlling; -use crate::priority::PriorityAttr; -use crate::use_candidate::UseCandidateAttr; +use crate::candidate::*; +use crate::network_type::supported_network_types; -#[tokio::test] -async fn test_pair_search() -> Result<()> { +#[test] +fn test_pair_search() -> Result<()> { let config = AgentConfig::default(); - let a = Agent::new(config).await?; + let mut a = Agent::new(config)?; - { - { - let checklist = a.internal.agent_conn.checklist.lock().await; - assert!( - checklist.is_empty(), - "TestPairSearch is only a valid test if a.validPairs is empty on construction" - ); - } + assert!( + a.candidate_pairs.is_empty(), + "TestPairSearch is only a valid test if a.validPairs is empty on construction" + ); - let cp = a - .internal - .agent_conn - .get_best_available_candidate_pair() - .await; - assert!(cp.is_none(), "No Candidate pairs should exist"); - } + let cp = a.get_best_available_candidate_pair(); + assert!(cp.is_none(), "No Candidate pairs should exist"); - a.close().await?; + a.close()?; Ok(()) } -#[tokio::test] -async fn test_pair_priority() -> Result<()> { - let a = Agent::new(AgentConfig::default()).await?; +#[test] +fn test_pair_priority() -> Result<()> { + let mut a = Agent::new(AgentConfig::default())?; let host_config = CandidateHostConfig { - base_config: CandidateBaseConfig { + base_config: CandidateConfig { network: "udp".to_owned(), address: "192.168.1.1".to_owned(), port: 19216, @@ -61,10 +50,11 @@ async fn test_pair_priority() -> Result<()> { }, ..Default::default() }; - let host_local: Arc = Arc::new(host_config.new_candidate_host()?); + let host_local = host_config.new_candidate_host()?; + a.local_candidates.push(host_local); let relay_config = CandidateRelayConfig { - base_config: CandidateBaseConfig { + base_config: CandidateConfig { network: "udp".to_owned(), address: "1.2.3.4".to_owned(), port: 12340, @@ -79,7 +69,7 @@ async fn test_pair_priority() -> Result<()> { let relay_remote = relay_config.new_candidate_relay()?; let srflx_config = CandidateServerReflexiveConfig { - base_config: CandidateBaseConfig { + base_config: CandidateConfig { network: "udp".to_owned(), address: "10.10.10.2".to_owned(), port: 19218, @@ -93,7 +83,7 @@ async fn test_pair_priority() -> Result<()> { let srflx_remote = srflx_config.new_candidate_server_reflexive()?; let prflx_config = CandidatePeerReflexiveConfig { - base_config: CandidateBaseConfig { + base_config: CandidateConfig { network: "udp".to_owned(), address: "10.10.10.2".to_owned(), port: 19217, @@ -107,7 +97,7 @@ async fn test_pair_priority() -> Result<()> { let prflx_remote = prflx_config.new_candidate_peer_reflexive()?; let host_config = CandidateHostConfig { - base_config: CandidateBaseConfig { + base_config: CandidateConfig { network: "udp".to_owned(), address: "1.2.3.5".to_owned(), port: 12350, @@ -118,39 +108,32 @@ async fn test_pair_priority() -> Result<()> { }; let host_remote = host_config.new_candidate_host()?; - let remotes: Vec> = vec![ - Arc::new(relay_remote), - Arc::new(srflx_remote), - Arc::new(prflx_remote), - Arc::new(host_remote), - ]; + let remotes: Vec = vec![relay_remote, srflx_remote, prflx_remote, host_remote]; + for remote in remotes { + a.remote_candidates.push(remote); + } { - for remote in remotes { - if a.internal.find_pair(&host_local, &remote).await.is_none() { - a.internal - .add_pair(host_local.clone(), remote.clone()) - .await; + let local = 0; + for remote in 0..a.remote_candidates.len() { + if a.find_pair(local, remote).is_none() { + a.add_pair(local, remote); } - if let Some(p) = a.internal.find_pair(&host_local, &remote).await { - p.state - .store(CandidatePairState::Succeeded as u8, Ordering::SeqCst); + if let Some(p) = a.find_pair(local, remote) { + a.candidate_pairs[p].state = CandidatePairState::Succeeded; } - if let Some(best_pair) = a - .internal - .agent_conn - .get_best_available_candidate_pair() - .await - { + if let Some(best_pair) = a.get_best_available_candidate_pair() { assert_eq!( - best_pair.to_string(), - CandidatePair { - remote: remote.clone(), - local: host_local.clone(), - ..Default::default() - } + a.candidate_pairs[best_pair].to_string(), + CandidatePair::new( + local, + remote, + a.local_candidates[local].priority(), + a.remote_candidates[remote].priority(), + a.is_controlling, + ) .to_string(), "Unexpected bestPair {best_pair} (expected remote: {remote})", ); @@ -160,47 +143,41 @@ async fn test_pair_priority() -> Result<()> { } } - a.close().await?; + a.close()?; Ok(()) } -#[tokio::test] -async fn test_agent_get_stats() -> Result<()> { - let (conn_a, conn_b, agent_a, agent_b) = pipe(None, None).await?; - assert_eq!(agent_a.get_bytes_received(), 0); - assert_eq!(agent_a.get_bytes_sent(), 0); - assert_eq!(agent_b.get_bytes_received(), 0); - assert_eq!(agent_b.get_bytes_sent(), 0); +fn pipe( + default_config0: Option, + default_config1: Option, +) -> Result<(Agent, Agent)> { + let mut cfg0 = if let Some(cfg) = default_config0 { + cfg + } else { + AgentConfig::default() + }; + cfg0.urls = vec![]; - let _na = conn_a.send(&[0u8; 10]).await?; - let mut buf = vec![0u8; 10]; - let _nb = conn_b.recv(&mut buf).await?; + let a_agent = Agent::new(cfg0)?; - assert_eq!(agent_a.get_bytes_received(), 0); - assert_eq!(agent_a.get_bytes_sent(), 10); + let mut cfg1 = if let Some(cfg) = default_config1 { + cfg + } else { + AgentConfig::default() + }; + cfg1.urls = vec![]; - assert_eq!(agent_b.get_bytes_received(), 10); - assert_eq!(agent_b.get_bytes_sent(), 0); + let b_agent = Agent::new(cfg1)?; - Ok(()) + Ok((a_agent, b_agent)) } -#[tokio::test] -async fn test_on_selected_candidate_pair_change() -> Result<()> { - let a = Agent::new(AgentConfig::default()).await?; - let (callback_called_tx, mut callback_called_rx) = mpsc::channel::<()>(1); - let callback_called_tx = Arc::new(Mutex::new(Some(callback_called_tx))); - let cb: OnSelectedCandidatePairChangeHdlrFn = Box::new(move |_, _| { - let callback_called_tx_clone = Arc::clone(&callback_called_tx); - Box::pin(async move { - let mut tx = callback_called_tx_clone.lock().await; - tx.take(); - }) - }); - a.on_selected_candidate_pair_change(cb); +#[test] +fn test_on_selected_candidate_pair_change() -> Result<()> { + let mut a = Agent::new(AgentConfig::default())?; let host_config = CandidateHostConfig { - base_config: CandidateBaseConfig { + base_config: CandidateConfig { network: "udp".to_owned(), address: "192.168.1.1".to_owned(), port: 19216, @@ -210,9 +187,11 @@ async fn test_on_selected_candidate_pair_change() -> Result<()> { ..Default::default() }; let host_local = host_config.new_candidate_host()?; + let local_priority = host_local.priority(); + a.add_local_candidate(host_local)?; let relay_config = CandidateRelayConfig { - base_config: CandidateBaseConfig { + base_config: CandidateConfig { network: "udp".to_owned(), address: "1.2.3.4".to_owned(), port: 12340, @@ -224,28 +203,34 @@ async fn test_on_selected_candidate_pair_change() -> Result<()> { ..Default::default() }; let relay_remote = relay_config.new_candidate_relay()?; + let remote_priority = relay_remote.priority(); + a.add_remote_candidate(relay_remote)?; // select the pair - let p = Arc::new(CandidatePair::new( - Arc::new(host_local), - Arc::new(relay_remote), - false, - )); - a.internal.set_selected_pair(Some(p)).await; + let (local, remote) = (0, 0); + a.add_pair(local, remote); + a.set_selected_pair(Some(0)); // ensure that the callback fired on setting the pair - let _ = callback_called_rx.recv().await; + let mut is_selected_candidate_pair_change_event_fired = false; + while let Some(event) = a.poll_event() { + if let Event::SelectedCandidatePairChange(_, _) = event { + is_selected_candidate_pair_change_event_fired = true; + } + } + + assert!(is_selected_candidate_pair_change_event_fired); - a.close().await?; + a.close()?; Ok(()) } - -#[tokio::test] -async fn test_handle_peer_reflexive_udp_pflx_candidate() -> Result<()> { - let a = Agent::new(AgentConfig::default()).await?; +/* TODO: +#[test] +fn test_handle_peer_reflexive_udp_pflx_candidate() -> Result<()> { + let a = Agent::new(AgentConfig::default())?; let host_config = CandidateHostConfig { - base_config: CandidateBaseConfig { + base_config: CandidateConfig { network: "udp".to_owned(), address: "192.168.0.2".to_owned(), port: 777, @@ -260,7 +245,7 @@ async fn test_handle_peer_reflexive_udp_pflx_candidate() -> Result<()> { let remote = SocketAddr::from_str("172.17.0.3:999")?; let (username, local_pwd, tie_breaker) = { - let ufrag_pwd = a.internal.ufrag_pwd.lock().await; + let ufrag_pwd = a.internal.ufrag_pwd.lock(); ( ufrag_pwd.local_ufrag.to_owned() + ":" + ufrag_pwd.remote_ufrag.as_str(), ufrag_pwd.local_pwd.clone(), @@ -281,9 +266,9 @@ async fn test_handle_peer_reflexive_udp_pflx_candidate() -> Result<()> { ])?; { - a.internal.handle_inbound(&mut msg, &local, remote).await; + a.internal.handle_inbound(&mut msg, &local, remote); - let remote_candidates = a.internal.remote_candidates.lock().await; + let remote_candidates = a.internal.remote_candidates.lock(); // length of remote candidate list must be one now assert_eq!( remote_candidates.len(), @@ -318,20 +303,20 @@ async fn test_handle_peer_reflexive_udp_pflx_candidate() -> Result<()> { } } - a.close().await?; + a.close()?; Ok(()) } -#[tokio::test] -async fn test_handle_peer_reflexive_unknown_remote() -> Result<()> { - let a = Agent::new(AgentConfig::default()).await?; +#[test] +fn test_handle_peer_reflexive_unknown_remote() -> Result<()> { + let a = Agent::new(AgentConfig::default())?; let mut tid = TransactionId::default(); tid.0[..3].copy_from_slice("ABC".as_bytes()); let remote_pwd = { { - let mut pending_binding_requests = a.internal.pending_binding_requests.lock().await; + let mut pending_binding_requests = a.internal.pending_binding_requests.lock(); *pending_binding_requests = vec![BindingRequest { timestamp: Instant::now(), transaction_id: tid, @@ -339,12 +324,12 @@ async fn test_handle_peer_reflexive_unknown_remote() -> Result<()> { is_use_candidate: false, }]; } - let ufrag_pwd = a.internal.ufrag_pwd.lock().await; + let ufrag_pwd = a.internal.ufrag_pwd.lock(); ufrag_pwd.remote_pwd.clone() }; let host_config = CandidateHostConfig { - base_config: CandidateBaseConfig { + base_config: CandidateConfig { network: "udp".to_owned(), address: "192.168.0.2".to_owned(), port: 777, @@ -367,9 +352,9 @@ async fn test_handle_peer_reflexive_unknown_remote() -> Result<()> { ])?; { - a.internal.handle_inbound(&mut msg, &local, remote).await; + a.internal.handle_inbound(&mut msg, &local, remote); - let remote_candidates = a.internal.remote_candidates.lock().await; + let remote_candidates = a.internal.remote_candidates.lock(); assert_eq!( remote_candidates.len(), 0, @@ -377,15 +362,15 @@ async fn test_handle_peer_reflexive_unknown_remote() -> Result<()> { ); } - a.close().await?; + a.close()?; Ok(()) } //use std::io::Write; // Assert that Agent on startup sends message, and doesn't wait for connectivityTicker to fire -#[tokio::test] -async fn test_connectivity_on_startup() -> Result<()> { +#[test] +fn test_connectivity_on_startup() -> Result<()> { /*env_logger::Builder::new() .format(|buf, record| { writeln!( @@ -416,9 +401,9 @@ async fn test_connectivity_on_startup() -> Result<()> { ..Default::default() }))); - connect_net2router(&net0, &wan).await?; - connect_net2router(&net1, &wan).await?; - start_router(&wan).await?; + connect_net2router(&net0, &wan)?; + connect_net2router(&net1, &wan)?; + start_router(&wan)?; let (a_notifier, mut a_connected) = on_connected(); let (b_notifier, mut b_connected) = on_connected(); @@ -434,7 +419,7 @@ async fn test_connectivity_on_startup() -> Result<()> { ..Default::default() }; - let a_agent = Arc::new(Agent::new(cfg0).await?); + let a_agent = Arc::new(Agent::new(cfg0)?); a_agent.on_connection_state_change(a_notifier); let cfg1 = AgentConfig { @@ -446,14 +431,14 @@ async fn test_connectivity_on_startup() -> Result<()> { ..Default::default() }; - let b_agent = Arc::new(Agent::new(cfg1).await?); + let b_agent = Arc::new(Agent::new(cfg1)?); b_agent.on_connection_state_change(b_notifier); // Manual signaling - let (a_ufrag, a_pwd) = a_agent.get_local_user_credentials().await; - let (b_ufrag, b_pwd) = b_agent.get_local_user_credentials().await; + let (a_ufrag, a_pwd) = a_agent.get_local_user_credentials(); + let (b_ufrag, b_pwd) = b_agent.get_local_user_credentials(); - gather_and_exchange_candidates(&a_agent, &b_agent).await?; + gather_and_exchange_candidates(&a_agent, &b_agent)?; let (accepted_tx, mut accepted_rx) = mpsc::channel::<()>(1); let (accepting_tx, mut accepting_rx) = mpsc::channel::<()>(1); @@ -465,40 +450,40 @@ async fn test_connectivity_on_startup() -> Result<()> { let accepted_tx_clone = Arc::clone(&accepting_tx); Box::pin(async move { if s == ConnectionState::Checking { - let mut tx = accepted_tx_clone.lock().await; + let mut tx = accepted_tx_clone.lock(); tx.take(); } }) })); tokio::spawn(async move { - let result = a_agent.accept(a_cancel_rx, b_ufrag, b_pwd).await; + let result = a_agent.accept(a_cancel_rx, b_ufrag, b_pwd); assert!(result.is_ok(), "agent accept expected OK"); drop(accepted_tx); }); - let _ = accepting_rx.recv().await; + let _ = accepting_rx.recv(); - let _ = b_agent.dial(b_cancel_rx, a_ufrag, a_pwd).await?; + let _ = b_agent.dial(b_cancel_rx, a_ufrag, a_pwd)?; // Ensure accepted - let _ = accepted_rx.recv().await; + let _ = accepted_rx.recv(); // Ensure pair selected // Note: this assumes ConnectionStateConnected is thrown after selecting the final pair - let _ = a_connected.recv().await; - let _ = b_connected.recv().await; + let _ = a_connected.recv(); + let _ = b_connected.recv(); { - let mut w = wan.lock().await; - w.stop().await?; + let mut w = wan.lock(); + w.stop()?; } Ok(()) } -#[tokio::test] -async fn test_connectivity_lite() -> Result<()> { +#[test] +fn test_connectivity_lite() -> Result<()> { /*env_logger::Builder::new() .format(|buf, record| { writeln!( @@ -528,7 +513,7 @@ async fn test_connectivity_lite() -> Result<()> { ..Default::default() }; - let v = build_vnet(nat_type, nat_type).await?; + let v = build_vnet(nat_type, nat_type)?; let (a_notifier, mut a_connected) = on_connected(); let (b_notifier, mut b_connected) = on_connected(); @@ -540,7 +525,7 @@ async fn test_connectivity_lite() -> Result<()> { ..Default::default() }; - let a_agent = Arc::new(Agent::new(cfg0).await?); + let a_agent = Arc::new(Agent::new(cfg0)?); a_agent.on_connection_state_change(a_notifier); let cfg1 = AgentConfig { @@ -552,17 +537,17 @@ async fn test_connectivity_lite() -> Result<()> { ..Default::default() }; - let b_agent = Arc::new(Agent::new(cfg1).await?); + let b_agent = Arc::new(Agent::new(cfg1)?); b_agent.on_connection_state_change(b_notifier); - let _ = connect_with_vnet(&a_agent, &b_agent).await?; + let _ = connect_with_vnet(&a_agent, &b_agent)?; // Ensure pair selected // Note: this assumes ConnectionStateConnected is thrown after selecting the final pair - let _ = a_connected.recv().await; - let _ = b_connected.recv().await; + let _ = a_connected.recv(); + let _ = b_connected.recv(); - v.close().await?; + v.close()?; Ok(()) } @@ -571,30 +556,23 @@ struct MockPacketConn; #[async_trait] impl Conn for MockPacketConn { - async fn connect(&self, _addr: SocketAddr) -> std::result::Result<(), util::Error> { + fn connect(&self, _addr: SocketAddr) -> std::result::Result<(), util::Error> { Ok(()) } - async fn recv(&self, _buf: &mut [u8]) -> std::result::Result { + fn recv(&self, _buf: &mut [u8]) -> std::result::Result { Ok(0) } - async fn recv_from( - &self, - _buf: &mut [u8], - ) -> std::result::Result<(usize, SocketAddr), util::Error> { + fn recv_from(&self, _buf: &mut [u8]) -> std::result::Result<(usize, SocketAddr), util::Error> { Ok((0, SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), 0))) } - async fn send(&self, _buf: &[u8]) -> std::result::Result { + fn send(&self, _buf: &[u8]) -> std::result::Result { Ok(0) } - async fn send_to( - &self, - _buf: &[u8], - _target: SocketAddr, - ) -> std::result::Result { + fn send_to(&self, _buf: &[u8], _target: SocketAddr) -> std::result::Result { Ok(0) } @@ -606,7 +584,7 @@ impl Conn for MockPacketConn { None } - async fn close(&self) -> std::result::Result<(), util::Error> { + fn close(&self) -> std::result::Result<(), util::Error> { Ok(()) } } @@ -623,8 +601,8 @@ fn build_msg(c: MessageClass, username: String, key: String) -> Result Ok(msg) } -#[tokio::test] -async fn test_inbound_validity() -> Result<()> { +#[test] +fn test_inbound_validity() -> Result<()> { /*env_logger::Builder::new() .format(|buf, record| { writeln!( @@ -643,7 +621,7 @@ async fn test_inbound_validity() -> Result<()> { let remote = SocketAddr::from_str("172.17.0.3:999")?; let local: Arc = Arc::new( CandidateHostConfig { - base_config: CandidateBaseConfig { + base_config: CandidateConfig { network: "udp".to_owned(), address: "192.168.0.2".to_owned(), port: 777, @@ -658,22 +636,20 @@ async fn test_inbound_validity() -> Result<()> { //"Invalid Binding requests should be discarded" { - let a = Agent::new(AgentConfig::default()).await?; + let a = Agent::new(AgentConfig::default())?; { let local_pwd = { - let ufrag_pwd = a.internal.ufrag_pwd.lock().await; + let ufrag_pwd = a.internal.ufrag_pwd.lock(); ufrag_pwd.local_pwd.clone() }; - a.internal - .handle_inbound( - &mut build_msg(CLASS_REQUEST, "invalid".to_owned(), local_pwd)?, - &local, - remote, - ) - .await; + a.internal.handle_inbound( + &mut build_msg(CLASS_REQUEST, "invalid".to_owned(), local_pwd)?, + &local, + remote, + ); { - let remote_candidates = a.internal.remote_candidates.lock().await; + let remote_candidates = a.internal.remote_candidates.lock(); assert_ne!( remote_candidates.len(), 1, @@ -682,18 +658,16 @@ async fn test_inbound_validity() -> Result<()> { } let username = { - let ufrag_pwd = a.internal.ufrag_pwd.lock().await; + let ufrag_pwd = a.internal.ufrag_pwd.lock(); format!("{}:{}", ufrag_pwd.local_ufrag, ufrag_pwd.remote_ufrag) }; - a.internal - .handle_inbound( - &mut build_msg(CLASS_REQUEST, username, "Invalid".to_owned())?, - &local, - remote, - ) - .await; + a.internal.handle_inbound( + &mut build_msg(CLASS_REQUEST, username, "Invalid".to_owned())?, + &local, + remote, + ); { - let remote_candidates = a.internal.remote_candidates.lock().await; + let remote_candidates = a.internal.remote_candidates.lock(); assert_ne!( remote_candidates.len(), 1, @@ -702,27 +676,25 @@ async fn test_inbound_validity() -> Result<()> { } } - a.close().await?; + a.close()?; } //"Invalid Binding success responses should be discarded" { - let a = Agent::new(AgentConfig::default()).await?; + let a = Agent::new(AgentConfig::default())?; { let username = { - let ufrag_pwd = a.internal.ufrag_pwd.lock().await; + let ufrag_pwd = a.internal.ufrag_pwd.lock(); format!("{}:{}", ufrag_pwd.local_ufrag, ufrag_pwd.remote_ufrag) }; - a.internal - .handle_inbound( - &mut build_msg(CLASS_SUCCESS_RESPONSE, username, "Invalid".to_owned())?, - &local, - remote, - ) - .await; + a.internal.handle_inbound( + &mut build_msg(CLASS_SUCCESS_RESPONSE, username, "Invalid".to_owned())?, + &local, + remote, + ); { - let remote_candidates = a.internal.remote_candidates.lock().await; + let remote_candidates = a.internal.remote_candidates.lock(); assert_ne!( remote_candidates.len(), 1, @@ -731,26 +703,24 @@ async fn test_inbound_validity() -> Result<()> { } } - a.close().await?; + a.close()?; } //"Discard non-binding messages" { - let a = Agent::new(AgentConfig::default()).await?; + let a = Agent::new(AgentConfig::default())?; { let username = { - let ufrag_pwd = a.internal.ufrag_pwd.lock().await; + let ufrag_pwd = a.internal.ufrag_pwd.lock(); format!("{}:{}", ufrag_pwd.local_ufrag, ufrag_pwd.remote_ufrag) }; - a.internal - .handle_inbound( - &mut build_msg(CLASS_ERROR_RESPONSE, username, "Invalid".to_owned())?, - &local, - remote, - ) - .await; - let remote_candidates = a.internal.remote_candidates.lock().await; + a.internal.handle_inbound( + &mut build_msg(CLASS_ERROR_RESPONSE, username, "Invalid".to_owned())?, + &local, + remote, + ); + let remote_candidates = a.internal.remote_candidates.lock(); assert_ne!( remote_candidates.len(), 1, @@ -758,29 +728,27 @@ async fn test_inbound_validity() -> Result<()> { ); } - a.close().await?; + a.close()?; } //"Valid bind request" { - let a = Agent::new(AgentConfig::default()).await?; + let a = Agent::new(AgentConfig::default())?; { let (username, local_pwd) = { - let ufrag_pwd = a.internal.ufrag_pwd.lock().await; + let ufrag_pwd = a.internal.ufrag_pwd.lock(); ( format!("{}:{}", ufrag_pwd.local_ufrag, ufrag_pwd.remote_ufrag), ufrag_pwd.local_pwd.clone(), ) }; - a.internal - .handle_inbound( - &mut build_msg(CLASS_REQUEST, username, local_pwd)?, - &local, - remote, - ) - .await; - let remote_candidates = a.internal.remote_candidates.lock().await; + a.internal.handle_inbound( + &mut build_msg(CLASS_REQUEST, username, local_pwd)?, + &local, + remote, + ); + let remote_candidates = a.internal.remote_candidates.lock(); assert_eq!( remote_candidates.len(), 1, @@ -788,16 +756,16 @@ async fn test_inbound_validity() -> Result<()> { ); } - a.close().await?; + a.close()?; } //"Valid bind without fingerprint" { - let a = Agent::new(AgentConfig::default()).await?; + let a = Agent::new(AgentConfig::default())?; { let (username, local_pwd) = { - let ufrag_pwd = a.internal.ufrag_pwd.lock().await; + let ufrag_pwd = a.internal.ufrag_pwd.lock(); ( format!("{}:{}", ufrag_pwd.local_ufrag, ufrag_pwd.remote_ufrag), ufrag_pwd.local_pwd.clone(), @@ -812,8 +780,8 @@ async fn test_inbound_validity() -> Result<()> { Box::new(MessageIntegrity::new_short_term_integrity(local_pwd)), ])?; - a.internal.handle_inbound(&mut msg, &local, remote).await; - let remote_candidates = a.internal.remote_candidates.lock().await; + a.internal.handle_inbound(&mut msg, &local, remote); + let remote_candidates = a.internal.remote_candidates.lock(); assert_eq!( remote_candidates.len(), 1, @@ -821,12 +789,12 @@ async fn test_inbound_validity() -> Result<()> { ); } - a.close().await?; + a.close()?; } //"Success with invalid TransactionID" { - let a = Agent::new(AgentConfig::default()).await?; + let a = Agent::new(AgentConfig::default())?; { let remote = SocketAddr::from_str("172.17.0.3:999")?; @@ -835,7 +803,7 @@ async fn test_inbound_validity() -> Result<()> { t_id.0[..3].copy_from_slice(b"ABC"); let remote_pwd = { - let ufrag_pwd = a.internal.ufrag_pwd.lock().await; + let ufrag_pwd = a.internal.ufrag_pwd.lock(); ufrag_pwd.remote_pwd.clone() }; @@ -847,10 +815,10 @@ async fn test_inbound_validity() -> Result<()> { Box::new(FINGERPRINT), ])?; - a.internal.handle_inbound(&mut msg, &local, remote).await; + a.internal.handle_inbound(&mut msg, &local, remote); { - let remote_candidates = a.internal.remote_candidates.lock().await; + let remote_candidates = a.internal.remote_candidates.lock(); assert_eq!( remote_candidates.len(), 0, @@ -859,25 +827,25 @@ async fn test_inbound_validity() -> Result<()> { } } - a.close().await?; + a.close()?; } Ok(()) } -#[tokio::test] -async fn test_invalid_agent_starts() -> Result<()> { - let a = Agent::new(AgentConfig::default()).await?; +#[test] +fn test_invalid_agent_starts() -> Result<()> { + let a = Agent::new(AgentConfig::default())?; let (_cancel_tx1, cancel_rx1) = mpsc::channel(1); - let result = a.dial(cancel_rx1, "".to_owned(), "bar".to_owned()).await; + let result = a.dial(cancel_rx1, "".to_owned(), "bar".to_owned()); assert!(result.is_err()); if let Err(err) = result { assert_eq!(Error::ErrRemoteUfragEmpty, err); } let (_cancel_tx2, cancel_rx2) = mpsc::channel(1); - let result = a.dial(cancel_rx2, "foo".to_owned(), "".to_owned()).await; + let result = a.dial(cancel_rx2, "foo".to_owned(), "".to_owned()); assert!(result.is_err()); if let Err(err) = result { assert_eq!(Error::ErrRemotePwdEmpty, err); @@ -885,24 +853,24 @@ async fn test_invalid_agent_starts() -> Result<()> { let (cancel_tx3, cancel_rx3) = mpsc::channel(1); tokio::spawn(async move { - tokio::time::sleep(Duration::from_millis(100)).await; + tokio::time::sleep(Duration::from_millis(100)); drop(cancel_tx3); }); - let result = a.dial(cancel_rx3, "foo".to_owned(), "bar".to_owned()).await; + let result = a.dial(cancel_rx3, "foo".to_owned(), "bar".to_owned()); assert!(result.is_err()); if let Err(err) = result { assert_eq!(Error::ErrCanceledByCaller, err); } let (_cancel_tx4, cancel_rx4) = mpsc::channel(1); - let result = a.dial(cancel_rx4, "foo".to_owned(), "bar".to_owned()).await; + let result = a.dial(cancel_rx4, "foo".to_owned(), "bar".to_owned()); assert!(result.is_err()); if let Err(err) = result { assert_eq!(Error::ErrMultipleStart, err); } - a.close().await?; + a.close()?; Ok(()) } @@ -910,8 +878,8 @@ async fn test_invalid_agent_starts() -> Result<()> { //use std::io::Write; // Assert that Agent emits Connecting/Connected/Disconnected/Failed/Closed messages -#[tokio::test] -async fn test_connection_state_callback() -> Result<()> { +#[test] +fn test_connection_state_callback() -> Result<()> { /*env_logger::Builder::new() .format(|buf, record| { writeln!( @@ -949,8 +917,8 @@ async fn test_connection_state_callback() -> Result<()> { ..Default::default() }; - let a_agent = Arc::new(Agent::new(cfg0).await?); - let b_agent = Arc::new(Agent::new(cfg1).await?); + let a_agent = Arc::new(Agent::new(cfg0)?); + let b_agent = Arc::new(Agent::new(cfg1)?); let (is_checking_tx, mut is_checking_rx) = mpsc::channel::<()>(1); let (is_connected_tx, mut is_connected_rx) = mpsc::channel::<()>(1); @@ -974,27 +942,27 @@ async fn test_connection_state_callback() -> Result<()> { match c { ConnectionState::Checking => { debug!("drop is_checking_tx"); - let mut tx = is_checking_tx_clone.lock().await; + let mut tx = is_checking_tx_clone.lock(); tx.take(); } ConnectionState::Connected => { debug!("drop is_connected_tx"); - let mut tx = is_connected_tx_clone.lock().await; + let mut tx = is_connected_tx_clone.lock(); tx.take(); } ConnectionState::Disconnected => { debug!("drop is_disconnected_tx"); - let mut tx = is_disconnected_tx_clone.lock().await; + let mut tx = is_disconnected_tx_clone.lock(); tx.take(); } ConnectionState::Failed => { debug!("drop is_failed_tx"); - let mut tx = is_failed_tx_clone.lock().await; + let mut tx = is_failed_tx_clone.lock(); tx.take(); } ConnectionState::Closed => { debug!("drop is_closed_tx"); - let mut tx = is_closed_tx_clone.lock().await; + let mut tx = is_closed_tx_clone.lock(); tx.take(); } _ => {} @@ -1002,30 +970,30 @@ async fn test_connection_state_callback() -> Result<()> { }) })); - connect_with_vnet(&a_agent, &b_agent).await?; + connect_with_vnet(&a_agent, &b_agent)?; debug!("wait is_checking_tx"); - let _ = is_checking_rx.recv().await; + let _ = is_checking_rx.recv(); debug!("wait is_connected_rx"); - let _ = is_connected_rx.recv().await; + let _ = is_connected_rx.recv(); debug!("wait is_disconnected_rx"); - let _ = is_disconnected_rx.recv().await; + let _ = is_disconnected_rx.recv(); debug!("wait is_failed_rx"); - let _ = is_failed_rx.recv().await; + let _ = is_failed_rx.recv(); - a_agent.close().await?; - b_agent.close().await?; + a_agent.close()?; + b_agent.close()?; debug!("wait is_closed_rx"); - let _ = is_closed_rx.recv().await; + let _ = is_closed_rx.recv(); Ok(()) } -#[tokio::test] -async fn test_invalid_gather() -> Result<()> { +#[test] +fn test_invalid_gather() -> Result<()> { //"Gather with no OnCandidate should error" - let a = Agent::new(AgentConfig::default()).await?; + let a = Agent::new(AgentConfig::default())?; if let Err(err) = a.gather_candidates() { assert_eq!( @@ -1035,18 +1003,18 @@ async fn test_invalid_gather() -> Result<()> { ); } - a.close().await?; + a.close()?; Ok(()) } -#[tokio::test] -async fn test_candidate_pair_stats() -> Result<()> { - let a = Agent::new(AgentConfig::default()).await?; +#[test] +fn test_candidate_pair_stats() -> Result<()> { + let a = Agent::new(AgentConfig::default())?; let host_local: Arc = Arc::new( CandidateHostConfig { - base_config: CandidateBaseConfig { + base_config: CandidateConfig { network: "udp".to_owned(), address: "192.168.1.1".to_owned(), port: 19216, @@ -1060,7 +1028,7 @@ async fn test_candidate_pair_stats() -> Result<()> { let relay_remote: Arc = Arc::new( CandidateRelayConfig { - base_config: CandidateBaseConfig { + base_config: CandidateConfig { network: "udp".to_owned(), address: "1.2.3.4".to_owned(), port: 2340, @@ -1076,7 +1044,7 @@ async fn test_candidate_pair_stats() -> Result<()> { let srflx_remote: Arc = Arc::new( CandidateServerReflexiveConfig { - base_config: CandidateBaseConfig { + base_config: CandidateConfig { network: "udp".to_owned(), address: "10.10.10.2".to_owned(), port: 19218, @@ -1091,7 +1059,7 @@ async fn test_candidate_pair_stats() -> Result<()> { let prflx_remote: Arc = Arc::new( CandidatePeerReflexiveConfig { - base_config: CandidateBaseConfig { + base_config: CandidateConfig { network: "udp".to_owned(), address: "10.10.10.2".to_owned(), port: 19217, @@ -1106,7 +1074,7 @@ async fn test_candidate_pair_stats() -> Result<()> { let host_remote: Arc = Arc::new( CandidateHostConfig { - base_config: CandidateBaseConfig { + base_config: CandidateConfig { network: "udp".to_owned(), address: "1.2.3.5".to_owned(), port: 12350, @@ -1124,23 +1092,22 @@ async fn test_candidate_pair_stats() -> Result<()> { Arc::clone(&prflx_remote), Arc::clone(&host_remote), ] { - let p = a.internal.find_pair(&host_local, remote).await; + let p = a.internal.find_pair(&host_local, remote); if p.is_none() { a.internal - .add_pair(Arc::clone(&host_local), Arc::clone(remote)) - .await; + .add_pair(Arc::clone(&host_local), Arc::clone(remote)); } } { - if let Some(p) = a.internal.find_pair(&host_local, &prflx_remote).await { + if let Some(p) = a.internal.find_pair(&host_local, &prflx_remote) { p.state .store(CandidatePairState::Failed as u8, Ordering::SeqCst); } } - let stats = a.get_candidate_pairs_stats().await; + let stats = a.get_candidate_pairs_stats(); assert_eq!(stats.len(), 4, "expected 4 candidate pairs stats"); let (mut relay_pair_stat, mut srflx_pair_stat, mut prflx_pair_stat, mut host_pair_stat) = ( @@ -1197,18 +1164,18 @@ async fn test_candidate_pair_stats() -> Result<()> { prflx_pair_stat.state ); - a.close().await?; + a.close()?; Ok(()) } -#[tokio::test] -async fn test_local_candidate_stats() -> Result<()> { - let a = Agent::new(AgentConfig::default()).await?; +#[test] +fn test_local_candidate_stats() -> Result<()> { + let a = Agent::new(AgentConfig::default())?; let host_local: Arc = Arc::new( CandidateHostConfig { - base_config: CandidateBaseConfig { + base_config: CandidateConfig { network: "udp".to_owned(), address: "192.168.1.1".to_owned(), port: 19216, @@ -1222,7 +1189,7 @@ async fn test_local_candidate_stats() -> Result<()> { let srflx_local: Arc = Arc::new( CandidateServerReflexiveConfig { - base_config: CandidateBaseConfig { + base_config: CandidateConfig { network: "udp".to_owned(), address: "192.168.1.1".to_owned(), port: 19217, @@ -1236,14 +1203,14 @@ async fn test_local_candidate_stats() -> Result<()> { ); { - let mut local_candidates = a.internal.local_candidates.lock().await; + let mut local_candidates = a.internal.local_candidates.lock(); local_candidates.insert( NetworkType::Udp4, vec![Arc::clone(&host_local), Arc::clone(&srflx_local)], ); } - let local_stats = a.get_local_candidates_stats().await; + let local_stats = a.get_local_candidates_stats(); assert_eq!( local_stats.len(), 2, @@ -1288,18 +1255,18 @@ async fn test_local_candidate_stats() -> Result<()> { "missing srflx local stat" ); - a.close().await?; + a.close()?; Ok(()) } -#[tokio::test] -async fn test_remote_candidate_stats() -> Result<()> { - let a = Agent::new(AgentConfig::default()).await?; +#[test] +fn test_remote_candidate_stats() -> Result<()> { + let a = Agent::new(AgentConfig::default())?; let relay_remote: Arc = Arc::new( CandidateRelayConfig { - base_config: CandidateBaseConfig { + base_config: CandidateConfig { network: "udp".to_owned(), address: "1.2.3.4".to_owned(), port: 12340, @@ -1315,7 +1282,7 @@ async fn test_remote_candidate_stats() -> Result<()> { let srflx_remote: Arc = Arc::new( CandidateServerReflexiveConfig { - base_config: CandidateBaseConfig { + base_config: CandidateConfig { network: "udp".to_owned(), address: "10.10.10.2".to_owned(), port: 19218, @@ -1330,7 +1297,7 @@ async fn test_remote_candidate_stats() -> Result<()> { let prflx_remote: Arc = Arc::new( CandidatePeerReflexiveConfig { - base_config: CandidateBaseConfig { + base_config: CandidateConfig { network: "udp".to_owned(), address: "10.10.10.2".to_owned(), port: 19217, @@ -1345,7 +1312,7 @@ async fn test_remote_candidate_stats() -> Result<()> { let host_remote: Arc = Arc::new( CandidateHostConfig { - base_config: CandidateBaseConfig { + base_config: CandidateConfig { network: "udp".to_owned(), address: "1.2.3.5".to_owned(), port: 12350, @@ -1358,7 +1325,7 @@ async fn test_remote_candidate_stats() -> Result<()> { ); { - let mut remote_candidates = a.internal.remote_candidates.lock().await; + let mut remote_candidates = a.internal.remote_candidates.lock(); remote_candidates.insert( NetworkType::Udp4, vec![ @@ -1370,7 +1337,7 @@ async fn test_remote_candidate_stats() -> Result<()> { ); } - let remote_stats = a.get_remote_candidates_stats().await; + let remote_stats = a.get_remote_candidates_stats(); assert_eq!( remote_stats.len(), 4, @@ -1435,21 +1402,21 @@ async fn test_remote_candidate_stats() -> Result<()> { "missing host remote stat" ); - a.close().await?; + a.close()?; Ok(()) } -#[tokio::test] -async fn test_binding_request_timeout() -> Result<()> { +#[test] +fn test_binding_request_timeout() -> Result<()> { const EXPECTED_REMOVAL_COUNT: usize = 2; - let a = Agent::new(AgentConfig::default()).await?; + let a = Agent::new(AgentConfig::default())?; let now = Instant::now(); { { - let mut pending_binding_requests = a.internal.pending_binding_requests.lock().await; + let mut pending_binding_requests = a.internal.pending_binding_requests.lock(); pending_binding_requests.push(BindingRequest { timestamp: now, // valid ..Default::default() @@ -1468,32 +1435,32 @@ async fn test_binding_request_timeout() -> Result<()> { }); } - a.internal.invalidate_pending_binding_requests(now).await; + a.internal.invalidate_pending_binding_requests(now); { - let pending_binding_requests = a.internal.pending_binding_requests.lock().await; + let pending_binding_requests = a.internal.pending_binding_requests.lock(); assert_eq!(pending_binding_requests.len(), EXPECTED_REMOVAL_COUNT, "Binding invalidation due to timeout did not remove the correct number of binding requests") } } - a.close().await?; + a.close()?; Ok(()) } // test_agent_credentials checks if local username fragments and passwords (if set) meet RFC standard // and ensure it's backwards compatible with previous versions of the pion/ice -#[tokio::test] -async fn test_agent_credentials() -> Result<()> { +#[test] +fn test_agent_credentials() -> Result<()> { // Agent should not require any of the usernames and password to be set // If set, they should follow the default 16/128 bits random number generator strategy - let a = Agent::new(AgentConfig::default()).await?; + let a = Agent::new(AgentConfig::default())?; { - let ufrag_pwd = a.internal.ufrag_pwd.lock().await; + let ufrag_pwd = a.internal.ufrag_pwd.lock(); assert!(ufrag_pwd.local_ufrag.as_bytes().len() * 8 >= 24); assert!(ufrag_pwd.local_pwd.as_bytes().len() * 8 >= 128); } - a.close().await?; + a.close()?; // Should honor RFC standards // Local values MUST be unguessable, with at least 128 bits of @@ -1503,9 +1470,7 @@ async fn test_agent_credentials() -> Result<()> { if let Err(err) = Agent::new(AgentConfig { local_ufrag: "xx".to_owned(), ..Default::default() - }) - .await - { + }) { assert_eq!(Error::ErrLocalUfragInsufficientBits, err); } else { panic!("expected error, but got ok"); @@ -1514,9 +1479,7 @@ async fn test_agent_credentials() -> Result<()> { if let Err(err) = Agent::new(AgentConfig { local_pwd: "xxxxxx".to_owned(), ..Default::default() - }) - .await - { + }) { assert_eq!(Error::ErrLocalPwdInsufficientBits, err); } else { panic!("expected error, but got ok"); @@ -1527,8 +1490,8 @@ async fn test_agent_credentials() -> Result<()> { // Assert that Agent on Failure deletes all existing candidates // User can then do an ICE Restart to bring agent back -#[tokio::test] -async fn test_connection_state_failed_delete_all_candidates() -> Result<()> { +#[test] +fn test_connection_state_failed_delete_all_candidates() -> Result<()> { let one_second = Duration::from_secs(1); let keepalive_interval = Duration::from_secs(0); @@ -1547,8 +1510,8 @@ async fn test_connection_state_failed_delete_all_candidates() -> Result<()> { ..Default::default() }; - let a_agent = Arc::new(Agent::new(cfg0).await?); - let b_agent = Arc::new(Agent::new(cfg1).await?); + let a_agent = Arc::new(Agent::new(cfg0)?); + let b_agent = Arc::new(Agent::new(cfg1)?); let (is_failed_tx, mut is_failed_rx) = mpsc::channel::<()>(1); let is_failed_tx = Arc::new(Mutex::new(Some(is_failed_tx))); @@ -1556,35 +1519,35 @@ async fn test_connection_state_failed_delete_all_candidates() -> Result<()> { let is_failed_tx_clone = Arc::clone(&is_failed_tx); Box::pin(async move { if c == ConnectionState::Failed { - let mut tx = is_failed_tx_clone.lock().await; + let mut tx = is_failed_tx_clone.lock(); tx.take(); } }) })); - connect_with_vnet(&a_agent, &b_agent).await?; - let _ = is_failed_rx.recv().await; + connect_with_vnet(&a_agent, &b_agent)?; + let _ = is_failed_rx.recv(); { { - let remote_candidates = a_agent.internal.remote_candidates.lock().await; + let remote_candidates = a_agent.internal.remote_candidates.lock(); assert_eq!(remote_candidates.len(), 0); } { - let local_candidates = a_agent.internal.local_candidates.lock().await; + let local_candidates = a_agent.internal.local_candidates.lock(); assert_eq!(local_candidates.len(), 0); } } - a_agent.close().await?; - b_agent.close().await?; + a_agent.close()?; + b_agent.close()?; Ok(()) } // Assert that the ICE Agent can go directly from Connecting -> Failed on both sides -#[tokio::test] -async fn test_connection_state_connecting_to_failed() -> Result<()> { +#[test] +fn test_connection_state_connecting_to_failed() -> Result<()> { let one_second = Duration::from_secs(1); let keepalive_interval = Duration::from_secs(0); @@ -1601,8 +1564,8 @@ async fn test_connection_state_connecting_to_failed() -> Result<()> { ..Default::default() }; - let a_agent = Arc::new(Agent::new(cfg0).await?); - let b_agent = Arc::new(Agent::new(cfg1).await?); + let a_agent = Arc::new(Agent::new(cfg0)?); + let b_agent = Arc::new(Agent::new(cfg1)?); let is_failed = WaitGroup::new(); let is_checking = WaitGroup::new(); @@ -1615,10 +1578,10 @@ async fn test_connection_state_connecting_to_failed() -> Result<()> { let wc_clone = Arc::clone(&wc); Box::pin(async move { if c == ConnectionState::Failed { - let mut f = wf_clone.lock().await; + let mut f = wf_clone.lock(); f.take(); } else if c == ConnectionState::Checking { - let mut c = wc_clone.lock().await; + let mut c = wc_clone.lock(); c.take(); } else if c == ConnectionState::Connected || c == ConnectionState::Completed { panic!("Unexpected ConnectionState: {c}"); @@ -1637,59 +1600,55 @@ async fn test_connection_state_connecting_to_failed() -> Result<()> { let agent_a = Arc::clone(&a_agent); tokio::spawn(async move { let (_cancel_tx, cancel_rx) = mpsc::channel(1); - let result = agent_a - .accept(cancel_rx, "InvalidFrag".to_owned(), "InvalidPwd".to_owned()) - .await; + let result = agent_a.accept(cancel_rx, "InvalidFrag".to_owned(), "InvalidPwd".to_owned()); assert!(result.is_err()); }); let agent_b = Arc::clone(&b_agent); tokio::spawn(async move { let (_cancel_tx, cancel_rx) = mpsc::channel(1); - let result = agent_b - .dial(cancel_rx, "InvalidFrag".to_owned(), "InvalidPwd".to_owned()) - .await; + let result = agent_b.dial(cancel_rx, "InvalidFrag".to_owned(), "InvalidPwd".to_owned()); assert!(result.is_err()); }); - is_checking.wait().await; - is_failed.wait().await; + is_checking.wait(); + is_failed.wait(); - a_agent.close().await?; - b_agent.close().await?; + a_agent.close()?; + b_agent.close()?; Ok(()) } -#[tokio::test] -async fn test_agent_restart_during_gather() -> Result<()> { +#[test] +fn test_agent_restart_during_gather() -> Result<()> { //"Restart During Gather" - let agent = Agent::new(AgentConfig::default()).await?; + let agent = Agent::new(AgentConfig::default())?; agent .gathering_state .store(GatheringState::Gathering as u8, Ordering::SeqCst); - if let Err(err) = agent.restart("".to_owned(), "".to_owned()).await { + if let Err(err) = agent.restart("".to_owned(), "".to_owned()) { assert_eq!(Error::ErrRestartWhenGathering, err); } else { panic!("expected error, but got ok"); } - agent.close().await?; + agent.close()?; Ok(()) } -#[tokio::test] -async fn test_agent_restart_when_closed() -> Result<()> { +#[test] +fn test_agent_restart_when_closed() -> Result<()> { //"Restart When Closed" - let agent = Agent::new(AgentConfig::default()).await?; - agent.close().await?; + let agent = Agent::new(AgentConfig::default())?; + agent.close()?; - if let Err(err) = agent.restart("".to_owned(), "".to_owned()).await { + if let Err(err) = agent.restart("".to_owned(), "".to_owned()) { assert_eq!(Error::ErrClosed, err); } else { panic!("expected error, but got ok"); @@ -1698,8 +1657,8 @@ async fn test_agent_restart_when_closed() -> Result<()> { Ok(()) } -#[tokio::test] -async fn test_agent_restart_one_side() -> Result<()> { +#[test] +fn test_agent_restart_one_side() -> Result<()> { let one_second = Duration::from_secs(1); //"Restart One Side" @@ -1714,8 +1673,7 @@ async fn test_agent_restart_one_side() -> Result<()> { failed_timeout: Some(one_second), ..Default::default() }), - ) - .await?; + )?; let (cancel_tx, mut cancel_rx) = mpsc::channel::<()>(1); let cancel_tx = Arc::new(Mutex::new(Some(cancel_tx))); @@ -1723,24 +1681,24 @@ async fn test_agent_restart_one_side() -> Result<()> { let cancel_tx_clone = Arc::clone(&cancel_tx); Box::pin(async move { if c == ConnectionState::Failed || c == ConnectionState::Disconnected { - let mut tx = cancel_tx_clone.lock().await; + let mut tx = cancel_tx_clone.lock(); tx.take(); } }) })); - agent_a.restart("".to_owned(), "".to_owned()).await?; + agent_a.restart("".to_owned(), "".to_owned())?; - let _ = cancel_rx.recv().await; + let _ = cancel_rx.recv(); - agent_a.close().await?; - agent_b.close().await?; + agent_a.close()?; + agent_b.close()?; Ok(()) } -#[tokio::test] -async fn test_agent_restart_both_side() -> Result<()> { +#[test] +fn test_agent_restart_both_side() -> Result<()> { let one_second = Duration::from_secs(1); //"Restart Both Sides" @@ -1772,13 +1730,10 @@ async fn test_agent_restart_both_side() -> Result<()> { failed_timeout: Some(one_second), ..Default::default() }), - ) - .await?; + )?; - let conn_afirst_candidates = - generate_candidate_address_strings(agent_a.get_local_candidates().await); - let conn_bfirst_candidates = - generate_candidate_address_strings(agent_b.get_local_candidates().await); + let conn_afirst_candidates = generate_candidate_address_strings(agent_a.get_local_candidates()); + let conn_bfirst_candidates = generate_candidate_address_strings(agent_b.get_local_candidates()); let (a_notifier, mut a_connected) = on_connected(); agent_a.on_connection_state_change(a_notifier); @@ -1787,44 +1742,44 @@ async fn test_agent_restart_both_side() -> Result<()> { agent_b.on_connection_state_change(b_notifier); // Restart and Re-Signal - agent_a.restart("".to_owned(), "".to_owned()).await?; - agent_b.restart("".to_owned(), "".to_owned()).await?; + agent_a.restart("".to_owned(), "".to_owned())?; + agent_b.restart("".to_owned(), "".to_owned())?; // Exchange Candidates and Credentials - let (ufrag, pwd) = agent_b.get_local_user_credentials().await; - agent_a.set_remote_credentials(ufrag, pwd).await?; + let (ufrag, pwd) = agent_b.get_local_user_credentials(); + agent_a.set_remote_credentials(ufrag, pwd)?; - let (ufrag, pwd) = agent_a.get_local_user_credentials().await; - agent_b.set_remote_credentials(ufrag, pwd).await?; + let (ufrag, pwd) = agent_a.get_local_user_credentials(); + agent_b.set_remote_credentials(ufrag, pwd)?; - gather_and_exchange_candidates(&agent_a, &agent_b).await?; + gather_and_exchange_candidates(&agent_a, &agent_b)?; // Wait until both have gone back to connected - let _ = a_connected.recv().await; - let _ = b_connected.recv().await; + let _ = a_connected.recv(); + let _ = b_connected.recv(); // Assert that we have new candiates each time assert_ne!( conn_afirst_candidates, - generate_candidate_address_strings(agent_a.get_local_candidates().await) + generate_candidate_address_strings(agent_a.get_local_candidates()) ); assert_ne!( conn_bfirst_candidates, - generate_candidate_address_strings(agent_b.get_local_candidates().await) + generate_candidate_address_strings(agent_b.get_local_candidates()) ); - agent_a.close().await?; - agent_b.close().await?; + agent_a.close()?; + agent_b.close()?; Ok(()) } -#[tokio::test] -async fn test_get_remote_credentials() -> Result<()> { - let a = Agent::new(AgentConfig::default()).await?; +#[test] +fn test_get_remote_credentials() -> Result<()> { + let a = Agent::new(AgentConfig::default())?; let (remote_ufrag, remote_pwd) = { - let mut ufrag_pwd = a.internal.ufrag_pwd.lock().await; + let mut ufrag_pwd = a.internal.ufrag_pwd.lock(); ufrag_pwd.remote_ufrag = "remoteUfrag".to_owned(); ufrag_pwd.remote_pwd = "remotePwd".to_owned(); ( @@ -1833,18 +1788,18 @@ async fn test_get_remote_credentials() -> Result<()> { ) }; - let (actual_ufrag, actual_pwd) = a.get_remote_user_credentials().await; + let (actual_ufrag, actual_pwd) = a.get_remote_user_credentials(); assert_eq!(actual_ufrag, remote_ufrag); assert_eq!(actual_pwd, remote_pwd); - a.close().await?; + a.close()?; Ok(()) } -#[tokio::test] -async fn test_close_in_connection_state_callback() -> Result<()> { +#[test] +fn test_close_in_connection_state_callback() -> Result<()> { let disconnected_duration = Duration::from_secs(1); let failed_duration = Duration::from_secs(1); let keepalive_interval = Duration::from_secs(0); @@ -1869,8 +1824,8 @@ async fn test_close_in_connection_state_callback() -> Result<()> { ..Default::default() }; - let a_agent = Arc::new(Agent::new(cfg0).await?); - let b_agent = Arc::new(Agent::new(cfg1).await?); + let a_agent = Arc::new(Agent::new(cfg0)?); + let b_agent = Arc::new(Agent::new(cfg1)?); let (is_closed_tx, mut is_closed_rx) = mpsc::channel::<()>(1); let (is_connected_tx, mut is_connected_rx) = mpsc::channel::<()>(1); @@ -1881,28 +1836,28 @@ async fn test_close_in_connection_state_callback() -> Result<()> { let is_connected_tx_clone = Arc::clone(&is_connected_tx); Box::pin(async move { if c == ConnectionState::Connected { - let mut tx = is_connected_tx_clone.lock().await; + let mut tx = is_connected_tx_clone.lock(); tx.take(); } else if c == ConnectionState::Closed { - let mut tx = is_closed_tx_clone.lock().await; + let mut tx = is_closed_tx_clone.lock(); tx.take(); } }) })); - connect_with_vnet(&a_agent, &b_agent).await?; + connect_with_vnet(&a_agent, &b_agent)?; - let _ = is_connected_rx.recv().await; - a_agent.close().await?; + let _ = is_connected_rx.recv(); + a_agent.close()?; - let _ = is_closed_rx.recv().await; - b_agent.close().await?; + let _ = is_closed_rx.recv(); + b_agent.close()?; Ok(()) } -#[tokio::test] -async fn test_run_task_in_connection_state_callback() -> Result<()> { +#[test] +fn test_run_task_in_connection_state_callback() -> Result<()> { let one_second = Duration::from_secs(1); let keepalive_interval = Duration::from_secs(0); @@ -1926,8 +1881,8 @@ async fn test_run_task_in_connection_state_callback() -> Result<()> { ..Default::default() }; - let a_agent = Arc::new(Agent::new(cfg0).await?); - let b_agent = Arc::new(Agent::new(cfg1).await?); + let a_agent = Arc::new(Agent::new(cfg0)?); + let b_agent = Arc::new(Agent::new(cfg1)?); let (is_complete_tx, mut is_complete_rx) = mpsc::channel::<()>(1); let is_complete_tx = Arc::new(Mutex::new(Some(is_complete_tx))); @@ -1935,26 +1890,26 @@ async fn test_run_task_in_connection_state_callback() -> Result<()> { let is_complete_tx_clone = Arc::clone(&is_complete_tx); Box::pin(async move { if c == ConnectionState::Connected { - let mut tx = is_complete_tx_clone.lock().await; + let mut tx = is_complete_tx_clone.lock(); tx.take(); } }) })); - connect_with_vnet(&a_agent, &b_agent).await?; + connect_with_vnet(&a_agent, &b_agent)?; - let _ = is_complete_rx.recv().await; - let _ = a_agent.get_local_user_credentials().await; - a_agent.restart("".to_owned(), "".to_owned()).await?; + let _ = is_complete_rx.recv(); + let _ = a_agent.get_local_user_credentials(); + a_agent.restart("".to_owned(), "".to_owned())?; - a_agent.close().await?; - b_agent.close().await?; + a_agent.close()?; + b_agent.close()?; Ok(()) } -#[tokio::test] -async fn test_run_task_in_selected_candidate_pair_change_callback() -> Result<()> { +#[test] +fn test_run_task_in_selected_candidate_pair_change_callback() -> Result<()> { let one_second = Duration::from_secs(1); let keepalive_interval = Duration::from_secs(0); @@ -1978,8 +1933,8 @@ async fn test_run_task_in_selected_candidate_pair_change_callback() -> Result<() ..Default::default() }; - let a_agent = Arc::new(Agent::new(cfg0).await?); - let b_agent = Arc::new(Agent::new(cfg1).await?); + let a_agent = Arc::new(Agent::new(cfg0)?); + let b_agent = Arc::new(Agent::new(cfg1)?); let (is_tested_tx, mut is_tested_rx) = mpsc::channel::<()>(1); let is_tested_tx = Arc::new(Mutex::new(Some(is_tested_tx))); @@ -1987,7 +1942,7 @@ async fn test_run_task_in_selected_candidate_pair_change_callback() -> Result<() move |_: &Arc, _: &Arc| { let is_tested_tx_clone = Arc::clone(&is_tested_tx); Box::pin(async move { - let mut tx = is_tested_tx_clone.lock().await; + let mut tx = is_tested_tx_clone.lock(); tx.take(); }) }, @@ -1999,37 +1954,34 @@ async fn test_run_task_in_selected_candidate_pair_change_callback() -> Result<() let is_complete_tx_clone = Arc::clone(&is_complete_tx); Box::pin(async move { if c == ConnectionState::Connected { - let mut tx = is_complete_tx_clone.lock().await; + let mut tx = is_complete_tx_clone.lock(); tx.take(); } }) })); - connect_with_vnet(&a_agent, &b_agent).await?; + connect_with_vnet(&a_agent, &b_agent)?; - let _ = is_complete_rx.recv().await; - let _ = is_tested_rx.recv().await; + let _ = is_complete_rx.recv(); + let _ = is_tested_rx.recv(); - let _ = a_agent.get_local_user_credentials().await; + let _ = a_agent.get_local_user_credentials(); - a_agent.close().await?; - b_agent.close().await?; + a_agent.close()?; + b_agent.close()?; Ok(()) } // Assert that a Lite agent goes to disconnected and failed -#[tokio::test] -async fn test_lite_lifecycle() -> Result<()> { +#[test] +fn test_lite_lifecycle() -> Result<()> { let (a_notifier, mut a_connected_rx) = on_connected(); - let a_agent = Arc::new( - Agent::new(AgentConfig { - network_types: supported_network_types(), - ..Default::default() - }) - .await?, - ); + let a_agent = Arc::new(Agent::new(AgentConfig { + network_types: supported_network_types(), + ..Default::default() + })?); a_agent.on_connection_state_change(a_notifier); @@ -2037,19 +1989,16 @@ async fn test_lite_lifecycle() -> Result<()> { let failed_duration = Duration::from_secs(1); let keepalive_interval = Duration::from_secs(0); - let b_agent = Arc::new( - Agent::new(AgentConfig { - lite: true, - candidate_types: vec![CandidateType::Host], - network_types: supported_network_types(), - disconnected_timeout: Some(disconnected_duration), - failed_timeout: Some(failed_duration), - keepalive_interval: Some(keepalive_interval), - check_interval: Duration::from_millis(500), - ..Default::default() - }) - .await?, - ); + let b_agent = Arc::new(Agent::new(AgentConfig { + lite: true, + candidate_types: vec![CandidateType::Host], + network_types: supported_network_types(), + disconnected_timeout: Some(disconnected_duration), + failed_timeout: Some(failed_duration), + keepalive_interval: Some(keepalive_interval), + check_interval: Duration::from_millis(500), + ..Default::default() + })?); let (b_connected_tx, mut b_connected_rx) = mpsc::channel::<()>(1); let (b_disconnected_tx, mut b_disconnected_rx) = mpsc::channel::<()>(1); @@ -2065,28 +2014,29 @@ async fn test_lite_lifecycle() -> Result<()> { Box::pin(async move { if c == ConnectionState::Connected { - let mut tx = b_connected_tx_clone.lock().await; + let mut tx = b_connected_tx_clone.lock(); tx.take(); } else if c == ConnectionState::Disconnected { - let mut tx = b_disconnected_tx_clone.lock().await; + let mut tx = b_disconnected_tx_clone.lock(); tx.take(); } else if c == ConnectionState::Failed { - let mut tx = b_failed_tx_clone.lock().await; + let mut tx = b_failed_tx_clone.lock(); tx.take(); } }) })); - connect_with_vnet(&b_agent, &a_agent).await?; + connect_with_vnet(&b_agent, &a_agent)?; - let _ = a_connected_rx.recv().await; - let _ = b_connected_rx.recv().await; - a_agent.close().await?; + let _ = a_connected_rx.recv(); + let _ = b_connected_rx.recv(); + a_agent.close()?; - let _ = b_disconnected_rx.recv().await; - let _ = b_failed_rx.recv().await; + let _ = b_disconnected_rx.recv(); + let _ = b_failed_rx.recv(); - b_agent.close().await?; + b_agent.close()?; Ok(()) } +*/ diff --git a/rtc-ice/src/agent/mod.rs b/rtc-ice/src/agent/mod.rs index dd2bec3..e67f64d 100644 --- a/rtc-ice/src/agent/mod.rs +++ b/rtc-ice/src/agent/mod.rs @@ -1,5 +1,5 @@ -//TODO:#[cfg(test)] -//TODO:mod agent_test; +#[cfg(test)] +mod agent_test; pub mod agent_config; pub mod agent_selector; @@ -142,9 +142,6 @@ impl Agent { { return Err(Error::ErrLiteUsingNonHostCandidates); } - if !config.lite { - return Err(Error::ErrLiteSupportOnly); - } if !config.urls.is_empty() && !contains_candidate_type(CandidateType::ServerReflexive, &candidate_types) diff --git a/rtc-ice/src/candidate/candidate_pair.rs b/rtc-ice/src/candidate/candidate_pair.rs index 29a5bd6..1127c31 100644 --- a/rtc-ice/src/candidate/candidate_pair.rs +++ b/rtc-ice/src/candidate/candidate_pair.rs @@ -2,8 +2,9 @@ use serde::Serialize; use std::fmt; /// Represent the ICE candidate pair state. -#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize)] +#[derive(Default, Clone, Copy, Debug, PartialEq, Eq, Serialize)] pub enum CandidatePairState { + #[default] #[serde(rename = "unspecified")] Unspecified = 0, @@ -37,12 +38,6 @@ impl From for CandidatePairState { } } -impl Default for CandidatePairState { - fn default() -> Self { - Self::Unspecified - } -} - impl fmt::Display for CandidatePairState { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let s = match *self {