From 1900f929e25f3cd7ea8fa2bcb93c8413faa65244 Mon Sep 17 00:00:00 2001 From: fantasy-peak <1356346239@qq.com> Date: Fri, 29 Mar 2024 16:19:16 +0800 Subject: [PATCH] Add c++ brpc --- .clang-format | 88 +++ .github/workflows/ubuntu-gcc13.yaml | 47 ++ README.md | 7 + config/config.yaml | 59 ++ include/cmdline.h | 813 +++++++++++++++++++++++ include/utils.h | 168 +++++ out/brpc.hpp | 992 ++++++++++++++++++++++++++++ src/main.cpp | 319 +++++++++ template/cpp/asio_context_pool.inja | 43 ++ template/cpp/ast.cpp.inja | 65 ++ template/cpp/bi.inja | 280 ++++++++ template/cpp/bi_channel.inja | 135 ++++ template/cpp/enum.inja | 31 + template/cpp/struct.inja | 22 + template/cpp/uni.inja | 192 ++++++ template/cpp/uni_channel.inja | 97 +++ template/cpp/utils.inja | 103 +++ test/cpp/main.cpp | 207 ++++++ test_xmake.lua | 36 + xmake.lua | 19 + 20 files changed, 3723 insertions(+) create mode 100644 .clang-format create mode 100644 .github/workflows/ubuntu-gcc13.yaml create mode 100644 README.md create mode 100644 config/config.yaml create mode 100644 include/cmdline.h create mode 100644 include/utils.h create mode 100644 out/brpc.hpp create mode 100644 src/main.cpp create mode 100644 template/cpp/asio_context_pool.inja create mode 100644 template/cpp/ast.cpp.inja create mode 100644 template/cpp/bi.inja create mode 100644 template/cpp/bi_channel.inja create mode 100644 template/cpp/enum.inja create mode 100644 template/cpp/struct.inja create mode 100644 template/cpp/uni.inja create mode 100644 template/cpp/uni_channel.inja create mode 100644 template/cpp/utils.inja create mode 100644 test/cpp/main.cpp create mode 100644 test_xmake.lua create mode 100644 xmake.lua diff --git a/.clang-format b/.clang-format new file mode 100644 index 0000000..25d11ae --- /dev/null +++ b/.clang-format @@ -0,0 +1,88 @@ +# https://blog.csdn.net/softimite_zifeng/article/details/78357898 +--- +Language: Cpp +#BasedOnStyle: Google +AccessModifierOffset: -4 +#AlignAfterOpenBracket: false +AlignAfterOpenBracket: Align +#AlignAfterOpenBracket: AlwaysBreak +AlignConsecutiveAssignments: false +AlignEscapedNewlinesLeft: true +AlignOperands: true +AlignTrailingComments: true +AllowAllParametersOfDeclarationOnNextLine: true +AllowShortBlocksOnASingleLine: false +AllowShortCaseLabelsOnASingleLine: false +AllowShortFunctionsOnASingleLine: None +AllowShortIfStatementsOnASingleLine: false +AllowShortLoopsOnASingleLine: false +AlwaysBreakAfterDefinitionReturnType: None +AlwaysBreakBeforeMultilineStrings: true +AlwaysBreakTemplateDeclarations: true +BinPackArguments: true +BinPackParameters: true +BreakBeforeBinaryOperators: None +BreakBeforeBraces: Attach +BreakBeforeTernaryOperators: true +BreakConstructorInitializersBeforeComma: true +ColumnLimit: 0 +#ColumnLimit: 300 +CommentPragmas: '^ IWYU pragma:' +#ConstructorInitializerAllOnOneLineOrOnePerLine: true +ConstructorInitializerIndentWidth: 4 +ContinuationIndentWidth: 4 +Cpp11BracedListStyle: true +DerivePointerAlignment: false +DisableFormat: false +ExperimentalAutoDetectBinPacking: false +ForEachMacros: [ foreach, Q_FOREACH, BOOST_FOREACH ] +IncludeCategories: + - Regex: '^<.*\.h>' + Priority: 1 + - Regex: '^<.*' + Priority: 2 + - Regex: '.*' + Priority: 3 +IndentCaseLabels: true +IndentWidth: 4 +IndentWrappedFunctionNames: false +KeepEmptyLinesAtTheStartOfBlocks: false +MacroBlockBegin: '' +MacroBlockEnd: '' +MaxEmptyLinesToKeep: 1 +NamespaceIndentation: None +ObjCBlockIndentWidth: 2 +ObjCSpaceAfterProperty: false +ObjCSpaceBeforeProtocolList: false +PenaltyBreakBeforeFirstCallParameter: 1 +PenaltyBreakComment: 300 +PenaltyBreakFirstLessLess: 120 +PenaltyBreakString: 1000 +PenaltyExcessCharacter: 1000000 +PenaltyReturnTypeOnItsOwnLine: 200 +PointerAlignment: Left +SpaceAfterCStyleCast: false +SpaceBeforeAssignmentOperators: true +SpaceBeforeParens: ControlStatements +SpaceInEmptyParentheses: false +SpacesBeforeTrailingComments: 1 +SpacesInAngles: false +SpacesInContainerLiterals: true +SpacesInCStyleCastParentheses: false +SpacesInParentheses: false +SpacesInSquareBrackets: false +Standard: Auto +TabWidth: 4 +# 使用tab字符: Never, ForIndentation, ForContinuationAndIndentation, Always +# UseTab: ForContinuationAndIndentation +# 在方括号的[后和]前添加空格,lamda表达式和未指明大小的数组的声明不受影响 +#SpacesInSquareBrackets: true +# 对齐续航符位置 +AlignEscapedNewlines: Left +# https://clang.llvm.org/docs/ClangFormatStyleOptions.html +AllowShortLambdasOnASingleLine: Empty +FixNamespaceComments: true +#SBS_Never: Never +#CompactNamespaces: false +#LanguageStandard: LS_Cpp17 +... diff --git a/.github/workflows/ubuntu-gcc13.yaml b/.github/workflows/ubuntu-gcc13.yaml new file mode 100644 index 0000000..06de6db --- /dev/null +++ b/.github/workflows/ubuntu-gcc13.yaml @@ -0,0 +1,47 @@ +name: ubuntu-gcc13 + +on: + push: + branches: ["main", "dev"] + pull_request: + branches: ["main"] + +jobs: + build: + runs-on: ${{ matrix.os }} + + strategy: + matrix: + os: [ubuntu-22.04] + + steps: + - name: Installation + run: | + sudo apt-get update + sudo apt-get install -y libgl1-mesa-dev libglu1-mesa-dev p7zip gobjc g++-13 wget sudo libmsgpack-dev/jammy + + - uses: xmake-io/github-action-setup-xmake@v1 + with: + xmake-version: v2.8.9 + + - name: checkout + uses: actions/checkout@v3 + - name: build + run: | + export XMAKE_ROOT="y" + g++-13 -v + export CXX=g++-13 + export CC=gcc-13 + xmake build -y + xmake install -o . + ./bin/brpc -f ./config/config.yaml -t ./template/cpp -o ./out + xmake build -v -y --file=./test_xmake.lua + xmake install -o . --file=./example_xmake.lua + ./ + + - name: build test and run test + run: | + ./bin/brpc -f ./config/config.yaml -t ./template/cpp -o ./out + xmake build -v -y --file=./test_xmake.lua + xmake install -o . --file=./example_xmake.lua + ./bin/cpp23 diff --git a/README.md b/README.md new file mode 100644 index 0000000..0c21ce6 --- /dev/null +++ b/README.md @@ -0,0 +1,7 @@ +# brpc +``` +xmake build -v -y +xmake install -o . +cd bin +./brpc -f ../config/config.yaml -t ../template/cpp/ast.cpp.inja -o ../out +``` diff --git a/config/config.yaml b/config/config.yaml new file mode 100644 index 0000000..163137e --- /dev/null +++ b/config/config.yaml @@ -0,0 +1,59 @@ +property: + filename: brpc.hpp + namespace: brpc + +TestType: + type: enum + value_type: i32 + definitions: + EnumOne: { default: 0, comment: "zero" } + EnumTwo: { default: 1, comment: "one" } + +Info: + type: struct + definitions: + Name: { type: string, comment: "test Name" } + +BankInfo: + type: struct + definitions: + Name: { type: string, comment: "test Name" } + Type: { type: TestType, comment: "test Type" } + TestOne: { type: i32, comment: "test 1" } + TestTwo: { type: u32, comment: "test 2" } + TestMapOne: { type: map, comment: "test map" } + TestMap: { type: map, comment: "test map" } + TestVector: { type: vector, comment: "test map" } + Info: { type: Info, comment: "Info" } + +HelloWorldApi: + type: interface + pattern: bi + caller: HelloWorldClient + callee: HelloWorldServer + definitions: + hello_world: + type: func + inputs: + bank_info: {type: BankInfo} + bank_name: {type: string} + blance: {type: u64} + outputs: + reply: {type: string} + info: {type: Info} + count: {type: u64} + +UniInterface: + type: interface + pattern: uni + caller: HelloWorldSender + callee: HelloWorldReceiver + definitions: + hello_world: + inputs: + in: {type: string} + notice: + inputs: + in: {type: i32} + info: {type: string} + diff --git a/include/cmdline.h b/include/cmdline.h new file mode 100644 index 0000000..95f1b14 --- /dev/null +++ b/include/cmdline.h @@ -0,0 +1,813 @@ +/* + Copyright (c) 2009, Hideyuki Tanaka + All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are met: + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + * Neither the name of the nor the + names of its contributors may be used to endorse or promote products + derived from this software without specific prior written permission. + + THIS SOFTWARE IS PROVIDED BY ''AS IS'' AND ANY + EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + DISCLAIMED. IN NO EVENT SHALL BE LIABLE FOR ANY + DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +*/ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace cmdline { + +namespace detail { + +template +class lexical_cast_t { +public: + static Target cast(const Source& arg) { + Target ret; + std::stringstream ss; + if (!(ss << arg && ss >> ret && ss.eof())) + throw std::bad_cast(); + + return ret; + } +}; + +template +class lexical_cast_t { +public: + static Target cast(const Source& arg) { + return arg; + } +}; + +template +class lexical_cast_t { +public: + static std::string cast(const Source& arg) { + std::ostringstream ss; + ss << arg; + return ss.str(); + } +}; + +template +class lexical_cast_t { +public: + static Target cast(const std::string& arg) { + Target ret; + std::istringstream ss(arg); + if (!(ss >> ret && ss.eof())) + throw std::bad_cast(); + return ret; + } +}; + +template +struct is_same { + static const bool value = false; +}; + +template +struct is_same { + static const bool value = true; +}; + +template +Target lexical_cast(const Source& arg) { + return lexical_cast_t::value>::cast(arg); +} + +static inline std::string demangle(const std::string& name) { + int status = 0; + char* p = abi::__cxa_demangle(name.c_str(), 0, 0, &status); + std::string ret(p); + free(p); + return ret; +} + +template +std::string readable_typename() { + return demangle(typeid(T).name()); +} + +template +std::string default_value(T def) { + return detail::lexical_cast(def); +} + +template <> +inline std::string readable_typename() { + return "string"; +} + +} // namespace detail + +//----- + +class cmdline_error : public std::exception { +public: + cmdline_error(const std::string& msg) + : msg(msg) { + } + ~cmdline_error() throw() { + } + const char* what() const throw() { + return msg.c_str(); + } + +private: + std::string msg; +}; + +template +struct default_reader { + T operator()(const std::string& str) { + return detail::lexical_cast(str); + } +}; + +template +struct range_reader { + range_reader(const T& low, const T& high) + : low(low) + , high(high) { + } + T operator()(const std::string& s) const { + T ret = default_reader()(s); + if (!(ret >= low && ret <= high)) + throw cmdline::cmdline_error("range_error"); + return ret; + } + +private: + T low, high; +}; + +template +range_reader range(const T& low, const T& high) { + return range_reader(low, high); +} + +template +struct oneof_reader { + T operator()(const std::string& s) { + T ret = default_reader()(s); + if (std::find(alt.begin(), alt.end(), ret) == alt.end()) + throw cmdline_error(""); + return ret; + } + void add(const T& v) { + alt.push_back(v); + } + +private: + std::vector alt; +}; + +template +oneof_reader oneof(T a1) { + oneof_reader ret; + ret.add(a1); + return ret; +} + +template +oneof_reader oneof(T a1, T a2) { + oneof_reader ret; + ret.add(a1); + ret.add(a2); + return ret; +} + +template +oneof_reader oneof(T a1, T a2, T a3) { + oneof_reader ret; + ret.add(a1); + ret.add(a2); + ret.add(a3); + return ret; +} + +template +oneof_reader oneof(T a1, T a2, T a3, T a4) { + oneof_reader ret; + ret.add(a1); + ret.add(a2); + ret.add(a3); + ret.add(a4); + return ret; +} + +template +oneof_reader oneof(T a1, T a2, T a3, T a4, T a5) { + oneof_reader ret; + ret.add(a1); + ret.add(a2); + ret.add(a3); + ret.add(a4); + ret.add(a5); + return ret; +} + +template +oneof_reader oneof(T a1, T a2, T a3, T a4, T a5, T a6) { + oneof_reader ret; + ret.add(a1); + ret.add(a2); + ret.add(a3); + ret.add(a4); + ret.add(a5); + ret.add(a6); + return ret; +} + +template +oneof_reader oneof(T a1, T a2, T a3, T a4, T a5, T a6, T a7) { + oneof_reader ret; + ret.add(a1); + ret.add(a2); + ret.add(a3); + ret.add(a4); + ret.add(a5); + ret.add(a6); + ret.add(a7); + return ret; +} + +template +oneof_reader oneof(T a1, T a2, T a3, T a4, T a5, T a6, T a7, T a8) { + oneof_reader ret; + ret.add(a1); + ret.add(a2); + ret.add(a3); + ret.add(a4); + ret.add(a5); + ret.add(a6); + ret.add(a7); + ret.add(a8); + return ret; +} + +template +oneof_reader oneof(T a1, T a2, T a3, T a4, T a5, T a6, T a7, T a8, T a9) { + oneof_reader ret; + ret.add(a1); + ret.add(a2); + ret.add(a3); + ret.add(a4); + ret.add(a5); + ret.add(a6); + ret.add(a7); + ret.add(a8); + ret.add(a9); + return ret; +} + +template +oneof_reader oneof(T a1, T a2, T a3, T a4, T a5, T a6, T a7, T a8, T a9, + T a10) { + oneof_reader ret; + ret.add(a1); + ret.add(a2); + ret.add(a3); + ret.add(a4); + ret.add(a5); + ret.add(a6); + ret.add(a7); + ret.add(a8); + ret.add(a9); + ret.add(a10); + return ret; +} + +//----- + +class parser { +public: + parser() { + } + ~parser() { + for (std::map::iterator p = options.begin(); + p != options.end(); p++) + delete p->second; + } + + void add(const std::string& name, char short_name = 0, + const std::string& desc = "") { + if (options.count(name)) + throw cmdline_error("multiple definition: " + name); + options[name] = new option_without_value(name, short_name, desc); + ordered.push_back(options[name]); + } + + template + void add(const std::string& name, char short_name = 0, + const std::string& desc = "", bool need = true, const T def = T()) { + add(name, short_name, desc, need, def, default_reader()); + } + + template + void add(const std::string& name, char short_name = 0, + const std::string& desc = "", bool need = true, const T def = T(), + F reader = F()) { + if (options.count(name)) + throw cmdline_error("multiple definition: " + name); + options[name] = new option_with_value_with_reader( + name, short_name, need, def, desc, reader); + ordered.push_back(options[name]); + } + + void footer(const std::string& f) { + ftr = f; + } + + void set_program_name(const std::string& name) { + prog_name = name; + } + + bool exist(const std::string& name) const { + if (options.count(name) == 0) + throw cmdline_error("there is no flag: --" + name); + return options.find(name)->second->has_set(); + } + + template + const T& get(const std::string& name) const { + if (options.count(name) == 0) + throw cmdline_error("there is no flag: --" + name); + const option_with_value* p = + dynamic_cast*>(options.find(name)->second); + if (p == NULL) + throw cmdline_error("type mismatch flag '" + name + "'"); + return p->get(); + } + + const std::vector& rest() const { + return others; + } + + bool parse(const std::string& arg) { + std::vector args; + + std::string buf; + bool in_quote = false; + for (std::string::size_type i = 0; i < arg.length(); i++) { + if (arg[i] == '\"') { + in_quote = !in_quote; + continue; + } + + if (arg[i] == ' ' && !in_quote) { + args.push_back(buf); + buf = ""; + continue; + } + + if (arg[i] == '\\') { + i++; + if (i >= arg.length()) { + errors.push_back("unexpected occurrence of '\\' at end of string"); + return false; + } + } + + buf += arg[i]; + } + + if (in_quote) { + errors.push_back("quote is not closed"); + return false; + } + + if (buf.length() > 0) + args.push_back(buf); + + for (size_t i = 0; i < args.size(); i++) + std::cout << "\"" << args[i] << "\"" << std::endl; + + return parse(args); + } + + bool parse(const std::vector& args) { + int argc = static_cast(args.size()); + std::vector argv(argc); + + for (int i = 0; i < argc; i++) + argv[i] = args[i].c_str(); + + return parse(argc, &argv[0]); + } + + bool parse(int argc, const char* const argv[]) { + errors.clear(); + others.clear(); + + if (argc < 1) { + errors.push_back("argument number must be longer than 0"); + return false; + } + if (prog_name == "") + prog_name = argv[0]; + + std::map lookup; + for (std::map::iterator p = options.begin(); + p != options.end(); p++) { + if (p->first.length() == 0) + continue; + char initial = p->second->short_name(); + if (initial) { + if (lookup.count(initial) > 0) { + lookup[initial] = ""; + errors.push_back(std::string("short option '") + initial + + "' is ambiguous"); + return false; + } else + lookup[initial] = p->first; + } + } + + for (int i = 1; i < argc; i++) { + if (strncmp(argv[i], "--", 2) == 0) { + const char* p = strchr(argv[i] + 2, '='); + if (p) { + std::string name(argv[i] + 2, p); + std::string val(p + 1); + set_option(name, val); + } else { + std::string name(argv[i] + 2); + if (options.count(name) == 0) { + errors.push_back("undefined option: --" + name); + continue; + } + if (options[name]->has_value()) { + if (i + 1 >= argc) { + errors.push_back("option needs value: --" + name); + continue; + } else { + i++; + set_option(name, argv[i]); + } + } else { + set_option(name); + } + } + } else if (strncmp(argv[i], "-", 1) == 0) { + if (!argv[i][1]) + continue; + char last = argv[i][1]; + for (int j = 2; argv[i][j]; j++) { + last = argv[i][j]; + if (lookup.count(argv[i][j - 1]) == 0) { + errors.push_back(std::string("undefined short option: -") + + argv[i][j - 1]); + continue; + } + if (lookup[argv[i][j - 1]] == "") { + errors.push_back(std::string("ambiguous short option: -") + + argv[i][j - 1]); + continue; + } + set_option(lookup[argv[i][j - 1]]); + } + + if (lookup.count(last) == 0) { + errors.push_back(std::string("undefined short option: -") + last); + continue; + } + if (lookup[last] == "") { + errors.push_back(std::string("ambiguous short option: -") + last); + continue; + } + + if (i + 1 < argc && options[lookup[last]]->has_value()) { + set_option(lookup[last], argv[i + 1]); + i++; + } else { + set_option(lookup[last]); + } + } else { + others.push_back(argv[i]); + } + } + + for (std::map::iterator p = options.begin(); + p != options.end(); p++) + if (!p->second->valid()) + errors.push_back("need option: --" + std::string(p->first)); + + return errors.size() == 0; + } + + void parse_check(const std::string& arg) { + if (!options.count("help")) + add("help", '?', "print this message"); + check(0, parse(arg)); + } + + void parse_check(const std::vector& args) { + if (!options.count("help")) + add("help", '?', "print this message"); + check(args.size(), parse(args)); + } + + void parse_check(int argc, char* argv[]) { + if (!options.count("help")) + add("help", '?', "print this message"); + check(argc, parse(argc, argv)); + } + + std::string error() const { + return errors.size() > 0 ? errors[0] : ""; + } + + std::string error_full() const { + std::ostringstream oss; + for (size_t i = 0; i < errors.size(); i++) + oss << errors[i] << std::endl; + return oss.str(); + } + + std::string usage() const { + std::ostringstream oss; + oss << "usage: " << prog_name << " "; + for (size_t i = 0; i < ordered.size(); i++) { + if (ordered[i]->must()) + oss << ordered[i]->short_description() << " "; + } + + oss << "[options] ... " << ftr << std::endl; + oss << "options:" << std::endl; + + size_t max_width = 0; + for (size_t i = 0; i < ordered.size(); i++) { + max_width = std::max(max_width, ordered[i]->name().length()); + } + for (size_t i = 0; i < ordered.size(); i++) { + if (ordered[i]->short_name()) { + oss << " -" << ordered[i]->short_name() << ", "; + } else { + oss << " "; + } + + oss << "--" << ordered[i]->name(); + for (size_t j = ordered[i]->name().length(); j < max_width + 4; j++) + oss << ' '; + oss << ordered[i]->description() << std::endl; + } + return oss.str(); + } + +private: + void check(int argc, bool ok) { + if ((argc == 1 && !ok) || exist("help")) { + std::cerr << usage(); + exit(0); + } + + if (!ok) { + std::cerr << error() << std::endl + << usage(); + exit(1); + } + } + + void set_option(const std::string& name) { + if (options.count(name) == 0) { + errors.push_back("undefined option: --" + name); + return; + } + if (!options[name]->set()) { + errors.push_back("option needs value: --" + name); + return; + } + } + + void set_option(const std::string& name, const std::string& value) { + if (options.count(name) == 0) { + errors.push_back("undefined option: --" + name); + return; + } + if (!options[name]->set(value)) { + errors.push_back("option value is invalid: --" + name + "=" + value); + return; + } + } + + class option_base { + public: + virtual ~option_base() { + } + + virtual bool has_value() const = 0; + virtual bool set() = 0; + virtual bool set(const std::string& value) = 0; + virtual bool has_set() const = 0; + virtual bool valid() const = 0; + virtual bool must() const = 0; + + virtual const std::string& name() const = 0; + virtual char short_name() const = 0; + virtual const std::string& description() const = 0; + virtual std::string short_description() const = 0; + }; + + class option_without_value : public option_base { + public: + option_without_value(const std::string& name, char short_name, + const std::string& desc) + : nam(name) + , snam(short_name) + , desc(desc) + , has(false) { + } + ~option_without_value() { + } + + bool has_value() const { + return false; + } + + bool set() { + has = true; + return true; + } + + bool set(const std::string&) { + return false; + } + + bool has_set() const { + return has; + } + + bool valid() const { + return true; + } + + bool must() const { + return false; + } + + const std::string& name() const { + return nam; + } + + char short_name() const { + return snam; + } + + const std::string& description() const { + return desc; + } + + std::string short_description() const { + return "--" + nam; + } + + private: + std::string nam; + char snam; + std::string desc; + bool has; + }; + + template + class option_with_value : public option_base { + public: + option_with_value(const std::string& name, char short_name, bool need, + const T& def, const std::string& desc) + : nam(name) + , snam(short_name) + , need(need) + , has(false) + , def(def) + , actual(def) { + this->desc = full_description(desc); + } + ~option_with_value() { + } + + const T& get() const { + return actual; + } + + bool has_value() const { + return true; + } + + bool set() { + return false; + } + + bool set(const std::string& value) { + try { + actual = read(value); + has = true; + } catch (const std::exception& e) { + return false; + } + return true; + } + + bool has_set() const { + return has; + } + + bool valid() const { + if (need && !has) + return false; + return true; + } + + bool must() const { + return need; + } + + const std::string& name() const { + return nam; + } + + char short_name() const { + return snam; + } + + const std::string& description() const { + return desc; + } + + std::string short_description() const { + return "--" + nam + "=" + detail::readable_typename(); + } + + protected: + std::string full_description(const std::string& desc) { + return desc + " (" + detail::readable_typename() + + (need ? "" : " [=" + detail::default_value(def) + "]") + ")"; + } + + virtual T read(const std::string& s) = 0; + + std::string nam; + char snam; + bool need; + std::string desc; + + bool has; + T def; + T actual; + }; + + template + class option_with_value_with_reader : public option_with_value { + public: + option_with_value_with_reader(const std::string& name, char short_name, + bool need, const T def, + const std::string& desc, F reader) + : option_with_value(name, short_name, need, def, desc) + , reader(reader) { + } + + private: + T read(const std::string& s) { + return reader(s); + } + + F reader; + }; + + std::map options; + std::vector ordered; + std::string ftr; + + std::string prog_name; + std::vector others; + + std::vector errors; +}; + +} // namespace cmdline diff --git a/include/utils.h b/include/utils.h new file mode 100644 index 0000000..90a1d02 --- /dev/null +++ b/include/utils.h @@ -0,0 +1,168 @@ +#ifndef _UTILS_H_ +#define _UTILS_H_ + +// #include +// #include +// #include +// #include +// #include +// #include + +// #include +// #include + +#include +#include + +// namespace easy_brpc { + +// inline std::string ltrim(const std::string str) { +// auto s = str; +// s.erase(s.begin(), std::find_if(s.begin(), s.end(), std::not1(std::ptr_fun(std::isspace)))); +// return s; +// } + +// inline std::string rtrim(const std::string str) { +// auto s = str; +// s.erase(std::find_if(s.rbegin(), s.rend(), std::not1(std::ptr_fun(std::isspace))).base(), s.end()); +// return s; +// } + +// inline std::string trim(const std::string str) { +// return ltrim(rtrim(str)); +// } + +// inline std::string toUpper(std::string str) { +// std::transform(str.begin(), str.end(), str.begin(), ::toupper); +// return str; +// } + +// inline std::string toLower(std::string str) { +// std::transform(str.begin(), str.end(), str.begin(), ::tolower); +// return str; +// } + +// inline auto toSnakeCase(const std::string& s) { +// std::regex words_regex("[A-Z][a-z]+"); +// auto words_begin = std::sregex_iterator(s.begin(), s.end(), words_regex); +// auto words_end = std::sregex_iterator(); +// std::string name{}; +// for (std::sregex_iterator i = words_begin; i != words_end; ++i) { +// std::smatch match = *i; +// std::string match_str = match.str(); +// std::transform(match_str.begin(), match_str.end(), match_str.begin(), ::tolower); +// auto z = i; +// if (++z == words_end) +// name += match_str; +// else +// name += (match_str + "_"); +// } +// return name; +// } + +// inline std::vector split(const std::string& str, const char delimiter, bool include_delimiter = false) { +// size_t start = 0; +// size_t end = str.find_first_of(delimiter); +// std::vector output; +// while (end <= std::string::npos) { +// if (start == str.size()) +// break; +// if (include_delimiter) +// output.emplace_back(str.substr(start, end - start + 1)); +// else +// output.emplace_back(str.substr(start, end - start)); +// if (end == std::string::npos) +// break; +// start = end + 1; +// end = str.find_first_of(delimiter, start); +// } +// return output; +// } + +// inline std::string GetDirectory(const std::string& path) { +// std::experimental::filesystem::path p(path); +// return (p.parent_path().string()); +// } + +// inline std::string remove(std::string str, char c = '/') { +// if (str.back() == c) +// str.erase(str.size() - 1); +// return str; +// } + +// inline std::vector process(std::string data) { +// std::replace(data.begin(), data.end(), ';', ' '); +// std::replace(data.begin(), data.end(), '<', ' '); +// std::replace(data.begin(), data.end(), '>', ' '); +// return split(data, ' '); +// } + +namespace detail { + +inline nlohmann::json parse_scalar(const YAML::Node& node) { + int i; + double d; + bool b; + std::string s; + + if (YAML::convert::decode(node, i)) + return i; + if (YAML::convert::decode(node, d)) + return d; + if (YAML::convert::decode(node, b)) + return b; + if (YAML::convert::decode(node, s)) + return s; + + return nullptr; +} + +} // namespace detail + +[[nodiscard]] inline nlohmann::json yaml2json(const YAML::Node& root) { + nlohmann::json j{}; + using namespace detail; + switch (root.Type()) { + case YAML::NodeType::Null: + break; + case YAML::NodeType::Scalar: + return parse_scalar(root); + case YAML::NodeType::Sequence: + for (auto&& node : root) + j.emplace_back(yaml2json(node)); + break; + case YAML::NodeType::Map: + for (auto&& it : root) + j[it.first.as()] = yaml2json(it.second); + break; + default: + break; + } + return j; +} + +[[nodiscard]] inline nlohmann::json yaml2json(const std::string& str) { + YAML::Node root = YAML::Load(str); + return yaml2json(root); +} + +namespace detail { + +template +struct to_helper {}; + +template + requires std::convertible_to, typename Container::value_type> +Container operator|(R&& r, to_helper) { + return Container{r.begin(), r.end()}; +} + +} // namespace detail + +template + requires(!std::ranges::view) +inline auto to() { + return detail::to_helper{}; +} + +#endif // _UTILS_H_ diff --git a/out/brpc.hpp b/out/brpc.hpp new file mode 100644 index 0000000..f1b01fd --- /dev/null +++ b/out/brpc.hpp @@ -0,0 +1,992 @@ +#ifndef _BRPC_H_ +#define _BRPC_H_ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#ifdef __cpp_impl_coroutine +#include +#endif +#include + +#include +#include +#include +#include + +#ifdef __cpp_impl_coroutine +#include + +namespace brpc { + +class ContextPool final { +public: + ContextPool(std::size_t pool_size) + : m_next_io_context(0) { + if (pool_size == 0) + throw std::runtime_error("ContextPool size is 0"); + for (std::size_t i = 0; i < pool_size; ++i) { + auto io_context_ptr = std::make_shared(); + m_io_contexts.emplace_back(io_context_ptr); + m_work.emplace_back( + asio::require(io_context_ptr->get_executor(), asio::execution::outstanding_work.tracked)); + } + } + + void start() { + for (auto& context : m_io_contexts) + m_threads.emplace_back(std::jthread([&] { + context->run(); + })); + } + + void stop() { + for (auto& context_ptr : m_io_contexts) + context_ptr->stop(); + } + + asio::io_context& getIoContext() { + size_t index = m_next_io_context.fetch_add(1, std::memory_order_relaxed); + asio::io_context& io_context = *m_io_contexts[index % m_io_contexts.size()]; + ++m_next_io_context; + if (m_next_io_context == m_io_contexts.size()) + m_next_io_context = 0; + return io_context; + } + +private: + std::vector> m_io_contexts; + std::list m_work; + std::atomic_uint64_t m_next_io_context; + std::vector m_threads; +}; + +} // namespace brpc +#endif + +namespace brpc { + +template +inline msgpack::sbuffer pack(const TObject& object) { + msgpack::sbuffer buffer; + msgpack::pack(buffer, object); + return buffer; +} + +template +inline TObject unpack(const void* data, size_t size) { + auto handle = msgpack::unpack(reinterpret_cast(data), size); + auto value = handle.get().as(); + return value; +} + +template +T fromString(const std::string&); + +template +inline std::enable_if_t::value && !std::is_same::value, std::string> toString(T value) { + return std::to_string(value); +} + +inline std::string toString(bool value) { + return value ? "true" : "false"; +} + +inline std::string toString(const std::string& value) { + return value; +} + +template +inline std::string toString(const std::vector& vector) { + std::string str = "["; + auto it = vector.begin(); + if (it != vector.end()) { + str += toString(*it); + ++it; + } + for (; it != vector.end(); ++it) { + str += ","; + str += toString(*it); + } + str += "]"; + return str; +} + +template +inline std::string toString(const std::unordered_map& map) { + std::string str = "{"; + auto it = map.begin(); + if (it != map.end()) { + str += toString(it->first); + str += "->"; + str += toString(it->second); + ++it; + } + for (; it != map.end(); ++it) { + str += ","; + str += toString(it->first); + str += "->"; + str += toString(it->second); + } + str += "}"; + return str; +} + +inline std::string uuid() { + uuid_t uuid; + char uuid_str[37]; + uuid_generate_random(uuid); + uuid_unparse(uuid, uuid_str); + return std::string(uuid_str); +} + +struct ChannelConfig { + std::size_t io_threads{1}; + zmq::socket_type socktype{zmq::socket_type::dealer}; + int32_t sendhwm{0}; + int32_t recvhwm{0}; + int32_t sendbuf{0}; + int32_t recvbuf{0}; + int32_t linger{2000}; + std::string addr{"tcp://127.0.0.1:5833"}; + bool bind{false}; + bool mandatory{false}; + bool tcp_keepalive{true}; + int tcp_keepalive_idle{60}; + int tcp_keepalive_cnt{3}; + int tcp_keepalive_intvl{5}; + bool probe{false}; +}; + +inline std::string uniqueAddr() { + uuid_t uuid; + char s[37]; + uuid_generate_random(uuid); + uuid_unparse(uuid, s); + return "inproc://" + std::string(s); +} + +} // namespace brpc + +namespace brpc { + +class BiChannel final { +public: + BiChannel(const ChannelConfig& config, + std::function error, + std::function)> cb) + : m_context(config.io_threads) + , m_socket(m_context, config.socktype) + , m_send(m_context, zmq::socket_type::push) + , m_recv(m_context, zmq::socket_type::pull) + , m_error(std::move(error)) + , m_cb(std::move(cb)) { + m_socket.set(zmq::sockopt::sndhwm, config.sendhwm); + m_socket.set(zmq::sockopt::rcvhwm, config.recvhwm); + m_socket.set(zmq::sockopt::sndbuf, config.sendbuf); + m_socket.set(zmq::sockopt::rcvbuf, config.recvbuf); + m_socket.set(zmq::sockopt::linger, config.linger); + if (config.tcp_keepalive) { + m_socket.set(zmq::sockopt::tcp_keepalive, 1); + m_socket.set(zmq::sockopt::tcp_keepalive_idle, config.tcp_keepalive_idle); + m_socket.set(zmq::sockopt::tcp_keepalive_cnt, config.tcp_keepalive_cnt); + m_socket.set(zmq::sockopt::tcp_keepalive_intvl, config.tcp_keepalive_intvl); + } + if (config.probe) { + m_socket.set(zmq::sockopt::probe_router, 1); + } + if (config.bind) { + if (config.socktype == zmq::socket_type::router) { + if (config.mandatory) + m_socket.set(zmq::sockopt::router_mandatory, 0); + else + m_socket.set(zmq::sockopt::router_mandatory, 1); + } + m_socket.bind(config.addr); + } else { + m_socket.connect(config.addr); + } + auto addr = uniqueAddr(); + m_recv.set(zmq::sockopt::rcvhwm, config.recvhwm); + m_recv.set(zmq::sockopt::rcvbuf, config.recvbuf); + m_recv.bind(addr); + + m_send.set(zmq::sockopt::sndhwm, config.sendhwm); + m_send.set(zmq::sockopt::sndbuf, config.sendbuf); + m_send.set(zmq::sockopt::linger, config.linger); + + m_send.connect(addr); + } + ~BiChannel() { + m_running = false; + if (m_thread.joinable()) + m_thread.join(); + } + + void send(std::vector&& snd_msgs) { + std::lock_guard lk(m_mutex); + if (!zmq::send_multipart(m_send, std::forward(snd_msgs))) { + m_error("send error!!!"); + } + } + + void send(std::vector&& snd_msgs, + std::chrono::milliseconds timeout, + std::function cb) { + std::lock_guard lk(m_mutex); + if (!zmq::send_multipart(m_send, std::forward(snd_msgs))) { + m_error("send error!!!"); + } + auto timeout_point = std::chrono::system_clock::now() + timeout; + m_timeout_task.emplace(timeout_point, std::move(cb)); + } + + void start() { + m_running = true; + m_thread = std::thread([this] { + std::vector items{ + {static_cast(m_socket), 0, ZMQ_POLLIN | ZMQ_POLLERR, 0}, + {static_cast(m_recv), 0, ZMQ_POLLIN | ZMQ_POLLERR, 0}, + }; + std::chrono::milliseconds interval(200); + std::multimap> timeout_task; + while (m_running.load()) { + zmq::poll(items, interval); + if (items[0].revents & ZMQ_POLLIN) { + std::vector recv_msgs; + auto ret = zmq::recv_multipart(m_socket, std::back_inserter(recv_msgs)); + if (!ret) { + m_error("zmq::recv_multipart error!!!"); + break; + } + m_cb(std::move(recv_msgs)); + } + if (items[1].revents & ZMQ_POLLIN) { + std::vector recv_msgs; + auto ret = zmq::recv_multipart(m_recv, std::back_inserter(recv_msgs)); + if (!ret) { + m_error("recv zmq::recv_multipart error!!!"); + break; + } + if (!zmq::send_multipart(m_socket, recv_msgs)) { + m_error("zmq::send_multipart error!!!"); + break; + } + } + { + std::lock_guard lk(m_mutex); + if (!m_timeout_task.empty()) { + timeout_task.merge(m_timeout_task); + m_timeout_task.clear(); + } + } + auto now = std::chrono::system_clock::now(); + while (!timeout_task.empty() && timeout_task.begin()->first <= now) { + (timeout_task.begin()->second)(); + timeout_task.erase(timeout_task.begin()); + } + } + }); + } + +private: + zmq::context_t m_context; + zmq::socket_t m_socket; + zmq::socket_t m_send; + zmq::socket_t m_recv; + std::function m_error; + std::function)> m_cb; + std::thread m_thread; + std::atomic_bool m_running{false}; + std::mutex m_mutex; + std::multimap> m_timeout_task; +}; + +} // namespace brpc +namespace brpc { + +struct UniChannel final { +public: + UniChannel(const brpc::ChannelConfig& config, + std::function)> cb, + std::function error) + : m_context(std::make_shared(config.io_threads)) + , m_socket(*m_context, config.socktype) + , m_cb(std::move(cb)) + , m_error(std::move(error)) { + m_socket.set(zmq::sockopt::sndhwm, config.sendhwm); + m_socket.set(zmq::sockopt::rcvhwm, config.recvhwm); + m_socket.set(zmq::sockopt::sndbuf, config.sendbuf); + m_socket.set(zmq::sockopt::rcvbuf, config.recvbuf); + m_socket.set(zmq::sockopt::linger, config.linger); + if (config.tcp_keepalive) { + m_socket.set(zmq::sockopt::tcp_keepalive, 1); + m_socket.set(zmq::sockopt::tcp_keepalive_idle, config.tcp_keepalive_idle); + m_socket.set(zmq::sockopt::tcp_keepalive_cnt, config.tcp_keepalive_cnt); + m_socket.set(zmq::sockopt::tcp_keepalive_intvl, config.tcp_keepalive_intvl); + } + m_socket.connect(config.addr); + } + + UniChannel(std::shared_ptr context, + const brpc::ChannelConfig& config, + std::function)> cb, + std::function error) + : m_context(std::move(context)) + , m_socket(*m_context, config.socktype) + , m_cb(std::move(cb)) + , m_error(std::move(error)) { + m_socket.set(zmq::sockopt::sndhwm, config.sendhwm); + m_socket.set(zmq::sockopt::rcvhwm, config.recvhwm); + m_socket.set(zmq::sockopt::sndbuf, config.sendbuf); + m_socket.set(zmq::sockopt::rcvbuf, config.recvbuf); + m_socket.set(zmq::sockopt::linger, config.linger); + if (config.tcp_keepalive) { + m_socket.set(zmq::sockopt::tcp_keepalive, 1); + m_socket.set(zmq::sockopt::tcp_keepalive_idle, config.tcp_keepalive_idle); + m_socket.set(zmq::sockopt::tcp_keepalive_cnt, config.tcp_keepalive_cnt); + m_socket.set(zmq::sockopt::tcp_keepalive_intvl, config.tcp_keepalive_intvl); + } + m_socket.connect(config.addr); + } + + ~UniChannel() { + m_running = false; + if (m_thread.joinable()) + m_thread.join(); + } + + void subscribe(const std::string& topic = "") { + m_socket.set(zmq::sockopt::subscribe, topic); + } + + void start() { + m_running = true; + m_thread = std::thread([this] { + std::vector items{ + {static_cast(m_socket), 0, ZMQ_POLLIN | ZMQ_POLLERR, 0}, + }; + std::chrono::milliseconds interval(200); + while (m_running.load()) { + zmq::poll(items, interval); + if (items[0].revents & ZMQ_POLLIN) { + std::vector recv_msgs; + auto ret = zmq::recv_multipart(m_socket, std::back_inserter(recv_msgs)); + if (!ret) { + m_error("zmq::recv_multipart error!!!"); + break; + } + m_cb(std::move(recv_msgs)); + } + } + }); + } + + auto& context() { + return m_context; + } + + auto& socket() { + return m_socket; + } + +private: + std::shared_ptr m_context; + zmq::socket_t m_socket; + std::function)> m_cb; + std::function m_error; + std::atomic_bool m_running; + std::thread m_thread; +}; + +} // namespace brpc + +namespace brpc { + +enum class TestType : int32_t { + EnumOne = 0, // zero + EnumTwo = 1, // one +}; + +inline std::string_view toString(const TestType value) { + switch (value) { + case TestType::EnumOne: + return "0"; + case TestType::EnumTwo: + return "1"; + default: + return "???"; + } +} + +template <> +inline TestType fromString(const std::string& value) { + if (value == "EnumOne") + return TestType::EnumOne; + if (value == "EnumTwo") + return TestType::EnumTwo; + throw std::bad_cast(); +} + +} // namespace brpc + +MSGPACK_ADD_ENUM(brpc::TestType) + +namespace brpc { + +struct Info { + std::string name; // test Name + + MSGPACK_DEFINE(name) +}; + +inline std::string toString(const Info& value) { + std::string str = "Info{"; + str += toString(value.name); + str += ","; + str += "}"; + return str; +} + +} // namespace brpc + +namespace brpc { + +struct BankInfo { + std::string name; // test Name + TestType type; // test Type + int32_t test_one; // test 1 + uint32_t test_two; // test 2 + std::unordered_map test_map_one; // test map + std::unordered_map test_map; // test map + std::vector test_vector; // test map + Info info; // Info + + MSGPACK_DEFINE(name, type, test_one, test_two, test_map_one, test_map, test_vector, info) +}; + +inline std::string toString(const BankInfo& value) { + std::string str = "BankInfo{"; + str += toString(value.name); + str += ","; + str += toString(value.type); + str += ","; + str += toString(value.test_one); + str += ","; + str += toString(value.test_two); + str += ","; + str += toString(value.test_map_one); + str += ","; + str += toString(value.test_map); + str += ","; + str += toString(value.test_vector); + str += ","; + str += toString(value.info); + str += ","; + str += "}"; + return str; +} + +} // namespace brpc + +namespace brpc { + +enum class HelloWorldClientHelloWorldServer { + hello_world, + +}; + +} // namespace brpc + +MSGPACK_ADD_ENUM(brpc::HelloWorldClientHelloWorldServer) + +namespace brpc { + +class HelloWorldClient final { +public: + HelloWorldClient(const ChannelConfig& config, + std::function error) + : m_channel(std::make_unique(config, error, [this](auto&& recv_msgs) mutable { + dispatch(std::forward(recv_msgs)); + })) + , m_error(error) { + } + + void start() { + m_channel->start(); + } + + void hello_world(BankInfo bank_info, std::string bank_name, uint64_t blance, std::function cb) { + auto type = pack(HelloWorldClientHelloWorldServer::hello_world); + auto req_id = m_req_id.fetch_add(1); + auto req_id_str = pack(req_id); + auto packet = pack>(std::make_tuple(std::move(bank_info), std::move(bank_name), blance)); + std::vector snd_bufs; + snd_bufs.emplace_back(zmq::message_t(type.data(), type.size())); + snd_bufs.emplace_back(zmq::message_t(req_id_str.data(), req_id_str.size())); + snd_bufs.emplace_back(zmq::message_t(packet.data(), packet.size())); + { + std::lock_guard lk(m_mtx); + m_cb.emplace(req_id, std::move(cb)); + } + m_channel->send(std::move(snd_bufs)); + } + + void hello_world(BankInfo bank_info, std::string bank_name, uint64_t blance, + std::function cb, + std::chrono::milliseconds timeout, + std::function timeout_cb) { + auto type = pack(HelloWorldClientHelloWorldServer::hello_world); + auto req_id = m_req_id.fetch_add(1); + auto req_id_str = pack(req_id); + auto packet = pack>(std::make_tuple(std::move(bank_info), std::move(bank_name), blance)); + std::vector snd_bufs; + snd_bufs.emplace_back(zmq::message_t(type.data(), type.size())); + snd_bufs.emplace_back(zmq::message_t(req_id_str.data(), req_id_str.size())); + snd_bufs.emplace_back(zmq::message_t(packet.data(), packet.size())); + { + std::lock_guard lk(m_mtx); + m_cb.emplace(req_id, std::move(cb)); + m_timeout_cb.emplace(req_id, std::move(timeout_cb)); + } + m_channel->send(std::move(snd_bufs), + std::move(timeout), + [this, req_id]() mutable { + std::unique_lock lk(m_mtx); +#ifdef __cpp_impl_coroutine + if (!m_cb.contains(req_id) || !m_timeout_cb.contains(req_id)) + return; +#else + if (m_cb.find(req_id) == m_cb.end() || m_timeout_cb.find(req_id) == m_timeout_cb.end()) + return; +#endif + auto cb = std::move(m_timeout_cb[req_id]); + m_timeout_cb.erase(req_id); + m_cb.erase(req_id); + lk.unlock(); + cb(); + }); + } +#ifdef __cpp_impl_coroutine + template CompletionToken> + auto hello_world_coro(BankInfo bank_info, std::string bank_name, uint64_t blance, CompletionToken&& token) { + return asio::async_initiate( + [this](Handler&& handler, BankInfo bank_info, std::string bank_name, uint64_t blance) mutable { + auto handler_ptr = std::make_shared(std::move(handler)); + hello_world( + std::move(bank_info), std::move(bank_name), blance, + [handler_ptr = std::move(handler_ptr)](std::string reply, Info info, uint64_t count) mutable { + auto ex = asio::get_associated_executor(*handler_ptr); + asio::post(ex, [reply = std::move(reply), info = std::move(info), count, handler_ptr = std::move(handler_ptr)]() mutable -> void { + (*handler_ptr)(std::move(reply), std::move(info), count); + }); + }); + }, + token, + std::move(bank_info), std::move(bank_name), blance); + } + + template >)> CompletionToken> + auto hello_world_coro(BankInfo bank_info, std::string bank_name, uint64_t blance, std::chrono::milliseconds timeout, CompletionToken&& token) { + return asio::async_initiate>)>( + [this](Handler&& handler, BankInfo bank_info, std::string bank_name, uint64_t blance, auto timeout) mutable { + auto handler_ptr = std::make_shared(std::move(handler)); + hello_world( + std::move(bank_info), std::move(bank_name), blance, + [handler_ptr](std::string reply, Info info, uint64_t count) mutable { + auto ex = asio::get_associated_executor(*handler_ptr); + asio::post(ex, [reply = std::move(reply), info = std::move(info), count, handler_ptr = std::move(handler_ptr)]() mutable -> void { + (*handler_ptr)(std::make_tuple(std::move(reply), std::move(info), count)); + }); + }, + std::move(timeout), + [handler_ptr] { + auto ex = asio::get_associated_executor(*handler_ptr); + asio::post(ex, [=, handler_ptr = std::move(handler_ptr)]() mutable -> void { + (*handler_ptr)(std::nullopt); + }); + }); + }, + token, + std::move(bank_info), std::move(bank_name), blance, std::move(timeout)); + } +#endif + + static auto create(ChannelConfig& config, std::function error) { + config.socktype = zmq::socket_type::dealer; + return std::make_unique(config, std::move(error)); + } + +private: + void dispatch(std::vector recv_msgs) { + if (recv_msgs.size() != 3) { + m_error("Illegal response packet"); + return; + } + try { + auto type = unpack(recv_msgs[0].data(), recv_msgs[0].size()); + auto req_id = unpack(recv_msgs[1].data(), recv_msgs[1].size()); + switch (type) { + case HelloWorldClientHelloWorldServer::hello_world: { + auto tp = unpack>(recv_msgs[2].data(), recv_msgs[2].size()); + std::unique_lock lk(m_mtx); +#ifdef __cpp_impl_coroutine + if (!m_cb.contains(req_id)) + break; +#else + if (m_cb.find(req_id) == m_cb.end()) + break; +#endif + auto cb = std::move(m_cb[req_id]); + m_cb.erase(req_id); + lk.unlock(); + auto func = std::any_cast>(cb); + std::apply(func, tp); + break; + } + default: + m_error("error type"); + } + } catch (const msgpack::type_error& err) { + m_error(err.what()); + } + } + + std::unique_ptr m_channel; + std::function m_error; + std::mutex m_mtx; + std::unordered_map m_cb; + std::unordered_map> m_timeout_cb; + std::atomic_uint64_t m_req_id{0}; +}; + +struct HelloWorldServerHandler { + virtual void hello_world(BankInfo bank_info, std::string bank_name, uint64_t blance, std::function cb) = 0; +}; + +struct CoroHelloWorldServerHandler { +#ifdef __cpp_impl_coroutine + virtual asio::awaitable hello_world(BankInfo bank_info, std::string bank_name, uint64_t blance, std::function cb) = 0; +#else + virtual void hello_world(BankInfo bank_info, std::string bank_name, uint64_t blance, std::function cb) = 0; +#endif +}; + +class HelloWorldServer final { +public: + HelloWorldServer(const ChannelConfig& config, + std::variant, std::shared_ptr> handler, + std::function error) + : m_handler(std::move(handler)) + , m_error(error) { +#ifdef __cpp_impl_coroutine + m_pool.start(); +#endif + m_channel = std::make_unique(config, error, [this](std::vector recv_bufs) mutable { + if (recv_bufs.size() != 4) { + m_error("Illegal response packet"); + return; + } + try { + auto type = unpack(recv_bufs[1].data(), recv_bufs[1].size()); + switch (type) { + case HelloWorldClientHelloWorldServer::hello_world: { + auto [bank_info, bank_name, blance] = unpack>(recv_bufs[3].data(), recv_bufs[3].size()); + auto out = [ptr = std::make_shared>(std::move(recv_bufs)), this](std::string reply, Info info, uint64_t count) mutable { + auto& snd_bufs = *ptr; + auto packet = pack>(std::make_tuple(std::move(reply), std::move(info), count)); + snd_bufs[3] = std::move(zmq::message_t(packet.data(), packet.size())); + m_channel->send(std::move(snd_bufs)); + }; + std::visit( + [&](auto&& arg) { + using T = std::decay_t; + if constexpr (std::is_same_v>) { + arg->hello_world(std::move(bank_info), std::move(bank_name), blance, out); + } else { +#ifdef __cpp_impl_coroutine + asio::co_spawn( + m_pool.getIoContext(), + [](auto& arg, BankInfo bank_info, std::string bank_name, uint64_t blance, auto out) -> asio::awaitable { + co_await arg->hello_world(std::move(bank_info), std::move(bank_name), blance, std::move(out)); + co_return; + }(arg, std::move(bank_info), std::move(bank_name), blance, std::move(out)), + asio::detached); +#endif + } + }, + m_handler); + break; + } + default: + m_error("error type"); + } + } catch (const msgpack::type_error& err) { + m_error(err.what()); + } + }); + } + ~HelloWorldServer() { +#ifdef __cpp_impl_coroutine + m_pool.stop(); +#endif + } + + void start() { + m_channel->start(); + } + + static auto create(ChannelConfig& config, + std::variant, std::shared_ptr> handler, + std::function error) { + config.socktype = zmq::socket_type::router; + config.bind = true; + return std::make_unique(config, std::move(handler), std::move(error)); + } + +private: + std::variant, std::shared_ptr> m_handler; + std::function m_error; + std::unique_ptr m_channel; + std::mutex m_mtx; +#ifdef __cpp_impl_coroutine + brpc::ContextPool m_pool{1}; +#endif +}; + +} // namespace brpc +namespace brpc { + +enum class HelloWorldSenderHelloWorldReceiver { + hello_world, + notice, + +}; + +inline std::string_view toString(const HelloWorldSenderHelloWorldReceiver value) { + switch (value) { + case HelloWorldSenderHelloWorldReceiver::hello_world: + return "hello_world"; + case HelloWorldSenderHelloWorldReceiver::notice: + return "notice"; + default: + return "???"; + } +} + +template <> +inline HelloWorldSenderHelloWorldReceiver fromString(const std::string& value) { + if (value == "hello_world") + return HelloWorldSenderHelloWorldReceiver::hello_world; + if (value == "notice") + return HelloWorldSenderHelloWorldReceiver::notice; + throw std::bad_cast(); +} + +} // namespace brpc + +MSGPACK_ADD_ENUM(brpc::HelloWorldSenderHelloWorldReceiver) + +namespace brpc { + +struct HelloWorldReceiverHandler { + virtual void hello_world(std::string in) = 0; + virtual void notice(int32_t in, std::string info) = 0; +}; + +struct CoroHelloWorldReceiverHandler { +#ifdef __cpp_impl_coroutine + virtual asio::awaitable hello_world(std::string in) = 0; +#else + virtual void hello_world(std::string in) = 0; +#endif +#ifdef __cpp_impl_coroutine + virtual asio::awaitable notice(int32_t in, std::string info) = 0; +#else + virtual void notice(int32_t in, std::string info) = 0; +#endif +}; + +class HelloWorldReceiver final { +public: + HelloWorldReceiver(const ChannelConfig& config, + std::variant, std::shared_ptr> handler, + std::function error) + : m_handler(std::move(handler)) + , m_error(error) { + m_channel = std::make_unique(config, [this](auto&& recv) mutable { + dispatch(std::forward(recv)); + }, + error); + } + HelloWorldReceiver(const ChannelConfig& config, + const std::shared_ptr& context, + std::variant, std::shared_ptr> handler, + std::function error) + : m_handler(std::move(handler)) + , m_error(error) { + m_channel = std::make_unique(context, config, [this](auto&& recv_msgs) mutable { + dispatch(std::forward(recv_msgs)); + }, + error); + } + ~HelloWorldReceiver() { +#ifdef __cpp_impl_coroutine + m_pool.stop(); +#endif + } + + void start() { +#ifdef __cpp_impl_coroutine + m_pool.start(); +#endif + m_channel->subscribe(""); + m_channel->start(); + } + + static auto create(ChannelConfig& config, + std::variant, std::shared_ptr> handler, + std::function error) { + config.socktype = zmq::socket_type::sub; + return std::make_unique(config, std::move(handler), std::move(error)); + } + +private: + void dispatch(std::vector recv_bufs) { + if (recv_bufs.size() != 2) { + m_error("Illegal response packet"); + return; + } + try { + auto type = unpack(recv_bufs[0].data(), recv_bufs[0].size()); + if (type == HelloWorldSenderHelloWorldReceiver::hello_world) { + auto [in] = unpack>(recv_bufs[1].data(), recv_bufs[1].size()); + std::visit( + [&](auto&& arg) { + using T = std::decay_t; + if constexpr (std::is_same_v>) { + arg->hello_world(std::move(in)); + } else { +#ifdef __cpp_impl_coroutine + asio::co_spawn( + m_pool.getIoContext(), + [](auto& arg, std::string in) -> asio::awaitable { + co_await arg->hello_world(std::move(in)); + co_return; + }(arg, std::move(in)), + asio::detached); +#endif + } + }, + m_handler); + } + if (type == HelloWorldSenderHelloWorldReceiver::notice) { + auto [in, info] = unpack>(recv_bufs[1].data(), recv_bufs[1].size()); + std::visit( + [&](auto&& arg) { + using T = std::decay_t; + if constexpr (std::is_same_v>) { + arg->notice(in, std::move(info)); + } else { +#ifdef __cpp_impl_coroutine + asio::co_spawn( + m_pool.getIoContext(), + [](auto& arg, int32_t in, std::string info) -> asio::awaitable { + co_await arg->notice(in, std::move(info)); + co_return; + }(arg, in, std::move(info)), + asio::detached); +#endif + } + }, + m_handler); + } + } catch (const msgpack::type_error& err) { + m_error(err.what()); + } + } + + std::variant, std::shared_ptr> m_handler; + std::function m_error; + std::unique_ptr m_channel; + std::mutex m_mtx; +#ifdef __cpp_impl_coroutine + brpc::ContextPool m_pool{1}; +#endif +}; + +class HelloWorldSender final { +public: + HelloWorldSender(const ChannelConfig& config) + : m_context(std::make_shared(config.io_threads)) + , m_socket(*m_context, config.socktype) { + m_socket.set(zmq::sockopt::sndhwm, config.sendhwm); + m_socket.set(zmq::sockopt::rcvhwm, config.recvhwm); + m_socket.set(zmq::sockopt::sndbuf, config.sendbuf); + m_socket.set(zmq::sockopt::rcvbuf, config.recvbuf); + m_socket.set(zmq::sockopt::linger, config.linger); + if (config.tcp_keepalive) { + m_socket.set(zmq::sockopt::tcp_keepalive, 1); + m_socket.set(zmq::sockopt::tcp_keepalive_idle, config.tcp_keepalive_idle); + m_socket.set(zmq::sockopt::tcp_keepalive_cnt, config.tcp_keepalive_cnt); + m_socket.set(zmq::sockopt::tcp_keepalive_intvl, config.tcp_keepalive_intvl); + } + m_socket.bind(config.addr); + } + + HelloWorldSender(const ChannelConfig& config, + const std::shared_ptr& context) + : m_context(context) + , m_socket(*m_context, config.socktype) { + if (config.tcp_keepalive) { + m_socket.set(zmq::sockopt::tcp_keepalive, 1); + m_socket.set(zmq::sockopt::tcp_keepalive_idle, config.tcp_keepalive_idle); + m_socket.set(zmq::sockopt::tcp_keepalive_cnt, config.tcp_keepalive_cnt); + m_socket.set(zmq::sockopt::tcp_keepalive_intvl, config.tcp_keepalive_intvl); + } + m_socket.bind(config.addr); + } + + ~HelloWorldSender() { + } + + static auto create(ChannelConfig& config) { + config.socktype = zmq::socket_type::pub; + return std::make_unique(config); + } + + auto hello_world(std::string in) { + static auto pub_topic = brpc::pack(HelloWorldSenderHelloWorldReceiver::hello_world); + auto str = brpc::pack>(std::make_tuple(std::move(in))); + std::vector snd_bufs; + snd_bufs.emplace_back(zmq::message_t(pub_topic.data(), pub_topic.size())); + snd_bufs.emplace_back(zmq::message_t(str.data(), str.size())); + std::lock_guard lk(m_mtx); + return zmq::send_multipart(m_socket, std::move(snd_bufs)); + } + auto notice(int32_t in, std::string info) { + static auto pub_topic = brpc::pack(HelloWorldSenderHelloWorldReceiver::notice); + auto str = brpc::pack>(std::make_tuple(in, std::move(info))); + std::vector snd_bufs; + snd_bufs.emplace_back(zmq::message_t(pub_topic.data(), pub_topic.size())); + snd_bufs.emplace_back(zmq::message_t(str.data(), str.size())); + std::lock_guard lk(m_mtx); + return zmq::send_multipart(m_socket, std::move(snd_bufs)); + } + +private: + std::shared_ptr m_context; + zmq::socket_t m_socket; + std::mutex m_mtx; +}; + +} // namespace brpc +#endif //_BRPC_H_ diff --git a/src/main.cpp b/src/main.cpp new file mode 100644 index 0000000..bf1d5fe --- /dev/null +++ b/src/main.cpp @@ -0,0 +1,319 @@ +#include +#include +#include + +#include +#include +#include + +#include "cmdline.h" +#include "utils.h" + +auto extract(const std::string& input) { + std::regex pattern("<(.*?)>"); + std::smatch matches; + if (!std::regex_search(input, matches, pattern)) { + spdlog::error("extracted [{}] error.", input); + exit(1); + } + std::string extracted_string = matches[1].str(); + return extracted_string; +} + +std::string toCppType(std::string type) { + auto convert = [](const auto& type) { + static std::unordered_map cpp_type_table{ + {"bool", "bool"}, + {"i8", "int8_t"}, + {"u8", "uint8_t"}, + {"i16", "int16_t"}, + {"u16", "uint16_t"}, + {"i32", "int32_t"}, + {"u32", "uint32_t"}, + {"i64", "int64_t"}, + {"u64", "uint64_t"}, + {"f32", "float"}, + {"f64", "double"}, + {"string", "std::string"}, + }; + if (cpp_type_table.contains(type)) + return cpp_type_table[type]; + return type; + }; + if (type.starts_with("map")) { + auto extract_str = extract(type); + auto types = extract(type) | + std::views::split(';') | + std::views::transform([](auto&& rng) { + return std::string(&*rng.begin(), std::ranges::distance(rng.begin(), rng.end())); + }) | + to>(); + if (types.size() != 2) { + spdlog::error("invalid: [{}]", type); + exit(1); + } + type = std::format("std::unordered_map<{}, {}>", convert(types.at(0)), convert(types.at(1))); + } + if (type.starts_with("vector")) + type = std::format("std::vector<{}>", convert(extract(type))); + return convert(type); +} + +auto toSnakeCase(const std::string& s) { + std::regex words_regex("[A-Z][a-z]+"); + auto words_begin = std::sregex_iterator(s.begin(), s.end(), words_regex); + auto words_end = std::sregex_iterator(); + std::string name{}; + for (std::sregex_iterator i = words_begin; i != words_end; ++i) { + std::smatch match = *i; + std::string match_str = match.str(); + std::transform(match_str.begin(), match_str.end(), match_str.begin(), ::tolower); + auto z = i; + if (++z == words_end) + name += match_str; + else + name += (match_str + "_"); + } + return name; +} + +auto _snake(inja::Arguments& args) { + return toSnakeCase(args.at(0)->get()); +} + +auto _join(inja::Arguments& args) { + std::stringstream ss; + for (auto& json : *args.at(0)) + ss << toSnakeCase(json["name"]) << ","; + auto str = ss.str(); + str.pop_back(); + return str; +} + +auto _format_args(inja::Arguments& args) { + std::stringstream ss; + for (auto& json : *args.at(0)) + ss << json["type"].get() << " " << json["name"].get() << ","; + auto str = ss.str(); + str.pop_back(); + return str; +} + +static std::unordered_map CPP_TYPE_TABLE{ + {"bool", "bool"}, + {"int8_t", "int8_t"}, + {"uint8_t", "uint8_t"}, + {"int16_t", "int16_t"}, + {"uint16_t", "uint16_t"}, + {"int32_t", "int32_t"}, + {"uint32_t", "uint32_t"}, + {"int64_t", "int64_t"}, + {"uint64_t", "uint64_t"}, + {"float", "float"}, + {"double", "double"}, +}; + +auto _format_args_to_const_ref(inja::Arguments& args) { + auto convert = [](const auto& type, const auto& name) { + if (CPP_TYPE_TABLE.contains(type)) + return std::format("{} {}", type, name); + return std::format("const {}& {}", type, name); + }; + std::stringstream ss; + for (auto& json : *args.at(0)) + ss << convert(json["type"].get(), json["name"].get()) << ","; + auto str = ss.str(); + str.pop_back(); + return str; +} + +auto _format_args_type(inja::Arguments& args) { + std::stringstream ss; + for (auto& json : *args.at(0)) + ss << json["type"].get() << ","; + auto str = ss.str(); + str.pop_back(); + return str; +} + +auto _format_args_name_and_move(inja::Arguments& args) { + auto convert = [](const auto& type, const auto& name) { + if (CPP_TYPE_TABLE.contains(type)) + return name; + return std::format("std::move({})", name); + }; + + std::stringstream ss; + for (auto& json : *args.at(0)) + ss << convert(json["type"].get(), json["name"].get()) << ","; + auto str = ss.str(); + str.pop_back(); + return str; +} + +auto _format_catch_move(inja::Arguments& args) { + auto convert = [](const auto& type, const auto& name) { + if (CPP_TYPE_TABLE.contains(type)) + return name; + return std::format("{} = std::move({})", name, name); + }; + + std::stringstream ss; + for (auto& json : *args.at(0)) + ss << convert(json["type"].get(), json["name"].get()) << ","; + auto str = ss.str(); + str.pop_back(); + return str; +} + +auto _format_args_name(inja::Arguments& args) { + std::stringstream ss; + for (auto& json : *args.at(0)) + ss << json["name"].get() << ","; + auto str = ss.str(); + str.pop_back(); + return str; +} + +void formatCode(const std::string& file, const std::string& content) { + { + std::ofstream write(file); + if (!write.is_open()) { + spdlog::error("write {} error!!!", file); + return; + } + write << content; + } + char buffer[2048] = {}; + std::string result; + auto command = fmt::format("{} {}", "clang-format", file); + spdlog::info("start format {}", command); + FILE* pipe = popen(command.c_str(), "r"); + if (!pipe) { + spdlog::error("Couldn't start command {}", command); + return; + } + while (fgets(buffer, sizeof(buffer), pipe) != nullptr) { + result += buffer; + memset(buffer, 0x00, sizeof(buffer)); + } + pclose(pipe); + std::ofstream write(file); + if (!write.is_open()) { + spdlog::error("open {} fail", file); + return; + } + write << result; +} + +auto parseYaml(const std::string& file) { + YAML::Node config = YAML::LoadFile(file); + nlohmann::json ast; + std::string filename, namespace_str; + for (const auto& kv : config) { + nlohmann::json data; + auto node_name = kv.first.as(); + if ("property" == node_name) { + namespace_str = kv.second["namespace"].as(); + filename = kv.second["filename"].as(); + ast["node"][node_name]["filename"] = filename; + ast["node"][node_name]["namespace"] = namespace_str; + continue; + } + data["property"]["filename"] = filename; + data["property"]["namespace"] = namespace_str; + auto type = config[node_name]["type"].as(); + data["type"] = type; + if (type == "struct") { + data["struct_name"] = node_name; + nlohmann::json definitions_json; + for (auto struct_val : config[node_name]["definitions"]) { + nlohmann::json j; + j["name"] = struct_val.first.as(); + j["type"] = toCppType(struct_val.second["type"].as()); + j["comment"] = struct_val.second["comment"].as(); + definitions_json.emplace_back(std::move(j)); + } + data["definitions"] = std::move(definitions_json); + } + if (type == "enum") { + data["enum_name"] = node_name; + data["value_type"] = toCppType(config[node_name]["value_type"].as()); + nlohmann::json definitions_json; + for (auto struct_val : config[node_name]["definitions"]) { + nlohmann::json j; + j["name"] = struct_val.first.as(); + j["default"] = struct_val.second["default"].as(); + j["comment"] = struct_val.second["comment"].as(); + definitions_json.emplace_back(std::move(j)); + } + data["definitions"] = std::move(definitions_json); + } + if (type == "interface") { + data["interface_name"] = node_name; + data["pattern"] = config[node_name]["pattern"].as(); + data["caller"] = config[node_name]["caller"].as(); + data["callee"] = config[node_name]["callee"].as(); + for (auto struct_val : config[node_name]["definitions"]) { + nlohmann::json j; + nlohmann::json in_out; + j["func_name"] = struct_val.first.as(); + for (auto val : struct_val.second) { + for (auto val1 : val.second) { + in_out["name"] = val1.first.as(); + in_out["type"] = toCppType(val1.second["type"].as()); + j[val.first.as()].emplace_back(std::move(in_out)); + } + } + data["definitions"].emplace_back(std::move(j)); + } + } + ast["node"]["value"].emplace_back(std::move(data)); + } + return ast; +} + +int main(int argc, char** argv) { + cmdline::parser a; + a.add("filename", 'f', "input yaml yaml file", true, ""); + a.add("template", 't', "template directory", true, ""); + a.add("output", 'o', "output directory", true, ""); + a.add("lang", 'l', "Language", false, "cpp"); + a.parse_check(argc, argv); + + auto filename = a.get("filename"); + auto injia_template = a.get("template"); + auto output = a.get("output"); + auto lang = a.get("lang"); + + spdlog::info("filename: {}", filename); + if (injia_template.back() == '/') + injia_template.pop_back(); + injia_template = std::format("{}/ast.cpp.inja", injia_template); + spdlog::info("template: {}", injia_template); + spdlog::info("output: {}", output); + spdlog::info("lang: {}", lang); + + nlohmann::json data = parseYaml(filename); + // spdlog::info("{}", data.dump(4)); + + inja::Environment env; + env.set_trim_blocks(true); + env.set_lstrip_blocks(true); + env.add_callback("_snake", 1, _snake); + env.add_callback("_join", 1, _join); + env.add_callback("_format_args", 1, _format_args); + env.add_callback("_format_args_to_const_ref", 1, _format_args_to_const_ref); + env.add_callback("_format_args_name", 1, _format_args_name); + env.add_callback("_format_args_type", 1, _format_args_type); + env.add_callback("_format_args_name_and_move", 1, _format_args_name_and_move); + env.add_callback("_format_catch_move", 1, _format_catch_move); + + auto temp = env.parse_template(injia_template); + std::string result = env.render(temp, data); + + auto f = std::format("{}/{}", output, data["node"].at("property").at("filename").get()); + formatCode(f, result); + + return 0; +} diff --git a/template/cpp/asio_context_pool.inja b/template/cpp/asio_context_pool.inja new file mode 100644 index 0000000..581946a --- /dev/null +++ b/template/cpp/asio_context_pool.inja @@ -0,0 +1,43 @@ +namespace {{node.property.namespace}} { + +class ContextPool final { +public: + ContextPool(std::size_t pool_size) + : m_next_io_context(0) { + if (pool_size == 0) + throw std::runtime_error("ContextPool size is 0"); + for (std::size_t i = 0; i < pool_size; ++i) { + auto io_context_ptr = std::make_shared(); + m_io_contexts.emplace_back(io_context_ptr); + m_work.emplace_back( + asio::require(io_context_ptr->get_executor(), asio::execution::outstanding_work.tracked)); + } + } + + void start() { + for (auto& context : m_io_contexts) + m_threads.emplace_back(std::jthread([&] { context->run(); })); + } + + void stop() { + for (auto& context_ptr : m_io_contexts) + context_ptr->stop(); + } + + asio::io_context& getIoContext() { + size_t index = m_next_io_context.fetch_add(1, std::memory_order_relaxed); + asio::io_context& io_context = *m_io_contexts[index % m_io_contexts.size()]; + ++m_next_io_context; + if (m_next_io_context == m_io_contexts.size()) + m_next_io_context = 0; + return io_context; + } + +private: + std::vector> m_io_contexts; + std::list m_work; + std::atomic_uint64_t m_next_io_context; + std::vector m_threads; +}; + +} // {{node.property.namespace}} diff --git a/template/cpp/ast.cpp.inja b/template/cpp/ast.cpp.inja new file mode 100644 index 0000000..d0c6f7e --- /dev/null +++ b/template/cpp/ast.cpp.inja @@ -0,0 +1,65 @@ +#ifndef _{{upper(node.property.namespace)}}_H_ +#define _{{upper(node.property.namespace)}}_H_ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#ifdef __cpp_impl_coroutine +#include +#endif +#include + +#include +#include +#include +#include + +#ifdef __cpp_impl_coroutine +#include + +{% include "asio_context_pool.inja" %} +#endif + +{% include "utils.inja" %} + +{% include "bi_channel.inja" %} + +{% include "uni_channel.inja" %} + +{% for value in node.value %} + {% if value.type == "enum" %} + {% include "enum.inja" %} + {% endif %} +{% endfor %} + +{% for value in node.value %} + {% if value.type == "struct" %} + {% include "struct.inja" %} + {% endif %} +{% endfor %} + +{% for value in node.value %} + {% if value.type == "interface" %} + {% if value.pattern == "bi" %} + {% include "bi.inja" %} + {% endif %} + {% endif %} +{% endfor %} + +{% for value in node.value %} + {% if value.type == "interface" %} + {% if value.pattern == "uni" %} + {% include "uni.inja" %} + {% endif %} + {% endif %} +{% endfor %} + +#endif //_{{upper(node.property.namespace)}}_H_ diff --git a/template/cpp/bi.inja b/template/cpp/bi.inja new file mode 100644 index 0000000..98c065d --- /dev/null +++ b/template/cpp/bi.inja @@ -0,0 +1,280 @@ +namespace {{node.property.namespace}} { + +enum class {{value.caller}}{{value.callee}} { + {% for func in value.definitions %} + {{func.func_name}}, + {% endfor %} + +}; + +} // {{node.property.namespace}} + +MSGPACK_ADD_ENUM({{node.property.namespace}}::{{value.caller}}{{value.callee}}) + +namespace {{node.property.namespace}} { + +class {{value.caller}} final { +public: + {{value.caller}}(const ChannelConfig& config, + std::function error) + : m_channel(std::make_unique(config, error, [this](auto&& recv_msgs) mutable { dispatch(std::forward(recv_msgs)); })) + , m_error(error) { + } + + void start() { + m_channel->start(); + } + + {% for func in value.definitions %} + + void {{func.func_name}}({{_format_args(func.inputs)}}, std::function cb) { + auto type = pack<{{value.caller}}{{value.callee}}>({{value.caller}}{{value.callee}}::{{func.func_name}}); + auto req_id = m_req_id.fetch_add(1); + auto req_id_str = pack(req_id); + auto packet = pack>(std::make_tuple({{_format_args_name_and_move(func.inputs)}})); + std::vector snd_bufs; + snd_bufs.emplace_back(zmq::message_t(type.data(), type.size())); + snd_bufs.emplace_back(zmq::message_t(req_id_str.data(), req_id_str.size())); + snd_bufs.emplace_back(zmq::message_t(packet.data(), packet.size())); + { + std::lock_guard lk(m_mtx); + m_cb.emplace(req_id, std::move(cb)); + } + m_channel->send(std::move(snd_bufs)); + } + + void {{func.func_name}}({{_format_args(func.inputs)}}, + std::function cb, + std::chrono::milliseconds timeout, + std::function timeout_cb) { + auto type = pack<{{value.caller}}{{value.callee}}>({{value.caller}}{{value.callee}}::{{func.func_name}}); + auto req_id = m_req_id.fetch_add(1); + auto req_id_str = pack(req_id); + auto packet = pack>(std::make_tuple({{_format_args_name_and_move(func.inputs)}})); + std::vector snd_bufs; + snd_bufs.emplace_back(zmq::message_t(type.data(), type.size())); + snd_bufs.emplace_back(zmq::message_t(req_id_str.data(), req_id_str.size())); + snd_bufs.emplace_back(zmq::message_t(packet.data(), packet.size())); + { + std::lock_guard lk(m_mtx); + m_cb.emplace(req_id, std::move(cb)); + m_timeout_cb.emplace(req_id, std::move(timeout_cb)); + } + m_channel->send(std::move(snd_bufs), + std::move(timeout), + [this, req_id]() mutable { + std::unique_lock lk(m_mtx); +#ifdef __cpp_impl_coroutine + if (!m_cb.contains(req_id) || !m_timeout_cb.contains(req_id)) + return; +#else + if (m_cb.find(req_id) == m_cb.end() || m_timeout_cb.find(req_id) == m_timeout_cb.end()) + return; +#endif + auto cb = std::move(m_timeout_cb[req_id]); + m_timeout_cb.erase(req_id); + m_cb.erase(req_id); + lk.unlock(); + cb(); + }); + } +#ifdef __cpp_impl_coroutine + template CompletionToken> + auto {{func.func_name}}_coro({{_format_args(func.inputs)}}, CompletionToken&& token) { + return asio::async_initiate( + [this](Handler&& handler, {{_format_args(func.inputs)}}) mutable { + auto handler_ptr = std::make_shared(std::move(handler)); + {{func.func_name}}( + {{_format_args_name_and_move(func.inputs)}}, + [handler_ptr = std::move(handler_ptr)]({{_format_args(func.outputs)}}) mutable { + auto ex = asio::get_associated_executor(*handler_ptr); + asio::post(ex, [{{_format_catch_move(func.outputs)}}, handler_ptr = std::move(handler_ptr)] () mutable -> void { + (*handler_ptr)({{_format_args_name_and_move(func.outputs)}}); + }); + }); + }, + token, + {{_format_args_name_and_move(func.inputs)}}); + } + + template >)> CompletionToken> + auto {{func.func_name}}_coro({{_format_args(func.inputs)}}, std::chrono::milliseconds timeout, CompletionToken&& token) { + return asio::async_initiate>)>( + [this](Handler&& handler, {{_format_args(func.inputs)}}, auto timeout) mutable { + auto handler_ptr = std::make_shared(std::move(handler)); + {{func.func_name}}( + {{_format_args_name_and_move(func.inputs)}}, + [handler_ptr]({{_format_args(func.outputs)}}) mutable { + auto ex = asio::get_associated_executor(*handler_ptr); + asio::post(ex, [{{_format_catch_move(func.outputs)}}, handler_ptr = std::move(handler_ptr)] () mutable -> void { + (*handler_ptr)(std::make_tuple({{_format_args_name_and_move(func.outputs)}})); + }); + }, + std::move(timeout), + [handler_ptr] { + auto ex = asio::get_associated_executor(*handler_ptr); + asio::post(ex, [=, handler_ptr = std::move(handler_ptr)] () mutable -> void { + (*handler_ptr)(std::nullopt); + }); + }); + }, + token, + {{_format_args_name_and_move(func.inputs)}}, std::move(timeout)); + } +#endif + + {% endfor %} + + + static auto create(ChannelConfig& config, std::function error) { + config.socktype = zmq::socket_type::dealer; + return std::make_unique<{{value.caller}}>(config, std::move(error)); + } + +private: + void dispatch(std::vector recv_msgs) { + if (recv_msgs.size() != 3) { + m_error("Illegal response packet"); + return; + } + try { + auto type = unpack<{{value.caller}}{{value.callee}}>(recv_msgs[0].data(), recv_msgs[0].size()); + auto req_id = unpack(recv_msgs[1].data(), recv_msgs[1].size()); + switch(type) { + {% for func in value.definitions %} + case {{value.caller}}{{value.callee}}::{{func.func_name}}: { + auto tp = unpack>(recv_msgs[2].data(), recv_msgs[2].size()); + std::unique_lock lk(m_mtx); +#ifdef __cpp_impl_coroutine + if (!m_cb.contains(req_id)) + break; +#else + if (m_cb.find(req_id) == m_cb.end()) + break; +#endif + auto cb = std::move(m_cb[req_id]); + m_cb.erase(req_id); + lk.unlock(); + auto func = std::any_cast>(cb); + std::apply(func, tp); + break; + } + {% endfor %} + default: + m_error("error type"); + } + } catch (const msgpack::type_error& err) { + m_error(err.what()); + } + } + + std::unique_ptr m_channel; + std::function m_error; + std::mutex m_mtx; + std::unordered_map m_cb; + std::unordered_map> m_timeout_cb; + std::atomic_uint64_t m_req_id{0}; +}; + +struct {{value.callee}}Handler { + {% for func in value.definitions %} + virtual void {{func.func_name}}({{_format_args(func.inputs)}}, std::function cb) = 0; + {% endfor %} +}; + +struct Coro{{value.callee}}Handler { + {% for func in value.definitions %} +#ifdef __cpp_impl_coroutine + virtual asio::awaitable {{func.func_name}}({{_format_args(func.inputs)}}, std::function cb) = 0; +#else + virtual void {{func.func_name}}({{_format_args(func.inputs)}}, std::function cb) = 0; +#endif + {% endfor %} +}; + + +class {{value.callee}} final { +public: + {{value.callee}}(const ChannelConfig& config, + std::variant, std::shared_ptr<{{value.callee}}Handler>> handler, + std::function error) + : m_handler(std::move(handler)) + , m_error(error) { +#ifdef __cpp_impl_coroutine + m_pool.start(); +#endif + m_channel = std::make_unique(config, error, [this] (std::vector recv_bufs) mutable { + if (recv_bufs.size() != 4) { + m_error("Illegal response packet"); + return; + } + try { + auto type = unpack<{{value.caller}}{{value.callee}}>(recv_bufs[1].data(), recv_bufs[1].size()); + switch(type) { + {% for func in value.definitions %} + case {{value.caller}}{{value.callee}}::{{func.func_name}}: { + auto [{{_format_args_name(func.inputs)}}] = unpack>(recv_bufs[3].data(), recv_bufs[3].size()); + auto out = [ptr = std::make_shared>(std::move(recv_bufs)),this] ({{_format_args(func.outputs)}}) mutable { + auto& snd_bufs = *ptr; + auto packet = pack>(std::make_tuple({{_format_args_name_and_move(func.outputs)}})); + snd_bufs[3] = std::move(zmq::message_t(packet.data(), packet.size())); + m_channel->send(std::move(snd_bufs)); + }; + std::visit( + [&](auto&& arg) { + using T = std::decay_t; + if constexpr (std::is_same_v>) { + arg->{{func.func_name}}({{_format_args_name_and_move(func.inputs)}}, out); + } else { +#ifdef __cpp_impl_coroutine + asio::co_spawn( + m_pool.getIoContext(), + [] (auto& arg, {{_format_args(func.inputs)}}, auto out)-> asio::awaitable { + co_await arg->{{func.func_name}}({{_format_args_name_and_move(func.inputs)}}, std::move(out)); + co_return; + }(arg, {{_format_args_name_and_move(func.inputs)}}, std::move(out)), + asio::detached); +#endif + } + }, + m_handler); + break; + } + {% endfor %} + default: + m_error("error type"); + } + } catch (const msgpack::type_error& err) { + m_error(err.what()); + } + }); + } + ~{{value.callee}}() { +#ifdef __cpp_impl_coroutine + m_pool.stop(); +#endif + } + + void start() { + m_channel->start(); + } + + static auto create(ChannelConfig& config, + std::variant, std::shared_ptr<{{value.callee}}Handler>> handler, + std::function error) { + config.socktype = zmq::socket_type::router; + config.bind = true; + return std::make_unique<{{value.callee}}>(config, std::move(handler), std::move(error)); + } + +private: + std::variant, std::shared_ptr<{{value.callee}}Handler>> m_handler; + std::function m_error; + std::unique_ptr m_channel; + std::mutex m_mtx; +#ifdef __cpp_impl_coroutine + brpc::ContextPool m_pool{1}; +#endif +}; + +} // {{node.property.namespace}} \ No newline at end of file diff --git a/template/cpp/bi_channel.inja b/template/cpp/bi_channel.inja new file mode 100644 index 0000000..ef425ae --- /dev/null +++ b/template/cpp/bi_channel.inja @@ -0,0 +1,135 @@ +namespace {{node.property.namespace}} { + +class BiChannel final { +public: + BiChannel(const ChannelConfig& config, + std::function error, + std::function)> cb) + : m_context(config.io_threads) + , m_socket(m_context, config.socktype) + , m_send(m_context, zmq::socket_type::push) + , m_recv(m_context, zmq::socket_type::pull) + , m_error(std::move(error)) + , m_cb(std::move(cb)) { + m_socket.set(zmq::sockopt::sndhwm, config.sendhwm); + m_socket.set(zmq::sockopt::rcvhwm, config.recvhwm); + m_socket.set(zmq::sockopt::sndbuf, config.sendbuf); + m_socket.set(zmq::sockopt::rcvbuf, config.recvbuf); + m_socket.set(zmq::sockopt::linger, config.linger); + if (config.tcp_keepalive) { + m_socket.set(zmq::sockopt::tcp_keepalive, 1); + m_socket.set(zmq::sockopt::tcp_keepalive_idle, config.tcp_keepalive_idle); + m_socket.set(zmq::sockopt::tcp_keepalive_cnt, config.tcp_keepalive_cnt); + m_socket.set(zmq::sockopt::tcp_keepalive_intvl, config.tcp_keepalive_intvl); + } + if (config.probe) { + m_socket.set(zmq::sockopt::probe_router, 1); + } + if (config.bind) { + if (config.socktype == zmq::socket_type::router) { + if (config.mandatory) + m_socket.set(zmq::sockopt::router_mandatory, 0); + else + m_socket.set(zmq::sockopt::router_mandatory, 1); + } + m_socket.bind(config.addr); + } else { + m_socket.connect(config.addr); + } + auto addr = uniqueAddr(); + m_recv.set(zmq::sockopt::rcvhwm, config.recvhwm); + m_recv.set(zmq::sockopt::rcvbuf, config.recvbuf); + m_recv.bind(addr); + + m_send.set(zmq::sockopt::sndhwm, config.sendhwm); + m_send.set(zmq::sockopt::sndbuf, config.sendbuf); + m_send.set(zmq::sockopt::linger, config.linger); + + m_send.connect(addr); + } + ~BiChannel() { + m_running = false; + if (m_thread.joinable()) + m_thread.join(); + } + + void send(std::vector&& snd_msgs) { + std::lock_guard lk(m_mutex); + if (!zmq::send_multipart(m_send, std::forward(snd_msgs))) { + m_error("send error!!!"); + } + } + + void send(std::vector&& snd_msgs, + std::chrono::milliseconds timeout, + std::function cb) { + std::lock_guard lk(m_mutex); + if (!zmq::send_multipart(m_send, std::forward(snd_msgs))) { + m_error("send error!!!"); + } + auto timeout_point = std::chrono::system_clock::now() + timeout; + m_timeout_task.emplace(timeout_point, std::move(cb)); + } + + void start() { + m_running = true; + m_thread = std::thread([this] { + std::vector items{ + {static_cast(m_socket), 0, ZMQ_POLLIN | ZMQ_POLLERR, 0}, + {static_cast(m_recv), 0, ZMQ_POLLIN | ZMQ_POLLERR, 0}, + }; + std::chrono::milliseconds interval(200); + std::multimap> timeout_task; + while (m_running.load()) { + zmq::poll(items, interval); + if (items[0].revents & ZMQ_POLLIN) { + std::vector recv_msgs; + auto ret = zmq::recv_multipart(m_socket, std::back_inserter(recv_msgs)); + if (!ret) { + m_error("zmq::recv_multipart error!!!"); + break; + } + m_cb(std::move(recv_msgs)); + } + if (items[1].revents & ZMQ_POLLIN) { + std::vector recv_msgs; + auto ret = zmq::recv_multipart(m_recv, std::back_inserter(recv_msgs)); + if (!ret) { + m_error("recv zmq::recv_multipart error!!!"); + break; + } + if (!zmq::send_multipart(m_socket, recv_msgs)) { + m_error("zmq::send_multipart error!!!"); + break; + } + } + { + std::lock_guard lk(m_mutex); + if (!m_timeout_task.empty()) { + timeout_task.merge(m_timeout_task); + m_timeout_task.clear(); + } + } + auto now = std::chrono::system_clock::now(); + while (!timeout_task.empty() && timeout_task.begin()->first <= now) { + (timeout_task.begin()->second)(); + timeout_task.erase(timeout_task.begin()); + } + } + }); + } + +private: + zmq::context_t m_context; + zmq::socket_t m_socket; + zmq::socket_t m_send; + zmq::socket_t m_recv; + std::function m_error; + std::function)> m_cb; + std::thread m_thread; + std::atomic_bool m_running{false}; + std::mutex m_mutex; + std::multimap> m_timeout_task; +}; + +} // {{node.property.namespace}} \ No newline at end of file diff --git a/template/cpp/enum.inja b/template/cpp/enum.inja new file mode 100644 index 0000000..9d526bc --- /dev/null +++ b/template/cpp/enum.inja @@ -0,0 +1,31 @@ +namespace {{node.property.namespace}} { + +enum class {{value.enum_name}} : {{value.value_type}} { + {% for field in value.definitions %} + {{field.name}} = {{field.default}}, // {{field.comment}} + {% endfor %} +}; + +inline std::string_view toString(const {{value.enum_name}} value) { + switch (value) { + {% for field in value.definitions %} + case {{value.enum_name}}::{{field.name}}: + return "{{field.default}}"; + {% endfor %} + default: + return "???"; + } +} + +template <> +inline {{value.enum_name}} fromString<{{value.enum_name}}>(const std::string& value) { + {% for field in value.definitions %} + if (value == "{{field.name}}") + return {{value.enum_name}}::{{field.name}}; + {% endfor %} + throw std::bad_cast(); +} + +} // {{node.property.namespace}} + +MSGPACK_ADD_ENUM({{node.property.namespace}}::{{value.enum_name}}) diff --git a/template/cpp/struct.inja b/template/cpp/struct.inja new file mode 100644 index 0000000..c598936 --- /dev/null +++ b/template/cpp/struct.inja @@ -0,0 +1,22 @@ + +namespace {{node.property.namespace}} { + +struct {{value.struct_name}} { + {% for field in value.definitions %} + {{field.type}} {{_snake(field.name)}}; // {{field.comment}} + {% endfor %} + + MSGPACK_DEFINE({{_join(value.definitions)}}) +}; + +inline std::string toString(const {{value.struct_name}}& value) { + std::string str = "{{value.struct_name}}{"; + {% for field in value.definitions %} + str += toString(value.{{_snake(field.name)}}); + str += ","; + {% endfor %} + str += "}"; + return str; +} + +} // {{node.property.namespace}} diff --git a/template/cpp/uni.inja b/template/cpp/uni.inja new file mode 100644 index 0000000..4c397c6 --- /dev/null +++ b/template/cpp/uni.inja @@ -0,0 +1,192 @@ +namespace {{node.property.namespace}} { + +enum class {{value.caller}}{{value.callee}} { + {% for func in value.definitions %} + {{func.func_name}}, + {% endfor %} + +}; + +inline std::string_view toString(const {{value.caller}}{{value.callee}} value) { + switch (value) { + {% for func in value.definitions %} + case {{value.caller}}{{value.callee}}::{{func.func_name}}: + return "{{func.func_name}}"; + {% endfor %} + default: + return "???"; + } +} + +template <> +inline {{value.caller}}{{value.callee}} fromString<{{value.caller}}{{value.callee}}>(const std::string& value) { + {% for func in value.definitions %} + if (value == "{{func.func_name}}") + return {{value.caller}}{{value.callee}}::{{func.func_name}}; + {% endfor %} + throw std::bad_cast(); +} + +} // {{node.property.namespace}} + +MSGPACK_ADD_ENUM({{node.property.namespace}}::{{value.caller}}{{value.callee}}) + +namespace {{node.property.namespace}} { + +struct {{value.callee}}Handler { + {% for func in value.definitions %} + virtual void {{func.func_name}}({{_format_args(func.inputs)}}) = 0; + {% endfor %} +}; + +struct Coro{{value.callee}}Handler { + {% for func in value.definitions %} +#ifdef __cpp_impl_coroutine + virtual asio::awaitable {{func.func_name}}({{_format_args(func.inputs)}}) = 0; +#else + virtual void {{func.func_name}}({{_format_args(func.inputs)}}) = 0; +#endif + {% endfor %} +}; + +class {{value.callee}} final { +public: + {{value.callee}}(const ChannelConfig& config, + std::variant, std::shared_ptr<{{value.callee}}Handler>> handler, + std::function error) + : m_handler(std::move(handler)) + , m_error(error) { + m_channel = std::make_unique(config, [this](auto&& recv) mutable { dispatch(std::forward(recv)); }, error); + } + {{value.callee}}(const ChannelConfig& config, + const std::shared_ptr& context, + std::variant, std::shared_ptr<{{value.callee}}Handler>> handler, + std::function error) + : m_handler(std::move(handler)) + , m_error(error) { + m_channel = std::make_unique(context, config, [this](auto&& recv_msgs) mutable { dispatch(std::forward(recv_msgs)); }, error); + } + ~{{value.callee}}() { +#ifdef __cpp_impl_coroutine + m_pool.stop(); +#endif + } + + void start() { +#ifdef __cpp_impl_coroutine + m_pool.start(); +#endif + m_channel->subscribe(""); + m_channel->start(); + } + + static auto create(ChannelConfig& config, + std::variant, std::shared_ptr> handler, + std::function error) { + config.socktype = zmq::socket_type::sub; + return std::make_unique<{{value.callee}}>(config, std::move(handler), std::move(error)); + } + +private: + void dispatch(std::vector recv_bufs) { + if (recv_bufs.size() != 2) { + m_error("Illegal response packet"); + return; + } + try { + auto type = unpack<{{value.caller}}{{value.callee}}>(recv_bufs[0].data(), recv_bufs[0].size()); + {% for func in value.definitions %} + if (type == {{value.caller}}{{value.callee}}::{{func.func_name}}) { + auto [{{_format_args_name(func.inputs)}}] = unpack>(recv_bufs[1].data(), recv_bufs[1].size()); + std::visit( + [&](auto&& arg) { + using T = std::decay_t; + if constexpr (std::is_same_v>) { + arg->{{func.func_name}}({{_format_args_name_and_move(func.inputs)}}); + } else { +#ifdef __cpp_impl_coroutine + asio::co_spawn( + m_pool.getIoContext(), + [] (auto& arg, {{_format_args(func.inputs)}})-> asio::awaitable { + co_await arg->{{func.func_name}}({{_format_args_name_and_move(func.inputs)}}); + co_return; + }(arg, {{_format_args_name_and_move(func.inputs)}}), + asio::detached); +#endif + } + }, + m_handler); + } + {% endfor %} + } catch (const msgpack::type_error& err) { + m_error(err.what()); + } + } + + std::variant, std::shared_ptr<{{value.callee}}Handler>> m_handler; + std::function m_error; + std::unique_ptr m_channel; + std::mutex m_mtx; +#ifdef __cpp_impl_coroutine + brpc::ContextPool m_pool{1}; +#endif +}; + +class {{value.caller}} final { +public: + {{value.caller}}(const ChannelConfig& config) + : m_context(std::make_shared(config.io_threads)) + , m_socket(*m_context, config.socktype){ + m_socket.set(zmq::sockopt::sndhwm, config.sendhwm); + m_socket.set(zmq::sockopt::rcvhwm, config.recvhwm); + m_socket.set(zmq::sockopt::sndbuf, config.sendbuf); + m_socket.set(zmq::sockopt::rcvbuf, config.recvbuf); + m_socket.set(zmq::sockopt::linger, config.linger); + if (config.tcp_keepalive) { + m_socket.set(zmq::sockopt::tcp_keepalive, 1); + m_socket.set(zmq::sockopt::tcp_keepalive_idle, config.tcp_keepalive_idle); + m_socket.set(zmq::sockopt::tcp_keepalive_cnt, config.tcp_keepalive_cnt); + m_socket.set(zmq::sockopt::tcp_keepalive_intvl, config.tcp_keepalive_intvl); + } + m_socket.bind(config.addr); + } + + {{value.caller}}(const ChannelConfig& config, + const std::shared_ptr& context) + : m_context(context) + , m_socket(*m_context, config.socktype){ + if (config.tcp_keepalive) { + m_socket.set(zmq::sockopt::tcp_keepalive, 1); + m_socket.set(zmq::sockopt::tcp_keepalive_idle, config.tcp_keepalive_idle); + m_socket.set(zmq::sockopt::tcp_keepalive_cnt, config.tcp_keepalive_cnt); + m_socket.set(zmq::sockopt::tcp_keepalive_intvl, config.tcp_keepalive_intvl); + } + m_socket.bind(config.addr); + } + + ~{{value.caller}}() {} + + static auto create(ChannelConfig& config) { + config.socktype = zmq::socket_type::pub; + return std::make_unique<{{value.caller}}>(config); + } + + {% for func in value.definitions %} + auto {{func.func_name}} ({{_format_args(func.inputs)}}) { + static auto pub_topic = brpc::pack<{{value.caller}}{{value.callee}}>({{value.caller}}{{value.callee}}::{{func.func_name}}); + auto str = brpc::pack>(std::make_tuple({{_format_args_name_and_move(func.inputs)}})); + std::vector snd_bufs; + snd_bufs.emplace_back(zmq::message_t(pub_topic.data(), pub_topic.size())); + snd_bufs.emplace_back(zmq::message_t(str.data(), str.size())); + std::lock_guard lk(m_mtx); + return zmq::send_multipart(m_socket, std::move(snd_bufs)); + } + {% endfor %} + +private: + std::shared_ptr m_context; + zmq::socket_t m_socket; + std::mutex m_mtx; +}; + +} // {{node.property.namespace}} \ No newline at end of file diff --git a/template/cpp/uni_channel.inja b/template/cpp/uni_channel.inja new file mode 100644 index 0000000..d4a554f --- /dev/null +++ b/template/cpp/uni_channel.inja @@ -0,0 +1,97 @@ +namespace {{node.property.namespace}} { + +struct UniChannel final { +public: + UniChannel(const brpc::ChannelConfig& config, + std::function)> cb, + std::function error) + : m_context(std::make_shared(config.io_threads)) + , m_socket(*m_context, config.socktype) + , m_cb(std::move(cb)) + , m_error(std::move(error)) { + m_socket.set(zmq::sockopt::sndhwm, config.sendhwm); + m_socket.set(zmq::sockopt::rcvhwm, config.recvhwm); + m_socket.set(zmq::sockopt::sndbuf, config.sendbuf); + m_socket.set(zmq::sockopt::rcvbuf, config.recvbuf); + m_socket.set(zmq::sockopt::linger, config.linger); + if (config.tcp_keepalive) { + m_socket.set(zmq::sockopt::tcp_keepalive, 1); + m_socket.set(zmq::sockopt::tcp_keepalive_idle, config.tcp_keepalive_idle); + m_socket.set(zmq::sockopt::tcp_keepalive_cnt, config.tcp_keepalive_cnt); + m_socket.set(zmq::sockopt::tcp_keepalive_intvl, config.tcp_keepalive_intvl); + } + m_socket.connect(config.addr); + } + + UniChannel(std::shared_ptr context, + const brpc::ChannelConfig& config, + std::function)> cb, + std::function error) + : m_context(std::move(context)) + , m_socket(*m_context, config.socktype) + , m_cb(std::move(cb)) + , m_error(std::move(error)) { + m_socket.set(zmq::sockopt::sndhwm, config.sendhwm); + m_socket.set(zmq::sockopt::rcvhwm, config.recvhwm); + m_socket.set(zmq::sockopt::sndbuf, config.sendbuf); + m_socket.set(zmq::sockopt::rcvbuf, config.recvbuf); + m_socket.set(zmq::sockopt::linger, config.linger); + if (config.tcp_keepalive) { + m_socket.set(zmq::sockopt::tcp_keepalive, 1); + m_socket.set(zmq::sockopt::tcp_keepalive_idle, config.tcp_keepalive_idle); + m_socket.set(zmq::sockopt::tcp_keepalive_cnt, config.tcp_keepalive_cnt); + m_socket.set(zmq::sockopt::tcp_keepalive_intvl, config.tcp_keepalive_intvl); + } + m_socket.connect(config.addr); + } + + ~UniChannel() { + m_running = false; + if (m_thread.joinable()) + m_thread.join(); + } + + void subscribe(const std::string& topic = "") { + m_socket.set(zmq::sockopt::subscribe, topic); + } + + void start() { + m_running = true; + m_thread = std::thread([this] { + std::vector items{ + {static_cast(m_socket), 0, ZMQ_POLLIN | ZMQ_POLLERR, 0}, + }; + std::chrono::milliseconds interval(200); + while (m_running.load()) { + zmq::poll(items, interval); + if (items[0].revents & ZMQ_POLLIN) { + std::vector recv_msgs; + auto ret = zmq::recv_multipart(m_socket, std::back_inserter(recv_msgs)); + if (!ret) { + m_error("zmq::recv_multipart error!!!"); + break; + } + m_cb(std::move(recv_msgs)); + } + } + }); + } + + auto& context() { + return m_context; + } + + auto& socket() { + return m_socket; + } + +private: + std::shared_ptr m_context; + zmq::socket_t m_socket; + std::function)> m_cb; + std::function m_error; + std::atomic_bool m_running; + std::thread m_thread; +}; + +} // {{node.property.namespace}} diff --git a/template/cpp/utils.inja b/template/cpp/utils.inja new file mode 100644 index 0000000..1390762 --- /dev/null +++ b/template/cpp/utils.inja @@ -0,0 +1,103 @@ +namespace {{node.property.namespace}} { + +template +inline msgpack::sbuffer pack(const TObject& object) { + msgpack::sbuffer buffer; + msgpack::pack(buffer, object); + return buffer; +} + +template +inline TObject unpack(const void* data, size_t size) { + auto handle = msgpack::unpack(reinterpret_cast(data), size); + auto value = handle.get().as(); + return value; +} + +template +T fromString(const std::string&); + +template +inline std::enable_if_t::value && !std::is_same::value, std::string> toString(T value) { + return std::to_string(value); +} + +inline std::string toString(bool value) { + return value ? "true" : "false"; +} + +inline std::string toString(const std::string& value) { + return value; +} + +template +inline std::string toString(const std::vector& vector) { + std::string str = "["; + auto it = vector.begin(); + if (it != vector.end()) { + str += toString(*it); + ++it; + } + for (; it != vector.end(); ++it) { + str += ","; + str += toString(*it); + } + str += "]"; + return str; +} + +template +inline std::string toString(const std::unordered_map& map) { + std::string str = "{"; + auto it = map.begin(); + if (it != map.end()) { + str += toString(it->first); + str += "->"; + str += toString(it->second); + ++it; + } + for (; it != map.end(); ++it) { + str += ","; + str += toString(it->first); + str += "->"; + str += toString(it->second); + } + str += "}"; + return str; +} + +inline std::string uuid() { + uuid_t uuid; + char uuid_str[37]; + uuid_generate_random(uuid); + uuid_unparse(uuid, uuid_str); + return std::string(uuid_str); +} + +struct ChannelConfig { + std::size_t io_threads{1}; + zmq::socket_type socktype{zmq::socket_type::dealer}; + int32_t sendhwm{0}; + int32_t recvhwm{0}; + int32_t sendbuf{0}; + int32_t recvbuf{0}; + int32_t linger{2000}; + std::string addr{"tcp://127.0.0.1:5833"}; + bool bind{false}; + bool mandatory{false}; + bool tcp_keepalive{true}; + int tcp_keepalive_idle{60}; + int tcp_keepalive_cnt{3}; + int tcp_keepalive_intvl{5}; + bool probe{false}; +}; + +inline std::string uniqueAddr() { + uuid_t uuid; + char s[37]; + uuid_generate_random(uuid); + uuid_unparse(uuid, s); + return "inproc://" + std::string(s); +} + +} // {{node.property.namespace}} diff --git a/test/cpp/main.cpp b/test/cpp/main.cpp new file mode 100644 index 0000000..da5aa44 --- /dev/null +++ b/test/cpp/main.cpp @@ -0,0 +1,207 @@ +#include +#include +#include +#include + +#include "brpc.hpp" + +void start(std::function func) { + std::thread(std::move(func)).detach(); +} + +auto create_bank_info() { + brpc::BankInfo bank_info; + bank_info.name = "xiaoli"; + bank_info.type = brpc::TestType::EnumOne; + bank_info.test_one = 100; + bank_info.test_two = 101; + bank_info.test_map_one.emplace("brpc", brpc::TestType::EnumOne); + bank_info.test_map.emplace(false, 555); + bank_info.test_vector.emplace_back("vector"); + bank_info.info.name = "rpc"; + std::string bank_name = "zhongxin"; + return bank_info; +} + +struct Handler : public brpc::HelloWorldServerHandler { + virtual void hello_world(brpc::BankInfo bank_info, std::string bank_name, uint64_t blance, + std::function cb) override { + spdlog::info("brpc::HelloWorldServer server recv: {}, bank_name: {}, blance: {}", + brpc::toString(bank_info), bank_name, blance); + brpc::Info info; + info.name = "test"; + cb("hello world", std::move(info), 789); + } +}; + +#ifdef __cpp_impl_coroutine + +struct CoroHandler : public brpc::CoroHelloWorldServerHandler { + virtual asio::awaitable hello_world(brpc::BankInfo bank_info, std::string bank_name, uint64_t blance, + std::function cb) override { + spdlog::info("coro brpc::HelloWorldServer server recv: {}, bank_name: {}, blance: {}", + brpc::toString(bank_info), bank_name, blance); + brpc::Info info; + info.name = "coro test"; + cb("coro hello world", std::move(info), 556); + co_return; + } +}; + +struct CoroHelloWorldReceiver : public brpc::CoroHelloWorldReceiverHandler { + virtual asio::awaitable hello_world(std::string in) override { + spdlog::info("CoroHelloWorldReceiver::hello_world: {}", in); + co_return; + } + virtual asio::awaitable notice(int32_t in, std::string info) override { + spdlog::info("CoroHelloWorldReceiver::notice: {}: {}", in, info); + co_return; + } +}; + +#endif + +struct HelloWorldReceiverHandler : public brpc::HelloWorldReceiverHandler { + virtual void hello_world(std::string in) override { + spdlog::info("HelloWorldReceiverHandler::hello_world: {}", in); + return; + } + virtual void notice(int32_t in, std::string info) override { + spdlog::info("HelloWorldReceiverHandler::notice: {}: {}", in, info); + return; + } +}; + +auto test_pub_sub() { + start([] { + brpc::ChannelConfig pub_config{}; + pub_config.addr = "tcp://127.0.0.1:5877"; + auto sender = brpc::HelloWorldSender::create(pub_config); + int i = 10; + while (i--) { + sender->hello_world(std::to_string(i) + "_brpc"); + sender->notice(i, "hello world"); + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + }); + + brpc::ChannelConfig sub_config{}; + sub_config.addr = "tcp://127.0.0.1:5877"; + auto receiver = brpc::HelloWorldReceiver::create( + sub_config, +#ifdef __cpp_impl_coroutine + std::make_shared(), +#else + std::make_shared(), +#endif + [](auto error) { + spdlog::error("{}", error); + }); + receiver->start(); + return receiver; +} + +void test_bi() { + start([] { + brpc::ChannelConfig bi_config{}; + bi_config.addr = "tcp://127.0.0.1:5878"; + auto server = brpc::HelloWorldServer::create( + bi_config, + std::make_shared(), + [](std::string error) { + spdlog::error("brpc::HelloWorldServer error: {}", error); + }); + server->start(); + std::this_thread::sleep_for(std::chrono::seconds(5)); + std::this_thread::sleep_for(std::chrono::seconds(5)); + }); + start([] { + brpc::ChannelConfig bi_config{}; + bi_config.addr = "tcp://127.0.0.1:5878"; + auto client = brpc::HelloWorldClient::create(bi_config, [](std::string error) { + spdlog::error("brpc::HelloWorldClient error: {}", error); + }); + client->start(); + + std::string bank_name = "zhongxin"; + client->hello_world( + create_bank_info(), + bank_name, + 999, + [](std::string reply, brpc::Info info, uint64_t count) { + spdlog::info("brpc::HelloWorldClient::hello_world recv: {},{},{}", reply, brpc::toString(info), count); + }); + + client->hello_world( + create_bank_info(), + bank_name, + 999, + [](std::string reply, brpc::Info info, uint64_t count) { + spdlog::info("brpc::HelloWorldClient::hello_world(timeout) recv: {},{},{}", reply, brpc::toString(info), count); + }, + std::chrono::milliseconds(200), + [] { + spdlog::info("brpc::HelloWorldClient::timeout timeout!!!"); + }); + + std::this_thread::sleep_for(std::chrono::seconds(5)); + }); +} + +#ifdef __cpp_impl_coroutine +void test_coro_bi(auto& pool) { + start([] { + brpc::ChannelConfig bi_config{}; + bi_config.addr = "tcp://127.0.0.1:5879"; + auto server = brpc::HelloWorldServer::create( + bi_config, + std::make_shared(), + [](std::string error) { + spdlog::error("brpc::HelloWorldServer error: {}", error); + }); + server->start(); + std::this_thread::sleep_for(std::chrono::seconds(5)); + }); + + asio::co_spawn( + pool.getIoContext(), + [&]() -> asio::awaitable { + std::cout << std::this_thread::get_id() << std::endl; + brpc::ChannelConfig bi_config{}; + bi_config.addr = "tcp://127.0.0.1:5879"; + auto client = brpc::HelloWorldClient::create(bi_config, [](std::string error) { + spdlog::error("coro brpc::HelloWorldClient error: {}", error); + }); + client->start(); + spdlog::info("start coro client."); + + auto [reply, info, count] = co_await client->hello_world_coro(create_bank_info(), "HF", 777, asio::as_tuple(asio::use_awaitable)); + spdlog::info("coro brpc::HelloWorldClient::hello_world recv: {},{},{}", reply, brpc::toString(info), count); + + auto ret = co_await client->hello_world_coro(create_bank_info(), "HF", 666, std::chrono::milliseconds{500}, asio::use_awaitable); + if (ret.has_value()) { + auto [reply, info, count] = ret.value(); + spdlog::info("coro brpc::HelloWorldClient::hello_world recv: {},{},{}", reply, brpc::toString(info), count); + co_return; + } + spdlog::error("coro brpc::HelloWorldClient::hello_world timeout"); + co_return; + }, + asio::detached); +} +#endif + +int main() { + auto receiver = test_pub_sub(); + test_bi(); + std::this_thread::sleep_for(std::chrono::seconds(10)); + spdlog::info("----------------test coro--------------------------"); +#ifdef __cpp_impl_coroutine + brpc::ContextPool pool{4}; + pool.start(); + test_coro_bi(pool); + std::this_thread::sleep_for(std::chrono::seconds(6)); + pool.stop(); +#endif + return 0; +} diff --git a/test_xmake.lua b/test_xmake.lua new file mode 100644 index 0000000..3c33723 --- /dev/null +++ b/test_xmake.lua @@ -0,0 +1,36 @@ +set_project("brpc") +set_version("0.0.1", {build = "%Y%m%d%H%M"}) +set_xmakever("2.8.5") + +add_repositories("my_private_repo https://github.com/fantasy-peak/xmake-repo.git") + +add_requires("spdlog", "asio") +add_requires("cppzmq", {system = false}) + +set_policy("check.auto_ignore_flags", false) +add_cxflags("-O2 -Wall -Wextra -pedantic-errors -Wno-missing-field-initializers -Wno-ignored-qualifiers") +add_includedirs("./out") + +target("cpp17") + set_languages("c++17") + set_kind("binary") + add_files("test/cpp/main.cpp") + add_packages("cppzmq", "spdlog", "asio") + add_syslinks("pthread", "uuid") +target_end() + +target("cpp20") + set_languages("c++20") + set_kind("binary") + add_files("test/cpp/main.cpp") + add_packages("cppzmq", "spdlog", "asio") + add_syslinks("pthread", "uuid") +target_end() + +target("cpp23") + set_languages("c++23") + set_kind("binary") + add_files("test/cpp/main.cpp") + add_packages("cppzmq", "spdlog", "asio") + add_syslinks("pthread", "uuid") +target_end() diff --git a/xmake.lua b/xmake.lua new file mode 100644 index 0000000..876e3c0 --- /dev/null +++ b/xmake.lua @@ -0,0 +1,19 @@ +set_project("brpc") +set_version("0.0.1", {build = "%Y%m%d%H%M"}) +set_xmakever("2.8.5") + +add_repositories("my_private_repo https://github.com/fantasy-peak/xmake-repo.git") + +add_requires("yaml-cpp", "nlohmann_json", "spdlog", "inja") +add_requires("cppzmq", {system = false}) + +set_languages("c++23") +set_policy("check.auto_ignore_flags", false) +add_cxflags("-O2 -Wall -Wextra -pedantic-errors -Wno-missing-field-initializers -Wno-ignored-qualifiers") +add_includedirs("include", "out") + +target("brpc") + set_kind("binary") + add_files("src/*.cpp") + add_packages("yaml-cpp", "nlohmann_json", "spdlog", "inja") +target_end()