Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
empiredan committed Jul 30, 2024
1 parent 4cac172 commit 80f2a64
Showing 1 changed file with 53 additions and 31 deletions.
84 changes: 53 additions & 31 deletions src/replica/replica_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ void replica_stub::load_replica(dir_node *dn,
utils::ex_lock &reps_lock,
replicas &reps)
{
LOG_INFO("process dir {}", dir);
LOG_INFO("loading replica: tag={}, dir={}", dn->tag, dir);

const auto *const worker = task::get_current_worker2();
if (worker != nullptr) {
Expand All @@ -470,10 +470,11 @@ void replica_stub::load_replica(dir_node *dn,
return;
}

LOG_INFO("{}@{}: load replica '{}' success, <durable, "
"commit> = <{}, {}>, last_prepared_decree = {}",
LOG_INFO("{}@{}: load replica successfully, tag={}, dir={}, last_durable_decree={}, "
"last_committed_decree={}, last_prepared_decree={}",
rep->get_gpid(),
dsn_primary_host_port(),
dn->tag,
dir,
rep->last_durable_decree(),
rep->last_committed_decree(),
Expand All @@ -493,7 +494,7 @@ void replica_stub::load_replicas(replicas &reps)
const auto &disks = get_all_disk_dirs();

std::vector<size_t> dir_indexes(disks.size(), 0);
std::vector<std::queue<task_ptr>> load_disk_queues(disks.size());
std::vector<std::queue<std::pair<std::string, task_ptr>>> load_disk_queues(disks.size());
utils::ex_lock reps_lock;

while (true) {
Expand All @@ -502,24 +503,46 @@ void replica_stub::load_replicas(replicas &reps)
// For each round, start loading one replica for each disk in case there are too many
// replicas in a disk, except that all of the replicas of this disk are being loaded.
for (size_t disk_index = 0; disk_index < disks.size(); ++disk_index) {
auto &dir_index = dir_indexes[disk_index];
// Structured bindings can be captured by closures in g++, while not supported
// well by clang. Thus we do not use following statement to bind both variables
// until clang has been upgraded to version 16 which could support that well:
//
// const auto &[dn, dirs] = disks[disk_index];
//
// For the docs of clang 16 please see:
//
// https://releases.llvm.org/16.0.0/tools/clang/docs/ReleaseNotes.html#c-20-feature-support:
const auto &dirs = disks[disk_index].second;

auto &dir_index = dir_indexes[disk_index];
if (dir_index >= dirs.size()) {
// All of the replicas for the disk `disks[disk_index]` have begun to be loaded,
// thus just skip.
++finished_disks;
continue;
}

const auto &dn = disks[disk_index].first;
auto &load_disk_queue = load_disk_queues[disk_index];
if (!load_disk_queue.empty() &&
load_disk_queue.size() >= FLAGS_max_replicas_on_load_for_each_disk) {
// Loading replicas should be throttled in case that disk IO is saturated.
if (load_disk_queue.front()->wait(FLAGS_load_replica_max_wait_time_ms)) {
if (load_disk_queue.front().second->wait(FLAGS_load_replica_max_wait_time_ms)) {
load_disk_queue.pop();
} else {
// There might be too many replicas that are being loaded which lead to
// slow disk IO.
LOG_WARNING("after {} ms, loading dir({}) is still not finished, there are "
"{} replicas being loaded for disk(index={}, tag={}, path={}), "
"skip dir(index={}, path={}), turn to next disk",
FLAGS_load_replica_max_wait_time_ms,
load_disk_queue.front().first,
load_disk_queue.size(),
disk_index,
dn->tag,
dn->full_dir,
dir_index,
dirs[dir_index]);
continue;
}

Expand All @@ -530,36 +553,35 @@ void replica_stub::load_replicas(replicas &reps)
}
}

LOG_DEBUG("ready to load dir(index={}, path={}) for disk(index={}, tag={}, path={})",
dir_index,
dirs[dir_index],
disk_index,
dn->tag,
dn->full_dir);

const auto &dir = dirs[dir_index++];
if (dsn::replication::is_data_dir_invalid(dir)) {
LOG_WARNING("ignore dir {}", dir);
continue;
}

// Structured bindings can be captured by closures in g++, while not supported
// well by clang. Thus we do not use following statement to bind both variables
// until clang has been upgraded to version 16 which could support that well:
//
// const auto &[dn, dirs] = disks[disk_index];
//
// For the docs of clang 16 please see:
//
// https://releases.llvm.org/16.0.0/tools/clang/docs/ReleaseNotes.html#c-20-feature-support:
const auto &dn = disks[disk_index].first;
load_disk_queue.push(tasking::create_task(
// Ensure that the thread pool is non-partitioned.
LPC_REPLICATION_INIT_LOAD,
&_tracker,
std::bind(static_cast<void (replica_stub::*)(
dir_node *, const std::string &, utils::ex_lock &, replicas &)>(
&replica_stub::load_replica),
this,
dn,
dir,
std::ref(reps_lock),
std::ref(reps))));

load_disk_queue.back()->enqueue();
load_disk_queue.emplace(
dir,
tasking::create_task(
// Ensure that the thread pool is non-partitioned.
LPC_REPLICATION_INIT_LOAD,
&_tracker,
std::bind(static_cast<void (replica_stub::*)(
dir_node *, const std::string &, utils::ex_lock &, replicas &)>(
&replica_stub::load_replica),
this,
dn,
dir,
std::ref(reps_lock),
std::ref(reps))));

load_disk_queue.back().second->enqueue();
}

if (finished_disks >= disks.size()) {
Expand All @@ -571,7 +593,7 @@ void replica_stub::load_replicas(replicas &reps)
// All loading tasks have been in the queue. Just wait all tasks to be finished.
for (auto &load_disk_queue : load_disk_queues) {
while (!load_disk_queue.empty()) {
CHECK_TRUE(load_disk_queue.front()->wait());
CHECK_TRUE(load_disk_queue.front().second->wait());
load_disk_queue.pop();
}
}
Expand Down

0 comments on commit 80f2a64

Please sign in to comment.