Skip to content

Commit

Permalink
[fix] scanner hangs due to negative num_running_scanners
Browse files Browse the repository at this point in the history
Before the patch, num_running_scanners is increased after submitting,
then it may be decreased before increasing then negative values can
be seen by get_block_from_queue and a expected submit does not happend.
  • Loading branch information
dataroaring committed Nov 12, 2023
1 parent 12b2b0f commit 764e1a7
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 11 deletions.
37 changes: 31 additions & 6 deletions be/src/vec/exec/scan/scanner_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@

namespace doris::vectorized {

using namespace std::chrono_literals;

ScannerContext::ScannerContext(doris::RuntimeState* state_, doris::vectorized::VScanNode* parent,
const doris::TupleDescriptor* output_tuple_desc,
const std::list<VScannerSPtr>& scanners_, int64_t limit_,
Expand Down Expand Up @@ -217,7 +219,14 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
// (if the scheduler continues to schedule, it will cause a lot of busy running).
// At this point, consumers are required to trigger new scheduling to ensure that
// data can be continuously fetched.
int64_t cur_bytes_in_queue = _cur_bytes_in_queue;
int32_t serving_blocks_num = _serving_blocks_num;
bool to_be_schedule = should_be_scheduled();
int num_running_scanners = _num_running_scanners;

bool is_scheduled = false;
if (should_be_scheduled() && _num_running_scanners == 0) {
is_scheduled = true;
auto state = _scanner_scheduler->submit(this);
if (state.ok()) {
_num_scheduling_ctx++;
Expand All @@ -235,7 +244,13 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
SCOPED_TIMER(_scanner_wait_batch_timer);
while (!(!_blocks_queue.empty() || _is_finished || !status().ok() ||
state->is_cancelled())) {
_blocks_queue_added_cv.wait(l);
if (!is_scheduled && _num_running_scanners == 0 && should_be_scheduled()) {
LOG(INFO) << "fatal, cur_bytes_in_queue " << cur_bytes_in_queue
<< ", serving_blocks_num " << serving_blocks_num
<< ", num_running_scanners " << num_running_scanners
<< ", to_be_scheudle " << to_be_schedule << (void*)this;
}
_blocks_queue_added_cv.wait_for(l, 1s);
}
}

Expand Down Expand Up @@ -297,19 +312,25 @@ void ScannerContext::set_should_stop() {
_blocks_queue_added_cv.notify_one();
}

void ScannerContext::update_num_running(int32_t scanner_inc, int32_t sched_inc) {
void ScannerContext::inc_num_running_scanners(int32_t inc) {
std::lock_guard l(_transfer_lock);
_num_running_scanners += inc;
}

void ScannerContext::dec_num_scheduling_ctx() {
std::lock_guard l(_transfer_lock);
_num_running_scanners += scanner_inc;
_num_scheduling_ctx += sched_inc;
_num_scheduling_ctx--;
if (_finish_dependency) {
if (_num_running_scanners == 0 && _num_scheduling_ctx == 0) {
_finish_dependency->set_ready_to_finish();
} else {
_finish_dependency->block_finishing();
}
}
_blocks_queue_added_cv.notify_one();
_ctx_finish_cv.notify_one();

if (_num_running_scanners == 0 && _num_scheduling_ctx == 0) {
_ctx_finish_cv.notify_one();
}
}

bool ScannerContext::set_status_on_error(const Status& status, bool need_lock) {
Expand Down Expand Up @@ -514,6 +535,10 @@ void ScannerContext::get_next_batch_of_scanners(std::list<VScannerSPtr>* current
}
}
}

if (thread_slot_num == 0) {
LOG(INFO) << "thread num 0 " << (void*)this;
}
}

taskgroup::TaskGroup* ScannerContext::get_task_group() const {
Expand Down
5 changes: 3 additions & 2 deletions be/src/vec/exec/scan/scanner_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,12 @@ class ScannerContext {
// Return true if this ScannerContext need no more process
virtual bool done() { return _is_finished || _should_stop; }

// Update the running num of scanners and contexts
void update_num_running(int32_t scanner_inc, int32_t sched_inc);
void inc_num_running_scanners(int32_t scanner_inc);

int get_num_running_scanners() const { return _num_running_scanners; }

void dec_num_scheduling_ctx();

int get_num_scheduling_ctx() const { return _num_scheduling_ctx; }

void get_next_batch_of_scanners(std::list<VScannerSPtr>* current_run);
Expand Down
10 changes: 7 additions & 3 deletions be/src/vec/exec/scan/scanner_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,10 @@ void ScannerScheduler::_schedule_scanners(ScannerContext* ctx) {
watch.start();
ctx->incr_num_ctx_scheduling(1);
size_t size = 0;
Defer defer {[&]() { ctx->update_num_running(size, -1); }};
Defer defer {[&]() {
ctx->incr_num_scanner_scheduling(size);
ctx->dec_num_scheduling_ctx();
}};

if (ctx->done()) {
return;
Expand All @@ -221,12 +224,13 @@ void ScannerScheduler::_schedule_scanners(ScannerContext* ctx) {
return;
}

ctx->inc_num_running_scanners(this_run.size());

// Submit scanners to thread pool
// TODO(cmy): How to handle this "nice"?
int nice = 1;
auto iter = this_run.begin();
auto submit_to_thread_pool = [&] {
ctx->incr_num_scanner_scheduling(this_run.size());
if (ctx->thread_token != nullptr) {
// TODO llj tg how to treat this?
while (iter != this_run.end()) {
Expand Down Expand Up @@ -504,4 +508,4 @@ void ScannerScheduler::_deregister_metrics() {
DEREGISTER_HOOK_METRIC(group_local_scan_thread_pool_thread_num);
}

} // namespace doris::vectorized
} // namespace doris::vectorized

0 comments on commit 764e1a7

Please sign in to comment.