Skip to content

Commit

Permalink
[try_put_and_wait] Add try_put_and_wait feature to oneTBB Flow Graph (#…
Browse files Browse the repository at this point in the history
…1463)

Signed-off-by: kboyarinov <konstantin.boyarinov@intel.com>
Co-authored-by: pavelkumbrasev <pavel.kumbrasev@intel.com>
Co-authored-by: Dmitri Mokhov <dmitri.n.mokhov@intel.com>
Co-authored-by: Aleksei Fedotov <aleksei.fedotov@intel.com>
Co-authored-by: Mike Voss <michaelj.voss@intel.com>
  • Loading branch information
5 people authored Aug 26, 2024
1 parent 5555985 commit aae7fa6
Show file tree
Hide file tree
Showing 27 changed files with 3,836 additions and 272 deletions.
5 changes: 5 additions & 0 deletions include/oneapi/tbb/detail/_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,11 @@
#define __TBB_PREVIEW_FLOW_GRAPH_NODE_SET (TBB_PREVIEW_FLOW_GRAPH_FEATURES)
#endif

// Internal switch for the preview flow-graph try_put_and_wait support.
// Unless it has already been defined, it evaluates to non-zero when either
// TBB_PREVIEW_FLOW_GRAPH_FEATURES or TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT does.
#ifndef __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
#define __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT (TBB_PREVIEW_FLOW_GRAPH_FEATURES \
|| TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT)
#endif

