Skip to content

Commit

Permalink
feat: improve shutdown interactions
Browse files Browse the repository at this point in the history
  • Loading branch information
dignifiedquire committed Nov 29, 2024
1 parent fbcaaa5 commit fc49925
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 17 deletions.
2 changes: 1 addition & 1 deletion iroh/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ impl DiscoveryTask {
debug!("discovery: start");
loop {
let next = tokio::select! {
_ = ep.cancelled() => break,
_ = ep.cancel_token().cancelled() => break,
next = stream.next() => next
};
match next {
Expand Down
31 changes: 17 additions & 14 deletions iroh/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use derive_more::Debug;
use futures_lite::{Stream, StreamExt};
use iroh_base::relay_map::RelayMap;
use pin_project::pin_project;
use tokio_util::sync::{CancellationToken, WaitForCancellationFuture};
use tokio_util::sync::CancellationToken;
use tracing::{debug, instrument, trace, warn};
use url::Url;

Expand Down Expand Up @@ -965,27 +965,30 @@ impl Endpoint {
/// Returns an error if closing the magic socket failed.
/// TODO: Document error cases.
pub async fn close(&self, error_code: VarInt, reason: &[u8]) -> Result<()> {
let Endpoint {
msock,
endpoint,
cancel_token,
..
} = self;
cancel_token.cancel();
if self.is_closed() {
return Ok(());
}

self.cancel_token.cancel();
tracing::debug!("Closing connections");
endpoint.close(error_code, reason);
endpoint.wait_idle().await;
self.endpoint.close(error_code, reason);
self.endpoint.wait_idle().await;

tracing::debug!("Connections closed");

msock.close().await?;
self.msock.close().await?;
Ok(())
}

/// Check if this endpoint is still alive, or already closed.
pub fn is_closed(&self) -> bool {
self.cancel_token.is_cancelled() && self.msock.is_closed()
}

// # Remaining private methods

pub(crate) fn cancelled(&self) -> WaitForCancellationFuture<'_> {
self.cancel_token.cancelled()
/// Expose the internal [`CancellationToken`] to link shutdowns.
pub(crate) fn cancel_token(&self) -> &CancellationToken {
&self.cancel_token
}

/// Return the quic mapped address for this `node_id` and possibly start discovery
Expand Down
2 changes: 1 addition & 1 deletion iroh/src/magicsock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ impl MagicSock {
self.closing.load(Ordering::Relaxed)
}

fn is_closed(&self) -> bool {
pub(crate) fn is_closed(&self) -> bool {
self.closed.load(Ordering::SeqCst)
}

Expand Down
5 changes: 4 additions & 1 deletion iroh/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,10 @@ impl RouterBuilder {
let mut join_set = JoinSet::new();
let endpoint = self.endpoint.clone();
let protos = protocols.clone();
let cancel = CancellationToken::new();

// We use a child token of the endpoint, to ensure that this is shutdown
// when the endpoint is shutdown, but that we can shutdown ourselves independently.
let cancel = endpoint.cancel_token().child_token();
let cancel_token = cancel.clone();

let run_loop_fut = async move {
Expand Down

0 comments on commit fc49925

Please sign in to comment.