Skip to content

Commit

Permalink
fix poll_timout/handle_timeout for Relay
Browse files Browse the repository at this point in the history
  • Loading branch information
yngrtc committed Mar 16, 2024
1 parent 01ac2ba commit d639bfb
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 64 deletions.
35 changes: 33 additions & 2 deletions rtc-turn/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,43 @@ impl Client {
})
}

pub fn poll_timout(&self) -> Option<Instant> {
self.tr_map.poll_timout()
pub fn poll_timout(&mut self) -> Option<Instant> {
let mut eto = None;
if let Some(to) = self.tr_map.poll_timout() {
if eto.is_none() || to < *eto.as_ref().unwrap() {
eto = Some(to);
}
}

#[allow(clippy::map_clone)]
let relayed_addrs: Vec<SocketAddr> = self.relays.keys().map(|key| *key).collect();
for relayed_addr in relayed_addrs {
let relay = Relay {
relayed_addr,
client: self,
};
if let Some(to) = relay.poll_timeout() {
if eto.is_none() || to < *eto.as_ref().unwrap() {
eto = Some(to);
}
}
}

eto
}

pub fn handle_timeout(&mut self, now: Instant) {
self.tr_map.handle_timeout(now);

#[allow(clippy::map_clone)]
let relayed_addrs: Vec<SocketAddr> = self.relays.keys().map(|key| *key).collect();
for relayed_addr in relayed_addrs {
let mut relay = Relay {
relayed_addr,
client: self,
};
relay.handle_timeout(now);
}
}

pub fn poll_transmit(&mut self) -> Option<Transmit<BytesMut>> {
Expand Down
47 changes: 0 additions & 47 deletions rtc-turn/src/client/permission.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
use std::collections::HashMap;
use std::net::SocketAddr;

#[derive(Default, Copy, Clone, PartialEq, Debug)]
pub(crate) enum PermState {
#[default]
Expand Down Expand Up @@ -31,47 +28,3 @@ impl Permission {
self.st
}
}

// Thread-safe Permission map
#[derive(Default)]
pub(crate) struct PermissionMap {
perm_map: HashMap<String, Permission>,
}

impl PermissionMap {
pub(crate) fn new() -> PermissionMap {
PermissionMap {
perm_map: HashMap::new(),
}
}

pub(crate) fn insert(&mut self, addr: SocketAddr, p: Permission) {
self.perm_map.insert(addr.ip().to_string(), p);
}

pub(crate) fn contains(&self, addr: &SocketAddr) -> bool {
self.perm_map.contains_key(&addr.ip().to_string())
}

pub(crate) fn get(&self, addr: &SocketAddr) -> Option<&Permission> {
self.perm_map.get(&addr.ip().to_string())
}

pub(crate) fn get_mut(&mut self, addr: &SocketAddr) -> Option<&mut Permission> {
self.perm_map.get_mut(&addr.ip().to_string())
}

pub(crate) fn delete(&mut self, addr: &SocketAddr) {
self.perm_map.remove(&addr.ip().to_string());
}

pub(crate) fn addrs(&self) -> Vec<SocketAddr> {
let mut a = vec![];
for k in self.perm_map.keys() {
if let Ok(ip) = k.parse() {
a.push(SocketAddr::new(ip, 0));
}
}
a
}
}
26 changes: 14 additions & 12 deletions rtc-turn/src/client/relay.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
//TODO: #[cfg(test)]
//mod relay_conn_test;
//TODO #[cfg(test)]
//mod relay_test;

use log::{debug, warn};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::ops::Add;
use std::time::{Duration, Instant};
Expand Down Expand Up @@ -30,13 +31,13 @@ pub(crate) struct RelayState {
pub(crate) integrity: MessageIntegrity,
pub(crate) nonce: Nonce,
pub(crate) lifetime: Duration,
perm_map: PermissionMap,
perm_map: HashMap<SocketAddr, Permission>,
refresh_alloc_timer: Instant,
refresh_perms_timer: Instant,
}

impl RelayState {
pub(crate) fn new(
pub(super) fn new(
relayed_addr: RelayedAddr,
integrity: MessageIntegrity,
nonce: Nonce,
Expand All @@ -49,13 +50,13 @@ impl RelayState {
integrity,
nonce,
lifetime,
perm_map: PermissionMap::new(),
perm_map: HashMap::new(),
refresh_alloc_timer: Instant::now().add(lifetime / 2),
refresh_perms_timer: Instant::now().add(PERM_REFRESH_INTERVAL),
}
}

pub fn set_nonce_from_msg(&mut self, msg: &Message) {
pub(super) fn set_nonce_from_msg(&mut self, msg: &Message) {
// Update nonce
match Nonce::get_from_as(msg, ATTR_NONCE) {
Ok(nonce) => {
Expand All @@ -76,10 +77,10 @@ pub struct Relay<'a> {
impl<'a> Relay<'a> {
pub fn create_permission(&mut self, peer_addr: SocketAddr) -> Result<()> {
if let Some(relay) = self.client.relays.get_mut(&self.relayed_addr) {
if !relay.perm_map.contains(&peer_addr) {
relay.perm_map.insert(peer_addr, Permission::default());
}

relay
.perm_map
.entry(peer_addr)
.or_insert_with(Permission::default);
if let Some(perm) = relay.perm_map.get(&peer_addr) {
if perm.state() == PermState::Idle {
// punch a hole! (this would block a bit..)
Expand Down Expand Up @@ -300,7 +301,7 @@ impl<'a> Relay<'a> {
self.client
.events
.push_back(Event::CreatePermissionError(res.transaction_id, err));
relay.perm_map.delete(&peer_addr);
relay.perm_map.remove(&peer_addr);
}
} else if let Some(peer_addr) = peer_addr_opt {
if let Some(perm) = relay.perm_map.get_mut(&peer_addr) {
Expand Down Expand Up @@ -375,7 +376,8 @@ impl<'a> Relay<'a> {

fn refresh_permissions(&mut self) -> Result<()> {
if let Some(relay) = self.client.relays.get_mut(&self.relayed_addr) {
let addrs = relay.perm_map.addrs();
#[allow(clippy::map_clone)]
let addrs: Vec<SocketAddr> = relay.perm_map.keys().map(|addr| *addr).collect();
if addrs.is_empty() {
debug!("no permission to refresh");
return Ok(());
Expand Down
6 changes: 3 additions & 3 deletions rtc-turn/src/client/relay/relay_test.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
use std::net::Ipv4Addr;

use super::*;
use crate::error::Result;
use shared::error::Result;

struct DummyRelayConnObserver {
turn_server_addr: String,
username: Username,
realm: Realm,
}

#[tokio::test]
async fn test_relay_conn() -> Result<()> {
#[test]
fn test_relay() -> Result<()> {
let obs = DummyRelayConnObserver {
turn_server_addr: String::new(),
username: Username::new(ATTR_USERNAME, "username".to_owned()),
Expand Down

0 comments on commit d639bfb

Please sign in to comment.