Skip to content

Commit

Permalink
Reworked wait_contex approach + scalability improvement in task_group (
Browse files Browse the repository at this point in the history
…#1345)

Signed-off-by: pavelkumbrasev <pavel.kumbrasev@intel.com>
Co-authored-by: Konstantin Boyarinov <konstantin.boyarinov@intel.com>
Co-authored-by: Aleksei Fedotov <aleksei.fedotov@intel.com>
  • Loading branch information
3 people authored Jul 9, 2024
1 parent bad4c42 commit 1f52f50
Show file tree
Hide file tree
Showing 12 changed files with 232 additions and 110 deletions.
25 changes: 23 additions & 2 deletions include/oneapi/tbb/collaborative_call_once.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2021 Intel Corporation
Copyright (c) 2021-2024 Intel Corporation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,6 +32,27 @@ namespace d1 {
#pragma warning (disable: 4324)
#endif

template <typename F>
class collaborative_call_stack_task : public task {
const F& m_func;
wait_context& m_wait_ctx;

void finalize() {
m_wait_ctx.release();
}
task* execute(d1::execution_data&) override {
task* res = d2::task_ptr_or_nullptr(m_func);
finalize();
return res;
}
task* cancel(d1::execution_data&) override {
finalize();
return nullptr;
}
public:
collaborative_call_stack_task(const F& f, wait_context& wctx) : m_func(f), m_wait_ctx(wctx) {}
};

constexpr std::uintptr_t collaborative_once_max_references = max_nfs_size;
constexpr std::uintptr_t collaborative_once_references_mask = collaborative_once_max_references-1;

Expand Down Expand Up @@ -103,7 +124,7 @@ class alignas(max_nfs_size) collaborative_once_runner : no_copy {
task_group_context context{ task_group_context::bound,
task_group_context::default_traits | task_group_context::concurrent_wait };

function_stack_task<F> t{ std::forward<F>(f), m_storage.m_wait_context };
collaborative_call_stack_task<F> t{ std::forward<F>(f), m_storage.m_wait_context };

// Set the ready flag after entering the execute body to prevent
// moonlighting threads from occupying all slots inside the arena.
Expand Down
82 changes: 79 additions & 3 deletions include/oneapi/tbb/detail/_task.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2020-2023 Intel Corporation
Copyright (c) 2020-2024 Intel Corporation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -43,6 +43,12 @@ class task;
class wait_context;
class task_group_context;
struct execution_data;
class wait_tree_vertex_interface;
}

namespace d2 {
class task_group;
class task_group_base;
}

namespace r1 {
Expand All @@ -53,6 +59,7 @@ TBB_EXPORT void __TBB_EXPORTED_FUNC execute_and_wait(d1::task& t, d1::task_group
TBB_EXPORT void __TBB_EXPORTED_FUNC wait(d1::wait_context&, d1::task_group_context& ctx);
TBB_EXPORT d1::slot_id __TBB_EXPORTED_FUNC execution_slot(const d1::execution_data*);
TBB_EXPORT d1::task_group_context* __TBB_EXPORTED_FUNC current_context();
TBB_EXPORT d1::wait_tree_vertex_interface* get_thread_reference_vertex(d1::wait_tree_vertex_interface* wc);

// Do not place under __TBB_RESUMABLE_TASKS. It is a stub for unsupported platforms.
struct suspend_point_type;
Expand Down Expand Up @@ -124,8 +131,7 @@ class wait_context {
friend class r1::thread_data;
friend class r1::task_dispatcher;
friend class r1::external_waiter;
friend class task_group;
friend class task_group_base;
friend class wait_context_vertex;
friend struct r1::task_arena_impl;
friend struct r1::suspend_point_type;
public:
Expand All @@ -147,6 +153,76 @@ class wait_context {
}
};

class wait_tree_vertex_interface {
public:
virtual void reserve(std::uint32_t delta = 1) = 0;
virtual void release(std::uint32_t delta = 1) = 0;

protected:
virtual ~wait_tree_vertex_interface() = default;
};

class wait_context_vertex : public wait_tree_vertex_interface {
public:
wait_context_vertex(std::uint32_t ref = 0) : m_wait(ref) {}

void reserve(std::uint32_t delta = 1) override {
m_wait.reserve(delta);
}

void release(std::uint32_t delta = 1) override {
m_wait.release(delta);
}

wait_context& get_context() {
return m_wait;
}
private:
friend class d2::task_group;
friend class d2::task_group_base;

bool continue_execution() const {
return m_wait.continue_execution();
}

wait_context m_wait;
};

class reference_vertex : public wait_tree_vertex_interface {
public:
reference_vertex(wait_tree_vertex_interface* parent, std::uint32_t ref_count) : my_parent{parent}, m_ref_count{ref_count}
{}

void reserve(std::uint32_t delta = 1) override {
if (m_ref_count.fetch_add(static_cast<std::uint64_t>(delta)) == 0) {
my_parent->reserve();
}
}

void release(std::uint32_t delta = 1) override {
std::uint64_t ref = m_ref_count.fetch_sub(static_cast<std::uint64_t>(delta)) - static_cast<std::uint64_t>(delta);
if (ref == 0) {
auto parent = my_parent;
execute_continuation();
destroy();
parent->release();
}
}

std::uint32_t get_num_child() {
return static_cast<std::uint32_t>(m_ref_count.load(std::memory_order_acquire));
}

protected:
virtual void execute_continuation() {}
virtual void destroy() {}
virtual void destroy(const d1::execution_data&) {}

private:
wait_tree_vertex_interface* my_parent;
std::atomic<std::uint64_t> m_ref_count;
};

struct execution_data {
task_group_context* context{};
slot_id original_slot{};
Expand Down
11 changes: 6 additions & 5 deletions include/oneapi/tbb/detail/_task_handle.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2020-2021 Intel Corporation
Copyright (c) 2020-2024 Intel Corporation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -34,7 +34,7 @@ class task_handle;

class task_handle_task : public d1::task {
std::uint64_t m_version_and_traits{};
d1::wait_context& m_wait_ctx;
d1::wait_tree_vertex_interface* m_wait_tree_vertex;
d1::task_group_context& m_ctx;
d1::small_object_allocator m_allocator;
public:
Expand All @@ -46,15 +46,16 @@ class task_handle_task : public d1::task {
}
}

task_handle_task(d1::wait_context& wo, d1::task_group_context& ctx, d1::small_object_allocator& alloc)
: m_wait_ctx(wo)
task_handle_task(d1::wait_tree_vertex_interface* vertex, d1::task_group_context& ctx, d1::small_object_allocator& alloc)
: m_wait_tree_vertex(vertex)
, m_ctx(ctx)
, m_allocator(alloc) {
suppress_unused_warning(m_version_and_traits);
m_wait_tree_vertex->reserve();
}

~task_handle_task() override {
m_wait_ctx.release();
m_wait_tree_vertex->release();
}

d1::task_group_context& ctx() const { return m_ctx; }
Expand Down
Loading

0 comments on commit 1f52f50

Please sign in to comment.