diff --git a/rtc-ice/src/agent/agent_selector.rs b/rtc-ice/src/agent/agent_selector.rs index 25616eb..fa659e5 100644 --- a/rtc-ice/src/agent/agent_selector.rs +++ b/rtc-ice/src/agent/agent_selector.rs @@ -1,7 +1,5 @@ use crate::agent::Agent; use std::net::SocketAddr; -use std::rc::Rc; -use std::sync::atomic::Ordering; use std::time::{Duration, Instant}; use stun::attributes::*; use stun::fingerprint::*; @@ -10,49 +8,44 @@ use stun::message::*; use stun::textattrs::*; use crate::attributes::{control::*, priority::*, use_candidate::*}; -use crate::candidate::*; +use crate::candidate::{candidate_pair::*, *}; trait ControllingSelector { fn start(&mut self); fn contact_candidates(&mut self); - fn ping_candidate(&mut self, local: &Rc, remote: &Rc); + fn ping_candidate(&mut self, local: usize, remote: usize); fn handle_success_response( &mut self, m: &Message, - local: &Rc, - remote: &Rc, + local: usize, + remote: usize, remote_addr: SocketAddr, ); - fn handle_binding_request( - &mut self, - m: &Message, - local: &Rc, - remote: &Rc, - ); + fn handle_binding_request(&mut self, m: &Message, local: usize, remote: usize); } trait ControlledSelector { fn start(&mut self); fn contact_candidates(&mut self); - fn ping_candidate(&mut self, local: &Rc, remote: &Rc); + fn ping_candidate(&mut self, local: usize, remote: usize); fn handle_success_response( &mut self, m: &Message, - local: &Rc, - remote: &Rc, + local: usize, + remote: usize, remote_addr: SocketAddr, ); - fn handle_binding_request( - &mut self, - m: &Message, - local: &Rc, - remote: &Rc, - ); + fn handle_binding_request(&mut self, m: &Message, local: usize, remote: usize); } impl Agent { - fn is_nominatable(&self, c: &Rc) -> bool { + fn is_nominatable(&self, index: usize, is_local: bool) -> bool { let start_time = self.start_time; + let c = if is_local { + &self.local_candidates[index] + } else { + &self.remote_candidates[index] + }; match c.candidate_type() { CandidateType::Host => { Instant::now() @@ -90,7 +83,7 @@ impl Agent { Box::new(Username::new(ATTR_USERNAME, username)), Box::::default(), Box::new(AttrControlling(self.tie_breaker)), - Box::new(PriorityAttr(pair.local.priority())), + Box::new(PriorityAttr(pair.local_priority)), Box::new(MessageIntegrity::new_short_term_integrity( ufrag_pwd.remote_pwd.clone(), )), @@ -105,11 +98,11 @@ impl Agent { } else { log::trace!( "ping STUN (nominate candidate pair from {} to {}", - pair.local, - pair.remote + self.local_candidates[pair.local], + self.remote_candidates[pair.remote], ); - let local = pair.local.clone(); - let remote = pair.remote.clone(); + let local = pair.local; + let remote = pair.remote; Some((msg, local, remote)) } } else { @@ -118,7 +111,7 @@ impl Agent { }; if let Some((msg, local, remote)) = result { - self.send_binding_request(&msg, &local, &remote); + self.send_binding_request(&msg, local, remote); } } @@ -138,7 +131,7 @@ impl Agent { } } - pub(crate) fn ping_candidate(&mut self, local: &Rc, remote: &Rc) { + pub(crate) fn ping_candidate(&mut self, local: usize, remote: usize) { if self.is_controlling { ControllingSelector::ping_candidate(self, local, remote); } else { @@ -149,8 +142,8 @@ impl Agent { pub(crate) fn handle_success_response( &mut self, m: &Message, - local: &Rc, - remote: &Rc, + local: usize, + remote: usize, remote_addr: SocketAddr, ) { if self.is_controlling { @@ -160,12 +153,7 @@ impl Agent { } } - pub(crate) fn handle_binding_request( - &mut self, - m: &Message, - local: &Rc, - remote: &Rc, - ) { + pub(crate) fn handle_binding_request(&mut self, m: &Message, local: usize, remote: usize) { if self.is_controlling { ControllingSelector::handle_binding_request(self, m, local, remote); } else { @@ -198,21 +186,23 @@ impl ControllingSelector for Agent { self.nominate_pair(); } else { let has_nominated_pair = - if let Some(p) = self.agent_conn.get_best_valid_candidate_pair() { - self.is_nominatable(&p.local) && self.is_nominatable(&p.remote) + if let Some(index) = self.agent_conn.get_best_valid_candidate_pair() { + let p = self.agent_conn.checklist[index]; + self.is_nominatable(p.local, true) && self.is_nominatable(p.remote, false) } else { false }; if has_nominated_pair { - if let Some(p) = self.agent_conn.get_best_valid_candidate_pair() { + if let Some(index) = self.agent_conn.get_best_valid_candidate_pair() { + let p = &mut self.agent_conn.checklist[index]; log::trace!( "Nominatable pair found, nominating ({}, {})", - p.local.to_string(), - p.remote.to_string() + self.local_candidates[p.local], + self.remote_candidates[p.remote], ); - p.nominated.store(true, Ordering::SeqCst); - self.nominated_pair = Some(p); + p.nominated = true; + self.nominated_pair = Some(*p); } self.nominate_pair(); @@ -222,7 +212,7 @@ impl ControllingSelector for Agent { } } - fn ping_candidate(&mut self, local: &Rc, remote: &Rc) { + fn ping_candidate(&mut self, local: usize, remote: usize) { let (msg, result) = { let ufrag_pwd = &self.ufrag_pwd; let username = ufrag_pwd.remote_ufrag.clone() + ":" + ufrag_pwd.local_ufrag.as_str(); @@ -232,7 +222,7 @@ impl ControllingSelector for Agent { Box::new(TransactionId::new()), Box::new(Username::new(ATTR_USERNAME, username)), Box::new(AttrControlling(self.tie_breaker)), - Box::new(PriorityAttr(local.priority())), + Box::new(PriorityAttr(self.local_candidates[local].priority())), Box::new(MessageIntegrity::new_short_term_integrity( ufrag_pwd.remote_pwd.clone(), )), @@ -251,8 +241,8 @@ impl ControllingSelector for Agent { fn handle_success_response( &mut self, m: &Message, - local: &Rc, - remote: &Rc, + local: usize, + remote: usize, remote_addr: SocketAddr, ) { if let Some(pending_request) = self.handle_inbound_binding_success(m.transaction_id) { @@ -272,18 +262,18 @@ impl ControllingSelector for Agent { ); let selected_pair_is_none = self.agent_conn.get_selected_pair().is_none(); - if let Some(p) = self.find_pair(local, remote) { - p.state - .store(CandidatePairState::Succeeded as u8, Ordering::SeqCst); + if let Some(index) = self.find_pair(local, remote) { + let p = &mut self.agent_conn.checklist[index]; + p.state = CandidatePairState::Succeeded; log::trace!( "Found valid candidate pair: {}, p.state: {}, isUseCandidate: {}, {}", - p, - p.state.load(Ordering::SeqCst), + *p, + p.state, pending_request.is_use_candidate, selected_pair_is_none ); if pending_request.is_use_candidate && selected_pair_is_none { - self.set_selected_pair(Some(Rc::clone(&p))); + self.set_selected_pair(Some(index)); } } else { // This shouldn't happen @@ -298,26 +288,22 @@ impl ControllingSelector for Agent { } } - fn handle_binding_request( - &mut self, - m: &Message, - local: &Rc, - remote: &Rc, - ) { + fn handle_binding_request(&mut self, m: &Message, local: usize, remote: usize) { self.send_binding_success(m, local, remote); log::trace!("controllingSelector: sendBindingSuccess"); - if let Some(p) = self.find_pair(local, remote) { + if let Some(index) = self.find_pair(local, remote) { + let p = &self.agent_conn.checklist[index]; let nominated_pair_is_none = self.nominated_pair.is_none(); log::trace!( "controllingSelector: after findPair {}, p.state: {}, {}", p, - p.state.load(Ordering::SeqCst), + p.state, nominated_pair_is_none, //self.agent_conn.get_selected_pair().await.is_none() //, {} ); - if p.state.load(Ordering::SeqCst) == CandidatePairState::Succeeded as u8 + if p.state == CandidatePairState::Succeeded && nominated_pair_is_none && self.agent_conn.get_selected_pair().is_none() { @@ -326,13 +312,13 @@ impl ControllingSelector for Agent { "controllingSelector: getBestAvailableCandidatePair {}", best_pair ); - if best_pair == p - && self.is_nominatable(&p.local) - && self.is_nominatable(&p.remote) + if best_pair == index + && self.is_nominatable(p.local, true) + && self.is_nominatable(p.remote, false) { log::trace!("The candidate ({}, {}) is the best candidate available, marking it as nominated", p.local, p.remote); - self.nominated_pair = Some(p); + self.nominated_pair = Some(*p); self.nominate_pair(); } } else { @@ -341,7 +327,7 @@ impl ControllingSelector for Agent { } } else { log::trace!("controllingSelector: addPair"); - self.add_pair(local.clone(), remote.clone()); + self.add_pair(local, remote); } } } @@ -363,7 +349,7 @@ impl ControlledSelector for Agent { } } - fn ping_candidate(&mut self, local: &Rc, remote: &Rc) { + fn ping_candidate(&mut self, local: usize, remote: usize) { let (msg, result) = { let ufrag_pwd = &self.ufrag_pwd; let username = ufrag_pwd.remote_ufrag.clone() + ":" + ufrag_pwd.local_ufrag.as_str(); @@ -373,7 +359,7 @@ impl ControlledSelector for Agent { Box::new(TransactionId::new()), Box::new(Username::new(ATTR_USERNAME, username)), Box::new(AttrControlled(self.tie_breaker)), - Box::new(PriorityAttr(local.priority())), + Box::new(PriorityAttr(self.local_candidates[local].priority())), Box::new(MessageIntegrity::new_short_term_integrity( ufrag_pwd.remote_pwd.clone(), )), @@ -392,8 +378,8 @@ impl ControlledSelector for Agent { fn handle_success_response( &mut self, m: &Message, - local: &Rc, - remote: &Rc, + local: usize, + remote: usize, remote_addr: SocketAddr, ) { // https://tools.ietf.org/html/rfc8445#section-7.3.1.5 @@ -418,10 +404,10 @@ impl ControlledSelector for Agent { local ); - if let Some(p) = self.find_pair(local, remote) { - p.state - .store(CandidatePairState::Succeeded as u8, Ordering::SeqCst); - log::trace!("Found valid candidate pair: {}", p); + if let Some(index) = self.find_pair(local, remote) { + let p = &mut self.agent_conn.checklist[index]; + p.state = CandidatePairState::Succeeded; + log::trace!("Found valid candidate pair: {}", *p); } else { // This shouldn't happen log::error!("Success response from invalid candidate pair"); @@ -435,28 +421,24 @@ impl ControlledSelector for Agent { } } - fn handle_binding_request( - &mut self, - m: &Message, - local: &Rc, - remote: &Rc, - ) { + fn handle_binding_request(&mut self, m: &Message, local: usize, remote: usize) { if self.find_pair(local, remote).is_none() { - self.add_pair(local.clone(), remote.clone()); + self.add_pair(local, remote); } - if let Some(p) = self.find_pair(local, remote) { + if let Some(index) = self.find_pair(local, remote) { + let p = &self.agent_conn.checklist[index]; let use_candidate = m.contains(ATTR_USE_CANDIDATE); if use_candidate { // https://tools.ietf.org/html/rfc8445#section-7.3.1.5 - if p.state.load(Ordering::SeqCst) == CandidatePairState::Succeeded as u8 { + if p.state == CandidatePairState::Succeeded { // If the state of this pair is Succeeded, it means that the check // previously sent by this pair produced a successful response and // generated a valid pair (Section 7.2.5.3.2). The agent sets the // nominated flag value of the valid pair to true. if self.agent_conn.get_selected_pair().is_none() { - self.set_selected_pair(Some(Rc::clone(&p))); + self.set_selected_pair(Some(index)); } self.send_binding_success(m, local, remote); } else { diff --git a/rtc-ice/src/agent/agent_stats.rs b/rtc-ice/src/agent/agent_stats.rs index 15702ed..df1066d 100644 --- a/rtc-ice/src/agent/agent_stats.rs +++ b/rtc-ice/src/agent/agent_stats.rs @@ -1,8 +1,7 @@ use crate::agent::Agent; -use std::sync::atomic::Ordering; use std::time::Instant; -use crate::candidate::{CandidatePairState, CandidateType}; +use crate::candidate::{candidate_pair::CandidatePairState, CandidateType}; /// Contains ICE candidate pair statistics. pub struct CandidatePairStats { @@ -207,10 +206,10 @@ impl Agent { for cp in checklist { let stat = CandidatePairStats { timestamp: Instant::now(), - local_candidate_id: cp.local.id(), - remote_candidate_id: cp.remote.id(), - state: cp.state.load(Ordering::SeqCst).into(), - nominated: cp.nominated.load(Ordering::SeqCst), + local_candidate_id: self.local_candidates[cp.local].id(), + remote_candidate_id: self.remote_candidates[cp.remote].id(), + state: cp.state, + nominated: cp.nominated, ..CandidatePairStats::default() }; res.push(stat); diff --git a/rtc-ice/src/agent/agent_transport.rs b/rtc-ice/src/agent/agent_transport.rs index e39e63a..cf5064f 100644 --- a/rtc-ice/src/agent/agent_transport.rs +++ b/rtc-ice/src/agent/agent_transport.rs @@ -1,6 +1,5 @@ use super::*; -use std::rc::Rc; -use std::sync::atomic::Ordering; +use crate::candidate::candidate_pair::{CandidatePair, CandidatePairState}; impl Agent { /// Connects to the remote agent, acting as the controlling ice agent. @@ -66,12 +65,8 @@ impl Agent { } pub(crate) struct AgentConn { - pub(crate) selected_pair: Option>, - pub(crate) checklist: Vec>, - - //pub(crate) buffer: Buffer, - pub(crate) bytes_received: usize, - pub(crate) bytes_sent: usize, + pub(crate) selected_pair: Option, + pub(crate) checklist: Vec, pub(crate) done: bool, } @@ -80,67 +75,53 @@ impl AgentConn { Self { selected_pair: None, checklist: vec![], - // Make sure the buffer doesn't grow indefinitely. - // NOTE: We actually won't get anywhere close to this limit. - // SRTP will constantly read from the endpoint and drop packets if it's full. - //buffer: Buffer::new(0, MAX_BUFFER_SIZE), - bytes_received: 0, - bytes_sent: 0, done: false, } } - pub(crate) fn get_selected_pair(&self) -> Option> { - self.selected_pair.clone() + pub(crate) fn get_selected_pair(&self) -> Option { + self.selected_pair } - pub(crate) fn get_best_available_candidate_pair(&self) -> Option> { - let mut best: Option<&Rc> = None; + pub(crate) fn get_best_available_candidate_pair(&self) -> Option { + let mut best: Option = None; - for p in &self.checklist { - if p.state.load(Ordering::SeqCst) == CandidatePairState::Failed as u8 { + for (index, p) in self.checklist.iter().enumerate() { + if p.state == CandidatePairState::Failed { continue; } - if let Some(b) = &mut best { + if let Some(best_index) = &mut best { + let b = &self.checklist[*best_index]; if b.priority() < p.priority() { - *b = p; + *best_index = index; } } else { - best = Some(p); + best = Some(index); } } - best.cloned() + best } - pub(crate) fn get_best_valid_candidate_pair(&self) -> Option> { - let mut best: Option<&Rc> = None; + pub(crate) fn get_best_valid_candidate_pair(&self) -> Option { + let mut best: Option = None; - for p in &self.checklist { - if p.state.load(Ordering::SeqCst) != CandidatePairState::Succeeded as u8 { + for (index, p) in self.checklist.iter().enumerate() { + if p.state != CandidatePairState::Succeeded { continue; } - if let Some(b) = &mut best { + if let Some(best_index) = &mut best { + let b = &self.checklist[*best_index]; if b.priority() < p.priority() { - *b = p; + *best_index = index; } } else { - best = Some(p); + best = Some(index); } } - best.cloned() - } - - /// Returns the number of bytes sent. - pub fn bytes_sent(&self) -> usize { - self.bytes_sent - } - - /// Returns the number of bytes received. - pub fn bytes_received(&self) -> usize { - self.bytes_received + best } } diff --git a/rtc-ice/src/agent/mod.rs b/rtc-ice/src/agent/mod.rs index f1b55fa..98fcf28 100644 --- a/rtc-ice/src/agent/mod.rs +++ b/rtc-ice/src/agent/mod.rs @@ -10,8 +10,6 @@ pub mod agent_transport; use agent_config::*; use std::net::{Ipv4Addr, SocketAddr}; -use std::rc::Rc; -use std::sync::atomic::Ordering; use std::time::{Duration, Instant}; use stun::attributes::*; use stun::fingerprint::*; @@ -21,6 +19,7 @@ use stun::textattrs::Username; use stun::xoraddr::*; use crate::agent::agent_transport::*; +use crate::candidate::candidate_pair::{CandidatePair, CandidatePairState}; use crate::candidate::*; use crate::rand::*; use crate::state::*; @@ -82,15 +81,15 @@ pub struct Agent { pub(crate) lite: bool, pub(crate) start_time: Instant, - pub(crate) nominated_pair: Option>, + pub(crate) nominated_pair: Option, pub(crate) connection_state: ConnectionState, //pub(crate) started_ch_tx: Mutex>>, pub(crate) ufrag_pwd: UfragPwd, - pub(crate) local_candidates: Vec>, - pub(crate) remote_candidates: Vec>, + pub(crate) local_candidates: Vec>, + pub(crate) remote_candidates: Vec>, // LRU of outbound Binding request Transaction IDs pub(crate) pending_binding_requests: Vec, @@ -224,59 +223,27 @@ impl Agent { Ok(agent) } - /// Gets bytes received - pub fn get_bytes_received(&self) -> usize { - self.agent_conn.bytes_received() - } - - /// Gets bytes sent - pub fn get_bytes_sent(&self) -> usize { - self.agent_conn.bytes_sent() - } - /// Adds a new local candidate. - pub fn add_local_candidate(&mut self, c: Rc) -> Result<()> { - /*todo:let initialized_ch = { - let started_ch_tx = self.started_ch_tx.lock().await; - (*started_ch_tx).as_ref().map(|tx| tx.subscribe()) - };*/ - - self.start_candidate(&c /*, initialized_ch*/); - + pub fn add_local_candidate(&mut self, c: Box) -> Result<()> { for cand in &self.local_candidates { if cand.equal(&*c) { - if let Err(err) = c.close() { - log::warn!( - "[{}]: Failed to close duplicate candidate: {}", - self.get_name(), - err - ); - } - //TODO: why return? return Ok(()); } } - self.local_candidates.push(c.clone()); + self.local_candidates.push(c); - for remote_cand in self.remote_candidates.clone() { - self.add_pair(c.clone(), remote_cand); + for remote_cand in 0..self.remote_candidates.len() { + self.add_pair(self.local_candidates.len() - 1, remote_cand); } self.request_connectivity_check(); - /*TODO: - { - let chan_candidate_tx = &self.chan_candidate_tx.lock().await; - if let Some(tx) = &*chan_candidate_tx { - let _ = tx.send(Some(c.clone())).await; - } - }*/ Ok(()) } /// Adds a new remote candidate. - pub fn add_remote_candidate(&mut self, c: Rc) -> Result<()> { + pub fn add_remote_candidate(&mut self, c: Box) -> Result<()> { // If we have a mDNS Candidate lets fully resolve it before adding it locally if c.candidate_type() == CandidateType::Host && c.address().ends_with(".local") { log::warn!( @@ -292,10 +259,10 @@ impl Agent { } } - self.remote_candidates.push(c.clone()); + self.remote_candidates.push(c); - for local_cand in self.local_candidates.clone() { - self.add_pair(local_cand, c.clone()); + for local_cand in 0..self.local_candidates.len() { + self.add_pair(local_cand, self.remote_candidates.len() - 1); } self.request_connectivity_check(); @@ -327,8 +294,8 @@ impl Agent { Ok(()) } - /// Returns the selected pair or nil if there is none - pub fn get_selected_candidate_pair(&self) -> Option> { + /// Returns the selected pair or none if there is none + pub fn get_selected_candidate_pair(&self) -> Option { self.agent_conn.get_selected_pair() } @@ -390,14 +357,8 @@ impl Agent { } // Returns the local candidates. - pub(crate) fn get_local_candidates(&self) -> Result>> { - let mut res = vec![]; - - for candidate in &self.local_candidates { - res.push(Rc::clone(candidate)); - } - - Ok(res) + pub(crate) fn get_local_candidates(&self) -> &[Box] { + &self.local_candidates } pub(crate) fn start_connectivity_checks( @@ -549,16 +510,17 @@ impl Agent { } } - pub(crate) fn set_selected_pair(&mut self, p: Option>) { - log::trace!( - "[{}]: Set selected candidate pair: {:?}", - self.get_name(), - p - ); + pub(crate) fn set_selected_pair(&mut self, selected_pair: Option) { + if let Some(index) = selected_pair { + log::trace!( + "[{}]: Set selected candidate pair: {:?}", + self.get_name(), + self.agent_conn.checklist[index] + ); - if let Some(p) = p { - p.nominated.store(true, Ordering::SeqCst); - self.agent_conn.selected_pair = Some(p); + let p = &mut self.agent_conn.checklist[index]; + p.nominated = true; + self.agent_conn.selected_pair = Some(index); self.update_connection_state(ConnectionState::Connected); @@ -583,7 +545,7 @@ impl Agent { pub(crate) fn ping_all_candidates(&mut self) { log::trace!("[{}]: pinging all candidates", self.get_name(),); - let mut pairs: Vec<(Rc, Rc)> = vec![]; + let mut pairs: Vec<(usize, usize)> = vec![]; { let name = self.get_name().to_string(); @@ -595,50 +557,49 @@ impl Agent { ); } for p in checklist { - let p_state = p.state.load(Ordering::SeqCst); - if p_state == CandidatePairState::Waiting as u8 { - p.state - .store(CandidatePairState::InProgress as u8, Ordering::SeqCst); - } else if p_state != CandidatePairState::InProgress as u8 { + if p.state == CandidatePairState::Waiting { + p.state = CandidatePairState::InProgress; + } else if p.state != CandidatePairState::InProgress { continue; } - if p.binding_request_count.load(Ordering::SeqCst) > self.max_binding_requests { + if p.binding_request_count > self.max_binding_requests { log::trace!( "[{}]: max requests reached for pair {}, marking it as failed", name, - p + *p ); - p.state - .store(CandidatePairState::Failed as u8, Ordering::SeqCst); + p.state = CandidatePairState::Failed; } else { - p.binding_request_count.fetch_add(1, Ordering::SeqCst); - let local = p.local.clone(); - let remote = p.remote.clone(); + p.binding_request_count += 1; + let local = p.local; + let remote = p.remote; pairs.push((local, remote)); } } } for (local, remote) in pairs { - self.ping_candidate(&local, &remote); + self.ping_candidate(local, remote); } } - pub(crate) fn add_pair(&mut self, local: Rc, remote: Rc) { - let p = Rc::new(CandidatePair::new(local, remote, self.is_controlling)); + pub(crate) fn add_pair(&mut self, local: usize, remote: usize) { + let p = CandidatePair::new( + local, + remote, + self.local_candidates[local].priority(), + self.remote_candidates[remote].priority(), + self.is_controlling, + ); self.agent_conn.checklist.push(p); } - pub(crate) fn find_pair( - &self, - local: &Rc, - remote: &Rc, - ) -> Option> { + pub(crate) fn find_pair(&self, local: usize, remote: usize) -> Option { let checklist = &self.agent_conn.checklist; - for p in checklist { - if p.local.equal(&**local) && p.remote.equal(&**remote) { - return Some(p.clone()); + for (index, p) in checklist.iter().enumerate() { + if p.local == local && p.remote == remote { + return Some(index); } } None @@ -648,12 +609,13 @@ impl Agent { /// Note: the caller should hold the agent lock. pub(crate) fn validate_selected_pair(&mut self) -> bool { let (valid, disconnected_time) = { - let selected_pair = &self.agent_conn.selected_pair; - (*selected_pair).as_ref().map_or_else( + self.agent_conn.selected_pair.as_ref().map_or_else( || (false, Duration::from_secs(0)), |selected_pair| { - let disconnected_time = - Instant::now().duration_since(selected_pair.remote.last_received()); + let remote = self.agent_conn.checklist[*selected_pair].remote; + + let disconnected_time = Instant::now() + .duration_since(self.remote_candidates[remote].last_received()); (true, disconnected_time) }, ) @@ -690,17 +652,16 @@ impl Agent { .selected_pair .as_ref() .map_or((None, None), |selected_pair| { - ( - Some(selected_pair.local.clone()), - Some(selected_pair.remote.clone()), - ) + let p = &self.agent_conn.checklist[*selected_pair]; + (Some(p.local), Some(p.remote)) }) }; if let (Some(local), Some(remote)) = (local, remote) { - let last_sent = Instant::now().duration_since(local.last_sent()); + let last_sent = Instant::now().duration_since(self.local_candidates[local].last_sent()); - let last_received = Instant::now().duration_since(remote.last_received()); + let last_received = + Instant::now().duration_since(self.remote_candidates[remote].last_received()); if (self.keepalive_interval != Duration::from_secs(0)) && ((last_sent > self.keepalive_interval) @@ -708,7 +669,7 @@ impl Agent { { // we use binding request instead of indication to support refresh consent schemas // see https://tools.ietf.org/html/rfc7675 - self.ping_candidate(&local, &remote); + self.ping_candidate(local, remote); } } } @@ -739,22 +700,17 @@ impl Agent { self.remote_candidates.clear(); } - pub(crate) fn find_remote_candidate(&self, addr: SocketAddr) -> Option> { + pub(crate) fn find_remote_candidate(&self, addr: SocketAddr) -> Option { let (ip, port) = (addr.ip(), addr.port()); - for c in &self.remote_candidates { + for (index, c) in self.remote_candidates.iter().enumerate() { if c.address() == ip.to_string() && c.port() == port { - return Some(c.clone()); + return Some(index); } } None } - pub(crate) fn send_binding_request( - &mut self, - m: &Message, - local: &Rc, - remote: &Rc, - ) { + pub(crate) fn send_binding_request(&mut self, m: &Message, local: usize, remote: usize) { log::trace!( "[{}]: ping STUN from {} to {}", self.get_name(), @@ -767,7 +723,7 @@ impl Agent { self.pending_binding_requests.push(BindingRequest { timestamp: Instant::now(), transaction_id: m.transaction_id, - destination: remote.addr(), + destination: self.remote_candidates[remote].addr(), is_use_candidate: m.contains(ATTR_USE_CANDIDATE), }); } @@ -775,13 +731,8 @@ impl Agent { self.send_stun(m, local, remote); } - pub(crate) fn send_binding_success( - &self, - m: &Message, - local: &Rc, - remote: &Rc, - ) { - let addr = remote.addr(); + pub(crate) fn send_binding_success(&mut self, m: &Message, local: usize, remote: usize) { + let addr = self.remote_candidates[remote].addr(); let (ip, port) = (addr.ip(), addr.port()); let local_pwd = self.ufrag_pwd.local_pwd.clone(); @@ -862,8 +813,8 @@ impl Agent { pub(crate) fn handle_inbound( &mut self, m: &mut Message, - local: &Rc, - remote: SocketAddr, + local: usize, + remote_addr: SocketAddr, ) { if m.typ.method != METHOD_BINDING || !(m.typ.class == CLASS_SUCCESS_RESPONSE @@ -873,7 +824,7 @@ impl Agent { log::trace!( "[{}]: unhandled STUN from {} to {} class({}) method({})", self.get_name(), - remote, + remote_addr, local, m.typ.class, m.typ.method @@ -903,7 +854,7 @@ impl Agent { return; } - let remote_candidate = self.find_remote_candidate(remote); + let remote_candidate_index = self.find_remote_candidate(remote_addr); if m.typ.class == CLASS_SUCCESS_RESPONSE { { let ufrag_pwd = &self.ufrag_pwd; @@ -913,20 +864,20 @@ impl Agent { log::warn!( "[{}]: discard message from ({}), {}", self.get_name(), - remote, + remote_addr, err ); return; } } - if let Some(rc) = &remote_candidate { - self.handle_success_response(m, local, rc, remote); + if let Some(remote) = &remote_candidate_index { + self.handle_success_response(m, local, *remote, remote_addr); } else { log::warn!( "[{}]: discard success message from ({}), no such remote", self.get_name(), - remote + remote_addr ); return; } @@ -939,7 +890,7 @@ impl Agent { log::warn!( "[{}]: discard message from ({}), {}", self.get_name(), - remote, + remote_addr, err ); return; @@ -949,7 +900,7 @@ impl Agent { log::warn!( "[{}]: discard message from ({}), {}", self.get_name(), - remote, + remote_addr, err ); return; @@ -997,37 +948,34 @@ impl Agent { log::trace!( "[{}]: inbound STUN (Request) from {} to {}", self.get_name(), - remote, + remote_addr, local ); - if let Some(rc) = &remote_candidate { - self.handle_binding_request(m, local, rc); + if let Some(remote) = &remote_candidate_index { + self.handle_binding_request(m, local, *remote); } } - if let Some(rc) = remote_candidate { - rc.seen(false); + if let Some(remote) = remote_candidate_index { + self.remote_candidates[remote].seen(false); } } // Processes non STUN traffic from a remote candidate, and returns true if it is an actual // remote candidate. - pub(crate) fn validate_non_stun_traffic(&self, remote: SocketAddr) -> bool { - self.find_remote_candidate(remote) - .map_or(false, |remote_candidate| { - remote_candidate.seen(false); + pub(crate) fn validate_non_stun_traffic(&mut self, remote_addr: SocketAddr) -> bool { + self.find_remote_candidate(remote_addr) + .map_or(false, |remote| { + self.remote_candidates[remote].seen(false); true }) } - pub(crate) fn send_stun( - &self, - msg: &Message, - local: &Rc, - remote: &Rc, - ) { - if let Err(err) = local.write_to(&msg.raw, &**remote) { + pub(crate) fn send_stun(&mut self, msg: &Message, local: usize, remote: usize) { + if let Err(err) = + self.local_candidates[local].write_to(&msg.raw, &*self.remote_candidates[remote]) + { log::trace!( "[{}]: failed to send STUN message: {}", self.get_name(), @@ -1037,20 +985,20 @@ impl Agent { } // Runs the candidate using the provided connection. - fn start_candidate( + /*fn start_candidate( &self, candidate: &Rc, //TODO: _initialized_ch: Option>, ) { - /*TODO: let (closed_ch_tx, _closed_ch_rx) = broadcast::channel(1); + TODO: let (closed_ch_tx, _closed_ch_rx) = broadcast::channel(1); { let closed_ch = candidate.get_closed_ch(); let mut closed = closed_ch.lock().await; *closed = Some(closed_ch_tx); - }*/ + } let _cand = Rc::clone(candidate); - /*TODO:if let Some(conn) = candidate.get_conn() { + TODO:if let Some(conn) = candidate.get_conn() { let conn = Arc::clone(conn); let addr = candidate.addr(); let ai = Arc::clone(self); @@ -1059,11 +1007,11 @@ impl Agent { .recv_loop(cand, closed_ch_rx, initialized_ch, conn, addr) .await; }); - } else */ + } else { log::error!("[{}]: Can't start due to conn is_none", self.get_name(),); } - } + }*/ pub(super) fn start_on_connection_state_change_routine( &mut self, @@ -1130,7 +1078,7 @@ impl Agent { */ } - async fn recv_loop( + /*TODO fn recv_loop( &self, _candidate: Rc, //mut _closed_ch_rx: broadcast::Receiver<()>, @@ -1138,7 +1086,7 @@ impl Agent { //TODO:conn: Arc, _addr: SocketAddr, ) -> Result<()> { - /* if let Some(mut initialized_ch) = initialized_ch { + if let Some(mut initialized_ch) = initialized_ch { tokio::select! { _ = initialized_ch.recv() => {} _ = closed_ch_rx.recv() => return Err(Error::ErrClosed), @@ -1164,13 +1112,13 @@ impl Agent { self.handle_inbound_candidate_msg(&candidate, &buffer[..n], src_addr, addr) .await; - }*/ + } Ok(()) - } + }*/ fn handle_inbound_candidate_msg( &mut self, - c: &Rc, + local: usize, buf: &[u8], src_addr: SocketAddr, addr: SocketAddr, @@ -1192,7 +1140,7 @@ impl Agent { err ); } else { - self.handle_inbound(&mut m, c, src_addr); + self.handle_inbound(&mut m, local, src_addr); } } else if !self.validate_non_stun_traffic(src_addr) { log::warn!( diff --git a/rtc-ice/src/candidate/candidate_pair.rs b/rtc-ice/src/candidate/candidate_pair.rs new file mode 100644 index 0000000..1ed4da8 --- /dev/null +++ b/rtc-ice/src/candidate/candidate_pair.rs @@ -0,0 +1,150 @@ +use serde::Serialize; +use std::fmt; + +/// Represent the ICE candidate pair state. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize)] +pub enum CandidatePairState { + #[serde(rename = "unspecified")] + Unspecified = 0, + + /// Means a check has not been performed for this pair. + #[serde(rename = "waiting")] + Waiting = 1, + + /// Means a check has been sent for this pair, but the transaction is in progress. + #[serde(rename = "in-progress")] + InProgress = 2, + + /// Means a check for this pair was already done and failed, either never producing any response + /// or producing an unrecoverable failure response. + #[serde(rename = "failed")] + Failed = 3, + + /// Means a check for this pair was already done and produced a successful result. + #[serde(rename = "succeeded")] + Succeeded = 4, +} + +impl From for CandidatePairState { + fn from(v: u8) -> Self { + match v { + 1 => Self::Waiting, + 2 => Self::InProgress, + 3 => Self::Failed, + 4 => Self::Succeeded, + _ => Self::Unspecified, + } + } +} + +impl Default for CandidatePairState { + fn default() -> Self { + Self::Unspecified + } +} + +impl fmt::Display for CandidatePairState { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let s = match *self { + Self::Waiting => "waiting", + Self::InProgress => "in-progress", + Self::Failed => "failed", + Self::Succeeded => "succeeded", + Self::Unspecified => "unspecified", + }; + + write!(f, "{s}") + } +} + +/// Represents a combination of a local and remote candidate. +#[derive(Clone, Copy)] +pub struct CandidatePair { + pub(crate) ice_role_controlling: bool, + pub remote: usize, + pub local: usize, + pub remote_priority: u32, + pub local_priority: u32, + pub(crate) binding_request_count: u16, + pub(crate) state: CandidatePairState, + pub(crate) nominated: bool, +} + +impl fmt::Debug for CandidatePair { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "prio {} (local, prio {}) {} <-> {} (remote, prio {})", + self.priority(), + self.local_priority, + self.local, + self.remote, + self.remote_priority, + ) + } +} + +impl fmt::Display for CandidatePair { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "prio {} (local, prio {}) {} <-> {} (remote, prio {})", + self.priority(), + self.local_priority, + self.local, + self.remote, + self.remote_priority, + ) + } +} + +impl PartialEq for CandidatePair { + fn eq(&self, other: &Self) -> bool { + self.local == other.local && self.remote == other.remote + } +} + +impl CandidatePair { + #[must_use] + pub fn new( + local: usize, + remote: usize, + local_priority: u32, + remote_priority: u32, + controlling: bool, + ) -> Self { + Self { + ice_role_controlling: controlling, + remote, + local, + remote_priority, + local_priority, + state: CandidatePairState::Waiting, + binding_request_count: 0, + nominated: false, + } + } + + /// RFC 5245 - 5.7.2. Computing Pair Priority and Ordering Pairs + /// Let G be the priority for the candidate provided by the controlling + /// agent. Let D be the priority for the candidate provided by the + /// controlled agent. + /// pair priority = 2^32*MIN(G,D) + 2*MAX(G,D) + (G>D?1:0) + pub fn priority(&self) -> u64 { + let (g, d) = if self.ice_role_controlling { + (self.local_priority, self.remote_priority) + } else { + (self.remote_priority, self.local_priority) + }; + + // 1<<32 overflows uint32; and if both g && d are + // maxUint32, this result would overflow uint64 + ((1 << 32_u64) - 1) * u64::from(std::cmp::min(g, d)) + + 2 * u64::from(std::cmp::max(g, d)) + + u64::from(g > d) + } + + /*TODO: pub fn write(&mut self, b: &[u8]) -> shared::error::Result { + self.local.write_to(b, &*self.remote) + }*/ +} diff --git a/rtc-ice/src/candidate/candidate_pair_test.rs b/rtc-ice/src/candidate/candidate_pair_test.rs index 610433a..4276f4e 100644 --- a/rtc-ice/src/candidate/candidate_pair_test.rs +++ b/rtc-ice/src/candidate/candidate_pair_test.rs @@ -1,5 +1,7 @@ use super::*; +use crate::candidate::candidate_base::{CandidateBase, CandidateBaseConfig}; use crate::candidate::candidate_host::CandidateHostConfig; +use crate::candidate::candidate_pair::CandidatePair; use crate::candidate::candidate_peer_reflexive::CandidatePeerReflexiveConfig; use crate::candidate::candidate_relay::CandidateRelayConfig; use crate::candidate::candidate_server_reflexive::CandidateServerReflexiveConfig; @@ -58,67 +60,95 @@ pub(crate) fn relay_candidate() -> Result { #[test] fn test_candidate_pair_priority() -> Result<()> { + const HOST_INDEX: usize = 0; + const PRFLX_INDEX: usize = 1; + const SRFLX_INDEX: usize = 2; + const RELAY_INDEX: usize = 3; + + let candidates = vec![ + host_candidate()?, + prflx_candidate()?, + srflx_candidate()?, + relay_candidate()?, + ]; + let tests = vec![ ( CandidatePair::new( - Box::new(host_candidate()?), - Box::new(host_candidate()?), + HOST_INDEX, + HOST_INDEX, + candidates[HOST_INDEX].priority(), + candidates[HOST_INDEX].priority(), false, ), 9151314440652587007, ), ( CandidatePair::new( - Box::new(host_candidate()?), - Box::new(host_candidate()?), + HOST_INDEX, + HOST_INDEX, + candidates[HOST_INDEX].priority(), + candidates[HOST_INDEX].priority(), true, ), 9151314440652587007, ), ( CandidatePair::new( - Box::new(host_candidate()?), - Box::new(prflx_candidate()?), + HOST_INDEX, + PRFLX_INDEX, + candidates[HOST_INDEX].priority(), + candidates[PRFLX_INDEX].priority(), true, ), 7998392936314175488, ), ( CandidatePair::new( - Box::new(host_candidate()?), - Box::new(prflx_candidate()?), + HOST_INDEX, + PRFLX_INDEX, + candidates[HOST_INDEX].priority(), + candidates[PRFLX_INDEX].priority(), false, ), 7998392936314175487, ), ( CandidatePair::new( - Box::new(host_candidate()?), - Box::new(srflx_candidate()?), + HOST_INDEX, + SRFLX_INDEX, + candidates[HOST_INDEX].priority(), + candidates[SRFLX_INDEX].priority(), true, ), 7277816996102668288, ), ( CandidatePair::new( - Box::new(host_candidate()?), - Box::new(srflx_candidate()?), + HOST_INDEX, + SRFLX_INDEX, + candidates[HOST_INDEX].priority(), + candidates[SRFLX_INDEX].priority(), false, ), 7277816996102668287, ), ( CandidatePair::new( - Box::new(host_candidate()?), - Box::new(relay_candidate()?), + HOST_INDEX, + RELAY_INDEX, + candidates[HOST_INDEX].priority(), + candidates[RELAY_INDEX].priority(), true, ), 72057593987596288, ), ( CandidatePair::new( - Box::new(host_candidate()?), - Box::new(relay_candidate()?), + HOST_INDEX, + RELAY_INDEX, + candidates[HOST_INDEX].priority(), + candidates[RELAY_INDEX].priority(), false, ), 72057593987596287, @@ -138,14 +168,30 @@ fn test_candidate_pair_priority() -> Result<()> { #[test] fn test_candidate_pair_equality() -> Result<()> { + const HOST_INDEX: usize = 0; + const PRFLX_INDEX: usize = 1; + const SRFLX_INDEX: usize = 2; + const RELAY_INDEX: usize = 3; + + let candidates = vec![ + host_candidate()?, + prflx_candidate()?, + srflx_candidate()?, + relay_candidate()?, + ]; + let pair_a = CandidatePair::new( - Box::new(host_candidate()?), - Box::new(srflx_candidate()?), + HOST_INDEX, + SRFLX_INDEX, + candidates[HOST_INDEX].priority(), + candidates[SRFLX_INDEX].priority(), true, ); let pair_b = CandidatePair::new( - Box::new(host_candidate()?), - Box::new(srflx_candidate()?), + HOST_INDEX, + SRFLX_INDEX, + candidates[HOST_INDEX].priority(), + candidates[SRFLX_INDEX].priority(), false, ); diff --git a/rtc-ice/src/candidate/candidate_test.rs b/rtc-ice/src/candidate/candidate_test.rs index a3fa665..2011583 100644 --- a/rtc-ice/src/candidate/candidate_test.rs +++ b/rtc-ice/src/candidate/candidate_test.rs @@ -1,4 +1,6 @@ use super::*; +use crate::candidate::candidate_base::{unmarshal_candidate, CandidateBase}; +use crate::candidate::candidate_pair::CandidatePairState; #[test] fn test_candidate_priority() -> Result<()> { diff --git a/rtc-ice/src/candidate/mod.rs b/rtc-ice/src/candidate/mod.rs index 374ce6d..39269ae 100644 --- a/rtc-ice/src/candidate/mod.rs +++ b/rtc-ice/src/candidate/mod.rs @@ -11,13 +11,13 @@ mod candidate_server_reflexive_test; pub mod candidate_base; pub mod candidate_host; +pub mod candidate_pair; pub mod candidate_peer_reflexive; pub mod candidate_relay; pub mod candidate_server_reflexive; use crate::network_type::NetworkType; use crate::tcp_type::TcpType; -use candidate_base::*; use serde::Serialize; use shared::error::*; use std::fmt; @@ -161,153 +161,3 @@ impl fmt::Display for CandidateRelatedAddress { write!(f, " related {}:{}", self.address, self.port) } } - -/// Represent the ICE candidate pair state. -#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize)] -pub enum CandidatePairState { - #[serde(rename = "unspecified")] - Unspecified = 0, - - /// Means a check has not been performed for this pair. - #[serde(rename = "waiting")] - Waiting = 1, - - /// Means a check has been sent for this pair, but the transaction is in progress. - #[serde(rename = "in-progress")] - InProgress = 2, - - /// Means a check for this pair was already done and failed, either never producing any response - /// or producing an unrecoverable failure response. - #[serde(rename = "failed")] - Failed = 3, - - /// Means a check for this pair was already done and produced a successful result. - #[serde(rename = "succeeded")] - Succeeded = 4, -} - -impl From for CandidatePairState { - fn from(v: u8) -> Self { - match v { - 1 => Self::Waiting, - 2 => Self::InProgress, - 3 => Self::Failed, - 4 => Self::Succeeded, - _ => Self::Unspecified, - } - } -} - -impl Default for CandidatePairState { - fn default() -> Self { - Self::Unspecified - } -} - -impl fmt::Display for CandidatePairState { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let s = match *self { - Self::Waiting => "waiting", - Self::InProgress => "in-progress", - Self::Failed => "failed", - Self::Succeeded => "succeeded", - Self::Unspecified => "unspecified", - }; - - write!(f, "{s}") - } -} - -/// Represents a combination of a local and remote candidate. -pub struct CandidatePair { - pub(crate) ice_role_controlling: bool, - pub remote: Box, - pub local: Box, - pub(crate) binding_request_count: u16, - pub(crate) state: CandidatePairState, - pub(crate) nominated: bool, -} - -impl Default for CandidatePair { - fn default() -> Self { - Self { - ice_role_controlling: false, - remote: Box::new(CandidateBase::default()), - local: Box::new(CandidateBase::default()), - state: CandidatePairState::Waiting, - binding_request_count: 0, - nominated: false, - } - } -} - -impl fmt::Debug for CandidatePair { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - f, - "prio {} (local, prio {}) {} <-> {} (remote, prio {})", - self.priority(), - self.local.priority(), - self.local, - self.remote, - self.remote.priority() - ) - } -} - -impl fmt::Display for CandidatePair { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - f, - "prio {} (local, prio {}) {} <-> {} (remote, prio {})", - self.priority(), - self.local.priority(), - self.local, - self.remote, - self.remote.priority() - ) - } -} - -impl PartialEq for CandidatePair { - fn eq(&self, other: &Self) -> bool { - self.local.equal(&*other.local) && self.remote.equal(&*other.remote) - } -} - -impl CandidatePair { - #[must_use] - pub fn new(local: Box, remote: Box, controlling: bool) -> Self { - Self { - ice_role_controlling: controlling, - remote, - local, - state: CandidatePairState::Waiting, - binding_request_count: 0, - nominated: false, - } - } - - /// RFC 5245 - 5.7.2. Computing Pair Priority and Ordering Pairs - /// Let G be the priority for the candidate provided by the controlling - /// agent. Let D be the priority for the candidate provided by the - /// controlled agent. - /// pair priority = 2^32*MIN(G,D) + 2*MAX(G,D) + (G>D?1:0) - pub fn priority(&self) -> u64 { - let (g, d) = if self.ice_role_controlling { - (self.local.priority(), self.remote.priority()) - } else { - (self.remote.priority(), self.local.priority()) - }; - - // 1<<32 overflows uint32; and if both g && d are - // maxUint32, this result would overflow uint64 - ((1 << 32_u64) - 1) * u64::from(std::cmp::min(g, d)) - + 2 * u64::from(std::cmp::max(g, d)) - + u64::from(g > d) - } - - pub fn write(&mut self, b: &[u8]) -> Result { - self.local.write_to(b, &*self.remote) - } -} diff --git a/rtc-ice/src/lib.rs b/rtc-ice/src/lib.rs index d30290f..33fdc7f 100644 --- a/rtc-ice/src/lib.rs +++ b/rtc-ice/src/lib.rs @@ -1,7 +1,7 @@ #![warn(rust_2018_idioms)] #![allow(dead_code)] -//TODO: pub mod agent; +pub mod agent; pub mod attributes; pub mod candidate; pub mod network_type; diff --git a/rtc-ice/src/stats/mod.rs b/rtc-ice/src/stats/mod.rs index 210befc..440e910 100644 --- a/rtc-ice/src/stats/mod.rs +++ b/rtc-ice/src/stats/mod.rs @@ -1,5 +1,6 @@ use std::time::Instant; +use crate::candidate::candidate_pair::CandidatePairState; use crate::candidate::*; // CandidatePairStats contains ICE candidate pair statistics