Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Advanced pub/sub #1582

Draft
wants to merge 24 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
2cc7f12
Expose and use ke macro
OlivierHecart Nov 5, 2024
163d3d5
Fix SourceInfo publication
OlivierHecart Nov 8, 2024
43a5d3c
Add AdvancedPublisher AdvancedSubscriber and AdvancedSubscriber
OlivierHecart Nov 8, 2024
9f076a7
Fix doctests
OlivierHecart Nov 8, 2024
3935974
Fix doc warnings
OlivierHecart Nov 8, 2024
405d76e
Remove debug trace
OlivierHecart Nov 13, 2024
71b22d3
Add history test
OlivierHecart Nov 13, 2024
af1b2a2
Fix periodic queries
OlivierHecart Nov 13, 2024
bd32356
Remove debug trace
OlivierHecart Nov 13, 2024
4e4bbb6
Lower test debug level
OlivierHecart Nov 13, 2024
f36c890
Add retransmission tests
OlivierHecart Nov 13, 2024
23f145d
Liveliness sub callback shoud increase pending queries counter
OlivierHecart Nov 13, 2024
de396a4
Liveliness sub callback shoud spawn periodic queries when enbaled
OlivierHecart Nov 13, 2024
ff24135
Add late_joiner test
OlivierHecart Nov 13, 2024
975aba4
Only treat pending samples when there are no more pending queries
OlivierHecart Nov 14, 2024
5d9ac8d
Apply proper sequencing for history
OlivierHecart Nov 14, 2024
2305b41
Improve AdvancedSubscriber
OlivierHecart Nov 14, 2024
883885b
Code reorg
OlivierHecart Nov 14, 2024
3201700
Code reorg
OlivierHecart Nov 14, 2024
ded789c
Fix deduplication
OlivierHecart Nov 14, 2024
e033553
Subscribe to liveliness tokens with history
OlivierHecart Nov 15, 2024
acaf341
Update builders
OlivierHecart Nov 15, 2024
ef63165
Add examples
OlivierHecart Nov 15, 2024
788a8e5
Fix rustdoc
OlivierHecart Nov 15, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions commons/zenoh-config/src/wrappers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,16 @@ pub struct EntityGlobalId(EntityGlobalIdProto);
pub type EntityId = u32;

