Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HG: add HG_Get_input/output_payload_size() (fix #751) #753

Merged
merged 1 commit into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions Testing/unit/hg/mercury_rpc_cb.c
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,15 @@ HG_TEST_RPC_CB(hg_test_rpc_open, handle)
int event_id;
int open_ret;
hg_return_t ret = HG_SUCCESS;
hg_size_t payload_size = HG_Get_input_payload_size(handle);
size_t expected_string_payload_size =
strlen(HG_TEST_RPC_PATH) + sizeof(uint64_t) + 3;

HG_TEST_CHECK_ERROR(
payload_size != sizeof(rpc_handle_t) + expected_string_payload_size,
done, ret, HG_FAULT,
"invalid input payload size (%" PRId64 "), expected (%zu)",
payload_size, sizeof(rpc_handle_t) + expected_string_payload_size);

/* Get input buffer */
ret = HG_Get_input(handle, &in_struct);
Expand Down Expand Up @@ -210,6 +219,11 @@ HG_TEST_RPC_CB(hg_test_rpc_open, handle)
HG_TEST_CHECK_HG_ERROR(
done, ret, "HG_Respond() failed (%s)", HG_Error_to_string(ret));

payload_size = HG_Get_output_payload_size(handle);
HG_TEST_CHECK_ERROR(payload_size != sizeof(rpc_open_out_t), done, ret,
HG_FAULT, "invalid output payload size (%" PRId64 "), expected (%zu)",
payload_size, sizeof(rpc_open_out_t));

done:
ret = HG_Destroy(handle);
HG_TEST_CHECK_ERROR_DONE(
Expand Down
3 changes: 3 additions & 0 deletions Testing/unit/hg/mercury_unit.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ struct hg_test_handle_info {
/* Public Macros */
/*****************/

/* Test path */
#define HG_TEST_RPC_PATH (HG_TEST_TEMP_DIRECTORY "/test.txt")

/*********************/
/* Public Prototypes */
/*********************/
Expand Down
30 changes: 27 additions & 3 deletions Testing/unit/hg/test_rpc.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@
/* Local Macros */
/****************/

/* Test path */
#define HG_TEST_RPC_PATH (HG_TEST_TEMP_DIRECTORY "/test.txt")

/* Wait timeout in ms */
#define HG_TEST_WAIT_TIMEOUT (HG_TEST_TIMEOUT * 1000)

Expand Down Expand Up @@ -178,6 +175,9 @@ hg_test_rpc_input(hg_handle_t handle, hg_addr_t addr, hg_id_t rpc_id,
.no_entry = false};
rpc_open_in_t in_struct = {
.handle = rpc_open_handle, .path = HG_TEST_RPC_PATH};
hg_size_t payload_size;
size_t expected_string_payload_size =
strlen(HG_TEST_RPC_PATH) + sizeof(uint64_t) + 3;
unsigned int flag;
int rc;

Expand All @@ -204,6 +204,13 @@ hg_test_rpc_input(hg_handle_t handle, hg_addr_t addr, hg_id_t rpc_id,
HG_TEST_CHECK_HG_ERROR(
error, ret, "Error in HG callback (%s)", HG_Error_to_string(ret));

payload_size = HG_Get_input_payload_size(handle);
HG_TEST_CHECK_ERROR(
payload_size != sizeof(rpc_handle_t) + expected_string_payload_size,
error, ret, HG_FAULT,
"invalid input payload size (%" PRId64 "), expected (%zu)",
payload_size, sizeof(rpc_handle_t) + expected_string_payload_size);

return HG_SUCCESS;

error:
Expand Down Expand Up @@ -262,12 +269,16 @@ hg_test_rpc_output_cb(const struct hg_cb_info *callback_info)
int rpc_open_event_id;
rpc_open_out_t rpc_open_out_struct;
hg_return_t ret = callback_info->ret;
hg_size_t payload_size = HG_Get_output_payload_size(handle);

if (args->no_entry && ret == HG_NOENTRY)
goto done;

HG_TEST_CHECK_HG_ERROR(done, ret, "Error in HG callback (%s)",
HG_Error_to_string(callback_info->ret));
HG_TEST_CHECK_ERROR(payload_size != sizeof(rpc_open_out_t), done, ret,
HG_FAULT, "invalid output payload size (%" PRId64 "), expected (%zu)",
payload_size, sizeof(rpc_open_out_t));

/* Get output */
ret = HG_Get_output(handle, &rpc_open_out_struct);
Expand Down Expand Up @@ -329,9 +340,16 @@ hg_test_rpc_output_overflow_cb(const struct hg_cb_info *callback_info)
size_t string_len;
# endif
hg_return_t ret = callback_info->ret;
hg_size_t payload_size = HG_Get_output_payload_size(handle);
size_t expected_string_payload_size =
HG_Class_get_output_eager_size(HG_Get_info(handle)->hg_class) * 2 + 3 +
2 * sizeof(uint64_t);

HG_TEST_CHECK_HG_ERROR(done, ret, "Error in HG callback (%s)",
HG_Error_to_string(callback_info->ret));
HG_TEST_CHECK_ERROR(payload_size != expected_string_payload_size, done, ret,
HG_FAULT, "invalid output payload size (%" PRId64 "), expected (%zu)",
payload_size, expected_string_payload_size);

/* Get output */
ret = HG_Get_output(handle, &out_struct);
Expand Down Expand Up @@ -899,6 +917,12 @@ main(int argc, char *argv[])
hg_test_rpc_null_id_g, hg_test_rpc_no_output_cb, info.request);
HG_TEST_CHECK_HG_ERROR(error, hg_ret, "hg_test_rpc_no_input() failed (%s)",
HG_Error_to_string(hg_ret));
HG_TEST_CHECK_ERROR(HG_Get_input_payload_size(info.handles[0]) != 0, error,
hg_ret, HG_FAULT, "input payload non null (%" PRId64 ")",
HG_Get_input_payload_size(info.handles[0]));
HG_TEST_CHECK_ERROR(HG_Get_output_payload_size(info.handles[0]) != 0, error,
hg_ret, HG_FAULT, "output payload non null (%" PRId64 ")",
HG_Get_output_payload_size(info.handles[0]));
HG_PASSED();

/* Simple RPC test */
Expand Down
48 changes: 48 additions & 0 deletions src/mercury.c
Original file line number Diff line number Diff line change
Expand Up @@ -1822,6 +1822,30 @@ HG_Reset(hg_handle_t handle, hg_addr_t addr, hg_id_t id)
return ret;
}

/*---------------------------------------------------------------------------*/
hg_size_t
HG_Get_input_payload_size(hg_handle_t handle)
{
struct hg_private_handle *private_handle =
(struct hg_private_handle *) handle;

HG_CHECK_SUBSYS_ERROR_NORET(
rpc, handle == HG_HANDLE_NULL, error, "NULL HG handle");

if (private_handle->in_extra_buf != NULL)
return private_handle->in_extra_buf_size;
else {
hg_size_t header_size = hg_header_get_size(HG_INPUT),
payload_size =
HG_Core_get_input_payload_size(handle->core_handle);

return (payload_size > header_size) ? payload_size - header_size : 0;
}

error:
return 0;
}

/*---------------------------------------------------------------------------*/
hg_return_t
HG_Get_input(hg_handle_t handle, void *in_struct)
Expand Down Expand Up @@ -1882,6 +1906,30 @@ HG_Free_input(hg_handle_t handle, void *in_struct)
return ret;
}

/*---------------------------------------------------------------------------*/
hg_size_t
HG_Get_output_payload_size(hg_handle_t handle)
{
struct hg_private_handle *private_handle =
(struct hg_private_handle *) handle;

HG_CHECK_SUBSYS_ERROR_NORET(
rpc, handle == HG_HANDLE_NULL, error, "NULL HG handle");

if (private_handle->out_extra_buf != NULL)
return private_handle->out_extra_buf_size;
else {
hg_size_t header_size = hg_header_get_size(HG_OUTPUT),
payload_size =
HG_Core_get_output_payload_size(handle->core_handle);

return (payload_size > header_size) ? payload_size - header_size : 0;
}

error:
return 0;
}

/*---------------------------------------------------------------------------*/
hg_return_t
HG_Get_output(hg_handle_t handle, void *out_struct)
Expand Down
20 changes: 20 additions & 0 deletions src/mercury.h
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,16 @@ HG_Set_data(hg_handle_t handle, void *data, void (*free_callback)(void *));
static HG_INLINE void *
HG_Get_data(hg_handle_t handle) HG_WARN_UNUSED_RESULT;

/**
* Retrieve input payload size from a given handle.
*
* \param handle [IN] HG handle
*
* \return Non-negative value or zero if no payload or the handle is not valid
*/
HG_PUBLIC hg_size_t
HG_Get_input_payload_size(hg_handle_t handle);

/**
* Get input from handle (requires registration of input proc to deserialize
* parameters). Input must be freed using HG_Free_input().
Expand Down Expand Up @@ -817,6 +827,16 @@ HG_Get_input(hg_handle_t handle, void *in_struct);
HG_PUBLIC hg_return_t
HG_Free_input(hg_handle_t handle, void *in_struct);

/**
* Retrieve output payload size from a given handle.
*
* \param handle [IN] HG handle
*
* \return Non-negative value or zero if no payload or the handle is not valid
*/
HG_PUBLIC hg_size_t
HG_Get_output_payload_size(hg_handle_t handle);

/**
* Get output from handle (requires registration of output proc to deserialize
* parameters). Output must be freed using HG_Free_output().
Expand Down
71 changes: 43 additions & 28 deletions src/mercury_core.c
Original file line number Diff line number Diff line change
Expand Up @@ -348,8 +348,6 @@ struct hg_core_private_handle {
struct hg_core_multi_recv_op *multi_recv_op; /* Multi-recv operation */
void *in_buf_storage; /* Storage input buffer */
size_t in_buf_storage_size; /* Storage input buffer size */
size_t in_buf_used; /* Amount of input buffer used */
size_t out_buf_used; /* Amount of output buffer used */
na_tag_t tag; /* Tag used for request and response */
hg_atomic_int32_t ref_count; /* Reference count */
hg_atomic_int32_t no_response_done; /* Reference count to reach for done */
Expand Down Expand Up @@ -3660,8 +3658,8 @@ hg_core_reset(struct hg_core_private_handle *hg_core_handle)
hg_core_handle->tag = 0;
hg_core_handle->cookie = 0;
hg_core_handle->ret = HG_SUCCESS;
hg_core_handle->in_buf_used = 0;
hg_core_handle->out_buf_used = 0;
hg_core_handle->core_handle.in_buf_used = 0;
hg_core_handle->core_handle.out_buf_used = 0;
hg_atomic_init32(
&hg_core_handle->op_expected_count, 1); /* Default (no response) */
HG_LOG_SUBSYS_DEBUG(rpc_ref, "Handle (%p) expected_count set to %" PRId32,
Expand Down Expand Up @@ -3962,9 +3960,10 @@ hg_core_forward(struct hg_core_private_handle *hg_core_handle,
hg_core_handle->core_handle.na_in_header_offset;

/* Set the actual size of the msg that needs to be transmitted */
hg_core_handle->in_buf_used = header_size + payload_size;
hg_core_handle->core_handle.in_buf_used = header_size + payload_size;
HG_CHECK_SUBSYS_ERROR(rpc,
hg_core_handle->in_buf_used > hg_core_handle->core_handle.in_buf_size,
hg_core_handle->core_handle.in_buf_used >
hg_core_handle->core_handle.in_buf_size,
error, ret, HG_MSGSIZE, "Exceeding input buffer size");

/* Parse flags */
Expand Down Expand Up @@ -4093,7 +4092,8 @@ hg_core_forward_na(struct hg_core_private_handle *hg_core_handle)
/* Post send (input) */
na_ret = NA_Msg_send_unexpected(hg_core_handle->na_class,
hg_core_handle->na_context, hg_core_send_input_cb, hg_core_handle,
hg_core_handle->core_handle.in_buf, hg_core_handle->in_buf_used,
hg_core_handle->core_handle.in_buf,
hg_core_handle->core_handle.in_buf_used,
hg_core_handle->in_buf_plugin_data, hg_core_handle->na_addr,
hg_core_handle->core_handle.info.context_id, hg_core_handle->tag,
hg_core_handle->na_send_op_id);
Expand Down Expand Up @@ -4164,9 +4164,10 @@ hg_core_respond(struct hg_core_private_handle *hg_core_handle,
hg_core_handle->core_handle.na_out_header_offset;

/* Set the actual size of the msg that needs to be transmitted */
hg_core_handle->out_buf_used = header_size + payload_size;
hg_core_handle->core_handle.out_buf_used = header_size + payload_size;
HG_CHECK_SUBSYS_ERROR(rpc,
hg_core_handle->out_buf_used > hg_core_handle->core_handle.out_buf_size,
hg_core_handle->core_handle.out_buf_used >
hg_core_handle->core_handle.out_buf_size,
error, ret, HG_MSGSIZE, "Exceeding output buffer size");

/* Parse flags */
Expand Down Expand Up @@ -4328,7 +4329,8 @@ hg_core_respond_na(
/* Post expected send (output) */
na_ret = NA_Msg_send_expected(hg_core_handle->na_class,
hg_core_handle->na_context, hg_core_send_output_cb, hg_core_handle,
hg_core_handle->core_handle.out_buf, hg_core_handle->out_buf_used,
hg_core_handle->core_handle.out_buf,
hg_core_handle->core_handle.out_buf_used,
hg_core_handle->out_buf_plugin_data, hg_core_handle->na_addr,
hg_core_handle->core_handle.info.context_id, hg_core_handle->tag,
hg_core_handle->na_send_op_id);
Expand Down Expand Up @@ -4474,19 +4476,19 @@ hg_core_recv_input_cb(const struct na_cb_info *callback_info)
hg_core_handle->core_handle.info.addr->na_addr =
hg_core_handle->na_addr;
hg_core_handle->tag = na_cb_info_recv_unexpected->tag;
hg_core_handle->in_buf_used =
hg_core_handle->core_handle.in_buf_used =
na_cb_info_recv_unexpected->actual_buf_size;
HG_CHECK_SUBSYS_ERROR_NORET(rpc,
hg_core_handle->in_buf_used >
HG_CHECK_SUBSYS_ERROR(rpc,
hg_core_handle->core_handle.in_buf_used >
hg_core_handle->core_handle.in_buf_size,
error,
error, ret, HG_OVERFLOW,
"Actual transfer size (%zu) is too large for unexpected recv",
hg_core_handle->in_buf_used);
hg_core_handle->core_handle.in_buf_used);

HG_LOG_SUBSYS_DEBUG(rpc,
"Processing input for handle %p, tag=%u, buf_size=%zu",
(void *) hg_core_handle, hg_core_handle->tag,
hg_core_handle->in_buf_used);
hg_core_handle->core_handle.in_buf_used);

/* Process input information */
ret = hg_core_process_input(hg_core_handle);
Expand Down Expand Up @@ -4582,24 +4584,25 @@ hg_core_multi_recv_input_cb(const struct na_cb_info *callback_info)
hg_core_handle->core_handle.info.addr->na_addr =
hg_core_handle->na_addr;
hg_core_handle->tag = na_cb_info_multi_recv_unexpected->tag;
hg_core_handle->in_buf_used =
hg_core_handle->core_handle.in_buf_used =
na_cb_info_multi_recv_unexpected->actual_buf_size;

/* Either copy the buffer to release early or point to the actual
* multi-recv buffer space to save a memcpy */
if (hg_core_handle->multi_recv_copy) {
HG_CHECK_SUBSYS_ERROR_NORET(rpc,
hg_core_handle->in_buf_used >
HG_CHECK_SUBSYS_ERROR(rpc,
hg_core_handle->core_handle.in_buf_used >
hg_core_handle->in_buf_storage_size,
error,
error, ret, HG_OVERFLOW,
"Actual transfer size (%zu) is too large for unexpected recv",
hg_core_handle->in_buf_used);
hg_core_handle->core_handle.in_buf_used);
HG_LOG_SUBSYS_DEBUG(rpc,
"Copying multi-recv payload of size %zu for handle (%p)",
hg_core_handle->in_buf_used, (void *) hg_core_handle);
hg_core_handle->core_handle.in_buf_used,
(void *) hg_core_handle);
memcpy(hg_core_handle->in_buf_storage,
na_cb_info_multi_recv_unexpected->actual_buf,
hg_core_handle->in_buf_used);
hg_core_handle->core_handle.in_buf_used);
hg_core_handle->core_handle.in_buf_size =
hg_core_handle->in_buf_storage_size;
hg_core_handle->core_handle.in_buf = hg_core_handle->in_buf_storage;
Expand All @@ -4611,17 +4614,18 @@ hg_core_multi_recv_input_cb(const struct na_cb_info *callback_info)
} else {
HG_LOG_SUBSYS_DEBUG(rpc,
"Using direct multi-recv payload of size %zu for handle (%p)",
hg_core_handle->in_buf_used, (void *) hg_core_handle);
hg_core_handle->core_handle.in_buf_used,
(void *) hg_core_handle);
hg_core_handle->core_handle.in_buf_size =
hg_core_handle->in_buf_used;
hg_core_handle->core_handle.in_buf_used;
hg_core_handle->core_handle.in_buf =
na_cb_info_multi_recv_unexpected->actual_buf;
}

HG_LOG_SUBSYS_DEBUG(rpc,
"Processing input for handle %p, tag=%u, buf_size=%zu",
(void *) hg_core_handle, hg_core_handle->tag,
hg_core_handle->in_buf_used);
hg_core_handle->core_handle.in_buf_used);

/* Process input information */
ret = hg_core_process_input(hg_core_handle);
Expand Down Expand Up @@ -4774,8 +4778,19 @@ hg_core_recv_output_cb(const struct na_cb_info *callback_info)
hg_return_t ret;

if (callback_info->ret == NA_SUCCESS) {
HG_LOG_SUBSYS_DEBUG(rpc, "Processing output for handle %p, tag=%u",
(void *) hg_core_handle, hg_core_handle->tag);
hg_core_handle->core_handle.out_buf_used =
callback_info->info.recv_expected.actual_buf_size;
HG_CHECK_SUBSYS_ERROR(rpc,
hg_core_handle->core_handle.out_buf_used >
hg_core_handle->core_handle.out_buf_size,
error, ret, HG_OVERFLOW,
"Actual transfer size (%zu) is too large for expected recv",
hg_core_handle->core_handle.out_buf_used);

HG_LOG_SUBSYS_DEBUG(rpc,
"Processing output for handle %p, tag=%u, buf_size=%zu",
(void *) hg_core_handle, hg_core_handle->tag,
hg_core_handle->core_handle.out_buf_used);

/* Process output information */
ret = hg_core_process_output(hg_core_handle, hg_core_send_ack);
Expand Down
Loading
Loading