implemented mechanism for members to leave
mdorier committed May 2, 2024
1 parent 9bf6a93 commit c30e682
Showing 3 changed files with 151 additions and 21 deletions.
1 change: 1 addition & 0 deletions include/flock/flock-backend.h
@@ -34,6 +34,7 @@ struct json_object;
*/
typedef struct flock_backend_init_args {
margo_instance_id mid;
uint64_t rank;
uint16_t provider_id;
ABT_pool pool;
struct json_object* config;
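The new rank field gives a backend its own position in the group at initialization time. Below is a minimal sketch, not part of the commit, of how a backend's create function might consume it; the my_backend_* names and FLOCK_ERR_ALLOCATION are assumptions, while struct flock_backend_init_args and its fields come from the header above.

#include <flock/flock-backend.h>
#include <stdlib.h>

typedef struct my_backend_context {
    margo_instance_id mid;  /* Margo instance passed by the provider */
    uint64_t          rank; /* this process's rank within the group */
} my_backend_context;

static flock_return_t my_backend_create_group(
        const struct flock_backend_init_args* args, void** context)
{
    my_backend_context* ctx = (my_backend_context*)calloc(1, sizeof(*ctx));
    if(!ctx) return FLOCK_ERR_ALLOCATION; /* assumed error code */
    ctx->mid = args->mid;
    /* rank is UINT64_MAX when the process joins an existing group and
     * does not yet appear in the initial view (see src/provider.c below) */
    ctx->rank = args->rank;
    *context = ctx;
    return FLOCK_SUCCESS;
}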
169 changes: 148 additions & 21 deletions src/centralized/centralized-backend.c
@@ -14,15 +14,6 @@

#define RAND_BETWEEN(x, y) ((x) + (((double)rand()) / RAND_MAX)*(y-x))

/**
* @brief The "centralized" backend uses the member with rank 0
* as a centralized authority that is supposed to hold the most up to date
@@ -32,6 +23,7 @@ MERCURY_GEN_PROC(membership_update_out_t,
*/
typedef struct centralized_context {
margo_instance_id mid;
uint64_t rank;
struct json_object* config;
ABT_mutex_memory config_mtx;
bool is_primary;
@@ -40,9 +32,12 @@ typedef struct centralized_context {
uint16_t provider_id;
} primary;
flock_group_view_t view;
// RPCs sent by rank 0 to other ranks
hg_id_t ping_rpc_id;
hg_id_t membership_update_rpc_id;
// RPCs sent by other ranks to rank 0
hg_id_t get_view_rpc_id;
hg_id_t leave_rpc_id;
/* extracted from configuration */
double ping_timeout_ms;
double ping_interval_ms_min;
@@ -88,21 +83,53 @@ static inline void member_state_free(void* args)
free(state);
}

/**
* Ping RPC declaration.
*/
static DECLARE_MARGO_RPC_HANDLER(ping_rpc_ult)
static void ping_rpc_ult(hg_handle_t h);

/**
* Callback periodically called by the timer setup
* by rank 0 to ping other members.
*/
static void ping_timer_callback(void* args);

/**
 * Get view RPC declaration.
 */
static flock_return_t get_view(centralized_context* ctx);
static DECLARE_MARGO_RPC_HANDLER(get_view_rpc_ult)
static void get_view_rpc_ult(hg_handle_t h);

/**
* Membership update RPC declaration and types.
*/
MERCURY_GEN_PROC(membership_update_in_t,
((uint8_t)(update))\
((uint64_t)(rank))\
((hg_const_string_t)(address))\
((uint16_t)(provider_id)))

MERCURY_GEN_PROC(membership_update_out_t,
((uint32_t)(ret)))

static flock_return_t broadcast_membership_update(
centralized_context* ctx, flock_update_t update,
size_t rank, const char* address, uint16_t provider_id);
static DECLARE_MARGO_RPC_HANDLER(membership_update_rpc_ult)
static void membership_update_rpc_ult(hg_handle_t h);

/**
* Member leaving RPC declaration.
*/
static flock_return_t leave(centralized_context* ctx);
static DECLARE_MARGO_RPC_HANDLER(leave_rpc_ult)
static void leave_rpc_ult(hg_handle_t h);

/**
* Forward-declaration of the function that destroys the group.
*/
static flock_return_t centralized_destroy_group(void* ctx);

/**
@@ -272,6 +299,7 @@ static flock_return_t centralized_create_group(
ctx->primary.provider_id = primary_member->provider_id;

/* copy the configuration */
ctx->rank = args->rank;
ctx->config = config;
ctx->ping_timeout_ms = ping_timeout_ms_val;
ctx->ping_interval_ms_min = ping_interval_ms_min;
@@ -293,17 +321,23 @@
args->provider_id, args->pool);
margo_register_data(mid, ctx->ping_rpc_id, ctx, NULL);

ctx->get_view_rpc_id = MARGO_REGISTER_PROVIDER(
mid, "flock_centralized_get_view",
void, flock_protected_group_view_t, get_view_rpc_ult,
args->provider_id, args->pool);
margo_register_data(mid, ctx->get_view_rpc_id, ctx, NULL);

ctx->membership_update_rpc_id = MARGO_REGISTER_PROVIDER(
mid, "flock_centralized_membership_update",
membership_update_in_t, membership_update_out_t,
membership_update_rpc_ult, args->provider_id, args->pool);
margo_register_data(mid, ctx->membership_update_rpc_id, ctx, NULL);
ctx->leave_rpc_id = MARGO_REGISTER_PROVIDER(
mid, "flock_centralized_leave",
uint64_t, void, leave_rpc_ult,
args->provider_id, args->pool);
margo_register_data(mid, ctx->leave_rpc_id, ctx, NULL);

if(ctx->is_primary) {
for(size_t i = 0; i < ctx->view.members.size; ++i) {
@@ -340,6 +374,10 @@ static flock_return_t centralized_create_group(
return ret;
}

// -------------------------------------------------------------------------------
// PING MECHANISM
// -------------------------------------------------------------------------------

static void ping_timer_callback(void* args)
{
double now, next_ping_ms;
@@ -406,7 +444,7 @@ static void ping_timer_callback(void* args)
context->callback_context, FLOCK_MEMBER_DIED, rank, address, provider_id);
}

broadcast_membership_update(context, FLOCK_MEMBER_DIED, rank, address, provider_id);
free(address);
return;
}
@@ -452,6 +490,10 @@ static void ping_rpc_ult(hg_handle_t h)
margo_destroy(h);
}

// -------------------------------------------------------------------------------
// GET_VIEW (RPC sent by non-zero ranks to the primary)
// -------------------------------------------------------------------------------

static DEFINE_MARGO_RPC_HANDLER(get_view_rpc_ult)
static void get_view_rpc_ult(hg_handle_t h)
{
@@ -479,24 +521,105 @@

hret = margo_provider_forward(ctx->primary.provider_id, handle, NULL);
if(hret != HG_SUCCESS) {
// LCOV_EXCL_START
margo_error(ctx->mid,
"[flock] Could not get view from primary member, "
"margo_provider_forward failed: %s", HG_Error_to_string(hret));
goto finish;
// LCOV_EXCL_STOP
}

hret = margo_get_output(handle, &ctx->view);
if(hret != HG_SUCCESS) {
// LCOV_EXCL_START
margo_error(ctx->mid,
"[flock] Could not get view from primary member, "
"margo_get_output failed: %s", HG_Error_to_string(hret));
goto finish;
// LCOV_EXCL_STOP
}

margo_free_output(handle, NULL);

finish:
if(handle) margo_destroy(handle);
return FLOCK_SUCCESS;
}

// -------------------------------------------------------------------------------
// LEAVE (RPC sent by a non-zero rank to the primary when it wants to leave)
// -------------------------------------------------------------------------------

static DEFINE_MARGO_RPC_HANDLER(leave_rpc_ult)
static void leave_rpc_ult(hg_handle_t h)
{
char* address = NULL;
uint16_t provider_id;
uint64_t rank;

/* find the margo instance */
margo_instance_id mid = margo_hg_handle_get_instance(h);

/* find the context */
const struct hg_info* info = margo_get_info(h);
centralized_context* ctx = (centralized_context*)margo_registered_data(mid, info->id);
if(!ctx) goto finish;

/* the leaving provider sent its rank */
hg_return_t hret = margo_get_input(h, &rank);
if(hret != HG_SUCCESS) {
// LCOV_EXCL_START
margo_error(ctx->mid, "[flock] Could not deserialize rank of leaving process");
goto finish;
// LCOV_EXCL_STOP
}
margo_free_input(h, &rank);

FLOCK_GROUP_VIEW_LOCK(&ctx->view);
const flock_member_t* member = flock_group_view_find_member(&ctx->view, rank);
if(!member) {
FLOCK_GROUP_VIEW_UNLOCK(&ctx->view);
margo_error(ctx->mid, "[flock] Rank %lu requested to leave but is not part of the group");
goto finish;
}
provider_id = member->provider_id;
address = strdup(member->address);
flock_group_view_remove_member(&ctx->view, rank);
FLOCK_GROUP_VIEW_UNLOCK(&ctx->view);

if(ctx->member_update_callback) {
(ctx->member_update_callback)(
ctx->callback_context, FLOCK_MEMBER_LEFT, rank, address, provider_id);
}

broadcast_membership_update(ctx, FLOCK_MEMBER_LEFT, rank, address, provider_id);

finish:
free(address);
margo_respond(h, NULL);
margo_destroy(h);
}

static flock_return_t leave(centralized_context* ctx)
{
hg_return_t hret = HG_SUCCESS;
hg_handle_t handle = HG_HANDLE_NULL;
hret = margo_create(ctx->mid, ctx->primary.address, ctx->leave_rpc_id, &handle);
if(hret != HG_SUCCESS) goto finish;

margo_provider_forward_timed(ctx->primary.provider_id, handle, &ctx->rank, 1000.0);

finish:
if(handle) margo_destroy(handle);
return FLOCK_SUCCESS;
}

// -------------------------------------------------------------------------------
// MEMBERSHIP UPDATE
// (RPC sent by rank 0 to other ranks to notify them of a membership change)
// -------------------------------------------------------------------------------

static flock_return_t broadcast_membership_update(
centralized_context* ctx,
flock_update_t update,
size_t rank, const char* address,
Expand Down Expand Up @@ -624,12 +747,16 @@ static flock_return_t centralized_destroy_group(void* ctx)
free(timers);
flock_group_view_clear_extra(&context->view);
FLOCK_GROUP_VIEW_UNLOCK(&context->view);
} else {
leave(ctx);
}
// FIXME: in non-primary members, it's possible that we are calling margo_deregister
// while a ping RPC is in flight. There is not much we can do; this is something
// to solve at the Mercury level. See here: https://github.com/mercury-hpc/mercury/issues/534
if(context->ping_rpc_id) margo_deregister(context->mid, context->ping_rpc_id);
if(context->get_view_rpc_id) margo_deregister(context->mid, context->get_view_rpc_id);
if(context->membership_update_rpc_id) margo_deregister(context->mid, context->membership_update_rpc_id);
if(context->leave_rpc_id) margo_deregister(context->mid, context->leave_rpc_id);
if(context->config) json_object_put(context->config);
flock_group_view_clear(&context->view);
free(context);
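For context, here is a sketch of the kind of member-update callback that the member_update_callback/callback_context pair used in this file could point to; the parameter types are inferred from the call sites in the diff, and on_member_update is a hypothetical name.

#include <stdio.h>

static void on_member_update(
        void* uargs, flock_update_t update,
        size_t rank, const char* address, uint16_t provider_id)
{
    (void)uargs; /* application state registered as callback_context */
    switch(update) {
    case FLOCK_MEMBER_LEFT: /* new in this commit: graceful departure via the leave RPC */
        printf("rank %zu (%s, provider %u) left the group\n",
               rank, address, (unsigned)provider_id);
        break;
    case FLOCK_MEMBER_DIED: /* detected by rank 0's ping timer */
        printf("rank %zu (%s, provider %u) stopped responding to pings\n",
               rank, address, (unsigned)provider_id);
        break;
    default: /* other update types are not exercised in this diff */
        break;
    }
}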
2 changes: 2 additions & 0 deletions src/provider.c
@@ -229,9 +229,11 @@ flock_return_t flock_provider_register(
if(member->provider_id != provider_id) continue;
if(strcmp(member->address, self_addr_str) != 0) continue;
backend_init_args.join = false;
backend_init_args.rank = member->rank;
is_first = i == 0;
break;
}
if(backend_init_args.join) backend_init_args.rank = UINT64_MAX;

/* create the new group's context */
void* context = NULL;
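The provider.c change resolves a member's rank by matching its own address and provider id against the initial group view, falling back to UINT64_MAX for a process that joins later. The same logic, restated as a self-contained helper sketch (find_self_rank is a hypothetical name; the flock_member_t fields are the ones visible in this diff):

#include <stdint.h>
#include <string.h>

static uint64_t find_self_rank(
        const flock_member_t* members, size_t num_members,
        const char* self_addr_str, uint16_t provider_id)
{
    for(size_t i = 0; i < num_members; ++i) {
        /* same matching rule as in flock_provider_register above */
        if(members[i].provider_id != provider_id) continue;
        if(strcmp(members[i].address, self_addr_str) != 0) continue;
        return members[i].rank; /* this process is part of the initial view */
    }
    return UINT64_MAX; /* joining process: rank not known at registration time */
}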
