Skip to content

Commit

Permalink
[improvement] add a lower bound for bytes in scanner queue
Browse files Browse the repository at this point in the history
  • Loading branch information
dataroaring committed Dec 23, 2023
1 parent e0bb7fa commit 9a5217b
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 5 deletions.
1 change: 1 addition & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ DEFINE_mInt32(doris_scanner_queue_size, "1024");
DEFINE_mInt32(doris_scanner_row_num, "16384");
// single read execute fragment row bytes
DEFINE_mInt32(doris_scanner_row_bytes, "10485760");
// lower bound for bytes in the scanner queue; default 67108864 (64 MiB)
DEFINE_mInt32(min_bytes_in_scanner_queue, "67108864");
// number of max scan keys
DEFINE_mInt32(doris_max_scan_key_num, "48");
// the max number of push down values of a single column.
Expand Down
1 change: 1 addition & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ DECLARE_mInt32(doris_scanner_queue_size);
DECLARE_mInt32(doris_scanner_row_num);
// single read execute fragment row bytes
DECLARE_mInt32(doris_scanner_row_bytes);
// lower bound for bytes in the scanner queue
DECLARE_mInt32(min_bytes_in_scanner_queue);
// number of max scan keys
DECLARE_mInt32(doris_max_scan_key_num);
// the max number of push down values of a single column.
Expand Down
5 changes: 5 additions & 0 deletions be/src/io/fs/stream_load_pipe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ namespace doris {
namespace io {
struct IOContext;

// File-local bvar status variable named "doris_bytes_in_stream_load_pipe", starting at 0.
// StreamLoadPipe stores _buffered_bytes + _proto_buffered_bytes into it.
// NOTE(review): this is a single variable shared by every StreamLoadPipe, and each pipe
// overwrites it with its own byte count, so it appears to reflect only the most recently
// updated pipe rather than a total across pipes — confirm that is the intended metric.
static bvar::Status<int64_t> g_bytes_in_stream_load_pipe("doris_bytes_in_stream_load_pipe", 0);

StreamLoadPipe::StreamLoadPipe(size_t max_buffered_bytes, size_t min_chunk_size,
int64_t total_length, bool use_proto)
: _buffered_bytes(0),
Expand Down Expand Up @@ -191,6 +193,7 @@ Status StreamLoadPipe::_read_next_buffer(std::unique_ptr<uint8_t[]>* data, size_
row_ptr.release();
}
_put_cond.notify_one();
g_bytes_in_stream_load_pipe.set_value(_buffered_bytes + _proto_buffered_bytes);
return Status::OK();
}

Expand Down Expand Up @@ -220,6 +223,8 @@ Status StreamLoadPipe::_append(const ByteBufferPtr& buf, size_t proto_byte_size)
}
}
_get_cond.notify_one();
g_bytes_in_stream_load_pipe.set_value(_buffered_bytes + _proto_buffered_bytes);

return Status::OK();
}

