Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: enhance the loading process of replicas particularly when a significant number of replicas are spread across multiple disks #2078

Open
wants to merge 30 commits into
base: master
Choose a base branch
from

Conversation

empiredan
Copy link
Contributor

No description provided.

@github-actions github-actions bot added the cpp label Jul 18, 2024
@empiredan empiredan marked this pull request as ready for review July 30, 2024 09:13

// Get the dir name for a replica from a potentially longer path (both absolute and
// relative paths are possible).
static std::string get_replica_dir_name(const std::string &dir);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add some tests for these utility functions?

  • get_replica_dir_name
  • parse_replica_dir_name

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, added.

parse_replica_dir_name(const std::string &dir_name, gpid &pid, std::string &app_type);

// Load an existing replica which is located in `dn` with `dir`. Usually each different
// `dn` represents a unique disk. `dir` is the absolute path of the directory for a
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// `dn` represents a unique disk. `dir` is the absolute path of the directory for a
// `dn` represents an unique disk. `dir` is the absolute path of the directory for a

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a should be put before unique which begins with a consonant sound j.

// Load an existing replica which is located in `dn` with `dir`. Usually each different
// `dn` represents a unique disk. `dir` is the absolute path of the directory for a
// replica.
virtual replica_ptr load_replica(dir_node *dn, const char *dir);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
virtual replica_ptr load_replica(dir_node *dn, const char *dir);
virtual replica_ptr load_replica(dir_node *dn, const char *replica_dir);

Clarify it's the replica's dir, not the dir_node's.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK.

Comment on lines 392 to 401
// Load all replicas synchronously from all disks to `reps`. This function would ensure
// that data on each disk is loaded more evenly, rather than that a disk would begin to
// be loaded only after another has been finished, in case that there are too many replicas
// on a disk and other disks cannot start loading until this disk is finished.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just simplified as this?

Suggested change
// Load all replicas synchronously from all disks to `reps`. This function would ensure
// that data on each disk is loaded more evenly, rather than that a disk would begin to
// be loaded only after another has been finished, in case that there are too many replicas
// on a disk and other disks cannot start loading until this disk is finished.
// Load all replicas simultaneously from all disks to `reps`.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK.

const auto *const worker = task::get_current_worker2();
if (worker != nullptr) {
CHECK(!(worker->pool()->spec().partitioned),
"The thread pool for loading replicas must not be partitioned since load balancing "
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better to mention which the thread pool is, so that the administators could know how to adjust the config.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK.

CHECK(reps.find(rep->get_gpid()) == reps.end(),
"conflict replica dir: {} <--> {}",
rep->dir(),
reps[rep->get_gpid()]->dir());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to reuse the found iterator to avoid finding it again.

//
// For the docs of clang 16 please see:
//
// https://releases.llvm.org/16.0.0/tools/clang/docs/ReleaseNotes.html#c-20-feature-support:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// https://releases.llvm.org/16.0.0/tools/clang/docs/ReleaseNotes.html#c-20-feature-support:
// https://releases.llvm.org/16.0.0/tools/clang/docs/ReleaseNotes.html#c-20-feature-support.

Comment on lines +496 to +499
std::vector<size_t> dir_indexes(disks.size(), 0);
std::vector<std::queue<std::pair<std::string, task_ptr>>> load_disk_queues(disks.size());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to add comments to describe what are they used for.

FLAGS_load_replica_max_wait_time_ms,
load_disk_queue.front().first,
load_disk_queue.size(),
disk_index,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The disk_index is just an internal variable, it may confused, is it necessary to be logged?

Comment on lines 527 to 529
if (!load_disk_queue.empty() &&
load_disk_queue.size() >= FLAGS_max_replicas_on_load_for_each_disk) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (!load_disk_queue.empty() &&
load_disk_queue.size() >= FLAGS_max_replicas_on_load_for_each_disk) {
if (load_disk_queue.size() >= FLAGS_max_replicas_on_load_for_each_disk) {

continue;
}

// Continue to load a replica since we are within the limit now.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Skip loading?

}
}

LOG_DEBUG("ready to load dir(index={}, path={}) for disk(index={}, tag={}, path={})",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about moving it below the next continue in line 566?

@@ -2015,7 +2164,6 @@ replica *replica_stub::load_replica(dir_node *dn, const char *dir)
const auto err = rep->initialize_on_load();
if (err != ERR_OK) {
LOG_ERROR("{}: load replica failed, err = {}", rep->name(), err);
rep->close();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why remove this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In that it would be called immediately by delete rep;.

tsk->wait();
}
uint64_t finish_time = dsn_now_ms();
utils::chronograph chrono;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The macros in src/utils/timer.h may help.

if (!load_disk_queue.empty() &&
load_disk_queue.size() >= FLAGS_max_replicas_on_load_for_each_disk) {
// Loading replicas should be throttled in case that disk IO is saturated.
if (load_disk_queue.front().second->wait(FLAGS_load_replica_max_wait_time_ms)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems this patch implemented a theadpool-with-max-threads, isn't it? The benifit compare to the former implementation is now we can limit the max_replicas_on_load_for_each_disk.

Could the rocksdb::ThreadPool work well here? https://github.com/apache/incubator-pegasus/blob/master/src/shell/commands/local_partition_split.cpp#L392

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants