diff --git a/Cargo.lock b/Cargo.lock index 3fec7718ce..00242a3cb8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5593,6 +5593,7 @@ dependencies = [ name = "zenoh-ext" version = "1.0.0-dev" dependencies = [ + "async-trait", "bincode", "flume", "futures", @@ -5601,6 +5602,7 @@ dependencies = [ "serde", "tokio", "tracing", + "uhlc", "zenoh", "zenoh-config", "zenoh-macros", diff --git a/commons/zenoh-config/src/wrappers.rs b/commons/zenoh-config/src/wrappers.rs index 31afe358e7..759507c770 100644 --- a/commons/zenoh-config/src/wrappers.rs +++ b/commons/zenoh-config/src/wrappers.rs @@ -152,6 +152,16 @@ pub struct EntityGlobalId(EntityGlobalIdProto); pub type EntityId = u32; impl EntityGlobalId { + /// Creates a new EntityGlobalId. + #[zenoh_macros::internal] + pub fn new(zid: ZenohId, eid: EntityId) -> Self { + EntityGlobalIdProto { + zid: zid.into(), + eid, + } + .into() + } + /// Returns the [`ZenohId`], i.e. the Zenoh session, this ID is associated to. pub fn zid(&self) -> ZenohId { self.0.zid.into() diff --git a/commons/zenoh-macros/src/lib.rs b/commons/zenoh-macros/src/lib.rs index 2047c424cf..cc6f4a2295 100644 --- a/commons/zenoh-macros/src/lib.rs +++ b/commons/zenoh-macros/src/lib.rs @@ -515,7 +515,7 @@ pub fn ke(tokens: TokenStream) -> TokenStream { let value: LitStr = syn::parse(tokens).unwrap(); let ke = value.value(); match zenoh_keyexpr::keyexpr::new(&ke) { - Ok(_) => quote!(unsafe {::zenoh::key_expr::keyexpr::from_str_unchecked(#ke)}).into(), + Ok(_) => quote!(unsafe { zenoh::key_expr::keyexpr::from_str_unchecked(#ke)}).into(), Err(e) => panic!("{}", e), } } diff --git a/zenoh-ext/Cargo.toml b/zenoh-ext/Cargo.toml index 4de255d0d6..d433aa137c 100644 --- a/zenoh-ext/Cargo.toml +++ b/zenoh-ext/Cargo.toml @@ -38,6 +38,7 @@ tokio = { workspace = true, features = [ "macros", "io-std", ] } +async-trait = { workspace = true } bincode = { workspace = true } zenoh-util = { workspace = true } flume = { workspace = true } @@ -45,6 +46,7 @@ futures = { workspace = true } tracing = { workspace = true } serde = { workspace = true, features = ["default"] } leb128 = { workspace = true } +uhlc = { workspace = true } zenoh = { workspace = true, default-features = false } zenoh-macros = { workspace = true } diff --git a/zenoh-ext/examples/examples/README.md b/zenoh-ext/examples/examples/README.md index 498a1ca6fe..f294d21ddb 100644 --- a/zenoh-ext/examples/examples/README.md +++ b/zenoh-ext/examples/examples/README.md @@ -15,6 +15,33 @@ ## Examples description +### z_advanced_pub + + Declares an AdvancedPublisher with a given key expression. + All the publications are locally cached (with a configurable history size - i.e. max number of cached data per resource, default 1). The cache can be queried by an AdvancedSubscriber for hsitory + or retransmission. + + Typical usage: + ```bash + z_advanced_pub + ``` + or + ```bash + z_advanced_pub --history 10 + ``` + +### z_advanced_sub + + Declares an AdvancedSubscriber with a given key expression. + The AdvancedSubscriber can query for AdvancedPublisher history at startup + and on late joiner publisher detection. The AdvancedSubscriber can detect + sample loss and ask for retransmission. + + Typical usage: + ```bash + z_advanced_sub + ``` + ### z_pub_cache Declares a publisher and an associated publication cache with a given key expression. diff --git a/zenoh-ext/examples/examples/z_advanced_pub.rs b/zenoh-ext/examples/examples/z_advanced_pub.rs new file mode 100644 index 0000000000..8d18ef36a0 --- /dev/null +++ b/zenoh-ext/examples/examples/z_advanced_pub.rs @@ -0,0 +1,73 @@ +use std::time::Duration; + +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use clap::{arg, Parser}; +use zenoh::{config::Config, key_expr::KeyExpr}; +use zenoh_config::ModeDependentValue; +use zenoh_ext::*; +use zenoh_ext_examples::CommonArgs; + +#[tokio::main] +async fn main() { + // Initiate logging + zenoh::init_log_from_env_or("error"); + + let (config, key_expr, value, history) = parse_args(); + + println!("Opening session..."); + let session = zenoh::open(config).await.unwrap(); + + println!("Declaring AdvancedPublisher on {}", &key_expr); + let publisher = session + .declare_publisher(&key_expr) + .history(HistoryConf::default().sample_depth(history)) + .retransmission() + .late_joiner() + .await + .unwrap(); + + println!("Press CTRL-C to quit..."); + for idx in 0..u32::MAX { + tokio::time::sleep(Duration::from_secs(1)).await; + let buf = format!("[{idx:4}] {value}"); + println!("Put Data ('{}': '{}')", &key_expr, buf); + publisher.put(buf).await.unwrap(); + } +} + +#[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)] +struct Args { + #[arg(short, long, default_value = "demo/example/zenoh-rs-pub")] + /// The key expression to publish. + key: KeyExpr<'static>, + #[arg(short, long, default_value = "Pub from Rust!")] + /// The value to reply to queries. + value: String, + #[arg(short = 'i', long, default_value = "1")] + /// The number of publications to keep in cache. + history: usize, + #[command(flatten)] + common: CommonArgs, +} + +fn parse_args() -> (Config, KeyExpr<'static>, String, usize) { + let args = Args::parse(); + let mut config: Config = args.common.into(); + config + .timestamping + .set_enabled(Some(ModeDependentValue::Unique(true))) + .unwrap(); + (config, args.key, args.value, args.history) +} diff --git a/zenoh-ext/examples/examples/z_advanced_sub.rs b/zenoh-ext/examples/examples/z_advanced_sub.rs new file mode 100644 index 0000000000..b9ad95435a --- /dev/null +++ b/zenoh-ext/examples/examples/z_advanced_sub.rs @@ -0,0 +1,69 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use std::time::Duration; + +use clap::{arg, Parser}; +use zenoh::config::Config; +use zenoh_ext::*; +use zenoh_ext_examples::CommonArgs; + +#[tokio::main] +async fn main() { + // Initiate logging + zenoh::init_log_from_env_or("error"); + + let (config, key_expr) = parse_args(); + + println!("Opening session..."); + let session = zenoh::open(config).await.unwrap(); + + println!("Declaring AdvancedSubscriber on {}", key_expr,); + let subscriber = session + .declare_subscriber(key_expr) + .history() + .retransmission( + RetransmissionConf::default().periodic_queries(Some(Duration::from_secs(1))), + ) + .late_joiner() + .await + .unwrap(); + + println!("Press CTRL-C to quit..."); + while let Ok(sample) = subscriber.recv_async().await { + let payload = sample + .payload() + .try_to_string() + .unwrap_or_else(|e| e.to_string().into()); + println!( + ">> [Subscriber] Received {} ('{}': '{}')", + sample.kind(), + sample.key_expr().as_str(), + payload + ); + } +} + +#[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)] +struct Args { + #[arg(short, long, default_value = "demo/example/**")] + /// The key expression to subscribe onto. + key: String, + #[command(flatten)] + common: CommonArgs, +} + +fn parse_args() -> (Config, String) { + let args = Args::parse(); + (args.common.into(), args.key) +} diff --git a/zenoh-ext/src/advanced_cache.rs b/zenoh-ext/src/advanced_cache.rs new file mode 100644 index 0000000000..af7751b43b --- /dev/null +++ b/zenoh-ext/src/advanced_cache.rs @@ -0,0 +1,351 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use std::{ + borrow::Borrow, + collections::{HashMap, VecDeque}, + future::{IntoFuture, Ready}, +}; + +use flume::{bounded, Sender}; +use futures::{select, FutureExt}; +use tokio::task; +use zenoh::{ + handlers::FifoChannelHandler, + internal::{bail, ResolveFuture}, + key_expr::{ + format::{ke, kedefine}, + keyexpr, KeyExpr, OwnedKeyExpr, + }, + liveliness::LivelinessToken, + pubsub::Subscriber, + query::{Query, Queryable, ZenohParameters}, + sample::{Locality, Sample}, + Resolvable, Resolve, Result as ZResult, Session, Wait, +}; + +#[zenoh_macros::unstable] +pub(crate) static KE_STAR: &keyexpr = ke!("*"); +#[zenoh_macros::unstable] +pub(crate) static KE_PREFIX: &keyexpr = ke!("@cache"); +#[zenoh_macros::unstable] +pub(crate) static KE_UHLC: &keyexpr = ke!("uhlc"); +#[zenoh_macros::unstable] +kedefine!( + pub(crate) ke_liveliness: "@cache/${zid:*}/${eid:*}/${remaining:**}", +); + +#[derive(Debug, Clone)] +/// Configure the history size of an [`AdvancedCache`]. +pub struct HistoryConf { + sample_depth: usize, + resources_limit: Option, +} + +impl Default for HistoryConf { + fn default() -> Self { + Self { + sample_depth: 1, + resources_limit: None, + } + } +} + +impl HistoryConf { + /// Specify how many samples to keep for each resource. + pub fn sample_depth(mut self, depth: usize) -> Self { + self.sample_depth = depth; + self + } + + // TODO pub fn time_depth(mut self, depth: Duration) -> Self + + /// Specify the maximum total number of samples to keep. + pub fn resources_limit(mut self, limit: usize) -> Self { + self.resources_limit = Some(limit); + self + } +} + +/// The builder of AdvancedCache, allowing to configure it. +pub struct AdvancedCacheBuilder<'a, 'b, 'c> { + session: &'a Session, + pub_key_expr: ZResult>, + queryable_prefix: Option>>, + subscriber_origin: Locality, + queryable_origin: Locality, + history: HistoryConf, + liveliness: bool, +} + +impl<'a, 'b, 'c> AdvancedCacheBuilder<'a, 'b, 'c> { + pub(crate) fn new( + session: &'a Session, + pub_key_expr: ZResult>, + ) -> AdvancedCacheBuilder<'a, 'b, 'c> { + AdvancedCacheBuilder { + session, + pub_key_expr, + queryable_prefix: Some(Ok((KE_PREFIX / KE_STAR / KE_STAR).into())), + subscriber_origin: Locality::default(), + queryable_origin: Locality::default(), + history: HistoryConf::default(), + liveliness: false, + } + } + + /// Change the prefix used for queryable. + pub fn queryable_prefix(mut self, queryable_prefix: TryIntoKeyExpr) -> Self + where + TryIntoKeyExpr: TryInto>, + >>::Error: Into, + { + self.queryable_prefix = Some(queryable_prefix.try_into().map_err(Into::into)); + self + } + + /// Restrict the matching publications that will be cached by this [`AdvancedCache`] + /// to the ones that have the given [`Locality`](zenoh::sample::Locality). + #[inline] + pub fn subscriber_allowed_origin(mut self, origin: Locality) -> Self { + self.subscriber_origin = origin; + self + } + + /// Restrict the matching queries that will be receive by this [`AdvancedCache`]'s queryable + /// to the ones that have the given [`Locality`](zenoh::sample::Locality). + #[inline] + pub fn queryable_allowed_origin(mut self, origin: Locality) -> Self { + self.queryable_origin = origin; + self + } + + /// Change the history size for each resource. + pub fn history(mut self, history: HistoryConf) -> Self { + self.history = history; + self + } + + pub fn liveliness(mut self, enabled: bool) -> Self { + self.liveliness = enabled; + self + } +} + +impl Resolvable for AdvancedCacheBuilder<'_, '_, '_> { + type To = ZResult; +} + +impl Wait for AdvancedCacheBuilder<'_, '_, '_> { + fn wait(self) -> ::To { + AdvancedCache::new(self) + } +} + +impl<'a> IntoFuture for AdvancedCacheBuilder<'a, '_, '_> { + type Output = ::To; + type IntoFuture = Ready<::To>; + + fn into_future(self) -> Self::IntoFuture { + std::future::ready(self.wait()) + } +} + +fn decode_range(range: &str) -> (Option, Option) { + let mut split = range.split(".."); + let start = split.next().and_then(|s| s.parse::().ok()); + let end = split.next().map(|s| s.parse::().ok()).unwrap_or(start); + (start, end) +} + +fn sample_in_range(sample: &Sample, start: Option, end: Option) -> bool { + if start.is_none() && end.is_none() { + true + } else if let Some(source_sn) = sample.source_info().source_sn() { + match (start, end) { + (Some(start), Some(end)) => source_sn >= start && source_sn <= end, + (Some(start), None) => source_sn >= start, + (None, Some(end)) => source_sn <= end, + (None, None) => true, + } + } else { + false + } +} + +pub struct AdvancedCache { + _sub: Subscriber>, + _queryable: Queryable>, + _token: Option, + _stoptx: Sender, +} + +impl AdvancedCache { + fn new(conf: AdvancedCacheBuilder<'_, '_, '_>) -> ZResult { + let key_expr = conf.pub_key_expr?; + // the queryable_prefix (optional), and the key_expr for AdvancedCache's queryable ("[]/") + let (queryable_prefix, queryable_key_expr): (Option, KeyExpr) = + match conf.queryable_prefix { + None => (None, key_expr.clone()), + Some(Ok(ke)) => { + let queryable_key_expr = (&ke) / &key_expr; + (Some(ke.into()), queryable_key_expr) + } + Some(Err(e)) => bail!("Invalid key expression for queryable_prefix: {}", e), + }; + tracing::debug!( + "Create AdvancedCache on {} with history={:?}", + &key_expr, + conf.history, + ); + + // declare the local subscriber that will store the local publications + let sub = conf + .session + .declare_subscriber(&key_expr) + .allowed_origin(conf.subscriber_origin) + .wait()?; + + // declare the queryable that will answer to queries on cache + let queryable = conf + .session + .declare_queryable(&queryable_key_expr) + .allowed_origin(conf.queryable_origin) + .wait()?; + + // take local ownership of stuff to be moved into task + let sub_recv = sub.handler().clone(); + let quer_recv = queryable.handler().clone(); + let pub_key_expr = key_expr.into_owned(); + let history = conf.history; + + let (stoptx, stoprx) = bounded::(1); + task::spawn(async move { + let mut cache: HashMap> = + HashMap::with_capacity(history.resources_limit.unwrap_or(32)); + let limit = history.resources_limit.unwrap_or(usize::MAX); + + loop { + select!( + // on publication received by the local subscriber, store it + sample = sub_recv.recv_async() => { + if let Ok(sample) = sample { + let queryable_key_expr: KeyExpr<'_> = if let Some(prefix) = &queryable_prefix { + prefix.join(&sample.key_expr()).unwrap().into() + } else { + sample.key_expr().clone() + }; + + if let Some(queue) = cache.get_mut(queryable_key_expr.as_keyexpr()) { + if queue.len() >= history.sample_depth { + queue.pop_front(); + } + queue.push_back(sample); + } else if cache.len() >= limit { + tracing::error!("AdvancedCache on {}: resource_limit exceeded - can't cache publication for a new resource", + pub_key_expr); + } else { + let mut queue: VecDeque = VecDeque::new(); + queue.push_back(sample); + cache.insert(queryable_key_expr.into(), queue); + } + } + }, + + // on query, reply with cache content + query = quer_recv.recv_async() => { + if let Ok(query) = query { + let (start, end) = query.parameters().get("_sn").map(decode_range).unwrap_or((None, None)); + if !query.selector().key_expr().as_str().contains('*') { + if let Some(queue) = cache.get(query.selector().key_expr().as_keyexpr()) { + for sample in queue { + if sample_in_range(sample, start, end) { + if let (Some(Ok(time_range)), Some(timestamp)) = (query.parameters().time_range(), sample.timestamp()) { + if !time_range.contains(timestamp.get_time().to_system_time()){ + continue; + } + } + if let Err(e) = query.reply_sample(sample.clone()).await { + tracing::warn!("Error replying to query: {}", e); + } + } + } + } + } else { + for (key_expr, queue) in cache.iter() { + if query.selector().key_expr().intersects(key_expr.borrow()) { + for sample in queue { + if sample_in_range(sample, start, end) { + if let (Some(Ok(time_range)), Some(timestamp)) = (query.parameters().time_range(), sample.timestamp()) { + if !time_range.contains(timestamp.get_time().to_system_time()){ + continue; + } + } + if let Err(e) = query.reply_sample(sample.clone()).await { + tracing::warn!("Error replying to query: {}", e); + } + } + } + } + } + } + } + }, + + // When stoptx is dropped, stop the task + _ = stoprx.recv_async().fuse() => { + return + } + ); + } + }); + + let token = if conf.liveliness { + Some( + conf.session + .liveliness() + .declare_token(queryable_key_expr) + .wait()?, + ) + } else { + None + }; + + Ok(AdvancedCache { + _sub: sub, + _queryable: queryable, + _token: token, + _stoptx: stoptx, + }) + } + + /// Close this AdvancedCache + #[inline] + pub fn close(self) -> impl Resolve> { + ResolveFuture::new(async move { + let AdvancedCache { + _queryable, + _sub, + _token, + _stoptx, + } = self; + _sub.undeclare().await?; + if let Some(token) = _token { + token.undeclare().await?; + } + _queryable.undeclare().await?; + drop(_stoptx); + Ok(()) + }) + } +} diff --git a/zenoh-ext/src/advanced_publisher.rs b/zenoh-ext/src/advanced_publisher.rs new file mode 100644 index 0000000000..912b2679f2 --- /dev/null +++ b/zenoh-ext/src/advanced_publisher.rs @@ -0,0 +1,344 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use std::{ + future::{IntoFuture, Ready}, + sync::atomic::{AtomicU32, Ordering}, +}; + +use zenoh::{ + bytes::{Encoding, ZBytes}, + internal::bail, + key_expr::KeyExpr, + liveliness::LivelinessToken, + pubsub::{Publisher, PublisherDeleteBuilder, PublisherPutBuilder}, + qos::{CongestionControl, Priority}, + sample::{Locality, SourceInfo}, + session::EntityGlobalId, + Resolvable, Resolve, Result as ZResult, Session, Wait, +}; + +use crate::{ + advanced_cache::{AdvancedCache, HistoryConf, KE_PREFIX, KE_UHLC}, + SessionExt, +}; + +pub enum Sequencing { + None, + Timestamp, + SequenceNumber, +} + +/// The builder of PublicationCache, allowing to configure it. +#[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"] +pub struct AdvancedPublisherBuilder<'a, 'b> { + session: &'a Session, + pub_key_expr: ZResult>, + sequencing: Sequencing, + liveliness: bool, + cache: bool, + history: HistoryConf, +} + +impl<'a, 'b> AdvancedPublisherBuilder<'a, 'b> { + pub(crate) fn new( + session: &'a Session, + pub_key_expr: ZResult>, + ) -> AdvancedPublisherBuilder<'a, 'b> { + AdvancedPublisherBuilder { + session, + pub_key_expr, + sequencing: Sequencing::None, + liveliness: false, + cache: false, + history: HistoryConf::default(), + } + } + + /// Allow matching Subscribers to detect lost samples and ask for retransimission. + /// + /// Retransmission can only be achieved if history is enabled. + pub fn retransmission(mut self) -> Self { + self.cache = true; + self.sequencing = Sequencing::SequenceNumber; + self + } + + /// Change the history size for each resource. + pub fn history(mut self, history: HistoryConf) -> Self { + self.cache = true; + self.sequencing = Sequencing::Timestamp; + self.history = history; + self + } + + /// Allow this publisher to be detected by subscribers. + /// + /// This allows Subscribers to retrieve the local history. + pub fn late_joiner(mut self) -> Self { + self.liveliness = true; + self + } +} + +impl<'a> Resolvable for AdvancedPublisherBuilder<'a, '_> { + type To = ZResult>; +} + +impl Wait for AdvancedPublisherBuilder<'_, '_> { + fn wait(self) -> ::To { + AdvancedPublisher::new(self) + } +} + +impl<'a> IntoFuture for AdvancedPublisherBuilder<'a, '_> { + type Output = ::To; + type IntoFuture = Ready<::To>; + + fn into_future(self) -> Self::IntoFuture { + std::future::ready(self.wait()) + } +} + +pub struct AdvancedPublisher<'a> { + publisher: Publisher<'a>, + seqnum: Option, + _cache: Option, + _token: Option, +} + +impl<'a> AdvancedPublisher<'a> { + fn new(conf: AdvancedPublisherBuilder<'a, '_>) -> ZResult { + let key_expr = conf.pub_key_expr?; + + let publisher = conf + .session + .declare_publisher(key_expr.clone().into_owned()) + .wait()?; + let id = publisher.id(); + let prefix = match conf.sequencing { + Sequencing::SequenceNumber => { + KE_PREFIX + / &id.zid().into_keyexpr() + / &KeyExpr::try_from(id.eid().to_string()).unwrap() + } + _ => KE_PREFIX / &id.zid().into_keyexpr() / KE_UHLC, + }; + + let seqnum = match conf.sequencing { + Sequencing::SequenceNumber => Some(AtomicU32::new(0)), + Sequencing::Timestamp => { + if conf.session.hlc().is_none() { + bail!( + "Cannot create AdvancedPublisher {} with Sequencing::Timestamp: \ + the 'timestamping' setting must be enabled in the Zenoh configuration.", + key_expr, + ) + } + None + } + _ => None, + }; + + let cache = if conf.cache { + Some( + conf.session + .declare_advanced_cache(key_expr.clone().into_owned()) + .subscriber_allowed_origin(Locality::SessionLocal) + .history(conf.history) + .queryable_prefix(&prefix) + .wait()?, + ) + } else { + None + }; + + let token = if conf.liveliness { + Some( + conf.session + .liveliness() + .declare_token(prefix / &key_expr) + .wait()?, + ) + } else { + None + }; + + Ok(AdvancedPublisher { + publisher, + seqnum, + _cache: cache, + _token: token, + }) + } + + /// Returns the [`EntityGlobalId`] of this Publisher. + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + /// let publisher = session.declare_publisher("key/expression") + /// .await + /// .unwrap(); + /// let publisher_id = publisher.id(); + /// # } + /// ``` + pub fn id(&self) -> EntityGlobalId { + self.publisher.id() + } + + #[inline] + pub fn key_expr(&self) -> &KeyExpr<'a> { + self.publisher.key_expr() + } + + /// Get the [`Encoding`] used when publishing data. + #[inline] + pub fn encoding(&self) -> &Encoding { + self.publisher.encoding() + } + + /// Get the `congestion_control` applied when routing the data. + #[inline] + pub fn congestion_control(&self) -> CongestionControl { + self.publisher.congestion_control() + } + + /// Get the priority of the written data. + #[inline] + pub fn priority(&self) -> Priority { + self.publisher.priority() + } + + /// Put data. + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + /// let publisher = session.declare_publisher("key/expression").await.unwrap(); + /// publisher.put("value").await.unwrap(); + /// # } + /// ``` + #[inline] + pub fn put(&self, payload: IntoZBytes) -> PublisherPutBuilder<'_> + where + IntoZBytes: Into, + { + let mut put = self.publisher.put(payload); + if let Some(seqnum) = &self.seqnum { + put = put.source_info(SourceInfo::new( + Some(self.publisher.id()), + Some(seqnum.fetch_add(1, Ordering::Relaxed)), + )); + } + put + } + + /// Delete data. + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + /// let publisher = session.declare_publisher("key/expression").await.unwrap(); + /// publisher.delete().await.unwrap(); + /// # } + /// ``` + pub fn delete(&self) -> PublisherDeleteBuilder<'_> { + let mut delete = self.publisher.delete(); + if let Some(seqnum) = &self.seqnum { + delete = delete.source_info(SourceInfo::new( + Some(self.publisher.id()), + Some(seqnum.fetch_add(1, Ordering::Relaxed)), + )); + } + delete + } + + /// Return the [`MatchingStatus`](zenoh::pubsub::MatchingStatus) of the publisher. + /// + /// [`MatchingStatus::matching_subscribers`](zenoh::pubsub::MatchingStatus::matching_subscribers) + /// will return true if there exist Subscribers matching the Publisher's key expression and false otherwise. + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + /// let publisher = session.declare_publisher("key/expression").await.unwrap(); + /// let matching_subscribers: bool = publisher + /// .matching_status() + /// .await + /// .unwrap() + /// .matching_subscribers(); + /// # } + /// ``` + #[zenoh_macros::unstable] + pub fn matching_status(&self) -> impl Resolve> + '_ { + self.publisher.matching_status() + } + + /// Return a [`MatchingListener`](zenoh::pubsub::MatchingStatus) for this Publisher. + /// + /// The [`MatchingListener`](zenoh::pubsub::MatchingStatus) that will send a notification each time + /// the [`MatchingStatus`](zenoh::pubsub::MatchingStatus) of the Publisher changes. + /// + /// # Examples + /// ```no_run + /// # #[tokio::main] + /// # async fn main() { + /// + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + /// let publisher = session.declare_publisher("key/expression").await.unwrap(); + /// let matching_listener = publisher.matching_listener().await.unwrap(); + /// while let Ok(matching_status) = matching_listener.recv_async().await { + /// if matching_status.matching_subscribers() { + /// println!("Publisher has matching subscribers."); + /// } else { + /// println!("Publisher has NO MORE matching subscribers."); + /// } + /// } + /// # } + /// ``` + #[zenoh_macros::unstable] + pub fn matching_listener( + &self, + ) -> zenoh::pubsub::MatchingListenerBuilder<'_, '_, zenoh::handlers::DefaultHandler> { + self.publisher.matching_listener() + } + + /// Undeclares the [`Publisher`], informing the network that it needn't optimize publications for its key expression anymore. + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + /// let publisher = session.declare_publisher("key/expression").await.unwrap(); + /// publisher.undeclare().await.unwrap(); + /// # } + /// ``` + pub fn undeclare(self) -> impl Resolve> + 'a { + self.publisher.undeclare() + } +} diff --git a/zenoh-ext/src/advanced_subscriber.rs b/zenoh-ext/src/advanced_subscriber.rs new file mode 100644 index 0000000000..36b1263e24 --- /dev/null +++ b/zenoh-ext/src/advanced_subscriber.rs @@ -0,0 +1,847 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use std::{future::IntoFuture, str::FromStr}; + +use zenoh::{ + config::ZenohId, + handlers::{Callback, IntoHandler}, + key_expr::KeyExpr, + query::{ConsolidationMode, Selector}, + sample::{Locality, Sample, SampleKind}, + session::{EntityGlobalId, EntityId}, + Resolvable, Resolve, Session, Wait, +}; +use zenoh_util::{Timed, TimedEvent, Timer}; +#[zenoh_macros::unstable] +use { + async_trait::async_trait, + std::collections::hash_map::Entry, + std::collections::HashMap, + std::convert::TryFrom, + std::future::Ready, + std::sync::{Arc, Mutex}, + std::time::Duration, + uhlc::ID, + zenoh::handlers::{locked, DefaultHandler}, + zenoh::internal::zlock, + zenoh::pubsub::Subscriber, + zenoh::query::{QueryTarget, Reply, ReplyKeyExpr}, + zenoh::time::Timestamp, + zenoh::Result as ZResult, +}; + +use crate::advanced_cache::{ke_liveliness, KE_PREFIX, KE_STAR, KE_UHLC}; + +#[derive(Debug, Default, Clone)] +/// Configure retransmission. +pub struct RetransmissionConf { + periodic_queries: Option, +} + +impl RetransmissionConf { + /// Enable periodic queries for not yet received Samples and specify their period. + /// + /// This allows to retrieve the last Sample(s) if the last Sample(s) is/are lost. + /// So it is useful for sporadic publications but useless for periodic publications + /// with a period smaller or equal to this period. + /// Retransmission can only be achieved by Publishers that also activate retransmission. + #[zenoh_macros::unstable] + #[inline] + pub fn periodic_queries(mut self, period: Option) -> Self { + self.periodic_queries = period; + self + } + + // TODO pub fn sample_miss_callback(mut self, callback: Callback) -> Self +} + +/// The builder of AdvancedSubscriber, allowing to configure it. +#[zenoh_macros::unstable] +pub struct AdvancedSubscriberBuilder<'a, 'b, Handler, const BACKGROUND: bool = false> { + pub(crate) session: &'a Session, + pub(crate) key_expr: ZResult>, + pub(crate) origin: Locality, + pub(crate) retransmission: Option, + pub(crate) query_target: QueryTarget, + pub(crate) query_timeout: Duration, + pub(crate) history: bool, + pub(crate) liveliness: bool, + pub(crate) handler: Handler, +} + +#[zenoh_macros::unstable] +impl<'a, 'b, Handler> AdvancedSubscriberBuilder<'a, 'b, Handler> { + pub(crate) fn new( + session: &'a Session, + key_expr: ZResult>, + origin: Locality, + handler: Handler, + ) -> Self { + AdvancedSubscriberBuilder { + session, + key_expr, + origin, + handler, + retransmission: None, + query_target: QueryTarget::All, + query_timeout: Duration::from_secs(10), + history: false, + liveliness: false, + } + } +} + +#[zenoh_macros::unstable] +impl<'a, 'b> AdvancedSubscriberBuilder<'a, 'b, DefaultHandler> { + /// Add callback to AdvancedSubscriber. + #[inline] + pub fn callback( + self, + callback: Callback, + ) -> AdvancedSubscriberBuilder<'a, 'b, Callback> + where + Callback: Fn(Sample) + Send + Sync + 'static, + { + AdvancedSubscriberBuilder { + session: self.session, + key_expr: self.key_expr.map(|s| s.into_owned()), + origin: self.origin, + retransmission: self.retransmission, + query_target: self.query_target, + query_timeout: self.query_timeout, + history: self.history, + liveliness: self.liveliness, + handler: callback, + } + } + + /// Add callback to `AdvancedSubscriber`. + /// + /// Using this guarantees that your callback will never be called concurrently. + /// If your callback is also accepted by the [`callback`](AdvancedSubscriberBuilder::callback) method, we suggest you use it instead of `callback_mut` + #[inline] + pub fn callback_mut( + self, + callback: CallbackMut, + ) -> AdvancedSubscriberBuilder<'a, 'b, impl Fn(Sample) + Send + Sync + 'static> + where + CallbackMut: FnMut(Sample) + Send + Sync + 'static, + { + self.callback(locked(callback)) + } + + /// Make the built AdvancedSubscriber an [`AdvancedSubscriber`](AdvancedSubscriber). + #[inline] + pub fn with(self, handler: Handler) -> AdvancedSubscriberBuilder<'a, 'b, Handler> + where + Handler: IntoHandler, + { + AdvancedSubscriberBuilder { + session: self.session, + key_expr: self.key_expr.map(|s| s.into_owned()), + origin: self.origin, + retransmission: self.retransmission, + query_target: self.query_target, + query_timeout: self.query_timeout, + history: self.history, + liveliness: self.liveliness, + handler, + } + } +} + +#[zenoh_macros::unstable] +impl<'a, 'b, Handler> AdvancedSubscriberBuilder<'a, 'b, Handler> { + /// Restrict the matching publications that will be receive by this [`Subscriber`] + /// to the ones that have the given [`Locality`](crate::prelude::Locality). + #[zenoh_macros::unstable] + #[inline] + pub fn allowed_origin(mut self, origin: Locality) -> Self { + self.origin = origin; + self + } + + /// Ask for retransmission of detected lost Samples. + /// + /// Retransmission can only be achieved by Publishers that also activate retransmission. + #[zenoh_macros::unstable] + #[inline] + pub fn retransmission(mut self, conf: RetransmissionConf) -> Self { + self.retransmission = Some(conf); + self + } + + // /// Change the target to be used for queries. + + // #[inline] + // pub fn query_target(mut self, query_target: QueryTarget) -> Self { + // self.query_target = query_target; + // self + // } + + /// Change the timeout to be used for queries (history, retransmission). + #[zenoh_macros::unstable] + #[inline] + pub fn query_timeout(mut self, query_timeout: Duration) -> Self { + self.query_timeout = query_timeout; + self + } + + /// Enable query for historical data. + /// + /// History can only be retransmitted by Publishers that also activate history. + #[zenoh_macros::unstable] + #[inline] + pub fn history(mut self) -> Self { + // TODO take HistoryConf as parameter + self.history = true; + self + } + + /// Enable detection of late joiner publishers and query for their historical data. + /// + /// Let joiner detectiopn can only be achieved for Publishers that also activate late_joiner. + /// History can only be retransmitted by Publishers that also activate history. + #[zenoh_macros::unstable] + #[inline] + pub fn late_joiner(mut self) -> Self { + self.liveliness = true; + self + } + + fn with_static_keys(self) -> AdvancedSubscriberBuilder<'a, 'static, Handler> { + AdvancedSubscriberBuilder { + session: self.session, + key_expr: self.key_expr.map(|s| s.into_owned()), + origin: self.origin, + retransmission: self.retransmission, + query_target: self.query_target, + query_timeout: self.query_timeout, + history: self.history, + liveliness: self.liveliness, + handler: self.handler, + } + } +} + +impl Resolvable for AdvancedSubscriberBuilder<'_, '_, Handler> +where + Handler: IntoHandler, + Handler::Handler: Send, +{ + type To = ZResult>; +} + +impl Wait for AdvancedSubscriberBuilder<'_, '_, Handler> +where + Handler: IntoHandler + Send, + Handler::Handler: Send, +{ + fn wait(self) -> ::To { + AdvancedSubscriber::new(self.with_static_keys()) + } +} + +impl IntoFuture for AdvancedSubscriberBuilder<'_, '_, Handler> +where + Handler: IntoHandler + Send, + Handler::Handler: Send, +{ + type Output = ::To; + type IntoFuture = Ready<::To>; + + fn into_future(self) -> Self::IntoFuture { + std::future::ready(self.wait()) + } +} + +#[zenoh_macros::unstable] +struct State { + global_pending_queries: u64, + sequenced_states: HashMap>, + timestamped_states: HashMap>, +} + +#[zenoh_macros::unstable] +struct SourceState { + last_delivered: Option, + pending_queries: u64, + pending_samples: HashMap, +} + +#[zenoh_macros::unstable] +pub struct AdvancedSubscriber { + _subscriber: Subscriber<()>, + receiver: Receiver, + _liveliness_subscriber: Option>, +} + +#[zenoh_macros::unstable] +impl std::ops::Deref for AdvancedSubscriber { + type Target = Receiver; + fn deref(&self) -> &Self::Target { + &self.receiver + } +} + +#[zenoh_macros::unstable] +impl std::ops::DerefMut for AdvancedSubscriber { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.receiver + } +} + +#[zenoh_macros::unstable] +fn handle_sample(states: &mut State, sample: Sample, callback: &Callback) -> bool { + if let (Some(source_id), Some(source_sn)) = ( + sample.source_info().source_id(), + sample.source_info().source_sn(), + ) { + let entry = states.sequenced_states.entry(*source_id); + let new = matches!(&entry, Entry::Vacant(_)); + let state = entry.or_insert(SourceState:: { + last_delivered: None, + pending_queries: 0, + pending_samples: HashMap::new(), + }); + if states.global_pending_queries != 0 { + state.pending_samples.insert(source_sn, sample); + } else if state.last_delivered.is_some() && source_sn != state.last_delivered.unwrap() + 1 { + if source_sn > state.last_delivered.unwrap() { + state.pending_samples.insert(source_sn, sample); + } + } else { + callback.call(sample); + let mut last_seq_num = source_sn; + state.last_delivered = Some(last_seq_num); + while let Some(s) = state.pending_samples.remove(&(last_seq_num + 1)) { + callback.call(s); + last_seq_num += 1; + state.last_delivered = Some(last_seq_num); + } + } + new + } else if let Some(timestamp) = sample.timestamp() { + let entry = states.timestamped_states.entry(*timestamp.get_id()); + let state = entry.or_insert(SourceState:: { + last_delivered: None, + pending_queries: 0, + pending_samples: HashMap::new(), + }); + if state.last_delivered.map(|t| t < *timestamp).unwrap_or(true) { + if states.global_pending_queries == 0 && state.pending_queries == 0 { + state.last_delivered = Some(*timestamp); + callback.call(sample); + } else { + state.pending_samples.entry(*timestamp).or_insert(sample); + } + } + false + } else { + callback.call(sample); + false + } +} + +#[zenoh_macros::unstable] +fn seq_num_range(start: Option, end: Option) -> String { + match (start, end) { + (Some(start), Some(end)) => format!("_sn={}..{}", start, end), + (Some(start), None) => format!("_sn={}..", start), + (None, Some(end)) => format!("_sn=..{}", end), + (None, None) => "_sn=..".to_string(), + } +} + +#[zenoh_macros::unstable] +#[derive(Clone)] +struct PeriodicQuery { + source_id: Option, + statesref: Arc>, + key_expr: KeyExpr<'static>, + session: Session, + query_target: QueryTarget, + query_timeout: Duration, + callback: Callback, +} + +#[zenoh_macros::unstable] +impl PeriodicQuery { + fn with_source_id(mut self, source_id: EntityGlobalId) -> Self { + self.source_id = Some(source_id); + self + } +} + +#[zenoh_macros::unstable] +#[async_trait] +impl Timed for PeriodicQuery { + async fn run(&mut self) { + let mut lock = zlock!(self.statesref); + let states = &mut *lock; + if let Some(source_id) = &self.source_id { + if let Some(state) = states.sequenced_states.get_mut(source_id) { + state.pending_queries += 1; + let query_expr = KE_PREFIX + / &source_id.zid().into_keyexpr() + / &KeyExpr::try_from(source_id.eid().to_string()).unwrap() + / &self.key_expr; + let seq_num_range = seq_num_range(Some(state.last_delivered.unwrap() + 1), None); + drop(lock); + let handler = SequencedRepliesHandler { + source_id: *source_id, + statesref: self.statesref.clone(), + callback: self.callback.clone(), + }; + let _ = self + .session + .get(Selector::from((query_expr, seq_num_range))) + .callback({ + let key_expr = self.key_expr.clone().into_owned(); + move |r: Reply| { + if let Ok(s) = r.into_result() { + if key_expr.intersects(s.key_expr()) { + let states = &mut *zlock!(handler.statesref); + handle_sample(states, s, &handler.callback); + } + } + } + }) + .consolidation(ConsolidationMode::None) + .accept_replies(ReplyKeyExpr::Any) + .target(self.query_target) + .timeout(self.query_timeout) + .wait(); + } + } + } +} + +#[zenoh_macros::unstable] +impl AdvancedSubscriber { + fn new(conf: AdvancedSubscriberBuilder<'_, '_, H>) -> ZResult + where + H: IntoHandler + Send, + { + let statesref = Arc::new(Mutex::new(State { + sequenced_states: HashMap::new(), + timestamped_states: HashMap::new(), + global_pending_queries: if conf.history { 1 } else { 0 }, + })); + let (callback, receiver) = conf.handler.into_handler(); + let key_expr = conf.key_expr?; + let retransmission = conf.retransmission; + let query_target = conf.query_target; + let query_timeout = conf.query_timeout; + let session = conf.session.clone(); + let periodic_query = retransmission.as_ref().and_then(|r| { + r.periodic_queries.map(|period| { + ( + Arc::new(Timer::new(false)), + period, + PeriodicQuery { + source_id: None, + statesref: statesref.clone(), + key_expr: key_expr.clone().into_owned(), + session, + query_target, + query_timeout, + callback: callback.clone(), + }, + ) + }) + }); + + let sub_callback = { + let statesref = statesref.clone(); + let session = conf.session.clone(); + let callback = callback.clone(); + let key_expr = key_expr.clone().into_owned(); + let periodic_query = periodic_query.clone(); + + move |s: Sample| { + let mut lock = zlock!(statesref); + let states = &mut *lock; + let source_id = s.source_info().source_id().cloned(); + let new = handle_sample(states, s, &callback); + + if let Some(source_id) = source_id { + if new { + if let Some((timer, period, query)) = periodic_query.as_ref() { + timer.add(TimedEvent::periodic( + *period, + query.clone().with_source_id(source_id), + )) + } + } + + if let Some(state) = states.sequenced_states.get_mut(&source_id) { + if retransmission.is_some() + && state.pending_queries == 0 + && !state.pending_samples.is_empty() + { + state.pending_queries += 1; + let query_expr = KE_PREFIX + / &source_id.zid().into_keyexpr() + / &KeyExpr::try_from(source_id.eid().to_string()).unwrap() + / &key_expr; + let seq_num_range = + seq_num_range(Some(state.last_delivered.unwrap() + 1), None); + drop(lock); + let handler = SequencedRepliesHandler { + source_id, + statesref: statesref.clone(), + callback: callback.clone(), + }; + let _ = session + .get(Selector::from((query_expr, seq_num_range))) + .callback({ + let key_expr = key_expr.clone().into_owned(); + move |r: Reply| { + if let Ok(s) = r.into_result() { + if key_expr.intersects(s.key_expr()) { + let states = &mut *zlock!(handler.statesref); + handle_sample(states, s, &handler.callback); + } + } + } + }) + .consolidation(ConsolidationMode::None) + .accept_replies(ReplyKeyExpr::Any) + .target(query_target) + .timeout(query_timeout) + .wait(); + } + } + } + } + }; + + let subscriber = conf + .session + .declare_subscriber(&key_expr) + .callback(sub_callback) + .allowed_origin(conf.origin) + .wait()?; + + if conf.history { + let handler = InitialRepliesHandler { + statesref: statesref.clone(), + callback: callback.clone(), + periodic_query: periodic_query.clone(), + }; + let _ = conf + .session + .get(Selector::from(( + KE_PREFIX / KE_STAR / KE_STAR / &key_expr, + "0..", + ))) + .callback({ + let key_expr = key_expr.clone().into_owned(); + move |r: Reply| { + if let Ok(s) = r.into_result() { + if key_expr.intersects(s.key_expr()) { + let states = &mut *zlock!(handler.statesref); + handle_sample(states, s, &handler.callback); + } + } + } + }) + .consolidation(ConsolidationMode::None) + .accept_replies(ReplyKeyExpr::Any) + .target(query_target) + .timeout(query_timeout) + .wait(); + } + + let liveliness_subscriber = if conf.history && conf.liveliness { + let live_callback = { + let session = conf.session.clone(); + let statesref = statesref.clone(); + let key_expr = key_expr.clone().into_owned(); + move |s: Sample| { + if s.kind() == SampleKind::Put { + if let Ok(parsed) = ke_liveliness::parse(s.key_expr().as_keyexpr()) { + if let Ok(zid) = ZenohId::from_str(parsed.zid().as_str()) { + // TODO : If we already have a state associated to this discovered source + // we should query with the appropriate range to avoid unnecessary retransmissions + if parsed.eid() == KE_UHLC { + let states = &mut *zlock!(statesref); + let entry = states.timestamped_states.entry(ID::from(zid)); + let state = entry.or_insert(SourceState:: { + last_delivered: None, + pending_queries: 0, + pending_samples: HashMap::new(), + }); + state.pending_queries += 1; + + let handler = TimestampedRepliesHandler { + id: ID::from(zid), + statesref: statesref.clone(), + callback: callback.clone(), + }; + let _ = session + .get(Selector::from((s.key_expr(), "0.."))) + .callback({ + let key_expr = key_expr.clone().into_owned(); + move |r: Reply| { + if let Ok(s) = r.into_result() { + if key_expr.intersects(s.key_expr()) { + let states = + &mut *zlock!(handler.statesref); + handle_sample(states, s, &handler.callback); + } + } + } + }) + .consolidation(ConsolidationMode::None) + .accept_replies(ReplyKeyExpr::Any) + .target(query_target) + .timeout(query_timeout) + .wait(); + } else if let Ok(eid) = EntityId::from_str(parsed.eid().as_str()) { + let source_id = EntityGlobalId::new(zid, eid); + let states = &mut *zlock!(statesref); + let entry = states.sequenced_states.entry(source_id); + let new = matches!(&entry, Entry::Vacant(_)); + let state = entry.or_insert(SourceState:: { + last_delivered: None, + pending_queries: 0, + pending_samples: HashMap::new(), + }); + state.pending_queries += 1; + + let handler = SequencedRepliesHandler { + source_id, + statesref: statesref.clone(), + callback: callback.clone(), + }; + let _ = session + .get(Selector::from((s.key_expr(), "0.."))) + .callback({ + let key_expr = key_expr.clone().into_owned(); + move |r: Reply| { + if let Ok(s) = r.into_result() { + if key_expr.intersects(s.key_expr()) { + let states = + &mut *zlock!(handler.statesref); + handle_sample(states, s, &handler.callback); + } + } + } + }) + .consolidation(ConsolidationMode::None) + .accept_replies(ReplyKeyExpr::Any) + .target(query_target) + .timeout(query_timeout) + .wait(); + + if new { + if let Some((timer, period, query)) = + periodic_query.as_ref() + { + timer.add(TimedEvent::periodic( + *period, + query.clone().with_source_id(source_id), + )) + } + } + } + } else { + let states = &mut *zlock!(statesref); + states.global_pending_queries += 1; + + let handler = InitialRepliesHandler { + statesref: statesref.clone(), + periodic_query: None, + callback: callback.clone(), + }; + let _ = session + .get(Selector::from((s.key_expr(), "0.."))) + .callback({ + let key_expr = key_expr.clone().into_owned(); + move |r: Reply| { + if let Ok(s) = r.into_result() { + if key_expr.intersects(s.key_expr()) { + let states = &mut *zlock!(handler.statesref); + handle_sample(states, s, &handler.callback); + } + } + } + }) + .consolidation(ConsolidationMode::None) + .accept_replies(ReplyKeyExpr::Any) + .target(query_target) + .timeout(query_timeout) + .wait(); + } + } else { + tracing::warn!( + "Received malformed liveliness token key expression: {}", + s.key_expr() + ); + } + } + } + }; + + Some( + conf + .session + .liveliness() + .declare_subscriber(KE_PREFIX / KE_STAR / KE_STAR / &key_expr) + // .declare_subscriber(keformat!(ke_liveliness_all::formatter(), zid = 0, eid = 0, remaining = key_expr).unwrap()) + .history(true) + .callback(live_callback) + .wait()?, + ) + } else { + None + }; + + let reliable_subscriber = AdvancedSubscriber { + _subscriber: subscriber, + receiver, + _liveliness_subscriber: liveliness_subscriber, + }; + + Ok(reliable_subscriber) + } + + /// Close this AdvancedSubscriber + #[inline] + pub fn close(self) -> impl Resolve> { + self._subscriber.undeclare() + } +} + +#[zenoh_macros::unstable] +#[inline] +fn flush_sequenced_source(state: &mut SourceState, callback: &Callback) { + if state.pending_queries == 0 && !state.pending_samples.is_empty() { + if state.last_delivered.is_some() { + tracing::error!("Sample missed: unable to retrieve some missing samples."); + } + let mut pending_samples = state + .pending_samples + .drain() + .collect::>(); + pending_samples.sort_by_key(|(k, _s)| *k); + for (seq_num, sample) in pending_samples { + if state + .last_delivered + .map(|last| seq_num > last) + .unwrap_or(true) + { + state.last_delivered = Some(seq_num); + callback.call(sample); + } + } + } +} + +#[zenoh_macros::unstable] +#[inline] +fn flush_timestamped_source(state: &mut SourceState, callback: &Callback) { + if state.pending_queries == 0 && !state.pending_samples.is_empty() { + let mut pending_samples = state + .pending_samples + .drain() + .collect::>(); + pending_samples.sort_by_key(|(k, _s)| *k); + for (timestamp, sample) in pending_samples { + if state + .last_delivered + .map(|last| timestamp > last) + .unwrap_or(true) + { + state.last_delivered = Some(timestamp); + callback.call(sample); + } + } + } +} + +#[zenoh_macros::unstable] +#[derive(Clone)] +struct InitialRepliesHandler { + statesref: Arc>, + periodic_query: Option<(Arc, Duration, PeriodicQuery)>, + callback: Callback, +} + +#[zenoh_macros::unstable] +impl Drop for InitialRepliesHandler { + fn drop(&mut self) { + let states = &mut *zlock!(self.statesref); + states.global_pending_queries = states.global_pending_queries.saturating_sub(1); + + if states.global_pending_queries == 0 { + for (source_id, state) in states.sequenced_states.iter_mut() { + flush_sequenced_source(state, &self.callback); + if let Some((timer, period, query)) = self.periodic_query.as_ref() { + timer.add(TimedEvent::periodic( + *period, + query.clone().with_source_id(*source_id), + )) + } + } + for state in states.timestamped_states.values_mut() { + flush_timestamped_source(state, &self.callback); + } + } + } +} + +#[zenoh_macros::unstable] +#[derive(Clone)] +struct SequencedRepliesHandler { + source_id: EntityGlobalId, + statesref: Arc>, + callback: Callback, +} + +#[zenoh_macros::unstable] +impl Drop for SequencedRepliesHandler { + fn drop(&mut self) { + let states = &mut *zlock!(self.statesref); + if let Some(state) = states.sequenced_states.get_mut(&self.source_id) { + state.pending_queries = state.pending_queries.saturating_sub(1); + if states.global_pending_queries == 0 { + flush_sequenced_source(state, &self.callback) + } + } + } +} + +#[zenoh_macros::unstable] +#[derive(Clone)] +struct TimestampedRepliesHandler { + id: ID, + statesref: Arc>, + callback: Callback, +} + +#[zenoh_macros::unstable] +impl Drop for TimestampedRepliesHandler { + fn drop(&mut self) { + let states = &mut *zlock!(self.statesref); + if let Some(state) = states.timestamped_states.get_mut(&self.id) { + state.pending_queries = state.pending_queries.saturating_sub(1); + if states.global_pending_queries == 0 { + flush_timestamped_source(state, &self.callback); + } + } + } +} diff --git a/zenoh-ext/src/lib.rs b/zenoh-ext/src/lib.rs index 4bab50804e..7a64ba441e 100644 --- a/zenoh-ext/src/lib.rs +++ b/zenoh-ext/src/lib.rs @@ -12,10 +12,18 @@ // ZettaScale Zenoh Team, // #[cfg(feature = "unstable")] +mod advanced_cache; +#[cfg(feature = "unstable")] +mod advanced_publisher; +#[cfg(feature = "unstable")] +mod advanced_subscriber; +#[cfg(feature = "unstable")] pub mod group; #[cfg(feature = "unstable")] mod publication_cache; #[cfg(feature = "unstable")] +mod publisher_ext; +#[cfg(feature = "unstable")] mod querying_subscriber; mod serialization; #[cfg(feature = "unstable")] @@ -31,11 +39,15 @@ pub use crate::serialization::{ }; #[cfg(feature = "unstable")] pub use crate::{ + advanced_cache::{AdvancedCache, AdvancedCacheBuilder, HistoryConf}, + advanced_publisher::{AdvancedPublisher, AdvancedPublisherBuilder, Sequencing}, + advanced_subscriber::{AdvancedSubscriber, AdvancedSubscriberBuilder, RetransmissionConf}, publication_cache::{PublicationCache, PublicationCacheBuilder}, + publisher_ext::PublisherBuilderExt, querying_subscriber::{ ExtractSample, FetchingSubscriber, FetchingSubscriberBuilder, KeySpace, LivelinessSpace, QueryingSubscriberBuilder, UserSpace, }, session_ext::SessionExt, - subscriber_ext::{SubscriberBuilderExt, SubscriberForward}, + subscriber_ext::{DataSubscriberBuilderExt, SubscriberBuilderExt, SubscriberForward}, }; diff --git a/zenoh-ext/src/publisher_ext.rs b/zenoh-ext/src/publisher_ext.rs new file mode 100644 index 0000000000..2767c96dc2 --- /dev/null +++ b/zenoh-ext/src/publisher_ext.rs @@ -0,0 +1,46 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use zenoh::pubsub::PublisherBuilder; + +use crate::{advanced_cache::HistoryConf, AdvancedPublisherBuilder}; + +/// Some extensions to the [`zenoh::publication::PublisherBuilder`](zenoh::publication::PublisherBuilder) +#[zenoh_macros::unstable] +pub trait PublisherBuilderExt<'a, 'b> { + /// Allow matching Subscribers to detect lost samples and ask for retransimission. + /// + /// Retransmission can only be achieved if history is enabled. + fn history(self, history: HistoryConf) -> AdvancedPublisherBuilder<'a, 'b>; + + /// Allow this publisher to be detected by subscribers. + /// + /// This allows Subscribers to retrieve the local history. + fn late_joiner(self) -> AdvancedPublisherBuilder<'a, 'b>; +} + +impl<'a, 'b> PublisherBuilderExt<'a, 'b> for PublisherBuilder<'a, 'b> { + /// Allow matching Subscribers to detect lost samples and ask for retransimission. + /// + /// Retransmission can only be achieved if history is enabled. + fn history(self, history: HistoryConf) -> AdvancedPublisherBuilder<'a, 'b> { + AdvancedPublisherBuilder::new(self.session, self.key_expr).history(history) + } + + /// Allow this publisher to be detected by subscribers. + /// + /// This allows Subscribers to retrieve the local history. + fn late_joiner(self) -> AdvancedPublisherBuilder<'a, 'b> { + AdvancedPublisherBuilder::new(self.session, self.key_expr).late_joiner() + } +} diff --git a/zenoh-ext/src/session_ext.rs b/zenoh-ext/src/session_ext.rs index 9d3c430aaf..8c24ccc9e2 100644 --- a/zenoh-ext/src/session_ext.rs +++ b/zenoh-ext/src/session_ext.rs @@ -15,6 +15,7 @@ use zenoh::{key_expr::KeyExpr, session::Session, Error}; use super::PublicationCacheBuilder; +use crate::advanced_cache::AdvancedCacheBuilder; /// Some extensions to the [`zenoh::Session`](zenoh::Session) #[zenoh_macros::unstable] @@ -44,6 +45,14 @@ pub trait SessionExt { where TryIntoKeyExpr: TryInto>, >>::Error: Into; + + fn declare_advanced_cache<'a, 'b, 'c, TryIntoKeyExpr>( + &'a self, + pub_key_expr: TryIntoKeyExpr, + ) -> AdvancedCacheBuilder<'a, 'b, 'c> + where + TryIntoKeyExpr: TryInto>, + >>::Error: Into; } impl SessionExt for Session { @@ -57,4 +66,15 @@ impl SessionExt for Session { { PublicationCacheBuilder::new(self, pub_key_expr.try_into().map_err(Into::into)) } + + fn declare_advanced_cache<'a, 'b, 'c, TryIntoKeyExpr>( + &'a self, + pub_key_expr: TryIntoKeyExpr, + ) -> AdvancedCacheBuilder<'a, 'b, 'c> + where + TryIntoKeyExpr: TryInto>, + >>::Error: Into, + { + AdvancedCacheBuilder::new(self, pub_key_expr.try_into().map_err(Into::into)) + } } diff --git a/zenoh-ext/src/subscriber_ext.rs b/zenoh-ext/src/subscriber_ext.rs index 8d931726c3..e3af8309cf 100644 --- a/zenoh-ext/src/subscriber_ext.rs +++ b/zenoh-ext/src/subscriber_ext.rs @@ -24,7 +24,8 @@ use zenoh::{ }; use crate::{ - querying_subscriber::QueryingSubscriberBuilder, ExtractSample, FetchingSubscriberBuilder, + querying_subscriber::QueryingSubscriberBuilder, AdvancedSubscriberBuilder, ExtractSample, + FetchingSubscriberBuilder, RetransmissionConf, }; /// Allows writing `subscriber.forward(receiver)` instead of `subscriber.stream().map(Ok).forward(publisher)` @@ -123,6 +124,26 @@ pub trait SubscriberBuilderExt<'a, 'b, Handler> { fn querying(self) -> QueryingSubscriberBuilder<'a, 'b, Self::KeySpace, Handler>; } +/// Some extensions to the [`zenoh::subscriber::SubscriberBuilder`](zenoh::pubsub::SubscriberBuilder) +pub trait DataSubscriberBuilderExt<'a, 'b, Handler> { + /// Enable query for historical data. + /// + /// History can only be retransmitted by Publishers that also activate history. + fn history(self) -> AdvancedSubscriberBuilder<'a, 'b, Handler>; // TODO take HistoryConf as parameter + + /// Ask for retransmission of detected lost Samples. + /// + /// Retransmission can only be achieved by Publishers that also activate retransmission. + fn retransmission(self, conf: RetransmissionConf) + -> AdvancedSubscriberBuilder<'a, 'b, Handler>; + + /// Enable detection of late joiner publishers and query for their historical data. + /// + /// Let joiner detectiopn can only be achieved for Publishers that also activate late_joiner. + /// History can only be retransmitted by Publishers that also activate history. + fn late_joiner(self) -> AdvancedSubscriberBuilder<'a, 'b, Handler>; +} + impl<'a, 'b, Handler> SubscriberBuilderExt<'a, 'b, Handler> for SubscriberBuilder<'a, 'b, Handler> { type KeySpace = crate::UserSpace; @@ -227,6 +248,38 @@ impl<'a, 'b, Handler> SubscriberBuilderExt<'a, 'b, Handler> for SubscriberBuilde } } +impl<'a, 'b, Handler> DataSubscriberBuilderExt<'a, 'b, Handler> + for SubscriberBuilder<'a, 'b, Handler> +{ + /// Enable query for historical data. + /// + /// History can only be retransmitted by Publishers that also activate history. + fn history(self) -> AdvancedSubscriberBuilder<'a, 'b, Handler> { + AdvancedSubscriberBuilder::new(self.session, self.key_expr, self.origin, self.handler) + .history() + } + + /// Ask for retransmission of detected lost Samples. + /// + /// Retransmission can only be achieved by Publishers that also activate retransmission. + fn retransmission( + self, + conf: RetransmissionConf, + ) -> AdvancedSubscriberBuilder<'a, 'b, Handler> { + AdvancedSubscriberBuilder::new(self.session, self.key_expr, self.origin, self.handler) + .retransmission(conf) + } + + /// Enable detection of late joiner publishers and query for their historical data. + /// + /// Let joiner detectiopn can only be achieved for Publishers that also activate late_joiner. + /// History can only be retransmitted by Publishers that also activate history. + fn late_joiner(self) -> AdvancedSubscriberBuilder<'a, 'b, Handler> { + AdvancedSubscriberBuilder::new(self.session, self.key_expr, self.origin, self.handler) + .late_joiner() + } +} + impl<'a, 'b, Handler> SubscriberBuilderExt<'a, 'b, Handler> for LivelinessSubscriberBuilder<'a, 'b, Handler> { diff --git a/zenoh-ext/tests/advanced.rs b/zenoh-ext/tests/advanced.rs new file mode 100644 index 0000000000..b69cce3f73 --- /dev/null +++ b/zenoh-ext/tests/advanced.rs @@ -0,0 +1,462 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use zenoh::sample::SampleKind; +use zenoh_config::{EndPoint, ModeDependentValue, WhatAmI}; +use zenoh_ext::{DataSubscriberBuilderExt, HistoryConf, PublisherBuilderExt, RetransmissionConf}; + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_advanced_history() { + use std::time::Duration; + + use zenoh::internal::ztimeout; + + const TIMEOUT: Duration = Duration::from_secs(60); + const SLEEP: Duration = Duration::from_secs(1); + const PEER1_ENDPOINT: &str = "tcp/localhost:47450"; + + const ADVANCED_HISTORY_KEYEXPR: &str = "test/advanced/history"; + + zenoh_util::init_log_from_env_or("error"); + + let peer1 = { + let mut c = zenoh::Config::default(); + c.listen + .endpoints + .set(vec![PEER1_ENDPOINT.parse::().unwrap()]) + .unwrap(); + c.scouting.multicast.set_enabled(Some(false)).unwrap(); + c.timestamping + .set_enabled(Some(ModeDependentValue::Unique(true))) + .unwrap(); + let _ = c.set_mode(Some(WhatAmI::Peer)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Peer (1) ZID: {}", s.zid()); + s + }; + + let publ = ztimeout!(peer1 + .declare_publisher(ADVANCED_HISTORY_KEYEXPR) + .history(HistoryConf::default().sample_depth(3))) + .unwrap(); + ztimeout!(publ.put("1")).unwrap(); + ztimeout!(publ.put("2")).unwrap(); + ztimeout!(publ.put("3")).unwrap(); + ztimeout!(publ.put("4")).unwrap(); + + tokio::time::sleep(SLEEP).await; + + let peer2 = { + let mut c = zenoh::Config::default(); + c.connect + .endpoints + .set(vec![PEER1_ENDPOINT.parse::().unwrap()]) + .unwrap(); + c.scouting.multicast.set_enabled(Some(false)).unwrap(); + let _ = c.set_mode(Some(WhatAmI::Peer)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Peer (2) ZID: {}", s.zid()); + s + }; + + let sub = ztimeout!(peer2.declare_subscriber(ADVANCED_HISTORY_KEYEXPR).history()).unwrap(); + tokio::time::sleep(SLEEP).await; + + ztimeout!(publ.put("5")).unwrap(); + tokio::time::sleep(SLEEP).await; + + let sample = ztimeout!(sub.recv_async()).unwrap(); + assert_eq!(sample.kind(), SampleKind::Put); + assert_eq!(sample.payload().try_to_string().unwrap().as_ref(), "2"); + + let sample = ztimeout!(sub.recv_async()).unwrap(); + assert_eq!(sample.kind(), SampleKind::Put); + assert_eq!(sample.payload().try_to_string().unwrap().as_ref(), "3"); + + let sample = ztimeout!(sub.recv_async()).unwrap(); + assert_eq!(sample.kind(), SampleKind::Put); + assert_eq!(sample.payload().try_to_string().unwrap().as_ref(), "4"); + + let sample = ztimeout!(sub.recv_async()).unwrap(); + assert_eq!(sample.kind(), SampleKind::Put); + assert_eq!(sample.payload().try_to_string().unwrap().as_ref(), "5"); + + assert!(sub.try_recv().unwrap().is_none()); + + publ.undeclare().await.unwrap(); + // sub.undeclare().await.unwrap(); + + peer1.close().await.unwrap(); + peer2.close().await.unwrap(); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_advanced_retransmission() { + use std::time::Duration; + + use zenoh::internal::ztimeout; + + const TIMEOUT: Duration = Duration::from_secs(60); + const SLEEP: Duration = Duration::from_secs(1); + const RECONNECT_SLEEP: Duration = Duration::from_secs(5); + const ROUTER_ENDPOINT: &str = "tcp/localhost:47451"; + + const ADVANCED_RETRANSMISSION_KEYEXPR: &str = "test/advanced/retransmission"; + + zenoh_util::init_log_from_env_or("error"); + + let router = { + let mut c = zenoh::Config::default(); + c.listen + .endpoints + .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) + .unwrap(); + c.scouting.multicast.set_enabled(Some(false)).unwrap(); + let _ = c.set_mode(Some(WhatAmI::Router)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Router ZID: {}", s.zid()); + s + }; + + let client1 = { + let mut c = zenoh::Config::default(); + c.connect + .endpoints + .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) + .unwrap(); + c.scouting.multicast.set_enabled(Some(false)).unwrap(); + let _ = c.set_mode(Some(WhatAmI::Client)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Client (1) ZID: {}", s.zid()); + s + }; + + let client2 = { + let mut c = zenoh::Config::default(); + c.connect + .endpoints + .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) + .unwrap(); + c.scouting.multicast.set_enabled(Some(false)).unwrap(); + let _ = c.set_mode(Some(WhatAmI::Client)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Client (2) ZID: {}", s.zid()); + s + }; + + let sub = ztimeout!(client2 + .declare_subscriber(ADVANCED_RETRANSMISSION_KEYEXPR) + .retransmission(RetransmissionConf::default())) + .unwrap(); + tokio::time::sleep(SLEEP).await; + + let publ = ztimeout!(client1 + .declare_publisher(ADVANCED_RETRANSMISSION_KEYEXPR) + .history(HistoryConf::default().sample_depth(10)) + .retransmission()) + .unwrap(); + ztimeout!(publ.put("1")).unwrap(); + + tokio::time::sleep(SLEEP).await; + + let sample = ztimeout!(sub.recv_async()).unwrap(); + assert_eq!(sample.kind(), SampleKind::Put); + assert_eq!(sample.payload().try_to_string().unwrap().as_ref(), "1"); + + assert!(sub.try_recv().unwrap().is_none()); + + router.close().await.unwrap(); + tokio::time::sleep(SLEEP).await; + + ztimeout!(publ.put("2")).unwrap(); + ztimeout!(publ.put("3")).unwrap(); + ztimeout!(publ.put("4")).unwrap(); + tokio::time::sleep(SLEEP).await; + + assert!(sub.try_recv().unwrap().is_none()); + + let router = { + let mut c = zenoh::Config::default(); + c.listen + .endpoints + .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) + .unwrap(); + c.scouting.multicast.set_enabled(Some(false)).unwrap(); + let _ = c.set_mode(Some(WhatAmI::Router)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Router ZID: {}", s.zid()); + s + }; + tokio::time::sleep(RECONNECT_SLEEP).await; + + ztimeout!(publ.put("5")).unwrap(); + tokio::time::sleep(SLEEP).await; + + let sample = ztimeout!(sub.recv_async()).unwrap(); + assert_eq!(sample.kind(), SampleKind::Put); + assert_eq!(sample.payload().try_to_string().unwrap().as_ref(), "2"); + + let sample = ztimeout!(sub.recv_async()).unwrap(); + assert_eq!(sample.kind(), SampleKind::Put); + assert_eq!(sample.payload().try_to_string().unwrap().as_ref(), "3"); + + let sample = ztimeout!(sub.recv_async()).unwrap(); + assert_eq!(sample.kind(), SampleKind::Put); + assert_eq!(sample.payload().try_to_string().unwrap().as_ref(), "4"); + + let sample = ztimeout!(sub.recv_async()).unwrap(); + assert_eq!(sample.kind(), SampleKind::Put); + assert_eq!(sample.payload().try_to_string().unwrap().as_ref(), "5"); + + assert!(sub.try_recv().unwrap().is_none()); + + publ.undeclare().await.unwrap(); + // sub.undeclare().await.unwrap(); + + client1.close().await.unwrap(); + client2.close().await.unwrap(); + + router.close().await.unwrap(); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_advanced_retransmission_periodic() { + use std::time::Duration; + + use zenoh::internal::ztimeout; + + const TIMEOUT: Duration = Duration::from_secs(60); + const SLEEP: Duration = Duration::from_secs(1); + const RECONNECT_SLEEP: Duration = Duration::from_secs(8); + const ROUTER_ENDPOINT: &str = "tcp/localhost:47452"; + + const ADVANCED_RETRANSMISSION_PERIODIC_KEYEXPR: &str = "test/advanced/retransmission/periodic"; + + zenoh_util::init_log_from_env_or("error"); + + let router = { + let mut c = zenoh::Config::default(); + c.listen + .endpoints + .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) + .unwrap(); + c.scouting.multicast.set_enabled(Some(false)).unwrap(); + let _ = c.set_mode(Some(WhatAmI::Router)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Router ZID: {}", s.zid()); + s + }; + + let client1 = { + let mut c = zenoh::Config::default(); + c.connect + .endpoints + .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) + .unwrap(); + c.scouting.multicast.set_enabled(Some(false)).unwrap(); + let _ = c.set_mode(Some(WhatAmI::Client)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Client (1) ZID: {}", s.zid()); + s + }; + + let client2 = { + let mut c = zenoh::Config::default(); + c.connect + .endpoints + .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) + .unwrap(); + c.scouting.multicast.set_enabled(Some(false)).unwrap(); + let _ = c.set_mode(Some(WhatAmI::Client)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Client (2) ZID: {}", s.zid()); + s + }; + + let sub = ztimeout!(client2 + .declare_subscriber(ADVANCED_RETRANSMISSION_PERIODIC_KEYEXPR) + .retransmission( + RetransmissionConf::default().periodic_queries(Some(Duration::from_secs(1))) + )) + .unwrap(); + tokio::time::sleep(SLEEP).await; + + let publ = ztimeout!(client1 + .declare_publisher(ADVANCED_RETRANSMISSION_PERIODIC_KEYEXPR) + .history(HistoryConf::default().sample_depth(10)) + .retransmission()) + .unwrap(); + ztimeout!(publ.put("1")).unwrap(); + + tokio::time::sleep(SLEEP).await; + + let sample = ztimeout!(sub.recv_async()).unwrap(); + assert_eq!(sample.kind(), SampleKind::Put); + assert_eq!(sample.payload().try_to_string().unwrap().as_ref(), "1"); + + assert!(sub.try_recv().unwrap().is_none()); + + router.close().await.unwrap(); + tokio::time::sleep(SLEEP).await; + + ztimeout!(publ.put("2")).unwrap(); + ztimeout!(publ.put("3")).unwrap(); + ztimeout!(publ.put("4")).unwrap(); + tokio::time::sleep(SLEEP).await; + + assert!(sub.try_recv().unwrap().is_none()); + + let router = { + let mut c = zenoh::Config::default(); + c.listen + .endpoints + .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) + .unwrap(); + c.scouting.multicast.set_enabled(Some(false)).unwrap(); + let _ = c.set_mode(Some(WhatAmI::Router)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Router ZID: {}", s.zid()); + s + }; + tokio::time::sleep(RECONNECT_SLEEP).await; + + let sample = ztimeout!(sub.recv_async()).unwrap(); + assert_eq!(sample.kind(), SampleKind::Put); + assert_eq!(sample.payload().try_to_string().unwrap().as_ref(), "2"); + + let sample = ztimeout!(sub.recv_async()).unwrap(); + assert_eq!(sample.kind(), SampleKind::Put); + assert_eq!(sample.payload().try_to_string().unwrap().as_ref(), "3"); + + let sample = ztimeout!(sub.recv_async()).unwrap(); + assert_eq!(sample.kind(), SampleKind::Put); + assert_eq!(sample.payload().try_to_string().unwrap().as_ref(), "4"); + + assert!(sub.try_recv().unwrap().is_none()); + + publ.undeclare().await.unwrap(); + // sub.undeclare().await.unwrap(); + + client1.close().await.unwrap(); + client2.close().await.unwrap(); + + router.close().await.unwrap(); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_advanced_late_joiner() { + use std::time::Duration; + + use zenoh::internal::ztimeout; + + const TIMEOUT: Duration = Duration::from_secs(60); + const SLEEP: Duration = Duration::from_secs(1); + const RECONNECT_SLEEP: Duration = Duration::from_secs(8); + const ROUTER_ENDPOINT: &str = "tcp/localhost:47453"; + + const ADVANCED_LATE_JOINER_KEYEXPR: &str = "test/advanced/late_joiner"; + + zenoh_util::init_log_from_env_or("error"); + + let peer1 = { + let mut c = zenoh::Config::default(); + c.connect + .endpoints + .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) + .unwrap(); + c.scouting.multicast.set_enabled(Some(false)).unwrap(); + c.timestamping + .set_enabled(Some(ModeDependentValue::Unique(true))) + .unwrap(); + let _ = c.set_mode(Some(WhatAmI::Peer)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Peer (1) ZID: {}", s.zid()); + s + }; + + let peer2 = { + let mut c = zenoh::Config::default(); + c.connect + .endpoints + .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) + .unwrap(); + c.scouting.multicast.set_enabled(Some(false)).unwrap(); + let _ = c.set_mode(Some(WhatAmI::Peer)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Peer (2) ZID: {}", s.zid()); + s + }; + + let sub = ztimeout!(peer2 + .declare_subscriber(ADVANCED_LATE_JOINER_KEYEXPR) + .history() + .late_joiner()) + .unwrap(); + tokio::time::sleep(SLEEP).await; + + let publ = ztimeout!(peer1 + .declare_publisher(ADVANCED_LATE_JOINER_KEYEXPR) + .history(HistoryConf::default().sample_depth(10)) + .late_joiner()) + .unwrap(); + ztimeout!(publ.put("1")).unwrap(); + ztimeout!(publ.put("2")).unwrap(); + ztimeout!(publ.put("3")).unwrap(); + + tokio::time::sleep(SLEEP).await; + + assert!(sub.try_recv().unwrap().is_none()); + let router = { + let mut c = zenoh::Config::default(); + c.listen + .endpoints + .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) + .unwrap(); + c.scouting.multicast.set_enabled(Some(false)).unwrap(); + let _ = c.set_mode(Some(WhatAmI::Router)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Router ZID: {}", s.zid()); + s + }; + tokio::time::sleep(RECONNECT_SLEEP).await; + + ztimeout!(publ.put("4")).unwrap(); + tokio::time::sleep(SLEEP).await; + + let sample = ztimeout!(sub.recv_async()).unwrap(); + assert_eq!(sample.kind(), SampleKind::Put); + assert_eq!(sample.payload().try_to_string().unwrap().as_ref(), "1"); + + let sample = ztimeout!(sub.recv_async()).unwrap(); + assert_eq!(sample.kind(), SampleKind::Put); + assert_eq!(sample.payload().try_to_string().unwrap().as_ref(), "2"); + + let sample = ztimeout!(sub.recv_async()).unwrap(); + assert_eq!(sample.kind(), SampleKind::Put); + assert_eq!(sample.payload().try_to_string().unwrap().as_ref(), "3"); + + let sample = ztimeout!(sub.recv_async()).unwrap(); + assert_eq!(sample.kind(), SampleKind::Put); + assert_eq!(sample.payload().try_to_string().unwrap().as_ref(), "4"); + + assert!(sub.try_recv().unwrap().is_none()); + + publ.undeclare().await.unwrap(); + // sub.undeclare().await.unwrap(); + + peer1.close().await.unwrap(); + peer2.close().await.unwrap(); + + router.close().await.unwrap(); +} diff --git a/zenoh/src/api/admin.rs b/zenoh/src/api/admin.rs index b0d0bed0f7..74960e68f7 100644 --- a/zenoh/src/api/admin.rs +++ b/zenoh/src/api/admin.rs @@ -19,6 +19,7 @@ use std::{ use zenoh_core::{Result as ZResult, Wait}; use zenoh_keyexpr::keyexpr; +use zenoh_macros::ke; #[cfg(feature = "unstable")] use zenoh_protocol::core::Reliability; use zenoh_protocol::{core::WireExpr, network::NetworkMessage}; @@ -26,6 +27,7 @@ use zenoh_transport::{ TransportEventHandler, TransportMulticastEventHandler, TransportPeer, TransportPeerEventHandler, }; +use crate as zenoh; use crate::{ api::{ encoding::Encoding, @@ -38,17 +40,15 @@ use crate::{ handlers::Callback, }; -lazy_static::lazy_static!( - static ref KE_STARSTAR: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("**") }; - static ref KE_PREFIX: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("@") }; - static ref KE_SESSION: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("session") }; - static ref KE_TRANSPORT_UNICAST: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("transport/unicast") }; - static ref KE_LINK: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("link") }; -); +static KE_STARSTAR: &keyexpr = ke!("**"); +static KE_PREFIX: &keyexpr = ke!("@"); +static KE_SESSION: &keyexpr = ke!("session"); +static KE_TRANSPORT_UNICAST: &keyexpr = ke!("transport/unicast"); +static KE_LINK: &keyexpr = ke!("link"); pub(crate) fn init(session: WeakSession) { if let Ok(own_zid) = keyexpr::new(&session.zid().to_string()) { - let admin_key = KeyExpr::from(*KE_PREFIX / own_zid / *KE_SESSION / *KE_STARSTAR) + let admin_key = KeyExpr::from(KE_PREFIX / own_zid / KE_SESSION / KE_STARSTAR) .to_wire(&session) .to_owned(); @@ -68,7 +68,7 @@ pub(crate) fn on_admin_query(session: &WeakSession, query: Query) { fn reply_peer(own_zid: &keyexpr, query: &Query, peer: TransportPeer) { let zid = peer.zid.to_string(); if let Ok(zid) = keyexpr::new(&zid) { - let key_expr = *KE_PREFIX / own_zid / *KE_SESSION / *KE_TRANSPORT_UNICAST / zid; + let key_expr = KE_PREFIX / own_zid / KE_SESSION / KE_TRANSPORT_UNICAST / zid; if query.key_expr().intersects(&key_expr) { match serde_json::to_vec(&peer) { Ok(bytes) => { @@ -82,12 +82,12 @@ pub(crate) fn on_admin_query(session: &WeakSession, query: Query) { let mut s = DefaultHasher::new(); link.hash(&mut s); if let Ok(lid) = keyexpr::new(&s.finish().to_string()) { - let key_expr = *KE_PREFIX + let key_expr = KE_PREFIX / own_zid - / *KE_SESSION - / *KE_TRANSPORT_UNICAST + / KE_SESSION + / KE_TRANSPORT_UNICAST / zid - / *KE_LINK + / KE_LINK / lid; if query.key_expr().intersects(&key_expr) { match serde_json::to_vec(&link) { @@ -156,7 +156,7 @@ impl TransportMulticastEventHandler for Handler { if let Ok(own_zid) = keyexpr::new(&self.session.zid().to_string()) { if let Ok(zid) = keyexpr::new(&peer.zid.to_string()) { let expr = WireExpr::from( - &(*KE_PREFIX / own_zid / *KE_SESSION / *KE_TRANSPORT_UNICAST / zid), + &(KE_PREFIX / own_zid / KE_SESSION / KE_TRANSPORT_UNICAST / zid), ) .to_owned(); let info = DataInfo { diff --git a/zenoh/src/api/builders/publisher.rs b/zenoh/src/api/builders/publisher.rs index ac6565cd27..b9580df03e 100644 --- a/zenoh/src/api/builders/publisher.rs +++ b/zenoh/src/api/builders/publisher.rs @@ -283,8 +283,16 @@ impl IntoFuture for PublicationBuilder, PublicationBuil #[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"] #[derive(Debug)] pub struct PublisherBuilder<'a, 'b> { + #[cfg(feature = "internal")] + pub session: &'a Session, + #[cfg(not(feature = "internal"))] pub(crate) session: &'a Session, + + #[cfg(feature = "internal")] + pub key_expr: ZResult>, + #[cfg(not(feature = "internal"))] pub(crate) key_expr: ZResult>, + pub(crate) encoding: Encoding, pub(crate) congestion_control: CongestionControl, pub(crate) priority: Priority, diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index c5ba24b98f..362fb8ce2f 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -1968,7 +1968,7 @@ impl SessionInner { timestamp, encoding: encoding.clone().into(), #[cfg(feature = "unstable")] - ext_sinfo: source_info.into(), + ext_sinfo: source_info.clone().into(), #[cfg(not(feature = "unstable"))] ext_sinfo: None, #[cfg(feature = "shared-memory")] @@ -1980,7 +1980,7 @@ impl SessionInner { SampleKind::Delete => PushBody::Del(Del { timestamp, #[cfg(feature = "unstable")] - ext_sinfo: source_info.into(), + ext_sinfo: source_info.clone().into(), #[cfg(not(feature = "unstable"))] ext_sinfo: None, ext_attachment: attachment.clone().map(|a| a.into()), @@ -1999,7 +1999,13 @@ impl SessionInner { kind, encoding: Some(encoding), timestamp, + #[cfg(feature = "unstable")] + source_id: source_info.source_id, + #[cfg(not(feature = "unstable"))] source_id: None, + #[cfg(feature = "unstable")] + source_sn: source_info.source_sn, + #[cfg(not(feature = "unstable"))] source_sn: None, qos: QoS::from(push::ext::QoSType::new( priority.into(), diff --git a/zenoh/src/lib.rs b/zenoh/src/lib.rs index 701e13fc94..298ce6f1f6 100644 --- a/zenoh/src/lib.rs +++ b/zenoh/src/lib.rs @@ -172,7 +172,7 @@ pub mod key_expr { #[zenoh_macros::unstable] pub mod format { pub use zenoh_keyexpr::format::*; - pub use zenoh_macros::{kedefine, keformat, kewrite}; + pub use zenoh_macros::{ke, kedefine, keformat, kewrite}; pub mod macro_support { pub use zenoh_keyexpr::format::macro_support::*; }