diff --git a/.asf.yaml b/.asf.yaml index 7a7d845e4c9bb0..e3d516b35c19a5 100644 --- a/.asf.yaml +++ b/.asf.yaml @@ -63,7 +63,6 @@ github: - COMPILE (DORIS_COMPILE) - Need_2_Approval - Cloud UT (Doris Cloud UT) - - performance (Doris Performance) required_pull_request_reviews: dismiss_stale_reviews: true diff --git a/aazcp.tar.gz b/aazcp.tar.gz new file mode 100644 index 00000000000000..681acf72cde859 Binary files /dev/null and b/aazcp.tar.gz differ diff --git a/be/src/agent/workload_group_listener.cpp b/be/src/agent/workload_group_listener.cpp index 7b688b7dcdf6ef..0cd5a3ee1ac748 100644 --- a/be/src/agent/workload_group_listener.cpp +++ b/be/src/agent/workload_group_listener.cpp @@ -17,6 +17,7 @@ #include "agent/workload_group_listener.h" +#include "runtime/exec_env.h" #include "runtime/workload_group/workload_group.h" #include "runtime/workload_group/workload_group_manager.h" #include "util/mem_info.h" diff --git a/be/src/agent/workload_group_listener.h b/be/src/agent/workload_group_listener.h index f596535908d079..9578a36f70d63e 100644 --- a/be/src/agent/workload_group_listener.h +++ b/be/src/agent/workload_group_listener.h @@ -20,10 +20,11 @@ #include #include "agent/topic_listener.h" -#include "runtime/exec_env.h" namespace doris { +class ExecEnv; + class WorkloadGroupListener : public TopicListener { public: ~WorkloadGroupListener() {} diff --git a/be/src/cloud/cloud_cumulative_compaction.cpp b/be/src/cloud/cloud_cumulative_compaction.cpp index 6b4dbe360da651..1acf8efe32e62b 100644 --- a/be/src/cloud/cloud_cumulative_compaction.cpp +++ b/be/src/cloud/cloud_cumulative_compaction.cpp @@ -92,6 +92,10 @@ Status CloudCumulativeCompaction::prepare_compact() { // plus 1 to skip the delete version. // NOTICE: after that, the cumulative point may be larger than max version of this tablet, but it doesn't matter. update_cumulative_point(); + if (!config::enable_sleep_between_delete_cumu_compaction) { + st = Status::Error( + "_last_delete_version.first not equal to -1"); + } } return st; } diff --git a/be/src/cloud/cloud_schema_change_job.cpp b/be/src/cloud/cloud_schema_change_job.cpp index 1cc4d052a81d69..b086def3c03ee5 100644 --- a/be/src/cloud/cloud_schema_change_job.cpp +++ b/be/src/cloud/cloud_schema_change_job.cpp @@ -169,6 +169,15 @@ Status CloudSchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& reque reader_context.batch_size = ALTER_TABLE_BATCH_SIZE; reader_context.delete_bitmap = &_base_tablet->tablet_meta()->delete_bitmap(); reader_context.version = Version(0, start_resp.alter_version()); + std::vector cluster_key_idxes; + if (!_base_tablet_schema->cluster_key_uids().empty()) { + for (const auto& uid : _base_tablet_schema->cluster_key_uids()) { + cluster_key_idxes.emplace_back(_base_tablet_schema->field_index(uid)); + } + reader_context.read_orderby_key_columns = &cluster_key_idxes; + reader_context.is_unique = false; + reader_context.sequence_id_idx = -1; + } for (auto& split : rs_splits) { RETURN_IF_ERROR(split.rs_reader->init(&reader_context)); diff --git a/be/src/cloud/cloud_storage_engine.cpp b/be/src/cloud/cloud_storage_engine.cpp index b66a9cfbdb2245..650909a29157cd 100644 --- a/be/src/cloud/cloud_storage_engine.cpp +++ b/be/src/cloud/cloud_storage_engine.cpp @@ -677,7 +677,8 @@ Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletS auto st = compaction->prepare_compact(); if (!st.ok()) { long now = duration_cast(system_clock::now().time_since_epoch()).count(); - if (st.is()) { + if (st.is() && + st.msg() != "_last_delete_version.first not equal to -1") { // Backoff strategy if no suitable version tablet->last_cumu_no_suitable_version_ms = now; } diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index c3d00e23c98e5e..95a3e61fb5517a 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -251,7 +251,7 @@ DEFINE_mInt32(download_low_speed_limit_kbps, "50"); // download low speed time(seconds) DEFINE_mInt32(download_low_speed_time, "300"); // whether to download small files in batch -DEFINE_mBool(enable_batch_download, "false"); +DEFINE_mBool(enable_batch_download, "true"); DEFINE_String(sys_log_dir, ""); DEFINE_String(user_function_dir, "${DORIS_HOME}/lib/udf"); @@ -1211,7 +1211,7 @@ DEFINE_Bool(exit_on_exception, "false"); DEFINE_Bool(enable_flush_file_cache_async, "true"); // cgroup -DEFINE_mString(doris_cgroup_cpu_path, ""); +DEFINE_String(doris_cgroup_cpu_path, ""); DEFINE_mBool(enable_be_proc_monitor, "false"); DEFINE_mInt32(be_proc_monitor_interval_ms, "10000"); @@ -1402,6 +1402,7 @@ DEFINE_mBool(enable_delete_bitmap_merge_on_compaction, "false"); // Enable validation to check the correctness of table size. DEFINE_Bool(enable_table_size_correctness_check, "false"); DEFINE_Bool(force_regenerate_rowsetid_on_start_error, "false"); +DEFINE_mBool(enable_sleep_between_delete_cumu_compaction, "false"); // clang-format off #ifdef BE_TEST diff --git a/be/src/common/config.h b/be/src/common/config.h index c0b2e19b49a6be..f8a9c3f7480b33 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1292,7 +1292,7 @@ DECLARE_mInt32(tablet_schema_cache_capacity); DECLARE_mBool(exit_on_exception); // cgroup -DECLARE_mString(doris_cgroup_cpu_path); +DECLARE_String(doris_cgroup_cpu_path); DECLARE_mBool(enable_be_proc_monitor); DECLARE_mInt32(be_proc_monitor_interval_ms); DECLARE_Int32(workload_group_metrics_interval_ms); @@ -1487,6 +1487,8 @@ DECLARE_Bool(force_regenerate_rowsetid_on_start_error); DECLARE_mBool(enable_delete_bitmap_merge_on_compaction); // Enable validation to check the correctness of table size. DECLARE_Bool(enable_table_size_correctness_check); +// Enable sleep 5s between delete cumulative compaction. +DECLARE_mBool(enable_sleep_between_delete_cumu_compaction); #ifdef BE_TEST // test s3 diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp index 73035ecf3957eb..12bf1749a5694d 100644 --- a/be/src/common/daemon.cpp +++ b/be/src/common/daemon.cpp @@ -437,6 +437,8 @@ void Daemon::calculate_metrics_thread() { // update lst map DorisMetrics::instance()->system_metrics()->get_network_traffic( &lst_net_send_bytes, &lst_net_receive_bytes); + + DorisMetrics::instance()->system_metrics()->update_be_avail_cpu_num(); } update_rowsets_and_segments_num_metrics(); } diff --git a/be/src/exec/decompressor.cpp b/be/src/exec/decompressor.cpp index 9365bb00288db1..5da2e6acbb9bdf 100644 --- a/be/src/exec/decompressor.cpp +++ b/be/src/exec/decompressor.cpp @@ -492,15 +492,15 @@ Status Lz4BlockDecompressor::decompress(uint8_t* input, size_t input_len, size_t auto* output_ptr = output; while (input_len > 0) { - //if faild , fall back to large block begin - auto* large_block_input_ptr = input_ptr; - auto* large_block_output_ptr = output_ptr; - if (input_len < sizeof(uint32_t)) { - return Status::InvalidArgument(strings::Substitute( - "fail to do hadoop-lz4 decompress, input_len=$0", input_len)); + *more_input_bytes = sizeof(uint32_t) - input_len; + break; } + //if faild, fall back to large block begin + auto* large_block_input_ptr = input_ptr; + auto* large_block_output_ptr = output_ptr; + uint32_t remaining_decompressed_large_block_len = BigEndian::Load32(input_ptr); input_ptr += sizeof(uint32_t); @@ -609,15 +609,15 @@ Status SnappyBlockDecompressor::decompress(uint8_t* input, size_t input_len, auto* output_ptr = output; while (input_len > 0) { - //if faild , fall back to large block begin - auto* large_block_input_ptr = input_ptr; - auto* large_block_output_ptr = output_ptr; - if (input_len < sizeof(uint32_t)) { - return Status::InvalidArgument(strings::Substitute( - "fail to do hadoop-snappy decompress, input_len=$0", input_len)); + *more_input_bytes = sizeof(uint32_t) - input_len; + break; } + //if faild, fall back to large block begin + auto* large_block_input_ptr = input_ptr; + auto* large_block_output_ptr = output_ptr; + uint32_t remaining_decompressed_large_block_len = BigEndian::Load32(input_ptr); input_ptr += sizeof(uint32_t); diff --git a/be/src/http/action/download_binlog_action.cpp b/be/src/http/action/download_binlog_action.cpp index 372f840401c4ad..4bb8b8b70dd722 100644 --- a/be/src/http/action/download_binlog_action.cpp +++ b/be/src/http/action/download_binlog_action.cpp @@ -144,8 +144,19 @@ void handle_get_segment_index_file(StorageEngine& engine, HttpRequest* req, const auto& rowset_id = get_http_param(req, kRowsetIdParameter); const auto& segment_index = get_http_param(req, kSegmentIndexParameter); const auto& segment_index_id = req->param(kSegmentIndexIdParameter); - segment_index_file_path = - tablet->get_segment_index_filepath(rowset_id, segment_index, segment_index_id); + auto segment_file_path = tablet->get_segment_filepath(rowset_id, segment_index); + if (tablet->tablet_schema()->get_inverted_index_storage_format() == + InvertedIndexStorageFormatPB::V1) { + // now CCR not support for variant + index v1 + constexpr std::string_view index_suffix = ""; + segment_index_file_path = InvertedIndexDescriptor::get_index_file_path_v1( + InvertedIndexDescriptor::get_index_file_path_prefix(segment_file_path), + std::stoll(segment_index_id), index_suffix); + } else { + DCHECK(segment_index_id == "-1"); + segment_index_file_path = InvertedIndexDescriptor::get_index_file_path_v2( + InvertedIndexDescriptor::get_index_file_path_prefix(segment_file_path)); + } is_acquire_md5 = !req->param(kAcquireMD5Parameter).empty(); } catch (const std::exception& e) { HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, e.what()); diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 7e71f3eb910053..e8db5cb542fb4b 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -145,7 +145,8 @@ void StreamLoadAction::handle(HttpRequest* req) { << ctx->commit_and_publish_txn_cost_nanos / 1000000 << ", number_total_rows=" << ctx->number_total_rows << ", number_loaded_rows=" << ctx->number_loaded_rows - << ", receive_bytes=" << ctx->receive_bytes << ", loaded_bytes=" << ctx->loaded_bytes; + << ", receive_bytes=" << ctx->receive_bytes << ", loaded_bytes=" << ctx->loaded_bytes + << ", error_url=" << ctx->error_url; // update statistics streaming_load_requests_total->increment(1); diff --git a/be/src/http/default_path_handlers.cpp b/be/src/http/default_path_handlers.cpp index 04e1121cab63ba..b26ba67b54625c 100644 --- a/be/src/http/default_path_handlers.cpp +++ b/be/src/http/default_path_handlers.cpp @@ -146,7 +146,7 @@ void memory_info_handler(std::stringstream* output) { auto* _opaque = static_cast(opaque); _opaque->append(buf); }; - jemalloc_stats_print(write_cb, &tmp, "a"); + malloc_stats_print(write_cb, &tmp, "a"); boost::replace_all(tmp, "\n", "
"); (*output) << tmp; #else diff --git a/be/src/io/cache/block_file_cache_profile.h b/be/src/io/cache/block_file_cache_profile.h index 19d7f4139f7f15..54118d5094cd52 100644 --- a/be/src/io/cache/block_file_cache_profile.h +++ b/be/src/io/cache/block_file_cache_profile.h @@ -75,6 +75,7 @@ struct FileCacheProfile { struct FileCacheProfileReporter { RuntimeProfile::Counter* num_local_io_total = nullptr; RuntimeProfile::Counter* num_remote_io_total = nullptr; + RuntimeProfile::Counter* num_inverted_index_remote_io_total = nullptr; RuntimeProfile::Counter* local_io_timer = nullptr; RuntimeProfile::Counter* bytes_scanned_from_cache = nullptr; RuntimeProfile::Counter* bytes_scanned_from_remote = nullptr; @@ -90,6 +91,8 @@ struct FileCacheProfileReporter { cache_profile, 1); num_remote_io_total = ADD_CHILD_COUNTER_WITH_LEVEL(profile, "NumRemoteIOTotal", TUnit::UNIT, cache_profile, 1); + num_inverted_index_remote_io_total = ADD_CHILD_COUNTER_WITH_LEVEL( + profile, "NumInvertedIndexRemoteIOTotal", TUnit::UNIT, cache_profile, 1); local_io_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile, "LocalIOUseTimer", cache_profile, 1); remote_io_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile, "RemoteIOUseTimer", cache_profile, 1); write_cache_io_timer = @@ -107,6 +110,8 @@ struct FileCacheProfileReporter { void update(const FileCacheStatistics* statistics) const { COUNTER_UPDATE(num_local_io_total, statistics->num_local_io_total); COUNTER_UPDATE(num_remote_io_total, statistics->num_remote_io_total); + COUNTER_UPDATE(num_inverted_index_remote_io_total, + statistics->num_inverted_index_remote_io_total); COUNTER_UPDATE(local_io_timer, statistics->local_io_timer); COUNTER_UPDATE(remote_io_timer, statistics->remote_io_timer); COUNTER_UPDATE(write_cache_io_timer, statistics->write_cache_io_timer); diff --git a/be/src/io/cache/cached_remote_file_reader.cpp b/be/src/io/cache/cached_remote_file_reader.cpp index c9a273c5d368a6..f16e0019b6dcc5 100644 --- a/be/src/io/cache/cached_remote_file_reader.cpp +++ b/be/src/io/cache/cached_remote_file_reader.cpp @@ -126,7 +126,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* ReadStatistics stats; auto defer_func = [&](int*) { if (io_ctx->file_cache_stats) { - _update_state(stats, io_ctx->file_cache_stats); + _update_state(stats, io_ctx->file_cache_stats, io_ctx->is_inverted_index); io::FileCacheProfile::instance().update(io_ctx->file_cache_stats); } }; @@ -312,7 +312,8 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* } void CachedRemoteFileReader::_update_state(const ReadStatistics& read_stats, - FileCacheStatistics* statis) const { + FileCacheStatistics* statis, + bool is_inverted_index) const { if (statis == nullptr) { return; } @@ -320,6 +321,9 @@ void CachedRemoteFileReader::_update_state(const ReadStatistics& read_stats, statis->num_local_io_total++; statis->bytes_read_from_local += read_stats.bytes_read; } else { + if (is_inverted_index) { + statis->num_inverted_index_remote_io_total++; + } statis->num_remote_io_total++; statis->bytes_read_from_remote += read_stats.bytes_read; } diff --git a/be/src/io/cache/cached_remote_file_reader.h b/be/src/io/cache/cached_remote_file_reader.h index b3efb83c0803c8..685414cfa3aba6 100644 --- a/be/src/io/cache/cached_remote_file_reader.h +++ b/be/src/io/cache/cached_remote_file_reader.h @@ -76,7 +76,8 @@ class CachedRemoteFileReader final : public FileReader { int64_t local_read_timer = 0; int64_t local_write_timer = 0; }; - void _update_state(const ReadStatistics& stats, FileCacheStatistics* state) const; + void _update_state(const ReadStatistics& stats, FileCacheStatistics* state, + bool is_inverted_index) const; }; } // namespace doris::io diff --git a/be/src/io/fs/err_utils.cpp b/be/src/io/fs/err_utils.cpp index 6552d454824796..e9bed7f5887dc3 100644 --- a/be/src/io/fs/err_utils.cpp +++ b/be/src/io/fs/err_utils.cpp @@ -122,13 +122,13 @@ Status s3fs_error(const Aws::S3::S3Error& err, std::string_view msg) { using namespace Aws::Http; switch (err.GetResponseCode()) { case HttpResponseCode::NOT_FOUND: - return Status::Error("{}: {} {} type={}, request_id={}", msg, - err.GetExceptionName(), err.GetMessage(), + return Status::Error("{}: {} {} code=NOT_FOUND, type={}, request_id={}", + msg, err.GetExceptionName(), err.GetMessage(), err.GetErrorType(), err.GetRequestId()); case HttpResponseCode::FORBIDDEN: - return Status::Error("{}: {} {} type={}, request_id={}", msg, - err.GetExceptionName(), err.GetMessage(), - err.GetErrorType(), err.GetRequestId()); + return Status::Error( + "{}: {} {} code=FORBIDDEN, type={}, request_id={}", msg, err.GetExceptionName(), + err.GetMessage(), err.GetErrorType(), err.GetRequestId()); default: return Status::Error( "{}: {} {} code={} type={}, request_id={}", msg, err.GetExceptionName(), diff --git a/be/src/io/io_common.h b/be/src/io/io_common.h index 80a594473dc376..4acc0538b7ef4f 100644 --- a/be/src/io/io_common.h +++ b/be/src/io/io_common.h @@ -38,6 +38,7 @@ namespace io { struct FileCacheStatistics { int64_t num_local_io_total = 0; int64_t num_remote_io_total = 0; + int64_t num_inverted_index_remote_io_total = 0; int64_t local_io_timer = 0; int64_t bytes_read_from_local = 0; int64_t bytes_read_from_remote = 0; @@ -60,6 +61,7 @@ struct IOContext { int64_t expiration_time = 0; const TUniqueId* query_id = nullptr; // Ref FileCacheStatistics* file_cache_stats = nullptr; // Ref + bool is_inverted_index = false; }; } // namespace io diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 7b2bb5d4ee3cfb..aec38699e014a2 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -495,10 +495,35 @@ Status CompactionMixin::execute_compact_impl(int64_t permits) { Status Compaction::do_inverted_index_compaction() { const auto& ctx = _output_rs_writer->context(); if (!config::inverted_index_compaction_enable || _input_row_num <= 0 || - !_stats.rowid_conversion || ctx.columns_to_do_index_compaction.empty()) { + ctx.columns_to_do_index_compaction.empty()) { return Status::OK(); } + auto error_handler = [this](int64_t index_id, int64_t column_uniq_id) { + LOG(WARNING) << "failed to do index compaction" + << ". tablet=" << _tablet->tablet_id() << ". column uniq id=" << column_uniq_id + << ". index_id=" << index_id; + for (auto& rowset : _input_rowsets) { + rowset->set_skip_index_compaction(column_uniq_id); + LOG(INFO) << "mark skipping inverted index compaction next time" + << ". tablet=" << _tablet->tablet_id() << ", rowset=" << rowset->rowset_id() + << ", column uniq id=" << column_uniq_id << ", index_id=" << index_id; + } + }; + + DBUG_EXECUTE_IF("Compaction::do_inverted_index_compaction_rowid_conversion_null", + { _stats.rowid_conversion = nullptr; }) + if (!_stats.rowid_conversion) { + LOG(WARNING) << "failed to do index compaction, rowid conversion is null" + << ". tablet=" << _tablet->tablet_id() + << ", input row number=" << _input_row_num; + mark_skip_index_compaction(ctx, error_handler); + + return Status::Error( + "failed to do index compaction, rowid conversion is null. tablet={}", + _tablet->tablet_id()); + } + OlapStopWatch inverted_watch; // translation vec @@ -521,8 +546,7 @@ Status Compaction::do_inverted_index_compaction() { auto src_segment_num = src_seg_to_id_map.size(); auto dest_segment_num = dest_segment_num_rows.size(); - DBUG_EXECUTE_IF("Compaction::do_inverted_index_compaction_dest_segment_num_is_zero", - { dest_segment_num = 0; }) + // when all the input rowsets are deleted, the output rowset will be empty and dest_segment_num will be 0. if (dest_segment_num <= 0) { LOG(INFO) << "skip doing index compaction due to no output segments" << ". tablet=" << _tablet->tablet_id() << ", input row number=" << _input_row_num @@ -600,27 +624,62 @@ Status Compaction::do_inverted_index_compaction() { DBUG_EXECUTE_IF("Compaction::do_inverted_index_compaction_find_rowset_error", { find_it = rs_id_to_rowset_map.end(); }) if (find_it == rs_id_to_rowset_map.end()) [[unlikely]] { - // DCHECK(false) << _tablet->tablet_id() << ' ' << rowset_id; - return Status::InternalError("cannot find rowset. tablet_id={} rowset_id={}", - _tablet->tablet_id(), rowset_id.to_string()); + LOG(WARNING) << "failed to do index compaction, cannot find rowset. tablet_id=" + << _tablet->tablet_id() << " rowset_id=" << rowset_id.to_string(); + mark_skip_index_compaction(ctx, error_handler); + return Status::Error( + "failed to do index compaction, cannot find rowset. tablet_id={} rowset_id={}", + _tablet->tablet_id(), rowset_id.to_string()); } auto* rowset = find_it->second; auto fs = rowset->rowset_meta()->fs(); DBUG_EXECUTE_IF("Compaction::do_inverted_index_compaction_get_fs_error", { fs = nullptr; }) if (!fs) { - return Status::InternalError("get fs failed, resource_id={}", - rowset->rowset_meta()->resource_id()); + LOG(WARNING) << "failed to do index compaction, get fs failed. resource_id=" + << rowset->rowset_meta()->resource_id(); + mark_skip_index_compaction(ctx, error_handler); + return Status::Error( + "get fs failed, resource_id={}", rowset->rowset_meta()->resource_id()); } - auto seg_path = DORIS_TRY(rowset->segment_path(seg_id)); + auto seg_path = rowset->segment_path(seg_id); + DBUG_EXECUTE_IF("Compaction::do_inverted_index_compaction_seg_path_nullptr", { + seg_path = ResultError(Status::Error( + "do_inverted_index_compaction_seg_path_nullptr")); + }) + if (!seg_path.has_value()) { + LOG(WARNING) << "failed to do index compaction, get segment path failed. tablet_id=" + << _tablet->tablet_id() << " rowset_id=" << rowset_id.to_string() + << " seg_id=" << seg_id; + mark_skip_index_compaction(ctx, error_handler); + return Status::Error( + "get segment path failed. tablet_id={} rowset_id={} seg_id={}", + _tablet->tablet_id(), rowset_id.to_string(), seg_id); + } auto inverted_index_file_reader = std::make_unique( - fs, std::string {InvertedIndexDescriptor::get_index_file_path_prefix(seg_path)}, + fs, + std::string {InvertedIndexDescriptor::get_index_file_path_prefix(seg_path.value())}, _cur_tablet_schema->get_inverted_index_storage_format(), rowset->rowset_meta()->inverted_index_file_info(seg_id)); - RETURN_NOT_OK_STATUS_WITH_WARN( - inverted_index_file_reader->init(config::inverted_index_read_buffer_size), - "inverted_index_file_reader init faiqled"); + auto st = inverted_index_file_reader->init(config::inverted_index_read_buffer_size); + DBUG_EXECUTE_IF("Compaction::do_inverted_index_compaction_init_inverted_index_file_reader", + { + st = Status::Error( + "debug point: " + "Compaction::do_inverted_index_compaction_init_inverted_index_" + "file_reader error"); + }) + if (!st.ok()) { + LOG(WARNING) << "failed to do index compaction, init inverted index file reader " + "failed. tablet_id=" + << _tablet->tablet_id() << " rowset_id=" << rowset_id.to_string() + << " seg_id=" << seg_id; + mark_skip_index_compaction(ctx, error_handler); + return Status::Error( + "init inverted index file reader failed. tablet_id={} rowset_id={} seg_id={}", + _tablet->tablet_id(), rowset_id.to_string(), seg_id); + } inverted_index_file_readers[m.second] = std::move(inverted_index_file_reader); } @@ -628,7 +687,20 @@ Status Compaction::do_inverted_index_compaction() { // format: rowsetId_segmentId auto& inverted_index_file_writers = dynamic_cast(_output_rs_writer.get()) ->inverted_index_file_writers(); - DCHECK_EQ(inverted_index_file_writers.size(), dest_segment_num); + DBUG_EXECUTE_IF( + "Compaction::do_inverted_index_compaction_inverted_index_file_writers_size_error", + { inverted_index_file_writers.clear(); }) + if (inverted_index_file_writers.size() != dest_segment_num) { + LOG(WARNING) << "failed to do index compaction, dest segment num not match. tablet_id=" + << _tablet->tablet_id() << " dest_segment_num=" << dest_segment_num + << " inverted_index_file_writers.size()=" + << inverted_index_file_writers.size(); + mark_skip_index_compaction(ctx, error_handler); + return Status::Error( + "dest segment num not match. tablet_id={} dest_segment_num={} " + "inverted_index_file_writers.size()={}", + _tablet->tablet_id(), dest_segment_num, inverted_index_file_writers.size()); + } // use tmp file dir to store index files auto tmp_file_dir = ExecEnv::GetInstance()->get_tmp_file_dirs()->get_tmp_file_dir(); @@ -637,18 +709,6 @@ Status Compaction::do_inverted_index_compaction() { << ". tablet=" << _tablet->tablet_id() << ", source index size=" << src_segment_num << ", destination index size=" << dest_segment_num << "."; - auto error_handler = [this](int64_t index_id, int64_t column_uniq_id) { - LOG(WARNING) << "failed to do index compaction" - << ". tablet=" << _tablet->tablet_id() << ". column uniq id=" << column_uniq_id - << ". index_id=" << index_id; - for (auto& rowset : _input_rowsets) { - rowset->set_skip_index_compaction(column_uniq_id); - LOG(INFO) << "mark skipping inverted index compaction next time" - << ". tablet=" << _tablet->tablet_id() << ", rowset=" << rowset->rowset_id() - << ", column uniq id=" << column_uniq_id << ", index_id=" << index_id; - } - }; - Status status = Status::OK(); for (auto&& column_uniq_id : ctx.columns_to_do_index_compaction) { auto col = _cur_tablet_schema->column_by_uid(column_uniq_id); @@ -658,6 +718,10 @@ Status Compaction::do_inverted_index_compaction() { if (index_meta == nullptr) { status = Status::Error( fmt::format("Can not find index_meta for col {}", col.name())); + LOG(WARNING) << "failed to do index compaction, can not find index_meta for column" + << ". tablet=" << _tablet->tablet_id() + << ", column uniq id=" << column_uniq_id; + error_handler(-1, column_uniq_id); break; } @@ -671,6 +735,11 @@ Status Compaction::do_inverted_index_compaction() { "debug point: Compaction::open_index_file_reader error")); }) if (!res.has_value()) { + LOG(WARNING) << "failed to do index compaction, open inverted index file " + "reader failed" + << ". tablet=" << _tablet->tablet_id() + << ", column uniq id=" << column_uniq_id + << ", src_segment_id=" << src_segment_id; throw Exception(ErrorCode::INVERTED_INDEX_COMPACTION_ERROR, res.error().msg()); } src_idx_dirs[src_segment_id] = std::move(res.value()); @@ -682,6 +751,11 @@ Status Compaction::do_inverted_index_compaction() { "debug point: Compaction::open_inverted_index_file_writer error")); }) if (!res.has_value()) { + LOG(WARNING) << "failed to do index compaction, open inverted index file " + "writer failed" + << ". tablet=" << _tablet->tablet_id() + << ", column uniq id=" << column_uniq_id + << ", dest_segment_id=" << dest_segment_id; throw Exception(ErrorCode::INVERTED_INDEX_COMPACTION_ERROR, res.error().msg()); } // Destination directories in dest_index_dirs do not need to be deconstructed, @@ -714,6 +788,23 @@ Status Compaction::do_inverted_index_compaction() { return Status::OK(); } +void Compaction::mark_skip_index_compaction( + const RowsetWriterContext& context, + const std::function& error_handler) { + for (auto&& column_uniq_id : context.columns_to_do_index_compaction) { + auto col = _cur_tablet_schema->column_by_uid(column_uniq_id); + const auto* index_meta = _cur_tablet_schema->inverted_index(col); + if (index_meta == nullptr) { + LOG(WARNING) << "mark skip index compaction, can not find index_meta for column" + << ". tablet=" << _tablet->tablet_id() + << ", column uniq id=" << column_uniq_id; + error_handler(-1, column_uniq_id); + continue; + } + error_handler(index_meta->index_id(), column_uniq_id); + } +} + void Compaction::construct_index_compaction_columns(RowsetWriterContext& ctx) { for (const auto& index : _cur_tablet_schema->inverted_indexes()) { auto col_unique_ids = index->col_unique_ids(); @@ -789,7 +880,8 @@ void Compaction::construct_index_compaction_columns(RowsetWriterContext& ctx) { // TODO: inverted_index_path auto seg_path = rowset->segment_path(i); DBUG_EXECUTE_IF("Compaction::construct_skip_inverted_index_seg_path_nullptr", { - seg_path = ResultError(Status::Error("error")); + seg_path = ResultError(Status::Error( + "construct_skip_inverted_index_seg_path_nullptr")); }) if (!seg_path) { LOG(WARNING) << seg_path.error(); @@ -800,8 +892,8 @@ void Compaction::construct_index_compaction_columns(RowsetWriterContext& ctx) { try { auto inverted_index_file_reader = std::make_unique( fs, - std::string { - InvertedIndexDescriptor::get_index_file_path_prefix(*seg_path)}, + std::string {InvertedIndexDescriptor::get_index_file_path_prefix( + seg_path.value())}, _cur_tablet_schema->get_inverted_index_storage_format(), rowset->rowset_meta()->inverted_index_file_info(i)); auto st = inverted_index_file_reader->init( @@ -1013,8 +1105,31 @@ Status CompactionMixin::modify_rowsets() { if (!_tablet->tablet_meta()->tablet_schema()->cluster_key_uids().empty()) { merged_missed_rows_size += _stats.filtered_rows; } + + // Suppose a heavy schema change process on BE converting tablet A to tablet B. + // 1. during schema change double write, new loads write [X-Y] on tablet B. + // 2. rowsets with version [a],[a+1],...,[b-1],[b] on tablet B are picked for cumu compaction(X<=aget_header_lock()); + need_to_check_missed_rows = + std::all_of(_input_rowsets.begin(), _input_rowsets.end(), + [&](const RowsetSharedPtr& rowset) { + return tablet()->rowset_exists_unlocked(rowset); + }); + } + if (_tablet->tablet_state() == TABLET_RUNNING && - merged_missed_rows_size != missed_rows_size) { + merged_missed_rows_size != missed_rows_size && need_to_check_missed_rows) { std::stringstream ss; ss << "cumulative compaction: the merged rows(" << _stats.merged_rows << "), filtered rows(" << _stats.filtered_rows diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h index ccabf7dadb4733..057f4084b068b3 100644 --- a/be/src/olap/compaction.h +++ b/be/src/olap/compaction.h @@ -70,6 +70,10 @@ class Compaction { // merge inverted index files Status do_inverted_index_compaction(); + // mark all columns in columns_to_do_index_compaction to skip index compaction next time. + void mark_skip_index_compaction(const RowsetWriterContext& context, + const std::function& error_handler); + void construct_index_compaction_columns(RowsetWriterContext& ctx); virtual Status construct_output_rowset_writer(RowsetWriterContext& ctx) = 0; diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp index 2dfd30fb86ed9a..a9509a005763f6 100644 --- a/be/src/olap/cumulative_compaction.cpp +++ b/be/src/olap/cumulative_compaction.cpp @@ -100,6 +100,20 @@ Status CumulativeCompaction::prepare_compact() { } Status CumulativeCompaction::execute_compact() { + DBUG_EXECUTE_IF("CumulativeCompaction::execute_compact.block", { + auto target_tablet_id = dp->param("tablet_id", -1); + if (target_tablet_id == _tablet->tablet_id()) { + LOG(INFO) << "start debug block " + << "CumulativeCompaction::execute_compact.block"; + while (DebugPoints::instance()->is_enable( + "CumulativeCompaction::execute_compact.block")) { + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + } + LOG(INFO) << "end debug block " + << "CumulativeCompaction::execute_compact.block"; + } + }) + std::unique_lock lock(tablet()->get_cumulative_compaction_lock(), std::try_to_lock); if (!lock.owns_lock()) { return Status::Error( diff --git a/be/src/olap/cumulative_compaction_policy.cpp b/be/src/olap/cumulative_compaction_policy.cpp index ee7a2b1812a0ae..c812a12b656580 100644 --- a/be/src/olap/cumulative_compaction_policy.cpp +++ b/be/src/olap/cumulative_compaction_policy.cpp @@ -28,6 +28,7 @@ #include "olap/olap_common.h" #include "olap/tablet.h" #include "olap/tablet_meta.h" +#include "util/debug_points.h" namespace doris { @@ -246,6 +247,21 @@ int SizeBasedCumulativeCompactionPolicy::pick_input_rowsets( const int64_t max_compaction_score, const int64_t min_compaction_score, std::vector* input_rowsets, Version* last_delete_version, size_t* compaction_score, bool allow_delete) { + DBUG_EXECUTE_IF("SizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets", { + auto target_tablet_id = dp->param("tablet_id", -1); + if (target_tablet_id == tablet->tablet_id()) { + auto start_version = dp->param("start_version", -1); + auto end_version = dp->param("end_version", -1); + for (auto& rowset : candidate_rowsets) { + if (rowset->start_version() >= start_version && + rowset->end_version() <= end_version) { + input_rowsets->push_back(rowset); + } + } + } + return input_rowsets->size(); + }) + size_t promotion_size = tablet->cumulative_promotion_size(); auto max_version = tablet->max_version().first; int transient_size = 0; diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 765f67a07c7884..f8cc79b205535f 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -34,6 +34,7 @@ #include "runtime/descriptors.h" #include "runtime/exec_env.h" #include "runtime/thread_context.h" +#include "util/debug_points.h" #include "util/runtime_profile.h" #include "util/stopwatch.hpp" #include "vec/aggregate_functions/aggregate_function_reader.h" @@ -589,6 +590,7 @@ void MemTable::shrink_memtable_by_agg() { } bool MemTable::need_flush() const { + DBUG_EXECUTE_IF("MemTable.need_flush", { return true; }); auto max_size = config::write_buffer_size; if (_partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) { auto update_columns_size = _num_columns; diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp b/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp index f988c46c027c26..dcbdca921ab8e8 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp @@ -79,7 +79,16 @@ Status compact_column(int64_t index_id, // delete temporary segment_path, only when inverted_index_ram_dir_enable is false if (!config::inverted_index_ram_dir_enable) { - std::ignore = io::global_local_filesystem()->delete_directory(tmp_path.data()); + auto st = io::global_local_filesystem()->delete_directory(tmp_path.data()); + DBUG_EXECUTE_IF("compact_column_delete_tmp_path_error", { + st = Status::Error( + "debug point: compact_column_delete_tmp_path_error in index compaction"); + }) + if (!st.ok()) { + LOG(WARNING) << "compact column failed to delete tmp path: " << tmp_path + << ", error: " << st.to_string(); + return st; + } } return Status::OK(); } diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp index 60006ea84550a2..f1b2b0eaedd4fd 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp @@ -96,12 +96,19 @@ void CSIndexInput::readInternal(uint8_t* b, const int32_t len) { if (start + len > _length) { _CLTHROWA(CL_ERR_IO, "read past EOF"); } - base->setIoContext(_io_ctx); + + if (_io_ctx) { + base->setIoContext(_io_ctx); + } + base->setIndexFile(_is_index_file); base->seek(fileOffset + start); bool read_from_buffer = true; base->readBytes(b, len, read_from_buffer); - base->setIoContext(nullptr); + + if (_io_ctx) { + base->setIoContext(nullptr); + } } CSIndexInput::~CSIndexInput() = default; diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp b/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp index 8d480829a0cd37..813a78f2a3fa86 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp @@ -27,21 +27,27 @@ namespace doris::segment_v2 { -Status InvertedIndexFileReader::init(int32_t read_buffer_size) { +Status InvertedIndexFileReader::init(int32_t read_buffer_size, const io::IOContext* io_ctx) { if (!_inited) { _read_buffer_size = read_buffer_size; if (_storage_format >= InvertedIndexStorageFormatPB::V2) { - auto st = _init_from(read_buffer_size); + auto st = _init_from(read_buffer_size, io_ctx); if (!st.ok()) { return st; } } _inited = true; + } else { + if (_storage_format == InvertedIndexStorageFormatPB::V2) { + if (_stream) { + _stream->setIoContext(io_ctx); + } + } } return Status::OK(); } -Status InvertedIndexFileReader::_init_from(int32_t read_buffer_size) { +Status InvertedIndexFileReader::_init_from(int32_t read_buffer_size, const io::IOContext* io_ctx) { auto index_file_full_path = InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix); std::unique_lock lock(_mutex); // Lock for writing @@ -76,6 +82,7 @@ Status InvertedIndexFileReader::_init_from(int32_t read_buffer_size) { err.what()); } _stream = std::unique_ptr(index_input); + _stream->setIoContext(io_ctx); // 3. read file int32_t version = _stream->readInt(); // Read version number diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_reader.h b/be/src/olap/rowset/segment_v2/inverted_index_file_reader.h index 443d40cfaf0d4f..ed6ee85e7d7bf1 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_file_reader.h +++ b/be/src/olap/rowset/segment_v2/inverted_index_file_reader.h @@ -58,7 +58,8 @@ class InvertedIndexFileReader { _storage_format(storage_format), _idx_file_info(idx_file_info) {} - Status init(int32_t read_buffer_size = config::inverted_index_read_buffer_size); + Status init(int32_t read_buffer_size = config::inverted_index_read_buffer_size, + const io::IOContext* io_ctx = nullptr); Result> open(const TabletIndex* index_meta) const; void debug_file_entries(); std::string get_index_file_cache_key(const TabletIndex* index_meta) const; @@ -70,7 +71,7 @@ class InvertedIndexFileReader { int64_t get_inverted_file_size() const { return _stream == nullptr ? 0 : _stream->length(); } private: - Status _init_from(int32_t read_buffer_size); + Status _init_from(int32_t read_buffer_size, const io::IOContext* io_ctx); Result> _open(int64_t index_id, const std::string& index_suffix) const; diff --git a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h index dde436054cd35b..41d9fb48356299 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h +++ b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h @@ -180,6 +180,7 @@ class DorisFSDirectory::FSIndexInput : public lucene::store::BufferedIndexInput : BufferedIndexInput(buffer_size) { this->_pos = 0; this->_handle = std::move(handle); + _io_ctx.is_inverted_index = true; } protected: diff --git a/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp b/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp index 889fee1fc87ef9..b40f9121125207 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp @@ -121,7 +121,8 @@ Status InvertedIndexReader::read_null_bitmap(const io::IOContext* io_ctx, if (!dir) { // TODO: ugly code here, try to refact. - auto st = _inverted_index_file_reader->init(config::inverted_index_read_buffer_size); + auto st = _inverted_index_file_reader->init(config::inverted_index_read_buffer_size, + io_ctx); if (!st.ok()) { LOG(WARNING) << st; return st; @@ -137,7 +138,6 @@ Status InvertedIndexReader::read_null_bitmap(const io::IOContext* io_ctx, InvertedIndexDescriptor::get_temporary_null_bitmap_file_name(); if (dir->fileExists(null_bitmap_file_name)) { null_bitmap_in = dir->openInput(null_bitmap_file_name); - null_bitmap_in->setIoContext(io_ctx); size_t null_bitmap_size = null_bitmap_in->length(); faststring buf; buf.resize(null_bitmap_size); @@ -180,7 +180,8 @@ Status InvertedIndexReader::handle_searcher_cache( SCOPED_RAW_TIMER(&stats->inverted_index_searcher_open_timer); IndexSearcherPtr searcher; - auto st = _inverted_index_file_reader->init(config::inverted_index_read_buffer_size); + auto st = + _inverted_index_file_reader->init(config::inverted_index_read_buffer_size, io_ctx); if (!st.ok()) { LOG(WARNING) << st; return st; @@ -211,6 +212,9 @@ Status InvertedIndexReader::create_index_searcher(lucene::store::Directory* dir, auto searcher_result = DORIS_TRY(index_searcher_builder->get_index_searcher(dir)); *searcher = searcher_result; + // When the meta information has been read, the ioContext needs to be reset to prevent it from being used by other queries. + static_cast(dir)->getDorisIndexInput()->setIoContext(nullptr); + // NOTE: before mem_tracker hook becomes active, we caculate reader memory size by hand. mem_tracker->consume(index_searcher_builder->get_reader_size()); return Status::OK(); diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index cdb637b1c42647..7f947612eed4ac 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -866,6 +866,9 @@ Status SchemaChangeJob::_do_process_alter_tablet(const TAlterTabletReqV2& reques for (int i = 0; i < num_cols; ++i) { return_columns[i] = i; } + std::vector cluster_key_idxes; + + DBUG_EXECUTE_IF("SchemaChangeJob::_do_process_alter_tablet.block", DBUG_BLOCK); // begin to find deltas to convert from base tablet to new tablet so that // obtain base tablet and new tablet's push lock and header write lock to prevent loading data @@ -980,6 +983,14 @@ Status SchemaChangeJob::_do_process_alter_tablet(const TAlterTabletReqV2& reques reader_context.batch_size = ALTER_TABLE_BATCH_SIZE; reader_context.delete_bitmap = &_base_tablet->tablet_meta()->delete_bitmap(); reader_context.version = Version(0, end_version); + if (!_base_tablet_schema->cluster_key_uids().empty()) { + for (const auto& uid : _base_tablet_schema->cluster_key_uids()) { + cluster_key_idxes.emplace_back(_base_tablet_schema->field_index(uid)); + } + reader_context.read_orderby_key_columns = &cluster_key_idxes; + reader_context.is_unique = false; + reader_context.sequence_id_idx = -1; + } for (auto& rs_split : rs_splits) { res = rs_split.rs_reader->init(&reader_context); if (!res) { diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp index 67205835b53947..8202feb68c65b5 100644 --- a/be/src/olap/snapshot_manager.cpp +++ b/be/src/olap/snapshot_manager.cpp @@ -700,8 +700,10 @@ Status SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_tablet InvertedIndexStorageFormatPB::V1) { for (const auto& index : tablet_schema.inverted_indexes()) { auto index_id = index->index_id(); - auto index_file = ref_tablet->get_segment_index_filepath( - rowset_id, segment_index, index_id); + auto index_file = InvertedIndexDescriptor::get_index_file_path_v1( + InvertedIndexDescriptor::get_index_file_path_prefix( + segment_file_path), + index_id, index->get_index_suffix()); auto snapshot_segment_index_file_path = fmt::format("{}/{}_{}_{}.binlog-index", schema_full_path, rowset_id, segment_index, index_id); diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 644ca9133eb885..c7919b3f8dca24 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -512,6 +512,15 @@ Status Tablet::add_rowset(RowsetSharedPtr rowset) { return Status::OK(); } +bool Tablet::rowset_exists_unlocked(const RowsetSharedPtr& rowset) { + if (auto it = _rs_version_map.find(rowset->version()); it == _rs_version_map.end()) { + return false; + } else if (rowset->rowset_id() != it->second->rowset_id()) { + return false; + } + return true; +} + Status Tablet::modify_rowsets(std::vector& to_add, std::vector& to_delete, bool check_delete) { // the compaction process allow to compact the single version, eg: version[4-4]. @@ -1741,8 +1750,13 @@ Status Tablet::prepare_compaction_and_calculate_permits( } if (!res.ok()) { - tablet->set_last_cumu_compaction_failure_time(UnixMillis()); permits = 0; + // if we meet a delete version, should increase the cumulative point to let base compaction handle the delete version. + // no need to wait 5s. + if (!(res.msg() == "_last_delete_version.first not equal to -1") || + config::enable_sleep_between_delete_cumu_compaction) { + tablet->set_last_cumu_compaction_failure_time(UnixMillis()); + } if (!res.is()) { DorisMetrics::instance()->cumulative_compaction_request_failed->increment(1); return Status::InternalError("prepare cumulative compaction with err: {}", res); @@ -1750,6 +1764,12 @@ Status Tablet::prepare_compaction_and_calculate_permits( // return OK if OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSION, so that we don't need to // print too much useless logs. // And because we set permits to 0, so even if we return OK here, nothing will be done. + LOG_INFO( + "cumulative compaction meet delete rowset, increase cumu point without other " + "operation.") + .tag("tablet id:", tablet->tablet_id()) + .tag("after cumulative compaction, cumu point:", + tablet->cumulative_layer_point()); return Status::OK(); } } else if (compaction_type == CompactionType::BASE_COMPACTION) { @@ -2595,30 +2615,6 @@ std::string Tablet::get_segment_filepath(std::string_view rowset_id, int64_t seg return fmt::format("{}/_binlog/{}_{}.dat", _tablet_path, rowset_id, segment_index); } -std::string Tablet::get_segment_index_filepath(std::string_view rowset_id, - std::string_view segment_index, - std::string_view index_id) const { - auto format = _tablet_meta->tablet_schema()->get_inverted_index_storage_format(); - if (format == doris::InvertedIndexStorageFormatPB::V1) { - return fmt::format("{}/_binlog/{}_{}_{}.idx", _tablet_path, rowset_id, segment_index, - index_id); - } else { - return fmt::format("{}/_binlog/{}_{}.idx", _tablet_path, rowset_id, segment_index); - } -} - -std::string Tablet::get_segment_index_filepath(std::string_view rowset_id, int64_t segment_index, - int64_t index_id) const { - auto format = _tablet_meta->tablet_schema()->get_inverted_index_storage_format(); - if (format == doris::InvertedIndexStorageFormatPB::V1) { - return fmt::format("{}/_binlog/{}_{}_{}.idx", _tablet_path, rowset_id, segment_index, - index_id); - } else { - DCHECK(index_id == -1); - return fmt::format("{}/_binlog/{}_{}.idx", _tablet_path, rowset_id, segment_index); - } -} - std::vector Tablet::get_binlog_filepath(std::string_view binlog_version) const { const auto& [rowset_id, num_segments] = get_binlog_info(binlog_version); std::vector binlog_filepath; @@ -2663,10 +2659,25 @@ void Tablet::gc_binlogs(int64_t version) { // add binlog segment files and index files for (int64_t i = 0; i < num_segments; ++i) { - wait_for_deleted_binlog_files.emplace_back(get_segment_filepath(rowset_id, i)); - for (const auto& index : this->tablet_schema()->inverted_indexes()) { - wait_for_deleted_binlog_files.emplace_back( - get_segment_index_filepath(rowset_id, i, index->index_id())); + auto segment_file_path = get_segment_filepath(rowset_id, i); + wait_for_deleted_binlog_files.emplace_back(segment_file_path); + + // index files + if (tablet_schema()->has_inverted_index()) { + if (tablet_schema()->get_inverted_index_storage_format() == + InvertedIndexStorageFormatPB::V1) { + for (const auto& index : tablet_schema()->inverted_indexes()) { + auto index_file = InvertedIndexDescriptor::get_index_file_path_v1( + InvertedIndexDescriptor::get_index_file_path_prefix( + segment_file_path), + index->index_id(), index->get_index_suffix()); + wait_for_deleted_binlog_files.emplace_back(index_file); + } + } else { + auto index_file = InvertedIndexDescriptor::get_index_file_path_v2( + InvertedIndexDescriptor::get_index_file_path_prefix(segment_file_path)); + wait_for_deleted_binlog_files.emplace_back(index_file); + } } } }; diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 0b7d758ab8fd88..d00476f044191c 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -173,6 +173,7 @@ class Tablet final : public BaseTablet { // MUST hold EXCLUSIVE `_meta_lock`. Status modify_rowsets(std::vector& to_add, std::vector& to_delete, bool check_delete = false); + bool rowset_exists_unlocked(const RowsetSharedPtr& rowset); Status add_inc_rowset(const RowsetSharedPtr& rowset); /// Delete stale rowset by timing. This delete policy uses now() minutes @@ -440,11 +441,6 @@ class Tablet final : public BaseTablet { std::string get_segment_filepath(std::string_view rowset_id, std::string_view segment_index) const; std::string get_segment_filepath(std::string_view rowset_id, int64_t segment_index) const; - std::string get_segment_index_filepath(std::string_view rowset_id, - std::string_view segment_index, - std::string_view index_id) const; - std::string get_segment_index_filepath(std::string_view rowset_id, int64_t segment_index, - int64_t index_id) const; bool can_add_binlog(uint64_t total_binlog_size) const; void gc_binlogs(int64_t version); Status ingest_binlog_metas(RowsetBinlogMetasPB* metas_pb); diff --git a/be/src/olap/task/engine_storage_migration_task.cpp b/be/src/olap/task/engine_storage_migration_task.cpp index a300e6e0f09fa3..210aa6a8c56f08 100644 --- a/be/src/olap/task/engine_storage_migration_task.cpp +++ b/be/src/olap/task/engine_storage_migration_task.cpp @@ -409,8 +409,9 @@ Status EngineStorageMigrationTask::_copy_index_and_data_files( InvertedIndexStorageFormatPB::V1) { for (const auto& index : tablet_schema.inverted_indexes()) { auto index_id = index->index_id(); - auto index_file = - _tablet->get_segment_index_filepath(rowset_id, segment_index, index_id); + auto index_file = InvertedIndexDescriptor::get_index_file_path_v1( + InvertedIndexDescriptor::get_index_file_path_prefix(segment_file_path), + index_id, index->get_index_suffix()); auto snapshot_segment_index_file_path = fmt::format("{}/{}_{}_{}.binlog-index", full_path, rowset_id, segment_index, index_id); diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp index 5fef018423df25..dcf5c7a0a81d7c 100644 --- a/be/src/pipeline/dependency.cpp +++ b/be/src/pipeline/dependency.cpp @@ -179,12 +179,11 @@ void LocalExchangeSharedState::sub_running_sink_operators() { } } -void LocalExchangeSharedState::sub_running_source_operators( - LocalExchangeSourceLocalState& local_state) { +void LocalExchangeSharedState::sub_running_source_operators() { std::unique_lock lc(le_lock); if (exchanger->_running_source_operators.fetch_sub(1) == 1) { _set_always_ready(); - exchanger->finalize(local_state); + exchanger->finalize(); } } diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h index ad018c8b4f8f3d..f1cfe2b02977e1 100644 --- a/be/src/pipeline/dependency.h +++ b/be/src/pipeline/dependency.h @@ -758,7 +758,7 @@ struct LocalExchangeSharedState : public BasicSharedState { } } void sub_running_sink_operators(); - void sub_running_source_operators(LocalExchangeSourceLocalState& local_state); + void sub_running_source_operators(); void _set_always_ready() { for (auto& dep : source_deps) { DCHECK(dep); diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp b/be/src/pipeline/exec/analytic_source_operator.cpp index 3a9156f45b6758..fe0ab0b148e55a 100644 --- a/be/src/pipeline/exec/analytic_source_operator.cpp +++ b/be/src/pipeline/exec/analytic_source_operator.cpp @@ -352,17 +352,17 @@ Status AnalyticLocalState::_get_next_for_rows(size_t current_block_rows) { int64_t range_start, range_end; if (!_parent->cast()._window.__isset.window_start && _parent->cast()._window.window_end.type == - TAnalyticWindowBoundaryType:: - CURRENT_ROW) { //[preceding, current_row],[current_row, following] + TAnalyticWindowBoundaryType::CURRENT_ROW) { + // [preceding, current_row], [current_row, following] rewrite it's same + // as could reuse the previous calculate result, so don't call _reset_agg_status function + // going on calculate, add up data, no need to reset state range_start = _shared_state->current_row_position; - range_end = _shared_state->current_row_position + - 1; //going on calculate,add up data, no need to reset state + range_end = _shared_state->current_row_position + 1; } else { _reset_agg_status(); range_end = _shared_state->current_row_position + _rows_end_offset + 1; - if (!_parent->cast() - ._window.__isset - .window_start) { //[preceding, offset] --unbound: [preceding, following] + //[preceding, offset] --unbound: [preceding, following] + if (!_parent->cast()._window.__isset.window_start) { range_start = _partition_by_start.pos; } else { range_start = _shared_state->current_row_position + _rows_start_offset; diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp index 22007a4b220348..b22ee9fd77e72f 100644 --- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp +++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp @@ -144,7 +144,10 @@ Status LocalExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); - RETURN_IF_ERROR(local_state._exchanger->sink(state, in_block, eos, local_state)); + RETURN_IF_ERROR(local_state._exchanger->sink( + state, in_block, eos, + {local_state._compute_hash_value_timer, local_state._distribute_timer, nullptr}, + {&local_state._channel_id, local_state._partitioner.get(), &local_state})); // If all exchange sources ended due to limit reached, current task should also finish if (local_state._exchanger->_running_source_operators == 0) { diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h index 435f7a410a4ca6..c067f023c8d420 100644 --- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h +++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h @@ -65,7 +65,6 @@ class LocalExchangeSinkLocalState final : public PipelineXSinkLocalState _partitioner = nullptr; - std::vector _partition_rows_histogram; // Used by random passthrough exchanger int _channel_id = 0; diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp index c4832b9958c00d..63e36cdfdb0c01 100644 --- a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp +++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp @@ -61,10 +61,10 @@ Status LocalExchangeSourceLocalState::close(RuntimeState* state) { } if (_exchanger) { - _exchanger->close(*this); + _exchanger->close({_channel_id, this}); } if (_shared_state) { - _shared_state->sub_running_source_operators(*this); + _shared_state->sub_running_source_operators(); } std::vector {}.swap(_local_merge_deps); @@ -116,7 +116,9 @@ Status LocalExchangeSourceOperatorX::get_block(RuntimeState* state, vectorized:: bool* eos) { auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); - RETURN_IF_ERROR(local_state._exchanger->get_block(state, block, eos, local_state)); + RETURN_IF_ERROR(local_state._exchanger->get_block( + state, block, eos, {nullptr, nullptr, local_state._copy_data_timer}, + {local_state._channel_id, &local_state})); local_state.reached_limit(block, eos); return Status::OK(); } diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp b/be/src/pipeline/local_exchange/local_exchanger.cpp index 647988f8b794cb..a963de8b684310 100644 --- a/be/src/pipeline/local_exchange/local_exchanger.cpp +++ b/be/src/pipeline/local_exchange/local_exchanger.cpp @@ -29,8 +29,12 @@ namespace doris::pipeline { #include "common/compile_check_begin.h" template void Exchanger::_enqueue_data_and_set_ready(int channel_id, - LocalExchangeSinkLocalState& local_state, + LocalExchangeSinkLocalState* local_state, BlockType&& block) { + if (local_state == nullptr) { + _enqueue_data_and_set_ready(channel_id, std::move(block)); + return; + } size_t allocated_bytes = 0; // PartitionedBlock is used by shuffle exchanger. // PartitionedBlock will be push into multiple queues with different row ranges, so it will be @@ -44,47 +48,47 @@ void Exchanger::_enqueue_data_and_set_ready(int channel_id, allocated_bytes = block->data_block.allocated_bytes(); } std::unique_lock l(_m); - local_state._shared_state->add_mem_usage(channel_id, allocated_bytes, - !std::is_same_v && - !std::is_same_v); + local_state->_shared_state->add_mem_usage(channel_id, allocated_bytes, + !std::is_same_v && + !std::is_same_v); if (_data_queue[channel_id].enqueue(std::move(block))) { - local_state._shared_state->set_ready_to_read(channel_id); + local_state->_shared_state->set_ready_to_read(channel_id); } else { - local_state._shared_state->sub_mem_usage(channel_id, allocated_bytes); + local_state->_shared_state->sub_mem_usage(channel_id, allocated_bytes); // `enqueue(block)` return false iff this queue's source operator is already closed so we // just unref the block. if constexpr (std::is_same_v || std::is_same_v) { - block.first->unref(local_state._shared_state, allocated_bytes, channel_id); + block.first->unref(local_state->_shared_state, allocated_bytes, channel_id); } else { - block->unref(local_state._shared_state, allocated_bytes, channel_id); + block->unref(local_state->_shared_state, allocated_bytes, channel_id); DCHECK_EQ(block->ref_value(), 0); } } } template -bool Exchanger::_dequeue_data(LocalExchangeSourceLocalState& local_state, - BlockType& block, bool* eos, - vectorized::Block* data_block) { - return _dequeue_data(local_state, block, eos, data_block, local_state._channel_id); -} - -template -bool Exchanger::_dequeue_data(LocalExchangeSourceLocalState& local_state, +bool Exchanger::_dequeue_data(LocalExchangeSourceLocalState* local_state, BlockType& block, bool* eos, vectorized::Block* data_block, int channel_id) { + if (local_state == nullptr) { + if (!_dequeue_data(block, eos, data_block, channel_id)) { + throw Exception(ErrorCode::INTERNAL_ERROR, "Exchanger has no data: {}", + data_queue_debug_string(channel_id)); + } + return true; + } bool all_finished = _running_sink_operators == 0; if (_data_queue[channel_id].try_dequeue(block)) { if constexpr (std::is_same_v || std::is_same_v) { - local_state._shared_state->sub_mem_usage(channel_id, - block.first->data_block.allocated_bytes()); + local_state->_shared_state->sub_mem_usage(channel_id, + block.first->data_block.allocated_bytes()); } else { - local_state._shared_state->sub_mem_usage(channel_id, - block->data_block.allocated_bytes()); + local_state->_shared_state->sub_mem_usage(channel_id, + block->data_block.allocated_bytes()); data_block->swap(block->data_block); - block->unref(local_state._shared_state, data_block->allocated_bytes(), channel_id); + block->unref(local_state->_shared_state, data_block->allocated_bytes(), channel_id); DCHECK_EQ(block->ref_value(), 0); } return true; @@ -95,54 +99,88 @@ bool Exchanger::_dequeue_data(LocalExchangeSourceLocalState& local_st if (_data_queue[channel_id].try_dequeue(block)) { if constexpr (std::is_same_v || std::is_same_v) { - local_state._shared_state->sub_mem_usage(channel_id, - block.first->data_block.allocated_bytes()); + local_state->_shared_state->sub_mem_usage( + channel_id, block.first->data_block.allocated_bytes()); } else { - local_state._shared_state->sub_mem_usage(channel_id, - block->data_block.allocated_bytes()); + local_state->_shared_state->sub_mem_usage(channel_id, + block->data_block.allocated_bytes()); data_block->swap(block->data_block); - block->unref(local_state._shared_state, data_block->allocated_bytes(), channel_id); + block->unref(local_state->_shared_state, data_block->allocated_bytes(), channel_id); DCHECK_EQ(block->ref_value(), 0); } return true; } - COUNTER_UPDATE(local_state._get_block_failed_counter, 1); - local_state._dependency->block(); + COUNTER_UPDATE(local_state->_get_block_failed_counter, 1); + local_state->_dependency->block(); + } + return false; +} + +template +void Exchanger::_enqueue_data_and_set_ready(int channel_id, BlockType&& block) { + if constexpr (!std::is_same_v && + !std::is_same_v) { + block->ref(1); + } + if (!_data_queue[channel_id].enqueue(std::move(block))) { + if constexpr (std::is_same_v || + std::is_same_v) { + block.first->unref(); + } else { + block->unref(); + DCHECK_EQ(block->ref_value(), 0); + } + } +} + +template +bool Exchanger::_dequeue_data(BlockType& block, bool* eos, vectorized::Block* data_block, + int channel_id) { + if (_data_queue[channel_id].try_dequeue(block)) { + if constexpr (!std::is_same_v && + !std::is_same_v) { + data_block->swap(block->data_block); + block->unref(); + DCHECK_EQ(block->ref_value(), 0); + } + return true; } return false; } Status ShuffleExchanger::sink(RuntimeState* state, vectorized::Block* in_block, bool eos, - LocalExchangeSinkLocalState& local_state) { + Profile&& profile, SinkInfo&& sink_info) { if (in_block->empty()) { return Status::OK(); } { - SCOPED_TIMER(local_state._compute_hash_value_timer); - RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state, in_block)); + SCOPED_TIMER(profile.compute_hash_value_timer); + RETURN_IF_ERROR(sink_info.partitioner->do_partitioning(state, in_block)); } { - SCOPED_TIMER(local_state._distribute_timer); - RETURN_IF_ERROR(_split_rows(state, - local_state._partitioner->get_channel_ids().get(), - in_block, local_state)); + SCOPED_TIMER(profile.distribute_timer); + RETURN_IF_ERROR(_split_rows(state, sink_info.partitioner->get_channel_ids().get(), + in_block, *sink_info.channel_id, sink_info.local_state)); } return Status::OK(); } -void ShuffleExchanger::close(LocalExchangeSourceLocalState& local_state) { +void ShuffleExchanger::close(SourceInfo&& source_info) { PartitionedBlock partitioned_block; bool eos; vectorized::Block block; - _data_queue[local_state._channel_id].set_eos(); - while (_dequeue_data(local_state, partitioned_block, &eos, &block)) { - partitioned_block.first->unref(local_state._shared_state, local_state._channel_id); + _data_queue[source_info.channel_id].set_eos(); + while (_dequeue_data(source_info.local_state, partitioned_block, &eos, &block, + source_info.channel_id)) { + partitioned_block.first->unref( + source_info.local_state ? source_info.local_state->_shared_state : nullptr, + source_info.channel_id); } } Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos, - LocalExchangeSourceLocalState& local_state) { + Profile&& profile, SourceInfo&& source_info) { PartitionedBlock partitioned_block; vectorized::MutableBlock mutable_block; @@ -153,14 +191,18 @@ Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block auto block_wrapper = partitioned_block.first; RETURN_IF_ERROR(mutable_block.add_rows(&block_wrapper->data_block, offset_start, offset_start + partitioned_block.second.length)); - block_wrapper->unref(local_state._shared_state, local_state._channel_id); + block_wrapper->unref( + source_info.local_state ? source_info.local_state->_shared_state : nullptr, + source_info.channel_id); } while (mutable_block.rows() < state->batch_size() && !*eos && - _dequeue_data(local_state, partitioned_block, eos, block)); + _dequeue_data(source_info.local_state, partitioned_block, eos, block, + source_info.channel_id)); return Status::OK(); }; - if (_dequeue_data(local_state, partitioned_block, eos, block)) { - SCOPED_TIMER(local_state._copy_data_timer); + if (_dequeue_data(source_info.local_state, partitioned_block, eos, block, + source_info.channel_id)) { + SCOPED_TIMER(profile.copy_data_timer); mutable_block = vectorized::VectorizedUtils::build_mutable_mem_reuse_block( block, partitioned_block.first->data_block); RETURN_IF_ERROR(get_data()); @@ -169,22 +211,25 @@ Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block } Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids, - vectorized::Block* block, - LocalExchangeSinkLocalState& local_state) { + vectorized::Block* block, int channel_id, + LocalExchangeSinkLocalState* local_state) { + if (local_state == nullptr) { + return _split_rows(state, channel_ids, block, channel_id); + } const auto rows = cast_set(block->rows()); auto row_idx = std::make_shared>(rows); + auto& partition_rows_histogram = _partition_rows_histogram[channel_id]; { - local_state._partition_rows_histogram.assign(_num_partitions + 1, 0); + partition_rows_histogram.assign(_num_partitions + 1, 0); for (int32_t i = 0; i < rows; ++i) { - local_state._partition_rows_histogram[channel_ids[i]]++; + partition_rows_histogram[channel_ids[i]]++; } for (int32_t i = 1; i <= _num_partitions; ++i) { - local_state._partition_rows_histogram[i] += - local_state._partition_rows_histogram[i - 1]; + partition_rows_histogram[i] += partition_rows_histogram[i - 1]; } for (int32_t i = rows - 1; i >= 0; --i) { - (*row_idx)[local_state._partition_rows_histogram[channel_ids[i]] - 1] = i; - local_state._partition_rows_histogram[channel_ids[i]]--; + (*row_idx)[partition_rows_histogram[channel_ids[i]] - 1] = i; + partition_rows_histogram[channel_ids[i]]--; } } @@ -200,10 +245,10 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest if (new_block_wrapper->data_block.empty()) { return Status::OK(); } - local_state._shared_state->add_total_mem_usage(new_block_wrapper->data_block.allocated_bytes(), - local_state._channel_id); + local_state->_shared_state->add_total_mem_usage(new_block_wrapper->data_block.allocated_bytes(), + channel_id); auto bucket_seq_to_instance_idx = - local_state._parent->cast()._bucket_seq_to_instance_idx; + local_state->_parent->cast()._bucket_seq_to_instance_idx; if (get_type() == ExchangeType::HASH_SHUFFLE) { /** * If type is `HASH_SHUFFLE`, data are hash-shuffled and distributed to all instances of @@ -211,32 +256,32 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest * For example, row 1 get a hash value 1 which means we should distribute to instance 1 on * BE 1 and row 2 get a hash value 2 which means we should distribute to instance 1 on BE 3. */ - const auto& map = local_state._parent->cast() + const auto& map = local_state->_parent->cast() ._shuffle_idx_to_instance_idx; new_block_wrapper->ref(cast_set(map.size())); for (const auto& it : map) { DCHECK(it.second >= 0 && it.second < _num_partitions) << it.first << " : " << it.second << " " << _num_partitions; - uint32_t start = local_state._partition_rows_histogram[it.first]; - uint32_t size = local_state._partition_rows_histogram[it.first + 1] - start; + uint32_t start = partition_rows_histogram[it.first]; + uint32_t size = partition_rows_histogram[it.first + 1] - start; if (size > 0) { _enqueue_data_and_set_ready(it.second, local_state, {new_block_wrapper, {row_idx, start, size}}); } else { - new_block_wrapper->unref(local_state._shared_state, local_state._channel_id); + new_block_wrapper->unref(local_state->_shared_state, channel_id); } } } else { DCHECK(!bucket_seq_to_instance_idx.empty()); new_block_wrapper->ref(_num_partitions); for (int i = 0; i < _num_partitions; i++) { - uint32_t start = local_state._partition_rows_histogram[i]; - uint32_t size = local_state._partition_rows_histogram[i + 1] - start; + uint32_t start = partition_rows_histogram[i]; + uint32_t size = partition_rows_histogram[i + 1] - start; if (size > 0) { _enqueue_data_and_set_ready(bucket_seq_to_instance_idx[i], local_state, {new_block_wrapper, {row_idx, start, size}}); } else { - new_block_wrapper->unref(local_state._shared_state, local_state._channel_id); + new_block_wrapper->unref(local_state->_shared_state, channel_id); } } } @@ -244,8 +289,53 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest return Status::OK(); } +Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids, + vectorized::Block* block, int channel_id) { + const auto rows = cast_set(block->rows()); + auto row_idx = std::make_shared>(rows); + auto& partition_rows_histogram = _partition_rows_histogram[channel_id]; + { + partition_rows_histogram.assign(_num_partitions + 1, 0); + for (int32_t i = 0; i < rows; ++i) { + partition_rows_histogram[channel_ids[i]]++; + } + for (int32_t i = 1; i <= _num_partitions; ++i) { + partition_rows_histogram[i] += partition_rows_histogram[i - 1]; + } + for (int32_t i = rows - 1; i >= 0; --i) { + (*row_idx)[partition_rows_histogram[channel_ids[i]] - 1] = i; + partition_rows_histogram[channel_ids[i]]--; + } + } + + vectorized::Block data_block; + std::shared_ptr new_block_wrapper; + if (_free_blocks.try_dequeue(data_block)) { + new_block_wrapper = BlockWrapper::create_shared(std::move(data_block)); + } else { + new_block_wrapper = BlockWrapper::create_shared(block->clone_empty()); + } + + new_block_wrapper->data_block.swap(*block); + if (new_block_wrapper->data_block.empty()) { + return Status::OK(); + } + new_block_wrapper->ref(cast_set(_num_partitions)); + for (int i = 0; i < _num_partitions; i++) { + uint32_t start = partition_rows_histogram[i]; + uint32_t size = partition_rows_histogram[i + 1] - start; + if (size > 0) { + _enqueue_data_and_set_ready(i, {new_block_wrapper, {row_idx, start, size}}); + } else { + new_block_wrapper->unref(); + } + } + + return Status::OK(); +} + Status PassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_block, bool eos, - LocalExchangeSinkLocalState& local_state) { + Profile&& profile, SinkInfo&& sink_info) { if (in_block->empty()) { return Status::OK(); } @@ -256,41 +346,43 @@ Status PassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_blo } new_block.swap(*in_block); wrapper = BlockWrapper::create_shared(std::move(new_block)); - auto channel_id = (local_state._channel_id++) % _num_partitions; - _enqueue_data_and_set_ready(channel_id, local_state, std::move(wrapper)); + auto channel_id = ((*sink_info.channel_id)++) % _num_partitions; + _enqueue_data_and_set_ready(channel_id, sink_info.local_state, std::move(wrapper)); return Status::OK(); } -void PassthroughExchanger::close(LocalExchangeSourceLocalState& local_state) { +void PassthroughExchanger::close(SourceInfo&& source_info) { vectorized::Block next_block; BlockWrapperSPtr wrapper; bool eos; - _data_queue[local_state._channel_id].set_eos(); - while (_dequeue_data(local_state, wrapper, &eos, &next_block)) { + _data_queue[source_info.channel_id].set_eos(); + while (_dequeue_data(source_info.local_state, wrapper, &eos, &next_block, + source_info.channel_id)) { // do nothing } } -void PassToOneExchanger::close(LocalExchangeSourceLocalState& local_state) { +void PassToOneExchanger::close(SourceInfo&& source_info) { vectorized::Block next_block; BlockWrapperSPtr wrapper; bool eos; - _data_queue[local_state._channel_id].set_eos(); - while (_dequeue_data(local_state, wrapper, &eos, &next_block)) { + _data_queue[source_info.channel_id].set_eos(); + while (_dequeue_data(source_info.local_state, wrapper, &eos, &next_block, + source_info.channel_id)) { // do nothing } } Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos, - LocalExchangeSourceLocalState& local_state) { + Profile&& profile, SourceInfo&& source_info) { BlockWrapperSPtr next_block; - _dequeue_data(local_state, next_block, eos, block); + _dequeue_data(source_info.local_state, next_block, eos, block, source_info.channel_id); return Status::OK(); } Status PassToOneExchanger::sink(RuntimeState* state, vectorized::Block* in_block, bool eos, - LocalExchangeSinkLocalState& local_state) { + Profile&& profile, SinkInfo&& sink_info) { if (in_block->empty()) { return Status::OK(); } @@ -301,70 +393,72 @@ Status PassToOneExchanger::sink(RuntimeState* state, vectorized::Block* in_block new_block.swap(*in_block); BlockWrapperSPtr wrapper = BlockWrapper::create_shared(std::move(new_block)); - _enqueue_data_and_set_ready(0, local_state, std::move(wrapper)); + _enqueue_data_and_set_ready(0, sink_info.local_state, std::move(wrapper)); return Status::OK(); } Status PassToOneExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos, - LocalExchangeSourceLocalState& local_state) { - if (local_state._channel_id != 0) { + Profile&& profile, SourceInfo&& source_info) { + if (source_info.channel_id != 0) { *eos = true; return Status::OK(); } BlockWrapperSPtr next_block; - _dequeue_data(local_state, next_block, eos, block); + _dequeue_data(source_info.local_state, next_block, eos, block, source_info.channel_id); return Status::OK(); } Status LocalMergeSortExchanger::sink(RuntimeState* state, vectorized::Block* in_block, bool eos, - LocalExchangeSinkLocalState& local_state) { + Profile&& profile, SinkInfo&& sink_info) { if (!in_block->empty()) { vectorized::Block new_block; if (!_free_blocks.try_dequeue(new_block)) { new_block = {in_block->clone_empty()}; } - DCHECK_LE(local_state._channel_id, _data_queue.size()); + DCHECK_LE(*sink_info.channel_id, _data_queue.size()); new_block.swap(*in_block); - _enqueue_data_and_set_ready(local_state._channel_id, local_state, + _enqueue_data_and_set_ready(*sink_info.channel_id, sink_info.local_state, BlockWrapper::create_shared(std::move(new_block))); } - if (eos) { - local_state._shared_state->source_deps[local_state._channel_id]->set_always_ready(); + if (eos && sink_info.local_state) { + sink_info.local_state->_shared_state->source_deps[*sink_info.channel_id] + ->set_always_ready(); } return Status::OK(); } -void ExchangerBase::finalize(LocalExchangeSourceLocalState& local_state) { +void ExchangerBase::finalize() { DCHECK(_running_source_operators == 0); vectorized::Block block; while (_free_blocks.try_dequeue(block)) { // do nothing } } -void LocalMergeSortExchanger::finalize(LocalExchangeSourceLocalState& local_state) { + +void LocalMergeSortExchanger::finalize() { BlockWrapperSPtr next_block; vectorized::Block block; bool eos; int id = 0; for (auto& data_queue : _data_queue) { data_queue.set_eos(); - while (_dequeue_data(local_state, next_block, &eos, &block, id)) { + while (_dequeue_data(next_block, &eos, &block, id)) { block = vectorized::Block(); } id++; } - ExchangerBase::finalize(local_state); + ExchangerBase::finalize(); } Status LocalMergeSortExchanger::build_merger(RuntimeState* state, - LocalExchangeSourceLocalState& local_state) { - RETURN_IF_ERROR(_sort_source->build_merger(state, _merger, local_state.profile())); + LocalExchangeSourceLocalState* local_state) { + RETURN_IF_ERROR(_sort_source->build_merger(state, _merger, local_state->profile())); std::vector child_block_suppliers; for (int channel_id = 0; channel_id < _num_partitions; channel_id++) { - vectorized::BlockSupplier block_supplier = [&, id = channel_id](vectorized::Block* block, - bool* eos) { + vectorized::BlockSupplier block_supplier = [&, local_state, id = channel_id]( + vectorized::Block* block, bool* eos) { BlockWrapperSPtr next_block; _dequeue_data(local_state, next_block, eos, block, id); return Status::OK(); @@ -388,20 +482,21 @@ now sort(8) --> local merge(1) ---> datasink(1) [2] ----> */ Status LocalMergeSortExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos, - LocalExchangeSourceLocalState& local_state) { - if (local_state._channel_id != 0) { + Profile&& profile, SourceInfo&& source_info) { + if (source_info.channel_id != 0) { *eos = true; return Status::OK(); } if (!_merger) { - RETURN_IF_ERROR(build_merger(state, local_state)); + DCHECK(source_info.local_state); + RETURN_IF_ERROR(build_merger(state, source_info.local_state)); } RETURN_IF_ERROR(_merger->get_next(block, eos)); return Status::OK(); } Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block* in_block, bool eos, - LocalExchangeSinkLocalState& local_state) { + Profile&& profile, SinkInfo&& sink_info) { if (in_block->empty()) { return Status::OK(); } @@ -411,32 +506,40 @@ Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block* in_block } new_block.swap(*in_block); auto wrapper = BlockWrapper::create_shared(std::move(new_block)); - local_state._shared_state->add_total_mem_usage(wrapper->data_block.allocated_bytes(), - local_state._channel_id); + if (sink_info.local_state) { + sink_info.local_state->_shared_state->add_total_mem_usage( + wrapper->data_block.allocated_bytes(), *sink_info.channel_id); + } + wrapper->ref(_num_partitions); for (int i = 0; i < _num_partitions; i++) { - _enqueue_data_and_set_ready(i, local_state, {wrapper, {0, wrapper->data_block.rows()}}); + _enqueue_data_and_set_ready(i, sink_info.local_state, + {wrapper, {0, wrapper->data_block.rows()}}); } return Status::OK(); } -void BroadcastExchanger::close(LocalExchangeSourceLocalState& local_state) { +void BroadcastExchanger::close(SourceInfo&& source_info) { BroadcastBlock partitioned_block; bool eos; vectorized::Block block; - _data_queue[local_state._channel_id].set_eos(); - while (_dequeue_data(local_state, partitioned_block, &eos, &block)) { - partitioned_block.first->unref(local_state._shared_state, local_state._channel_id); + _data_queue[source_info.channel_id].set_eos(); + while (_dequeue_data(source_info.local_state, partitioned_block, &eos, &block, + source_info.channel_id)) { + partitioned_block.first->unref( + source_info.local_state ? source_info.local_state->_shared_state : nullptr, + source_info.channel_id); } } Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos, - LocalExchangeSourceLocalState& local_state) { + Profile&& profile, SourceInfo&& source_info) { BroadcastBlock partitioned_block; - if (_dequeue_data(local_state, partitioned_block, eos, block)) { - SCOPED_TIMER(local_state._copy_data_timer); + if (_dequeue_data(source_info.local_state, partitioned_block, eos, block, + source_info.channel_id)) { + SCOPED_TIMER(profile.copy_data_timer); vectorized::MutableBlock mutable_block = vectorized::VectorizedUtils::build_mutable_mem_reuse_block( block, partitioned_block.first->data_block); @@ -444,7 +547,9 @@ Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block* blo RETURN_IF_ERROR(mutable_block.add_rows(&block_wrapper->data_block, partitioned_block.second.offset_start, partitioned_block.second.length)); - block_wrapper->unref(local_state._shared_state, local_state._channel_id); + block_wrapper->unref( + source_info.local_state ? source_info.local_state->_shared_state : nullptr, + source_info.channel_id); } return Status::OK(); @@ -452,21 +557,21 @@ Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block* blo Status AdaptivePassthroughExchanger::_passthrough_sink(RuntimeState* state, vectorized::Block* in_block, - LocalExchangeSinkLocalState& local_state) { + SinkInfo&& sink_info) { vectorized::Block new_block; if (!_free_blocks.try_dequeue(new_block)) { new_block = {in_block->clone_empty()}; } new_block.swap(*in_block); - auto channel_id = (local_state._channel_id++) % _num_partitions; - _enqueue_data_and_set_ready(channel_id, local_state, + auto channel_id = ((*sink_info.channel_id)++) % _num_partitions; + _enqueue_data_and_set_ready(channel_id, sink_info.local_state, BlockWrapper::create_shared(std::move(new_block))); return Status::OK(); } Status AdaptivePassthroughExchanger::_shuffle_sink(RuntimeState* state, vectorized::Block* block, - LocalExchangeSinkLocalState& local_state) { + SinkInfo&& sink_info) { std::vector channel_ids; const auto num_rows = block->rows(); channel_ids.resize(num_rows, 0); @@ -481,40 +586,39 @@ Status AdaptivePassthroughExchanger::_shuffle_sink(RuntimeState* state, vectoriz std::iota(channel_ids.begin() + i, channel_ids.end(), 0); } } - return _split_rows(state, channel_ids.data(), block, local_state); + return _split_rows(state, channel_ids.data(), block, std::move(sink_info)); } Status AdaptivePassthroughExchanger::_split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids, - vectorized::Block* block, - LocalExchangeSinkLocalState& local_state) { + vectorized::Block* block, SinkInfo&& sink_info) { const auto rows = cast_set(block->rows()); auto row_idx = std::make_shared>(rows); + auto& partition_rows_histogram = _partition_rows_histogram[*sink_info.channel_id]; { - local_state._partition_rows_histogram.assign(_num_partitions + 1, 0); + partition_rows_histogram.assign(_num_partitions + 1, 0); for (int32_t i = 0; i < rows; ++i) { - local_state._partition_rows_histogram[channel_ids[i]]++; + partition_rows_histogram[channel_ids[i]]++; } for (int32_t i = 1; i <= _num_partitions; ++i) { - local_state._partition_rows_histogram[i] += - local_state._partition_rows_histogram[i - 1]; + partition_rows_histogram[i] += partition_rows_histogram[i - 1]; } for (int32_t i = rows - 1; i >= 0; --i) { - (*row_idx)[local_state._partition_rows_histogram[channel_ids[i]] - 1] = i; - local_state._partition_rows_histogram[channel_ids[i]]--; + (*row_idx)[partition_rows_histogram[channel_ids[i]] - 1] = i; + partition_rows_histogram[channel_ids[i]]--; } } for (int32_t i = 0; i < _num_partitions; i++) { - const size_t start = local_state._partition_rows_histogram[i]; - const size_t size = local_state._partition_rows_histogram[i + 1] - start; + const size_t start = partition_rows_histogram[i]; + const size_t size = partition_rows_histogram[i + 1] - start; if (size > 0) { std::unique_ptr mutable_block = vectorized::MutableBlock::create_unique(block->clone_empty()); RETURN_IF_ERROR(mutable_block->add_rows(block, start, size)); auto new_block = mutable_block->to_block(); - _enqueue_data_and_set_ready(i, local_state, + _enqueue_data_and_set_ready(i, sink_info.local_state, BlockWrapper::create_shared(std::move(new_block))); } } @@ -522,34 +626,35 @@ Status AdaptivePassthroughExchanger::_split_rows(RuntimeState* state, } Status AdaptivePassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_block, - bool eos, LocalExchangeSinkLocalState& local_state) { + bool eos, Profile&& profile, SinkInfo&& sink_info) { if (in_block->empty()) { return Status::OK(); } if (_is_pass_through) { - return _passthrough_sink(state, in_block, local_state); + return _passthrough_sink(state, in_block, std::move(sink_info)); } else { if (_total_block++ > _num_partitions) { _is_pass_through = true; } - return _shuffle_sink(state, in_block, local_state); + return _shuffle_sink(state, in_block, std::move(sink_info)); } } Status AdaptivePassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* block, - bool* eos, - LocalExchangeSourceLocalState& local_state) { + bool* eos, Profile&& profile, + SourceInfo&& source_info) { BlockWrapperSPtr next_block; - _dequeue_data(local_state, next_block, eos, block); + _dequeue_data(source_info.local_state, next_block, eos, block, source_info.channel_id); return Status::OK(); } -void AdaptivePassthroughExchanger::close(LocalExchangeSourceLocalState& local_state) { +void AdaptivePassthroughExchanger::close(SourceInfo&& source_info) { vectorized::Block next_block; bool eos; BlockWrapperSPtr wrapper; - _data_queue[local_state._channel_id].set_eos(); - while (_dequeue_data(local_state, wrapper, &eos, &next_block)) { + _data_queue[source_info.channel_id].set_eos(); + while (_dequeue_data(source_info.local_state, wrapper, &eos, &next_block, + source_info.channel_id)) { // do nothing } } diff --git a/be/src/pipeline/local_exchange/local_exchanger.h b/be/src/pipeline/local_exchange/local_exchanger.h index 4d699baa52fb8b..d6871b2ba97cc3 100644 --- a/be/src/pipeline/local_exchange/local_exchanger.h +++ b/be/src/pipeline/local_exchange/local_exchanger.h @@ -20,14 +20,33 @@ #include "pipeline/dependency.h" #include "pipeline/exec/operator.h" -namespace doris::pipeline { +namespace doris { #include "common/compile_check_begin.h" - +namespace vectorized { +class PartitionerBase; +} +namespace pipeline { class LocalExchangeSourceLocalState; class LocalExchangeSinkLocalState; struct BlockWrapper; class SortSourceOperatorX; +struct Profile { + RuntimeProfile::Counter* compute_hash_value_timer = nullptr; + RuntimeProfile::Counter* distribute_timer = nullptr; + RuntimeProfile::Counter* copy_data_timer = nullptr; +}; + +struct SinkInfo { + int* channel_id; + vectorized::PartitionerBase* partitioner; + LocalExchangeSinkLocalState* local_state; +}; + +struct SourceInfo { + int channel_id; + LocalExchangeSourceLocalState* local_state; +}; /** * One exchanger is hold by one `LocalExchangeSharedState`. And one `LocalExchangeSharedState` is * shared by all local exchange sink operators and source operators with the same id. @@ -60,15 +79,15 @@ class ExchangerBase { _free_block_limit(free_block_limit) {} virtual ~ExchangerBase() = default; virtual Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, - LocalExchangeSourceLocalState& local_state) = 0; + Profile&& profile, SourceInfo&& source_info) = 0; virtual Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, - LocalExchangeSinkLocalState& local_state) = 0; + Profile&& profile, SinkInfo&& sink_info) = 0; virtual ExchangeType get_type() const = 0; // Called if a local exchanger source operator are closed. Free the unused data block in data_queue. - virtual void close(LocalExchangeSourceLocalState& local_state) = 0; + virtual void close(SourceInfo&& source_info) = 0; // Called if all local exchanger source operators are closed. We free the memory in // `_free_blocks` here. - virtual void finalize(LocalExchangeSourceLocalState& local_state); + virtual void finalize(); virtual std::string data_queue_debug_string(int i) = 0; @@ -155,12 +174,13 @@ class Exchanger : public ExchangerBase { protected: // Enqueue data block and set downstream source operator to read. - void _enqueue_data_and_set_ready(int channel_id, LocalExchangeSinkLocalState& local_state, + void _enqueue_data_and_set_ready(int channel_id, LocalExchangeSinkLocalState* local_state, BlockType&& block); - bool _dequeue_data(LocalExchangeSourceLocalState& local_state, BlockType& block, bool* eos, - vectorized::Block* data_block); - bool _dequeue_data(LocalExchangeSourceLocalState& local_state, BlockType& block, bool* eos, + bool _dequeue_data(LocalExchangeSourceLocalState* local_state, BlockType& block, bool* eos, vectorized::Block* data_block, int channel_id); + + void _enqueue_data_and_set_ready(int channel_id, BlockType&& block); + bool _dequeue_data(BlockType& block, bool* eos, vectorized::Block* data_block, int channel_id); std::vector> _data_queue; private: @@ -186,7 +206,7 @@ struct BlockWrapper { ~BlockWrapper() { DCHECK_EQ(ref_count.load(), 0); } void ref(int delta) { ref_count += delta; } void unref(LocalExchangeSharedState* shared_state, size_t allocated_bytes, int channel_id) { - if (ref_count.fetch_sub(1) == 1) { + if (ref_count.fetch_sub(1) == 1 && shared_state != nullptr) { DCHECK_GT(allocated_bytes, 0); shared_state->sub_total_mem_usage(allocated_bytes, channel_id); if (shared_state->exchanger->_free_block_limit == 0 || @@ -201,7 +221,7 @@ struct BlockWrapper { } } - void unref(LocalExchangeSharedState* shared_state, int channel_id) { + void unref(LocalExchangeSharedState* shared_state = nullptr, int channel_id = 0) { unref(shared_state, data_block.allocated_bytes(), channel_id); } int ref_value() const { return ref_count.load(); } @@ -219,19 +239,24 @@ class ShuffleExchanger : public Exchanger { DCHECK_GT(num_partitions, 0); DCHECK_GT(num_sources, 0); _data_queue.resize(num_sources); + _partition_rows_histogram.resize(running_sink_operators); } ~ShuffleExchanger() override = default; - Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, - LocalExchangeSinkLocalState& local_state) override; + Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, Profile&& profile, + SinkInfo&& sink_info) override; - Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, - LocalExchangeSourceLocalState& local_state) override; - void close(LocalExchangeSourceLocalState& local_state) override; + Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, Profile&& profile, + SourceInfo&& source_info) override; + void close(SourceInfo&& source_info) override; ExchangeType get_type() const override { return ExchangeType::HASH_SHUFFLE; } protected: Status _split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids, - vectorized::Block* block, LocalExchangeSinkLocalState& local_state); + vectorized::Block* block, int channel_id, + LocalExchangeSinkLocalState* local_state); + Status _split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids, + vectorized::Block* block, int channel_id); + std::vector> _partition_rows_histogram; }; class BucketShuffleExchanger final : public ShuffleExchanger { @@ -255,13 +280,13 @@ class PassthroughExchanger final : public Exchanger { _data_queue.resize(num_partitions); } ~PassthroughExchanger() override = default; - Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, - LocalExchangeSinkLocalState& local_state) override; + Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, Profile&& profile, + SinkInfo&& sink_info) override; - Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, - LocalExchangeSourceLocalState& local_state) override; + Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, Profile&& profile, + SourceInfo&& source_info) override; ExchangeType get_type() const override { return ExchangeType::PASSTHROUGH; } - void close(LocalExchangeSourceLocalState& local_state) override; + void close(SourceInfo&& source_info) override; }; class PassToOneExchanger final : public Exchanger { @@ -273,13 +298,13 @@ class PassToOneExchanger final : public Exchanger { _data_queue.resize(num_partitions); } ~PassToOneExchanger() override = default; - Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, - LocalExchangeSinkLocalState& local_state) override; + Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, Profile&& profile, + SinkInfo&& sink_info) override; - Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, - LocalExchangeSourceLocalState& local_state) override; + Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, Profile&& profile, + SourceInfo&& source_info) override; ExchangeType get_type() const override { return ExchangeType::PASS_TO_ONE; } - void close(LocalExchangeSourceLocalState& local_state) override; + void close(SourceInfo&& source_info) override; }; class LocalMergeSortExchanger final : public Exchanger { @@ -292,17 +317,17 @@ class LocalMergeSortExchanger final : public Exchanger { _data_queue.resize(num_partitions); } ~LocalMergeSortExchanger() override = default; - Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, - LocalExchangeSinkLocalState& local_state) override; + Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, Profile&& profile, + SinkInfo&& sink_info) override; - Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, - LocalExchangeSourceLocalState& local_state) override; + Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, Profile&& profile, + SourceInfo&& source_info) override; ExchangeType get_type() const override { return ExchangeType::LOCAL_MERGE_SORT; } - Status build_merger(RuntimeState* statem, LocalExchangeSourceLocalState& local_state); + Status build_merger(RuntimeState* statem, LocalExchangeSourceLocalState* local_state); - void close(LocalExchangeSourceLocalState& local_state) override {} - void finalize(LocalExchangeSourceLocalState& local_state) override; + void close(SourceInfo&& source_info) override {} + void finalize() override; private: std::unique_ptr _merger; @@ -318,13 +343,13 @@ class BroadcastExchanger final : public Exchanger { _data_queue.resize(num_partitions); } ~BroadcastExchanger() override = default; - Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, - LocalExchangeSinkLocalState& local_state) override; + Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, Profile&& profile, + SinkInfo&& sink_info) override; - Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, - LocalExchangeSourceLocalState& local_state) override; + Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, Profile&& profile, + SourceInfo&& source_info) override; ExchangeType get_type() const override { return ExchangeType::BROADCAST; } - void close(LocalExchangeSourceLocalState& local_state) override; + void close(SourceInfo&& source_info) override; }; //The code in AdaptivePassthroughExchanger is essentially @@ -337,26 +362,28 @@ class AdaptivePassthroughExchanger : public Exchanger { : Exchanger(running_sink_operators, num_partitions, free_block_limit) { _data_queue.resize(num_partitions); + _partition_rows_histogram.resize(running_sink_operators); } - Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, - LocalExchangeSinkLocalState& local_state) override; + Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, Profile&& profile, + SinkInfo&& sink_info) override; - Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, - LocalExchangeSourceLocalState& local_state) override; + Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, Profile&& profile, + SourceInfo&& source_info) override; ExchangeType get_type() const override { return ExchangeType::ADAPTIVE_PASSTHROUGH; } - void close(LocalExchangeSourceLocalState& local_state) override; + void close(SourceInfo&& source_info) override; private: Status _passthrough_sink(RuntimeState* state, vectorized::Block* in_block, - LocalExchangeSinkLocalState& local_state); - Status _shuffle_sink(RuntimeState* state, vectorized::Block* in_block, - LocalExchangeSinkLocalState& local_state); + SinkInfo&& sink_info); + Status _shuffle_sink(RuntimeState* state, vectorized::Block* in_block, SinkInfo&& sink_info); Status _split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids, - vectorized::Block* block, LocalExchangeSinkLocalState& local_state); + vectorized::Block* block, SinkInfo&& sink_info); std::atomic_bool _is_pass_through = false; std::atomic_int32_t _total_block = 0; + std::vector> _partition_rows_histogram; }; #include "common/compile_check_end.h" -} // namespace doris::pipeline +} // namespace pipeline +} // namespace doris \ No newline at end of file diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt index a0b3b799a764cb..ab380f9711f1ac 100644 --- a/be/src/runtime/CMakeLists.txt +++ b/be/src/runtime/CMakeLists.txt @@ -25,9 +25,7 @@ set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/runtime") file(GLOB_RECURSE RUNTIME_FILES CONFIGURE_DEPENDS *.cpp *.cc) -if (NOT USE_JEMALLOC OR NOT USE_MEM_TRACKER) - list(REMOVE_ITEM RUNTIME_FILES ${CMAKE_CURRENT_SOURCE_DIR}/memory/jemalloc_hook.cpp) -endif() +list(REMOVE_ITEM RUNTIME_FILES ${CMAKE_CURRENT_SOURCE_DIR}/memory/jemalloc_hook.cpp) add_library(Runtime STATIC ${RUNTIME_FILES} diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index ce18071fda0d07..19e8f76366c084 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -390,12 +390,20 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { params.load_counters.emplace(s_unselected_rows, std::to_string(num_rows_load_unselected)); if (!req.runtime_state->get_error_log_file_path().empty()) { - params.__set_tracking_url( - to_load_error_http_path(req.runtime_state->get_error_log_file_path())); + std::string error_log_url = + to_load_error_http_path(req.runtime_state->get_error_log_file_path()); + LOG(INFO) << "error log file path: " << error_log_url + << ", query id: " << print_id(req.query_id) + << ", fragment instance id: " << print_id(req.fragment_instance_id); + params.__set_tracking_url(error_log_url); } else if (!req.runtime_states.empty()) { for (auto* rs : req.runtime_states) { if (!rs->get_error_log_file_path().empty()) { - params.__set_tracking_url(to_load_error_http_path(rs->get_error_log_file_path())); + std::string error_log_url = to_load_error_http_path(rs->get_error_log_file_path()); + LOG(INFO) << "error log file path: " << error_log_url + << ", query id: " << print_id(req.query_id) + << ", fragment instance id: " << print_id(rs->fragment_instance_id()); + params.__set_tracking_url(error_log_url); } if (rs->wal_id() > 0) { params.__set_txn_id(rs->wal_id()); diff --git a/be/src/runtime/memory/heap_profiler.cpp b/be/src/runtime/memory/heap_profiler.cpp index 01ed82f76ef6d1..0b0448ce0eaaf6 100644 --- a/be/src/runtime/memory/heap_profiler.cpp +++ b/be/src/runtime/memory/heap_profiler.cpp @@ -30,8 +30,8 @@ void HeapProfiler::set_prof_active(bool prof) { #ifdef USE_JEMALLOC std::lock_guard guard(_mutex); try { - int err = jemallctl("prof.active", nullptr, nullptr, &prof, 1); - err |= jemallctl("prof.thread_active_init", nullptr, nullptr, &prof, 1); + int err = mallctl("prof.active", nullptr, nullptr, &prof, 1); + err |= mallctl("prof.thread_active_init", nullptr, nullptr, &prof, 1); if (err) { LOG(WARNING) << "jemalloc heap profiling start failed, " << err; } else { @@ -48,7 +48,7 @@ bool HeapProfiler::get_prof_dump(const std::string& profile_file_name) { std::lock_guard guard(_mutex); const char* file_name_ptr = profile_file_name.c_str(); try { - int err = jemallctl("prof.dump", nullptr, nullptr, &file_name_ptr, sizeof(const char*)); + int err = mallctl("prof.dump", nullptr, nullptr, &file_name_ptr, sizeof(const char*)); if (err) { LOG(WARNING) << "dump heap profile failed, " << err; return false; @@ -93,7 +93,7 @@ bool HeapProfiler::check_heap_profiler() { #ifdef USE_JEMALLOC size_t value = 0; size_t sz = sizeof(value); - jemallctl("prof.active", &value, &sz, nullptr, 0); + mallctl("prof.active", &value, &sz, nullptr, 0); return value; #else return false; diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index ecaf99061a070b..df7c4141691d0b 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -341,7 +341,9 @@ Status RuntimeState::create_error_log_file() { LOG(WARNING) << error_msg.str(); return Status::InternalError(error_msg.str()); } - VLOG_FILE << "create error log file: " << _error_log_file_path; + LOG(INFO) << "create error log file: " << _error_log_file_path + << ", query id: " << print_id(_query_id) + << ", fragment instance id: " << print_id(_fragment_instance_id); return Status::OK(); } diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index 482fadac44e051..ad4d22946f1b83 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -85,13 +85,18 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptrnumber_unselected_rows = state->num_rows_load_unselected(); ctx->loaded_bytes = state->num_bytes_load_total(); int64_t num_selected_rows = ctx->number_total_rows - ctx->number_unselected_rows; + ctx->error_url = to_load_error_http_path(state->get_error_log_file_path()); if (!ctx->group_commit && num_selected_rows > 0 && (double)ctx->number_filtered_rows / num_selected_rows > ctx->max_filter_ratio) { // NOTE: Do not modify the error message here, for historical reasons, // some users may rely on this error message. - *status = Status::DataQualityError("too many filtered rows"); + if (ctx->need_commit_self) { + *status = + Status::DataQualityError("too many filtered rows, url: " + ctx->error_url); + } else { + *status = Status::DataQualityError("too many filtered rows"); + } } - ctx->error_url = to_load_error_http_path(state->get_error_log_file_path()); if (status->ok()) { DorisMetrics::instance()->stream_receive_bytes_total->increment(ctx->receive_bytes); diff --git a/be/src/util/cgroup_util.cpp b/be/src/util/cgroup_util.cpp index 8f64fe699c6062..fc35be3dc35931 100644 --- a/be/src/util/cgroup_util.cpp +++ b/be/src/util/cgroup_util.cpp @@ -218,6 +218,10 @@ std::optional CGroupUtil::get_cgroupsv2_path(const std::string& sub Status CGroupUtil::read_int_line_from_cgroup_file(const std::filesystem::path& file_path, int64_t* val) { std::ifstream file_stream(file_path, std::ios::in); + if (!file_stream.is_open()) { + return Status::CgroupError("Error open {}", file_path.string()); + } + string line; getline(file_stream, line); if (file_stream.fail() || file_stream.bad()) { @@ -264,4 +268,167 @@ void CGroupUtil::read_int_metric_from_cgroup_file( } } +Status CGroupUtil::read_string_line_from_cgroup_file(const std::filesystem::path& file_path, + std::string* line_ptr) { + std::ifstream file_stream(file_path, std::ios::in); + if (!file_stream.is_open()) { + return Status::CgroupError("Error open {}", file_path.string()); + } + string line; + getline(file_stream, line); + if (file_stream.fail() || file_stream.bad()) { + return Status::CgroupError("Error reading {}: {}", file_path.string(), get_str_err_msg()); + } + *line_ptr = line; + return Status::OK(); +} + +Status CGroupUtil::parse_cpuset_line(std::string cpuset_line, int* cpu_count_ptr) { + if (cpuset_line.empty()) { + return Status::CgroupError("cpuset line is empty"); + } + std::vector ranges; + boost::split(ranges, cpuset_line, boost::is_any_of(",")); + int cpu_count = 0; + + for (const std::string& range : ranges) { + std::vector cpu_values; + boost::split(cpu_values, range, boost::is_any_of("-")); + + if (cpu_values.size() == 2) { + int start = std::stoi(cpu_values[0]); + int end = std::stoi(cpu_values[1]); + cpu_count += (end - start) + 1; + } else { + cpu_count++; + } + } + *cpu_count_ptr = cpu_count; + return Status::OK(); +} + +int CGroupUtil::get_cgroup_limited_cpu_number(int physical_cores) { + if (physical_cores <= 0) { + return physical_cores; + } + int ret = physical_cores; +#if defined(OS_LINUX) + // For cgroup v2 + // Child cgroup's cpu.max may bigger than parent group's cpu.max, + // so it should look up from current cgroup to top group. + // For cpuset, child cgroup's cpuset.cpus could not bigger thant parent's cpuset.cpus. + if (CGroupUtil::cgroupsv2_enable()) { + std::string cgroupv2_process_path = CGroupUtil::cgroupv2_of_process(); + if (cgroupv2_process_path.empty()) { + return ret; + } + std::filesystem::path current_cgroup_path = (default_cgroups_mount / cgroupv2_process_path); + ret = get_cgroup_v2_cpu_quota_number(current_cgroup_path, default_cgroups_mount, ret); + + current_cgroup_path = (default_cgroups_mount / cgroupv2_process_path); + ret = get_cgroup_v2_cpuset_number(current_cgroup_path, default_cgroups_mount, ret); + } else if (CGroupUtil::cgroupsv1_enable()) { + // cpu quota, should find first not empty config from current path to top. + // because if a process attach to current cgroup, its cpu quota may not be set. + std::string cpu_quota_path = ""; + Status cpu_quota_ret = CGroupUtil::find_abs_cgroupv1_path("cpu", &cpu_quota_path); + if (cpu_quota_ret.ok() && !cpu_quota_path.empty()) { + std::filesystem::path current_cgroup_path = cpu_quota_path; + ret = get_cgroup_v1_cpu_quota_number(current_cgroup_path, default_cgroups_mount, ret); + } + + //cpuset + // just lookup current process cgroup path is enough + // because if a process attach to current cgroup, its cpuset.cpus must be set. + std::string cpuset_path = ""; + Status cpuset_ret = CGroupUtil::find_abs_cgroupv1_path("cpuset", &cpuset_path); + if (cpuset_ret.ok() && !cpuset_path.empty()) { + std::filesystem::path current_path = cpuset_path; + ret = get_cgroup_v1_cpuset_number(current_path, ret); + } + } +#endif + return ret; +} + +int CGroupUtil::get_cgroup_v2_cpu_quota_number(std::filesystem::path& current_path, + const std::filesystem::path& default_cg_mout_path, + int cpu_num) { + int ret = cpu_num; + while (current_path != default_cg_mout_path.parent_path()) { + std::ifstream cpu_max_file(current_path / "cpu.max"); + if (cpu_max_file.is_open()) { + std::string cpu_limit_str; + double cpu_period; + cpu_max_file >> cpu_limit_str >> cpu_period; + if (cpu_limit_str != "max" && cpu_period != 0) { + double cpu_limit = std::stod(cpu_limit_str); + ret = std::min(static_cast(std::ceil(cpu_limit / cpu_period)), ret); + } + } + current_path = current_path.parent_path(); + } + return ret; +} + +int CGroupUtil::get_cgroup_v2_cpuset_number(std::filesystem::path& current_path, + const std::filesystem::path& default_cg_mout_path, + int cpu_num) { + int ret = cpu_num; + while (current_path != default_cg_mout_path.parent_path()) { + std::ifstream cpuset_cpus_file(current_path / "cpuset.cpus.effective"); + current_path = current_path.parent_path(); + if (cpuset_cpus_file.is_open()) { + std::string cpuset_line; + cpuset_cpus_file >> cpuset_line; + if (cpuset_line.empty()) { + continue; + } + int cpus_count = 0; + static_cast(CGroupUtil::parse_cpuset_line(cpuset_line, &cpus_count)); + ret = std::min(cpus_count, ret); + break; + } + } + return ret; +} + +int CGroupUtil::get_cgroup_v1_cpu_quota_number(std::filesystem::path& current_path, + const std::filesystem::path& default_cg_mout_path, + int cpu_num) { + int ret = cpu_num; + while (current_path != default_cg_mout_path.parent_path()) { + std::ifstream cpu_quota_file(current_path / "cpu.cfs_quota_us"); + std::ifstream cpu_period_file(current_path / "cpu.cfs_period_us"); + if (cpu_quota_file.is_open() && cpu_period_file.is_open()) { + double cpu_quota_value; + double cpu_period_value; + cpu_quota_file >> cpu_quota_value; + cpu_period_file >> cpu_period_value; + if (cpu_quota_value > 0 && cpu_period_value > 0) { + ret = std::min(ret, + static_cast(std::ceil(cpu_quota_value / cpu_period_value))); + break; + } + } + current_path = current_path.parent_path(); + } + return ret; +} + +int CGroupUtil::get_cgroup_v1_cpuset_number(std::filesystem::path& current_path, int cpu_num) { + int ret = cpu_num; + std::string cpuset_line = ""; + Status cpuset_ret = CGroupUtil::read_string_line_from_cgroup_file( + (current_path / "cpuset.cpus"), &cpuset_line); + if (cpuset_ret.ok() && !cpuset_line.empty()) { + int cpuset_count = 0; + static_cast(CGroupUtil::parse_cpuset_line(cpuset_line, &cpuset_count)); + if (cpuset_count > 0) { + ret = std::min(ret, cpuset_count); + } + } + return ret; +} + } // namespace doris diff --git a/be/src/util/cgroup_util.h b/be/src/util/cgroup_util.h index bc1417453f41f6..54fc9494599f15 100644 --- a/be/src/util/cgroup_util.h +++ b/be/src/util/cgroup_util.h @@ -104,5 +104,27 @@ class CGroupUtil { static void read_int_metric_from_cgroup_file( const std::filesystem::path& file_path, std::unordered_map& metrics_map); + + static Status read_string_line_from_cgroup_file(const std::filesystem::path& file_path, + std::string* line_ptr); + + // cpuset_line: 0-4,6,8-10 + static Status parse_cpuset_line(std::string cpuset_line, int* cpu_count_ptr); + + static int get_cgroup_limited_cpu_number(int physical_cores); + + static int get_cgroup_v2_cpu_quota_number(std::filesystem::path& current_path, + const std::filesystem::path& default_cg_mout_path, + int cpu_num); + + static int get_cgroup_v2_cpuset_number(std::filesystem::path& current_path, + const std::filesystem::path& default_cg_mout_path, + int cpu_num); + + static int get_cgroup_v1_cpu_quota_number(std::filesystem::path& current_path, + const std::filesystem::path& default_cg_mout_path, + int cpu_num); + + static int get_cgroup_v1_cpuset_number(std::filesystem::path& current_path, int cpu_num); }; } // namespace doris diff --git a/be/src/util/cpu_info.cpp b/be/src/util/cpu_info.cpp index 116dacb8da7ed4..b49985cdc06830 100644 --- a/be/src/util/cpu_info.cpp +++ b/be/src/util/cpu_info.cpp @@ -59,6 +59,7 @@ #include "gflags/gflags.h" #include "gutil/stringprintf.h" #include "gutil/strings/substitute.h" +#include "util/cgroup_util.h" #include "util/pretty_printer.h" using boost::algorithm::contains; @@ -109,58 +110,6 @@ static struct { {"popcnt", CpuInfo::POPCNT}, {"avx", CpuInfo::AVX}, {"avx2", CpuInfo::AVX2}, }; -int cgroup_bandwidth_quota(int physical_cores) { - namespace fs = std::filesystem; - fs::path cpu_max = "/sys/fs/cgroup/cpu.max"; - fs::path cfs_quota = "/sys/fs/cgroup/cpu/cpu.cfs_quota_us"; - fs::path cfs_period = "/sys/fs/cgroup/cpu/cpu.cfs_period_us"; - - int64_t quota, period; - char byte_buffer[1000]; - int64_t read_bytes; - - if (fs::exists(cpu_max)) { - // cgroup v2 - // https://www.kernel.org/doc/html/latest/admin-guide/cgroup-v2.html - std::ifstream file(cpu_max); - file.read(byte_buffer, 999); - read_bytes = file.gcount(); - byte_buffer[read_bytes] = '\0'; - if (sscanf(byte_buffer, "%" SCNd64 " %" SCNd64 "", "a, &period) != 2) { - return physical_cores; - } - } else if (fs::exists(cfs_quota) && fs::exists(cfs_period)) { - // cgroup v1 - // https://www.kernel.org/doc/html/latest/scheduler/sched-bwc.html#management - - // Read the quota, this indicates how many microseconds the CPU can be utilized by this cgroup per period - std::ifstream quota_file(cfs_quota); - quota_file.read(byte_buffer, 999); - read_bytes = quota_file.gcount(); - byte_buffer[read_bytes] = '\0'; - if (sscanf(byte_buffer, "%" SCNd64 "", "a) != 1) { - return physical_cores; - } - - // Read the time period, a cgroup can utilize the CPU up to quota microseconds every period - std::ifstream period_file(cfs_period); - period_file.read(byte_buffer, 999); - read_bytes = period_file.gcount(); - byte_buffer[read_bytes] = '\0'; - if (sscanf(byte_buffer, "%" SCNd64 "", &period) != 1) { - return physical_cores; - } - } else { - // No cgroup quota - return physical_cores; - } - if (quota > 0 && period > 0) { - return int64_t(ceil(double(quota) / double(period))); - } else { - return physical_cores; - } -} - // Helper function to parse for hardware flags. // values contains a list of space-separated flags. check to see if the flags we // care about are present. @@ -212,7 +161,7 @@ void CpuInfo::init() { } } - int num_cores = cgroup_bandwidth_quota(physical_num_cores); + int num_cores = CGroupUtil::get_cgroup_limited_cpu_number(physical_num_cores); if (max_mhz != 0) { cycles_per_ms_ = int64_t(max_mhz) * 1000; } else { diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp index fe9cf84b2aed54..97e529ac6c72e1 100644 --- a/be/src/util/mem_info.cpp +++ b/be/src/util/mem_info.cpp @@ -101,7 +101,7 @@ void MemInfo::refresh_allocator_mem() { // the current epoch number, which might be useful to log as a sanity check. uint64_t epoch = 0; size_t sz = sizeof(epoch); - jemallctl("epoch", &epoch, &sz, &epoch, sz); + mallctl("epoch", &epoch, &sz, &epoch, sz); // Number of extents of the given type in this arena in the bucket corresponding to page size index. // Large size class starts at 16384, the extents have three sizes before 16384: 4096, 8192, and 12288, so + 3 diff --git a/be/src/util/mem_info.h b/be/src/util/mem_info.h index 39ae9eb0b79cfb..0e14b64bd8f965 100644 --- a/be/src/util/mem_info.h +++ b/be/src/util/mem_info.h @@ -103,7 +103,7 @@ class MemInfo { #ifdef USE_JEMALLOC size_t value = 0; size_t sz = sizeof(value); - if (jemallctl(name.c_str(), &value, &sz, nullptr, 0) == 0) { + if (mallctl(name.c_str(), &value, &sz, nullptr, 0) == 0) { return value; } #endif @@ -114,7 +114,7 @@ class MemInfo { #ifdef USE_JEMALLOC unsigned value = 0; size_t sz = sizeof(value); - if (jemallctl(name.c_str(), &value, &sz, nullptr, 0) == 0) { + if (mallctl(name.c_str(), &value, &sz, nullptr, 0) == 0) { return value; } #endif @@ -146,8 +146,8 @@ class MemInfo { if (config::enable_je_purge_dirty_pages) { try { // Purge all unused dirty pages for arena , or for all arenas if equals MALLCTL_ARENAS_ALL. - int err = jemallctl(fmt::format("arena.{}.purge", MALLCTL_ARENAS_ALL).c_str(), - nullptr, nullptr, nullptr, 0); + int err = mallctl(fmt::format("arena.{}.purge", MALLCTL_ARENAS_ALL).c_str(), + nullptr, nullptr, nullptr, 0); if (err) { LOG(WARNING) << "Jemalloc purge all unused dirty pages failed"; } @@ -166,7 +166,7 @@ class MemInfo { #ifdef USE_JEMALLOC constexpr size_t TCACHE_LIMIT = (1ULL << 30); // 1G if (allocator_cache_mem() - je_dirty_pages_mem() > TCACHE_LIMIT) { - int err = jemallctl("thread.tcache.flush", nullptr, nullptr, nullptr, 0); + int err = mallctl("thread.tcache.flush", nullptr, nullptr, nullptr, 0); if (err) { LOG(WARNING) << "Jemalloc thread.tcache.flush failed"; } diff --git a/be/src/util/system_metrics.cpp b/be/src/util/system_metrics.cpp index 973f461d8defe7..fc2cdcc9262b31 100644 --- a/be/src/util/system_metrics.cpp +++ b/be/src/util/system_metrics.cpp @@ -33,18 +33,23 @@ #include "gutil/strings/split.h" // for string split #include "gutil/strtoint.h" // for atoi64 +#include "util/cgroup_util.h" #include "util/mem_info.h" #include "util/perf_counters.h" namespace doris { +DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(avail_cpu_num, MetricUnit::NOUNIT); + DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(host_cpu_num, MetricUnit::NOUNIT); struct CpuNumberMetrics { CpuNumberMetrics(MetricEntity* ent) : entity(ent) { INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, host_cpu_num); + INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, avail_cpu_num); } IntAtomicCounter* host_cpu_num {nullptr}; + IntAtomicCounter* avail_cpu_num {nullptr}; MetricEntity* entity = nullptr; }; @@ -1004,6 +1009,14 @@ void SystemMetrics::_update_proc_metrics() { fclose(fp); } +void SystemMetrics::update_be_avail_cpu_num() { + int64_t physical_cpu_num = _cpu_num_metrics->host_cpu_num->value(); + if (physical_cpu_num > 0) { + physical_cpu_num = CGroupUtil::get_cgroup_limited_cpu_number(physical_cpu_num); + _cpu_num_metrics->avail_cpu_num->set_value(physical_cpu_num); + } +} + void SystemMetrics::get_metrics_from_proc_vmstat() { #ifdef BE_TEST FILE* fp = fopen(k_ut_vmstat_path, "r"); diff --git a/be/src/util/system_metrics.h b/be/src/util/system_metrics.h index 29ce8c9c02b359..2c5446b81f4f71 100644 --- a/be/src/util/system_metrics.h +++ b/be/src/util/system_metrics.h @@ -66,6 +66,8 @@ class SystemMetrics { void update_max_network_receive_bytes_rate(int64_t max_receive_bytes_rate); void update_allocator_metrics(); + void update_be_avail_cpu_num(); + private: void _install_cpu_metrics(); // On Intel(R) Xeon(R) CPU E5-2450 0 @ 2.10GHz; diff --git a/be/src/vec/aggregate_functions/aggregate_function.h b/be/src/vec/aggregate_functions/aggregate_function.h index e0ec2bef62fc2a..d761d40c4c932c 100644 --- a/be/src/vec/aggregate_functions/aggregate_function.h +++ b/be/src/vec/aggregate_functions/aggregate_function.h @@ -20,6 +20,8 @@ #pragma once +#include + #include "common/exception.h" #include "common/status.h" #include "util/defer_op.h" @@ -81,7 +83,7 @@ using ConstAggregateDataPtr = const char*; */ class IAggregateFunction { public: - IAggregateFunction(const DataTypes& argument_types_) : argument_types(argument_types_) {} + IAggregateFunction(DataTypes argument_types_) : argument_types(std::move(argument_types_)) {} /// Get main function name. virtual String get_name() const = 0; @@ -225,7 +227,7 @@ class IAggregateFunction { virtual void set_version(const int version_) { version = version_; } - virtual AggregateFunctionPtr transmit_to_stable() { return nullptr; } + virtual IAggregateFunction* transmit_to_stable() { return nullptr; } /// Verify function signature virtual Status verify_result_type(const bool without_key, const DataTypes& argument_types, diff --git a/be/src/vec/aggregate_functions/aggregate_function_distinct.h b/be/src/vec/aggregate_functions/aggregate_function_distinct.h index 46450394627474..a5515145d9d2ad 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_distinct.h +++ b/be/src/vec/aggregate_functions/aggregate_function_distinct.h @@ -341,12 +341,22 @@ class AggregateFunctionDistinct DataTypePtr get_return_type() const override { return nested_func->get_return_type(); } - AggregateFunctionPtr transmit_to_stable() override { - return AggregateFunctionPtr(new AggregateFunctionDistinct( - nested_func, IAggregateFunction::argument_types)); + IAggregateFunction* transmit_to_stable() override { + return new AggregateFunctionDistinct(nested_func, + IAggregateFunction::argument_types); } }; +template +struct FunctionStableTransfer { + using FunctionStable = T; +}; + +template