Skip to content

Commit

Permalink
Add support boost asio
Browse files Browse the repository at this point in the history
  • Loading branch information
fantasy-peak committed May 15, 2024
1 parent 709b735 commit e133e15
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 29 deletions.
18 changes: 9 additions & 9 deletions out/bi_web/include/fantasy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1326,13 +1326,13 @@ class StreamClient final {
[] {});
auto channel_ptr =
std::make_shared<asio::experimental::concurrent_channel<void(
asio::error_code, std::string)>>(m_pool_ptr->getIoContext(),
frpc::error_code, std::string)>>(m_pool_ptr->getIoContext(),
m_config.channel_size);
{
std::function<void(std::tuple<std::string>)> func =
[channel_ptr, this](std::tuple<std::string> tp) mutable {
auto& [reply] = tp;
if (!channel_ptr->try_send(asio::error_code{},
if (!channel_ptr->try_send(frpc::error_code{},
std::move(reply)))
m_error(FRPC_ERROR_FORMAT(
"Failed to store message to channel!!!"));
Expand Down Expand Up @@ -1433,20 +1433,20 @@ class StreamClient final {
struct StreamServerHandler {
virtual void hello_world(
std::shared_ptr<asio::experimental::concurrent_channel<
void(asio::error_code, std::string)>>,
void(frpc::error_code, std::string)>>,
std::shared_ptr<frpc::Stream<void(std::string)>>) noexcept = 0;
};

struct CoroStreamServerHandler {
#ifdef __cpp_impl_coroutine
virtual asio::awaitable<void> hello_world(
std::shared_ptr<asio::experimental::concurrent_channel<
void(asio::error_code, std::string)>>,
void(frpc::error_code, std::string)>>,
std::shared_ptr<frpc::Stream<void(std::string)>>) noexcept = 0;
#else
virtual void hello_world(
std::shared_ptr<asio::experimental::concurrent_channel<
void(asio::error_code, std::string)>>,
void(frpc::error_code, std::string)>>,
std::shared_ptr<frpc::Stream<void(std::string)>>) noexcept = 0;
#endif
};
Expand Down Expand Up @@ -1580,26 +1580,26 @@ class StreamServer final {
recv_bufs[2].data(), recv_bufs[2].size());
auto& [bank_name] = tp;
std::shared_ptr<asio::experimental::concurrent_channel<
void(asio::error_code, std::string)>>
void(frpc::error_code, std::string)>>
channel_ptr;
{
std::lock_guard lk(m_mtx);
if (m_channel_mapping.contains(req_id)) {
channel_ptr = std::any_cast<decltype(channel_ptr)>(
m_channel_mapping[req_id]);
if (!channel_ptr->try_send(asio::error_code{},
if (!channel_ptr->try_send(frpc::error_code{},
std::move(bank_name)))
m_error(FRPC_ERROR_FORMAT(
"Failed to store message to channel!!!"));
return;
}
channel_ptr = std::make_shared<
asio::experimental::concurrent_channel<
void(asio::error_code, std::string)>>(
void(frpc::error_code, std::string)>>(
m_pool_ptr->getIoContext(), m_config.channel_size);
m_channel_mapping[req_id] = channel_ptr;
}
if (!channel_ptr->try_send(asio::error_code{},
if (!channel_ptr->try_send(frpc::error_code{},
std::move(bank_name)))
m_error(FRPC_ERROR_FORMAT(
"Failed to store message to channel!!!"));
Expand Down
14 changes: 14 additions & 0 deletions out/bi_web/include/impl/asio_context_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,23 @@
#ifndef _FRPC_CONTEXT_POOL_H_
#define _FRPC_CONTEXT_POOL_H_

#ifdef FRPC_USE_BOOST_ASIO
#include <boost/asio.hpp>
#include <boost/asio/experimental/concurrent_channel.hpp>
namespace asio = boost::asio;

namespace frpc {
using error_code = boost::system::error_code;
}
#else
#include <asio.hpp>
#include <asio/experimental/concurrent_channel.hpp>

namespace frpc {
using error_code = asio::error_code;
}
#endif

namespace frpc {

class ContextPool final {
Expand Down
18 changes: 9 additions & 9 deletions out/include/fantasy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1326,13 +1326,13 @@ class StreamClient final {
[] {});
auto channel_ptr =
std::make_shared<asio::experimental::concurrent_channel<void(
asio::error_code, std::string)>>(m_pool_ptr->getIoContext(),
frpc::error_code, std::string)>>(m_pool_ptr->getIoContext(),
m_config.channel_size);
{
std::function<void(std::tuple<std::string>)> func =
[channel_ptr, this](std::tuple<std::string> tp) mutable {
auto& [reply] = tp;
if (!channel_ptr->try_send(asio::error_code{},
if (!channel_ptr->try_send(frpc::error_code{},
std::move(reply)))
m_error(FRPC_ERROR_FORMAT(
"Failed to store message to channel!!!"));
Expand Down Expand Up @@ -1433,20 +1433,20 @@ class StreamClient final {
struct StreamServerHandler {
virtual void hello_world(
std::shared_ptr<asio::experimental::concurrent_channel<
void(asio::error_code, std::string)>>,
void(frpc::error_code, std::string)>>,
std::shared_ptr<frpc::Stream<void(std::string)>>) noexcept = 0;
};

struct CoroStreamServerHandler {
#ifdef __cpp_impl_coroutine
virtual asio::awaitable<void> hello_world(
std::shared_ptr<asio::experimental::concurrent_channel<
void(asio::error_code, std::string)>>,
void(frpc::error_code, std::string)>>,
std::shared_ptr<frpc::Stream<void(std::string)>>) noexcept = 0;
#else
virtual void hello_world(
std::shared_ptr<asio::experimental::concurrent_channel<
void(asio::error_code, std::string)>>,
void(frpc::error_code, std::string)>>,
std::shared_ptr<frpc::Stream<void(std::string)>>) noexcept = 0;
#endif
};
Expand Down Expand Up @@ -1580,26 +1580,26 @@ class StreamServer final {
recv_bufs[2].data(), recv_bufs[2].size());
auto& [bank_name] = tp;
std::shared_ptr<asio::experimental::concurrent_channel<
void(asio::error_code, std::string)>>
void(frpc::error_code, std::string)>>
channel_ptr;
{
std::lock_guard lk(m_mtx);
if (m_channel_mapping.contains(req_id)) {
channel_ptr = std::any_cast<decltype(channel_ptr)>(
m_channel_mapping[req_id]);
if (!channel_ptr->try_send(asio::error_code{},
if (!channel_ptr->try_send(frpc::error_code{},
std::move(bank_name)))
m_error(FRPC_ERROR_FORMAT(
"Failed to store message to channel!!!"));
return;
}
channel_ptr = std::make_shared<
asio::experimental::concurrent_channel<
void(asio::error_code, std::string)>>(
void(frpc::error_code, std::string)>>(
m_pool_ptr->getIoContext(), m_config.channel_size);
m_channel_mapping[req_id] = channel_ptr;
}
if (!channel_ptr->try_send(asio::error_code{},
if (!channel_ptr->try_send(frpc::error_code{},
std::move(bank_name)))
m_error(FRPC_ERROR_FORMAT(
"Failed to store message to channel!!!"));
Expand Down
14 changes: 14 additions & 0 deletions out/include/impl/asio_context_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,23 @@
#ifndef _FRPC_CONTEXT_POOL_H_
#define _FRPC_CONTEXT_POOL_H_

#ifdef FRPC_USE_BOOST_ASIO
#include <boost/asio.hpp>
#include <boost/asio/experimental/concurrent_channel.hpp>
namespace asio = boost::asio;

namespace frpc {
using error_code = boost::system::error_code;
}
#else
#include <asio.hpp>
#include <asio/experimental/concurrent_channel.hpp>

namespace frpc {
using error_code = asio::error_code;
}
#endif

namespace frpc {

class ContextPool final {
Expand Down
18 changes: 9 additions & 9 deletions template/cpp/bi_stream.inja
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,13 @@ public:
m_channel->send(std::move(snd_bufs));
},
[] {});
auto channel_ptr = std::make_shared<asio::experimental::concurrent_channel<void(asio::error_code, {{_format_args_type(func.outputs)}})>>(
auto channel_ptr = std::make_shared<asio::experimental::concurrent_channel<void(frpc::error_code, {{_format_args_type(func.outputs)}})>>(
m_pool_ptr->getIoContext(),
m_config.channel_size);
{
std::function<void(std::tuple<{{_format_args_type(func.outputs)}}>)> func = [channel_ptr, this](std::tuple<{{_format_args_type(func.outputs)}}> tp) mutable {
auto& [{{_format_args_name(func.outputs)}}] = tp;
if (!channel_ptr->try_send(asio::error_code{}, {{_format_args_name_and_move(func.outputs)}}))
if (!channel_ptr->try_send(frpc::error_code{}, {{_format_args_name_and_move(func.outputs)}}))
m_error(FRPC_ERROR_FORMAT("Failed to store message to channel!!!"));
};
std::lock_guard lk(m_mtx);
Expand Down Expand Up @@ -179,18 +179,18 @@ private:

struct {{value.callee}}Handler {
{% for func in value.definitions %}
virtual void {{func.func_name}}(std::shared_ptr<asio::experimental::concurrent_channel<void(asio::error_code, {{_format_args_type(func.inputs)}})>>,
virtual void {{func.func_name}}(std::shared_ptr<asio::experimental::concurrent_channel<void(frpc::error_code, {{_format_args_type(func.inputs)}})>>,
std::shared_ptr<frpc::Stream<void({{_format_args_type(func.outputs)}})>>) noexcept = 0;
{% endfor %}
};

struct Coro{{value.callee}}Handler {
{% for func in value.definitions %}
#ifdef __cpp_impl_coroutine
virtual asio::awaitable<void> {{func.func_name}}(std::shared_ptr<asio::experimental::concurrent_channel<void(asio::error_code, {{_format_args_type(func.inputs)}})>>,
virtual asio::awaitable<void> {{func.func_name}}(std::shared_ptr<asio::experimental::concurrent_channel<void(frpc::error_code, {{_format_args_type(func.inputs)}})>>,
std::shared_ptr<frpc::Stream<void({{_format_args_type(func.outputs)}})>>) noexcept = 0;
#else
virtual void {{func.func_name}}(std::shared_ptr<asio::experimental::concurrent_channel<void(asio::error_code, {{_format_args_type(func.inputs)}})>>,
virtual void {{func.func_name}}(std::shared_ptr<asio::experimental::concurrent_channel<void(frpc::error_code, {{_format_args_type(func.inputs)}})>>,
std::shared_ptr<frpc::Stream<void({{_format_args_type(func.outputs)}})>>) noexcept = 0;
#endif
{% endfor %}
Expand Down Expand Up @@ -291,21 +291,21 @@ private:
case {{value.caller}}{{value.callee}}::{{func.func_name}}: {
auto tp = frpc::unpack<std::tuple<{{_format_args_type(func.inputs)}}>>(recv_bufs[2].data(), recv_bufs[2].size());
auto& [{{_format_args_name(func.inputs)}}] = tp;
std::shared_ptr<asio::experimental::concurrent_channel<void(asio::error_code, {{_format_args_type(func.inputs)}})>> channel_ptr;
std::shared_ptr<asio::experimental::concurrent_channel<void(frpc::error_code, {{_format_args_type(func.inputs)}})>> channel_ptr;
{
std::lock_guard lk(m_mtx);
if (m_channel_mapping.contains(req_id)) {
channel_ptr = std::any_cast<decltype(channel_ptr)>(m_channel_mapping[req_id]);
if (!channel_ptr->try_send(asio::error_code{}, {{_format_args_name_and_move(func.inputs)}}))
if (!channel_ptr->try_send(frpc::error_code{}, {{_format_args_name_and_move(func.inputs)}}))
m_error(FRPC_ERROR_FORMAT("Failed to store message to channel!!!"));
return;
}
channel_ptr = std::make_shared<asio::experimental::concurrent_channel<void(asio::error_code, {{_format_args_type(func.inputs)}})>>(
channel_ptr = std::make_shared<asio::experimental::concurrent_channel<void(frpc::error_code, {{_format_args_type(func.inputs)}})>>(
m_pool_ptr->getIoContext(),
m_config.channel_size);
m_channel_mapping[req_id] = channel_ptr;
}
if (!channel_ptr->try_send(asio::error_code{}, {{_format_args_name_and_move(func.inputs)}}))
if (!channel_ptr->try_send(frpc::error_code{}, {{_format_args_name_and_move(func.inputs)}}))
m_error(FRPC_ERROR_FORMAT("Failed to store message to channel!!!"));

auto is_open = std::make_tuple(req_id, req_type, false);
Expand Down
12 changes: 12 additions & 0 deletions template/cpp/impl/asio_context_pool.inja
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,20 @@
#ifndef _FRPC_CONTEXT_POOL_H_
#define _FRPC_CONTEXT_POOL_H_

#ifdef FRPC_USE_BOOST_ASIO
#include <boost/asio.hpp>
#include <boost/asio/experimental/concurrent_channel.hpp>
namespace asio = boost::asio;
namespace frpc {
using error_code = boost::system::error_code;
}
#else
#include <asio.hpp>
#include <asio/experimental/concurrent_channel.hpp>
namespace frpc {
using error_code = asio::error_code;
}
#endif

namespace frpc {

Expand Down
2 changes: 1 addition & 1 deletion test/cpp/bi_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ void start(std::function<void()> func) {

#ifdef __cpp_impl_coroutine
struct CoroStreamServerHandler final : public fantasy::CoroStreamServerHandler {
virtual asio::awaitable<void> hello_world(std::shared_ptr<asio::experimental::concurrent_channel<void(asio::error_code, std::string)>> ins,
virtual asio::awaitable<void> hello_world(std::shared_ptr<asio::experimental::concurrent_channel<void(frpc::error_code, std::string)>> ins,
std::shared_ptr<frpc::Stream<void(std::string)>> outs) noexcept override {
start([outs = std::move(outs)]() mutable {
for (int i = 0; i < 5; i++) {
Expand Down
10 changes: 9 additions & 1 deletion test/cpp/coro_bi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include <spdlog/spdlog.h>

#define FRPC_USE_BOOST_ASIO 1
#include "fantasy.hpp"

inline std::string addr{"tcp://127.0.0.1:5878"};
Expand Down Expand Up @@ -37,6 +38,13 @@ struct CoroHandler final : public fantasy::AsioCoroHelloWorldServerHandler {
fantasy::toString(bank_info), bank_name, blance, date.has_value() ? date.value() : "nullopt", frpc::toString(date_time));
fantasy::Info info;
info.name = "coro test";
boost::asio::co_spawn(
co_await boost::asio::this_coro::executor,
[]() -> boost::asio::awaitable<void> {
spdlog::info("test boost");
co_return;
},
boost::asio::detached);
cb("coro hello world", std::move(info), 556, std::nullopt);
co_return;
}
Expand Down Expand Up @@ -76,7 +84,7 @@ void start_server() {
server->monitor(
[](std::tuple<zmq_event_t, std::string> data) {
auto& [event, point] = data;
spdlog::info("bi coro server monitor: {} {}",frpc::getEventName(event.event), point);
spdlog::info("bi coro server monitor: {} {}", frpc::getEventName(event.event), point);
},
ZMQ_EVENT_ACCEPTED | ZMQ_EVENT_DISCONNECTED);
server->start();
Expand Down

0 comments on commit e133e15

Please sign in to comment.