From 2cc7f12a01e15517e6b8c358d2e63056d526c7ab Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Tue, 5 Nov 2024 15:38:05 +0100 Subject: [PATCH 01/24] Expose and use ke macro --- commons/zenoh-macros/src/lib.rs | 2 +- zenoh/src/api/admin.rs | 28 ++++++++++++++-------------- zenoh/src/lib.rs | 2 +- 3 files changed, 16 insertions(+), 16 deletions(-) diff --git a/commons/zenoh-macros/src/lib.rs b/commons/zenoh-macros/src/lib.rs index 2047c424cf..cc6f4a2295 100644 --- a/commons/zenoh-macros/src/lib.rs +++ b/commons/zenoh-macros/src/lib.rs @@ -515,7 +515,7 @@ pub fn ke(tokens: TokenStream) -> TokenStream { let value: LitStr = syn::parse(tokens).unwrap(); let ke = value.value(); match zenoh_keyexpr::keyexpr::new(&ke) { - Ok(_) => quote!(unsafe {::zenoh::key_expr::keyexpr::from_str_unchecked(#ke)}).into(), + Ok(_) => quote!(unsafe { zenoh::key_expr::keyexpr::from_str_unchecked(#ke)}).into(), Err(e) => panic!("{}", e), } } diff --git a/zenoh/src/api/admin.rs b/zenoh/src/api/admin.rs index b0d0bed0f7..74960e68f7 100644 --- a/zenoh/src/api/admin.rs +++ b/zenoh/src/api/admin.rs @@ -19,6 +19,7 @@ use std::{ use zenoh_core::{Result as ZResult, Wait}; use zenoh_keyexpr::keyexpr; +use zenoh_macros::ke; #[cfg(feature = "unstable")] use zenoh_protocol::core::Reliability; use zenoh_protocol::{core::WireExpr, network::NetworkMessage}; @@ -26,6 +27,7 @@ use zenoh_transport::{ TransportEventHandler, TransportMulticastEventHandler, TransportPeer, TransportPeerEventHandler, }; +use crate as zenoh; use crate::{ api::{ encoding::Encoding, @@ -38,17 +40,15 @@ use crate::{ handlers::Callback, }; -lazy_static::lazy_static!( - static ref KE_STARSTAR: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("**") }; - static ref KE_PREFIX: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("@") }; - static ref KE_SESSION: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("session") }; - static ref KE_TRANSPORT_UNICAST: &'static keyexpr = unsafe { 
keyexpr::from_str_unchecked("transport/unicast") }; - static ref KE_LINK: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("link") }; -); +static KE_STARSTAR: &keyexpr = ke!("**"); +static KE_PREFIX: &keyexpr = ke!("@"); +static KE_SESSION: &keyexpr = ke!("session"); +static KE_TRANSPORT_UNICAST: &keyexpr = ke!("transport/unicast"); +static KE_LINK: &keyexpr = ke!("link"); pub(crate) fn init(session: WeakSession) { if let Ok(own_zid) = keyexpr::new(&session.zid().to_string()) { - let admin_key = KeyExpr::from(*KE_PREFIX / own_zid / *KE_SESSION / *KE_STARSTAR) + let admin_key = KeyExpr::from(KE_PREFIX / own_zid / KE_SESSION / KE_STARSTAR) .to_wire(&session) .to_owned(); @@ -68,7 +68,7 @@ pub(crate) fn on_admin_query(session: &WeakSession, query: Query) { fn reply_peer(own_zid: &keyexpr, query: &Query, peer: TransportPeer) { let zid = peer.zid.to_string(); if let Ok(zid) = keyexpr::new(&zid) { - let key_expr = *KE_PREFIX / own_zid / *KE_SESSION / *KE_TRANSPORT_UNICAST / zid; + let key_expr = KE_PREFIX / own_zid / KE_SESSION / KE_TRANSPORT_UNICAST / zid; if query.key_expr().intersects(&key_expr) { match serde_json::to_vec(&peer) { Ok(bytes) => { @@ -82,12 +82,12 @@ pub(crate) fn on_admin_query(session: &WeakSession, query: Query) { let mut s = DefaultHasher::new(); link.hash(&mut s); if let Ok(lid) = keyexpr::new(&s.finish().to_string()) { - let key_expr = *KE_PREFIX + let key_expr = KE_PREFIX / own_zid - / *KE_SESSION - / *KE_TRANSPORT_UNICAST + / KE_SESSION + / KE_TRANSPORT_UNICAST / zid - / *KE_LINK + / KE_LINK / lid; if query.key_expr().intersects(&key_expr) { match serde_json::to_vec(&link) { @@ -156,7 +156,7 @@ impl TransportMulticastEventHandler for Handler { if let Ok(own_zid) = keyexpr::new(&self.session.zid().to_string()) { if let Ok(zid) = keyexpr::new(&peer.zid.to_string()) { let expr = WireExpr::from( - &(*KE_PREFIX / own_zid / *KE_SESSION / *KE_TRANSPORT_UNICAST / zid), + &(KE_PREFIX / own_zid / KE_SESSION / KE_TRANSPORT_UNICAST / zid), ) 
.to_owned(); let info = DataInfo { diff --git a/zenoh/src/lib.rs b/zenoh/src/lib.rs index 701e13fc94..298ce6f1f6 100644 --- a/zenoh/src/lib.rs +++ b/zenoh/src/lib.rs @@ -172,7 +172,7 @@ pub mod key_expr { #[zenoh_macros::unstable] pub mod format { pub use zenoh_keyexpr::format::*; - pub use zenoh_macros::{kedefine, keformat, kewrite}; + pub use zenoh_macros::{ke, kedefine, keformat, kewrite}; pub mod macro_support { pub use zenoh_keyexpr::format::macro_support::*; } From 163d3d50ba38ec29167e8de3617e907aad7cdf43 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Fri, 8 Nov 2024 16:21:57 +0100 Subject: [PATCH 02/24] Fix SourceInfo publication --- zenoh/src/api/session.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index c5ba24b98f..362fb8ce2f 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -1968,7 +1968,7 @@ impl SessionInner { timestamp, encoding: encoding.clone().into(), #[cfg(feature = "unstable")] - ext_sinfo: source_info.into(), + ext_sinfo: source_info.clone().into(), #[cfg(not(feature = "unstable"))] ext_sinfo: None, #[cfg(feature = "shared-memory")] @@ -1980,7 +1980,7 @@ impl SessionInner { SampleKind::Delete => PushBody::Del(Del { timestamp, #[cfg(feature = "unstable")] - ext_sinfo: source_info.into(), + ext_sinfo: source_info.clone().into(), #[cfg(not(feature = "unstable"))] ext_sinfo: None, ext_attachment: attachment.clone().map(|a| a.into()), @@ -1999,7 +1999,13 @@ impl SessionInner { kind, encoding: Some(encoding), timestamp, + #[cfg(feature = "unstable")] + source_id: source_info.source_id, + #[cfg(not(feature = "unstable"))] source_id: None, + #[cfg(feature = "unstable")] + source_sn: source_info.source_sn, + #[cfg(not(feature = "unstable"))] source_sn: None, qos: QoS::from(push::ext::QoSType::new( priority.into(), From 43a5d3ce19b84f6fb672702587bd194ef3f65eea Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Fri, 8 Nov 2024 16:23:16 +0100 
Subject: [PATCH 03/24] Add AdvancedPublisher AdvancedSubscriber and AdvancedSubscriber --- Cargo.lock | 1 + commons/zenoh-config/src/wrappers.rs | 10 + zenoh-ext/Cargo.toml | 1 + zenoh-ext/src/advanced_cache.rs | 327 +++++++++++++ zenoh-ext/src/advanced_publisher.rs | 392 ++++++++++++++++ zenoh-ext/src/advanced_subscriber.rs | 676 +++++++++++++++++++++++++++ zenoh-ext/src/lib.rs | 14 +- zenoh-ext/src/publisher_ext.rs | 46 ++ zenoh-ext/src/session_ext.rs | 20 + zenoh-ext/src/subscriber_ext.rs | 51 +- zenoh/src/api/builders/publisher.rs | 8 + 11 files changed, 1544 insertions(+), 2 deletions(-) create mode 100644 zenoh-ext/src/advanced_cache.rs create mode 100644 zenoh-ext/src/advanced_publisher.rs create mode 100644 zenoh-ext/src/advanced_subscriber.rs create mode 100644 zenoh-ext/src/publisher_ext.rs diff --git a/Cargo.lock b/Cargo.lock index 3fec7718ce..7beb1c9a7e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5593,6 +5593,7 @@ dependencies = [ name = "zenoh-ext" version = "1.0.0-dev" dependencies = [ + "async-trait", "bincode", "flume", "futures", diff --git a/commons/zenoh-config/src/wrappers.rs b/commons/zenoh-config/src/wrappers.rs index 31afe358e7..759507c770 100644 --- a/commons/zenoh-config/src/wrappers.rs +++ b/commons/zenoh-config/src/wrappers.rs @@ -152,6 +152,16 @@ pub struct EntityGlobalId(EntityGlobalIdProto); pub type EntityId = u32; impl EntityGlobalId { + /// Creates a new EntityGlobalId. + #[zenoh_macros::internal] + pub fn new(zid: ZenohId, eid: EntityId) -> Self { + EntityGlobalIdProto { + zid: zid.into(), + eid, + } + .into() + } + /// Returns the [`ZenohId`], i.e. the Zenoh session, this ID is associated to. 
pub fn zid(&self) -> ZenohId { self.0.zid.into() diff --git a/zenoh-ext/Cargo.toml b/zenoh-ext/Cargo.toml index 4de255d0d6..0b693fa820 100644 --- a/zenoh-ext/Cargo.toml +++ b/zenoh-ext/Cargo.toml @@ -38,6 +38,7 @@ tokio = { workspace = true, features = [ "macros", "io-std", ] } +async-trait = { workspace = true } bincode = { workspace = true } zenoh-util = { workspace = true } flume = { workspace = true } diff --git a/zenoh-ext/src/advanced_cache.rs b/zenoh-ext/src/advanced_cache.rs new file mode 100644 index 0000000000..b96b7fe4af --- /dev/null +++ b/zenoh-ext/src/advanced_cache.rs @@ -0,0 +1,327 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use std::{ + borrow::Borrow, + collections::{HashMap, VecDeque}, + future::{IntoFuture, Ready}, +}; + +use flume::{bounded, Sender}; +use futures::{select, FutureExt}; +use tokio::task; +use zenoh::{ + handlers::FifoChannelHandler, + internal::{bail, ResolveFuture}, + key_expr::{ + format::{ke, kedefine}, + keyexpr, KeyExpr, OwnedKeyExpr, + }, + liveliness::LivelinessToken, + pubsub::Subscriber, + query::{Query, Queryable, ZenohParameters}, + sample::{Locality, Sample}, + Resolvable, Resolve, Result as ZResult, Session, Wait, +}; + +#[zenoh_macros::unstable] +pub(crate) static KE_STAR: &keyexpr = ke!("*"); +#[zenoh_macros::unstable] +pub(crate) static KE_PREFIX: &keyexpr = ke!("@cache"); +#[zenoh_macros::unstable] +kedefine!( + pub(crate) ke_liveliness: "@cache/${zid:*}/${eid:*}/${remaining:**}", +); + +/// The builder of AdvancedCache, allowing to configure it. 
+pub struct AdvancedCacheBuilder<'a, 'b, 'c> { + session: &'a Session, + pub_key_expr: ZResult>, + queryable_prefix: Option>>, + subscriber_origin: Locality, + queryable_origin: Locality, + history: usize, + liveliness: bool, + resources_limit: Option, +} + +impl<'a, 'b, 'c> AdvancedCacheBuilder<'a, 'b, 'c> { + pub(crate) fn new( + session: &'a Session, + pub_key_expr: ZResult>, + ) -> AdvancedCacheBuilder<'a, 'b, 'c> { + AdvancedCacheBuilder { + session, + pub_key_expr, + queryable_prefix: Some(Ok((KE_PREFIX / KE_STAR / KE_STAR).into())), + subscriber_origin: Locality::default(), + queryable_origin: Locality::default(), + history: 1024, + liveliness: false, + resources_limit: None, + } + } + + /// Change the prefix used for queryable. + pub fn queryable_prefix(mut self, queryable_prefix: TryIntoKeyExpr) -> Self + where + TryIntoKeyExpr: TryInto>, + >>::Error: Into, + { + self.queryable_prefix = Some(queryable_prefix.try_into().map_err(Into::into)); + self + } + + /// Restrict the matching publications that will be cached by this [`AdvancedCache`] + /// to the ones that have the given [`Locality`](crate::prelude::Locality). + #[inline] + pub fn subscriber_allowed_origin(mut self, origin: Locality) -> Self { + self.subscriber_origin = origin; + self + } + + /// Restrict the matching queries that will be receive by this [`AdvancedCache`]'s queryable + /// to the ones that have the given [`Locality`](crate::prelude::Locality). + #[inline] + pub fn queryable_allowed_origin(mut self, origin: Locality) -> Self { + self.queryable_origin = origin; + self + } + + /// Change the history size for each resource. + pub fn history(mut self, history: usize) -> Self { + self.history = history; + self + } + + /// Change the limit number of cached resources. 
+ pub fn resources_limit(mut self, limit: usize) -> Self { + self.resources_limit = Some(limit); + self + } + + pub fn liveliness(mut self, enabled: bool) -> Self { + self.liveliness = enabled; + self + } +} + +impl Resolvable for AdvancedCacheBuilder<'_, '_, '_> { + type To = ZResult; +} + +impl Wait for AdvancedCacheBuilder<'_, '_, '_> { + fn wait(self) -> ::To { + AdvancedCache::new(self) + } +} + +impl<'a> IntoFuture for AdvancedCacheBuilder<'a, '_, '_> { + type Output = ::To; + type IntoFuture = Ready<::To>; + + fn into_future(self) -> Self::IntoFuture { + std::future::ready(self.wait()) + } +} + +fn decode_range(range: &str) -> (Option, Option) { + let mut split = range.split(".."); + let start = split.next().and_then(|s| s.parse::().ok()); + let end = split.next().map(|s| s.parse::().ok()).unwrap_or(start); + (start, end) +} + +fn sample_in_range(sample: &Sample, start: Option, end: Option) -> bool { + if start.is_none() && end.is_none() { + true + } else if let Some(source_sn) = sample.source_info().source_sn() { + match (start, end) { + (Some(start), Some(end)) => source_sn >= start && source_sn <= end, + (Some(start), None) => source_sn >= start, + (None, Some(end)) => source_sn <= end, + (None, None) => true, + } + } else { + false + } +} + +pub struct AdvancedCache { + _sub: Subscriber>, + _queryable: Queryable>, + _token: Option, + _stoptx: Sender, +} + +impl AdvancedCache { + fn new(conf: AdvancedCacheBuilder<'_, '_, '_>) -> ZResult { + let key_expr = conf.pub_key_expr?; + // the queryable_prefix (optional), and the key_expr for AdvancedCache's queryable ("[]/") + let (queryable_prefix, queryable_key_expr): (Option, KeyExpr) = + match conf.queryable_prefix { + None => (None, key_expr.clone()), + Some(Ok(ke)) => { + let queryable_key_expr = (&ke) / &key_expr; + (Some(ke.into()), queryable_key_expr) + } + Some(Err(e)) => bail!("Invalid key expression for queryable_prefix: {}", e), + }; + tracing::debug!( + "Create AdvancedCache on {} with history={} 
resource_limit={:?}", + &key_expr, + conf.history, + conf.resources_limit + ); + + // declare the local subscriber that will store the local publications + let sub = conf + .session + .declare_subscriber(&key_expr) + .allowed_origin(conf.subscriber_origin) + .wait()?; + + // declare the queryable that will answer to queries on cache + let queryable = conf + .session + .declare_queryable(&queryable_key_expr) + .allowed_origin(conf.queryable_origin) + .wait()?; + + // take local ownership of stuff to be moved into task + let sub_recv = sub.handler().clone(); + let quer_recv = queryable.handler().clone(); + let pub_key_expr = key_expr.into_owned(); + let resources_limit = conf.resources_limit; + let history = conf.history; + + let (stoptx, stoprx) = bounded::(1); + task::spawn(async move { + let mut cache: HashMap> = + HashMap::with_capacity(resources_limit.unwrap_or(32)); + let limit = resources_limit.unwrap_or(usize::MAX); + + loop { + select!( + // on publication received by the local subscriber, store it + sample = sub_recv.recv_async() => { + if let Ok(sample) = sample { + let queryable_key_expr: KeyExpr<'_> = if let Some(prefix) = &queryable_prefix { + prefix.join(&sample.key_expr()).unwrap().into() + } else { + sample.key_expr().clone() + }; + + if let Some(queue) = cache.get_mut(queryable_key_expr.as_keyexpr()) { + if queue.len() >= history { + queue.pop_front(); + } + queue.push_back(sample); + } else if cache.len() >= limit { + tracing::error!("AdvancedCache on {}: resource_limit exceeded - can't cache publication for a new resource", + pub_key_expr); + } else { + let mut queue: VecDeque = VecDeque::new(); + queue.push_back(sample); + cache.insert(queryable_key_expr.into(), queue); + } + } + }, + + // on query, reply with cache content + query = quer_recv.recv_async() => { + if let Ok(query) = query { + let (start, end) = query.parameters().get("_sn").map(decode_range).unwrap_or((None, None)); + if !query.selector().key_expr().as_str().contains('*') { + if 
let Some(queue) = cache.get(query.selector().key_expr().as_keyexpr()) { + for sample in queue { + if sample_in_range(sample, start, end) { + if let (Some(Ok(time_range)), Some(timestamp)) = (query.parameters().time_range(), sample.timestamp()) { + if !time_range.contains(timestamp.get_time().to_system_time()){ + continue; + } + } + if let Err(e) = query.reply_sample(sample.clone()).await { + tracing::warn!("Error replying to query: {}", e); + } + } + } + } + } else { + for (key_expr, queue) in cache.iter() { + if query.selector().key_expr().intersects(key_expr.borrow()) { + for sample in queue { + if sample_in_range(sample, start, end) { + if let (Some(Ok(time_range)), Some(timestamp)) = (query.parameters().time_range(), sample.timestamp()) { + if !time_range.contains(timestamp.get_time().to_system_time()){ + continue; + } + } + if let Err(e) = query.reply_sample(sample.clone()).await { + tracing::warn!("Error replying to query: {}", e); + } + } + } + } + } + } + } + }, + + // When stoptx is dropped, stop the task + _ = stoprx.recv_async().fuse() => { + return + } + ); + } + }); + + let token = if conf.liveliness { + Some( + conf.session + .liveliness() + .declare_token(queryable_key_expr) + .wait()?, + ) + } else { + None + }; + + Ok(AdvancedCache { + _sub: sub, + _queryable: queryable, + _token: token, + _stoptx: stoptx, + }) + } + + /// Close this AdvancedCache + #[inline] + pub fn close(self) -> impl Resolve> { + ResolveFuture::new(async move { + let AdvancedCache { + _queryable, + _sub, + _token, + _stoptx, + } = self; + _sub.undeclare().await?; + if let Some(token) = _token { + token.undeclare().await?; + } + _queryable.undeclare().await?; + drop(_stoptx); + Ok(()) + }) + } +} diff --git a/zenoh-ext/src/advanced_publisher.rs b/zenoh-ext/src/advanced_publisher.rs new file mode 100644 index 0000000000..1e6bd77d3f --- /dev/null +++ b/zenoh-ext/src/advanced_publisher.rs @@ -0,0 +1,392 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and 
the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use std::{ + future::{IntoFuture, Ready}, + sync::atomic::{AtomicU32, Ordering}, +}; + +use zenoh::{ + bytes::{Encoding, ZBytes}, + internal::bail, + key_expr::KeyExpr, + liveliness::LivelinessToken, + pubsub::{Publisher, PublisherDeleteBuilder, PublisherPutBuilder}, + qos::{CongestionControl, Priority}, + sample::{Locality, SourceInfo}, + session::EntityGlobalId, + Resolvable, Resolve, Result as ZResult, Session, Wait, +}; + +use crate::{ + advanced_cache::{AdvancedCache, KE_PREFIX}, + SessionExt, +}; + +pub enum Sequencing { + None, + Timestamp, + SequenceNumber, +} + +/// The builder of PublicationCache, allowing to configure it. +#[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"] +pub struct AdvancedPublisherBuilder<'a, 'b> { + session: &'a Session, + pub_key_expr: ZResult>, + sequencing: Sequencing, + liveliness: bool, + cache: bool, + history: usize, + resources_limit: Option, +} + +impl<'a, 'b> AdvancedPublisherBuilder<'a, 'b> { + pub(crate) fn new( + session: &'a Session, + pub_key_expr: ZResult>, + ) -> AdvancedPublisherBuilder<'a, 'b> { + AdvancedPublisherBuilder { + session, + pub_key_expr, + sequencing: Sequencing::None, + liveliness: false, + cache: false, + history: 1, + resources_limit: None, + } + } + + /// Allow matching Subscribers to detect lost samples and ask for retransimission. + /// + /// Retransmission can only be achieved if history is enabled. 
+ pub fn retransmission(mut self) -> Self { + self.cache = true; + self.sequencing = Sequencing::SequenceNumber; + self + } + + /// Change the history size for each resource. + pub fn history(mut self, history: usize) -> Self { + self.cache = true; + self.history = history; + self + } + + /// Change the limit number of cached resources. + pub fn resources_limit(mut self, limit: usize) -> Self { + self.resources_limit = Some(limit); + self + } + + /// Allow this publisher to be detected by subscribers. + /// + /// This allows Subscribers to retrieve the local history. + pub fn late_joiner(mut self) -> Self { + self.liveliness = true; + self + } +} + +impl<'a> Resolvable for AdvancedPublisherBuilder<'a, '_> { + type To = ZResult>; +} + +impl Wait for AdvancedPublisherBuilder<'_, '_> { + fn wait(self) -> ::To { + AdvancedPublisher::new(self) + } +} + +impl<'a> IntoFuture for AdvancedPublisherBuilder<'a, '_> { + type Output = ::To; + type IntoFuture = Ready<::To>; + + fn into_future(self) -> Self::IntoFuture { + std::future::ready(self.wait()) + } +} + +pub struct AdvancedPublisher<'a> { + publisher: Publisher<'a>, + seqnum: Option, + _cache: Option, + _token: Option, +} + +impl<'a> AdvancedPublisher<'a> { + fn new(conf: AdvancedPublisherBuilder<'a, '_>) -> ZResult { + let key_expr = conf.pub_key_expr?; + + let publisher = conf + .session + .declare_publisher(key_expr.clone().into_owned()) + .wait()?; + let id = publisher.id(); + let prefix = KE_PREFIX + / &id.zid().into_keyexpr() + / &KeyExpr::try_from(id.eid().to_string()).unwrap(); + + let seqnum = match conf.sequencing { + Sequencing::SequenceNumber => Some(AtomicU32::new(0)), + Sequencing::Timestamp => { + if conf.session.hlc().is_none() { + bail!( + "Cannot create AdvancedPublisher {} with Sequencing::Timestamp: \ + the 'timestamping' setting must be enabled in the Zenoh configuration.", + key_expr, + ) + } + None + } + _ => None, + }; + + let cache = if conf.cache { + let mut builder = conf + .session + 
.declare_advanced_cache(key_expr.clone().into_owned()) + .subscriber_allowed_origin(Locality::SessionLocal) + .history(conf.history) + .queryable_prefix(&prefix); + if let Some(resources_limit) = conf.resources_limit { + builder = builder.resources_limit(resources_limit); + } + Some(builder.wait()?) + } else { + None + }; + + let token = if conf.liveliness { + Some( + conf.session + .liveliness() + .declare_token(prefix / &key_expr) + .wait()?, + ) + } else { + None + }; + + Ok(AdvancedPublisher { + publisher, + seqnum, + _cache: cache, + _token: token, + }) + } + + /// Returns the [`EntityGlobalId`] of this Publisher. + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use zenoh::prelude::*; + /// + /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let publisher = session.declare_publisher("key/expression") + /// .await + /// .unwrap(); + /// let publisher_id = publisher.id(); + /// # } + /// ``` + pub fn id(&self) -> EntityGlobalId { + self.publisher.id() + } + + #[inline] + pub fn key_expr(&self) -> &KeyExpr<'a> { + self.publisher.key_expr() + } + + /// Get the [`Encoding`] used when publishing data. + #[inline] + pub fn encoding(&self) -> &Encoding { + self.publisher.encoding() + } + + /// Get the `congestion_control` applied when routing the data. + #[inline] + pub fn congestion_control(&self) -> CongestionControl { + self.publisher.congestion_control() + } + + /// Get the priority of the written data. + #[inline] + pub fn priority(&self) -> Priority { + self.publisher.priority() + } + + /// Consumes the given `Publisher`, returning a thread-safe reference-counting + /// pointer to it (`Arc`). This is equivalent to `Arc::new(Publisher)`. + /// + /// This is useful to share ownership of the `Publisher` between several threads + /// and tasks. It also allows to create [`MatchingListener`] with static + /// lifetime that can be moved to several threads and tasks. 
+ /// + /// Note: the given zenoh `Publisher` will be undeclared when the last reference to + /// it is dropped. + /// + /// # Examples + /// ```no_run + /// # #[tokio::main] + /// # async fn main() { + /// use zenoh::prelude::*; + /// + /// let session = zenoh::open(zenoh::config::peer()).await.unwrap().into_arc(); + /// let publisher = session.declare_publisher("key/expression").await.unwrap().into_arc(); + /// let matching_listener = publisher.matching_listener().await.unwrap(); + /// + /// tokio::task::spawn(async move { + /// while let Ok(matching_status) = matching_listener.recv_async().await { + /// if matching_status.matching_subscribers() { + /// println!("Publisher has matching subscribers."); + /// } else { + /// println!("Publisher has NO MORE matching subscribers."); + /// } + /// } + /// }).await; + /// # } + /// ``` + #[zenoh_macros::unstable] + pub fn into_arc(self) -> std::sync::Arc { + std::sync::Arc::new(self) + } + + /// Put data. + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use zenoh::prelude::*; + /// + /// let session = zenoh::open(zenoh::config::peer()).await.unwrap().into_arc(); + /// let publisher = session.declare_publisher("key/expression").await.unwrap(); + /// publisher.put("value").await.unwrap(); + /// # } + /// ``` + #[inline] + pub fn put(&self, payload: IntoZBytes) -> PublisherPutBuilder<'_> + where + IntoZBytes: Into, + { + println!("here"); + let mut put = self.publisher.put(payload); + if let Some(seqnum) = &self.seqnum { + println!("there"); + put = put.source_info(SourceInfo::new( + Some(self.publisher.id()), + Some(seqnum.fetch_add(1, Ordering::Relaxed)), + )); + } + put + } + + /// Delete data. 
+ /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use zenoh::prelude::*; + /// + /// let session = zenoh::open(zenoh::config::peer()).await.unwrap().into_arc(); + /// let publisher = session.declare_publisher("key/expression").await.unwrap(); + /// publisher.delete().await.unwrap(); + /// # } + /// ``` + pub fn delete(&self) -> PublisherDeleteBuilder<'_> { + let mut delete = self.publisher.delete(); + if let Some(seqnum) = &self.seqnum { + delete = delete.source_info(SourceInfo::new( + Some(self.publisher.id()), + Some(seqnum.fetch_add(1, Ordering::Relaxed)), + )); + } + delete + } + + /// Return the [`MatchingStatus`] of the publisher. + /// + /// [`MatchingStatus::matching_subscribers`] will return true if there exist Subscribers + /// matching the Publisher's key expression and false otherwise. + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use zenoh::prelude::*; + /// + /// let session = zenoh::open(zenoh::config::peer()).await.unwrap().into_arc(); + /// let publisher = session.declare_publisher("key/expression").await.unwrap(); + /// let matching_subscribers: bool = publisher + /// .matching_status() + /// .await + /// .unwrap() + /// .matching_subscribers(); + /// # } + /// ``` + #[zenoh_macros::unstable] + pub fn matching_status(&self) -> impl Resolve> + '_ { + self.publisher.matching_status() + } + + /// Return a [`MatchingListener`] for this Publisher. + /// + /// The [`MatchingListener`] that will send a notification each time the [`MatchingStatus`] of + /// the Publisher changes. 
+ /// + /// # Examples + /// ```no_run + /// # #[tokio::main] + /// # async fn main() { + /// use zenoh::prelude::*; + /// + /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let publisher = session.declare_publisher("key/expression").await.unwrap(); + /// let matching_listener = publisher.matching_listener().await.unwrap(); + /// while let Ok(matching_status) = matching_listener.recv_async().await { + /// if matching_status.matching_subscribers() { + /// println!("Publisher has matching subscribers."); + /// } else { + /// println!("Publisher has NO MORE matching subscribers."); + /// } + /// } + /// # } + /// ``` + #[zenoh_macros::unstable] + pub fn matching_listener( + &self, + ) -> zenoh::pubsub::MatchingListenerBuilder<'_, '_, zenoh::handlers::DefaultHandler> { + self.publisher.matching_listener() + } + + /// Undeclares the [`Publisher`], informing the network that it needn't optimize publications for its key expression anymore. + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use zenoh::prelude::*; + /// + /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let publisher = session.declare_publisher("key/expression").await.unwrap(); + /// publisher.undeclare().await.unwrap(); + /// # } + /// ``` + pub fn undeclare(self) -> impl Resolve> + 'a { + self.publisher.undeclare() + } +} diff --git a/zenoh-ext/src/advanced_subscriber.rs b/zenoh-ext/src/advanced_subscriber.rs new file mode 100644 index 0000000000..92084181cd --- /dev/null +++ b/zenoh-ext/src/advanced_subscriber.rs @@ -0,0 +1,676 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. 
+// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use std::{future::IntoFuture, str::FromStr}; + +use zenoh::{ + config::ZenohId, + handlers::{Callback, IntoHandler}, + key_expr::KeyExpr, + query::{ConsolidationMode, Selector}, + sample::{Locality, Sample}, + session::{EntityGlobalId, EntityId}, + Resolvable, Resolve, Session, Wait, +}; +use zenoh_util::{Timed, TimedEvent, Timer}; +#[zenoh_macros::unstable] +use { + async_trait::async_trait, + std::collections::hash_map::Entry, + std::collections::HashMap, + std::convert::TryFrom, + std::future::Ready, + std::sync::{Arc, Mutex}, + std::time::Duration, + zenoh::handlers::{locked, DefaultHandler}, + zenoh::internal::zlock, + zenoh::pubsub::Subscriber, + zenoh::query::{QueryTarget, Reply, ReplyKeyExpr}, + zenoh::Result as ZResult, +}; + +use crate::advanced_cache::{ke_liveliness, KE_PREFIX, KE_STAR}; + +/// The builder of AdvancedSubscriber, allowing to configure it. +#[zenoh_macros::unstable] +pub struct AdvancedSubscriberBuilder<'a, 'b, Handler, const BACKGROUND: bool = false> { + pub(crate) session: &'a Session, + pub(crate) key_expr: ZResult>, + pub(crate) origin: Locality, + pub(crate) retransmission: bool, + pub(crate) query_target: QueryTarget, + pub(crate) query_timeout: Duration, + pub(crate) period: Option, + pub(crate) history: bool, + pub(crate) liveliness: bool, + pub(crate) handler: Handler, +} + +#[zenoh_macros::unstable] +impl<'a, 'b, Handler> AdvancedSubscriberBuilder<'a, 'b, Handler> { + pub(crate) fn new( + session: &'a Session, + key_expr: ZResult>, + origin: Locality, + handler: Handler, + ) -> Self { + AdvancedSubscriberBuilder { + session, + key_expr, + origin, + handler, + retransmission: false, + query_target: QueryTarget::All, + query_timeout: Duration::from_secs(10), + history: false, + liveliness: false, + period: None, + } + } +} + +#[zenoh_macros::unstable] +impl<'a, 'b> AdvancedSubscriberBuilder<'a, 'b, DefaultHandler> { + /// 
Add callback to AdvancedSubscriber. + #[inline] + pub fn callback( + self, + callback: Callback, + ) -> AdvancedSubscriberBuilder<'a, 'b, Callback> + where + Callback: Fn(Sample) + Send + Sync + 'static, + { + AdvancedSubscriberBuilder { + session: self.session, + key_expr: self.key_expr.map(|s| s.into_owned()), + origin: self.origin, + retransmission: self.retransmission, + query_target: self.query_target, + query_timeout: self.query_timeout, + period: self.period, + history: self.history, + liveliness: self.liveliness, + handler: callback, + } + } + + /// Add callback to `AdvancedSubscriber`. + /// + /// Using this guarantees that your callback will never be called concurrently. + /// If your callback is also accepted by the [`callback`](AdvancedSubscriberBuilder::callback) method, we suggest you use it instead of `callback_mut` + #[inline] + pub fn callback_mut( + self, + callback: CallbackMut, + ) -> AdvancedSubscriberBuilder<'a, 'b, impl Fn(Sample) + Send + Sync + 'static> + where + CallbackMut: FnMut(Sample) + Send + Sync + 'static, + { + self.callback(locked(callback)) + } + + /// Make the built AdvancedSubscriber an [`AdvancedSubscriber`](AdvancedSubscriber). + #[inline] + pub fn with(self, handler: Handler) -> AdvancedSubscriberBuilder<'a, 'b, Handler> + where + Handler: IntoHandler, + { + AdvancedSubscriberBuilder { + session: self.session, + key_expr: self.key_expr.map(|s| s.into_owned()), + origin: self.origin, + retransmission: self.retransmission, + query_target: self.query_target, + query_timeout: self.query_timeout, + period: self.period, + history: self.history, + liveliness: self.liveliness, + handler, + } + } +} + +#[zenoh_macros::unstable] +impl<'a, 'b, Handler> AdvancedSubscriberBuilder<'a, 'b, Handler> { + /// Restrict the matching publications that will be receive by this [`Subscriber`] + /// to the ones that have the given [`Locality`](crate::prelude::Locality). 
+    #[zenoh_macros::unstable]
+    #[inline]
+    pub fn allowed_origin(mut self, origin: Locality) -> Self {
+        self.origin = origin;
+        self
+    }
+
+    /// Ask for retransmission of detected lost Samples.
+    ///
+    /// Retransmission can only be achieved by Publishers that also activate retransmission.
+    #[zenoh_macros::unstable]
+    #[inline]
+    pub fn retransmission(mut self) -> Self {
+        self.retransmission = true;
+        self
+    }
+
+    // /// Change the target to be used for queries.
+
+    // #[inline]
+    // pub fn query_target(mut self, query_target: QueryTarget) -> Self {
+    //     self.query_target = query_target;
+    //     self
+    // }
+
+    /// Change the timeout to be used for queries (history, retransmission).
+    #[zenoh_macros::unstable]
+    #[inline]
+    pub fn query_timeout(mut self, query_timeout: Duration) -> Self {
+        self.query_timeout = query_timeout;
+        self
+    }
+
+    /// Enable periodic queries for not yet received Samples and specify their period.
+    ///
+    /// This allows to retrieve the last Sample(s) if the last Sample(s) is/are lost.
+    /// So it is useful for sporadic publications but useless for periodic publications
+    /// with a period smaller or equal to this period.
+    /// Retransmission can only be achieved by Publishers that also activate retransmission.
+    #[zenoh_macros::unstable]
+    #[inline]
+    pub fn periodic_queries(mut self, period: Option<Duration>) -> Self {
+        self.period = period;
+        self
+    }
+
+    /// Enable query for historical data.
+    ///
+    /// History can only be retransmitted by Publishers that also activate history.
+    #[zenoh_macros::unstable]
+    #[inline]
+    pub fn history(mut self) -> Self {
+        self.history = true;
+        self
+    }
+
+    /// Enable detection of late joiner publishers and query for their historical data.
+    ///
+    /// Late joiner detection can only be achieved for Publishers that also activate late_joiner.
+    /// History can only be retransmitted by Publishers that also activate history.
+ #[zenoh_macros::unstable] + #[inline] + pub fn late_joiner(mut self) -> Self { + self.liveliness = true; + self + } + + fn with_static_keys(self) -> AdvancedSubscriberBuilder<'a, 'static, Handler> { + AdvancedSubscriberBuilder { + session: self.session, + key_expr: self.key_expr.map(|s| s.into_owned()), + origin: self.origin, + retransmission: self.retransmission, + query_target: self.query_target, + query_timeout: self.query_timeout, + period: self.period, + history: self.history, + liveliness: self.liveliness, + handler: self.handler, + } + } +} + +impl Resolvable for AdvancedSubscriberBuilder<'_, '_, Handler> +where + Handler: IntoHandler, + Handler::Handler: Send, +{ + type To = ZResult>; +} + +impl Wait for AdvancedSubscriberBuilder<'_, '_, Handler> +where + Handler: IntoHandler + Send, + Handler::Handler: Send, +{ + fn wait(self) -> ::To { + AdvancedSubscriber::new(self.with_static_keys()) + } +} + +impl IntoFuture for AdvancedSubscriberBuilder<'_, '_, Handler> +where + Handler: IntoHandler + Send, + Handler::Handler: Send, +{ + type Output = ::To; + type IntoFuture = Ready<::To>; + + fn into_future(self) -> Self::IntoFuture { + std::future::ready(self.wait()) + } +} + +#[zenoh_macros::unstable] +struct InnerState { + last_seq_num: Option, + pending_queries: u64, + pending_samples: HashMap, +} + +#[zenoh_macros::unstable] +pub struct AdvancedSubscriber { + _subscriber: Subscriber<()>, + receiver: Receiver, + _liveliness_subscriber: Option>, +} + +#[zenoh_macros::unstable] +impl std::ops::Deref for AdvancedSubscriber { + type Target = Receiver; + fn deref(&self) -> &Self::Target { + &self.receiver + } +} + +#[zenoh_macros::unstable] +impl std::ops::DerefMut for AdvancedSubscriber { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.receiver + } +} + +#[zenoh_macros::unstable] +fn handle_sample( + states: &mut HashMap, + wait: bool, + sample: Sample, + callback: &Callback, +) -> bool { + if let (Some(source_id), Some(source_sn)) = ( + 
sample.source_info().source_id(), + sample.source_info().source_sn(), + ) { + let entry = states.entry(*source_id); + let new = matches!(&entry, Entry::Occupied(_)); + let state = entry.or_insert(InnerState { + last_seq_num: None, + pending_queries: 0, + pending_samples: HashMap::new(), + }); + if wait { + state.pending_samples.insert(source_sn, sample); + } else if state.last_seq_num.is_some() && source_sn != state.last_seq_num.unwrap() + 1 { + if source_sn > state.last_seq_num.unwrap() { + state.pending_samples.insert(source_sn, sample); + } + } else { + callback.call(sample); + let mut last_seq_num = source_sn; + state.last_seq_num = Some(last_seq_num); + while let Some(s) = state.pending_samples.remove(&(last_seq_num + 1)) { + callback.call(s); + last_seq_num += 1; + state.last_seq_num = Some(last_seq_num); + } + } + new + } else { + callback.call(sample); + true + } +} + +#[zenoh_macros::unstable] +fn seq_num_range(start: Option, end: Option) -> String { + match (start, end) { + (Some(start), Some(end)) => format!("_sn={}..{}", start, end), + (Some(start), None) => format!("_sn={}..", start), + (None, Some(end)) => format!("_sn=..{}", end), + (None, None) => "_sn=..".to_string(), + } +} + +#[zenoh_macros::unstable] +#[derive(Clone)] +struct PeriodicQuery { + source_id: Option, + statesref: Arc, bool)>>, + key_expr: KeyExpr<'static>, + session: Session, + query_target: QueryTarget, + query_timeout: Duration, + callback: Callback, +} + +#[zenoh_macros::unstable] +impl PeriodicQuery { + fn with_source_id(mut self, source_id: EntityGlobalId) -> Self { + self.source_id = Some(source_id); + self + } +} + +#[zenoh_macros::unstable] +#[async_trait] +impl Timed for PeriodicQuery { + async fn run(&mut self) { + let mut lock = zlock!(self.statesref); + let (states, _wait) = &mut *lock; + if let Some(source_id) = &self.source_id { + if let Some(state) = states.get_mut(source_id) { + state.pending_queries += 1; + let query_expr = KE_PREFIX + / 
&source_id.zid().into_keyexpr() + / &KeyExpr::try_from(source_id.eid().to_string()).unwrap() + / &self.key_expr; + let seq_num_range = seq_num_range(Some(state.last_seq_num.unwrap() + 1), None); + drop(lock); + let handler = RepliesHandler { + source_id: *source_id, + statesref: self.statesref.clone(), + callback: self.callback.clone(), + }; + let _ = self + .session + .get(Selector::from((query_expr, seq_num_range))) + .callback({ + let key_expr = self.key_expr.clone().into_owned(); + move |r: Reply| { + if let Ok(s) = r.into_result() { + if key_expr.intersects(s.key_expr()) { + let (ref mut states, wait) = &mut *zlock!(handler.statesref); + handle_sample(states, *wait, s, &handler.callback); + } + } + } + }) + .consolidation(ConsolidationMode::None) + .accept_replies(ReplyKeyExpr::Any) + .target(self.query_target) + .timeout(self.query_timeout) + .wait(); + } + } + } +} + +#[zenoh_macros::unstable] +impl AdvancedSubscriber { + fn new(conf: AdvancedSubscriberBuilder<'_, '_, H>) -> ZResult + where + H: IntoHandler + Send, + { + let statesref = Arc::new(Mutex::new((HashMap::new(), conf.history))); + let (callback, receiver) = conf.handler.into_handler(); + let key_expr = conf.key_expr?; + let retransmission = conf.retransmission; + let query_target = conf.query_target; + let query_timeout = conf.query_timeout; + let session = conf.session.clone(); + let periodic_query = conf.period.map(|period| { + ( + Arc::new(Timer::new(false)), + period, + PeriodicQuery { + source_id: None, + statesref: statesref.clone(), + key_expr: key_expr.clone().into_owned(), + session, + query_target, + query_timeout, + callback: callback.clone(), + }, + ) + }); + + let sub_callback = { + let statesref = statesref.clone(); + let session = conf.session.clone(); + let callback = callback.clone(); + let key_expr = key_expr.clone().into_owned(); + let periodic_query = periodic_query.clone(); + + move |s: Sample| { + let mut lock = zlock!(statesref); + let (states, wait) = &mut *lock; + let 
source_id = s.source_info().source_id().cloned(); + let new = handle_sample(states, *wait, s, &callback); + + if let Some(source_id) = source_id { + if new { + if let Some((timer, period, query)) = periodic_query.as_ref() { + timer.add(TimedEvent::periodic( + *period, + query.clone().with_source_id(source_id), + )) + } + } + + if let Some(state) = states.get_mut(&source_id) { + if retransmission + && state.pending_queries == 0 + && !state.pending_samples.is_empty() + { + state.pending_queries += 1; + let query_expr = KE_PREFIX + / &source_id.zid().into_keyexpr() + / &KeyExpr::try_from(source_id.eid().to_string()).unwrap() + / &key_expr; + let seq_num_range = + seq_num_range(Some(state.last_seq_num.unwrap() + 1), None); + drop(lock); + let handler = RepliesHandler { + source_id, + statesref: statesref.clone(), + callback: callback.clone(), + }; + let _ = session + .get(Selector::from((query_expr, seq_num_range))) + .callback({ + let key_expr = key_expr.clone().into_owned(); + move |r: Reply| { + if let Ok(s) = r.into_result() { + if key_expr.intersects(s.key_expr()) { + let (ref mut states, wait) = + &mut *zlock!(handler.statesref); + handle_sample(states, *wait, s, &handler.callback); + } + } + } + }) + .consolidation(ConsolidationMode::None) + .accept_replies(ReplyKeyExpr::Any) + .target(query_target) + .timeout(query_timeout) + .wait(); + } + } + } + } + }; + + let subscriber = conf + .session + .declare_subscriber(&key_expr) + .callback(sub_callback) + .allowed_origin(conf.origin) + .wait()?; + + if conf.history { + let handler = InitialRepliesHandler { + statesref: statesref.clone(), + callback: callback.clone(), + periodic_query, + }; + let _ = conf + .session + .get(Selector::from(( + KE_PREFIX / KE_STAR / KE_STAR / &key_expr, + "0..", + ))) + .callback({ + let key_expr = key_expr.clone().into_owned(); + move |r: Reply| { + if let Ok(s) = r.into_result() { + if key_expr.intersects(s.key_expr()) { + let (ref mut states, wait) = &mut *zlock!(handler.statesref); 
+ handle_sample(states, *wait, s, &handler.callback); + } + } + } + }) + .consolidation(ConsolidationMode::None) + .accept_replies(ReplyKeyExpr::Any) + .target(query_target) + .timeout(query_timeout) + .wait(); + } + + let liveliness_subscriber = if conf.history && conf.liveliness { + let live_callback = { + let session = conf.session.clone(); + let statesref = statesref.clone(); + let key_expr = key_expr.clone().into_owned(); + move |s: Sample| { + if let Ok(parsed) = ke_liveliness::parse(s.key_expr().as_keyexpr()) { + if let Ok(zid) = ZenohId::from_str(parsed.zid().as_str()) { + if let Ok(eid) = EntityId::from_str(parsed.eid().as_str()) { + let handler = RepliesHandler { + source_id: EntityGlobalId::new(zid, eid), + statesref: statesref.clone(), + callback: callback.clone(), + }; + let _ = session + .get(Selector::from((s.key_expr(), "0.."))) + .callback({ + let key_expr = key_expr.clone().into_owned(); + move |r: Reply| { + if let Ok(s) = r.into_result() { + if key_expr.intersects(s.key_expr()) { + let (ref mut states, wait) = + &mut *zlock!(handler.statesref); + handle_sample( + states, + *wait, + s, + &handler.callback, + ); + } + } + } + }) + .consolidation(ConsolidationMode::None) + .accept_replies(ReplyKeyExpr::Any) + .target(query_target) + .timeout(query_timeout) + .wait(); + } + } + } else { + tracing::warn!( + "Received malformed liveliness token key expression: {}", + s.key_expr() + ); + } + } + }; + + Some( + conf + .session + .liveliness() + .declare_subscriber(KE_PREFIX / KE_STAR / KE_STAR / &key_expr) + // .declare_subscriber(keformat!(ke_liveliness_all::formatter(), zid = 0, eid = 0, remaining = key_expr).unwrap()) + .callback(live_callback) + .wait()?, + ) + } else { + None + }; + + let reliable_subscriber = AdvancedSubscriber { + _subscriber: subscriber, + receiver, + _liveliness_subscriber: liveliness_subscriber, + }; + + Ok(reliable_subscriber) + } + + /// Close this AdvancedSubscriber + #[inline] + pub fn close(self) -> impl Resolve> { + 
self._subscriber.undeclare() + } +} + +#[zenoh_macros::unstable] +#[derive(Clone)] +struct InitialRepliesHandler { + statesref: Arc, bool)>>, + periodic_query: Option<(Arc, Duration, PeriodicQuery)>, + callback: Callback, +} + +#[zenoh_macros::unstable] +impl Drop for InitialRepliesHandler { + fn drop(&mut self) { + let (states, wait) = &mut *zlock!(self.statesref); + for (source_id, state) in states.iter_mut() { + let mut pending_samples = state + .pending_samples + .drain() + .collect::>(); + pending_samples.sort_by_key(|(k, _s)| *k); + for (seq_num, sample) in pending_samples { + state.last_seq_num = Some(seq_num); + self.callback.call(sample); + } + if let Some((timer, period, query)) = self.periodic_query.as_ref() { + timer.add(TimedEvent::periodic( + *period, + query.clone().with_source_id(*source_id), + )) + } + } + *wait = false; + } +} + +#[zenoh_macros::unstable] +#[derive(Clone)] +struct RepliesHandler { + source_id: EntityGlobalId, + statesref: Arc, bool)>>, + callback: Callback, +} + +#[zenoh_macros::unstable] +impl Drop for RepliesHandler { + fn drop(&mut self) { + let (states, wait) = &mut *zlock!(self.statesref); + if let Some(state) = states.get_mut(&self.source_id) { + state.pending_queries = state.pending_queries.saturating_sub(1); + if !state.pending_samples.is_empty() && !*wait { + tracing::error!("Sample missed: unable to retrieve some missing samples."); + let mut pending_samples = state + .pending_samples + .drain() + .collect::>(); + pending_samples.sort_by_key(|(k, _s)| *k); + for (seq_num, sample) in pending_samples { + state.last_seq_num = Some(seq_num); + self.callback.call(sample); + } + } + } + } +} diff --git a/zenoh-ext/src/lib.rs b/zenoh-ext/src/lib.rs index 4bab50804e..3a1046fd87 100644 --- a/zenoh-ext/src/lib.rs +++ b/zenoh-ext/src/lib.rs @@ -12,10 +12,18 @@ // ZettaScale Zenoh Team, // #[cfg(feature = "unstable")] +mod advanced_cache; +#[cfg(feature = "unstable")] +mod advanced_publisher; +#[cfg(feature = "unstable")] +mod 
advanced_subscriber;
+#[cfg(feature = "unstable")]
 pub mod group;
 #[cfg(feature = "unstable")]
 mod publication_cache;
 #[cfg(feature = "unstable")]
+mod publisher_ext;
+#[cfg(feature = "unstable")]
 mod querying_subscriber;
 mod serialization;
 #[cfg(feature = "unstable")]
@@ -31,11 +39,15 @@ pub use crate::serialization::{
 };
 #[cfg(feature = "unstable")]
 pub use crate::{
+    advanced_cache::{AdvancedCache, AdvancedCacheBuilder},
+    advanced_publisher::{AdvancedPublisher, AdvancedPublisherBuilder, Sequencing},
+    advanced_subscriber::{AdvancedSubscriber, AdvancedSubscriberBuilder},
     publication_cache::{PublicationCache, PublicationCacheBuilder},
+    publisher_ext::PublisherBuilderExt,
     querying_subscriber::{
         ExtractSample, FetchingSubscriber, FetchingSubscriberBuilder, KeySpace, LivelinessSpace,
         QueryingSubscriberBuilder, UserSpace,
     },
     session_ext::SessionExt,
-    subscriber_ext::{SubscriberBuilderExt, SubscriberForward},
+    subscriber_ext::{DataSubscriberBuilderExt, SubscriberBuilderExt, SubscriberForward},
 };
diff --git a/zenoh-ext/src/publisher_ext.rs b/zenoh-ext/src/publisher_ext.rs
new file mode 100644
index 0000000000..b13cf29b68
--- /dev/null
+++ b/zenoh-ext/src/publisher_ext.rs
@@ -0,0 +1,46 @@
+//
+// Copyright (c) 2023 ZettaScale Technology
+//
+// This program and the accompanying materials are made available under the
+// terms of the Eclipse Public License 2.0 which is available at
+// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+// which is available at https://www.apache.org/licenses/LICENSE-2.0.
+//
+// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+//
+// Contributors:
+//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
+//
+use zenoh::pubsub::PublisherBuilder;
+
+use crate::AdvancedPublisherBuilder;
+
+/// Some extensions to the [`zenoh::publication::PublisherBuilder`](zenoh::publication::PublisherBuilder)
+#[zenoh_macros::unstable]
+pub trait PublisherBuilderExt<'a, 'b> {
+    /// Allow matching Subscribers to detect lost samples and ask for retransmission.
+    ///
+    /// Retransmission can only be achieved if history is enabled.
+    fn history(self, history: usize) -> AdvancedPublisherBuilder<'a, 'b>;
+
+    /// Allow this publisher to be detected by subscribers.
+    ///
+    /// This allows Subscribers to retrieve the local history.
+    fn late_joiner(self) -> AdvancedPublisherBuilder<'a, 'b>;
+}
+
+impl<'a, 'b> PublisherBuilderExt<'a, 'b> for PublisherBuilder<'a, 'b> {
+    /// Allow matching Subscribers to detect lost samples and ask for retransmission.
+    ///
+    /// Retransmission can only be achieved if history is enabled.
+    fn history(self, history: usize) -> AdvancedPublisherBuilder<'a, 'b> {
+        AdvancedPublisherBuilder::new(self.session, self.key_expr).history(history)
+    }
+
+    /// Allow this publisher to be detected by subscribers.
+    ///
+    /// This allows Subscribers to retrieve the local history.
+    fn late_joiner(self) -> AdvancedPublisherBuilder<'a, 'b> {
+        AdvancedPublisherBuilder::new(self.session, self.key_expr).late_joiner()
+    }
+}
diff --git a/zenoh-ext/src/session_ext.rs b/zenoh-ext/src/session_ext.rs
index 9d3c430aaf..8c24ccc9e2 100644
--- a/zenoh-ext/src/session_ext.rs
+++ b/zenoh-ext/src/session_ext.rs
@@ -15,6 +15,7 @@ use zenoh::{key_expr::KeyExpr, session::Session, Error};
 
 use super::PublicationCacheBuilder;
+use crate::advanced_cache::AdvancedCacheBuilder;
 
 /// Some extensions to the [`zenoh::Session`](zenoh::Session)
 #[zenoh_macros::unstable]
@@ -44,6 +45,14 @@ pub trait SessionExt {
     where
         TryIntoKeyExpr: TryInto<KeyExpr<'b>>,
         <TryIntoKeyExpr as TryInto<KeyExpr<'b>>>::Error: Into<Error>;
+
+    fn declare_advanced_cache<'a, 'b, 'c, TryIntoKeyExpr>(
+        &'a self,
+        pub_key_expr: TryIntoKeyExpr,
+    ) -> AdvancedCacheBuilder<'a, 'b, 'c>
+    where
+        TryIntoKeyExpr: TryInto<KeyExpr<'b>>,
+        <TryIntoKeyExpr as TryInto<KeyExpr<'b>>>::Error: Into<Error>;
 }
 
 impl SessionExt for Session {
@@ -57,4 +66,15 @@ impl SessionExt for Session {
     {
         PublicationCacheBuilder::new(self, pub_key_expr.try_into().map_err(Into::into))
     }
+
+    fn declare_advanced_cache<'a, 'b, 'c, TryIntoKeyExpr>(
+        &'a self,
+        pub_key_expr: TryIntoKeyExpr,
+    ) -> AdvancedCacheBuilder<'a, 'b, 'c>
+    where
+        TryIntoKeyExpr: TryInto<KeyExpr<'b>>,
+        <TryIntoKeyExpr as TryInto<KeyExpr<'b>>>::Error: Into<Error>,
+    {
+        AdvancedCacheBuilder::new(self, pub_key_expr.try_into().map_err(Into::into))
+    }
 }
diff --git a/zenoh-ext/src/subscriber_ext.rs b/zenoh-ext/src/subscriber_ext.rs
index 8d931726c3..893f23325c 100644
--- a/zenoh-ext/src/subscriber_ext.rs
+++ b/zenoh-ext/src/subscriber_ext.rs
@@ -24,7 +24,8 @@ use zenoh::{
 };
 
 use crate::{
-    querying_subscriber::QueryingSubscriberBuilder, ExtractSample, FetchingSubscriberBuilder,
+    querying_subscriber::QueryingSubscriberBuilder, AdvancedSubscriberBuilder, ExtractSample,
+    FetchingSubscriberBuilder,
 };
 
 /// Allows writing `subscriber.forward(receiver)` instead of `subscriber.stream().map(Ok).forward(publisher)`
@@ -123,6 +124,25 @@ pub trait SubscriberBuilderExt<'a, 'b, Handler> {
     fn querying(self) -> QueryingSubscriberBuilder<'a, 'b, Self::KeySpace, Handler>;
 }
 
+/// Some extensions to the [`zenoh::subscriber::SubscriberBuilder`](zenoh::pubsub::SubscriberBuilder)
+pub trait DataSubscriberBuilderExt<'a, 'b, Handler> {
+    /// Enable query for historical data.
+    ///
+    /// History can only be retransmitted by Publishers that also activate history.
+    fn history(self) -> AdvancedSubscriberBuilder<'a, 'b, Handler>;
+
+    /// Ask for retransmission of detected lost Samples.
+    ///
+    /// Retransmission can only be achieved by Publishers that also activate retransmission.
+    fn retransmission(self) -> AdvancedSubscriberBuilder<'a, 'b, Handler>;
+
+    /// Enable detection of late joiner publishers and query for their historical data.
+    ///
+    /// Late joiner detection can only be achieved for Publishers that also activate late_joiner.
+    /// History can only be retransmitted by Publishers that also activate history.
+    fn late_joiner(self) -> AdvancedSubscriberBuilder<'a, 'b, Handler>;
+}
+
 impl<'a, 'b, Handler> SubscriberBuilderExt<'a, 'b, Handler> for SubscriberBuilder<'a, 'b, Handler> {
     type KeySpace = crate::UserSpace;
 
@@ -227,6 +247,35 @@ impl<'a, 'b, Handler> SubscriberBuilderExt<'a, 'b, Handler> for SubscriberBuilde
     }
 }
 
+impl<'a, 'b, Handler> DataSubscriberBuilderExt<'a, 'b, Handler>
+    for SubscriberBuilder<'a, 'b, Handler>
+{
+    /// Enable query for historical data.
+    ///
+    /// History can only be retransmitted by Publishers that also activate history.
+    fn history(self) -> AdvancedSubscriberBuilder<'a, 'b, Handler> {
+        AdvancedSubscriberBuilder::new(self.session, self.key_expr, self.origin, self.handler)
+            .history()
+    }
+
+    /// Ask for retransmission of detected lost Samples.
+    ///
+    /// Retransmission can only be achieved by Publishers that also activate retransmission.
+    fn retransmission(self) -> AdvancedSubscriberBuilder<'a, 'b, Handler> {
+        AdvancedSubscriberBuilder::new(self.session, self.key_expr, self.origin, self.handler)
+            .retransmission()
+    }
+
+    /// Enable detection of late joiner publishers and query for their historical data.
+    ///
+    /// Late joiner detection can only be achieved for Publishers that also activate late_joiner.
+    /// History can only be retransmitted by Publishers that also activate history.
+ fn late_joiner(self) -> AdvancedSubscriberBuilder<'a, 'b, Handler> { + AdvancedSubscriberBuilder::new(self.session, self.key_expr, self.origin, self.handler) + .late_joiner() + } +} + impl<'a, 'b, Handler> SubscriberBuilderExt<'a, 'b, Handler> for LivelinessSubscriberBuilder<'a, 'b, Handler> { diff --git a/zenoh/src/api/builders/publisher.rs b/zenoh/src/api/builders/publisher.rs index ac6565cd27..b9580df03e 100644 --- a/zenoh/src/api/builders/publisher.rs +++ b/zenoh/src/api/builders/publisher.rs @@ -283,8 +283,16 @@ impl IntoFuture for PublicationBuilder, PublicationBuil #[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"] #[derive(Debug)] pub struct PublisherBuilder<'a, 'b> { + #[cfg(feature = "internal")] + pub session: &'a Session, + #[cfg(not(feature = "internal"))] pub(crate) session: &'a Session, + + #[cfg(feature = "internal")] + pub key_expr: ZResult>, + #[cfg(not(feature = "internal"))] pub(crate) key_expr: ZResult>, + pub(crate) encoding: Encoding, pub(crate) congestion_control: CongestionControl, pub(crate) priority: Priority, From 9f076a7e83e855070b3c2eda18e9f9bd28fc1837 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Fri, 8 Nov 2024 16:49:53 +0100 Subject: [PATCH 04/24] Fix doctests --- zenoh-ext/src/advanced_publisher.rs | 54 ++++------------------------- 1 file changed, 6 insertions(+), 48 deletions(-) diff --git a/zenoh-ext/src/advanced_publisher.rs b/zenoh-ext/src/advanced_publisher.rs index 1e6bd77d3f..28d8f9df14 100644 --- a/zenoh-ext/src/advanced_publisher.rs +++ b/zenoh-ext/src/advanced_publisher.rs @@ -192,9 +192,8 @@ impl<'a> AdvancedPublisher<'a> { /// ``` /// # #[tokio::main] /// # async fn main() { - /// use zenoh::prelude::*; /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let publisher = session.declare_publisher("key/expression") /// .await /// .unwrap(); @@ -228,51 +227,14 @@ 
impl<'a> AdvancedPublisher<'a> { self.publisher.priority() } - /// Consumes the given `Publisher`, returning a thread-safe reference-counting - /// pointer to it (`Arc`). This is equivalent to `Arc::new(Publisher)`. - /// - /// This is useful to share ownership of the `Publisher` between several threads - /// and tasks. It also allows to create [`MatchingListener`] with static - /// lifetime that can be moved to several threads and tasks. - /// - /// Note: the given zenoh `Publisher` will be undeclared when the last reference to - /// it is dropped. - /// - /// # Examples - /// ```no_run - /// # #[tokio::main] - /// # async fn main() { - /// use zenoh::prelude::*; - /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap().into_arc(); - /// let publisher = session.declare_publisher("key/expression").await.unwrap().into_arc(); - /// let matching_listener = publisher.matching_listener().await.unwrap(); - /// - /// tokio::task::spawn(async move { - /// while let Ok(matching_status) = matching_listener.recv_async().await { - /// if matching_status.matching_subscribers() { - /// println!("Publisher has matching subscribers."); - /// } else { - /// println!("Publisher has NO MORE matching subscribers."); - /// } - /// } - /// }).await; - /// # } - /// ``` - #[zenoh_macros::unstable] - pub fn into_arc(self) -> std::sync::Arc { - std::sync::Arc::new(self) - } - /// Put data. 
/// /// # Examples /// ``` /// # #[tokio::main] /// # async fn main() { - /// use zenoh::prelude::*; /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap().into_arc(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let publisher = session.declare_publisher("key/expression").await.unwrap(); /// publisher.put("value").await.unwrap(); /// # } @@ -300,9 +262,8 @@ impl<'a> AdvancedPublisher<'a> { /// ``` /// # #[tokio::main] /// # async fn main() { - /// use zenoh::prelude::*; /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap().into_arc(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let publisher = session.declare_publisher("key/expression").await.unwrap(); /// publisher.delete().await.unwrap(); /// # } @@ -327,9 +288,8 @@ impl<'a> AdvancedPublisher<'a> { /// ``` /// # #[tokio::main] /// # async fn main() { - /// use zenoh::prelude::*; /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap().into_arc(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let publisher = session.declare_publisher("key/expression").await.unwrap(); /// let matching_subscribers: bool = publisher /// .matching_status() @@ -352,9 +312,8 @@ impl<'a> AdvancedPublisher<'a> { /// ```no_run /// # #[tokio::main] /// # async fn main() { - /// use zenoh::prelude::*; /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let publisher = session.declare_publisher("key/expression").await.unwrap(); /// let matching_listener = publisher.matching_listener().await.unwrap(); /// while let Ok(matching_status) = matching_listener.recv_async().await { @@ -379,9 +338,8 @@ impl<'a> AdvancedPublisher<'a> { /// ``` /// # #[tokio::main] /// # async fn main() { - /// use zenoh::prelude::*; /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let 
session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let publisher = session.declare_publisher("key/expression").await.unwrap(); /// publisher.undeclare().await.unwrap(); /// # } From 3935974eae037925363f4c3b74cf54e049fa5629 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Fri, 8 Nov 2024 18:10:22 +0100 Subject: [PATCH 05/24] Fix doc warnings --- zenoh-ext/src/advanced_cache.rs | 4 ++-- zenoh-ext/src/advanced_publisher.rs | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/zenoh-ext/src/advanced_cache.rs b/zenoh-ext/src/advanced_cache.rs index b96b7fe4af..8807b7fd57 100644 --- a/zenoh-ext/src/advanced_cache.rs +++ b/zenoh-ext/src/advanced_cache.rs @@ -83,7 +83,7 @@ impl<'a, 'b, 'c> AdvancedCacheBuilder<'a, 'b, 'c> { } /// Restrict the matching publications that will be cached by this [`AdvancedCache`] - /// to the ones that have the given [`Locality`](crate::prelude::Locality). + /// to the ones that have the given [`Locality`](zenoh::sample::Locality). #[inline] pub fn subscriber_allowed_origin(mut self, origin: Locality) -> Self { self.subscriber_origin = origin; @@ -91,7 +91,7 @@ impl<'a, 'b, 'c> AdvancedCacheBuilder<'a, 'b, 'c> { } /// Restrict the matching queries that will be receive by this [`AdvancedCache`]'s queryable - /// to the ones that have the given [`Locality`](crate::prelude::Locality). + /// to the ones that have the given [`Locality`](zenoh::sample::Locality). #[inline] pub fn queryable_allowed_origin(mut self, origin: Locality) -> Self { self.queryable_origin = origin; diff --git a/zenoh-ext/src/advanced_publisher.rs b/zenoh-ext/src/advanced_publisher.rs index 28d8f9df14..e432e24b11 100644 --- a/zenoh-ext/src/advanced_publisher.rs +++ b/zenoh-ext/src/advanced_publisher.rs @@ -279,10 +279,10 @@ impl<'a> AdvancedPublisher<'a> { delete } - /// Return the [`MatchingStatus`] of the publisher. + /// Return the [`MatchingStatus`](zenoh::pubsub::MatchingStatus) of the publisher. 
/// - /// [`MatchingStatus::matching_subscribers`] will return true if there exist Subscribers - /// matching the Publisher's key expression and false otherwise. + /// [`MatchingStatus::matching_subscribers`](zenoh::pubsub::MatchingStatus::matching_subscribers) + /// will return true if there exist Subscribers matching the Publisher's key expression and false otherwise. /// /// # Examples /// ``` @@ -303,10 +303,10 @@ impl<'a> AdvancedPublisher<'a> { self.publisher.matching_status() } - /// Return a [`MatchingListener`] for this Publisher. + /// Return a [`MatchingListener`](zenoh::pubsub::MatchingStatus) for this Publisher. /// - /// The [`MatchingListener`] that will send a notification each time the [`MatchingStatus`] of - /// the Publisher changes. + /// The [`MatchingListener`](zenoh::pubsub::MatchingStatus) that will send a notification each time + /// the [`MatchingStatus`](zenoh::pubsub::MatchingStatus) of the Publisher changes. /// /// # Examples /// ```no_run From 405d76e7d3dc70ababeb51b1e53df3df18532d02 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Wed, 13 Nov 2024 13:52:26 +0100 Subject: [PATCH 06/24] Remove debug trace --- zenoh-ext/src/advanced_publisher.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/zenoh-ext/src/advanced_publisher.rs b/zenoh-ext/src/advanced_publisher.rs index e432e24b11..838167fb2b 100644 --- a/zenoh-ext/src/advanced_publisher.rs +++ b/zenoh-ext/src/advanced_publisher.rs @@ -244,7 +244,6 @@ impl<'a> AdvancedPublisher<'a> { where IntoZBytes: Into, { - println!("here"); let mut put = self.publisher.put(payload); if let Some(seqnum) = &self.seqnum { println!("there"); From 71b22d37ca4302613dd1cd5c8b2186b805ded43f Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Wed, 13 Nov 2024 14:02:25 +0100 Subject: [PATCH 07/24] Add history test --- zenoh-ext/tests/advanced.rs | 96 +++++++++++++++++++++++++++++++++++++ 1 file changed, 96 insertions(+) create mode 100644 zenoh-ext/tests/advanced.rs diff --git 
a/zenoh-ext/tests/advanced.rs b/zenoh-ext/tests/advanced.rs new file mode 100644 index 0000000000..90a1aa32ab --- /dev/null +++ b/zenoh-ext/tests/advanced.rs @@ -0,0 +1,96 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use zenoh::sample::SampleKind; +use zenoh_config::{EndPoint, WhatAmI}; +use zenoh_ext::{DataSubscriberBuilderExt, PublisherBuilderExt}; + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_advanced_history() { + use std::time::Duration; + + use zenoh::internal::ztimeout; + + const TIMEOUT: Duration = Duration::from_secs(60); + const SLEEP: Duration = Duration::from_secs(1); + const PEER1_ENDPOINT: &str = "tcp/localhost:47450"; + + const ADVANCED_HISTORY_KEYEXPR: &str = "test/advanced/history"; + + zenoh_util::init_log_from_env_or("debug"); + + let peer1 = { + let mut c = zenoh::Config::default(); + c.listen + .endpoints + .set(vec![PEER1_ENDPOINT.parse::().unwrap()]) + .unwrap(); + c.scouting.multicast.set_enabled(Some(false)).unwrap(); + let _ = c.set_mode(Some(WhatAmI::Peer)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Peer (1) ZID: {}", s.zid()); + s + }; + + let publ = ztimeout!(peer1.declare_publisher(ADVANCED_HISTORY_KEYEXPR).history(3)).unwrap(); + ztimeout!(publ.put("1")).unwrap(); + ztimeout!(publ.put("2")).unwrap(); + ztimeout!(publ.put("3")).unwrap(); + ztimeout!(publ.put("4")).unwrap(); + + tokio::time::sleep(SLEEP).await; + + let peer2 = { + let mut c = zenoh::Config::default(); + c.connect + .endpoints + .set(vec![PEER1_ENDPOINT.parse::().unwrap()]) + .unwrap(); + 
c.scouting.multicast.set_enabled(Some(false)).unwrap(); + let _ = c.set_mode(Some(WhatAmI::Peer)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Peer (2) ZID: {}", s.zid()); + s + }; + + let sub = ztimeout!(peer2.declare_subscriber(ADVANCED_HISTORY_KEYEXPR).history()).unwrap(); + tokio::time::sleep(SLEEP).await; + + ztimeout!(publ.put("5")).unwrap(); + tokio::time::sleep(SLEEP).await; + + let sample = ztimeout!(sub.recv_async()).unwrap(); + assert_eq!(sample.kind(), SampleKind::Put); + assert_eq!(sample.payload().try_to_string().unwrap().as_ref(), "2"); + + let sample = ztimeout!(sub.recv_async()).unwrap(); + assert_eq!(sample.kind(), SampleKind::Put); + assert_eq!(sample.payload().try_to_string().unwrap().as_ref(), "3"); + + let sample = ztimeout!(sub.recv_async()).unwrap(); + assert_eq!(sample.kind(), SampleKind::Put); + assert_eq!(sample.payload().try_to_string().unwrap().as_ref(), "4"); + + let sample = ztimeout!(sub.recv_async()).unwrap(); + assert_eq!(sample.kind(), SampleKind::Put); + assert_eq!(sample.payload().try_to_string().unwrap().as_ref(), "5"); + + assert!(sub.try_recv().unwrap().is_none()); + + publ.undeclare().await.unwrap(); + // sub.undeclare().await.unwrap(); + + peer1.close().await.unwrap(); + peer2.close().await.unwrap(); +} From af1b2a2e8c20f997ba5453130ec6984a3eee5279 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Wed, 13 Nov 2024 22:02:36 +0100 Subject: [PATCH 08/24] Fix periodic queries --- zenoh-ext/src/advanced_subscriber.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zenoh-ext/src/advanced_subscriber.rs b/zenoh-ext/src/advanced_subscriber.rs index 92084181cd..e44007c1da 100644 --- a/zenoh-ext/src/advanced_subscriber.rs +++ b/zenoh-ext/src/advanced_subscriber.rs @@ -299,7 +299,7 @@ fn handle_sample( sample.source_info().source_sn(), ) { let entry = states.entry(*source_id); - let new = matches!(&entry, Entry::Occupied(_)); + let new = matches!(&entry, Entry::Vacant(_)); let state = 
entry.or_insert(InnerState { last_seq_num: None, pending_queries: 0, From bd3235624ea615dae852e230b35039e375508d10 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Wed, 13 Nov 2024 22:02:50 +0100 Subject: [PATCH 09/24] Remove debug trace --- zenoh-ext/src/advanced_publisher.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/zenoh-ext/src/advanced_publisher.rs b/zenoh-ext/src/advanced_publisher.rs index 838167fb2b..bb0a701957 100644 --- a/zenoh-ext/src/advanced_publisher.rs +++ b/zenoh-ext/src/advanced_publisher.rs @@ -246,7 +246,6 @@ impl<'a> AdvancedPublisher<'a> { { let mut put = self.publisher.put(payload); if let Some(seqnum) = &self.seqnum { - println!("there"); put = put.source_info(SourceInfo::new( Some(self.publisher.id()), Some(seqnum.fetch_add(1, Ordering::Relaxed)), From 4e4bbb663d682243a66286c888643c1edfc3bbc1 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Wed, 13 Nov 2024 22:03:36 +0100 Subject: [PATCH 10/24] Lower test debug level --- zenoh-ext/tests/advanced.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zenoh-ext/tests/advanced.rs b/zenoh-ext/tests/advanced.rs index 90a1aa32ab..f9f85f4b55 100644 --- a/zenoh-ext/tests/advanced.rs +++ b/zenoh-ext/tests/advanced.rs @@ -28,7 +28,7 @@ async fn test_advanced_history() { const ADVANCED_HISTORY_KEYEXPR: &str = "test/advanced/history"; - zenoh_util::init_log_from_env_or("debug"); + zenoh_util::init_log_from_env_or("error"); let peer1 = { let mut c = zenoh::Config::default(); From f36c890f4ec7cf19351dce5149a2fb399f85fcab Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Wed, 13 Nov 2024 22:05:53 +0100 Subject: [PATCH 11/24] Add retransmission tests --- zenoh-ext/tests/advanced.rs | 252 ++++++++++++++++++++++++++++++++++++ 1 file changed, 252 insertions(+) diff --git a/zenoh-ext/tests/advanced.rs b/zenoh-ext/tests/advanced.rs index f9f85f4b55..4c945baa06 100644 --- a/zenoh-ext/tests/advanced.rs +++ b/zenoh-ext/tests/advanced.rs @@ -94,3 +94,255 @@ async fn test_advanced_history() 
{ peer1.close().await.unwrap(); peer2.close().await.unwrap(); } + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_advanced_retransmission() { + use std::time::Duration; + + use zenoh::internal::ztimeout; + + const TIMEOUT: Duration = Duration::from_secs(60); + const SLEEP: Duration = Duration::from_secs(1); + const RECONNECT_SLEEP: Duration = Duration::from_secs(5); + const ROUTER_ENDPOINT: &str = "tcp/localhost:47451"; + + const ADVANCED_RETRANSMISSION_KEYEXPR: &str = "test/advanced/retransmission"; + + zenoh_util::init_log_from_env_or("error"); + + let router = { + let mut c = zenoh::Config::default(); + c.listen + .endpoints + .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) + .unwrap(); + c.scouting.multicast.set_enabled(Some(false)).unwrap(); + let _ = c.set_mode(Some(WhatAmI::Router)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Router ZID: {}", s.zid()); + s + }; + + let client1 = { + let mut c = zenoh::Config::default(); + c.connect + .endpoints + .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) + .unwrap(); + c.scouting.multicast.set_enabled(Some(false)).unwrap(); + let _ = c.set_mode(Some(WhatAmI::Client)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Client (1) ZID: {}", s.zid()); + s + }; + + let client2 = { + let mut c = zenoh::Config::default(); + c.connect + .endpoints + .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) + .unwrap(); + c.scouting.multicast.set_enabled(Some(false)).unwrap(); + let _ = c.set_mode(Some(WhatAmI::Client)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Client (2) ZID: {}", s.zid()); + s + }; + + let sub = ztimeout!(client2 + .declare_subscriber(ADVANCED_RETRANSMISSION_KEYEXPR) + .retransmission()) + .unwrap(); + tokio::time::sleep(SLEEP).await; + + let publ = ztimeout!(client1 + .declare_publisher(ADVANCED_RETRANSMISSION_KEYEXPR) + .history(10) + .retransmission()) + .unwrap(); + ztimeout!(publ.put("1")).unwrap(); + + 
tokio::time::sleep(SLEEP).await; + + let sample = ztimeout!(sub.recv_async()).unwrap(); + assert_eq!(sample.kind(), SampleKind::Put); + assert_eq!(sample.payload().try_to_string().unwrap().as_ref(), "1"); + + assert!(sub.try_recv().unwrap().is_none()); + + router.close().await.unwrap(); + tokio::time::sleep(SLEEP).await; + + ztimeout!(publ.put("2")).unwrap(); + ztimeout!(publ.put("3")).unwrap(); + ztimeout!(publ.put("4")).unwrap(); + tokio::time::sleep(SLEEP).await; + + assert!(sub.try_recv().unwrap().is_none()); + + let router = { + let mut c = zenoh::Config::default(); + c.listen + .endpoints + .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) + .unwrap(); + c.scouting.multicast.set_enabled(Some(false)).unwrap(); + let _ = c.set_mode(Some(WhatAmI::Router)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Router ZID: {}", s.zid()); + s + }; + tokio::time::sleep(RECONNECT_SLEEP).await; + + ztimeout!(publ.put("5")).unwrap(); + tokio::time::sleep(SLEEP).await; + + let sample = ztimeout!(sub.recv_async()).unwrap(); + assert_eq!(sample.kind(), SampleKind::Put); + assert_eq!(sample.payload().try_to_string().unwrap().as_ref(), "2"); + + let sample = ztimeout!(sub.recv_async()).unwrap(); + assert_eq!(sample.kind(), SampleKind::Put); + assert_eq!(sample.payload().try_to_string().unwrap().as_ref(), "3"); + + let sample = ztimeout!(sub.recv_async()).unwrap(); + assert_eq!(sample.kind(), SampleKind::Put); + assert_eq!(sample.payload().try_to_string().unwrap().as_ref(), "4"); + + let sample = ztimeout!(sub.recv_async()).unwrap(); + assert_eq!(sample.kind(), SampleKind::Put); + assert_eq!(sample.payload().try_to_string().unwrap().as_ref(), "5"); + + assert!(sub.try_recv().unwrap().is_none()); + + publ.undeclare().await.unwrap(); + // sub.undeclare().await.unwrap(); + + client1.close().await.unwrap(); + client2.close().await.unwrap(); + + router.close().await.unwrap(); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn 
test_advanced_retransmission_periodic() { + use std::time::Duration; + + use zenoh::internal::ztimeout; + + const TIMEOUT: Duration = Duration::from_secs(60); + const SLEEP: Duration = Duration::from_secs(1); + const RECONNECT_SLEEP: Duration = Duration::from_secs(8); + const ROUTER_ENDPOINT: &str = "tcp/localhost:47452"; + + const ADVANCED_RETRANSMISSION_PERIODIC_KEYEXPR: &str = "test/advanced/retransmission/periodic"; + + zenoh_util::init_log_from_env_or("error"); + + let router = { + let mut c = zenoh::Config::default(); + c.listen + .endpoints + .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) + .unwrap(); + c.scouting.multicast.set_enabled(Some(false)).unwrap(); + let _ = c.set_mode(Some(WhatAmI::Router)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Router ZID: {}", s.zid()); + s + }; + + let client1 = { + let mut c = zenoh::Config::default(); + c.connect + .endpoints + .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) + .unwrap(); + c.scouting.multicast.set_enabled(Some(false)).unwrap(); + let _ = c.set_mode(Some(WhatAmI::Client)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Client (1) ZID: {}", s.zid()); + s + }; + + let client2 = { + let mut c = zenoh::Config::default(); + c.connect + .endpoints + .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) + .unwrap(); + c.scouting.multicast.set_enabled(Some(false)).unwrap(); + let _ = c.set_mode(Some(WhatAmI::Client)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Client (2) ZID: {}", s.zid()); + s + }; + + let sub = ztimeout!(client2 + .declare_subscriber(ADVANCED_RETRANSMISSION_PERIODIC_KEYEXPR) + .retransmission() + .periodic_queries(Some(Duration::from_secs(1)))) + .unwrap(); + tokio::time::sleep(SLEEP).await; + + let publ = ztimeout!(client1 + .declare_publisher(ADVANCED_RETRANSMISSION_PERIODIC_KEYEXPR) + .history(10) + .retransmission()) + .unwrap(); + ztimeout!(publ.put("1")).unwrap(); + + tokio::time::sleep(SLEEP).await; + + let sample = 
ztimeout!(sub.recv_async()).unwrap(); + assert_eq!(sample.kind(), SampleKind::Put); + assert_eq!(sample.payload().try_to_string().unwrap().as_ref(), "1"); + + assert!(sub.try_recv().unwrap().is_none()); + + router.close().await.unwrap(); + tokio::time::sleep(SLEEP).await; + + ztimeout!(publ.put("2")).unwrap(); + ztimeout!(publ.put("3")).unwrap(); + ztimeout!(publ.put("4")).unwrap(); + tokio::time::sleep(SLEEP).await; + + assert!(sub.try_recv().unwrap().is_none()); + + let router = { + let mut c = zenoh::Config::default(); + c.listen + .endpoints + .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) + .unwrap(); + c.scouting.multicast.set_enabled(Some(false)).unwrap(); + let _ = c.set_mode(Some(WhatAmI::Router)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Router ZID: {}", s.zid()); + s + }; + tokio::time::sleep(RECONNECT_SLEEP).await; + + let sample = ztimeout!(sub.recv_async()).unwrap(); + assert_eq!(sample.kind(), SampleKind::Put); + assert_eq!(sample.payload().try_to_string().unwrap().as_ref(), "2"); + + let sample = ztimeout!(sub.recv_async()).unwrap(); + assert_eq!(sample.kind(), SampleKind::Put); + assert_eq!(sample.payload().try_to_string().unwrap().as_ref(), "3"); + + let sample = ztimeout!(sub.recv_async()).unwrap(); + assert_eq!(sample.kind(), SampleKind::Put); + assert_eq!(sample.payload().try_to_string().unwrap().as_ref(), "4"); + + assert!(sub.try_recv().unwrap().is_none()); + + publ.undeclare().await.unwrap(); + // sub.undeclare().await.unwrap(); + + client1.close().await.unwrap(); + client2.close().await.unwrap(); + + router.close().await.unwrap(); +} From 23f145da22cd86f1815df97b001054c84c9bc871 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Wed, 13 Nov 2024 22:24:04 +0100 Subject: [PATCH 12/24] Liveliness sub callback shoud increase pending queries counter --- zenoh-ext/src/advanced_subscriber.rs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/zenoh-ext/src/advanced_subscriber.rs 
b/zenoh-ext/src/advanced_subscriber.rs index e44007c1da..65270bc95b 100644 --- a/zenoh-ext/src/advanced_subscriber.rs +++ b/zenoh-ext/src/advanced_subscriber.rs @@ -543,8 +543,18 @@ impl AdvancedSubscriber { if let Ok(parsed) = ke_liveliness::parse(s.key_expr().as_keyexpr()) { if let Ok(zid) = ZenohId::from_str(parsed.zid().as_str()) { if let Ok(eid) = EntityId::from_str(parsed.eid().as_str()) { + let source_id = EntityGlobalId::new(zid, eid); + let (ref mut states, _wait) = &mut *zlock!(statesref); + let entry = states.entry(source_id); + let state = entry.or_insert(InnerState { + last_seq_num: None, + pending_queries: 0, + pending_samples: HashMap::new(), + }); + state.pending_queries += 1; + let handler = RepliesHandler { - source_id: EntityGlobalId::new(zid, eid), + source_id, statesref: statesref.clone(), callback: callback.clone(), }; From de396a4fdcb952ec825e049a28c8930fe6538a51 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Wed, 13 Nov 2024 22:33:14 +0100 Subject: [PATCH 13/24] Liveliness sub callback shoud spawn periodic queries when enbaled --- zenoh-ext/src/advanced_subscriber.rs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/zenoh-ext/src/advanced_subscriber.rs b/zenoh-ext/src/advanced_subscriber.rs index 65270bc95b..d5c83ada41 100644 --- a/zenoh-ext/src/advanced_subscriber.rs +++ b/zenoh-ext/src/advanced_subscriber.rs @@ -508,7 +508,7 @@ impl AdvancedSubscriber { let handler = InitialRepliesHandler { statesref: statesref.clone(), callback: callback.clone(), - periodic_query, + periodic_query: periodic_query.clone(), }; let _ = conf .session @@ -546,6 +546,7 @@ impl AdvancedSubscriber { let source_id = EntityGlobalId::new(zid, eid); let (ref mut states, _wait) = &mut *zlock!(statesref); let entry = states.entry(source_id); + let new = matches!(&entry, Entry::Vacant(_)); let state = entry.or_insert(InnerState { last_seq_num: None, pending_queries: 0, @@ -582,6 +583,15 @@ impl AdvancedSubscriber { .target(query_target) 
.timeout(query_timeout) .wait(); + + if new { + if let Some((timer, period, query)) = periodic_query.as_ref() { + timer.add(TimedEvent::periodic( + *period, + query.clone().with_source_id(source_id), + )) + } + } } } } else { From ff241356a7ec15550258058a38806e6403a968f7 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Wed, 13 Nov 2024 22:49:33 +0100 Subject: [PATCH 14/24] Add late_joiner test --- zenoh-ext/tests/advanced.rs | 104 ++++++++++++++++++++++++++++++++++++ 1 file changed, 104 insertions(+) diff --git a/zenoh-ext/tests/advanced.rs b/zenoh-ext/tests/advanced.rs index 4c945baa06..432cceff80 100644 --- a/zenoh-ext/tests/advanced.rs +++ b/zenoh-ext/tests/advanced.rs @@ -346,3 +346,107 @@ async fn test_advanced_retransmission_periodic() { router.close().await.unwrap(); } + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_advanced_late_joiner() { + use std::time::Duration; + + use zenoh::internal::ztimeout; + + const TIMEOUT: Duration = Duration::from_secs(60); + const SLEEP: Duration = Duration::from_secs(1); + const RECONNECT_SLEEP: Duration = Duration::from_secs(8); + const ROUTER_ENDPOINT: &str = "tcp/localhost:47453"; + + const ADVANCED_LATE_JOINER_KEYEXPR: &str = "test/advanced/late_joiner"; + + zenoh_util::init_log_from_env_or("error"); + + let peer1 = { + let mut c = zenoh::Config::default(); + c.connect + .endpoints + .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) + .unwrap(); + c.scouting.multicast.set_enabled(Some(false)).unwrap(); + let _ = c.set_mode(Some(WhatAmI::Peer)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Peer (1) ZID: {}", s.zid()); + s + }; + + let peer2 = { + let mut c = zenoh::Config::default(); + c.connect + .endpoints + .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) + .unwrap(); + c.scouting.multicast.set_enabled(Some(false)).unwrap(); + let _ = c.set_mode(Some(WhatAmI::Peer)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Peer (2) ZID: {}", s.zid()); + s + }; + + 
let sub = ztimeout!(peer2 + .declare_subscriber(ADVANCED_LATE_JOINER_KEYEXPR) + .history() + .late_joiner()) + .unwrap(); + tokio::time::sleep(SLEEP).await; + + let publ = ztimeout!(peer1 + .declare_publisher(ADVANCED_LATE_JOINER_KEYEXPR) + .history(10) + .late_joiner()) + .unwrap(); + ztimeout!(publ.put("1")).unwrap(); + ztimeout!(publ.put("2")).unwrap(); + ztimeout!(publ.put("3")).unwrap(); + + tokio::time::sleep(SLEEP).await; + + assert!(sub.try_recv().unwrap().is_none()); + let router = { + let mut c = zenoh::Config::default(); + c.listen + .endpoints + .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) + .unwrap(); + c.scouting.multicast.set_enabled(Some(false)).unwrap(); + let _ = c.set_mode(Some(WhatAmI::Router)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Router ZID: {}", s.zid()); + s + }; + tokio::time::sleep(RECONNECT_SLEEP).await; + + ztimeout!(publ.put("4")).unwrap(); + tokio::time::sleep(SLEEP).await; + + let sample = ztimeout!(sub.recv_async()).unwrap(); + assert_eq!(sample.kind(), SampleKind::Put); + assert_eq!(sample.payload().try_to_string().unwrap().as_ref(), "1"); + + let sample = ztimeout!(sub.recv_async()).unwrap(); + assert_eq!(sample.kind(), SampleKind::Put); + assert_eq!(sample.payload().try_to_string().unwrap().as_ref(), "2"); + + let sample = ztimeout!(sub.recv_async()).unwrap(); + assert_eq!(sample.kind(), SampleKind::Put); + assert_eq!(sample.payload().try_to_string().unwrap().as_ref(), "3"); + + let sample = ztimeout!(sub.recv_async()).unwrap(); + assert_eq!(sample.kind(), SampleKind::Put); + assert_eq!(sample.payload().try_to_string().unwrap().as_ref(), "4"); + + assert!(sub.try_recv().unwrap().is_none()); + + publ.undeclare().await.unwrap(); + // sub.undeclare().await.unwrap(); + + peer1.close().await.unwrap(); + peer2.close().await.unwrap(); + + router.close().await.unwrap(); +} From 975aba4606fa52a0930bbbead8d3dfca1c3aa683 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Thu, 14 Nov 2024 11:41:08 +0100 Subject: 
[PATCH 15/24] Only treat pending samples when there are no more pending queries --- zenoh-ext/src/advanced_subscriber.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zenoh-ext/src/advanced_subscriber.rs b/zenoh-ext/src/advanced_subscriber.rs index d5c83ada41..777bea0ae5 100644 --- a/zenoh-ext/src/advanced_subscriber.rs +++ b/zenoh-ext/src/advanced_subscriber.rs @@ -679,7 +679,7 @@ impl Drop for RepliesHandler { let (states, wait) = &mut *zlock!(self.statesref); if let Some(state) = states.get_mut(&self.source_id) { state.pending_queries = state.pending_queries.saturating_sub(1); - if !state.pending_samples.is_empty() && !*wait { + if state.pending_queries == 0 && !state.pending_samples.is_empty() && !*wait { tracing::error!("Sample missed: unable to retrieve some missing samples."); let mut pending_samples = state .pending_samples From 5d9ac8d4a8ac6ff9f69e4e135b0887e01e481d00 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Thu, 14 Nov 2024 22:38:31 +0100 Subject: [PATCH 16/24] Apply proper sequencing for history --- zenoh-ext/src/advanced_publisher.rs | 1 + zenoh-ext/tests/advanced.rs | 8 +++++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/zenoh-ext/src/advanced_publisher.rs b/zenoh-ext/src/advanced_publisher.rs index bb0a701957..38af274eb0 100644 --- a/zenoh-ext/src/advanced_publisher.rs +++ b/zenoh-ext/src/advanced_publisher.rs @@ -79,6 +79,7 @@ impl<'a, 'b> AdvancedPublisherBuilder<'a, 'b> { /// Change the history size for each resource. 
pub fn history(mut self, history: usize) -> Self { self.cache = true; + self.sequencing = Sequencing::Timestamp; self.history = history; self } diff --git a/zenoh-ext/tests/advanced.rs b/zenoh-ext/tests/advanced.rs index 432cceff80..f099e2b265 100644 --- a/zenoh-ext/tests/advanced.rs +++ b/zenoh-ext/tests/advanced.rs @@ -13,7 +13,7 @@ // use zenoh::sample::SampleKind; -use zenoh_config::{EndPoint, WhatAmI}; +use zenoh_config::{EndPoint, ModeDependentValue, WhatAmI}; use zenoh_ext::{DataSubscriberBuilderExt, PublisherBuilderExt}; #[tokio::test(flavor = "multi_thread", worker_threads = 4)] @@ -37,6 +37,9 @@ async fn test_advanced_history() { .set(vec![PEER1_ENDPOINT.parse::().unwrap()]) .unwrap(); c.scouting.multicast.set_enabled(Some(false)).unwrap(); + c.timestamping + .set_enabled(Some(ModeDependentValue::Unique(true))) + .unwrap(); let _ = c.set_mode(Some(WhatAmI::Peer)); let s = ztimeout!(zenoh::open(c)).unwrap(); tracing::info!("Peer (1) ZID: {}", s.zid()); @@ -369,6 +372,9 @@ async fn test_advanced_late_joiner() { .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) .unwrap(); c.scouting.multicast.set_enabled(Some(false)).unwrap(); + c.timestamping + .set_enabled(Some(ModeDependentValue::Unique(true))) + .unwrap(); let _ = c.set_mode(Some(WhatAmI::Peer)); let s = ztimeout!(zenoh::open(c)).unwrap(); tracing::info!("Peer (1) ZID: {}", s.zid()); From 2305b4128196ead67be15345b85c10e5d8843806 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Thu, 14 Nov 2024 22:41:28 +0100 Subject: [PATCH 17/24] Improve AdvancedSubscriber --- Cargo.lock | 1 + zenoh-ext/Cargo.toml | 1 + zenoh-ext/src/advanced_cache.rs | 2 + zenoh-ext/src/advanced_publisher.rs | 13 +- zenoh-ext/src/advanced_subscriber.rs | 327 +++++++++++++++++++-------- 5 files changed, 250 insertions(+), 94 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7beb1c9a7e..00242a3cb8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5602,6 +5602,7 @@ dependencies = [ "serde", "tokio", "tracing", + "uhlc", "zenoh", 
"zenoh-config", "zenoh-macros", diff --git a/zenoh-ext/Cargo.toml b/zenoh-ext/Cargo.toml index 0b693fa820..d433aa137c 100644 --- a/zenoh-ext/Cargo.toml +++ b/zenoh-ext/Cargo.toml @@ -46,6 +46,7 @@ futures = { workspace = true } tracing = { workspace = true } serde = { workspace = true, features = ["default"] } leb128 = { workspace = true } +uhlc = { workspace = true } zenoh = { workspace = true, default-features = false } zenoh-macros = { workspace = true } diff --git a/zenoh-ext/src/advanced_cache.rs b/zenoh-ext/src/advanced_cache.rs index 8807b7fd57..410df20c44 100644 --- a/zenoh-ext/src/advanced_cache.rs +++ b/zenoh-ext/src/advanced_cache.rs @@ -39,6 +39,8 @@ pub(crate) static KE_STAR: &keyexpr = ke!("*"); #[zenoh_macros::unstable] pub(crate) static KE_PREFIX: &keyexpr = ke!("@cache"); #[zenoh_macros::unstable] +pub(crate) static KE_UHLC: &keyexpr = ke!("uhlc"); +#[zenoh_macros::unstable] kedefine!( pub(crate) ke_liveliness: "@cache/${zid:*}/${eid:*}/${remaining:**}", ); diff --git a/zenoh-ext/src/advanced_publisher.rs b/zenoh-ext/src/advanced_publisher.rs index 38af274eb0..01c2264c3b 100644 --- a/zenoh-ext/src/advanced_publisher.rs +++ b/zenoh-ext/src/advanced_publisher.rs @@ -29,7 +29,7 @@ use zenoh::{ }; use crate::{ - advanced_cache::{AdvancedCache, KE_PREFIX}, + advanced_cache::{AdvancedCache, KE_PREFIX, KE_UHLC}, SessionExt, }; @@ -134,9 +134,14 @@ impl<'a> AdvancedPublisher<'a> { .declare_publisher(key_expr.clone().into_owned()) .wait()?; let id = publisher.id(); - let prefix = KE_PREFIX - / &id.zid().into_keyexpr() - / &KeyExpr::try_from(id.eid().to_string()).unwrap(); + let prefix = match conf.sequencing { + Sequencing::SequenceNumber => { + KE_PREFIX + / &id.zid().into_keyexpr() + / &KeyExpr::try_from(id.eid().to_string()).unwrap() + } + _ => KE_PREFIX / &id.zid().into_keyexpr() / KE_UHLC, + }; let seqnum = match conf.sequencing { Sequencing::SequenceNumber => Some(AtomicU32::new(0)), diff --git a/zenoh-ext/src/advanced_subscriber.rs 
b/zenoh-ext/src/advanced_subscriber.rs index 777bea0ae5..3823500927 100644 --- a/zenoh-ext/src/advanced_subscriber.rs +++ b/zenoh-ext/src/advanced_subscriber.rs @@ -18,7 +18,7 @@ use zenoh::{ handlers::{Callback, IntoHandler}, key_expr::KeyExpr, query::{ConsolidationMode, Selector}, - sample::{Locality, Sample}, + sample::{Locality, Sample, SampleKind}, session::{EntityGlobalId, EntityId}, Resolvable, Resolve, Session, Wait, }; @@ -32,14 +32,16 @@ use { std::future::Ready, std::sync::{Arc, Mutex}, std::time::Duration, + uhlc::ID, zenoh::handlers::{locked, DefaultHandler}, zenoh::internal::zlock, zenoh::pubsub::Subscriber, zenoh::query::{QueryTarget, Reply, ReplyKeyExpr}, + zenoh::time::Timestamp, zenoh::Result as ZResult, }; -use crate::advanced_cache::{ke_liveliness, KE_PREFIX, KE_STAR}; +use crate::advanced_cache::{ke_liveliness, KE_PREFIX, KE_STAR, KE_UHLC}; /// The builder of AdvancedSubscriber, allowing to configure it. #[zenoh_macros::unstable] @@ -259,12 +261,26 @@ where } #[zenoh_macros::unstable] -struct InnerState { +struct State { + global_pending_queries: u64, + sequenced_states: HashMap, + timestamped_states: HashMap, +} + +#[zenoh_macros::unstable] +struct SequencedState { last_seq_num: Option, pending_queries: u64, pending_samples: HashMap, } +#[zenoh_macros::unstable] +struct TimestampedState { + last_timestamp: Option, + pending_queries: u64, + pending_samples: HashMap, +} + #[zenoh_macros::unstable] pub struct AdvancedSubscriber { _subscriber: Subscriber<()>, @@ -288,24 +304,19 @@ impl std::ops::DerefMut for AdvancedSubscriber { } #[zenoh_macros::unstable] -fn handle_sample( - states: &mut HashMap, - wait: bool, - sample: Sample, - callback: &Callback, -) -> bool { +fn handle_sample(states: &mut State, sample: Sample, callback: &Callback) -> bool { if let (Some(source_id), Some(source_sn)) = ( sample.source_info().source_id(), sample.source_info().source_sn(), ) { - let entry = states.entry(*source_id); + let entry = 
states.sequenced_states.entry(*source_id); let new = matches!(&entry, Entry::Vacant(_)); - let state = entry.or_insert(InnerState { + let state = entry.or_insert(SequencedState { last_seq_num: None, pending_queries: 0, pending_samples: HashMap::new(), }); - if wait { + if states.global_pending_queries != 0 { state.pending_samples.insert(source_sn, sample); } else if state.last_seq_num.is_some() && source_sn != state.last_seq_num.unwrap() + 1 { if source_sn > state.last_seq_num.unwrap() { @@ -322,9 +333,25 @@ fn handle_sample( } } new + } else if let Some(timestamp) = sample.timestamp() { + let entry = states.timestamped_states.entry(*timestamp.get_id()); + let state = entry.or_insert(TimestampedState { + last_timestamp: None, + pending_queries: 0, + pending_samples: HashMap::new(), + }); + if state.last_timestamp.map(|t| t < *timestamp).unwrap_or(true) { + if states.global_pending_queries == 0 && state.pending_queries == 0 { + state.last_timestamp = Some(*timestamp); + callback.call(sample); + } else { + state.pending_samples.entry(*timestamp).or_insert(sample); + } + } + false } else { callback.call(sample); - true + false } } @@ -342,7 +369,7 @@ fn seq_num_range(start: Option, end: Option) -> String { #[derive(Clone)] struct PeriodicQuery { source_id: Option, - statesref: Arc, bool)>>, + statesref: Arc>, key_expr: KeyExpr<'static>, session: Session, query_target: QueryTarget, @@ -363,9 +390,9 @@ impl PeriodicQuery { impl Timed for PeriodicQuery { async fn run(&mut self) { let mut lock = zlock!(self.statesref); - let (states, _wait) = &mut *lock; + let states = &mut *lock; if let Some(source_id) = &self.source_id { - if let Some(state) = states.get_mut(source_id) { + if let Some(state) = states.sequenced_states.get_mut(source_id) { state.pending_queries += 1; let query_expr = KE_PREFIX / &source_id.zid().into_keyexpr() @@ -373,7 +400,7 @@ impl Timed for PeriodicQuery { / &self.key_expr; let seq_num_range = seq_num_range(Some(state.last_seq_num.unwrap() + 1), 
None); drop(lock); - let handler = RepliesHandler { + let handler = SequencedRepliesHandler { source_id: *source_id, statesref: self.statesref.clone(), callback: self.callback.clone(), @@ -386,8 +413,8 @@ impl Timed for PeriodicQuery { move |r: Reply| { if let Ok(s) = r.into_result() { if key_expr.intersects(s.key_expr()) { - let (ref mut states, wait) = &mut *zlock!(handler.statesref); - handle_sample(states, *wait, s, &handler.callback); + let states = &mut *zlock!(handler.statesref); + handle_sample(states, s, &handler.callback); } } } @@ -408,7 +435,11 @@ impl AdvancedSubscriber { where H: IntoHandler + Send, { - let statesref = Arc::new(Mutex::new((HashMap::new(), conf.history))); + let statesref = Arc::new(Mutex::new(State { + sequenced_states: HashMap::new(), + timestamped_states: HashMap::new(), + global_pending_queries: if conf.history { 1 } else { 0 }, + })); let (callback, receiver) = conf.handler.into_handler(); let key_expr = conf.key_expr?; let retransmission = conf.retransmission; @@ -440,9 +471,9 @@ impl AdvancedSubscriber { move |s: Sample| { let mut lock = zlock!(statesref); - let (states, wait) = &mut *lock; + let states = &mut *lock; let source_id = s.source_info().source_id().cloned(); - let new = handle_sample(states, *wait, s, &callback); + let new = handle_sample(states, s, &callback); if let Some(source_id) = source_id { if new { @@ -454,7 +485,7 @@ impl AdvancedSubscriber { } } - if let Some(state) = states.get_mut(&source_id) { + if let Some(state) = states.sequenced_states.get_mut(&source_id) { if retransmission && state.pending_queries == 0 && !state.pending_samples.is_empty() @@ -467,7 +498,7 @@ impl AdvancedSubscriber { let seq_num_range = seq_num_range(Some(state.last_seq_num.unwrap() + 1), None); drop(lock); - let handler = RepliesHandler { + let handler = SequencedRepliesHandler { source_id, statesref: statesref.clone(), callback: callback.clone(), @@ -479,9 +510,8 @@ impl AdvancedSubscriber { move |r: Reply| { if let Ok(s) = 
r.into_result() { if key_expr.intersects(s.key_expr()) { - let (ref mut states, wait) = - &mut *zlock!(handler.statesref); - handle_sample(states, *wait, s, &handler.callback); + let states = &mut *zlock!(handler.statesref); + handle_sample(states, s, &handler.callback); } } } @@ -521,8 +551,8 @@ impl AdvancedSubscriber { move |r: Reply| { if let Ok(s) = r.into_result() { if key_expr.intersects(s.key_expr()) { - let (ref mut states, wait) = &mut *zlock!(handler.statesref); - handle_sample(states, *wait, s, &handler.callback); + let states = &mut *zlock!(handler.statesref); + handle_sample(states, s, &handler.callback); } } } @@ -540,23 +570,98 @@ impl AdvancedSubscriber { let statesref = statesref.clone(); let key_expr = key_expr.clone().into_owned(); move |s: Sample| { - if let Ok(parsed) = ke_liveliness::parse(s.key_expr().as_keyexpr()) { - if let Ok(zid) = ZenohId::from_str(parsed.zid().as_str()) { - if let Ok(eid) = EntityId::from_str(parsed.eid().as_str()) { - let source_id = EntityGlobalId::new(zid, eid); - let (ref mut states, _wait) = &mut *zlock!(statesref); - let entry = states.entry(source_id); - let new = matches!(&entry, Entry::Vacant(_)); - let state = entry.or_insert(InnerState { - last_seq_num: None, - pending_queries: 0, - pending_samples: HashMap::new(), - }); - state.pending_queries += 1; - - let handler = RepliesHandler { - source_id, + if s.kind() == SampleKind::Put { + if let Ok(parsed) = ke_liveliness::parse(s.key_expr().as_keyexpr()) { + if let Ok(zid) = ZenohId::from_str(parsed.zid().as_str()) { + if parsed.eid() == KE_UHLC { + let states = &mut *zlock!(statesref); + let entry = states.timestamped_states.entry(ID::from(zid)); + let state = entry.or_insert(TimestampedState { + last_timestamp: None, + pending_queries: 0, + pending_samples: HashMap::new(), + }); + state.pending_queries += 1; + + let handler = TimestampedRepliesHandler { + id: ID::from(zid), + statesref: statesref.clone(), + callback: callback.clone(), + }; + let _ = session + 
.get(Selector::from((s.key_expr(), "0.."))) + .callback({ + let key_expr = key_expr.clone().into_owned(); + move |r: Reply| { + if let Ok(s) = r.into_result() { + if key_expr.intersects(s.key_expr()) { + let states = + &mut *zlock!(handler.statesref); + handle_sample(states, s, &handler.callback); + } + } + } + }) + .consolidation(ConsolidationMode::None) + .accept_replies(ReplyKeyExpr::Any) + .target(query_target) + .timeout(query_timeout) + .wait(); + } else if let Ok(eid) = EntityId::from_str(parsed.eid().as_str()) { + let source_id = EntityGlobalId::new(zid, eid); + let states = &mut *zlock!(statesref); + let entry = states.sequenced_states.entry(source_id); + let new = matches!(&entry, Entry::Vacant(_)); + let state = entry.or_insert(SequencedState { + last_seq_num: None, + pending_queries: 0, + pending_samples: HashMap::new(), + }); + state.pending_queries += 1; + + let handler = SequencedRepliesHandler { + source_id, + statesref: statesref.clone(), + callback: callback.clone(), + }; + let _ = session + .get(Selector::from((s.key_expr(), "0.."))) + .callback({ + let key_expr = key_expr.clone().into_owned(); + move |r: Reply| { + if let Ok(s) = r.into_result() { + if key_expr.intersects(s.key_expr()) { + let states = + &mut *zlock!(handler.statesref); + handle_sample(states, s, &handler.callback); + } + } + } + }) + .consolidation(ConsolidationMode::None) + .accept_replies(ReplyKeyExpr::Any) + .target(query_target) + .timeout(query_timeout) + .wait(); + + if new { + if let Some((timer, period, query)) = + periodic_query.as_ref() + { + timer.add(TimedEvent::periodic( + *period, + query.clone().with_source_id(source_id), + )) + } + } + } + } else { + let states = &mut *zlock!(statesref); + states.global_pending_queries += 1; + + let handler = InitialRepliesHandler { statesref: statesref.clone(), + periodic_query: None, callback: callback.clone(), }; let _ = session @@ -566,14 +671,8 @@ impl AdvancedSubscriber { move |r: Reply| { if let Ok(s) = r.into_result() { 
if key_expr.intersects(s.key_expr()) { - let (ref mut states, wait) = - &mut *zlock!(handler.statesref); - handle_sample( - states, - *wait, - s, - &handler.callback, - ); + let states = &mut *zlock!(handler.statesref); + handle_sample(states, s, &handler.callback); } } } @@ -583,22 +682,13 @@ impl AdvancedSubscriber { .target(query_target) .timeout(query_timeout) .wait(); - - if new { - if let Some((timer, period, query)) = periodic_query.as_ref() { - timer.add(TimedEvent::periodic( - *period, - query.clone().with_source_id(source_id), - )) - } - } } + } else { + tracing::warn!( + "Received malformed liveliness token key expression: {}", + s.key_expr() + ); } - } else { - tracing::warn!( - "Received malformed liveliness token key expression: {}", - s.key_expr() - ); } } }; @@ -635,7 +725,7 @@ impl AdvancedSubscriber { #[zenoh_macros::unstable] #[derive(Clone)] struct InitialRepliesHandler { - statesref: Arc, bool)>>, + statesref: Arc>, periodic_query: Option<(Arc, Duration, PeriodicQuery)>, callback: Callback, } @@ -643,43 +733,68 @@ struct InitialRepliesHandler { #[zenoh_macros::unstable] impl Drop for InitialRepliesHandler { fn drop(&mut self) { - let (states, wait) = &mut *zlock!(self.statesref); - for (source_id, state) in states.iter_mut() { - let mut pending_samples = state - .pending_samples - .drain() - .collect::>(); - pending_samples.sort_by_key(|(k, _s)| *k); - for (seq_num, sample) in pending_samples { - state.last_seq_num = Some(seq_num); - self.callback.call(sample); + let states = &mut *zlock!(self.statesref); + states.global_pending_queries = states.global_pending_queries.saturating_sub(1); + + if states.global_pending_queries == 0 { + for (source_id, state) in states.sequenced_states.iter_mut() { + if state.pending_queries == 0 + && !state.pending_samples.is_empty() + && states.global_pending_queries == 0 + { + tracing::error!("Sample missed: unable to retrieve some missing samples."); + let mut pending_samples = state + .pending_samples + 
.drain() + .collect::>(); + pending_samples.sort_by_key(|(k, _s)| *k); + for (seq_num, sample) in pending_samples { + state.last_seq_num = Some(seq_num); + self.callback.call(sample); + } + } + if let Some((timer, period, query)) = self.periodic_query.as_ref() { + timer.add(TimedEvent::periodic( + *period, + query.clone().with_source_id(*source_id), + )) + } } - if let Some((timer, period, query)) = self.periodic_query.as_ref() { - timer.add(TimedEvent::periodic( - *period, - query.clone().with_source_id(*source_id), - )) + for state in states.timestamped_states.values_mut() { + if state.pending_queries == 0 && !state.pending_samples.is_empty() { + let mut pending_samples = state + .pending_samples + .drain() + .collect::>(); + pending_samples.sort_by_key(|(k, _s)| *k); + for (timestamp, sample) in pending_samples { + state.last_timestamp = Some(timestamp); + self.callback.call(sample); + } + } } } - *wait = false; } } #[zenoh_macros::unstable] #[derive(Clone)] -struct RepliesHandler { +struct SequencedRepliesHandler { source_id: EntityGlobalId, - statesref: Arc, bool)>>, + statesref: Arc>, callback: Callback, } #[zenoh_macros::unstable] -impl Drop for RepliesHandler { +impl Drop for SequencedRepliesHandler { fn drop(&mut self) { - let (states, wait) = &mut *zlock!(self.statesref); - if let Some(state) = states.get_mut(&self.source_id) { + let states = &mut *zlock!(self.statesref); + if let Some(state) = states.sequenced_states.get_mut(&self.source_id) { state.pending_queries = state.pending_queries.saturating_sub(1); - if state.pending_queries == 0 && !state.pending_samples.is_empty() && !*wait { + if state.pending_queries == 0 + && !state.pending_samples.is_empty() + && states.global_pending_queries == 0 + { tracing::error!("Sample missed: unable to retrieve some missing samples."); let mut pending_samples = state .pending_samples @@ -694,3 +809,35 @@ impl Drop for RepliesHandler { } } } + +#[zenoh_macros::unstable] +#[derive(Clone)] +struct 
TimestampedRepliesHandler { + id: ID, + statesref: Arc>, + callback: Callback, +} + +#[zenoh_macros::unstable] +impl Drop for TimestampedRepliesHandler { + fn drop(&mut self) { + let states = &mut *zlock!(self.statesref); + if let Some(state) = states.timestamped_states.get_mut(&self.id) { + state.pending_queries = state.pending_queries.saturating_sub(1); + if state.pending_queries == 0 + && !state.pending_samples.is_empty() + && states.global_pending_queries == 0 + { + let mut pending_samples = state + .pending_samples + .drain() + .collect::>(); + pending_samples.sort_by_key(|(k, _s)| *k); + for (timestamp, sample) in pending_samples { + state.last_timestamp = Some(timestamp); + self.callback.call(sample); + } + } + } + } +} From 883885bee16adfa0dbb4e874e26d0f2b85a3c236 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Thu, 14 Nov 2024 22:55:45 +0100 Subject: [PATCH 18/24] Code reorg --- zenoh-ext/src/advanced_subscriber.rs | 57 ++++++++++++---------------- 1 file changed, 25 insertions(+), 32 deletions(-) diff --git a/zenoh-ext/src/advanced_subscriber.rs b/zenoh-ext/src/advanced_subscriber.rs index 3823500927..9936e85348 100644 --- a/zenoh-ext/src/advanced_subscriber.rs +++ b/zenoh-ext/src/advanced_subscriber.rs @@ -263,22 +263,15 @@ where #[zenoh_macros::unstable] struct State { global_pending_queries: u64, - sequenced_states: HashMap, - timestamped_states: HashMap, + sequenced_states: HashMap>, + timestamped_states: HashMap>, } #[zenoh_macros::unstable] -struct SequencedState { - last_seq_num: Option, +struct SourceState { + last_delivered: Option, pending_queries: u64, - pending_samples: HashMap, -} - -#[zenoh_macros::unstable] -struct TimestampedState { - last_timestamp: Option, - pending_queries: u64, - pending_samples: HashMap, + pending_samples: HashMap, } #[zenoh_macros::unstable] @@ -311,38 +304,38 @@ fn handle_sample(states: &mut State, sample: Sample, callback: &Callback ) { let entry = states.sequenced_states.entry(*source_id); let new = 
matches!(&entry, Entry::Vacant(_)); - let state = entry.or_insert(SequencedState { - last_seq_num: None, + let state = entry.or_insert(SourceState:: { + last_delivered: None, pending_queries: 0, pending_samples: HashMap::new(), }); if states.global_pending_queries != 0 { state.pending_samples.insert(source_sn, sample); - } else if state.last_seq_num.is_some() && source_sn != state.last_seq_num.unwrap() + 1 { - if source_sn > state.last_seq_num.unwrap() { + } else if state.last_delivered.is_some() && source_sn != state.last_delivered.unwrap() + 1 { + if source_sn > state.last_delivered.unwrap() { state.pending_samples.insert(source_sn, sample); } } else { callback.call(sample); let mut last_seq_num = source_sn; - state.last_seq_num = Some(last_seq_num); + state.last_delivered = Some(last_seq_num); while let Some(s) = state.pending_samples.remove(&(last_seq_num + 1)) { callback.call(s); last_seq_num += 1; - state.last_seq_num = Some(last_seq_num); + state.last_delivered = Some(last_seq_num); } } new } else if let Some(timestamp) = sample.timestamp() { let entry = states.timestamped_states.entry(*timestamp.get_id()); - let state = entry.or_insert(TimestampedState { - last_timestamp: None, + let state = entry.or_insert(SourceState:: { + last_delivered: None, pending_queries: 0, pending_samples: HashMap::new(), }); - if state.last_timestamp.map(|t| t < *timestamp).unwrap_or(true) { + if state.last_delivered.map(|t| t < *timestamp).unwrap_or(true) { if states.global_pending_queries == 0 && state.pending_queries == 0 { - state.last_timestamp = Some(*timestamp); + state.last_delivered = Some(*timestamp); callback.call(sample); } else { state.pending_samples.entry(*timestamp).or_insert(sample); @@ -398,7 +391,7 @@ impl Timed for PeriodicQuery { / &source_id.zid().into_keyexpr() / &KeyExpr::try_from(source_id.eid().to_string()).unwrap() / &self.key_expr; - let seq_num_range = seq_num_range(Some(state.last_seq_num.unwrap() + 1), None); + let seq_num_range = 
seq_num_range(Some(state.last_delivered.unwrap() + 1), None); drop(lock); let handler = SequencedRepliesHandler { source_id: *source_id, @@ -496,7 +489,7 @@ impl AdvancedSubscriber { / &KeyExpr::try_from(source_id.eid().to_string()).unwrap() / &key_expr; let seq_num_range = - seq_num_range(Some(state.last_seq_num.unwrap() + 1), None); + seq_num_range(Some(state.last_delivered.unwrap() + 1), None); drop(lock); let handler = SequencedRepliesHandler { source_id, @@ -576,8 +569,8 @@ impl AdvancedSubscriber { if parsed.eid() == KE_UHLC { let states = &mut *zlock!(statesref); let entry = states.timestamped_states.entry(ID::from(zid)); - let state = entry.or_insert(TimestampedState { - last_timestamp: None, + let state = entry.or_insert(SourceState:: { + last_delivered: None, pending_queries: 0, pending_samples: HashMap::new(), }); @@ -612,8 +605,8 @@ impl AdvancedSubscriber { let states = &mut *zlock!(statesref); let entry = states.sequenced_states.entry(source_id); let new = matches!(&entry, Entry::Vacant(_)); - let state = entry.or_insert(SequencedState { - last_seq_num: None, + let state = entry.or_insert(SourceState:: { + last_delivered: None, pending_queries: 0, pending_samples: HashMap::new(), }); @@ -749,7 +742,7 @@ impl Drop for InitialRepliesHandler { .collect::>(); pending_samples.sort_by_key(|(k, _s)| *k); for (seq_num, sample) in pending_samples { - state.last_seq_num = Some(seq_num); + state.last_delivered = Some(seq_num); self.callback.call(sample); } } @@ -768,7 +761,7 @@ impl Drop for InitialRepliesHandler { .collect::>(); pending_samples.sort_by_key(|(k, _s)| *k); for (timestamp, sample) in pending_samples { - state.last_timestamp = Some(timestamp); + state.last_delivered = Some(timestamp); self.callback.call(sample); } } @@ -802,7 +795,7 @@ impl Drop for SequencedRepliesHandler { .collect::>(); pending_samples.sort_by_key(|(k, _s)| *k); for (seq_num, sample) in pending_samples { - state.last_seq_num = Some(seq_num); + state.last_delivered = 
Some(seq_num); self.callback.call(sample); } } @@ -834,7 +827,7 @@ impl Drop for TimestampedRepliesHandler { .collect::>(); pending_samples.sort_by_key(|(k, _s)| *k); for (timestamp, sample) in pending_samples { - state.last_timestamp = Some(timestamp); + state.last_delivered = Some(timestamp); self.callback.call(sample); } } From 320170010e17f770027c50bf79e164997377e45e Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Thu, 14 Nov 2024 23:13:55 +0100 Subject: [PATCH 19/24] Code reorg --- zenoh-ext/src/advanced_subscriber.rs | 94 ++++++++++++---------------- 1 file changed, 41 insertions(+), 53 deletions(-) diff --git a/zenoh-ext/src/advanced_subscriber.rs b/zenoh-ext/src/advanced_subscriber.rs index 9936e85348..915a58b138 100644 --- a/zenoh-ext/src/advanced_subscriber.rs +++ b/zenoh-ext/src/advanced_subscriber.rs @@ -715,6 +715,41 @@ impl AdvancedSubscriber { } } +#[zenoh_macros::unstable] +#[inline] +fn flush_sequenced_source(state: &mut SourceState, callback: &Callback) { + if state.pending_queries == 0 && !state.pending_samples.is_empty() { + if state.last_delivered.is_some() { + tracing::error!("Sample missed: unable to retrieve some missing samples."); + } + let mut pending_samples = state + .pending_samples + .drain() + .collect::>(); + pending_samples.sort_by_key(|(k, _s)| *k); + for (seq_num, sample) in pending_samples { + state.last_delivered = Some(seq_num); + callback.call(sample); + } + } +} + +#[zenoh_macros::unstable] +#[inline] +fn flush_timestamped_source(state: &mut SourceState, callback: &Callback) { + if state.pending_queries == 0 && !state.pending_samples.is_empty() { + let mut pending_samples = state + .pending_samples + .drain() + .collect::>(); + pending_samples.sort_by_key(|(k, _s)| *k); + for (seq_num, sample) in pending_samples { + state.last_delivered = Some(seq_num); + callback.call(sample); + } + } +} + #[zenoh_macros::unstable] #[derive(Clone)] struct InitialRepliesHandler { @@ -731,21 +766,7 @@ impl Drop for InitialRepliesHandler { 
if states.global_pending_queries == 0 { for (source_id, state) in states.sequenced_states.iter_mut() { - if state.pending_queries == 0 - && !state.pending_samples.is_empty() - && states.global_pending_queries == 0 - { - tracing::error!("Sample missed: unable to retrieve some missing samples."); - let mut pending_samples = state - .pending_samples - .drain() - .collect::>(); - pending_samples.sort_by_key(|(k, _s)| *k); - for (seq_num, sample) in pending_samples { - state.last_delivered = Some(seq_num); - self.callback.call(sample); - } - } + flush_sequenced_source(state, &self.callback); if let Some((timer, period, query)) = self.periodic_query.as_ref() { timer.add(TimedEvent::periodic( *period, @@ -754,17 +775,7 @@ impl Drop for InitialRepliesHandler { } } for state in states.timestamped_states.values_mut() { - if state.pending_queries == 0 && !state.pending_samples.is_empty() { - let mut pending_samples = state - .pending_samples - .drain() - .collect::>(); - pending_samples.sort_by_key(|(k, _s)| *k); - for (timestamp, sample) in pending_samples { - state.last_delivered = Some(timestamp); - self.callback.call(sample); - } - } + flush_timestamped_source(state, &self.callback); } } } @@ -784,20 +795,8 @@ impl Drop for SequencedRepliesHandler { let states = &mut *zlock!(self.statesref); if let Some(state) = states.sequenced_states.get_mut(&self.source_id) { state.pending_queries = state.pending_queries.saturating_sub(1); - if state.pending_queries == 0 - && !state.pending_samples.is_empty() - && states.global_pending_queries == 0 - { - tracing::error!("Sample missed: unable to retrieve some missing samples."); - let mut pending_samples = state - .pending_samples - .drain() - .collect::>(); - pending_samples.sort_by_key(|(k, _s)| *k); - for (seq_num, sample) in pending_samples { - state.last_delivered = Some(seq_num); - self.callback.call(sample); - } + if states.global_pending_queries == 0 { + flush_sequenced_source(state, &self.callback) } } } @@ -817,19 +816,8 @@ 
impl Drop for TimestampedRepliesHandler { let states = &mut *zlock!(self.statesref); if let Some(state) = states.timestamped_states.get_mut(&self.id) { state.pending_queries = state.pending_queries.saturating_sub(1); - if state.pending_queries == 0 - && !state.pending_samples.is_empty() - && states.global_pending_queries == 0 - { - let mut pending_samples = state - .pending_samples - .drain() - .collect::>(); - pending_samples.sort_by_key(|(k, _s)| *k); - for (timestamp, sample) in pending_samples { - state.last_delivered = Some(timestamp); - self.callback.call(sample); - } + if states.global_pending_queries == 0 { + flush_timestamped_source(state, &self.callback); } } } From ded789cc8b0e6349163023580fca53139befc712 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Fri, 15 Nov 2024 00:35:58 +0100 Subject: [PATCH 20/24] Fix deduplication --- zenoh-ext/src/advanced_subscriber.rs | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/zenoh-ext/src/advanced_subscriber.rs b/zenoh-ext/src/advanced_subscriber.rs index 915a58b138..dd715dd8e0 100644 --- a/zenoh-ext/src/advanced_subscriber.rs +++ b/zenoh-ext/src/advanced_subscriber.rs @@ -728,8 +728,14 @@ fn flush_sequenced_source(state: &mut SourceState, callback: &Callback>(); pending_samples.sort_by_key(|(k, _s)| *k); for (seq_num, sample) in pending_samples { - state.last_delivered = Some(seq_num); - callback.call(sample); + if state + .last_delivered + .map(|last| seq_num > last) + .unwrap_or(true) + { + state.last_delivered = Some(seq_num); + callback.call(sample); + } } } } @@ -743,9 +749,15 @@ fn flush_timestamped_source(state: &mut SourceState, callback: &Callb .drain() .collect::>(); pending_samples.sort_by_key(|(k, _s)| *k); - for (seq_num, sample) in pending_samples { - state.last_delivered = Some(seq_num); - callback.call(sample); + for (timestamp, sample) in pending_samples { + if state + .last_delivered + .map(|last| timestamp > last) + .unwrap_or(true) + { + 
state.last_delivered = Some(timestamp); + callback.call(sample); + } } } } From e03355377a3f07c8cfbc4284434f09dc94d7ec1e Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Fri, 15 Nov 2024 10:33:28 +0100 Subject: [PATCH 21/24] Subscribe to liveliness tokens with history --- zenoh-ext/src/advanced_subscriber.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/zenoh-ext/src/advanced_subscriber.rs b/zenoh-ext/src/advanced_subscriber.rs index dd715dd8e0..9c78c8b973 100644 --- a/zenoh-ext/src/advanced_subscriber.rs +++ b/zenoh-ext/src/advanced_subscriber.rs @@ -566,6 +566,8 @@ impl AdvancedSubscriber { if s.kind() == SampleKind::Put { if let Ok(parsed) = ke_liveliness::parse(s.key_expr().as_keyexpr()) { if let Ok(zid) = ZenohId::from_str(parsed.zid().as_str()) { + // TODO : If we already have a state associated to this discovered source + // we should query with the appropriate range to avoid unnecessary retransmissions if parsed.eid() == KE_UHLC { let states = &mut *zlock!(statesref); let entry = states.timestamped_states.entry(ID::from(zid)); @@ -692,6 +694,7 @@ impl AdvancedSubscriber { .liveliness() .declare_subscriber(KE_PREFIX / KE_STAR / KE_STAR / &key_expr) // .declare_subscriber(keformat!(ke_liveliness_all::formatter(), zid = 0, eid = 0, remaining = key_expr).unwrap()) + .history(true) .callback(live_callback) .wait()?, ) From acaf341717ac7d5c17dc9ac23c5c26164894a827 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Fri, 15 Nov 2024 11:20:31 +0100 Subject: [PATCH 22/24] Update builders --- zenoh-ext/src/advanced_cache.rs | 56 +++++++++++++------ zenoh-ext/src/advanced_publisher.rs | 34 ++++-------- zenoh-ext/src/advanced_subscriber.rs | 82 +++++++++++++++------------- zenoh-ext/src/lib.rs | 4 +- zenoh-ext/src/publisher_ext.rs | 6 +- zenoh-ext/src/subscriber_ext.rs | 14 +++-- zenoh-ext/tests/advanced.rs | 20 ++++--- 7 files changed, 122 insertions(+), 94 deletions(-) diff --git a/zenoh-ext/src/advanced_cache.rs b/zenoh-ext/src/advanced_cache.rs index 
410df20c44..af7751b43b 100644 --- a/zenoh-ext/src/advanced_cache.rs +++ b/zenoh-ext/src/advanced_cache.rs @@ -45,6 +45,38 @@ kedefine!( pub(crate) ke_liveliness: "@cache/${zid:*}/${eid:*}/${remaining:**}", ); +#[derive(Debug, Clone)] +/// Configure the history size of an [`AdvancedCache`]. +pub struct HistoryConf { + sample_depth: usize, + resources_limit: Option, +} + +impl Default for HistoryConf { + fn default() -> Self { + Self { + sample_depth: 1, + resources_limit: None, + } + } +} + +impl HistoryConf { + /// Specify how many samples to keep for each resource. + pub fn sample_depth(mut self, depth: usize) -> Self { + self.sample_depth = depth; + self + } + + // TODO pub fn time_depth(mut self, depth: Duration) -> Self + + /// Specify the maximum total number of samples to keep. + pub fn resources_limit(mut self, limit: usize) -> Self { + self.resources_limit = Some(limit); + self + } +} + /// The builder of AdvancedCache, allowing to configure it. pub struct AdvancedCacheBuilder<'a, 'b, 'c> { session: &'a Session, @@ -52,9 +84,8 @@ pub struct AdvancedCacheBuilder<'a, 'b, 'c> { queryable_prefix: Option>>, subscriber_origin: Locality, queryable_origin: Locality, - history: usize, + history: HistoryConf, liveliness: bool, - resources_limit: Option, } impl<'a, 'b, 'c> AdvancedCacheBuilder<'a, 'b, 'c> { @@ -68,9 +99,8 @@ impl<'a, 'b, 'c> AdvancedCacheBuilder<'a, 'b, 'c> { queryable_prefix: Some(Ok((KE_PREFIX / KE_STAR / KE_STAR).into())), subscriber_origin: Locality::default(), queryable_origin: Locality::default(), - history: 1024, + history: HistoryConf::default(), liveliness: false, - resources_limit: None, } } @@ -101,17 +131,11 @@ impl<'a, 'b, 'c> AdvancedCacheBuilder<'a, 'b, 'c> { } /// Change the history size for each resource. - pub fn history(mut self, history: usize) -> Self { + pub fn history(mut self, history: HistoryConf) -> Self { self.history = history; self } - /// Change the limit number of cached resources. 
- pub fn resources_limit(mut self, limit: usize) -> Self { - self.resources_limit = Some(limit); - self - } - pub fn liveliness(mut self, enabled: bool) -> Self { self.liveliness = enabled; self @@ -180,10 +204,9 @@ impl AdvancedCache { Some(Err(e)) => bail!("Invalid key expression for queryable_prefix: {}", e), }; tracing::debug!( - "Create AdvancedCache on {} with history={} resource_limit={:?}", + "Create AdvancedCache on {} with history={:?}", &key_expr, conf.history, - conf.resources_limit ); // declare the local subscriber that will store the local publications @@ -204,14 +227,13 @@ impl AdvancedCache { let sub_recv = sub.handler().clone(); let quer_recv = queryable.handler().clone(); let pub_key_expr = key_expr.into_owned(); - let resources_limit = conf.resources_limit; let history = conf.history; let (stoptx, stoprx) = bounded::(1); task::spawn(async move { let mut cache: HashMap> = - HashMap::with_capacity(resources_limit.unwrap_or(32)); - let limit = resources_limit.unwrap_or(usize::MAX); + HashMap::with_capacity(history.resources_limit.unwrap_or(32)); + let limit = history.resources_limit.unwrap_or(usize::MAX); loop { select!( @@ -225,7 +247,7 @@ impl AdvancedCache { }; if let Some(queue) = cache.get_mut(queryable_key_expr.as_keyexpr()) { - if queue.len() >= history { + if queue.len() >= history.sample_depth { queue.pop_front(); } queue.push_back(sample); diff --git a/zenoh-ext/src/advanced_publisher.rs b/zenoh-ext/src/advanced_publisher.rs index 01c2264c3b..912b2679f2 100644 --- a/zenoh-ext/src/advanced_publisher.rs +++ b/zenoh-ext/src/advanced_publisher.rs @@ -29,7 +29,7 @@ use zenoh::{ }; use crate::{ - advanced_cache::{AdvancedCache, KE_PREFIX, KE_UHLC}, + advanced_cache::{AdvancedCache, HistoryConf, KE_PREFIX, KE_UHLC}, SessionExt, }; @@ -47,8 +47,7 @@ pub struct AdvancedPublisherBuilder<'a, 'b> { sequencing: Sequencing, liveliness: bool, cache: bool, - history: usize, - resources_limit: Option, + history: HistoryConf, } impl<'a, 'b> 
AdvancedPublisherBuilder<'a, 'b> { @@ -62,8 +61,7 @@ impl<'a, 'b> AdvancedPublisherBuilder<'a, 'b> { sequencing: Sequencing::None, liveliness: false, cache: false, - history: 1, - resources_limit: None, + history: HistoryConf::default(), } } @@ -77,19 +75,13 @@ impl<'a, 'b> AdvancedPublisherBuilder<'a, 'b> { } /// Change the history size for each resource. - pub fn history(mut self, history: usize) -> Self { + pub fn history(mut self, history: HistoryConf) -> Self { self.cache = true; self.sequencing = Sequencing::Timestamp; self.history = history; self } - /// Change the limit number of cached resources. - pub fn resources_limit(mut self, limit: usize) -> Self { - self.resources_limit = Some(limit); - self - } - /// Allow this publisher to be detected by subscribers. /// /// This allows Subscribers to retrieve the local history. @@ -159,16 +151,14 @@ impl<'a> AdvancedPublisher<'a> { }; let cache = if conf.cache { - let mut builder = conf - .session - .declare_advanced_cache(key_expr.clone().into_owned()) - .subscriber_allowed_origin(Locality::SessionLocal) - .history(conf.history) - .queryable_prefix(&prefix); - if let Some(resources_limit) = conf.resources_limit { - builder = builder.resources_limit(resources_limit); - } - Some(builder.wait()?) + Some( + conf.session + .declare_advanced_cache(key_expr.clone().into_owned()) + .subscriber_allowed_origin(Locality::SessionLocal) + .history(conf.history) + .queryable_prefix(&prefix) + .wait()?, + ) } else { None }; diff --git a/zenoh-ext/src/advanced_subscriber.rs b/zenoh-ext/src/advanced_subscriber.rs index 9c78c8b973..ebb56e45e9 100644 --- a/zenoh-ext/src/advanced_subscriber.rs +++ b/zenoh-ext/src/advanced_subscriber.rs @@ -43,16 +43,38 @@ use { use crate::advanced_cache::{ke_liveliness, KE_PREFIX, KE_STAR, KE_UHLC}; +#[derive(Debug, Default, Clone)] +/// Configure the history size of an [`AdvancedCache`]. 
+pub struct RetransmissionConf { + periodic_queries: Option, +} + +impl RetransmissionConf { + /// Enable periodic queries for not yet received Samples and specify their period. + /// + /// This allows to retrieve the last Sample(s) if the last Sample(s) is/are lost. + /// So it is useful for sporadic publications but useless for periodic publications + /// with a period smaller or equal to this period. + /// Retransmission can only be achieved by Publishers that also activate retransmission. + #[zenoh_macros::unstable] + #[inline] + pub fn periodic_queries(mut self, period: Option) -> Self { + self.periodic_queries = period; + self + } + + // TODO pub fn sample_miss_callback(mut self, callback: Callback) -> Self +} + /// The builder of AdvancedSubscriber, allowing to configure it. #[zenoh_macros::unstable] pub struct AdvancedSubscriberBuilder<'a, 'b, Handler, const BACKGROUND: bool = false> { pub(crate) session: &'a Session, pub(crate) key_expr: ZResult>, pub(crate) origin: Locality, - pub(crate) retransmission: bool, + pub(crate) retransmission: Option, pub(crate) query_target: QueryTarget, pub(crate) query_timeout: Duration, - pub(crate) period: Option, pub(crate) history: bool, pub(crate) liveliness: bool, pub(crate) handler: Handler, @@ -71,12 +93,11 @@ impl<'a, 'b, Handler> AdvancedSubscriberBuilder<'a, 'b, Handler> { key_expr, origin, handler, - retransmission: false, + retransmission: None, query_target: QueryTarget::All, query_timeout: Duration::from_secs(10), history: false, liveliness: false, - period: None, } } } @@ -99,7 +120,6 @@ impl<'a, 'b> AdvancedSubscriberBuilder<'a, 'b, DefaultHandler> { retransmission: self.retransmission, query_target: self.query_target, query_timeout: self.query_timeout, - period: self.period, history: self.history, liveliness: self.liveliness, handler: callback, @@ -134,7 +154,6 @@ impl<'a, 'b> AdvancedSubscriberBuilder<'a, 'b, DefaultHandler> { retransmission: self.retransmission, query_target: self.query_target, 
query_timeout: self.query_timeout, - period: self.period, history: self.history, liveliness: self.liveliness, handler, @@ -158,8 +177,8 @@ impl<'a, 'b, Handler> AdvancedSubscriberBuilder<'a, 'b, Handler> { /// Retransmission can only be achieved by Publishers that also activate retransmission. #[zenoh_macros::unstable] #[inline] - pub fn retransmission(mut self) -> Self { - self.retransmission = true; + pub fn retransmission(mut self, conf: RetransmissionConf) -> Self { + self.retransmission = Some(conf); self } @@ -179,25 +198,13 @@ impl<'a, 'b, Handler> AdvancedSubscriberBuilder<'a, 'b, Handler> { self } - /// Enable periodic queries for not yet received Samples and specify their period. - /// - /// This allows to retrieve the last Sample(s) if the last Sample(s) is/are lost. - /// So it is useful for sporadic publications but useless for periodic publications - /// with a period smaller or equal to this period. - /// Retransmission can only be achieved by Publishers that also activate retransmission. - #[zenoh_macros::unstable] - #[inline] - pub fn periodic_queries(mut self, period: Option) -> Self { - self.period = period; - self - } - /// Enable query for historical data. /// /// History can only be retransmitted by Publishers that also activate history. 
#[zenoh_macros::unstable] #[inline] pub fn history(mut self) -> Self { + // TODO take HistoryConf as parameter self.history = true; self } @@ -221,7 +228,6 @@ impl<'a, 'b, Handler> AdvancedSubscriberBuilder<'a, 'b, Handler> { retransmission: self.retransmission, query_target: self.query_target, query_timeout: self.query_timeout, - period: self.period, history: self.history, liveliness: self.liveliness, handler: self.handler, @@ -439,20 +445,22 @@ impl AdvancedSubscriber { let query_target = conf.query_target; let query_timeout = conf.query_timeout; let session = conf.session.clone(); - let periodic_query = conf.period.map(|period| { - ( - Arc::new(Timer::new(false)), - period, - PeriodicQuery { - source_id: None, - statesref: statesref.clone(), - key_expr: key_expr.clone().into_owned(), - session, - query_target, - query_timeout, - callback: callback.clone(), - }, - ) + let periodic_query = retransmission.as_ref().and_then(|r| { + r.periodic_queries.map(|period| { + ( + Arc::new(Timer::new(false)), + period, + PeriodicQuery { + source_id: None, + statesref: statesref.clone(), + key_expr: key_expr.clone().into_owned(), + session, + query_target, + query_timeout, + callback: callback.clone(), + }, + ) + }) }); let sub_callback = { @@ -479,7 +487,7 @@ impl AdvancedSubscriber { } if let Some(state) = states.sequenced_states.get_mut(&source_id) { - if retransmission + if retransmission.is_some() && state.pending_queries == 0 && !state.pending_samples.is_empty() { diff --git a/zenoh-ext/src/lib.rs b/zenoh-ext/src/lib.rs index 3a1046fd87..7a64ba441e 100644 --- a/zenoh-ext/src/lib.rs +++ b/zenoh-ext/src/lib.rs @@ -39,9 +39,9 @@ pub use crate::serialization::{ }; #[cfg(feature = "unstable")] pub use crate::{ - advanced_cache::{AdvancedCache, AdvancedCacheBuilder}, + advanced_cache::{AdvancedCache, AdvancedCacheBuilder, HistoryConf}, advanced_publisher::{AdvancedPublisher, AdvancedPublisherBuilder, Sequencing}, - advanced_subscriber::{AdvancedSubscriber, 
AdvancedSubscriberBuilder}, + advanced_subscriber::{AdvancedSubscriber, AdvancedSubscriberBuilder, RetransmissionConf}, publication_cache::{PublicationCache, PublicationCacheBuilder}, publisher_ext::PublisherBuilderExt, querying_subscriber::{ diff --git a/zenoh-ext/src/publisher_ext.rs b/zenoh-ext/src/publisher_ext.rs index b13cf29b68..2767c96dc2 100644 --- a/zenoh-ext/src/publisher_ext.rs +++ b/zenoh-ext/src/publisher_ext.rs @@ -13,7 +13,7 @@ // use zenoh::pubsub::PublisherBuilder; -use crate::AdvancedPublisherBuilder; +use crate::{advanced_cache::HistoryConf, AdvancedPublisherBuilder}; /// Some extensions to the [`zenoh::publication::PublisherBuilder`](zenoh::publication::PublisherBuilder) #[zenoh_macros::unstable] @@ -21,7 +21,7 @@ pub trait PublisherBuilderExt<'a, 'b> { /// Allow matching Subscribers to detect lost samples and ask for retransimission. /// /// Retransmission can only be achieved if history is enabled. - fn history(self, history: usize) -> AdvancedPublisherBuilder<'a, 'b>; + fn history(self, history: HistoryConf) -> AdvancedPublisherBuilder<'a, 'b>; /// Allow this publisher to be detected by subscribers. /// @@ -33,7 +33,7 @@ impl<'a, 'b> PublisherBuilderExt<'a, 'b> for PublisherBuilder<'a, 'b> { /// Allow matching Subscribers to detect lost samples and ask for retransimission. /// /// Retransmission can only be achieved if history is enabled. 
- fn history(self, history: usize) -> AdvancedPublisherBuilder<'a, 'b> { + fn history(self, history: HistoryConf) -> AdvancedPublisherBuilder<'a, 'b> { AdvancedPublisherBuilder::new(self.session, self.key_expr).history(history) } diff --git a/zenoh-ext/src/subscriber_ext.rs b/zenoh-ext/src/subscriber_ext.rs index 893f23325c..e3af8309cf 100644 --- a/zenoh-ext/src/subscriber_ext.rs +++ b/zenoh-ext/src/subscriber_ext.rs @@ -25,7 +25,7 @@ use zenoh::{ use crate::{ querying_subscriber::QueryingSubscriberBuilder, AdvancedSubscriberBuilder, ExtractSample, - FetchingSubscriberBuilder, + FetchingSubscriberBuilder, RetransmissionConf, }; /// Allows writing `subscriber.forward(receiver)` instead of `subscriber.stream().map(Ok).forward(publisher)` @@ -129,12 +129,13 @@ pub trait DataSubscriberBuilderExt<'a, 'b, Handler> { /// Enable query for historical data. /// /// History can only be retransmitted by Publishers that also activate history. - fn history(self) -> AdvancedSubscriberBuilder<'a, 'b, Handler>; + fn history(self) -> AdvancedSubscriberBuilder<'a, 'b, Handler>; // TODO take HistoryConf as parameter /// Ask for retransmission of detected lost Samples. /// /// Retransmission can only be achieved by Publishers that also activate retransmission. - fn retransmission(self) -> AdvancedSubscriberBuilder<'a, 'b, Handler>; + fn retransmission(self, conf: RetransmissionConf) + -> AdvancedSubscriberBuilder<'a, 'b, Handler>; /// Enable detection of late joiner publishers and query for their historical data. /// @@ -261,9 +262,12 @@ impl<'a, 'b, Handler> DataSubscriberBuilderExt<'a, 'b, Handler> /// Ask for retransmission of detected lost Samples. /// /// Retransmission can only be achieved by Publishers that also activate retransmission. 
- fn retransmission(self) -> AdvancedSubscriberBuilder<'a, 'b, Handler> { + fn retransmission( + self, + conf: RetransmissionConf, + ) -> AdvancedSubscriberBuilder<'a, 'b, Handler> { AdvancedSubscriberBuilder::new(self.session, self.key_expr, self.origin, self.handler) - .retransmission() + .retransmission(conf) } /// Enable detection of late joiner publishers and query for their historical data. diff --git a/zenoh-ext/tests/advanced.rs b/zenoh-ext/tests/advanced.rs index f099e2b265..b69cce3f73 100644 --- a/zenoh-ext/tests/advanced.rs +++ b/zenoh-ext/tests/advanced.rs @@ -14,7 +14,7 @@ use zenoh::sample::SampleKind; use zenoh_config::{EndPoint, ModeDependentValue, WhatAmI}; -use zenoh_ext::{DataSubscriberBuilderExt, PublisherBuilderExt}; +use zenoh_ext::{DataSubscriberBuilderExt, HistoryConf, PublisherBuilderExt, RetransmissionConf}; #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn test_advanced_history() { @@ -46,7 +46,10 @@ async fn test_advanced_history() { s }; - let publ = ztimeout!(peer1.declare_publisher(ADVANCED_HISTORY_KEYEXPR).history(3)).unwrap(); + let publ = ztimeout!(peer1 + .declare_publisher(ADVANCED_HISTORY_KEYEXPR) + .history(HistoryConf::default().sample_depth(3))) + .unwrap(); ztimeout!(publ.put("1")).unwrap(); ztimeout!(publ.put("2")).unwrap(); ztimeout!(publ.put("3")).unwrap(); @@ -154,13 +157,13 @@ async fn test_advanced_retransmission() { let sub = ztimeout!(client2 .declare_subscriber(ADVANCED_RETRANSMISSION_KEYEXPR) - .retransmission()) + .retransmission(RetransmissionConf::default())) .unwrap(); tokio::time::sleep(SLEEP).await; let publ = ztimeout!(client1 .declare_publisher(ADVANCED_RETRANSMISSION_KEYEXPR) - .history(10) + .history(HistoryConf::default().sample_depth(10)) .retransmission()) .unwrap(); ztimeout!(publ.put("1")).unwrap(); @@ -283,14 +286,15 @@ async fn test_advanced_retransmission_periodic() { let sub = ztimeout!(client2 .declare_subscriber(ADVANCED_RETRANSMISSION_PERIODIC_KEYEXPR) - .retransmission() - 
.periodic_queries(Some(Duration::from_secs(1)))) + .retransmission( + RetransmissionConf::default().periodic_queries(Some(Duration::from_secs(1))) + )) .unwrap(); tokio::time::sleep(SLEEP).await; let publ = ztimeout!(client1 .declare_publisher(ADVANCED_RETRANSMISSION_PERIODIC_KEYEXPR) - .history(10) + .history(HistoryConf::default().sample_depth(10)) .retransmission()) .unwrap(); ztimeout!(publ.put("1")).unwrap(); @@ -403,7 +407,7 @@ async fn test_advanced_late_joiner() { let publ = ztimeout!(peer1 .declare_publisher(ADVANCED_LATE_JOINER_KEYEXPR) - .history(10) + .history(HistoryConf::default().sample_depth(10)) .late_joiner()) .unwrap(); ztimeout!(publ.put("1")).unwrap(); From ef6316586f7e84dc5e4cf18230a0211b012c3682 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Fri, 15 Nov 2024 11:29:14 +0100 Subject: [PATCH 23/24] Add examples --- zenoh-ext/examples/examples/README.md | 27 +++++++ zenoh-ext/examples/examples/z_advanced_pub.rs | 73 +++++++++++++++++++ zenoh-ext/examples/examples/z_advanced_sub.rs | 69 ++++++++++++++++++ 3 files changed, 169 insertions(+) create mode 100644 zenoh-ext/examples/examples/z_advanced_pub.rs create mode 100644 zenoh-ext/examples/examples/z_advanced_sub.rs diff --git a/zenoh-ext/examples/examples/README.md b/zenoh-ext/examples/examples/README.md index 498a1ca6fe..f294d21ddb 100644 --- a/zenoh-ext/examples/examples/README.md +++ b/zenoh-ext/examples/examples/README.md @@ -15,6 +15,33 @@ ## Examples description +### z_advanced_pub + + Declares an AdvancedPublisher with a given key expression. + All the publications are locally cached (with a configurable history size - i.e. max number of cached data per resource, default 1). The cache can be queried by an AdvancedSubscriber for history + or retransmission. + + Typical usage: + ```bash + z_advanced_pub + ``` + or + ```bash + z_advanced_pub --history 10 + ``` + +### z_advanced_sub + + Declares an AdvancedSubscriber with a given key expression. 
+ The AdvancedSubscriber can query for AdvancedPublisher history at startup + and on late joiner publisher detection. The AdvancedSubscriber can detect + sample loss and ask for retransmission. + + Typical usage: + ```bash + z_advanced_sub + ``` + ### z_pub_cache Declares a publisher and an associated publication cache with a given key expression. diff --git a/zenoh-ext/examples/examples/z_advanced_pub.rs b/zenoh-ext/examples/examples/z_advanced_pub.rs new file mode 100644 index 0000000000..8d18ef36a0 --- /dev/null +++ b/zenoh-ext/examples/examples/z_advanced_pub.rs @@ -0,0 +1,73 @@ +use std::time::Duration; + +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use clap::{arg, Parser}; +use zenoh::{config::Config, key_expr::KeyExpr}; +use zenoh_config::ModeDependentValue; +use zenoh_ext::*; +use zenoh_ext_examples::CommonArgs; + +#[tokio::main] +async fn main() { + // Initiate logging + zenoh::init_log_from_env_or("error"); + + let (config, key_expr, value, history) = parse_args(); + + println!("Opening session..."); + let session = zenoh::open(config).await.unwrap(); + + println!("Declaring AdvancedPublisher on {}", &key_expr); + let publisher = session + .declare_publisher(&key_expr) + .history(HistoryConf::default().sample_depth(history)) + .retransmission() + .late_joiner() + .await + .unwrap(); + + println!("Press CTRL-C to quit..."); + for idx in 0..u32::MAX { + tokio::time::sleep(Duration::from_secs(1)).await; + let buf = format!("[{idx:4}] {value}"); + println!("Put Data ('{}': '{}')", &key_expr, buf); + publisher.put(buf).await.unwrap(); + } +} + 
+#[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)] +struct Args { + #[arg(short, long, default_value = "demo/example/zenoh-rs-pub")] + /// The key expression to publish. + key: KeyExpr<'static>, + #[arg(short, long, default_value = "Pub from Rust!")] + /// The payload value to publish. + value: String, + #[arg(short = 'i', long, default_value = "1")] + /// The number of publications to keep in cache. + history: usize, + #[command(flatten)] + common: CommonArgs, +} + +fn parse_args() -> (Config, KeyExpr<'static>, String, usize) { + let args = Args::parse(); + let mut config: Config = args.common.into(); + config + .timestamping + .set_enabled(Some(ModeDependentValue::Unique(true))) + .unwrap(); + (config, args.key, args.value, args.history) +} diff --git a/zenoh-ext/examples/examples/z_advanced_sub.rs b/zenoh-ext/examples/examples/z_advanced_sub.rs new file mode 100644 index 0000000000..b9ad95435a --- /dev/null +++ b/zenoh-ext/examples/examples/z_advanced_sub.rs @@ -0,0 +1,69 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. 
+// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use std::time::Duration; + +use clap::{arg, Parser}; +use zenoh::config::Config; +use zenoh_ext::*; +use zenoh_ext_examples::CommonArgs; + +#[tokio::main] +async fn main() { + // Initiate logging + zenoh::init_log_from_env_or("error"); + + let (config, key_expr) = parse_args(); + + println!("Opening session..."); + let session = zenoh::open(config).await.unwrap(); + + println!("Declaring AdvancedSubscriber on {}", key_expr,); + let subscriber = session + .declare_subscriber(key_expr) + .history() + .retransmission( + RetransmissionConf::default().periodic_queries(Some(Duration::from_secs(1))), + ) + .late_joiner() + .await + .unwrap(); + + println!("Press CTRL-C to quit..."); + while let Ok(sample) = subscriber.recv_async().await { + let payload = sample + .payload() + .try_to_string() + .unwrap_or_else(|e| e.to_string().into()); + println!( + ">> [Subscriber] Received {} ('{}': '{}')", + sample.kind(), + sample.key_expr().as_str(), + payload + ); + } +} + +#[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)] +struct Args { + #[arg(short, long, default_value = "demo/example/**")] + /// The key expression to subscribe onto. 
+ key: String, + #[command(flatten)] + common: CommonArgs, +} + +fn parse_args() -> (Config, String) { + let args = Args::parse(); + (args.common.into(), args.key) +} From 788a8e51e1c764fa351413c0cecbbe6a87c96ef2 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Fri, 15 Nov 2024 12:31:05 +0100 Subject: [PATCH 24/24] Fix rustdoc --- zenoh-ext/src/advanced_subscriber.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zenoh-ext/src/advanced_subscriber.rs b/zenoh-ext/src/advanced_subscriber.rs index ebb56e45e9..36b1263e24 100644 --- a/zenoh-ext/src/advanced_subscriber.rs +++ b/zenoh-ext/src/advanced_subscriber.rs @@ -44,7 +44,7 @@ use { use crate::advanced_cache::{ke_liveliness, KE_PREFIX, KE_STAR, KE_UHLC}; #[derive(Debug, Default, Clone)] -/// Configure the history size of an [`AdvancedCache`]. +/// Configure retransmission. pub struct RetransmissionConf { periodic_queries: Option, }