diff --git a/include/flock/flock-backend.h b/include/flock/flock-backend.h
index a2fae95..93db143 100644
--- a/include/flock/flock-backend.h
+++ b/include/flock/flock-backend.h
@@ -34,6 +34,7 @@ struct json_object;
  */
 typedef struct flock_backend_init_args {
     margo_instance_id mid;
+    uint64_t rank;
     uint16_t provider_id;
     ABT_pool pool;
     struct json_object* config;
diff --git a/src/centralized/centralized-backend.c b/src/centralized/centralized-backend.c
index 8bab124..a214016 100644
--- a/src/centralized/centralized-backend.c
+++ b/src/centralized/centralized-backend.c
@@ -14,15 +14,6 @@
 #define RAND_BETWEEN(x, y) ((x) + (((double)rand()) / RAND_MAX)*(y-x))
 
-MERCURY_GEN_PROC(membership_update_in_t,
-        ((uint8_t)(update))\
-        ((uint64_t)(rank))\
-        ((hg_const_string_t)(address))\
-        ((uint16_t)(provider_id)))
-
-MERCURY_GEN_PROC(membership_update_out_t,
-        ((uint32_t)(ret)))
-
 /**
  * @brief The "centralized" backend uses the member with rank 0
  * as a centralized authority that is supposed to hold the most up to date
@@ -32,6 +23,7 @@ MERCURY_GEN_PROC(membership_update_out_t,
  */
 typedef struct centralized_context {
     margo_instance_id mid;
+    uint64_t rank;
     struct json_object* config;
     ABT_mutex_memory config_mtx;
     bool is_primary;
@@ -40,9 +32,12 @@ typedef struct centralized_context {
         uint16_t provider_id;
     } primary;
     flock_group_view_t view;
+    // RPCs sent by rank 0 to other ranks
     hg_id_t ping_rpc_id;
-    hg_id_t get_view_rpc_id;
     hg_id_t membership_update_rpc_id;
+    // RPCs sent by other ranks to rank 0
+    hg_id_t get_view_rpc_id;
+    hg_id_t leave_rpc_id;
     /* extracted from configuration */
     double ping_timeout_ms;
     double ping_interval_ms_min;
@@ -88,21 +83,53 @@ static inline void member_state_free(void* args)
     free(state);
 }
 
+/**
+ * Ping RPC declaration.
+ */
 static DECLARE_MARGO_RPC_HANDLER(ping_rpc_ult)
 static void ping_rpc_ult(hg_handle_t h);
 
+/**
+ * Callback periodically called by the timer set up
+ * by rank 0 to ping other members.
+ */
 static void ping_timer_callback(void* args);
 
+/**
+ * Get view RPC declaration.
+ */
 static flock_return_t get_view(centralized_context* ctx);
 static DECLARE_MARGO_RPC_HANDLER(get_view_rpc_ult)
 static void get_view_rpc_ult(hg_handle_t h);
 
-static flock_return_t issue_membership_update(
+/**
+ * Membership update RPC declaration and types.
+ */
+MERCURY_GEN_PROC(membership_update_in_t,
+        ((uint8_t)(update))\
+        ((uint64_t)(rank))\
+        ((hg_const_string_t)(address))\
+        ((uint16_t)(provider_id)))
+
+MERCURY_GEN_PROC(membership_update_out_t,
+        ((uint32_t)(ret)))
+
+static flock_return_t broadcast_membership_update(
     centralized_context* ctx,
     flock_update_t update,
     size_t rank,
     const char* address,
     uint16_t provider_id);
 static DECLARE_MARGO_RPC_HANDLER(membership_update_rpc_ult)
 static void membership_update_rpc_ult(hg_handle_t h);
 
+/**
+ * Member leaving RPC declaration.
+ */
+static flock_return_t leave(centralized_context* ctx);
+static DECLARE_MARGO_RPC_HANDLER(leave_rpc_ult)
+static void leave_rpc_ult(hg_handle_t h);
+
+/**
+ * Forward-declaration of the function that destroys the group.
+ */
 static flock_return_t centralized_destroy_group(void* ctx);
 
 /**
@@ -272,6 +299,7 @@ static flock_return_t centralized_create_group(
     ctx->primary.provider_id = primary_member->provider_id;
 
     /* copy the configuration */
+    ctx->rank = args->rank;
     ctx->config = config;
     ctx->ping_timeout_ms = ping_timeout_ms_val;
     ctx->ping_interval_ms_min = ping_interval_ms_min;
@@ -293,17 +321,23 @@ static flock_return_t centralized_create_group(
         args->provider_id, args->pool);
     margo_register_data(mid, ctx->ping_rpc_id, ctx, NULL);
 
+    ctx->membership_update_rpc_id = MARGO_REGISTER_PROVIDER(
+        mid, "flock_centralized_membership_update",
+        membership_update_in_t, membership_update_out_t,
+        membership_update_rpc_ult, args->provider_id, args->pool);
+    margo_register_data(mid, ctx->membership_update_rpc_id, ctx, NULL);
+
     ctx->get_view_rpc_id = MARGO_REGISTER_PROVIDER(
         mid, "flock_centralized_get_view",
         void, flock_protected_group_view_t,
         get_view_rpc_ult, args->provider_id, args->pool);
     margo_register_data(mid, ctx->get_view_rpc_id, ctx, NULL);
 
-    ctx->membership_update_rpc_id = MARGO_REGISTER_PROVIDER(
-        mid, "flock_centralized_membership_update",
-        membership_update_in_t, membership_update_out_t,
-        membership_update_rpc_ult, args->provider_id, args->pool);
-    margo_register_data(mid, ctx->membership_update_rpc_id, ctx, NULL);
+    ctx->leave_rpc_id = MARGO_REGISTER_PROVIDER(
+        mid, "flock_centralized_leave",
+        uint64_t, void, leave_rpc_ult,
+        args->provider_id, args->pool);
+    margo_register_data(mid, ctx->leave_rpc_id, ctx, NULL);
 
     if(ctx->is_primary) {
         for(size_t i = 0; i < ctx->view.members.size; ++i) {
@@ -340,6 +374,10 @@ static flock_return_t centralized_create_group(
     return ret;
 }
 
+// -------------------------------------------------------------------------------
+// PING MECHANISM
+// -------------------------------------------------------------------------------
+
 static void ping_timer_callback(void* args)
 {
     double now, next_ping_ms;
@@ -406,7 +444,7 @@ static void ping_timer_callback(void* args)
                    context->callback_context, FLOCK_MEMBER_DIED, rank, address, provider_id);
            }
-            issue_membership_update(context, FLOCK_MEMBER_DIED, rank, address, provider_id);
+            broadcast_membership_update(context, FLOCK_MEMBER_DIED, rank, address, provider_id);
             free(address);
             return;
         }
@@ -452,6 +490,10 @@ static void ping_rpc_ult(hg_handle_t h)
     margo_destroy(h);
 }
 
+// -------------------------------------------------------------------------------
+// GET_VIEW (RPC sent by non-zero ranks to the primary)
+// -------------------------------------------------------------------------------
+
 static DEFINE_MARGO_RPC_HANDLER(get_view_rpc_ult)
 static void get_view_rpc_ult(hg_handle_t h)
 {
@@ -479,24 +521,105 @@ static flock_return_t get_view(centralized_context* ctx)
 
     hret = margo_provider_forward(ctx->primary.provider_id, handle, NULL);
     if(hret != HG_SUCCESS) {
-        margo_destroy(handle);
+        // LCOV_EXCL_START
+        margo_error(ctx->mid,
+            "[flock] Could not get view from primary member, "
+            "margo_provider_forward failed: %s", HG_Error_to_string(hret));
         goto finish;
+        // LCOV_EXCL_STOP
     }
 
     hret = margo_get_output(handle, &ctx->view);
     if(hret != HG_SUCCESS) {
-        margo_destroy(handle);
+        // LCOV_EXCL_START
+        margo_error(ctx->mid,
+            "[flock] Could not get view from primary member, "
+            "margo_get_output failed: %s", HG_Error_to_string(hret));
         goto finish;
+        // LCOV_EXCL_STOP
     }
 
     margo_free_output(handle, NULL);
 
 finish:
-    margo_destroy(handle);
+    if(handle) margo_destroy(handle);
     return FLOCK_SUCCESS;
 }
 
-static flock_return_t issue_membership_update(
+// -------------------------------------------------------------------------------
+// LEAVE (RPC sent by a non-zero rank to the primary when it wants to leave)
+// -------------------------------------------------------------------------------
+
+static DEFINE_MARGO_RPC_HANDLER(leave_rpc_ult)
+static void leave_rpc_ult(hg_handle_t h)
+{
+    char* address = NULL;
+    uint16_t provider_id;
+    uint64_t rank;
+
+    /* find the margo instance */
+    margo_instance_id mid = margo_hg_handle_get_instance(h);
+
+    /* find the context */
+    const struct hg_info* info = margo_get_info(h);
+    centralized_context* ctx = (centralized_context*)margo_registered_data(mid, info->id);
+    if(!ctx) goto finish;
+
+    /* the leaving provider sent its rank */
+    hg_return_t hret = margo_get_input(h, &rank);
+    if(hret != HG_SUCCESS) {
+        // LCOV_EXCL_START
+        margo_error(ctx->mid, "[flock] Could not deserialize rank of leaving process");
+        goto finish;
+        // LCOV_EXCL_STOP
+    }
+    margo_free_input(h, &rank);
+
+    FLOCK_GROUP_VIEW_LOCK(&ctx->view);
+    const flock_member_t* member = flock_group_view_find_member(&ctx->view, rank);
+    if(!member) {
+        FLOCK_GROUP_VIEW_UNLOCK(&ctx->view);
+        margo_error(ctx->mid, "[flock] Rank %llu requested to leave but is not part of the group",
+                    (unsigned long long)rank);
+        goto finish;
+    }
+    provider_id = member->provider_id;
+    address = strdup(member->address);
+    flock_group_view_remove_member(&ctx->view, rank);
+    FLOCK_GROUP_VIEW_UNLOCK(&ctx->view);
+
+    if(ctx->member_update_callback) {
+        (ctx->member_update_callback)(
+            ctx->callback_context, FLOCK_MEMBER_LEFT, rank, address, provider_id);
+    }
+
+    broadcast_membership_update(ctx, FLOCK_MEMBER_LEFT, rank, address, provider_id);
+
+finish:
+    free(address);
+    margo_respond(h, NULL);
+    margo_destroy(h);
+}
+
+static flock_return_t leave(centralized_context* ctx)
+{
+    hg_return_t hret = HG_SUCCESS;
+    hg_handle_t handle = HG_HANDLE_NULL;
+    hret = margo_create(ctx->mid, ctx->primary.address, ctx->leave_rpc_id, &handle);
+    if(hret != HG_SUCCESS) goto finish;
+
+    margo_provider_forward_timed(ctx->primary.provider_id, handle, &ctx->rank, 1000.0);
+
+finish:
+    if(handle) margo_destroy(handle);
+    return FLOCK_SUCCESS;
+}
+
+// -------------------------------------------------------------------------------
+// MEMBERSHIP UPDATE
+// (RPC sent by rank 0 to other ranks to notify them of a membership change)
+// -------------------------------------------------------------------------------
+
+static flock_return_t broadcast_membership_update(
     centralized_context* ctx,
     flock_update_t update,
     size_t rank,
     const char* address,
     uint16_t provider_id);
@@ -624,12 +747,16 @@ static flock_return_t centralized_destroy_group(void* ctx)
         free(timers);
         flock_group_view_clear_extra(&context->view);
         FLOCK_GROUP_VIEW_UNLOCK(&context->view);
+    } else {
+        leave(ctx);
     }
     // FIXME: in non-primary members, it's possible that we are calling margo_deregister
     // while a ping RPC is in flight. There is nothing much we can do, this is something
     // to solve at the Mercury level. See here: https://github.com/mercury-hpc/mercury/issues/534
     if(context->ping_rpc_id) margo_deregister(context->mid, context->ping_rpc_id);
     if(context->get_view_rpc_id) margo_deregister(context->mid, context->get_view_rpc_id);
+    if(context->membership_update_rpc_id) margo_deregister(context->mid, context->membership_update_rpc_id);
+    if(context->leave_rpc_id) margo_deregister(context->mid, context->leave_rpc_id);
     if(context->config) json_object_put(context->config);
     flock_group_view_clear(&context->view);
     free(context);
diff --git a/src/provider.c b/src/provider.c
index 8a442c7..5bd783c 100644
--- a/src/provider.c
+++ b/src/provider.c
@@ -229,9 +229,11 @@ flock_return_t flock_provider_register(
         if(member->provider_id != provider_id) continue;
         if(strcmp(member->address, self_addr_str) != 0) continue;
         backend_init_args.join = false;
+        backend_init_args.rank = member->rank;
         is_first = i == 0;
         break;
     }
+    if(backend_init_args.join) backend_init_args.rank = UINT64_MAX;
 
     /* create the new group's context */
     void* context = NULL;
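
Reviewer note, not part of the patch: below is a minimal sketch of how the new "flock_centralized_leave" RPC registered above could be exercised from a standalone Margo client for testing, mirroring what leave() does inside the backend. The protocol string, primary address, provider id and rank are made-up placeholder values.

/* Illustrative sketch only, not part of the patch: a bare Margo client that
 * sends the "flock_centralized_leave" RPC on behalf of a given rank, the same
 * way leave() does inside the centralized backend. The protocol string,
 * primary address, provider id and rank below are placeholder values. */
#include <margo.h>

int main(void)
{
    const char* primary_addr_str    = "na+sm://1234/0"; /* placeholder */
    uint16_t    primary_provider_id = 42;               /* placeholder */
    uint64_t    my_rank             = 3;                /* rank that wants to leave */

    margo_instance_id mid = margo_init("na+sm", MARGO_CLIENT_MODE, 0, 0);
    if(mid == MARGO_INSTANCE_NULL) return 1;

    /* client-side registration: same RPC name and types as in centralized_create_group */
    hg_id_t leave_rpc_id = MARGO_REGISTER(
        mid, "flock_centralized_leave", uint64_t, void, NULL);

    hg_addr_t primary_addr = HG_ADDR_NULL;
    if(margo_addr_lookup(mid, primary_addr_str, &primary_addr) != HG_SUCCESS) {
        margo_finalize(mid);
        return 1;
    }

    hg_handle_t handle = HG_HANDLE_NULL;
    if(margo_create(mid, primary_addr, leave_rpc_id, &handle) == HG_SUCCESS) {
        /* best-effort, like leave(): ignore the result and give the primary
         * up to one second to acknowledge before giving up */
        margo_provider_forward_timed(primary_provider_id, handle, &my_rank, 1000.0);
        margo_destroy(handle);
    }

    margo_addr_free(mid, primary_addr);
    margo_finalize(mid);
    return 0;
}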