Skip to content

Commit

Permalink
added matching status and matching listener support
Browse files Browse the repository at this point in the history
  • Loading branch information
DenisBiryukov91 committed Oct 30, 2024
1 parent b5fca3e commit 605b7bc
Show file tree
Hide file tree
Showing 5 changed files with 175 additions and 8 deletions.
17 changes: 13 additions & 4 deletions examples/getargs.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
struct CmdArg {
const char *name;
const char **value;
bool only_presence = false;
};

inline void getargs(int argc, char **argv, const std::vector<CmdArg> &required,
Expand All @@ -43,8 +44,12 @@ inline void getargs(int argc, char **argv, const std::vector<CmdArg> &required,
if (*p.value) defaults = true;
}
for (auto &p : named) {
std::cout << " " << p.first << " [" << p.second.name << "]";
if (*p.second.value) defaults = true;
if (p.second.only_presence) {
std::cout << " [" << p.first << "]";
} else {
std::cout << " " << p.first << " [" << p.second.name << "]";
if (*p.second.value) defaults = true;
}
}
std::cout << std::endl;

Expand All @@ -56,7 +61,7 @@ inline void getargs(int argc, char **argv, const std::vector<CmdArg> &required,
}
}
for (auto &p : named) {
if (*p.second.value) {
if (!p.second.only_presence && *p.second.value) {
std::cout << " " << p.second.name << " = " << *p.second.value << std::endl;
}
}
Expand All @@ -76,7 +81,11 @@ inline void getargs(int argc, char **argv, const std::vector<CmdArg> &required,
}
auto param = named.find(argv[i]);
if (param != named.end()) {
destination = param->second.value;
if (param->second.only_presence) {
*param->second.value = "true";
} else {
destination = param->second.value;
}
} else if (position < positioned.size()) {
*positioned[position].value = argv[i];
++position;
Expand Down
21 changes: 20 additions & 1 deletion examples/universal/z_pub.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,13 @@ const char *default_keyexpr = "demo/example/zenoh-cpp-zenoh-pico-pub";
int _main(int argc, char **argv) {
const char *keyexpr = default_keyexpr;
const char *value = default_value;
Config config = parse_args(argc, argv, {}, {{"key_expression", &keyexpr}, {"payload_value", &value}});
const char *add_matching_listener = "false";
Config config = parse_args(argc, argv, {}, {{"key_expression", &keyexpr}, {"payload_value", &value}}
#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API)
,
{{"--add-matching-listener", {CmdArg{"", &add_matching_listener, true}}}}
#endif
);

std::cout << "Opening session..." << std::endl;
auto session = Session::open(std::move(config));
Expand All @@ -48,6 +54,19 @@ int _main(int argc, char **argv) {
auto pub = session.declare_publisher(KeyExpr(keyexpr));

std::cout << "Publisher on '" << keyexpr << "' declared" << std::endl;
#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API)
if (std::string(add_matching_listener) == "true") {
pub.declare_background_matching_listener(
[](const Publisher::MatchingStatus &s) {
if (s.matching) {
std::cout << "Subscriber matched" << std::endl;
} else {
std::cout << "No subscribers matched" << std::endl;
}
},
closures::none);
}
#endif

std::cout << "Press CTRL-C to quit..." << std::endl;
for (int idx = 0; idx < std::numeric_limits<int>::max(); ++idx) {
Expand Down
22 changes: 21 additions & 1 deletion examples/zenohc/z_pub_shm.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,34 @@ const char *default_keyexpr = "demo/example/zenoh-cpp-zenoh-c-pub";
int _main(int argc, char **argv) {
const char *keyexpr = default_keyexpr;
const char *value = default_value;
Config config = parse_args(argc, argv, {}, {{"key_expression", &keyexpr}, {"payload_value", &value}});
const char *add_matching_listener = "false";
Config config = parse_args(argc, argv, {}, {{"key_expression", &keyexpr}, {"payload_value", &value}}
#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API)
,
{{"--add-matching-listener", {CmdArg{"", &add_matching_listener, true}}}}
#endif
);

std::cout << "Opening session..." << std::endl;
auto session = Session::open(std::move(config));

std::cout << "Declaring Publisher on '" << keyexpr << "'..." << std::endl;
auto pub = session.declare_publisher(KeyExpr(keyexpr));

#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API)
if (std::string(add_matching_listener) == "true") {
pub.declare_background_matching_listener(
[](const Publisher::MatchingStatus &s) {
if (s.matching) {
std::cout << "Subscriber matched" << std::endl;
} else {
std::cout << "No subscribers matched" << std::endl;
}
},
closures::none);
}
#endif

std::cout << "Publisher on '" << keyexpr << "' declared" << std::endl;

std::cout << "Preparing SHM Provider...\n";
Expand Down
3 changes: 1 addition & 2 deletions include/zenoh/api/base.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class Owned {
protected:
typedef ZC_OWNED_TYPE OwnedType;

public:
protected:
/// Move constructor.
Owned(Owned&& v) : Owned(&v._0) {}
/// Move assignment.
Expand All @@ -80,7 +80,6 @@ class Owned {
/// Destructor drops owned value using z_drop from zenoh API
~Owned() { ::z_drop(::z_move(_0)); }

protected:
OwnedType _0;

explicit Owned(OwnedType* pv) {
Expand Down
120 changes: 120 additions & 0 deletions include/zenoh/api/publisher.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

#if defined(ZENOHCXX_ZENOHC) || Z_FEATURE_PUBLICATION == 1

#include "../detail/closures_concrete.hxx"
#include "base.hxx"
#include "bytes.hxx"
#include "encoding.hxx"
Expand All @@ -29,6 +30,13 @@

namespace zenoh {

#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API)
namespace detail::closures {
extern "C" {
void _zenoh_on_status_change_call(const ::zc_matching_status_t* status, void* context);
}
} // namespace detail::closures
#endif
class Session;

/// A Zenoh publisher. Constructed by ``Session::declare_publisher`` method.
Expand Down Expand Up @@ -126,6 +134,118 @@ class Publisher : public Owned<::z_owned_publisher_t> {
return interop::into_copyable_cpp_obj<EntityGlobalId>(::z_publisher_id(interop::as_loaned_c_ptr(*this)));
}
#endif

#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API)
/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future
/// release.
/// @brief A struct that indicates if there exist Subscribers matching the Publisher's key expression.
/// @note Zenoh-c only.
struct MatchingStatus {
/// True if there exist Subscribers matching the Publisher's key expression, false otherwise.
bool matching;
};

/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future
/// release.
/// @brief A Zenoh matching listener.
///
/// A listener that sends notifications when the [`MatchingStatus`] of a publisher changes.
/// Dropping the corresponding publisher, disables the matching listener.
class MatchingListener : public Owned<::zc_owned_matching_listener_t> {
MatchingListener(zenoh::detail::null_object_t) : Owned(nullptr){};
friend struct interop::detail::Converter;
friend class Publisher;
};

/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future
/// release.
/// @brief Construct matching listener, registering a callback for notifying subscribers matching with a given
/// publisher.
///
/// @param on_status_change: the callable that will be called every time the matching status of the publisher
/// changes (If last subscriber, disconnects or when the first subscriber connects).
/// @param on_drop the callable that will be called once publisher is destroyed or undeclared.
/// @param err if not null, the result code will be written to this location, otherwise ZException exception will be
/// thrown in case of error.
/// @return a ``MatchingListener`` object.
/// @note Zenoh-c only.
template <class C, class D>
[[nodiscard]] MatchingListener declare_matching_listener(C&& on_status_change, D&& on_drop,
ZResult* err = nullptr) const {
static_assert(std::is_invocable_r<void, C, const MatchingStatus&>::value,
"on_status_change should be callable with the following signature: void on_status_change(const "
"zenoh::Publisher::MatchingStatus& status)");
static_assert(std::is_invocable_r<void, D>::value,
"on_drop should be callable with the following signature: void on_drop()");
::zc_owned_closure_matching_status_t c_closure;
using Cval = std::remove_reference_t<C>;
using Dval = std::remove_reference_t<D>;
using ClosureType = typename detail::closures::Closure<Cval, Dval, void, const MatchingStatus&>;
auto closure = ClosureType::into_context(std::forward<C>(on_status_change), std::forward<D>(on_drop));
::z_closure(&c_closure, detail::closures::_zenoh_on_status_change_call, detail::closures::_zenoh_on_drop,
closure);
MatchingListener m(zenoh::detail::null_object);
ZResult res = ::zc_publisher_declare_matching_listener(interop::as_loaned_c_ptr(*this),
interop::as_owned_c_ptr(m), ::z_move(c_closure));
__ZENOH_RESULT_CHECK(res, err, "Failed to declare Matching Listener");
return m;
}

/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future
/// release.
/// @brief Declare matching listener, registering a callback for notifying subscribers matching with a given
/// publisher. The callback will be run in the background until the corresponding publisher is destroyed.
///
/// @param on_status_change: the callable that will be called every time the matching status of the publisher
/// changes (If last subscriber, disconnects or when the first subscriber connects).
/// @param on_drop the callable that will be called once publisher is destroyed or undeclared.
/// @param err if not null, the result code will be written to this location, otherwise ZException exception will be
/// thrown in case of error.
/// @note Zenoh-c only.
template <class C, class D>
void declare_background_matching_listener(C&& on_status_change, D&& on_drop, ZResult* err = nullptr) const {
static_assert(std::is_invocable_r<void, C, const MatchingStatus&>::value,
"on_status_change should be callable with the following signature: void on_status_change(const "
"zenoh::Publisher::MatchingStatus& status)");
static_assert(std::is_invocable_r<void, D>::value,
"on_drop should be callable with the following signature: void on_drop()");
::zc_owned_closure_matching_status_t c_closure;
using Cval = std::remove_reference_t<C>;
using Dval = std::remove_reference_t<D>;
using ClosureType = typename detail::closures::Closure<Cval, Dval, void, const MatchingStatus&>;
auto closure = ClosureType::into_context(std::forward<C>(on_status_change), std::forward<D>(on_drop));
::z_closure(&c_closure, detail::closures::_zenoh_on_status_change_call, detail::closures::_zenoh_on_drop,
closure);
ZResult res =
::zc_publisher_declare_background_matching_listener(interop::as_loaned_c_ptr(*this), ::z_move(c_closure));
__ZENOH_RESULT_CHECK(res, err, "Failed to declare background Matching Listener");
}

/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future
/// release.
/// @brief Gets publisher matching status - i.e. if there are any subscribers matching its key expression.
/// @param err if not null, the result code will be written to this location, otherwise ZException exception will be
/// thrown in case of error.
/// @note Zenoh-c only.
MatchingStatus get_matching_status(ZResult* err = nullptr) const {
::zc_matching_status_t m;
ZResult res = ::zc_publisher_get_matching_status(interop::as_loaned_c_ptr(*this), &m);
__ZENOH_RESULT_CHECK(res, err, "Failed to get matching status");
return {m.matching};
}
#endif
};

#if defined(ZENOHCXX_ZENOHC) && defined(Z_FEATURE_UNSTABLE_API)
namespace detail::closures {
extern "C" {
inline void _zenoh_on_status_change_call(const ::zc_matching_status_t* status, void* context) {
IClosure<void, const Publisher::MatchingStatus&>::call_from_context(context,
Publisher::MatchingStatus{status->matching});
}
}
} // namespace detail::closures
#endif

} // namespace zenoh
#endif

0 comments on commit 605b7bc

Please sign in to comment.