From cc5ca8abc7ba51a7ab512eeeb38dbc3dc46358e8 Mon Sep 17 00:00:00 2001 From: alexlapa Date: Wed, 3 Jul 2024 11:57:18 +0300 Subject: [PATCH] refactor --- src/allocation/mod.rs | 6 ++-- src/con/mod.rs | 5 --- src/con/tcp.rs | 6 ---- src/server/mod.rs | 76 +++++++++++++------------------------------ src/server/request.rs | 6 +++- 5 files changed, 31 insertions(+), 68 deletions(-) diff --git a/src/allocation/mod.rs b/src/allocation/mod.rs index f0c4126c6..48748fffe 100644 --- a/src/allocation/mod.rs +++ b/src/allocation/mod.rs @@ -386,7 +386,7 @@ impl Allocation { con::Error::Decode(_) | con::Error::ChannelData(_) | con::Error::Io(_) => { - log::error!( + log::warn!( "Failed to send ChannelData from \ allocation {src_addr}: {err}", ); @@ -395,9 +395,9 @@ impl Allocation { } } Err(err) => { - log::error!( + log::warn!( "Failed to send ChannelData from allocation \ - {src_addr}: {err}" + {src_addr}: {err}" ); } }; diff --git a/src/con/mod.rs b/src/con/mod.rs index 84a7824f8..4a2693396 100644 --- a/src/con/mod.rs +++ b/src/con/mod.rs @@ -96,9 +96,6 @@ pub trait Conn { /// /// [IANA]: https://tinyurl.com/iana-protocol-numbers fn proto(&self) -> u8; - - /// Closes the underlying transport. - async fn close(&self); } /// Performs a DNS resolution. @@ -166,8 +163,6 @@ impl Conn for UdpSocket { fn proto(&self) -> u8 { PROTO_UDP } - - async fn close(&self) {} } #[cfg(test)] diff --git a/src/con/tcp.rs b/src/con/tcp.rs index a6a0c170b..3e08500a2 100644 --- a/src/con/tcp.rs +++ b/src/con/tcp.rs @@ -4,7 +4,6 @@ use std::{ collections::{hash_map::Entry, HashMap}, - mem, net::SocketAddr, sync::Arc, }; @@ -91,11 +90,6 @@ impl Conn for TcpServer { fn proto(&self) -> u8 { PROTO_TCP } - - async fn close(&self) { - self.ingress_rx.lock().await.close(); - drop(mem::take(&mut *self.writers.lock().await)); - } } impl TcpServer { diff --git a/src/server/mod.rs b/src/server/mod.rs index 329cb623e..5425a3959 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -35,7 +35,7 @@ pub(crate) const INBOUND_MTU: usize = 1500; #[derive(Debug)] pub struct Server { /// Channel to [`Server`]'s internal loop. - command_tx: Option>, + command_tx: broadcast::Sender, } impl Server { @@ -46,7 +46,7 @@ impl Server { A: AuthHandler + Send + Sync + 'static, { let (command_tx, _) = broadcast::channel(16); - let this = Self { command_tx: Some(command_tx.clone()) }; + let this = Self { command_tx: command_tx.clone() }; let channel_bind_lifetime = if config.channel_bind_lifetime == Duration::from_secs(0) { DEFAULT_LIFETIME @@ -55,9 +55,9 @@ impl Server { }; for conn in config.connections { - let mut nonces = HashMap::new(); let auth_handler = Arc::clone(&config.auth_handler); let realm = config.realm.clone(); + let mut nonces = HashMap::new(); let mut handle_rx = command_tx.subscribe(); let mut allocation_manager = Manager::new(ManagerConfig { relay_addr_generator: config.relay_addr_generator.clone(), @@ -79,7 +79,7 @@ impl Server { )) => { allocation_manager .delete_allocations_by_username( - name.as_str(), + &name, ); drop(completion); } @@ -95,11 +95,6 @@ impl Server { close_rx.close(); break; } - Ok(Command::Close(completion)) => { - close_rx.close(); - drop(completion); - break; - } Err(RecvError::Lagged(n)) => { log::warn!( "Turn server has lagged by {n} \ @@ -131,7 +126,7 @@ impl Server { dst_addr: local_con_addr, protocol, }, - realm.as_str(), + &realm, channel_bind_lifetime, &mut allocation_manager, &mut nonces, @@ -139,11 +134,9 @@ impl Server { ); if let Err(err) = handle.await { - log::error!("error when handling datagram: {err}"); + log::warn!("Error when handling STUN request: {err}"); } } - - conn.close().await; })); } @@ -159,19 +152,16 @@ impl Server { &self, username: String, ) -> Result<(), Error> { + let (closed_tx, closed_rx) = mpsc::channel(1); #[allow(clippy::map_err_ignore)] - if let Some(tx) = &self.command_tx { - let (closed_tx, closed_rx) = mpsc::channel(1); - _ = tx - .send(Command::DeleteAllocations(username, Arc::new(closed_rx))) - .map_err(|_| Error::Closed)?; + let _: usize = self + .command_tx + .send(Command::DeleteAllocations(username, Arc::new(closed_rx))) + .map_err(|_| Error::Closed)?; - closed_tx.closed().await; + closed_tx.closed().await; - Ok(()) - } else { - Err(Error::Closed) - } + Ok(()) } /// Returns [`AllocInfo`]s by specified [`FiveTuple`]s. @@ -196,38 +186,21 @@ impl Server { } } - #[allow(clippy::map_err_ignore)] - if let Some(tx) = &self.command_tx { - let (infos_tx, mut infos_rx) = mpsc::channel(1); + let (infos_tx, mut infos_rx) = mpsc::channel(1); - _ = tx - .send(Command::GetAllocationsInfo(five_tuples, infos_tx)) - .map_err(|_| Error::Closed)?; - - let mut info: HashMap = HashMap::new(); + #[allow(clippy::map_err_ignore)] + let _: usize = self + .command_tx + .send(Command::GetAllocationsInfo(five_tuples, infos_tx)) + .map_err(|_| Error::Closed)?; - for _ in 0..tx.receiver_count() { - info.extend(infos_rx.recv().await.ok_or(Error::Closed)?); - } + let mut info: HashMap = HashMap::new(); - Ok(info) - } else { - Err(Error::Closed) + for _ in 0..self.command_tx.receiver_count() { + info.extend(infos_rx.recv().await.ok_or(Error::Closed)?); } - } - /// Close stops the TURN Server. It cleans up any associated state and - /// closes all connections it is managing. - pub async fn close(&self) { - if let Some(tx) = &self.command_tx { - if tx.receiver_count() == 0 { - return; - } - - let (closed_tx, closed_rx) = mpsc::channel(1); - drop(tx.send(Command::Close(Arc::new(closed_rx)))); - closed_tx.closed().await; - } + Ok(info) } } @@ -248,7 +221,4 @@ enum Command { Option>, mpsc::Sender>, ), - - /// Command to close the [`Server`]. - Close(Arc>), } diff --git a/src/server/request.rs b/src/server/request.rs index 71ba7aa0c..e6991a50c 100644 --- a/src/server/request.rs +++ b/src/server/request.rs @@ -36,6 +36,7 @@ use crate::{ XorRelayAddress, PROTO_UDP, }, chandata::ChannelData, + con, con::{Conn, Request}, server::DEFAULT_LIFETIME, AuthHandler, Error, @@ -796,7 +797,10 @@ async fn send_to( .encode_into_bytes(msg) .map_err(|e| Error::Encode(*e.kind()))?; - Ok(conn.send_to(bytes, dst).await?) + match conn.send_to(bytes, dst).await { + Ok(()) | Err(con::Error::TransportIsDead) => Ok(()), + Err(err) => Err(Error::from(err)), + } } /// Send a STUN packet and return the original error to the caller