From 973749730dd7878d0597384dbb4fac8d90bc1338 Mon Sep 17 00:00:00 2001 From: Jerome Soumagne Date: Tue, 13 Aug 2024 15:54:09 -0500 Subject: [PATCH] HG: add HG_Get_input/output_payload_size() (fix #751) Add the ability to query input / output payload sizes Update tests to use new routines NA BMI/MPI: return actual msg size through cb info --- Testing/unit/hg/mercury_rpc_cb.c | 14 +++++++ Testing/unit/hg/mercury_unit.h | 3 ++ Testing/unit/hg/test_rpc.c | 30 ++++++++++++-- src/mercury.c | 48 +++++++++++++++++++++ src/mercury.h | 20 +++++++++ src/mercury_core.c | 71 +++++++++++++++++++------------- src/mercury_core.h | 48 +++++++++++++++++++++ src/na/na_bmi.c | 6 +++ src/na/na_mpi.c | 2 + 9 files changed, 211 insertions(+), 31 deletions(-) diff --git a/Testing/unit/hg/mercury_rpc_cb.c b/Testing/unit/hg/mercury_rpc_cb.c index b3e01b01..540f8617 100644 --- a/Testing/unit/hg/mercury_rpc_cb.c +++ b/Testing/unit/hg/mercury_rpc_cb.c @@ -183,6 +183,15 @@ HG_TEST_RPC_CB(hg_test_rpc_open, handle) int event_id; int open_ret; hg_return_t ret = HG_SUCCESS; + hg_size_t payload_size = HG_Get_input_payload_size(handle); + size_t expected_string_payload_size = + strlen(HG_TEST_RPC_PATH) + sizeof(uint64_t) + 3; + + HG_TEST_CHECK_ERROR( + payload_size != sizeof(rpc_handle_t) + expected_string_payload_size, + done, ret, HG_FAULT, + "invalid input payload size (%" PRId64 "), expected (%zu)", + payload_size, sizeof(rpc_handle_t) + expected_string_payload_size); /* Get input buffer */ ret = HG_Get_input(handle, &in_struct); @@ -210,6 +219,11 @@ HG_TEST_RPC_CB(hg_test_rpc_open, handle) HG_TEST_CHECK_HG_ERROR( done, ret, "HG_Respond() failed (%s)", HG_Error_to_string(ret)); + payload_size = HG_Get_output_payload_size(handle); + HG_TEST_CHECK_ERROR(payload_size != sizeof(rpc_open_out_t), done, ret, + HG_FAULT, "invalid output payload size (%" PRId64 "), expected (%zu)", + payload_size, sizeof(rpc_open_out_t)); + done: ret = HG_Destroy(handle); HG_TEST_CHECK_ERROR_DONE( diff --git a/Testing/unit/hg/mercury_unit.h b/Testing/unit/hg/mercury_unit.h index 628a1128..12446ca8 100644 --- a/Testing/unit/hg/mercury_unit.h +++ b/Testing/unit/hg/mercury_unit.h @@ -49,6 +49,9 @@ struct hg_test_handle_info { /* Public Macros */ /*****************/ +/* Test path */ +#define HG_TEST_RPC_PATH (HG_TEST_TEMP_DIRECTORY "/test.txt") + /*********************/ /* Public Prototypes */ /*********************/ diff --git a/Testing/unit/hg/test_rpc.c b/Testing/unit/hg/test_rpc.c index 5a74925f..63e25607 100644 --- a/Testing/unit/hg/test_rpc.c +++ b/Testing/unit/hg/test_rpc.c @@ -11,9 +11,6 @@ /* Local Macros */ /****************/ -/* Test path */ -#define HG_TEST_RPC_PATH (HG_TEST_TEMP_DIRECTORY "/test.txt") - /* Wait timeout in ms */ #define HG_TEST_WAIT_TIMEOUT (HG_TEST_TIMEOUT * 1000) @@ -178,6 +175,9 @@ hg_test_rpc_input(hg_handle_t handle, hg_addr_t addr, hg_id_t rpc_id, .no_entry = false}; rpc_open_in_t in_struct = { .handle = rpc_open_handle, .path = HG_TEST_RPC_PATH}; + hg_size_t payload_size; + size_t expected_string_payload_size = + strlen(HG_TEST_RPC_PATH) + sizeof(uint64_t) + 3; unsigned int flag; int rc; @@ -204,6 +204,13 @@ hg_test_rpc_input(hg_handle_t handle, hg_addr_t addr, hg_id_t rpc_id, HG_TEST_CHECK_HG_ERROR( error, ret, "Error in HG callback (%s)", HG_Error_to_string(ret)); + payload_size = HG_Get_input_payload_size(handle); + HG_TEST_CHECK_ERROR( + payload_size != sizeof(rpc_handle_t) + expected_string_payload_size, + error, ret, HG_FAULT, + "invalid input payload size (%" PRId64 "), expected (%zu)", + payload_size, sizeof(rpc_handle_t) + expected_string_payload_size); + return HG_SUCCESS; error: @@ -262,12 +269,16 @@ hg_test_rpc_output_cb(const struct hg_cb_info *callback_info) int rpc_open_event_id; rpc_open_out_t rpc_open_out_struct; hg_return_t ret = callback_info->ret; + hg_size_t payload_size = HG_Get_output_payload_size(handle); if (args->no_entry && ret == HG_NOENTRY) goto done; HG_TEST_CHECK_HG_ERROR(done, ret, "Error in HG callback (%s)", HG_Error_to_string(callback_info->ret)); + HG_TEST_CHECK_ERROR(payload_size != sizeof(rpc_open_out_t), done, ret, + HG_FAULT, "invalid output payload size (%" PRId64 "), expected (%zu)", + payload_size, sizeof(rpc_open_out_t)); /* Get output */ ret = HG_Get_output(handle, &rpc_open_out_struct); @@ -329,9 +340,16 @@ hg_test_rpc_output_overflow_cb(const struct hg_cb_info *callback_info) size_t string_len; # endif hg_return_t ret = callback_info->ret; + hg_size_t payload_size = HG_Get_output_payload_size(handle); + size_t expected_string_payload_size = + HG_Class_get_output_eager_size(HG_Get_info(handle)->hg_class) * 2 + 3 + + 2 * sizeof(uint64_t); HG_TEST_CHECK_HG_ERROR(done, ret, "Error in HG callback (%s)", HG_Error_to_string(callback_info->ret)); + HG_TEST_CHECK_ERROR(payload_size != expected_string_payload_size, done, ret, + HG_FAULT, "invalid output payload size (%" PRId64 "), expected (%zu)", + payload_size, expected_string_payload_size); /* Get output */ ret = HG_Get_output(handle, &out_struct); @@ -899,6 +917,12 @@ main(int argc, char *argv[]) hg_test_rpc_null_id_g, hg_test_rpc_no_output_cb, info.request); HG_TEST_CHECK_HG_ERROR(error, hg_ret, "hg_test_rpc_no_input() failed (%s)", HG_Error_to_string(hg_ret)); + HG_TEST_CHECK_ERROR(HG_Get_input_payload_size(info.handles[0]) != 0, error, + hg_ret, HG_FAULT, "input payload non null (%" PRId64 ")", + HG_Get_input_payload_size(info.handles[0])); + HG_TEST_CHECK_ERROR(HG_Get_output_payload_size(info.handles[0]) != 0, error, + hg_ret, HG_FAULT, "output payload non null (%" PRId64 ")", + HG_Get_output_payload_size(info.handles[0])); HG_PASSED(); /* Simple RPC test */ diff --git a/src/mercury.c b/src/mercury.c index a3462274..d5256d49 100644 --- a/src/mercury.c +++ b/src/mercury.c @@ -1822,6 +1822,30 @@ HG_Reset(hg_handle_t handle, hg_addr_t addr, hg_id_t id) return ret; } +/*---------------------------------------------------------------------------*/ +hg_size_t +HG_Get_input_payload_size(hg_handle_t handle) +{ + struct hg_private_handle *private_handle = + (struct hg_private_handle *) handle; + + HG_CHECK_SUBSYS_ERROR_NORET( + rpc, handle == HG_HANDLE_NULL, error, "NULL HG handle"); + + if (private_handle->in_extra_buf != NULL) + return private_handle->in_extra_buf_size; + else { + hg_size_t header_size = hg_header_get_size(HG_INPUT), + payload_size = + HG_Core_get_input_payload_size(handle->core_handle); + + return (payload_size > header_size) ? payload_size - header_size : 0; + } + +error: + return 0; +} + /*---------------------------------------------------------------------------*/ hg_return_t HG_Get_input(hg_handle_t handle, void *in_struct) @@ -1882,6 +1906,30 @@ HG_Free_input(hg_handle_t handle, void *in_struct) return ret; } +/*---------------------------------------------------------------------------*/ +hg_size_t +HG_Get_output_payload_size(hg_handle_t handle) +{ + struct hg_private_handle *private_handle = + (struct hg_private_handle *) handle; + + HG_CHECK_SUBSYS_ERROR_NORET( + rpc, handle == HG_HANDLE_NULL, error, "NULL HG handle"); + + if (private_handle->out_extra_buf != NULL) + return private_handle->out_extra_buf_size; + else { + hg_size_t header_size = hg_header_get_size(HG_OUTPUT), + payload_size = + HG_Core_get_output_payload_size(handle->core_handle); + + return (payload_size > header_size) ? payload_size - header_size : 0; + } + +error: + return 0; +} + /*---------------------------------------------------------------------------*/ hg_return_t HG_Get_output(hg_handle_t handle, void *out_struct) diff --git a/src/mercury.h b/src/mercury.h index b7397eb4..09e04006 100644 --- a/src/mercury.h +++ b/src/mercury.h @@ -784,6 +784,16 @@ HG_Set_data(hg_handle_t handle, void *data, void (*free_callback)(void *)); static HG_INLINE void * HG_Get_data(hg_handle_t handle) HG_WARN_UNUSED_RESULT; +/** + * Retrieve input payload size from a given handle. + * + * \param handle [IN] HG handle + * + * \return Non-negative value or zero if no payload or the handle is not valid + */ +HG_PUBLIC hg_size_t +HG_Get_input_payload_size(hg_handle_t handle); + /** * Get input from handle (requires registration of input proc to deserialize * parameters). Input must be freed using HG_Free_input(). @@ -817,6 +827,16 @@ HG_Get_input(hg_handle_t handle, void *in_struct); HG_PUBLIC hg_return_t HG_Free_input(hg_handle_t handle, void *in_struct); +/** + * Retrieve output payload size from a given handle. + * + * \param handle [IN] HG handle + * + * \return Non-negative value or zero if no payload or the handle is not valid + */ +HG_PUBLIC hg_size_t +HG_Get_output_payload_size(hg_handle_t handle); + /** * Get output from handle (requires registration of output proc to deserialize * parameters). Output must be freed using HG_Free_output(). diff --git a/src/mercury_core.c b/src/mercury_core.c index 3cb40af2..7c4bf36a 100644 --- a/src/mercury_core.c +++ b/src/mercury_core.c @@ -348,8 +348,6 @@ struct hg_core_private_handle { struct hg_core_multi_recv_op *multi_recv_op; /* Multi-recv operation */ void *in_buf_storage; /* Storage input buffer */ size_t in_buf_storage_size; /* Storage input buffer size */ - size_t in_buf_used; /* Amount of input buffer used */ - size_t out_buf_used; /* Amount of output buffer used */ na_tag_t tag; /* Tag used for request and response */ hg_atomic_int32_t ref_count; /* Reference count */ hg_atomic_int32_t no_response_done; /* Reference count to reach for done */ @@ -3660,8 +3658,8 @@ hg_core_reset(struct hg_core_private_handle *hg_core_handle) hg_core_handle->tag = 0; hg_core_handle->cookie = 0; hg_core_handle->ret = HG_SUCCESS; - hg_core_handle->in_buf_used = 0; - hg_core_handle->out_buf_used = 0; + hg_core_handle->core_handle.in_buf_used = 0; + hg_core_handle->core_handle.out_buf_used = 0; hg_atomic_init32( &hg_core_handle->op_expected_count, 1); /* Default (no response) */ HG_LOG_SUBSYS_DEBUG(rpc_ref, "Handle (%p) expected_count set to %" PRId32, @@ -3962,9 +3960,10 @@ hg_core_forward(struct hg_core_private_handle *hg_core_handle, hg_core_handle->core_handle.na_in_header_offset; /* Set the actual size of the msg that needs to be transmitted */ - hg_core_handle->in_buf_used = header_size + payload_size; + hg_core_handle->core_handle.in_buf_used = header_size + payload_size; HG_CHECK_SUBSYS_ERROR(rpc, - hg_core_handle->in_buf_used > hg_core_handle->core_handle.in_buf_size, + hg_core_handle->core_handle.in_buf_used > + hg_core_handle->core_handle.in_buf_size, error, ret, HG_MSGSIZE, "Exceeding input buffer size"); /* Parse flags */ @@ -4093,7 +4092,8 @@ hg_core_forward_na(struct hg_core_private_handle *hg_core_handle) /* Post send (input) */ na_ret = NA_Msg_send_unexpected(hg_core_handle->na_class, hg_core_handle->na_context, hg_core_send_input_cb, hg_core_handle, - hg_core_handle->core_handle.in_buf, hg_core_handle->in_buf_used, + hg_core_handle->core_handle.in_buf, + hg_core_handle->core_handle.in_buf_used, hg_core_handle->in_buf_plugin_data, hg_core_handle->na_addr, hg_core_handle->core_handle.info.context_id, hg_core_handle->tag, hg_core_handle->na_send_op_id); @@ -4164,9 +4164,10 @@ hg_core_respond(struct hg_core_private_handle *hg_core_handle, hg_core_handle->core_handle.na_out_header_offset; /* Set the actual size of the msg that needs to be transmitted */ - hg_core_handle->out_buf_used = header_size + payload_size; + hg_core_handle->core_handle.out_buf_used = header_size + payload_size; HG_CHECK_SUBSYS_ERROR(rpc, - hg_core_handle->out_buf_used > hg_core_handle->core_handle.out_buf_size, + hg_core_handle->core_handle.out_buf_used > + hg_core_handle->core_handle.out_buf_size, error, ret, HG_MSGSIZE, "Exceeding output buffer size"); /* Parse flags */ @@ -4328,7 +4329,8 @@ hg_core_respond_na( /* Post expected send (output) */ na_ret = NA_Msg_send_expected(hg_core_handle->na_class, hg_core_handle->na_context, hg_core_send_output_cb, hg_core_handle, - hg_core_handle->core_handle.out_buf, hg_core_handle->out_buf_used, + hg_core_handle->core_handle.out_buf, + hg_core_handle->core_handle.out_buf_used, hg_core_handle->out_buf_plugin_data, hg_core_handle->na_addr, hg_core_handle->core_handle.info.context_id, hg_core_handle->tag, hg_core_handle->na_send_op_id); @@ -4474,19 +4476,19 @@ hg_core_recv_input_cb(const struct na_cb_info *callback_info) hg_core_handle->core_handle.info.addr->na_addr = hg_core_handle->na_addr; hg_core_handle->tag = na_cb_info_recv_unexpected->tag; - hg_core_handle->in_buf_used = + hg_core_handle->core_handle.in_buf_used = na_cb_info_recv_unexpected->actual_buf_size; - HG_CHECK_SUBSYS_ERROR_NORET(rpc, - hg_core_handle->in_buf_used > + HG_CHECK_SUBSYS_ERROR(rpc, + hg_core_handle->core_handle.in_buf_used > hg_core_handle->core_handle.in_buf_size, - error, + error, ret, HG_OVERFLOW, "Actual transfer size (%zu) is too large for unexpected recv", - hg_core_handle->in_buf_used); + hg_core_handle->core_handle.in_buf_used); HG_LOG_SUBSYS_DEBUG(rpc, "Processing input for handle %p, tag=%u, buf_size=%zu", (void *) hg_core_handle, hg_core_handle->tag, - hg_core_handle->in_buf_used); + hg_core_handle->core_handle.in_buf_used); /* Process input information */ ret = hg_core_process_input(hg_core_handle); @@ -4582,24 +4584,25 @@ hg_core_multi_recv_input_cb(const struct na_cb_info *callback_info) hg_core_handle->core_handle.info.addr->na_addr = hg_core_handle->na_addr; hg_core_handle->tag = na_cb_info_multi_recv_unexpected->tag; - hg_core_handle->in_buf_used = + hg_core_handle->core_handle.in_buf_used = na_cb_info_multi_recv_unexpected->actual_buf_size; /* Either copy the buffer to release early or point to the actual * multi-recv buffer space to save a memcpy */ if (hg_core_handle->multi_recv_copy) { - HG_CHECK_SUBSYS_ERROR_NORET(rpc, - hg_core_handle->in_buf_used > + HG_CHECK_SUBSYS_ERROR(rpc, + hg_core_handle->core_handle.in_buf_used > hg_core_handle->in_buf_storage_size, - error, + error, ret, HG_OVERFLOW, "Actual transfer size (%zu) is too large for unexpected recv", - hg_core_handle->in_buf_used); + hg_core_handle->core_handle.in_buf_used); HG_LOG_SUBSYS_DEBUG(rpc, "Copying multi-recv payload of size %zu for handle (%p)", - hg_core_handle->in_buf_used, (void *) hg_core_handle); + hg_core_handle->core_handle.in_buf_used, + (void *) hg_core_handle); memcpy(hg_core_handle->in_buf_storage, na_cb_info_multi_recv_unexpected->actual_buf, - hg_core_handle->in_buf_used); + hg_core_handle->core_handle.in_buf_used); hg_core_handle->core_handle.in_buf_size = hg_core_handle->in_buf_storage_size; hg_core_handle->core_handle.in_buf = hg_core_handle->in_buf_storage; @@ -4611,9 +4614,10 @@ hg_core_multi_recv_input_cb(const struct na_cb_info *callback_info) } else { HG_LOG_SUBSYS_DEBUG(rpc, "Using direct multi-recv payload of size %zu for handle (%p)", - hg_core_handle->in_buf_used, (void *) hg_core_handle); + hg_core_handle->core_handle.in_buf_used, + (void *) hg_core_handle); hg_core_handle->core_handle.in_buf_size = - hg_core_handle->in_buf_used; + hg_core_handle->core_handle.in_buf_used; hg_core_handle->core_handle.in_buf = na_cb_info_multi_recv_unexpected->actual_buf; } @@ -4621,7 +4625,7 @@ hg_core_multi_recv_input_cb(const struct na_cb_info *callback_info) HG_LOG_SUBSYS_DEBUG(rpc, "Processing input for handle %p, tag=%u, buf_size=%zu", (void *) hg_core_handle, hg_core_handle->tag, - hg_core_handle->in_buf_used); + hg_core_handle->core_handle.in_buf_used); /* Process input information */ ret = hg_core_process_input(hg_core_handle); @@ -4774,8 +4778,19 @@ hg_core_recv_output_cb(const struct na_cb_info *callback_info) hg_return_t ret; if (callback_info->ret == NA_SUCCESS) { - HG_LOG_SUBSYS_DEBUG(rpc, "Processing output for handle %p, tag=%u", - (void *) hg_core_handle, hg_core_handle->tag); + hg_core_handle->core_handle.out_buf_used = + callback_info->info.recv_expected.actual_buf_size; + HG_CHECK_SUBSYS_ERROR(rpc, + hg_core_handle->core_handle.out_buf_used > + hg_core_handle->core_handle.out_buf_size, + error, ret, HG_OVERFLOW, + "Actual transfer size (%zu) is too large for expected recv", + hg_core_handle->core_handle.out_buf_used); + + HG_LOG_SUBSYS_DEBUG(rpc, + "Processing output for handle %p, tag=%u, buf_size=%zu", + (void *) hg_core_handle, hg_core_handle->tag, + hg_core_handle->core_handle.out_buf_used); /* Process output information */ ret = hg_core_process_output(hg_core_handle, hg_core_send_ack); diff --git a/src/mercury_core.h b/src/mercury_core.h index a38a0b54..e6eee7e6 100644 --- a/src/mercury_core.h +++ b/src/mercury_core.h @@ -865,6 +865,16 @@ HG_Core_get_rpc_data(hg_core_handle_t handle) HG_WARN_UNUSED_RESULT; static HG_INLINE hg_return_t HG_Core_set_target_id(hg_core_handle_t handle, uint8_t id); +/** + * Get input payload size from handle. + * + * \param handle [IN] HG handle + * + * \return Non-negative value or zero if no payload + */ +static HG_INLINE size_t +HG_Core_get_input_payload_size(hg_core_handle_t handle); + /** * Get input buffer from handle that can be used for serializing/deserializing * parameters. @@ -892,6 +902,16 @@ HG_Core_get_input( HG_PUBLIC hg_return_t HG_Core_release_input(hg_core_handle_t handle); +/** + * Get output payload size from handle. + * + * \param handle [IN] HG handle + * + * \return Non-negative value or zero if no payload + */ +static HG_INLINE size_t +HG_Core_get_output_payload_size(hg_core_handle_t handle); + /** * Get output buffer from handle that can be used for serializing/deserializing * parameters. @@ -1096,6 +1116,8 @@ struct hg_core_handle { size_t out_buf_size; /* Output buffer size */ size_t na_in_header_offset; /* Input NA header offset */ size_t na_out_header_offset; /* Output NA header offset */ + size_t in_buf_used; /* Amount of input buffer used */ + size_t out_buf_used; /* Amount of output buffer used */ }; /*---------------------------------------------------------------------------*/ @@ -1288,6 +1310,19 @@ HG_Core_set_target_id(hg_core_handle_t handle, uint8_t id) return HG_SUCCESS; } +/*---------------------------------------------------------------------------*/ +static HG_INLINE size_t +HG_Core_get_input_payload_size(hg_core_handle_t handle) +{ + size_t header_size = + hg_core_header_request_get_size() + handle->na_in_header_offset; + + if (handle->in_buf_used > header_size) + return handle->in_buf_used - header_size; + else + return 0; +} + /*---------------------------------------------------------------------------*/ static HG_INLINE hg_return_t HG_Core_get_input( @@ -1306,6 +1341,19 @@ HG_Core_get_input( return HG_SUCCESS; } +/*---------------------------------------------------------------------------*/ +static HG_INLINE size_t +HG_Core_get_output_payload_size(hg_core_handle_t handle) +{ + size_t header_size = + hg_core_header_response_get_size() + handle->na_out_header_offset; + + if (handle->out_buf_used > header_size) + return handle->out_buf_used - header_size; + else + return 0; +} + /*---------------------------------------------------------------------------*/ static HG_INLINE hg_return_t HG_Core_get_output( diff --git a/src/na/na_bmi.c b/src/na/na_bmi.c index de60c5be..5d6a8ecf 100644 --- a/src/na/na_bmi.c +++ b/src/na/na_bmi.c @@ -1126,7 +1126,13 @@ na_bmi_complete(struct na_bmi_op_id *na_bmi_op_id) break; case NA_CB_SEND_UNEXPECTED: case NA_CB_SEND_EXPECTED: + break; case NA_CB_RECV_EXPECTED: + if (callback_info->ret != NA_SUCCESS) + callback_info->info.recv_expected.actual_buf_size = 0; + else + callback_info->info.recv_expected.actual_buf_size = + (size_t) na_bmi_op_id->info.msg.actual_buf_size; break; case NA_CB_PUT: case NA_CB_GET: diff --git a/src/na/na_mpi.c b/src/na/na_mpi.c index 81e577b8..60903a17 100644 --- a/src/na/na_mpi.c +++ b/src/na/na_mpi.c @@ -2275,6 +2275,8 @@ na_mpi_complete(struct na_mpi_op_id *na_mpi_op_id) ret = NA_SIZE_ERROR; goto done; } + callback_info->info.recv_expected.actual_buf_size = + (size_t) na_mpi_op_id->info.recv_expected.actual_size; break; case NA_CB_PUT: /* Transfer is now done so free RMA info */