Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add reference implementation for parallel_phase feature #1570

Open
wants to merge 26 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
5e5778e
Add initial impl of Parallel Block
pavelkumbrasev Nov 13, 2024
55fd835
Add test for workers_leave
pavelkumbrasev Nov 18, 2024
637a74d
Fix thread_leave_manager
pavelkumbrasev Nov 20, 2024
eb172a7
Add parallel_block API, improve tests
isaevil Nov 20, 2024
7f08af0
Improve tests for parallel_block
isaevil Nov 25, 2024
f6a139a
Add workers_leave::automatic, let hybrid systems take advantage of pa…
isaevil Nov 26, 2024
0a9faed
Fix Windows compilation
isaevil Nov 26, 2024
c1802a1
Add win32 exports
isaevil Nov 27, 2024
ecd5a9e
Add mac64 symbols
isaevil Nov 27, 2024
0d32f80
Correct mask, add a clarifying comment to it
isaevil Nov 28, 2024
f61fc1d
Apply suggested typo fix
isaevil Nov 28, 2024
17f1dac
Rename block to phase
isaevil Nov 29, 2024
aa7cc2a
Align with RFC, utilize my_version_and_traits, improve readability
isaevil Dec 2, 2024
104606c
Fix non-preview compilation
isaevil Dec 3, 2024
597136f
Change scoped_parallel_phase default parameter, fix entry points for Win
isaevil Dec 4, 2024
1bc6e3a
Apply suggestions from code review
isaevil Dec 9, 2024
73742f3
Apply comments, change tests
isaevil Dec 6, 2024
ba5b922
Correct mac64 exports
isaevil Dec 10, 2024
7311cec
Improve test reporting, move thread demand into internals
isaevil Dec 13, 2024
1f22bc6
Extend testing suite with scoped phases and test with this_task_arena
isaevil Dec 13, 2024
fd4a24f
Apply suggestions from code review
isaevil Dec 17, 2024
77985c0
Simplify state transition logic
isaevil Dec 17, 2024
1a6b73c
Align leave_policy variables names across code
isaevil Dec 17, 2024
398d16b
Move parallel phase tests into a separate file
isaevil Dec 17, 2024
4dfaf5e
Address comments
isaevil Dec 17, 2024
6a951bd
Decrease the size of dummy work, don't execute tests for workerless a…
isaevil Dec 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions include/oneapi/tbb/detail/_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -534,4 +534,8 @@
#define __TBB_PREVIEW_TASK_GROUP_EXTENSIONS 1
#endif

#if TBB_PREVIEW_PARALLEL_PHASE || __TBB_BUILD
#define __TBB_PREVIEW_PARALLEL_PHASE 1
#endif

#endif // __TBB_detail__config_H
174 changes: 153 additions & 21 deletions include/oneapi/tbb/task_arena.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2023 Intel Corporation
Copyright (c) 2005-2024 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -95,6 +95,11 @@ TBB_EXPORT void __TBB_EXPORTED_FUNC isolate_within_arena(d1::delegate_base& d, s
TBB_EXPORT void __TBB_EXPORTED_FUNC enqueue(d1::task&, d1::task_arena_base*);
TBB_EXPORT void __TBB_EXPORTED_FUNC enqueue(d1::task&, d1::task_group_context&, d1::task_arena_base*);
TBB_EXPORT void __TBB_EXPORTED_FUNC submit(d1::task&, d1::task_group_context&, arena*, std::uintptr_t);

#if __TBB_PREVIEW_PARALLEL_PHASE
TBB_EXPORT void __TBB_EXPORTED_FUNC register_parallel_phase(d1::task_arena_base*, std::uintptr_t);
TBB_EXPORT void __TBB_EXPORTED_FUNC unregister_parallel_phase(d1::task_arena_base*, std::uintptr_t);
#endif
} // namespace r1

