Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[try_put_and_wait] Add try_put_and_wait feature to oneTBB Flow Graph #1463

Merged
merged 46 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
cef3bb8
Add wait_vertex implementation
pavelkumbrasev Apr 15, 2024
f25d898
Improve task_group scalability
pavelkumbrasev Apr 15, 2024
cda2377
Increase wait map size
pavelkumbrasev Apr 15, 2024
63ea3b6
Rename wait in task_group
pavelkumbrasev Apr 15, 2024
b3093e2
Fix compilation and copyright
pavelkumbrasev Apr 15, 2024
0313450
Drop brackets
pavelkumbrasev Apr 15, 2024
204926c
Add missed dtor and deallocation
pavelkumbrasev Apr 15, 2024
5e27eb0
Fix copyright
pavelkumbrasev Apr 15, 2024
2bce523
Fix collaborative_call_once compilation
pavelkumbrasev Apr 15, 2024
c3ea187
Add comment for release in wait_vertex
pavelkumbrasev Apr 15, 2024
69aa6ec
Make names consistent
pavelkumbrasev Apr 15, 2024
4059154
Add missed namespace qualifier
pavelkumbrasev Apr 15, 2024
f6771e4
Fix vertex names
pavelkumbrasev Apr 16, 2024
c984265
Update namespace qualifiers
pavelkumbrasev Apr 16, 2024
d352aa3
Refactor node structure. Replace nodes in parallel_for and parallel_r…
pavelkumbrasev Feb 27, 2024
1d70767
Fix constructor and copyright
pavelkumbrasev Feb 27, 2024
fc7de60
Initial impl
pavelkumbrasev Feb 27, 2024
f6b9a46
Add entry point
pavelkumbrasev Feb 27, 2024
67d8457
Improve scalability of Flow Graph
kboyarinov Feb 27, 2024
6055dc7
Cleanup and increase FG interface version
kboyarinov Feb 28, 2024
94baad0
Fix issues caused by interface version changes
kboyarinov Feb 29, 2024
a8ccbba
Align changes
kboyarinov Apr 17, 2024
e449166
Exclude changes in parallel for
kboyarinov Apr 17, 2024
dc3224c
Exclude changes in task.cpp
kboyarinov Apr 17, 2024
899bd5c
Increase interface version for multiple interfaces
kboyarinov Apr 17, 2024
b4767c6
Temporarily disable map cleanup on dispatcher dtor
kboyarinov Apr 17, 2024
aee5fb2
Fix deduction guides
kboyarinov May 1, 2024
edae460
Merge remote-tracking branch 'origin/master' into dev/kboyarinov/try_…
kboyarinov Aug 1, 2024
db69fca
Fix incorrect alignment with master
kboyarinov Aug 1, 2024
35358fd
[try_put_and_wait] Part 1: Add implementation of try_put_and_wait fea…
kboyarinov Aug 1, 2024
b24bbd4
[try_put_and_wait] Part 3: Add implementation of try_put_and_wait fea…
dnmokhov Aug 1, 2024
33b373d
[try_put_and_wait] Part 4: Add implementation of try_put_and_wait API…
kboyarinov Aug 1, 2024
012f2e0
[try_put_and_wait] Part 5: Add try_put_and_wait API for indexer_node …
kboyarinov Aug 1, 2024
5cd159a
[try_put_and_wait] Part 2: Add implementation of try_put_and_wait fea…
kboyarinov Aug 15, 2024
d67ee73
[try_put_and_wait] Part 6: Add implementation for limiter_node + rese…
kboyarinov Aug 16, 2024
b533e9d
Merge remote-tracking branch 'origin/master' into dev/kboyarinov/try_…
kboyarinov Aug 16, 2024
e387116
[try_put_and_wait] Part 7: implementation of try_put_and_wait for que…
kboyarinov Aug 19, 2024
1ce85d9
[try_put_and_wait] Part 8: Add implementation for key_matching join_n…
kboyarinov Aug 20, 2024
99d7f3a
[try_put_and_wait] Part 11: Add implementation of try_put_and_wait fe…
dnmokhov Aug 23, 2024
172c44c
[try_put_and_wait] Part 10: implementation for broadcast-push buffers…
kboyarinov Aug 23, 2024
0538258
Remove CI trigger
kboyarinov Aug 23, 2024
02bba15
Merge branch 'dev/kboyarinov/try_put_and_wait_production' of https://…
kboyarinov Aug 23, 2024
30bca6e
Merge remote-tracking branch 'origin/master' into dev/kboyarinov/try_…
kboyarinov Aug 23, 2024
2345106
Add missed brief
kboyarinov Aug 23, 2024
37dfe6a
Add d1 to task_arena_base friend declaration
kboyarinov Aug 23, 2024
d21be43
Suppress warning on Windows
kboyarinov Aug 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions include/oneapi/tbb/detail/_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,11 @@
#define __TBB_PREVIEW_FLOW_GRAPH_NODE_SET (TBB_PREVIEW_FLOW_GRAPH_FEATURES)
#endif

