Skip to content

Commit

Permalink
Merge branch 'dev/kboyarinov/try_put_and_wait_production' of https://…
Browse files Browse the repository at this point in the history
…github.com/oneapi-src/oneTBB into dev/kboyarinov/try_put_and_wait_production
  • Loading branch information
kboyarinov committed Aug 23, 2024
2 parents 0538258 + 172c44c commit 02bba15
Show file tree
Hide file tree
Showing 6 changed files with 610 additions and 29 deletions.
7 changes: 7 additions & 0 deletions include/oneapi/tbb/detail/_flow_graph_body_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,14 @@ class threshold_regulator<T, continue_msg, void> : public continue_receiver, no_

T *my_node;

// Called when the continue_receiver threshold is reached: asks my_node to
// decrement its counter by one and returns the task produced by that call.
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
// The metainformation is intentionally ignored here.
// If more items associated with the passed metainfo remain to be processed,
// they should be stored in the buffer that precedes the limiter_node.
graph_task* execute(const message_metainfo&) override {
#else
graph_task* execute() override {
#endif
return my_node->decrement_counter( 1 );
}

Expand Down
26 changes: 21 additions & 5 deletions include/oneapi/tbb/detail/_flow_graph_node_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -753,18 +753,25 @@ class continue_input : public continue_receiver {
virtual broadcast_cache<output_type > &successors() = 0;

friend class apply_body_task_bypass< class_type, continue_msg >;
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
friend class apply_body_task_bypass< class_type, continue_msg, trackable_messages_graph_task >;
#endif

//! Applies the body to the provided input
graph_task* apply_body_bypass( input_type __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo&) ) {
graph_task* apply_body_bypass( input_type __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo) ) {
// An extra copy is needed to capture the
// body execution without the try_put
fgt_begin_body( my_body );
output_type v = (*my_body)( continue_msg() );
fgt_end_body( my_body );
return successors().try_put_task( v );
return successors().try_put_task( v __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo) );
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
graph_task* execute(const message_metainfo& metainfo) override {
#else
graph_task* execute() override {
#endif
if(!is_graph_active(my_graph_ref)) {
return nullptr;
}
Expand All @@ -776,12 +783,21 @@ class continue_input : public continue_receiver {
#if _MSC_VER && !__INTEL_COMPILER
#pragma warning (pop)
#endif
return apply_body_bypass( continue_msg() __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}) );
return apply_body_bypass( continue_msg() __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo) );
}
else {
d1::small_object_allocator allocator{};
typedef apply_body_task_bypass<class_type, continue_msg> task_type;
graph_task* t = allocator.new_object<task_type>( graph_reference(), allocator, *this, continue_msg(), my_priority );
graph_task* t = nullptr;
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
if (!metainfo.empty()) {
using task_type = apply_body_task_bypass<class_type, continue_msg, trackable_messages_graph_task>;
t = allocator.new_object<task_type>( graph_reference(), allocator, *this, continue_msg(), my_priority, metainfo );
} else
#endif
{
using task_type = apply_body_task_bypass<class_type, continue_msg>;
t = allocator.new_object<task_type>( graph_reference(), allocator, *this, continue_msg(), my_priority );
}
return t;
}
}
Expand Down
122 changes: 100 additions & 22 deletions include/oneapi/tbb/flow_graph.h
Original file line number Diff line number Diff line change
Expand Up @@ -401,30 +401,61 @@ class continue_receiver : public receiver< continue_msg > {
template< typename R, typename B > friend class run_and_put_task;
template<typename X, typename Y> friend class broadcast_cache;
template<typename X, typename Y> friend class round_robin_cache;

private:
// execute body is supposed to be too small to create a task for.
graph_task* try_put_task( const input_type & ) override {
graph_task* try_put_task_impl( const input_type& __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo) ) {
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
message_metainfo predecessor_metainfo;
#endif
{
spin_mutex::scoped_lock l(my_mutex);
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
// Prolong the wait and store the metainfo until receiving signals from all the predecessors
for (auto waiter : metainfo.waiters()) {
waiter->reserve(1);
}
my_current_metainfo.merge(metainfo);
#endif
if ( ++my_current_count < my_predecessor_count )
return SUCCESSFULLY_ENQUEUED;
else
else {
my_current_count = 0;
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
predecessor_metainfo = my_current_metainfo;
my_current_metainfo = message_metainfo{};
#endif
}
}
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
graph_task* res = execute(predecessor_metainfo);
for (auto waiter : predecessor_metainfo.waiters()) {
waiter->release(1);
}
#else
graph_task* res = execute();
#endif
return res? res : SUCCESSFULLY_ENQUEUED;
}

protected:
// Overload taking no metainformation: forwards the input to try_put_task_impl,
// supplying a default-constructed message_metainfo through
// __TBB_FLOW_GRAPH_METAINFO_ARG.
// NOTE(review): the macro presumably expands to nothing when the preview
// feature is disabled - confirm against its definition.
graph_task* try_put_task( const input_type& input ) override {
return try_put_task_impl(input __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
// TODO: add metainfo support for continue_receiver
graph_task* try_put_task( const input_type& input, const message_metainfo& ) override {
return try_put_task(input);
graph_task* try_put_task( const input_type& input, const message_metainfo& metainfo ) override {
return try_put_task_impl(input, metainfo);
}
#endif

spin_mutex my_mutex;
int my_predecessor_count;
int my_current_count;
int my_initial_predecessor_count;
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
message_metainfo my_current_metainfo;
#endif
node_priority_t my_priority;
// the friend declaration in the base class did not eliminate the "protected class"
// error in gcc 4.1.2
Expand All @@ -440,7 +471,11 @@ class continue_receiver : public receiver< continue_msg > {
//! Does whatever should happen when the threshold is reached
/** This should be very fast or else spawn a task. This is
called while the sender is blocked in the try_put().
When __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT is enabled, the override
receives the metainfo that try_put_task_impl accumulated from the
predecessors' signals. The returned task pointer may be null;
try_put_task_impl substitutes SUCCESSFULLY_ENQUEUED in that case. */
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
virtual graph_task* execute(const message_metainfo& metainfo) = 0;
#else
virtual graph_task* execute() = 0;
#endif
template<typename TT, typename M> friend class successor_cache;
// Always reports true: this receiver is a continue_receiver.
bool is_continue_receiver() override { return true; }

Expand Down Expand Up @@ -3329,20 +3364,41 @@ class overwrite_node : public graph_node, public receiver<T>, public sender<T> {
return false;
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
//! Copies the buffered item together with its metainfo
/** Returns false, leaving both outputs untouched, when the buffer holds no
    valid item. The whole operation runs under my_mutex. */
bool try_get( input_type &v, message_metainfo& metainfo ) override {
    spin_mutex::scoped_lock lock( my_mutex );
    if (!my_buffer_is_valid) {
        return false;
    }

    v = my_buffer;
    metainfo = my_buffered_metainfo;

    // The successor wraps the metainfo using move semantics, which are meant
    // to transfer ownership of a value from a single-push buffer to the task.
    // Here the value stays in the buffer and ownership is not transferred,
    // so one additional reference has to be reserved on every waiter.
    for (auto waiter : metainfo.waiters()) {
        waiter->reserve(1);
    }
    return true;
}
#endif

//! Reserves an item
/** Implemented by forwarding to try_get(v); the result of that call is
    returned unchanged. */
bool try_reserve( T &v ) override {
return try_get(v);
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
private:
// TODO: add real implementation
bool try_reserve(T& v, message_metainfo&) override {
return try_reserve(v);
}

bool try_get( input_type& v, message_metainfo& ) override {
return try_get(v);
bool try_reserve(T& v, message_metainfo& metainfo) override {
spin_mutex::scoped_lock l( my_mutex );
if (my_buffer_is_valid) {
v = my_buffer;
metainfo = my_buffered_metainfo;
return true;
}
return false;
}
public:
#endif
Expand All @@ -3361,6 +3417,12 @@ class overwrite_node : public graph_node, public receiver<T>, public sender<T> {
void clear() {
    // Everything below happens while my_mutex is held.
    spin_mutex::scoped_lock lock( my_mutex );

    // Mark the buffered item as no longer valid.
    my_buffer_is_valid = false;
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
    // Drop one reference on every waiter of the stored metainfo,
    // then reset the stored metainfo to an empty one.
    for (auto waiter : my_buffered_metainfo.waiters()) {
        waiter->release(1);
    }
    my_buffered_metainfo = message_metainfo{};
#endif
}

protected:
Expand All @@ -3370,20 +3432,33 @@ class overwrite_node : public graph_node, public receiver<T>, public sender<T> {
template<typename X, typename Y> friend class round_robin_cache;
graph_task* try_put_task( const input_type &v ) override {
spin_mutex::scoped_lock l( my_mutex );
return try_put_task_impl(v);
return try_put_task_impl(v __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
// TODO: add support for overwrite_node
graph_task* try_put_task(const input_type& v, const message_metainfo&) override {
return try_put_task(v);
graph_task* try_put_task(const input_type& v, const message_metainfo& metainfo) override {
spin_mutex::scoped_lock l( my_mutex );
return try_put_task_impl(v, metainfo);
}
#endif

graph_task * try_put_task_impl(const input_type &v) {
graph_task * try_put_task_impl(const input_type &v __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo)) {
my_buffer = v;
my_buffer_is_valid = true;
graph_task* rtask = my_successors.try_put_task(v);
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
// Since the new item is pushed to the buffer - reserving the waiters
for (auto msg_waiter : metainfo.waiters()) {
msg_waiter->reserve(1);
}

// Since the item is taken out from the buffer - releasing the stored waiters
for (auto msg_waiter : my_buffered_metainfo.waiters()) {
msg_waiter->release(1);
}

my_buffered_metainfo = metainfo;
#endif
graph_task* rtask = my_successors.try_put_task(v __TBB_FLOW_GRAPH_METAINFO_ARG(my_buffered_metainfo) );
if (!rtask) rtask = SUCCESSFULLY_ENQUEUED;
return rtask;
}
Expand Down Expand Up @@ -3421,6 +3496,9 @@ class overwrite_node : public graph_node, public receiver<T>, public sender<T> {
spin_mutex my_mutex;
broadcast_cache< input_type, null_rw_mutex > my_successors;
input_type my_buffer;
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
message_metainfo my_buffered_metainfo;
#endif
bool my_buffer_is_valid;

void reset_node( reset_flags f) override {
Expand Down Expand Up @@ -3467,13 +3545,13 @@ class write_once_node : public overwrite_node<T> {
template<typename X, typename Y> friend class round_robin_cache;
graph_task *try_put_task( const T &v ) override {
spin_mutex::scoped_lock l( this->my_mutex );
return this->my_buffer_is_valid ? nullptr : this->try_put_task_impl(v);
return this->my_buffer_is_valid ? nullptr : this->try_put_task_impl(v __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
// TODO: add support for write_once_node
graph_task* try_put_task(const T& v, const message_metainfo&) override {
return try_put_task(v);
graph_task* try_put_task(const T& v, const message_metainfo& metainfo) override {
spin_mutex::scoped_lock l( this->my_mutex );
return this->my_buffer_is_valid ? nullptr : this->try_put_task_impl(v, metainfo);
}
#endif
}; // write_once_node
Expand Down
Loading

0 comments on commit 02bba15

Please sign in to comment.