Skip to content

Commit

Permalink
HG Core: add copy_on_multi_recv and multi_recv_copy_threshold
Browse files Browse the repository at this point in the history
Use these two new parameters to fallback to memcpy to prevent
starvation of multi-recv buffers and potential deadlock situations.
  • Loading branch information
soumagne committed Jul 25, 2024
1 parent 7d34ab7 commit 7a7fd81
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 32 deletions.
140 changes: 109 additions & 31 deletions src/mercury_core.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,10 @@
#endif

/* Handle flags */
#define HG_CORE_HANDLE_LISTEN (1 << 1) /* Listener handle */
#define HG_CORE_HANDLE_MULTI_RECV (1 << 2) /* Handle used for multi-recv */
#define HG_CORE_HANDLE_USER (1 << 3) /* User-created handle */
#define HG_CORE_HANDLE_LISTEN (1 << 1) /* Listener handle */
#define HG_CORE_HANDLE_MULTI_RECV (1 << 2) /* Handle used for multi-recv */
#define HG_CORE_HANDLE_USER (1 << 3) /* User-created handle */
#define HG_CORE_HANDLE_MULTI_RECV_COPY (1 << 4) /* Copy on multi-recv */

/* Op status bits */
#define HG_CORE_OP_COMPLETED (1 << 0) /* Operation completed */
Expand Down Expand Up @@ -136,12 +137,14 @@ struct hg_core_init_info {
uint32_t request_post_init; /* Init request count */
uint32_t request_post_incr; /* Increment request count */
uint32_t multi_recv_op_max; /* Multi-recv op max */
uint32_t multi_recv_copy_threshold; /* Copy threshold */
hg_checksum_level_t checksum_level; /* Checksum level */
uint8_t progress_mode; /* Progress mode */
bool loopback; /* Use loopback capability */
bool na_ext_init; /* NA externally initialized */
bool multi_recv; /* Use multi-recv capability */
bool listen; /* Listening on incoming RPC requests */
bool copy_on_multi_recv; /* Copy data on multi-recv */
};