namespace d2 {
Expand Down Expand Up @@ -122,6 +127,14 @@ class task_arena_base {
normal = 2 * priority_stride,
high = 3 * priority_stride
};

#if __TBB_PREVIEW_PARALLEL_PHASE
enum class leave_policy : int {
automatic = 0,
fast = 1
};
#endif

#if __TBB_ARENA_BINDING
using constraints = tbb::detail::d1::constraints;
#endif /*__TBB_ARENA_BINDING*/
Expand Down Expand Up @@ -162,13 +175,37 @@ class task_arena_base {
return (my_version_and_traits & core_type_support_flag) == core_type_support_flag ? my_max_threads_per_core : automatic;
}

#if __TBB_PREVIEW_PARALLEL_PHASE
//! Returns the leave policy currently encoded in my_version_and_traits.
// The fast policy is stored as a single trait bit; when the bit is absent
// the policy is automatic (the default).
leave_policy get_leave_policy() const {
    return (my_version_and_traits & fast_leave_policy_flag) ? leave_policy::fast
                                                            : leave_policy::automatic;
}

//! Maps a leave_policy value onto its trait-bit representation.
// leave_policy::fast -> fast_leave_policy_flag; leave_policy::automatic -> 0
// (no bit set, matching the default-constructed traits word).
int leave_policy_to_traits(leave_policy lp) const {
    return lp == leave_policy::fast ? fast_leave_policy_flag : 0;
}

//! Stores the given leave policy into my_version_and_traits.
// Clear the trait bit before OR-ing the new value: a plain |= could never
// reset fast_leave_policy_flag, so re-initializing an arena with
// leave_policy::automatic would silently keep the previously set fast policy.
void set_leave_policy(leave_policy lp) {
    my_version_and_traits &= ~fast_leave_policy_flag;
    my_version_and_traits |= leave_policy_to_traits(lp);
}
#endif

enum {
default_flags = 0
, core_type_support_flag = 1
default_flags = 0,
core_type_support_flag = 1,
fast_leave_policy_flag = 1 << 1
};

task_arena_base(int max_concurrency, unsigned reserved_for_masters, priority a_priority)
: my_version_and_traits(default_flags | core_type_support_flag)
task_arena_base(int max_concurrency, unsigned reserved_for_masters, priority a_priority
#if __TBB_PREVIEW_PARALLEL_PHASE
, leave_policy lp
#endif
)
: my_version_and_traits(default_flags | core_type_support_flag
#if __TBB_PREVIEW_PARALLEL_PHASE
| leave_policy_to_traits(lp)
#endif
)
, my_initialization_state(do_once_state::uninitialized)
, my_arena(nullptr)
, my_max_concurrency(max_concurrency)
Expand All @@ -180,8 +217,16 @@ class task_arena_base {
{}

#if __TBB_ARENA_BINDING
task_arena_base(const constraints& constraints_, unsigned reserved_for_masters, priority a_priority)
: my_version_and_traits(default_flags | core_type_support_flag)
task_arena_base(const constraints& constraints_, unsigned reserved_for_masters, priority a_priority
#if __TBB_PREVIEW_PARALLEL_PHASE
, leave_policy lp
#endif
)
: my_version_and_traits(default_flags | core_type_support_flag
#if __TBB_PREVIEW_PARALLEL_PHASE
| leave_policy_to_traits(lp)
#endif
)
, my_initialization_state(do_once_state::uninitialized)
, my_arena(nullptr)
, my_max_concurrency(constraints_.max_concurrency)
Expand Down Expand Up @@ -259,31 +304,58 @@ class task_arena : public task_arena_base {
* Value of 1 is default and reflects behavior of implicit arenas.
**/
task_arena(int max_concurrency_ = automatic, unsigned reserved_for_masters = 1,
priority a_priority = priority::normal)
: task_arena_base(max_concurrency_, reserved_for_masters, a_priority)
priority a_priority = priority::normal
#if __TBB_PREVIEW_PARALLEL_PHASE
, leave_policy lp = leave_policy::automatic
#endif
Comment on lines 306 to +310
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indentation seems a bit misaligned.

)
: task_arena_base(max_concurrency_, reserved_for_masters, a_priority
#if __TBB_PREVIEW_PARALLEL_PHASE
, lp
#endif
)
{}

#if __TBB_ARENA_BINDING
//! Creates task arena pinned to certain NUMA node
task_arena(const constraints& constraints_, unsigned reserved_for_masters = 1,
priority a_priority = priority::normal)
: task_arena_base(constraints_, reserved_for_masters, a_priority)
priority a_priority = priority::normal
#if __TBB_PREVIEW_PARALLEL_PHASE
, leave_policy lp = leave_policy::automatic
#endif
)
: task_arena_base(constraints_, reserved_for_masters, a_priority
#if __TBB_PREVIEW_PARALLEL_PHASE
, lp
#endif
)
{}

//! Copies settings from another task_arena
task_arena(const task_arena &s) // copy settings but not the reference or instance
task_arena(const task_arena &a) // copy settings but not the reference or instance
: task_arena_base(
constraints{}
.set_numa_id(s.my_numa_id)
.set_max_concurrency(s.my_max_concurrency)
.set_core_type(s.my_core_type)
.set_max_threads_per_core(s.my_max_threads_per_core)
, s.my_num_reserved_slots, s.my_priority)
.set_numa_id(a.my_numa_id)
.set_max_concurrency(a.my_max_concurrency)
.set_core_type(a.my_core_type)
.set_max_threads_per_core(a.my_max_threads_per_core)
, a.my_num_reserved_slots, a.my_priority
#if __TBB_PREVIEW_PARALLEL_PHASE
, a.get_leave_policy()
#endif
)

