Skip to content

Commit

Permalink
HG: add HG_Get_input/output_payload_size() (fix #751)
Browse files Browse the repository at this point in the history
Add the ability to query input / output payload sizes

Update tests to use new routines

NA BMI/MPI: return actual msg size through cb info
  • Loading branch information
soumagne committed Aug 15, 2024
1 parent fd4e665 commit 2635745
Show file tree
Hide file tree
Showing 9 changed files with 211 additions and 31 deletions.
14 changes: 14 additions & 0 deletions Testing/unit/hg/mercury_rpc_cb.c
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,15 @@ HG_TEST_RPC_CB(hg_test_rpc_open, handle)
int event_id;
int open_ret;
hg_return_t ret = HG_SUCCESS;
hg_size_t payload_size = HG_Get_input_payload_size(handle);
size_t expected_string_payload_size =
strlen(HG_TEST_RPC_PATH) + sizeof(uint64_t) + 3;

HG_TEST_CHECK_ERROR(
payload_size != sizeof(rpc_handle_t) + expected_string_payload_size,
done, ret, HG_FAULT,
"invalid input payload size (%" PRId64 "), expected (%zu)",
payload_size, sizeof(rpc_handle_t) + expected_string_payload_size);

/* Get input buffer */
ret = HG_Get_input(handle, &in_struct);
Expand Down Expand Up @@ -210,6 +219,11 @@ HG_TEST_RPC_CB(hg_test_rpc_open, handle)
HG_TEST_CHECK_HG_ERROR(
done, ret, "HG_Respond() failed (%s)", HG_Error_to_string(ret));

payload_size = HG_Get_output_payload_size(handle);
HG_TEST_CHECK_ERROR(payload_size != sizeof(rpc_open_out_t), done, ret,
HG_FAULT, "invalid output payload size (%" PRId64 "), expected (%zu)",
payload_size, sizeof(rpc_open_out_t));

done:
ret = HG_Destroy(handle);
HG_TEST_CHECK_ERROR_DONE(
Expand Down
3 changes: 3 additions & 0 deletions Testing/unit/hg/mercury_unit.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ struct hg_test_handle_info {
/* Public Macros */
/*****************/

/* Test path */
#define HG_TEST_RPC_PATH (HG_TEST_TEMP_DIRECTORY "/test.txt")

/*********************/
/* Public Prototypes */
/*********************/
Expand Down
30 changes: 27 additions & 3 deletions Testing/unit/hg/test_rpc.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@
/* Local Macros */
/****************/

/* Test path */
#define HG_TEST_RPC_PATH (HG_TEST_TEMP_DIRECTORY "/test.txt")

/* Wait timeout in ms */
#define HG_TEST_WAIT_TIMEOUT (HG_TEST_TIMEOUT * 1000)

Expand Down Expand Up @@ -178,6 +175,9 @@ hg_test_rpc_input(hg_handle_t handle, hg_addr_t addr, hg_id_t rpc_id,
.no_entry = false};
rpc_open_in_t in_struct = {
.handle = rpc_open_handle, .path = HG_TEST_RPC_PATH};
hg_size_t payload_size;
size_t expected_string_payload_size =
strlen(HG_TEST_RPC_PATH) + sizeof(uint64_t) + 3;
unsigned int flag;
int rc;

Expand All @@ -204,6 +204,13 @@ hg_test_rpc_input(hg_handle_t handle, hg_addr_t addr, hg_id_t rpc_id,
HG_TEST_CHECK_HG_ERROR(
error, ret, "Error in HG callback (%s)", HG_Error_to_string(ret));

payload_size = HG_Get_input_payload_size(handle);
HG_TEST_CHECK_ERROR(
payload_size != sizeof(rpc_handle_t) + expected_string_payload_size,
error, ret, HG_FAULT,
"invalid input payload size (%" PRId64 "), expected (%zu)",
payload_size, sizeof(rpc_handle_t) + expected_string_payload_size);

return HG_SUCCESS;

error:
Expand Down Expand Up @@ -262,12 +269,16 @@ hg_test_rpc_output_cb(const struct hg_cb_info *callback_info)
int rpc_open_event_id;
rpc_open_out_t rpc_open_out_struct;
hg_return_t ret = callback_info->ret;
hg_size_t payload_size = HG_Get_output_payload_size(handle);

if (args->no_entry && ret == HG_NOENTRY)
goto done;

HG_TEST_CHECK_HG_ERROR(done, ret, "Error in HG callback (%s)",
HG_Error_to_string(callback_info->ret));
HG_TEST_CHECK_ERROR(payload_size != sizeof(rpc_open_out_t), done, ret,
HG_FAULT, "invalid output payload size (%" PRId64 "), expected (%zu)",
payload_size, sizeof(rpc_open_out_t));

/* Get output */
ret = HG_Get_output(handle, &rpc_open_out_struct);
Expand Down Expand Up @@ -329,9 +340,16 @@ hg_test_rpc_output_overflow_cb(const struct hg_cb_info *callback_info)
size_t string_len;
# endif
hg_return_t ret = callback_info->ret;
hg_size_t payload_size = HG_Get_output_payload_size(handle);
size_t expected_string_payload_size =
HG_Class_get_output_eager_size(HG_Get_info(handle)->hg_class) * 2 + 3 +
2 * sizeof(uint64_t);

HG_TEST_CHECK_HG_ERROR(done, ret, "Error in HG callback (%s)",
HG_Error_to_string(callback_info->ret));
HG_TEST_CHECK_ERROR(payload_size != expected_string_payload_size, done, ret,
HG_FAULT, "invalid output payload size (%" PRId64 "), expected (%zu)",
payload_size, expected_string_payload_size);

/* Get output */
ret = HG_Get_output(handle, &out_struct);
Expand Down Expand Up @@ -899,6 +917,12 @@ main(int argc, char *argv[])
hg_test_rpc_null_id_g, hg_test_rpc_no_output_cb, info.request);
HG_TEST_CHECK_HG_ERROR(error, hg_ret, "hg_test_rpc_no_input() failed (%s)",
HG_Error_to_string(hg_ret));
HG_TEST_CHECK_ERROR(HG_Get_input_payload_size(info.handles[0]) != 0, error,
hg_ret, HG_FAULT, "input payload non null (%" PRId64 ")",
HG_Get_input_payload_size(info.handles[0]));
HG_TEST_CHECK_ERROR(HG_Get_output_payload_size(info.handles[0]) != 0, error,
hg_ret, HG_FAULT, "output payload non null (%" PRId64 ")",
HG_Get_output_payload_size(info.handles[0]));
HG_PASSED();

/* Simple RPC test */
Expand Down
48 changes: 48 additions & 0 deletions src/mercury.c
Original file line number Diff line number Diff line change
Expand Up @@ -1822,6 +1822,30 @@ HG_Reset(hg_handle_t handle, hg_addr_t addr, hg_id_t id)
return ret;
}

/*---------------------------------------------------------------------------*/
hg_size_t
HG_Get_input_payload_size(hg_handle_t handle)
{
struct hg_private_handle *private_handle =
(struct hg_private_handle *) handle;

HG_CHECK_SUBSYS_ERROR_NORET(
rpc, handle == HG_HANDLE_NULL, error, "NULL HG handle");

if (private_handle->in_extra_buf != NULL)
return private_handle->in_extra_buf_size;
else {
hg_size_t header_size = hg_header_get_size(HG_INPUT),
payload_size =
HG_Core_get_input_payload_size(handle->core_handle);

return (payload_size > header_size) ? payload_size - header_size : 0;
}

error:
return 0;
}

/*---------------------------------------------------------------------------*/
hg_return_t
HG_Get_input(hg_handle_t handle, void *in_struct)
Expand Down Expand Up @@ -1882,6 +1906,30 @@ HG_Free_input(hg_handle_t handle, void *in_struct)
return ret;
}

/*---------------------------------------------------------------------------*/
hg_size_t
HG_Get_output_payload_size(hg_handle_t handle)
{
struct hg_private_handle *private_handle =
(struct hg_private_handle *) handle;

HG_CHECK_SUBSYS_ERROR_NORET(
rpc, handle == HG_HANDLE_NULL, error, "NULL HG handle");

if (private_handle->out_extra_buf != NULL)
return private_handle->out_extra_buf_size;
else {
hg_size_t header_size = hg_header_get_size(HG_OUTPUT),
payload_size =
HG_Core_get_output_payload_size(handle->core_handle);

return (payload_size > header_size) ? payload_size - header_size : 0;
}

error:
return 0;
}

/*---------------------------------------------------------------------------*/
hg_return_t
HG_Get_output(hg_handle_t handle, void *out_struct)
Expand Down
20 changes: 20 additions & 0 deletions src/mercury.h
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,16 @@ HG_Set_data(hg_handle_t handle, void *data, void (*free_callback)(void *));
static HG_INLINE void *
HG_Get_data(hg_handle_t handle) HG_WARN_UNUSED_RESULT;

/**
* Retrieve input payload size from a given handle.
*
* \param handle [IN] HG handle
*
* \return Non-negative value or zero if no payload or the handle is not valid
*/
HG_PUBLIC hg_size_t
HG_Get_input_payload_size(hg_handle_t handle);

/**
* Get input from handle (requires registration of input proc to deserialize
* parameters). Input must be freed using HG_Free_input().
Expand Down Expand Up @@ -817,6 +827,16 @@ HG_Get_input(hg_handle_t handle, void *in_struct);
HG_PUBLIC hg_return_t
HG_Free_input(hg_handle_t handle, void *in_struct);

/**
* Retrieve output payload size from a given handle.
*
* \param handle [IN] HG handle
*
* \return Non-negative value or zero if no payload or the handle is not valid
*/
HG_PUBLIC hg_size_t
HG_Get_output_payload_size(hg_handle_t handle);

/**
* Get output from handle (requires registration of output proc to deserialize
* parameters). Output must be freed using HG_Free_output().
Expand Down
71 changes: 43 additions & 28 deletions src/mercury_core.c
Original file line number Diff line number Diff line change
Expand Up @@ -348,8 +348,6 @@ struct hg_core_private_handle {
struct hg_core_multi_recv_op *multi_recv_op; /* Multi-recv operation */
void *in_buf_storage; /* Storage input buffer */
size_t in_buf_storage_size; /* Storage input buffer size */
size_t in_buf_used; /* Amount of input buffer used */
size_t out_buf_used; /* Amount of output buffer used */
na_tag_t tag; /* Tag used for request and response */
hg_atomic_int32_t ref_count; /* Reference count */
hg_atomic_int32_t no_response_done; /* Reference count to reach for done */
Expand Down Expand Up @@ -3660,8 +3658,8 @@ hg_core_reset(struct hg_core_private_handle *hg_core_handle)
hg_core_handle->tag = 0;
hg_core_handle->cookie = 0;
hg_core_handle->ret = HG_SUCCESS;
hg_core_handle->in_buf_used = 0;
hg_core_handle->out_buf_used = 0;
hg_core_handle->core_handle.in_buf_used = 0;
hg_core_handle->core_handle.out_buf_used = 0;
hg_atomic_init32(
&hg_core_handle->op_expected_count, 1); /* Default (no response) */
HG_LOG_SUBSYS_DEBUG(rpc_ref, "Handle (%p) expected_count set to %" PRId32,
Expand Down Expand Up @@ -3962,9 +3960,10 @@ hg_core_forward(struct hg_core_private_handle *hg_core_handle,
hg_core_handle->core_handle.na_in_header_offset;

/* Set the actual size of the msg that needs to be transmitted */
hg_core_handle->in_buf_used = header_size + payload_size;
hg_core_handle->core_handle.in_buf_used = header_size + payload_size;
HG_CHECK_SUBSYS_ERROR(rpc,
hg_core_handle->in_buf_used > hg_core_handle->core_handle.in_buf_size,
hg_core_handle->core_handle.in_buf_used >
hg_core_handle->core_handle.in_buf_size,
error, ret, HG_MSGSIZE, "Exceeding input buffer size");

/* Parse flags */
Expand Down Expand Up @@ -4093,7 +4092,8 @@ hg_core_forward_na(struct hg_core_private_handle *hg_core_handle)
/* Post send (input) */
na_ret = NA_Msg_send_unexpected(hg_core_handle->na_class,
hg_core_handle->na_context, hg_core_send_input_cb, hg_core_handle,
hg_core_handle->core_handle.in_buf, hg_core_handle->in_buf_used,
hg_core_handle->core_handle.in_buf,
hg_core_handle->core_handle.in_buf_used,
hg_core_handle->in_buf_plugin_data, hg_core_handle->na_addr,
hg_core_handle->core_handle.info.context_id, hg_core_handle->tag,
hg_core_handle->na_send_op_id);
Expand Down Expand Up @@ -4164,9 +4164,10 @@ hg_core_respond(struct hg_core_private_handle *hg_core_handle,
hg_core_handle->core_handle.na_out_header_offset;

/* Set the actual size of the msg that needs to be transmitted */
hg_core_handle->out_buf_used = header_size + payload_size;
hg_core_handle->core_handle.out_buf_used = header_size + payload_size;
HG_CHECK_SUBSYS_ERROR(rpc,
hg_core_handle->out_buf_used > hg_core_handle->core_handle.out_buf_size,
hg_core_handle->core_handle.out_buf_used >
hg_core_handle->core_handle.out_buf_size,
error, ret, HG_MSGSIZE, "Exceeding output buffer size");

/* Parse flags */
Expand Down Expand Up @@ -4328,7 +4329,8 @@ hg_core_respond_na(
/* Post expected send (output) */
na_ret = NA_Msg_send_expected(hg_core_handle->na_class,
hg_core_handle->na_context, hg_core_send_output_cb, hg_core_handle,
hg_core_handle->core_handle.out_buf, hg_core_handle->out_buf_used,
hg_core_handle->core_handle.out_buf,
hg_core_handle->core_handle.out_buf_used,
hg_core_handle->out_buf_plugin_data, hg_core_handle->na_addr,
hg_core_handle->core_handle.info.context_id, hg_core_handle->tag,
hg_core_handle->na_send_op_id);
Expand Down Expand Up @@ -4474,19 +4476,19 @@ hg_core_recv_input_cb(const struct na_cb_info *callback_info)
hg_core_handle->core_handle.info.addr->na_addr =
hg_core_handle->na_addr;
hg_core_handle->tag = na_cb_info_recv_unexpected->tag;
hg_core_handle->in_buf_used =
hg_core_handle->core_handle.in_buf_used =
na_cb_info_recv_unexpected->actual_buf_size;
HG_CHECK_SUBSYS_ERROR_NORET(rpc,
hg_core_handle->in_buf_used >
HG_CHECK_SUBSYS_ERROR(rpc,
hg_core_handle->core_handle.in_buf_used >
hg_core_handle->core_handle.in_buf_size,
error,
error, ret, HG_OVERFLOW,
"Actual transfer size (%zu) is too large for unexpected recv",
hg_core_handle->in_buf_used);
hg_core_handle->core_handle.in_buf_used);

HG_LOG_SUBSYS_DEBUG(rpc,
"Processing input for handle %p, tag=%u, buf_size=%zu",
(void *) hg_core_handle, hg_core_handle->tag,
hg_core_handle->in_buf_used);
hg_core_handle->core_handle.in_buf_used);

/* Process input information */
ret = hg_core_process_input(hg_core_handle);
Expand Down Expand Up @@ -4582,24 +4584,25 @@ hg_core_multi_recv_input_cb(const struct na_cb_info *callback_info)
hg_core_handle->core_handle.info.addr->na_addr =
hg_core_handle->na_addr;
hg_core_handle->tag = na_cb_info_multi_recv_unexpected->tag;
hg_core_handle->in_buf_used =
hg_core_handle->core_handle.in_buf_used =
na_cb_info_multi_recv_unexpected->actual_buf_size;

/* Either copy the buffer to release early or point to the actual
* multi-recv buffer space to save a memcpy */
if (hg_core_handle->multi_recv_copy) {
HG_CHECK_SUBSYS_ERROR_NORET(rpc,
hg_core_handle->in_buf_used >
HG_CHECK_SUBSYS_ERROR(rpc,
hg_core_handle->core_handle.in_buf_used >
hg_core_handle->in_buf_storage_size,
error,
error, ret, HG_OVERFLOW,
"Actual transfer size (%zu) is too large for unexpected recv",
hg_core_handle->in_buf_used);
hg_core_handle->core_handle.in_buf_used);
HG_LOG_SUBSYS_DEBUG(rpc,
"Copying multi-recv payload of size %zu for handle (%p)",
hg_core_handle->in_buf_used, (void *) hg_core_handle);
hg_core_handle->core_handle.in_buf_used,
(void *) hg_core_handle);
memcpy(hg_core_handle->in_buf_storage,
na_cb_info_multi_recv_unexpected->actual_buf,
hg_core_handle->in_buf_used);
hg_core_handle->core_handle.in_buf_used);
hg_core_handle->core_handle.in_buf_size =
hg_core_handle->in_buf_storage_size;
hg_core_handle->core_handle.in_buf = hg_core_handle->in_buf_storage;
Expand All @@ -4611,17 +4614,18 @@ hg_core_multi_recv_input_cb(const struct na_cb_info *callback_info)
} else {
HG_LOG_SUBSYS_DEBUG(rpc,
"Using direct multi-recv payload of size %zu for handle (%p)",
hg_core_handle->in_buf_used, (void *) hg_core_handle);
hg_core_handle->core_handle.in_buf_used,
(void *) hg_core_handle);
hg_core_handle->core_handle.in_buf_size =
hg_core_handle->in_buf_used;
hg_core_handle->core_handle.in_buf_used;
hg_core_handle->core_handle.in_buf =
na_cb_info_multi_recv_unexpected->actual_buf;
}

HG_LOG_SUBSYS_DEBUG(rpc,
"Processing input for handle %p, tag=%u, buf_size=%zu",
(void *) hg_core_handle, hg_core_handle->tag,
hg_core_handle->in_buf_used);
hg_core_handle->core_handle.in_buf_used);

/* Process input information */
ret = hg_core_process_input(hg_core_handle);
Expand Down Expand Up @@ -4774,8 +4778,19 @@ hg_core_recv_output_cb(const struct na_cb_info *callback_info)
hg_return_t ret;

if (callback_info->ret == NA_SUCCESS) {
HG_LOG_SUBSYS_DEBUG(rpc, "Processing output for handle %p, tag=%u",
(void *) hg_core_handle, hg_core_handle->tag);
hg_core_handle->core_handle.out_buf_used =
callback_info->info.recv_expected.actual_buf_size;
HG_CHECK_SUBSYS_ERROR(rpc,
hg_core_handle->core_handle.out_buf_used >
hg_core_handle->core_handle.out_buf_size,
error, ret, HG_OVERFLOW,
"Actual transfer size (%zu) is too large for expected recv",
hg_core_handle->core_handle.out_buf_used);

HG_LOG_SUBSYS_DEBUG(rpc,
"Processing output for handle %p, tag=%u, buf_size=%zu",
(void *) hg_core_handle, hg_core_handle->tag,
hg_core_handle->core_handle.out_buf_used);

/* Process output information */
ret = hg_core_process_output(hg_core_handle, hg_core_send_ack);
Expand Down
Loading

0 comments on commit 2635745

Please sign in to comment.