Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Close builder #1576

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
42 changes: 25 additions & 17 deletions commons/zenoh-task/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,21 +110,23 @@ impl TaskController {
/// The call blocks until all tasks yield or timeout duration expires.
/// Returns 0 in case of success, number of non terminated tasks otherwise.
pub fn terminate_all(&self, timeout: Duration) -> usize {
ResolveFuture::new(async move { self.terminate_all_async(timeout).await }).wait()
ResolveFuture::new(async move {
if tokio::time::timeout(timeout, self.terminate_all_async())
.await
.is_err()
{
tracing::error!("Failed to terminate {} tasks", self.tracker.len());
}
self.tracker.len()
})
.wait()
}

/// Async version of [`TaskController::terminate_all()`].
pub async fn terminate_all_async(&self, timeout: Duration) -> usize {
pub async fn terminate_all_async(&self) {
self.tracker.close();
self.token.cancel();
if tokio::time::timeout(timeout, self.tracker.wait())
.await
.is_err()
{
tracing::error!("Failed to terminate {} tasks", self.tracker.len());
return self.tracker.len();
}
0
self.tracker.wait().await
}
}

Expand Down Expand Up @@ -181,18 +183,24 @@ impl TerminatableTask {
/// Attempts to terminate the task.
/// Returns true if task completed / aborted within timeout duration, false otherwise.
pub fn terminate(&mut self, timeout: Duration) -> bool {
ResolveFuture::new(async move { self.terminate_async(timeout).await }).wait()
ResolveFuture::new(async move {
if tokio::time::timeout(timeout, self.terminate_async())
.await
.is_err()
{
tracing::error!("Failed to terminate the task");
return false;
};
true
})
.wait()
}

/// Async version of [`TerminatableTask::terminate()`].
pub async fn terminate_async(&mut self, timeout: Duration) -> bool {
pub async fn terminate_async(&mut self) {
self.token.cancel();
if let Some(handle) = self.handle.take() {
if tokio::time::timeout(timeout, handle).await.is_err() {
tracing::error!("Failed to terminate the task");
return false;
};
let _ = handle.await;
}
true
}
}
4 changes: 1 addition & 3 deletions io/zenoh-transport/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,9 +478,7 @@ impl TransportManager {

pub async fn close(&self) {
self.close_unicast().await;
self.task_controller
.terminate_all_async(Duration::from_secs(10))
.await;
self.task_controller.terminate_all_async().await;
}

/*************************************/
Expand Down
4 changes: 1 addition & 3 deletions io/zenoh-transport/src/multicast/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,7 @@ impl TransportMulticastInner {
cb.closed();
}

self.task_controller
.terminate_all_async(Duration::from_secs(10))
.await;
self.task_controller.terminate_all_async().await;

Ok(())
}
Expand Down
96 changes: 96 additions & 0 deletions zenoh/src/api/builders/close.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//

use std::{
future::{Future, IntoFuture},
pin::Pin,
time::Duration,
};

use async_trait::async_trait;
use zenoh_core::{Resolvable, Wait};
use zenoh_result::ZResult;
use zenoh_runtime::ZRuntime;

/// A builder for close operations.
// NOTE: `Closeable` is only pub(crate) because it is zenoh-internal trait, so we don't
// care about the `private_bounds` lint in this particular case.
#[allow(private_bounds)]
pub struct CloseBuilder<TCloseable: Closeable> {
closee: TCloseable::TClosee,
timeout: Duration,
}

// NOTE: `Closeable` is only pub(crate) because it is zenoh-internal trait, so we don't
// care about the `private_bounds` lint in this particular case.
#[allow(private_bounds)]
impl<TCloseable: Closeable> CloseBuilder<TCloseable> {
pub(crate) fn new(closeable: &TCloseable) -> Self {
Self {
closee: closeable.get_closee(),
timeout: Duration::from_secs(3600),
}
}

/// Set the timeout for close operation
///
/// # Arguments
///
/// * `timeout` - The timeout value for close operation
///
pub fn timeout(mut self, timeout: Duration) -> Self {
self.timeout = timeout;
self
}
}

impl<TCloseable: Closeable> Resolvable for CloseBuilder<TCloseable> {
type To = ZResult<()>;
}

impl<TCloseable: Closeable> Wait for CloseBuilder<TCloseable> {
fn wait(self) -> Self::To {
ZRuntime::Application.block_in_place(self.into_future())
}
}

impl<TCloseable: Closeable> IntoFuture for CloseBuilder<TCloseable> {
type Output = <Self as Resolvable>::To;
type IntoFuture = Pin<Box<dyn Future<Output = <Self as IntoFuture>::Output> + Send>>;

fn into_future(self) -> Self::IntoFuture {
Box::pin(
async move {
if tokio::time::timeout(self.timeout, self.closee.close_inner())
.await
.is_err()
{
bail!("close operation timed out!")
}
Ok(())
}
.into_future(),
)
}
}

#[async_trait]
pub(crate) trait Closee: Send + Sync + 'static {
async fn close_inner(&self);
}

pub(crate) trait Closeable {
type TClosee: Closee;
fn get_closee(&self) -> Self::TClosee;
}
1 change: 1 addition & 0 deletions zenoh/src/api/builders/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//

pub(crate) mod close;
pub(crate) mod info;
pub(crate) mod matching_listener;
pub(crate) mod publisher;
Expand Down
96 changes: 54 additions & 42 deletions zenoh/src/api/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::{
time::{Duration, SystemTime, UNIX_EPOCH},
};

use async_trait::async_trait;
use tracing::{error, info, trace, warn};
use uhlc::Timestamp;
#[cfg(feature = "internal")]
Expand Down Expand Up @@ -67,6 +68,7 @@ use zenoh_result::ZResult;
use zenoh_shm::api::client_storage::ShmClientStorage;
use zenoh_task::TaskController;

use super::builders::close::{CloseBuilder, Closeable, Closee};
#[cfg(feature = "unstable")]
use crate::api::selector::ZenohParameters;
#[cfg(feature = "unstable")]
Expand Down Expand Up @@ -615,8 +617,8 @@ impl Session {
/// subscriber_task.await.unwrap();
/// # }
/// ```
pub fn close(&self) -> impl Resolve<ZResult<()>> + '_ {
self.0.close()
pub fn close(&self) -> CloseBuilder<Self> {
CloseBuilder::new(self)
}

/// Check if the session has been closed.
Expand Down Expand Up @@ -1073,51 +1075,12 @@ impl Session {
})
}
}