{}
#else
//! Copies settings from another task_arena
task_arena(const task_arena& a) // copy settings but not the reference or instance
: task_arena_base(a.my_max_concurrency, a.my_num_reserved_slots, a.my_priority)
: task_arena_base(a.my_max_concurrency,
a.my_num_reserved_slots,
a.my_priority,
#if __TBB_PREVIEW_PARALLEL_PHASE
a.get_leave_policy()
#endif
)
{}
#endif /*__TBB_ARENA_BINDING*/

Expand All @@ -292,7 +364,11 @@ class task_arena : public task_arena_base {

//! Creates an instance of task_arena attached to the current arena of the thread
explicit task_arena( attach )
: task_arena_base(automatic, 1, priority::normal) // use default settings if attach fails
: task_arena_base(automatic, 1, priority::normal
#if __TBB_PREVIEW_PARALLEL_PHASE
, leave_policy::automatic
#endif
) // use default settings if attach fails
{
if (r1::attach(*this)) {
mark_initialized();
Expand All @@ -311,21 +387,32 @@ class task_arena : public task_arena_base {

//! Overrides concurrency level and forces initialization of internal representation
void initialize(int max_concurrency_, unsigned reserved_for_masters = 1,
priority a_priority = priority::normal)
priority a_priority = priority::normal
#if __TBB_PREVIEW_PARALLEL_PHASE
, leave_policy lp = leave_policy::automatic
#endif
)
{
__TBB_ASSERT(!my_arena.load(std::memory_order_relaxed), "Impossible to modify settings of an already initialized task_arena");
if( !is_active() ) {
my_max_concurrency = max_concurrency_;
my_num_reserved_slots = reserved_for_masters;
my_priority = a_priority;
#if __TBB_PREVIEW_PARALLEL_PHASE
set_leave_policy(lp);
#endif
r1::initialize(*this);
mark_initialized();
}
}

#if __TBB_ARENA_BINDING
void initialize(constraints constraints_, unsigned reserved_for_masters = 1,
priority a_priority = priority::normal)
priority a_priority = priority::normal
#if __TBB_PREVIEW_PARALLEL_PHASE
, leave_policy lp = leave_policy::automatic
#endif
)
{
__TBB_ASSERT(!my_arena.load(std::memory_order_relaxed), "Impossible to modify settings of an already initialized task_arena");
if( !is_active() ) {
Expand All @@ -335,6 +422,9 @@ class task_arena : public task_arena_base {
my_max_threads_per_core = constraints_.max_threads_per_core;
my_num_reserved_slots = reserved_for_masters;
my_priority = a_priority;
#if __TBB_PREVIEW_PARALLEL_PHASE
set_leave_policy(lp);
#endif
r1::initialize(*this);
mark_initialized();
}
Expand Down Expand Up @@ -404,6 +494,32 @@ class task_arena : public task_arena_base {
return execute_impl<decltype(f())>(f);
}

#if __TBB_PREVIEW_PARALLEL_PHASE
//! Marks the beginning of a parallel phase for this arena.
// initialize() is called first so that the phase can be registered even on
// a task_arena that has not been used yet; the second argument of the
// runtime entry point is reserved and must be zero.
void start_parallel_phase() {
initialize();
r1::register_parallel_phase(this, /*reserved*/0);
}
//! Marks the end of a parallel phase for this arena.
// Requires the arena to be initialized (start_parallel_phase guarantees this).
// with_fast_leave == true requests the one-time fast-leave behavior and is
// forwarded to the runtime as 0 or 1.
void end_parallel_phase(bool with_fast_leave = false) {
__TBB_ASSERT(my_initialization_state.load(std::memory_order_relaxed) == do_once_state::initialized, nullptr);
// It is guaranteed by the standard that conversion of boolean to integral type will result in either 0 or 1
r1::unregister_parallel_phase(this, static_cast<std::uintptr_t>(with_fast_leave));
}

//! RAII guard that starts a parallel phase on construction and ends it on
//! destruction.
// with_fast_leave requests the one-time fast-leave behavior when the phase
// is closed by the destructor.
class scoped_parallel_phase {
    task_arena& arena;
    bool one_time_fast_leave;
public:
    scoped_parallel_phase(task_arena& ta, bool with_fast_leave = false)
        : arena(ta), one_time_fast_leave(with_fast_leave)
    {
        arena.start_parallel_phase();
    }
    // Non-copyable: a copied guard would end the same phase twice,
    // unbalancing the runtime's register/unregister pairing.
    scoped_parallel_phase(const scoped_parallel_phase&) = delete;
    scoped_parallel_phase& operator=(const scoped_parallel_phase&) = delete;
    ~scoped_parallel_phase() {
        arena.end_parallel_phase(one_time_fast_leave);
    }
};
#endif

#if __TBB_EXTRA_DEBUG
//! Returns my_num_reserved_slots
int debug_reserved_slots() const {
Expand Down Expand Up @@ -472,6 +588,17 @@ inline void enqueue(F&& f) {
enqueue_impl(std::forward<F>(f), nullptr);
}

#if __TBB_PREVIEW_PARALLEL_PHASE
//! Marks the beginning of a parallel phase for the arena of the calling thread.
// nullptr selects the current thread's arena; the second argument is reserved.
inline void start_parallel_phase() {
r1::register_parallel_phase(nullptr, /*reserved*/0);
}

//! Marks the end of a parallel phase for the arena of the calling thread.
// Default with_fast_leave to false for consistency with
// task_arena::end_parallel_phase; nullptr selects the current thread's arena.
inline void end_parallel_phase(bool with_fast_leave = false) {
    // It is guaranteed by the standard that conversion of boolean to integral type will result in either 0 or 1
    r1::unregister_parallel_phase(nullptr, static_cast<std::uintptr_t>(with_fast_leave));
}
#endif

using r1::submit;

} // namespace d1
Expand All @@ -491,6 +618,11 @@ using detail::d1::max_concurrency;
using detail::d1::isolate;

using detail::d1::enqueue;

#if __TBB_PREVIEW_PARALLEL_PHASE
using detail::d1::start_parallel_phase;
using detail::d1::end_parallel_phase;
#endif
} // namespace this_task_arena

} // inline namespace v1
Expand Down
Loading
Loading