Skip to content

Commit

Permalink
[try_put_and_wait] Part 2: Add implementation of try_put_and_wait fea…
Browse files Browse the repository at this point in the history
…ture for buffering nodes (#1412)

Co-authored-by: Aleksei Fedotov <aleksei.fedotov@intel.com>
  • Loading branch information
kboyarinov and aleksei-fedotov authored Aug 15, 2024
1 parent 012f2e0 commit 5cd159a
Show file tree
Hide file tree
Showing 13 changed files with 936 additions and 55 deletions.
62 changes: 45 additions & 17 deletions include/oneapi/tbb/detail/_flow_graph_cache_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,12 @@ class predecessor_cache : public node_cache< sender<T>, M > {
// Do not work with the passed pointer here as it may not be fully initialized yet
}

bool get_item( output_type& v ) {
private:
bool get_item_impl( output_type& v
__TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo* metainfo_ptr = nullptr) )
{

bool msg = false;
bool successful_get = false;

do {
predecessor_type *src;
Expand All @@ -113,19 +116,36 @@ class predecessor_cache : public node_cache< sender<T>, M > {
}

// Try to get from this sender
msg = src->try_get( v );
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
if (metainfo_ptr) {
successful_get = src->try_get( v, *metainfo_ptr );
} else
#endif
{
successful_get = src->try_get( v );
}

if (msg == false) {
if (successful_get == false) {
// Relinquish ownership of the edge
register_successor(*src, *my_owner);
} else {
// Retain ownership of the edge
this->add(*src);
}
} while ( msg == false );
return msg;
} while ( successful_get == false );
return successful_get;
}
public:
bool get_item( output_type& v ) {
return get_item_impl(v);
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
bool get_item( output_type& v, message_metainfo& metainfo ) {
return get_item_impl(v, &metainfo);
}
#endif

// If we are removing arcs (rf_clear_edges), call clear() rather than reset().
void reset() {
for(;;) {
Expand Down Expand Up @@ -158,7 +178,7 @@ class reservable_predecessor_cache : public predecessor_cache< T, M > {
}

bool try_reserve( output_type &v ) {
bool msg = false;
bool successful_reserve = false;

do {
predecessor_type* pred = nullptr;
Expand All @@ -172,9 +192,9 @@ class reservable_predecessor_cache : public predecessor_cache< T, M > {
}

// Try to get from this sender
msg = pred->try_reserve( v );
successful_reserve = pred->try_reserve( v );

if (msg == false) {
if (successful_reserve == false) {
typename mutex_type::scoped_lock lock(this->my_mutex);
// Relinquish ownership of the edge
register_successor( *pred, *this->my_owner );
Expand All @@ -183,9 +203,9 @@ class reservable_predecessor_cache : public predecessor_cache< T, M > {
// Retain ownership of the edge
this->add( *pred);
}
} while ( msg == false );
} while ( successful_reserve == false );

return msg;
return successful_reserve;
}

bool try_release() {
Expand Down Expand Up @@ -342,7 +362,7 @@ class broadcast_cache : public successor_cache<T, M> {
typedef M mutex_type;
typedef typename successor_cache<T,M>::successors_type successors_type;

graph_task* try_put_task_impl( const T& t __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo)) {
graph_task* try_put_task_impl( const T& t __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo) ) {
graph_task * last_task = nullptr;
typename mutex_type::scoped_lock l(this->my_mutex, /*write=*/true);
typename successors_type::iterator i = this->my_successors.begin();
Expand Down Expand Up @@ -425,11 +445,15 @@ class round_robin_cache : public successor_cache<T, M> {
return this->my_successors.size();
}

graph_task* try_put_task( const T &t ) override {
private:

graph_task* try_put_task_impl( const T &t
__TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo) )
{
typename mutex_type::scoped_lock l(this->my_mutex, /*write=*/true);
typename successors_type::iterator i = this->my_successors.begin();
while ( i != this->my_successors.end() ) {
graph_task* new_task = (*i)->try_put_task(t);
graph_task* new_task = (*i)->try_put_task(t __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
if ( new_task ) {
return new_task;
} else {
Expand All @@ -444,10 +468,14 @@ class round_robin_cache : public successor_cache<T, M> {
return nullptr;
}

public:
graph_task* try_put_task(const T& t) override {
return try_put_task_impl(t __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
// TODO: add support for round robin cache
graph_task* try_put_task( const T& t, const message_metainfo& ) override {
return try_put_task(t);
graph_task* try_put_task( const T& t, const message_metainfo& metainfo ) override {
return try_put_task_impl(t, metainfo);
}
#endif
};
Expand Down
26 changes: 24 additions & 2 deletions include/oneapi/tbb/detail/_flow_graph_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,22 +172,44 @@ class trackable_messages_graph_task : public graph_task {
}
}

trackable_messages_graph_task(graph& g, d1::small_object_allocator& allocator,
node_priority_t node_priority,
std::forward_list<d1::wait_context_vertex*>&& msg_waiters)
: graph_task(g, allocator, node_priority)
, my_msg_wait_context_vertices(std::move(msg_waiters))
{
}

trackable_messages_graph_task(graph& g, d1::small_object_allocator& allocator,
const std::forward_list<d1::wait_context_vertex*>& msg_waiters)
: trackable_messages_graph_task(g, allocator, no_priority, msg_waiters) {}

trackable_messages_graph_task(graph& g, d1::small_object_allocator& allocator,
std::forward_list<d1::wait_context_vertex*>&& msg_waiters)
: trackable_messages_graph_task(g, allocator, no_priority, std::move(msg_waiters)) {}

const std::forward_list<d1::wait_context_vertex*> get_msg_wait_context_vertices() const {
return my_msg_wait_context_vertices;
}

protected:
template <typename DerivedType>
void finalize(const d1::execution_data& ed) {
auto wait_context_vertices = std::move(my_msg_wait_context_vertices);
auto msg_reference_vertices = std::move(my_msg_reference_vertices);
graph_task::finalize<DerivedType>(ed);

for (auto& msg_waiter : msg_reference_vertices) {
msg_waiter->release(1);
// If there is no thread reference vertices associated with the task
// then this task was created by transferring the ownership from other metainfo
// instance (e.g. while taking from the buffer)
if (msg_reference_vertices.empty()) {
for (auto& msg_waiter : wait_context_vertices) {
msg_waiter->release(1);
}
} else {
for (auto& msg_waiter : msg_reference_vertices) {
msg_waiter->release(1);
}
}
}
private:
Expand Down
52 changes: 50 additions & 2 deletions include/oneapi/tbb/detail/_flow_graph_item_buffer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,13 @@ class item_buffer {
return element(i).item;
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
message_metainfo& get_my_metainfo(size_t i) {
__TBB_ASSERT(my_item_valid(i), "attempt to get invalid item");
return element(i).metainfo;
}
#endif

// may be called with an empty slot or a slot that has already been constructed into.
void set_my_item(size_t i, const item_type &o
__TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo))
Expand All @@ -102,22 +109,46 @@ class item_buffer {
#endif
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
void set_my_item(size_t i, const item_type& o, message_metainfo&& metainfo) {
if(element(i).state != no_item) {
destroy_item(i);
}

new(&(element(i).item)) item_type(o);
new(&element(i).metainfo) message_metainfo(std::move(metainfo));
// Skipping the reservation on metainfo.waiters since the ownership
// is moving from metainfo to the cache
element(i).state = has_item;
}
#endif

// destructively-fetch an object from the buffer
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
void fetch_item(size_t i, item_type& o, message_metainfo& metainfo) {
__TBB_ASSERT(my_item_valid(i), "Trying to fetch an empty slot");
o = get_my_item(i); // could have std::move assign semantics
metainfo = std::move(get_my_metainfo(i));
destroy_item(i);
}
#else
void fetch_item(size_t i, item_type &o) {
__TBB_ASSERT(my_item_valid(i), "Trying to fetch an empty slot");
o = get_my_item(i); // could have std::move assign semantics
destroy_item(i);
}
#endif


// move an existing item from one slot to another. The moved-to slot must be unoccupied,
// the moved-from slot must exist and not be reserved. The after, from will be empty,
// to will be occupied but not reserved
void move_item(size_t to, size_t from) {
__TBB_ASSERT(!my_item_valid(to), "Trying to move to a non-empty slot");
__TBB_ASSERT(my_item_valid(from), "Trying to move from an empty slot");
set_my_item(to, get_my_item(from)); // could have std::move semantics
// could have std::move semantics
set_my_item(to, get_my_item(from) __TBB_FLOW_GRAPH_METAINFO_ARG(get_my_metainfo(from)));
destroy_item(from);

}

// put an item in an empty slot. Return true if successful, else false
Expand All @@ -129,12 +160,29 @@ class item_buffer {
return true;
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
template <typename Metainfo>
bool place_item(size_t here, const item_type &me, Metainfo&& metainfo) {
#if !TBB_DEPRECATED_SEQUENCER_DUPLICATES
if(my_item_valid(here)) return false;
#endif
set_my_item(here, me, std::forward<Metainfo>(metainfo));
return true;
}
#endif

// could be implemented with std::move semantics
void swap_items(size_t i, size_t j) {
__TBB_ASSERT(my_item_valid(i) && my_item_valid(j), "attempt to swap invalid item(s)");
item_type temp = get_my_item(i);
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
message_metainfo temp_metainfo = get_my_metainfo(i);
set_my_item(i, get_my_item(j), get_my_metainfo(j));
set_my_item(j, temp, temp_metainfo);
#else
set_my_item(i, get_my_item(j));
set_my_item(j, temp);
#endif
}

void destroy_item(size_type i) {
Expand Down
7 changes: 7 additions & 0 deletions include/oneapi/tbb/detail/_flow_graph_join_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1190,6 +1190,13 @@
return op_data.status == SUCCEEDED;
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
// TODO: implement try_get with metainfo for join_node
bool try_get( output_type &v, message_metainfo& ) override {
return try_get(v);
}
#endif

protected:
void reset_node(reset_flags f) override {
input_ports_type::reset(f);
Expand Down
17 changes: 12 additions & 5 deletions include/oneapi/tbb/detail/_flow_graph_node_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,12 @@ class function_input_base : public receiver<Input>, no_assign {
}
else {
input_type i;
if(my_predecessors.get_item(i)) {
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
message_metainfo metainfo;
#endif
if(my_predecessors.get_item(i __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo))) {
++my_concurrency;
new_task = create_body_task(i __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
new_task = create_body_task(i __TBB_FLOW_GRAPH_METAINFO_ARG(std::move(metainfo)));
}
}
return new_task;
Expand Down Expand Up @@ -351,8 +354,12 @@ class function_input_base : public receiver<Input>, no_assign {
}

//! allocates a task to apply a body
graph_task* create_body_task( const input_type &input
__TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo))
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
template <typename Metainfo>
graph_task* create_body_task( const input_type &input, Metainfo&& metainfo )
#else
graph_task* create_body_task( const input_type &input )
#endif
{
if (!is_graph_active(my_graph_ref)) {
return nullptr;
Expand All @@ -363,7 +370,7 @@ class function_input_base : public receiver<Input>, no_assign {
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
if (!metainfo.empty()) {
using task_type = apply_body_task_bypass<class_type, input_type, trackable_messages_graph_task>;
t = allocator.new_object<task_type>(my_graph_ref, allocator, *this, input, my_priority, metainfo);
t = allocator.new_object<task_type>(my_graph_ref, allocator, *this, input, my_priority, std::forward<Metainfo>(metainfo));
} else
#endif
{
Expand Down
Loading

0 comments on commit 5cd159a

Please sign in to comment.