Skip to content

Commit

Permalink
move transmits from candidate to agent
Browse files Browse the repository at this point in the history
  • Loading branch information
yngrtc committed Mar 10, 2024
1 parent 78d83f7 commit 6453c08
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 68 deletions.
4 changes: 2 additions & 2 deletions rtc-ice/src/agent/agent_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ impl Agent {
let stat = CandidateStats {
timestamp: Instant::now(),
id: c.id(),
ip: c.address(),
ip: c.address().to_owned(),
port: c.port(),
candidate_type: c.candidate_type(),
priority: c.priority(),
Expand All @@ -244,7 +244,7 @@ impl Agent {
let stat = CandidateStats {
timestamp: Instant::now(),
id: c.id(),
ip: c.address(),
ip: c.address().to_owned(),
port: c.port(),
candidate_type: c.candidate_type(),
priority: c.priority(),
Expand Down
52 changes: 31 additions & 21 deletions rtc-ice/src/agent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ pub mod agent_selector;
pub mod agent_stats;

use agent_config::*;
use bytes::BytesMut;
use std::collections::VecDeque;
use std::net::{Ipv4Addr, SocketAddr};
use std::time::{Duration, Instant};
use stun::attributes::*;
Expand All @@ -20,6 +22,7 @@ use crate::rand::*;
use crate::state::*;
use crate::url::*;
use shared::error::*;
use shared::{Protocol, Transmit, TransportContext};

#[derive(Debug, Clone)]
pub(crate) struct BindingRequest {
Expand Down Expand Up @@ -109,6 +112,8 @@ pub struct Agent {

pub(crate) candidate_types: Vec<CandidateType>,
pub(crate) urls: Vec<Url>,

pub(crate) transmits: VecDeque<Transmit<BytesMut>>,
}

impl Agent {
Expand Down Expand Up @@ -207,6 +212,8 @@ impl Agent {

candidate_types,
urls: config.urls.clone(),

transmits: VecDeque::new(),
};

// Restart is also used to initialize the agent for the first time
Expand Down Expand Up @@ -281,6 +288,10 @@ impl Agent {
)
}

pub fn poll_transmit(&mut self) -> Option<Transmit<BytesMut>> {
self.transmits.pop_front()
}

/// Cleans up the Agent.
pub fn close(&mut self) -> Result<()> {
self.delete_all_candidates(false);
Expand Down Expand Up @@ -682,22 +693,9 @@ impl Agent {
///
/// This is used for restarts, failures and on close.
pub(crate) fn delete_all_candidates(&mut self, keep_local_candidates: bool) {
let name = self.get_name().to_string();

if !keep_local_candidates {
for c in &self.local_candidates {
if let Err(err) = c.close() {
log::warn!("[{}]: Failed to close candidate {}: {}", name, c, err);
}
}
self.local_candidates.clear();
}

for c in &self.remote_candidates {
if let Err(err) = c.close() {
log::warn!("[{}]: Failed to close candidate {}: {}", name, c, err);
}
}
self.remote_candidates.clear();
}

Expand Down Expand Up @@ -984,14 +982,26 @@ impl Agent {
}

pub(crate) fn send_stun(&mut self, msg: &Message, local_index: usize, remote_index: usize) {
let remote_addr = self.remote_candidates[remote_index].addr();
if let Err(err) = self.local_candidates[local_index].write(&msg.raw, remote_addr) {
log::trace!(
"[{}]: failed to send STUN message: {}",
self.get_name(),
err
);
}
let peer_addr = self.remote_candidates[remote_index].addr();
let local_addr = self.local_candidates[local_index].addr();
let protocol = if self.local_candidates[local_index].network_type().is_tcp() {
Protocol::TCP
} else {
Protocol::UDP
};

self.transmits.push_back(Transmit {
now: Instant::now(),
transport: TransportContext {
local_addr,
peer_addr,
ecn: None,
protocol,
},
message: BytesMut::from(&msg.raw[..]),
});

self.local_candidates[local_index].seen(true);
}

// Runs the candidate using the provided connection.
Expand Down
47 changes: 2 additions & 45 deletions rtc-ice/src/candidate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@ pub mod candidate_server_reflexive;

use crate::network_type::NetworkType;
use crate::tcp_type::TcpType;
use bytes::BytesMut;
use crc::{Crc, CRC_32_ISCSI};
use serde::Serialize;
use shared::error::*;
use std::collections::VecDeque;
use std::fmt;
use std::net::{IpAddr, SocketAddr};
use std::time::Instant;
Expand All @@ -31,7 +29,6 @@ use crate::candidate::candidate_peer_reflexive::CandidatePeerReflexiveConfig;
use crate::candidate::candidate_relay::CandidateRelayConfig;
use crate::candidate::candidate_server_reflexive::CandidateServerReflexiveConfig;
use crate::network_type::determine_network_type;
use shared::{Protocol, Transmit, TransportContext};

pub(crate) const RECEIVE_MTU: usize = 8192;
pub(crate) const DEFAULT_LOCAL_PREFERENCE: u16 = 65535;
Expand Down Expand Up @@ -147,7 +144,6 @@ pub struct Candidate {
pub(crate) tcp_type: TcpType,

pub(crate) resolved_addr: SocketAddr,
pub(crate) transmits: VecDeque<Transmit<BytesMut>>,

pub(crate) last_sent: Instant,
pub(crate) last_received: Instant,
Expand Down Expand Up @@ -176,7 +172,6 @@ impl Default for Candidate {
tcp_type: TcpType::default(),

resolved_addr: SocketAddr::new(IpAddr::from([0, 0, 0, 0]), 0),
transmits: VecDeque::new(),

last_sent: Instant::now(),
last_received: Instant::now(),
Expand Down Expand Up @@ -262,8 +257,8 @@ impl Candidate {
}

/// Returns Candidate Address.
pub fn address(&self) -> String {
self.address.clone()
pub fn address(&self) -> &str {
self.address.as_str()
}

/// Returns Candidate Port.
Expand Down Expand Up @@ -334,19 +329,6 @@ impl Candidate {
self.resolved_addr
}

/// Stops the recvLoop.
pub fn close(&self) -> Result<()> {
/*TODO:{
let mut closed_ch = self.closed_ch.lock().await;
if closed_ch.is_none() {
return Err(Error::ErrClosed);
}
closed_ch.take();
}*/

Ok(())
}

pub fn seen(&mut self, outbound: bool) {
let now = Instant::now();

Expand All @@ -357,31 +339,6 @@ impl Candidate {
}
}

pub fn write(&mut self, raw: &[u8], remote: SocketAddr) -> Result<usize> {
let n = raw.len();
self.transmits.push_back(Transmit {
now: Instant::now(),
transport: TransportContext {
local_addr: self.resolved_addr,
peer_addr: remote,
ecn: None,
protocol: if self.network_type.is_tcp() {
Protocol::TCP
} else {
Protocol::UDP
},
},
message: BytesMut::from(raw),
});

self.seen(true);
Ok(n)
}

pub fn poll_transmit(&mut self) -> Option<Transmit<BytesMut>> {
self.transmits.pop_front()
}

/// Used to compare two candidateBases.
pub fn equal(&self, other: &Candidate) -> bool {
self.network_type() == other.network_type()
Expand Down

0 comments on commit 6453c08

Please sign in to comment.