Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
alexlapa committed Jul 3, 2024
1 parent 5b5fefc commit cc5ca8a
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 68 deletions.
6 changes: 3 additions & 3 deletions src/allocation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ impl Allocation {
con::Error::Decode(_)
| con::Error::ChannelData(_)
| con::Error::Io(_) => {
log::error!(
log::warn!(
"Failed to send ChannelData from \
allocation {src_addr}: {err}",
);
Expand All @@ -395,9 +395,9 @@ impl Allocation {
}
}
Err(err) => {
log::error!(
log::warn!(
"Failed to send ChannelData from allocation \
{src_addr}: {err}"
{src_addr}: {err}"
);
}
};
Expand Down
5 changes: 0 additions & 5 deletions src/con/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,6 @@ pub trait Conn {
///
/// [IANA]: https://tinyurl.com/iana-protocol-numbers
fn proto(&self) -> u8;

/// Closes the underlying transport.
async fn close(&self);
}

/// Performs a DNS resolution.
Expand Down Expand Up @@ -166,8 +163,6 @@ impl Conn for UdpSocket {
fn proto(&self) -> u8 {
PROTO_UDP
}

async fn close(&self) {}
}

#[cfg(test)]
Expand Down
6 changes: 0 additions & 6 deletions src/con/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

use std::{
collections::{hash_map::Entry, HashMap},
mem,
net::SocketAddr,
sync::Arc,
};
Expand Down Expand Up @@ -91,11 +90,6 @@ impl Conn for TcpServer {
fn proto(&self) -> u8 {
PROTO_TCP
}

async fn close(&self) {
self.ingress_rx.lock().await.close();
drop(mem::take(&mut *self.writers.lock().await));
}
}

impl TcpServer {
Expand Down
76 changes: 23 additions & 53 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub(crate) const INBOUND_MTU: usize = 1500;
#[derive(Debug)]
pub struct Server {
/// Channel to [`Server`]'s internal loop.
command_tx: Option<broadcast::Sender<Command>>,
command_tx: broadcast::Sender<Command>,
}

impl Server {
Expand All @@ -46,7 +46,7 @@ impl Server {
A: AuthHandler + Send + Sync + 'static,
{
let (command_tx, _) = broadcast::channel(16);
let this = Self { command_tx: Some(command_tx.clone()) };
let this = Self { command_tx: command_tx.clone() };
let channel_bind_lifetime =
if config.channel_bind_lifetime == Duration::from_secs(0) {
DEFAULT_LIFETIME
Expand All @@ -55,9 +55,9 @@ impl Server {
};

for conn in config.connections {
let mut nonces = HashMap::new();
let auth_handler = Arc::clone(&config.auth_handler);
let realm = config.realm.clone();
let mut nonces = HashMap::new();
let mut handle_rx = command_tx.subscribe();
let mut allocation_manager = Manager::new(ManagerConfig {
relay_addr_generator: config.relay_addr_generator.clone(),
Expand All @@ -79,7 +79,7 @@ impl Server {
)) => {
allocation_manager
.delete_allocations_by_username(
name.as_str(),
&name,
);
drop(completion);
}
Expand All @@ -95,11 +95,6 @@ impl Server {
close_rx.close();
break;
}
Ok(Command::Close(completion)) => {
close_rx.close();
drop(completion);
break;
}
Err(RecvError::Lagged(n)) => {
log::warn!(
"Turn server has lagged by {n} \
Expand Down Expand Up @@ -131,19 +126,17 @@ impl Server {
dst_addr: local_con_addr,
protocol,
},
realm.as_str(),
&realm,
channel_bind_lifetime,
&mut allocation_manager,
&mut nonces,
&auth_handler,
);

if let Err(err) = handle.await {
log::error!("error when handling datagram: {err}");
log::warn!("Error when handling STUN request: {err}");
}
}

conn.close().await;
}));
}

Expand All @@ -159,19 +152,16 @@ impl Server {
&self,
username: String,
) -> Result<(), Error> {
let (closed_tx, closed_rx) = mpsc::channel(1);
#[allow(clippy::map_err_ignore)]
if let Some(tx) = &self.command_tx {
let (closed_tx, closed_rx) = mpsc::channel(1);
_ = tx
.send(Command::DeleteAllocations(username, Arc::new(closed_rx)))
.map_err(|_| Error::Closed)?;
let _: usize = self
.command_tx
.send(Command::DeleteAllocations(username, Arc::new(closed_rx)))
.map_err(|_| Error::Closed)?;

closed_tx.closed().await;
closed_tx.closed().await;

Ok(())
} else {
Err(Error::Closed)
}
Ok(())
}

/// Returns [`AllocInfo`]s by specified [`FiveTuple`]s.
Expand All @@ -196,38 +186,21 @@ impl Server {
}
}

#[allow(clippy::map_err_ignore)]
if let Some(tx) = &self.command_tx {
let (infos_tx, mut infos_rx) = mpsc::channel(1);
let (infos_tx, mut infos_rx) = mpsc::channel(1);

_ = tx
.send(Command::GetAllocationsInfo(five_tuples, infos_tx))
.map_err(|_| Error::Closed)?;

let mut info: HashMap<FiveTuple, AllocInfo> = HashMap::new();
#[allow(clippy::map_err_ignore)]
let _: usize = self
.command_tx
.send(Command::GetAllocationsInfo(five_tuples, infos_tx))
.map_err(|_| Error::Closed)?;

for _ in 0..tx.receiver_count() {
info.extend(infos_rx.recv().await.ok_or(Error::Closed)?);
}
let mut info: HashMap<FiveTuple, AllocInfo> = HashMap::new();

Ok(info)
} else {
Err(Error::Closed)
for _ in 0..self.command_tx.receiver_count() {
info.extend(infos_rx.recv().await.ok_or(Error::Closed)?);
}
}

/// Close stops the TURN Server. It cleans up any associated state and
/// closes all connections it is managing.
pub async fn close(&self) {
if let Some(tx) = &self.command_tx {
if tx.receiver_count() == 0 {
return;
}

let (closed_tx, closed_rx) = mpsc::channel(1);
drop(tx.send(Command::Close(Arc::new(closed_rx))));
closed_tx.closed().await;
}
Ok(info)
}
}

Expand All @@ -248,7 +221,4 @@ enum Command {
Option<Vec<FiveTuple>>,
mpsc::Sender<HashMap<FiveTuple, AllocInfo>>,
),

/// Command to close the [`Server`].
Close(Arc<mpsc::Receiver<()>>),
}
6 changes: 5 additions & 1 deletion src/server/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use crate::{
XorRelayAddress, PROTO_UDP,
},
chandata::ChannelData,
con,
con::{Conn, Request},
server::DEFAULT_LIFETIME,
AuthHandler, Error,
Expand Down Expand Up @@ -796,7 +797,10 @@ async fn send_to(
.encode_into_bytes(msg)
.map_err(|e| Error::Encode(*e.kind()))?;

Ok(conn.send_to(bytes, dst).await?)
match conn.send_to(bytes, dst).await {
Ok(()) | Err(con::Error::TransportIsDead) => Ok(()),
Err(err) => Err(Error::from(err)),
}
}

/// Send a STUN packet and return the original error to the caller
Expand Down

0 comments on commit cc5ca8a

Please sign in to comment.