From 6453c08ba30773c4e0ce6da49a1fbe96f7e9be78 Mon Sep 17 00:00:00 2001 From: yngrtc Date: Sun, 10 Mar 2024 09:05:53 -0700 Subject: [PATCH] move transmits from candidate to agent --- rtc-ice/src/agent/agent_stats.rs | 4 +-- rtc-ice/src/agent/mod.rs | 52 +++++++++++++++++++------------- rtc-ice/src/candidate/mod.rs | 47 ++--------------------------- 3 files changed, 35 insertions(+), 68 deletions(-) diff --git a/rtc-ice/src/agent/agent_stats.rs b/rtc-ice/src/agent/agent_stats.rs index 2874857..5e00cf9 100644 --- a/rtc-ice/src/agent/agent_stats.rs +++ b/rtc-ice/src/agent/agent_stats.rs @@ -223,7 +223,7 @@ impl Agent { let stat = CandidateStats { timestamp: Instant::now(), id: c.id(), - ip: c.address(), + ip: c.address().to_owned(), port: c.port(), candidate_type: c.candidate_type(), priority: c.priority(), @@ -244,7 +244,7 @@ impl Agent { let stat = CandidateStats { timestamp: Instant::now(), id: c.id(), - ip: c.address(), + ip: c.address().to_owned(), port: c.port(), candidate_type: c.candidate_type(), priority: c.priority(), diff --git a/rtc-ice/src/agent/mod.rs b/rtc-ice/src/agent/mod.rs index 14f7d41..a1f2a25 100644 --- a/rtc-ice/src/agent/mod.rs +++ b/rtc-ice/src/agent/mod.rs @@ -6,6 +6,8 @@ pub mod agent_selector; pub mod agent_stats; use agent_config::*; +use bytes::BytesMut; +use std::collections::VecDeque; use std::net::{Ipv4Addr, SocketAddr}; use std::time::{Duration, Instant}; use stun::attributes::*; @@ -20,6 +22,7 @@ use crate::rand::*; use crate::state::*; use crate::url::*; use shared::error::*; +use shared::{Protocol, Transmit, TransportContext}; #[derive(Debug, Clone)] pub(crate) struct BindingRequest { @@ -109,6 +112,8 @@ pub struct Agent { pub(crate) candidate_types: Vec, pub(crate) urls: Vec, + + pub(crate) transmits: VecDeque>, } impl Agent { @@ -207,6 +212,8 @@ impl Agent { candidate_types, urls: config.urls.clone(), + + transmits: VecDeque::new(), }; // Restart is also used to initialize the agent for the first time @@ -281,6 +288,10 @@ impl Agent { ) } + pub fn poll_transmit(&mut self) -> Option> { + self.transmits.pop_front() + } + /// Cleans up the Agent. pub fn close(&mut self) -> Result<()> { self.delete_all_candidates(false); @@ -682,22 +693,9 @@ impl Agent { /// /// This is used for restarts, failures and on close. pub(crate) fn delete_all_candidates(&mut self, keep_local_candidates: bool) { - let name = self.get_name().to_string(); - if !keep_local_candidates { - for c in &self.local_candidates { - if let Err(err) = c.close() { - log::warn!("[{}]: Failed to close candidate {}: {}", name, c, err); - } - } self.local_candidates.clear(); } - - for c in &self.remote_candidates { - if let Err(err) = c.close() { - log::warn!("[{}]: Failed to close candidate {}: {}", name, c, err); - } - } self.remote_candidates.clear(); } @@ -984,14 +982,26 @@ impl Agent { } pub(crate) fn send_stun(&mut self, msg: &Message, local_index: usize, remote_index: usize) { - let remote_addr = self.remote_candidates[remote_index].addr(); - if let Err(err) = self.local_candidates[local_index].write(&msg.raw, remote_addr) { - log::trace!( - "[{}]: failed to send STUN message: {}", - self.get_name(), - err - ); - } + let peer_addr = self.remote_candidates[remote_index].addr(); + let local_addr = self.local_candidates[local_index].addr(); + let protocol = if self.local_candidates[local_index].network_type().is_tcp() { + Protocol::TCP + } else { + Protocol::UDP + }; + + self.transmits.push_back(Transmit { + now: Instant::now(), + transport: TransportContext { + local_addr, + peer_addr, + ecn: None, + protocol, + }, + message: BytesMut::from(&msg.raw[..]), + }); + + self.local_candidates[local_index].seen(true); } // Runs the candidate using the provided connection. diff --git a/rtc-ice/src/candidate/mod.rs b/rtc-ice/src/candidate/mod.rs index 90a2a12..405dbfe 100644 --- a/rtc-ice/src/candidate/mod.rs +++ b/rtc-ice/src/candidate/mod.rs @@ -17,11 +17,9 @@ pub mod candidate_server_reflexive; use crate::network_type::NetworkType; use crate::tcp_type::TcpType; -use bytes::BytesMut; use crc::{Crc, CRC_32_ISCSI}; use serde::Serialize; use shared::error::*; -use std::collections::VecDeque; use std::fmt; use std::net::{IpAddr, SocketAddr}; use std::time::Instant; @@ -31,7 +29,6 @@ use crate::candidate::candidate_peer_reflexive::CandidatePeerReflexiveConfig; use crate::candidate::candidate_relay::CandidateRelayConfig; use crate::candidate::candidate_server_reflexive::CandidateServerReflexiveConfig; use crate::network_type::determine_network_type; -use shared::{Protocol, Transmit, TransportContext}; pub(crate) const RECEIVE_MTU: usize = 8192; pub(crate) const DEFAULT_LOCAL_PREFERENCE: u16 = 65535; @@ -147,7 +144,6 @@ pub struct Candidate { pub(crate) tcp_type: TcpType, pub(crate) resolved_addr: SocketAddr, - pub(crate) transmits: VecDeque>, pub(crate) last_sent: Instant, pub(crate) last_received: Instant, @@ -176,7 +172,6 @@ impl Default for Candidate { tcp_type: TcpType::default(), resolved_addr: SocketAddr::new(IpAddr::from([0, 0, 0, 0]), 0), - transmits: VecDeque::new(), last_sent: Instant::now(), last_received: Instant::now(), @@ -262,8 +257,8 @@ impl Candidate { } /// Returns Candidate Address. - pub fn address(&self) -> String { - self.address.clone() + pub fn address(&self) -> &str { + self.address.as_str() } /// Returns Candidate Port. @@ -334,19 +329,6 @@ impl Candidate { self.resolved_addr } - /// Stops the recvLoop. - pub fn close(&self) -> Result<()> { - /*TODO:{ - let mut closed_ch = self.closed_ch.lock().await; - if closed_ch.is_none() { - return Err(Error::ErrClosed); - } - closed_ch.take(); - }*/ - - Ok(()) - } - pub fn seen(&mut self, outbound: bool) { let now = Instant::now(); @@ -357,31 +339,6 @@ impl Candidate { } } - pub fn write(&mut self, raw: &[u8], remote: SocketAddr) -> Result { - let n = raw.len(); - self.transmits.push_back(Transmit { - now: Instant::now(), - transport: TransportContext { - local_addr: self.resolved_addr, - peer_addr: remote, - ecn: None, - protocol: if self.network_type.is_tcp() { - Protocol::TCP - } else { - Protocol::UDP - }, - }, - message: BytesMut::from(raw), - }); - - self.seen(true); - Ok(n) - } - - pub fn poll_transmit(&mut self) -> Option> { - self.transmits.pop_front() - } - /// Used to compare two candidateBases. pub fn equal(&self, other: &Candidate) -> bool { self.network_type() == other.network_type()