diff --git a/CMakeLists.txt b/CMakeLists.txt index c25f389ef..7e7e4d7c4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -136,6 +136,9 @@ if (MSVC) set(CMAKE_C_FLAGS "/Zc:static_assert- ${CMAKE_C_FLAGS}") set(CMAKE_CXX_FLAGS "/Zc:static_assert- ${CMAKE_CXX_FLAGS}") endif () +else () + # We want to force clang/gcc to use it, but only for C++ files + add_compile_options($<$:-fsized-deallocation>) endif () # fix strange issue with utf_8_to_32_iterator failing diff --git a/core/index/index_writer.cpp b/core/index/index_writer.cpp index d139bed6e..a661d988b 100644 --- a/core/index/index_writer.cpp +++ b/core/index/index_writer.cpp @@ -1197,9 +1197,7 @@ void IndexWriter::Clear(uint64_t tick) { to_commit.ctx = SwitchFlushContext(); // Ensure there are no active struct update operations - std::unique_lock ctx_lock{to_commit.ctx->pending_.Mutex()}; - to_commit.ctx->pending_.Wait(ctx_lock); - ctx_lock.unlock(); + to_commit.ctx->pending_.Wait(); Abort(); // Abort any already opened transaction ApplyFlush(std::move(to_commit)); @@ -1805,11 +1803,8 @@ IndexWriter::PendingContext IndexWriter::PrepareFlush(const CommitInfo& info) { // noexcept block: I'm not sure is it really necessary or not auto ctx = SwitchFlushContext(); - // TODO(MBkkt) It looks like lock mutex_ completely unnecessary // ensure there are no active struct update operations - std::unique_lock lock{ctx->pending_.Mutex()}; - ctx->pending_.Wait(lock); - lock.unlock(); + ctx->pending_.Wait(); // Stage 0 // wait for any outstanding segments to settle to ensure that any rollbacks // are properly tracked in 'modification_queries_' diff --git a/core/index/index_writer.hpp b/core/index/index_writer.hpp index c3bcb251a..9645d8db5 100644 --- a/core/index/index_writer.hpp +++ b/core/index/index_writer.hpp @@ -45,6 +45,7 @@ #include "utils/object_pool.hpp" #include "utils/string.hpp" #include "utils/thread_utils.hpp" +#include "utils/wait_group.hpp" #include @@ -835,36 +836,7 @@ class IndexWriter : private util::noncopyable { std::deque pending_segments_; // entries from 'pending_segments_' that are available for reuse Freelist pending_freelist_; - // TODO(MBkkt) Considered to replace with YACLib in ArangoDB 3.11+ or ... - struct WaitGroup { - std::mutex& Mutex() noexcept { return m_; } - - void Add() noexcept { counter_.fetch_add(1, std::memory_order_relaxed); } - - void Done() noexcept { - if (counter_.fetch_sub(1, std::memory_order_acq_rel) == 1) { - std::lock_guard lock{m_}; - cv_.notify_one(); - } - } - - void Wait(std::unique_lock& lock) noexcept { - IRS_ASSERT(lock.mutex() == &m_); - IRS_ASSERT(lock.owns_lock()); - if (counter_.fetch_sub(1, std::memory_order_acq_rel) != 1) { - do { - cv_.wait(lock); - // relaxed probably enough - } while (counter_.load(std::memory_order_acquire) != 0); - } - counter_.store(1, std::memory_order_relaxed); - } - - private: - std::atomic_size_t counter_{1}; - std::mutex m_; - std::condition_variable cv_; - } pending_; + WaitGroup pending_; // set of segments to be removed from the index upon commit ConsolidatingSegments segment_mask_; diff --git a/core/utils/async_utils.cpp b/core/utils/async_utils.cpp index f33cd50c2..705856e3c 100644 --- a/core/utils/async_utils.cpp +++ b/core/utils/async_utils.cpp @@ -34,8 +34,7 @@ using namespace std::chrono_literals; -namespace irs { -namespace async_utils { +namespace irs::async_utils { void busywait_mutex::lock() noexcept { while (!try_lock()) { @@ -55,305 +54,103 @@ void busywait_mutex::unlock() noexcept { locked_.store(false, std::memory_order_release); } -thread_pool::thread_pool( - size_t max_threads /*= 0*/, size_t max_idle /*= 0*/, - std::basic_string_view worker_name /*= ""*/) - : shared_state_(std::make_shared()), - max_idle_(max_idle), - max_threads_(max_threads), - worker_name_(worker_name) {} - -thread_pool::~thread_pool() { - try { - stop(true); - } catch (...) { - } -} - -size_t thread_pool::max_idle() const { - std::lock_guard lock{shared_state_->lock}; - - return max_idle_; -} - -void thread_pool::max_idle(size_t value) { - auto& state = *shared_state_; - - { - std::lock_guard lock{state.lock}; - - max_idle_ = value; - } - - state.cond.notify_all(); // wake any idle threads if they need termination -} - -void thread_pool::max_idle_delta(int delta) { - auto& state = *shared_state_; - - { - std::lock_guard lock{state.lock}; - auto max_idle = max_idle_ + delta; - - if (delta > 0 && max_idle < max_idle_) { - max_idle_ = std::numeric_limits::max(); - } else if (delta < 0 && max_idle > max_idle_) { - max_idle_ = std::numeric_limits::min(); - } else { - max_idle_ = max_idle; - } - } - - state.cond.notify_all(); // wake any idle threads if they need termination +template +ThreadPool::ThreadPool(size_t threads, + std::basic_string_view name) { + start(threads, name); } -size_t thread_pool::max_threads() const { - std::lock_guard lock{shared_state_->lock}; - - return max_threads_; -} - -void thread_pool::max_threads(size_t value) { - auto& state = *shared_state_; - - { - std::lock_guard lock{shared_state_->lock}; - - max_threads_ = value; - - if (State::ABORT != state.state.load()) { - maybe_spawn_worker(); - } - } - - state.cond.notify_all(); // wake any idle threads if they need termination -} - -void thread_pool::max_threads_delta(int delta) { - auto& state = *shared_state_; - - { - std::lock_guard lock{state.lock}; - auto max_threads = max_threads_ + delta; - - if (delta > 0 && max_threads < max_threads_) { - max_threads_ = std::numeric_limits::max(); - } else if (delta < 0 && max_threads > max_threads_) { - max_threads_ = std::numeric_limits::min(); - } else { - max_threads_ = max_threads; - } - - if (State::ABORT != state.state.load()) { - maybe_spawn_worker(); - } - } - - state.cond.notify_all(); // wake any idle threads if they need termination -} - -bool thread_pool::run(std::function&& fn, - clock_t::duration delay /*=0*/) { - if (!fn) { - return false; - } - - auto& state = *shared_state_; - - { - std::lock_guard lock{state.lock}; - - if (State::RUN != state.state.load()) { - return false; // pool not active - } - - queue_.emplace(std::move(fn), clock_t::now() + delay); - - try { - maybe_spawn_worker(); - } catch (...) { - if (0 == threads_.load()) { - // failed to spawn a thread to execute a task - queue_.pop(); - throw; +template +void ThreadPool::start(size_t threads, + std::basic_string_view name) { + std::lock_guard lock{m_}; + IRS_ASSERT(threads_.empty()); + IRS_ASSERT(threads); + threads_.reserve(threads); + for (size_t i = 0; i != threads; ++i) { + threads_.emplace_back([this, name] { + if (!name.empty()) { + set_thread_name(name.data()); } - } + Work(); + }); } - - state.cond.notify_one(); - - return true; } -void thread_pool::stop(bool skip_pending /*= false*/) { - shared_state_->state.store(skip_pending ? State::ABORT : State::FINISH); - - decltype(queue_) empty; - { - std::unique_lock lock{shared_state_->lock}; - - // wait for all threads to terminate - while (threads_.load()) { - shared_state_->cond.notify_all(); // wake all threads - shared_state_->cond.wait_for(lock, 50ms); +template +bool ThreadPool::run(Func&& fn, Clock::duration delay) { + IRS_ASSERT(fn); + if constexpr (UseDelay) { + auto at = Clock::now() + delay; + std::unique_lock lock{m_}; + if (WasStop()) { + return false; } - - queue_.swap(empty); - } -} - -void thread_pool::limits(size_t max_threads, size_t max_idle) { - auto& state = *shared_state_; - - { - std::lock_guard lock{state.lock}; - - max_threads_ = max_threads; - max_idle_ = max_idle; - - if (State::ABORT != state.state.load()) { - maybe_spawn_worker(); + tasks_.emplace(std::move(fn), at); + // TODO We can skip notify when new element is more delayed than min + } else { + std::unique_lock lock{m_}; + if (WasStop()) { + return false; } + tasks_.push(std::move(fn)); } - - state.cond.notify_all(); // wake any idle threads if they need termination -} - -bool thread_pool::maybe_spawn_worker() { - IRS_ASSERT(!shared_state_->lock.try_lock()); // lock must be held - - // create extra thread if all threads are busy and can grow pool - const size_t pool_size = threads_.load(); - - if (!queue_.empty() && active_ == pool_size && pool_size < max_threads_) { - std::thread worker(&thread_pool::worker, this, shared_state_); - worker.detach(); - - threads_.fetch_add(1); - - return true; - } - - return false; -} - -std::pair thread_pool::limits() const { - std::lock_guard lock{shared_state_->lock}; - - return {max_threads_, max_idle_}; -} - -std::tuple thread_pool::stats() const { - std::lock_guard lock{shared_state_->lock}; - - return {active_, queue_.size(), threads_.load()}; -} - -size_t thread_pool::tasks_active() const { - std::lock_guard lock{shared_state_->lock}; - - return active_; -} - -size_t thread_pool::tasks_pending() const { - std::lock_guard lock{shared_state_->lock}; - - return queue_.size(); -} - -size_t thread_pool::threads() const { - std::lock_guard lock{shared_state_->lock}; - - return threads_.load(); + cv_.notify_one(); + return true; } -void thread_pool::worker(std::shared_ptr shared_state) noexcept { - // hold a reference to 'shared_state_' ensure state is still alive - if (!worker_name_.empty()) { - set_thread_name(worker_name_.c_str()); +template +void ThreadPool::stop(bool skip_pending) noexcept { + std::unique_lock lock{m_}; + if (skip_pending) { + tasks_ = decltype(tasks_){}; } - - { - std::unique_lock lock{shared_state->lock, std::defer_lock}; - - try { - worker_impl(lock, shared_state); - } catch (...) { - // NOOP - } - - threads_.fetch_sub(1); + if (WasStop()) { + return; } - - if (State::RUN != shared_state->state.load()) { - shared_state->cond.notify_all(); // wake up thread_pool::stop(...) + state_ |= 1; + lock.unlock(); + cv_.notify_all(); + for (auto& t : threads_) { + t.join(); } -} - -void thread_pool::worker_impl(std::unique_lock& lock, - std::shared_ptr shared_state) { - auto& state = shared_state->state; - - lock.lock(); - - while (State::ABORT != state.load() && threads_.load() <= max_threads_) { - IRS_ASSERT(lock.owns_lock()); - if (!queue_.empty()) { - if (const auto& top = queue_.top(); top.at <= clock_t::now()) { - func_t fn; - fn.swap(const_cast(top.fn)); - queue_.pop(); - - ++active_; - Finally decrement = [this]() noexcept { --active_; }; - // if have more tasks but no idle thread and can grow pool - try { - maybe_spawn_worker(); - } catch (const std::bad_alloc&) { - fprintf(stderr, "Failed to allocate memory while spawning a worker"); - } catch (const std::exception& e) { - IRS_LOG_ERROR( - absl::StrCat("Failed to grow pool, error '", e.what(), "'")); - } catch (...) { - IRS_LOG_ERROR("Failed to grow pool"); - } - - lock.unlock(); - try { - fn(); - } catch (const std::bad_alloc&) { - fprintf(stderr, "Failed to allocate memory while executing task"); - } catch (const std::exception& e) { - IRS_LOG_ERROR( - absl::StrCat("Failed to execute task, error '", e.what(), "'")); - } catch (...) { - IRS_LOG_ERROR("Failed to execute task"); + threads_ = decltype(threads_){}; +} + +template +void ThreadPool::Work() { + std::unique_lock lock{m_}; + while (true) { + while (!tasks_.empty()) { + Func fn; + if constexpr (UseDelay) { + auto& top = tasks_.top(); + if (top.at > Clock::now()) { + cv_.wait_until(lock, top.at); + continue; } - fn = nullptr; - lock.lock(); - continue; - } - } - IRS_ASSERT(active_ <= threads_.load()); - - if (const auto idle = threads_.load() - active_; - (idle <= max_idle_ || (!queue_.empty() && threads_.load() == 1))) { - if (const auto run_state = state.load(); - !queue_.empty() && State::ABORT != run_state) { - const auto at = queue_.top().at; // queue_ might be modified - shared_state->cond.wait_until(lock, at); - } else if (State::RUN == run_state) { - IRS_ASSERT(queue_.empty()); - shared_state->cond.wait(lock); + fn = std::move(const_cast(top.fn)); } else { - IRS_ASSERT(State::RUN != run_state); - return; // termination requested + fn = std::move(tasks_.front()); } - } else { - return; // too many idle threads + tasks_.pop(); + state_ += 2; + lock.unlock(); + try { + fn(); + } catch (...) { + } + lock.lock(); + state_ -= 2; + } + if (WasStop()) { + return; } + cv_.wait(lock); } } -} // namespace async_utils -} // namespace irs +template class ThreadPool; +template class ThreadPool; + +} // namespace irs::async_utils diff --git a/core/utils/async_utils.hpp b/core/utils/async_utils.hpp index a1f24904b..8d57a6374 100644 --- a/core/utils/async_utils.hpp +++ b/core/utils/async_utils.hpp @@ -23,9 +23,9 @@ #pragma once -#include #include -#include +#include +#include #include #include @@ -34,8 +34,7 @@ #include "string.hpp" #include "thread_utils.hpp" -namespace irs { -namespace async_utils { +namespace irs::async_utils { ////////////////////////////////////////////////////////////////////////////// /// @brief spinlock implementation for Win32 since std::mutex cannot be used @@ -51,67 +50,60 @@ class busywait_mutex final { std::atomic locked_{false}; }; -class thread_pool { +template +class ThreadPool { public: - using native_char_t = std::remove_pointer_t; - using clock_t = std::chrono::steady_clock; - using func_t = std::function; - - explicit thread_pool(size_t max_threads = 0, size_t max_idle = 0, - std::basic_string_view worker_name = - kEmptyStringView); - ~thread_pool(); - size_t max_idle() const; - void max_idle(size_t value); - void max_idle_delta(int delta); // change value by delta - size_t max_threads() const; - void max_threads(size_t value); - void max_threads_delta(int delta); // change value by delta - - // 1st - max_threads(), 2nd - max_idle() - std::pair limits() const; - void limits(size_t max_threads, size_t max_idle); - - bool run(std::function&& fn, clock_t::duration delay = {}); - void stop(bool skip_pending = false); // always a blocking call - size_t tasks_active() const; - size_t tasks_pending() const; - size_t threads() const; + using Char = std::remove_pointer_t; + using Clock = std::chrono::steady_clock; + using Func = fu2::unique_function; + + ThreadPool() = default; + explicit ThreadPool(size_t threads, std::basic_string_view name = {}); + ~ThreadPool() { stop(true); } + + void start(size_t threads, std::basic_string_view name = {}); + bool run(Func&& fn, Clock::duration delay = {}); + void stop(bool skip_pending = false) noexcept; // always a blocking call + size_t tasks_active() const { + std::lock_guard lock{m_}; + return state_ / 2; + } + size_t tasks_pending() const { + std::lock_guard lock{m_}; + return tasks_.size(); + } + size_t threads() const { + std::lock_guard lock{m_}; + return threads_.size(); + } // 1st - tasks active(), 2nd - tasks pending(), 3rd - threads() - std::tuple stats() const; + std::tuple stats() const { + std::lock_guard lock{m_}; + return {state_ / 2, tasks_.size(), threads_.size()}; + } private: - enum class State { ABORT, FINISH, RUN }; + struct Task { + explicit Task(Func&& fn, Clock::time_point at) + : at{at}, fn{std::move(fn)} {} - struct shared_state { - std::mutex lock; - std::condition_variable cond; - std::atomic state{State::RUN}; - }; + Clock::time_point at; + Func fn; - struct task { - explicit task(std::function&& fn, clock_t::time_point at) - : at(at), fn(std::move(fn)) {} + bool operator<(const Task& rhs) const noexcept { return rhs.at < at; } + }; - clock_t::time_point at; - func_t fn; + void Work(); - bool operator<(const task& rhs) const noexcept { return rhs.at < at; } - }; + bool WasStop() const { return state_ % 2 != 0; } - void worker(std::shared_ptr shared_state) noexcept; - void worker_impl(std::unique_lock& lock, - std::shared_ptr shared_state); - bool maybe_spawn_worker(); - - std::shared_ptr shared_state_; - size_t active_{0}; - std::atomic threads_{0}; - size_t max_idle_; - size_t max_threads_; - std::priority_queue queue_; - std::basic_string worker_name_; + std::vector threads_; + mutable std::mutex m_; + std::condition_variable cv_; + std::conditional_t, std::queue> + tasks_; + // stop flag and active tasks counter + uint64_t state_ = 0; }; -} // namespace async_utils -} // namespace irs +} // namespace irs::async_utils diff --git a/core/utils/empty.hpp b/core/utils/empty.hpp new file mode 100644 index 000000000..bc3dae1d3 --- /dev/null +++ b/core/utils/empty.hpp @@ -0,0 +1,39 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2023 ArangoDB GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Valery Mironov +//////////////////////////////////////////////////////////////////////////////// + +#pragma once + +#include + +#include "shared.hpp" + +namespace irs::utils { + +struct Empty final { + template + Empty(Args&&... /*args*/) {} +}; + +template +using Need = std::conditional_t; + +} // namespace irs::utils diff --git a/core/utils/wait_group.hpp b/core/utils/wait_group.hpp new file mode 100644 index 000000000..b439da655 --- /dev/null +++ b/core/utils/wait_group.hpp @@ -0,0 +1,76 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2016 by EMC Corporation, All Rights Reserved +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is EMC Corporation +/// +/// @author Valery Mironov +//////////////////////////////////////////////////////////////////////////////// + +#pragma once + +#include +#include +#include + +namespace irs { + +// TODO(MBkkt) Considered to replace with YACLib +struct WaitGroup { + explicit WaitGroup(size_t counter = 0) noexcept : counter_{2 * counter + 1} {} + + void Add(size_t counter = 1) noexcept { + counter_.fetch_add(2 * counter, std::memory_order_relaxed); + } + + void Done(size_t counter = 1) noexcept { + if (counter_.fetch_sub(2 * counter, std::memory_order_acq_rel) == + 2 * counter) { + std::lock_guard lock{m_}; + cv_.notify_one(); + } + } + + // Multiple parallel Wait not supported, if needed check YACLib + void Wait(size_t counter = 0) noexcept { + if (counter_.fetch_sub(1, std::memory_order_acq_rel) != 1) { + std::unique_lock lock{m_}; + while (counter_.load(std::memory_order_acquire) != 0) { + cv_.wait(lock); + } + } + // We can put acquire here and remove above, but is it worth? + Reset(counter); + } + + // It shouldn't used for synchronization + size_t Count() const noexcept { + return counter_.load(std::memory_order_relaxed) / 2; + } + + void Reset(size_t counter) noexcept { + counter_.store(2 * counter + 1, std::memory_order_relaxed); + } + + std::mutex& Mutex() noexcept { return m_; } + + private: + std::atomic_size_t counter_; + std::condition_variable cv_; + std::mutex m_; +}; + +} // namespace irs diff --git a/tests/index/index_profile_tests.cpp b/tests/index/index_profile_tests.cpp index 094a250da..82a9ef72b 100644 --- a/tests/index/index_profile_tests.cpp +++ b/tests/index/index_profile_tests.cpp @@ -126,7 +126,7 @@ class index_profile_test_case : public tests::index_test_base { std::atomic writer_import_count(0); auto thread_count = (std::max)((size_t)1, num_insert_threads); auto total_threads = thread_count + num_import_threads + num_update_threads; - irs::async_utils::thread_pool thread_pool(total_threads, total_threads); + irs::async_utils::ThreadPool<> thread_pool(total_threads); std::mutex mutex; if (!writer) { @@ -473,7 +473,7 @@ class index_profile_test_case : public tests::index_test_base { size_t cleanup_interval) { auto* directory = &dir(); std::atomic working(true); - irs::async_utils::thread_pool thread_pool(1, 1); + irs::async_utils::ThreadPool<> thread_pool(1); thread_pool.run([cleanup_interval, directory, &working]() -> void { while (working.load()) { @@ -502,7 +502,7 @@ class index_profile_test_case : public tests::index_test_base { options.segment_count_max = 8; // match original implementation or may run // out of file handles (e.g. MacOS/Travis) - irs::async_utils::thread_pool thread_pool(commit_threads, commit_threads); + irs::async_utils::ThreadPool<> thread_pool(commit_threads); auto writer = open_writer(irs::OM_CREATE, options); for (size_t i = 0; i < commit_threads; ++i) { @@ -536,7 +536,7 @@ class index_profile_test_case : public tests::index_test_base { irs::index_utils::MakePolicy(irs::index_utils::ConsolidateCount()); irs::IndexWriterOptions options; std::atomic working(true); - irs::async_utils::thread_pool thread_pool(2, 2); + irs::async_utils::ThreadPool<> thread_pool(2); options.segment_count_max = 8; // match original implementation or may run // out of file handles (e.g. MacOS/Travis) diff --git a/tests/index/index_tests.cpp b/tests/index/index_tests.cpp index 539f3875e..b55391312 100644 --- a/tests/index/index_tests.cpp +++ b/tests/index/index_tests.cpp @@ -537,7 +537,7 @@ class index_test_case : public tests::index_test_base { // validate terms async { - irs::async_utils::thread_pool pool(thread_count, thread_count); + irs::async_utils::ThreadPool<> pool(thread_count); { std::lock_guard lock(mutex); @@ -580,7 +580,7 @@ class index_test_case : public tests::index_test_base { ASSERT_EQ(expected_term_itrs[i]->value(), actual_term_itr->value()); } - irs::async_utils::thread_pool pool(thread_count, thread_count); + irs::async_utils::ThreadPool<> pool(thread_count); { std::lock_guard lock(mutex); diff --git a/tests/store/directory_test_case.cpp b/tests/store/directory_test_case.cpp index 192218aad..6c9a78017 100644 --- a/tests/store/directory_test_case.cpp +++ b/tests/store/directory_test_case.cpp @@ -374,7 +374,7 @@ TEST_P(directory_test_case, read_multiple_streams) { auto in = dir_->open("test_async", irs::IOAdvice::NORMAL); std::mutex in_mtx; std::mutex mutex; - irs::async_utils::thread_pool pool(16, 16); + irs::async_utils::ThreadPool<> pool(16); ASSERT_FALSE(!in); in = in->reopen(); @@ -383,7 +383,7 @@ TEST_P(directory_test_case, read_multiple_streams) { { std::lock_guard lock(mutex); - for (auto i = pool.max_threads(); i; --i) { + for (auto i = pool.threads(); i; --i) { pool.run([&in, &in_mtx, &mutex]() -> void { index_input::ptr input; diff --git a/tests/utils/async_utils_tests.cpp b/tests/utils/async_utils_tests.cpp index 0f45b2c1b..4be4f5423 100644 --- a/tests/utils/async_utils_tests.cpp +++ b/tests/utils/async_utils_tests.cpp @@ -29,6 +29,8 @@ #include "utils/async_utils.hpp" #include "utils/thread_utils.hpp" +using namespace std::chrono_literals; + namespace tests { class async_utils_tests : public ::testing::Test { @@ -64,50 +66,142 @@ class notifying_counter { size_t notify_after_; }; -} // namespace tests - -using namespace tests; -using namespace std::chrono_literals; - -TEST_F(async_utils_tests, test_busywait_mutex_mt) { - typedef irs::async_utils::busywait_mutex mutex_t; +template +void run_thread_pool_bound_mt() { + // test max threads { - mutex_t mutex; - std::lock_guard lock(mutex); - std::thread thread([&mutex]() -> void { ASSERT_FALSE(mutex.try_lock()); }); - thread.join(); + irs::async_utils::ThreadPool pool(2); + std::atomic count(0); + std::mutex mutex; + auto task1 = [&mutex, &count]() -> void { + ++count; + std::lock_guard lock(mutex); + }; + auto task2 = [&mutex, &count]() -> void { + ++count; + std::lock_guard lock(mutex); + }; + auto task3 = [&mutex, &count]() -> void { + ++count; + std::lock_guard lock(mutex); + }; + std::unique_lock lock(mutex); + + ASSERT_EQ(2, pool.threads()); + pool.run(std::move(task1)); + pool.run(std::move(task2)); + pool.run(std::move(task3)); + { + const auto end = std::chrono::steady_clock::now() + + 10s; // assume 10s is more than enough + while (1 != pool.tasks_pending() || 2 != pool.tasks_active() || + count != 2) { + std::this_thread::sleep_for(10ms); + ASSERT_LE(std::chrono::steady_clock::now(), end); + } + } + ASSERT_EQ(2, count); // 2 tasks started + ASSERT_EQ(2, pool.threads()); + ASSERT_EQ(2, pool.tasks_active()); + ASSERT_EQ(1, pool.tasks_pending()); + lock.unlock(); + pool.stop(true); } + // test max threads delta grow { - mutex_t mutex; - std::lock_guard lock(mutex); - ASSERT_FALSE(mutex.try_lock()); + irs::async_utils::ThreadPool pool(1); + std::atomic count(0); + std::mutex mutex; + auto task = [&mutex, &count]() -> void { + ++count; + std::lock_guard lock(mutex); + }; + std::unique_lock lock(mutex); + + ASSERT_EQ(1, pool.threads()); + pool.run(std::move(task)); + { + const auto end = std::chrono::steady_clock::now() + + 10s; // assume 10s is more than enough + while (0 != pool.tasks_pending() || 1 != pool.tasks_active() || + count != 1) { + std::this_thread::sleep_for(10ms); + ASSERT_LE(std::chrono::steady_clock::now(), end); + } + } + ASSERT_EQ(1, count); // 1 task started + ASSERT_EQ(1, pool.threads()); + ASSERT_EQ(1, pool.tasks_active()); + ASSERT_EQ(0, pool.tasks_pending()); + lock.unlock(); + pool.stop(true); } + // test max idle { - std::condition_variable cond; - std::mutex ctrl_mtx; - std::unique_lock lock(ctrl_mtx); - mutex_t mutex; - std::thread thread([&cond, &ctrl_mtx, &mutex]() -> void { - std::unique_lock lock(ctrl_mtx); - mutex.lock(); - cond.notify_all(); - cond.wait_for(lock, 1000ms); - mutex.unlock(); - }); + irs::async_utils::ThreadPool pool(3); + std::atomic count(0); + std::mutex mutex1; + std::mutex mutex2; + std::condition_variable start_cond; + notifying_counter start_count(start_cond, 3); + std::mutex start_mutex; + auto task1 = [&start_mutex, &start_count, &mutex1, &count]() -> void { + { std::lock_guard lock(start_mutex); } + ++start_count; + std::lock_guard lock(mutex1); + ++count; + }; + auto task2 = [&start_mutex, &start_count, &mutex1, &count]() -> void { + { std::lock_guard lock(start_mutex); } + ++start_count; + std::lock_guard lock(mutex1); + ++count; + }; + auto task3 = [&start_mutex, &start_count, &mutex2, &count]() -> void { + { std::lock_guard lock(start_mutex); } + ++start_count; + std::lock_guard lock(mutex2); + ++count; + }; + std::unique_lock lock1(mutex1); + std::unique_lock lock2(mutex2); + std::unique_lock start_lock(start_mutex); - ASSERT_EQ(std::cv_status::no_timeout, cond.wait_for(lock, 1000ms)); - lock.unlock(); - cond.notify_all(); - thread.join(); + ASSERT_EQ(3, pool.threads()); + pool.run(std::move(task1)); + pool.run(std::move(task2)); + pool.run(std::move(task3)); + ASSERT_TRUE(start_count || + std::cv_status::no_timeout == + start_cond.wait_for(start_lock, 10000ms) || + start_count); // wait for all 3 tasks to start + ASSERT_EQ(0, count); // 0 tasks complete + ASSERT_EQ(3, pool.threads()); + ASSERT_EQ(3, pool.tasks_active()); + ASSERT_EQ(0, pool.tasks_pending()); + ASSERT_EQ(std::make_tuple(size_t(3), size_t(0), size_t(3)), pool.stats()); + lock1.unlock(); + { + const auto end = std::chrono::steady_clock::now() + + 10s; // assume 10s is more than enough + while (count != 2) { + std::this_thread::sleep_for(10ms); + ASSERT_LE(std::chrono::steady_clock::now(), end); + } + } + ASSERT_EQ(2, count); // 2 tasks complete + lock2.unlock(); + pool.stop(true); } } -TEST_F(async_utils_tests, test_thread_pool_run_mt) { +template +void run_test_thread_pool_run_mt() { // test schedule 1 task { - irs::async_utils::thread_pool pool(1, 0); + irs::async_utils::ThreadPool pool(1); std::condition_variable cond; std::mutex mutex; std::unique_lock lock(mutex); @@ -122,7 +216,7 @@ TEST_F(async_utils_tests, test_thread_pool_run_mt) { // test schedule 3 task sequential { - irs::async_utils::thread_pool pool(1, 1); + irs::async_utils::ThreadPool pool(1); std::condition_variable cond; notifying_counter count(cond, 3); std::mutex mutex; @@ -157,7 +251,7 @@ TEST_F(async_utils_tests, test_thread_pool_run_mt) { // test schedule 3 task parallel { - irs::async_utils::thread_pool pool(3, 0); + irs::async_utils::ThreadPool pool(3); std::condition_variable cond; notifying_counter count(cond, 3); std::mutex mutex; @@ -187,7 +281,7 @@ TEST_F(async_utils_tests, test_thread_pool_run_mt) { // test schedule 1 task exception + 1 task { - irs::async_utils::thread_pool pool(1, 0, IR_NATIVE_STRING("foo")); + irs::async_utils::ThreadPool pool(1, IR_NATIVE_STRING("foo")); std::condition_variable cond; notifying_counter count(cond, 2); std::mutex mutex; @@ -216,179 +310,53 @@ TEST_F(async_utils_tests, test_thread_pool_run_mt) { } } -TEST_F(async_utils_tests, test_thread_pool_bound_mt) { - // test max threads - { - irs::async_utils::thread_pool pool(0, 0); - std::atomic count(0); - std::mutex mutex; - auto task1 = [&mutex, &count]() -> void { - ++count; - std::lock_guard lock(mutex); - }; - auto task2 = [&mutex, &count]() -> void { - ++count; - std::lock_guard lock(mutex); - }; - auto task3 = [&mutex, &count]() -> void { - ++count; - std::lock_guard lock(mutex); - }; - std::unique_lock lock(mutex); - - ASSERT_EQ(0, pool.threads()); - pool.run(std::move(task1)); - pool.run(std::move(task2)); - pool.run(std::move(task3)); - pool.max_threads(2); - { - const auto end = std::chrono::steady_clock::now() + - 10s; // assume 10s is more than enough - while (1 != pool.tasks_pending() || 2 != pool.tasks_active() || - count != 2) { - std::this_thread::sleep_for(10ms); - ASSERT_LE(std::chrono::steady_clock::now(), end); - } - } - ASSERT_EQ(2, count); // 2 tasks started - ASSERT_EQ(2, pool.threads()); - ASSERT_EQ(2, pool.tasks_active()); - ASSERT_EQ(1, pool.tasks_pending()); - lock.unlock(); - pool.stop(true); - } - - // test max threads delta grow +TEST_F(async_utils_tests, test_busywait_mutex_mt) { + typedef irs::async_utils::busywait_mutex mutex_t; { - irs::async_utils::thread_pool pool(0, 0); - std::atomic count(0); - std::mutex mutex; - auto task = [&mutex, &count]() -> void { - ++count; - std::lock_guard lock(mutex); - }; - std::unique_lock lock(mutex); - - ASSERT_EQ(0, pool.threads()); - pool.run(std::move(task)); - pool.max_threads_delta(1); - { - const auto end = std::chrono::steady_clock::now() + - 10s; // assume 10s is more than enough - while (0 != pool.tasks_pending() || 1 != pool.tasks_active() || - count != 1) { - std::this_thread::sleep_for(10ms); - ASSERT_LE(std::chrono::steady_clock::now(), end); - } - } - ASSERT_EQ(1, count); // 1 task started - ASSERT_EQ(1, pool.threads()); - ASSERT_EQ(1, pool.tasks_active()); - ASSERT_EQ(0, pool.tasks_pending()); - lock.unlock(); - pool.stop(true); + mutex_t mutex; + std::lock_guard lock(mutex); + std::thread thread([&mutex]() -> void { ASSERT_FALSE(mutex.try_lock()); }); + thread.join(); } - // test max threads delta { - irs::async_utils::thread_pool pool(1, 10); - - ASSERT_EQ(1, pool.max_threads()); - pool.max_threads_delta(1); - ASSERT_EQ(2, pool.max_threads()); - pool.max_threads_delta(-2); - ASSERT_EQ(0, pool.max_threads()); - pool.max_threads(std::numeric_limits::max()); - pool.max_threads_delta(1); - ASSERT_EQ(std::numeric_limits::max(), pool.max_threads()); - pool.max_threads(1); - pool.max_threads_delta(-2); - ASSERT_EQ(std::numeric_limits::min(), pool.max_threads()); + mutex_t mutex; + std::lock_guard lock(mutex); + ASSERT_FALSE(mutex.try_lock()); } - // test max idle { - irs::async_utils::thread_pool pool(0, 0); - std::atomic count(0); - std::mutex mutex1; - std::mutex mutex2; - std::condition_variable start_cond; - notifying_counter start_count(start_cond, 3); - std::mutex start_mutex; - auto task1 = [&start_mutex, &start_count, &mutex1, &count]() -> void { - { std::lock_guard lock(start_mutex); } - ++start_count; - std::lock_guard lock(mutex1); - ++count; - }; - auto task2 = [&start_mutex, &start_count, &mutex1, &count]() -> void { - { std::lock_guard lock(start_mutex); } - ++start_count; - std::lock_guard lock(mutex1); - ++count; - }; - auto task3 = [&start_mutex, &start_count, &mutex2, &count]() -> void { - { std::lock_guard lock(start_mutex); } - ++start_count; - std::lock_guard lock(mutex2); - ++count; - }; - std::unique_lock lock1(mutex1); - std::unique_lock lock2(mutex2); - std::unique_lock start_lock(start_mutex); + std::condition_variable cond; + std::mutex ctrl_mtx; + std::unique_lock lock(ctrl_mtx); + mutex_t mutex; + std::thread thread([&cond, &ctrl_mtx, &mutex]() -> void { + std::unique_lock lock(ctrl_mtx); + mutex.lock(); + cond.notify_all(); + cond.wait_for(lock, 1000ms); + mutex.unlock(); + }); - ASSERT_EQ(0, pool.threads()); - pool.run(std::move(task1)); - pool.run(std::move(task2)); - pool.run(std::move(task3)); - pool.limits(3, 1); - ASSERT_EQ(std::make_pair(size_t(3), size_t(1)), pool.limits()); - ASSERT_TRUE(start_count || - std::cv_status::no_timeout == - start_cond.wait_for(start_lock, 10000ms) || - start_count); // wait for all 3 tasks to start - ASSERT_EQ(0, count); // 0 tasks complete - ASSERT_EQ(3, pool.threads()); - ASSERT_EQ(3, pool.tasks_active()); - ASSERT_EQ(0, pool.tasks_pending()); - ASSERT_EQ(std::make_tuple(size_t(3), size_t(0), size_t(3)), pool.stats()); - lock1.unlock(); - { - const auto end = std::chrono::steady_clock::now() + - 10s; // assume 10s is more than enough - while (2 != pool.threads() || count != 2) { - std::this_thread::sleep_for(10ms); - ASSERT_LE(std::chrono::steady_clock::now(), end); - } - } - ASSERT_EQ(2, count); // 2 tasks complete - ASSERT_EQ(2, pool.threads()); - lock2.unlock(); - pool.stop(true); + ASSERT_EQ(std::cv_status::no_timeout, cond.wait_for(lock, 1000ms)); + lock.unlock(); + cond.notify_all(); + thread.join(); } +} - // test max idle delta - { - irs::async_utils::thread_pool pool(10, 1); - - ASSERT_EQ(1, pool.max_idle()); - pool.max_idle_delta(1); - ASSERT_EQ(2, pool.max_idle()); - pool.max_idle_delta(-2); - ASSERT_EQ(0, pool.max_idle()); - pool.max_idle(std::numeric_limits::max()); - pool.max_idle_delta(1); - ASSERT_EQ(std::numeric_limits::max(), pool.max_idle()); - pool.max_idle(1); - pool.max_idle_delta(-2); - ASSERT_EQ(std::numeric_limits::min(), pool.max_idle()); - } +TEST_F(async_utils_tests, test_thread_pool_run_mt) { + run_test_thread_pool_run_mt(); +} + +TEST_F(async_utils_tests, test_thread_pool_bound_mt) { + run_thread_pool_bound_mt(); } TEST_F(async_utils_tests, test_thread_pool_stop_delay_mt) { // test stop run pending { - irs::async_utils::thread_pool pool(1, 0); + irs::async_utils::ThreadPool<> pool(1); std::atomic count(0); std::mutex mutex; auto task1 = [&mutex, &count]() -> void { @@ -428,15 +396,15 @@ TEST_F(async_utils_tests, test_thread_pool_stop_delay_mt) { { const auto end = std::chrono::steady_clock::now() + 10s; // assume 10s is more than enough - while (pool.tasks_active() || pool.threads()) { + while (pool.tasks_active()) { std::this_thread::sleep_for(100ms); ASSERT_LE(std::chrono::steady_clock::now(), end); } } ASSERT_EQ(0, pool.tasks_active()); ASSERT_EQ(0, pool.tasks_pending()); - ASSERT_EQ(0, pool.threads()); - ASSERT_EQ(std::make_tuple(size_t(0), size_t(0), size_t(0)), pool.stats()); + ASSERT_EQ(1, pool.threads()); + ASSERT_EQ(std::make_tuple(size_t(0), size_t(0), size_t(1)), pool.stats()); pool.stop(); // blocking call (thread runtime duration simulated via sleep) ASSERT_EQ(2, count); // all tasks ran ASSERT_EQ(0, pool.tasks_active()); @@ -449,7 +417,7 @@ TEST_F(async_utils_tests, test_thread_pool_stop_delay_mt) { TEST_F(async_utils_tests, test_thread_pool_max_idle_mt) { // test stop run pending { - irs::async_utils::thread_pool pool(4, 2); + irs::async_utils::ThreadPool<> pool(4); std::atomic count(0); std::mutex mutex; auto task1 = [&mutex, &count]() -> void { @@ -504,8 +472,8 @@ TEST_F(async_utils_tests, test_thread_pool_max_idle_mt) { } ASSERT_EQ(0, pool.tasks_active()); ASSERT_EQ(0, pool.tasks_pending()); - ASSERT_EQ(2, pool.threads()); - ASSERT_EQ(std::make_tuple(size_t(0), size_t(0), size_t(2)), pool.stats()); + ASSERT_EQ(4, pool.threads()); + ASSERT_EQ(std::make_tuple(size_t(0), size_t(0), size_t(4)), pool.stats()); pool.stop(); // blocking call (thread runtime duration simulated via sleep) ASSERT_EQ(4, count); // all tasks ran ASSERT_EQ(0, pool.tasks_active()); @@ -518,7 +486,7 @@ TEST_F(async_utils_tests, test_thread_pool_max_idle_mt) { TEST_F(async_utils_tests, test_thread_pool_stop_mt) { // test stop run pending { - irs::async_utils::thread_pool pool(1, 0); + irs::async_utils::ThreadPool<> pool(1); std::atomic count(0); std::mutex mutex; auto task1 = [&mutex, &count]() -> void { @@ -542,7 +510,7 @@ TEST_F(async_utils_tests, test_thread_pool_stop_mt) { // test stop skip pending { - irs::async_utils::thread_pool pool(1, 0); + irs::async_utils::ThreadPool<> pool(1); std::atomic count(0); std::mutex mutex; auto task1 = [&mutex, &count]() -> void { @@ -575,7 +543,7 @@ TEST_F(async_utils_tests, test_thread_pool_stop_mt) { // test pool stop + run { - irs::async_utils::thread_pool pool(1, 0); + irs::async_utils::ThreadPool<> pool(1); std::atomic count(0); std::mutex mutex; auto task1 = [&mutex, &count]() -> void { @@ -588,9 +556,8 @@ TEST_F(async_utils_tests, test_thread_pool_stop_mt) { }; std::unique_lock lock(mutex); - ASSERT_EQ(0, pool.threads()); + ASSERT_EQ(1, pool.threads()); pool.run(std::move(task1)); - pool.max_threads(1); { // assume 10s is more than enough to start first thread const auto end = std::chrono::steady_clock::now() + 10s; @@ -608,7 +575,7 @@ TEST_F(async_utils_tests, test_thread_pool_stop_mt) { // test multiple calls to stop will all block { - irs::async_utils::thread_pool pool(1, 0); + irs::async_utils::ThreadPool<> pool(1); std::condition_variable cond; std::mutex mutex; std::unique_lock lock(mutex); @@ -617,7 +584,7 @@ TEST_F(async_utils_tests, test_thread_pool_stop_mt) { cond.notify_all(); }; - ASSERT_EQ(0, pool.threads()); + ASSERT_EQ(1, pool.threads()); pool.run(std::move(task)); ASSERT_EQ(1, pool.threads()); @@ -656,7 +623,7 @@ TEST_F(async_utils_tests, test_thread_pool_stop_mt) { // test stop with a single thread will stop threads { - irs::async_utils::thread_pool pool(1, 1); + irs::async_utils::ThreadPool<> pool(1); pool.run([]() -> void {}); // start a single thread ASSERT_EQ(1, pool.threads()); @@ -665,6 +632,53 @@ TEST_F(async_utils_tests, test_thread_pool_stop_mt) { } } +TEST_F(async_utils_tests, test_queue_thread_pool_run_mt) { + run_test_thread_pool_run_mt(); +} + +TEST_F(async_utils_tests, test_queue_thread_pool_bound_mt) { + run_thread_pool_bound_mt(); +} + +TEST_F(async_utils_tests, test_queue_thread_pool_delay_mt) { + { + uint64_t counter{0}; + uint64_t counter_start{0}; + irs::async_utils::ThreadPool pool(1); + std::condition_variable cond; + std::mutex mutex; + std::unique_lock lock(mutex); + auto task = [&]() -> void { + std::lock_guard lock(mutex); + ++counter; + ++counter_start; + if (counter_start == 2) { + cond.notify_all(); + } + }; + auto task2 = [&]() -> void { + std::lock_guard lock(mutex); + if (counter > 0) { + --counter; + } else { + ++counter; + } + ++counter_start; + if (counter_start == 2) { + cond.notify_all(); + } + }; + + ASSERT_EQ(1, pool.threads()); + // delay is ignored for non priority qeue + // tasks are executed as is + pool.run(std::move(task), 10000s); + pool.run(std::move(task2), 1s); + ASSERT_EQ(std::cv_status::no_timeout, cond.wait_for(lock, 100s)); + ASSERT_EQ(0, counter); + } +} + TEST(thread_utils_test, get_set_name) { const thread_name_t expected_name = IR_NATIVE_STRING("foo"); #if (defined(__linux__) || defined(__APPLE__) || \ @@ -684,3 +698,5 @@ TEST(thread_utils_test, get_set_name) { }); #endif } + +} // namespace tests diff --git a/tests/utils/thread_pool_test.cpp b/tests/utils/thread_pool_test.cpp index 6538caeb0..8412d9fe4 100644 --- a/tests/utils/thread_pool_test.cpp +++ b/tests/utils/thread_pool_test.cpp @@ -58,7 +58,7 @@ using namespace std::chrono_literals; TEST(thread_pool_test, test_run_1task_mt) { // test schedule 1 task - irs::async_utils::thread_pool pool(1, 0); + irs::async_utils::ThreadPool<> pool(1); std::condition_variable cond; std::mutex mutex; std::unique_lock lock(mutex); @@ -73,7 +73,7 @@ TEST(thread_pool_test, test_run_1task_mt) { TEST(thread_pool_test, test_run_3tasks_seq_mt) { // test schedule 3 task sequential - irs::async_utils::thread_pool pool(1, 1); + irs::async_utils::ThreadPool<> pool(1); std::condition_variable cond; notifying_counter count(cond, 3); std::mutex mutex; @@ -108,7 +108,7 @@ TEST(thread_pool_test, test_run_3tasks_seq_mt) { TEST(thread_pool_test, test_run_3tasks_parallel_mt) { // test schedule 3 task parallel - irs::async_utils::thread_pool pool(3, 0); + irs::async_utils::ThreadPool<> pool(3); std::condition_variable cond; notifying_counter count(cond, 3); std::mutex mutex; @@ -138,7 +138,7 @@ TEST(thread_pool_test, test_run_3tasks_parallel_mt) { TEST(thread_pool_test, test_run_1task_excpetion_1task_mt) { // test schedule 1 task exception + 1 task - irs::async_utils::thread_pool pool(1, 0, IR_NATIVE_STRING("foo")); + irs::async_utils::ThreadPool<> pool(1, IR_NATIVE_STRING("foo")); std::condition_variable cond; notifying_counter count(cond, 2); std::mutex mutex; @@ -167,7 +167,7 @@ TEST(thread_pool_test, test_run_1task_excpetion_1task_mt) { TEST(thread_pool_test, test_max_threads_mt) { // test max threads - irs::async_utils::thread_pool pool(0, 0); + irs::async_utils::ThreadPool<> pool(2); std::atomic count(0); std::mutex mutex; auto task1 = [&mutex, &count]() -> void { @@ -184,12 +184,11 @@ TEST(thread_pool_test, test_max_threads_mt) { }; std::unique_lock lock(mutex); - ASSERT_EQ(0, pool.threads()); + ASSERT_EQ(2, pool.threads()); pool.run(std::move(task1)); pool.run(std::move(task2)); pool.run(std::move(task3)); - ASSERT_EQ(3, pool.tasks_pending()); - pool.max_threads(2); + ASSERT_EQ(3, pool.tasks_pending() + pool.tasks_active()); int tryCount{100}; while (tryCount-- && count.load(std::memory_order_relaxed) < 2) { std::this_thread::sleep_for(100ms); // assume threads start within 100msec @@ -204,7 +203,7 @@ TEST(thread_pool_test, test_max_threads_mt) { TEST(thread_pool_test, test_max_threads_delta_grow_mt) { // test max threads delta grow - irs::async_utils::thread_pool pool(0, 0); + irs::async_utils::ThreadPool<> pool(1); std::atomic count(0); std::mutex mutex; auto task = [&mutex, &count]() -> void { @@ -213,9 +212,8 @@ TEST(thread_pool_test, test_max_threads_delta_grow_mt) { }; std::unique_lock lock(mutex); - ASSERT_EQ(0, pool.threads()); + ASSERT_EQ(1, pool.threads()); pool.run(std::move(task)); - pool.max_threads_delta(1); int tryCount{100}; while (tryCount-- && count.load(std::memory_order_relaxed) < 1) { std::this_thread::sleep_for(100ms); // assume threads start within 100msec @@ -228,26 +226,9 @@ TEST(thread_pool_test, test_max_threads_delta_grow_mt) { pool.stop(true); } -TEST(thread_pool_test, test_max_threads_delta_mt) { - // test max threads delta - irs::async_utils::thread_pool pool(1, 10); - - ASSERT_EQ(1, pool.max_threads()); - pool.max_threads_delta(1); - ASSERT_EQ(2, pool.max_threads()); - pool.max_threads_delta(-2); - ASSERT_EQ(0, pool.max_threads()); - pool.max_threads(std::numeric_limits::max()); - pool.max_threads_delta(1); - ASSERT_EQ(std::numeric_limits::max(), pool.max_threads()); - pool.max_threads(1); - pool.max_threads_delta(-2); - ASSERT_EQ(std::numeric_limits::min(), pool.max_threads()); -} - TEST(thread_pool_test, test_max_idle_mt) { // test max idle - irs::async_utils::thread_pool pool(0, 0); + irs::async_utils::ThreadPool<> pool(3); std::atomic count(0); std::mutex mutex1; std::mutex mutex2; @@ -276,12 +257,10 @@ TEST(thread_pool_test, test_max_idle_mt) { std::unique_lock lock2(mutex2); std::unique_lock start_lock(start_mutex); - ASSERT_EQ(0, pool.threads()); + ASSERT_EQ(3, pool.threads()); ASSERT_TRUE(pool.run(std::move(task1))); ASSERT_TRUE(pool.run(std::move(task2))); ASSERT_TRUE(pool.run(std::move(task3))); - pool.limits(3, 1); - ASSERT_EQ(std::make_pair(size_t(3), size_t(1)), pool.limits()); ASSERT_TRUE(start_count || std::cv_status::no_timeout == start_cond.wait_for(start_lock, 1000ms) || @@ -297,31 +276,14 @@ TEST(thread_pool_test, test_max_idle_mt) { std::this_thread::sleep_for(100ms); // assume threads start within 100msec } ASSERT_EQ(2, count); // 2 tasks complete - ASSERT_EQ(2, pool.threads()); + ASSERT_EQ(3, pool.threads()); lock2.unlock(); pool.stop(true); } -TEST(thread_pool_test, test_max_idle_delta_mt) { - // test max idle delta - irs::async_utils::thread_pool pool(10, 1); - - ASSERT_EQ(1, pool.max_idle()); - pool.max_idle_delta(1); - ASSERT_EQ(2, pool.max_idle()); - pool.max_idle_delta(-2); - ASSERT_EQ(0, pool.max_idle()); - pool.max_idle(std::numeric_limits::max()); - pool.max_idle_delta(1); - ASSERT_EQ(std::numeric_limits::max(), pool.max_idle()); - pool.max_idle(1); - pool.max_idle_delta(-2); - ASSERT_EQ(std::numeric_limits::min(), pool.max_idle()); -} - TEST(thread_pool_test, test_stop_run_pending_delay_mt) { // test stop run pending - irs::async_utils::thread_pool pool(1, 1); + irs::async_utils::ThreadPool<> pool(1); std::atomic count(0); std::mutex mutex; auto task1 = [&mutex, &count]() -> void { @@ -377,7 +339,7 @@ TEST(thread_pool_test, test_stop_run_pending_delay_mt) { TEST(thread_pool_test, test_schedule_seq_mt) { // test stop run pending - irs::async_utils::thread_pool pool(1, 1); + irs::async_utils::ThreadPool<> pool(1); size_t id(0); std::mutex mutex; std::condition_variable cond; @@ -457,7 +419,7 @@ TEST(thread_pool_test, test_stop_long_running_run_after_stop_mt) { ++count; }; - irs::async_utils::thread_pool pool(1, 0); + irs::async_utils::ThreadPool<> pool(1); { auto lock = std::lock_guard(mtx); ASSERT_TRUE(pool.run(std::move(long_running_task))); @@ -474,7 +436,6 @@ TEST(thread_pool_test, test_stop_long_running_run_after_stop_mt) { ASSERT_EQ(1, pool.threads()); ASSERT_EQ(std::make_tuple(size_t(1), size_t(0), size_t(1)), pool.stats()); } - ASSERT_FALSE(pool.run(std::function())); pool.stop(); // blocking call (thread runtime duration simulated via sleep) ASSERT_EQ(1, count); ASSERT_FALSE(pool.run(std::move(func))); @@ -495,7 +456,7 @@ TEST(thread_pool_test, test_stop_long_ruuning_run_skip_pending_mt) { ++count; }; - irs::async_utils::thread_pool pool(1, 0); + irs::async_utils::ThreadPool<> pool(1); { auto lock = std::unique_lock(mtx); ASSERT_TRUE(pool.run(std::move(long_running_task))); @@ -543,7 +504,7 @@ TEST(thread_pool_test, test_stop_long_ruuning_run_skip_pending_mt) { TEST(thread_pool_test, test_stop_run_pending_mt) { // test stop run pending - irs::async_utils::thread_pool pool(4, 2); + irs::async_utils::ThreadPool<> pool(4); std::atomic count(0); std::mutex mutex; auto task1 = [&mutex, &count]() -> void { @@ -595,8 +556,8 @@ TEST(thread_pool_test, test_stop_run_pending_mt) { } ASSERT_EQ(0, pool.tasks_active()); ASSERT_EQ(0, pool.tasks_pending()); - ASSERT_EQ(2, pool.threads()); - ASSERT_EQ(std::make_tuple(size_t(0), size_t(0), size_t(2)), pool.stats()); + ASSERT_EQ(4, pool.threads()); + ASSERT_EQ(std::make_tuple(size_t(0), size_t(0), size_t(4)), pool.stats()); pool.stop(); // blocking call (thread runtime duration simulated via sleep) ASSERT_EQ(4, count); // all tasks ran ASSERT_EQ(0, pool.tasks_active()); @@ -607,7 +568,7 @@ TEST(thread_pool_test, test_stop_run_pending_mt) { TEST(thread_pool_test, test_stop_with_pending_task_mt) { // test stop run pending - irs::async_utils::thread_pool pool(1, 1); + irs::async_utils::ThreadPool<> pool(1); std::atomic count(0); std::mutex mutex; auto task1 = [&mutex, &count]() -> void { @@ -679,7 +640,7 @@ TEST(thread_pool_test, test_stop_with_pending_task_mt) { TEST(thread_pool_test, test_abort_with_pending_task_mt) { // test stop run pending - irs::async_utils::thread_pool pool(1, 1); + irs::async_utils::ThreadPool<> pool(1); std::atomic count(0); std::mutex mutex; auto task1 = [&mutex, &count]() -> void { @@ -752,7 +713,7 @@ TEST(thread_pool_test, test_abort_with_pending_task_mt) { TEST(thread_pool_test, test_stop_mt) { // test stop run pending - irs::async_utils::thread_pool pool(1, 0); + irs::async_utils::ThreadPool<> pool(1); std::atomic count(0); std::mutex mutex; auto task1 = [&mutex, &count]() -> void { @@ -778,7 +739,7 @@ TEST(thread_pool_test, test_stop_mt) { TEST(thread_pool_test, test_stop_skip_pending_mt) { // test stop skip pending - irs::async_utils::thread_pool pool(1, 0); + irs::async_utils::ThreadPool<> pool(1); std::atomic count(0); std::mutex mutex; auto task1 = [&mutex, &count]() -> void { @@ -807,7 +768,7 @@ TEST(thread_pool_test, test_stop_skip_pending_mt) { TEST(thread_pool_test, test_stop_run_mt) { // test pool stop + run - irs::async_utils::thread_pool pool(1, 0); + irs::async_utils::ThreadPool<> pool(1); std::atomic count(0); std::mutex mutex; auto task1 = [&mutex, &count]() -> void { @@ -820,9 +781,8 @@ TEST(thread_pool_test, test_stop_run_mt) { }; std::unique_lock lock(mutex); - ASSERT_EQ(0, pool.threads()); + ASSERT_EQ(1, pool.threads()); ASSERT_TRUE(pool.run(std::move(task1))); - pool.max_threads(1); int tryCount{100}; while (tryCount-- && count.load(std::memory_order_relaxed) < 1) { std::this_thread::sleep_for(100ms); // assume threads start within 100msec @@ -836,7 +796,7 @@ TEST(thread_pool_test, test_stop_run_mt) { TEST(thread_pool_test, test_multiple_calls_stop_mt) { // test multiple calls to stop will all block - irs::async_utils::thread_pool pool(1, 0); + irs::async_utils::ThreadPool<> pool(1); std::condition_variable cond; std::mutex mutex; std::unique_lock lock(mutex); @@ -845,7 +805,7 @@ TEST(thread_pool_test, test_multiple_calls_stop_mt) { cond.notify_all(); }; - ASSERT_EQ(0, pool.threads()); + ASSERT_EQ(1, pool.threads()); ASSERT_TRUE(pool.run(std::move(task))); ASSERT_EQ(1, pool.threads()); @@ -883,7 +843,7 @@ TEST(thread_pool_test, test_multiple_calls_stop_mt) { TEST(thread_pool_test, test_stop_signle_threads_mt) { // test stop with a single thread will stop threads - irs::async_utils::thread_pool pool(1, 1); + irs::async_utils::ThreadPool<> pool(1); ASSERT_TRUE(pool.run([]() -> void {})); // start a single thread ASSERT_EQ(1, pool.threads()); @@ -897,7 +857,7 @@ TEST(thread_pool_test, test_check_name_mt) { // test stop with a single thread will stop threads const thread_name_t expected_name = IR_NATIVE_STRING("foo"); std::basic_string> actual_name; - irs::async_utils::thread_pool pool(1, 1, IR_NATIVE_STRING("foo")); + irs::async_utils::ThreadPool<> pool(1, IR_NATIVE_STRING("foo")); ASSERT_TRUE(pool.run([expected_name, &actual_name]() -> void { EXPECT_TRUE(irs::set_thread_name(expected_name)); diff --git a/utils/index-put.cpp b/utils/index-put.cpp index f18d0fb04..a83a0d36f 100644 --- a/utils/index-put.cpp +++ b/utils/index-put.cpp @@ -379,7 +379,7 @@ int put(const std::string& path, const std::string& dir_type, auto writer = irs::IndexWriter::Make(*dir, codec, irs::OM_CREATE, opts); - irs::async_utils::thread_pool thread_pool( + irs::async_utils::ThreadPool<> thread_pool( indexer_threads + consolidation_threads + 1); // +1 for commiter thread SCOPED_TIMER("Total Time"); diff --git a/utils/index-search.cpp b/utils/index-search.cpp index d7cd951f3..7161d1d47 100644 --- a/utils/index-search.cpp +++ b/utils/index-search.cpp @@ -548,7 +548,11 @@ int search(std::string_view path, std::string_view dir_type, irs::DirectoryReader reader; irs::Scorers order; - irs::async_utils::thread_pool thread_pool(search_threads); + irs::async_utils::ThreadPool<> thread_pool(search_threads); + irs::IndexReaderOptions options; + if (wand.Enabled()) { + options.scorers = {&score, 1}; + } { SCOPED_TIMER("Index read time");