diff --git a/iroh-relay/src/server/clients.rs b/iroh-relay/src/server/clients.rs index 607f7960b9..322df35b78 100644 --- a/iroh-relay/src/server/clients.rs +++ b/iroh-relay/src/server/clients.rs @@ -11,7 +11,7 @@ use iroh_metrics::inc; use tokio::sync::mpsc::error::TrySendError; use tracing::{debug, trace}; -use super::client::{Client, Config, Packet}; +use super::client::{Client, Config}; use crate::server::metrics::Metrics; /// Manages the connections to all currently connected clients. @@ -56,7 +56,14 @@ impl Clients { /// Removes the client from the map of clients, & sends a notification /// to each client that peers has sent data to, to let them know that /// peer is gone from the network. - async fn unregister(&self, node_id: NodeId) { + /// + /// Explicitly drops the reference to the client to avoid deadlock. + async fn unregister<'a>( + &self, + client: dashmap::mapref::one::Ref<'a, iroh_base::PublicKey, Client>, + node_id: NodeId, + ) { + drop(client); // avoid deadlock trace!(node_id = node_id.fmt_short(), "unregistering client"); if let Some((_, client)) = self.0.clients.remove(&node_id) { @@ -83,42 +90,53 @@ impl Clients { } } - /// Attempt to send a packet to client with [`NodeId`] `dst` + /// Attempt to send a packet to client with [`NodeId`] `dst`. pub(super) async fn send_packet(&self, dst: NodeId, data: Bytes, src: NodeId) -> Result<()> { - if let Some(client) = self.0.clients.get(&dst) { - let res = client.try_send_packet(src, data); - return self.process_result(src, dst, res).await; + let Some(client) = self.0.clients.get(&dst) else { + debug!(dst = dst.fmt_short(), "no connected client, dropped packet"); + inc!(Metrics, send_packets_dropped); + return Ok(()); + }; + match client.try_send_packet(src, data) { + Ok(_) => { + // Record sent_to relationship + self.0.sent_to.entry(src).or_default().insert(dst); + Ok(()) + } + Err(TrySendError::Full(_)) => { + debug!( + dst = dst.fmt_short(), + "client too busy to receive packet, dropping packet" + ); + bail!("failed to send message: full"); + } + Err(TrySendError::Closed(_)) => { + debug!( + dst = dst.fmt_short(), + "can no longer write to client, dropping message and pruning connection" + ); + self.unregister(client, dst).await; + bail!("failed to send message: gone"); + } } - debug!(dst = dst.fmt_short(), "no connected client, dropped packet"); - inc!(Metrics, send_packets_dropped); - Ok(()) } + /// Attempt to send a disco packet to client with [`NodeId`] `dst`. pub(super) async fn send_disco_packet( &self, dst: NodeId, data: Bytes, src: NodeId, ) -> Result<()> { - if let Some(client) = self.0.clients.get(&dst) { - let res = client.try_send_disco_packet(src, data); - return self.process_result(src, dst, res).await; - } - debug!( - dst = dst.fmt_short(), - "no connected client, dropped disco packet" - ); - inc!(Metrics, disco_packets_dropped); - Ok(()) - } - - async fn process_result( - &self, - src: NodeId, - dst: NodeId, - res: Result<(), TrySendError>, - ) -> Result<()> { - match res { + let Some(client) = self.0.clients.get(&dst) else { + debug!( + dst = dst.fmt_short(), + "no connected client, dropped disco packet" + ); + inc!(Metrics, disco_packets_dropped); + return Ok(()); + }; + match client.try_send_disco_packet(src, data) { Ok(_) => { // Record sent_to relationship self.0.sent_to.entry(src).or_default().insert(dst); @@ -127,17 +145,17 @@ impl Clients { Err(TrySendError::Full(_)) => { debug!( dst = dst.fmt_short(), - "client too busy to receive packet, dropping packet" + "client too busy to receive disco packet, dropping packet" ); - bail!("failed to send message"); + bail!("failed to send message: full"); } Err(TrySendError::Closed(_)) => { debug!( dst = dst.fmt_short(), - "can no longer write to client, dropping message and pruning connection" + "can no longer write to client, dropping disco message and pruning connection" ); - self.unregister(dst).await; - bail!("failed to send message"); + self.unregister(client, dst).await; + bail!("failed to send message: gone"); } } } @@ -212,8 +230,11 @@ mod tests { } ); - // send peer_gone - clients.unregister(a_key).await; + let client = clients.0.clients.get(&a_key).unwrap(); + + // send peer_gone. Also, tests that we do not get a deadlock + // when unregistering. + clients.unregister(client, a_key).await; assert!(!clients.0.clients.contains_key(&a_key)); clients.shutdown().await;