From 1809191f65824fe10bffa5409222d5dc37755459 Mon Sep 17 00:00:00 2001 From: Serguei Date: Thu, 28 May 2020 00:42:57 -0400 Subject: [PATCH] update test.cxx --- src/messaging.cpp | 14 +++++------ src/messaging.h | 13 ++++++---- src/test.cxx | 61 ++++++++++------------------------------------- 3 files changed, 28 insertions(+), 60 deletions(-) diff --git a/src/messaging.cpp b/src/messaging.cpp index a88c58f..3bdc67a 100644 --- a/src/messaging.cpp +++ b/src/messaging.cpp @@ -12,7 +12,7 @@ namespace msg { static zmq::context_t ctx(0); - void BusProxy(zmq::context_t* ctx) + void BusProxy(zmq::context_t* ctx, on_ready_fn on_ready) { // create sub/pub zmq::socket_t subscriber(*ctx, zmq::socket_type::sub); @@ -23,12 +23,7 @@ namespace msg publisher.bind("inproc://bus-out"); // connect to creator and tell them we're ready - zmq::socket_t push(*ctx, zmq::socket_type::push); - push.connect("inproc://bus-status"); - push.send(zmq::buffer(msg::BUS_CREATED)); - - // disconnect - push.close(); + on_ready(); // receive and send repeatedly // TODO: Change to proxy_steerable so we can remotely shut it down @@ -38,6 +33,11 @@ namespace msg subscriber.close(); publisher.close(); } + + std::future launch_bus_proxy(zmq::context_t& ctx) + { + return launch_thread_wait_until_ready(ctx, BusProxy); + } } // diff --git a/src/messaging.h b/src/messaging.h index 42da37b..981e8ff 100644 --- a/src/messaging.h +++ b/src/messaging.h @@ -15,7 +15,8 @@ namespace msg { extern zmq::context_t ctx; - using notifier_thread = std::function)>; + using on_ready_fn = std::function; + using notifier_thread = std::function; const std::string READY = "READY"; const std::string BUS_CREATED = "BUS_CREATED"; @@ -51,17 +52,18 @@ namespace msg return out.str(); } - inline std::future launch_thread_wait_until_ready(zmq::context_t& ctx, zmq::socket_t& listener, notifier_thread thread) + inline std::future launch_thread_wait_until_ready(zmq::context_t& ctx, notifier_thread thread) { // Listen to bus start std::string unique_addr = gen_unique_addr(); zmq::socket_t pull(ctx, zmq::socket_type::pull); pull.bind(unique_addr); - std::function on_complete = [&]() { + on_ready_fn on_complete = [&]() { zmq::socket_t push(ctx, zmq::socket_type::push); push.connect(unique_addr); push.send(zmq::buffer(msg::READY)); + push.close(); }; // Launch @@ -69,13 +71,16 @@ namespace msg // Wait for response std::vector recv_msgs; - auto res = zmq::recv_multipart(listener, std::back_inserter(recv_msgs)); + auto res = zmq::recv_multipart(pull, std::back_inserter(recv_msgs)); assert(res); assert(recv_msgs[0].to_string_view() == msg::READY); + pull.close(); // Done return result; } + + std::future launch_bus_proxy(zmq::context_t& ctx); } // //int main() diff --git a/src/test.cxx b/src/test.cxx index 0b6bc40..8118d89 100644 --- a/src/test.cxx +++ b/src/test.cxx @@ -24,7 +24,7 @@ std::string multi_to_str(const std::vector& resp) return s.str(); } -void MeshGenThread(zmq::context_t* ctx) +void MeshGenThread(zmq::context_t* ctx, msg::on_ready_fn on_ready) { // Connect to bus zmq::socket_t pub(*ctx, zmq::socket_type::pub); @@ -43,9 +43,7 @@ void MeshGenThread(zmq::context_t* ctx) sub.connect("inproc://bus-out"); // Prove you're connected - zmq::socket_t push(*ctx, zmq::socket_type::push); - push.connect("inproc://bus-status"); - push.send(zmq::buffer(msg::CONNECTED_TO_BUS)); + on_ready(); // Wait for START signal before running while (true) @@ -93,7 +91,7 @@ void MeshGenThread(zmq::context_t* ctx) } } -void BusListener(zmq::context_t* ctx) +void BusListener(zmq::context_t* ctx, msg::on_ready_fn on_ready) { int idx = 0; @@ -103,9 +101,7 @@ void BusListener(zmq::context_t* ctx) sock.connect("inproc://bus-out"); // Prove you're connected - zmq::socket_t push(*ctx, zmq::socket_type::push); - push.connect("inproc://bus-status"); - push.send(zmq::buffer(msg::CONNECTED_TO_BUS)); + on_ready(); // Receive messages while (true) @@ -124,7 +120,7 @@ void BusListener(zmq::context_t* ctx) } } -void BusSender(zmq::context_t* ctx) +void BusSender(zmq::context_t* ctx, msg::on_ready_fn on_ready) { int idx = 0; @@ -133,9 +129,7 @@ void BusSender(zmq::context_t* ctx) sock.connect("inproc://bus-in"); // Prove you're connected - zmq::socket_t push(*ctx, zmq::socket_type::push); - push.connect("inproc://bus-status"); - push.send(zmq::buffer(msg::CONNECTED_TO_BUS)); + on_ready(); // Send messages while (true) @@ -168,7 +162,7 @@ void BusSender(zmq::context_t* ctx) namespace { - void BusProxy_old(zmq::context_t* ctx) + void BusProxy(zmq::context_t* ctx, msg::on_ready_fn on_ready) { // create sub/pub zmq::socket_t subscriber(*ctx, zmq::socket_type::sub); @@ -179,12 +173,7 @@ namespace publisher.bind("inproc://bus-out"); // connect to creator and tell them we're ready - zmq::socket_t push(*ctx, zmq::socket_type::push); - push.connect("inproc://bus-status"); - push.send(zmq::buffer(msg::BUS_CREATED)); - - // disconnect - push.close(); + on_ready(); // receive and send repeatedly // TODO: Change to proxy_steerable so we can remotely shut it down @@ -196,39 +185,13 @@ namespace } } - -std::future launch_thread_wait_until_ready(zmq::context_t& ctx, zmq::socket_t& listener, std::function thread) -{ - // Launch - std::future result = std::async(std::launch::async, thread, &ctx); - - // Wait for response - std::vector recv_msgs; - auto res = zmq::recv_multipart(listener, std::back_inserter(recv_msgs)); - assert(res); - - // Just in case - assert(recv_msgs[0].to_string_view() == msg::CONNECTED_TO_BUS); - - return result; -} - int main() { // Create context zmq::context_t ctx(0); - // Listen to bus start - zmq::socket_t pull(ctx, zmq::socket_type::pull); - pull.bind("inproc://bus-status"); - // Start bus - auto thread1 = std::async(std::launch::async, BusProxy_old, &ctx); - - // Wait for it to start - zmq::message_t msg; - auto ret = pull.recv(msg); - assert(ret && msg.to_string_view() == msg::BUS_CREATED); + auto thread1 = msg::launch_thread_wait_until_ready(ctx, BusProxy); // Wew! OutputDebugString("Successfully launched bus!\n"); @@ -237,7 +200,7 @@ int main() std::vector> meshers; for (int i = 0; i < 1; i++) { - meshers.push_back(launch_thread_wait_until_ready(ctx, pull, MeshGenThread)); + meshers.push_back(msg::launch_thread_wait_until_ready(ctx, MeshGenThread)); } // Create senders and listeners @@ -245,8 +208,8 @@ int main() std::vector> senders; for (int i = 0; i < 1; i++) { - listeners.push_back(launch_thread_wait_until_ready(ctx, pull, BusListener)); - senders.push_back(launch_thread_wait_until_ready(ctx, pull, BusSender)); + listeners.push_back(msg::launch_thread_wait_until_ready(ctx, BusListener)); + senders.push_back(msg::launch_thread_wait_until_ready(ctx, BusSender)); } OutputDebugString("Launched listeners.\n");