Skip to content

Commit

Permalink
[fix][common] Implement hybrid prior queue and simple queue for
Browse files Browse the repository at this point in the history
SimpleWorkerSet.

Signed-off-by: Ketor <d.ketor@gmail.com>
  • Loading branch information
ketor authored and rock-git committed Apr 2, 2024
1 parent 6b63e95 commit 1aedede
Show file tree
Hide file tree
Showing 22 changed files with 198 additions and 144 deletions.
1 change: 1 addition & 0 deletions conf/coordinator-gflags.conf
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@
-dingo_log_switch_coor_lease=true
-default_replica_num=3
-use_pthread_prior_worker_set=true
-use_prior_worker_set=false
1 change: 1 addition & 0 deletions conf/index-gflags.conf
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@
-enable_threads_service=false
-dingo_log_switch_txn_detail=true
-use_pthread_prior_worker_set=true
-use_prior_worker_set=false
1 change: 1 addition & 0 deletions conf/store-gflags.conf
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@
-enable_threads_service=false
-dingo_log_switch_txn_detail=true
-use_pthread_prior_worker_set=true
-use_prior_worker_set=false
154 changes: 95 additions & 59 deletions src/common/runnable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -164,17 +164,17 @@ std::vector<std::string> Worker::GetPendingTaskTrace() {
return traces;
}

WorkerSet::WorkerSet(std::string name, uint32_t worker_num, int64_t max_pending_task_count)
ExecqWorkerSet::ExecqWorkerSet(std::string name, uint32_t worker_num, int64_t max_pending_task_count)
: name_(name),
worker_num_(worker_num),
max_pending_task_count_(max_pending_task_count),
active_worker_id_(0),
total_task_count_metrics_(fmt::format("dingo_worker_set_{}_total_task_count", name)),
pending_task_count_metrics_(fmt::format("dingo_worker_set_{}_pending_task_count", name)) {}

WorkerSet::~WorkerSet() = default;
ExecqWorkerSet::~ExecqWorkerSet() = default;

