diff --git a/iroh/src/discovery.rs b/iroh/src/discovery.rs index 450bd60f6d5..e5215e04a16 100644 --- a/iroh/src/discovery.rs +++ b/iroh/src/discovery.rs @@ -391,7 +391,7 @@ impl DiscoveryTask { debug!("discovery: start"); loop { let next = tokio::select! { - _ = ep.cancelled() => break, + _ = ep.cancel_token().cancelled() => break, next = stream.next() => next }; match next { diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs index 60d575f76cc..a257192aa40 100644 --- a/iroh/src/endpoint.rs +++ b/iroh/src/endpoint.rs @@ -26,7 +26,7 @@ use derive_more::Debug; use futures_lite::{Stream, StreamExt}; use iroh_base::relay_map::RelayMap; use pin_project::pin_project; -use tokio_util::sync::{CancellationToken, WaitForCancellationFuture}; +use tokio_util::sync::CancellationToken; use tracing::{debug, instrument, trace, warn}; use url::Url; @@ -965,27 +965,30 @@ impl Endpoint { /// Returns an error if closing the magic socket failed. /// TODO: Document error cases. pub async fn close(&self, error_code: VarInt, reason: &[u8]) -> Result<()> { - let Endpoint { - msock, - endpoint, - cancel_token, - .. - } = self; - cancel_token.cancel(); + if self.is_closed() { + return Ok(()); + } + + self.cancel_token.cancel(); tracing::debug!("Closing connections"); - endpoint.close(error_code, reason); - endpoint.wait_idle().await; + self.endpoint.close(error_code, reason); + self.endpoint.wait_idle().await; tracing::debug!("Connections closed"); - - msock.close().await?; + self.msock.close().await?; Ok(()) } + /// Check if this endpoint is still alive, or already closed. + pub fn is_closed(&self) -> bool { + self.cancel_token.is_cancelled() && self.msock.is_closed() + } + // # Remaining private methods - pub(crate) fn cancelled(&self) -> WaitForCancellationFuture<'_> { - self.cancel_token.cancelled() + /// Expose the internal [`CancellationToken`] to link shutdowns. + pub(crate) fn cancel_token(&self) -> &CancellationToken { + &self.cancel_token } /// Return the quic mapped address for this `node_id` and possibly start discovery diff --git a/iroh/src/magicsock.rs b/iroh/src/magicsock.rs index d88408d48d2..162dcbba075 100644 --- a/iroh/src/magicsock.rs +++ b/iroh/src/magicsock.rs @@ -274,7 +274,7 @@ impl MagicSock { self.closing.load(Ordering::Relaxed) } - fn is_closed(&self) -> bool { + pub(crate) fn is_closed(&self) -> bool { self.closed.load(Ordering::SeqCst) } diff --git a/iroh/src/protocol.rs b/iroh/src/protocol.rs index 6349063255c..fa6ba672194 100644 --- a/iroh/src/protocol.rs +++ b/iroh/src/protocol.rs @@ -268,7 +268,10 @@ impl RouterBuilder { let mut join_set = JoinSet::new(); let endpoint = self.endpoint.clone(); let protos = protocols.clone(); - let cancel = CancellationToken::new(); + + // We use a child token of the endpoint, to ensure that this is shutdown + // when the endpoint is shutdown, but that we can shutdown ourselves independently. + let cancel = endpoint.cancel_token().child_token(); let cancel_token = cancel.clone(); let run_loop_fut = async move {