// Internal switch for the preview try_put_and_wait Flow Graph feature.
// Unless already defined, it evaluates to true when either the umbrella macro
// TBB_PREVIEW_FLOW_GRAPH_FEATURES or the feature-specific macro
// TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT is set.
#ifndef __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
#define __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT (TBB_PREVIEW_FLOW_GRAPH_FEATURES \
|| TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT)
#endif

#if TBB_PREVIEW_CONCURRENT_HASH_MAP_EXTENSIONS
#define __TBB_PREVIEW_CONCURRENT_HASH_MAP_EXTENSIONS 1
#endif
Expand Down
62 changes: 53 additions & 9 deletions include/oneapi/tbb/detail/_flow_graph_body_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -272,29 +272,57 @@ class forward_task_bypass : public graph_task {

//! A task that calls a node's apply_body_bypass function, passing in an input of type Input
// return the task* unless it is SUCCESSFULLY_ENQUEUED, in which case return nullptr
// NOTE(review): this listing looks like a rendered diff with the +/- markers stripped, so the
// pre-change and post-change forms of some lines appear back to back (e.g. the two class
// headers below, the two default-priority constructors, and the paired lines in execute()
// and cancel()) — confirm against the repository before treating it as compilable source.
template< typename NodeType, typename Input >
class apply_body_task_bypass : public graph_task {
// BaseTaskType is the task class this one derives from; it defaults to graph_task.
template< typename NodeType, typename Input, typename BaseTaskType = graph_task>
class apply_body_task_bypass
: public BaseTaskType
{
// The node whose apply_body_bypass() is invoked, and the stored copy of the input handed to it.
NodeType &my_node;
Input my_input;

// Tag dispatch on the base class: check_metainfo is a true type exactly when
// BaseTaskType is graph_task, which selects the without_metainfo overload below.
using check_metainfo = std::is_same<BaseTaskType, graph_task>;
using without_metainfo = std::true_type;
using with_metainfo = std::false_type;

// Base is plain graph_task: call the node body with a default-constructed message_metainfo
// wrapped in __TBB_FLOW_GRAPH_METAINFO_ARG (macro defined elsewhere; presumably it drops the
// argument when the preview feature is disabled — TODO confirm).
graph_task* call_apply_body_bypass_impl(without_metainfo) {
return my_node.apply_body_bypass(my_input
__TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
// Base is some other task type: build the metainfo from the base class's
// get_msg_wait_context_vertices() and pass it to the node body.
graph_task* call_apply_body_bypass_impl(with_metainfo) {
return my_node.apply_body_bypass(my_input, message_metainfo{this->get_msg_wait_context_vertices()});
}
#endif

// Picks one of the two overloads above at compile time via check_metainfo.
graph_task* call_apply_body_bypass() {
return call_apply_body_bypass_impl(check_metainfo{});
}

public:
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
// Metainfo-aware constructor: forwards metainfo.waiters() to the BaseTaskType constructor
// in addition to the graph, allocator and priority; stores the node reference and input copy.
template <typename Metainfo>
apply_body_task_bypass( graph& g, d1::small_object_allocator& allocator, NodeType &n, const Input &i,
node_priority_t node_priority, Metainfo&& metainfo )
: BaseTaskType(g, allocator, node_priority, std::forward<Metainfo>(metainfo).waiters())
, my_node(n), my_input(i) {}
#endif

// Constructor without metainfo; node_priority defaults to no_priority.
apply_body_task_bypass( graph& g, d1::small_object_allocator& allocator, NodeType &n, const Input &i
, node_priority_t node_priority = no_priority
) : graph_task(g, allocator, node_priority),
my_node(n), my_input(i) {}
apply_body_task_bypass( graph& g, d1::small_object_allocator& allocator, NodeType& n, const Input& i,
node_priority_t node_priority = no_priority )
: BaseTaskType(g, allocator, node_priority), my_node(n), my_input(i) {}

// Runs the node body on the stored input and returns the task to bypass to, if any.
d1::task* execute(d1::execution_data& ed) override {
graph_task* next_task = my_node.apply_body_bypass( my_input );
graph_task* next_task = call_apply_body_bypass();
// SUCCESSFULLY_ENQUEUED is a sentinel, not a runnable task: report "nothing to bypass to".
if (SUCCESSFULLY_ENQUEUED == next_task)
next_task = nullptr;
// Any other non-null task is passed through prioritize_task() for the node's graph.
else if (next_task)
next_task = prioritize_task(my_node.graph_reference(), *next_task);
// Finalize this task object before handing back the follow-up task.
finalize<apply_body_task_bypass>(ed);
BaseTaskType::template finalize<apply_body_task_bypass>(ed);
return next_task;
}

// Cancellation path: the node body is not run; the task is only finalized and nullptr returned.
d1::task* cancel(d1::execution_data& ed) override {
finalize<apply_body_task_bypass>(ed);
BaseTaskType::template finalize<apply_body_task_bypass>(ed);
return nullptr;
}
};
Expand Down Expand Up @@ -343,6 +371,15 @@ class threshold_regulator<T, DecrementType,
return result;
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
// Metainfo-aware overload: the metainformation argument is intentionally ignored and the
// call is forwarded to the single-argument try_put_task(value).
// Rationale: if more items associated with the passed metainfo remain to be processed,
// they should be stored in the buffer that precedes the limiter_node.
graph_task* try_put_task(const DecrementType& value, const message_metainfo&) override {
return try_put_task(value);
}
#endif

graph& graph_reference() const override {
return my_node->my_graph;
}
Expand All @@ -361,7 +398,14 @@ class threshold_regulator<T, continue_msg, void> : public continue_receiver, no_

T *my_node;

// execute() override; its signature takes a message_metainfo only when the preview feature
// is enabled, but the body is the same in both configurations.
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
// The metainformation argument is intentionally ignored.
// If more items associated with the passed metainfo remain to be processed,
// they should be stored in the buffer that precedes the limiter_node.
graph_task* execute(const message_metainfo&) override {
#else
graph_task* execute() override {
#endif
// Ask my_node to decrement its counter by 1 and return whatever task that call yields.
return my_node->decrement_counter( 1 );
}

Expand Down
113 changes: 90 additions & 23 deletions include/oneapi/tbb/detail/_flow_graph_cache_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,12 @@ class predecessor_cache : public node_cache< sender<T>, M > {
// Do not work with the passed pointer here as it may not be fully initialized yet
}

bool get_item( output_type& v ) {
private:
bool get_item_impl( output_type& v
__TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo* metainfo_ptr = nullptr) )
{

bool msg = false;
bool successful_get = false;

do {
predecessor_type *src;
Expand All @@ -113,18 +116,35 @@ class predecessor_cache : public node_cache< sender<T>, M > {
}

// Try to get from this sender
msg = src->try_get( v );
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
if (metainfo_ptr) {
successful_get = src->try_get( v, *metainfo_ptr );
} else
#endif
{
successful_get = src->try_get( v );
}

if (msg == false) {
if (successful_get == false) {
// Relinquish ownership of the edge
register_successor(*src, *my_owner);
} else {
// Retain ownership of the edge
this->add(*src);
}
} while ( msg == false );
return msg;
} while ( successful_get == false );
return successful_get;
}
public:
// Public entry point without metainfo: forwards to get_item_impl(v) and returns its result.
bool get_item( output_type& v ) {
return get_item_impl(v);
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
// Metainfo-aware overload: passes the address of metainfo to get_item_impl, so the
// predecessor is queried with the two-argument try_get( v, *metainfo_ptr ).
bool get_item( output_type& v, message_metainfo& metainfo ) {
return get_item_impl(v, &metainfo);
}
#endif

// If we are removing arcs (rf_clear_edges), call clear() rather than reset().
void reset() {
Expand Down Expand Up @@ -157,8 +177,9 @@ class reservable_predecessor_cache : public predecessor_cache< T, M > {
// Do not work with the passed pointer here as it may not be fully initialized yet
}

bool try_reserve( output_type &v ) {
bool msg = false;
private:
bool try_reserve_impl( output_type &v __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo* metainfo) ) {
bool successful_reserve = false;

do {
predecessor_type* pred = nullptr;
Expand All @@ -172,9 +193,16 @@ class reservable_predecessor_cache : public predecessor_cache< T, M > {
}

// Try to get from this sender
msg = pred->try_reserve( v );
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
if (metainfo) {
successful_reserve = pred->try_reserve( v, *metainfo );
} else
#endif
{
successful_reserve = pred->try_reserve( v );
}

if (msg == false) {
if (successful_reserve == false) {
typename mutex_type::scoped_lock lock(this->my_mutex);
// Relinquish ownership of the edge
register_successor( *pred, *this->my_owner );
Expand All @@ -183,11 +211,21 @@ class reservable_predecessor_cache : public predecessor_cache< T, M > {
// Retain ownership of the edge
this->add( *pred);
}
} while ( msg == false );
} while ( successful_reserve == false );

return msg;
return successful_reserve;
}
public:
// Public entry point without metainfo: forwards to try_reserve_impl with a null metainfo
// pointer (supplied through __TBB_FLOW_GRAPH_METAINFO_ARG) and returns its result.
bool try_reserve( output_type& v ) {
return try_reserve_impl(v __TBB_FLOW_GRAPH_METAINFO_ARG(nullptr));
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
// Metainfo-aware overload: passes the address of metainfo to try_reserve_impl, so the
// predecessor is queried with the two-argument try_reserve( v, *metainfo ).
bool try_reserve( output_type& v, message_metainfo& metainfo ) {
return try_reserve_impl(v, &metainfo);
}
#endif

bool try_release() {
reserved_src.load(std::memory_order_relaxed)->try_release();
reserved_src.store(nullptr, std::memory_order_relaxed);
Expand Down Expand Up @@ -268,6 +306,9 @@ class successor_cache : no_copy {
}

// Pure virtual put operations: derived caches (e.g. broadcast_cache, round_robin_cache)
// define how t is offered to the successors and which task pointer is returned.
virtual graph_task* try_put_task( const T& t ) = 0;
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
// Same operation with an accompanying message_metainfo; only declared when the preview
// try_put_and_wait feature is enabled.
virtual graph_task* try_put_task( const T& t, const message_metainfo& metainfo ) = 0;
#endif
}; // successor_cache<T>

//! An abstract cache of successors, specialized to continue_msg
Expand Down Expand Up @@ -327,6 +368,9 @@ class successor_cache< continue_msg, M > : no_copy {
}

// Pure virtual put operations for the continue_msg specialization; derived caches define
// how the message is offered to the successors and which task pointer is returned.
virtual graph_task* try_put_task( const continue_msg& t ) = 0;
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
// Same operation with an accompanying message_metainfo; only declared when the preview
// try_put_and_wait feature is enabled.
virtual graph_task* try_put_task( const continue_msg& t, const message_metainfo& metainfo ) = 0;
#endif
}; // successor_cache< continue_msg >

//! A cache of successors that are broadcast to
Expand All @@ -336,19 +380,12 @@ class broadcast_cache : public successor_cache<T, M> {
typedef M mutex_type;
typedef typename successor_cache<T,M>::successors_type successors_type;

public:

broadcast_cache( typename base_type::owner_type* owner ): base_type(owner) {
// Do not work with the passed pointer here as it may not be fully initialized yet
}

// as above, but call try_put_task instead, and return the last task we received (if any)
graph_task* try_put_task( const T &t ) override {
graph_task* try_put_task_impl( const T& t __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo) ) {
graph_task * last_task = nullptr;
typename mutex_type::scoped_lock l(this->my_mutex, /*write=*/true);
typename successors_type::iterator i = this->my_successors.begin();
while ( i != this->my_successors.end() ) {
graph_task *new_task = (*i)->try_put_task(t);
graph_task *new_task = (*i)->try_put_task(t __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
// workaround for icc bug
graph& graph_ref = (*i)->graph_reference();
last_task = combine_tasks(graph_ref, last_task, new_task); // enqueue if necessary
Expand All @@ -365,6 +402,21 @@ class broadcast_cache : public successor_cache<T, M> {
}
return last_task;
}
public:

// Constructs the cache by handing the owner pointer to the base class; the body is empty.
broadcast_cache( typename base_type::owner_type* owner ): base_type(owner) {
// Do not work with the passed pointer here as it may not be fully initialized yet
}

// Put without metainfo: forwards to try_put_task_impl, supplying a default-constructed
// message_metainfo through __TBB_FLOW_GRAPH_METAINFO_ARG, and returns its result.
graph_task* try_put_task( const T &t ) override {
return try_put_task_impl(t __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
// Metainfo-aware put: forwards t and the caller's metainfo unchanged to try_put_task_impl.
graph_task* try_put_task( const T &t, const message_metainfo& metainfo ) override {
return try_put_task_impl(t, metainfo);
}
#endif

// call try_put_task and return list of received tasks
bool gather_successful_try_puts( const T &t, graph_task_list& tasks ) {
Expand Down Expand Up @@ -411,11 +463,15 @@ class round_robin_cache : public successor_cache<T, M> {
return this->my_successors.size();
}

graph_task* try_put_task( const T &t ) override {
private:

graph_task* try_put_task_impl( const T &t
__TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo) )
{
typename mutex_type::scoped_lock l(this->my_mutex, /*write=*/true);
typename successors_type::iterator i = this->my_successors.begin();
while ( i != this->my_successors.end() ) {
graph_task* new_task = (*i)->try_put_task(t);
graph_task* new_task = (*i)->try_put_task(t __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
if ( new_task ) {
return new_task;
} else {
Expand All @@ -429,6 +485,17 @@ class round_robin_cache : public successor_cache<T, M> {
}
return nullptr;
}

public:
// Put without metainfo: forwards to try_put_task_impl, supplying a default-constructed
// message_metainfo through __TBB_FLOW_GRAPH_METAINFO_ARG, and returns its result.
graph_task* try_put_task(const T& t) override {
return try_put_task_impl(t __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
// Metainfo-aware put: forwards t and the caller's metainfo unchanged to try_put_task_impl.
graph_task* try_put_task( const T& t, const message_metainfo& metainfo ) override {
return try_put_task_impl(t, metainfo);
}
#endif
};

#endif // __TBB__flow_graph_cache_impl_H
Loading
Loading