Skip to content
This repository has been archived by the owner on May 3, 2024. It is now read-only.

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
MBkkt committed Oct 30, 2023
1 parent d930382 commit f7ffab2
Showing 1 changed file with 161 additions and 144 deletions.
305 changes: 161 additions & 144 deletions tests/utils/async_utils_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,43 +69,139 @@ class notifying_counter {
} // namespace tests

using namespace tests;

TEST_F(async_utils_tests, test_busywait_mutex_mt) {
typedef irs::async_utils::busywait_mutex mutex_t;
template<bool UsePriority>
void run_thread_pool_bound_mt() {
// test max threads
{
mutex_t mutex;
std::lock_guard<mutex_t> lock(mutex);
std::thread thread([&mutex]() -> void { ASSERT_FALSE(mutex.try_lock()); });
thread.join();
irs::async_utils::ThreadPool<> pool(2);
std::atomic<size_t> count(0);
std::mutex mutex;
auto task1 = [&mutex, &count]() -> void {
++count;
std::lock_guard<std::mutex> lock(mutex);
};
auto task2 = [&mutex, &count]() -> void {
++count;
std::lock_guard<std::mutex> lock(mutex);
};
auto task3 = [&mutex, &count]() -> void {
++count;
std::lock_guard<std::mutex> lock(mutex);
};
std::unique_lock<std::mutex> lock(mutex);

ASSERT_EQ(2, pool.threads());
pool.run(std::move(task1));
pool.run(std::move(task2));
pool.run(std::move(task3));
{
const auto end = std::chrono::steady_clock::now() +
10s; // assume 10s is more than enough
while (1 != pool.tasks_pending() || 2 != pool.tasks_active() ||
count != 2) {
std::this_thread::sleep_for(10ms);
ASSERT_LE(std::chrono::steady_clock::now(), end);
}
}
ASSERT_EQ(2, count); // 2 tasks started
ASSERT_EQ(2, pool.threads());
ASSERT_EQ(2, pool.tasks_active());
ASSERT_EQ(1, pool.tasks_pending());
lock.unlock();
pool.stop(true);
}

// test max threads delta grow
{
mutex_t mutex;
std::lock_guard<mutex_t> lock(mutex);
ASSERT_FALSE(mutex.try_lock());
irs::async_utils::ThreadPool<> pool(1);
std::atomic<size_t> count(0);
std::mutex mutex;
auto task = [&mutex, &count]() -> void {
++count;
std::lock_guard<std::mutex> lock(mutex);
};
std::unique_lock<std::mutex> lock(mutex);

ASSERT_EQ(1, pool.threads());
pool.run(std::move(task));
{
const auto end = std::chrono::steady_clock::now() +
10s; // assume 10s is more than enough
while (0 != pool.tasks_pending() || 1 != pool.tasks_active() ||
count != 1) {
std::this_thread::sleep_for(10ms);
ASSERT_LE(std::chrono::steady_clock::now(), end);
}
}
ASSERT_EQ(1, count); // 1 task started
ASSERT_EQ(1, pool.threads());
ASSERT_EQ(1, pool.tasks_active());
ASSERT_EQ(0, pool.tasks_pending());
lock.unlock();
pool.stop(true);
}

// test max idle
{
std::condition_variable cond;
std::mutex ctrl_mtx;
std::unique_lock<std::mutex> lock(ctrl_mtx);
mutex_t mutex;
std::thread thread([&cond, &ctrl_mtx, &mutex]() -> void {
std::unique_lock<std::mutex> lock(ctrl_mtx);
mutex.lock();
cond.notify_all();
cond.wait_for(lock, 1000ms);
mutex.unlock();
});
irs::async_utils::ThreadPool<> pool(3);
std::atomic<size_t> count(0);
std::mutex mutex1;
std::mutex mutex2;
std::condition_variable start_cond;
notifying_counter start_count(start_cond, 3);
std::mutex start_mutex;
auto task1 = [&start_mutex, &start_count, &mutex1, &count]() -> void {
{ std::lock_guard<std::mutex> lock(start_mutex); }
++start_count;
std::lock_guard<std::mutex> lock(mutex1);
++count;
};
auto task2 = [&start_mutex, &start_count, &mutex1, &count]() -> void {
{ std::lock_guard<std::mutex> lock(start_mutex); }
++start_count;
std::lock_guard<std::mutex> lock(mutex1);
++count;
};
auto task3 = [&start_mutex, &start_count, &mutex2, &count]() -> void {
{ std::lock_guard<std::mutex> lock(start_mutex); }
++start_count;
std::lock_guard<std::mutex> lock(mutex2);
++count;
};
std::unique_lock<std::mutex> lock1(mutex1);
std::unique_lock<std::mutex> lock2(mutex2);
std::unique_lock<std::mutex> start_lock(start_mutex);

ASSERT_EQ(std::cv_status::no_timeout, cond.wait_for(lock, 1000ms));
lock.unlock();
cond.notify_all();
thread.join();
ASSERT_EQ(3, pool.threads());
pool.run(std::move(task1));
pool.run(std::move(task2));
pool.run(std::move(task3));
ASSERT_TRUE(start_count ||
std::cv_status::no_timeout ==
start_cond.wait_for(start_lock, 10000ms) ||
start_count); // wait for all 3 tasks to start
ASSERT_EQ(0, count); // 0 tasks complete
ASSERT_EQ(3, pool.threads());
ASSERT_EQ(3, pool.tasks_active());
ASSERT_EQ(0, pool.tasks_pending());
ASSERT_EQ(std::make_tuple(size_t(3), size_t(0), size_t(3)), pool.stats());
lock1.unlock();
{
const auto end = std::chrono::steady_clock::now() +
10s; // assume 10s is more than enough
while (count != 2) {
std::this_thread::sleep_for(10ms);
ASSERT_LE(std::chrono::steady_clock::now(), end);
}
}
ASSERT_EQ(2, count); // 2 tasks complete
lock2.unlock();
pool.stop(true);
}
}

TEST_F(async_utils_tests, test_thread_pool_run_mt) {
template<bool UsePriority>
void run_test_thread_pool_run_mt() {
// test schedule 1 task
{
irs::async_utils::ThreadPool<> pool(1);
Expand Down Expand Up @@ -217,134 +313,47 @@ TEST_F(async_utils_tests, test_thread_pool_run_mt) {
}
}

TEST_F(async_utils_tests, test_thread_pool_bound_mt) {
// test max threads
TEST_F(async_utils_tests, test_busywait_mutex_mt) {
typedef irs::async_utils::busywait_mutex mutex_t;
{
irs::async_utils::ThreadPool<> pool(2);
std::atomic<size_t> count(0);
std::mutex mutex;
auto task1 = [&mutex, &count]() -> void {
++count;
std::lock_guard<std::mutex> lock(mutex);
};
auto task2 = [&mutex, &count]() -> void {
++count;
std::lock_guard<std::mutex> lock(mutex);
};
auto task3 = [&mutex, &count]() -> void {
++count;
std::lock_guard<std::mutex> lock(mutex);
};
std::unique_lock<std::mutex> lock(mutex);
mutex_t mutex;
std::lock_guard<mutex_t> lock(mutex);
std::thread thread([&mutex]() -> void { ASSERT_FALSE(mutex.try_lock()); });
thread.join();
}

ASSERT_EQ(2, pool.threads());
pool.run(std::move(task1));
pool.run(std::move(task2));
pool.run(std::move(task3));
{
const auto end = std::chrono::steady_clock::now() +
10s; // assume 10s is more than enough
while (1 != pool.tasks_pending() || 2 != pool.tasks_active() ||
count != 2) {
std::this_thread::sleep_for(10ms);
ASSERT_LE(std::chrono::steady_clock::now(), end);
}
}
ASSERT_EQ(2, count); // 2 tasks started
ASSERT_EQ(2, pool.threads());
ASSERT_EQ(2, pool.tasks_active());
ASSERT_EQ(1, pool.tasks_pending());
lock.unlock();
pool.stop(true);
{
mutex_t mutex;
std::lock_guard<mutex_t> lock(mutex);
ASSERT_FALSE(mutex.try_lock());
}

// test max threads delta grow
{
irs::async_utils::ThreadPool<> pool(1);
std::atomic<size_t> count(0);
std::mutex mutex;
auto task = [&mutex, &count]() -> void {
++count;
std::lock_guard<std::mutex> lock(mutex);
};
std::unique_lock<std::mutex> lock(mutex);
std::condition_variable cond;
std::mutex ctrl_mtx;
std::unique_lock<std::mutex> lock(ctrl_mtx);
mutex_t mutex;
std::thread thread([&cond, &ctrl_mtx, &mutex]() -> void {
std::unique_lock<std::mutex> lock(ctrl_mtx);
mutex.lock();
cond.notify_all();
cond.wait_for(lock, 1000ms);
mutex.unlock();
});

ASSERT_EQ(1, pool.threads());
pool.run(std::move(task));
{
const auto end = std::chrono::steady_clock::now() +
10s; // assume 10s is more than enough
while (0 != pool.tasks_pending() || 1 != pool.tasks_active() ||
count != 1) {
std::this_thread::sleep_for(10ms);
ASSERT_LE(std::chrono::steady_clock::now(), end);
}
}
ASSERT_EQ(1, count); // 1 task started
ASSERT_EQ(1, pool.threads());
ASSERT_EQ(1, pool.tasks_active());
ASSERT_EQ(0, pool.tasks_pending());
ASSERT_EQ(std::cv_status::no_timeout, cond.wait_for(lock, 1000ms));
lock.unlock();
pool.stop(true);
cond.notify_all();
thread.join();
}
}

// test max idle
{
irs::async_utils::ThreadPool<> pool(3);
std::atomic<size_t> count(0);
std::mutex mutex1;
std::mutex mutex2;
std::condition_variable start_cond;
notifying_counter start_count(start_cond, 3);
std::mutex start_mutex;
auto task1 = [&start_mutex, &start_count, &mutex1, &count]() -> void {
{ std::lock_guard<std::mutex> lock(start_mutex); }
++start_count;
std::lock_guard<std::mutex> lock(mutex1);
++count;
};
auto task2 = [&start_mutex, &start_count, &mutex1, &count]() -> void {
{ std::lock_guard<std::mutex> lock(start_mutex); }
++start_count;
std::lock_guard<std::mutex> lock(mutex1);
++count;
};
auto task3 = [&start_mutex, &start_count, &mutex2, &count]() -> void {
{ std::lock_guard<std::mutex> lock(start_mutex); }
++start_count;
std::lock_guard<std::mutex> lock(mutex2);
++count;
};
std::unique_lock<std::mutex> lock1(mutex1);
std::unique_lock<std::mutex> lock2(mutex2);
std::unique_lock<std::mutex> start_lock(start_mutex);
// Runs the shared run_mt scenarios against ThreadPool instantiated with
// UsePriority == true (presumably the priority-aware pool variant — confirm
// against the irs::async_utils::ThreadPool template definition).
TEST_F(async_utils_tests, test_thread_pool_run_mt) {
run_test_thread_pool_run_mt<true>();
}

ASSERT_EQ(3, pool.threads());
pool.run(std::move(task1));
pool.run(std::move(task2));
pool.run(std::move(task3));
ASSERT_TRUE(start_count ||
std::cv_status::no_timeout ==
start_cond.wait_for(start_lock, 10000ms) ||
start_count); // wait for all 3 tasks to start
ASSERT_EQ(0, count); // 0 tasks complete
ASSERT_EQ(3, pool.threads());
ASSERT_EQ(3, pool.tasks_active());
ASSERT_EQ(0, pool.tasks_pending());
ASSERT_EQ(std::make_tuple(size_t(3), size_t(0), size_t(3)), pool.stats());
lock1.unlock();
{
const auto end = std::chrono::steady_clock::now() +
10s; // assume 10s is more than enough
while (count != 2) {
std::this_thread::sleep_for(10ms);
ASSERT_LE(std::chrono::steady_clock::now(), end);
}
}
ASSERT_EQ(2, count); // 2 tasks complete
lock2.unlock();
pool.stop(true);
}
// Runs the shared bound_mt scenarios (max-threads, delta-grow, max-idle)
// against ThreadPool instantiated with UsePriority == true (presumably the
// priority-aware pool variant — confirm against the ThreadPool definition).
TEST_F(async_utils_tests, test_thread_pool_bound_mt) {
run_thread_pool_bound_mt<true>();
}

TEST_F(async_utils_tests, test_thread_pool_stop_delay_mt) {
Expand Down Expand Up @@ -626,6 +635,14 @@ TEST_F(async_utils_tests, test_thread_pool_stop_mt) {
}
}

// Same run_mt scenarios as test_thread_pool_run_mt, but with
// UsePriority == false (presumably a plain FIFO-queue pool variant, per the
// "queue" naming — confirm against the ThreadPool definition).
TEST_F(async_utils_tests, test_queue_thread_pool_run_mt) {
run_test_thread_pool_run_mt<false>();
}

// Same bound_mt scenarios as test_thread_pool_bound_mt, but with
// UsePriority == false (presumably a plain FIFO-queue pool variant, per the
// "queue" naming — confirm against the ThreadPool definition).
TEST_F(async_utils_tests, test_queue_thread_pool_bound_mt) {
run_thread_pool_bound_mt<false>();
}

TEST_F(async_utils_tests, test_queue_thread_pool_delay_mt) {
{
uint64_t counter{0};
Expand Down

0 comments on commit f7ffab2

Please sign in to comment.