impl EntityGlobalId {
/// Creates a new EntityGlobalId.
#[zenoh_macros::internal]
pub fn new(zid: ZenohId, eid: EntityId) -> Self {
EntityGlobalIdProto {
zid: zid.into(),
eid,
}
.into()
}

/// Returns the [`ZenohId`], i.e. the Zenoh session, this ID is associated to.
pub fn zid(&self) -> ZenohId {
self.0.zid.into()
Expand Down
2 changes: 1 addition & 1 deletion commons/zenoh-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ pub fn ke(tokens: TokenStream) -> TokenStream {
let value: LitStr = syn::parse(tokens).unwrap();
let ke = value.value();
match zenoh_keyexpr::keyexpr::new(&ke) {
Ok(_) => quote!(unsafe {::zenoh::key_expr::keyexpr::from_str_unchecked(#ke)}).into(),
Ok(_) => quote!(unsafe { zenoh::key_expr::keyexpr::from_str_unchecked(#ke)}).into(),
Err(e) => panic!("{}", e),
}
}
Expand Down
2 changes: 2 additions & 0 deletions zenoh-ext/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,15 @@ tokio = { workspace = true, features = [
"macros",
"io-std",
] }
async-trait = { workspace = true }
bincode = { workspace = true }
zenoh-util = { workspace = true }
flume = { workspace = true }
futures = { workspace = true }
tracing = { workspace = true }
serde = { workspace = true, features = ["default"] }
leb128 = { workspace = true }
uhlc = { workspace = true }
zenoh = { workspace = true, default-features = false }
zenoh-macros = { workspace = true }

Expand Down
27 changes: 27 additions & 0 deletions zenoh-ext/examples/examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,33 @@

## Examples description

### z_advanced_pub

Declares an AdvancedPublisher with a given key expression.
All the publications are locally cached (with a configurable history size - i.e. max number of cached data per resource, default 1). The cache can be queried by an AdvancedSubscriber for hsitory
or retransmission.

Typical usage:
```bash
z_advanced_pub
```
or
```bash
z_advanced_pub --history 10
```

### z_advanced_sub

Declares an AdvancedSubscriber with a given key expression.
The AdvancedSubscriber can query for AdvancedPublisher history at startup
and on late joiner publisher detection. The AdvancedSubscriber can detect
sample loss and ask for retransmission.

Typical usage:
```bash
z_advanced_sub
```

### z_pub_cache

Declares a publisher and an associated publication cache with a given key expression.
Expand Down
73 changes: 73 additions & 0 deletions zenoh-ext/examples/examples/z_advanced_pub.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
use std::time::Duration;

//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//
use clap::{arg, Parser};
use zenoh::{config::Config, key_expr::KeyExpr};
use zenoh_config::ModeDependentValue;
use zenoh_ext::*;
use zenoh_ext_examples::CommonArgs;

#[tokio::main]
async fn main() {
// Initiate logging
zenoh::init_log_from_env_or("error");

let (config, key_expr, value, history) = parse_args();

println!("Opening session...");
let session = zenoh::open(config).await.unwrap();

println!("Declaring AdvancedPublisher on {}", &key_expr);
let publisher = session
.declare_publisher(&key_expr)
.history(HistoryConf::default().sample_depth(history))
.retransmission()
.late_joiner()
.await
.unwrap();

println!("Press CTRL-C to quit...");
for idx in 0..u32::MAX {
tokio::time::sleep(Duration::from_secs(1)).await;
let buf = format!("[{idx:4}] {value}");
println!("Put Data ('{}': '{}')", &key_expr, buf);
publisher.put(buf).await.unwrap();
}
}

#[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)]
struct Args {
#[arg(short, long, default_value = "demo/example/zenoh-rs-pub")]
/// The key expression to publish.
key: KeyExpr<'static>,
#[arg(short, long, default_value = "Pub from Rust!")]
/// The value to reply to queries.
value: String,
#[arg(short = 'i', long, default_value = "1")]
/// The number of publications to keep in cache.
history: usize,
#[command(flatten)]
common: CommonArgs,
}

fn parse_args() -> (Config, KeyExpr<'static>, String, usize) {
let args = Args::parse();
let mut config: Config = args.common.into();
config
.timestamping
.set_enabled(Some(ModeDependentValue::Unique(true)))
.unwrap();
(config, args.key, args.value, args.history)
}
69 changes: 69 additions & 0 deletions zenoh-ext/examples/examples/z_advanced_sub.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//
use std::time::Duration;

use clap::{arg, Parser};
use zenoh::config::Config;
use zenoh_ext::*;
use zenoh_ext_examples::CommonArgs;

#[tokio::main]
async fn main() {
// Initiate logging
zenoh::init_log_from_env_or("error");

let (config, key_expr) = parse_args();

println!("Opening session...");
let session = zenoh::open(config).await.unwrap();

println!("Declaring AdvancedSubscriber on {}", key_expr,);
let subscriber = session
.declare_subscriber(key_expr)
.history()
.retransmission(
RetransmissionConf::default().periodic_queries(Some(Duration::from_secs(1))),
)
.late_joiner()
.await
.unwrap();

println!("Press CTRL-C to quit...");
while let Ok(sample) = subscriber.recv_async().await {
let payload = sample
.payload()
.try_to_string()
.unwrap_or_else(|e| e.to_string().into());
println!(
">> [Subscriber] Received {} ('{}': '{}')",
sample.kind(),
sample.key_expr().as_str(),
payload
);
}
}

#[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)]
struct Args {
#[arg(short, long, default_value = "demo/example/**")]
/// The key expression to subscribe onto.
key: String,
#[command(flatten)]
common: CommonArgs,
}

fn parse_args() -> (Config, String) {
let args = Args::parse();
(args.common.into(), args.key)
}
Loading