diff --git a/Testing/unit/hg/test_rpc.c b/Testing/unit/hg/test_rpc.c index f522cc3c..ad28183c 100644 --- a/Testing/unit/hg/test_rpc.c +++ b/Testing/unit/hg/test_rpc.c @@ -31,11 +31,18 @@ struct forward_cb_args { struct forward_multi_cb_args { rpc_handle_t *rpc_handle; hg_return_t *rets; + hg_thread_mutex_t mutex; int32_t expected_count; /* Expected count */ int32_t complete_count; /* Completed count */ hg_request_t *request; /* Request */ }; +struct hg_test_multi_thread { + struct hg_unit_info *info; + hg_thread_t thread; + unsigned int thread_id; +}; + /********************/ /* Local Prototypes */ /********************/ @@ -75,6 +82,12 @@ hg_test_rpc_multi(hg_handle_t *handles, size_t handle_max, hg_addr_t addr, static hg_return_t hg_test_rpc_multi_cb(const struct hg_cb_info *callback_info); +static hg_return_t +hg_test_rpc_multi_concurrent(struct hg_unit_info *info); + +static HG_THREAD_RETURN_TYPE +hg_test_rpc_multi_thread(void *arg); + /*******************/ /* Local Variables */ /*******************/ @@ -377,6 +390,7 @@ hg_test_rpc_multi(hg_handle_t *handles, size_t handle_max, hg_addr_t addr, .rpc_handle = &rpc_open_handle, .request = request, .rets = NULL, + .mutex = HG_THREAD_MUTEX_INITIALIZER, .complete_count = 0, .expected_count = (int32_t) handle_max}; rpc_open_in_t in_struct = { @@ -450,6 +464,7 @@ hg_test_rpc_multi_cb(const struct hg_cb_info *callback_info) int rpc_open_event_id; rpc_open_out_t rpc_open_out_struct; hg_return_t ret = callback_info->ret; + int32_t complete_count; HG_TEST_CHECK_HG_ERROR(done, ret, "Error in HG callback (%s)", HG_Error_to_string(callback_info->ret)); @@ -479,13 +494,88 @@ hg_test_rpc_multi_cb(const struct hg_cb_info *callback_info) } done: + hg_thread_mutex_lock(&args->mutex); args->rets[args->complete_count] = ret; - if (++args->complete_count == args->expected_count) + complete_count = ++args->complete_count; + hg_thread_mutex_unlock(&args->mutex); + if (complete_count == args->expected_count) hg_request_complete(args->request); return HG_SUCCESS; } +/*---------------------------------------------------------------------------*/ +static hg_return_t +hg_test_rpc_multi_concurrent(struct hg_unit_info *info) +{ + struct hg_test_multi_thread *thread_infos; + unsigned int i; + hg_return_t ret; + int rc; + + thread_infos = + malloc(info->hg_test_info.thread_count * sizeof(*thread_infos)); + HG_TEST_CHECK_ERROR(thread_infos == NULL, error, ret, HG_NOMEM, + "Could not allocate thread array (%u)", + info->hg_test_info.thread_count); + + for (i = 0; i < info->hg_test_info.thread_count; i++) { + thread_infos[i].info = info; + thread_infos[i].thread_id = i; + + rc = hg_thread_create(&thread_infos[i].thread, hg_test_rpc_multi_thread, + &thread_infos[i]); + HG_TEST_CHECK_ERROR( + rc != 0, error, ret, HG_NOMEM, "hg_thread_create() failed"); + } + + for (i = 0; i < info->hg_test_info.thread_count; i++) { + rc = hg_thread_join(thread_infos[i].thread); + HG_TEST_CHECK_ERROR( + rc != 0, error, ret, HG_FAULT, "hg_thread_join() failed"); + } + + return HG_SUCCESS; + +error: + free(thread_infos); + + return ret; +} + +/*---------------------------------------------------------------------------*/ +static HG_THREAD_RETURN_TYPE +hg_test_rpc_multi_thread(void *arg) +{ + struct hg_test_multi_thread *thread_arg = + (struct hg_test_multi_thread *) arg; + struct hg_unit_info *info = thread_arg->info; + hg_thread_ret_t tret = (hg_thread_ret_t) 0; + size_t handle_max = info->handle_max / info->hg_test_info.thread_count; + hg_handle_t *handles = &info->handles[thread_arg->thread_id * handle_max]; + hg_request_t *request; + hg_return_t ret; + + request = hg_request_create(info->request_class); + HG_TEST_CHECK_ERROR( + request == NULL, done, ret, HG_NOMEM, "Could not create request"); + + ret = hg_test_rpc_multi(handles, + (thread_arg->thread_id < (info->hg_test_info.thread_count - 1)) + ? handle_max + : info->handle_max - thread_arg->thread_id * handle_max, + info->target_addr, 0, hg_test_rpc_open_id_g, hg_test_rpc_multi_cb, + request); + HG_TEST_CHECK_HG_ERROR(done, ret, "hg_test_rpc_multiple() failed (%s)", + HG_Error_to_string(ret)); + +done: + hg_request_destroy(request); + + hg_thread_exit(tret); + return tret; +} + /*---------------------------------------------------------------------------*/ int main(int argc, char *argv[]) @@ -617,13 +707,21 @@ main(int argc, char *argv[]) } /* RPC test with multiple handle in flight */ - HG_TEST("concurrent RPCs"); + HG_TEST("multi RPCs"); hg_ret = hg_test_rpc_multi(info.handles, info.handle_max, info.target_addr, 0, hg_test_rpc_open_id_g, hg_test_rpc_multi_cb, info.request); HG_TEST_CHECK_HG_ERROR(error, hg_ret, "hg_test_rpc_multiple() failed (%s)", HG_Error_to_string(hg_ret)); HG_PASSED(); + /* RPC test with multiple handle in flight */ + HG_TEST("concurrent multi RPCs"); + hg_ret = hg_test_rpc_multi_concurrent(&info); + HG_TEST_CHECK_HG_ERROR(error, hg_ret, + "hg_test_rpc_multi_concurrent() failed (%s)", + HG_Error_to_string(hg_ret)); + HG_PASSED(); + /* RPC test with multiple handle to multiple target contexts */ if (info.hg_test_info.na_test_info.max_contexts) { hg_uint8_t i,