/* RPC map */
Expand Down Expand Up @@ -344,6 +347,8 @@ struct hg_core_private_handle {
na_op_id_t *na_recv_op_id; /* Operation ID for recv */
na_op_id_t *na_ack_op_id; /* Operation ID for ack */
struct hg_core_multi_recv_op *multi_recv_op; /* Multi-recv operation */
void *in_buf_storage; /* Storage input buffer */
size_t in_buf_storage_size; /* Storage input buffer size */
size_t in_buf_used; /* Amount of input buffer used */
size_t out_buf_used; /* Amount of output buffer used */
na_tag_t tag; /* Tag used for request and response */
Expand All @@ -358,6 +363,7 @@ struct hg_core_private_handle {
enum hg_core_op_type op_type; /* Core operation type */
hg_return_t ret; /* Return code associated to handle */
uint8_t cookie; /* Cookie */
bool multi_recv_copy; /* Copy on multi-recv */
bool reuse; /* Re-use handle once ref_count is 0 */
};

Expand Down Expand Up @@ -1239,9 +1245,25 @@ hg_core_init(const char *na_info_string, bool na_listen, unsigned int version,
hg_core_class->init_info.multi_recv_op_max =
(hg_init_info.multi_recv_op_max == 0) ? HG_CORE_MULTI_RECV_OP_COUNT
: hg_init_info.multi_recv_op_max;
hg_core_class->init_info.copy_on_multi_recv =
hg_init_info.copy_on_multi_recv;
if (hg_core_class->init_info.copy_on_multi_recv) {
HG_CHECK_SUBSYS_ERROR(cls,
hg_init_info.multi_recv_copy_threshold >
(unsigned int) hg_core_class->init_info.multi_recv_op_max,
error, ret, HG_INVALID_ARG,
"multi_recv_copy_threshold (%u) cannot exceed multi_recv_op_max "
"(%u)",
hg_init_info.multi_recv_copy_threshold,
(unsigned int) hg_core_class->init_info.multi_recv_op_max);
hg_core_class->init_info.multi_recv_copy_threshold =
hg_init_info.multi_recv_copy_threshold;
} else
hg_core_class->init_info.multi_recv_copy_threshold =
hg_core_class->init_info.multi_recv_op_max; /* never copy */

/* Save checksum level */
#ifdef HG_HAS_CHECKSUMS
/* Save checksum level */
hg_core_class->init_info.checksum_level = hg_init_info.checksum_level;
#else
HG_CHECK_SUBSYS_WARNING(cls,
Expand Down Expand Up @@ -1885,6 +1907,8 @@ hg_core_context_post(struct hg_core_private_context *context)
HG_CHECK_SUBSYS_HG_ERROR(
ctx, error, ret, "Could not allocate multi-recv resources");
flags |= HG_CORE_HANDLE_MULTI_RECV;
if (hg_core_class->init_info.copy_on_multi_recv)
flags |= HG_CORE_HANDLE_MULTI_RECV_COPY;
}

/* Create pool of handles */
Expand All @@ -1900,7 +1924,7 @@ hg_core_context_post(struct hg_core_private_context *context)
if (context->core_context.na_sm_context != NULL) {
ret = hg_core_handle_pool_create(context,
hg_core_class->core_class.na_sm_class,
context->core_context.na_sm_context, flags,
context->core_context.na_sm_context, HG_CORE_HANDLE_LISTEN,
hg_core_class->init_info.request_post_init,
hg_core_class->init_info.request_post_incr,
&context->sm_handle_pool);
Expand Down Expand Up @@ -3490,23 +3514,37 @@ hg_core_alloc_na(struct hg_core_private_handle *hg_core_handle,
/* When using multi-recv, only allocate resources per handle for expected
* messages. */
if (flags & HG_CORE_HANDLE_MULTI_RECV) {
if (flags & HG_CORE_HANDLE_MULTI_RECV_COPY) {
hg_core_handle->in_buf_storage_size =
NA_Msg_get_max_unexpected_size(na_class);

hg_core_handle->in_buf_storage =
NA_Msg_buf_alloc(na_class, hg_core_handle->in_buf_storage_size,
NA_RECV, &hg_core_handle->in_buf_plugin_data);
HG_CHECK_SUBSYS_ERROR(rpc, hg_core_handle->in_buf_storage == NULL,
error, ret, HG_NOMEM, "Could not allocate buffer for input");
}
hg_core_handle->core_handle.in_buf = NULL;
hg_core_handle->core_handle.in_buf_size = 0;
} else {
/* Initialize in/out buffers and use unexpected message size */
hg_core_handle->core_handle.in_buf_size =
hg_core_handle->in_buf_storage_size =
NA_Msg_get_max_unexpected_size(na_class);

hg_core_handle->core_handle.in_buf =
NA_Msg_buf_alloc(na_class, hg_core_handle->core_handle.in_buf_size,
hg_core_handle->in_buf_storage =
NA_Msg_buf_alloc(na_class, hg_core_handle->in_buf_storage_size,
(flags & HG_CORE_HANDLE_LISTEN) ? NA_RECV : NA_SEND,
&hg_core_handle->in_buf_plugin_data);
HG_CHECK_SUBSYS_ERROR(rpc, hg_core_handle->core_handle.in_buf == NULL,
HG_CHECK_SUBSYS_ERROR(rpc, hg_core_handle->in_buf_storage == NULL,
error, ret, HG_NOMEM, "Could not allocate buffer for input");

hg_core_handle->core_handle.in_buf = hg_core_handle->in_buf_storage;
hg_core_handle->core_handle.in_buf_size =
hg_core_handle->in_buf_storage_size;

na_ret =
NA_Msg_init_unexpected(na_class, hg_core_handle->core_handle.in_buf,
hg_core_handle->core_handle.in_buf_size);
NA_Msg_init_unexpected(na_class, hg_core_handle->in_buf_storage,
hg_core_handle->in_buf_storage_size);
HG_CHECK_SUBSYS_ERROR(rpc, na_ret != NA_SUCCESS, error, ret,
(hg_return_t) na_ret, "Could not initialize input buffer (%s)",
NA_Error_to_string(na_ret));
Expand Down Expand Up @@ -3575,16 +3613,14 @@ hg_core_free_na(struct hg_core_private_handle *hg_core_handle)
hg_core_handle->na_ack_op_id = NULL;

/* Free buffers */
if (hg_atomic_get32(&hg_core_handle->status) & HG_CORE_OP_MULTI_RECV) {
if (hg_core_handle->multi_recv_op != NULL) {
hg_atomic_decr32(&hg_core_handle->multi_recv_op->ref_count);
hg_core_handle->multi_recv_op = NULL;
}
} else {
NA_Msg_buf_free(hg_core_handle->na_class,
hg_core_handle->core_handle.in_buf,
hg_core_handle->in_buf_plugin_data);
if ((hg_atomic_get32(&hg_core_handle->status) & HG_CORE_OP_MULTI_RECV) &&
(hg_core_handle->multi_recv_op != NULL)) {
hg_atomic_decr32(&hg_core_handle->multi_recv_op->ref_count);
hg_core_handle->multi_recv_op = NULL;
}
NA_Msg_buf_free(hg_core_handle->na_class, hg_core_handle->in_buf_storage,
hg_core_handle->in_buf_plugin_data);
hg_core_handle->in_buf_storage = NULL;
hg_core_handle->core_handle.in_buf = NULL;
hg_core_handle->in_buf_plugin_data = NULL;

Expand Down Expand Up @@ -3673,7 +3709,7 @@ hg_core_reset_post(struct hg_core_private_handle *hg_core_handle)
hg_atomic_set32(&hg_core_handle->ret_status, (int32_t) hg_core_handle->ret);

/* Multi-recv buffers */
if (use_multi_recv && multi_recv_op != NULL) {
if (use_multi_recv) {
hg_core_handle->core_handle.in_buf = NULL;
hg_core_handle->core_handle.in_buf_size = 0;
hg_core_handle->multi_recv_op = NULL;
Expand Down Expand Up @@ -3860,8 +3896,10 @@ hg_core_release_input(struct hg_core_private_handle *hg_core_handle)

/* Multi-recv buffers */
if (multi_recv_op != NULL) {
hg_core_handle->core_handle.in_buf = NULL;
hg_core_handle->core_handle.in_buf_size = 0;
if (!hg_core_handle->multi_recv_copy) {
hg_core_handle->core_handle.in_buf = NULL;
hg_core_handle->core_handle.in_buf_size = 0;
}
hg_core_handle->multi_recv_op = NULL;

if (hg_atomic_decr32(&multi_recv_op->ref_count) == 0 &&
Expand Down Expand Up @@ -4436,12 +4474,14 @@ hg_core_recv_input_cb(const struct na_cb_info *callback_info)
hg_core_handle->core_handle.info.addr->na_addr =
hg_core_handle->na_addr;
hg_core_handle->tag = na_cb_info_recv_unexpected->tag;
HG_CHECK_SUBSYS_ERROR_NORET(rpc,
na_cb_info_recv_unexpected->actual_buf_size >
hg_core_handle->core_handle.in_buf_size,
error, "Actual transfer size is too large for unexpected recv");
hg_core_handle->in_buf_used =
na_cb_info_recv_unexpected->actual_buf_size;
HG_CHECK_SUBSYS_ERROR_NORET(rpc,
hg_core_handle->in_buf_used >
hg_core_handle->core_handle.in_buf_size,
error,
"Actual transfer size (%zu) is too large for unexpected recv",
hg_core_handle->in_buf_used);

HG_LOG_SUBSYS_DEBUG(rpc,
"Processing input for handle %p, tag=%u, buf_size=%zu",
Expand Down Expand Up @@ -4503,6 +4543,10 @@ hg_core_multi_recv_input_cb(const struct na_cb_info *callback_info)
hg_return_t ret;

if (callback_info->ret == NA_SUCCESS) {
unsigned int copy_threshold =
HG_CORE_CONTEXT_CLASS(context)->init_info.multi_recv_op_max -
HG_CORE_CONTEXT_CLASS(context)->init_info.multi_recv_copy_threshold;

/* Get a new handle from the pool */
ret = hg_core_handle_pool_get(context->handle_pool, &hg_core_handle);
HG_CHECK_SUBSYS_HG_ERROR(
Expand All @@ -4512,6 +4556,10 @@ hg_core_multi_recv_input_cb(const struct na_cb_info *callback_info)
hg_atomic_or32(&hg_core_handle->status, HG_CORE_OP_MULTI_RECV);
/* Prevent from reposting multi-recv buffer until done with handle */
hg_atomic_incr32(&multi_recv_op->ref_count);
hg_core_handle->multi_recv_copy =
(HG_CORE_CONTEXT_CLASS(context)->init_info.copy_on_multi_recv &&
(hg_atomic_get32(&context->multi_recv_op_count) <=
(int32_t) copy_threshold));

if (na_cb_info_multi_recv_unexpected->last) {
HG_LOG_SUBSYS_DEBUG(rpc,
Expand Down Expand Up @@ -4539,11 +4587,41 @@ hg_core_multi_recv_input_cb(const struct na_cb_info *callback_info)
hg_core_handle->core_handle.info.addr->na_addr =
hg_core_handle->na_addr;
hg_core_handle->tag = na_cb_info_multi_recv_unexpected->tag;
hg_core_handle->core_handle.in_buf_size =
hg_core_handle->in_buf_used =
na_cb_info_multi_recv_unexpected->actual_buf_size;
hg_core_handle->in_buf_used = hg_core_handle->core_handle.in_buf_size;
hg_core_handle->core_handle.in_buf =
na_cb_info_multi_recv_unexpected->actual_buf;

/* Either copy the buffer to release early or point to the actual
* multi-recv buffer space to save a memcpy */
if (hg_core_handle->multi_recv_copy) {
HG_CHECK_SUBSYS_ERROR_NORET(rpc,
hg_core_handle->in_buf_used >
hg_core_handle->in_buf_storage_size,
error,
"Actual transfer size (%zu) is too large for unexpected recv",
hg_core_handle->in_buf_used);
HG_LOG_SUBSYS_DEBUG(rpc,
"Copying multi-recv payload of size %zu for handle (%p)",
hg_core_handle->in_buf_used, (void *) hg_core_handle);
memcpy(hg_core_handle->in_buf_storage,
na_cb_info_multi_recv_unexpected->actual_buf,
hg_core_handle->in_buf_used);
hg_core_handle->core_handle.in_buf_size =
hg_core_handle->in_buf_storage_size;
hg_core_handle->core_handle.in_buf = hg_core_handle->in_buf_storage;

ret = hg_core_release_input(hg_core_handle);
HG_CHECK_SUBSYS_HG_ERROR(rpc, error, ret,
"Could not release input for handle (%p)",
(void *) hg_core_handle);
} else {
HG_LOG_SUBSYS_DEBUG(rpc,
"Using direct multi-recv payload of size %zu for handle (%p)",
hg_core_handle->in_buf_used, (void *) hg_core_handle);
hg_core_handle->core_handle.in_buf_size =
hg_core_handle->in_buf_used;
hg_core_handle->core_handle.in_buf =
na_cb_info_multi_recv_unexpected->actual_buf;
}

HG_LOG_SUBSYS_DEBUG(rpc,
"Processing input for handle %p, tag=%u, buf_size=%zu",
Expand Down
17 changes: 16 additions & 1 deletion src/mercury_core_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,20 @@ struct hg_init_info {
* existing buffers from being reposted.
* Default value is: 4 */
unsigned int multi_recv_op_max;

/* Force copy when receiving RPCs and multi-recv is being used (in an effort
* to release multi-recv buffers as soon as possible). This is particularly
* useful in large scale situations where multi-recv buffers tend to be
* quickly exhausted.
* Default value is: false */
bool copy_on_multi_recv;

/* When using copy_on_multi_recv, controls when we should start copying data
* in an effort to release multi-recv buffers. Copy will start when
* multi_recv_copy_threshold buffers have been consumed. Value should not
* exceed multi_recv_op_max.
* Default value is: 0 (always copy) */
unsigned int multi_recv_copy_threshold;
};

/* Error return codes:
Expand Down Expand Up @@ -204,7 +218,8 @@ typedef enum {
.sm_info_string = NULL, .checksum_level = HG_CHECKSUM_NONE, \
.no_bulk_eager = false, .no_loopback = false, .stats = false, \
.no_multi_recv = false, .release_input_early = false, \
.no_overflow = false, .multi_recv_op_max = 0 \
.no_overflow = false, .multi_recv_op_max = 0, \
.copy_on_multi_recv = false, .multi_recv_copy_threshold = 0 \
}

#endif /* MERCURY_CORE_TYPES_H */

0 comments on commit 7a7fd81

Please sign in to comment.