Skip to content

Commit

Permalink
add worker-thread-queue
Browse files Browse the repository at this point in the history
  • Loading branch information
wuxianrong committed Nov 8, 2024
1 parent 4820646 commit 9721c9c
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 40 deletions.
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -367,9 +367,9 @@ set(ZSTD_INCLUDE_DIR ${INSTALL_INCLUDEDIR})
ExternalProject_Add(fmt
DEPENDS
URL
https://github.com/fmtlib/fmt/archive/refs/tags/7.1.0.tar.gz
https://github.com/fmtlib/fmt/archive/refs/tags/10.2.1.tar.gz
URL_HASH
MD5=32af902636d373641f4ef9032fc65b3a
MD5=dc09168c94f90ea890257995f2c497a5
DOWNLOAD_NO_PROGRESS
1
UPDATE_COMMAND
Expand Down
43 changes: 25 additions & 18 deletions src/net/include/thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,51 +37,58 @@ class ThreadPool : public pstd::noncopyable {
public:
class Worker {
public:
explicit Worker(ThreadPool* tp) : start_(false), thread_pool_(tp){};
explicit Worker(ThreadPool* tp, size_t size) : start_(false), thread_pool_(tp), should_stop_(false), max_queue_size_(size) {};
static void* WorkerMain(void* arg);

int start();
int stop();
void Schedule(TaskFunc func, void* arg);

private:
void runInThread();
size_t max_queue_size();
void cur_queue_size(size_t* qsize);
void cur_time_queue_size(size_t* qsize);
void DelaySchedule(uint64_t timeout, TaskFunc func, void* arg);
bool should_stop();
void set_should_stop();

pthread_t thread_id_;
std::atomic<bool> start_;
ThreadPool* const thread_pool_;
std::string worker_name_;
std::queue<Task> queue_;
std::priority_queue<TimeTask> time_queue_;
pstd::Mutex mu_;
pstd::CondVar rsignal_;
pstd::CondVar wsignal_;
std::atomic<bool> should_stop_;
size_t max_queue_size_;
};

explicit ThreadPool(size_t worker_num, size_t max_queue_size, std::string thread_pool_name = "ThreadPool");
virtual ~ThreadPool();

void Schedule(TaskFunc func, void* arg);
int start_thread_pool();
int stop_thread_pool();
bool should_stop();
void set_should_stop();

void Schedule(TaskFunc func, void* arg);
void DelaySchedule(uint64_t timeout, TaskFunc func, void* arg);
size_t max_queue_size();
size_t worker_size();
std::string thread_pool_name();
void cur_queue_size(size_t* qsize);
void cur_time_queue_size(size_t* qsize);
std::string thread_pool_name();

private:
void runInThread();

/*
* Here we used auto poll to find the next work thread,
* last_thread_ is the last work thread
*/
int last_thread_;
size_t worker_num_;
size_t max_queue_size_;

std::string thread_pool_name_;
std::queue<Task> queue_;
std::priority_queue<TimeTask> time_queue_;
std::vector<Worker*> workers_;
std::atomic<bool> running_;
std::atomic<bool> should_stop_;

pstd::Mutex mu_;
pstd::CondVar rsignal_;
pstd::CondVar wsignal_;

};

} // namespace net
Expand Down
57 changes: 37 additions & 20 deletions src/net/src/thread_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
namespace net {

void* ThreadPool::Worker::WorkerMain(void* arg) {
auto tp = static_cast<ThreadPool*>(arg);
auto tp = static_cast<Worker*>(arg);
tp->runInThread();
return nullptr;
}
Expand All @@ -33,6 +33,9 @@ int ThreadPool::Worker::start() {
}

int ThreadPool::Worker::stop() {
should_stop_.store(true);
rsignal_.notify_all();
wsignal_.notify_all();
if (start_.load()) {
if (pthread_join(thread_id_, nullptr) != 0) {
return -1;
Expand All @@ -43,20 +46,19 @@ int ThreadPool::Worker::stop() {
return 0;
}

ThreadPool::ThreadPool(size_t worker_num, size_t max_queue_size, std::string thread_pool_name)
ThreadPool::ThreadPool(size_t worker_num, size_t max_queue_size, std::string thread_pool_name)
: worker_num_(worker_num),
max_queue_size_(max_queue_size),
thread_pool_name_(std::move(thread_pool_name)),
running_(false),
should_stop_(false) {}
last_thread_(0) {}

ThreadPool::~ThreadPool() { stop_thread_pool(); }

int ThreadPool::start_thread_pool() {
if (!running_.load()) {
should_stop_.store(false);
for (size_t i = 0; i < worker_num_; ++i) {
workers_.push_back(new Worker(this));
workers_.push_back(new Worker(this, max_queue_size_));
int res = workers_[i]->start();
if (res != 0) {
return kCreateThreadError;
Expand All @@ -70,9 +72,6 @@ int ThreadPool::start_thread_pool() {
int ThreadPool::stop_thread_pool() {
int res = 0;
if (running_.load()) {
should_stop_.store(true);
rsignal_.notify_all();
wsignal_.notify_all();
for (const auto worker : workers_) {
res = worker->stop();
if (res != 0) {
Expand All @@ -87,24 +86,33 @@ int ThreadPool::stop_thread_pool() {
return res;
}

bool ThreadPool::should_stop() { return should_stop_.load(); }

void ThreadPool::set_should_stop() { should_stop_.store(true); }
bool ThreadPool::Worker::should_stop() { return should_stop_.load(); }

void ThreadPool::Schedule(TaskFunc func, void* arg) {
std::unique_lock lock(mu_);
wsignal_.wait(lock, [this]() { return queue_.size() < max_queue_size_ || should_stop(); });
void ThreadPool::Worker::set_should_stop() { should_stop_.store(true); }

if (!should_stop()) {
void ThreadPool::Worker::Schedule(TaskFunc func, void* arg) {
std::lock_guard lock(mu_);
//wsignal_.wait(lock, [worker]() { return queue_.size() < thread_pool_->max_queue_size_ || thread_pool_->should_stop(); });
if (queue_.size() < max_queue_size_ && !should_stop()) {
queue_.emplace(func, arg);
rsignal_.notify_one();
}
}

void ThreadPool::Schedule(TaskFunc func, void* arg) {
int next_thread = last_thread_;
bool find = false;
for (int cnt = 0; cnt < worker_num_; cnt++) {
Worker* worker = workers_[next_thread];
worker->Schedule(func, arg);
next_thread = (next_thread + 1) % worker_num_;
}
}

/*
* timeout is in millisecond
*/
void ThreadPool::DelaySchedule(uint64_t timeout, TaskFunc func, void* arg) {
void ThreadPool::Worker::DelaySchedule(uint64_t timeout, TaskFunc func, void* arg) {
auto now = std::chrono::system_clock::now();
uint64_t unow = std::chrono::duration_cast<std::chrono::microseconds>(now.time_since_epoch()).count();
uint64_t exec_time = unow + timeout * 1000;
Expand All @@ -116,21 +124,30 @@ void ThreadPool::DelaySchedule(uint64_t timeout, TaskFunc func, void* arg) {
}
}

size_t ThreadPool::max_queue_size() { return max_queue_size_; }
size_t ThreadPool::Worker::max_queue_size() { return max_queue_size_; }

void ThreadPool::cur_queue_size(size_t* qsize) {
void ThreadPool::Worker::cur_queue_size(size_t* qsize) {
std::lock_guard lock(mu_);
*qsize = queue_.size();
}

void ThreadPool::cur_time_queue_size(size_t* qsize) {
void ThreadPool::Worker::cur_time_queue_size(size_t* qsize) {
std::lock_guard lock(mu_);
*qsize = time_queue_.size();
}

size_t ThreadPool::max_queue_size() { return max_queue_size_; }
void ThreadPool::cur_queue_size(size_t* qsize) {
*qsize = 10;
}

void ThreadPool::cur_time_queue_size(size_t* qsize) {
*qsize = 10;
}

std::string ThreadPool::thread_pool_name() { return thread_pool_name_; }

void ThreadPool::runInThread() {
void ThreadPool::Worker::runInThread() {
while (!should_stop()) {
std::unique_lock lock(mu_);
rsignal_.wait(lock, [this]() { return !queue_.empty() || !time_queue_.empty() || should_stop(); });
Expand Down

0 comments on commit 9721c9c

Please sign in to comment.