Expand Down
26 changes: 21 additions & 5 deletions be/src/vec/exec/scan/scanner_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ namespace doris::vectorized {

using namespace std::chrono_literals;

// File-local bvar status variable named "doris_bytes_in_scanner_queue", starting at 0.
// ScannerContext stores its _cur_bytes_in_queue into it.
// NOTE(review): this is a single variable shared by every ScannerContext, and each context
// overwrites it with its own queue size, so it appears to reflect only the most recently
// updated context rather than a total across contexts — confirm that is the intended metric.
static bvar::Status<int64_t> g_bytes_in_scanner_queue("doris_bytes_in_scanner_queue", 0);

ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* output_tuple_desc,
const std::list<VScannerSPtr>& scanners, int64_t limit_,
int64_t max_bytes_in_blocks_queue, const int num_parallel_instances,
Expand Down Expand Up @@ -169,6 +171,9 @@ Status ScannerContext::init() {
// Initial capacity: one batch of _block_per_scanner blocks for each scanner thread.
_free_blocks_capacity = _max_thread_num * _block_per_scanner;
// Take one free block to sample its allocated size; it is handed back below.
auto block = get_free_block();
// Never estimate below 16 bytes.
_estimated_block_bytes = std::max(block->allocated_bytes(), (size_t)16);
// NOTE(review): this expression is ceil(_estimated_block_bytes / min_bytes_in_scanner_queue),
// which evaluates to 1 whenever a block is no larger than the configured minimum, so the
// std::max below would rarely raise the capacity. If the goal is "enough blocks to hold
// min_bytes_in_scanner_queue bytes", the operands look swapped, i.e.
// ceil(min_bytes_in_scanner_queue / _estimated_block_bytes) — confirm the intent.
int min_blocks = (_estimated_block_bytes + config::min_bytes_in_scanner_queue - 1) /
config::min_bytes_in_scanner_queue;
_free_blocks_capacity = std::max(_free_blocks_capacity, min_blocks);
return_free_block(std::move(block));

#ifndef BE_TEST
Expand Down Expand Up @@ -236,19 +241,28 @@ void ScannerContext::return_free_block(std::unique_ptr<vectorized::Block> block)
}

// Validates the schema of every block in `blocks`, moves the blocks into _blocks_queue while
// adding each block's allocated_bytes() to _cur_bytes_in_queue, clears `blocks`, notifies one
// waiter on _blocks_queue_added_cv, adds the byte delta to _queued_blocks_memory_usage and
// stores _cur_bytes_in_queue into g_bytes_in_scanner_queue.
//
// NOTE(review): this span appears to be a diff rendering without +/- markers, so lines from
// the previous and the new version of the function are interleaved: old_bytes_in_queue is
// declared twice in the same scope, set_status_on_error is called with both `false` and
// `true`, and the accumulate/push_back pair appears once inside the second loop and once
// after it, where `b` is not in scope. It does not compile as shown; read it as a diff.
void ScannerContext::append_blocks_to_queue(std::vector<vectorized::BlockUPtr>& blocks) {
std::lock_guard l(_transfer_lock);
auto old_bytes_in_queue = _cur_bytes_in_queue;

int64_t old_bytes_in_queue = 0;

// First pass: schema validation only; a failure is recorded via set_status_on_error.
for (auto& b : blocks) {
auto st = validate_block_schema(b.get());
if (!st.ok()) {
set_status_on_error(st, false);
set_status_on_error(st, true);
}
}

// Inner scope: take _transfer_lock, remember the byte count before the append, then move
// every block into the queue.
{
std::lock_guard l(_transfer_lock);
old_bytes_in_queue = _cur_bytes_in_queue;
for (auto& b : blocks) {
_cur_bytes_in_queue += b->allocated_bytes();
_blocks_queue.push_back(std::move(b));
}
_cur_bytes_in_queue += b->allocated_bytes();
_blocks_queue.push_back(std::move(b));
}
// The vector now holds only moved-from pointers; drop them.
blocks.clear();
_blocks_queue_added_cv.notify_one();
// NOTE(review): _cur_bytes_in_queue is read on the next two lines after the inner scope
// holding the lock_guard has closed — confirm whether these reads need _transfer_lock.
_queued_blocks_memory_usage->add(_cur_bytes_in_queue - old_bytes_in_queue);
g_bytes_in_scanner_queue.set_value(_cur_bytes_in_queue);
}

bool ScannerContext::empty_in_queue(int id) {
Expand Down Expand Up @@ -333,6 +347,8 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
}
}

g_bytes_in_scanner_queue.set_value(_cur_bytes_in_queue);

if (!merge_blocks.empty()) {
vectorized::MutableBlock m(block->get());
for (auto& merge_block : merge_blocks) {
Expand Down

0 comments on commit 9a5217b

Please sign in to comment.