#if TBB_PREVIEW_CONCURRENT_HASH_MAP_EXTENSIONS
#define __TBB_PREVIEW_CONCURRENT_HASH_MAP_EXTENSIONS 1
#endif
Expand Down
62 changes: 53 additions & 9 deletions include/oneapi/tbb/detail/_flow_graph_body_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -272,29 +272,57 @@ class forward_task_bypass : public graph_task {

//! A task that calls a node's apply_body_bypass function, passing in an input of type Input
// return the task* unless it is SUCCESSFULLY_ENQUEUED, in which case return nullptr
//
// The task holds a reference to the node and its own copy of the input.
// BaseTaskType is the task class this one derives from (graph_task by default). When it is
// exactly graph_task, the node is called with whatever
// __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}) expands to (macro defined outside this view);
// otherwise it is called with metainfo built from this->get_msg_wait_context_vertices().
//
// NOTE(review): this span appears to carry both the pre-change and the post-change lines of a
// diff: two class heads, two constructors with the same parameter list, two declarations of
// next_task in execute(), and paired finalize calls in execute()/cancel(). Confirm which lines
// are current before relying on this text as compilable source.
template< typename NodeType, typename Input >
class apply_body_task_bypass : public graph_task {
template< typename NodeType, typename Input, typename BaseTaskType = graph_task>
class apply_body_task_bypass
: public BaseTaskType
{
// Node whose apply_body_bypass is invoked; held by reference.
NodeType &my_node;
// Copy of the input handed to the node's body.
Input my_input;

// Tag-dispatch helpers: check_metainfo is true_type exactly when BaseTaskType is graph_task,
// which selects the without_metainfo overload below; any other base selects with_metainfo.
using check_metainfo = std::is_same<BaseTaskType, graph_task>;
using without_metainfo = std::true_type;
using with_metainfo = std::false_type;

// Overload chosen when the base is plain graph_task: no waiters are attached to the message.
graph_task* call_apply_body_bypass_impl(without_metainfo) {
return my_node.apply_body_bypass(my_input
__TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
// Overload chosen for any other base: the metainfo is built from
// get_msg_wait_context_vertices() (presumably provided by BaseTaskType — confirm).
graph_task* call_apply_body_bypass_impl(with_metainfo) {
return my_node.apply_body_bypass(my_input, message_metainfo{this->get_msg_wait_context_vertices()});
}
#endif

// Dispatches to one of the overloads above based on check_metainfo.
graph_task* call_apply_body_bypass() {
return call_apply_body_bypass_impl(check_metainfo{});
}

public:
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
// Metainfo-aware constructor: forwards metainfo and passes the result of its waiters() call
// to the BaseTaskType constructor as a fourth argument. node_priority has no default here.
template <typename Metainfo>
apply_body_task_bypass( graph& g, d1::small_object_allocator& allocator, NodeType &n, const Input &i,
node_priority_t node_priority, Metainfo&& metainfo )
: BaseTaskType(g, allocator, node_priority, std::forward<Metainfo>(metainfo).waiters())
, my_node(n), my_input(i) {}
#endif

// Constructor without metainfo; node_priority defaults to no_priority.
// NOTE(review): the two definitions below share one parameter list and differ only in the base
// they initialise (graph_task vs BaseTaskType) — looks like the before/after pair of a diff hunk.
apply_body_task_bypass( graph& g, d1::small_object_allocator& allocator, NodeType &n, const Input &i
, node_priority_t node_priority = no_priority
) : graph_task(g, allocator, node_priority),
my_node(n), my_input(i) {}
apply_body_task_bypass( graph& g, d1::small_object_allocator& allocator, NodeType& n, const Input& i,
node_priority_t node_priority = no_priority )
: BaseTaskType(g, allocator, node_priority), my_node(n), my_input(i) {}

// Runs the node body. A SUCCESSFULLY_ENQUEUED result is turned into nullptr; any other non-null
// task is passed through prioritize_task. The task is finalized before the result is returned.
// NOTE(review): next_task is declared twice and finalize is called in two spellings below —
// presumably the old and new lines of the same diff hunk; confirm.
d1::task* execute(d1::execution_data& ed) override {
graph_task* next_task = my_node.apply_body_bypass( my_input );
graph_task* next_task = call_apply_body_bypass();
if (SUCCESSFULLY_ENQUEUED == next_task)
next_task = nullptr;
else if (next_task)
next_task = prioritize_task(my_node.graph_reference(), *next_task);
finalize<apply_body_task_bypass>(ed);
BaseTaskType::template finalize<apply_body_task_bypass>(ed);
return next_task;
}

// Cancellation path: finalizes the task without calling the node body and returns nullptr.
// NOTE(review): same paired finalize lines as in execute() — confirm which one is current.
d1::task* cancel(d1::execution_data& ed) override {
finalize<apply_body_task_bypass>(ed);
BaseTaskType::template finalize<apply_body_task_bypass>(ed);
return nullptr;
}
};
Expand Down Expand Up @@ -343,6 +371,15 @@ class threshold_regulator<T, DecrementType,
return result;
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
//! Metainfo-aware overload that forwards to the single-argument try_put_task.
// The metainformation is dropped on purpose: any further items tied to the passed
// metainfo that still need processing are expected to be stored in the buffer
// before the limiter_node.
graph_task* try_put_task(const DecrementType& value, const message_metainfo& /*ignored*/) override {
    graph_task* const forwarded_task = try_put_task(value);
    return forwarded_task;
}
#endif

graph& graph_reference() const override {
return my_node->my_graph;
}
Expand All @@ -361,7 +398,14 @@ class threshold_regulator<T, continue_msg, void> : public continue_receiver, no_

T *my_node;

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
// Intentionally ignore the metainformation.
// If more items associated with the passed metainfo remain to be processed,
// they should be stored in the buffer before the limiter_node.
//
// Either signature ends up in the same body: it calls decrement_counter(1) on my_node and
// returns the result. Only the parameter list depends on the preview macro.
graph_task* execute(const message_metainfo&) override {
#else
graph_task* execute() override {
#endif
return my_node->decrement_counter( 1 );
}

Expand Down
113 changes: 90 additions & 23 deletions include/oneapi/tbb/detail/_flow_graph_cache_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,12 @@ class predecessor_cache : public node_cache< sender<T>, M > {
// Do not work with the passed pointer here as it may not be fully initialized yet
}

bool get_item( output_type& v ) {
private:
bool get_item_impl( output_type& v
__TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo* metainfo_ptr = nullptr) )
{

bool msg = false;
bool successful_get = false;

do {
predecessor_type *src;
Expand All @@ -113,18 +116,35 @@ class predecessor_cache : public node_cache< sender<T>, M > {
}

// Try to get from this sender
msg = src->try_get( v );
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
if (metainfo_ptr) {
successful_get = src->try_get( v, *metainfo_ptr );
} else
#endif
{
successful_get = src->try_get( v );
}

if (msg == false) {
if (successful_get == false) {
// Relinquish ownership of the edge
register_successor(*src, *my_owner);
} else {
// Retain ownership of the edge
this->add(*src);
}
} while ( msg == false );
return msg;
} while ( successful_get == false );
return successful_get;
}
public:
//! Fetches an item into v through get_item_impl without requesting any metainfo.
bool get_item( output_type& v ) {
    const bool got_item = get_item_impl(v);
    return got_item;
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
//! Fetches an item into v through get_item_impl, passing the address of metainfo
//  so the implementation can use the metainfo-aware try_get.
bool get_item( output_type& v, message_metainfo& metainfo ) {
    message_metainfo* const metainfo_out = &metainfo;
    return get_item_impl(v, metainfo_out);
}
#endif

// If we are removing arcs (rf_clear_edges), call clear() rather than reset().
void reset() {
Expand Down Expand Up @@ -157,8 +177,9 @@ class reservable_predecessor_cache : public predecessor_cache< T, M > {
// Do not work with the passed pointer here as it may not be fully initialized yet
}

bool try_reserve( output_type &v ) {
bool msg = false;
private:
bool try_reserve_impl( output_type &v __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo* metainfo) ) {
bool successful_reserve = false;

do {
predecessor_type* pred = nullptr;
Expand All @@ -172,9 +193,16 @@ class reservable_predecessor_cache : public predecessor_cache< T, M > {
}

// Try to get from this sender
msg = pred->try_reserve( v );
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
if (metainfo) {
successful_reserve = pred->try_reserve( v, *metainfo );
} else
#endif
{
successful_reserve = pred->try_reserve( v );
}

if (msg == false) {
if (successful_reserve == false) {
typename mutex_type::scoped_lock lock(this->my_mutex);
// Relinquish ownership of the edge
register_successor( *pred, *this->my_owner );
Expand All @@ -183,11 +211,21 @@ class reservable_predecessor_cache : public predecessor_cache< T, M > {
// Retain ownership of the edge
this->add( *pred);
}
} while ( msg == false );
} while ( successful_reserve == false );

return msg;
return successful_reserve;
}
public:
//! Reserves an item into v through try_reserve_impl with no metainfo destination
//  (nullptr is supplied via __TBB_FLOW_GRAPH_METAINFO_ARG).
bool try_reserve( output_type& v ) {
    const bool reserved = try_reserve_impl(v __TBB_FLOW_GRAPH_METAINFO_ARG(nullptr));
    return reserved;
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
//! Reserves an item into v through try_reserve_impl, passing the address of metainfo
//  so the implementation can use the metainfo-aware try_reserve.
bool try_reserve( output_type& v, message_metainfo& metainfo ) {
    message_metainfo* const metainfo_out = &metainfo;
    return try_reserve_impl(v, metainfo_out);
}
#endif

bool try_release() {
reserved_src.load(std::memory_order_relaxed)->try_release();
reserved_src.store(nullptr, std::memory_order_relaxed);
Expand Down Expand Up @@ -268,6 +306,9 @@ class successor_cache : no_copy {
}

//! Pure virtual: derived caches define how t is put to the successors.
virtual graph_task* try_put_task( const T& t ) = 0;
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
//! Pure virtual overload that additionally receives the message metainfo;
//  declared only when the try_put_and_wait preview is enabled.
virtual graph_task* try_put_task( const T& t, const message_metainfo& metainfo ) = 0;
#endif
}; // successor_cache<T>

//! An abstract cache of successors, specialized to continue_msg
Expand Down Expand Up @@ -327,6 +368,9 @@ class successor_cache< continue_msg, M > : no_copy {
}

//! Pure virtual: derived caches define how the continue_msg is put to the successors.
virtual graph_task* try_put_task( const continue_msg& t ) = 0;
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
//! Pure virtual overload that additionally receives the message metainfo;
//  declared only when the try_put_and_wait preview is enabled.
virtual graph_task* try_put_task( const continue_msg& t, const message_metainfo& metainfo ) = 0;
#endif
}; // successor_cache< continue_msg >

//! A cache of successors that are broadcast to
Expand All @@ -336,19 +380,12 @@ class broadcast_cache : public successor_cache<T, M> {
typedef M mutex_type;
typedef typename successor_cache<T,M>::successors_type successors_type;

public:

broadcast_cache( typename base_type::owner_type* owner ): base_type(owner) {
// Do not work with the passed pointer here as it may not be fully initialized yet
}

// as above, but call try_put_task instead, and return the last task we received (if any)
graph_task* try_put_task( const T &t ) override {
graph_task* try_put_task_impl( const T& t __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo) ) {
graph_task * last_task = nullptr;
typename mutex_type::scoped_lock l(this->my_mutex, /*write=*/true);
typename successors_type::iterator i = this->my_successors.begin();
while ( i != this->my_successors.end() ) {
graph_task *new_task = (*i)->try_put_task(t);
graph_task *new_task = (*i)->try_put_task(t __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
// workaround for icc bug
graph& graph_ref = (*i)->graph_reference();
last_task = combine_tasks(graph_ref, last_task, new_task); // enqueue if necessary
Expand All @@ -365,6 +402,21 @@ class broadcast_cache : public successor_cache<T, M> {
}
return last_task;
}
public:

broadcast_cache( typename base_type::owner_type* owner ): base_type(owner) {
// Do not work with the passed pointer here as it may not be fully initialized yet
}

//! Puts t through try_put_task_impl, supplying a default-constructed message_metainfo
//  via __TBB_FLOW_GRAPH_METAINFO_ARG.
graph_task* try_put_task( const T &t ) override {
    graph_task* const resulting_task =
        try_put_task_impl(t __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
    return resulting_task;
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
//! Metainfo-aware overload: hands t and the caller's metainfo to try_put_task_impl unchanged.
graph_task* try_put_task( const T &t, const message_metainfo& metainfo ) override {
    graph_task* const resulting_task = try_put_task_impl(t, metainfo);
    return resulting_task;
}
#endif

// call try_put_task and return list of received tasks
bool gather_successful_try_puts( const T &t, graph_task_list& tasks ) {
Expand Down Expand Up @@ -411,11 +463,15 @@ class round_robin_cache : public successor_cache<T, M> {
return this->my_successors.size();
}

graph_task* try_put_task( const T &t ) override {
private:

graph_task* try_put_task_impl( const T &t
__TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo) )
{
typename mutex_type::scoped_lock l(this->my_mutex, /*write=*/true);
typename successors_type::iterator i = this->my_successors.begin();
while ( i != this->my_successors.end() ) {
graph_task* new_task = (*i)->try_put_task(t);
graph_task* new_task = (*i)->try_put_task(t __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
if ( new_task ) {
return new_task;
} else {
Expand All @@ -429,6 +485,17 @@ class round_robin_cache : public successor_cache<T, M> {
}
return nullptr;
}

public:
//! Puts t through try_put_task_impl, supplying a default-constructed message_metainfo
//  via __TBB_FLOW_GRAPH_METAINFO_ARG.
graph_task* try_put_task(const T& t) override {
    graph_task* const accepted_task =
        try_put_task_impl(t __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
    return accepted_task;
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
//! Metainfo-aware overload: hands t and the caller's metainfo to try_put_task_impl unchanged.
graph_task* try_put_task( const T& t, const message_metainfo& metainfo ) override {
    graph_task* const accepted_task = try_put_task_impl(t, metainfo);
    return accepted_task;
}
#endif
};

#endif // __TBB__flow_graph_cache_impl_H
Loading

0 comments on commit aae7fa6

Please sign in to comment.