From 4dfaf5e7c3a5d56bcc37a1753203aa9102747d53 Mon Sep 17 00:00:00 2001 From: "Isaev, Ilya" Date: Tue, 17 Dec 2024 17:52:58 +0100 Subject: [PATCH] Address comments Signed-off-by: Isaev, Ilya --- include/oneapi/tbb/task_arena.h | 20 +++++------- src/tbb/arena.cpp | 16 +++++----- src/tbb/arena.h | 21 +++++++------ src/tbb/waiters.h | 6 ++-- test/tbb/test_parallel_phase.cpp | 53 ++++++++++++++++++++++---------- 5 files changed, 68 insertions(+), 48 deletions(-) diff --git a/include/oneapi/tbb/task_arena.h b/include/oneapi/tbb/task_arena.h index 3f0bc5a611..3eb007dcc8 100644 --- a/include/oneapi/tbb/task_arena.h +++ b/include/oneapi/tbb/task_arena.h @@ -118,11 +118,6 @@ namespace d1 { static constexpr unsigned num_priority_levels = 3; static constexpr int priority_stride = INT_MAX / (num_priority_levels + 1); -#if __TBB_PREVIEW_PARALLEL_PHASE -static constexpr int leave_policy_trait_offset = 1; -static constexpr int leave_policy_trait_mask = ~((1 << leave_policy_trait_offset) - 1); -#endif - class task_arena_base { friend struct r1::task_arena_impl; friend void r1::observe(d1::task_scheduler_observer&, bool); @@ -181,23 +176,24 @@ class task_arena_base { } #if __TBB_PREVIEW_PARALLEL_PHASE - int leave_policy_to_traits(leave_policy lp) const { - return static_cast(lp) << leave_policy_trait_offset; + leave_policy get_leave_policy() const { + bool fast_policy_set = (my_version_and_traits & fast_leave_policy_flag) == fast_leave_policy_flag; + return fast_policy_set ? leave_policy::fast : leave_policy::automatic; } - leave_policy get_leave_policy() const { - int underlying_policy_v = (my_version_and_traits & leave_policy_trait_mask) >> leave_policy_trait_offset; - return static_cast(underlying_policy_v); + int leave_policy_to_traits(leave_policy lp) const { + return lp == leave_policy::fast ? fast_leave_policy_flag : 0; } void set_leave_policy(leave_policy lp) { - my_version_and_traits = (my_version_and_traits & ~leave_policy_trait_mask) | leave_policy_to_traits(lp); + my_version_and_traits |= leave_policy_to_traits(lp); } #endif enum { default_flags = 0, - core_type_support_flag = 1 + core_type_support_flag = 1, + fast_leave_policy_flag = 1 << 1 }; task_arena_base(int max_concurrency, unsigned reserved_for_masters, priority a_priority diff --git a/src/tbb/arena.cpp b/src/tbb/arena.cpp index 1f37ba9bf5..0791869bf5 100644 --- a/src/tbb/arena.cpp +++ b/src/tbb/arena.cpp @@ -530,8 +530,8 @@ struct task_arena_impl { static int max_concurrency(const d1::task_arena_base*); static void enqueue(d1::task&, d1::task_group_context*, d1::task_arena_base*); static d1::slot_id execution_slot(const d1::task_arena_base&); - static void register_parallel_phase(d1::task_arena_base*); - static void unregister_parallel_phase(d1::task_arena_base*, bool); + static void register_parallel_phase(d1::task_arena_base*, std::uintptr_t); + static void unregister_parallel_phase(d1::task_arena_base*, std::uintptr_t); }; void __TBB_EXPORTED_FUNC initialize(d1::task_arena_base& ta) { @@ -566,12 +566,12 @@ d1::slot_id __TBB_EXPORTED_FUNC execution_slot(const d1::task_arena_base& arena) return task_arena_impl::execution_slot(arena); } -void __TBB_EXPORTED_FUNC register_parallel_phase(d1::task_arena_base* ta, std::uintptr_t /*reserved*/) { - task_arena_impl::register_parallel_phase(ta); +void __TBB_EXPORTED_FUNC register_parallel_phase(d1::task_arena_base* ta, std::uintptr_t flags) { + task_arena_impl::register_parallel_phase(ta, flags); } void __TBB_EXPORTED_FUNC unregister_parallel_phase(d1::task_arena_base* ta, std::uintptr_t flags) { - task_arena_impl::unregister_parallel_phase(ta, static_cast(flags)); + task_arena_impl::unregister_parallel_phase(ta, flags); } void task_arena_impl::initialize(d1::task_arena_base& ta) { @@ -921,17 +921,17 @@ int task_arena_impl::max_concurrency(const d1::task_arena_base *ta) { } #if __TBB_PREVIEW_PARALLEL_PHASE -void task_arena_impl::register_parallel_phase(d1::task_arena_base* ta) { +void task_arena_impl::register_parallel_phase(d1::task_arena_base* ta, std::uintptr_t /*reserved*/) { arena* a = ta ? ta->my_arena.load(std::memory_order_relaxed) : governor::get_thread_data()->my_arena; __TBB_ASSERT(a, nullptr); a->my_thread_leave.register_parallel_phase(); a->advertise_new_work(); } -void task_arena_impl::unregister_parallel_phase(d1::task_arena_base* ta, bool with_fast_leave) { +void task_arena_impl::unregister_parallel_phase(d1::task_arena_base* ta, std::uintptr_t flags) { arena* a = ta ? ta->my_arena.load(std::memory_order_relaxed) : governor::get_thread_data()->my_arena; __TBB_ASSERT(a, nullptr); - a->my_thread_leave.unregister_parallel_phase(with_fast_leave); + a->my_thread_leave.unregister_parallel_phase(/*with_fast_leave=*/static_cast(flags)); } #endif diff --git a/src/tbb/arena.h b/src/tbb/arena.h index fcc6795109..72cfc7feda 100644 --- a/src/tbb/arena.h +++ b/src/tbb/arena.h @@ -210,9 +210,9 @@ class thread_leave_manager { // Indicate start of parallel phase in the state machine void register_parallel_phase() { - __TBB_ASSERT(my_state.load(std::memory_order_relaxed) != 0, "The initial state was not set"); - std::uint64_t prev = my_state.load(std::memory_order_relaxed); + __TBB_ASSERT(prev != 0, "The initial state was not set"); + std::uint64_t desired{}; do { // Need to add a reference for this start of a parallel phase, preserving the leave @@ -224,17 +224,20 @@ class thread_leave_manager { // of new parallel phase, it should be transitioned to "Delayed leave" desired = DELAYED_LEAVE; } + __TBB_ASSERT(desired + PARALLEL_PHASE > desired, "Overflow detected"); desired += PARALLEL_PHASE; // Take into account this start of a parallel phase } while (!my_state.compare_exchange_strong(prev, desired)); } // Indicate the end of parallel phase in the state machine void unregister_parallel_phase(bool enable_fast_leave) { - __TBB_ASSERT(my_state.load(std::memory_order_relaxed) != 0, "The initial state was not set"); - std::uint64_t prev = my_state.load(std::memory_order_relaxed); + __TBB_ASSERT(prev != 0, "The initial state was not set"); + std::uint64_t desired{}; do { + __TBB_ASSERT(prev - PARALLEL_PHASE < prev, + "A call to unregister without its register complement"); desired = prev - PARALLEL_PHASE; // Mark the end of this phase in reference counter if (enable_fast_leave && /*it was the last parallel phase*/desired == DELAYED_LEAVE) { desired = ONE_TIME_FAST_LEAVE; @@ -243,9 +246,9 @@ class thread_leave_manager { } bool is_retention_allowed() { - __TBB_ASSERT(my_state.load(std::memory_order_relaxed) != 0, "The initial state was not set"); - std::uint64_t curr = my_state.load(std::memory_order_relaxed); + __TBB_ASSERT(curr != 0, "The initial state was not set"); + return curr != FAST_LEAVE && curr != ONE_TIME_FAST_LEAVE; } }; @@ -360,7 +363,7 @@ class arena: public padded //! Constructor arena(threading_control* control, unsigned max_num_workers, unsigned num_reserved_slots, unsigned priority_level #if __TBB_PREVIEW_PARALLEL_PHASE - , tbb::task_arena::leave_policy wl + , tbb::task_arena::leave_policy lp #endif ); @@ -368,7 +371,7 @@ class arena: public padded static arena& allocate_arena(threading_control* control, unsigned num_slots, unsigned num_reserved_slots, unsigned priority_level #if __TBB_PREVIEW_PARALLEL_PHASE - , tbb::task_arena::leave_policy wl + , tbb::task_arena::leave_policy lp #endif ); @@ -376,7 +379,7 @@ class arena: public padded unsigned arena_priority_level, d1::constraints constraints = d1::constraints{} #if __TBB_PREVIEW_PARALLEL_PHASE - , tbb::task_arena::leave_policy wl = tbb::task_arena::leave_policy::automatic + , tbb::task_arena::leave_policy lp = tbb::task_arena::leave_policy::automatic #endif ); diff --git a/src/tbb/waiters.h b/src/tbb/waiters.h index a217933b7d..e3248bb77f 100644 --- a/src/tbb/waiters.h +++ b/src/tbb/waiters.h @@ -61,7 +61,7 @@ class outermost_worker_waiter : public waiter_base { if (is_delayed_leave_enabled()) { static constexpr std::chrono::microseconds worker_wait_leave_duration(1000); static_assert(worker_wait_leave_duration > std::chrono::steady_clock::duration(1), - "Clock resolution is not enough for measured interval."); + "Clock resolution is not enough for measured interval."); for (auto t1 = std::chrono::steady_clock::now(), t2 = t1; std::chrono::duration_cast(t2 - t1) < worker_wait_leave_duration; @@ -71,7 +71,9 @@ class outermost_worker_waiter : public waiter_base { return true; } - if (!my_arena.my_thread_leave.is_retention_allowed() || my_arena.my_threading_control->is_any_other_client_active()) { + if (!my_arena.my_thread_leave.is_retention_allowed() || + my_arena.my_threading_control->is_any_other_client_active()) + { break; } d0::yield(); diff --git a/test/tbb/test_parallel_phase.cpp b/test/tbb/test_parallel_phase.cpp index df6119bfed..8b5f9404fa 100644 --- a/test/tbb/test_parallel_phase.cpp +++ b/test/tbb/test_parallel_phase.cpp @@ -12,7 +12,7 @@ struct dummy_func { } }; -template +template std::size_t measure_median_start_time(tbb::task_arena* ta, const F1& start = F1{}, const F2& end = F2{}) { std::size_t num_threads = ta ? ta->max_concurrency() : tbb::this_task_arena::max_concurrency(); std::size_t num_runs = 1000; @@ -120,42 +120,62 @@ class start_time_collection_sequenced_phases : public start_time_collection_base { using base = start_time_collection_base; - using base::base; friend base; + bool with_fast_leave; + std::size_t measure_impl() { std::size_t median_start_time; if (arena) { median_start_time = measure_median_start_time(arena, [this] { arena->start_parallel_phase(); }, - [this] { arena->end_parallel_phase(/*with_fast_leave=*/true); }); + [this] { arena->end_parallel_phase(with_fast_leave); }); } else { median_start_time = measure_median_start_time(arena, [] { tbb::this_task_arena::start_parallel_phase(); }, - [] { tbb::this_task_arena::end_parallel_phase(/*with_fast_leave=*/true); }); + [this] { tbb::this_task_arena::end_parallel_phase(with_fast_leave); }); } return median_start_time; }; + +public: + start_time_collection_sequenced_phases(tbb::task_arena& ta, std::size_t ntrials, bool fast_leave = false) : + base(ta, ntrials), with_fast_leave(fast_leave) + {} + + explicit start_time_collection_sequenced_phases(std::size_t ntrials, bool fast_leave = false) : + base(ntrials), with_fast_leave(fast_leave) + {} }; class start_time_collection_sequenced_scoped_phases : public start_time_collection_base { using base = start_time_collection_base; - using base::base; friend base; + bool with_fast_leave; + std::size_t measure_impl() { tbb::task_arena::scoped_parallel_phase* phase = nullptr; auto median_start_time = measure_median_start_time(arena, [this, &phase] { - phase = new tbb::task_arena::scoped_parallel_phase{*arena, /*with_fast_leave=*/true}; + phase = new tbb::task_arena::scoped_parallel_phase{*arena, with_fast_leave}; }, [&phase] { delete phase; }); return median_start_time; }; + +public: + start_time_collection_sequenced_scoped_phases(tbb::task_arena& ta, std::size_t ntrials, bool fast_leave = false) : + base(ta, ntrials), with_fast_leave(fast_leave) + {} + + explicit start_time_collection_sequenced_scoped_phases(std::size_t ntrials, bool fast_leave = false) : + base(ntrials), with_fast_leave(fast_leave) + {} }; //! \brief \ref interface \ref requirement @@ -170,8 +190,8 @@ TEST_CASE("Check that workers leave faster with leave_policy::fast") { tbb::task_arena::priority::normal, tbb::task_arena::leave_policy::fast }; - start_time_collection st_collector1{ta_automatic_leave, /*num_trials=*/20}; - start_time_collection st_collector2{ta_fast_leave, /*num_trials=*/20}; + start_time_collection st_collector1{ta_automatic_leave, /*num_trials=*/10}; + start_time_collection st_collector2{ta_fast_leave, /*num_trials=*/10}; auto times_automatic = st_collector1.measure(); auto times_fast = st_collector2.measure(); @@ -195,9 +215,9 @@ TEST_CASE("Parallel Phase retains workers in task_arena") { tbb::task_arena::priority::normal, tbb::task_arena::leave_policy::fast }; - start_time_collection_phase_wrapped st_collector1{ta_fast1, /*num_trials=*/20}; - start_time_collection_scoped_phase_wrapped st_collector_scoped{ta_fast1, /*num_trials=*/20}; - start_time_collection st_collector2{ta_fast2, /*num_trials=*/20}; + start_time_collection_phase_wrapped st_collector1{ta_fast1, /*num_trials=*/10}; + start_time_collection_scoped_phase_wrapped st_collector_scoped{ta_fast1, /*num_trials=*/10}; + start_time_collection st_collector2{ta_fast2, /*num_trials=*/10}; auto times1 = st_collector1.measure(); auto times2 = st_collector2.measure(); @@ -218,9 +238,9 @@ TEST_CASE("Parallel Phase retains workers in task_arena") { TEST_CASE("Test one-time fast leave") { tbb::task_arena ta1{}; tbb::task_arena ta2{}; - start_time_collection st_collector1{ta1, /*num_trials=*/10}; - start_time_collection_sequenced_phases st_collector2{ta2, /*num_trials=*/10}; - start_time_collection_sequenced_scoped_phases st_collector_scoped{ta2, /*num_trials=*/10}; + start_time_collection_sequenced_phases st_collector1{ta1, /*num_trials=*/10}; + start_time_collection_sequenced_phases st_collector2{ta2, /*num_trials=*/10, /*fast_leave*/true}; + start_time_collection_sequenced_scoped_phases st_collector_scoped{ta2, /*num_trials=*/10, /*fast_leave*/true}; auto times1 = st_collector1.measure(); auto times2 = st_collector2.measure(); @@ -239,8 +259,8 @@ TEST_CASE("Test one-time fast leave") { //! \brief \ref interface \ref requirement TEST_CASE("Test parallel phase with this_task_arena") { - start_time_collection st_collector1{/*num_trials=*/10}; - start_time_collection_sequenced_phases st_collector2{/*num_trials=*/10}; + start_time_collection_sequenced_phases st_collector1{/*num_trials=*/10}; + start_time_collection_sequenced_phases st_collector2{/*num_trials=*/10, /*fast_leave*/true}; auto times1 = st_collector1.measure(); auto times2 = st_collector2.measure(); @@ -251,4 +271,3 @@ TEST_CASE("Test parallel phase with this_task_arena") { WARN_MESSAGE(median1 < median2, "Expected one-time fast leave setting to slow workers to start new work"); } -