From af358153f73c2589422bad5dc5f3ff72b3fead8d Mon Sep 17 00:00:00 2001 From: wchuande Date: Sun, 18 Feb 2024 16:02:02 +0800 Subject: [PATCH] [refactor][sdk] Make sdk params use gflags --- src/sdk/CMakeLists.txt | 1 + src/sdk/client.cc | 6 +-- src/sdk/client_stub.cc | 6 +-- src/sdk/common/param_config.cc | 42 +++++++++++++++++++ src/sdk/common/param_config.h | 39 +++++++++-------- src/sdk/rawkv/raw_kv_region_scanner_impl.cc | 2 +- src/sdk/rawkv/raw_kv_task.cc | 5 ++- src/sdk/rpc/rpc.h | 4 +- src/sdk/store/store_rpc_controller.cc | 9 ++-- src/sdk/store/store_rpc_controller.h | 1 - src/sdk/transaction/txn_impl.cc | 18 ++++---- .../transaction/txn_region_scanner_impl.cc | 8 ++-- src/sdk/vector/vector_task.cc | 5 ++- .../sdk/rawkv/test_raw_kv_region_scanner.cc | 6 +-- test/unit_test/sdk/test_base.h | 2 +- 15 files changed, 99 insertions(+), 55 deletions(-) create mode 100644 src/sdk/common/param_config.cc diff --git a/src/sdk/CMakeLists.txt b/src/sdk/CMakeLists.txt index e02e8017e..258d7e535 100644 --- a/src/sdk/CMakeLists.txt +++ b/src/sdk/CMakeLists.txt @@ -38,6 +38,7 @@ add_library(sdk vector/vector_delete_task.cc vector/vector_search_task.cc utils/thread_pool_actuator.cc + common/param_config.cc # TODO: use libary ${PROJECT_SOURCE_DIR}/src/coordinator/coordinator_interaction.cc ${PROJECT_SOURCE_DIR}/src/common/role.cc diff --git a/src/sdk/client.cc b/src/sdk/client.cc index 9955f372f..ac9544f28 100644 --- a/src/sdk/client.cc +++ b/src/sdk/client.cc @@ -393,20 +393,20 @@ Status RegionCreator::Create(int64_t& out_region_id) { if (data_->wait) { int retry = 0; - while (retry < kCoordinatorInteractionMaxRetry) { + while (retry < FLAGS_coordinator_interaction_max_retry) { bool creating = false; DINGO_RETURN_NOT_OK(data_->stub.GetAdminTool()->IsCreateRegionInProgress(out_region_id, creating)); if (creating) { retry++; - usleep(kCoordinatorInteractionDelayMs * 1000); + usleep(FLAGS_coordinator_interaction_delay_ms * 1000); } else { return Status::OK(); } } std::string msg = fmt::format("Fail query region:{} state retry:{} exceed limit:{}, delay ms:{}", out_region_id, - retry, kCoordinatorInteractionMaxRetry, kCoordinatorInteractionDelayMs); + retry, FLAGS_coordinator_interaction_max_retry, FLAGS_coordinator_interaction_delay_ms); DINGO_LOG(INFO) << msg; return Status::Incomplete(msg); } diff --git a/src/sdk/client_stub.cc b/src/sdk/client_stub.cc index d77b132a1..415b74dbc 100644 --- a/src/sdk/client_stub.cc +++ b/src/sdk/client_stub.cc @@ -37,8 +37,8 @@ Status ClientStub::Open(std::string naming_service_url) { // TODO: pass use gflag or add options brpc::ChannelOptions options; - options.timeout_ms = kRpcChannelTimeOutMs; - options.connect_timeout_ms = kRpcChannelConnectTimeOutMs; + options.timeout_ms = FLAGS_rpc_channel_timeout_ms; + options.connect_timeout_ms = FLAGS_rpc_channel_connect_timeout_ms; store_rpc_interaction_.reset(new RpcInteraction(options)); meta_cache_.reset(new MetaCache(coordinator_proxy_)); @@ -52,7 +52,7 @@ Status ClientStub::Open(std::string naming_service_url) { txn_lock_resolver_.reset(new TxnLockResolver(*(this))); actuator_.reset(new ThreadPoolActuator()); - actuator_->Start(kActuatorThreadNum); + actuator_->Start(FLAGS_actuator_thread_num); vector_index_cache_.reset(new VectorIndexCache(*coordinator_proxy_)); diff --git a/src/sdk/common/param_config.cc b/src/sdk/common/param_config.cc new file mode 100644 index 000000000..e315f0494 --- /dev/null +++ b/src/sdk/common/param_config.cc @@ -0,0 +1,42 @@ + +// Copyright (c) 2023 dingodb.com, Inc. All Rights Reserved +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "sdk/common/param_config.h" + +// ChannelOptions should set "timeout_ms > connect_timeout_ms" for circuit breaker +DEFINE_int64(rpc_channel_timeout_ms, 500000, "rpc channel timeout ms"); +DEFINE_int64(rpc_channel_connect_timeout_ms, 3000, "rpc channel connect timeout ms"); + +DEFINE_int64(rpc_max_retry, 3, "rpc call max retry times"); +DEFINE_int64(rpc_time_out_ms, 500000, "rpc call timeout ms"); + +DEFINE_int64(store_rpc_max_retry, 5, "store rpc max retry times, use case: wrong leader or request range invalid"); +DEFINE_int64(store_rpc_retry_delay_ms, 1000, "store rpc retry delay ms"); + +DEFINE_int64(scan_batch_size, 10, "scan batch size, use for region scanner"); + +DEFINE_int64(coordinator_interaction_delay_ms, 200, "coordinator interaction delay ms"); +DEFINE_int64(coordinator_interaction_max_retry, 300, "coordinator interaction max retry"); + +DEFINE_int64(txn_op_delay_ms, 200, "txn op delay ms"); +DEFINE_int64(txn_op_max_retry, 2, "txn op max retry times"); + +DEFINE_int64(actuator_thread_num, 8, "actuator thread num"); + +DEFINE_int64(raw_kv_delay_ms, 200, "raw kv backoff delay ms"); +DEFINE_int64(raw_kv_max_retry, 5, "raw kv max retry times"); + +DEFINE_int64(vector_op_delay_ms, 500, "raw kv backoff delay ms"); +DEFINE_int64(vector_op_max_retry, 10, "raw kv max retry times"); \ No newline at end of file diff --git a/src/sdk/common/param_config.h b/src/sdk/common/param_config.h index 6f7613ebd..2d377e371 100644 --- a/src/sdk/common/param_config.h +++ b/src/sdk/common/param_config.h @@ -17,41 +17,44 @@ #include +#include "gflags/gflags.h" + // TODO: make params in this file use glfags const int64_t kSdkVlogLevel = 60; // ChannelOptions should set "timeout_ms > connect_timeout_ms" for circuit breaker -const int64_t kRpcChannelTimeOutMs = 500000; -const int64_t kRpcChannelConnectTimeOutMs = 3000; - -// each rpc call params -const int64_t kRpcCallMaxRetry = 3; -const int64_t kRpcTimeOutMs = 500000; +DECLARE_int64(rpc_channel_timeout_ms); +DECLARE_int64(rpc_channel_connect_timeout_ms); -// use case: wrong leader or request range invalid -const int64_t kRpcMaxRetry = 5; +// each rpc call params, set for brpc::Controller +DECLARE_int64(rpc_max_retry); +DECLARE_int64(rpc_time_out_ms); -const int64_t kRpcRetryDelayMs = 1000; +// each store rpc params, used for store rpc controller +DECLARE_int64(store_rpc_max_retry); +DECLARE_int64(store_rpc_retry_delay_ms); // start: use for region scanner -const int64_t kScanBatchSize = 10; - +DECLARE_int64(scan_batch_size); const int64_t kMinScanBatchSize = 1; - const int64_t kMaxScanBatchSize = 100; // end: use for region scanner const int64_t kPrefetchRegionCount = 3; -const int64_t kCoordinatorInteractionDelayMs = 200; -const int64_t kCoordinatorInteractionMaxRetry = 300; +DECLARE_int64(coordinator_interaction_delay_ms); +DECLARE_int64(coordinator_interaction_max_retry); + +DECLARE_int64(actuator_thread_num); -const int64_t kTxnOpMaxRetry = 2; +DECLARE_int64(raw_kv_delay_ms); +DECLARE_int64(raw_kv_max_retry); -const int64_t kActuatorThreadNum = 8; +DECLARE_int64(txn_op_delay_ms); +DECLARE_int64(txn_op_max_retry); -const int64_t kRawkvBackoffMs = 200; -const int64_t kRawkvMaxRetry = 10; +DECLARE_int64(vector_op_delay_ms); +DECLARE_int64(vector_op_max_retry); #endif // DINGODB_SDK_PARAM_CONFIG_H_ \ No newline at end of file diff --git a/src/sdk/rawkv/raw_kv_region_scanner_impl.cc b/src/sdk/rawkv/raw_kv_region_scanner_impl.cc index f19786e8a..91b43d96f 100644 --- a/src/sdk/rawkv/raw_kv_region_scanner_impl.cc +++ b/src/sdk/rawkv/raw_kv_region_scanner_impl.cc @@ -37,7 +37,7 @@ RawKvRegionScannerImpl::RawKvRegionScannerImpl(const ClientStub& stub, std::shar end_key_(std::move(end_key)), opened_(false), has_more_(false), - batch_size_(kScanBatchSize) {} + batch_size_(FLAGS_scan_batch_size) {} static void RawKvRegionScannerImplDeleted(Status status, std::string scan_id) { VLOG(kSdkVlogLevel) << "RawKvRegionScannerImpl deleted, scanner id: " << scan_id << " status:" << status.ToString(); diff --git a/src/sdk/rawkv/raw_kv_task.cc b/src/sdk/rawkv/raw_kv_task.cc index de53f1834..836511898 100644 --- a/src/sdk/rawkv/raw_kv_task.cc +++ b/src/sdk/rawkv/raw_kv_task.cc @@ -14,6 +14,7 @@ #include "sdk/rawkv/raw_kv_task.h" +#include "sdk/common/param_config.h" #include "sdk/utils/async_util.h" namespace dingodb { @@ -69,7 +70,7 @@ bool RawKvTask::NeedRetry() { if (error_code == pb::error::EREGION_VERSION || error_code == pb::error::EREGION_NOT_FOUND || error_code == pb::error::EKEY_OUT_OF_RANGE) { retry_count_++; - if (retry_count_ < kRawkvMaxRetry) { + if (retry_count_ < FLAGS_raw_kv_max_retry) { return true; } else { std::string msg = @@ -83,7 +84,7 @@ bool RawKvTask::NeedRetry() { } void RawKvTask::BackoffAndRetry() { - stub.GetActuator()->Schedule([this] { DoAsync(); }, kRawkvBackoffMs); + stub.GetActuator()->Schedule([this] { DoAsync(); }, FLAGS_raw_kv_delay_ms); } void RawKvTask::FireCallback() { diff --git a/src/sdk/rpc/rpc.h b/src/sdk/rpc/rpc.h index 43694b688..c9f857577 100644 --- a/src/sdk/rpc/rpc.h +++ b/src/sdk/rpc/rpc.h @@ -129,8 +129,8 @@ class ClientRpc : public Rpc { response->Clear(); controller.Reset(); controller.set_log_id(butil::fast_rand()); - controller.set_timeout_ms(kRpcTimeOutMs); - controller.set_max_retry(kRpcCallMaxRetry); + controller.set_timeout_ms(FLAGS_rpc_time_out_ms); + controller.set_max_retry(FLAGS_rpc_max_retry); status = Status::OK(); } diff --git a/src/sdk/store/store_rpc_controller.cc b/src/sdk/store/store_rpc_controller.cc index aab126a9c..8be855dd4 100644 --- a/src/sdk/store/store_rpc_controller.cc +++ b/src/sdk/store/store_rpc_controller.cc @@ -109,9 +109,8 @@ void StoreRpcController::SendStoreRpc() { void StoreRpcController::MaybeDelay() { if (NeedDelay()) { - auto delay = DelayTimeMs(); - DINGO_LOG(INFO) << "try to delay:" << delay << "ms"; - (void)usleep(delay); + DINGO_LOG(INFO) << "try to delay:" << FLAGS_store_rpc_retry_delay_ms << "ms"; + (void)usleep(FLAGS_store_rpc_retry_delay_ms); } } @@ -269,14 +268,12 @@ std::shared_ptr StoreRpcController::ProcessStoreRegionInfo( return region; } -bool StoreRpcController::NeedRetry() const { return this->rpc_retry_times_ < kRpcMaxRetry; } +bool StoreRpcController::NeedRetry() const { return this->rpc_retry_times_ < FLAGS_store_rpc_max_retry; } bool StoreRpcController::NeedDelay() const { return status_.IsRemoteError(); } bool StoreRpcController::NeedPickLeader() const { return !status_.IsRemoteError(); } -int64_t StoreRpcController::DelayTimeMs() const { return kRpcRetryDelayMs; } - const pb::error::Error& StoreRpcController::GetResponseError(Rpc& rpc) { const auto* response = rpc.RawResponse(); const auto* descriptor = response->GetDescriptor(); diff --git a/src/sdk/store/store_rpc_controller.h b/src/sdk/store/store_rpc_controller.h index 52c72b1d9..58f16e702 100644 --- a/src/sdk/store/store_rpc_controller.h +++ b/src/sdk/store/store_rpc_controller.h @@ -56,7 +56,6 @@ class StoreRpcController { // backoff void MaybeDelay(); bool NeedDelay() const; - int64_t DelayTimeMs() const; bool PickNextLeader(butil::EndPoint& leader); diff --git a/src/sdk/transaction/txn_impl.cc b/src/sdk/transaction/txn_impl.cc index 44fc7fd68..be3cefde1 100644 --- a/src/sdk/transaction/txn_impl.cc +++ b/src/sdk/transaction/txn_impl.cc @@ -93,8 +93,8 @@ Status Transaction::TxnImpl::DoTxnGet(const std::string& key, std::string& value if (NeedRetryAndInc(retry)) { // TODO: set txn retry ms - DINGO_LOG(INFO) << "try to delay:" << kRpcRetryDelayMs << "ms"; - DelayRetry(kRpcRetryDelayMs); + DINGO_LOG(INFO) << "try to delay:" << FLAGS_txn_op_delay_ms << "ms"; + DelayRetry(FLAGS_txn_op_delay_ms); } else { break; } @@ -166,8 +166,8 @@ void Transaction::TxnImpl::ProcessTxnBatchGetSubTask(TxnSubTask* sub_task) { if (NeedRetryAndInc(retry)) { // TODO: set txn retry ms - DINGO_LOG(INFO) << "try to delay:" << kRpcRetryDelayMs << "ms"; - DelayRetry(kRpcRetryDelayMs); + DINGO_LOG(INFO) << "try to delay:" << FLAGS_txn_op_delay_ms << "ms"; + DelayRetry(FLAGS_txn_op_delay_ms); } else { break; } @@ -563,8 +563,8 @@ Status Transaction::TxnImpl::PreCommitPrimaryKey() { if (NeedRetryAndInc(retry)) { // TODO: set txn retry ms - DINGO_LOG(INFO) << "try to delay:" << kRpcRetryDelayMs << "ms"; - DelayRetry(kRpcRetryDelayMs); + DINGO_LOG(INFO) << "try to delay:" << FLAGS_txn_op_delay_ms << "ms"; + DelayRetry(FLAGS_txn_op_delay_ms); } else { break; } @@ -597,8 +597,8 @@ void Transaction::TxnImpl::ProcessTxnPrewriteSubTask(TxnSubTask* sub_task) { } if (NeedRetryAndInc(retry)) { // TODO: set txn retry ms - DINGO_LOG(INFO) << "try to delay:" << kRpcRetryDelayMs << "ms"; - DelayRetry(kRpcRetryDelayMs); + DINGO_LOG(INFO) << "try to delay:" << FLAGS_txn_op_delay_ms << "ms"; + DelayRetry(FLAGS_txn_op_delay_ms); } else { // TODO: maybe set ret as meaningful status break; @@ -1024,7 +1024,7 @@ Status Transaction::TxnImpl::Rollback() { } bool Transaction::TxnImpl::NeedRetryAndInc(int& times) { - bool retry = times < kTxnOpMaxRetry; + bool retry = times < FLAGS_txn_op_max_retry; times++; return retry; } diff --git a/src/sdk/transaction/txn_region_scanner_impl.cc b/src/sdk/transaction/txn_region_scanner_impl.cc index b6ea20f22..3eab48c1f 100644 --- a/src/sdk/transaction/txn_region_scanner_impl.cc +++ b/src/sdk/transaction/txn_region_scanner_impl.cc @@ -36,7 +36,7 @@ TxnRegionScannerImpl::TxnRegionScannerImpl(const ClientStub& stub, std::shared_p end_key_(std::move(end_key)), opened_(false), has_more_(false), - batch_size_(kScanBatchSize), + batch_size_(FLAGS_scan_batch_size), next_key_(start_key_), include_next_key_(true) {} @@ -104,8 +104,8 @@ Status TxnRegionScannerImpl::NextBatch(std::vector& kvs) { } if (NeedRetryAndInc(retry)) { - DINGO_LOG(INFO) << "try to delay:" << kRpcRetryDelayMs << "ms"; - DelayRetry(kRpcRetryDelayMs); + DINGO_LOG(INFO) << "try to delay:" << FLAGS_txn_op_delay_ms << "ms"; + DelayRetry(FLAGS_txn_op_delay_ms); } else { break; } @@ -157,7 +157,7 @@ Status TxnRegionScannerImpl::SetBatchSize(int64_t size) { } bool TxnRegionScannerImpl::NeedRetryAndInc(int& times) { - bool retry = times < kTxnOpMaxRetry; + bool retry = times < FLAGS_txn_op_max_retry; times++; return retry; } diff --git a/src/sdk/vector/vector_task.cc b/src/sdk/vector/vector_task.cc index 6a41e6c0b..d5326f26b 100644 --- a/src/sdk/vector/vector_task.cc +++ b/src/sdk/vector/vector_task.cc @@ -14,6 +14,7 @@ #include "sdk/vector/vector_task.h" +#include "sdk/common/param_config.h" #include "sdk/utils/async_util.h" namespace dingodb { @@ -70,7 +71,7 @@ bool VectorTask::NeedRetry() { if (error_code == pb::error::EREGION_VERSION || error_code == pb::error::EREGION_NOT_FOUND || error_code == pb::error::EKEY_OUT_OF_RANGE) { retry_count_++; - if (retry_count_ < kRawkvMaxRetry) { + if (retry_count_ < FLAGS_vector_op_max_retry) { return true; } else { std::string msg = @@ -84,7 +85,7 @@ bool VectorTask::NeedRetry() { } void VectorTask::BackoffAndRetry() { - stub.GetActuator()->Schedule([this] { DoAsync(); }, kRawkvBackoffMs); + stub.GetActuator()->Schedule([this] { DoAsync(); }, FLAGS_vector_op_delay_ms); } void VectorTask::FireCallback() { diff --git a/test/unit_test/sdk/rawkv/test_raw_kv_region_scanner.cc b/test/unit_test/sdk/rawkv/test_raw_kv_region_scanner.cc index 984561e27..0f71f5396 100644 --- a/test/unit_test/sdk/rawkv/test_raw_kv_region_scanner.cc +++ b/test/unit_test/sdk/rawkv/test_raw_kv_region_scanner.cc @@ -198,7 +198,7 @@ TEST_F(RawKvRegionScannerImplTest, SetBatchSize) { RawKvRegionScannerImpl scanner(*stub, region, region->Range().start_key(), region->Range().end_key()); - EXPECT_EQ(scanner.GetBatchSize(), kScanBatchSize); + EXPECT_EQ(scanner.GetBatchSize(), FLAGS_scan_batch_size); scanner.SetBatchSize(0); EXPECT_EQ(scanner.GetBatchSize(), kMinScanBatchSize); @@ -232,7 +232,7 @@ TEST_F(RawKvRegionScannerImplTest, NextBatchFail) { CHECK_NOTNULL(kv_rpc); EXPECT_EQ(kv_rpc->Request()->scan_id(), scan_id); - EXPECT_EQ(kv_rpc->Request()->max_fetch_cnt(), kScanBatchSize); + EXPECT_EQ(kv_rpc->Request()->max_fetch_cnt(), FLAGS_scan_batch_size); auto* error = kv_rpc->MutableResponse()->mutable_error(); error->set_errcode(pb::error::EINTERNAL); @@ -282,7 +282,7 @@ TEST_F(RawKvRegionScannerImplTest, NextBatchNoData) { CHECK_NOTNULL(kv_rpc); EXPECT_EQ(kv_rpc->Request()->scan_id(), scan_id); - EXPECT_EQ(kv_rpc->Request()->max_fetch_cnt(), kScanBatchSize); + EXPECT_EQ(kv_rpc->Request()->max_fetch_cnt(), FLAGS_scan_batch_size); cb(); }) diff --git a/test/unit_test/sdk/test_base.h b/test/unit_test/sdk/test_base.h index 103dec06a..356dea81b 100644 --- a/test/unit_test/sdk/test_base.h +++ b/test/unit_test/sdk/test_base.h @@ -71,7 +71,7 @@ class TestBase : public ::testing::Test { EXPECT_CALL(*stub, GetTxnLockResolver).Times(testing::AnyNumber()); actuator.reset(new ThreadPoolActuator()); - actuator->Start(kActuatorThreadNum); + actuator->Start(FLAGS_actuator_thread_num); ON_CALL(*stub, GetActuator).WillByDefault(testing::Return(actuator)); EXPECT_CALL(*stub, GetActuator).Times(testing::AnyNumber());