Skip to content
This repository has been archived by the owner on May 3, 2024. It is now read-only.

Commit

Permalink
Feature/use nonpriority pool (#570)
Browse files Browse the repository at this point in the history
* new thread pool

* fix

* wip

* add static
  • Loading branch information
Dronplane committed Nov 17, 2023
1 parent 43f6ba1 commit 43bb8ff
Show file tree
Hide file tree
Showing 3 changed files with 307 additions and 193 deletions.
95 changes: 65 additions & 30 deletions core/utils/async_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,28 +55,32 @@ void busywait_mutex::unlock() noexcept {
locked_.store(false, std::memory_order_release);
}

thread_pool::thread_pool(
template<bool UsePriority>
thread_pool<UsePriority>::thread_pool(
size_t max_threads /*= 0*/, size_t max_idle /*= 0*/,
std::basic_string_view<native_char_t> worker_name /*= ""*/)
basic_string_view<native_char_t> worker_name /*= ""*/)
: shared_state_(std::make_shared<shared_state>()),
max_idle_(max_idle),
max_threads_(max_threads),
worker_name_(worker_name) {}

thread_pool::~thread_pool() {
template<bool UsePriority>
thread_pool<UsePriority>::~thread_pool() {
try {
stop(true);
} catch (...) {
}
}

size_t thread_pool::max_idle() const {
template<bool UsePriority>
size_t thread_pool<UsePriority>::max_idle() const {
std::lock_guard lock{shared_state_->lock};

return max_idle_;
}

void thread_pool::max_idle(size_t value) {
template<bool UsePriority>
void thread_pool<UsePriority>::max_idle(size_t value) {
auto& state = *shared_state_;

{
Expand All @@ -88,7 +92,8 @@ void thread_pool::max_idle(size_t value) {
state.cond.notify_all(); // wake any idle threads if they need termination
}

void thread_pool::max_idle_delta(int delta) {
template<bool UsePriority>
void thread_pool<UsePriority>::max_idle_delta(int delta) {
auto& state = *shared_state_;

{
Expand All @@ -107,13 +112,15 @@ void thread_pool::max_idle_delta(int delta) {
state.cond.notify_all(); // wake any idle threads if they need termination
}

size_t thread_pool::max_threads() const {
template<bool UsePriority>
size_t thread_pool<UsePriority>::max_threads() const {
std::lock_guard lock{shared_state_->lock};

return max_threads_;
}

void thread_pool::max_threads(size_t value) {
template<bool UsePriority>
void thread_pool<UsePriority>::max_threads(size_t value) {
auto& state = *shared_state_;

{
Expand All @@ -129,7 +136,8 @@ void thread_pool::max_threads(size_t value) {
state.cond.notify_all(); // wake any idle threads if they need termination
}

void thread_pool::max_threads_delta(int delta) {
template<bool UsePriority>
void thread_pool<UsePriority>::max_threads_delta(int delta) {
auto& state = *shared_state_;

{
Expand All @@ -152,8 +160,9 @@ void thread_pool::max_threads_delta(int delta) {
state.cond.notify_all(); // wake any idle threads if they need termination
}

bool thread_pool::run(std::function<void()>&& fn,
clock_t::duration delay /*=0*/) {
template<bool UsePriority>
bool thread_pool<UsePriority>::run(thread_pool<UsePriority>::func_t&& fn,
clock_t::duration delay /*=0*/) {
if (!fn) {
return false;
}
Expand All @@ -166,8 +175,11 @@ bool thread_pool::run(std::function<void()>&& fn,
if (State::RUN != state.state.load()) {
return false; // pool not active
}

queue_.emplace(std::move(fn), clock_t::now() + delay);
if constexpr (UsePriority) {
queue_.emplace(std::move(fn), clock_t::now() + delay);
} else {
queue_.emplace(std::move(fn));
}

try {
maybe_spawn_worker();
Expand All @@ -185,7 +197,8 @@ bool thread_pool::run(std::function<void()>&& fn,
return true;
}

void thread_pool::stop(bool skip_pending /*= false*/) {
template<bool UsePriority>
void thread_pool<UsePriority>::stop(bool skip_pending /*= false*/) {
shared_state_->state.store(skip_pending ? State::ABORT : State::FINISH);

decltype(queue_) empty;
Expand All @@ -202,7 +215,8 @@ void thread_pool::stop(bool skip_pending /*= false*/) {
}
}

void thread_pool::limits(size_t max_threads, size_t max_idle) {
template<bool UsePriority>
void thread_pool<UsePriority>::limits(size_t max_threads, size_t max_idle) {
auto& state = *shared_state_;

{
Expand All @@ -219,7 +233,8 @@ void thread_pool::limits(size_t max_threads, size_t max_idle) {
state.cond.notify_all(); // wake any idle threads if they need termination
}

bool thread_pool::maybe_spawn_worker() {
template<bool UsePriority>
bool thread_pool<UsePriority>::maybe_spawn_worker() {
IRS_ASSERT(!shared_state_->lock.try_lock()); // lock must be held

// create extra thread if all threads are busy and can grow pool
Expand All @@ -237,37 +252,44 @@ bool thread_pool::maybe_spawn_worker() {
return false;
}

std::pair<size_t, size_t> thread_pool::limits() const {
template<bool UsePriority>
std::pair<size_t, size_t> thread_pool<UsePriority>::limits() const {
std::lock_guard lock{shared_state_->lock};

return {max_threads_, max_idle_};
}

std::tuple<size_t, size_t, size_t> thread_pool::stats() const {
template<bool UsePriority>
std::tuple<size_t, size_t, size_t> thread_pool<UsePriority>::stats() const {
std::lock_guard lock{shared_state_->lock};

return {active_, queue_.size(), threads_.load()};
}

size_t thread_pool::tasks_active() const {
template<bool UsePriority>
size_t thread_pool<UsePriority>::tasks_active() const {
std::lock_guard lock{shared_state_->lock};

return active_;
}

size_t thread_pool::tasks_pending() const {
template<bool UsePriority>
size_t thread_pool<UsePriority>::tasks_pending() const {
std::lock_guard lock{shared_state_->lock};

return queue_.size();
}

size_t thread_pool::threads() const {
template<bool UsePriority>
size_t thread_pool<UsePriority>::threads() const {
std::lock_guard lock{shared_state_->lock};

return threads_.load();
}

void thread_pool::worker(std::shared_ptr<shared_state> shared_state) noexcept {
template<bool UsePriority>
void thread_pool<UsePriority>::worker(
std::shared_ptr<shared_state> shared_state) noexcept {
// hold a reference to 'shared_state_' ensure state is still alive
if (!worker_name_.empty()) {
set_thread_name(worker_name_.c_str());
Expand All @@ -290,20 +312,27 @@ void thread_pool::worker(std::shared_ptr<shared_state> shared_state) noexcept {
}
}

void thread_pool::worker_impl(std::unique_lock<std::mutex>& lock,
std::shared_ptr<shared_state> shared_state) {
template<bool UsePriority>
void thread_pool<UsePriority>::worker_impl(
std::unique_lock<std::mutex>& lock,
std::shared_ptr<shared_state> shared_state) {
auto& state = shared_state->state;

lock.lock();

while (State::ABORT != state.load() && threads_.load() <= max_threads_) {
IRS_ASSERT(lock.owns_lock());
if (!queue_.empty()) {
if (const auto& top = queue_.top(); top.at <= clock_t::now()) {
func_t fn;
fn.swap(const_cast<func_t&>(top.fn));
auto& top = next();
bool proceed = true;
if constexpr (UsePriority) {
if (top.at > clock_t::now()) {
proceed = false;
}
}
if (proceed) {
func_t fn = std::move(func(top));
queue_.pop();

++active_;
Finally decrement = [this]() noexcept { --active_; };
// if have more tasks but no idle thread and can grow pool
Expand Down Expand Up @@ -340,8 +369,11 @@ void thread_pool::worker_impl(std::unique_lock<std::mutex>& lock,
(idle <= max_idle_ || (!queue_.empty() && threads_.load() == 1))) {
if (const auto run_state = state.load();
!queue_.empty() && State::ABORT != run_state) {
const auto at = queue_.top().at; // queue_ might be modified
shared_state->cond.wait_until(lock, at);
IRS_ASSERT(UsePriority);
if constexpr (UsePriority) {
const auto at = queue_.top().at; // queue_ might be modified
shared_state->cond.wait_until(lock, at);
}
} else if (State::RUN == run_state) {
IRS_ASSERT(queue_.empty());
shared_state->cond.wait(lock);
Expand All @@ -355,5 +387,8 @@ void thread_pool::worker_impl(std::unique_lock<std::mutex>& lock,
}
}

template class thread_pool<true>;
template class thread_pool<false>;

} // namespace async_utils
} // namespace irs
30 changes: 25 additions & 5 deletions core/utils/async_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#include <atomic>
#include <condition_variable>
#include <function2/function2.hpp>
#include <functional>
#include <queue>
#include <thread>
Expand All @@ -51,11 +52,12 @@ class busywait_mutex final {
std::atomic<bool> locked_{false};
};

template<bool UsePriority = true>
class thread_pool {
public:
using native_char_t = std::remove_pointer_t<thread_name_t>;
using clock_t = std::chrono::steady_clock;
using func_t = std::function<void()>;
using func_t = fu2::unique_function<void()>;

explicit thread_pool(size_t max_threads = 0, size_t max_idle = 0,
std::basic_string_view<native_char_t> worker_name =
Expand All @@ -72,7 +74,7 @@ class thread_pool {
std::pair<size_t, size_t> limits() const;
void limits(size_t max_threads, size_t max_idle);

bool run(std::function<void()>&& fn, clock_t::duration delay = {});
bool run(func_t&& fn, [[maybe_unused]] clock_t::duration delay = {});
void stop(bool skip_pending = false); // always a blocking call
size_t tasks_active() const;
size_t tasks_pending() const;
Expand All @@ -83,14 +85,31 @@ class thread_pool {
private:
enum class State { ABORT, FINISH, RUN };

auto& next() {
if constexpr (UsePriority) {
return queue_.top();
} else {
return queue_.front();
}
}

template<typename T>
static func_t& func(T& t) {
if constexpr (UsePriority) {
return const_cast<func_t&>(t.fn);
} else {
return const_cast<func_t&>(t);
}
}

struct shared_state {
std::mutex lock;
std::condition_variable cond;
std::atomic<State> state{State::RUN};
};

struct task {
explicit task(std::function<void()>&& fn, clock_t::time_point at)
explicit task(func_t&& fn, clock_t::time_point at)
: at(at), fn(std::move(fn)) {}

clock_t::time_point at;
Expand All @@ -109,8 +128,9 @@ class thread_pool {
std::atomic<size_t> threads_{0};
size_t max_idle_;
size_t max_threads_;
std::priority_queue<task> queue_;
std::basic_string<native_char_t> worker_name_;
std::conditional_t<UsePriority, std::priority_queue<task>, std::queue<func_t>>
queue_;
basic_string<native_char_t> worker_name_;
};

} // namespace async_utils
Expand Down
Loading

0 comments on commit 43bb8ff

Please sign in to comment.