diff --git a/src/coprocessor/coprocessor_v2.cc b/src/coprocessor/coprocessor_v2.cc index 48ad97ea2..16e251642 100755 --- a/src/coprocessor/coprocessor_v2.cc +++ b/src/coprocessor/coprocessor_v2.cc @@ -55,16 +55,66 @@ bvar::Adder CoprocessorV2::bvar_coprocessor_v2_filter_running_num("din bvar::Adder CoprocessorV2::bvar_coprocessor_v2_filter_total_num("dingo_coprocessor_v2_filter_total_num"); bvar::LatencyRecorder CoprocessorV2::coprocessor_v2_filter_latency("dingo_coprocessor_v2_filter_latency"); -CoprocessorV2::CoprocessorV2() : bvar_guard_for_coprocessor_v2_latency_(&coprocessor_v2_latency) { +CoprocessorV2::CoprocessorV2() + : bvar_guard_for_coprocessor_v2_latency_(&coprocessor_v2_latency) +#if defined(ENABLE_COPROCESSOR_V2_STATISTICS_TIME_CONSUMPTION) + , + coprocessor_v2_start_time_point(std::chrono::steady_clock::now()), + coprocessor_v2_spend_time_ms(0), + iter_next_spend_time_ms(0), + get_kv_spend_time_ms(0), + trans_field_spend_time_ms(0), + decode_spend_time_ms(0), + rel_expr_spend_time_ms(0), + misc_spend_time_ms(0), + open_spend_time_ms(0) +#endif +{ bvar_coprocessor_v2_object_running_num << 1; bvar_coprocessor_v2_object_total_num << 1; }; CoprocessorV2::~CoprocessorV2() { Close(); bvar_coprocessor_v2_object_running_num << -1; +#if defined(ENABLE_COPROCESSOR_V2_STATISTICS_TIME_CONSUMPTION) + // coprocessor_v2_end_time_point = std::chrono::steady_clock::now(); + coprocessor_v2_spend_time_ms = std::chrono::duration_cast(coprocessor_v2_end_time_point - + coprocessor_v2_start_time_point) + .count(); + misc_spend_time_ms = coprocessor_v2_spend_time_ms - iter_next_spend_time_ms - get_kv_spend_time_ms - + trans_field_spend_time_ms - decode_spend_time_ms - rel_expr_spend_time_ms - open_spend_time_ms; + DINGO_LOG(INFO) << fmt::format( + "CoprocessorV2 time_consumption total:{}ms, iter next:{}ms, get kv:{}ms, trans field:{}ms, decode and " + "encode:{}ms, rel expr:{}ms, misc:{}ms, open:{}ms", + coprocessor_v2_spend_time_ms, iter_next_spend_time_ms, get_kv_spend_time_ms, trans_field_spend_time_ms, + decode_spend_time_ms, rel_expr_spend_time_ms, misc_spend_time_ms, open_spend_time_ms); + + DINGO_LOG(INFO) << fmt::format( + "CoprocessorV2 time_consumption percent: iter next:{}%, get kv:{}% trans field:{}%, decode and " + "encode:{}%, rel expr:{}%, misc:{}%, open:{}%", + static_cast(iter_next_spend_time_ms) / coprocessor_v2_spend_time_ms * 100, + static_cast(get_kv_spend_time_ms) / coprocessor_v2_spend_time_ms * 100, + static_cast(trans_field_spend_time_ms) / coprocessor_v2_spend_time_ms * 100, + static_cast(decode_spend_time_ms) / coprocessor_v2_spend_time_ms * 100, + static_cast(rel_expr_spend_time_ms) / coprocessor_v2_spend_time_ms * 100, + static_cast(misc_spend_time_ms) / coprocessor_v2_spend_time_ms * 100, + static_cast(open_spend_time_ms) / coprocessor_v2_spend_time_ms * 100); +#endif } butil::Status CoprocessorV2::Open(const std::any& coprocessor) { +#if defined(ENABLE_COPROCESSOR_V2_STATISTICS_TIME_CONSUMPTION) + auto lambda_time_now_function = []() { return std::chrono::steady_clock::now(); }; + auto lambda_time_diff_microseconds_function = [](auto start, auto end) { + return std::chrono::duration_cast(end - start).count(); + }; + + auto open_start = lambda_time_now_function(); + ON_SCOPE_EXIT([&]() { + auto open_end = lambda_time_now_function(); + open_spend_time_ms += lambda_time_diff_microseconds_function(open_start, open_end); + }); +#endif butil::Status status; DINGO_LOG(DEBUG) << fmt::format("CoprocessorV2::Open Enter"); @@ -172,6 +222,13 @@ butil::Status CoprocessorV2::Open(const std::any& coprocessor) { butil::Status CoprocessorV2::Execute(IteratorPtr iter, bool key_only, size_t max_fetch_cnt, int64_t max_bytes_rpc, std::vector* kvs, bool& has_more) { +#if defined(ENABLE_COPROCESSOR_V2_STATISTICS_TIME_CONSUMPTION) + auto lambda_time_now_function = []() { return std::chrono::steady_clock::now(); }; + auto lambda_time_diff_microseconds_function = [](auto start, auto end) { + return std::chrono::duration_cast(end - start).count(); + }; + ON_SCOPE_EXIT([&]() { coprocessor_v2_end_time_point = std::chrono::steady_clock::now(); }); +#endif BvarLatencyGuard bvar_guard(&coprocessor_v2_execute_latency); CoprocessorV2::bvar_coprocessor_v2_execute_running_num << 1; CoprocessorV2::bvar_coprocessor_v2_execute_total_num << 1; @@ -182,8 +239,19 @@ butil::Status CoprocessorV2::Execute(IteratorPtr iter, bool key_only, size_t max has_more = false; while (iter->Valid()) { pb::common::KeyValue kv; - *kv.mutable_key() = iter->Key(); - *kv.mutable_value() = iter->Value(); +#if defined(ENABLE_COPROCESSOR_V2_STATISTICS_TIME_CONSUMPTION) + { + auto kv_start = lambda_time_now_function(); + ON_SCOPE_EXIT([&]() { + auto kv_end = lambda_time_now_function(); + get_kv_spend_time_ms += lambda_time_diff_microseconds_function(kv_start, kv_end); + }); +#endif + *kv.mutable_key() = iter->Key(); + *kv.mutable_value() = iter->Value(); +#if defined(ENABLE_COPROCESSOR_V2_STATISTICS_TIME_CONSUMPTION) + } +#endif bool has_result_kv = false; pb::common::KeyValue result_key_value; DINGO_LOG(DEBUG) << fmt::format("CoprocessorV2::DoExecute Call"); @@ -207,11 +275,32 @@ butil::Status CoprocessorV2::Execute(IteratorPtr iter, bool key_only, size_t max "CoprocessorV2 UptoLimit. key_only : {} max_fetch_cnt : {} max_bytes_rpc : {} cur_fetch_cnt : {} " "cur_bytes_rpc : {}", key_only, max_fetch_cnt, max_bytes_rpc, scan_filter.GetCurFetchCnt(), scan_filter.GetCurBytesRpc()); - iter->Next(); +#if defined(ENABLE_COPROCESSOR_V2_STATISTICS_TIME_CONSUMPTION) + { + auto next_start = lambda_time_now_function(); + ON_SCOPE_EXIT([&]() { + auto next_end = lambda_time_now_function(); + iter_next_spend_time_ms += lambda_time_diff_microseconds_function(next_start, next_end); + }); +#endif + iter->Next(); +#if defined(ENABLE_COPROCESSOR_V2_STATISTICS_TIME_CONSUMPTION) + } +#endif break; } - - iter->Next(); +#if defined(ENABLE_COPROCESSOR_V2_STATISTICS_TIME_CONSUMPTION) + { + auto next_start = lambda_time_now_function(); + ON_SCOPE_EXIT([&]() { + auto next_end = lambda_time_now_function(); + iter_next_spend_time_ms += lambda_time_diff_microseconds_function(next_start, next_end); + }); +#endif + iter->Next(); +#if defined(ENABLE_COPROCESSOR_V2_STATISTICS_TIME_CONSUMPTION) + } +#endif } status = GetKvFromExprEndOfFinish(key_only, max_fetch_cnt, max_bytes_rpc, kvs); @@ -224,6 +313,13 @@ butil::Status CoprocessorV2::Execute(IteratorPtr iter, bool key_only, size_t max butil::Status CoprocessorV2::Execute(TxnIteratorPtr iter, int64_t limit, bool key_only, bool /*is_reverse*/, pb::store::TxnResultInfo& txn_result_info, std::vector& kvs, bool& has_more, std::string& end_key) { +#if defined(ENABLE_COPROCESSOR_V2_STATISTICS_TIME_CONSUMPTION) + auto lambda_time_now_function = []() { return std::chrono::steady_clock::now(); }; + auto lambda_time_diff_microseconds_function = [](auto start, auto end) { + return std::chrono::duration_cast(end - start).count(); + }; + ON_SCOPE_EXIT([&]() { coprocessor_v2_end_time_point = std::chrono::steady_clock::now(); }); +#endif BvarLatencyGuard bvar_guard(&coprocessor_v2_execute_txn_latency); CoprocessorV2::bvar_coprocessor_v2_execute_txn_running_num << 1; CoprocessorV2::bvar_coprocessor_v2_execute_txn_total_num << 1; @@ -237,8 +333,19 @@ butil::Status CoprocessorV2::Execute(TxnIteratorPtr iter, int64_t limit, bool ke while (iter->Valid(txn_result_info)) { pb::common::KeyValue kv; - *kv.mutable_key() = iter->Key(); - *kv.mutable_value() = iter->Value(); +#if defined(ENABLE_COPROCESSOR_V2_STATISTICS_TIME_CONSUMPTION) + { + auto kv_start = lambda_time_now_function(); + ON_SCOPE_EXIT([&]() { + auto kv_end = lambda_time_now_function(); + get_kv_spend_time_ms += lambda_time_diff_microseconds_function(kv_start, kv_end); + }); +#endif + *kv.mutable_key() = iter->Key(); + *kv.mutable_value() = iter->Value(); +#if defined(ENABLE_COPROCESSOR_V2_STATISTICS_TIME_CONSUMPTION) + } +#endif bool has_result_kv = false; pb::common::KeyValue result_key_value; DINGO_LOG(DEBUG) << fmt::format("CoprocessorV2::DoExecute Call"); @@ -265,11 +372,32 @@ butil::Status CoprocessorV2::Execute(TxnIteratorPtr iter, int64_t limit, bool ke "cur_bytes_rpc : {}", key_only, std::min(limit, FLAGS_max_scan_line_limit), std::numeric_limits::max(), scan_filter.GetCurFetchCnt(), scan_filter.GetCurBytesRpc()); - iter->Next(); +#if defined(ENABLE_COPROCESSOR_V2_STATISTICS_TIME_CONSUMPTION) + { + auto next_start = lambda_time_now_function(); + ON_SCOPE_EXIT([&]() { + auto next_end = lambda_time_now_function(); + iter_next_spend_time_ms += lambda_time_diff_microseconds_function(next_start, next_end); + }); +#endif + iter->Next(); +#if defined(ENABLE_COPROCESSOR_V2_STATISTICS_TIME_CONSUMPTION) + } +#endif break; } - - iter->Next(); +#if defined(ENABLE_COPROCESSOR_V2_STATISTICS_TIME_CONSUMPTION) + { + auto next_start = lambda_time_now_function(); + ON_SCOPE_EXIT([&]() { + auto next_end = lambda_time_now_function(); + iter_next_spend_time_ms += lambda_time_diff_microseconds_function(next_start, next_end); + }); +#endif + iter->Next(); +#if defined(ENABLE_COPROCESSOR_V2_STATISTICS_TIME_CONSUMPTION) + } +#endif } status = GetKvFromExprEndOfFinish(key_only, limit, FLAGS_max_scan_memory_size, &kvs); @@ -280,6 +408,9 @@ butil::Status CoprocessorV2::Execute(TxnIteratorPtr iter, int64_t limit, bool ke } butil::Status CoprocessorV2::Filter(const std::string& key, const std::string& value, bool& is_reserved) { +#if defined(ENABLE_COPROCESSOR_V2_STATISTICS_TIME_CONSUMPTION) + ON_SCOPE_EXIT([&]() { coprocessor_v2_end_time_point = std::chrono::steady_clock::now(); }); +#endif BvarLatencyGuard bvar_guard(&coprocessor_v2_filter_latency); CoprocessorV2::bvar_coprocessor_v2_filter_running_num << 1; CoprocessorV2::bvar_coprocessor_v2_filter_total_num << 1; @@ -321,12 +452,35 @@ butil::Status CoprocessorV2::DoExecute(const std::string& key, const std::string } std::vector result_record; - status = RelExprHelper::TransFromOperandWrapper(result_operand_ptr, result_serial_schemas_, result_column_indexes_, - result_record); - if (!status.ok()) { - DINGO_LOG(ERROR) << status.error_cstr(); - return status; +#if defined(ENABLE_COPROCESSOR_V2_STATISTICS_TIME_CONSUMPTION) + auto lambda_time_now_function = []() { return std::chrono::steady_clock::now(); }; + auto lambda_time_diff_microseconds_function = [](auto start, auto end) { + return std::chrono::duration_cast(end - start).count(); + }; + { + auto trans_start = lambda_time_now_function(); + ON_SCOPE_EXIT([&]() { + auto trans_end = lambda_time_now_function(); + trans_field_spend_time_ms += lambda_time_diff_microseconds_function(trans_start, trans_end); + }); +#endif + status = RelExprHelper::TransFromOperandWrapper(result_operand_ptr, result_serial_schemas_, result_column_indexes_, + result_record); + if (!status.ok()) { + DINGO_LOG(ERROR) << status.error_cstr(); + return status; + } +#if defined(ENABLE_COPROCESSOR_V2_STATISTICS_TIME_CONSUMPTION) } +#endif + +#if defined(ENABLE_COPROCESSOR_V2_STATISTICS_TIME_CONSUMPTION) + auto encode_start = lambda_time_now_function(); + ON_SCOPE_EXIT([&]() { + auto encode_end = lambda_time_now_function(); + decode_spend_time_ms += lambda_time_diff_microseconds_function(encode_start, encode_end); + }); +#endif status = GetKvFromExpr(result_record, has_result_kv, result_kv); if (!status.ok()) { @@ -365,14 +519,38 @@ butil::Status CoprocessorV2::DoRelExprCore(const std::vector& original #if defined(TEST_COPROCESSOR_V2_MOCK) Utils::DebugPrintAnyArray(original_record, "From Decode"); #endif - std::unique_ptr> operand_ptr = std::make_unique>(); - status = RelExprHelper::TransToOperandWrapper(original_serial_schemas_, selection_column_indexes_, original_record, - operand_ptr); - if (!status.ok()) { - DINGO_LOG(ERROR) << status.error_cstr(); - return status; +#if defined(ENABLE_COPROCESSOR_V2_STATISTICS_TIME_CONSUMPTION) + auto lambda_time_now_function = []() { return std::chrono::steady_clock::now(); }; + auto lambda_time_diff_microseconds_function = [](auto start, auto end) { + return std::chrono::duration_cast(end - start).count(); + }; + { + auto trans_start = lambda_time_now_function(); + ON_SCOPE_EXIT([&]() { + auto trans_end = lambda_time_now_function(); + trans_field_spend_time_ms += lambda_time_diff_microseconds_function(trans_start, trans_end); + }); +#endif + + status = RelExprHelper::TransToOperandWrapper(original_serial_schemas_, selection_column_indexes_, original_record, + operand_ptr); + if (!status.ok()) { + DINGO_LOG(ERROR) << status.error_cstr(); + return status; + } + +#if defined(ENABLE_COPROCESSOR_V2_STATISTICS_TIME_CONSUMPTION) } +#endif + +#if defined(ENABLE_COPROCESSOR_V2_STATISTICS_TIME_CONSUMPTION) + auto expr_start = lambda_time_now_function(); + ON_SCOPE_EXIT([&]() { + auto expr_end = lambda_time_now_function(); + rel_expr_spend_time_ms += lambda_time_diff_microseconds_function(expr_start, expr_end); + }); +#endif try { std::vector* raw_operand_ptr = operand_ptr.release(); @@ -393,22 +571,36 @@ butil::Status CoprocessorV2::DoRelExprCoreWrapper(const std::string& key, const butil::Status status; std::vector original_record; +#if defined(ENABLE_COPROCESSOR_V2_STATISTICS_TIME_CONSUMPTION) + { + auto lambda_time_now_function = []() { return std::chrono::steady_clock::now(); }; + auto lambda_time_diff_microseconds_function = [](auto start, auto end) { + return std::chrono::duration_cast(end - start).count(); + }; + auto decode_start = lambda_time_now_function(); + ON_SCOPE_EXIT([&]() { + auto decode_end = lambda_time_now_function(); + decode_spend_time_ms += lambda_time_diff_microseconds_function(decode_start, decode_end); + }); +#endif + int ret = 0; + try { + // decode some column. not decode all + ret = original_record_decoder_->Decode(key, value, selection_column_indexes_, original_record); + } catch (const std::exception& my_exception) { + std::string error_message = fmt::format("serial::Decode failed exception : {}", my_exception.what()); + DINGO_LOG(ERROR) << error_message; + return butil::Status(pb::error::EILLEGAL_PARAMTETERS, error_message); + } - int ret = 0; - try { - // decode some column. not decode all - ret = original_record_decoder_->Decode(key, value, selection_column_indexes_, original_record); - } catch (const std::exception& my_exception) { - std::string error_message = fmt::format("serial::Decode failed exception : {}", my_exception.what()); - DINGO_LOG(ERROR) << error_message; - return butil::Status(pb::error::EILLEGAL_PARAMTETERS, error_message); - } - - if (ret < 0) { - std::string error_message = fmt::format("serial::Decode failed"); - DINGO_LOG(ERROR) << error_message; - return butil::Status(pb::error::EILLEGAL_PARAMTETERS, error_message); + if (ret < 0) { + std::string error_message = fmt::format("serial::Decode failed"); + DINGO_LOG(ERROR) << error_message; + return butil::Status(pb::error::EILLEGAL_PARAMTETERS, error_message); + } +#if defined(ENABLE_COPROCESSOR_V2_STATISTICS_TIME_CONSUMPTION) } +#endif return DoRelExprCore(original_record, result_operand_ptr); } @@ -418,37 +610,76 @@ butil::Status CoprocessorV2::GetKvFromExprEndOfFinish(bool /*key_only*/, size_t std::vector* kvs) { butil::Status status; +#if defined(ENABLE_COPROCESSOR_V2_STATISTICS_TIME_CONSUMPTION) + auto lambda_time_now_function = []() { return std::chrono::steady_clock::now(); }; + auto lambda_time_diff_microseconds_function = [](auto start, auto end) { + return std::chrono::duration_cast(end - start).count(); + }; +#endif + while (true) { std::unique_ptr> result_operand_ptr; - try { - const expr::Tuple* result_tuple = rel_runner_->Get(); - result_operand_ptr.reset(const_cast(result_tuple)); - } catch (const std::exception& my_exception) { - std::string error_message = fmt::format("rel::RelRunner Get failed. exception : {}", my_exception.what()); - DINGO_LOG(ERROR) << error_message; - return butil::Status(pb::error::EILLEGAL_PARAMTETERS, error_message); +#if defined(ENABLE_COPROCESSOR_V2_STATISTICS_TIME_CONSUMPTION) + { + auto expr_start = lambda_time_now_function(); + ON_SCOPE_EXIT([&]() { + auto expr_end = lambda_time_now_function(); + rel_expr_spend_time_ms += lambda_time_diff_microseconds_function(expr_start, expr_end); + }); +#endif + try { + const expr::Tuple* result_tuple = rel_runner_->Get(); + result_operand_ptr.reset(const_cast(result_tuple)); + } catch (const std::exception& my_exception) { + std::string error_message = fmt::format("rel::RelRunner Get failed. exception : {}", my_exception.what()); + DINGO_LOG(ERROR) << error_message; + return butil::Status(pb::error::EILLEGAL_PARAMTETERS, error_message); + } +#if defined(ENABLE_COPROCESSOR_V2_STATISTICS_TIME_CONSUMPTION) } +#endif if (!result_operand_ptr) { break; } std::vector result_record; - status = RelExprHelper::TransFromOperandWrapper(result_operand_ptr, result_serial_schemas_, result_column_indexes_, - result_record); - if (!status.ok()) { - DINGO_LOG(ERROR) << status.error_cstr(); - return status; +#if defined(ENABLE_COPROCESSOR_V2_STATISTICS_TIME_CONSUMPTION) + { + auto trans_start = lambda_time_now_function(); + ON_SCOPE_EXIT([&]() { + auto trans_end = lambda_time_now_function(); + trans_field_spend_time_ms += lambda_time_diff_microseconds_function(trans_start, trans_end); + }); +#endif + status = RelExprHelper::TransFromOperandWrapper(result_operand_ptr, result_serial_schemas_, + result_column_indexes_, result_record); + if (!status.ok()) { + DINGO_LOG(ERROR) << status.error_cstr(); + return status; + } +#if defined(ENABLE_COPROCESSOR_V2_STATISTICS_TIME_CONSUMPTION) } +#endif bool has_result_kv = false; pb::common::KeyValue result_kv; - - status = GetKvFromExpr(result_record, &has_result_kv, &result_kv); - if (!status.ok()) { - DINGO_LOG(ERROR) << status.error_cstr(); - return status; +#if defined(ENABLE_COPROCESSOR_V2_STATISTICS_TIME_CONSUMPTION) + { + auto encode_start = lambda_time_now_function(); + ON_SCOPE_EXIT([&]() { + auto encode_end = lambda_time_now_function(); + decode_spend_time_ms += lambda_time_diff_microseconds_function(encode_start, encode_end); + }); +#endif + status = GetKvFromExpr(result_record, &has_result_kv, &result_kv); + if (!status.ok()) { + DINGO_LOG(ERROR) << status.error_cstr(); + return status; + } +#if defined(ENABLE_COPROCESSOR_V2_STATISTICS_TIME_CONSUMPTION) } +#endif if (has_result_kv) { kvs->emplace_back(std::move(result_kv)); diff --git a/src/coprocessor/coprocessor_v2.h b/src/coprocessor/coprocessor_v2.h index 8d07b62ee..c45e6acfd 100755 --- a/src/coprocessor/coprocessor_v2.h +++ b/src/coprocessor/coprocessor_v2.h @@ -33,6 +33,12 @@ namespace dingodb { +#ifndef ENABLE_COPROCESSOR_V2_STATISTICS_TIME_CONSUMPTION +#define ENABLE_COPROCESSOR_V2_STATISTICS_TIME_CONSUMPTION +#endif + +#undef ENABLE_COPROCESSOR_V2_STATISTICS_TIME_CONSUMPTION + class CoprocessorV2 : public RawCoprocessor { public: CoprocessorV2(); @@ -112,6 +118,19 @@ class CoprocessorV2 : public RawCoprocessor { static bvar::LatencyRecorder coprocessor_v2_filter_latency; BvarLatencyGuard bvar_guard_for_coprocessor_v2_latency_; // NOLINT + +#if defined(ENABLE_COPROCESSOR_V2_STATISTICS_TIME_CONSUMPTION) + std::chrono::steady_clock::time_point coprocessor_v2_start_time_point; + std::chrono::steady_clock::time_point coprocessor_v2_end_time_point; + int64_t coprocessor_v2_spend_time_ms; + int64_t iter_next_spend_time_ms; + int64_t get_kv_spend_time_ms; + int64_t trans_field_spend_time_ms; + int64_t decode_spend_time_ms; + int64_t rel_expr_spend_time_ms; + int64_t misc_spend_time_ms; + int64_t open_spend_time_ms; +#endif }; } // namespace dingodb