Skip to content
This repository has been archived by the owner on May 3, 2024. It is now read-only.

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
MBkkt committed Oct 30, 2023
1 parent f69a39d commit d930382
Show file tree
Hide file tree
Showing 9 changed files with 321 additions and 696 deletions.
389 changes: 71 additions & 318 deletions core/utils/async_utils.cpp

Large diffs are not rendered by default.

121 changes: 46 additions & 75 deletions core/utils/async_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,9 @@

#pragma once

#include <atomic>
#include <condition_variable>
#include <function2/function2.hpp>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>

Expand All @@ -35,8 +34,7 @@
#include "string.hpp"
#include "thread_utils.hpp"

namespace irs {
namespace async_utils {
namespace irs::async_utils {

//////////////////////////////////////////////////////////////////////////////
/// @brief spinlock implementation for Win32 since std::mutex cannot be used
Expand All @@ -52,86 +50,59 @@ class busywait_mutex final {
std::atomic<bool> locked_{false};
};

template<bool UsePriority = true>
class thread_pool {
template<bool UseDelay = true>
class ThreadPool {
public:
using native_char_t = std::remove_pointer_t<thread_name_t>;
using clock_t = std::chrono::steady_clock;
using func_t = fu2::unique_function<void()>;

explicit thread_pool(size_t max_threads = 0, size_t max_idle = 0,
basic_string_view<native_char_t> worker_name =
kEmptyStringView<native_char_t>);
~thread_pool();
size_t max_idle() const;
void max_idle(size_t value);
void max_idle_delta(int delta); // change value by delta
size_t max_threads() const;
void max_threads(size_t value);
void max_threads_delta(int delta); // change value by delta

// 1st - max_threads(), 2nd - max_idle()
std::pair<size_t, size_t> limits() const;
void limits(size_t max_threads, size_t max_idle);

bool run(func_t&& fn, [[maybe_unused]] clock_t::duration delay = {});
void stop(bool skip_pending = false); // always a blocking call
size_t tasks_active() const;
size_t tasks_pending() const;
size_t threads() const;
using Char = std::remove_pointer_t<thread_name_t>;
using Clock = std::chrono::steady_clock;
using Func = fu2::unique_function<void()>;

explicit ThreadPool(size_t threads, basic_string_view<Char> name = {});
~ThreadPool() { stop(true); }

bool run(Func&& fn, Clock::duration delay = {});
void stop(bool skip_pending = false) noexcept; // always a blocking call
size_t tasks_active() const {
std::lock_guard lock{m_};
return state_ / 2;
}
size_t tasks_pending() const {
std::lock_guard lock{m_};
return tasks_.size();
}
size_t threads() const {
std::lock_guard lock{m_};
return threads_.size();
}
// 1st - tasks active(), 2nd - tasks pending(), 3rd - threads()
std::tuple<size_t, size_t, size_t> stats() const;
std::tuple<size_t, size_t, size_t> stats() const {
std::lock_guard lock{m_};
return {state_ / 2, tasks_.size(), threads_.size()};
}

private:
enum class State { ABORT, FINISH, RUN };

auto& next() {
if constexpr (UsePriority) {
return queue_.top();
} else {
return queue_.front();
}
}
struct Task {
explicit Task(Func&& fn, Clock::time_point at)
: at{at}, fn{std::move(fn)} {}

template<typename T>
static func_t& func(T& t) {
if constexpr (UsePriority) {
return const_cast<func_t&>(t.fn);
} else {
return const_cast<func_t&>(t);
}
}
Clock::time_point at;
Func fn;

struct shared_state {
std::mutex lock;
std::condition_variable cond;
std::atomic<State> state{State::RUN};
bool operator<(const Task& rhs) const noexcept { return rhs.at < at; }
};

struct task {
explicit task(func_t&& fn, clock_t::time_point at)
: at(at), fn(std::move(fn)) {}
void Work();

clock_t::time_point at;
func_t fn;

bool operator<(const task& rhs) const noexcept { return rhs.at < at; }
};
bool WasStop() const { return state_ % 2 != 0; }

void worker(std::shared_ptr<shared_state> shared_state) noexcept;
void worker_impl(std::unique_lock<std::mutex>& lock,
std::shared_ptr<shared_state> shared_state);
bool maybe_spawn_worker();

std::shared_ptr<shared_state> shared_state_;
size_t active_{0};
std::atomic<size_t> threads_{0};
size_t max_idle_;
size_t max_threads_;
std::conditional_t<UsePriority, std::priority_queue<task>, std::queue<func_t>>
queue_;
basic_string<native_char_t> worker_name_;
basic_string<Char> name_;
std::vector<std::thread> threads_;
mutable std::mutex m_;
std::condition_variable cv_;
std::conditional_t<UseDelay, std::priority_queue<Task>, std::queue<Func>>
tasks_;
// stop flag and active tasks counter
uint64_t state_ = 0;
};

} // namespace async_utils
} // namespace irs
} // namespace irs::async_utils
8 changes: 4 additions & 4 deletions tests/index/index_profile_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ class index_profile_test_case : public tests::index_test_base {
std::atomic<size_t> writer_import_count(0);
auto thread_count = (std::max)((size_t)1, num_insert_threads);
auto total_threads = thread_count + num_import_threads + num_update_threads;
irs::async_utils::thread_pool thread_pool(total_threads, total_threads);
irs::async_utils::ThreadPool<> thread_pool(total_threads);
std::mutex mutex;

if (!writer) {
Expand Down Expand Up @@ -473,7 +473,7 @@ class index_profile_test_case : public tests::index_test_base {
size_t cleanup_interval) {
auto* directory = &dir();
std::atomic<bool> working(true);
irs::async_utils::thread_pool thread_pool(1, 1);
irs::async_utils::ThreadPool<> thread_pool(1);

thread_pool.run([cleanup_interval, directory, &working]() -> void {
while (working.load()) {
Expand Down Expand Up @@ -502,7 +502,7 @@ class index_profile_test_case : public tests::index_test_base {
options.segment_count_max = 8; // match original implementation or may run
// out of file handles (e.g. MacOS/Travis)

irs::async_utils::thread_pool thread_pool(commit_threads, commit_threads);
irs::async_utils::ThreadPool<> thread_pool(commit_threads);
auto writer = open_writer(irs::OM_CREATE, options);

for (size_t i = 0; i < commit_threads; ++i) {
Expand Down Expand Up @@ -536,7 +536,7 @@ class index_profile_test_case : public tests::index_test_base {
irs::index_utils::MakePolicy(irs::index_utils::ConsolidateCount());
irs::IndexWriterOptions options;
std::atomic<bool> working(true);
irs::async_utils::thread_pool thread_pool(2, 2);
irs::async_utils::ThreadPool<> thread_pool(2);

options.segment_count_max = 8; // match original implementation or may run
// out of file handles (e.g. MacOS/Travis)
Expand Down
4 changes: 2 additions & 2 deletions tests/index/index_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ class index_test_case : public tests::index_test_base {

// validate terms async
{
irs::async_utils::thread_pool pool(thread_count, thread_count);
irs::async_utils::ThreadPool<> pool(thread_count);

{
std::lock_guard<std::mutex> lock(mutex);
Expand Down Expand Up @@ -583,7 +583,7 @@ class index_test_case : public tests::index_test_base {
ASSERT_EQ(expected_term_itrs[i]->value(), actual_term_itr->value());
}

irs::async_utils::thread_pool pool(thread_count, thread_count);
irs::async_utils::ThreadPool<> pool(thread_count);

{
std::lock_guard<std::mutex> lock(mutex);
Expand Down
4 changes: 2 additions & 2 deletions tests/store/directory_test_case.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ TEST_P(directory_test_case, read_multiple_streams) {
auto in = dir_->open("test_async", irs::IOAdvice::NORMAL);
std::mutex in_mtx;
std::mutex mutex;
irs::async_utils::thread_pool pool(16, 16);
irs::async_utils::ThreadPool<> pool(16);

ASSERT_FALSE(!in);
in = in->reopen();
Expand All @@ -385,7 +385,7 @@ TEST_P(directory_test_case, read_multiple_streams) {
{
std::lock_guard<std::mutex> lock(mutex);

for (auto i = pool.max_threads(); i; --i) {
for (auto i = pool.threads(); i; --i) {
pool.run([&in, &in_mtx, &mutex]() -> void {
index_input::ptr input;

Expand Down
Loading

0 comments on commit d930382

Please sign in to comment.