From d639bfb8c973951fb897a225cf63269301cd9df2 Mon Sep 17 00:00:00 2001 From: yngrtc Date: Sat, 16 Mar 2024 08:40:20 -0700 Subject: [PATCH] fix poll_timout/handle_timeout for Relay --- rtc-turn/src/client/mod.rs | 35 ++++++++++++++++-- rtc-turn/src/client/permission.rs | 47 ------------------------- rtc-turn/src/client/relay.rs | 26 +++++++------- rtc-turn/src/client/relay/relay_test.rs | 6 ++-- 4 files changed, 50 insertions(+), 64 deletions(-) diff --git a/rtc-turn/src/client/mod.rs b/rtc-turn/src/client/mod.rs index f35e545..6daaf60 100644 --- a/rtc-turn/src/client/mod.rs +++ b/rtc-turn/src/client/mod.rs @@ -145,12 +145,43 @@ impl Client { }) } - pub fn poll_timout(&self) -> Option { - self.tr_map.poll_timout() + pub fn poll_timout(&mut self) -> Option { + let mut eto = None; + if let Some(to) = self.tr_map.poll_timout() { + if eto.is_none() || to < *eto.as_ref().unwrap() { + eto = Some(to); + } + } + + #[allow(clippy::map_clone)] + let relayed_addrs: Vec = self.relays.keys().map(|key| *key).collect(); + for relayed_addr in relayed_addrs { + let relay = Relay { + relayed_addr, + client: self, + }; + if let Some(to) = relay.poll_timeout() { + if eto.is_none() || to < *eto.as_ref().unwrap() { + eto = Some(to); + } + } + } + + eto } pub fn handle_timeout(&mut self, now: Instant) { self.tr_map.handle_timeout(now); + + #[allow(clippy::map_clone)] + let relayed_addrs: Vec = self.relays.keys().map(|key| *key).collect(); + for relayed_addr in relayed_addrs { + let mut relay = Relay { + relayed_addr, + client: self, + }; + relay.handle_timeout(now); + } } pub fn poll_transmit(&mut self) -> Option> { diff --git a/rtc-turn/src/client/permission.rs b/rtc-turn/src/client/permission.rs index 9c37b9c..74b90c1 100644 --- a/rtc-turn/src/client/permission.rs +++ b/rtc-turn/src/client/permission.rs @@ -1,6 +1,3 @@ -use std::collections::HashMap; -use std::net::SocketAddr; - #[derive(Default, Copy, Clone, PartialEq, Debug)] pub(crate) enum PermState { #[default] @@ -31,47 +28,3 @@ impl Permission { self.st } } - -// Thread-safe Permission map -#[derive(Default)] -pub(crate) struct PermissionMap { - perm_map: HashMap, -} - -impl PermissionMap { - pub(crate) fn new() -> PermissionMap { - PermissionMap { - perm_map: HashMap::new(), - } - } - - pub(crate) fn insert(&mut self, addr: SocketAddr, p: Permission) { - self.perm_map.insert(addr.ip().to_string(), p); - } - - pub(crate) fn contains(&self, addr: &SocketAddr) -> bool { - self.perm_map.contains_key(&addr.ip().to_string()) - } - - pub(crate) fn get(&self, addr: &SocketAddr) -> Option<&Permission> { - self.perm_map.get(&addr.ip().to_string()) - } - - pub(crate) fn get_mut(&mut self, addr: &SocketAddr) -> Option<&mut Permission> { - self.perm_map.get_mut(&addr.ip().to_string()) - } - - pub(crate) fn delete(&mut self, addr: &SocketAddr) { - self.perm_map.remove(&addr.ip().to_string()); - } - - pub(crate) fn addrs(&self) -> Vec { - let mut a = vec![]; - for k in self.perm_map.keys() { - if let Ok(ip) = k.parse() { - a.push(SocketAddr::new(ip, 0)); - } - } - a - } -} diff --git a/rtc-turn/src/client/relay.rs b/rtc-turn/src/client/relay.rs index 81b9773..046ce4e 100644 --- a/rtc-turn/src/client/relay.rs +++ b/rtc-turn/src/client/relay.rs @@ -1,7 +1,8 @@ -//TODO: #[cfg(test)] -//mod relay_conn_test; +//TODO #[cfg(test)] +//mod relay_test; use log::{debug, warn}; +use std::collections::HashMap; use std::net::SocketAddr; use std::ops::Add; use std::time::{Duration, Instant}; @@ -30,13 +31,13 @@ pub(crate) struct RelayState { pub(crate) integrity: MessageIntegrity, pub(crate) nonce: Nonce, pub(crate) lifetime: Duration, - perm_map: PermissionMap, + perm_map: HashMap, refresh_alloc_timer: Instant, refresh_perms_timer: Instant, } impl RelayState { - pub(crate) fn new( + pub(super) fn new( relayed_addr: RelayedAddr, integrity: MessageIntegrity, nonce: Nonce, @@ -49,13 +50,13 @@ impl RelayState { integrity, nonce, lifetime, - perm_map: PermissionMap::new(), + perm_map: HashMap::new(), refresh_alloc_timer: Instant::now().add(lifetime / 2), refresh_perms_timer: Instant::now().add(PERM_REFRESH_INTERVAL), } } - pub fn set_nonce_from_msg(&mut self, msg: &Message) { + pub(super) fn set_nonce_from_msg(&mut self, msg: &Message) { // Update nonce match Nonce::get_from_as(msg, ATTR_NONCE) { Ok(nonce) => { @@ -76,10 +77,10 @@ pub struct Relay<'a> { impl<'a> Relay<'a> { pub fn create_permission(&mut self, peer_addr: SocketAddr) -> Result<()> { if let Some(relay) = self.client.relays.get_mut(&self.relayed_addr) { - if !relay.perm_map.contains(&peer_addr) { - relay.perm_map.insert(peer_addr, Permission::default()); - } - + relay + .perm_map + .entry(peer_addr) + .or_insert_with(Permission::default); if let Some(perm) = relay.perm_map.get(&peer_addr) { if perm.state() == PermState::Idle { // punch a hole! (this would block a bit..) @@ -300,7 +301,7 @@ impl<'a> Relay<'a> { self.client .events .push_back(Event::CreatePermissionError(res.transaction_id, err)); - relay.perm_map.delete(&peer_addr); + relay.perm_map.remove(&peer_addr); } } else if let Some(peer_addr) = peer_addr_opt { if let Some(perm) = relay.perm_map.get_mut(&peer_addr) { @@ -375,7 +376,8 @@ impl<'a> Relay<'a> { fn refresh_permissions(&mut self) -> Result<()> { if let Some(relay) = self.client.relays.get_mut(&self.relayed_addr) { - let addrs = relay.perm_map.addrs(); + #[allow(clippy::map_clone)] + let addrs: Vec = relay.perm_map.keys().map(|addr| *addr).collect(); if addrs.is_empty() { debug!("no permission to refresh"); return Ok(()); diff --git a/rtc-turn/src/client/relay/relay_test.rs b/rtc-turn/src/client/relay/relay_test.rs index dc1338e..04876bc 100644 --- a/rtc-turn/src/client/relay/relay_test.rs +++ b/rtc-turn/src/client/relay/relay_test.rs @@ -1,7 +1,7 @@ use std::net::Ipv4Addr; use super::*; -use crate::error::Result; +use shared::error::Result; struct DummyRelayConnObserver { turn_server_addr: String, @@ -9,8 +9,8 @@ struct DummyRelayConnObserver { realm: Realm, } -#[tokio::test] -async fn test_relay_conn() -> Result<()> { +#[test] +fn test_relay() -> Result<()> { let obs = DummyRelayConnObserver { turn_server_addr: String::new(), username: Username::new(ATTR_USERNAME, "username".to_owned()),