Skip to content

Commit

Permalink
[fix][store] Fixup heartbeat issues.
Browse files Browse the repository at this point in the history
  • Loading branch information
rock-git authored and ketor committed Dec 5, 2023
1 parent f651b00 commit 520d0d8
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 20 deletions.
1 change: 0 additions & 1 deletion src/client/store_client_function.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2076,7 +2076,6 @@ int64_t SendVectorCount(int64_t region_id, int64_t start_vector_id, int64_t end_

InteractionManager::GetInstance().SendRequestWithContext("IndexService", "VectorCount", request, response);

DINGO_LOG(INFO) << "VectorCount response: " << response.DebugString();
return response.error().errcode() != 0 ? 0 : response.count();
}

Expand Down
17 changes: 12 additions & 5 deletions src/coordinator/coordinator_control_coor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3647,6 +3647,13 @@ void CoordinatorControl::UpdateRegionMapAndStoreOperation(const pb::common::Stor
DINGO_LOG(INFO) << "UpdateRegionMapAndStoreOperation partial heartbeat, split/merge, region_metrics_map_size = "
<< store_metrics.region_metrics_map_size()
<< ", region_metrics=" << store_metrics.ShortDebugString();
} else if (store_metrics.is_partial_region_metrics()) {
DINGO_LOG(INFO) << "UpdateRegionMapAndStoreOperation partial heartbeat, not split/merge, region_metrics_map_size = "
<< store_metrics.region_metrics_map_size()
<< ", region_metrics=" << store_metrics.ShortDebugString();
} else {
DINGO_LOG(INFO) << "UpdateRegionMapAndStoreOperation full heartbeat, region_metrics_map_size = "
<< store_metrics.region_metrics_map_size();
}

// update region_map
Expand Down Expand Up @@ -3725,7 +3732,7 @@ void CoordinatorControl::UpdateRegionMapAndStoreOperation(const pb::common::Stor
<< store_metrics.id() << " region_id = " << region_metrics.id();
continue;
} else {
DINGO_LOG(INFO) << "follower will update RegionMape, store_id=" << store_metrics.id()
DINGO_LOG(INFO) << "follower will update RegionMap, store_id=" << store_metrics.id()
<< " region_id = " << region_metrics.id();
}
} else {
Expand Down Expand Up @@ -3788,10 +3795,10 @@ void CoordinatorControl::UpdateRegionMapAndStoreOperation(const pb::common::Stor
if (region_to_update.definition().range().start_key() != region_metrics.region_definition().range().start_key() ||
region_to_update.definition().range().end_key() != region_metrics.region_definition().range().end_key()) {
DINGO_LOG(INFO) << "region range change region_id = " << region_metrics.id() << " old range = ["
<< region_to_update.definition().range().start_key() << ", "
<< region_to_update.definition().range().end_key() << ")"
<< " new range = [" << region_metrics.region_definition().range().start_key() << ", "
<< region_metrics.region_definition().range().end_key() << ")";
<< Helper::StringToHex(region_to_update.definition().range().start_key()) << ", "
<< Helper::StringToHex(region_to_update.definition().range().end_key()) << ")"
<< " new range = [" << Helper::StringToHex(region_metrics.region_definition().range().start_key())
<< ", " << Helper::StringToHex(region_metrics.region_definition().range().end_key()) << ")";
if (!leader_has_old_epoch) {
need_update_region_metrics = true;
}
Expand Down
30 changes: 16 additions & 14 deletions src/store/heartbeat.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,28 +86,30 @@ void HeartbeatTask::SendStoreHeartbeat(std::shared_ptr<CoordinatorInteraction> c
}
}
for (const auto& region_meta : region_metas) {
if (region_meta->State() == pb::common::StoreRegionState::SPLITTING ||
region_meta->State() == pb::common::StoreRegionState::MERGING) {
auto inner_region = region_meta->InnerRegion();
if (inner_region.state() == pb::common::StoreRegionState::SPLITTING ||
inner_region.state() == pb::common::StoreRegionState::MERGING) {
DINGO_LOG(WARNING) << fmt::format("[heartbeat.store][region({})] region state({}) not suit heartbeat.",
region_meta->Id(), pb::common::StoreRegionState_Name(region_meta->State()));
inner_region.id(), pb::common::StoreRegionState_Name(inner_region.state()));
continue;
}

pb::common::RegionMetrics tmp_region_metrics;
auto metrics = region_metrics->GetMetrics(region_meta->Id());
auto metrics = region_metrics->GetMetrics(inner_region.id());
if (metrics != nullptr) {
tmp_region_metrics = metrics->InnerRegionMetrics();
}

tmp_region_metrics.set_id(region_meta->Id());
tmp_region_metrics.set_leader_store_id(region_meta->LeaderId());
tmp_region_metrics.set_store_region_state(region_meta->State());
*(tmp_region_metrics.mutable_region_definition()) = region_meta->Definition();
tmp_region_metrics.set_snapshot_epoch_version(region_meta->SnapshotEpochVersion());
tmp_region_metrics.set_id(inner_region.id());
tmp_region_metrics.set_leader_store_id(inner_region.leader_id());
tmp_region_metrics.set_store_region_state(inner_region.state());
*(tmp_region_metrics.mutable_region_definition()) = inner_region.definition();
tmp_region_metrics.set_snapshot_epoch_version(inner_region.snapshot_epoch_version());

if (region_meta->State() == pb::common::StoreRegionState::NORMAL ||
region_meta->State() == pb::common::StoreRegionState::STANDBY ||
region_meta->State() == pb::common::StoreRegionState::TOMBSTONE) {
auto raft_node = raft_store_engine->GetNode(region_meta->Id());
if (inner_region.state() == pb::common::StoreRegionState::NORMAL ||
inner_region.state() == pb::common::StoreRegionState::STANDBY ||
inner_region.state() == pb::common::StoreRegionState::TOMBSTONE) {
auto raft_node = raft_store_engine->GetNode(inner_region.id());
if (raft_node != nullptr) {
*(tmp_region_metrics.mutable_braft_status()) = (*raft_node->GetStatus());
}
Expand All @@ -127,7 +129,7 @@ void HeartbeatTask::SendStoreHeartbeat(std::shared_ptr<CoordinatorInteraction> c
vector_index_status->set_snapshot_log_id(vector_index_wrapper->SnapshotLogId());
}

mut_region_metrics_map->insert({region_meta->Id(), tmp_region_metrics});
mut_region_metrics_map->insert({inner_region.id(), tmp_region_metrics});
}

DINGO_LOG(INFO) << fmt::format("[heartbeat.store] request region count({}) size({}) elapsed time({} ms)",
Expand Down

0 comments on commit 520d0d8

Please sign in to comment.