Skip to content

Commit

Permalink
update test.cxx
Browse files Browse the repository at this point in the history
  • Loading branch information
serg06 committed May 28, 2020
1 parent ce0a739 commit 1809191
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 60 deletions.
14 changes: 7 additions & 7 deletions src/messaging.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace msg
{
static zmq::context_t ctx(0);

void BusProxy(zmq::context_t* ctx)
void BusProxy(zmq::context_t* ctx, on_ready_fn on_ready)
{
// create sub/pub
zmq::socket_t subscriber(*ctx, zmq::socket_type::sub);
Expand All @@ -23,12 +23,7 @@ namespace msg
publisher.bind("inproc://bus-out");

// connect to creator and tell them we're ready
zmq::socket_t push(*ctx, zmq::socket_type::push);
push.connect("inproc://bus-status");
push.send(zmq::buffer(msg::BUS_CREATED));

// disconnect
push.close();
on_ready();

// receive and send repeatedly
// TODO: Change to proxy_steerable so we can remotely shut it down
Expand All @@ -38,6 +33,11 @@ namespace msg
subscriber.close();
publisher.close();
}

std::future<void> launch_bus_proxy(zmq::context_t& ctx)
{
return launch_thread_wait_until_ready(ctx, BusProxy);
}
}

//
Expand Down
13 changes: 9 additions & 4 deletions src/messaging.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ namespace msg
{
extern zmq::context_t ctx;

using notifier_thread = std::function<void(zmq::context_t*, std::function<void()>)>;
using on_ready_fn = std::function<void()>;
using notifier_thread = std::function<void(zmq::context_t*, on_ready_fn)>;

const std::string READY = "READY";
const std::string BUS_CREATED = "BUS_CREATED";
Expand Down Expand Up @@ -51,31 +52,35 @@ namespace msg
return out.str();
}

inline std::future<void> launch_thread_wait_until_ready(zmq::context_t& ctx, zmq::socket_t& listener, notifier_thread thread)
inline std::future<void> launch_thread_wait_until_ready(zmq::context_t& ctx, notifier_thread thread)
{
// Listen to bus start
std::string unique_addr = gen_unique_addr();
zmq::socket_t pull(ctx, zmq::socket_type::pull);
pull.bind(unique_addr);

std::function<void()> on_complete = [&]() {
on_ready_fn on_complete = [&]() {
zmq::socket_t push(ctx, zmq::socket_type::push);
push.connect(unique_addr);
push.send(zmq::buffer(msg::READY));
push.close();
};

// Launch
std::future<void> result = std::async(std::launch::async, thread, &ctx, on_complete);

// Wait for response
std::vector<zmq::message_t> recv_msgs;
auto res = zmq::recv_multipart(listener, std::back_inserter(recv_msgs));
auto res = zmq::recv_multipart(pull, std::back_inserter(recv_msgs));
assert(res);
assert(recv_msgs[0].to_string_view() == msg::READY);
pull.close();

// Done
return result;
}

std::future<void> launch_bus_proxy(zmq::context_t& ctx);
}
//
//int main()
Expand Down
61 changes: 12 additions & 49 deletions src/test.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ std::string multi_to_str(const std::vector<zmq::message_t>& resp)
return s.str();
}

void MeshGenThread(zmq::context_t* ctx)
void MeshGenThread(zmq::context_t* ctx, msg::on_ready_fn on_ready)
{
// Connect to bus
zmq::socket_t pub(*ctx, zmq::socket_type::pub);
Expand All @@ -43,9 +43,7 @@ void MeshGenThread(zmq::context_t* ctx)
sub.connect("inproc://bus-out");

// Prove you're connected
zmq::socket_t push(*ctx, zmq::socket_type::push);
push.connect("inproc://bus-status");
push.send(zmq::buffer(msg::CONNECTED_TO_BUS));
on_ready();

// Wait for START signal before running
while (true)
Expand Down Expand Up @@ -93,7 +91,7 @@ void MeshGenThread(zmq::context_t* ctx)
}
}

void BusListener(zmq::context_t* ctx)
void BusListener(zmq::context_t* ctx, msg::on_ready_fn on_ready)
{
int idx = 0;

Expand All @@ -103,9 +101,7 @@ void BusListener(zmq::context_t* ctx)
sock.connect("inproc://bus-out");

// Prove you're connected
zmq::socket_t push(*ctx, zmq::socket_type::push);
push.connect("inproc://bus-status");
push.send(zmq::buffer(msg::CONNECTED_TO_BUS));
on_ready();

// Receive messages
while (true)
Expand All @@ -124,7 +120,7 @@ void BusListener(zmq::context_t* ctx)
}
}

void BusSender(zmq::context_t* ctx)
void BusSender(zmq::context_t* ctx, msg::on_ready_fn on_ready)
{
int idx = 0;

Expand All @@ -133,9 +129,7 @@ void BusSender(zmq::context_t* ctx)
sock.connect("inproc://bus-in");

// Prove you're connected
zmq::socket_t push(*ctx, zmq::socket_type::push);
push.connect("inproc://bus-status");
push.send(zmq::buffer(msg::CONNECTED_TO_BUS));
on_ready();

// Send messages
while (true)
Expand Down Expand Up @@ -168,7 +162,7 @@ void BusSender(zmq::context_t* ctx)

namespace
{
void BusProxy_old(zmq::context_t* ctx)
void BusProxy(zmq::context_t* ctx, msg::on_ready_fn on_ready)
{
// create sub/pub
zmq::socket_t subscriber(*ctx, zmq::socket_type::sub);
Expand All @@ -179,12 +173,7 @@ namespace
publisher.bind("inproc://bus-out");

// connect to creator and tell them we're ready
zmq::socket_t push(*ctx, zmq::socket_type::push);
push.connect("inproc://bus-status");
push.send(zmq::buffer(msg::BUS_CREATED));

// disconnect
push.close();
on_ready();

// receive and send repeatedly
// TODO: Change to proxy_steerable so we can remotely shut it down
Expand All @@ -196,39 +185,13 @@ namespace
}
}


std::future<void> launch_thread_wait_until_ready(zmq::context_t& ctx, zmq::socket_t& listener, std::function<void(zmq::context_t*)> thread)
{
// Launch
std::future<void> result = std::async(std::launch::async, thread, &ctx);

// Wait for response
std::vector<zmq::message_t> recv_msgs;
auto res = zmq::recv_multipart(listener, std::back_inserter(recv_msgs));
assert(res);

// Just in case
assert(recv_msgs[0].to_string_view() == msg::CONNECTED_TO_BUS);

return result;
}

int main()
{
// Create context
zmq::context_t ctx(0);

// Listen to bus start
zmq::socket_t pull(ctx, zmq::socket_type::pull);
pull.bind("inproc://bus-status");

// Start bus
auto thread1 = std::async(std::launch::async, BusProxy_old, &ctx);

// Wait for it to start
zmq::message_t msg;
auto ret = pull.recv(msg);
assert(ret && msg.to_string_view() == msg::BUS_CREATED);
auto thread1 = msg::launch_thread_wait_until_ready(ctx, BusProxy);

// Wew!
OutputDebugString("Successfully launched bus!\n");
Expand All @@ -237,16 +200,16 @@ int main()
std::vector<std::future<void>> meshers;
for (int i = 0; i < 1; i++)
{
meshers.push_back(launch_thread_wait_until_ready(ctx, pull, MeshGenThread));
meshers.push_back(msg::launch_thread_wait_until_ready(ctx, MeshGenThread));
}

// Create senders and listeners
std::vector<std::future<void>> listeners;
std::vector<std::future<void>> senders;
for (int i = 0; i < 1; i++)
{
listeners.push_back(launch_thread_wait_until_ready(ctx, pull, BusListener));
senders.push_back(launch_thread_wait_until_ready(ctx, pull, BusSender));
listeners.push_back(msg::launch_thread_wait_until_ready(ctx, BusListener));
senders.push_back(msg::launch_thread_wait_until_ready(ctx, BusSender));
}

OutputDebugString("Launched listeners.\n");
Expand Down

0 comments on commit 1809191

Please sign in to comment.