diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp index 89f2ad0398..26b6a1923a 100644 --- a/src/replica/replica_stub.cpp +++ b/src/replica/replica_stub.cpp @@ -456,7 +456,7 @@ void replica_stub::load_replica(dir_node *dn, utils::ex_lock &reps_lock, replicas &reps) { - LOG_INFO("process dir {}", dir); + LOG_INFO("loading replica: tag={}, dir={}", dn->tag, dir); const auto *const worker = task::get_current_worker2(); if (worker != nullptr) { @@ -470,10 +470,11 @@ void replica_stub::load_replica(dir_node *dn, return; } - LOG_INFO("{}@{}: load replica '{}' success, = <{}, {}>, last_prepared_decree = {}", + LOG_INFO("{}@{}: load replica successfully, tag={}, dir={}, last_durable_decree={}, " + "last_committed_decree={}, last_prepared_decree={}", rep->get_gpid(), dsn_primary_host_port(), + dn->tag, dir, rep->last_durable_decree(), rep->last_committed_decree(), @@ -493,7 +494,7 @@ void replica_stub::load_replicas(replicas &reps) const auto &disks = get_all_disk_dirs(); std::vector dir_indexes(disks.size(), 0); - std::vector> load_disk_queues(disks.size()); + std::vector>> load_disk_queues(disks.size()); utils::ex_lock reps_lock; while (true) { @@ -502,8 +503,18 @@ void replica_stub::load_replicas(replicas &reps) // For each round, start loading one replica for each disk in case there are too many // replicas in a disk, except that all of the replicas of this disk are being loaded. for (size_t disk_index = 0; disk_index < disks.size(); ++disk_index) { - auto &dir_index = dir_indexes[disk_index]; + // Structured bindings can be captured by closures in g++, while not supported + // well by clang. 
Thus we do not use the following statement to bind both variables + // until clang has been upgraded to version 16, which supports that well: + // + // const auto &[dn, dirs] = disks[disk_index]; + // + // For the docs of clang 16 please see: + // + // https://releases.llvm.org/16.0.0/tools/clang/docs/ReleaseNotes.html#c-20-feature-support: const auto &dirs = disks[disk_index].second; + + auto &dir_index = dir_indexes[disk_index]; if (dir_index >= dirs.size()) { // All of the replicas for the disk `disks[disk_index]` have begun to be loaded, // thus just skip. @@ -511,15 +522,27 @@ void replica_stub::load_replicas(replicas &reps) continue; } + const auto &dn = disks[disk_index].first; auto &load_disk_queue = load_disk_queues[disk_index]; if (!load_disk_queue.empty() && load_disk_queue.size() >= FLAGS_max_replicas_on_load_for_each_disk) { // Loading replicas should be throttled in case that disk IO is saturated. - if (load_disk_queue.front()->wait(FLAGS_load_replica_max_wait_time_ms)) { + if (load_disk_queue.front().second->wait(FLAGS_load_replica_max_wait_time_ms)) { load_disk_queue.pop(); } else { // There might be too many replicas that are being loaded which lead to // slow disk IO. 
+ LOG_WARNING("after {} ms, loading dir({}) is still not finished, there are " + "{} replicas being loaded for disk(index={}, tag={}, path={}), " + "skip dir(index={}, path={}), turn to next disk", + FLAGS_load_replica_max_wait_time_ms, + load_disk_queue.front().first, + load_disk_queue.size(), + disk_index, + dn->tag, + dn->full_dir, + dir_index, + dirs[dir_index]); continue; } @@ -530,36 +553,35 @@ void replica_stub::load_replicas(replicas &reps) } } + LOG_DEBUG("ready to load dir(index={}, path={}) for disk(index={}, tag={}, path={})", + dir_index, + dirs[dir_index], + disk_index, + dn->tag, + dn->full_dir); + const auto &dir = dirs[dir_index++]; if (dsn::replication::is_data_dir_invalid(dir)) { LOG_WARNING("ignore dir {}", dir); continue; } - // Structured bindings can be captured by closures in g++, while not supported - // well by clang. Thus we do not use following statement to bind both variables - // until clang has been upgraded to version 16 which could support that well: - // - // const auto &[dn, dirs] = disks[disk_index]; - // - // For the docs of clang 16 please see: - // - // https://releases.llvm.org/16.0.0/tools/clang/docs/ReleaseNotes.html#c-20-feature-support: - const auto &dn = disks[disk_index].first; - load_disk_queue.push(tasking::create_task( - // Ensure that the thread pool is non-partitioned. - LPC_REPLICATION_INIT_LOAD, - &_tracker, - std::bind(static_cast( - &replica_stub::load_replica), - this, - dn, - dir, - std::ref(reps_lock), - std::ref(reps)))); - - load_disk_queue.back()->enqueue(); + load_disk_queue.emplace( + dir, + tasking::create_task( + // Ensure that the thread pool is non-partitioned. 
+ LPC_REPLICATION_INIT_LOAD, + &_tracker, + std::bind(static_cast( + &replica_stub::load_replica), + this, + dn, + dir, + std::ref(reps_lock), + std::ref(reps)))); + + load_disk_queue.back().second->enqueue(); } if (finished_disks >= disks.size()) { @@ -571,7 +593,7 @@ void replica_stub::load_replicas(replicas &reps) // All loading tasks have been in the queue. Just wait all tasks to be finished. for (auto &load_disk_queue : load_disk_queues) { while (!load_disk_queue.empty()) { - CHECK_TRUE(load_disk_queue.front()->wait()); + CHECK_TRUE(load_disk_queue.front().second->wait()); load_disk_queue.pop(); } }