From 16a9ab2a9480c18bebc1f4e3014655fe65c9a6c2 Mon Sep 17 00:00:00 2001
From: Matthieu Dorier
Date: Wed, 1 May 2024 16:57:04 +0100
Subject: [PATCH] added mechanism to broadcast information about dead member

---
 src/centralized/centralized-backend.c | 216 ++++++++++++++++++++++----
 tests/test-centralized.cpp            |  60 ++++++-
 2 files changed, 245 insertions(+), 31 deletions(-)

diff --git a/src/centralized/centralized-backend.c b/src/centralized/centralized-backend.c
index 546f654..8bab124 100644
--- a/src/centralized/centralized-backend.c
+++ b/src/centralized/centralized-backend.c
@@ -14,6 +14,15 @@
 
 #define RAND_BETWEEN(x, y) ((x) + (((double)rand()) / RAND_MAX)*(y-x))
 
+MERCURY_GEN_PROC(membership_update_in_t,
+    ((uint8_t)(update))\
+    ((uint64_t)(rank))\
+    ((hg_const_string_t)(address))\
+    ((uint16_t)(provider_id)))
+
+MERCURY_GEN_PROC(membership_update_out_t,
+    ((uint32_t)(ret)))
+
 /**
  * @brief The "centralized" backend uses the member with rank 0
  * as a centralized authority that is supposed to hold the most up to date
@@ -33,22 +42,27 @@ typedef struct centralized_context {
     flock_group_view_t view;
     hg_id_t ping_rpc_id;
     hg_id_t get_view_rpc_id;
+    hg_id_t membership_update_rpc_id;
     /* extracted from configuration */
     double ping_timeout_ms;
     double ping_interval_ms_min;
     double ping_interval_ms_max;
     unsigned ping_max_num_timeouts;
+    /* update callbacks */
+    flock_membership_update_fn member_update_callback;
+    flock_metadata_update_fn   metadata_update_callback;
+    void*                      callback_context;
 } centralized_context;
 
 /**
  * @brief Extra data attached to each member in the view in the primary member.
  */
 typedef struct member_state {
+    flock_member_t*      owner;
     centralized_context* context;
     hg_addr_t            address;
-    size_t               rank;
-    uint16_t             provider_id;
     margo_timer_t        ping_timer;
+    _Atomic bool         in_timer_callback;
     double               last_ping_timestamp;
     hg_handle_t          last_ping_handle;
     uint8_t              num_ping_timeouts;
@@ -58,12 +72,14 @@ static inline void member_state_free(void* args)
 {
     member_state* state = (member_state*)args;
+    if(!state) return;
 
     ABT_mutex_lock(ABT_MUTEX_MEMORY_GET_HANDLE(&state->mtx));
     if(state->last_ping_handle) {
         HG_Cancel(state->last_ping_handle);
     }
     if(state->ping_timer) {
-        margo_timer_cancel(state->ping_timer);
+        if(!state->in_timer_callback)
+            margo_timer_cancel(state->ping_timer);
         margo_timer_destroy(state->ping_timer);
         state->ping_timer = MARGO_TIMER_NULL;
     }
@@ -81,6 +97,12 @@ static flock_return_t get_view(centralized_context* ctx);
 static DECLARE_MARGO_RPC_HANDLER(get_view_rpc_ult)
 static void get_view_rpc_ult(hg_handle_t h);
 
+static flock_return_t issue_membership_update(
+    centralized_context* ctx, flock_update_t update,
+    size_t rank, const char* address, uint16_t provider_id);
+static DECLARE_MARGO_RPC_HANDLER(membership_update_rpc_ult)
+static void membership_update_rpc_ult(hg_handle_t h);
+
 static flock_return_t centralized_destroy_group(void* ctx);
 
 /**
@@ -89,16 +111,16 @@ static flock_return_t centralized_destroy_group(void* ctx);
  * ```
  * {
  *    "ping_timeout_ms": X,
- *    "ping_interval_ms": Y,
+ *    "ping_interval_ms": Y or [Ymin, Ymax],
  *    "ping_max_num_timeouts": Z
  * }
  * ```
  *
  * - ping_timeout_ms is the timeout value when sending a ping RPC to a member.
- * - ping_interval_ms is the time to wait between to ping RPCs to the same follower.
+ * - ping_interval_ms is the time to wait between two ping RPCs to the same follower.
  * - ping_max_num_timeouts is the number of RPC timeouts before the member is considered dead.
- * ping_interval_ms may be a list of two values [a,b] instead of a single value Y. If a list
- * is provided, the interval will be drawn randomly from a uniform distribution in the [a,b]
+ * ping_interval_ms may be a list of two values [Ymin,Ymax] instead of a single value Y. If a list
+ * is provided, the interval will be drawn randomly from a uniform distribution in the [Ymin,Ymax]
  * range each time.
  */
 static flock_return_t centralized_create_group(
@@ -250,41 +272,53 @@
     ctx->primary.provider_id = primary_member->provider_id;
 
     /* copy the configuration */
-    ctx->config = config;
-    ctx->ping_timeout_ms = ping_timeout_ms_val;
+    ctx->config                 = config;
+    ctx->ping_timeout_ms        = ping_timeout_ms_val;
     ctx->ping_interval_ms_min   = ping_interval_ms_min;
     ctx->ping_interval_ms_max   = ping_interval_ms_max;
     ctx->ping_max_num_timeouts  = ping_max_num_timeouts;
+    /* copy the membership and metadata update callbacks */
+    ctx->member_update_callback   = args->member_update_callback;
+    ctx->metadata_update_callback = args->metadata_update_callback;
+    ctx->callback_context         = args->callback_context;
+
     /* move the initial view in the context */
     FLOCK_GROUP_VIEW_MOVE(&args->initial_view, &ctx->view);
 
     /* register the RPCs */
     ctx->ping_rpc_id = MARGO_REGISTER_PROVIDER(
-        mid, "flock_centralized_ping", uint64_t, void, ping_rpc_ult,
+        mid, "flock_centralized_ping",
+        uint64_t, void, ping_rpc_ult,
         args->provider_id, args->pool);
     margo_register_data(mid, ctx->ping_rpc_id, ctx, NULL);
 
     ctx->get_view_rpc_id = MARGO_REGISTER_PROVIDER(
-        mid, "flock_centralized_get_view", void, flock_protected_group_view_t, get_view_rpc_ult,
+        mid, "flock_centralized_get_view",
+        void, flock_protected_group_view_t, get_view_rpc_ult,
         args->provider_id, args->pool);
     margo_register_data(mid, ctx->get_view_rpc_id, ctx, NULL);
 
+    ctx->membership_update_rpc_id = MARGO_REGISTER_PROVIDER(
+        mid, "flock_centralized_membership_update",
+        membership_update_in_t, membership_update_out_t,
+        membership_update_rpc_ult, args->provider_id, args->pool);
+    margo_register_data(mid, ctx->membership_update_rpc_id, ctx, NULL);
+
     if(ctx->is_primary) {
-        // Note: the loop starts at 1 because the first member is the
-        // primary member and is not going to ping itself
-        for(size_t i = 1; i < ctx->view.members.size; ++i) {
+        for(size_t i = 0; i < ctx->view.members.size; ++i) {
             flock_member_t* member = &ctx->view.members.data[i];
             member->extra.data = calloc(1, sizeof(struct member_state));
             member->extra.free = member_state_free;
             member_state* state = (member_state*)(member->extra.data);
             state->context = ctx;
-            state->rank = member->rank;
-            state->provider_id = member->provider_id;
+            state->owner = member;
             if(HG_SUCCESS != margo_addr_lookup(mid, member->address, &state->address)) {
                 ret = FLOCK_ERR_FROM_MERCURY;
                 goto error;
             }
+            if(i == 0) continue;
+            // create timer only for non-primary members
             margo_timer_create(mid, ping_timer_callback, state, &state->ping_timer);
             state->last_ping_timestamp = ABT_get_wtime();
             double interval = RAND_BETWEEN(
@@ -295,6 +329,7 @@
     } else if(args->join) {
         ret = get_view(ctx);
         if(ret != FLOCK_SUCCESS) goto error;
+        // TODO
     }
 
     *context = ctx;
@@ -309,6 +344,7 @@ static void ping_timer_callback(void* args)
 {
     double now, next_ping_ms;
     member_state* state = (member_state*)args;
+    state->in_timer_callback = true;
     hg_return_t hret = HG_SUCCESS;
 
     ABT_mutex_lock(ABT_MUTEX_MEMORY_GET_HANDLE(&state->mtx));
@@ -325,7 +361,7 @@ static void ping_timer_callback(void* args)
     margo_request req = MARGO_REQUEST_NULL;
     double timeout = state->context->ping_timeout_ms;
     hret = margo_provider_iforward_timed(
-        state->provider_id,
+        state->owner->provider_id,
         state->last_ping_handle,
         &state->context->view.digest,
         timeout, &req);
@@ -345,25 +381,38 @@
     ABT_mutex_unlock(ABT_MUTEX_MEMORY_GET_HANDLE(&state->mtx));
 
     // request was canceled, we need to terminate
-    if(hret == HG_CANCELED) return;
-
-    if(hret == HG_SUCCESS) state->num_ping_timeouts = 0;
-    else if(hret == HG_TIMEOUT) state->num_ping_timeouts += 1;
-    else {
-        margo_warning(state->context->mid,
-            "[flock] Unhandled error from margo_provider_forward_timed in ping timer (%s)",
-            HG_Error_to_string(hret));
+    if(hret == HG_CANCELED) {
+        state->in_timer_callback = false;
+        return;
     }
 
-    if(state->num_ping_timeouts == state->context->ping_max_num_timeouts) {
-        centralized_context* context = state->context;
+    if(hret == HG_SUCCESS) state->num_ping_timeouts = 0;
+    else state->num_ping_timeouts += 1;
+
+    centralized_context* context = state->context;
+    if(state->num_ping_timeouts == context->ping_max_num_timeouts) {
+        size_t rank = state->owner->rank;
+        char* address = strdup(state->owner->address);
+        uint16_t provider_id = state->owner->provider_id;
+        margo_trace(context->mid,
+            "[flock] Ping to member %lu (%s, %d) timed out %d times, "
+            "considering the member dead.", rank, address, provider_id, context->ping_max_num_timeouts);
         FLOCK_GROUP_VIEW_LOCK(&context->view);
-        flock_group_view_remove_member(&context->view, state->rank);
+        flock_group_view_remove_member(&context->view, rank);
         FLOCK_GROUP_VIEW_UNLOCK(&context->view);
+
+        if(context->member_update_callback) {
+            (context->member_update_callback)(
+                context->callback_context, FLOCK_MEMBER_DIED, rank, address, provider_id);
+        }
+
+        issue_membership_update(context, FLOCK_MEMBER_DIED, rank, address, provider_id);
+        free(address);
         return;
     }
 
 restart_timer:
+    state->in_timer_callback = false;
     now = ABT_get_wtime();
     next_ping_ms = RAND_BETWEEN(
         state->context->ping_interval_ms_min,
@@ -447,6 +496,113 @@ static flock_return_t get_view(centralized_context* ctx)
     return FLOCK_SUCCESS;
 }
 
+static flock_return_t issue_membership_update(
+    centralized_context* ctx,
+    flock_update_t update,
+    size_t rank, const char* address,
+    uint16_t provider_id)
+{
+    hg_return_t hret = HG_SUCCESS;
+    flock_return_t ret = FLOCK_SUCCESS;
+    size_t rpc_count = 0;
+    hg_handle_t* handles = NULL;
+    margo_request* requests = NULL;
+    membership_update_out_t* out;
+    membership_update_in_t in = {
+        .update      = (uint8_t)update,
+        .rank        = rank,
+        .address     = address,
+        .provider_id = provider_id
+    };
+
+    FLOCK_GROUP_VIEW_LOCK(&ctx->view);
+    rpc_count = ctx->view.members.size - 1;
+    handles   = (hg_handle_t*)calloc(rpc_count, sizeof(*handles));
+    requests  = (margo_request*)calloc(rpc_count, sizeof(*requests));
+    out       = (membership_update_out_t*)calloc(rpc_count, sizeof(*out));
+
+    margo_trace(ctx->mid,
+        "[flock] Issuing membership update to %lu members...", rpc_count);
+
+    for(size_t i = 0; i < rpc_count; ++i) {
+        member_state* state = (member_state*)ctx->view.members.data[i+1].extra.data;
+        hret = margo_create(ctx->mid, state->address, ctx->membership_update_rpc_id, &handles[i]);
+        if(hret != HG_SUCCESS) {
+            margo_error(ctx->mid,
+                "[flock] Could not create handle to issue membership update to rank %lu",
+                state->owner->rank);
+        }
+    }
+
+    for(size_t i = 0; i < rpc_count; ++i) {
+        member_state* state = (member_state*)ctx->view.members.data[i+1].extra.data;
+        if(!handles[i]) continue;
+        hret = margo_provider_iforward_timed(
+            state->owner->provider_id, handles[i], &in, 1000.0, &requests[i]);
+        if(hret != HG_SUCCESS) {
+            margo_error(ctx->mid,
+                "[flock] Could not forward membership update to rank %lu",
+                state->owner->rank);
+        }
+    }
+    FLOCK_GROUP_VIEW_UNLOCK(&ctx->view);
+
+    for(size_t i = 0; i < rpc_count; ++i) {
+        if(requests[i]) {
+            margo_wait(requests[i]);
+            margo_get_output(handles[i], &out[i]);
+            margo_free_output(handles[i], &out[i]);
+        }
+        if(handles[i]) margo_destroy(handles[i]);
+    }
+    free(requests);
+    free(handles);
+    free(out);
+
+    return ret;
+}
+
+static DEFINE_MARGO_RPC_HANDLER(membership_update_rpc_ult)
+static void membership_update_rpc_ult(hg_handle_t h)
+{
+    hg_return_t hret = HG_SUCCESS;
+    membership_update_in_t in = {0};
+    membership_update_out_t out = {0};
+
+    /* find the margo instance */
+    margo_instance_id mid = margo_hg_handle_get_instance(h);
+
+    /* find the context */
+    const struct hg_info* info = margo_get_info(h);
+    centralized_context* ctx = (centralized_context*)margo_registered_data(mid, info->id);
+    if(!ctx) goto finish;
+
+    hret = margo_get_input(h, &in);
+    if(hret != HG_SUCCESS) {
+        // LCOV_EXCL_START
+        out.ret = (uint32_t)FLOCK_ERR_FROM_MERCURY;
+        goto finish;
+        // LCOV_EXCL_STOP
+    }
+
+    FLOCK_GROUP_VIEW_LOCK(&ctx->view);
+    flock_group_view_remove_member(&ctx->view, in.rank);
+    FLOCK_GROUP_VIEW_UNLOCK(&ctx->view);
+
+    if(ctx->member_update_callback) {
+        (ctx->member_update_callback)(
+            ctx->callback_context, in.update, in.rank, in.address, in.provider_id);
+    }
+
+    /* acknowledge the update (the handler's output type is membership_update_out_t) */
+    out.ret = (uint32_t)FLOCK_SUCCESS;
+
+finish:
+    margo_free_input(h, &in);
+    margo_respond(h, &out);
+    margo_destroy(h);
+}
+
 static flock_return_t centralized_destroy_group(void* ctx)
 {
     centralized_context* context = (centralized_context*)ctx;
@@ -506,7 +662,7 @@ static flock_return_t centralized_add_metadata(
     (void)key;
     (void)value;
     return FLOCK_ERR_OP_UNSUPPORTED;
-    // LCOV_EXCL_END
+    // LCOV_EXCL_STOP
 }
 
 static flock_return_t centralized_remove_metadata(
@@ -516,7 +672,7 @@ static flock_return_t centralized_remove_metadata(
     (void)ctx;
     (void)key;
     return FLOCK_ERR_OP_UNSUPPORTED;
-    // LCOV_EXCL_END
+    // LCOV_EXCL_STOP
 }
 
 static flock_backend_impl centralized_backend = {
diff --git a/tests/test-centralized.cpp b/tests/test-centralized.cpp
index fa3c0e6..bc0efd9 100644
--- a/tests/test-centralized.cpp
+++ b/tests/test-centralized.cpp
@@ -48,7 +48,7 @@ TEST_CASE("Test group handle for centralized group", "[centralize]") {
         },
         "bootstrap": "view"
     })");
-
+#if 0
     SECTION("Test provider functionalities") {
        char* config = flock_provider_get_config(group->providers[0]);
        REQUIRE(config != nullptr);
@@ -183,6 +183,64 @@ TEST_CASE("Test group handle for centralized group", "[centralize]") {
         ret = flock_client_finalize(client);
         REQUIRE(ret == FLOCK_SUCCESS);
 
+        // let the group do a few pings
         margo_thread_sleep(context->mid, 5000);
     }
+#endif
+
+    SECTION("Test removing member") {
+        flock_client_t client;
+        flock_return_t ret;
+        // create a client object
+        ret = flock_client_init(context->mid, ABT_POOL_NULL, &client);
+        REQUIRE(ret == FLOCK_SUCCESS);
+
+        flock_group_handle_t rh;
+        // create a group handle
+        ret = flock_group_handle_create(client,
+            context->addr, 1, FLOCK_MODE_INIT_UPDATE, &rh);
+        REQUIRE(ret == FLOCK_SUCCESS);
+
+        // check group size
+        size_t group_size = 0;
+        ret = flock_group_size(rh, &group_size);
+        REQUIRE(ret == FLOCK_SUCCESS);
+        REQUIRE(group_size == 5);
+
+        // forcefully remove rank 4
+        flock_provider_destroy(group->providers.back());
+        group->providers.pop_back();
+
+        // sleep a bit
+        margo_thread_sleep(context->mid, 5000);
+
+        // update the group handle
+        ret = flock_group_update_view(rh, NULL);
+        REQUIRE(ret == FLOCK_SUCCESS);
+
+        // check group size
+        ret = flock_group_size(rh, &group_size);
+        REQUIRE(ret == FLOCK_SUCCESS);
+        REQUIRE(group_size == 4);
+
+        // test getting addresses and provider IDs with correct ranks
+        for(size_t i = 0; i < 4; ++i) {
+            hg_addr_t addr = HG_ADDR_NULL;
+            ret = flock_group_member_get_address(rh, i, &addr);
+            REQUIRE(ret == FLOCK_SUCCESS);
+            REQUIRE(addr != HG_ADDR_NULL);
+            margo_addr_free(context->mid, addr);
+
+            char* address = NULL;
+            ret = flock_group_member_get_address_string(rh, i, &address);
+            REQUIRE(ret == FLOCK_SUCCESS);
+            REQUIRE(address != NULL);
+            free(address);
+
+            uint16_t provider_id = 0;
+            ret = flock_group_member_get_provider_id(rh, i, &provider_id);
+            REQUIRE(ret == FLOCK_SUCCESS);
+            REQUIRE(provider_id == i+1);
+        }
+    }
 }
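
Reviewer note on configuration: the documentation block updated above only shows placeholders X, Y/[Ymin, Ymax] and Z for the centralized backend's ping settings. A concrete configuration using the new range form of ping_interval_ms could look like the snippet below; the numeric values are illustrative only, not defaults taken from the code.

```json
{
    "ping_timeout_ms": 1000,
    "ping_interval_ms": [800, 1200],
    "ping_max_num_timeouts": 3
}
```

With these values the primary waits a uniformly random 800-1200 ms between pings to each follower, gives each ping 1000 ms to complete, and after three consecutive failed pings removes the member and triggers the membership update broadcast added by this patch.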
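Reviewer note on the update callback: applications only see the broadcast through member_update_callback, which the primary invokes locally in ping_timer_callback and every follower invokes in membership_update_rpc_ult when it receives the flock_centralized_membership_update RPC. The sketch below is a minimal example of such a callback; the parameter order is inferred from the call sites in this patch, on_membership_update is an example name, and how the function and its user context end up in args->member_update_callback / args->callback_context is provider-side registration code that this patch does not show.

```c
#include <stdio.h>
#include <stdint.h>
#include <stddef.h>
/* flock_update_t and FLOCK_MEMBER_DIED come from the flock headers;
 * the exact header to include is not shown in this patch. */

/* Callback invoked by the centralized backend when the view changes.
 * Parameter order inferred from the call sites above:
 * (callback_context, update, rank, address, provider_id). */
static void on_membership_update(void* uargs, flock_update_t update,
                                 size_t rank, const char* address,
                                 uint16_t provider_id)
{
    (void)uargs;
    if(update == FLOCK_MEMBER_DIED) {
        /* By the time this runs, the member has already been removed
         * from the local group view. */
        fprintf(stderr, "[app] member %zu (%s, provider %u) declared dead\n",
                rank, address, (unsigned)provider_id);
    }
}
```

A provider that installs this function as its member_update_callback should see one FLOCK_MEMBER_DIED notification per removed member: from its own ping loop if it is the primary, or from the broadcast RPC otherwise.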