diff --git a/core/utils/async_utils.cpp b/core/utils/async_utils.cpp index d054c403d..01cc1e137 100644 --- a/core/utils/async_utils.cpp +++ b/core/utils/async_utils.cpp @@ -55,27 +55,32 @@ void busywait_mutex::unlock() noexcept { locked_.store(false, std::memory_order_release); } -thread_pool::thread_pool(size_t max_threads /*= 0*/, size_t max_idle /*= 0*/, - basic_string_view worker_name /*= ""*/) +template +thread_pool::thread_pool( + size_t max_threads /*= 0*/, size_t max_idle /*= 0*/, + basic_string_view worker_name /*= ""*/) : shared_state_(std::make_shared()), max_idle_(max_idle), max_threads_(max_threads), worker_name_(worker_name) {} -thread_pool::~thread_pool() { +template +thread_pool::~thread_pool() { try { stop(true); } catch (...) { } } -size_t thread_pool::max_idle() const { +template +size_t thread_pool::max_idle() const { std::lock_guard lock{shared_state_->lock}; return max_idle_; } -void thread_pool::max_idle(size_t value) { +template +void thread_pool::max_idle(size_t value) { auto& state = *shared_state_; { @@ -87,7 +92,8 @@ void thread_pool::max_idle(size_t value) { state.cond.notify_all(); // wake any idle threads if they need termination } -void thread_pool::max_idle_delta(int delta) { +template +void thread_pool::max_idle_delta(int delta) { auto& state = *shared_state_; { @@ -106,13 +112,15 @@ void thread_pool::max_idle_delta(int delta) { state.cond.notify_all(); // wake any idle threads if they need termination } -size_t thread_pool::max_threads() const { +template +size_t thread_pool::max_threads() const { std::lock_guard lock{shared_state_->lock}; return max_threads_; } -void thread_pool::max_threads(size_t value) { +template +void thread_pool::max_threads(size_t value) { auto& state = *shared_state_; { @@ -128,7 +136,8 @@ void thread_pool::max_threads(size_t value) { state.cond.notify_all(); // wake any idle threads if they need termination } -void thread_pool::max_threads_delta(int delta) { +template +void 
thread_pool::max_threads_delta(int delta) { auto& state = *shared_state_; { @@ -151,8 +160,9 @@ void thread_pool::max_threads_delta(int delta) { state.cond.notify_all(); // wake any idle threads if they need termination } -bool thread_pool::run(std::function&& fn, - clock_t::duration delay /*=0*/) { +template +bool thread_pool::run(thread_pool::func_t&& fn, + clock_t::duration delay /*=0*/) { if (!fn) { return false; } @@ -165,8 +175,11 @@ bool thread_pool::run(std::function&& fn, if (State::RUN != state.state.load()) { return false; // pool not active } - - queue_.emplace(std::move(fn), clock_t::now() + delay); + if constexpr (UsePriority) { + queue_.emplace(std::move(fn), clock_t::now() + delay); + } else { + queue_.emplace(std::move(fn)); + } try { maybe_spawn_worker(); @@ -184,7 +197,8 @@ bool thread_pool::run(std::function&& fn, return true; } -void thread_pool::stop(bool skip_pending /*= false*/) { +template +void thread_pool::stop(bool skip_pending /*= false*/) { shared_state_->state.store(skip_pending ? 
State::ABORT : State::FINISH); decltype(queue_) empty; @@ -201,7 +215,8 @@ void thread_pool::stop(bool skip_pending /*= false*/) { } } -void thread_pool::limits(size_t max_threads, size_t max_idle) { +template +void thread_pool::limits(size_t max_threads, size_t max_idle) { auto& state = *shared_state_; { @@ -218,7 +233,8 @@ void thread_pool::limits(size_t max_threads, size_t max_idle) { state.cond.notify_all(); // wake any idle threads if they need termination } -bool thread_pool::maybe_spawn_worker() { +template +bool thread_pool::maybe_spawn_worker() { IRS_ASSERT(!shared_state_->lock.try_lock()); // lock must be held // create extra thread if all threads are busy and can grow pool @@ -236,37 +252,44 @@ bool thread_pool::maybe_spawn_worker() { return false; } -std::pair thread_pool::limits() const { +template +std::pair thread_pool::limits() const { std::lock_guard lock{shared_state_->lock}; return {max_threads_, max_idle_}; } -std::tuple thread_pool::stats() const { +template +std::tuple thread_pool::stats() const { std::lock_guard lock{shared_state_->lock}; return {active_, queue_.size(), threads_.load()}; } -size_t thread_pool::tasks_active() const { +template +size_t thread_pool::tasks_active() const { std::lock_guard lock{shared_state_->lock}; return active_; } -size_t thread_pool::tasks_pending() const { +template +size_t thread_pool::tasks_pending() const { std::lock_guard lock{shared_state_->lock}; return queue_.size(); } -size_t thread_pool::threads() const { +template +size_t thread_pool::threads() const { std::lock_guard lock{shared_state_->lock}; return threads_.load(); } -void thread_pool::worker(std::shared_ptr shared_state) noexcept { +template +void thread_pool::worker( + std::shared_ptr shared_state) noexcept { // hold a reference to 'shared_state_' ensure state is still alive if (!worker_name_.empty()) { set_thread_name(worker_name_.c_str()); @@ -289,8 +312,10 @@ void thread_pool::worker(std::shared_ptr shared_state) noexcept { } } -void 
thread_pool::worker_impl(std::unique_lock& lock, - std::shared_ptr shared_state) { +template +void thread_pool::worker_impl( + std::unique_lock& lock, + std::shared_ptr shared_state) { auto& state = shared_state->state; lock.lock(); @@ -298,11 +323,16 @@ void thread_pool::worker_impl(std::unique_lock& lock, while (State::ABORT != state.load() && threads_.load() <= max_threads_) { IRS_ASSERT(lock.owns_lock()); if (!queue_.empty()) { - if (const auto& top = queue_.top(); top.at <= clock_t::now()) { - func_t fn; - fn.swap(const_cast(top.fn)); + auto& top = next(); + bool proceed = true; + if constexpr (UsePriority) { + if (top.at > clock_t::now()) { + proceed = false; + } + } + if (proceed) { + func_t fn = std::move(func(top)); queue_.pop(); - ++active_; Finally decrement = [this]() noexcept { --active_; }; // if have more tasks but no idle thread and can grow pool @@ -339,8 +369,11 @@ void thread_pool::worker_impl(std::unique_lock& lock, (idle <= max_idle_ || (!queue_.empty() && threads_.load() == 1))) { if (const auto run_state = state.load(); !queue_.empty() && State::ABORT != run_state) { - const auto at = queue_.top().at; // queue_ might be modified - shared_state->cond.wait_until(lock, at); + IRS_ASSERT(UsePriority); + if constexpr (UsePriority) { + const auto at = queue_.top().at; // queue_ might be modified + shared_state->cond.wait_until(lock, at); + } } else if (State::RUN == run_state) { IRS_ASSERT(queue_.empty()); shared_state->cond.wait(lock); @@ -354,5 +387,8 @@ void thread_pool::worker_impl(std::unique_lock& lock, } } +template class thread_pool; +template class thread_pool; + } // namespace async_utils } // namespace irs diff --git a/core/utils/async_utils.hpp b/core/utils/async_utils.hpp index 8d2badfb1..f5bf2c039 100644 --- a/core/utils/async_utils.hpp +++ b/core/utils/async_utils.hpp @@ -25,6 +25,7 @@ #include #include +#include #include #include #include @@ -51,11 +52,12 @@ class busywait_mutex final { std::atomic locked_{false}; }; +template class 
thread_pool { public: using native_char_t = std::remove_pointer_t; using clock_t = std::chrono::steady_clock; - using func_t = std::function; + using func_t = fu2::unique_function; explicit thread_pool(size_t max_threads = 0, size_t max_idle = 0, basic_string_view worker_name = @@ -72,7 +74,7 @@ class thread_pool { std::pair limits() const; void limits(size_t max_threads, size_t max_idle); - bool run(std::function&& fn, clock_t::duration delay = {}); + bool run(func_t&& fn, [[maybe_unused]] clock_t::duration delay = {}); void stop(bool skip_pending = false); // always a blocking call size_t tasks_active() const; size_t tasks_pending() const; @@ -83,6 +85,23 @@ class thread_pool { private: enum class State { ABORT, FINISH, RUN }; + auto& next() { + if constexpr (UsePriority) { + return queue_.top(); + } else { + return queue_.front(); + } + } + + template + static func_t& func(T& t) { + if constexpr (UsePriority) { + return const_cast(t.fn); + } else { + return const_cast(t); + } + } + struct shared_state { std::mutex lock; std::condition_variable cond; @@ -90,7 +109,7 @@ class thread_pool { }; struct task { - explicit task(std::function&& fn, clock_t::time_point at) + explicit task(func_t&& fn, clock_t::time_point at) : at(at), fn(std::move(fn)) {} clock_t::time_point at; @@ -109,7 +128,8 @@ class thread_pool { std::atomic threads_{0}; size_t max_idle_; size_t max_threads_; - std::priority_queue queue_; + std::conditional_t, std::queue> + queue_; basic_string worker_name_; }; diff --git a/tests/utils/async_utils_tests.cpp b/tests/utils/async_utils_tests.cpp index fdca9e632..04d6a342c 100644 --- a/tests/utils/async_utils_tests.cpp +++ b/tests/utils/async_utils_tests.cpp @@ -29,6 +29,8 @@ #include "utils/async_utils.hpp" #include "utils/thread_utils.hpp" +using namespace std::chrono_literals; + namespace tests { class async_utils_tests : public ::testing::Test { @@ -64,162 +66,11 @@ class notifying_counter { size_t notify_after_; }; -} // namespace tests - -using 
namespace tests; -using namespace std::chrono_literals; - -TEST_F(async_utils_tests, test_busywait_mutex_mt) { - typedef irs::async_utils::busywait_mutex mutex_t; - { - mutex_t mutex; - std::lock_guard lock(mutex); - std::thread thread([&mutex]() -> void { ASSERT_FALSE(mutex.try_lock()); }); - thread.join(); - } - - { - mutex_t mutex; - std::lock_guard lock(mutex); - ASSERT_FALSE(mutex.try_lock()); - } - - { - std::condition_variable cond; - std::mutex ctrl_mtx; - std::unique_lock lock(ctrl_mtx); - mutex_t mutex; - std::thread thread([&cond, &ctrl_mtx, &mutex]() -> void { - std::unique_lock lock(ctrl_mtx); - mutex.lock(); - cond.notify_all(); - cond.wait_for(lock, 1000ms); - mutex.unlock(); - }); - - ASSERT_EQ(std::cv_status::no_timeout, cond.wait_for(lock, 1000ms)); - lock.unlock(); - cond.notify_all(); - thread.join(); - } -} - -TEST_F(async_utils_tests, test_thread_pool_run_mt) { - // test schedule 1 task - { - irs::async_utils::thread_pool pool(1, 0); - std::condition_variable cond; - std::mutex mutex; - std::unique_lock lock(mutex); - auto task = [&mutex, &cond]() -> void { - std::lock_guard lock(mutex); - cond.notify_all(); - }; - - pool.run(std::move(task)); - ASSERT_EQ(std::cv_status::no_timeout, cond.wait_for(lock, 1000ms)); - } - - // test schedule 3 task sequential - { - irs::async_utils::thread_pool pool(1, 1); - std::condition_variable cond; - notifying_counter count(cond, 3); - std::mutex mutex; - std::mutex sync_mutex; - auto task1 = [&mutex, &sync_mutex, &count]() -> void { - { std::lock_guard lock(mutex); } - std::unique_lock lock(sync_mutex, std::try_to_lock); - if (lock.owns_lock()) ++count; - std::this_thread::sleep_for(300ms); - }; - auto task2 = [&mutex, &sync_mutex, &count]() -> void { - { std::lock_guard lock(mutex); } - std::unique_lock lock(sync_mutex, std::try_to_lock); - if (lock.owns_lock()) ++count; - std::this_thread::sleep_for(300ms); - }; - auto task3 = [&mutex, &sync_mutex, &count]() -> void { - { std::lock_guard lock(mutex); } - 
std::unique_lock lock(sync_mutex, std::try_to_lock); - if (lock.owns_lock()) ++count; - std::this_thread::sleep_for(300ms); - }; - std::unique_lock lock(mutex); - - pool.run(std::move(task1)); - pool.run(std::move(task2)); - pool.run(std::move(task3)); - ASSERT_EQ(std::cv_status::no_timeout, - cond.wait_for(lock, 1000ms)); // wait for all 3 tasks - pool.stop(); - } - - // test schedule 3 task parallel - { - irs::async_utils::thread_pool pool(3, 0); - std::condition_variable cond; - notifying_counter count(cond, 3); - std::mutex mutex; - auto task1 = [&mutex, &count]() -> void { - ++count; - std::lock_guard lock(mutex); - }; - auto task2 = [&mutex, &count]() -> void { - ++count; - std::lock_guard lock(mutex); - }; - auto task3 = [&mutex, &count]() -> void { - ++count; - std::lock_guard lock(mutex); - }; - std::unique_lock lock(mutex); - - ASSERT_TRUE(pool.run(std::move(task1))); - ASSERT_TRUE(pool.run(std::move(task2))); - ASSERT_TRUE(pool.run(std::move(task3))); - ASSERT_TRUE(count || - std::cv_status::no_timeout == cond.wait_for(lock, 1000ms) || - count); // wait for all 3 tasks - lock.unlock(); - pool.stop(); - } - - // test schedule 1 task exception + 1 task - { - irs::async_utils::thread_pool pool(1, 0, IR_NATIVE_STRING("foo")); - std::condition_variable cond; - notifying_counter count(cond, 2); - std::mutex mutex; - auto task1 = [&count]() -> void { - ++count; - throw "error"; - }; - auto task2 = [&mutex, &count]() -> void { - ++count; - std::lock_guard lock(mutex); - }; - std::unique_lock lock(mutex); - std::mutex dummy_mutex; - std::unique_lock dummy_lock(dummy_mutex); - - pool.run(std::move(task1)); - pool.run(std::move(task2)); - ASSERT_TRUE(count || - std::cv_status::no_timeout == - cond.wait_for(dummy_lock, 10000ms) || - count); // wait for all 2 tasks (exception trace is slow on - // MSVC and even slower on *NIX with gdb) - ASSERT_EQ(1, pool.threads()); - lock.unlock(); - pool.stop(true); - } -} - -TEST_F(async_utils_tests, test_thread_pool_bound_mt) { 
+template +void run_thread_pool_bound_mt() { // test max threads { - irs::async_utils::thread_pool pool(0, 0); + irs::async_utils::thread_pool pool(0, 0); std::atomic count(0); std::mutex mutex; auto task1 = [&mutex, &count]() -> void { @@ -260,7 +111,7 @@ TEST_F(async_utils_tests, test_thread_pool_bound_mt) { // test max threads delta grow { - irs::async_utils::thread_pool pool(0, 0); + irs::async_utils::thread_pool pool(0, 0); std::atomic count(0); std::mutex mutex; auto task = [&mutex, &count]() -> void { @@ -291,7 +142,7 @@ TEST_F(async_utils_tests, test_thread_pool_bound_mt) { // test max threads delta { - irs::async_utils::thread_pool pool(1, 10); + irs::async_utils::thread_pool pool(1, 10); ASSERT_EQ(1, pool.max_threads()); pool.max_threads_delta(1); @@ -308,7 +159,7 @@ TEST_F(async_utils_tests, test_thread_pool_bound_mt) { // test max idle { - irs::async_utils::thread_pool pool(0, 0); + irs::async_utils::thread_pool pool(0, 0); std::atomic count(0); std::mutex mutex1; std::mutex mutex2; @@ -369,7 +220,7 @@ TEST_F(async_utils_tests, test_thread_pool_bound_mt) { // test max idle delta { - irs::async_utils::thread_pool pool(10, 1); + irs::async_utils::thread_pool pool(10, 1); ASSERT_EQ(1, pool.max_idle()); pool.max_idle_delta(1); @@ -385,6 +236,167 @@ TEST_F(async_utils_tests, test_thread_pool_bound_mt) { } } +template +void run_test_thread_pool_run_mt() { + // test schedule 1 task + { + irs::async_utils::thread_pool pool(1, 0); + std::condition_variable cond; + std::mutex mutex; + std::unique_lock lock(mutex); + auto task = [&mutex, &cond]() -> void { + std::lock_guard lock(mutex); + cond.notify_all(); + }; + + pool.run(std::move(task)); + ASSERT_EQ(std::cv_status::no_timeout, cond.wait_for(lock, 1000ms)); + } + + // test schedule 3 task sequential + { + irs::async_utils::thread_pool pool(1, 1); + std::condition_variable cond; + notifying_counter count(cond, 3); + std::mutex mutex; + std::mutex sync_mutex; + auto task1 = [&mutex, &sync_mutex, &count]() -> 
void { + { std::lock_guard lock(mutex); } + std::unique_lock lock(sync_mutex, std::try_to_lock); + if (lock.owns_lock()) ++count; + std::this_thread::sleep_for(300ms); + }; + auto task2 = [&mutex, &sync_mutex, &count]() -> void { + { std::lock_guard lock(mutex); } + std::unique_lock lock(sync_mutex, std::try_to_lock); + if (lock.owns_lock()) ++count; + std::this_thread::sleep_for(300ms); + }; + auto task3 = [&mutex, &sync_mutex, &count]() -> void { + { std::lock_guard lock(mutex); } + std::unique_lock lock(sync_mutex, std::try_to_lock); + if (lock.owns_lock()) ++count; + std::this_thread::sleep_for(300ms); + }; + std::unique_lock lock(mutex); + + pool.run(std::move(task1)); + pool.run(std::move(task2)); + pool.run(std::move(task3)); + ASSERT_EQ(std::cv_status::no_timeout, + cond.wait_for(lock, 1000ms)); // wait for all 3 tasks + pool.stop(); + } + + // test schedule 3 task parallel + { + irs::async_utils::thread_pool pool(3, 0); + std::condition_variable cond; + notifying_counter count(cond, 3); + std::mutex mutex; + auto task1 = [&mutex, &count]() -> void { + ++count; + std::lock_guard lock(mutex); + }; + auto task2 = [&mutex, &count]() -> void { + ++count; + std::lock_guard lock(mutex); + }; + auto task3 = [&mutex, &count]() -> void { + ++count; + std::lock_guard lock(mutex); + }; + std::unique_lock lock(mutex); + + ASSERT_TRUE(pool.run(std::move(task1))); + ASSERT_TRUE(pool.run(std::move(task2))); + ASSERT_TRUE(pool.run(std::move(task3))); + ASSERT_TRUE(count || + std::cv_status::no_timeout == cond.wait_for(lock, 1000ms) || + count); // wait for all 3 tasks + lock.unlock(); + pool.stop(); + } + + // test schedule 1 task exception + 1 task + { + irs::async_utils::thread_pool pool(1, 0, + IR_NATIVE_STRING("foo")); + std::condition_variable cond; + notifying_counter count(cond, 2); + std::mutex mutex; + auto task1 = [&count]() -> void { + ++count; + throw "error"; + }; + auto task2 = [&mutex, &count]() -> void { + ++count; + std::lock_guard lock(mutex); + }; + 
std::unique_lock lock(mutex); + std::mutex dummy_mutex; + std::unique_lock dummy_lock(dummy_mutex); + + pool.run(std::move(task1)); + pool.run(std::move(task2)); + ASSERT_TRUE(count || + std::cv_status::no_timeout == + cond.wait_for(dummy_lock, 10000ms) || + count); // wait for all 2 tasks (exception trace is slow on + // MSVC and even slower on *NIX with gdb) + ASSERT_EQ(1, pool.threads()); + lock.unlock(); + pool.stop(true); + } +} + +} // namespace tests + +using namespace tests; + +TEST_F(async_utils_tests, test_busywait_mutex_mt) { + typedef irs::async_utils::busywait_mutex mutex_t; + { + mutex_t mutex; + std::lock_guard lock(mutex); + std::thread thread([&mutex]() -> void { ASSERT_FALSE(mutex.try_lock()); }); + thread.join(); + } + + { + mutex_t mutex; + std::lock_guard lock(mutex); + ASSERT_FALSE(mutex.try_lock()); + } + + { + std::condition_variable cond; + std::mutex ctrl_mtx; + std::unique_lock lock(ctrl_mtx); + mutex_t mutex; + std::thread thread([&cond, &ctrl_mtx, &mutex]() -> void { + std::unique_lock lock(ctrl_mtx); + mutex.lock(); + cond.notify_all(); + cond.wait_for(lock, 1000ms); + mutex.unlock(); + }); + + ASSERT_EQ(std::cv_status::no_timeout, cond.wait_for(lock, 1000ms)); + lock.unlock(); + cond.notify_all(); + thread.join(); + } +} + +TEST_F(async_utils_tests, test_thread_pool_run_mt) { + run_test_thread_pool_run_mt(); +} + +TEST_F(async_utils_tests, test_thread_pool_bound_mt) { + run_thread_pool_bound_mt(); +} + TEST_F(async_utils_tests, test_thread_pool_stop_delay_mt) { // test stop run pending { @@ -665,6 +677,53 @@ TEST_F(async_utils_tests, test_thread_pool_stop_mt) { } } +TEST_F(async_utils_tests, test_queue_thread_pool_run_mt) { + run_test_thread_pool_run_mt(); +} + +TEST_F(async_utils_tests, test_queue_thread_pool_bound_mt) { + run_thread_pool_bound_mt(); +} + +TEST_F(async_utils_tests, test_queue_thread_pool_delay_mt) { + { + uint64_t counter{0}; + uint64_t counter_start{0}; + irs::async_utils::thread_pool pool(1, 1); + 
std::condition_variable cond; + std::mutex mutex; + std::unique_lock lock(mutex); + auto task = [&]() -> void { + std::lock_guard lock(mutex); + ++counter; + ++counter_start; + if (counter_start == 2) { + cond.notify_all(); + } + }; + auto task2 = [&]() -> void { + std::lock_guard lock(mutex); + if (counter > 0) { + --counter; + } else { + ++counter; + } + ++counter_start; + if (counter_start == 2) { + cond.notify_all(); + } + }; + + ASSERT_EQ(0, pool.threads()); + // delay is ignored for non-priority queue + // tasks are executed as is + pool.run(std::move(task), 10000s); + pool.run(std::move(task2), 1s); + ASSERT_EQ(std::cv_status::no_timeout, cond.wait_for(lock, 100s)); + ASSERT_EQ(0, counter); + } +} + TEST(thread_utils_test, get_set_name) { const thread_name_t expected_name = IR_NATIVE_STRING("foo"); #if (defined(__linux__) || defined(__APPLE__) || \