Skip to content

Commit

Permalink
format and fix
Browse files Browse the repository at this point in the history
  • Loading branch information
empiredan committed Nov 8, 2024
1 parent c55a5f3 commit c4ba8fc
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 57 deletions.
14 changes: 8 additions & 6 deletions src/common/duplication_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,14 @@ const std::string duplication_constants::kDuplicationEnvMasterMetasKey /*NOLINT*
"duplication.master_metas";
const std::string duplication_constants::kDuplicationEnvMasterAppNameKey /*NOLINT*/ =
"duplication.master_app_name";
const std::string duplication_constants::kDuplicationEnvMasterCreateFollowerAppStatusKey /*NOLINT*/ =
"duplication.master_create_follower_app_status";
const std::string duplication_constants::kDuplicationEnvMasterCreateFollowerAppStatusCreating /*NOLINT*/ =
"creating";
const std::string duplication_constants::kDuplicationEnvMasterCreateFollowerAppStatusCreated /*NOLINT*/ =
"created";
const std::string duplication_constants::kDuplicationEnvMasterCreateFollowerAppStatusKey /*NOLINT*/
= "duplication.master_create_follower_app_status";
const std::string
duplication_constants::kDuplicationEnvMasterCreateFollowerAppStatusCreating /*NOLINT*/
= "creating";
const std::string
duplication_constants::kDuplicationEnvMasterCreateFollowerAppStatusCreated /*NOLINT*/
= "created";

/*extern*/ const char *duplication_status_to_string(duplication_status::type status)
{
Expand Down
106 changes: 65 additions & 41 deletions src/meta/duplication/meta_duplication_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -464,17 +464,36 @@ void meta_duplication_service::duplication_sync(duplication_sync_rpc rpc)
void meta_duplication_service::create_follower_app_for_duplication(
const std::shared_ptr<duplication_info> &dup, const std::shared_ptr<app_state> &app)
{
do_create_follower_app_for_duplication(dup, app, duplication_constants::kDuplicationEnvMasterCreateFollowerAppStatusCreating, std::bind(&meta_duplication_service::on_follower_app_creating_for_duplication, this, dup, std::placeholders::_1, std::placeholders::_2));
do_create_follower_app_for_duplication(
dup,
app,
duplication_constants::kDuplicationEnvMasterCreateFollowerAppStatusCreating,
std::bind(&meta_duplication_service::on_follower_app_creating_for_duplication,
this,
dup,
std::placeholders::_1,
std::placeholders::_2));
}

void meta_duplication_service::mark_follower_app_created_for_duplication(
const std::shared_ptr<duplication_info> &dup, const std::shared_ptr<app_state> &app)
{
do_create_follower_app_for_duplication(dup, app, duplication_constants::kDuplicationEnvMasterCreateFollowerAppStatusCreated, std::bind(&meta_duplication_service::on_follower_app_created_for_duplication, this, dup, std::placeholders::_1, std::placeholders::_2));
do_create_follower_app_for_duplication(
dup,
app,
duplication_constants::kDuplicationEnvMasterCreateFollowerAppStatusCreated,
std::bind(&meta_duplication_service::on_follower_app_created_for_duplication,
this,
dup,
std::placeholders::_1,
std::placeholders::_2));
}

void meta_duplication_service::do_create_follower_app_for_duplication(
const std::shared_ptr<duplication_info> &dup, const std::shared_ptr<app_state> &app, const std::string &create_status, std::function<void(error_code, configuration_create_app_response &&)> create_callback)
const std::shared_ptr<duplication_info> &dup,
const std::shared_ptr<app_state> &app,
const std::string &create_status,
std::function<void(error_code, configuration_create_app_response &&)> create_callback)
{
configuration_create_app_request request;
request.app_name = dup->remote_app_name;
Expand All @@ -495,59 +514,64 @@ void meta_duplication_service::do_create_follower_app_for_duplication(
_meta_svc->get_meta_list_string());
request.options.envs.emplace(duplication_constants::kDuplicationEnvMasterAppNameKey,
app->app_name);
request.options.envs.emplace(duplication_constants::kDuplicationEnvMasterCreateFollowerAppStatusKey,
create_status);
request.options.envs.emplace(
duplication_constants::kDuplicationEnvMasterCreateFollowerAppStatusKey, create_status);

host_port meta_servers;
meta_servers.assign_group(dup->remote_cluster_name.c_str());
meta_servers.group_host_port()->add_list(dup->remote_cluster_metas);

dsn::message_ex *msg = dsn::message_ex::create_request(RPC_CM_CREATE_APP);
dsn::marshall(msg, request);
rpc::call(
dsn::dns_resolver::instance().resolve_address(meta_servers),
msg,
_meta_svc->tracker(),
std::move(create_callback));
rpc::call(dsn::dns_resolver::instance().resolve_address(meta_servers),
msg,
_meta_svc->tracker(),
std::move(create_callback));
}

