From 4f2a41590ed24f204197f568d717bb67ea17c461 Mon Sep 17 00:00:00 2001
From: Matthieu Dorier
Date: Thu, 2 May 2024 15:33:24 +0100
Subject: [PATCH] added mechanism for process to join a centralized group

---
 src/centralized/centralized-backend.c | 190 ++++++++++++++++++++++++++-
 1 file changed, 185 insertions(+), 5 deletions(-)

diff --git a/src/centralized/centralized-backend.c b/src/centralized/centralized-backend.c
index a214016..201659f 100644
--- a/src/centralized/centralized-backend.c
+++ b/src/centralized/centralized-backend.c
@@ -38,6 +38,7 @@ typedef struct centralized_context {
     // RPCs sent by other ranks to rank 0
     hg_id_t get_view_rpc_id;
     hg_id_t leave_rpc_id;
+    hg_id_t join_rpc_id;
     /* extracted from configuration */
     double ping_timeout_ms;
     double ping_interval_ms_min;
@@ -127,6 +128,32 @@ static flock_return_t leave(centralized_context* ctx);
 static DECLARE_MARGO_RPC_HANDLER(leave_rpc_ult)
 static void leave_rpc_ult(hg_handle_t h);
 
+/**
+ * Member joining types and RPC declaration.
+ */
+MERCURY_GEN_PROC(join_in_t,
+        ((uint64_t)(requested_rank))\
+        ((uint16_t)(provider_id)))
+
+typedef struct join_out_t {
+    uint32_t ret;
+    flock_group_view_t* view;
+} join_out_t;
+
+static inline hg_return_t hg_proc_join_out_t(hg_proc_t proc, void* args) {
+    join_out_t* out = (join_out_t*)args;
+    hg_return_t hret = HG_SUCCESS;
+    hret = hg_proc_uint32_t(proc, &out->ret);
+    if(hret != HG_SUCCESS) return hret;
+    if(out->view && out->ret == FLOCK_SUCCESS)
+        hret = hg_proc_flock_protected_group_view_t(proc, out->view);
+    return hret;
+}
+
+static flock_return_t join(centralized_context* ctx, uint64_t requested_rank, uint16_t provider_id);
+static DECLARE_MARGO_RPC_HANDLER(join_rpc_ult)
+static void join_rpc_ult(hg_handle_t h);
+
 /**
  * Forward-declaration of the function that destroys the group.
  */
@@ -339,6 +366,12 @@ static flock_return_t centralized_create_group(
             args->provider_id, args->pool);
     margo_register_data(mid, ctx->leave_rpc_id, ctx, NULL);
 
+    ctx->join_rpc_id = MARGO_REGISTER_PROVIDER(
+            mid, "flock_centralized_join",
+            join_in_t, join_out_t, join_rpc_ult,
+            args->provider_id, args->pool);
+    margo_register_data(mid, ctx->join_rpc_id, ctx, NULL);
+
     if(ctx->is_primary) {
         for(size_t i = 0; i < ctx->view.members.size; ++i) {
             flock_member_t* member = &ctx->view.members.data[i];
@@ -361,9 +394,13 @@ static flock_return_t centralized_create_group(
             margo_timer_start(state->ping_timer, interval);
         }
     } else if(args->join) {
-        ret = get_view(ctx);
-        if(ret != FLOCK_SUCCESS) goto error;
-        // TODO
+        ret = join(ctx, args->rank, args->provider_id);
+        if(ret != FLOCK_SUCCESS) {
+            // LCOV_EXCL_START
+            margo_error(mid, "[flock] Could not join existing group");
+            goto error;
+            // LCOV_EXCL_STOP
+        }
     }
 
     *context = ctx;
@@ -569,7 +606,7 @@ static void leave_rpc_ult(hg_handle_t h)
     hg_return_t hret = margo_get_input(h, &rank);
     if(hret != HG_SUCCESS) {
         // LCOV_EXCL_START
-        margo_error(ctx->mid, "[flock] Could not deserialize rank of leaving process");
+        margo_error(ctx->mid, "[flock] Could not deserialize rank of leaving provider");
         goto finish;
         // LCOV_EXCL_STOP
     }
@@ -579,7 +616,8 @@ static void leave_rpc_ult(hg_handle_t h)
     const flock_member_t* member = flock_group_view_find_member(&ctx->view, rank);
     if(!member) {
         FLOCK_GROUP_VIEW_UNLOCK(&ctx->view);
-        margo_error(ctx->mid, "[flock] Rank %lu requested to leave but is not part of the group");
+        margo_error(ctx->mid,
+            "[flock] Rank %lu requested to leave but is not part of the group", rank);
         goto finish;
     }
     provider_id = member->provider_id;
@@ -614,6 +652,147 @@ static flock_return_t leave(centralized_context* ctx)
     return FLOCK_SUCCESS;
 }
 
+// -------------------------------------------------------------------------------
+// JOIN (RPC sent by a non-primary rank to the primary when it wants to join)
+// -------------------------------------------------------------------------------
+
+static DEFINE_MARGO_RPC_HANDLER(join_rpc_ult)
+static void join_rpc_ult(hg_handle_t h)
+{
+    uint16_t provider_id;
+    uint64_t rank;
+
+    join_in_t in = {0};
+    join_out_t out = {
+        .ret = FLOCK_SUCCESS,
+        .view = NULL
+    };
+
+    /* find the margo instance */
+    margo_instance_id mid = margo_hg_handle_get_instance(h);
+
+    /* find the context */
+    const struct hg_info* info = margo_get_info(h);
+    centralized_context* ctx = (centralized_context*)margo_registered_data(mid, info->id);
+    if(!ctx) goto finish;
+
+    /* convert incoming address into a string */
+    char address[256];
+    hg_size_t address_size = 256;
+    margo_addr_to_string(mid, address, &address_size, info->addr);
+
+    /* the joining provider sent its information */
+    hg_return_t hret = margo_get_input(h, &in);
+    if(hret != HG_SUCCESS) {
+        // LCOV_EXCL_START
+        margo_error(ctx->mid,
+            "[flock] Could not deserialize information from joining process");
+        out.ret = FLOCK_ERR_FROM_MERCURY;
+        goto finish;
+        // LCOV_EXCL_STOP
+    }
+
+    provider_id = in.provider_id;
+    rank = in.requested_rank;
+
+    margo_free_input(h, &in);
+
+    FLOCK_GROUP_VIEW_LOCK(&ctx->view);
+    if(rank != UINT64_MAX && flock_group_view_find_member(&ctx->view, rank) != NULL) {
+        FLOCK_GROUP_VIEW_UNLOCK(&ctx->view);
+        margo_error(ctx->mid, "[flock] Requested rank %lu already part of the group", rank);
+        out.ret = FLOCK_ERR_INVALID_ARGS;
+        goto finish;
+    }
+    flock_member_t* member = flock_group_view_add_member(&ctx->view, rank, provider_id, address);
+
+    member->extra.data = calloc(1, sizeof(struct member_state));
+    member->extra.free = member_state_free;
+    member_state* state = (member_state*)(member->extra.data);
+    state->context = ctx;
+    state->owner = member;
+    margo_addr_dup(mid, info->addr, &state->address);
+
+    // create timer for the joining member
+    margo_timer_create(mid, ping_timer_callback, state, &state->ping_timer);
+    state->last_ping_timestamp = ABT_get_wtime();
+    double interval = RAND_BETWEEN(
+        state->context->ping_interval_ms_min,
+        state->context->ping_interval_ms_max);
+    margo_timer_start(state->ping_timer, interval);
+
+    FLOCK_GROUP_VIEW_UNLOCK(&ctx->view);
+
+    // notify everyone that the new member joined
+    if(ctx->member_update_callback) {
+        (ctx->member_update_callback)(
+            ctx->callback_context, FLOCK_MEMBER_JOINED, rank, address, provider_id);
+    }
+
+    broadcast_membership_update(ctx, FLOCK_MEMBER_JOINED, rank, address, provider_id);
+
+    // set out.view
+    out.view = &ctx->view;
+
+finish:
+    margo_respond(h, &out);
+    margo_destroy(h);
+}
+
+static flock_return_t join(centralized_context* ctx, uint64_t rank, uint16_t provider_id)
+{
+    flock_return_t ret = FLOCK_SUCCESS;
+    hg_return_t hret = HG_SUCCESS;
+    hg_handle_t handle = HG_HANDLE_NULL;
+    join_in_t in = {
+        .provider_id = provider_id,
+        .requested_rank = rank
+    };
+    join_out_t out = {
+        .ret = 0,
+        .view = &ctx->view
+    };
+
+    hret = margo_create(ctx->mid, ctx->primary.address, ctx->join_rpc_id, &handle);
+    if(hret != HG_SUCCESS) {
+        // LCOV_EXCL_START
+        margo_error(ctx->mid, "[flock] Could not create hg_handle for join RPC: %s",
+            HG_Error_to_string(hret));
+        ret = FLOCK_ERR_FROM_MERCURY;
+        goto finish;
+        // LCOV_EXCL_STOP
+    }
+
+    hret = margo_provider_forward(ctx->primary.provider_id, handle, &in);
+    if(hret != HG_SUCCESS) {
+        // LCOV_EXCL_START
+        margo_error(ctx->mid, "[flock] Could not forward join RPC: %s",
+            HG_Error_to_string(hret));
+        ret = FLOCK_ERR_FROM_MERCURY;
+        goto finish;
+        // LCOV_EXCL_STOP
+    }
+
+    hret = margo_get_output(handle, &out);
+    if(hret != HG_SUCCESS) {
+        // LCOV_EXCL_START
+        margo_error(ctx->mid, "[flock] Could not get output from join RPC: %s",
+            HG_Error_to_string(hret));
+        ret = FLOCK_ERR_FROM_MERCURY;
+        goto finish;
+        // LCOV_EXCL_STOP
+    }
+    ret = (flock_return_t)out.ret;
+    // the view was deserialized directly into ctx->view; reset out.view to NULL
+    // so that margo_free_output does not free it
+    out.view = NULL;
+    margo_free_output(handle, &out);
+
+finish:
+    if(handle) margo_destroy(handle);
+    return ret;
+}
+
 // -------------------------------------------------------------------------------
 // MEMBERSHIP UPDATE
 // (RPC send by rank 0 to other ranks to notify them of a membership change)
@@ -649,6 +828,7 @@ static flock_return_t broadcast_membership_update(
 
     for(size_t i = 0; i < rpc_count; ++i) {
         member_state* state = (member_state*)ctx->view.members.data[i+1].extra.data;
+        if(state->owner->rank == rank) continue; // don't send to the member concerned by the update
        hret = margo_create(ctx->mid, state->address, ctx->membership_update_rpc_id, &handles[i]);
         if(hret != HG_SUCCESS) {
             margo_error(ctx->mid,
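
Note: the client-side join() added by this patch follows Margo's standard
request/response sequence: margo_create() on the target address,
margo_provider_forward() to route the request to the right provider id,
margo_get_output() to decode the response, then margo_free_output() and
margo_destroy(). The self-contained sketch below shows that same sequence in
isolation, outside of flock. The "sum" RPC, its argument types, and the helper
name are invented for illustration; only the margo_* calls and the
MERCURY_GEN_PROC macro are the real Margo/Mercury API.

    #include <margo.h>

    /* hypothetical RPC argument types, generated the same way as join_in_t */
    MERCURY_GEN_PROC(sum_in_t, ((int32_t)(x))((int32_t)(y)))
    MERCURY_GEN_PROC(sum_out_t, ((int32_t)(result)))

    /* send x and y to a provider and return its response in *result */
    static hg_return_t call_sum(margo_instance_id mid, const char* svr_addr_str,
                                uint16_t provider_id, hg_id_t sum_rpc_id,
                                int32_t x, int32_t y, int32_t* result)
    {
        hg_addr_t   svr_addr = HG_ADDR_NULL;
        hg_handle_t handle   = HG_HANDLE_NULL;
        sum_in_t    in  = { .x = x, .y = y };
        sum_out_t   out = {0};
        hg_return_t hret;

        /* resolve the server's address from its string form */
        hret = margo_addr_lookup(mid, svr_addr_str, &svr_addr);
        if(hret != HG_SUCCESS) goto finish;

        /* same sequence as join(): create, forward, get_output, free_output */
        hret = margo_create(mid, svr_addr, sum_rpc_id, &handle);
        if(hret != HG_SUCCESS) goto finish;

        hret = margo_provider_forward(provider_id, handle, &in);
        if(hret != HG_SUCCESS) goto finish;

        hret = margo_get_output(handle, &out);
        if(hret != HG_SUCCESS) goto finish;

        *result = out.result;
        margo_free_output(handle, &out);

    finish:
        if(handle != HG_HANDLE_NULL) margo_destroy(handle);
        if(svr_addr != HG_ADDR_NULL) margo_addr_free(mid, svr_addr);
        return hret;
    }

One subtlety the patch relies on: join() points out.view at &ctx->view before
calling margo_get_output(), so the hand-written hg_proc_join_out_t() decodes
the primary's group view directly into the caller's own view rather than into
a temporary copy, and join() then resets out.view to NULL so that
margo_free_output() does not free a view the context still owns.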