Skip to content

Commit

Permalink
reduce cloning and add rebind method to UdpConn
Browse files Browse the repository at this point in the history
  • Loading branch information
dignifiedquire committed Nov 19, 2024
1 parent 26bf7d8 commit e4b7f4d
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 34 deletions.
37 changes: 14 additions & 23 deletions iroh-net/src/magicsock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use futures_util::stream::BoxStream;
use iroh_base::key::NodeId;
use iroh_metrics::{inc, inc_by};
use iroh_relay::protos::stun;
use netwatch::{interfaces, ip::LocalAddresses, netmon};
use netwatch::{interfaces, ip::LocalAddresses, netmon, UdpSocket};
use quinn::AsyncUdpSocket;
use rand::{seq::SliceRandom, Rng, SeedableRng};
use smallvec::{smallvec, SmallVec};
Expand Down Expand Up @@ -441,11 +441,8 @@ impl MagicSock {
// Right now however we have one single poller behaving the same for each
// connection. It checks all paths and returns Poll::Ready as soon as any path is
// ready.
let ipv4_poller = Arc::new(self.pconn4.clone()).create_io_poller();
let ipv6_poller = self
.pconn6
.as_ref()
.map(|sock| Arc::new(sock.clone()).create_io_poller());
let ipv4_poller = self.pconn4.create_io_poller();
let ipv6_poller = self.pconn6.as_ref().map(|sock| sock.create_io_poller());
let relay_sender = self.relay_actor_sender.clone();
Box::pin(IoPoller {
ipv4_poller,
Expand Down Expand Up @@ -1091,7 +1088,6 @@ impl MagicSock {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {
// This is the socket .try_send_disco_message_udp used.
let sock = self.conn_for_addr(dst)?;
let sock = Arc::new(sock.clone());
let mut poller = sock.create_io_poller();
match poller.as_mut().poll_writable(cx)? {
Poll::Ready(()) => continue,
Expand Down Expand Up @@ -1405,6 +1401,9 @@ impl Handle {
let ipv4_addr = pconn4.local_addr()?;
let ipv6_addr = pconn6.as_ref().and_then(|c| c.local_addr().ok());

let pconn4_sock = pconn4.as_socket();
let pconn6_sock = pconn6.as_ref().map(|p| p.as_socket());

let net_checker = netcheck::Client::new(Some(port_mapper.clone()), dns_resolver.clone())?;

let (actor_sender, actor_receiver) = mpsc::channel(256);
Expand All @@ -1430,8 +1429,8 @@ impl Handle {
ipv6_reported: Arc::new(AtomicBool::new(false)),
relay_map,
my_relay: Default::default(),
pconn4: pconn4.clone(),
pconn6: pconn6.clone(),
pconn4,
pconn6,
net_checker: net_checker.addr(),
disco_secrets: DiscoSecrets::default(),
node_map,
Expand Down Expand Up @@ -1480,8 +1479,8 @@ impl Handle {
periodic_re_stun_timer: new_re_stun_timer(false),
net_info_last: None,
port_mapper,
pconn4,
pconn6,
pconn4: pconn4_sock,
pconn6: pconn6_sock,
no_v4_send: false,
net_checker,
network_monitor,
Expand Down Expand Up @@ -1719,8 +1718,8 @@ struct Actor {
net_info_last: Option<NetInfo>,

// The underlying UDP sockets used to send/rcv packets.
pconn4: UdpConn,
pconn6: Option<UdpConn>,
pconn4: Arc<UdpSocket>,
pconn6: Option<Arc<UdpSocket>>,

/// The NAT-PMP/PCP/UPnP prober/client, for requesting port mappings from NAT devices.
port_mapper: portmapper::Client,
Expand Down Expand Up @@ -1892,14 +1891,6 @@ impl Actor {
self.port_mapper.deactivate();
self.relay_actor_cancel_token.cancel();

// Ignore errors from pconnN
// They will frequently have been closed already by a call to connBind.Close.
debug!("stopping connections");
if let Some(ref conn) = self.pconn6 {
conn.close().await.ok();
}
self.pconn4.close().await.ok();

debug!("shutdown complete");
return true;
}
Expand Down Expand Up @@ -2202,8 +2193,8 @@ impl Actor {
}

let relay_map = self.msock.relay_map.clone();
let pconn4 = Some(self.pconn4.as_socket());
let pconn6 = self.pconn6.as_ref().map(|p| p.as_socket());
let pconn4 = Some(self.pconn4.clone());
let pconn6 = self.pconn6.clone();

debug!("requesting netcheck report");
match self
Expand Down
32 changes: 21 additions & 11 deletions iroh-net/src/magicsock/udp_conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ use tokio::io::Interest;
use tracing::{debug, trace};

/// A UDP socket implementing Quinn's [`AsyncUdpSocket`].
#[derive(Clone, Debug)]
#[derive(Debug)]
pub struct UdpConn {
io: Arc<UdpSocket>,
inner: Arc<quinn_udp::UdpSocketState>,
inner: quinn_udp::UdpSocketState,
}

impl UdpConn {
Expand All @@ -28,32 +28,42 @@ impl UdpConn {

pub(super) fn bind(addr: SocketAddr) -> anyhow::Result<Self> {
let sock = bind(addr)?;
let state = sock.with_socket(move |socket| {
let state = sock.with_socket(|socket| {
quinn_udp::UdpSocketState::new(quinn_udp::UdpSockRef::from(socket))
})?;

Ok(Self {
io: Arc::new(sock),
inner: Arc::new(state),
inner: state,
})
}

pub(super) fn rebind(&mut self) -> anyhow::Result<()> {
// Rebind underlying socket
self.io.rebind()?;

// update socket state
let new_state = self.io.with_socket(|socket| {
quinn_udp::UdpSocketState::new(quinn_udp::UdpSockRef::from(socket))
})?;
self.inner = new_state;
Ok(())
}

pub fn port(&self) -> u16 {
self.local_addr().map(|p| p.port()).unwrap_or_default()
}

#[allow(clippy::unused_async)]
pub async fn close(&self) -> Result<(), io::Error> {
// Nothing to do atm
Ok(())
pub(super) fn create_io_poller(&self) -> Pin<Box<dyn quinn::UdpPoller>> {
Box::pin(IoPoller {
io: self.io.clone(),
})
}
}

impl AsyncUdpSocket for UdpConn {
fn create_io_poller(self: Arc<Self>) -> Pin<Box<dyn quinn::UdpPoller>> {
Box::pin(IoPoller {
io: self.io.clone(),
})
(&*self).create_io_poller()
}

fn try_send(&self, transmit: &Transmit<'_>) -> io::Result<()> {
Expand Down

0 comments on commit e4b7f4d

Please sign in to comment.