Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Isaev, Ilya <ilya.isaev@intel.com>
  • Loading branch information
isaevil committed Dec 18, 2024
1 parent 398d16b commit 4dfaf5e
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 48 deletions.
20 changes: 8 additions & 12 deletions include/oneapi/tbb/task_arena.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,6 @@ namespace d1 {
// Number of distinct priority levels.
static constexpr unsigned num_priority_levels = 3;
// INT_MAX divided into (num_priority_levels + 1) equal parts; presumably the distance
// between adjacent priority values — confirm against the priority enum.
static constexpr int priority_stride = INT_MAX / (num_priority_levels + 1);

// NOTE(review): the hunk header above shrinks this region from 11 to 6 lines, so this
// #if block appears to be the part the commit removes — confirm against the repository.
#if __TBB_PREVIEW_PARALLEL_PHASE
// Bit position used when shifting a leave policy into / out of a traits word.
static constexpr int leave_policy_trait_offset = 1;
// For the offset above this evaluates to ~1, i.e. every bit except bit 0.
static constexpr int leave_policy_trait_mask = ~((1 << leave_policy_trait_offset) - 1);
#endif

class task_arena_base {
friend struct r1::task_arena_impl;
friend void r1::observe(d1::task_scheduler_observer&, bool);
Expand Down Expand Up @@ -181,23 +176,24 @@ class task_arena_base {
}

#if __TBB_PREVIEW_PARALLEL_PHASE
// NOTE(review): this span interleaves two versions of the same pair of accessors (the
// diff markers were lost), so it is not compilable as written. The lines that use
// leave_policy_trait_offset / leave_policy_trait_mask look like the earlier version and
// the lines that use fast_leave_policy_flag like the later one — confirm against the repository.
int leave_policy_to_traits(leave_policy lp) const {
// Shift form: places the policy's integer value at bit leave_policy_trait_offset.
return static_cast<int>(lp) << leave_policy_trait_offset;
// Flag form: reports leave_policy::fast only when every bit of fast_leave_policy_flag is
// set in my_version_and_traits, and leave_policy::automatic otherwise.
leave_policy get_leave_policy() const {
bool fast_policy_set = (my_version_and_traits & fast_leave_policy_flag) == fast_leave_policy_flag;
return fast_policy_set ? leave_policy::fast : leave_policy::automatic;
}

// Shift form: masks my_version_and_traits, shifts the result down by
// leave_policy_trait_offset and casts the integer back to leave_policy.
leave_policy get_leave_policy() const {
int underlying_policy_v = (my_version_and_traits & leave_policy_trait_mask) >> leave_policy_trait_offset;
return static_cast<leave_policy>(underlying_policy_v);
// Flag form: maps leave_policy::fast to fast_leave_policy_flag and every other policy to 0.
int leave_policy_to_traits(leave_policy lp) const {
return lp == leave_policy::fast ? fast_leave_policy_flag : 0;
}

// Stores the requested leave policy in my_version_and_traits.
// NOTE(review): the two statements below look like the two sides of the diff (old
// assignment, new `|=`) rather than code meant to run back to back — confirm.
void set_leave_policy(leave_policy lp) {
// Masked assignment: keeps only the bits selected by ~leave_policy_trait_mask and ORs in
// the encoded policy, so an earlier policy value is overwritten.
my_version_and_traits = (my_version_and_traits & ~leave_policy_trait_mask) | leave_policy_to_traits(lp);
// NOTE(review): `|=` can only set bits. When leave_policy_to_traits(lp) yields 0 this
// leaves a previously set fast_leave_policy_flag untouched, so on its own it cannot
// switch the policy back from fast — confirm no caller relies on that.
my_version_and_traits |= leave_policy_to_traits(lp);
}
#endif

// Bit flags for the traits word (they are tested against / OR-ed into my_version_and_traits
// by the leave-policy accessors).
// NOTE(review): core_type_support_flag is listed twice because the diff shows the line
// before and after a trailing comma was added; only one enumerator exists in the real file.
enum {
default_flags = 0,
core_type_support_flag = 1
core_type_support_flag = 1,
// Bit 1 (value 2): set when the leave policy is leave_policy::fast.
fast_leave_policy_flag = 1 << 1
};

task_arena_base(int max_concurrency, unsigned reserved_for_masters, priority a_priority
Expand Down
16 changes: 8 additions & 8 deletions src/tbb/arena.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -530,8 +530,8 @@ struct task_arena_impl {
static int max_concurrency(const d1::task_arena_base*);
static void enqueue(d1::task&, d1::task_group_context*, d1::task_arena_base*);
static d1::slot_id execution_slot(const d1::task_arena_base&);
static void register_parallel_phase(d1::task_arena_base*);
static void unregister_parallel_phase(d1::task_arena_base*, bool);
static void register_parallel_phase(d1::task_arena_base*, std::uintptr_t);
static void unregister_parallel_phase(d1::task_arena_base*, std::uintptr_t);
};

void __TBB_EXPORTED_FUNC initialize(d1::task_arena_base& ta) {
Expand Down Expand Up @@ -566,12 +566,12 @@ d1::slot_id __TBB_EXPORTED_FUNC execution_slot(const d1::task_arena_base& arena)
return task_arena_impl::execution_slot(arena);
}

// Exported entry point: forwards the request to task_arena_impl::register_parallel_phase.
// NOTE(review): two versions are interleaved here. The first ignores its second argument
// (named only in a comment as "reserved"); the second names it `flags` and passes it through.
void __TBB_EXPORTED_FUNC register_parallel_phase(d1::task_arena_base* ta, std::uintptr_t /*reserved*/) {
task_arena_impl::register_parallel_phase(ta);
void __TBB_EXPORTED_FUNC register_parallel_phase(d1::task_arena_base* ta, std::uintptr_t flags) {
task_arena_impl::register_parallel_phase(ta, flags);
}

// Exported entry point: forwards the request to task_arena_impl::unregister_parallel_phase.
// NOTE(review): the two calls below look like the old and new side of the diff — the first
// narrows `flags` to bool at this boundary, the second passes the raw value on unchanged.
void __TBB_EXPORTED_FUNC unregister_parallel_phase(d1::task_arena_base* ta, std::uintptr_t flags) {
task_arena_impl::unregister_parallel_phase(ta, static_cast<bool>(flags));
task_arena_impl::unregister_parallel_phase(ta, flags);
}

void task_arena_impl::initialize(d1::task_arena_base& ta) {
Expand Down Expand Up @@ -921,17 +921,17 @@ int task_arena_impl::max_concurrency(const d1::task_arena_base *ta) {
}

#if __TBB_PREVIEW_PARALLEL_PHASE
// Registers the start of a parallel phase on an arena and then advertises new work on it.
// `ta` may be null, in which case the calling thread's current arena is used.
// NOTE(review): both the one-argument and the two-argument signature line appear because
// the diff sides are interleaved; in the latter the second argument is unnamed and unused.
void task_arena_impl::register_parallel_phase(d1::task_arena_base* ta) {
void task_arena_impl::register_parallel_phase(d1::task_arena_base* ta, std::uintptr_t /*reserved*/) {
// Explicit arena if one was passed, otherwise the arena recorded in this thread's data.
arena* a = ta ? ta->my_arena.load(std::memory_order_relaxed) : governor::get_thread_data()->my_arena;
__TBB_ASSERT(a, nullptr);
a->my_thread_leave.register_parallel_phase();
a->advertise_new_work<arena::work_enqueued>();
}

// Registers the end of a parallel phase on an arena.
// `ta` may be null, in which case the calling thread's current arena is used.
// NOTE(review): the duplicated signature and call lines are the two sides of the diff
// (bool parameter vs. std::uintptr_t flags), not consecutive statements.
void task_arena_impl::unregister_parallel_phase(d1::task_arena_base* ta, bool with_fast_leave) {
void task_arena_impl::unregister_parallel_phase(d1::task_arena_base* ta, std::uintptr_t flags) {
// Explicit arena if one was passed, otherwise the arena recorded in this thread's data.
arena* a = ta ? ta->my_arena.load(std::memory_order_relaxed) : governor::get_thread_data()->my_arena;
__TBB_ASSERT(a, nullptr);
a->my_thread_leave.unregister_parallel_phase(with_fast_leave);
// NOTE(review): the bool conversion makes ANY non-zero bit in `flags` request a fast
// leave — confirm this is intended if further flag bits are ever defined.
a->my_thread_leave.unregister_parallel_phase(/*with_fast_leave=*/static_cast<bool>(flags));
}
#endif

Expand Down
21 changes: 12 additions & 9 deletions src/tbb/arena.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,9 +210,9 @@ class thread_leave_manager {

// Indicate start of parallel phase in the state machine
void register_parallel_phase() {
__TBB_ASSERT(my_state.load(std::memory_order_relaxed) != 0, "The initial state was not set");

std::uint64_t prev = my_state.load(std::memory_order_relaxed);
__TBB_ASSERT(prev != 0, "The initial state was not set");

std::uint64_t desired{};
do {
// Need to add a reference for this start of a parallel phase, preserving the leave
Expand All @@ -224,17 +224,20 @@ class thread_leave_manager {
// of new parallel phase, it should be transitioned to "Delayed leave"
desired = DELAYED_LEAVE;
}
__TBB_ASSERT(desired + PARALLEL_PHASE > desired, "Overflow detected");
desired += PARALLEL_PHASE; // Take into account this start of a parallel phase
} while (!my_state.compare_exchange_strong(prev, desired));
}

// Indicate the end of parallel phase in the state machine
void unregister_parallel_phase(bool enable_fast_leave) {
__TBB_ASSERT(my_state.load(std::memory_order_relaxed) != 0, "The initial state was not set");

std::uint64_t prev = my_state.load(std::memory_order_relaxed);
__TBB_ASSERT(prev != 0, "The initial state was not set");

std::uint64_t desired{};
do {
__TBB_ASSERT(prev - PARALLEL_PHASE < prev,
"A call to unregister without its register complement");
desired = prev - PARALLEL_PHASE; // Mark the end of this phase in reference counter
if (enable_fast_leave && /*it was the last parallel phase*/desired == DELAYED_LEAVE) {
desired = ONE_TIME_FAST_LEAVE;
Expand All @@ -243,9 +246,9 @@ class thread_leave_manager {
}

// Returns true unless the current state is FAST_LEAVE or ONE_TIME_FAST_LEAVE.
// The state is read with a relaxed load; a zero state is asserted against as "not set".
// NOTE(review): the first assertion (with its own extra load) and the second assertion on
// `curr` look like the old and new side of the diff — only one is expected in the real file.
bool is_retention_allowed() {
__TBB_ASSERT(my_state.load(std::memory_order_relaxed) != 0, "The initial state was not set");

std::uint64_t curr = my_state.load(std::memory_order_relaxed);
__TBB_ASSERT(curr != 0, "The initial state was not set");

return curr != FAST_LEAVE && curr != ONE_TIME_FAST_LEAVE;
}
};
Expand Down Expand Up @@ -360,23 +363,23 @@ class arena: public padded<arena_base>
//! Constructor
// Takes the owning threading control, the maximum number of workers, the number of
// reserved slots and the priority level; when __TBB_PREVIEW_PARALLEL_PHASE is enabled it
// also takes the arena's leave policy.
// NOTE(review): the leave-policy parameter is shown under both names (`wl`, `lp`) because
// the diff sides are interleaved; the real declaration has a single such parameter.
arena(threading_control* control, unsigned max_num_workers, unsigned num_reserved_slots, unsigned priority_level
#if __TBB_PREVIEW_PARALLEL_PHASE
, tbb::task_arena::leave_policy wl
, tbb::task_arena::leave_policy lp
#endif
);

//! Allocate an instance of arena.
// Static factory returning a reference to the new arena. Takes the slot count, the number
// of reserved slots and the priority level, plus the leave policy when
// __TBB_PREVIEW_PARALLEL_PHASE is enabled.
// NOTE(review): `wl` and `lp` are the old and new name of the same parameter (diff sides
// interleaved), not two parameters.
static arena& allocate_arena(threading_control* control, unsigned num_slots, unsigned num_reserved_slots,
unsigned priority_level
#if __TBB_PREVIEW_PARALLEL_PHASE
, tbb::task_arena::leave_policy wl
, tbb::task_arena::leave_policy lp
#endif
);

// Static factory returning a reference to an arena. `constraints` defaults to a
// value-initialised d1::constraints and, when __TBB_PREVIEW_PARALLEL_PHASE is enabled, the
// leave policy defaults to leave_policy::automatic.
// NOTE(review): `wl` and `lp` are the old and new name of the same defaulted parameter
// (diff sides interleaved), not two parameters.
static arena& create(threading_control* control, unsigned num_slots, unsigned num_reserved_slots,
unsigned arena_priority_level,
d1::constraints constraints = d1::constraints{}
#if __TBB_PREVIEW_PARALLEL_PHASE
, tbb::task_arena::leave_policy wl = tbb::task_arena::leave_policy::automatic
, tbb::task_arena::leave_policy lp = tbb::task_arena::leave_policy::automatic
#endif
);

Expand Down
6 changes: 4 additions & 2 deletions src/tbb/waiters.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class outermost_worker_waiter : public waiter_base {
if (is_delayed_leave_enabled()) {
static constexpr std::chrono::microseconds worker_wait_leave_duration(1000);
static_assert(worker_wait_leave_duration > std::chrono::steady_clock::duration(1),
"Clock resolution is not enough for measured interval.");
"Clock resolution is not enough for measured interval.");

for (auto t1 = std::chrono::steady_clock::now(), t2 = t1;
std::chrono::duration_cast<std::chrono::microseconds>(t2 - t1) < worker_wait_leave_duration;
Expand All @@ -71,7 +71,9 @@ class outermost_worker_waiter : public waiter_base {
return true;
}

if (!my_arena.my_thread_leave.is_retention_allowed() || my_arena.my_threading_control->is_any_other_client_active()) {
if (!my_arena.my_thread_leave.is_retention_allowed() ||
my_arena.my_threading_control->is_any_other_client_active())
{
break;
}
d0::yield();
Expand Down
53 changes: 36 additions & 17 deletions test/tbb/test_parallel_phase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ struct dummy_func {
}
};

template <typename F1 = dummy_func, typename F2 = dummy_func>
template <typename F1 = dummy_func, typename F2 = dummy_func>
std::size_t measure_median_start_time(tbb::task_arena* ta, const F1& start = F1{}, const F2& end = F2{}) {
std::size_t num_threads = ta ? ta->max_concurrency() : tbb::this_task_arena::max_concurrency();
std::size_t num_runs = 1000;
Expand Down Expand Up @@ -120,42 +120,62 @@ class start_time_collection_sequenced_phases
: public start_time_collection_base<start_time_collection_sequenced_phases>
{
using base = start_time_collection_base<start_time_collection_sequenced_phases>;
using base::base;
friend base;

bool with_fast_leave;

std::size_t measure_impl() {
std::size_t median_start_time;
if (arena) {
median_start_time = measure_median_start_time(arena,
[this] { arena->start_parallel_phase(); },
[this] { arena->end_parallel_phase(/*with_fast_leave=*/true); });
[this] { arena->end_parallel_phase(with_fast_leave); });
} else {
median_start_time = measure_median_start_time(arena,
[] { tbb::this_task_arena::start_parallel_phase(); },
[] { tbb::this_task_arena::end_parallel_phase(/*with_fast_leave=*/true); });
[this] { tbb::this_task_arena::end_parallel_phase(with_fast_leave); });
}
return median_start_time;
};

public:
start_time_collection_sequenced_phases(tbb::task_arena& ta, std::size_t ntrials, bool fast_leave = false) :
base(ta, ntrials), with_fast_leave(fast_leave)
{}

explicit start_time_collection_sequenced_phases(std::size_t ntrials, bool fast_leave = false) :
base(ntrials), with_fast_leave(fast_leave)
{}
};

// Start-time collector whose measurement brackets the measured work with a heap-allocated
// tbb::task_arena::scoped_parallel_phase: the "start" callback creates the phase object and
// the "end" callback deletes it.
// NOTE(review): old and new diff lines are interleaved in this class (inherited
// constructors vs. the explicit constructors below, and two `phase = new ...` lines);
// only one of each pair is expected in the real file.
class start_time_collection_sequenced_scoped_phases
: public start_time_collection_base<start_time_collection_sequenced_scoped_phases>
{
using base = start_time_collection_base<start_time_collection_sequenced_scoped_phases>;
using base::base;
friend base;

// Forwarded to scoped_parallel_phase as its second constructor argument.
bool with_fast_leave;

// Runs measure_median_start_time with the create/delete callbacks and returns its result.
std::size_t measure_impl() {
tbb::task_arena::scoped_parallel_phase* phase = nullptr;
auto median_start_time = measure_median_start_time(arena,
[this, &phase] {
// NOTE(review): `*arena` is dereferenced unconditionally, yet the constructor that takes
// only `ntrials` passes no arena to the base — presumably `arena` is null then; confirm
// that this class is never measured in that configuration.
phase = new tbb::task_arena::scoped_parallel_phase{*arena, /*with_fast_leave=*/true};
phase = new tbb::task_arena::scoped_parallel_phase{*arena, with_fast_leave};
},
[&phase] {
delete phase;
});
return median_start_time;
};

public:
// Collector bound to an explicit arena; `fast_leave` defaults to false.
start_time_collection_sequenced_scoped_phases(tbb::task_arena& ta, std::size_t ntrials, bool fast_leave = false) :
base(ta, ntrials), with_fast_leave(fast_leave)
{}

// Collector constructed without an explicit arena; `fast_leave` defaults to false.
explicit start_time_collection_sequenced_scoped_phases(std::size_t ntrials, bool fast_leave = false) :
base(ntrials), with_fast_leave(fast_leave)
{}
};

//! \brief \ref interface \ref requirement
Expand All @@ -170,8 +190,8 @@ TEST_CASE("Check that workers leave faster with leave_policy::fast") {
tbb::task_arena::priority::normal,
tbb::task_arena::leave_policy::fast
};
start_time_collection st_collector1{ta_automatic_leave, /*num_trials=*/20};
start_time_collection st_collector2{ta_fast_leave, /*num_trials=*/20};
start_time_collection st_collector1{ta_automatic_leave, /*num_trials=*/10};
start_time_collection st_collector2{ta_fast_leave, /*num_trials=*/10};

auto times_automatic = st_collector1.measure();
auto times_fast = st_collector2.measure();
Expand All @@ -195,9 +215,9 @@ TEST_CASE("Parallel Phase retains workers in task_arena") {
tbb::task_arena::priority::normal,
tbb::task_arena::leave_policy::fast
};
start_time_collection_phase_wrapped st_collector1{ta_fast1, /*num_trials=*/20};
start_time_collection_scoped_phase_wrapped st_collector_scoped{ta_fast1, /*num_trials=*/20};
start_time_collection st_collector2{ta_fast2, /*num_trials=*/20};
start_time_collection_phase_wrapped st_collector1{ta_fast1, /*num_trials=*/10};
start_time_collection_scoped_phase_wrapped st_collector_scoped{ta_fast1, /*num_trials=*/10};
start_time_collection st_collector2{ta_fast2, /*num_trials=*/10};

auto times1 = st_collector1.measure();
auto times2 = st_collector2.measure();
Expand All @@ -218,9 +238,9 @@ TEST_CASE("Parallel Phase retains workers in task_arena") {
TEST_CASE("Test one-time fast leave") {
tbb::task_arena ta1{};
tbb::task_arena ta2{};
start_time_collection st_collector1{ta1, /*num_trials=*/10};
start_time_collection_sequenced_phases st_collector2{ta2, /*num_trials=*/10};
start_time_collection_sequenced_scoped_phases st_collector_scoped{ta2, /*num_trials=*/10};
start_time_collection_sequenced_phases st_collector1{ta1, /*num_trials=*/10};
start_time_collection_sequenced_phases st_collector2{ta2, /*num_trials=*/10, /*fast_leave*/true};
start_time_collection_sequenced_scoped_phases st_collector_scoped{ta2, /*num_trials=*/10, /*fast_leave*/true};

auto times1 = st_collector1.measure();
auto times2 = st_collector2.measure();
Expand All @@ -239,8 +259,8 @@ TEST_CASE("Test one-time fast leave") {

//! \brief \ref interface \ref requirement
TEST_CASE("Test parallel phase with this_task_arena") {
start_time_collection st_collector1{/*num_trials=*/10};
start_time_collection_sequenced_phases st_collector2{/*num_trials=*/10};
start_time_collection_sequenced_phases st_collector1{/*num_trials=*/10};
start_time_collection_sequenced_phases st_collector2{/*num_trials=*/10, /*fast_leave*/true};

auto times1 = st_collector1.measure();
auto times2 = st_collector2.measure();
Expand All @@ -251,4 +271,3 @@ TEST_CASE("Test parallel phase with this_task_arena") {
WARN_MESSAGE(median1 < median2,
"Expected one-time fast leave setting to slow workers to start new work");
}

0 comments on commit 4dfaf5e

Please sign in to comment.