void meta_duplication_service::on_follower_app_creating_for_duplication(const std::shared_ptr<duplication_info> &dup, error_code err, configuration_create_app_response &&resp) {
FAIL_POINT_INJECT_NOT_RETURN_F("update_app_request_ok",
[&err](std::string_view) -> void { err = ERR_OK; });
void meta_duplication_service::on_follower_app_creating_for_duplication(
const std::shared_ptr<duplication_info> &dup,
error_code err,
configuration_create_app_response &&resp)
{
FAIL_POINT_INJECT_NOT_RETURN_F("update_app_request_ok",
[&err](std::string_view) -> void { err = ERR_OK; });

error_code create_err = err == ERR_OK ? resp.err : err;
FAIL_POINT_INJECT_NOT_RETURN_F(
"persist_dup_status_failed",
[&create_err](std::string_view) -> void { create_err = ERR_OK; });
error_code create_err = err == ERR_OK ? resp.err : err;
FAIL_POINT_INJECT_NOT_RETURN_F(
"persist_dup_status_failed",
[&create_err](std::string_view) -> void { create_err = ERR_OK; });

error_code update_err = ERR_NO_NEED_OPERATE;
if (create_err == ERR_OK) {
update_err = dup->alter_status(duplication_status::DS_APP);
}
error_code update_err = ERR_NO_NEED_OPERATE;
if (create_err == ERR_OK) {
update_err = dup->alter_status(duplication_status::DS_APP);
}

FAIL_POINT_INJECT_F("persist_dup_status_failed",
[](std::string_view) -> void { return; });
FAIL_POINT_INJECT_F("persist_dup_status_failed", [](std::string_view) -> void { return; });

if (update_err == ERR_OK) {
blob value = dup->to_json_blob();
// Note: this function is `async`, it may not be persisted completed
// after executing, now using `_is_altering` to judge whether `updating` or
// `completed`, if `_is_altering`, dup->alter_status() will return `ERR_BUSY`
_meta_svc->get_meta_storage()->set_data(std::string(dup->store_path),
std::move(value),
[dup]() { dup->persist_status(); });
} else {
LOG_ERROR("create follower app[{}.{}] to trigger duplicate checkpoint failed: "
"duplication_status = {}, create_err = {}, update_err = {}",
dup->remote_cluster_name,
dup->remote_app_name,
duplication_status_to_string(dup->status()),
create_err,
update_err);
}
if (update_err == ERR_OK) {
blob value = dup->to_json_blob();
// Note: this function is `async`, it may not be persisted completed
// after executing, now using `_is_altering` to judge whether `updating` or
// `completed`, if `_is_altering`, dup->alter_status() will return `ERR_BUSY`
_meta_svc->get_meta_storage()->set_data(
std::string(dup->store_path), std::move(value), [dup]() { dup->persist_status(); });
} else {
LOG_ERROR("create follower app[{}.{}] to trigger duplicate checkpoint failed: "
"duplication_status = {}, create_err = {}, update_err = {}",
dup->remote_cluster_name,
dup->remote_app_name,
duplication_status_to_string(dup->status()),
create_err,
update_err);
}
}

void meta_duplication_service::on_follower_app_created_for_duplication(const std::shared_ptr<duplication_info> &dup, error_code err, configuration_create_app_response &&resp) {
void meta_duplication_service::on_follower_app_created_for_duplication(
const std::shared_ptr<duplication_info> &dup,
error_code err,
configuration_create_app_response &&resp)
{
if (err != ERR_OK || resp.err != ERR_OK) {
return;
}
Expand Down
14 changes: 14 additions & 0 deletions src/meta/duplication/meta_duplication_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,20 @@ class meta_duplication_service

void create_follower_app_for_duplication(const std::shared_ptr<duplication_info> &dup,
const std::shared_ptr<app_state> &app);
void do_create_follower_app_for_duplication(
const std::shared_ptr<duplication_info> &dup,
const std::shared_ptr<app_state> &app,
const std::string &create_status,
std::function<void(error_code, configuration_create_app_response &&)> create_callback);
void mark_follower_app_created_for_duplication(const std::shared_ptr<duplication_info> &dup,
const std::shared_ptr<app_state> &app);
void on_follower_app_creating_for_duplication(const std::shared_ptr<duplication_info> &dup,
error_code err,
configuration_create_app_response &&resp);
void on_follower_app_created_for_duplication(const std::shared_ptr<duplication_info> &dup,
error_code err,
configuration_create_app_response &&resp);

void check_follower_app_if_create_completed(const std::shared_ptr<duplication_info> &dup);

// Get zk path for duplication.
Expand Down
26 changes: 16 additions & 10 deletions src/meta/server_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1078,6 +1078,22 @@ void server_state::do_app_create(std::shared_ptr<app_state> &app)
app_dir, LPC_META_STATE_HIGH, on_create_app_root, value);
}

namespace {

bool is_follower_app_creating(const dsn::replication::app_state &app)
{
const auto &iter = app.envs.find(
dsn::replication::duplication_constants::kDuplicationEnvMasterCreateFollowerAppStatusKey);
if (iter == app.envs.end()) {
return false;
}

return iter->second == dsn::replication::duplication_constants::
kDuplicationEnvMasterCreateFollowerAppStatusCreating;
}

} // anonymous namespace

void server_state::create_app(dsn::message_ex *msg)
{
configuration_create_app_request request;
Expand Down Expand Up @@ -3050,16 +3066,6 @@ bool validate_target_max_replica_count_internal(int32_t max_replica_count,
return true;
}

bool is_follower_app_creating(const dsn::replication::app_state &app)
{
const auto &iter = app.envs.find(dsn::replication::duplication_constants::kDuplicationEnvMasterCreateFollowerAppStatusKey);
if (iter == app.envs.end()) {
return false;
}

return iter->second == dsn::replication::duplication_constants::kDuplicationEnvMasterCreateFollowerAppStatusCreating;
}

} // anonymous namespace

bool server_state::validate_target_max_replica_count(int32_t max_replica_count,
Expand Down

0 comments on commit c4ba8fc

Please sign in to comment.