Skip to content

Commit

Permalink
Merge pull request #57 from eclipse-zenoh/closure_api_update
Browse files Browse the repository at this point in the history
API update: closures accepts references instead of pointers
  • Loading branch information
milyin authored Jul 21, 2023
2 parents 36bd8ae + cead526 commit 08294b9
Show file tree
Hide file tree
Showing 17 changed files with 716 additions and 461 deletions.
36 changes: 17 additions & 19 deletions examples/universal/z_get.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -65,25 +65,23 @@ int _main(int argc, char **argv) {
std::condition_variable done_signal;
bool done = false;

session.get(
keyexpr, "",
[&m, &done, &done_signal](Reply reply) {
if (reply.check()) {
auto result = reply.get();
if (auto sample = std::get_if<Sample>(&result)) {
std::cout << "Received ('" << sample->get_keyexpr().as_string_view() << "' : '"
<< sample->get_payload().as_string_view() << "')\n";
} else if (auto error = std::get_if<ErrorMessage>(&result)) {
std::cout << "Received an error :" << error->as_string_view() << "\n";
}
} else {
// No more replies
std::lock_guard lock(m);
done = true;
done_signal.notify_all();
}
},
opts);
auto on_reply = [](Reply &&reply) {
auto result = reply.get();
if (auto sample = std::get_if<Sample>(&result)) {
std::cout << "Received ('" << sample->get_keyexpr().as_string_view() << "' : '"
<< sample->get_payload().as_string_view() << "')\n";
} else if (auto error = std::get_if<ErrorMessage>(&result)) {
std::cout << "Received an error :" << error->as_string_view() << "\n";
}
};

auto on_done = [&m, &done, &done_signal]() {
std::lock_guard lock(m);
done = true;
done_signal.notify_all();
};

session.get(keyexpr, "", {on_reply, on_done}, opts);

std::unique_lock lock(m);
done_signal.wait(lock, [&done] { return done; });
Expand Down
6 changes: 3 additions & 3 deletions examples/universal/z_info.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
#include "zenoh.hxx"
using namespace zenoh;

void print_zid(const Id* id) {
if (id) std::cout << id << std::endl;
void print_zid(const Id& id) {
std::cout << id << std::endl;
}

int _main(int argc, char** argv) {
Expand Down Expand Up @@ -48,7 +48,7 @@ int _main(int argc, char** argv) {

auto self_id = session.info_zid();
printf("own id: ");
print_zid(&self_id);
print_zid(self_id);

printf("routers ids:\n");
session.info_routers_zid(print_zid);
Expand Down
6 changes: 2 additions & 4 deletions examples/universal/z_ping.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,8 @@ int _main(int argc, char** argv) {
auto session = std::get<Session>(open(std::move(config)));

auto sub = std::get<Subscriber>(
session.declare_subscriber("test/pong", std::move([&condvar](const Sample* sample) mutable {
if (sample) {
condvar.notify_one();
}
session.declare_subscriber("test/pong", std::move([&condvar](const Sample&) mutable {
condvar.notify_one();
})));
auto pub = std::get<Publisher>(session.declare_publisher("test/ping"));
std::vector<char> data(args.size);
Expand Down
6 changes: 2 additions & 4 deletions examples/universal/z_pong.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,8 @@ int _main(int argc, char **argv) {

auto pub = std::get<Publisher>(session.declare_publisher("test/pong"));
auto sub = std::get<Subscriber>(
session.declare_subscriber("test/ping", std::move([pub = std::move(pub)](const Sample *sample) mutable {
if (sample) {
pub.put(sample->get_payload());
}
session.declare_subscriber("test/ping", std::move([pub = std::move(pub)](const Sample& sample) mutable {
pub.put(sample.get_payload());
})));
std::cout << "Pong ready, press any key to quit\n";
std::getchar();
Expand Down
14 changes: 5 additions & 9 deletions examples/universal/z_pull.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,10 @@ using namespace zenoh;

const char *kind_to_str(SampleKind kind);

void data_handler(const Sample *sample) {
if (sample) {
std::cout << ">> [Subscriber] Received " << kind_to_str(sample->get_kind()) << " ('"
<< sample->get_keyexpr().as_string_view() << "' : '" << sample->get_payload().as_string_view()
<< "')\n";
}
void data_handler(const Sample& sample) {
std::cout << ">> [Subscriber] Received " << kind_to_str(sample.get_kind()) << " ('"
<< sample.get_keyexpr().as_string_view() << "' : '" << sample.get_payload().as_string_view()
<< "')\n";
}

int _main(int argc, char **argv) {
Expand Down Expand Up @@ -62,10 +60,8 @@ int _main(int argc, char **argv) {
printf("Opening session...\n");
auto session = std::get<Session>(open(std::move(config)));

auto callback = ClosureSample(data_handler);

printf("Declaring Subscriber on '%s'...\n", expr);
auto subscriber = std::get<PullSubscriber>(session.declare_pull_subscriber(keyexpr, std::move(callback)));
auto subscriber = std::get<PullSubscriber>(session.declare_pull_subscriber(keyexpr, data_handler));

printf("Press <enter> to pull data...\n");
char c = 0;
Expand Down
28 changes: 14 additions & 14 deletions examples/universal/z_queryable.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -68,20 +68,20 @@ int _main(int argc, char **argv) {

printf("Declaring Queryable on '%s'...\n", expr);

auto queryable = std::get<Queryable>(session.declare_queryable(keyexpr, [](const Query *query) {
if (query) {
auto keystr = query->get_keyexpr();
auto pred = query->get_parameters();
auto query_value = query->get_value();
std::cout << ">> [Queryable ] Received Query '" << keystr.as_string_view() << "?" << pred.as_string_view()
<< "' value = '" << query_value.as_string_view() << "'\n";
QueryReplyOptions options;
options.set_encoding(Encoding(Z_ENCODING_PREFIX_TEXT_PLAIN));
query->reply(expr, value, options);
} else {
std::cout << "Destroying queryable\n";
}
}));
auto on_query = [](const Query &query) {
auto keystr = query.get_keyexpr();
auto pred = query.get_parameters();
auto query_value = query.get_value();
std::cout << ">> [Queryable ] Received Query '" << keystr.as_string_view() << "?" << pred.as_string_view()
<< "' value = '" << query_value.as_string_view() << "'\n";
QueryReplyOptions options;
options.set_encoding(Encoding(Z_ENCODING_PREFIX_TEXT_PLAIN));
query.reply(expr, value, options);
};

auto on_drop_queryable = []() { std::cout << "Destroying queryable\n"; };

auto queryable = std::get<Queryable>(session.declare_queryable(keyexpr, {on_query, on_drop_queryable}));

printf("Enter 'q' to quit...\n");
char c = 0;
Expand Down
31 changes: 16 additions & 15 deletions examples/universal/z_scout.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -79,21 +79,22 @@ int _main(int argc, char **argv) {
std::condition_variable done_signal;
bool done = false;

scout(std::move(config.create_scouting_config()), [&m, &done, &done_signal, &count](Hello hello) {
if (hello.check()) {
printhello(hello);
std::cout << std::endl;
count++;
} else {
std::cout << "Dropping scout\n";
if (!count) std::cout << "Did not find any zenoh process.\n";
std::lock_guard lock(m);
done = true;
done_signal.notify_all();
}
});

std::cout << "Scout started" << std::endl;
auto on_hello = [&m, &done, &done_signal, &count](Hello hello) {
printhello(hello);
std::cout << std::endl;
count++;
};

auto on_end_scouting = [&m, &done, &done_signal, &count]() {
if (!count) std::cout << "Did not find any zenoh process.\n";
std::lock_guard lock(m);
done = true;
done_signal.notify_all();
};

std::cout << "Scout starting" << std::endl;

scout(std::move(config.create_scouting_config()), {on_hello, on_end_scouting});

std::unique_lock lock(m);
done_signal.wait(lock, [&done] { return done; });
Expand Down
10 changes: 4 additions & 6 deletions examples/universal/z_sub.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,10 @@ using namespace zenoh;

const char *kind_to_str(z_sample_kind_t kind);

void data_handler(const Sample *sample) {
if (sample) {
std::cout << ">> [Subscriber] Received " << kind_to_str(sample->get_kind()) << " ('"
<< sample->get_keyexpr().as_string_view() << "' : '" << sample->get_payload().as_string_view()
<< "')\n";
}
void data_handler(const Sample& sample) {
std::cout << ">> [Subscriber] Received " << kind_to_str(sample.get_kind()) << " ('"
<< sample.get_keyexpr().as_string_view() << "' : '" << sample.get_payload().as_string_view()
<< "')\n";
}

int _main(int argc, char **argv) {
Expand Down
32 changes: 15 additions & 17 deletions examples/universal/z_sub_thr.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,25 @@ struct Stats {
volatile clock_t end = 0;
volatile clock_t first_start = 0;

void operator()(const Sample *sample) {
if (sample) {
if (count == 0) {
start = clock();
if (!first_start) {
first_start = start;
}
count++;
} else if (count < N) {
count++;
} else {
stop = clock();
finished_rounds++;
printf("%f msg/s\n", N * (double)CLOCKS_PER_SEC / (double)(stop - start));
count = 0;
void operator()(const Sample &sample) {
if (count == 0) {
start = clock();
if (!first_start) {
first_start = start;
}
count++;
} else if (count < N) {
count++;
} else {
end = clock();
stop = clock();
finished_rounds++;
printf("%f msg/s\n", N * (double)CLOCKS_PER_SEC / (double)(stop - start));
count = 0;
}
}

void operator()() { end = clock(); }

void print() const {
const double elapsed = (double)(end - first_start) / (double)CLOCKS_PER_SEC;
const unsigned long sent_messages = N * finished_rounds + count;
Expand Down Expand Up @@ -83,7 +81,7 @@ int _main(int argc, char **argv) {
KeyExpr keyexpr = session.declare_keyexpr("test/thr");

Stats stats;
auto subscriber = std::get<Subscriber>(session.declare_subscriber(keyexpr, stats));
auto subscriber = std::get<Subscriber>(session.declare_subscriber(keyexpr, {stats, stats}));
char c = 0;
while (c != 'q') {
c = fgetc(stdin);
Expand Down
6 changes: 3 additions & 3 deletions include/zenohcxx/api.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -815,17 +815,17 @@ typedef ClosureMoveParam<::z_owned_closure_reply_t, ::z_owned_reply_t, z::Reply>
//
// Represents the query closure.
//
typedef ClosureConstPtrParam<::z_owned_closure_query_t, ::z_query_t, z::Query> ClosureQuery;
typedef ClosureConstRefParam<::z_owned_closure_query_t, ::z_query_t, z::Query> ClosureQuery;

//
// Represents the sample closure.
//
typedef ClosureConstPtrParam<::z_owned_closure_sample_t, ::z_sample_t, z::Sample> ClosureSample;
typedef ClosureConstRefParam<::z_owned_closure_sample_t, ::z_sample_t, z::Sample> ClosureSample;

//
// Represents the zenoh ID closure.
//
typedef ClosureConstPtrParam<::z_owned_closure_zid_t, ::z_id_t, z::Id> ClosureZid;
typedef ClosureConstRefParam<::z_owned_closure_zid_t, ::z_id_t, z::Id> ClosureZid;

//
// Represents the scouting closure
Expand Down
Loading

0 comments on commit 08294b9

Please sign in to comment.