added mechanism to broadcast information about dead member
mdorier committed May 1, 2024
1 parent 3a89679 commit 16a9ab2
Showing 2 changed files with 245 additions and 31 deletions.
216 changes: 186 additions & 30 deletions src/centralized/centralized-backend.c
@@ -14,6 +14,15 @@

#define RAND_BETWEEN(x, y) ((x) + (((double)rand()) / RAND_MAX)*(y-x))

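/* Input and output argument types for the "flock_centralized_membership_update"
 * RPC, sent by the primary member to notify the other members of a membership change. */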
MERCURY_GEN_PROC(membership_update_in_t,
((uint8_t)(update))\
((uint64_t)(rank))\
((hg_const_string_t)(address))\
((uint16_t)(provider_id)))

MERCURY_GEN_PROC(membership_update_out_t,
((uint32_t)(ret)))

/**
* @brief The "centralized" backend uses the member with rank 0
* as a centralized authority that is supposed to hold the most up to date
@@ -33,22 +42,27 @@ typedef struct centralized_context {
flock_group_view_t view;
hg_id_t ping_rpc_id;
hg_id_t get_view_rpc_id;
hg_id_t membership_update_rpc_id;
/* extracted from configuration */
double ping_timeout_ms;
double ping_interval_ms_min;
double ping_interval_ms_max;
unsigned ping_max_num_timeouts;
/* update callbacks */
flock_membership_update_fn member_update_callback;
flock_metadata_update_fn metadata_update_callback;
void* callback_context;
} centralized_context;

/**
* @brief Extra data attached to each member of the view held by the primary member.
*/
typedef struct member_state {
flock_member_t* owner;
centralized_context* context;
hg_addr_t address;
size_t rank;
uint16_t provider_id;
margo_timer_t ping_timer;
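/* set while ping_timer_callback is executing, so that member_state_free
 * does not cancel the timer from within its own callback */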
_Atomic bool in_timer_callback;
double last_ping_timestamp;
hg_handle_t last_ping_handle;
uint8_t num_ping_timeouts;
@@ -58,12 +72,14 @@ typedef struct member_state {
static inline void member_state_free(void* args)
{
member_state* state = (member_state*)args;
if(!state) return;
ABT_mutex_lock(ABT_MUTEX_MEMORY_GET_HANDLE(&state->mtx));
if(state->last_ping_handle) {
HG_Cancel(state->last_ping_handle);
}
if(state->ping_timer) {
margo_timer_cancel(state->ping_timer);
if(!state->in_timer_callback)
margo_timer_cancel(state->ping_timer);
margo_timer_destroy(state->ping_timer);
state->ping_timer = MARGO_TIMER_NULL;
}
@@ -81,6 +97,12 @@ static flock_return_t get_view(centralized_context* ctx);
static DECLARE_MARGO_RPC_HANDLER(get_view_rpc_ult)
static void get_view_rpc_ult(hg_handle_t h);

static flock_return_t issue_membership_update(
centralized_context* ctx, flock_update_t update,
size_t rank, const char* address, uint16_t provider_id);
static DECLARE_MARGO_RPC_HANDLER(membership_update_rpc_ult)
static void membership_update_rpc_ult(hg_handle_t h);

static flock_return_t centralized_destroy_group(void* ctx);

/**
@@ -89,16 +111,16 @@ static flock_return_t centralized_destroy_group(void* ctx);
* ```
* {
* "ping_timeout_ms": X,
* "ping_interval_ms": Y,
* "ping_interval_ms": Y or [Ymin, Ymax],
* "ping_max_num_timeouts": Z
* }
* ```
*
* - ping_timeout_ms is the timeout value when sending a ping RPC to a member.
* - ping_interval_ms is the time to wait between to ping RPCs to the same follower.
* - ping_interval_ms is the time to wait between two ping RPCs to the same follower.
* - ping_max_num_timeouts is the number of RPC timeouts before the member is considered dead.
* ping_interval_ms may be a list of two values [a,b] instead of a single value Y. If a list
* is provided, the interval will be drawn randomly from a uniform distribution in the [a,b]
* ping_interval_ms may be a list of two values [Ymin,Ymax] instead of a single value Y. If a list
* is provided, the interval will be drawn randomly from a uniform distribution in the [Ymin,Ymax]
* range each time.
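*
* For example, a concrete configuration following this schema could look like
* the following (the values below are arbitrary illustrations, not defaults):
*
* ```
* {
*    "ping_timeout_ms": 500,
*    "ping_interval_ms": [1000, 2000],
*    "ping_max_num_timeouts": 3
* }
* ```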
*/
static flock_return_t centralized_create_group(
@@ -250,41 +272,53 @@ static flock_return_t centralized_create_group(
ctx->primary.provider_id = primary_member->provider_id;

/* copy the configuration */
ctx->config = config;
ctx->ping_timeout_ms = ping_timeout_ms_val;
ctx->config = config;
ctx->ping_timeout_ms = ping_timeout_ms_val;
ctx->ping_interval_ms_min = ping_interval_ms_min;
ctx->ping_interval_ms_max = ping_interval_ms_max;
ctx->ping_max_num_timeouts = ping_max_num_timeouts;

/* copy the membership and metadata update callbacks */
ctx->member_update_callback = args->member_update_callback;
ctx->metadata_update_callback = args->metadata_update_callback;
ctx->callback_context = args->callback_context;

/* move the initial view in the context */
FLOCK_GROUP_VIEW_MOVE(&args->initial_view, &ctx->view);

/* register the RPCs */
ctx->ping_rpc_id = MARGO_REGISTER_PROVIDER(
mid, "flock_centralized_ping", uint64_t, void, ping_rpc_ult,
mid, "flock_centralized_ping",
uint64_t, void, ping_rpc_ult,
args->provider_id, args->pool);
margo_register_data(mid, ctx->ping_rpc_id, ctx, NULL);

ctx->get_view_rpc_id = MARGO_REGISTER_PROVIDER(
mid, "flock_centralized_get_view", void, flock_protected_group_view_t, get_view_rpc_ult,
mid, "flock_centralized_get_view",
void, flock_protected_group_view_t, get_view_rpc_ult,
args->provider_id, args->pool);
margo_register_data(mid, ctx->get_view_rpc_id, ctx, NULL);

ctx->membership_update_rpc_id = MARGO_REGISTER_PROVIDER(
mid, "flock_centralized_membership_update",
membership_update_in_t, membership_update_out_t,
membership_update_rpc_ult, args->provider_id, args->pool);
margo_register_data(mid, ctx->membership_update_rpc_id, ctx, NULL);

if(ctx->is_primary) {
// Note: the loop starts at 1 because the first member is the
// primary member and is not going to ping itself
for(size_t i = 1; i < ctx->view.members.size; ++i) {
for(size_t i = 0; i < ctx->view.members.size; ++i) {
flock_member_t* member = &ctx->view.members.data[i];
member->extra.data = calloc(1, sizeof(struct member_state));
member->extra.free = member_state_free;
member_state* state = (member_state*)(member->extra.data);
state->context = ctx;
state->rank = member->rank;
state->provider_id = member->provider_id;
state->owner = member;
if(HG_SUCCESS != margo_addr_lookup(mid, member->address, &state->address)) {
ret = FLOCK_ERR_FROM_MERCURY;
goto error;
}
if(i == 0) continue;
// create timer only for non-primary members
margo_timer_create(mid, ping_timer_callback, state, &state->ping_timer);
state->last_ping_timestamp = ABT_get_wtime();
double interval = RAND_BETWEEN(
@@ -295,6 +329,7 @@ static flock_return_t centralized_create_group(
} else if(args->join) {
ret = get_view(ctx);
if(ret != FLOCK_SUCCESS) goto error;
// TODO
}

*context = ctx;
@@ -309,6 +344,7 @@ static void ping_timer_callback(void* args)
{
double now, next_ping_ms;
member_state* state = (member_state*)args;
state->in_timer_callback = true;
hg_return_t hret = HG_SUCCESS;

ABT_mutex_lock(ABT_MUTEX_MEMORY_GET_HANDLE(&state->mtx));
@@ -325,7 +361,7 @@ static void ping_timer_callback(void* args)
margo_request req = MARGO_REQUEST_NULL;
double timeout = state->context->ping_timeout_ms;
hret = margo_provider_iforward_timed(
state->provider_id,
state->owner->provider_id,
state->last_ping_handle,
&state->context->view.digest,
timeout, &req);
@@ -345,25 +381,38 @@ static void ping_timer_callback(void* args)
ABT_mutex_unlock(ABT_MUTEX_MEMORY_GET_HANDLE(&state->mtx));

// request was canceled, we need to terminate
if(hret == HG_CANCELED) return;

if(hret == HG_SUCCESS) state->num_ping_timeouts = 0;
else if(hret == HG_TIMEOUT) state->num_ping_timeouts += 1;
else {
margo_warning(state->context->mid,
"[flock] Unhandled error from margo_provider_forward_timed in ping timer (%s)",
HG_Error_to_string(hret));
if(hret == HG_CANCELED) {
state->in_timer_callback = false;
return;
}

if(state->num_ping_timeouts == state->context->ping_max_num_timeouts) {
centralized_context* context = state->context;
if(hret == HG_SUCCESS) state->num_ping_timeouts = 0;
else state->num_ping_timeouts += 1;

centralized_context* context = state->context;
if(state->num_ping_timeouts == context->ping_max_num_timeouts) {
size_t rank = state->owner->rank;
char* address = strdup(state->owner->address);
uint16_t provider_id = state->owner->provider_id;
margo_trace(context->mid,
"[flock] Ping to member %lu (%s, %d) timed out %d times, "
"considering the member dead.", rank, address, provider_id, context->ping_max_num_timeouts);
FLOCK_GROUP_VIEW_LOCK(&context->view);
flock_group_view_remove_member(&context->view, state->rank);
flock_group_view_remove_member(&context->view, rank);
FLOCK_GROUP_VIEW_UNLOCK(&context->view);

if(context->member_update_callback) {
(context->member_update_callback)(
context->callback_context, FLOCK_MEMBER_DIED, rank, address, provider_id);
}

issue_membership_update(context, FLOCK_MEMBER_DIED, rank, address, provider_id);
free(address);
return;
}

restart_timer:
state->in_timer_callback = false;
now = ABT_get_wtime();
next_ping_ms = RAND_BETWEEN(
state->context->ping_interval_ms_min,
@@ -447,6 +496,113 @@ static flock_return_t get_view(centralized_context* ctx)
return FLOCK_SUCCESS;
}

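/**
 * @brief Broadcasts a membership update (e.g. the death of a member) from the
 * primary member to all the other members of the group.
 */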
static flock_return_t issue_membership_update(
centralized_context* ctx,
flock_update_t update,
size_t rank, const char* address,
uint16_t provider_id)
{
hg_return_t hret = HG_SUCCESS;
flock_return_t ret = FLOCK_SUCCESS;
size_t rpc_count = 0;
hg_handle_t* handles = NULL;
margo_request* requests = NULL;
membership_update_out_t* out;
membership_update_in_t in = {
.update = (uint8_t)update,
.rank = rank,
.address = address,
.provider_id = provider_id
};

FLOCK_GROUP_VIEW_LOCK(&ctx->view);
rpc_count = ctx->view.members.size - 1;
handles = (hg_handle_t*)calloc(rpc_count, sizeof(*handles));
requests = (margo_request*)calloc(rpc_count, sizeof(*requests));
out = (membership_update_out_t*)calloc(rpc_count, sizeof(*out));

margo_trace(ctx->mid,
"[flock] Issuing membership update to %lu members...", rpc_count);

for(size_t i = 0; i < rpc_count; ++i) {
member_state* state = (member_state*)ctx->view.members.data[i+1].extra.data;
hret = margo_create(ctx->mid, state->address, ctx->membership_update_rpc_id, &handles[i]);
if(hret != HG_SUCCESS) {
margo_error(ctx->mid,
"[flock] Could not create handle to issue membership update to rank %lu",
state->owner->rank);
}
}

for(size_t i = 0; i < rpc_count; ++i) {
member_state* state = (member_state*)ctx->view.members.data[i+1].extra.data;
if(!handles[i]) continue;
hret = margo_provider_iforward_timed(
state->owner->provider_id, handles[i], &in, 1000.0, &requests[i]);
if(hret != HG_SUCCESS) {
margo_error(ctx->mid,
"[flock] Could not forward membership update to rank %lu",
state->owner->rank);
}
}
FLOCK_GROUP_VIEW_UNLOCK(&ctx->view);

for(size_t i = 0; i < rpc_count; ++i) {
if(requests[i]) {
margo_wait(requests[i]);
margo_get_output(handles[i], &out[i]);
margo_free_output(handles[i], &out[i]);
}
if(handles[i]) margo_destroy(handles[i]);
}
free(requests);
free(handles);
free(out);

return ret;
}

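/**
 * @brief RPC handler executed when a member receives a membership update
 * notification from the primary member; it updates the local view and
 * invokes the user-provided membership update callback.
 */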
static DEFINE_MARGO_RPC_HANDLER(membership_update_rpc_ult)
static void membership_update_rpc_ult(hg_handle_t h)
{
hg_return_t hret = HG_SUCCESS;
membership_update_in_t in = {0};
membership_update_out_t out = {0};

/* find the margo instance */
margo_instance_id mid = margo_hg_handle_get_instance(h);

/* find the context */
const struct hg_info* info = margo_get_info(h);
centralized_context* ctx = (centralized_context*)margo_registered_data(mid, info->id);
if(!ctx) goto finish;

hret = margo_get_input(h, &in);
if(hret != HG_SUCCESS) {
// LCOV_EXCL_START
out.ret = (uint32_t)FLOCK_ERR_FROM_MERCURY;
goto finish;
// LCOV_EXCL_STOP
}

FLOCK_GROUP_VIEW_LOCK(&ctx->view);
flock_group_view_remove_member(&ctx->view, in.rank);
FLOCK_GROUP_VIEW_UNLOCK(&ctx->view);

if(ctx->member_update_callback) {
(ctx->member_update_callback)(
ctx->callback_context, in.update, in.rank, in.address, in.provider_id);
}


finish:
margo_free_input(h, &in);
margo_respond(h, &out);
margo_destroy(h);
}

static flock_return_t centralized_destroy_group(void* ctx)
{
centralized_context* context = (centralized_context*)ctx;
@@ -506,7 +662,7 @@ static flock_return_t centralized_add_metadata(
(void)key;
(void)value;
return FLOCK_ERR_OP_UNSUPPORTED;
// LCOV_EXCL_END
// LCOV_EXCL_STOP
}

static flock_return_t centralized_remove_metadata(
@@ -516,7 +672,7 @@ static flock_return_t centralized_remove_metadata(
(void)ctx;
(void)key;
return FLOCK_ERR_OP_UNSUPPORTED;
// LCOV_EXCL_END
// LCOV_EXCL_STOP
}

static flock_backend_impl centralized_backend = {