bool WorkerSet::Init() {
bool ExecqWorkerSet::Init() {
for (int i = 0; i < worker_num_; ++i) {
auto worker = Worker::New([this](WorkerEventType type) { WatchWorker(type); });
if (!worker->Init()) {
Expand All @@ -186,13 +186,13 @@ bool WorkerSet::Init() {
return true;
}

void WorkerSet::Destroy() {
void ExecqWorkerSet::Destroy() {
for (const auto& worker : workers_) {
worker->Destroy();
}
}

bool WorkerSet::ExecuteRR(TaskRunnablePtr task) {
bool ExecqWorkerSet::ExecuteRR(TaskRunnablePtr task) {
if (BAIDU_UNLIKELY(max_pending_task_count_ > 0 &&
pending_task_count_.load(std::memory_order_relaxed) > max_pending_task_count_)) {
DINGO_LOG(WARNING) << fmt::format("[execqueue] exceed max pending task limit, {}/{}",
Expand All @@ -209,7 +209,7 @@ bool WorkerSet::ExecuteRR(TaskRunnablePtr task) {
return ret;
}

bool WorkerSet::ExecuteLeastQueue(TaskRunnablePtr task) {
bool ExecqWorkerSet::ExecuteLeastQueue(TaskRunnablePtr task) {
if (BAIDU_UNLIKELY(max_pending_task_count_ > 0 &&
pending_task_count_.load(std::memory_order_relaxed) > max_pending_task_count_)) {
DINGO_LOG(WARNING) << fmt::format("[execqueue] exceed max pending task limit, {}/{}",
Expand All @@ -226,7 +226,7 @@ bool WorkerSet::ExecuteLeastQueue(TaskRunnablePtr task) {
return ret;
}

bool WorkerSet::ExecuteHashByRegionId(int64_t region_id, TaskRunnablePtr task) {
bool ExecqWorkerSet::ExecuteHashByRegionId(int64_t region_id, TaskRunnablePtr task) {
if (BAIDU_UNLIKELY(max_pending_task_count_ > 0 &&
pending_task_count_.load(std::memory_order_relaxed) > max_pending_task_count_)) {
DINGO_LOG(WARNING) << fmt::format("[execqueue] exceed max pending task limit, {}/{}",
Expand All @@ -243,13 +243,13 @@ bool WorkerSet::ExecuteHashByRegionId(int64_t region_id, TaskRunnablePtr task) {
return ret;
}

void WorkerSet::WatchWorker(WorkerEventType type) {
void ExecqWorkerSet::WatchWorker(WorkerEventType type) {
if (type == WorkerEventType::kFinishTask) {
DecPendingTaskCount();
}
}

uint32_t WorkerSet::LeastPendingTaskWorker() {
uint32_t ExecqWorkerSet::LeastPendingTaskWorker() {
uint32_t index = 0;
int32_t min_pending_count = INT32_MAX;
uint32_t worker_num = workers_.size();
Expand All @@ -265,23 +265,23 @@ uint32_t WorkerSet::LeastPendingTaskWorker() {
return index;
}

uint64_t WorkerSet::TotalTaskCount() { return total_task_count_metrics_.get_value(); }
uint64_t ExecqWorkerSet::TotalTaskCount() { return total_task_count_metrics_.get_value(); }

void WorkerSet::IncTotalTaskCount() { total_task_count_metrics_ << 1; }
void ExecqWorkerSet::IncTotalTaskCount() { total_task_count_metrics_ << 1; }

uint64_t WorkerSet::PendingTaskCount() { return pending_task_count_.load(std::memory_order_relaxed); }
uint64_t ExecqWorkerSet::PendingTaskCount() { return pending_task_count_.load(std::memory_order_relaxed); }

void WorkerSet::IncPendingTaskCount() {
void ExecqWorkerSet::IncPendingTaskCount() {
pending_task_count_metrics_ << 1;
pending_task_count_.fetch_add(1, std::memory_order_relaxed);
}

void WorkerSet::DecPendingTaskCount() {
void ExecqWorkerSet::DecPendingTaskCount() {
pending_task_count_metrics_ << -1;
pending_task_count_.fetch_sub(1, std::memory_order_relaxed);
}

std::vector<std::vector<std::string>> WorkerSet::GetPendingTaskTrace() {
std::vector<std::vector<std::string>> ExecqWorkerSet::GetPendingTaskTrace() {
std::vector<std::vector<std::string>> traces;

traces.reserve(workers_.size());
Expand All @@ -292,10 +292,12 @@ std::vector<std::vector<std::string>> WorkerSet::GetPendingTaskTrace() {
return traces;
}

PriorWorkerSet::PriorWorkerSet(std::string name, uint32_t worker_num, int64_t max_pending_task_count, bool use_pthread)
SimpleWorkerSet::SimpleWorkerSet(std::string name, uint32_t worker_num, int64_t max_pending_task_count,
bool use_pthread, bool use_prior)
: name_(name),
worker_num_(worker_num),
use_pthread_(use_pthread),
use_prior_(use_prior),
max_pending_task_count_(max_pending_task_count),
total_task_count_metrics_(fmt::format("dingo_prior_worker_set_{}_total_task_count", name)),
pending_task_count_metrics_(fmt::format("dingo_prior_worker_set_{}_pending_task_count", name)),
Expand All @@ -305,43 +307,72 @@ PriorWorkerSet::PriorWorkerSet(std::string name, uint32_t worker_num, int64_t ma
bthread_cond_init(&cond_, nullptr);
}

PriorWorkerSet::~PriorWorkerSet() {
SimpleWorkerSet::~SimpleWorkerSet() {
bthread_cond_destroy(&cond_);
bthread_mutex_destroy(&mutex_);
}

bool PriorWorkerSet::Init() {
bool SimpleWorkerSet::Init() {
uint32_t i = 0;

auto worker_function = [this, &i]() {
if (use_pthread_) {
pthread_setname_np(pthread_self(), (name_ + ":" + std::to_string(i)).c_str());
}

while (true) {
bthread_mutex_lock(&mutex_);
while (pending_task_count_.load(std::memory_order_relaxed) == 0) {
bthread_cond_wait(&cond_, &mutex_);
}
if (!use_prior_) {
while (true) {
bthread_mutex_lock(&mutex_);
while (tasks_.empty()) {
bthread_cond_wait(&cond_, &mutex_);
}

// get task from task queue
TaskRunnablePtr task = nullptr;
int64_t now_time_us = 0;
if (!tasks_.empty()) {
task = tasks_.top();
tasks_.pop();
// get task from task queue
TaskRunnablePtr task = nullptr;
if (BAIDU_LIKELY(!tasks_.empty())) {
task = tasks_.front();
tasks_.pop();
}

now_time_us = Helper::TimestampUs();
queue_wait_metrics_ << now_time_us - task->CreateTimeUs();
}
bthread_mutex_unlock(&mutex_);

bthread_mutex_unlock(&mutex_);
if (BAIDU_UNLIKELY(task != nullptr)) {
int64_t now_time_us = Helper::TimestampUs();
queue_wait_metrics_ << now_time_us - task->CreateTimeUs();

if (BAIDU_UNLIKELY(task != nullptr)) {
task->Run();
queue_run_metrics_ << Helper::TimestampUs() - now_time_us;
DecPendingTaskCount();
Notify(WorkerEventType::kFinishTask);
task->Run();

queue_run_metrics_ << Helper::TimestampUs() - now_time_us;
DecPendingTaskCount();
Notify(WorkerEventType::kFinishTask);
}
}
} else {
while (true) {
bthread_mutex_lock(&mutex_);
while (pending_task_count_.load(std::memory_order_relaxed) == 0) {
bthread_cond_wait(&cond_, &mutex_);
}

// get task from task queue
TaskRunnablePtr task = nullptr;
if (BAIDU_LIKELY(!prior_tasks_.empty())) {
task = prior_tasks_.top();
prior_tasks_.pop();
}

bthread_mutex_unlock(&mutex_);

if (BAIDU_LIKELY(task != nullptr)) {
int64_t now_time_us = Helper::TimestampUs();
queue_wait_metrics_ << now_time_us - task->CreateTimeUs();

task->Run();

queue_run_metrics_ << Helper::TimestampUs() - now_time_us;
DecPendingTaskCount();
Notify(WorkerEventType::kFinishTask);
}
}
}
};
Expand All @@ -359,7 +390,7 @@ bool PriorWorkerSet::Init() {
return true;
}

void PriorWorkerSet::Destroy() {
void SimpleWorkerSet::Destroy() {
if (use_pthread_) {
for (auto& std_thread : pthread_workers_) {
std_thread.join();
Expand All @@ -371,61 +402,66 @@ void PriorWorkerSet::Destroy() {
}
}

bool PriorWorkerSet::Execute(TaskRunnablePtr task) {
auto pend_task_count = pending_task_count_.load(std::memory_order_relaxed);

if (BAIDU_UNLIKELY(max_pending_task_count_ > 0 && pend_task_count > max_pending_task_count_)) {
DINGO_LOG(WARNING) << fmt::format("[execqueue] exceed max pending task limit, {}/{}",
pending_task_count_.load(std::memory_order_relaxed), max_pending_task_count_);
return false;
bool SimpleWorkerSet::Execute(TaskRunnablePtr task) {
if (max_pending_task_count_ > 0) {
auto pend_task_count = pending_task_count_.load(std::memory_order_relaxed);
if (pend_task_count > max_pending_task_count_) {
DINGO_LOG(WARNING) << fmt::format("[execqueue] exceed max pending task limit, {}/{}", pend_task_count,
max_pending_task_count_);
return false;
}
}

IncPendingTaskCount();
IncTotalTaskCount();

bthread_mutex_lock(&mutex_);
tasks_.push(task);
bthread_cond_signal(&cond_);
if (!use_prior_) {
tasks_.push(task);
} else {
prior_tasks_.push(task);
}
bthread_cond_broadcast(&cond_);
bthread_mutex_unlock(&mutex_);

return true;
}

bool PriorWorkerSet::ExecuteRR(TaskRunnablePtr task) { return Execute(task); }
bool SimpleWorkerSet::ExecuteRR(TaskRunnablePtr task) { return Execute(task); }

bool PriorWorkerSet::ExecuteLeastQueue(TaskRunnablePtr task) { return Execute(task); }
bool SimpleWorkerSet::ExecuteLeastQueue(TaskRunnablePtr task) { return Execute(task); }

bool PriorWorkerSet::ExecuteHashByRegionId(int64_t /*region_id*/, TaskRunnablePtr task) { return Execute(task); }
bool SimpleWorkerSet::ExecuteHashByRegionId(int64_t /*region_id*/, TaskRunnablePtr task) { return Execute(task); }

void PriorWorkerSet::WatchWorker(WorkerEventType type) {
void SimpleWorkerSet::WatchWorker(WorkerEventType type) {
if (type == WorkerEventType::kFinishTask) {
DecPendingTaskCount();
}
}

uint64_t PriorWorkerSet::TotalTaskCount() { return total_task_count_metrics_.get_value(); }
uint64_t SimpleWorkerSet::TotalTaskCount() { return total_task_count_metrics_.get_value(); }

void PriorWorkerSet::IncTotalTaskCount() { total_task_count_metrics_ << 1; }
void SimpleWorkerSet::IncTotalTaskCount() { total_task_count_metrics_ << 1; }

uint64_t PriorWorkerSet::PendingTaskCount() { return pending_task_count_.load(std::memory_order_relaxed); }
uint64_t SimpleWorkerSet::PendingTaskCount() { return pending_task_count_.load(std::memory_order_relaxed); }

void PriorWorkerSet::IncPendingTaskCount() {
void SimpleWorkerSet::IncPendingTaskCount() {
pending_task_count_metrics_ << 1;
pending_task_count_.fetch_add(1, std::memory_order_relaxed);
}

void PriorWorkerSet::DecPendingTaskCount() {
void SimpleWorkerSet::DecPendingTaskCount() {
pending_task_count_metrics_ << -1;
pending_task_count_.fetch_sub(1, std::memory_order_relaxed);
}

std::vector<std::vector<std::string>> PriorWorkerSet::GetPendingTaskTrace() { // NOLINT
std::vector<std::vector<std::string>> SimpleWorkerSet::GetPendingTaskTrace() { // NOLINT
std::vector<std::vector<std::string>> traces;

return traces;
}

void PriorWorkerSet::Notify(WorkerEventType type) {
void SimpleWorkerSet::Notify(WorkerEventType type) {
if (notify_func_ != nullptr) {
notify_func_(type);
}
Expand Down
31 changes: 17 additions & 14 deletions src/common/runnable.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,13 @@ class Worker {

using WorkerPtr = std::shared_ptr<Worker>;

class WorkerSet {
class ExecqWorkerSet {
public:
WorkerSet(std::string name, uint32_t worker_num, int64_t max_pending_task_count);
~WorkerSet();
ExecqWorkerSet(std::string name, uint32_t worker_num, int64_t max_pending_task_count);
~ExecqWorkerSet();

static std::shared_ptr<WorkerSet> New(std::string name, uint32_t worker_num, uint32_t max_pending_task_count) {
return std::make_shared<WorkerSet>(name, worker_num, max_pending_task_count);
static std::shared_ptr<ExecqWorkerSet> New(std::string name, uint32_t worker_num, uint32_t max_pending_task_count) {
return std::make_shared<ExecqWorkerSet>(name, worker_num, max_pending_task_count);
}

bool Init();
Expand Down Expand Up @@ -168,16 +168,17 @@ class WorkerSet {
bvar::Adder<int64_t> pending_task_count_metrics_;
};

using WorkerSetPtr = std::shared_ptr<WorkerSet>;
using ExecqWorkerSetPtr = std::shared_ptr<ExecqWorkerSet>;

class PriorWorkerSet {
class SimpleWorkerSet {
public:
PriorWorkerSet(std::string name, uint32_t worker_num, int64_t max_pending_task_count, bool use_pthread);
~PriorWorkerSet();
SimpleWorkerSet(std::string name, uint32_t worker_num, int64_t max_pending_task_count, bool use_pthread,
bool use_prior);
~SimpleWorkerSet();

static std::shared_ptr<PriorWorkerSet> New(std::string name, uint32_t worker_num, uint32_t max_pending_task_count,
bool use_pthead) {
return std::make_shared<PriorWorkerSet>(name, worker_num, max_pending_task_count, use_pthead);
static std::shared_ptr<SimpleWorkerSet> New(std::string name, uint32_t worker_num, uint32_t max_pending_task_count,
bool use_pthead, bool use_prior) {
return std::make_shared<SimpleWorkerSet>(name, worker_num, max_pending_task_count, use_pthead, use_prior);
}

bool Init();
Expand Down Expand Up @@ -206,9 +207,11 @@ class PriorWorkerSet {

bthread_mutex_t mutex_;
bthread_cond_t cond_;
std::priority_queue<TaskRunnablePtr, std::vector<TaskRunnablePtr>, CompareTaskRunnable> tasks_;
std::priority_queue<TaskRunnablePtr, std::vector<TaskRunnablePtr>, CompareTaskRunnable> prior_tasks_;
std::queue<TaskRunnablePtr> tasks_;

bool use_pthread_;
bool use_prior_;
std::vector<Bthread> bthread_workers_;
std::vector<std::thread> pthread_workers_;

Expand All @@ -227,7 +230,7 @@ class PriorWorkerSet {
bvar::LatencyRecorder queue_run_metrics_;
};

using PriorWorkerSetPtr = std::shared_ptr<PriorWorkerSet>;
using SimpleWorkerSetPtr = std::shared_ptr<SimpleWorkerSet>;

} // namespace dingodb

Expand Down
2 changes: 1 addition & 1 deletion src/common/synchronization.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ class ResourcePool {
BAIDU_SCOPED_LOCK(mutex_);
pool_.push(item);
(*pool_size_) << 1;
bthread_cond_signal(&cond_);
bthread_cond_broadcast(&cond_);
}

// Get a resource from the pool
Expand Down
Loading

0 comments on commit 1aedede

Please sign in to comment.