diff --git a/rtc-turn/src/client/mod.rs b/rtc-turn/src/client/mod.rs index 0d2a4ad..87ad8f0 100644 --- a/rtc-turn/src/client/mod.rs +++ b/rtc-turn/src/client/mod.rs @@ -39,20 +39,23 @@ const DEFAULT_RTO_IN_MS: u64 = 200; const MAX_DATA_BUFFER_SIZE: usize = u16::MAX as usize; // message size limit for Chromium const MAX_READ_QUEUE_SIZE: usize = 1024; +pub type RelayedAddr = SocketAddr; +pub type ReflexiveAddr = SocketAddr; +pub type PeerAddr = SocketAddr; + pub enum Event { TransactionTimeout(TransactionId), - BindingResponse(TransactionId, SocketAddr), - BindingError(TransactionId, Box), + BindingResponse(TransactionId, ReflexiveAddr), + BindingError(TransactionId, Error), - AllocateResponse(TransactionId, SocketAddr), - AllocateError(TransactionId, Box), + AllocateResponse(TransactionId, RelayedAddr), + AllocateError(TransactionId, Error), CreatePermissionResponse(TransactionId), - CreatePermissionError(TransactionId, Box), + CreatePermissionError(TransactionId, Error), - DataIndication(SocketAddr, BytesMut), - ChannelData(ChannelNumber, SocketAddr, BytesMut), + DataIndicationOrChannelData(Option, PeerAddr, BytesMut), } enum AllocateState { @@ -98,7 +101,7 @@ pub struct Client { binding_mgr: BindingManager, rto_in_ms: u64, - relays: HashMap, + relays: HashMap, transmits: VecDeque>, events: VecDeque, } @@ -232,8 +235,11 @@ impl Client { log::debug!("data indication received from {}", from); - self.events - .push_back(Event::DataIndication(from, BytesMut::from(&data.0[..]))) + self.events.push_back(Event::DataIndicationOrChannelData( + None, + from, + BytesMut::from(&data.0[..]), + )) } return Ok(()); @@ -261,21 +267,19 @@ impl Client { Error::Other(format!("{} (error {})", msg.typ, code)) }; self.events - .push_back(Event::BindingError(tr.transaction_id, Box::new(err))); + .push_back(Event::BindingError(tr.transaction_id, err)); } else { let mut refl_addr = XorMappedAddress::default(); match refl_addr.get_from(&msg) { Ok(_) => { self.events.push_back(Event::BindingResponse( tr.transaction_id, - SocketAddr::new(refl_addr.ip, refl_addr.port), + ReflexiveAddr::new(refl_addr.ip, refl_addr.port), )); } Err(err) => { - self.events.push_back(Event::BindingError( - tr.transaction_id, - Box::new(err), - )); + self.events + .push_back(Event::BindingError(tr.transaction_id, err)); } } } @@ -294,7 +298,15 @@ impl Client { relay.handle_create_permission_response(msg, peer_addr)?; } } - METHOD_REFRESH => {} + METHOD_REFRESH => { + if let TransactionType::RefreshRequest(relayed_addr) = tr.transaction_type { + let mut relay = Relay { + relayed_addr, + client: self, + }; + relay.handle_refresh_allocation_response(msg)?; + } + } METHOD_CHANNEL_BIND => {} _ => {} } @@ -320,8 +332,8 @@ impl Client { ch_data.number.0 ); - self.events.push_back(Event::ChannelData( - ch_data.number, + self.events.push_back(Event::DataIndicationOrChannelData( + Some(ch_data.number), addr, BytesMut::from(&ch_data.data[..]), )); @@ -334,6 +346,17 @@ impl Client { self.tr_map.delete_all(); } + pub fn relay(&mut self, relayed_addr: SocketAddr) -> Result> { + if !self.relays.contains_key(&relayed_addr) { + Err(Error::ErrStreamNotExisted) + } else { + Ok(Relay { + relayed_addr, + client: self, + }) + } + } + /// send_binding_request_to sends a new STUN request to the given transport address /// return key to find out corresponding Event either BindingResponse or BindingRequestTimeout pub fn send_binding_request_to(&mut self, to: SocketAddr) -> Result { @@ -452,20 +475,16 @@ impl Client { let nonce = match Nonce::get_from_as(&response, ATTR_NONCE) { Ok(nonce) => nonce, Err(err) => { - self.events.push_back(Event::AllocateError( - response.transaction_id, - Box::new(err), - )); + self.events + .push_back(Event::AllocateError(response.transaction_id, err)); return Ok(()); } }; self.realm = match Realm::get_from_as(&response, ATTR_REALM) { Ok(realm) => realm, Err(err) => { - self.events.push_back(Event::AllocateError( - response.transaction_id, - Box::new(err), - )); + self.events + .push_back(Event::AllocateError(response.transaction_id, err)); return Ok(()); } }; @@ -511,14 +530,14 @@ impl Client { Error::Other(format!("{} (error {})", response.typ, code)) }; self.events - .push_back(Event::AllocateError(response.transaction_id, Box::new(err))); + .push_back(Event::AllocateError(response.transaction_id, err)); return Ok(()); } // Getting relayed addresses from response. let mut relayed = RelayedAddress::default(); relayed.get_from(&response)?; - let relayed_addr = SocketAddr::new(relayed.ip, relayed.port); + let relayed_addr = RelayedAddr::new(relayed.ip, relayed.port); // Getting lifetime from response let mut lifetime = Lifetime::default(); diff --git a/rtc-turn/src/client/relay.rs b/rtc-turn/src/client/relay.rs index 551542b..dd9af83 100644 --- a/rtc-turn/src/client/relay.rs +++ b/rtc-turn/src/client/relay.rs @@ -1,6 +1,7 @@ //TODO: #[cfg(test)] //mod relay_conn_test; +use log::{debug, warn}; use std::net::SocketAddr; use std::ops::Add; use std::time::{Duration, Instant}; @@ -16,7 +17,7 @@ use super::permission::*; use super::transaction::*; use crate::proto; -use crate::client::{Client, Event}; +use crate::client::{Client, Event, RelayedAddr}; use shared::error::{Error, Result}; const PERM_REFRESH_INTERVAL: Duration = Duration::from_secs(120); @@ -24,7 +25,7 @@ const MAX_RETRY_ATTEMPTS: u16 = 3; // RelayState is a set of params use by Relay pub(crate) struct RelayState { - pub(crate) relayed_addr: SocketAddr, + pub(crate) relayed_addr: RelayedAddr, pub(crate) integrity: MessageIntegrity, pub(crate) nonce: Nonce, pub(crate) lifetime: Duration, @@ -35,12 +36,12 @@ pub(crate) struct RelayState { impl RelayState { pub(crate) fn new( - relayed_addr: SocketAddr, + relayed_addr: RelayedAddr, integrity: MessageIntegrity, nonce: Nonce, lifetime: Duration, ) -> Self { - log::debug!("initial lifetime: {} seconds", lifetime.as_secs()); + debug!("initial lifetime: {} seconds", lifetime.as_secs()); Self { relayed_addr, @@ -58,16 +59,16 @@ impl RelayState { match Nonce::get_from_as(msg, ATTR_NONCE) { Ok(nonce) => { self.nonce = nonce; - log::debug!("refresh allocation: 438, got new nonce."); + debug!("refresh allocation: 438, got new nonce."); } - Err(_) => log::warn!("refresh allocation: 438 but no nonce."), + Err(_) => warn!("refresh allocation: 438 but no nonce."), } } } // Relay is the implementation of the Conn interfaces for UDP Relayed network connections. pub struct Relay<'a> { - pub(crate) relayed_addr: SocketAddr, + pub(crate) relayed_addr: RelayedAddr, pub(crate) client: &'a mut Client, } @@ -79,16 +80,16 @@ impl<'a> Relay<'a> { /// all the data transmission. This is done assuming that the request /// will be mostly likely successful and we can tolerate some loss of /// UDP packet (or reorder), inorder to minimize the latency in most cases. - pub fn create_permission(&mut self, addr: SocketAddr) -> Result<()> { + pub fn create_permission(&mut self, peer_addr: SocketAddr) -> Result<()> { if let Some(relay) = self.client.relays.get_mut(&self.relayed_addr) { - if !relay.perm_map.contains(&addr) { - relay.perm_map.insert(addr, Permission::default()); + if !relay.perm_map.contains(&peer_addr) { + relay.perm_map.insert(peer_addr, Permission::default()); } - if let Some(perm) = relay.perm_map.get(&addr) { + if let Some(perm) = relay.perm_map.get(&peer_addr) { if perm.state() == PermState::Idle { // punch a hole! (this would block a bit..) - self.create_permissions(&[addr], Some(addr))?; + self.create_permissions(&[peer_addr], Some(peer_addr))?; } } Ok(()) @@ -97,7 +98,7 @@ impl<'a> Relay<'a> { } } - pub fn poll_timeout(&self) -> Option { + pub(crate) fn poll_timeout(&self) -> Option { if let Some(relay) = self.client.relays.get(&self.relayed_addr) { if relay.refresh_alloc_timer < relay.refresh_perms_timer { Some(relay.refresh_alloc_timer) @@ -109,7 +110,7 @@ impl<'a> Relay<'a> { } } - pub fn handle_timeout(&mut self, now: Instant) { + pub(crate) fn handle_timeout(&mut self, now: Instant) { let (refresh_alloc_timer, refresh_perms_timer) = if let Some(relay) = self.client.relays.get_mut(&self.relayed_addr) { @@ -140,16 +141,10 @@ impl<'a> Relay<'a> { } } - // new creates a new instance of UDPConn - // write_to writes a packet with payload p to addr. - // write_to can be made to time out and return - // an Error with Timeout() == true after a fixed time limit; - // see SetDeadline and SetWriteDeadline. - // On packet-oriented connections, write timeouts are rare. - pub fn send_to(&mut self, _p: &[u8], addr: SocketAddr) -> Result<()> { + pub fn send_to(&mut self, _p: &[u8], peer_addr: SocketAddr) -> Result<()> { // check if we have a permission for the destination IP addr if let Some(relay) = self.client.relays.get_mut(&self.relayed_addr) { - if let Some(perm) = relay.perm_map.get_mut(&addr) { + if let Some(perm) = relay.perm_map.get_mut(&peer_addr) { if perm.state() != PermState::Permitted { Err(Error::ErrNoPermission) } else { @@ -213,7 +208,7 @@ impl<'a> Relay<'a> { } // keep going... - log::warn!("bind() failed: {}", err); + warn!("bind() failed: {}", err); } else if let Some(b) = self.client.binding_mgr.get_by_addr(&bind_addr) { b.set_state(BindingState::Ready); @@ -272,7 +267,7 @@ impl<'a> Relay<'a> { } // keep going... - log::warn!("bind() for refresh failed: {}", err); + warn!("bind() for refresh failed: {}", err); } else if let Some(b) = self.client.binding_mgr.get_by_addr(&bind_addr) { b.set_refreshed_at(Instant::now()); b.set_state(BindingState::Ready); @@ -297,9 +292,9 @@ impl<'a> Relay<'a> { fn create_permissions( &mut self, - addrs: &[SocketAddr], - addr: Option, - ) -> Result { + peer_addrs: &[SocketAddr], + peer_addr_opt: Option, + ) -> Result<()> { let (username, realm) = (self.client.username(), self.client.realm()); if let Some(relay) = self.client.relays.get_mut(&self.relayed_addr) { let msg = { @@ -308,7 +303,7 @@ impl<'a> Relay<'a> { Box::new(MessageType::new(METHOD_CREATE_PERMISSION, CLASS_REQUEST)), ]; - for addr in addrs { + for addr in peer_addrs { setters.push(Box::new(proto::peeraddr::PeerAddress { ip: addr.ip(), port: addr.port(), @@ -326,20 +321,22 @@ impl<'a> Relay<'a> { msg }; - Ok(self.client.perform_transaction( + let _ = self.client.perform_transaction( &msg, self.client.turn_server_addr(), - TransactionType::CreatePermissionRequest(self.relayed_addr, addr), - )) + TransactionType::CreatePermissionRequest(self.relayed_addr, peer_addr_opt), + ); + + Ok(()) } else { Err(Error::ErrConnClosed) } } - pub(crate) fn handle_create_permission_response( + pub(super) fn handle_create_permission_response( &mut self, res: Message, - addr: Option, + peer_addr_opt: Option, ) -> Result<()> { if let Some(relay) = self.client.relays.get_mut(&self.relayed_addr) { if res.typ.class == CLASS_ERROR_RESPONSE { @@ -353,15 +350,14 @@ impl<'a> Relay<'a> { } else { Error::Other(format!("{} (error {})", res.typ, code)) }; - self.client.events.push_back(Event::CreatePermissionError( - res.transaction_id, - Box::new(err), - )); - if let Some(addr) = addr { - relay.perm_map.delete(&addr); + if let Some(peer_addr) = peer_addr_opt { + self.client + .events + .push_back(Event::CreatePermissionError(res.transaction_id, err)); + relay.perm_map.delete(&peer_addr); } - } else if let Some(addr) = addr { - if let Some(perm) = relay.perm_map.get_mut(&addr) { + } else if let Some(peer_addr) = peer_addr_opt { + if let Some(perm) = relay.perm_map.get_mut(&peer_addr) { perm.set_state(PermState::Permitted); self.client .events @@ -393,7 +389,7 @@ impl<'a> Relay<'a> { let _ = self.client.perform_transaction( &msg, self.client.turn_server_addr(), - TransactionType::RefreshRequest, + TransactionType::RefreshRequest(self.relayed_addr), ); Ok(()) @@ -402,32 +398,30 @@ impl<'a> Relay<'a> { } } - pub(crate) fn handle_refresh_allocation_response(&mut self, res: Message) -> Result<()> { + pub(super) fn handle_refresh_allocation_response(&mut self, res: Message) -> Result<()> { if let Some(relay) = self.client.relays.get_mut(&self.relayed_addr) { if res.typ.class == CLASS_ERROR_RESPONSE { let mut code = ErrorCodeAttribute::default(); let result = code.get_from(&res); - let err = if result.is_err() { - Error::Other(format!("{}", res.typ)) + if result.is_err() { + Err(Error::Other(format!("{}", res.typ))) } else if code.code == CODE_STALE_NONCE { relay.set_nonce_from_msg(&res); - Error::ErrTryAgain + //Error::ErrTryAgain + Ok(()) } else { - Error::Other(format!("{} (error {})", res.typ, code)) - }; - self.client.events.push_back(Event::CreatePermissionError( - res.transaction_id, - Box::new(err), - )); + Err(Error::Other(format!("{} (error {})", res.typ, code))) + } } else { // Getting lifetime from response let mut updated_lifetime = proto::lifetime::Lifetime::default(); updated_lifetime.get_from(&res)?; relay.lifetime = updated_lifetime.0; - log::debug!("updated lifetime: {} seconds", relay.lifetime.as_secs()); + debug!("updated lifetime: {} seconds", relay.lifetime.as_secs()); + + Ok(()) } - Ok(()) } else { Err(Error::ErrConnClosed) } @@ -437,19 +431,10 @@ impl<'a> Relay<'a> { if let Some(relay) = self.client.relays.get_mut(&self.relayed_addr) { let addrs = relay.perm_map.addrs(); if addrs.is_empty() { - log::debug!("no permission to refresh"); + debug!("no permission to refresh"); return Ok(()); } - - if let Err(err) = self.create_permissions(&addrs, None) { - if Error::ErrTryAgain != err { - log::error!("fail to refresh permissions: {}", err); - } - return Err(err); - } - - log::debug!("refresh permissions successful"); - Ok(()) + self.create_permissions(&addrs, None) } else { Err(Error::ErrConnClosed) } @@ -484,7 +469,7 @@ impl<'a> Relay<'a> { (msg, self.client.turn_server_addr()) }; - log::debug!("UDPConn.bind call PerformTransaction 1"); + debug!("UDPConn.bind call PerformTransaction 1"); let tr_res = self.client.perform_transaction( &msg, turn_server_addr, @@ -497,7 +482,7 @@ impl<'a> Relay<'a> { return Err(Error::ErrUnexpectedResponse); } - log::debug!("channel binding successful: {} {}", bind_addr, bind_number); + debug!("channel binding successful: {} {}", bind_addr, bind_number); // Success. Ok(()) @@ -517,44 +502,3 @@ impl<'a> Relay<'a> { Ok(()) } } - -/* -impl PeriodicTimerTimeoutHandler for RelayConnInternal { - async fn on_timeout(&mut self, id: TimerIdRefresh) { - log::debug!("refresh timer {:?} expired", id); - match id { - TimerIdRefresh::Alloc => { - let lifetime = self.lifetime; - // limit the max retries on errTryAgain to 3 - // when stale nonce returns, sencond retry should succeed - let mut result = Ok(()); - for _ in 0..MAX_RETRY_ATTEMPTS { - result = self.refresh_allocation(lifetime, false).await; - if let Err(err) = &result { - if Error::ErrTryAgain != *err { - break; - } - } - } - if result.is_err() { - log::warn!("refresh allocation failed"); - } - } - TimerIdRefresh::Perms => { - let mut result = Ok(()); - for _ in 0..MAX_RETRY_ATTEMPTS { - result = self.refresh_permissions().await; - if let Err(err) = &result { - if Error::ErrTryAgain != *err { - break; - } - } - } - if result.is_err() { - log::warn!("refresh permissions failed"); - } - } - } - } -} -*/ diff --git a/rtc-turn/src/client/transaction.rs b/rtc-turn/src/client/transaction.rs index 227cbf2..7daa77a 100644 --- a/rtc-turn/src/client/transaction.rs +++ b/rtc-turn/src/client/transaction.rs @@ -18,7 +18,7 @@ pub(crate) enum TransactionType { AllocateAttempt, AllocateRequest(TextAttribute), CreatePermissionRequest(SocketAddr, Option), - RefreshRequest, + RefreshRequest(SocketAddr), ChannelBindRequest, }