impl SessionInner {
pub fn zid(&self) -> ZenohId {
self.runtime.zid()
}

fn close(&self) -> impl Resolve<ZResult<()>> + '_ {
ResolveFuture::new(async move {
let Some(primitives) = zwrite!(self.state).primitives.take() else {
return Ok(());
};
if self.owns_runtime {
info!(zid = %self.zid(), "close session");
}
self.task_controller.terminate_all(Duration::from_secs(10));
if self.owns_runtime {
self.runtime.close().await?;
} else {
primitives.send_close();
}
// defer the cleanup of internal data structures by taking them out of the locked state
// this is needed because callbacks may contain entities which need to acquire the
// lock to be dropped, so callback must be dropped without the lock held
let mut state = zwrite!(self.state);
let _queryables = std::mem::take(&mut state.queryables);
let _subscribers = std::mem::take(&mut state.subscribers);
let _liveliness_subscribers = std::mem::take(&mut state.liveliness_subscribers);
let _local_resources = std::mem::take(&mut state.local_resources);
let _remote_resources = std::mem::take(&mut state.remote_resources);
drop(state);
#[cfg(feature = "unstable")]
{
// the lock from the outer scope cannot be reused because the declared variables
// would be undeclared at the end of the block, with the lock held, and we want
// to avoid that; so we reacquire the lock in the block
// anyway, it doesn't really matter, and this code will be cleaned up when the APIs
// will be stabilized.
let mut state = zwrite!(self.state);
let _tokens = std::mem::take(&mut state.tokens);
let _matching_listeners = std::mem::take(&mut state.matching_listeners);
drop(state);
}
Ok(())
})
}

pub(crate) fn declare_prefix<'a>(
&'a self,
prefix: &'a str,
Expand Down Expand Up @@ -2887,3 +2850,52 @@ where
{
OpenBuilder::new(config)
}

#[async_trait]
impl Closee for Arc<SessionInner> {
async fn close_inner(&self) {
let Some(primitives) = zwrite!(self.state).primitives.take() else {
return;
};

if self.owns_runtime {
info!(zid = %self.zid(), "close session");
self.task_controller.terminate_all_async().await;
self.runtime.get_closee().close_inner().await;
} else {
self.task_controller.terminate_all_async().await;
primitives.send_close();
}

// defer the cleanup of internal data structures by taking them out of the locked state
// this is needed because callbacks may contain entities which need to acquire the
// lock to be dropped, so callback must be dropped without the lock held
let mut state = zwrite!(self.state);
let _queryables = std::mem::take(&mut state.queryables);
let _subscribers = std::mem::take(&mut state.subscribers);
let _liveliness_subscribers = std::mem::take(&mut state.liveliness_subscribers);
let _local_resources = std::mem::take(&mut state.local_resources);
let _remote_resources = std::mem::take(&mut state.remote_resources);
drop(state);
#[cfg(feature = "unstable")]
{
// the lock from the outer scope cannot be reused because the declared variables
// would be undeclared at the end of the block, with the lock held, and we want
// to avoid that; so we reacquire the lock in the block
// anyway, it doesn't really matter, and this code will be cleaned up when the APIs
// will be stabilized.
let mut state = zwrite!(self.state);
let _tokens = std::mem::take(&mut state.tokens);
let _matching_listeners = std::mem::take(&mut state.matching_listeners);
drop(state);
}
}
}

impl Closeable for Session {
type TClosee = Arc<SessionInner>;

fn get_closee(&self) -> Self::TClosee {
self.0.clone()
}
}
1 change: 1 addition & 0 deletions zenoh/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ pub mod session {
pub use crate::api::builders::session::{init, InitBuilder};
pub use crate::api::{
builders::{
close::CloseBuilder,
info::{PeersZenohIdBuilder, RoutersZenohIdBuilder, ZenohIdBuilder},
publisher::{SessionDeleteBuilder, SessionPutBuilder},
query::SessionGetBuilder,
Expand Down
Loading
Loading