diff --git a/tests/utils/async_utils_tests.cpp b/tests/utils/async_utils_tests.cpp index 5c4c56fda..150263b7e 100644 --- a/tests/utils/async_utils_tests.cpp +++ b/tests/utils/async_utils_tests.cpp @@ -69,43 +69,139 @@ class notifying_counter { } // namespace tests using namespace tests; - -TEST_F(async_utils_tests, test_busywait_mutex_mt) { - typedef irs::async_utils::busywait_mutex mutex_t; +template +void run_thread_pool_bound_mt() { + // test max threads { - mutex_t mutex; - std::lock_guard lock(mutex); - std::thread thread([&mutex]() -> void { ASSERT_FALSE(mutex.try_lock()); }); - thread.join(); + irs::async_utils::ThreadPool<> pool(2); + std::atomic count(0); + std::mutex mutex; + auto task1 = [&mutex, &count]() -> void { + ++count; + std::lock_guard lock(mutex); + }; + auto task2 = [&mutex, &count]() -> void { + ++count; + std::lock_guard lock(mutex); + }; + auto task3 = [&mutex, &count]() -> void { + ++count; + std::lock_guard lock(mutex); + }; + std::unique_lock lock(mutex); + + ASSERT_EQ(2, pool.threads()); + pool.run(std::move(task1)); + pool.run(std::move(task2)); + pool.run(std::move(task3)); + { + const auto end = std::chrono::steady_clock::now() + + 10s; // assume 10s is more than enough + while (1 != pool.tasks_pending() || 2 != pool.tasks_active() || + count != 2) { + std::this_thread::sleep_for(10ms); + ASSERT_LE(std::chrono::steady_clock::now(), end); + } + } + ASSERT_EQ(2, count); // 2 tasks started + ASSERT_EQ(2, pool.threads()); + ASSERT_EQ(2, pool.tasks_active()); + ASSERT_EQ(1, pool.tasks_pending()); + lock.unlock(); + pool.stop(true); } + // test max threads delta grow { - mutex_t mutex; - std::lock_guard lock(mutex); - ASSERT_FALSE(mutex.try_lock()); + irs::async_utils::ThreadPool<> pool(1); + std::atomic count(0); + std::mutex mutex; + auto task = [&mutex, &count]() -> void { + ++count; + std::lock_guard lock(mutex); + }; + std::unique_lock lock(mutex); + + ASSERT_EQ(1, pool.threads()); + pool.run(std::move(task)); + { + const auto end = std::chrono::steady_clock::now() + + 10s; // assume 10s is more than enough + while (0 != pool.tasks_pending() || 1 != pool.tasks_active() || + count != 1) { + std::this_thread::sleep_for(10ms); + ASSERT_LE(std::chrono::steady_clock::now(), end); + } + } + ASSERT_EQ(1, count); // 1 task started + ASSERT_EQ(1, pool.threads()); + ASSERT_EQ(1, pool.tasks_active()); + ASSERT_EQ(0, pool.tasks_pending()); + lock.unlock(); + pool.stop(true); } + // test max idle { - std::condition_variable cond; - std::mutex ctrl_mtx; - std::unique_lock lock(ctrl_mtx); - mutex_t mutex; - std::thread thread([&cond, &ctrl_mtx, &mutex]() -> void { - std::unique_lock lock(ctrl_mtx); - mutex.lock(); - cond.notify_all(); - cond.wait_for(lock, 1000ms); - mutex.unlock(); - }); + irs::async_utils::ThreadPool<> pool(3); + std::atomic count(0); + std::mutex mutex1; + std::mutex mutex2; + std::condition_variable start_cond; + notifying_counter start_count(start_cond, 3); + std::mutex start_mutex; + auto task1 = [&start_mutex, &start_count, &mutex1, &count]() -> void { + { std::lock_guard lock(start_mutex); } + ++start_count; + std::lock_guard lock(mutex1); + ++count; + }; + auto task2 = [&start_mutex, &start_count, &mutex1, &count]() -> void { + { std::lock_guard lock(start_mutex); } + ++start_count; + std::lock_guard lock(mutex1); + ++count; + }; + auto task3 = [&start_mutex, &start_count, &mutex2, &count]() -> void { + { std::lock_guard lock(start_mutex); } + ++start_count; + std::lock_guard lock(mutex2); + ++count; + }; + std::unique_lock lock1(mutex1); + std::unique_lock lock2(mutex2); + std::unique_lock start_lock(start_mutex); - ASSERT_EQ(std::cv_status::no_timeout, cond.wait_for(lock, 1000ms)); - lock.unlock(); - cond.notify_all(); - thread.join(); + ASSERT_EQ(3, pool.threads()); + pool.run(std::move(task1)); + pool.run(std::move(task2)); + pool.run(std::move(task3)); + ASSERT_TRUE(start_count || + std::cv_status::no_timeout == + start_cond.wait_for(start_lock, 10000ms) || + start_count); // wait for all 3 tasks to start + ASSERT_EQ(0, count); // 0 tasks complete + ASSERT_EQ(3, pool.threads()); + ASSERT_EQ(3, pool.tasks_active()); + ASSERT_EQ(0, pool.tasks_pending()); + ASSERT_EQ(std::make_tuple(size_t(3), size_t(0), size_t(3)), pool.stats()); + lock1.unlock(); + { + const auto end = std::chrono::steady_clock::now() + + 10s; // assume 10s is more than enough + while (count != 2) { + std::this_thread::sleep_for(10ms); + ASSERT_LE(std::chrono::steady_clock::now(), end); + } + } + ASSERT_EQ(2, count); // 2 tasks complete + lock2.unlock(); + pool.stop(true); } } -TEST_F(async_utils_tests, test_thread_pool_run_mt) { +template +void run_test_thread_pool_run_mt() { // test schedule 1 task { irs::async_utils::ThreadPool<> pool(1); @@ -217,134 +313,47 @@ TEST_F(async_utils_tests, test_thread_pool_run_mt) { } } -TEST_F(async_utils_tests, test_thread_pool_bound_mt) { - // test max threads +TEST_F(async_utils_tests, test_busywait_mutex_mt) { + typedef irs::async_utils::busywait_mutex mutex_t; { - irs::async_utils::ThreadPool<> pool(2); - std::atomic count(0); - std::mutex mutex; - auto task1 = [&mutex, &count]() -> void { - ++count; - std::lock_guard lock(mutex); - }; - auto task2 = [&mutex, &count]() -> void { - ++count; - std::lock_guard lock(mutex); - }; - auto task3 = [&mutex, &count]() -> void { - ++count; - std::lock_guard lock(mutex); - }; - std::unique_lock lock(mutex); + mutex_t mutex; + std::lock_guard lock(mutex); + std::thread thread([&mutex]() -> void { ASSERT_FALSE(mutex.try_lock()); }); + thread.join(); + } - ASSERT_EQ(2, pool.threads()); - pool.run(std::move(task1)); - pool.run(std::move(task2)); - pool.run(std::move(task3)); - { - const auto end = std::chrono::steady_clock::now() + - 10s; // assume 10s is more than enough - while (1 != pool.tasks_pending() || 2 != pool.tasks_active() || - count != 2) { - std::this_thread::sleep_for(10ms); - ASSERT_LE(std::chrono::steady_clock::now(), end); - } - } - ASSERT_EQ(2, count); // 2 tasks started - ASSERT_EQ(2, pool.threads()); - ASSERT_EQ(2, pool.tasks_active()); - ASSERT_EQ(1, pool.tasks_pending()); - lock.unlock(); - pool.stop(true); + { + mutex_t mutex; + std::lock_guard lock(mutex); + ASSERT_FALSE(mutex.try_lock()); } - // test max threads delta grow { - irs::async_utils::ThreadPool<> pool(1); - std::atomic count(0); - std::mutex mutex; - auto task = [&mutex, &count]() -> void { - ++count; - std::lock_guard lock(mutex); - }; - std::unique_lock lock(mutex); + std::condition_variable cond; + std::mutex ctrl_mtx; + std::unique_lock lock(ctrl_mtx); + mutex_t mutex; + std::thread thread([&cond, &ctrl_mtx, &mutex]() -> void { + std::unique_lock lock(ctrl_mtx); + mutex.lock(); + cond.notify_all(); + cond.wait_for(lock, 1000ms); + mutex.unlock(); + }); - ASSERT_EQ(1, pool.threads()); - pool.run(std::move(task)); - { - const auto end = std::chrono::steady_clock::now() + - 10s; // assume 10s is more than enough - while (0 != pool.tasks_pending() || 1 != pool.tasks_active() || - count != 1) { - std::this_thread::sleep_for(10ms); - ASSERT_LE(std::chrono::steady_clock::now(), end); - } - } - ASSERT_EQ(1, count); // 1 task started - ASSERT_EQ(1, pool.threads()); - ASSERT_EQ(1, pool.tasks_active()); - ASSERT_EQ(0, pool.tasks_pending()); + ASSERT_EQ(std::cv_status::no_timeout, cond.wait_for(lock, 1000ms)); lock.unlock(); - pool.stop(true); + cond.notify_all(); + thread.join(); } +} - // test max idle - { - irs::async_utils::ThreadPool<> pool(3); - std::atomic count(0); - std::mutex mutex1; - std::mutex mutex2; - std::condition_variable start_cond; - notifying_counter start_count(start_cond, 3); - std::mutex start_mutex; - auto task1 = [&start_mutex, &start_count, &mutex1, &count]() -> void { - { std::lock_guard lock(start_mutex); } - ++start_count; - std::lock_guard lock(mutex1); - ++count; - }; - auto task2 = [&start_mutex, &start_count, &mutex1, &count]() -> void { - { std::lock_guard lock(start_mutex); } - ++start_count; - std::lock_guard lock(mutex1); - ++count; - }; - auto task3 = [&start_mutex, &start_count, &mutex2, &count]() -> void { - { std::lock_guard lock(start_mutex); } - ++start_count; - std::lock_guard lock(mutex2); - ++count; - }; - std::unique_lock lock1(mutex1); - std::unique_lock lock2(mutex2); - std::unique_lock start_lock(start_mutex); +TEST_F(async_utils_tests, test_thread_pool_run_mt) { + run_test_thread_pool_run_mt(); +} - ASSERT_EQ(3, pool.threads()); - pool.run(std::move(task1)); - pool.run(std::move(task2)); - pool.run(std::move(task3)); - ASSERT_TRUE(start_count || - std::cv_status::no_timeout == - start_cond.wait_for(start_lock, 10000ms) || - start_count); // wait for all 3 tasks to start - ASSERT_EQ(0, count); // 0 tasks complete - ASSERT_EQ(3, pool.threads()); - ASSERT_EQ(3, pool.tasks_active()); - ASSERT_EQ(0, pool.tasks_pending()); - ASSERT_EQ(std::make_tuple(size_t(3), size_t(0), size_t(3)), pool.stats()); - lock1.unlock(); - { - const auto end = std::chrono::steady_clock::now() + - 10s; // assume 10s is more than enough - while (count != 2) { - std::this_thread::sleep_for(10ms); - ASSERT_LE(std::chrono::steady_clock::now(), end); - } - } - ASSERT_EQ(2, count); // 2 tasks complete - lock2.unlock(); - pool.stop(true); - } +TEST_F(async_utils_tests, test_thread_pool_bound_mt) { + run_thread_pool_bound_mt(); } TEST_F(async_utils_tests, test_thread_pool_stop_delay_mt) { @@ -626,6 +635,14 @@ TEST_F(async_utils_tests, test_thread_pool_stop_mt) { } } +TEST_F(async_utils_tests, test_queue_thread_pool_run_mt) { + run_test_thread_pool_run_mt(); +} + +TEST_F(async_utils_tests, test_queue_thread_pool_bound_mt) { + run_thread_pool_bound_mt(); +} + TEST_F(async_utils_tests, test_queue_thread_pool_delay_mt) { { uint64_t counter{0};