Skip to content

Commit

Permalink
opt code
Browse files Browse the repository at this point in the history
  • Loading branch information
dataroaring committed Sep 18, 2023
1 parent 3d9cf64 commit 8c75298
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 23 deletions.
5 changes: 0 additions & 5 deletions be/src/vec/exec/scan/pip_scanner_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,6 @@ class PipScannerContext : public vectorized::ScannerContext {
_free_blocks_memory_usage->add(free_blocks_memory_usage);
}

// Override of should_be_scheduled(). Returns true only when both hold:
//   - _current_used_bytes is below (_max_bytes_in_queue / 2) * _num_parallel_instances
//   - _serving_blocks_num is below allowed_blocks_num()
// Note: `/` and `*` associate left to right, so the halving is applied
// before the multiplication by _num_parallel_instances.
bool should_be_scheduled() const override {
return (_current_used_bytes < _max_bytes_in_queue / 2 * _num_parallel_instances) &&
(_serving_blocks_num < allowed_blocks_num());
}

void _dispose_coloate_blocks_not_in_queue() override {
if (_need_colocate_distribute) {
for (int i = 0; i < _num_parallel_instances; ++i) {
Expand Down
18 changes: 6 additions & 12 deletions be/src/vec/exec/scan/scanner_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ ScannerContext::ScannerContext(doris::RuntimeState* state_, doris::vectorized::V
_process_status(Status::OK()),
_batch_size(state_->batch_size()),
limit(limit_),
_max_bytes_in_queue(max_bytes_in_blocks_queue_),
_max_bytes_in_queue(max_bytes_in_blocks_queue_ * num_parallel_instances),
_scanner_scheduler(state_->exec_env()->scanner_scheduler()),
_scanners(scanners_),
_num_parallel_instances(num_parallel_instances) {
Expand All @@ -66,27 +66,21 @@ ScannerContext::ScannerContext(doris::RuntimeState* state_, doris::vectorized::V
if (limit < 0) {
limit = -1;
}
}

// After init function call, should not access _parent
Status ScannerContext::init() {
// 1. Calculate max concurrency
// TODO: now the max thread num <= config::doris_scanner_thread_pool_thread_num / 4
// should find a more reasonable value.
_max_thread_num = config::doris_scanner_thread_pool_thread_num / 4;
if (_parent && _parent->_shared_scan_opt) {
DCHECK(_num_parallel_instances > 0);
_max_thread_num *= _num_parallel_instances;
}
_max_thread_num *= num_parallel_instances;
_max_thread_num = _max_thread_num == 0 ? 1 : _max_thread_num;
DCHECK(_max_thread_num > 0);
_max_thread_num = std::min(_max_thread_num, (int32_t)_scanners.size());
// 1. Calculate max concurrency
// For select * from table limit 10; should just use one thread.
if ((_parent && _parent->should_run_serial()) ||
(_local_state && _local_state->should_run_serial())) {
_max_thread_num = 1;
}
}

// After init function call, should not access _parent
Status ScannerContext::init() {
if (_parent) {
_scanner_profile = _parent->_scanner_profile;
_scanner_sched_counter = _parent->_scanner_sched_counter;
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/exec/scan/scanner_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class ScannerContext {
ScannerContext(RuntimeState* state_, VScanNode* parent,
const TupleDescriptor* output_tuple_desc,
const std::list<VScannerSPtr>& scanners_, int64_t limit_,
int64_t max_bytes_in_blocks_queue_, const int num_parallel_instances = 0,
int64_t max_bytes_in_blocks_queue_, const int num_parallel_instances = 1,
pipeline::ScanLocalStateBase* local_state = nullptr);

virtual ~ScannerContext() = default;
Expand Down Expand Up @@ -146,7 +146,7 @@ class ScannerContext {
virtual bool empty_in_queue(int id);

// Returns true only when both hold:
//   - _cur_bytes_in_queue is below half of _max_bytes_in_queue
//   - _serving_blocks_num is below allowed_blocks_num()
// TODO(wb): rethink how to calculate `_max_bytes_in_queue` when executing a shared scan.
virtual inline bool should_be_scheduled() const {
inline bool should_be_scheduled() const {
return (_cur_bytes_in_queue < _max_bytes_in_queue / 2) &&
(_serving_blocks_num < allowed_blocks_num());
}
Expand Down
4 changes: 0 additions & 4 deletions be/src/vec/exec/scan/scanner_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -373,10 +373,6 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext
}

BlockUPtr block = ctx->get_free_block();
if (block == nullptr) {
LOG(INFO) << " get free block returns nullptr " << (void*)ctx;
break;
}

status = scanner->get_block(state, block.get(), &eos);
VLOG_ROW << "VScanNode input rows: " << block->rows() << ", eos: " << eos;
Expand Down

0 comments on commit 8c75298

Please sign in to comment.