From c34f447c4a9d6cd323c59a1e6f85e822463bf196 Mon Sep 17 00:00:00 2001 From: Jerome Soumagne Date: Tue, 16 May 2023 18:30:19 -0500 Subject: [PATCH 1/4] HG test: add concurrent multi RPC test Add multi-progress test Add multi-progress test with handle creation Refactoring of unit test cleanup --- Testing/unit/hg/mercury_unit.c | 40 ++- Testing/unit/hg/test_rpc.c | 452 +++++++++++++++++++++++++++++++-- 2 files changed, 446 insertions(+), 46 deletions(-) diff --git a/Testing/unit/hg/mercury_unit.c b/Testing/unit/hg/mercury_unit.c index c964b190..7179531b 100644 --- a/Testing/unit/hg/mercury_unit.c +++ b/Testing/unit/hg/mercury_unit.c @@ -127,46 +127,40 @@ hg_test_handle_create_cb(hg_handle_t handle, void *arg) static hg_return_t hg_test_finalize_rpc(struct hg_unit_info *info, hg_uint8_t target_id) { - hg_request_t *request_object = NULL; - hg_handle_t handle = HG_HANDLE_NULL; - hg_return_t ret = HG_SUCCESS, cleanup_ret; + hg_return_t ret; unsigned int completed; - request_object = hg_request_create(info->request_class); + hg_request_reset(info->request); - ret = HG_Create( - info->context, info->target_addr, hg_test_finalize_id_g, &handle); + ret = HG_Reset(info->handles[0], info->target_addr, hg_test_finalize_id_g); HG_TEST_CHECK_HG_ERROR( - done, ret, "HG_Create() failed (%s)", HG_Error_to_string(ret)); + error, ret, "HG_Reset() failed (%s)", HG_Error_to_string(ret)); /* Set target ID */ - ret = HG_Set_target_id(handle, target_id); + ret = HG_Set_target_id(info->handles[0], target_id); HG_TEST_CHECK_HG_ERROR( - done, ret, "HG_Set_target_id() failed (%s)", HG_Error_to_string(ret)); + error, ret, "HG_Set_target_id() failed (%s)", HG_Error_to_string(ret)); /* Forward call to target addr */ - ret = HG_Forward(handle, hg_test_finalize_rpc_cb, request_object, NULL); + ret = HG_Forward( + info->handles[0], hg_test_finalize_rpc_cb, info->request, NULL); HG_TEST_CHECK_HG_ERROR( - done, ret, "HG_Forward() failed (%s)", HG_Error_to_string(ret)); + error, ret, "HG_Forward() failed (%s)", HG_Error_to_string(ret)); - hg_request_wait(request_object, HG_TEST_TIMEOUT_MAX, &completed); + hg_request_wait(info->request, HG_TEST_TIMEOUT_MAX, &completed); if (!completed) { HG_TEST_LOG_WARNING("Canceling finalize, no response from server"); - ret = HG_Cancel(handle); + ret = HG_Cancel(info->handles[0]); HG_TEST_CHECK_HG_ERROR( - done, ret, "HG_Cancel() failed (%s)", HG_Error_to_string(ret)); + error, ret, "HG_Cancel() failed (%s)", HG_Error_to_string(ret)); - hg_request_wait(request_object, HG_TEST_TIMEOUT_MAX, &completed); + hg_request_wait(info->request, HG_TEST_TIMEOUT_MAX, &completed); } -done: - cleanup_ret = HG_Destroy(handle); - HG_TEST_CHECK_ERROR_DONE(cleanup_ret != HG_SUCCESS, - "HG_Destroy() failed (%s)", HG_Error_to_string(cleanup_ret)); - - hg_request_destroy(request_object); + return HG_SUCCESS; +error: return ret; } @@ -174,9 +168,7 @@ hg_test_finalize_rpc(struct hg_unit_info *info, hg_uint8_t target_id) static hg_return_t hg_test_finalize_rpc_cb(const struct hg_cb_info *callback_info) { - hg_request_t *request_object = (hg_request_t *) callback_info->arg; - - hg_request_complete(request_object); + hg_request_complete((hg_request_t *) callback_info->arg); return HG_SUCCESS; } diff --git a/Testing/unit/hg/test_rpc.c b/Testing/unit/hg/test_rpc.c index f522cc3c..5a74925f 100644 --- a/Testing/unit/hg/test_rpc.c +++ b/Testing/unit/hg/test_rpc.c @@ -31,11 +31,25 @@ struct forward_cb_args { struct forward_multi_cb_args { rpc_handle_t *rpc_handle; hg_return_t *rets; + hg_thread_mutex_t mutex; int32_t expected_count; /* Expected count */ int32_t complete_count; /* Completed count */ hg_request_t *request; /* Request */ }; +struct forward_no_req_cb_args { + hg_atomic_int32_t done; + rpc_handle_t *rpc_handle; + hg_return_t ret; +}; + +struct hg_test_multi_thread { + struct hg_unit_info *info; + hg_thread_t thread; + unsigned int thread_id; + hg_return_t ret; +}; + /********************/ /* Local Prototypes */ /********************/ @@ -75,6 +89,31 @@ hg_test_rpc_multi(hg_handle_t *handles, size_t handle_max, hg_addr_t addr, static hg_return_t hg_test_rpc_multi_cb(const struct hg_cb_info *callback_info); +static hg_return_t +hg_test_rpc_launch_threads(struct hg_unit_info *info, hg_thread_func_t func); + +static HG_THREAD_RETURN_TYPE +hg_test_rpc_multi_thread(void *arg); + +static HG_THREAD_RETURN_TYPE +hg_test_rpc_multi_progress(void *arg); + +static hg_return_t +hg_test_rpc_no_req(hg_context_t *context, hg_handle_t handle, hg_cb_t callback); + +static hg_return_t +hg_test_rpc_no_req_cb(const struct hg_cb_info *callback_info); + +static HG_THREAD_RETURN_TYPE +hg_test_rpc_multi_progress_create(void *arg); + +static hg_return_t +hg_test_rpc_no_req_create( + hg_context_t *context, hg_addr_t addr, hg_cb_t callback); + +static hg_return_t +hg_test_rpc_no_req_create_cb(const struct hg_cb_info *callback_info); + /*******************/ /* Local Variables */ /*******************/ @@ -377,6 +416,7 @@ hg_test_rpc_multi(hg_handle_t *handles, size_t handle_max, hg_addr_t addr, .rpc_handle = &rpc_open_handle, .request = request, .rets = NULL, + .mutex = HG_THREAD_MUTEX_INITIALIZER, .complete_count = 0, .expected_count = (int32_t) handle_max}; rpc_open_in_t in_struct = { @@ -385,6 +425,9 @@ hg_test_rpc_multi(hg_handle_t *handles, size_t handle_max, hg_addr_t addr, unsigned int flag; int rc; + HG_TEST_CHECK_ERROR(handle_max == 0, error, ret, HG_INVALID_PARAM, + "Handle max cannot be 0"); + hg_request_reset(request); forward_multi_cb_args.rets = @@ -450,6 +493,7 @@ hg_test_rpc_multi_cb(const struct hg_cb_info *callback_info) int rpc_open_event_id; rpc_open_out_t rpc_open_out_struct; hg_return_t ret = callback_info->ret; + int32_t complete_count; HG_TEST_CHECK_HG_ERROR(done, ret, "Error in HG callback (%s)", HG_Error_to_string(callback_info->ret)); @@ -479,13 +523,345 @@ hg_test_rpc_multi_cb(const struct hg_cb_info *callback_info) } done: + hg_thread_mutex_lock(&args->mutex); args->rets[args->complete_count] = ret; - if (++args->complete_count == args->expected_count) + complete_count = ++args->complete_count; + hg_thread_mutex_unlock(&args->mutex); + if (complete_count == args->expected_count) hg_request_complete(args->request); return HG_SUCCESS; } +/*---------------------------------------------------------------------------*/ +static hg_return_t +hg_test_rpc_launch_threads(struct hg_unit_info *info, hg_thread_func_t func) +{ + struct hg_test_multi_thread *thread_infos; + unsigned int i; + hg_return_t ret; + int rc; + + thread_infos = + malloc(info->hg_test_info.thread_count * sizeof(*thread_infos)); + HG_TEST_CHECK_ERROR(thread_infos == NULL, error, ret, HG_NOMEM, + "Could not allocate thread array (%u)", + info->hg_test_info.thread_count); + + for (i = 0; i < info->hg_test_info.thread_count; i++) { + thread_infos[i].info = info; + thread_infos[i].thread_id = i; + + rc = hg_thread_create(&thread_infos[i].thread, func, &thread_infos[i]); + HG_TEST_CHECK_ERROR( + rc != 0, error, ret, HG_NOMEM, "hg_thread_create() failed"); + } + + for (i = 0; i < info->hg_test_info.thread_count; i++) { + rc = hg_thread_join(thread_infos[i].thread); + HG_TEST_CHECK_ERROR( + rc != 0, error, ret, HG_FAULT, "hg_thread_join() failed"); + } + for (i = 0; i < info->hg_test_info.thread_count; i++) + HG_TEST_CHECK_ERROR(thread_infos[i].ret != HG_SUCCESS, error, ret, + thread_infos[i].ret, "Error from thread %u (%s)", + thread_infos->thread_id, HG_Error_to_string(thread_infos[i].ret)); + + free(thread_infos); + + return HG_SUCCESS; + +error: + free(thread_infos); + + return ret; +} + +/*---------------------------------------------------------------------------*/ +static HG_THREAD_RETURN_TYPE +hg_test_rpc_multi_thread(void *arg) +{ + struct hg_test_multi_thread *thread_arg = + (struct hg_test_multi_thread *) arg; + struct hg_unit_info *info = thread_arg->info; + hg_thread_ret_t tret = (hg_thread_ret_t) 0; + size_t handle_max = info->handle_max / info->hg_test_info.thread_count; + hg_handle_t *handles = &info->handles[thread_arg->thread_id * handle_max]; + hg_request_t *request; + hg_return_t ret; + + request = hg_request_create(info->request_class); + HG_TEST_CHECK_ERROR( + request == NULL, done, ret, HG_NOMEM, "Could not create request"); + + ret = hg_test_rpc_multi(handles, + (thread_arg->thread_id < (info->hg_test_info.thread_count - 1)) + ? handle_max + : info->handle_max - thread_arg->thread_id * handle_max, + info->target_addr, 0, hg_test_rpc_open_id_g, hg_test_rpc_multi_cb, + request); + HG_TEST_CHECK_HG_ERROR(done, ret, "hg_test_rpc_multiple() failed (%s)", + HG_Error_to_string(ret)); + +done: + hg_request_destroy(request); + thread_arg->ret = ret; + + hg_thread_exit(tret); + return tret; +} + +/*---------------------------------------------------------------------------*/ +static HG_THREAD_RETURN_TYPE +hg_test_rpc_multi_progress(void *arg) +{ + struct hg_test_multi_thread *thread_arg = + (struct hg_test_multi_thread *) arg; + struct hg_unit_info *info = thread_arg->info; + hg_thread_ret_t tret = (hg_thread_ret_t) 0; + hg_return_t ret; + int i; + + HG_TEST_CHECK_ERROR(info->handle_max < thread_arg->thread_id, done, ret, + HG_INVALID_PARAM, "Handle max is too low (%zu)", info->handle_max); + + ret = HG_Reset(info->handles[thread_arg->thread_id], info->target_addr, + hg_test_rpc_open_id_g); + HG_TEST_CHECK_HG_ERROR( + done, ret, "HG_Reset() failed (%s)", HG_Error_to_string(ret)); + + for (i = 0; i < 100; i++) { + ret = hg_test_rpc_no_req(info->context, + info->handles[thread_arg->thread_id], hg_test_rpc_no_req_cb); + HG_TEST_CHECK_HG_ERROR(done, ret, "hg_test_rpc_no_req() failed (%s)", + HG_Error_to_string(ret)); + } + +done: + thread_arg->ret = ret; + + hg_thread_exit(tret); + return tret; +} + +/*---------------------------------------------------------------------------*/ +static HG_THREAD_RETURN_TYPE +hg_test_rpc_multi_progress_create(void *arg) +{ + struct hg_test_multi_thread *thread_arg = + (struct hg_test_multi_thread *) arg; + struct hg_unit_info *info = thread_arg->info; + hg_thread_ret_t tret = (hg_thread_ret_t) 0; + hg_return_t ret; + int i; + + HG_TEST_CHECK_ERROR(info->handle_max < thread_arg->thread_id, done, ret, + HG_INVALID_PARAM, "Handle max is too low (%zu)", info->handle_max); + + for (i = 0; i < 100; i++) { + ret = hg_test_rpc_no_req_create( + info->context, info->target_addr, hg_test_rpc_no_req_create_cb); + HG_TEST_CHECK_HG_ERROR(done, ret, + "hg_test_rpc_no_req_create() failed (%s)", HG_Error_to_string(ret)); + } + +done: + thread_arg->ret = ret; + + hg_thread_exit(tret); + return tret; +} + +/*---------------------------------------------------------------------------*/ +static hg_return_t +hg_test_rpc_no_req(hg_context_t *context, hg_handle_t handle, hg_cb_t callback) +{ + hg_return_t ret; + rpc_handle_t rpc_open_handle = {.cookie = 100}; + struct forward_no_req_cb_args forward_cb_args = { + .done = HG_ATOMIC_VAR_INIT(0), + .rpc_handle = &rpc_open_handle, + .ret = HG_SUCCESS}; + rpc_open_in_t in_struct = { + .handle = rpc_open_handle, .path = HG_TEST_RPC_PATH}; + + ret = HG_Forward(handle, callback, &forward_cb_args, &in_struct); + HG_TEST_CHECK_HG_ERROR( + error, ret, "HG_Forward() failed (%s)", HG_Error_to_string(ret)); + + do { + unsigned int actual_count = 0; + + do { + ret = HG_Trigger(context, 0, 100, &actual_count); + } while ((ret == HG_SUCCESS) && actual_count); + HG_TEST_CHECK_ERROR_NORET(ret != HG_SUCCESS && ret != HG_TIMEOUT, error, + "HG_Trigger() failed (%s)", HG_Error_to_string(ret)); + + if (hg_atomic_get32(&forward_cb_args.done)) + break; + + ret = HG_Progress(context, 0); + } while (ret == HG_SUCCESS || ret == HG_TIMEOUT); + HG_TEST_CHECK_ERROR_NORET(ret != HG_SUCCESS && ret != HG_TIMEOUT, error, + "HG_Progress() failed (%s)", HG_Error_to_string(ret)); + + ret = forward_cb_args.ret; + HG_TEST_CHECK_HG_ERROR( + error, ret, "Error in HG callback (%s)", HG_Error_to_string(ret)); + + return HG_SUCCESS; + +error: + + return ret; +} + +/*---------------------------------------------------------------------------*/ +static hg_return_t +hg_test_rpc_no_req_create( + hg_context_t *context, hg_addr_t addr, hg_cb_t callback) +{ + hg_return_t ret; + hg_handle_t handle; + rpc_handle_t rpc_open_handle = {.cookie = 100}; + struct forward_no_req_cb_args forward_cb_args = { + .done = HG_ATOMIC_VAR_INIT(0), + .rpc_handle = &rpc_open_handle, + .ret = HG_SUCCESS}; + rpc_open_in_t in_struct = { + .handle = rpc_open_handle, .path = HG_TEST_RPC_PATH}; + + ret = HG_Create(context, addr, hg_test_rpc_open_id_g, &handle); + HG_TEST_CHECK_HG_ERROR( + error, ret, "HG_Create() failed (%s)", HG_Error_to_string(ret)); + + ret = HG_Forward(handle, callback, &forward_cb_args, &in_struct); + HG_TEST_CHECK_HG_ERROR( + error, ret, "HG_Forward() failed (%s)", HG_Error_to_string(ret)); + + do { + unsigned int actual_count = 0; + + do { + ret = HG_Trigger(context, 0, 100, &actual_count); + } while ((ret == HG_SUCCESS) && actual_count); + HG_TEST_CHECK_ERROR_NORET(ret != HG_SUCCESS && ret != HG_TIMEOUT, error, + "HG_Trigger() failed (%s)", HG_Error_to_string(ret)); + + if (hg_atomic_get32(&forward_cb_args.done)) + break; + + ret = HG_Progress(context, 0); + } while (ret == HG_SUCCESS || ret == HG_TIMEOUT); + HG_TEST_CHECK_ERROR_NORET(ret != HG_SUCCESS && ret != HG_TIMEOUT, error, + "HG_Progress() failed (%s)", HG_Error_to_string(ret)); + + ret = forward_cb_args.ret; + HG_TEST_CHECK_HG_ERROR( + error, ret, "Error in HG callback (%s)", HG_Error_to_string(ret)); + + return HG_SUCCESS; + +error: + + return ret; +} + +/*---------------------------------------------------------------------------*/ +static hg_return_t +hg_test_rpc_no_req_cb(const struct hg_cb_info *callback_info) +{ + hg_handle_t handle = callback_info->info.forward.handle; + struct forward_no_req_cb_args *args = + (struct forward_no_req_cb_args *) callback_info->arg; + int rpc_open_ret; + int rpc_open_event_id; + rpc_open_out_t rpc_open_out_struct; + hg_return_t ret = callback_info->ret; + + HG_TEST_CHECK_HG_ERROR(done, ret, "Error in HG callback (%s)", + HG_Error_to_string(callback_info->ret)); + + /* Get output */ + ret = HG_Get_output(handle, &rpc_open_out_struct); + HG_TEST_CHECK_HG_ERROR( + done, ret, "HG_Get_output() failed (%s)", HG_Error_to_string(ret)); + + /* Get output parameters */ + rpc_open_ret = rpc_open_out_struct.ret; + rpc_open_event_id = rpc_open_out_struct.event_id; + HG_TEST_LOG_DEBUG("rpc_open returned: %d with event_id: %d", rpc_open_ret, + rpc_open_event_id); + (void) rpc_open_ret; + HG_TEST_CHECK_ERROR(rpc_open_event_id != (int) args->rpc_handle->cookie, + free, ret, HG_FAULT, "Cookie did not match RPC response"); + +free: + if (ret != HG_SUCCESS) + (void) HG_Free_output(handle, &rpc_open_out_struct); + else { + /* Free output */ + ret = HG_Free_output(handle, &rpc_open_out_struct); + HG_TEST_CHECK_HG_ERROR( + done, ret, "HG_Free_output() failed (%s)", HG_Error_to_string(ret)); + } + +done: + args->ret = ret; + hg_atomic_set32(&args->done, 1); + + return HG_SUCCESS; +} + +/*---------------------------------------------------------------------------*/ +static hg_return_t +hg_test_rpc_no_req_create_cb(const struct hg_cb_info *callback_info) +{ + hg_handle_t handle = callback_info->info.forward.handle; + struct forward_no_req_cb_args *args = + (struct forward_no_req_cb_args *) callback_info->arg; + int rpc_open_ret; + int rpc_open_event_id; + rpc_open_out_t rpc_open_out_struct; + hg_return_t ret = callback_info->ret; + + HG_TEST_CHECK_HG_ERROR(done, ret, "Error in HG callback (%s)", + HG_Error_to_string(callback_info->ret)); + + /* Get output */ + ret = HG_Get_output(handle, &rpc_open_out_struct); + HG_TEST_CHECK_HG_ERROR( + done, ret, "HG_Get_output() failed (%s)", HG_Error_to_string(ret)); + + /* Get output parameters */ + rpc_open_ret = rpc_open_out_struct.ret; + rpc_open_event_id = rpc_open_out_struct.event_id; + HG_TEST_LOG_DEBUG("rpc_open returned: %d with event_id: %d", rpc_open_ret, + rpc_open_event_id); + (void) rpc_open_ret; + HG_TEST_CHECK_ERROR(rpc_open_event_id != (int) args->rpc_handle->cookie, + free, ret, HG_FAULT, "Cookie did not match RPC response"); + +free: + if (ret != HG_SUCCESS) + (void) HG_Free_output(handle, &rpc_open_out_struct); + else { + /* Free output */ + ret = HG_Free_output(handle, &rpc_open_out_struct); + HG_TEST_CHECK_HG_ERROR( + done, ret, "HG_Free_output() failed (%s)", HG_Error_to_string(ret)); + } + + HG_Destroy(handle); + +done: + args->ret = ret; + hg_atomic_set32(&args->done, 1); + + return HG_SUCCESS; +} + /*---------------------------------------------------------------------------*/ int main(int argc, char *argv[]) @@ -493,12 +869,30 @@ main(int argc, char *argv[]) struct hg_unit_info info; hg_return_t hg_ret; hg_id_t inv_id; + hg_handle_t handle; /* Initialize the interface */ hg_ret = hg_unit_init(argc, argv, false, &info); HG_TEST_CHECK_HG_ERROR(error, hg_ret, "hg_unit_init() failed (%s)", HG_Error_to_string(hg_ret)); + /* RPC test with unregistered ID */ + inv_id = MERCURY_REGISTER(info.hg_class, "unreg_id", void, void, NULL); + HG_TEST_CHECK_ERROR_NORET(inv_id == 0, error, "HG_Register() failed"); + hg_ret = HG_Deregister(info.hg_class, inv_id); + HG_TEST_CHECK_HG_ERROR(error, hg_ret, "HG_Deregister() failed (%s)", + HG_Error_to_string(hg_ret)); + + HG_TEST("RPC with unregistered ID"); + HG_Test_log_disable(); // Expected to produce errors + hg_ret = hg_test_rpc_input(info.handles[0], info.target_addr, inv_id, + hg_test_rpc_output_cb, info.request); + HG_Test_log_enable(); + HG_TEST_CHECK_ERROR_NORET(hg_ret != HG_NOENTRY, error, + "hg_test_rpc_input() failed (%s, expected %s)", + HG_Error_to_string(hg_ret), HG_Error_to_string(HG_NOENTRY)); + HG_PASSED(); + /* NULL RPC test */ HG_TEST("NULL RPC"); hg_ret = hg_test_rpc_no_input(info.handles[0], info.target_addr, @@ -560,29 +954,21 @@ main(int argc, char *argv[]) /* RPC test with no response */ HG_TEST("RPC without response"); - hg_ret = hg_test_rpc_input(info.handles[0], info.target_addr, + if (info.hg_test_info.na_test_info.self_send) { + hg_ret = HG_Create(info.context, info.target_addr, + hg_test_rpc_open_id_no_resp_g, &handle); + HG_TEST_CHECK_HG_ERROR(error, hg_ret, "HG_Create() failed (%s)", + HG_Error_to_string(hg_ret)); + } else + handle = info.handles[0]; + hg_ret = hg_test_rpc_input(handle, info.target_addr, hg_test_rpc_open_id_no_resp_g, hg_test_rpc_no_output_cb, info.request); + if (info.hg_test_info.na_test_info.self_send) + HG_Destroy(handle); HG_TEST_CHECK_HG_ERROR(error, hg_ret, "hg_test_rpc_input() failed (%s)", HG_Error_to_string(hg_ret)); HG_PASSED(); - /* RPC test with unregistered ID */ - inv_id = MERCURY_REGISTER(info.hg_class, "unreg_id", void, void, NULL); - HG_TEST_CHECK_ERROR_NORET(inv_id == 0, error, "HG_Register() failed"); - hg_ret = HG_Deregister(info.hg_class, inv_id); - HG_TEST_CHECK_HG_ERROR(error, hg_ret, "HG_Deregister() failed (%s)", - HG_Error_to_string(hg_ret)); - - HG_TEST("RPC with unregistered ID"); - HG_Test_log_disable(); // Expected to produce errors - hg_ret = hg_test_rpc_input(info.handles[0], info.target_addr, inv_id, - hg_test_rpc_output_cb, info.request); - HG_Test_log_enable(); - HG_TEST_CHECK_ERROR_NORET(hg_ret != HG_NOENTRY, error, - "hg_test_rpc_input() failed (%s, expected %s)", - HG_Error_to_string(hg_ret), HG_Error_to_string(HG_NOENTRY)); - HG_PASSED(); - if (!info.hg_test_info.na_test_info.self_send) { /* RPC test with invalid ID (not registered on server) */ inv_id = MERCURY_REGISTER(info.hg_class, "inv_id", void, void, NULL); @@ -616,15 +1002,37 @@ main(int argc, char *argv[]) HG_PASSED(); } - /* RPC test with multiple handle in flight */ - HG_TEST("concurrent RPCs"); + /* RPC test with multiple handles in flight */ + HG_TEST("multi RPCs"); hg_ret = hg_test_rpc_multi(info.handles, info.handle_max, info.target_addr, 0, hg_test_rpc_open_id_g, hg_test_rpc_multi_cb, info.request); HG_TEST_CHECK_HG_ERROR(error, hg_ret, "hg_test_rpc_multiple() failed (%s)", HG_Error_to_string(hg_ret)); HG_PASSED(); - /* RPC test with multiple handle to multiple target contexts */ + /* RPC test with multiple handles in flight from multiple threads */ + HG_TEST("concurrent multi RPCs"); + hg_ret = hg_test_rpc_launch_threads(&info, hg_test_rpc_multi_thread); + HG_TEST_CHECK_HG_ERROR(error, hg_ret, + "hg_test_rpc_launch_threads() failed (%s)", HG_Error_to_string(hg_ret)); + HG_PASSED(); + + /* RPC test from multiple threads with concurrent progress */ + HG_TEST("concurrent progress"); + hg_ret = hg_test_rpc_launch_threads(&info, hg_test_rpc_multi_progress); + HG_TEST_CHECK_HG_ERROR(error, hg_ret, + "hg_test_rpc_launch_threads() failed (%s)", HG_Error_to_string(hg_ret)); + HG_PASSED(); + + /* RPC test from multiple threads with concurrent progress */ + HG_TEST("concurrent progress w/create"); + hg_ret = + hg_test_rpc_launch_threads(&info, hg_test_rpc_multi_progress_create); + HG_TEST_CHECK_HG_ERROR(error, hg_ret, + "hg_test_rpc_launch_threads() failed (%s)", HG_Error_to_string(hg_ret)); + HG_PASSED(); + + /* RPC test with multiple handles to multiple target contexts */ if (info.hg_test_info.na_test_info.max_contexts) { hg_uint8_t i, context_count = info.hg_test_info.na_test_info.max_contexts; From d6ee48f1de51e89b136715938bcc4778810a002a Mon Sep 17 00:00:00 2001 From: Jerome Soumagne Date: Mon, 16 Oct 2023 12:08:09 -0500 Subject: [PATCH 2/4] HG/HG Core: store disabled response info in HG Core proc info Add HG_Core_registered_disable(d)_response() routines HG Core: refactor and optimize self RPC code path Add additional logging of refcount/expected op count Fixes for self RPCs with no response --- src/mercury.c | 41 +- src/mercury_core.c | 934 ++++++++++++++++++++++++++------------------- src/mercury_core.h | 37 +- 3 files changed, 580 insertions(+), 432 deletions(-) diff --git a/src/mercury.c b/src/mercury.c index feba0262..41a4b107 100644 --- a/src/mercury.c +++ b/src/mercury.c @@ -57,7 +57,6 @@ struct hg_proc_info { hg_proc_cb_t out_proc_cb; /* Output proc callback */ void *data; /* User data */ void (*free_callback)(void *); /* User data free callback */ - hg_bool_t no_response; /* RPC response not expected */ }; /* HG handle */ @@ -480,11 +479,6 @@ hg_get_struct(struct hg_private_handle *hg_handle, extra_buf_size = hg_handle->in_extra_buf_size; break; case HG_OUTPUT: - /* Cannot respond if no_response flag set */ - HG_CHECK_SUBSYS_ERROR(rpc, hg_proc_info->no_response, error, ret, - HG_OPNOTSUPPORTED, - "No output was produced on that RPC (no response)"); - /* Use custom header offset */ header_offset += hg_handle->handle.info.hg_class->out_offset; /* Set output proc */ @@ -612,11 +606,6 @@ hg_set_struct(struct hg_private_handle *hg_handle, extra_bulk = &hg_handle->in_extra_bulk; break; case HG_OUTPUT: - /* Cannot respond if no_response flag set */ - HG_CHECK_SUBSYS_ERROR(rpc, hg_proc_info->no_response, error, ret, - HG_OPNOTSUPPORTED, - "No output was produced on that RPC (no response)"); - /* Use custom header offset */ header_offset += hg_handle->handle.info.hg_class->out_offset; /* Set output proc */ @@ -1520,21 +1509,13 @@ hg_return_t HG_Registered_disable_response( hg_class_t *hg_class, hg_id_t id, hg_bool_t disable) { - struct hg_proc_info *hg_proc_info = NULL; hg_return_t ret; HG_CHECK_SUBSYS_ERROR( cls, hg_class == NULL, error, ret, HG_INVALID_ARG, "NULL HG class"); - /* Retrieve proc function from function map */ - hg_proc_info = (struct hg_proc_info *) HG_Core_registered_data( - hg_class->core_class, id); - HG_CHECK_SUBSYS_ERROR(cls, hg_proc_info == NULL, error, ret, HG_NOENTRY, - "Could not get registered data for RPC ID %" PRIu64, id); - - hg_proc_info->no_response = disable; - - return HG_SUCCESS; + return HG_Core_registered_disable_response( + hg_class->core_class, id, disable); error: return ret; @@ -1545,23 +1526,13 @@ hg_return_t HG_Registered_disabled_response( hg_class_t *hg_class, hg_id_t id, hg_bool_t *disabled_p) { - struct hg_proc_info *hg_proc_info = NULL; hg_return_t ret; HG_CHECK_SUBSYS_ERROR( cls, hg_class == NULL, error, ret, HG_INVALID_ARG, "NULL HG class"); - HG_CHECK_SUBSYS_ERROR(cls, disabled_p == NULL, error, ret, HG_INVALID_ARG, - "NULL pointer to disabled flag"); - - /* Retrieve proc function from function map */ - hg_proc_info = (struct hg_proc_info *) HG_Core_registered_data( - hg_class->core_class, id); - HG_CHECK_SUBSYS_ERROR(cls, hg_proc_info == NULL, error, ret, HG_NOENTRY, - "Could not get registered data for RPC ID %" PRIu64, id); - *disabled_p = hg_proc_info->no_response; - - return HG_SUCCESS; + return HG_Core_registered_disabled_response( + hg_class->core_class, id, disabled_p); error: return ret; @@ -2103,10 +2074,6 @@ HG_Forward(hg_handle_t handle, hg_cb_t callback, void *arg, void *in_struct) if (more_data) flags |= HG_CORE_MORE_DATA; - /* Set no response flag if no response required */ - if (hg_proc_info->no_response) - flags |= HG_CORE_NO_RESPONSE; - /* Send request */ ret = HG_Core_forward( handle->core_handle, hg_core_forward_cb, handle, flags, payload_size); diff --git a/src/mercury_core.c b/src/mercury_core.c index c31db7a8..42473e20 100644 --- a/src/mercury_core.c +++ b/src/mercury_core.c @@ -36,7 +36,8 @@ /****************/ /* Private flags */ -#define HG_CORE_SELF_FORWARD (1 << 3) /* Forward to self */ +#define HG_CORE_NO_RESPONSE (1 << 1) /* No response required */ +#define HG_CORE_SELF_FORWARD (1 << 2) /* Forward to self */ /* Size of comletion queue used for holding completed requests */ #define HG_CORE_ATOMIC_QUEUE_SIZE (1024) @@ -189,13 +190,13 @@ struct hg_core_private_class { }; /* Poll type */ -typedef enum hg_core_poll_type { +enum hg_core_poll_type { HG_CORE_POLL_LOOPBACK = 1, #ifdef NA_HAS_SM HG_CORE_POLL_SM, #endif HG_CORE_POLL_NA -} hg_core_poll_type_t; +}; /* Completion queue */ struct hg_core_completion_queue { @@ -288,14 +289,6 @@ struct hg_core_private_context { hg_bool_t posted; /* Posted receives on context */ }; -/* Info for wrapping callbacks if self addr */ -struct hg_core_self_cb_info { - hg_core_cb_t forward_cb; - void *forward_arg; - hg_core_cb_t respond_cb; - void *respond_arg; -}; - /* HG addr */ struct hg_core_private_addr { struct hg_core_addr core_addr; /* Must remain as first field */ @@ -308,23 +301,22 @@ struct hg_core_private_addr { }; /* HG core op type */ -typedef enum { - HG_CORE_FORWARD, /*!< Forward completion */ - HG_CORE_RESPOND, /*!< Respond completion */ - HG_CORE_NO_RESPOND, /*!< No response completion */ - HG_CORE_FORWARD_SELF, /*!< Self forward completion */ - HG_CORE_RESPOND_SELF, /*!< Self respond completion */ - HG_CORE_PROCESS /*!< Process completion */ -} hg_core_op_type_t; +#define HG_CORE_OP_TYPE \ + X(HG_CORE_FORWARD) /*!< Forward completion */ \ + X(HG_CORE_RESPOND) /*!< Respond completion */ \ + X(HG_CORE_PROCESS) /*!< Process completion */ + +#define X(a) a, +enum hg_core_op_type { HG_CORE_OP_TYPE }; +#undef X /* HG core operations */ struct hg_core_ops { hg_return_t (*forward)( struct hg_core_private_handle *hg_core_handle); /* forward */ - hg_return_t (*respond)( - struct hg_core_private_handle *hg_core_handle); /* respond */ - hg_return_t (*no_respond)( - struct hg_core_private_handle *hg_core_handle); /* no_respond */ + hg_return_t (*respond)(struct hg_core_private_handle *hg_core_handle, + hg_return_t ret_code); /* respond */ + void (*trigger)(struct hg_core_private_handle *hg_core_handle); /* trigger*/ }; /* HG core handle */ @@ -361,13 +353,12 @@ struct hg_core_private_handle { hg_atomic_int32_t ret_status; /* Handle return status */ hg_atomic_int32_t op_completed_count; /* Completed operation count */ hg_atomic_int32_t - op_expected_count; /* Expected operation count for completion */ - hg_core_op_type_t op_type; /* Core operation type */ - hg_return_t ret; /* Return code associated to handle */ - hg_uint8_t cookie; /* Cookie */ - hg_bool_t reuse; /* Re-use handle once ref_count is 0 */ - hg_bool_t is_self; /* Self processed */ - hg_bool_t no_response; /* Require response or not */ + op_expected_count; /* Expected operation count for completion */ + hg_atomic_int32_t flags; /* Flags */ + enum hg_core_op_type op_type; /* Core operation type */ + hg_return_t ret; /* Return code associated to handle */ + hg_uint8_t cookie; /* Cookie */ + hg_bool_t reuse; /* Re-use handle once ref_count is 0 */ }; /* HG op id */ @@ -390,6 +381,14 @@ struct hg_core_op_id { /* Local Prototypes */ /********************/ +/** + * Convert op_type to string. + */ +#ifdef HG_HAS_DEBUG +static const char * +hg_core_op_type_to_string(enum hg_core_op_type op_type); +#endif + /** * Init counters. */ @@ -784,25 +783,21 @@ hg_core_respond(struct hg_core_private_handle *hg_core_handle, * Send response locally. */ static HG_INLINE hg_return_t -hg_core_respond_self(struct hg_core_private_handle *hg_core_handle); +hg_core_respond_self( + struct hg_core_private_handle *hg_core_handle, hg_return_t ret_code); /** * Do not send response locally. */ -static HG_INLINE hg_return_t +static HG_INLINE void hg_core_no_respond_self(struct hg_core_private_handle *hg_core_handle); /** * Send response through NA. */ static hg_return_t -hg_core_respond_na(struct hg_core_private_handle *hg_core_handle); - -/** - * Do not send response through NA. - */ -static HG_INLINE hg_return_t -hg_core_no_respond_na(struct hg_core_private_handle *hg_core_handle); +hg_core_respond_na( + struct hg_core_private_handle *hg_core_handle, hg_return_t ret_code); /** * Send input callback. @@ -865,18 +860,6 @@ hg_core_send_ack(hg_core_handle_t handle, hg_return_t ret); static HG_INLINE void hg_core_ack_cb(const struct na_cb_info *callback_info); -/** - * Wrapper for local callback execution. - */ -static hg_return_t -hg_core_self_cb(const struct hg_core_cb_info *callback_info); - -/** - * Process handle (used for self execution). - */ -static hg_return_t -hg_core_process_self(struct hg_core_private_handle *hg_core_handle); - /** * Process handle. */ @@ -948,15 +931,51 @@ hg_core_trigger(struct hg_core_private_context *context, /** * Trigger callback from HG lookup op ID. */ -static hg_return_t +static HG_INLINE void hg_core_trigger_lookup_entry(struct hg_core_op_id *hg_core_op_id); /** * Trigger callback from HG core handle. */ -static hg_return_t +static HG_INLINE void hg_core_trigger_entry(struct hg_core_private_handle *hg_core_handle); +/** + * Trigger callback from self HG core handle. + */ +static HG_INLINE void +hg_core_trigger_self(struct hg_core_private_handle *hg_core_handle); + +/** + * Trigger callback from HG core handle. + */ +static HG_INLINE void +hg_core_trigger_na(struct hg_core_private_handle *hg_core_handle); + +/** + * Trigger RPC handler callback from HG core handle. + */ +static void +hg_core_trigger_process(struct hg_core_private_handle *hg_core_handle); + +/** + * Trigger forward callback. + */ +static HG_INLINE void +hg_core_trigger_forward_cb(struct hg_core_private_handle *hg_core_handle); + +/** + * Trigger respond callback. + */ +static HG_INLINE void +hg_core_trigger_respond_cb(struct hg_core_private_handle *hg_core_handle); + +/** + * Wrapper for local callback execution. + */ +static void +hg_core_trigger_self_respond_cb(struct hg_core_private_handle *hg_core_handle); + /** * Cancel handle. */ @@ -967,16 +986,23 @@ hg_core_cancel(struct hg_core_private_handle *hg_core_handle); /* Local Variables */ /*******************/ +/* Callback type string table */ +#define X(a) #a, +#ifdef HG_HAS_DEBUG +static const char *const hg_core_op_type_name_g[] = {HG_CORE_OP_TYPE}; +#endif +#undef X + /* Default ops */ static const struct hg_core_ops hg_core_ops_na_g = { .forward = hg_core_forward_na, .respond = hg_core_respond_na, - .no_respond = hg_core_no_respond_na}; + .trigger = hg_core_trigger_na}; static const struct hg_core_ops hg_core_ops_self_g = { .forward = hg_core_forward_self, .respond = hg_core_respond_self, - .no_respond = hg_core_no_respond_self}; + .trigger = hg_core_trigger_self}; /* Default log outlets */ #ifdef _WIN32 @@ -1017,6 +1043,15 @@ static HG_LOG_DEBUG_DECL_DLOG(diag) = HG_LOG_DLOG_INITIALIZER( static HG_LOG_SUBSYS_DLOG_DECL_REGISTER(diag, HG_CORE_SUBSYS_NAME); #endif +/*---------------------------------------------------------------------------*/ +#ifdef HG_HAS_DEBUG +static const char * +hg_core_op_type_to_string(enum hg_core_op_type op_type) +{ + return hg_core_op_type_name_g[op_type]; +} +#endif + /*---------------------------------------------------------------------------*/ #if defined(HG_HAS_DEBUG) && !defined(_WIN32) static void @@ -3242,7 +3277,7 @@ hg_core_create(struct hg_core_private_context *context, na_class_t *na_class, return HG_SUCCESS; error: - hg_core_destroy(hg_core_handle); + (void) hg_core_destroy(hg_core_handle); return ret; } @@ -3251,38 +3286,32 @@ hg_core_create(struct hg_core_private_context *context, na_class_t *na_class, static hg_return_t hg_core_destroy(struct hg_core_private_handle *hg_core_handle) { - int32_t ref_count; + int32_t ref_count, flags, no_response_done = 0; hg_return_t ret; if (hg_core_handle == NULL) return HG_SUCCESS; - /* This will push the RPC handle back to completion queue when no - * response is sent and we are sending to ourselves. This ensures that - * there is no race between the callback execution and the RPC - * completion. */ - if (hg_core_handle->is_self && hg_core_handle->no_response && - hg_atomic_get32(&hg_core_handle->no_response_done) > 0 && - hg_atomic_cas32(&hg_core_handle->no_response_done, - hg_atomic_get32(&hg_core_handle->ref_count), 0)) { - /* Safe as the decremented refcount will always be > 0 */ - ref_count = hg_atomic_decr32(&hg_core_handle->ref_count); - HG_LOG_SUBSYS_DEBUG(rpc_ref, "Handle (%p) ref_count decr to %" PRId32, - (void *) hg_core_handle, ref_count); - - ret = hg_core_handle->ops.no_respond(hg_core_handle); - HG_CHECK_SUBSYS_HG_ERROR(rpc, error, ret, "Could not complete handle"); - - return HG_SUCCESS; - } + /* Retrieve flags before decrementing refcount */ + flags = hg_atomic_get32(&hg_core_handle->flags); + if ((flags & HG_CORE_SELF_FORWARD) && (flags & HG_CORE_NO_RESPONSE)) + no_response_done = hg_atomic_get32(&hg_core_handle->no_response_done); /* Standard destroy refcount decrement */ ref_count = hg_atomic_decr32(&hg_core_handle->ref_count); HG_LOG_SUBSYS_DEBUG(rpc_ref, "Handle (%p) ref_count decr to %" PRId32, (void *) hg_core_handle, ref_count); - if (ref_count > 0) + if (ref_count > 0) { + /* This will push the RPC handle back to completion queue when no + * response is sent and we are sending to ourselves. This ensures that + * there is no race between the callback execution and the RPC + * completion. */ + if (ref_count == no_response_done) + hg_core_no_respond_self(hg_core_handle); + return HG_SUCCESS; /* Cannot free yet */ + } /* Re-use handle if we were listening, otherwise destroy it */ if (hg_core_handle->reuse && @@ -3484,6 +3513,8 @@ hg_core_alloc_na(struct hg_core_private_handle *hg_core_handle, hg_atomic_init32( &hg_core_handle->op_expected_count, 1); /* Default (no response) */ + HG_LOG_SUBSYS_DEBUG(rpc_ref, "Handle (%p) expected_count set to %" PRId32, + (void *) hg_core_handle, 1); hg_atomic_init32(&hg_core_handle->op_completed_count, 0); return HG_SUCCESS; @@ -3562,8 +3593,9 @@ hg_core_reset(struct hg_core_private_handle *hg_core_handle) hg_core_handle->out_buf_used = 0; hg_atomic_init32( &hg_core_handle->op_expected_count, 1); /* Default (no response) */ + HG_LOG_SUBSYS_DEBUG(rpc_ref, "Handle (%p) expected_count set to %" PRId32, + (void *) hg_core_handle, 1); hg_atomic_init32(&hg_core_handle->op_completed_count, 0); - hg_core_handle->no_response = HG_FALSE; /* Free extra data here if needed */ if (hg_core_class->more_data_cb.release) @@ -3676,10 +3708,14 @@ hg_core_set_rpc(struct hg_core_private_handle *hg_core_handle, hg_core_handle->na_addr = na_addr; /* Set forward call depending on address self */ - hg_core_handle->is_self = hg_core_class->init_info.loopback && - hg_core_addr->core_addr.is_self; - hg_core_handle->ops = - (hg_core_handle->is_self) ? hg_core_ops_self_g : hg_core_ops_na_g; + if (hg_core_class->init_info.loopback && + hg_core_addr->core_addr.is_self) { + hg_atomic_or32(&hg_core_handle->flags, HG_CORE_SELF_FORWARD); + hg_core_handle->ops = hg_core_ops_self_g; + } else { + hg_atomic_and32(&hg_core_handle->flags, ~HG_CORE_SELF_FORWARD); + hg_core_handle->ops = hg_core_ops_na_g; + } } /* We also allow for NULL RPC id to be passed (same reason as above) */ @@ -3695,7 +3731,13 @@ hg_core_set_rpc(struct hg_core_private_handle *hg_core_handle, /* Cache RPC info */ hg_core_handle->core_handle.rpc_info = hg_core_rpc_info; + if (hg_core_rpc_info->no_response) + hg_atomic_or32(&hg_core_handle->flags, HG_CORE_NO_RESPONSE); + else + hg_atomic_and32(&hg_core_handle->flags, ~HG_CORE_NO_RESPONSE); } + HG_LOG_SUBSYS_DEBUG(rpc, "Handle (%p) flags set to 0x%x", + (void *) hg_core_handle, hg_atomic_get32(&hg_core_handle->flags)); return HG_SUCCESS; @@ -3831,6 +3873,8 @@ hg_core_forward(struct hg_core_private_handle *hg_core_handle, /* Reset op counts */ hg_atomic_set32( &hg_core_handle->op_expected_count, 1); /* Default (no response) */ + HG_LOG_SUBSYS_DEBUG(rpc_ref, "Handle (%p) expected_count set to %" PRId32, + (void *) hg_core_handle, 1); hg_atomic_set32(&hg_core_handle->op_completed_count, 0); /* Reset handle ret */ @@ -3851,31 +3895,16 @@ hg_core_forward(struct hg_core_private_handle *hg_core_handle, error, ret, HG_MSGSIZE, "Exceeding input buffer size"); /* Parse flags */ - if (flags & HG_CORE_NO_RESPONSE) - hg_core_handle->no_response = HG_TRUE; - if (hg_core_handle->is_self) - flags |= HG_CORE_SELF_FORWARD; + if (flags & HG_CORE_MORE_DATA) + hg_atomic_or32(&hg_core_handle->flags, HG_CORE_MORE_DATA); + else + hg_atomic_and32(&hg_core_handle->flags, ~HG_CORE_MORE_DATA); /* Set callback, keep request and response callbacks separate so that * they do not get overwritten when forwarding to ourself */ hg_core_handle->request_callback = callback; hg_core_handle->request_arg = arg; - /* Set header */ - hg_core_handle->in_header.msg.request.id = - hg_core_handle->core_handle.info.id; - hg_core_handle->in_header.msg.request.flags = flags; - /* Set the cookie as origin context ID, so that when the cookie is - * unpacked by the target and assigned to HG info context_id, the NA - * layer knows which context ID it needs to send the response to. */ - hg_core_handle->in_header.msg.request.cookie = - hg_core_handle->core_handle.info.context->id; - - /* Encode request header */ - ret = hg_core_proc_header_request( - &hg_core_handle->core_handle, &hg_core_handle->in_header, HG_ENCODE); - HG_CHECK_SUBSYS_HG_ERROR(rpc, error, ret, "Could not encode header"); - #if defined(HG_HAS_DEBUG) && !defined(_WIN32) /* Increment counter */ hg_atomic_incr64( @@ -3895,7 +3924,9 @@ hg_core_forward(struct hg_core_private_handle *hg_core_handle, hg_atomic_set32(&hg_core_handle->status, HG_CORE_OP_COMPLETED); /* Rollback ref_count taken above */ - hg_atomic_decr32(&hg_core_handle->ref_count); + ref_count = hg_atomic_decr32(&hg_core_handle->ref_count); + HG_LOG_SUBSYS_DEBUG(rpc_ref, "Handle (%p) ref_count decr to %" PRId32, + (void *) hg_core_handle, ref_count); return ret; } @@ -3904,11 +3935,31 @@ hg_core_forward(struct hg_core_private_handle *hg_core_handle, static hg_return_t hg_core_forward_self(struct hg_core_private_handle *hg_core_handle) { + hg_return_t ret; + + /* Save ref_count, when sending to self and no response is being sent, + * we can only use that refcount to determine that the RPC callback has + * been fully executed. */ + if (hg_atomic_get32(&hg_core_handle->flags) & HG_CORE_NO_RESPONSE) + hg_atomic_set32(&hg_core_handle->no_response_done, + hg_atomic_get32(&hg_core_handle->ref_count)); + /* Set operation type for trigger */ - hg_core_handle->op_type = HG_CORE_FORWARD_SELF; + hg_core_handle->op_type = HG_CORE_PROCESS; - /* Post operation to self processing pool */ - return hg_core_process_self(hg_core_handle); + /* Process input */ + ret = hg_core_process_input(hg_core_handle); + if (ret != HG_SUCCESS) { + HG_LOG_SUBSYS_ERROR(rpc, "Could not process input"); + hg_atomic_set32(&hg_core_handle->ret_status, (int32_t) ret); + } + + /* Mark as completed */ + hg_core_complete_op(hg_core_handle); + + /* Always handle error from callback when forwarding to self, implies + * submission is always successful. */ + return HG_SUCCESS; } /*---------------------------------------------------------------------------*/ @@ -3921,12 +3972,30 @@ hg_core_forward_na(struct hg_core_private_handle *hg_core_handle) /* Set operation type for trigger */ hg_core_handle->op_type = HG_CORE_FORWARD; + /* Set header */ + hg_core_handle->in_header.msg.request.id = + hg_core_handle->core_handle.info.id; + hg_core_handle->in_header.msg.request.flags = + (hg_uint8_t) (hg_atomic_get32(&hg_core_handle->flags) & 0xff); + /* Set the cookie as origin context ID, so that when the cookie is + * unpacked by the target and assigned to HG info context_id, the NA + * layer knows which context ID it needs to send the response to. */ + hg_core_handle->in_header.msg.request.cookie = + hg_core_handle->core_handle.info.context->id; + + /* Encode request header */ + ret = hg_core_proc_header_request( + &hg_core_handle->core_handle, &hg_core_handle->in_header, HG_ENCODE); + HG_CHECK_SUBSYS_HG_ERROR(rpc, error, ret, "Could not encode header"); + /* Generate tag */ hg_core_handle->tag = hg_core_gen_request_tag(HG_CORE_HANDLE_CLASS(hg_core_handle)); /* Pre-post recv (output) if response is expected */ - if (!hg_core_handle->no_response) { + if (!(hg_atomic_get32(&hg_core_handle->flags) & HG_CORE_NO_RESPONSE)) { + int32_t HG_DEBUG_LOG_USED expected_count; + na_ret = NA_Msg_recv_expected(hg_core_handle->na_class, hg_core_handle->na_context, hg_core_recv_output_cb, hg_core_handle, hg_core_handle->core_handle.out_buf, @@ -3934,12 +4003,15 @@ hg_core_forward_na(struct hg_core_private_handle *hg_core_handle) hg_core_handle->out_buf_plugin_data, hg_core_handle->na_addr, hg_core_handle->core_handle.info.context_id, hg_core_handle->tag, hg_core_handle->na_recv_op_id); - HG_CHECK_SUBSYS_ERROR(rpc, na_ret != NA_SUCCESS, error_recv, ret, + HG_CHECK_SUBSYS_ERROR(rpc, na_ret != NA_SUCCESS, error, ret, (hg_return_t) na_ret, "Could not post recv for output buffer (%s)", NA_Error_to_string(na_ret)); - /* Increment number of expected operations */ - hg_atomic_incr32(&hg_core_handle->op_expected_count); + /* Increment number of expected completions */ + expected_count = hg_atomic_incr32(&hg_core_handle->op_expected_count); + HG_LOG_SUBSYS_DEBUG(rpc_ref, + "Handle (%p) expected_count incr to %" PRId32, + (void *) hg_core_handle, expected_count); } /* Mark handle as posted */ @@ -3958,18 +4030,22 @@ hg_core_forward_na(struct hg_core_private_handle *hg_core_handle) return HG_SUCCESS; -error_recv: +error: return ret; error_send: hg_atomic_and32(&hg_core_handle->status, ~HG_CORE_OP_POSTED); hg_atomic_or32(&hg_core_handle->status, HG_CORE_OP_ERRORED); - if (hg_core_handle->no_response) { + if (hg_atomic_get32(&hg_core_handle->flags) & HG_CORE_NO_RESPONSE) { /* No recv was posted */ return ret; } else { - hg_atomic_decr32(&hg_core_handle->op_expected_count); + int32_t HG_DEBUG_LOG_USED expected_count = + hg_atomic_decr32(&hg_core_handle->op_expected_count); + HG_LOG_SUBSYS_DEBUG(rpc_ref, + "Handle (%p) expected_count decr to %" PRId32, + (void *) hg_core_handle, expected_count); /* Keep error for return status */ hg_atomic_set32(&hg_core_handle->ret_status, (int32_t) ret); @@ -3994,12 +4070,14 @@ hg_core_respond(struct hg_core_private_handle *hg_core_handle, hg_core_cb_t callback, void *arg, hg_uint8_t flags, hg_size_t payload_size, hg_return_t ret_code) { + int32_t HG_DEBUG_LOG_USED ref_count; hg_size_t header_size; hg_return_t ret = HG_SUCCESS; /* Cannot respond if no_response flag set */ - HG_CHECK_SUBSYS_ERROR(rpc, hg_core_handle->no_response, done, ret, - HG_OPNOTSUPPORTED, "Sending response was disabled on that RPC"); + HG_CHECK_SUBSYS_ERROR(rpc, + hg_atomic_get32(&hg_core_handle->flags) & HG_CORE_NO_RESPONSE, done, + ret, HG_OPNOTSUPPORTED, "Sending response was disabled on that RPC"); /* Reset handle ret */ hg_core_handle->ret = HG_SUCCESS; @@ -4018,32 +4096,28 @@ hg_core_respond(struct hg_core_private_handle *hg_core_handle, hg_core_handle->out_buf_used > hg_core_handle->core_handle.out_buf_size, error, ret, HG_MSGSIZE, "Exceeding output buffer size"); + /* Parse flags */ + if (flags & HG_CORE_MORE_DATA) + hg_atomic_or32(&hg_core_handle->flags, HG_CORE_MORE_DATA); + else + hg_atomic_and32(&hg_core_handle->flags, ~HG_CORE_MORE_DATA); + /* Set callback, keep request and response callbacks separate so that * they do not get overwritten when forwarding to ourself */ hg_core_handle->response_callback = callback; hg_core_handle->response_arg = arg; - /* Set header */ - hg_core_handle->out_header.msg.response.ret_code = (hg_int8_t) ret_code; - hg_core_handle->out_header.msg.response.flags = flags; - hg_core_handle->out_header.msg.response.cookie = hg_core_handle->cookie; - - /* Encode response header */ - ret = hg_core_proc_header_response( - &hg_core_handle->core_handle, &hg_core_handle->out_header, HG_ENCODE); - HG_CHECK_SUBSYS_HG_ERROR(rpc, error, ret, "Could not encode header"); - - /* If addr is self, forward locally, otherwise send the encoded buffer - * through NA and pre-post response */ - ret = hg_core_handle->ops.respond(hg_core_handle); - HG_CHECK_SUBSYS_HG_ERROR(rpc, error, ret, "Could not respond"); - #if defined(HG_HAS_DEBUG) && !defined(_WIN32) /* Increment counter */ hg_atomic_incr64( HG_CORE_HANDLE_CLASS(hg_core_handle)->counters.rpc_resp_sent_count); #endif + /* If addr is self, forward locally, otherwise send the encoded buffer + * through NA and pre-post response */ + ret = hg_core_handle->ops.respond(hg_core_handle, ret_code); + HG_CHECK_SUBSYS_HG_ERROR(rpc, error, ret, "Could not respond"); + done: return ret; @@ -4052,20 +4126,30 @@ hg_core_respond(struct hg_core_private_handle *hg_core_handle, hg_atomic_or32(&hg_core_handle->status, HG_CORE_OP_COMPLETED); /* Decrement refcount on handle */ - hg_atomic_decr32(&hg_core_handle->ref_count); + ref_count = hg_atomic_decr32(&hg_core_handle->ref_count); + HG_LOG_SUBSYS_DEBUG(rpc_ref, "Handle (%p) ref_count decr to %" PRId32, + (void *) hg_core_handle, ref_count); return ret; } /*---------------------------------------------------------------------------*/ static HG_INLINE hg_return_t -hg_core_respond_self(struct hg_core_private_handle *hg_core_handle) +hg_core_respond_self( + struct hg_core_private_handle *hg_core_handle, hg_return_t ret_code) { + int32_t HG_DEBUG_LOG_USED expected_count; + /* Set operation type for trigger */ - hg_core_handle->op_type = HG_CORE_RESPOND_SELF; + hg_core_handle->op_type = HG_CORE_RESPOND; - /* Increment number of expected operations */ - hg_atomic_incr32(&hg_core_handle->op_expected_count); + /* Pass return code */ + hg_atomic_set32(&hg_core_handle->ret_status, (int32_t) ret_code); + + /* Increment number of expected completions */ + expected_count = hg_atomic_incr32(&hg_core_handle->op_expected_count); + HG_LOG_SUBSYS_DEBUG(rpc_ref, "Handle (%p) expected_count incr to %" PRId32, + (void *) hg_core_handle, expected_count); /* Complete and add to completion queue */ hg_core_complete_op(hg_core_handle); @@ -4074,37 +4158,57 @@ hg_core_respond_self(struct hg_core_private_handle *hg_core_handle) } /*---------------------------------------------------------------------------*/ -static HG_INLINE hg_return_t +static HG_INLINE void hg_core_no_respond_self(struct hg_core_private_handle *hg_core_handle) { + int32_t HG_DEBUG_LOG_USED expected_count; + /* Set operation type for trigger */ - hg_core_handle->op_type = HG_CORE_FORWARD_SELF; + hg_core_handle->op_type = HG_CORE_FORWARD; - /* Increment number of expected operations */ - hg_atomic_incr32(&hg_core_handle->op_expected_count); + /* Reset saved refcount */ + hg_atomic_set32(&hg_core_handle->no_response_done, 0); + + /* Increment number of expected completions */ + expected_count = hg_atomic_incr32(&hg_core_handle->op_expected_count); + HG_LOG_SUBSYS_DEBUG(rpc_ref, "Handle (%p) expected_count incr to %" PRId32, + (void *) hg_core_handle, expected_count); /* Complete and add to completion queue */ hg_core_complete_op(hg_core_handle); - - return HG_SUCCESS; } /*---------------------------------------------------------------------------*/ static hg_return_t -hg_core_respond_na(struct hg_core_private_handle *hg_core_handle) +hg_core_respond_na( + struct hg_core_private_handle *hg_core_handle, hg_return_t ret_code) { + int32_t HG_DEBUG_LOG_USED expected_count; hg_return_t ret; na_return_t na_ret; hg_bool_t ack_recv_posted = HG_FALSE; - /* Increment number of expected operations */ - hg_atomic_incr32(&hg_core_handle->op_expected_count); + /* Set header */ + hg_core_handle->out_header.msg.response.ret_code = (hg_int8_t) ret_code; + hg_core_handle->out_header.msg.response.flags = + (hg_uint8_t) (hg_atomic_get32(&hg_core_handle->flags) & 0xff); + hg_core_handle->out_header.msg.response.cookie = hg_core_handle->cookie; + + /* Encode response header */ + ret = hg_core_proc_header_response( + &hg_core_handle->core_handle, &hg_core_handle->out_header, HG_ENCODE); + HG_CHECK_SUBSYS_HG_ERROR(rpc, error, ret, "Could not encode header"); + + /* Increment number of expected completions */ + expected_count = hg_atomic_incr32(&hg_core_handle->op_expected_count); + HG_LOG_SUBSYS_DEBUG(rpc_ref, "Handle (%p) expected_count incr to %" PRId32, + (void *) hg_core_handle, expected_count); /* Set operation type for trigger */ hg_core_handle->op_type = HG_CORE_RESPOND; /* More data on output requires an ack once it is processed */ - if (hg_core_handle->out_header.msg.response.flags & HG_CORE_MORE_DATA) { + if (hg_atomic_get32(&hg_core_handle->flags) & HG_CORE_MORE_DATA) { size_t buf_size = hg_core_handle->core_handle.na_out_header_offset + sizeof(hg_uint8_t); @@ -4126,8 +4230,11 @@ hg_core_respond_na(struct hg_core_private_handle *hg_core_handle) NA_Error_to_string(na_ret)); } - /* Increment number of expected operations */ - hg_atomic_incr32(&hg_core_handle->op_expected_count); + /* Increment number of expected completions */ + expected_count = hg_atomic_incr32(&hg_core_handle->op_expected_count); + HG_LOG_SUBSYS_DEBUG(rpc_ref, + "Handle (%p) expected_count incr to %" PRId32, + (void *) hg_core_handle, expected_count); /* Pre-post recv (ack) if more data is expected */ na_ret = NA_Msg_recv_expected(hg_core_handle->na_class, @@ -4164,7 +4271,10 @@ hg_core_respond_na(struct hg_core_private_handle *hg_core_handle) hg_atomic_or32(&hg_core_handle->status, HG_CORE_OP_ERRORED); if (ack_recv_posted) { - hg_atomic_decr32(&hg_core_handle->op_expected_count); + expected_count = hg_atomic_decr32(&hg_core_handle->op_expected_count); + HG_LOG_SUBSYS_DEBUG(rpc_ref, + "Handle (%p) expected_count decr to %" PRId32, + (void *) hg_core_handle, expected_count); /* Keep error for return status */ hg_atomic_set32(&hg_core_handle->ret_status, (int32_t) ret); @@ -4190,21 +4300,6 @@ hg_core_respond_na(struct hg_core_private_handle *hg_core_handle) return ret; } -/*---------------------------------------------------------------------------*/ -static HG_INLINE hg_return_t -hg_core_no_respond_na(struct hg_core_private_handle *hg_core_handle) -{ - /* Set operation type for trigger */ - hg_core_handle->op_type = HG_CORE_NO_RESPOND; - - /* Increment number of expected operations */ - hg_atomic_incr32(&hg_core_handle->op_expected_count); - - hg_core_complete_op(hg_core_handle); - - return HG_SUCCESS; -} - /*---------------------------------------------------------------------------*/ static HG_INLINE void hg_core_send_input_cb(const struct na_cb_info *callback_info) @@ -4235,7 +4330,8 @@ hg_core_send_input_cb(const struct na_cb_info *callback_info) HG_LOG_SUBSYS_ERROR(rpc, "NA callback returned error (%s)", NA_Error_to_string(callback_info->ret)); - if (!(status & HG_CORE_OP_CANCELED) && !hg_core_handle->no_response) { + if (!(status & HG_CORE_OP_CANCELED) && + !(hg_atomic_get32(&hg_core_handle->flags) & HG_CORE_NO_RESPONSE)) { na_return_t na_ret; hg_atomic_or32(&hg_core_handle->status, HG_CORE_OP_CANCELED); @@ -4459,31 +4555,37 @@ hg_core_process_input(struct hg_core_private_handle *hg_core_handle) hg_atomic_incr64(hg_core_class->counters.rpc_req_recv_count); #endif - /* Get and verify input header */ - ret = hg_core_proc_header_request( - &hg_core_handle->core_handle, &hg_core_handle->in_header, HG_DECODE); - HG_CHECK_SUBSYS_HG_ERROR( - rpc, error, ret, "Could not decode request header"); - - /* Get operation ID from header */ - hg_core_handle->core_handle.info.id = - hg_core_handle->in_header.msg.request.id; - hg_core_handle->cookie = hg_core_handle->in_header.msg.request.cookie; - /* TODO assign target ID from cookie directly for now */ - hg_core_handle->core_handle.info.context_id = hg_core_handle->cookie; - - /* Parse flags */ - hg_core_handle->no_response = - hg_core_handle->in_header.msg.request.flags & HG_CORE_NO_RESPONSE; + /* We can skip RPC headers etc if we are sending to ourselves */ + if (!(hg_atomic_get32(&hg_core_handle->flags) & HG_CORE_SELF_FORWARD)) { + /* Get and verify input header */ + ret = hg_core_proc_header_request(&hg_core_handle->core_handle, + &hg_core_handle->in_header, HG_DECODE); + HG_CHECK_SUBSYS_HG_ERROR( + rpc, error, ret, "Could not decode request header"); + + /* Get operation ID from header */ + hg_core_handle->core_handle.info.id = + hg_core_handle->in_header.msg.request.id; + hg_core_handle->cookie = hg_core_handle->in_header.msg.request.cookie; + /* TODO assign target ID from cookie directly for now */ + hg_core_handle->core_handle.info.context_id = hg_core_handle->cookie; + + /* Parse flags */ + hg_atomic_set32(&hg_core_handle->flags, + hg_core_handle->in_header.msg.request.flags); + } HG_LOG_SUBSYS_DEBUG(rpc, "Processed input for handle %p, ID=%" PRIu64 ", cookie=%" PRIu8 ", no_response=%d", (void *) hg_core_handle, hg_core_handle->core_handle.info.id, - hg_core_handle->cookie, hg_core_handle->no_response); + hg_core_handle->cookie, + hg_atomic_get32(&hg_core_handle->flags) & HG_CORE_NO_RESPONSE); /* Must let upper layer get extra payload if HG_CORE_MORE_DATA is set */ - if (hg_core_handle->in_header.msg.request.flags & HG_CORE_MORE_DATA) { + if (hg_atomic_get32(&hg_core_handle->flags) & HG_CORE_MORE_DATA) { + int32_t HG_DEBUG_LOG_USED expected_count; + HG_CHECK_SUBSYS_ERROR(rpc, hg_core_class->more_data_cb.acquire == NULL, error, ret, HG_OPNOTSUPPORTED, "No callback defined for acquiring more data"); @@ -4492,8 +4594,11 @@ hg_core_process_input(struct hg_core_private_handle *hg_core_handle) "Must recv extra input data payload for handle %p", (void *) hg_core_handle); - /* Increment number of expected operations */ - hg_atomic_incr32(&hg_core_handle->op_expected_count); + /* Increment number of expected completions */ + expected_count = hg_atomic_incr32(&hg_core_handle->op_expected_count); + HG_LOG_SUBSYS_DEBUG(rpc_ref, + "Handle (%p) expected_count incr to %" PRId32, + (void *) hg_core_handle, expected_count); #if defined(HG_HAS_DEBUG) && !defined(_WIN32) /* Increment counter */ @@ -4606,16 +4711,20 @@ hg_core_process_output(struct hg_core_private_handle *hg_core_handle, hg_atomic_incr64(hg_core_class->counters.rpc_resp_recv_count); #endif - /* Get and verify output header */ - ret = hg_core_proc_header_response( - &hg_core_handle->core_handle, &hg_core_handle->out_header, HG_DECODE); - HG_CHECK_SUBSYS_HG_ERROR(rpc, error, ret, "Could not decode header"); + if (!(hg_atomic_get32(&hg_core_handle->flags) & HG_CORE_SELF_FORWARD)) { + /* Get and verify output header */ + ret = hg_core_proc_header_response(&hg_core_handle->core_handle, + &hg_core_handle->out_header, HG_DECODE); + HG_CHECK_SUBSYS_HG_ERROR(rpc, error, ret, "Could not decode header"); - /* Get return code from header */ - hg_atomic_set32(&hg_core_handle->ret_status, - (int32_t) hg_core_handle->out_header.msg.response.ret_code); + /* Get return code from header */ + hg_atomic_set32(&hg_core_handle->ret_status, + (int32_t) hg_core_handle->out_header.msg.response.ret_code); - /* Parse flags */ + /* Parse flags */ + hg_atomic_set32(&hg_core_handle->flags, + hg_core_handle->out_header.msg.response.flags); + } HG_LOG_SUBSYS_DEBUG(rpc, "Processed output for handle %p, ID=%" PRIu64 ", ret=%" PRId32, @@ -4623,7 +4732,9 @@ hg_core_process_output(struct hg_core_private_handle *hg_core_handle, hg_atomic_get32(&hg_core_handle->ret_status)); /* Must let upper layer get extra payload if HG_CORE_MORE_DATA is set */ - if (hg_core_handle->out_header.msg.response.flags & HG_CORE_MORE_DATA) { + if (hg_atomic_get32(&hg_core_handle->flags) & HG_CORE_MORE_DATA) { + int32_t HG_DEBUG_LOG_USED expected_count; + HG_CHECK_SUBSYS_ERROR(rpc, hg_core_class->more_data_cb.acquire == NULL, error, ret, HG_OPNOTSUPPORTED, "No callback defined for acquiring more data"); @@ -4632,8 +4743,11 @@ hg_core_process_output(struct hg_core_private_handle *hg_core_handle, "Must recv extra output data payload for handle %p", (void *) hg_core_handle); - /* Increment number of expected operations */ - hg_atomic_incr32(&hg_core_handle->op_expected_count); + /* Increment number of expected completions */ + expected_count = hg_atomic_incr32(&hg_core_handle->op_expected_count); + HG_LOG_SUBSYS_DEBUG(rpc_ref, + "Handle (%p) expected_count incr to %" PRId32, + (void *) hg_core_handle, expected_count); #if defined(HG_HAS_DEBUG) && !defined(_WIN32) /* Increment counter */ @@ -4762,85 +4876,6 @@ hg_core_ack_cb(const struct na_cb_info *callback_info) hg_core_complete_op(hg_core_handle); } -/*---------------------------------------------------------------------------*/ -static hg_return_t -hg_core_self_cb(const struct hg_core_cb_info *callback_info) -{ - struct hg_core_private_handle *hg_core_handle = - (struct hg_core_private_handle *) callback_info->info.respond.handle; - int32_t HG_DEBUG_LOG_USED ref_count; - hg_return_t ret; - - /* Increment number of expected operations */ - hg_atomic_incr32(&hg_core_handle->op_expected_count); - - /* First execute response callback */ - if (hg_core_handle->response_callback) { - struct hg_core_cb_info hg_core_cb_info; - - hg_core_cb_info.arg = hg_core_handle->response_arg; - hg_core_cb_info.ret = HG_SUCCESS; /* TODO report failure */ - hg_core_cb_info.type = HG_CB_RESPOND; - hg_core_cb_info.info.respond.handle = (hg_core_handle_t) hg_core_handle; - - hg_core_handle->response_callback(&hg_core_cb_info); - } - - /* Assign forward callback back to handle */ - hg_core_handle->op_type = HG_CORE_FORWARD_SELF; - - /* Increment refcount and push handle back to completion queue */ - ref_count = hg_atomic_incr32(&hg_core_handle->ref_count); - HG_LOG_SUBSYS_DEBUG(rpc_ref, "Handle (%p) ref_count incr to %" PRId32, - (void *) hg_core_handle, ref_count); - - /* Process output */ - ret = hg_core_process_output(hg_core_handle, hg_core_more_data_complete); - HG_CHECK_SUBSYS_HG_ERROR(rpc, error, ret, "Could not process output"); - - /* Mark as completed */ - hg_core_complete_op(hg_core_handle); - - return HG_SUCCESS; - -error: - hg_atomic_cas32( - &hg_core_handle->ret_status, (int32_t) HG_SUCCESS, (int32_t) ret); - - /* Mark as completed */ - hg_core_complete_op(hg_core_handle); - - return HG_SUCCESS; -} - -/*---------------------------------------------------------------------------*/ -static hg_return_t -hg_core_process_self(struct hg_core_private_handle *hg_core_handle) -{ - hg_return_t ret; - - /* Set operation type for trigger */ - hg_core_handle->op_type = HG_CORE_PROCESS; - - /* Process input */ - ret = hg_core_process_input(hg_core_handle); - HG_CHECK_SUBSYS_HG_ERROR(rpc, error, ret, "Could not process input"); - - /* Mark as completed */ - hg_core_complete_op(hg_core_handle); - - return HG_SUCCESS; - -error: - hg_atomic_cas32( - &hg_core_handle->ret_status, (int32_t) HG_SUCCESS, (int32_t) ret); - - /* Mark as completed */ - hg_core_complete_op(hg_core_handle); - - return HG_SUCCESS; -} - /*---------------------------------------------------------------------------*/ static hg_return_t hg_core_process(struct hg_core_private_handle *hg_core_handle) @@ -4849,26 +4884,31 @@ hg_core_process(struct hg_core_private_handle *hg_core_handle) int32_t HG_DEBUG_LOG_USED ref_count; hg_return_t ret; - /* Retrieve exe function from function map */ - hg_core_rpc_info = - hg_core_map_lookup(&HG_CORE_HANDLE_CLASS(hg_core_handle)->rpc_map, - &hg_core_handle->core_handle.info.id); - if (hg_core_rpc_info == NULL) { - HG_LOG_SUBSYS_WARNING(rpc, - "Could not find RPC ID (%" PRIu64 ") in RPC map", - hg_core_handle->core_handle.info.id); - HG_GOTO_DONE(error, ret, HG_NOENTRY); + /* Already cached for self RPCs */ + if (hg_atomic_get32(&hg_core_handle->flags) & HG_CORE_SELF_FORWARD) + hg_core_rpc_info = hg_core_handle->core_handle.rpc_info; + else { + /* Retrieve exe function from function map */ + hg_core_rpc_info = + hg_core_map_lookup(&HG_CORE_HANDLE_CLASS(hg_core_handle)->rpc_map, + &hg_core_handle->core_handle.info.id); + if (hg_core_rpc_info == NULL) { + HG_LOG_SUBSYS_WARNING(rpc, + "Could not find RPC ID (%" PRIu64 ") in RPC map", + hg_core_handle->core_handle.info.id); + HG_GOTO_DONE(error, ret, HG_NOENTRY); + } + // HG_CHECK_SUBSYS_ERROR(rpc, hg_core_rpc_info == NULL, error, ret, + // HG_NOENTRY, + // "Could not find RPC ID (%" PRIu64 ") in RPC map", + // hg_core_handle->core_handle.info.id); + + /* Cache RPC info */ + hg_core_handle->core_handle.rpc_info = hg_core_rpc_info; } - // HG_CHECK_SUBSYS_ERROR(rpc, hg_core_rpc_info == NULL, error, ret, - // HG_NOENTRY, - // "Could not find RPC ID (%" PRIu64 ") in RPC map", - // hg_core_handle->core_handle.info.id); HG_CHECK_SUBSYS_ERROR(rpc, hg_core_rpc_info->rpc_cb == NULL, error, ret, HG_INVALID_ARG, "No RPC callback registered"); - /* Cache RPC info */ - hg_core_handle->core_handle.rpc_info = hg_core_rpc_info; - /* Increment ref count here so that a call to HG_Destroy in user's RPC * callback does not free the handle but only schedules its completion */ @@ -4896,7 +4936,7 @@ hg_core_complete_op(struct hg_core_private_handle *hg_core_handle) op_expected_count = hg_atomic_get32(&hg_core_handle->op_expected_count); - HG_LOG_SUBSYS_DEBUG(rpc, + HG_LOG_SUBSYS_DEBUG(rpc_ref, "Completed %" PRId32 "/%" PRId32 " NA operations for handle (%p)", op_completed_count, op_expected_count, (void *) hg_core_handle); @@ -4924,7 +4964,8 @@ hg_core_complete(struct hg_core_private_handle *hg_core_handle, hg_return_t ret) (hg_core_handle_t) hg_core_handle; hg_core_completion_add(hg_core_handle->core_handle.info.context, - &hg_core_handle->hg_completion_entry, hg_core_handle->is_self); + &hg_core_handle->hg_completion_entry, + hg_atomic_get32(&hg_core_handle->flags) & HG_CORE_SELF_FORWARD); } /*---------------------------------------------------------------------------*/ @@ -5336,17 +5377,13 @@ hg_core_trigger(struct hg_core_private_context *context, /* Trigger entry */ switch (hg_completion_entry->op_type) { case HG_ADDR: - ret = hg_core_trigger_lookup_entry( + hg_core_trigger_lookup_entry( hg_completion_entry->op_id.hg_core_op_id); - HG_CHECK_SUBSYS_HG_ERROR( - poll, done, ret, "Could not trigger addr completion entry"); break; case HG_RPC: - ret = hg_core_trigger_entry( + hg_core_trigger_entry( (struct hg_core_private_handle *) hg_completion_entry->op_id.hg_core_handle); - HG_CHECK_SUBSYS_HG_ERROR( - poll, done, ret, "Could not trigger RPC completion entry"); break; case HG_BULK: ret = hg_bulk_trigger_entry( @@ -5371,125 +5408,183 @@ hg_core_trigger(struct hg_core_private_context *context, } /*---------------------------------------------------------------------------*/ -static hg_return_t +static HG_INLINE void hg_core_trigger_lookup_entry(struct hg_core_op_id *hg_core_op_id) { /* Execute callback */ if (hg_core_op_id->callback) { - struct hg_core_cb_info hg_core_cb_info; - - hg_core_cb_info.arg = hg_core_op_id->arg; - hg_core_cb_info.ret = HG_SUCCESS; - hg_core_cb_info.type = HG_CB_LOOKUP; - hg_core_cb_info.info.lookup.addr = - (hg_core_addr_t) hg_core_op_id->info.lookup.hg_core_addr; + struct hg_core_cb_info hg_core_cb_info = {.arg = hg_core_op_id->arg, + .ret = HG_SUCCESS, + .type = HG_CB_LOOKUP, + .info.lookup.addr = + (hg_core_addr_t) hg_core_op_id->info.lookup.hg_core_addr}; hg_core_op_id->callback(&hg_core_cb_info); } /* NB. OK to free after callback execution, op ID is not re-used */ free(hg_core_op_id); - - return HG_SUCCESS; } /*---------------------------------------------------------------------------*/ -static hg_return_t +static HG_INLINE void hg_core_trigger_entry(struct hg_core_private_handle *hg_core_handle) { - hg_return_t ret; - hg_atomic_and32(&hg_core_handle->status, ~HG_CORE_OP_QUEUED); - if (hg_core_handle->op_type == HG_CORE_PROCESS) { - int32_t HG_DEBUG_LOG_USED ref_count; + HG_LOG_SUBSYS_DEBUG(rpc, "Triggering callback type %s", + hg_core_op_type_to_string(hg_core_handle->op_type)); - /* Simply exit if error occurred */ - if (hg_core_handle->ret != HG_SUCCESS) - HG_GOTO_DONE(done, ret, HG_SUCCESS); + hg_core_handle->ops.trigger(hg_core_handle); - /* Take another reference to make sure the handle only gets freed - * after the response is sent */ - ref_count = hg_atomic_incr32(&hg_core_handle->ref_count); + /* Reuse handle if we were listening, otherwise destroy it */ + (void) hg_core_destroy(hg_core_handle); +} + +/*---------------------------------------------------------------------------*/ +static HG_INLINE void +hg_core_trigger_self(struct hg_core_private_handle *hg_core_handle) +{ + switch (hg_core_handle->op_type) { + case HG_CORE_PROCESS: + hg_core_trigger_process(hg_core_handle); + break; + case HG_CORE_FORWARD: + hg_core_trigger_forward_cb(hg_core_handle); + break; + case HG_CORE_RESPOND: + hg_core_trigger_self_respond_cb(hg_core_handle); + break; + default: + HG_LOG_SUBSYS_ERROR(rpc, "Invalid core operation type"); + break; + } +} + +/*---------------------------------------------------------------------------*/ +static HG_INLINE void +hg_core_trigger_na(struct hg_core_private_handle *hg_core_handle) +{ + switch (hg_core_handle->op_type) { + case HG_CORE_PROCESS: + hg_core_trigger_process(hg_core_handle); + break; + case HG_CORE_FORWARD: + hg_core_trigger_forward_cb(hg_core_handle); + break; + case HG_CORE_RESPOND: + hg_core_trigger_respond_cb(hg_core_handle); + break; + default: + HG_LOG_SUBSYS_ERROR(rpc, "Invalid core operation type"); + break; + } +} + +/*---------------------------------------------------------------------------*/ +static void +hg_core_trigger_process(struct hg_core_private_handle *hg_core_handle) +{ + hg_return_t ret; + int32_t flags; + + /* Silently exit if error occurred */ + if (hg_core_handle->ret != HG_SUCCESS) + return; + + flags = hg_atomic_get32(&hg_core_handle->flags); + /* Take another reference to make sure the handle only gets freed + * after the response is sent (when forwarding to self, always take a + * refcount to make sure the handle does not get re-used too early) */ + if (!(flags & HG_CORE_NO_RESPONSE) || (flags & HG_CORE_SELF_FORWARD)) { + int32_t HG_DEBUG_LOG_USED ref_count = + hg_atomic_incr32(&hg_core_handle->ref_count); HG_LOG_SUBSYS_DEBUG(rpc_ref, "Handle (%p) ref_count incr to %" PRId32, (void *) hg_core_handle, ref_count); + } - /* Save ref_count, when sending to self and no response is being sent, - * we can only use that refcount to determine that the RPC callback has - * been fully executed. */ - if (hg_core_handle->is_self && hg_core_handle->no_response) - hg_atomic_set32(&hg_core_handle->no_response_done, ref_count); - - /* Run RPC callback */ - ret = hg_core_process(hg_core_handle); - if (ret != HG_SUCCESS && !hg_core_handle->no_response) { - hg_size_t header_size = - hg_core_header_response_get_size() + - hg_core_handle->core_handle.na_out_header_offset; - - /* Respond in case of error */ - ret = hg_core_respond( - hg_core_handle, NULL, NULL, 0, header_size, ret); - HG_CHECK_SUBSYS_HG_ERROR(rpc, done, ret, "Could not respond"); - } + /* Run RPC callback */ + ret = hg_core_process(hg_core_handle); + if (ret != HG_SUCCESS && !(flags & HG_CORE_NO_RESPONSE)) { + hg_size_t header_size = + hg_core_header_response_get_size() + + hg_core_handle->core_handle.na_out_header_offset; - /* No response callback */ - if (hg_core_handle->no_response && !hg_core_handle->is_self) { - ret = hg_core_handle->ops.no_respond(hg_core_handle); - HG_CHECK_SUBSYS_HG_ERROR( - rpc, done, ret, "Could not complete handle"); - } - } else { - hg_core_cb_t hg_cb = NULL; - struct hg_core_cb_info hg_core_cb_info; - - hg_core_cb_info.ret = hg_core_handle->ret; - - switch (hg_core_handle->op_type) { - case HG_CORE_FORWARD_SELF: - case HG_CORE_FORWARD: - hg_cb = hg_core_handle->request_callback; - hg_core_cb_info.arg = hg_core_handle->request_arg; - hg_core_cb_info.type = HG_CB_FORWARD; - hg_core_cb_info.info.forward.handle = - (hg_core_handle_t) hg_core_handle; - break; - case HG_CORE_RESPOND: - hg_cb = hg_core_handle->response_callback; - hg_core_cb_info.arg = hg_core_handle->response_arg; - hg_core_cb_info.type = HG_CB_RESPOND; - hg_core_cb_info.info.respond.handle = - (hg_core_handle_t) hg_core_handle; - break; - case HG_CORE_RESPOND_SELF: - hg_cb = hg_core_self_cb; - hg_core_cb_info.arg = hg_core_handle->response_arg; - hg_core_cb_info.type = HG_CB_RESPOND; - hg_core_cb_info.info.respond.handle = - (hg_core_handle_t) hg_core_handle; - break; - case HG_CORE_NO_RESPOND: - /* Nothing */ - break; - case HG_CORE_PROCESS: - default: - HG_GOTO_SUBSYS_ERROR(rpc, done, ret, HG_OPNOTSUPPORTED, - "Invalid core operation type"); - } + /* Respond in case of error */ + (void) hg_core_respond(hg_core_handle, NULL, NULL, 0, header_size, ret); + } +} + +/*---------------------------------------------------------------------------*/ +static HG_INLINE void +hg_core_trigger_forward_cb(struct hg_core_private_handle *hg_core_handle) +{ + if (hg_core_handle->request_callback) { + struct hg_core_cb_info hg_core_cb_info = { + .arg = hg_core_handle->request_arg, + .ret = hg_core_handle->ret, + .type = HG_CB_FORWARD, + .info.forward.handle = (hg_core_handle_t) hg_core_handle}; - /* Execute user callback. - * NB. The handle cannot be destroyed before the callback execution - * as the user may carry the handle in the callback. */ - if (hg_cb) - hg_cb(&hg_core_cb_info); + (void) hg_core_handle->request_callback(&hg_core_cb_info); } +} -done: - /* Reuse handle if we were listening, otherwise destroy it */ - ret = hg_core_destroy(hg_core_handle); - HG_CHECK_SUBSYS_HG_ERROR(rpc, done, ret, "Could not destroy handle"); +/*---------------------------------------------------------------------------*/ +static HG_INLINE void +hg_core_trigger_respond_cb(struct hg_core_private_handle *hg_core_handle) +{ + if (hg_core_handle->response_callback) { + struct hg_core_cb_info hg_core_cb_info = { + .arg = hg_core_handle->response_arg, + .ret = hg_core_handle->ret, + .type = HG_CB_RESPOND, + .info.respond.handle = (hg_core_handle_t) hg_core_handle}; - return ret; + (void) hg_core_handle->response_callback(&hg_core_cb_info); + } +} + +/*---------------------------------------------------------------------------*/ +static void +hg_core_trigger_self_respond_cb(struct hg_core_private_handle *hg_core_handle) +{ + int32_t HG_DEBUG_LOG_USED ref_count, HG_DEBUG_LOG_USED expected_count; + hg_return_t ret; + + /* Increment number of expected completions */ + expected_count = hg_atomic_incr32(&hg_core_handle->op_expected_count); + HG_LOG_SUBSYS_DEBUG(rpc_ref, "Handle (%p) expected_count incr to %" PRId32, + (void *) hg_core_handle, expected_count); + + /* Increment refcount and push handle back to completion queue */ + ref_count = hg_atomic_incr32(&hg_core_handle->ref_count); + HG_LOG_SUBSYS_DEBUG(rpc_ref, "Handle (%p) ref_count incr to %" PRId32, + (void *) hg_core_handle, ref_count); + + /* First execute response callback */ + if (hg_core_handle->response_callback) { + struct hg_core_cb_info hg_core_cb_info = { + .arg = hg_core_handle->response_arg, + .ret = HG_SUCCESS, /* Keep return as SUCCESS */ + .type = HG_CB_RESPOND, + .info.respond.handle = (hg_core_handle_t) hg_core_handle}; + + (void) hg_core_handle->response_callback(&hg_core_cb_info); + } + + /* Assign forward callback back to handle */ + hg_core_handle->op_type = HG_CORE_FORWARD; + + /* Process output */ + ret = hg_core_process_output(hg_core_handle, hg_core_more_data_complete); + if (ret != HG_SUCCESS) { + HG_LOG_SUBSYS_ERROR(rpc, "Could not process output"); + hg_atomic_set32(&hg_core_handle->ret_status, (int32_t) ret); + } + + /* Mark as completed */ + hg_core_complete_op(hg_core_handle); } /*---------------------------------------------------------------------------*/ @@ -5499,8 +5594,9 @@ hg_core_cancel(struct hg_core_private_handle *hg_core_handle) hg_return_t ret; int32_t status; - HG_CHECK_SUBSYS_ERROR(rpc, hg_core_handle->is_self, error, ret, - HG_OPNOTSUPPORTED, "Local cancellation is not supported"); + HG_CHECK_SUBSYS_ERROR(rpc, + hg_atomic_get32(&hg_core_handle->flags) & HG_CORE_SELF_FORWARD, error, + ret, HG_OPNOTSUPPORTED, "Local cancellation is not supported"); /* Exit if op has already completed */ status = hg_atomic_get32(&hg_core_handle->status); @@ -5940,6 +6036,58 @@ HG_Core_registered_data(hg_core_class_t *hg_core_class, hg_id_t id) return NULL; } +/*---------------------------------------------------------------------------*/ +hg_return_t +HG_Core_registered_disable_response( + hg_core_class_t *hg_core_class, hg_id_t id, hg_bool_t disable) +{ + struct hg_core_private_class *private_class = + (struct hg_core_private_class *) hg_core_class; + struct hg_core_rpc_info *hg_core_rpc_info = NULL; + hg_return_t ret; + + HG_CHECK_SUBSYS_ERROR(cls, hg_core_class == NULL, error, ret, + HG_INVALID_ARG, "NULL HG core class"); + + hg_core_rpc_info = hg_core_map_lookup(&private_class->rpc_map, &id); + HG_CHECK_SUBSYS_ERROR(cls, hg_core_rpc_info == NULL, error, ret, HG_NOENTRY, + "Could not find RPC ID (%" PRIu64 ") in RPC map", id); + + hg_core_rpc_info->no_response = disable; + + return HG_SUCCESS; + +error: + return ret; +} + +/*---------------------------------------------------------------------------*/ +hg_return_t +HG_Core_registered_disabled_response( + hg_core_class_t *hg_core_class, hg_id_t id, hg_bool_t *disabled_p) +{ + struct hg_core_private_class *private_class = + (struct hg_core_private_class *) hg_core_class; + struct hg_core_rpc_info *hg_core_rpc_info = NULL; + hg_return_t ret; + + HG_CHECK_SUBSYS_ERROR(cls, hg_core_class == NULL, error, ret, + HG_INVALID_ARG, "NULL HG core class"); + HG_CHECK_SUBSYS_ERROR(cls, disabled_p == NULL, error, ret, HG_INVALID_ARG, + "NULL pointer to disabled flag"); + + hg_core_rpc_info = hg_core_map_lookup(&private_class->rpc_map, &id); + HG_CHECK_SUBSYS_ERROR(cls, hg_core_rpc_info == NULL, error, ret, HG_NOENTRY, + "Could not find RPC ID (%" PRIu64 ") in RPC map", id); + + *disabled_p = hg_core_rpc_info->no_response; + + return HG_SUCCESS; + +error: + return ret; +} + /*---------------------------------------------------------------------------*/ hg_return_t HG_Core_addr_lookup1(hg_core_context_t *context, hg_core_cb_t callback, @@ -6240,7 +6388,7 @@ HG_Core_create(hg_core_context_t *context, hg_core_addr_t addr, hg_id_t id, HG_CHECK_SUBSYS_ERROR(rpc, handle_p == NULL, error, ret, HG_INVALID_ARG, "NULL pointer to HG core handle"); - HG_LOG_SUBSYS_DEBUG(rpc, + HG_LOG_SUBSYS_DEBUG(rpc_ref, "Creating new handle with ID=%" PRIu64 ", address=%p", id, (void *) addr); @@ -6263,14 +6411,14 @@ HG_Core_create(hg_core_context_t *context, hg_core_addr_t addr, hg_id_t id, "Could not set new RPC info to handle %p", (void *) hg_core_handle); HG_LOG_SUBSYS_DEBUG( - rpc, "Created new handle (%p)", (void *) hg_core_handle); + rpc_ref, "Created new handle (%p)", (void *) hg_core_handle); *handle_p = (hg_core_handle_t) hg_core_handle; return HG_SUCCESS; error: - hg_core_destroy(hg_core_handle); + (void) hg_core_destroy(hg_core_handle); return ret; } diff --git a/src/mercury_core.h b/src/mercury_core.h index 01c39a40..a1c14801 100644 --- a/src/mercury_core.h +++ b/src/mercury_core.h @@ -72,8 +72,7 @@ typedef hg_return_t (*hg_core_cb_t)( #define HG_CORE_OP_ID_IGNORE ((hg_core_op_id_t *) 1) /* Flags */ -#define HG_CORE_MORE_DATA (1 << 0) /* More data required */ -#define HG_CORE_NO_RESPONSE (1 << 1) /* No response required */ +#define HG_CORE_MORE_DATA (1 << 0) /* More data required */ /*********************/ /* Public Prototypes */ @@ -505,6 +504,39 @@ HG_Core_register_data(hg_core_class_t *hg_core_class, hg_id_t id, void *data, HG_PUBLIC void * HG_Core_registered_data(hg_core_class_t *hg_core_class, hg_id_t id); +/** + * Disable response for a given RPC ID. This allows an origin process to send an + * RPC to a target without waiting for a response. The RPC completes locally and + * the callback on the origin is therefore pushed to the completion queue once + * the RPC send is completed. By default, all RPCs expect a response to + * be sent back. + * + * \param hg_core_class [IN] pointer to HG core class + * \param id [IN] registered function ID + * \param disable [IN] boolean (HG_TRUE to disable + * HG_FALSE to re-enable) + * + * \return HG_SUCCESS or corresponding HG error code + */ +HG_PUBLIC hg_return_t +HG_Core_registered_disable_response( + hg_core_class_t *hg_core_class, hg_id_t id, hg_bool_t disable); + +/** + * Check if response is disabled for a given RPC ID + * (i.e., HG_Registered_disable_response() has been called for this RPC ID). + * + * \param hg_core_class [IN] pointer to HG core class + * \param id [IN] registered function ID + * \param disabled_p [OUT] boolean (HG_TRUE if disabled + * HG_FALSE if enabled) + * + * \return HG_SUCCESS or corresponding HG error code + */ +HG_PUBLIC hg_return_t +HG_Core_registered_disabled_response( + hg_core_class_t *hg_core_class, hg_id_t id, hg_bool_t *disabled_p); + /** * Lookup an addr from a peer address/name. Addresses need to be * freed by calling HG_Core_addr_free(). After completion, user callback is @@ -967,6 +999,7 @@ struct hg_core_rpc_info { void *data; /* User data */ void (*free_callback)(void *); /* User data free callback */ hg_id_t id; /* RPC ID */ + hg_bool_t no_response; /* RPC response not expected */ }; /* HG core handle */ From 5f14645b4041bd679ded71f2afb8627069a49569 Mon Sep 17 00:00:00 2001 From: Jerome Soumagne Date: Thu, 19 Oct 2023 16:55:30 -0500 Subject: [PATCH 3/4] CI: disable parallel with self test --- Testing/unit/hg/CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Testing/unit/hg/CMakeLists.txt b/Testing/unit/hg/CMakeLists.txt index 78c250ae..be3a0ecf 100644 --- a/Testing/unit/hg/CMakeLists.txt +++ b/Testing/unit/hg/CMakeLists.txt @@ -163,7 +163,7 @@ function(add_mercury_test_comm_all test_name) if(NOT ((${comm} STREQUAL "bmi") OR (${comm} STREQUAL "mpi"))) add_mercury_test_comm(${test_name} ${comm} "${NA_${upper_comm}_TESTING_PROTOCOL}" - "${NA_TESTING_NO_BLOCK}" ${MERCURY_TESTING_ENABLE_PARALLEL} true false) + "${NA_TESTING_NO_BLOCK}" false true false) endif() # Scalable test if(NOT APPLE) From 935264b0ab9f94a0cb7fb02617b9929879936cb3 Mon Sep 17 00:00:00 2001 From: Jerome Soumagne Date: Thu, 19 Oct 2023 18:26:10 -0500 Subject: [PATCH 4/4] HG util: prevent locking in hg_request_wait() Concurrent progress in multi-threaded scenarios on the same context could complete another thread's request and let it sit in progress --- src/util/mercury_request.c | 42 ++++++++------------------------------ 1 file changed, 8 insertions(+), 34 deletions(-) diff --git a/src/util/mercury_request.c b/src/util/mercury_request.c index 503f4238..941f4b37 100644 --- a/src/util/mercury_request.c +++ b/src/util/mercury_request.c @@ -6,8 +6,7 @@ */ #include "mercury_request.h" -#include "mercury_thread_condition.h" -#include "mercury_thread_mutex.h" +#include "mercury_param.h" #include "mercury_time.h" #include "mercury_util_error.h" @@ -17,6 +16,8 @@ /* Local Macros */ /****************/ +#define HG_REQUEST_PROGRESS_TIMEOUT (500) + /************************************/ /* Local Type and Struct Definition */ /************************************/ @@ -25,9 +26,6 @@ struct hg_request_class { hg_request_progress_func_t progress_func; hg_request_trigger_func_t trigger_func; void *arg; - bool progressing; - hg_thread_mutex_t progress_mutex; - hg_thread_cond_t progress_cond; }; /********************/ @@ -53,9 +51,6 @@ hg_request_init(hg_request_progress_func_t progress_func, hg_request_class->progress_func = progress_func; hg_request_class->trigger_func = trigger_func; hg_request_class->arg = arg; - hg_request_class->progressing = false; - hg_thread_mutex_init(&hg_request_class->progress_mutex); - hg_thread_cond_init(&hg_request_class->progress_cond); done: return hg_request_class; @@ -70,8 +65,7 @@ hg_request_finalize(hg_request_class_t *request_class, void **arg) if (arg) *arg = request_class->arg; - hg_thread_mutex_destroy(&request_class->progress_mutex); - hg_thread_cond_destroy(&request_class->progress_cond); + free(request_class); } @@ -115,7 +109,8 @@ hg_request_wait( deadline = hg_time_add(now, remaining); do { - unsigned int trigger_flag = 0; + unsigned int trigger_flag = 0, + progress_timeout = hg_time_to_ms(remaining); int trigger_ret; do { @@ -127,31 +122,10 @@ hg_request_wait( if (completed) break; - hg_thread_mutex_lock(&request->request_class->progress_mutex); - if (request->request_class->progressing) { - if (hg_thread_cond_timedwait(&request->request_class->progress_cond, - &request->request_class->progress_mutex, - hg_time_to_ms(remaining)) != HG_UTIL_SUCCESS) { - /* Timeout occurred so leave */ - hg_thread_mutex_unlock(&request->request_class->progress_mutex); - break; - } - /* Continue as request may have completed in the meantime */ - hg_thread_mutex_unlock(&request->request_class->progress_mutex); - goto next; - } - request->request_class->progressing = true; - hg_thread_mutex_unlock(&request->request_class->progress_mutex); - request->request_class->progress_func( - hg_time_to_ms(remaining), request->request_class->arg); - - hg_thread_mutex_lock(&request->request_class->progress_mutex); - request->request_class->progressing = false; - hg_thread_cond_broadcast(&request->request_class->progress_cond); - hg_thread_mutex_unlock(&request->request_class->progress_mutex); + MIN(progress_timeout, HG_REQUEST_PROGRESS_TIMEOUT), + request->request_class->arg); -next: if (timeout_ms != 0) hg_time_get_current_ms(&now); remaining = hg_time_subtract(deadline, now);