Skip to content

Commit

Permalink
[refactor][sdk] Make sdk params use gflags
Browse files Browse the repository at this point in the history
  • Loading branch information
wchuande authored and ketor committed Feb 18, 2024
1 parent 47853c4 commit af35815
Show file tree
Hide file tree
Showing 15 changed files with 99 additions and 55 deletions.
1 change: 1 addition & 0 deletions src/sdk/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ add_library(sdk
vector/vector_delete_task.cc
vector/vector_search_task.cc
utils/thread_pool_actuator.cc
common/param_config.cc
# TODO: use library
${PROJECT_SOURCE_DIR}/src/coordinator/coordinator_interaction.cc
${PROJECT_SOURCE_DIR}/src/common/role.cc
Expand Down
6 changes: 3 additions & 3 deletions src/sdk/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -393,20 +393,20 @@ Status RegionCreator::Create(int64_t& out_region_id) {

if (data_->wait) {
int retry = 0;
while (retry < kCoordinatorInteractionMaxRetry) {
while (retry < FLAGS_coordinator_interaction_max_retry) {
bool creating = false;
DINGO_RETURN_NOT_OK(data_->stub.GetAdminTool()->IsCreateRegionInProgress(out_region_id, creating));

if (creating) {
retry++;
usleep(kCoordinatorInteractionDelayMs * 1000);
usleep(FLAGS_coordinator_interaction_delay_ms * 1000);
} else {
return Status::OK();
}
}

std::string msg = fmt::format("Fail query region:{} state retry:{} exceed limit:{}, delay ms:{}", out_region_id,
retry, kCoordinatorInteractionMaxRetry, kCoordinatorInteractionDelayMs);
retry, FLAGS_coordinator_interaction_max_retry, FLAGS_coordinator_interaction_delay_ms);
DINGO_LOG(INFO) << msg;
return Status::Incomplete(msg);
}
Expand Down
6 changes: 3 additions & 3 deletions src/sdk/client_stub.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ Status ClientStub::Open(std::string naming_service_url) {

// TODO: pass use gflag or add options
brpc::ChannelOptions options;
options.timeout_ms = kRpcChannelTimeOutMs;
options.connect_timeout_ms = kRpcChannelConnectTimeOutMs;
options.timeout_ms = FLAGS_rpc_channel_timeout_ms;
options.connect_timeout_ms = FLAGS_rpc_channel_connect_timeout_ms;
store_rpc_interaction_.reset(new RpcInteraction(options));

meta_cache_.reset(new MetaCache(coordinator_proxy_));
Expand All @@ -52,7 +52,7 @@ Status ClientStub::Open(std::string naming_service_url) {
txn_lock_resolver_.reset(new TxnLockResolver(*(this)));

actuator_.reset(new ThreadPoolActuator());
actuator_->Start(kActuatorThreadNum);
actuator_->Start(FLAGS_actuator_thread_num);

vector_index_cache_.reset(new VectorIndexCache(*coordinator_proxy_));

Expand Down
42 changes: 42 additions & 0 deletions src/sdk/common/param_config.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@

// Copyright (c) 2023 dingodb.com, Inc. All Rights Reserved
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "sdk/common/param_config.h"

// Runtime-tunable SDK parameters, exposed as gflags.
// All *_ms flags are expressed in milliseconds.

// brpc channel options.
// ChannelOptions should set "timeout_ms > connect_timeout_ms" for circuit breaker
DEFINE_int64(rpc_channel_timeout_ms, 500000, "rpc channel timeout ms");
DEFINE_int64(rpc_channel_connect_timeout_ms, 3000, "rpc channel connect timeout ms");

// each rpc call params, set for brpc::Controller
DEFINE_int64(rpc_max_retry, 3, "rpc call max retry times");
DEFINE_int64(rpc_time_out_ms, 500000, "rpc call timeout ms");

// each store rpc params, used for store rpc controller
DEFINE_int64(store_rpc_max_retry, 5, "store rpc max retry times, use case: wrong leader or request range invalid");
DEFINE_int64(store_rpc_retry_delay_ms, 1000, "store rpc retry delay ms");

// initial batch size of the raw kv / txn region scanners
DEFINE_int64(scan_batch_size, 10, "scan batch size, use for region scanner");

// polling interval and retry limit when waiting on the coordinator
DEFINE_int64(coordinator_interaction_delay_ms, 200, "coordinator interaction delay ms");
DEFINE_int64(coordinator_interaction_max_retry, 300, "coordinator interaction max retry");

// transaction operation retry policy
DEFINE_int64(txn_op_delay_ms, 200, "txn op delay ms");
DEFINE_int64(txn_op_max_retry, 2, "txn op max retry times");

// thread count passed to ThreadPoolActuator::Start
DEFINE_int64(actuator_thread_num, 8, "actuator thread num");

// raw kv task retry policy
DEFINE_int64(raw_kv_delay_ms, 200, "raw kv backoff delay ms");
DEFINE_int64(raw_kv_max_retry, 5, "raw kv max retry times");

// vector task retry policy
// (help strings previously duplicated the raw kv descriptions)
DEFINE_int64(vector_op_delay_ms, 500, "vector op backoff delay ms");
DEFINE_int64(vector_op_max_retry, 10, "vector op max retry times");
39 changes: 21 additions & 18 deletions src/sdk/common/param_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,41 +17,44 @@

#include <cstdint>

#include "gflags/gflags.h"

// TODO: make params in this file use gflags

const int64_t kSdkVlogLevel = 60;

// ChannelOptions should set "timeout_ms > connect_timeout_ms" for circuit breaker
const int64_t kRpcChannelTimeOutMs = 500000;
const int64_t kRpcChannelConnectTimeOutMs = 3000;

// each rpc call params
const int64_t kRpcCallMaxRetry = 3;
const int64_t kRpcTimeOutMs = 500000;
DECLARE_int64(rpc_channel_timeout_ms);
DECLARE_int64(rpc_channel_connect_timeout_ms);

// use case: wrong leader or request range invalid
const int64_t kRpcMaxRetry = 5;
// each rpc call params, set for brpc::Controller
DECLARE_int64(rpc_max_retry);
DECLARE_int64(rpc_time_out_ms);

const int64_t kRpcRetryDelayMs = 1000;
// each store rpc params, used for store rpc controller
DECLARE_int64(store_rpc_max_retry);
DECLARE_int64(store_rpc_retry_delay_ms);

// start: use for region scanner
const int64_t kScanBatchSize = 10;

DECLARE_int64(scan_batch_size);
const int64_t kMinScanBatchSize = 1;

const int64_t kMaxScanBatchSize = 100;
// end: use for region scanner

const int64_t kPrefetchRegionCount = 3;

const int64_t kCoordinatorInteractionDelayMs = 200;
const int64_t kCoordinatorInteractionMaxRetry = 300;
DECLARE_int64(coordinator_interaction_delay_ms);
DECLARE_int64(coordinator_interaction_max_retry);

DECLARE_int64(actuator_thread_num);

const int64_t kTxnOpMaxRetry = 2;
DECLARE_int64(raw_kv_delay_ms);
DECLARE_int64(raw_kv_max_retry);

const int64_t kActuatorThreadNum = 8;
DECLARE_int64(txn_op_delay_ms);
DECLARE_int64(txn_op_max_retry);

const int64_t kRawkvBackoffMs = 200;
const int64_t kRawkvMaxRetry = 10;
DECLARE_int64(vector_op_delay_ms);
DECLARE_int64(vector_op_max_retry);

#endif // DINGODB_SDK_PARAM_CONFIG_H_
2 changes: 1 addition & 1 deletion src/sdk/rawkv/raw_kv_region_scanner_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ RawKvRegionScannerImpl::RawKvRegionScannerImpl(const ClientStub& stub, std::shar
end_key_(std::move(end_key)),
opened_(false),
has_more_(false),
batch_size_(kScanBatchSize) {}
batch_size_(FLAGS_scan_batch_size) {}

static void RawKvRegionScannerImplDeleted(Status status, std::string scan_id) {
VLOG(kSdkVlogLevel) << "RawKvRegionScannerImpl deleted, scanner id: " << scan_id << " status:" << status.ToString();
Expand Down
5 changes: 3 additions & 2 deletions src/sdk/rawkv/raw_kv_task.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "sdk/rawkv/raw_kv_task.h"

#include "sdk/common/param_config.h"
#include "sdk/utils/async_util.h"

namespace dingodb {
Expand Down Expand Up @@ -69,7 +70,7 @@ bool RawKvTask::NeedRetry() {
if (error_code == pb::error::EREGION_VERSION || error_code == pb::error::EREGION_NOT_FOUND ||
error_code == pb::error::EKEY_OUT_OF_RANGE) {
retry_count_++;
if (retry_count_ < kRawkvMaxRetry) {
if (retry_count_ < FLAGS_raw_kv_max_retry) {
return true;
} else {
std::string msg =
Expand All @@ -83,7 +84,7 @@ bool RawKvTask::NeedRetry() {
}

void RawKvTask::BackoffAndRetry() {
stub.GetActuator()->Schedule([this] { DoAsync(); }, kRawkvBackoffMs);
stub.GetActuator()->Schedule([this] { DoAsync(); }, FLAGS_raw_kv_delay_ms);
}

void RawKvTask::FireCallback() {
Expand Down
4 changes: 2 additions & 2 deletions src/sdk/rpc/rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ class ClientRpc : public Rpc {
response->Clear();
controller.Reset();
controller.set_log_id(butil::fast_rand());
controller.set_timeout_ms(kRpcTimeOutMs);
controller.set_max_retry(kRpcCallMaxRetry);
controller.set_timeout_ms(FLAGS_rpc_time_out_ms);
controller.set_max_retry(FLAGS_rpc_max_retry);
status = Status::OK();
}

Expand Down
9 changes: 3 additions & 6 deletions src/sdk/store/store_rpc_controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,8 @@ void StoreRpcController::SendStoreRpc() {

// Backs off before the next store rpc attempt when NeedDelay() says a delay is
// required; otherwise returns immediately.
//
// FLAGS_store_rpc_retry_delay_ms is a millisecond value, while usleep() takes
// microseconds, so the flag is converted before sleeping. Passing the raw flag
// value would make the logged "ms" delay a thousand times longer than the
// actual sleep.
void StoreRpcController::MaybeDelay() {
  if (NeedDelay()) {
    DINGO_LOG(INFO) << "try to delay:" << FLAGS_store_rpc_retry_delay_ms << "ms";
    // ms -> us
    (void)usleep(FLAGS_store_rpc_retry_delay_ms * 1000);
  }
}

Expand Down Expand Up @@ -269,14 +268,12 @@ std::shared_ptr<Region> StoreRpcController::ProcessStoreRegionInfo(
return region;
}

bool StoreRpcController::NeedRetry() const { return this->rpc_retry_times_ < kRpcMaxRetry; }
bool StoreRpcController::NeedRetry() const { return this->rpc_retry_times_ < FLAGS_store_rpc_max_retry; }

bool StoreRpcController::NeedDelay() const { return status_.IsRemoteError(); }

bool StoreRpcController::NeedPickLeader() const { return !status_.IsRemoteError(); }

int64_t StoreRpcController::DelayTimeMs() const { return kRpcRetryDelayMs; }

const pb::error::Error& StoreRpcController::GetResponseError(Rpc& rpc) {
const auto* response = rpc.RawResponse();
const auto* descriptor = response->GetDescriptor();
Expand Down
1 change: 0 additions & 1 deletion src/sdk/store/store_rpc_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ class StoreRpcController {
// backoff
void MaybeDelay();
bool NeedDelay() const;
int64_t DelayTimeMs() const;

bool PickNextLeader(butil::EndPoint& leader);

Expand Down
18 changes: 9 additions & 9 deletions src/sdk/transaction/txn_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ Status Transaction::TxnImpl::DoTxnGet(const std::string& key, std::string& value

if (NeedRetryAndInc(retry)) {
// TODO: set txn retry ms
DINGO_LOG(INFO) << "try to delay:" << kRpcRetryDelayMs << "ms";
DelayRetry(kRpcRetryDelayMs);
DINGO_LOG(INFO) << "try to delay:" << FLAGS_txn_op_delay_ms << "ms";
DelayRetry(FLAGS_txn_op_delay_ms);
} else {
break;
}
Expand Down Expand Up @@ -166,8 +166,8 @@ void Transaction::TxnImpl::ProcessTxnBatchGetSubTask(TxnSubTask* sub_task) {

if (NeedRetryAndInc(retry)) {
// TODO: set txn retry ms
DINGO_LOG(INFO) << "try to delay:" << kRpcRetryDelayMs << "ms";
DelayRetry(kRpcRetryDelayMs);
DINGO_LOG(INFO) << "try to delay:" << FLAGS_txn_op_delay_ms << "ms";
DelayRetry(FLAGS_txn_op_delay_ms);
} else {
break;
}
Expand Down Expand Up @@ -563,8 +563,8 @@ Status Transaction::TxnImpl::PreCommitPrimaryKey() {

if (NeedRetryAndInc(retry)) {
// TODO: set txn retry ms
DINGO_LOG(INFO) << "try to delay:" << kRpcRetryDelayMs << "ms";
DelayRetry(kRpcRetryDelayMs);
DINGO_LOG(INFO) << "try to delay:" << FLAGS_txn_op_delay_ms << "ms";
DelayRetry(FLAGS_txn_op_delay_ms);
} else {
break;
}
Expand Down Expand Up @@ -597,8 +597,8 @@ void Transaction::TxnImpl::ProcessTxnPrewriteSubTask(TxnSubTask* sub_task) {
}
if (NeedRetryAndInc(retry)) {
// TODO: set txn retry ms
DINGO_LOG(INFO) << "try to delay:" << kRpcRetryDelayMs << "ms";
DelayRetry(kRpcRetryDelayMs);
DINGO_LOG(INFO) << "try to delay:" << FLAGS_txn_op_delay_ms << "ms";
DelayRetry(FLAGS_txn_op_delay_ms);
} else {
// TODO: maybe set ret as meaningful status
break;
Expand Down Expand Up @@ -1024,7 +1024,7 @@ Status Transaction::TxnImpl::Rollback() {
}

bool Transaction::TxnImpl::NeedRetryAndInc(int& times) {
bool retry = times < kTxnOpMaxRetry;
bool retry = times < FLAGS_txn_op_max_retry;
times++;
return retry;
}
Expand Down
8 changes: 4 additions & 4 deletions src/sdk/transaction/txn_region_scanner_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ TxnRegionScannerImpl::TxnRegionScannerImpl(const ClientStub& stub, std::shared_p
end_key_(std::move(end_key)),
opened_(false),
has_more_(false),
batch_size_(kScanBatchSize),
batch_size_(FLAGS_scan_batch_size),
next_key_(start_key_),
include_next_key_(true) {}

Expand Down Expand Up @@ -104,8 +104,8 @@ Status TxnRegionScannerImpl::NextBatch(std::vector<KVPair>& kvs) {
}

if (NeedRetryAndInc(retry)) {
DINGO_LOG(INFO) << "try to delay:" << kRpcRetryDelayMs << "ms";
DelayRetry(kRpcRetryDelayMs);
DINGO_LOG(INFO) << "try to delay:" << FLAGS_txn_op_delay_ms << "ms";
DelayRetry(FLAGS_txn_op_delay_ms);
} else {
break;
}
Expand Down Expand Up @@ -157,7 +157,7 @@ Status TxnRegionScannerImpl::SetBatchSize(int64_t size) {
}

bool TxnRegionScannerImpl::NeedRetryAndInc(int& times) {
bool retry = times < kTxnOpMaxRetry;
bool retry = times < FLAGS_txn_op_max_retry;
times++;
return retry;
}
Expand Down
5 changes: 3 additions & 2 deletions src/sdk/vector/vector_task.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "sdk/vector/vector_task.h"

#include "sdk/common/param_config.h"
#include "sdk/utils/async_util.h"

namespace dingodb {
Expand Down Expand Up @@ -70,7 +71,7 @@ bool VectorTask::NeedRetry() {
if (error_code == pb::error::EREGION_VERSION || error_code == pb::error::EREGION_NOT_FOUND ||
error_code == pb::error::EKEY_OUT_OF_RANGE) {
retry_count_++;
if (retry_count_ < kRawkvMaxRetry) {
if (retry_count_ < FLAGS_vector_op_max_retry) {
return true;
} else {
std::string msg =
Expand All @@ -84,7 +85,7 @@ bool VectorTask::NeedRetry() {
}

void VectorTask::BackoffAndRetry() {
stub.GetActuator()->Schedule([this] { DoAsync(); }, kRawkvBackoffMs);
stub.GetActuator()->Schedule([this] { DoAsync(); }, FLAGS_vector_op_delay_ms);
}

void VectorTask::FireCallback() {
Expand Down
6 changes: 3 additions & 3 deletions test/unit_test/sdk/rawkv/test_raw_kv_region_scanner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ TEST_F(RawKvRegionScannerImplTest, SetBatchSize) {

RawKvRegionScannerImpl scanner(*stub, region, region->Range().start_key(), region->Range().end_key());

EXPECT_EQ(scanner.GetBatchSize(), kScanBatchSize);
EXPECT_EQ(scanner.GetBatchSize(), FLAGS_scan_batch_size);

scanner.SetBatchSize(0);
EXPECT_EQ(scanner.GetBatchSize(), kMinScanBatchSize);
Expand Down Expand Up @@ -232,7 +232,7 @@ TEST_F(RawKvRegionScannerImplTest, NextBatchFail) {
CHECK_NOTNULL(kv_rpc);

EXPECT_EQ(kv_rpc->Request()->scan_id(), scan_id);
EXPECT_EQ(kv_rpc->Request()->max_fetch_cnt(), kScanBatchSize);
EXPECT_EQ(kv_rpc->Request()->max_fetch_cnt(), FLAGS_scan_batch_size);

auto* error = kv_rpc->MutableResponse()->mutable_error();
error->set_errcode(pb::error::EINTERNAL);
Expand Down Expand Up @@ -282,7 +282,7 @@ TEST_F(RawKvRegionScannerImplTest, NextBatchNoData) {
CHECK_NOTNULL(kv_rpc);

EXPECT_EQ(kv_rpc->Request()->scan_id(), scan_id);
EXPECT_EQ(kv_rpc->Request()->max_fetch_cnt(), kScanBatchSize);
EXPECT_EQ(kv_rpc->Request()->max_fetch_cnt(), FLAGS_scan_batch_size);

cb();
})
Expand Down
2 changes: 1 addition & 1 deletion test/unit_test/sdk/test_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class TestBase : public ::testing::Test {
EXPECT_CALL(*stub, GetTxnLockResolver).Times(testing::AnyNumber());

actuator.reset(new ThreadPoolActuator());
actuator->Start(kActuatorThreadNum);
actuator->Start(FLAGS_actuator_thread_num);
ON_CALL(*stub, GetActuator).WillByDefault(testing::Return(actuator));
EXPECT_CALL(*stub, GetActuator).Times(testing::AnyNumber());

Expand Down

0 comments on commit af35815

Please sign in to comment.