diff --git a/bench/bm_case1.cpp b/bench/bm_case1.cpp index 33ce8f55..6b229f9f 100644 --- a/bench/bm_case1.cpp +++ b/bench/bm_case1.cpp @@ -19,7 +19,7 @@ inline constexpr std::size_t N_ITER = 10; inline constexpr std::size_t N_SAMPLES = gr::util::round_up(10'000, 1024); template -struct math_op : public fg::node, fg::IN, fg::OUT> { +struct math_op : public fg::node, fg::PortInNamed, fg::PortOutNamed> { T factor = static_cast(1.0f); // public: @@ -58,7 +58,7 @@ static_assert(fg::traits::node::can_process_one_simd>); #endif template -class math_bulk_op : public fg::node, fg::IN, fg::OUT> { +class math_bulk_op : public fg::node, fg::PortInNamed>, fg::PortOutNamed>> { T _factor = static_cast(1.0f); public: @@ -126,8 +126,8 @@ class converting_multiply : public fg::node> { T _factor = static_cast(1.0f); public: - fg::IN in; - fg::OUT out; + fg::PortIn in; + fg::PortOut out; converting_multiply() = delete; @@ -160,8 +160,8 @@ static_assert(fg::traits::node::can_process_one_simd class add : public fg::node> { public: - fg::IN in; - fg::OUT out; + fg::PortIn in; + fg::PortOut out; template V> [[nodiscard]] constexpr V @@ -183,7 +183,7 @@ static_assert(fg::traits::node::can_process_one_simd>); // It doesn't need to be enabled for reflection. // template -class gen_operation_SIMD : public fg::node, fg::IN, fg::OUT> { +class gen_operation_SIMD : public fg::node, fg::PortInNamed>, fg::PortOutNamed>> { T _value = static_cast(1.0f); public: @@ -270,11 +270,11 @@ using multiply_SIMD = gen_operation_SIMD; template using add_SIMD = gen_operation_SIMD; -template +template class copy : public fg::node> { public: - fg::IN in; - fg::OUT out; + fg::PortIn> in; + fg::PortOut> out; template V> [[nodiscard]] constexpr V @@ -336,8 +336,8 @@ simd_size() noexcept { namespace stdx = vir::stdx; -template -class convert : public fg::node, fg::IN, fg::OUT> { +template +class convert : public fg::node, fg::PortInNamed>, fg::PortOutNamed>> { static_assert(stdx::is_simd_v != stdx::is_simd_v, "either input xor output must be SIMD capable"); constexpr static std::size_t from_simd_size = detail::simd_size(); constexpr static std::size_t to_simd_size = detail::simd_size(); @@ -451,7 +451,7 @@ inline const boost::ut::suite _constexpr_bm = [] { } { - auto merged_node = merge<"out", "in">(merge<"out", "in">(merge<"out", "in">(merge<"out", "in">(test::source(N_SAMPLES), copy()), copy()), + auto merged_node = merge<"out", "in">(merge<"out", "in">(merge<"out", "in">(merge<"out", "in">(test::source(N_SAMPLES), copy()), copy()), copy()), test::sink()); "merged src(N=1024)->b1(Nā‰¤128)->b2(N=1024)->b3(N=32...128)->sink"_benchmark.repeat(N_SAMPLES) = [&merged_node]() { loop_over_process_one(merged_node); }; @@ -521,7 +521,7 @@ inline const boost::ut::suite _runtime_tests = [] { auto &src = flow_graph.make_node>(N_SAMPLES); auto &sink = flow_graph.make_node>(); - using copy = ::copy; + using copy = ::copy; std::vector cpy(10); for (std::size_t i = 0; i < cpy.size(); i++) { cpy[i] = std::addressof(flow_graph.make_node({ { "name", fmt::format("copy {} at {}", i, fair::graph::this_source_location()) } })); @@ -542,8 +542,8 @@ inline const boost::ut::suite _runtime_tests = [] { { fg::graph flow_graph; - auto &src = flow_graph.make_node>(N_SAMPLES); - auto &b1 = flow_graph.make_node>(); + auto &src = flow_graph.make_node>(N_SAMPLES); + auto &b1 = flow_graph.make_node>(); auto &b2 = flow_graph.make_node>(); auto &b3 = flow_graph.make_node>(); auto &sink = flow_graph.make_node>(); diff --git a/bench/bm_scheduler.cpp b/bench/bm_scheduler.cpp index 400966d3..610b6ef2 100644 --- a/bench/bm_scheduler.cpp +++ b/bench/bm_scheduler.cpp @@ -14,7 +14,7 @@ inline constexpr std::size_t N_SAMPLES = gr::util::round_up(10'000'000, 1024); inline constexpr std::size_t N_NODES = 5; template -class math_op : public fg::node, fg::IN, fg::OUT> { +class math_op : public fg::node, fg::PortInNamed, fg::PortOutNamed> { T _factor = static_cast(1.0f); public: diff --git a/bench/bm_test_helper.hpp b/bench/bm_test_helper.hpp index c725228f..a1592f0d 100644 --- a/bench/bm_test_helper.hpp +++ b/bench/bm_test_helper.hpp @@ -18,9 +18,9 @@ inline static std::size_t n_samples_produced = 0_UZ; template class source : public fg::node> { public: - uint64_t _n_samples_max; - std::size_t _n_tag_offset; - fg::OUT out; + uint64_t _n_samples_max; + std::size_t _n_tag_offset; + fg::PortOut out; source() = delete; @@ -87,11 +87,11 @@ class source : public fg::node> { inline static std::size_t n_samples_consumed = 0_UZ; -template +template struct sink : public fg::node> { - fg::IN in; - std::size_t should_receive_n_samples = 0; - int64_t _last_tag_position = -1; + fg::PortIn> in; + std::size_t should_receive_n_samples = 0; + int64_t _last_tag_position = -1; template V> [[nodiscard]] constexpr auto diff --git a/include/README.md b/include/README.md index 62090efc..1a1438a7 100644 --- a/include/README.md +++ b/include/README.md @@ -19,10 +19,10 @@ continuously improve this document. through a general `work()` function. Blocks are the building blocks of a flow-graph and can be thought of as vertices in a graph, and *ports* are their input/output connections to neighboring blocks for data streaming, streaming tags, and asynchronous messages. For the specific implementation, see [node.hpp](node.hpp). -* [port](#Ports) is an interface through which data flows into or out of a block. Each block may have zero, one or - more input ports, and zero, one or more output ports. Data is passed between blocks by connecting the output port of - one block to the input port of another. For the specific implementation, see [port.hpp](port.hpp). -* [buffer](#Buffer) is an area of memory where data is temporarily stored in the runtime-connected graph. Each port +* [Port](#Ports) is an interface through which data flows into or out of a block. Each block may have zero, one or + more input ports, and zero, one or more output ports. Data is passed between blocks by connecting the output Port of + one block to the input Port of another. For the specific implementation, see [port.hpp](port.hpp). +* [buffer](#Buffer) is an area of memory where data is temporarily stored in the runtime-connected graph. Each Port has its own buffer to store the data, tags, or other messages it needs to perform its computations. Buffer implementations are typically domain-specific (e.g. for blocks/ports implemented on the CPU, GPU, etc.) and are often, but not necessarily, implemented as circular buffers. For the specific interface see [Buffer.hpp](Buffer.hpp) and one @@ -54,31 +54,36 @@ switch and adopt the provided low-level graph algorithms. ### Ports -Ports in this framework are designed to interconnect blocks in a graph, similar to RF connectors. The port class +Ports in this framework are designed to interconnect blocks in a graph, similar to RF connectors. The Port class template has several parameters that define its behaviour, including the type of data it handles (`T`), its -name (`PortName`), type (`PortType`), direction (`PortDirection` <-> input/output), and the minimum and maximum number -of samples (`MIN_SAMPLES` and `MAX_SAMPLES`) the user requires for a given block before the `work()` is invoked by the -scheduler. The buffer type used by the port can also be specified using the `BufferType` parameter, -with `gr::circular_buffer` being the default: - +name (`PortName`), type (`PortType`), direction (`PortDirection` <-> input/output), and optional list of `Arguments` +that may constrain the port behaviour on the `Block` or `Scheduler` level:: ```cpp -template> -class port { /* ... */ }; +class template +struct Port { /* ... */ }; ``` +Some of the possible optional port annotation attributes are: + +* `RequiredSamples` to describe the min/max number of samples required from this port before invoking the blocks work + function, +* `Optional` informing the graph/scheduler that a given port does not require to be connected, +* `PortDomain` described whether the port can be handled within the same scheduling domain (e.g. `CPU` + or `GPU`), +* `StreamBufferType` and `TagBufferType` to inject specific user-provided buffer implementations to the port, or +* `Async` for making a port asynchronous in a signal flow-graph block. + When connecting ports, either a single-step or a two-step connection method can be used: 1. single-step connection: which allocates a buffer and passes the corresponding `BufferWriter` and `BufferReader` - instances to the source and destination port. The buffer size is determined only once based on + instances to the source and destination Port. The buffer size is determined only once based on the `[MIN, MAX]_SAMPLES` constraints and is inherited/fixed for further connected input ports. 2. two-step connection (usually done by the graph): * register all ports that shall be connected to each other - * determine the minimum buffer size required by the set of connected port and then perform the actual connections as + * determine the minimum buffer size required by the set of connected Port and then perform the actual connections as outlined in the single-step connection method. -Each port belongs to a single computing domain, which is specified using the port_domain_t enumeration: +Each Port belongs to a single computing domain, which is specified using the port_domain_t enumeration: ```cpp enum class port_domain_t { CPU, GPU, NET, FPGA, DSP, MLU /*, ...*/ }; @@ -97,8 +102,8 @@ favour low-latency execution (e.g. few bytes) to keep the data and L1/L2/L3 cach including, for example, GPUs this choice would cause significant overhead when copying data from the CPU to GPU that favour DMA-type block-transfer to exchange data for best efficiency. -Additionally, the usage of one buffer type and port per computation domain, along with explicit data conversion, enables -users to easily extend the framework. This approach provides the flexibility for users to define custom buffer and port +Additionally, the usage of one buffer type and Port per computation domain, along with explicit data conversion, enables +users to easily extend the framework. This approach provides the flexibility for users to define custom buffer and Port implementations that cater to the specific requirements of their applications, thus offering optimal performance and scalability. The ability to create specialized implementations for specific use cases, coupled with the framework's openness to user-defined extensions, makes it a versatile and customizable solution. @@ -113,8 +118,8 @@ For example: ```cpp struct user_defined_block : node { - IN in; - OUT out; + PortIn in; + PortOut out; // implement either: [[nodiscard]] constexpr work_return_t work() noexcept {...} // or one of the convenience functions outlined below @@ -127,7 +132,7 @@ types through templating the input 'T' and return type 'R': ```cpp template -struct user_defined_block : node, OUT> { +struct user_defined_block : node, PortOut> { // implement either: [[nodiscard]] constexpr work_return_t work() noexcept {...} // or one of the convenience functions outlined below @@ -146,7 +151,7 @@ The following defaults are defined for one of the two 'user_defined_block' block * **case 1a** - non-decimating N-in->N-out mechanic and automatic handling of streaming tags and settings changes: ```cpp template - struct user_defined_block : node, OUT> { + struct user_defined_block : node, PortOut> { T _factor = T{1}; // constuctor setting _factor etc. @@ -155,12 +160,12 @@ The following defaults are defined for one of the two 'user_defined_block' block } }; ``` - The number, type, and ordering of input and arguments of `process_one(..)` are defined by the port definitions. + The number, type, and ordering of input and arguments of `process_one(..)` are defined by the Port definitions. * **case 1b** - non-decimating N-in->N-out mechanic providing bulk access to the input/output data and automatic handling of streaming tags and settings changes: ```cpp template - struct user_defined_block : node, OUT> { + struct user_defined_block : node, PortOut> { T _factor = T{1}; // constuctor setting _factor etc. diff --git a/include/data_sink.hpp b/include/data_sink.hpp index 34512dde..58d255c8 100644 --- a/include/data_sink.hpp +++ b/include/data_sink.hpp @@ -144,7 +144,7 @@ class data_sink_registry { template std::shared_ptr::dataset_poller> - get_trigger_poller(const data_sink_query &query, M&& matcher, std::size_t pre_samples, std::size_t post_samples, blocking_mode block = blocking_mode::Blocking) { + get_trigger_poller(const data_sink_query &query, M &&matcher, std::size_t pre_samples, std::size_t post_samples, blocking_mode block = blocking_mode::Blocking) { std::lock_guard lg{ _mutex }; auto sink = find_sink(query); return sink ? sink->get_trigger_poller(std::forward(matcher), pre_samples, post_samples, block) : nullptr; @@ -152,7 +152,7 @@ class data_sink_registry { template std::shared_ptr::dataset_poller> - get_multiplexed_poller(const data_sink_query &query, M&& matcher, std::size_t maximum_window_size, blocking_mode block = blocking_mode::Blocking) { + get_multiplexed_poller(const data_sink_query &query, M &&matcher, std::size_t maximum_window_size, blocking_mode block = blocking_mode::Blocking) { std::lock_guard lg{ _mutex }; auto sink = find_sink(query); return sink ? sink->get_multiplexed_poller(std::forward(matcher), maximum_window_size, block) : nullptr; @@ -160,7 +160,7 @@ class data_sink_registry { template std::shared_ptr::dataset_poller> - get_snapshot_poller(const data_sink_query &query, M&& matcher, std::chrono::nanoseconds delay, blocking_mode block = blocking_mode::Blocking) { + get_snapshot_poller(const data_sink_query &query, M &&matcher, std::chrono::nanoseconds delay, blocking_mode block = blocking_mode::Blocking) { std::lock_guard lg{ _mutex }; auto sink = find_sink(query); return sink ? sink->get_snapshot_poller(std::forward(matcher), delay, block) : nullptr; @@ -168,7 +168,7 @@ class data_sink_registry { template Callback> bool - register_streaming_callback(const data_sink_query &query, std::size_t max_chunk_size, Callback&& callback) { + register_streaming_callback(const data_sink_query &query, std::size_t max_chunk_size, Callback &&callback) { std::lock_guard lg{ _mutex }; auto sink = find_sink(query); if (!sink) { @@ -181,7 +181,7 @@ class data_sink_registry { template Callback, TriggerMatcher M> bool - register_trigger_callback(const data_sink_query &query, M&& matcher, std::size_t pre_samples, std::size_t post_samples, Callback&& callback) { + register_trigger_callback(const data_sink_query &query, M &&matcher, std::size_t pre_samples, std::size_t post_samples, Callback &&callback) { std::lock_guard lg{ _mutex }; auto sink = find_sink(query); if (!sink) { @@ -194,7 +194,7 @@ class data_sink_registry { template Callback, TriggerMatcher M> bool - register_multiplexed_callback(const data_sink_query &query, M&& matcher, std::size_t maximum_window_size, Callback&& callback) { + register_multiplexed_callback(const data_sink_query &query, M &&matcher, std::size_t maximum_window_size, Callback &&callback) { std::lock_guard lg{ _mutex }; auto sink = find_sink(query); if (!sink) { @@ -207,7 +207,7 @@ class data_sink_registry { template Callback, TriggerMatcher M> bool - register_snapshot_callback(const data_sink_query &query, M&& matcher, std::chrono::nanoseconds delay, Callback&& callback) { + register_snapshot_callback(const data_sink_query &query, M &&matcher, std::chrono::nanoseconds delay, Callback &&callback) { std::lock_guard lg{ _mutex }; auto sink = find_sink(query); if (!sink) { @@ -319,7 +319,7 @@ class data_sink : public node> { Annotated> signal_min = std::numeric_limits::lowest(); Annotated> signal_max = std::numeric_limits::max(); - IN in; + PortIn> in; struct poller { // TODO consider whether reusing port here makes sense @@ -409,7 +409,7 @@ class data_sink : public node> { template std::shared_ptr - get_trigger_poller(M&& matcher, std::size_t pre_samples, std::size_t post_samples, blocking_mode block_mode = blocking_mode::Blocking) { + get_trigger_poller(M &&matcher, std::size_t pre_samples, std::size_t post_samples, blocking_mode block_mode = blocking_mode::Blocking) { const auto block = block_mode == blocking_mode::Blocking; auto handler = std::make_shared(); std::lock_guard lg(_listener_mutex); @@ -420,7 +420,7 @@ class data_sink : public node> { template std::shared_ptr - get_multiplexed_poller(M&& matcher, std::size_t maximum_window_size, blocking_mode block_mode = blocking_mode::Blocking) { + get_multiplexed_poller(M &&matcher, std::size_t maximum_window_size, blocking_mode block_mode = blocking_mode::Blocking) { std::lock_guard lg(_listener_mutex); const auto block = block_mode == blocking_mode::Blocking; auto handler = std::make_shared(); @@ -430,7 +430,7 @@ class data_sink : public node> { template std::shared_ptr - get_snapshot_poller(M&& matcher, std::chrono::nanoseconds delay, blocking_mode block_mode = blocking_mode::Blocking) { + get_snapshot_poller(M &&matcher, std::chrono::nanoseconds delay, blocking_mode block_mode = blocking_mode::Blocking) { const auto block = block_mode == blocking_mode::Blocking; auto handler = std::make_shared(); std::lock_guard lg(_listener_mutex); @@ -440,27 +440,27 @@ class data_sink : public node> { template Callback> void - register_streaming_callback(std::size_t max_chunk_size, Callback&& callback) { + register_streaming_callback(std::size_t max_chunk_size, Callback &&callback) { add_listener(std::make_unique>(max_chunk_size, std::forward(callback), *this), false); } template Callback> void - register_trigger_callback(M&& matcher, std::size_t pre_samples, std::size_t post_samples, Callback&& callback) { + register_trigger_callback(M &&matcher, std::size_t pre_samples, std::size_t post_samples, Callback &&callback) { add_listener(std::make_unique>(std::forward(matcher), pre_samples, post_samples, std::forward(callback)), false); ensure_history_size(pre_samples); } template Callback> void - register_multiplexed_callback(M&& matcher, std::size_t maximum_window_size, Callback&& callback) { + register_multiplexed_callback(M &&matcher, std::size_t maximum_window_size, Callback &&callback) { std::lock_guard lg(_listener_mutex); add_listener(std::make_unique>(std::forward(matcher), maximum_window_size, std::forward(callback)), false); } template Callback> void - register_snapshot_callback(M&& matcher, std::chrono::nanoseconds delay, Callback&& callback) { + register_snapshot_callback(M &&matcher, std::chrono::nanoseconds delay, Callback &&callback) { std::lock_guard lg(_listener_mutex); add_listener(std::make_unique>(std::forward(matcher), delay, std::forward(callback)), false); } @@ -634,7 +634,7 @@ class data_sink : public node> { Callback callback; template - explicit continuous_listener(std::size_t max_chunk_size, CallbackFW&& c, const data_sink &parent) : parent_sink(parent), buffer(max_chunk_size), callback{ std::forward(c) } {} + explicit continuous_listener(std::size_t max_chunk_size, CallbackFW &&c, const data_sink &parent) : parent_sink(parent), buffer(max_chunk_size), callback{ std::forward(c) } {} explicit continuous_listener(std::shared_ptr poller, bool do_block, const data_sink &parent) : parent_sink(parent), block(do_block), polling_handler{ std::move(poller) } {} @@ -760,11 +760,12 @@ class data_sink : public node> { Callback callback; template - explicit trigger_listener(Matcher&& matcher, std::shared_ptr handler, std::size_t pre, std::size_t post, bool do_block) + explicit trigger_listener(Matcher &&matcher, std::shared_ptr handler, std::size_t pre, std::size_t post, bool do_block) : block(do_block), pre_samples(pre), post_samples(post), trigger_matcher(std::forward(matcher)), polling_handler{ std::move(handler) } {} template - explicit trigger_listener(Matcher&& matcher, std::size_t pre, std::size_t post, CallbackFW&& cb) : pre_samples(pre), post_samples(post), trigger_matcher(std::forward(matcher)), callback{ std::forward(cb) } {} + explicit trigger_listener(Matcher &&matcher, std::size_t pre, std::size_t post, CallbackFW &&cb) + : pre_samples(pre), post_samples(post), trigger_matcher(std::forward(matcher)), callback{ std::forward(cb) } {} void set_dataset_template(DataSet dst) override { @@ -850,11 +851,11 @@ class data_sink : public node> { Callback callback; template - explicit multiplexed_listener(Matcher&& matcher_, std::size_t max_window_size, CallbackFW&& cb) + explicit multiplexed_listener(Matcher &&matcher_, std::size_t max_window_size, CallbackFW &&cb) : matcher(std::forward(matcher_)), maximum_window_size(max_window_size), callback(std::forward(cb)) {} template - explicit multiplexed_listener(Matcher&& matcher_, std::size_t max_window_size, std::shared_ptr handler, bool do_block) + explicit multiplexed_listener(Matcher &&matcher_, std::size_t max_window_size, std::shared_ptr handler, bool do_block) : block(do_block), matcher(std::forward(matcher_)), maximum_window_size(max_window_size), polling_handler{ std::move(handler) } {} void @@ -949,11 +950,12 @@ class data_sink : public node> { Callback callback; template - explicit snapshot_listener(Matcher&& matcher, std::chrono::nanoseconds delay, std::shared_ptr poller, bool do_block) + explicit snapshot_listener(Matcher &&matcher, std::chrono::nanoseconds delay, std::shared_ptr poller, bool do_block) : block(do_block), time_delay(delay), trigger_matcher(std::forward(matcher)), polling_handler{ std::move(poller) } {} template - explicit snapshot_listener(Matcher&& matcher, std::chrono::nanoseconds delay, CallbackFW&& cb) : time_delay(delay), trigger_matcher(std::forward(matcher)), callback(std::forward(cb)) {} + explicit snapshot_listener(Matcher &&matcher, std::chrono::nanoseconds delay, CallbackFW &&cb) + : time_delay(delay), trigger_matcher(std::forward(matcher)), callback(std::forward(cb)) {} void set_dataset_template(DataSet dst) override { diff --git a/include/graph.hpp b/include/graph.hpp index b7f8b259..88a9e810 100644 --- a/include/graph.hpp +++ b/include/graph.hpp @@ -738,9 +738,10 @@ operator<<(std::ostream &os, const port_direction_t &value) { return os << static_cast(value); } +template inline std::ostream & -operator<<(std::ostream &os, const port_domain_t &value) { - return os << static_cast(value); +operator<<(std::ostream &os, const T &value) { + return os << value.Name; } #if HAVE_SOURCE_LOCATION diff --git a/include/node.hpp b/include/node.hpp index 1bbb1eca..57b36d88 100644 --- a/include/node.hpp +++ b/include/node.hpp @@ -378,7 +378,7 @@ struct node : protected std::tuple { update_ports_status() { ports_status = ports_status_t(); meta::tuple_for_each( - [&ps = ports_status](Port auto &port) { + [&ps = ports_status](PortType auto &port) { ps.in_min_samples = std::max(ps.in_min_samples, port.min_buffer_size()); ps.in_max_samples = std::min(ps.in_max_samples, port.max_buffer_size()); ps.in_available = std::min(ps.in_available, port.streamReader().available()); @@ -389,7 +389,7 @@ struct node : protected std::tuple { input_ports(&self())); meta::tuple_for_each( - [&ps = ports_status](Port auto &port) { + [&ps = ports_status](PortType auto &port) { ps.out_min_samples = std::max(ps.out_min_samples, port.min_buffer_size()); ps.out_max_samples = std::min(ps.out_max_samples, port.max_buffer_size()); ps.out_available = std::min(ps.out_available, port.streamWriter().available()); @@ -1320,8 +1320,12 @@ merge(A &&a, B &&b) { } #if !DISABLE_SIMD -namespace test { -struct copy : public node::max(), "in">, OUT::max(), "out">> { +namespace test { // TODO: move to dedicated tests + +struct copy : public node { + PortIn in; + PortOut out; + public: template V> [[nodiscard]] constexpr V @@ -1329,7 +1333,18 @@ struct copy : public node::m return a; } }; +} // namespace test +#endif +} // namespace fair::graph +#if !DISABLE_SIMD +ENABLE_REFLECTION(fair::graph::test::copy, in, out); +#endif + +namespace fair::graph { + +#if !DISABLE_SIMD +namespace test { static_assert(traits::node::input_port_types::size() == 1); static_assert(std::same_as, float>); static_assert(traits::node::can_process_one_scalar); diff --git a/include/port.hpp b/include/port.hpp index a5d48bee..327c89e9 100644 --- a/include/port.hpp +++ b/include/port.hpp @@ -1,44 +1,79 @@ #ifndef GNURADIO_PORT_HPP #define GNURADIO_PORT_HPP -#include "circular_buffer.hpp" -#include "tag.hpp" -#include "utils.hpp" #include #include #include #include #include +#include +#include +#include +#include + +#include + namespace fair::graph { using fair::meta::fixed_string; using namespace fair::literals; -// #### default supported types -- TODO: to be replaced by pmt::pmtv declaration +#ifndef PMT_SUPPORTED_TYPE // // #### default supported types -- TODO: to be replaced by pmt::pmtv declaration +#define PMT_SUPPORTED_TYPE // Only DataSet and DataSet are added => consider to support more Dataset -using supported_type = std::variant, std::complex, DataSet, DataSet /*, ...*/>; +using supported_type = std::variant, std::complex, DataSet, DataSet /*, ...*/>; +#endif enum class port_direction_t { INPUT, OUTPUT, ANY }; // 'ANY' only for query and not to be used for port declarations + enum class connection_result_t { SUCCESS, FAILED }; + enum class port_type_t { STREAM, /*!< used for single-producer-only ond usually synchronous one-to-one or one-to-many communications */ MESSAGE /*!< used for multiple-producer one-to-one, one-to-many, many-to-one, or many-to-many communications */ }; -enum class port_domain_t { CPU, GPU, NET, FPGA, DSP, MLU }; + +/** + * @brief optional port annotation argument to described whether the port can be handled within the same scheduling domain. + * + * @tparam PortDomainName the unique name of the domain, name shouldn't clash with other existing definitions (e.g. 'CPU' and 'GPU') + */ +template +struct PortDomain { + inline static constexpr fixed_string Name = PortDomainName; +}; + +template +concept PortDomainType = requires { T::Name; } && std::is_base_of_v, T>; + +template +using is_port_domain = std::bool_constant>; + +struct CPU : public PortDomain<"CPU"> {}; + +struct GPU : public PortDomain<"GPU"> {}; + +static_assert(is_port_domain::value); +static_assert(is_port_domain::value); +static_assert(!is_port_domain::value); template -concept Port = requires(T t, const std::size_t n_items) { // dynamic definitions +concept PortType = requires(T t, const std::size_t n_items, const supported_type &newDefault) { // dynamic definitions typename T::value_type; - { t.pmt_type() } -> std::same_as; + { t.defaultValue() } -> std::same_as; + { t.setDefaultValue(newDefault) } -> std::same_as; { t.name } -> std::convertible_to; { t.priority } -> std::convertible_to; { t.min_samples } -> std::convertible_to; { t.max_samples } -> std::convertible_to; { t.type() } -> std::same_as; { t.direction() } -> std::same_as; + { t.domain() } -> std::same_as; { t.resize_buffer(n_items) } -> std::same_as; { t.disconnect() } -> std::same_as; + { t.isSynchronous() } -> std::same_as; + { t.isOptional() } -> std::same_as; }; /** @@ -51,6 +86,95 @@ struct internal_port_buffers { void *tagHandler; }; +/** + * @brief optional port annotation argument to describe the min/max number of samples required from this port before invoking the blocks work function. + * + * @tparam MIN_SAMPLES (>0) specifies the minimum number of samples the port/block requires for processing in one scheduler iteration + * @tparam MAX_SAMPLES specifies the maximum number of samples the port/block can process in one scheduler iteration + */ +template +struct RequiredSamples { + static_assert(MIN_SAMPLES > 0, "Port, ..> must be >= 0"); + static constexpr std::size_t MinSamples = MIN_SAMPLES; + static constexpr std::size_t MaxSamples = MAX_SAMPLES; +}; + +template +concept IsRequiredSamples = requires { + T::MinSamples; + T::MaxSamples; +} && std::is_base_of_v, T>; + +template +using is_required_samples = std::bool_constant>; + +static_assert(is_required_samples>::value); +static_assert(!is_required_samples::value); + +/** + * @brief optional port annotation argument informing the graph/scheduler that a given port does not require to be connected + */ +struct Optional {}; + +/** + * @brief optional port annotation argument to define the buffer implementation to be used for streaming data + * + * @tparam BufferType user-extendable buffer implementation for the streaming data + */ +template +struct StreamBufferType { + using type = BufferType; +}; + +/** + * @brief optional port annotation argument to define the buffer implementation to be used for tag data + * + * @tparam BufferType user-extendable buffer implementation for the tag data + */ +template +struct TagBufferType { + using type = BufferType; +}; + +template +concept IsStreamBufferAttribute = requires { typename T::type; } && gr::Buffer && std::is_base_of_v, T>; +; + +template +concept IsTagBufferAttribute = requires { typename T::type; } && gr::Buffer && std::is_base_of_v, T>; + +template +using is_stream_buffer_attribute = std::bool_constant>; + +template +using is_tag_buffer_attribute = std::bool_constant>; + +template +struct DefaultStreamBuffer : StreamBufferType> {}; + +struct DefaultTagBuffer : TagBufferType> {}; + +static_assert(is_stream_buffer_attribute>::value); +static_assert(!is_stream_buffer_attribute::value); +static_assert(!is_tag_buffer_attribute>::value); +static_assert(is_tag_buffer_attribute::value); + +/** + * @brief Annotation for making a port asynchronous in a signal flow-graph block. + * + * In a standard block, the processing function is invoked based on the least common number of samples + * available across all input and output ports. When a port is annotated with `Async`, it is excluded from this + * least common number calculation. + * + * Applying `Async` as an optional template argument of the Port class essentially marks the port as "optional" for the + * synchronization mechanism. The block's processing function will be invoked regardless of the number of samples + * available at this specific port, relying solely on the state of other ports that are not marked as asynchronous. + * + * Use this annotation to create ports that do not constrain the block's ability to process data, making it + * asynchronous relative to the other ports in the block. + */ +struct Async {}; + /** * @brief 'ports' are interfaces that allows data to flow between blocks in a graph, similar to RF connectors. * Each block can have zero or more input/output ports. When connecting ports, either a single-step or a two-step @@ -73,38 +197,41 @@ struct internal_port_buffers { * @tparam PortName a string to identify the port, notably to be used in an UI- and hand-written explicit code context. * @tparam PortType STREAM or MESSAGE * @tparam PortDirection either input or output - * @tparam MIN_SAMPLES specifies the minimum number of samples the port/block requires for processing in one scheduler iteration - * @tparam MAX_SAMPLES specifies the maximum number of samples the port/block can process in one scheduler iteration - * @tparam BufferType user-extendable buffer implementation for the streaming data - * @tparam TagBufferType user-extendable buffer implementation for the tag data + * @tparam Arguments optional: default to 'DefaultStreamBuffer' and DefaultTagBuffer' based on 'gr::circular_buffer', and CPU domain */ -template, - gr::Buffer TagBufferType = gr::circular_buffer> -class port { -public: - static_assert(PortDirection != port_direction_t::ANY, "ANY reserved for queries and not port direction declarations"); - - using value_type = T; - - static constexpr bool IS_INPUT = PortDirection == port_direction_t::INPUT; - static constexpr bool IS_OUTPUT = PortDirection == port_direction_t::OUTPUT; - +template +struct Port { template - using with_name = port; + using with_name = Port; - using ReaderType = decltype(std::declval().new_reader()); - using WriterType = decltype(std::declval().new_writer()); - using IoType = std::conditional_t; - using TagReaderType = decltype(std::declval().new_reader()); - using TagWriterType = decltype(std::declval().new_writer()); - using TagIoType = std::conditional_t; + static_assert(PortDirection != port_direction_t::ANY, "ANY reserved for queries and not port direction declarations"); + + using value_type = T; + using ArgumentsTypeList = typename fair::meta::typelist; + using Domain = ArgumentsTypeList::template find_or_default; + using Required = ArgumentsTypeList::template find_or_default>; + using BufferType = ArgumentsTypeList::template find_or_default>::type; + using TagBufferType = ArgumentsTypeList::template find_or_default::type; + static constexpr port_direction_t Direction = PortDirection; + static constexpr bool IS_INPUT = PortDirection == port_direction_t::INPUT; + static constexpr bool IS_OUTPUT = PortDirection == port_direction_t::OUTPUT; + static constexpr fixed_string Name = PortName; + + using ReaderType = decltype(std::declval().new_reader()); + using WriterType = decltype(std::declval().new_writer()); + using IoType = std::conditional_t; + using TagReaderType = decltype(std::declval().new_reader()); + using TagWriterType = decltype(std::declval().new_writer()); + using TagIoType = std::conditional_t; // public properties - const std::string name = static_cast(PortName); - std::int16_t priority = 0; // ā†’ dependents of a higher-prio port should be scheduled first (Q: make this by order of ports?) - std::size_t min_samples = (MIN_SAMPLES == std::dynamic_extent ? 1 : MIN_SAMPLES); - std::size_t max_samples = MAX_SAMPLES; + constexpr static bool synchronous = not std::disjunction_v...>; + constexpr static bool optional = std::disjunction_v...>; + std::string name = static_cast(PortName); + std::int16_t priority = 0; // ā†’ dependents of a higher-prio port should be scheduled first (Q: make this by order of ports?) + std::size_t min_samples = (Required::MinSamples == std::dynamic_extent ? 1 : Required::MinSamples); + std::size_t max_samples = Required::MaxSamples; + T default_value = T{}; private: bool _connected = false; @@ -112,21 +239,30 @@ class port { TagIoType _tagIoHandler = new_tag_io_handler(); public: + bool + initBuffer(std::size_t nSamples = 0) noexcept { + if constexpr (IS_OUTPUT) { + // write one default value into output -- needed for cyclic graph initialisation + return _ioHandler.try_publish([val = default_value](std::span &out) { out[0] = val; }, 1_UZ); + } + return true; + } + [[nodiscard]] constexpr auto - new_io_handler() const noexcept { + new_io_handler(std::size_t buffer_size = 65536) const noexcept { if constexpr (IS_INPUT) { - return BufferType(65536).new_reader(); + return BufferType(buffer_size).new_reader(); } else { - return BufferType(65536).new_writer(); + return BufferType(buffer_size).new_writer(); } } [[nodiscard]] constexpr auto - new_tag_io_handler() const noexcept { + new_tag_io_handler(std::size_t buffer_size = 65536) const noexcept { if constexpr (IS_INPUT) { - return TagBufferType(65536).new_reader(); + return TagBufferType(buffer_size).new_reader(); } else { - return TagBufferType(65536).new_writer(); + return TagBufferType(buffer_size).new_writer(); } } @@ -158,22 +294,22 @@ class port { } public: - port() = default; - port(const port &) = delete; + constexpr Port() = default; + Port(const Port &) = delete; auto - operator=(const port &) + operator=(const Port &) = delete; - port(std::string port_name, std::int16_t priority_ = 0, std::size_t min_samples_ = 0_UZ, std::size_t max_samples_ = SIZE_MAX) noexcept + Port(std::string port_name, std::int16_t priority_ = 0, std::size_t min_samples_ = 0_UZ, std::size_t max_samples_ = SIZE_MAX) noexcept : name(std::move(port_name)), priority{ priority_ }, min_samples(min_samples_), max_samples(max_samples_) { static_assert(PortName.empty(), "port name must be exclusively declared via NTTP or constructor parameter"); } - constexpr port(port &&other) noexcept : name(std::move(other.name)), priority{ other.priority }, min_samples(other.min_samples), max_samples(other.max_samples) {} + constexpr Port(Port &&other) noexcept : name(std::move(other.name)), priority{ other.priority }, min_samples(other.min_samples), max_samples(other.max_samples) {} - constexpr port & - operator=(port &&other) { - port tmp(std::move(other)); + constexpr Port & + operator=(Port &&other) { + Port tmp(std::move(other)); std::swap(name, tmp._name); std::swap(min_samples, tmp._min_samples); std::swap(max_samples, tmp._max_samples); @@ -195,6 +331,21 @@ class port { return PortDirection; } + [[nodiscard]] constexpr static std::string_view + domain() noexcept { + return std::string_view(Domain::Name); + } + + [[nodiscard]] constexpr static bool + isSynchronous() noexcept { + return synchronous; + } + + [[nodiscard]] constexpr static bool + isOptional() noexcept { + return optional; + } + [[nodiscard]] constexpr static decltype(PortName) static_name() noexcept requires(!PortName.empty()) @@ -208,8 +359,17 @@ class port { #else [[nodiscard]] constexpr supported_type #endif - pmt_type() const noexcept { - return T(); + defaultValue() const noexcept { + return default_value; + } + + bool + setDefaultValue(const supported_type &newDefault) noexcept { + if (std::holds_alternative(newDefault)) { + default_value = std::get(newDefault); + return true; + } + return false; } [[nodiscard]] constexpr static std::size_t @@ -219,19 +379,19 @@ class port { [[nodiscard]] constexpr std::size_t min_buffer_size() const noexcept { - if constexpr (MIN_SAMPLES == std::dynamic_extent) { + if constexpr (Required::MinSamples == std::dynamic_extent) { return min_samples; } else { - return MIN_SAMPLES; + return Required::MinSamples; } } [[nodiscard]] constexpr std::size_t max_buffer_size() const noexcept { - if constexpr (MAX_SAMPLES == std::dynamic_extent) { + if constexpr (Required::MaxSamples == std::dynamic_extent) { return max_samples; } else { - return MAX_SAMPLES; + return Required::MaxSamples; } } @@ -346,39 +506,52 @@ namespace detail { template using just_t = T; -template -consteval fair::meta::typelist...> +template +consteval fair::meta::typelist(), PortType, PortDirection, Arguments...>, Is>...> repeated_ports_impl(std::index_sequence) { return {}; } } // namespace detail -// TODO: Add port index to BaseName -template -using repeated_ports = decltype(detail::repeated_ports_impl>(std::make_index_sequence())); - -template -using IN = port; -template -using OUT = port; -template -using IN_MSG = port; -template -using OUT_MSG = port; - -static_assert(Port>); -static_assert(Port())>); -static_assert(Port>); -static_assert(Port>); -static_assert(Port>); - -static_assert(IN::static_name() == fixed_string("in")); -static_assert(requires { IN("in").name; }); - -static_assert(OUT_MSG::static_name() == fixed_string("out_msg")); -static_assert(!(OUT_MSG::with_name<"out_message">::static_name() == fixed_string("out_msg"))); -static_assert(OUT_MSG::with_name<"out_message">::static_name() == fixed_string("out_message")); +template +using repeated_ports = decltype(detail::repeated_ports_impl(std::make_index_sequence())); + +static_assert(repeated_ports<3, float, "out", port_type_t::STREAM, port_direction_t::OUTPUT, Optional>::at<0>::Name == fixed_string("out0")); +static_assert(repeated_ports<3, float, "out", port_type_t::STREAM, port_direction_t::OUTPUT, Optional>::at<1>::Name == fixed_string("out1")); +static_assert(repeated_ports<3, float, "out", port_type_t::STREAM, port_direction_t::OUTPUT, Optional>::at<2>::Name == fixed_string("out2")); + +template +using PortIn = Port; +template +using PortOut = Port; +template +using MsgPortIn = Port; +template +using MsgPortOut = Port; + +template +using PortInNamed = Port; +template +using PortOutNamed = Port; +template +using MsgPortInNamed = Port; +template +using MsgPortOutNamed = Port; + +static_assert(PortType>); +static_assert(PortType())>); +static_assert(PortType>); +static_assert(PortType>); +static_assert(PortType>); + +static_assert(PortIn>::Required::MinSamples == 1); +static_assert(PortIn>::Required::MaxSamples == 2); +static_assert(std::same_as>::Domain, CPU>); +static_assert(std::same_as, GPU>::Domain, GPU>); + +static_assert(MsgPortOutNamed<"out_msg">::static_name() == fixed_string("out_msg")); +static_assert(!(MsgPortOutNamed<"out_msg">::with_name<"out_message">::static_name() == fixed_string("out_msg"))); +static_assert(MsgPortOutNamed<"out_msg">::with_name<"out_message">::static_name() == fixed_string("out_message")); /** * Runtime capable wrapper to be used within a block. It's primary purpose is to allow the runtime @@ -404,7 +577,11 @@ class dynamic_port { virtual ~model() = default; [[nodiscard]] virtual supported_type - pmt_type() const noexcept + defaultValue() const noexcept + = 0; + + [[nodiscard]] virtual bool + setDefaultValue(const supported_type &val) noexcept = 0; [[nodiscard]] virtual port_type_t @@ -415,6 +592,18 @@ class dynamic_port { direction() const noexcept = 0; + [[nodiscard]] virtual std::string_view + domain() const noexcept + = 0; + + [[nodiscard]] virtual bool + isSynchronous() noexcept + = 0; + + [[nodiscard]] virtual bool + isOptional() noexcept + = 0; + [[nodiscard]] virtual connection_result_t resize_buffer(std::size_t min_size) noexcept = 0; @@ -435,7 +624,7 @@ class dynamic_port { std::unique_ptr _accessor; - template + template class wrapper final : public model { using PortType = std::decay_t; std::conditional_t _value; @@ -489,15 +678,20 @@ class dynamic_port { } ~wrapper() override = default; - + // TODO revisit: constexpr was removed because emscripten does not support constexpr function for non literal type, like DataSet #if defined(__EMSCRIPTEN__) [[nodiscard]] supported_type #else [[nodiscard]] constexpr supported_type #endif - pmt_type() const noexcept override { - return _value.pmt_type(); + defaultValue() const noexcept override { + return _value.defaultValue(); + } + + [[nodiscard]] bool + setDefaultValue(const supported_type &val) noexcept override { + return _value.setDefaultValue(val); } [[nodiscard]] constexpr port_type_t @@ -510,6 +704,21 @@ class dynamic_port { return _value.direction(); } + [[nodiscard]] constexpr std::string_view + domain() const noexcept override { + return _value.domain(); + } + + [[nodiscard]] bool + isSynchronous() noexcept override { + return _value.isSynchronous(); + } + + [[nodiscard]] bool + isOptional() noexcept override { + return _value.isOptional(); + } + [[nodiscard]] connection_result_t resize_buffer(std::size_t min_size) noexcept override { return _value.resize_buffer(min_size); @@ -553,17 +762,22 @@ class dynamic_port { = delete; // TODO: Make owning versus non-owning API more explicit - template + template explicit constexpr dynamic_port(T &arg) noexcept : name(arg.name), priority(arg.priority), min_samples(arg.min_samples), max_samples(arg.max_samples), _accessor{ std::make_unique>(arg) } {} - template + template explicit constexpr dynamic_port(T &&arg) noexcept : name(arg.name), priority(arg.priority), min_samples(arg.min_samples), max_samples(arg.max_samples), _accessor{ std::make_unique>(std::forward(arg)) } {} [[nodiscard]] supported_type - pmt_type() const noexcept { - return _accessor->pmt_type(); + defaultValue() const noexcept { + return _accessor->defaultValue(); + } + + [[nodiscard]] bool + setDefaultValue(const supported_type &val) noexcept { + return _accessor->setDefaultValue(val); } [[nodiscard]] port_type_t @@ -576,6 +790,21 @@ class dynamic_port { return _accessor->direction(); } + [[nodiscard]] std::string_view + domain() const noexcept { + return _accessor->domain(); + } + + [[nodiscard]] bool + isSynchronous() noexcept { + return _accessor->isSynchronous(); + } + + [[nodiscard]] bool + isOptional() noexcept { + return _accessor->isOptional(); + } + [[nodiscard]] connection_result_t resize_buffer(std::size_t min_size) { if (direction() == port_direction_t::OUTPUT) { @@ -595,10 +824,10 @@ class dynamic_port { } }; -static_assert(Port); +static_assert(PortType); constexpr void -publish_tag(Port auto &port, property_map &&tag_data, std::size_t tag_offset = 0) noexcept { +publish_tag(PortType auto &port, property_map &&tag_data, std::size_t tag_offset = 0) noexcept { port.tagWriter().publish( [&port, data = std::move(tag_data), &tag_offset](std::span tag_output) { tag_output[0].index = port.streamWriter().position() + std::make_signed_t(tag_offset); @@ -608,7 +837,7 @@ publish_tag(Port auto &port, property_map &&tag_data, std::size_t tag_offset = 0 } constexpr void -publish_tag(Port auto &port, const property_map &tag_data, std::size_t tag_offset = 0) noexcept { +publish_tag(PortType auto &port, const property_map &tag_data, std::size_t tag_offset = 0) noexcept { port.tagWriter().publish( [&port, &tag_data, &tag_offset](std::span tag_output) { tag_output[0].index = port.streamWriter().position() + tag_offset; @@ -618,7 +847,7 @@ publish_tag(Port auto &port, const property_map &tag_data, std::size_t tag_offse } constexpr std::size_t -samples_to_next_tag(const Port auto &port) { +samples_to_next_tag(const PortType auto &port) { if (port.tagReader().available() == 0) [[likely]] { return std::numeric_limits::max(); // default: no tags in sight } diff --git a/include/port_traits.hpp b/include/port_traits.hpp index 9ae32d5c..11b9b7d1 100644 --- a/include/port_traits.hpp +++ b/include/port_traits.hpp @@ -8,11 +8,11 @@ namespace fair::graph::traits::port { template concept has_fixed_info_v = requires { - typename T::value_type; - { T::static_name() }; - { T::direction() } -> std::same_as; - { T::type() } -> std::same_as; - }; + typename T::value_type; + { T::static_name() }; + { T::direction() } -> std::same_as; + { T::type() } -> std::same_as; +}; template using has_fixed_info = std::integral_constant>; @@ -43,25 +43,15 @@ using is_output = std::integral_constant concept is_output_v = is_output::value; -template +template concept is_port_v = is_output_v || is_input_v; template -struct min_samples : std::integral_constant::value... })> {}; - -template -struct min_samples> - : std::integral_constant {}; +struct min_samples : std::integral_constant {}; template -struct max_samples : std::integral_constant::value... })> {}; - -template -struct max_samples> - : std::integral_constant {}; +struct max_samples : std::integral_constant {}; -} // namespace port +} // namespace fair::graph::traits::port #endif // include guard diff --git a/include/typelist.hpp b/include/typelist.hpp index 749fb18b..bf789677 100644 --- a/include/typelist.hpp +++ b/include/typelist.hpp @@ -428,6 +428,9 @@ struct typelist { }())>; }; +template +constexpr bool is_any_of_v = std::disjunction_v...>; + namespace detail { template typename OtherTypelist, typename... Args> meta::typelist diff --git a/include/utils.hpp b/include/utils.hpp index 778cdffc..32316d5c 100644 --- a/include/utils.hpp +++ b/include/utils.hpp @@ -104,6 +104,44 @@ operator+(const fixed_string &lhs, const fixed_string &rhs return result; } +namespace detail { +constexpr int +log10(int n) noexcept { + if (n < 10) return 0; + return 1 + log10(n / 10); +} + +constexpr int +pow10(int n) noexcept { + if (n == 0) return 1; + return 10 * pow10(n - 1); +} + +template +constexpr fixed_string +make_fixed_string_impl(std::index_sequence) { + constexpr auto numDigits = sizeof...(Idx); + return { { ('0' + (N / pow10(numDigits - Idx - 1) % 10))..., 0 } }; +} +} // namespace detail + +template +constexpr auto +make_fixed_string() noexcept { + if constexpr (N == 0) { + return fixed_string{ "0" }; + } else { + constexpr std::size_t digits = 1U + static_cast(detail::log10(N)); + return detail::make_fixed_string_impl(std::make_index_sequence()); + } +} + +static_assert(fixed_string("0") == make_fixed_string<0>()); +static_assert(fixed_string("1") == make_fixed_string<1>()); +static_assert(fixed_string("2") == make_fixed_string<2>()); +static_assert(fixed_string("123") == make_fixed_string<123>()); +static_assert((fixed_string("out") + make_fixed_string<123>()) == fixed_string("out123")); + template [[nodiscard]] std::string type_name() noexcept { diff --git a/src/main.cpp b/src/main.cpp index 5b1dbe31..c2d086f0 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -8,26 +8,34 @@ namespace fg = fair::graph; template -class count_source : public fg::node, fg::OUT::max(), "random">> { -public: +struct count_source : public fg::node> { + fg::PortOut random; + constexpr T process_one() { return 42; } }; +ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T), (count_source), random); + template -class expect_sink : public fg::node, fg::IN::max(), "sink">> { -public: +struct expect_sink : public fg::node> { + fg::PortIn sink; + void process_one(T value) { std::cout << value << std::endl; } }; +ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T), (expect_sink), sink); + template() * std::declval())> -class scale : public fg::node, fg::IN::max(), "original">, fg::OUT::max(), "scaled">> { -public: +struct scale : public fg::node> { + fg::PortIn original; + fg::PortOut scaled; + template V> [[nodiscard]] constexpr auto process_one(V a) const noexcept { @@ -35,9 +43,14 @@ class scale : public fg::node, fg::IN), original, scaled); + template() + std::declval())> -class adder : public fg::node, fg::IN::max(), "addend0">, fg::IN::max(), "addend1">, fg::OUT::max(), "sum">> { -public: +struct adder : public fg::node> { + fg::PortIn addend0; + fg::PortIn addend1; + fg::PortOut sum; + template V> [[nodiscard]] constexpr auto process_one(V a, V b) const noexcept { @@ -45,31 +58,35 @@ class adder : public fg::node, fg::IN), addend0, addend1, sum); + +using fg::port_type_t::STREAM, fg::port_direction_t::INPUT, fg::port_direction_t::OUTPUT; + template -class duplicate : public fg::node, fair::meta::typelist::max(), "in">>, fg::repeated_ports> { - using base = fg::node, fair::meta::typelist::max(), "in">>, fg::repeated_ports>; +class duplicate : public fg::node, fair::meta::typelist>, fg::repeated_ports> { + using base = fg::node, fair::meta::typelist>, fg::repeated_ports>; public: using return_type = typename fg::traits::node::return_type; [[nodiscard]] constexpr return_type process_one(T a) const noexcept { - return [&a](std::index_sequence) { return std::make_tuple(((void) Is, a)...); } - (std::make_index_sequence()); + return [&a](std::index_sequence) { return std::make_tuple(((void) Is, a)...); }(std::make_index_sequence()); } }; template requires(Depth > 0) -class delay : public fg::node, fg::IN::max(), "in">, fg::OUT::max(), "out">> { +struct delay : public fg::node> { + fg::PortIn in; + fg::PortOut out; std::array buffer = {}; int pos = 0; -public: [[nodiscard]] constexpr T - process_one(T in) noexcept { + process_one(T val) noexcept { T ret = buffer[pos]; - buffer[pos] = in; + buffer[pos] = val; if (pos == Depth - 1) { pos = 0; } else { @@ -79,6 +96,8 @@ class delay : public fg::node, fg::IN), in, out); + int main() { using fg::merge; diff --git a/test/blocklib/core/fft/fft.hpp b/test/blocklib/core/fft/fft.hpp index eb946622..a9920cfd 100644 --- a/test/blocklib/core/fft/fft.hpp +++ b/test/blocklib/core/fft/fft.hpp @@ -114,8 +114,8 @@ struct fft : node> { using OutUniquePtr = typename fftw::OutUniquePtr; using PlanUniquePtr = typename fftw::PlanUniquePtr; - IN in; - OUT> out; + PortIn in; + PortOut> out; std::size_t fftSize{ 1024 }; int window{ static_cast(WindowFunction::None) }; diff --git a/test/blocklib/core/filter/time_domain_filter.hpp b/test/blocklib/core/filter/time_domain_filter.hpp index 04d9426e..47007343 100644 --- a/test/blocklib/core/filter/time_domain_filter.hpp +++ b/test/blocklib/core/filter/time_domain_filter.hpp @@ -17,8 +17,8 @@ struct fir_filter : node, Doc> { - IN in; - OUT out; + PortIn in; + PortOut out; std::vector b{}; // feedforward coefficients history_buffer inputHistory{ 32 }; @@ -51,8 +51,8 @@ struct iir_filter : node, Doc> { - IN in; - OUT out; + PortIn in; + PortOut out; std::vector b{ 1 }; // feed-forward coefficients std::vector a{ 1 }; // feedback coefficients history_buffer inputHistory{ 32 }; diff --git a/test/blocklib/core/sources/clock_source.hpp b/test/blocklib/core/sources/clock_source.hpp index b87e0c23..cf534e7e 100644 --- a/test/blocklib/core/sources/clock_source.hpp +++ b/test/blocklib/core/sources/clock_source.hpp @@ -30,7 +30,7 @@ ClockSource Documentation -- add here )"">> { std::chrono::time_point nextTimePoint = ClockSourceType::now(); // - OUT out; + PortOut out; std::vector tags{}; std::size_t next_tag{ 0 }; // diff --git a/test/blocklib/core/unit-test/common_nodes.hpp b/test/blocklib/core/unit-test/common_nodes.hpp index 2fc2f979..1b95a9b8 100644 --- a/test/blocklib/core/unit-test/common_nodes.hpp +++ b/test/blocklib/core/unit-test/common_nodes.hpp @@ -17,8 +17,8 @@ class builtin_multiply : public fair::graph::node> { T _factor = static_cast(1.0f); public: - fair::graph::IN in; - fair::graph::OUT out; + fair::graph::PortIn in; + fair::graph::PortOut out; builtin_multiply() = delete; @@ -40,10 +40,10 @@ ENABLE_REFLECTION_FOR_TEMPLATE(builtin_multiply, in, out); template class builtin_counter : public fair::graph::node> { public: - static std::size_t s_event_count; + static std::size_t s_event_count; - fair::graph::IN in; - fair::graph::OUT out; + fair::graph::PortIn in; + fair::graph::PortOut out; [[nodiscard]] constexpr auto process_one(T a) const noexcept { @@ -69,11 +69,11 @@ class multi_adder : public fair::graph::node_model { const std::string unique_name_ = fmt::format("multi_adder#{}", unique_id); // TODO: resolve symbol duplication protected: - using in_port_t = fair::graph::IN; + using in_port_t = fair::graph::PortIn; // std::list because ports don't like to change in-memory address // after connection is established, and vector might reallocate - std::list _input_ports; - fair::graph::OUT _output_port; + std::list _input_ports; + fair::graph::PortOut _output_port; protected: using setting_map = std::map>; diff --git a/test/blocklib/core/unit-test/tag_monitors.hpp b/test/blocklib/core/unit-test/tag_monitors.hpp index 0abef7c4..8bd958f5 100644 --- a/test/blocklib/core/unit-test/tag_monitors.hpp +++ b/test/blocklib/core/unit-test/tag_monitors.hpp @@ -81,7 +81,7 @@ equal_tag_lists(const std::vector &tags1, const std::vector &tags2 template struct TagSource : public node> { - OUT out; + PortOut out; std::vector tags{}; std::size_t next_tag{ 0 }; std::int64_t n_samples_max = 1024; @@ -139,8 +139,8 @@ struct TagSource : public node> { template struct TagMonitor : public node> { - IN in; - OUT out; + PortIn in; + PortOut out; std::vector tags{}; std::int64_t n_samples_produced{ 0 }; @@ -179,7 +179,7 @@ struct TagMonitor : public node> { template struct TagSink : public node> { using ClockSourceType = std::chrono::system_clock; - IN in; + PortIn in; std::vector tags{}; std::int64_t n_samples_produced{ 0 }; std::chrono::time_point timeFirstSample = ClockSourceType::now(); diff --git a/test/plugins/good_base_plugin.cpp b/test/plugins/good_base_plugin.cpp index f0931776..bb54b165 100644 --- a/test/plugins/good_base_plugin.cpp +++ b/test/plugins/good_base_plugin.cpp @@ -27,7 +27,7 @@ read_total_count(const fair::graph::property_map ¶ms) { } template -class cout_sink : public fg::node, fg::IN> { +class cout_sink : public fg::node, fg::PortInNamed> { public: std::size_t total_count = -1_UZ; @@ -45,7 +45,7 @@ class cout_sink : public fg::node, fg::IN> { }; template -class fixed_source : public fg::node, fg::OUT> { +class fixed_source : public fg::node, fg::PortOutNamed> { public: std::size_t event_count = -1_UZ; // infinite count by default diff --git a/test/plugins/good_conversion_plugin.cpp b/test/plugins/good_conversion_plugin.cpp index ac99acdb..1d779d9a 100644 --- a/test/plugins/good_conversion_plugin.cpp +++ b/test/plugins/good_conversion_plugin.cpp @@ -11,8 +11,8 @@ namespace fg = fair::graph; template class convert : public fg::node> { public: - fg::IN in; - fg::OUT out; + fg::PortIn in; + fg::PortOut out; [[nodiscard]] constexpr auto process_one(From value) const noexcept { diff --git a/test/plugins/good_math_plugin.cpp b/test/plugins/good_math_plugin.cpp index ad96b0db..49bd0812 100644 --- a/test/plugins/good_math_plugin.cpp +++ b/test/plugins/good_math_plugin.cpp @@ -28,8 +28,8 @@ class math_base { T _factor = static_cast(1.0f); public: - fg::IN in; - fg::OUT out; + fg::PortIn> in; + fg::PortOut> out; math_base() = delete; diff --git a/test/qa_data_sink.cpp b/test/qa_data_sink.cpp index 78b14e47..a6b8154c 100644 --- a/test/qa_data_sink.cpp +++ b/test/qa_data_sink.cpp @@ -37,7 +37,7 @@ namespace fair::graph::data_sink_test { template struct Source : public node> { - OUT out; + PortOut out; std::int32_t n_samples_produced = 0; std::int32_t n_samples_max = 1024; std::size_t n_tag_offset = 0; @@ -322,7 +322,7 @@ const boost::ut::suite DataSinkTests = [] { expect(eq(connection_result_t::SUCCESS, flow_graph.connect<"out">(src).to<"in">(sink))); - auto poller_data_only = data_sink_registry::instance().get_streaming_poller(data_sink_query::sink_name("test_sink"), blocking_mode::Blocking); + auto poller_data_only = data_sink_registry::instance().get_streaming_poller(data_sink_query::sink_name("test_sink"), blocking_mode::Blocking); expect(neq(poller_data_only, nullptr)); auto poller_with_tags = data_sink_registry::instance().get_streaming_poller(data_sink_query::sink_name("test_sink"), blocking_mode::Blocking); diff --git a/test/qa_dynamic_node.cpp b/test/qa_dynamic_node.cpp index 3a96bcbe..47fb475b 100644 --- a/test/qa_dynamic_node.cpp +++ b/test/qa_dynamic_node.cpp @@ -10,7 +10,7 @@ template std::atomic_size_t multi_adder::_unique_id_counter = 0; template -struct fixed_source : public fg::node, fg::OUT> { +struct fixed_source : public fg::node, fg::PortOutNamed> { T value = 1; fg::work_return_t @@ -30,7 +30,7 @@ struct fixed_source : public fg::node, fg::OUT>); template -struct cout_sink : public fg::node, fg::IN> { +struct cout_sink : public fg::node, fg::PortInNamed> { std::size_t remaining = 0; void diff --git a/test/qa_dynamic_port.cpp b/test/qa_dynamic_port.cpp index 108ba81c..07ef5c12 100644 --- a/test/qa_dynamic_port.cpp +++ b/test/qa_dynamic_port.cpp @@ -29,8 +29,8 @@ class dynamic_node : public fg::node { template() * std::declval())> class scale : public fg::node> { public: - fg::IN original; - fg::OUT scaled; + fg::PortIn original; + fg::PortOut scaled; template V> [[nodiscard]] constexpr auto @@ -44,9 +44,9 @@ ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T, T Scale, typename R), (scale() + std::declval())> class adder : public fg::node> { public: - fg::IN addend0; - fg::IN addend1; - fg::OUT sum; + fg::PortIn addend0; + fg::PortIn addend1; + fg::PortOut sum; template V> [[nodiscard]] constexpr auto @@ -60,7 +60,7 @@ ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T, typename R), (adder), add template class cout_sink : public fg::node> { public: - fg::IN sink; + fg::PortIn sink; void process_one(T value) { @@ -74,8 +74,8 @@ ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T), (cout_sink), sink); template class repeater_source : public fg::node> { public: - fg::OUT value; - std::size_t _counter = 0; + fg::PortOut value; + std::size_t _counter = 0; fair::graph::work_return_t work(std::size_t requested_work) { @@ -103,24 +103,33 @@ const boost::ut::suite PortApiTests = [] { using namespace fair::graph; "PortApi"_test = [] { - static_assert(Port>); - static_assert(Port("in"))>); - static_assert(Port>); - static_assert(Port>); - static_assert(Port>); - - static_assert(IN::static_name() == fixed_string("in")); - static_assert(requires { IN("in").name; }); + static_assert(PortType>); + static_assert(PortType())>); + static_assert(PortType>); + static_assert(PortType>); + static_assert(PortType>); + + static_assert(PortType>); + static_assert(PortType("in"))>); + static_assert(PortType>); + static_assert(PortType>); + static_assert(PortType>); + + static_assert(PortIn>::Required::MinSamples == 1); + static_assert(PortIn>::Required::MaxSamples == 2); + static_assert(PortIn::direction() == port_direction_t::INPUT); + static_assert(PortOut::direction() == port_direction_t::OUTPUT); }; "PortBufferApi"_test = [] { - OUT::max(), "out0"> output_port; - BufferWriter auto &writer = output_port.streamWriter(); + PortOut output_port; + BufferWriter auto &writer = output_port.streamWriter(); // BufferWriter auto &tagWriter = output_port.tagWriter(); expect(ge(writer.available(), 32_UZ)); - IN::max(), "int0"> input_port; - const BufferReader auto &reader = input_port.streamReader(); + using ExplicitUnlimitedSize = RequiredSamples<1, std::numeric_limits::max()>; + PortIn input_port; + const BufferReader auto &reader = input_port.streamReader(); expect(eq(reader.available(), 0_UZ)); auto buffers = output_port.buffer(); input_port.setBuffer(buffers.streamBuffer, buffers.tagBufferType); @@ -139,9 +148,10 @@ const boost::ut::suite PortApiTests = [] { "RuntimePortApi"_test = [] { // declare in block - OUT::max(), "out"> out; - IN::max(), "in"> in; - std::vector port_list; + using ExplicitUnlimitedSize = RequiredSamples<1, std::numeric_limits::max()>; + PortOut out; + PortIn in; + std::vector port_list; port_list.emplace_back(out); port_list.emplace_back(in); diff --git a/test/qa_fft.cpp b/test/qa_fft.cpp index 62df8365..87b978da 100644 --- a/test/qa_fft.cpp +++ b/test/qa_fft.cpp @@ -17,9 +17,9 @@ namespace fg = fair::graph; template struct CountSource : public fg::node> { - fg::OUT out{}; - int count{ 0 }; - int nSamples{ 1024 }; + fg::PortOut out{}; + int count{ 0 }; + int nSamples{ 1024 }; constexpr std::make_signed_t available_samples(const CountSource & /*d*/) noexcept { diff --git a/test/qa_hier_node.cpp b/test/qa_hier_node.cpp index efe942a9..a8e8c174 100644 --- a/test/qa_hier_node.cpp +++ b/test/qa_hier_node.cpp @@ -6,7 +6,7 @@ namespace fg = fair::graph; template() * std::declval())> -struct scale : public fg::node, fg::IN::max(), "original">, fg::OUT::max(), "scaled">> { +struct scale : public fg::node, fg::PortInNamed, fg::PortOutNamed> { template V> [[nodiscard]] constexpr auto process_one(V a) const noexcept { @@ -15,8 +15,7 @@ struct scale : public fg::node, fg::IN() + std::declval())> -struct adder : public fg::node, fg::IN::max(), "addend0">, fg::IN::max(), "addend1">, - fg::OUT::max(), "sum">> { +struct adder : public fg::node, fg::PortInNamed, fg::PortInNamed, fg::PortOutNamed> { template V> [[nodiscard]] constexpr auto process_one(V a, V b) const noexcept { @@ -42,7 +41,7 @@ class hier_node : public fg::node_model { std::vector _tags_at_output; std::unique_ptr _settings = std::make_unique>>(*this); - using in_port_t = fg::IN; + using in_port_t = fg::PortIn; fg::scheduler::simple<> _scheduler; @@ -136,17 +135,17 @@ template std::atomic_size_t hier_node::_unique_id_counter = 0; template -struct fixed_source : public fg::node, fg::OUT> { - std::size_t remaining_events_count; +struct fixed_source : public fg::node> { + fg::PortOut> out; + std::size_t remaining_events_count; - T value = 1; + T value = 1; fg::work_return_t work(std::size_t requested_work) { if (remaining_events_count != 0) { using namespace fair::literals; - auto &port = fg::output_port<0>(this); - auto &writer = port.streamWriter(); + auto &writer = out.streamWriter(); auto data = writer.reserve_output_range(1_UZ); data[0] = value; data.publish(1_UZ); @@ -165,11 +164,12 @@ struct fixed_source : public fg::node, fg::OUT), remaining_events_count); +ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T), (fixed_source), out, remaining_events_count); template -struct cout_sink : public fg::node, fg::IN> { - std::size_t remaining = 0; +struct cout_sink : public fg::node> { + fg::PortIn> in; + std::size_t remaining = 0; void process_one(T value) { @@ -180,7 +180,7 @@ struct cout_sink : public fg::node, fg::IN> { } }; -ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T), (cout_sink), remaining); +ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T), (cout_sink), in, remaining); fg::graph make_graph(std::size_t events_count) { diff --git a/test/qa_node.cpp b/test/qa_node.cpp index ff7b8025..11be29bc 100644 --- a/test/qa_node.cpp +++ b/test/qa_node.cpp @@ -63,9 +63,9 @@ struct StrideTestData { template struct CountSource : public fg::node> { - fg::OUT out{}; - int count{ 0 }; - int n_samples{ 1024 }; + fg::PortOut out{}; + int count{ 0 }; + int n_samples{ 1024 }; constexpr std::make_signed_t available_samples(const CountSource & /*d*/) noexcept { @@ -81,11 +81,11 @@ struct CountSource : public fg::node> { template struct IntDecBlock : public fg::node, fg::PerformDecimationInterpolation, fg::PerformStride> { - fg::IN in{}; - fg::OUT out{}; + fg::PortIn in{}; + fg::PortOut out{}; - ProcessStatus status{}; - bool write_to_vector{ false }; + ProcessStatus status{}; + bool write_to_vector{ false }; fg::work_return_status_t process_bulk(std::span input, std::span output) noexcept { diff --git a/test/qa_plugins_test.cpp b/test/qa_plugins_test.cpp index ab7124f4..3fe53880 100644 --- a/test/qa_plugins_test.cpp +++ b/test/qa_plugins_test.cpp @@ -40,8 +40,8 @@ class builtin_multiply : public fg::node> { T _factor = static_cast(1.0f); public: - fg::IN in; - fg::OUT out; + fg::PortIn in; + fg::PortOut out; builtin_multiply() = delete; diff --git a/test/qa_scheduler.cpp b/test/qa_scheduler.cpp index 65a64a28..0aba6930 100644 --- a/test/qa_scheduler.cpp +++ b/test/qa_scheduler.cpp @@ -34,7 +34,7 @@ class tracer { // define some example graph nodes template -class count_source : public fg::node, fg::OUT::max(), "out">> { +class count_source : public fg::node, fg::PortOutNamed> { tracer &_tracer; std::size_t _count = 0; @@ -57,7 +57,7 @@ class count_source : public fg::node, fg::OUT>); template -class expect_sink : public fg::node, fg::IN::max(), "in">> { +class expect_sink : public fg::node, fg::PortInNamed> { tracer &_tracer; std::int64_t _count = 0; std::function _checker; @@ -84,7 +84,7 @@ class expect_sink : public fg::node, fg::IN() * std::declval())> -class scale : public fg::node, fg::IN::max(), "original">, fg::OUT::max(), "scaled">> { +class scale : public fg::node, fg::PortInNamed, fg::PortOutNamed> { tracer &_tracer; public: @@ -99,8 +99,7 @@ class scale : public fg::node, fg::IN() + std::declval())> -class adder : public fg::node, fg::IN::max(), "addend0">, fg::IN::max(), "addend1">, - fg::OUT::max(), "sum">> { +class adder : public fg::node, fg::PortInNamed, fg::PortInNamed, fg::PortOutNamed> { tracer &_tracer; public: diff --git a/test/qa_settings.cpp b/test/qa_settings.cpp index 314f9c59..61be8773 100644 --- a/test/qa_settings.cpp +++ b/test/qa_settings.cpp @@ -85,7 +85,7 @@ printChanges(const property_map &oldMap, const property_map &newMap) noexcept { template struct Source : public node> { - OUT out; + PortOut out; std::int32_t n_samples_produced = 0; std::int32_t n_samples_max = 1024; std::int32_t n_tag_offset = 0; @@ -121,8 +121,8 @@ some test doc documentation template struct TestBlock : public node, BlockingIO, TestBlockDoc, SupportedTypes> { - IN in{}; - OUT out{}; + PortIn in{}; + PortOut out{}; // parameters A, Unit<"As">> scaling_factor = static_cast(1); // N.B. unit 'As' = 'Coulomb' A context{}; @@ -165,8 +165,8 @@ template struct Decimate : public node, SupportedTypes, PerformDecimationInterpolation, Doc> { - IN in{}; - OUT out{}; + PortIn in{}; + PortOut out{}; A sample_rate = 1.f; void @@ -205,7 +205,7 @@ static_assert(NodeType>); template struct Sink : public node> { - IN in; + PortIn in; std::int32_t n_samples_consumed = 0; std::int32_t n_samples_max = -1; int64_t last_tag_position = -1;