added mechanism for process to join a centralized group
mdorier committed May 2, 2024
1 parent c30e682 commit 4f2a415
Showing 1 changed file with 177 additions and 5 deletions.
182 changes: 177 additions & 5 deletions src/centralized/centralized-backend.c
@@ -38,6 +38,7 @@ typedef struct centralized_context {
// RPCs sent by other ranks to rank 0
hg_id_t get_view_rpc_id;
hg_id_t leave_rpc_id;
hg_id_t join_rpc_id;
/* extracted from configuration */
double ping_timeout_ms;
double ping_interval_ms_min;
@@ -127,6 +128,32 @@ static flock_return_t leave(centralized_context* ctx);
static DECLARE_MARGO_RPC_HANDLER(leave_rpc_ult)
static void leave_rpc_ult(hg_handle_t h);

/**
* Member joining types and RPC declaration.
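*
* join_in_t carries the rank the process would like to have (UINT64_MAX
* presumably meaning "no preference", given the UINT64_MAX guard in
* join_rpc_ult below) and its provider ID; join_out_t carries an error
* code and, on success, the primary's current view of the group.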
*/
MERCURY_GEN_PROC(join_in_t,
((uint64_t)(requested_rank))\
((uint16_t)(provider_id)))

typedef struct join_out_t {
uint32_t ret;
flock_group_view_t* view;
} join_out_t;

static inline hg_return_t hg_proc_join_out_t(hg_proc_t proc, void* args) {
join_out_t* out = (join_out_t*)args;
hg_return_t hret = HG_SUCCESS;
hret = hg_proc_uint32_t(proc, &out->ret);
if(hret != HG_SUCCESS) return hret;
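// Only (de)serialize the view when the call succeeded and the caller
// provided a destination; the joining process can thus decode the
// primary's membership list directly into its own ctx->view.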
if(out->view && out->ret == FLOCK_SUCCESS)
hret = hg_proc_flock_protected_group_view_t(proc, out->view);
return hret;
}

static flock_return_t join(centralized_context* ctx, uint64_t requested_rank, uint16_t provider_id);
static DECLARE_MARGO_RPC_HANDLER(join_rpc_ult)
static void join_rpc_ult(hg_handle_t h);

/**
* Forward-declaration of the function that destroys the group.
*/
@@ -339,6 +366,12 @@ static flock_return_t centralized_create_group(
args->provider_id, args->pool);
margo_register_data(mid, ctx->leave_rpc_id, ctx, NULL);

ctx->join_rpc_id = MARGO_REGISTER_PROVIDER(
mid, "flock_centralized_join",
join_in_t, join_out_t, join_rpc_ult,
args->provider_id, args->pool);
margo_register_data(mid, ctx->join_rpc_id, ctx, NULL);

if(ctx->is_primary) {
for(size_t i = 0; i < ctx->view.members.size; ++i) {
flock_member_t* member = &ctx->view.members.data[i];
@@ -361,9 +394,13 @@ margo_timer_start(state->ping_timer, interval);
margo_timer_start(state->ping_timer, interval);
}
} else if(args->join) {
- ret = get_view(ctx);
- if(ret != FLOCK_SUCCESS) goto error;
- // TODO
+ ret = join(ctx, args->rank, args->provider_id);
+ if(ret != FLOCK_SUCCESS) {
+ // LCOV_EXCL_START
+ margo_error(mid, "[flock] Could not join existing group");
+ goto error;
+ // LCOV_EXCL_STOP
+ }
}

*context = ctx;
@@ -569,7 +606,7 @@ static void leave_rpc_ult(hg_handle_t h)
hg_return_t hret = margo_get_input(h, &rank);
if(hret != HG_SUCCESS) {
// LCOV_EXCL_START
margo_error(ctx->mid, "[flock] Could not deserialize rank of leaving process");
margo_error(ctx->mid, "[flock] Could not deserialize rank of leaving provider");
goto finish;
// LCOV_EXCL_STOP
}
@@ -579,7 +616,8 @@ static void leave_rpc_ult(hg_handle_t h)
const flock_member_t* member = flock_group_view_find_member(&ctx->view, rank);
if(!member) {
FLOCK_GROUP_VIEW_UNLOCK(&ctx->view);
margo_error(ctx->mid, "[flock] Rank %lu requested to leave but is not part of the group");
margo_error(ctx->mid,
"[flock] Rank %lu requested to leave but is not part of the group", rank);
goto finish;
}
provider_id = member->provider_id;
@@ -614,6 +652,139 @@ static flock_return_t leave(centralized_context* ctx)
return FLOCK_SUCCESS;
}

// -------------------------------------------------------------------------------
// JOIN (RPC sent by a non-zero rank to the primary when it wants to join)
// -------------------------------------------------------------------------------

static DEFINE_MARGO_RPC_HANDLER(join_rpc_ult)
static void join_rpc_ult(hg_handle_t h)
{
uint16_t provider_id;
uint64_t rank;

join_in_t in = {0};
join_out_t out = {
.ret = FLOCK_SUCCESS,
.view = NULL
};

/* find the margo instance */
margo_instance_id mid = margo_hg_handle_get_instance(h);

/* find the context */
const struct hg_info* info = margo_get_info(h);
centralized_context* ctx = (centralized_context*)margo_registered_data(mid, info->id);
if(!ctx) goto finish;

/* convert incoming address into a string */
char address[256];
hg_size_t address_size = 256;
margo_addr_to_string(mid, address, &address_size, info->addr);

/* the joining provider sent its information */
hg_return_t hret = margo_get_input(h, &in);
if(hret != HG_SUCCESS) {
// LCOV_EXCL_START
margo_error(ctx->mid,
"[flock] Could not deserialize information from joining process");
out.ret = FLOCK_ERR_FROM_MERCURY;
goto finish;
// LCOV_EXCL_STOP
}

provider_id = in.provider_id;
rank = in.requested_rank;

margo_free_input(h, &in);

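// A requested rank of UINT64_MAX is treated as "no preference": the
// duplicate-rank check below is skipped and flock_group_view_add_member
// presumably chooses the rank itself.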
FLOCK_GROUP_VIEW_LOCK(&ctx->view);
if(rank != UINT64_MAX && flock_group_view_find_member(&ctx->view, rank) != NULL) {
FLOCK_GROUP_VIEW_UNLOCK(&ctx->view);
margo_error(ctx->mid, "[flock] Requested rank %lu already part of the group", rank);
out.ret = FLOCK_ERR_INVALID_ARGS; /* assumed error code; without it the joiner would misread the response as success */
goto finish;
}
flock_member_t* member = flock_group_view_add_member(&ctx->view, rank, provider_id, address);
if(!member) {
FLOCK_GROUP_VIEW_UNLOCK(&ctx->view);
out.ret = FLOCK_ERR_ALLOCATION; /* assumed allocation-failure code */
goto finish;
}
rank = member->rank; // use the rank actually assigned by add_member

member->extra.data = calloc(1, sizeof(struct member_state));
member->extra.free = member_state_free;
member_state* state = (member_state*)(member->extra.data);
state->context = ctx;
state->owner = member;
margo_addr_dup(mid, info->addr, &state->address);

// create timer for the joining member
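// so that the primary monitors it through the same randomized ping
// schedule as the initial members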
margo_timer_create(mid, ping_timer_callback, state, &state->ping_timer);
state->last_ping_timestamp = ABT_get_wtime();
double interval = RAND_BETWEEN(
state->context->ping_interval_ms_min,
state->context->ping_interval_ms_max);
margo_timer_start(state->ping_timer, interval);

FLOCK_GROUP_VIEW_UNLOCK(&ctx->view);

// notify everyone that the new member joined
if(ctx->member_update_callback) {
(ctx->member_update_callback)(
ctx->callback_context, FLOCK_MEMBER_JOINED, rank, address, provider_id);
}

broadcast_membership_update(ctx, FLOCK_MEMBER_JOINED, rank, address, provider_id);
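// (broadcast_membership_update skips the member the update concerns;
// the joiner instead gets the full updated view in this RPC's response)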

// send the updated view back to the joining process in the response
out.view = &ctx->view;

finish:
margo_respond(h, &out);
margo_destroy(h);
}

static flock_return_t join(centralized_context* ctx, uint64_t rank, uint16_t provider_id)
{
flock_return_t ret = FLOCK_SUCCESS;
hg_return_t hret = HG_SUCCESS;
hg_handle_t handle = HG_HANDLE_NULL;
join_in_t in = {
.provider_id = provider_id,
.requested_rank = rank
};
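// Point out.view at our own view: hg_proc_join_out_t then decodes the
// primary's membership list directly into ctx->view. The pointer is reset
// to NULL before margo_free_output so the received view is not freed.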
join_out_t out = {
.ret = 0,
.view = &ctx->view
};

hret = margo_create(ctx->mid, ctx->primary.address, ctx->join_rpc_id, &handle);
if(hret != HG_SUCCESS) {
// LCOV_EXCL_START
margo_error(ctx->mid, "[flock] Could not create hg_handle for join RPC: %s",
HG_Error_to_string(hret));
ret = FLOCK_ERR_FROM_MERCURY;
goto finish;
// LCOV_EXCL_STOP
}

hret = margo_provider_forward(ctx->primary.provider_id, handle, &in);
if(hret != HG_SUCCESS) {
// LCOV_EXCL_START
margo_error(ctx->mid, "[flock] Could not forward join RPC: %s",
HG_Error_to_string(hret));
ret = FLOCK_ERR_FROM_MERCURY;
goto finish;
// LCOV_EXCL_STOP
}

hret = margo_get_output(handle, &out);
if(hret != HG_SUCCESS) {
// LCOV_EXCL_START
margo_error(ctx->mid, "[flock] Could not get output from join RPC: %s",
HG_Error_to_string(hret));
ret = FLOCK_ERR_FROM_MERCURY;
goto finish;
// LCOV_EXCL_STOP
}
ret = (flock_return_t)out.ret;
out.view = NULL;
margo_free_output(handle, &out);

finish:
if(handle) margo_destroy(handle);
return ret;
}

// -------------------------------------------------------------------------------
// MEMBERSHIP UPDATE
// (RPC sent by rank 0 to other ranks to notify them of a membership change)
@@ -649,6 +820,7 @@ static flock_return_t broadcast_membership_update(

for(size_t i = 0; i < rpc_count; ++i) {
member_state* state = (member_state*)ctx->view.members.data[i+1].extra.data;
if(state->owner->rank == rank) continue; // don't send the update to the process it concerns
hret = margo_create(ctx->mid, state->address, ctx->membership_update_rpc_id, &handles[i]);
if(hret != HG_SUCCESS) {
margo_error(ctx->mid,