Skip to content

Commit

Permalink
Update zenoh
Browse files Browse the repository at this point in the history
  • Loading branch information
Mallets committed Sep 12, 2024
1 parent 4da5247 commit f7eae53
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 47 deletions.
54 changes: 27 additions & 27 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion zenoh-bridge-dds/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use clap::{App, Arg};
use zenoh::{
config::{Config, ModeDependentValue},
internal::{plugins::PluginsManager, runtime::RuntimeBuilder},
prelude::*,
session::ZenohId,
};
use zenoh_plugin_dds::DDSPlugin;
Expand Down
3 changes: 1 addition & 2 deletions zenoh-plugin-dds/src/dds_mgt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,8 @@ use tracing::{debug, error, warn};
use zenoh::{
bytes::ZBytes,
key_expr::{KeyExpr, OwnedKeyExpr},
prelude::*,
qos::CongestionControl,
Session,
Session, Wait,
};

const MAX_SAMPLES: usize = 32;
Expand Down
9 changes: 4 additions & 5 deletions zenoh-plugin-dds/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,10 @@ use zenoh::{
},
key_expr::{keyexpr, KeyExpr, OwnedKeyExpr},
liveliness::LivelinessToken,
prelude::*,
qos::CongestionControl,
query::{ConsolidationMode, Query, QueryTarget, Queryable, Selector},
sample::{Locality, Sample, SampleKind},
Result as ZResult, Session,
Result as ZResult, Session, Wait,
};
use zenoh_ext::{SessionExt, SubscriberBuilderExt};
use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin, PluginControl};
Expand Down Expand Up @@ -300,7 +299,7 @@ pub(crate) struct DdsPluginRuntime<'a> {
// Note: &'a Arc<Session> here to keep the ownership of Session outside this struct
// and be able to store the publishers/subscribers it creates in this same struct.
zsession: &'a Arc<Session>,
_member: LivelinessToken<'a>,
_member: LivelinessToken,
dp: dds_entity_t,
// maps of all discovered DDS entities (indexed by DDS key)
discovered_participants: HashMap<String, DdsParticipant>,
Expand Down Expand Up @@ -786,7 +785,7 @@ impl<'a> DdsPluginRuntime<'a> {
group_subscriber: &Receiver<Sample>,
dds_disco_rcv: &Receiver<DiscoveryEvent>,
admin_keyexpr_prefix: OwnedKeyExpr,
admin_queryable: &Queryable<'_, flume::Receiver<Query>>,
admin_queryable: &Queryable<flume::Receiver<Query>>,
) {
debug!(r#"Run in "local discovery" mode"#);

Expand Down Expand Up @@ -979,7 +978,7 @@ impl<'a> DdsPluginRuntime<'a> {
group_subscriber: &Receiver<Sample>,
dds_disco_rcv: &Receiver<DiscoveryEvent>,
admin_keyexpr_prefix: OwnedKeyExpr,
admin_queryable: &Queryable<'_, flume::Receiver<Query>>,
admin_queryable: &Queryable<flume::Receiver<Query>>,
) {
debug!(r#"Run in "forward discovery" mode"#);

Expand Down
10 changes: 7 additions & 3 deletions zenoh-plugin-dds/src/route_dds_zenoh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use cyclors::{
use serde::Serialize;
use zenoh::{
key_expr::{keyexpr, KeyExpr, OwnedKeyExpr},
prelude::*,
qos::CongestionControl,
sample::Locality,
};
Expand All @@ -32,7 +31,7 @@ use crate::{dds_mgt::*, qos_helpers::*, DdsPluginRuntime, KE_PREFIX_PUB_CACHE};

enum ZPublisher<'a> {
Publisher(KeyExpr<'a>),
PublicationCache(PublicationCache<'a>),
PublicationCache(PublicationCache),
}

impl ZPublisher<'_> {
Expand Down Expand Up @@ -157,7 +156,12 @@ impl RouteDDSZenoh<'_> {
})?;
ZPublisher::PublicationCache(pub_cache)
} else {
if let Err(e) = plugin.zsession.declare_publisher(declared_ke.clone()).await {
if let Err(e) = plugin
.zsession
.declare_publisher(declared_ke.clone())
.reliability(zenoh::pubsub::Reliability::Reliable)
.await
{
tracing::warn!(
"Failed to declare publisher for key {} (rid={}): {}",
ke,
Expand Down
15 changes: 6 additions & 9 deletions zenoh-plugin-dds/src/route_zenoh_dds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,10 @@ use cyclors::{
use serde::{Serialize, Serializer};
use zenoh::{
key_expr::{keyexpr, KeyExpr, OwnedKeyExpr},
prelude::*,
pubsub::Subscriber,
query::{ConsolidationMode, QueryTarget, ReplyKeyExpr, Selector},
sample::{Locality, Sample},
Session,
Session, Wait,
};
use zenoh_ext::{FetchingSubscriber, SubscriberBuilderExt};

Expand All @@ -46,12 +45,12 @@ use crate::{
type AtomicDDSEntity = AtomicI32;
const DDS_ENTITY_NULL: dds_entity_t = 0;

enum ZSubscriber<'a> {
Subscriber(Subscriber<'a, ()>),
FetchingSubscriber(FetchingSubscriber<'a, ()>),
enum ZSubscriber {
Subscriber(Subscriber<()>),
FetchingSubscriber(FetchingSubscriber<()>),
}

impl ZSubscriber<'_> {
impl ZSubscriber {
fn key_expr(&self) -> &KeyExpr<'static> {
match self {
ZSubscriber::Subscriber(s) => s.key_expr(),
Expand All @@ -69,7 +68,7 @@ pub(crate) struct RouteZenohDDS<'a> {
zenoh_session: &'a Arc<Session>,
// the zenoh subscriber receiving data to be re-published by the DDS Writer
#[serde(skip)]
zenoh_subscriber: ZSubscriber<'a>,
zenoh_subscriber: ZSubscriber,
// the DDS topic name for re-publication
topic_name: String,
// the DDS topic type
Expand Down Expand Up @@ -188,7 +187,6 @@ impl RouteZenohDDS<'_> {
.declare_subscriber(ke.clone())
.callback(subscriber_callback)
.allowed_origin(Locality::Remote) // Allow only remote publications to avoid loops
.reliable()
.querying()
.query_timeout(plugin.config.queries_timeout)
.query_selector(query_selector)
Expand All @@ -206,7 +204,6 @@ impl RouteZenohDDS<'_> {
.declare_subscriber(ke.clone())
.callback(subscriber_callback)
.allowed_origin(Locality::Remote) // Allow only remote publications to avoid loops
.reliable()
.await
.map_err(|e| {
format!(
Expand Down

0 comments on commit f7eae53

Please sign in to comment.