diff --git a/src/mercury.c b/src/mercury.c index feba0262..41a4b107 100644 --- a/src/mercury.c +++ b/src/mercury.c @@ -57,7 +57,6 @@ struct hg_proc_info { hg_proc_cb_t out_proc_cb; /* Output proc callback */ void *data; /* User data */ void (*free_callback)(void *); /* User data free callback */ - hg_bool_t no_response; /* RPC response not expected */ }; /* HG handle */ @@ -480,11 +479,6 @@ hg_get_struct(struct hg_private_handle *hg_handle, extra_buf_size = hg_handle->in_extra_buf_size; break; case HG_OUTPUT: - /* Cannot respond if no_response flag set */ - HG_CHECK_SUBSYS_ERROR(rpc, hg_proc_info->no_response, error, ret, - HG_OPNOTSUPPORTED, - "No output was produced on that RPC (no response)"); - /* Use custom header offset */ header_offset += hg_handle->handle.info.hg_class->out_offset; /* Set output proc */ @@ -612,11 +606,6 @@ hg_set_struct(struct hg_private_handle *hg_handle, extra_bulk = &hg_handle->in_extra_bulk; break; case HG_OUTPUT: - /* Cannot respond if no_response flag set */ - HG_CHECK_SUBSYS_ERROR(rpc, hg_proc_info->no_response, error, ret, - HG_OPNOTSUPPORTED, - "No output was produced on that RPC (no response)"); - /* Use custom header offset */ header_offset += hg_handle->handle.info.hg_class->out_offset; /* Set output proc */ @@ -1520,21 +1509,13 @@ hg_return_t HG_Registered_disable_response( hg_class_t *hg_class, hg_id_t id, hg_bool_t disable) { - struct hg_proc_info *hg_proc_info = NULL; hg_return_t ret; HG_CHECK_SUBSYS_ERROR( cls, hg_class == NULL, error, ret, HG_INVALID_ARG, "NULL HG class"); - /* Retrieve proc function from function map */ - hg_proc_info = (struct hg_proc_info *) HG_Core_registered_data( - hg_class->core_class, id); - HG_CHECK_SUBSYS_ERROR(cls, hg_proc_info == NULL, error, ret, HG_NOENTRY, - "Could not get registered data for RPC ID %" PRIu64, id); - - hg_proc_info->no_response = disable; - - return HG_SUCCESS; + return HG_Core_registered_disable_response( + hg_class->core_class, id, disable); error: return ret; @@ -1545,23 +1526,13 @@ hg_return_t HG_Registered_disabled_response( hg_class_t *hg_class, hg_id_t id, hg_bool_t *disabled_p) { - struct hg_proc_info *hg_proc_info = NULL; hg_return_t ret; HG_CHECK_SUBSYS_ERROR( cls, hg_class == NULL, error, ret, HG_INVALID_ARG, "NULL HG class"); - HG_CHECK_SUBSYS_ERROR(cls, disabled_p == NULL, error, ret, HG_INVALID_ARG, - "NULL pointer to disabled flag"); - - /* Retrieve proc function from function map */ - hg_proc_info = (struct hg_proc_info *) HG_Core_registered_data( - hg_class->core_class, id); - HG_CHECK_SUBSYS_ERROR(cls, hg_proc_info == NULL, error, ret, HG_NOENTRY, - "Could not get registered data for RPC ID %" PRIu64, id); - *disabled_p = hg_proc_info->no_response; - - return HG_SUCCESS; + return HG_Core_registered_disabled_response( + hg_class->core_class, id, disabled_p); error: return ret; @@ -2103,10 +2074,6 @@ HG_Forward(hg_handle_t handle, hg_cb_t callback, void *arg, void *in_struct) if (more_data) flags |= HG_CORE_MORE_DATA; - /* Set no response flag if no response required */ - if (hg_proc_info->no_response) - flags |= HG_CORE_NO_RESPONSE; - /* Send request */ ret = HG_Core_forward( handle->core_handle, hg_core_forward_cb, handle, flags, payload_size); diff --git a/src/mercury_core.c b/src/mercury_core.c index c31db7a8..22bc199f 100644 --- a/src/mercury_core.c +++ b/src/mercury_core.c @@ -36,7 +36,8 @@ /****************/ /* Private flags */ -#define HG_CORE_SELF_FORWARD (1 << 3) /* Forward to self */ +#define HG_CORE_NO_RESPONSE (1 << 1) /* No response required */ +#define HG_CORE_SELF_FORWARD (1 << 2) /* Forward to self */ /* Size of comletion queue used for holding completed requests */ #define HG_CORE_ATOMIC_QUEUE_SIZE (1024) @@ -189,13 +190,13 @@ struct hg_core_private_class { }; /* Poll type */ -typedef enum hg_core_poll_type { +enum hg_core_poll_type { HG_CORE_POLL_LOOPBACK = 1, #ifdef NA_HAS_SM HG_CORE_POLL_SM, #endif HG_CORE_POLL_NA -} hg_core_poll_type_t; +}; /* Completion queue */ struct hg_core_completion_queue { @@ -288,14 +289,6 @@ struct hg_core_private_context { hg_bool_t posted; /* Posted receives on context */ }; -/* Info for wrapping callbacks if self addr */ -struct hg_core_self_cb_info { - hg_core_cb_t forward_cb; - void *forward_arg; - hg_core_cb_t respond_cb; - void *respond_arg; -}; - /* HG addr */ struct hg_core_private_addr { struct hg_core_addr core_addr; /* Must remain as first field */ @@ -308,23 +301,22 @@ struct hg_core_private_addr { }; /* HG core op type */ -typedef enum { - HG_CORE_FORWARD, /*!< Forward completion */ - HG_CORE_RESPOND, /*!< Respond completion */ - HG_CORE_NO_RESPOND, /*!< No response completion */ - HG_CORE_FORWARD_SELF, /*!< Self forward completion */ - HG_CORE_RESPOND_SELF, /*!< Self respond completion */ - HG_CORE_PROCESS /*!< Process completion */ -} hg_core_op_type_t; +#define HG_CORE_OP_TYPE \ + X(HG_CORE_FORWARD) /*!< Forward completion */ \ + X(HG_CORE_RESPOND) /*!< Respond completion */ \ + X(HG_CORE_PROCESS) /*!< Process completion */ + +#define X(a) a, +enum hg_core_op_type { HG_CORE_OP_TYPE }; +#undef X /* HG core operations */ struct hg_core_ops { hg_return_t (*forward)( struct hg_core_private_handle *hg_core_handle); /* forward */ - hg_return_t (*respond)( - struct hg_core_private_handle *hg_core_handle); /* respond */ - hg_return_t (*no_respond)( - struct hg_core_private_handle *hg_core_handle); /* no_respond */ + hg_return_t (*respond)(struct hg_core_private_handle *hg_core_handle, + hg_return_t ret_code); /* respond */ + void (*trigger)(struct hg_core_private_handle *hg_core_handle); /* trigger*/ }; /* HG core handle */ @@ -361,13 +353,12 @@ struct hg_core_private_handle { hg_atomic_int32_t ret_status; /* Handle return status */ hg_atomic_int32_t op_completed_count; /* Completed operation count */ hg_atomic_int32_t - op_expected_count; /* Expected operation count for completion */ - hg_core_op_type_t op_type; /* Core operation type */ - hg_return_t ret; /* Return code associated to handle */ - hg_uint8_t cookie; /* Cookie */ - hg_bool_t reuse; /* Re-use handle once ref_count is 0 */ - hg_bool_t is_self; /* Self processed */ - hg_bool_t no_response; /* Require response or not */ + op_expected_count; /* Expected operation count for completion */ + hg_atomic_int32_t flags; /* Flags */ + enum hg_core_op_type op_type; /* Core operation type */ + hg_return_t ret; /* Return code associated to handle */ + hg_uint8_t cookie; /* Cookie */ + hg_bool_t reuse; /* Re-use handle once ref_count is 0 */ }; /* HG op id */ @@ -390,6 +381,14 @@ struct hg_core_op_id { /* Local Prototypes */ /********************/ +/** + * Convert op_type to string. + */ +#ifdef HG_HAS_DEBUG +static const char * +hg_core_op_type_to_string(enum hg_core_op_type op_type); +#endif + /** * Init counters. */ @@ -784,25 +783,21 @@ hg_core_respond(struct hg_core_private_handle *hg_core_handle, * Send response locally. */ static HG_INLINE hg_return_t -hg_core_respond_self(struct hg_core_private_handle *hg_core_handle); +hg_core_respond_self( + struct hg_core_private_handle *hg_core_handle, hg_return_t ret_code); /** * Do not send response locally. */ -static HG_INLINE hg_return_t +static HG_INLINE void hg_core_no_respond_self(struct hg_core_private_handle *hg_core_handle); /** * Send response through NA. */ static hg_return_t -hg_core_respond_na(struct hg_core_private_handle *hg_core_handle); - -/** - * Do not send response through NA. - */ -static HG_INLINE hg_return_t -hg_core_no_respond_na(struct hg_core_private_handle *hg_core_handle); +hg_core_respond_na( + struct hg_core_private_handle *hg_core_handle, hg_return_t ret_code); /** * Send input callback. @@ -865,18 +860,6 @@ hg_core_send_ack(hg_core_handle_t handle, hg_return_t ret); static HG_INLINE void hg_core_ack_cb(const struct na_cb_info *callback_info); -/** - * Wrapper for local callback execution. - */ -static hg_return_t -hg_core_self_cb(const struct hg_core_cb_info *callback_info); - -/** - * Process handle (used for self execution). - */ -static hg_return_t -hg_core_process_self(struct hg_core_private_handle *hg_core_handle); - /** * Process handle. */ @@ -948,15 +931,51 @@ hg_core_trigger(struct hg_core_private_context *context, /** * Trigger callback from HG lookup op ID. */ -static hg_return_t +static HG_INLINE void hg_core_trigger_lookup_entry(struct hg_core_op_id *hg_core_op_id); /** * Trigger callback from HG core handle. */ -static hg_return_t +static HG_INLINE void hg_core_trigger_entry(struct hg_core_private_handle *hg_core_handle); +/** + * Trigger callback from self HG core handle. + */ +static HG_INLINE void +hg_core_trigger_self(struct hg_core_private_handle *hg_core_handle); + +/** + * Trigger callback from HG core handle. + */ +static HG_INLINE void +hg_core_trigger_na(struct hg_core_private_handle *hg_core_handle); + +/** + * Trigger RPC handler callback from HG core handle. + */ +static void +hg_core_trigger_process(struct hg_core_private_handle *hg_core_handle); + +/** + * Trigger forward callback. + */ +static HG_INLINE void +hg_core_trigger_forward_cb(struct hg_core_private_handle *hg_core_handle); + +/** + * Trigger respond callback. + */ +static HG_INLINE void +hg_core_trigger_respond_cb(struct hg_core_private_handle *hg_core_handle); + +/** + * Wrapper for local callback execution. + */ +static void +hg_core_trigger_self_respond_cb(struct hg_core_private_handle *hg_core_handle); + /** * Cancel handle. */ @@ -967,16 +986,23 @@ hg_core_cancel(struct hg_core_private_handle *hg_core_handle); /* Local Variables */ /*******************/ +/* Callback type string table */ +#ifdef HG_HAS_DEBUG +# define X(a) #a, +static const char *const hg_core_op_type_name_g[] = {HG_CORE_OP_TYPE}; +# undef X +#endif + /* Default ops */ static const struct hg_core_ops hg_core_ops_na_g = { .forward = hg_core_forward_na, .respond = hg_core_respond_na, - .no_respond = hg_core_no_respond_na}; + .trigger = hg_core_trigger_na}; static const struct hg_core_ops hg_core_ops_self_g = { .forward = hg_core_forward_self, .respond = hg_core_respond_self, - .no_respond = hg_core_no_respond_self}; + .trigger = hg_core_trigger_self}; /* Default log outlets */ #ifdef _WIN32 @@ -1017,6 +1043,15 @@ static HG_LOG_DEBUG_DECL_DLOG(diag) = HG_LOG_DLOG_INITIALIZER( static HG_LOG_SUBSYS_DLOG_DECL_REGISTER(diag, HG_CORE_SUBSYS_NAME); #endif +/*---------------------------------------------------------------------------*/ +#ifdef HG_HAS_DEBUG +static const char * +hg_core_op_type_to_string(enum hg_core_op_type op_type) +{ + return hg_core_op_type_name_g[op_type]; +} +#endif + /*---------------------------------------------------------------------------*/ #if defined(HG_HAS_DEBUG) && !defined(_WIN32) static void @@ -3242,7 +3277,7 @@ hg_core_create(struct hg_core_private_context *context, na_class_t *na_class, return HG_SUCCESS; error: - hg_core_destroy(hg_core_handle); + (void) hg_core_destroy(hg_core_handle); return ret; } @@ -3251,38 +3286,32 @@ hg_core_create(struct hg_core_private_context *context, na_class_t *na_class, static hg_return_t hg_core_destroy(struct hg_core_private_handle *hg_core_handle) { - int32_t ref_count; + int32_t ref_count, flags, no_response_done = 0; hg_return_t ret; if (hg_core_handle == NULL) return HG_SUCCESS; - /* This will push the RPC handle back to completion queue when no - * response is sent and we are sending to ourselves. This ensures that - * there is no race between the callback execution and the RPC - * completion. */ - if (hg_core_handle->is_self && hg_core_handle->no_response && - hg_atomic_get32(&hg_core_handle->no_response_done) > 0 && - hg_atomic_cas32(&hg_core_handle->no_response_done, - hg_atomic_get32(&hg_core_handle->ref_count), 0)) { - /* Safe as the decremented refcount will always be > 0 */ - ref_count = hg_atomic_decr32(&hg_core_handle->ref_count); - HG_LOG_SUBSYS_DEBUG(rpc_ref, "Handle (%p) ref_count decr to %" PRId32, - (void *) hg_core_handle, ref_count); - - ret = hg_core_handle->ops.no_respond(hg_core_handle); - HG_CHECK_SUBSYS_HG_ERROR(rpc, error, ret, "Could not complete handle"); - - return HG_SUCCESS; - } + /* Retrieve flags before decrementing refcount */ + flags = hg_atomic_get32(&hg_core_handle->flags); + if ((flags & HG_CORE_SELF_FORWARD) && (flags & HG_CORE_NO_RESPONSE)) + no_response_done = hg_atomic_get32(&hg_core_handle->no_response_done); /* Standard destroy refcount decrement */ ref_count = hg_atomic_decr32(&hg_core_handle->ref_count); HG_LOG_SUBSYS_DEBUG(rpc_ref, "Handle (%p) ref_count decr to %" PRId32, (void *) hg_core_handle, ref_count); - if (ref_count > 0) + if (ref_count > 0) { + /* This will push the RPC handle back to completion queue when no + * response is sent and we are sending to ourselves. This ensures that + * there is no race between the callback execution and the RPC + * completion. */ + if (ref_count == no_response_done) + hg_core_no_respond_self(hg_core_handle); + return HG_SUCCESS; /* Cannot free yet */ + } /* Re-use handle if we were listening, otherwise destroy it */ if (hg_core_handle->reuse && @@ -3484,6 +3513,8 @@ hg_core_alloc_na(struct hg_core_private_handle *hg_core_handle, hg_atomic_init32( &hg_core_handle->op_expected_count, 1); /* Default (no response) */ + HG_LOG_SUBSYS_DEBUG(rpc_ref, "Handle (%p) expected_count set to %" PRId32, + (void *) hg_core_handle, 1); hg_atomic_init32(&hg_core_handle->op_completed_count, 0); return HG_SUCCESS; @@ -3562,8 +3593,9 @@ hg_core_reset(struct hg_core_private_handle *hg_core_handle) hg_core_handle->out_buf_used = 0; hg_atomic_init32( &hg_core_handle->op_expected_count, 1); /* Default (no response) */ + HG_LOG_SUBSYS_DEBUG(rpc_ref, "Handle (%p) expected_count set to %" PRId32, + (void *) hg_core_handle, 1); hg_atomic_init32(&hg_core_handle->op_completed_count, 0); - hg_core_handle->no_response = HG_FALSE; /* Free extra data here if needed */ if (hg_core_class->more_data_cb.release) @@ -3676,10 +3708,14 @@ hg_core_set_rpc(struct hg_core_private_handle *hg_core_handle, hg_core_handle->na_addr = na_addr; /* Set forward call depending on address self */ - hg_core_handle->is_self = hg_core_class->init_info.loopback && - hg_core_addr->core_addr.is_self; - hg_core_handle->ops = - (hg_core_handle->is_self) ? hg_core_ops_self_g : hg_core_ops_na_g; + if (hg_core_class->init_info.loopback && + hg_core_addr->core_addr.is_self) { + hg_atomic_or32(&hg_core_handle->flags, HG_CORE_SELF_FORWARD); + hg_core_handle->ops = hg_core_ops_self_g; + } else { + hg_atomic_and32(&hg_core_handle->flags, ~HG_CORE_SELF_FORWARD); + hg_core_handle->ops = hg_core_ops_na_g; + } } /* We also allow for NULL RPC id to be passed (same reason as above) */ @@ -3695,7 +3731,13 @@ hg_core_set_rpc(struct hg_core_private_handle *hg_core_handle, /* Cache RPC info */ hg_core_handle->core_handle.rpc_info = hg_core_rpc_info; + if (hg_core_rpc_info->no_response) + hg_atomic_or32(&hg_core_handle->flags, HG_CORE_NO_RESPONSE); + else + hg_atomic_and32(&hg_core_handle->flags, ~HG_CORE_NO_RESPONSE); } + HG_LOG_SUBSYS_DEBUG(rpc, "Handle (%p) flags set to 0x%x", + (void *) hg_core_handle, hg_atomic_get32(&hg_core_handle->flags)); return HG_SUCCESS; @@ -3831,6 +3873,8 @@ hg_core_forward(struct hg_core_private_handle *hg_core_handle, /* Reset op counts */ hg_atomic_set32( &hg_core_handle->op_expected_count, 1); /* Default (no response) */ + HG_LOG_SUBSYS_DEBUG(rpc_ref, "Handle (%p) expected_count set to %" PRId32, + (void *) hg_core_handle, 1); hg_atomic_set32(&hg_core_handle->op_completed_count, 0); /* Reset handle ret */ @@ -3851,31 +3895,16 @@ hg_core_forward(struct hg_core_private_handle *hg_core_handle, error, ret, HG_MSGSIZE, "Exceeding input buffer size"); /* Parse flags */ - if (flags & HG_CORE_NO_RESPONSE) - hg_core_handle->no_response = HG_TRUE; - if (hg_core_handle->is_self) - flags |= HG_CORE_SELF_FORWARD; + if (flags & HG_CORE_MORE_DATA) + hg_atomic_or32(&hg_core_handle->flags, HG_CORE_MORE_DATA); + else + hg_atomic_and32(&hg_core_handle->flags, ~HG_CORE_MORE_DATA); /* Set callback, keep request and response callbacks separate so that * they do not get overwritten when forwarding to ourself */ hg_core_handle->request_callback = callback; hg_core_handle->request_arg = arg; - /* Set header */ - hg_core_handle->in_header.msg.request.id = - hg_core_handle->core_handle.info.id; - hg_core_handle->in_header.msg.request.flags = flags; - /* Set the cookie as origin context ID, so that when the cookie is - * unpacked by the target and assigned to HG info context_id, the NA - * layer knows which context ID it needs to send the response to. */ - hg_core_handle->in_header.msg.request.cookie = - hg_core_handle->core_handle.info.context->id; - - /* Encode request header */ - ret = hg_core_proc_header_request( - &hg_core_handle->core_handle, &hg_core_handle->in_header, HG_ENCODE); - HG_CHECK_SUBSYS_HG_ERROR(rpc, error, ret, "Could not encode header"); - #if defined(HG_HAS_DEBUG) && !defined(_WIN32) /* Increment counter */ hg_atomic_incr64( @@ -3895,7 +3924,9 @@ hg_core_forward(struct hg_core_private_handle *hg_core_handle, hg_atomic_set32(&hg_core_handle->status, HG_CORE_OP_COMPLETED); /* Rollback ref_count taken above */ - hg_atomic_decr32(&hg_core_handle->ref_count); + ref_count = hg_atomic_decr32(&hg_core_handle->ref_count); + HG_LOG_SUBSYS_DEBUG(rpc_ref, "Handle (%p) ref_count decr to %" PRId32, + (void *) hg_core_handle, ref_count); return ret; } @@ -3904,11 +3935,31 @@ hg_core_forward(struct hg_core_private_handle *hg_core_handle, static hg_return_t hg_core_forward_self(struct hg_core_private_handle *hg_core_handle) { + hg_return_t ret; + + /* Save ref_count, when sending to self and no response is being sent, + * we can only use that refcount to determine that the RPC callback has + * been fully executed. */ + if (hg_atomic_get32(&hg_core_handle->flags) & HG_CORE_NO_RESPONSE) + hg_atomic_set32(&hg_core_handle->no_response_done, + hg_atomic_get32(&hg_core_handle->ref_count)); + /* Set operation type for trigger */ - hg_core_handle->op_type = HG_CORE_FORWARD_SELF; + hg_core_handle->op_type = HG_CORE_PROCESS; - /* Post operation to self processing pool */ - return hg_core_process_self(hg_core_handle); + /* Process input */ + ret = hg_core_process_input(hg_core_handle); + if (ret != HG_SUCCESS) { + HG_LOG_SUBSYS_ERROR(rpc, "Could not process input"); + hg_atomic_set32(&hg_core_handle->ret_status, (int32_t) ret); + } + + /* Mark as completed */ + hg_core_complete_op(hg_core_handle); + + /* Always handle error from callback when forwarding to self, implies + * submission is always successful. */ + return HG_SUCCESS; } /*---------------------------------------------------------------------------*/ @@ -3921,12 +3972,30 @@ hg_core_forward_na(struct hg_core_private_handle *hg_core_handle) /* Set operation type for trigger */ hg_core_handle->op_type = HG_CORE_FORWARD; + /* Set header */ + hg_core_handle->in_header.msg.request.id = + hg_core_handle->core_handle.info.id; + hg_core_handle->in_header.msg.request.flags = + (hg_uint8_t) (hg_atomic_get32(&hg_core_handle->flags) & 0xff); + /* Set the cookie as origin context ID, so that when the cookie is + * unpacked by the target and assigned to HG info context_id, the NA + * layer knows which context ID it needs to send the response to. */ + hg_core_handle->in_header.msg.request.cookie = + hg_core_handle->core_handle.info.context->id; + + /* Encode request header */ + ret = hg_core_proc_header_request( + &hg_core_handle->core_handle, &hg_core_handle->in_header, HG_ENCODE); + HG_CHECK_SUBSYS_HG_ERROR(rpc, error, ret, "Could not encode header"); + /* Generate tag */ hg_core_handle->tag = hg_core_gen_request_tag(HG_CORE_HANDLE_CLASS(hg_core_handle)); /* Pre-post recv (output) if response is expected */ - if (!hg_core_handle->no_response) { + if (!(hg_atomic_get32(&hg_core_handle->flags) & HG_CORE_NO_RESPONSE)) { + int32_t HG_DEBUG_LOG_USED expected_count; + na_ret = NA_Msg_recv_expected(hg_core_handle->na_class, hg_core_handle->na_context, hg_core_recv_output_cb, hg_core_handle, hg_core_handle->core_handle.out_buf, @@ -3934,12 +4003,15 @@ hg_core_forward_na(struct hg_core_private_handle *hg_core_handle) hg_core_handle->out_buf_plugin_data, hg_core_handle->na_addr, hg_core_handle->core_handle.info.context_id, hg_core_handle->tag, hg_core_handle->na_recv_op_id); - HG_CHECK_SUBSYS_ERROR(rpc, na_ret != NA_SUCCESS, error_recv, ret, + HG_CHECK_SUBSYS_ERROR(rpc, na_ret != NA_SUCCESS, error, ret, (hg_return_t) na_ret, "Could not post recv for output buffer (%s)", NA_Error_to_string(na_ret)); - /* Increment number of expected operations */ - hg_atomic_incr32(&hg_core_handle->op_expected_count); + /* Increment number of expected completions */ + expected_count = hg_atomic_incr32(&hg_core_handle->op_expected_count); + HG_LOG_SUBSYS_DEBUG(rpc_ref, + "Handle (%p) expected_count incr to %" PRId32, + (void *) hg_core_handle, expected_count); } /* Mark handle as posted */ @@ -3958,18 +4030,22 @@ hg_core_forward_na(struct hg_core_private_handle *hg_core_handle) return HG_SUCCESS; -error_recv: +error: return ret; error_send: hg_atomic_and32(&hg_core_handle->status, ~HG_CORE_OP_POSTED); hg_atomic_or32(&hg_core_handle->status, HG_CORE_OP_ERRORED); - if (hg_core_handle->no_response) { + if (hg_atomic_get32(&hg_core_handle->flags) & HG_CORE_NO_RESPONSE) { /* No recv was posted */ return ret; } else { - hg_atomic_decr32(&hg_core_handle->op_expected_count); + int32_t HG_DEBUG_LOG_USED expected_count = + hg_atomic_decr32(&hg_core_handle->op_expected_count); + HG_LOG_SUBSYS_DEBUG(rpc_ref, + "Handle (%p) expected_count decr to %" PRId32, + (void *) hg_core_handle, expected_count); /* Keep error for return status */ hg_atomic_set32(&hg_core_handle->ret_status, (int32_t) ret); @@ -3994,12 +4070,14 @@ hg_core_respond(struct hg_core_private_handle *hg_core_handle, hg_core_cb_t callback, void *arg, hg_uint8_t flags, hg_size_t payload_size, hg_return_t ret_code) { + int32_t HG_DEBUG_LOG_USED ref_count; hg_size_t header_size; hg_return_t ret = HG_SUCCESS; /* Cannot respond if no_response flag set */ - HG_CHECK_SUBSYS_ERROR(rpc, hg_core_handle->no_response, done, ret, - HG_OPNOTSUPPORTED, "Sending response was disabled on that RPC"); + HG_CHECK_SUBSYS_ERROR(rpc, + hg_atomic_get32(&hg_core_handle->flags) & HG_CORE_NO_RESPONSE, done, + ret, HG_OPNOTSUPPORTED, "Sending response was disabled on that RPC"); /* Reset handle ret */ hg_core_handle->ret = HG_SUCCESS; @@ -4018,32 +4096,28 @@ hg_core_respond(struct hg_core_private_handle *hg_core_handle, hg_core_handle->out_buf_used > hg_core_handle->core_handle.out_buf_size, error, ret, HG_MSGSIZE, "Exceeding output buffer size"); + /* Parse flags */ + if (flags & HG_CORE_MORE_DATA) + hg_atomic_or32(&hg_core_handle->flags, HG_CORE_MORE_DATA); + else + hg_atomic_and32(&hg_core_handle->flags, ~HG_CORE_MORE_DATA); + /* Set callback, keep request and response callbacks separate so that * they do not get overwritten when forwarding to ourself */ hg_core_handle->response_callback = callback; hg_core_handle->response_arg = arg; - /* Set header */ - hg_core_handle->out_header.msg.response.ret_code = (hg_int8_t) ret_code; - hg_core_handle->out_header.msg.response.flags = flags; - hg_core_handle->out_header.msg.response.cookie = hg_core_handle->cookie; - - /* Encode response header */ - ret = hg_core_proc_header_response( - &hg_core_handle->core_handle, &hg_core_handle->out_header, HG_ENCODE); - HG_CHECK_SUBSYS_HG_ERROR(rpc, error, ret, "Could not encode header"); - - /* If addr is self, forward locally, otherwise send the encoded buffer - * through NA and pre-post response */ - ret = hg_core_handle->ops.respond(hg_core_handle); - HG_CHECK_SUBSYS_HG_ERROR(rpc, error, ret, "Could not respond"); - #if defined(HG_HAS_DEBUG) && !defined(_WIN32) /* Increment counter */ hg_atomic_incr64( HG_CORE_HANDLE_CLASS(hg_core_handle)->counters.rpc_resp_sent_count); #endif + /* If addr is self, forward locally, otherwise send the encoded buffer + * through NA and pre-post response */ + ret = hg_core_handle->ops.respond(hg_core_handle, ret_code); + HG_CHECK_SUBSYS_HG_ERROR(rpc, error, ret, "Could not respond"); + done: return ret; @@ -4052,20 +4126,30 @@ hg_core_respond(struct hg_core_private_handle *hg_core_handle, hg_atomic_or32(&hg_core_handle->status, HG_CORE_OP_COMPLETED); /* Decrement refcount on handle */ - hg_atomic_decr32(&hg_core_handle->ref_count); + ref_count = hg_atomic_decr32(&hg_core_handle->ref_count); + HG_LOG_SUBSYS_DEBUG(rpc_ref, "Handle (%p) ref_count decr to %" PRId32, + (void *) hg_core_handle, ref_count); return ret; } /*---------------------------------------------------------------------------*/ static HG_INLINE hg_return_t -hg_core_respond_self(struct hg_core_private_handle *hg_core_handle) +hg_core_respond_self( + struct hg_core_private_handle *hg_core_handle, hg_return_t ret_code) { + int32_t HG_DEBUG_LOG_USED expected_count; + /* Set operation type for trigger */ - hg_core_handle->op_type = HG_CORE_RESPOND_SELF; + hg_core_handle->op_type = HG_CORE_RESPOND; - /* Increment number of expected operations */ - hg_atomic_incr32(&hg_core_handle->op_expected_count); + /* Pass return code */ + hg_atomic_set32(&hg_core_handle->ret_status, (int32_t) ret_code); + + /* Increment number of expected completions */ + expected_count = hg_atomic_incr32(&hg_core_handle->op_expected_count); + HG_LOG_SUBSYS_DEBUG(rpc_ref, "Handle (%p) expected_count incr to %" PRId32, + (void *) hg_core_handle, expected_count); /* Complete and add to completion queue */ hg_core_complete_op(hg_core_handle); @@ -4074,37 +4158,57 @@ hg_core_respond_self(struct hg_core_private_handle *hg_core_handle) } /*---------------------------------------------------------------------------*/ -static HG_INLINE hg_return_t +static HG_INLINE void hg_core_no_respond_self(struct hg_core_private_handle *hg_core_handle) { + int32_t HG_DEBUG_LOG_USED expected_count; + /* Set operation type for trigger */ - hg_core_handle->op_type = HG_CORE_FORWARD_SELF; + hg_core_handle->op_type = HG_CORE_FORWARD; - /* Increment number of expected operations */ - hg_atomic_incr32(&hg_core_handle->op_expected_count); + /* Reset saved refcount */ + hg_atomic_set32(&hg_core_handle->no_response_done, 0); + + /* Increment number of expected completions */ + expected_count = hg_atomic_incr32(&hg_core_handle->op_expected_count); + HG_LOG_SUBSYS_DEBUG(rpc_ref, "Handle (%p) expected_count incr to %" PRId32, + (void *) hg_core_handle, expected_count); /* Complete and add to completion queue */ hg_core_complete_op(hg_core_handle); - - return HG_SUCCESS; } /*---------------------------------------------------------------------------*/ static hg_return_t -hg_core_respond_na(struct hg_core_private_handle *hg_core_handle) +hg_core_respond_na( + struct hg_core_private_handle *hg_core_handle, hg_return_t ret_code) { + int32_t HG_DEBUG_LOG_USED expected_count; hg_return_t ret; na_return_t na_ret; hg_bool_t ack_recv_posted = HG_FALSE; - /* Increment number of expected operations */ - hg_atomic_incr32(&hg_core_handle->op_expected_count); + /* Set header */ + hg_core_handle->out_header.msg.response.ret_code = (hg_int8_t) ret_code; + hg_core_handle->out_header.msg.response.flags = + (hg_uint8_t) (hg_atomic_get32(&hg_core_handle->flags) & 0xff); + hg_core_handle->out_header.msg.response.cookie = hg_core_handle->cookie; + + /* Encode response header */ + ret = hg_core_proc_header_response( + &hg_core_handle->core_handle, &hg_core_handle->out_header, HG_ENCODE); + HG_CHECK_SUBSYS_HG_ERROR(rpc, error, ret, "Could not encode header"); + + /* Increment number of expected completions */ + expected_count = hg_atomic_incr32(&hg_core_handle->op_expected_count); + HG_LOG_SUBSYS_DEBUG(rpc_ref, "Handle (%p) expected_count incr to %" PRId32, + (void *) hg_core_handle, expected_count); /* Set operation type for trigger */ hg_core_handle->op_type = HG_CORE_RESPOND; /* More data on output requires an ack once it is processed */ - if (hg_core_handle->out_header.msg.response.flags & HG_CORE_MORE_DATA) { + if (hg_atomic_get32(&hg_core_handle->flags) & HG_CORE_MORE_DATA) { size_t buf_size = hg_core_handle->core_handle.na_out_header_offset + sizeof(hg_uint8_t); @@ -4126,8 +4230,11 @@ hg_core_respond_na(struct hg_core_private_handle *hg_core_handle) NA_Error_to_string(na_ret)); } - /* Increment number of expected operations */ - hg_atomic_incr32(&hg_core_handle->op_expected_count); + /* Increment number of expected completions */ + expected_count = hg_atomic_incr32(&hg_core_handle->op_expected_count); + HG_LOG_SUBSYS_DEBUG(rpc_ref, + "Handle (%p) expected_count incr to %" PRId32, + (void *) hg_core_handle, expected_count); /* Pre-post recv (ack) if more data is expected */ na_ret = NA_Msg_recv_expected(hg_core_handle->na_class, @@ -4164,7 +4271,10 @@ hg_core_respond_na(struct hg_core_private_handle *hg_core_handle) hg_atomic_or32(&hg_core_handle->status, HG_CORE_OP_ERRORED); if (ack_recv_posted) { - hg_atomic_decr32(&hg_core_handle->op_expected_count); + expected_count = hg_atomic_decr32(&hg_core_handle->op_expected_count); + HG_LOG_SUBSYS_DEBUG(rpc_ref, + "Handle (%p) expected_count decr to %" PRId32, + (void *) hg_core_handle, expected_count); /* Keep error for return status */ hg_atomic_set32(&hg_core_handle->ret_status, (int32_t) ret); @@ -4190,21 +4300,6 @@ hg_core_respond_na(struct hg_core_private_handle *hg_core_handle) return ret; } -/*---------------------------------------------------------------------------*/ -static HG_INLINE hg_return_t -hg_core_no_respond_na(struct hg_core_private_handle *hg_core_handle) -{ - /* Set operation type for trigger */ - hg_core_handle->op_type = HG_CORE_NO_RESPOND; - - /* Increment number of expected operations */ - hg_atomic_incr32(&hg_core_handle->op_expected_count); - - hg_core_complete_op(hg_core_handle); - - return HG_SUCCESS; -} - /*---------------------------------------------------------------------------*/ static HG_INLINE void hg_core_send_input_cb(const struct na_cb_info *callback_info) @@ -4235,7 +4330,8 @@ hg_core_send_input_cb(const struct na_cb_info *callback_info) HG_LOG_SUBSYS_ERROR(rpc, "NA callback returned error (%s)", NA_Error_to_string(callback_info->ret)); - if (!(status & HG_CORE_OP_CANCELED) && !hg_core_handle->no_response) { + if (!(status & HG_CORE_OP_CANCELED) && + !(hg_atomic_get32(&hg_core_handle->flags) & HG_CORE_NO_RESPONSE)) { na_return_t na_ret; hg_atomic_or32(&hg_core_handle->status, HG_CORE_OP_CANCELED); @@ -4459,31 +4555,37 @@ hg_core_process_input(struct hg_core_private_handle *hg_core_handle) hg_atomic_incr64(hg_core_class->counters.rpc_req_recv_count); #endif - /* Get and verify input header */ - ret = hg_core_proc_header_request( - &hg_core_handle->core_handle, &hg_core_handle->in_header, HG_DECODE); - HG_CHECK_SUBSYS_HG_ERROR( - rpc, error, ret, "Could not decode request header"); - - /* Get operation ID from header */ - hg_core_handle->core_handle.info.id = - hg_core_handle->in_header.msg.request.id; - hg_core_handle->cookie = hg_core_handle->in_header.msg.request.cookie; - /* TODO assign target ID from cookie directly for now */ - hg_core_handle->core_handle.info.context_id = hg_core_handle->cookie; - - /* Parse flags */ - hg_core_handle->no_response = - hg_core_handle->in_header.msg.request.flags & HG_CORE_NO_RESPONSE; + /* We can skip RPC headers etc if we are sending to ourselves */ + if (!(hg_atomic_get32(&hg_core_handle->flags) & HG_CORE_SELF_FORWARD)) { + /* Get and verify input header */ + ret = hg_core_proc_header_request(&hg_core_handle->core_handle, + &hg_core_handle->in_header, HG_DECODE); + HG_CHECK_SUBSYS_HG_ERROR( + rpc, error, ret, "Could not decode request header"); + + /* Get operation ID from header */ + hg_core_handle->core_handle.info.id = + hg_core_handle->in_header.msg.request.id; + hg_core_handle->cookie = hg_core_handle->in_header.msg.request.cookie; + /* TODO assign target ID from cookie directly for now */ + hg_core_handle->core_handle.info.context_id = hg_core_handle->cookie; + + /* Parse flags */ + hg_atomic_set32(&hg_core_handle->flags, + hg_core_handle->in_header.msg.request.flags); + } HG_LOG_SUBSYS_DEBUG(rpc, "Processed input for handle %p, ID=%" PRIu64 ", cookie=%" PRIu8 ", no_response=%d", (void *) hg_core_handle, hg_core_handle->core_handle.info.id, - hg_core_handle->cookie, hg_core_handle->no_response); + hg_core_handle->cookie, + hg_atomic_get32(&hg_core_handle->flags) & HG_CORE_NO_RESPONSE); /* Must let upper layer get extra payload if HG_CORE_MORE_DATA is set */ - if (hg_core_handle->in_header.msg.request.flags & HG_CORE_MORE_DATA) { + if (hg_atomic_get32(&hg_core_handle->flags) & HG_CORE_MORE_DATA) { + int32_t HG_DEBUG_LOG_USED expected_count; + HG_CHECK_SUBSYS_ERROR(rpc, hg_core_class->more_data_cb.acquire == NULL, error, ret, HG_OPNOTSUPPORTED, "No callback defined for acquiring more data"); @@ -4492,8 +4594,11 @@ hg_core_process_input(struct hg_core_private_handle *hg_core_handle) "Must recv extra input data payload for handle %p", (void *) hg_core_handle); - /* Increment number of expected operations */ - hg_atomic_incr32(&hg_core_handle->op_expected_count); + /* Increment number of expected completions */ + expected_count = hg_atomic_incr32(&hg_core_handle->op_expected_count); + HG_LOG_SUBSYS_DEBUG(rpc_ref, + "Handle (%p) expected_count incr to %" PRId32, + (void *) hg_core_handle, expected_count); #if defined(HG_HAS_DEBUG) && !defined(_WIN32) /* Increment counter */ @@ -4606,16 +4711,20 @@ hg_core_process_output(struct hg_core_private_handle *hg_core_handle, hg_atomic_incr64(hg_core_class->counters.rpc_resp_recv_count); #endif - /* Get and verify output header */ - ret = hg_core_proc_header_response( - &hg_core_handle->core_handle, &hg_core_handle->out_header, HG_DECODE); - HG_CHECK_SUBSYS_HG_ERROR(rpc, error, ret, "Could not decode header"); + if (!(hg_atomic_get32(&hg_core_handle->flags) & HG_CORE_SELF_FORWARD)) { + /* Get and verify output header */ + ret = hg_core_proc_header_response(&hg_core_handle->core_handle, + &hg_core_handle->out_header, HG_DECODE); + HG_CHECK_SUBSYS_HG_ERROR(rpc, error, ret, "Could not decode header"); - /* Get return code from header */ - hg_atomic_set32(&hg_core_handle->ret_status, - (int32_t) hg_core_handle->out_header.msg.response.ret_code); + /* Get return code from header */ + hg_atomic_set32(&hg_core_handle->ret_status, + (int32_t) hg_core_handle->out_header.msg.response.ret_code); - /* Parse flags */ + /* Parse flags */ + hg_atomic_set32(&hg_core_handle->flags, + hg_core_handle->out_header.msg.response.flags); + } HG_LOG_SUBSYS_DEBUG(rpc, "Processed output for handle %p, ID=%" PRIu64 ", ret=%" PRId32, @@ -4623,7 +4732,9 @@ hg_core_process_output(struct hg_core_private_handle *hg_core_handle, hg_atomic_get32(&hg_core_handle->ret_status)); /* Must let upper layer get extra payload if HG_CORE_MORE_DATA is set */ - if (hg_core_handle->out_header.msg.response.flags & HG_CORE_MORE_DATA) { + if (hg_atomic_get32(&hg_core_handle->flags) & HG_CORE_MORE_DATA) { + int32_t HG_DEBUG_LOG_USED expected_count; + HG_CHECK_SUBSYS_ERROR(rpc, hg_core_class->more_data_cb.acquire == NULL, error, ret, HG_OPNOTSUPPORTED, "No callback defined for acquiring more data"); @@ -4632,8 +4743,11 @@ hg_core_process_output(struct hg_core_private_handle *hg_core_handle, "Must recv extra output data payload for handle %p", (void *) hg_core_handle); - /* Increment number of expected operations */ - hg_atomic_incr32(&hg_core_handle->op_expected_count); + /* Increment number of expected completions */ + expected_count = hg_atomic_incr32(&hg_core_handle->op_expected_count); + HG_LOG_SUBSYS_DEBUG(rpc_ref, + "Handle (%p) expected_count incr to %" PRId32, + (void *) hg_core_handle, expected_count); #if defined(HG_HAS_DEBUG) && !defined(_WIN32) /* Increment counter */ @@ -4762,85 +4876,6 @@ hg_core_ack_cb(const struct na_cb_info *callback_info) hg_core_complete_op(hg_core_handle); } -/*---------------------------------------------------------------------------*/ -static hg_return_t -hg_core_self_cb(const struct hg_core_cb_info *callback_info) -{ - struct hg_core_private_handle *hg_core_handle = - (struct hg_core_private_handle *) callback_info->info.respond.handle; - int32_t HG_DEBUG_LOG_USED ref_count; - hg_return_t ret; - - /* Increment number of expected operations */ - hg_atomic_incr32(&hg_core_handle->op_expected_count); - - /* First execute response callback */ - if (hg_core_handle->response_callback) { - struct hg_core_cb_info hg_core_cb_info; - - hg_core_cb_info.arg = hg_core_handle->response_arg; - hg_core_cb_info.ret = HG_SUCCESS; /* TODO report failure */ - hg_core_cb_info.type = HG_CB_RESPOND; - hg_core_cb_info.info.respond.handle = (hg_core_handle_t) hg_core_handle; - - hg_core_handle->response_callback(&hg_core_cb_info); - } - - /* Assign forward callback back to handle */ - hg_core_handle->op_type = HG_CORE_FORWARD_SELF; - - /* Increment refcount and push handle back to completion queue */ - ref_count = hg_atomic_incr32(&hg_core_handle->ref_count); - HG_LOG_SUBSYS_DEBUG(rpc_ref, "Handle (%p) ref_count incr to %" PRId32, - (void *) hg_core_handle, ref_count); - - /* Process output */ - ret = hg_core_process_output(hg_core_handle, hg_core_more_data_complete); - HG_CHECK_SUBSYS_HG_ERROR(rpc, error, ret, "Could not process output"); - - /* Mark as completed */ - hg_core_complete_op(hg_core_handle); - - return HG_SUCCESS; - -error: - hg_atomic_cas32( - &hg_core_handle->ret_status, (int32_t) HG_SUCCESS, (int32_t) ret); - - /* Mark as completed */ - hg_core_complete_op(hg_core_handle); - - return HG_SUCCESS; -} - -/*---------------------------------------------------------------------------*/ -static hg_return_t -hg_core_process_self(struct hg_core_private_handle *hg_core_handle) -{ - hg_return_t ret; - - /* Set operation type for trigger */ - hg_core_handle->op_type = HG_CORE_PROCESS; - - /* Process input */ - ret = hg_core_process_input(hg_core_handle); - HG_CHECK_SUBSYS_HG_ERROR(rpc, error, ret, "Could not process input"); - - /* Mark as completed */ - hg_core_complete_op(hg_core_handle); - - return HG_SUCCESS; - -error: - hg_atomic_cas32( - &hg_core_handle->ret_status, (int32_t) HG_SUCCESS, (int32_t) ret); - - /* Mark as completed */ - hg_core_complete_op(hg_core_handle); - - return HG_SUCCESS; -} - /*---------------------------------------------------------------------------*/ static hg_return_t hg_core_process(struct hg_core_private_handle *hg_core_handle) @@ -4849,26 +4884,31 @@ hg_core_process(struct hg_core_private_handle *hg_core_handle) int32_t HG_DEBUG_LOG_USED ref_count; hg_return_t ret; - /* Retrieve exe function from function map */ - hg_core_rpc_info = - hg_core_map_lookup(&HG_CORE_HANDLE_CLASS(hg_core_handle)->rpc_map, - &hg_core_handle->core_handle.info.id); - if (hg_core_rpc_info == NULL) { - HG_LOG_SUBSYS_WARNING(rpc, - "Could not find RPC ID (%" PRIu64 ") in RPC map", - hg_core_handle->core_handle.info.id); - HG_GOTO_DONE(error, ret, HG_NOENTRY); + /* Already cached for self RPCs */ + if (hg_atomic_get32(&hg_core_handle->flags) & HG_CORE_SELF_FORWARD) + hg_core_rpc_info = hg_core_handle->core_handle.rpc_info; + else { + /* Retrieve exe function from function map */ + hg_core_rpc_info = + hg_core_map_lookup(&HG_CORE_HANDLE_CLASS(hg_core_handle)->rpc_map, + &hg_core_handle->core_handle.info.id); + if (hg_core_rpc_info == NULL) { + HG_LOG_SUBSYS_WARNING(rpc, + "Could not find RPC ID (%" PRIu64 ") in RPC map", + hg_core_handle->core_handle.info.id); + HG_GOTO_DONE(error, ret, HG_NOENTRY); + } + // HG_CHECK_SUBSYS_ERROR(rpc, hg_core_rpc_info == NULL, error, ret, + // HG_NOENTRY, + // "Could not find RPC ID (%" PRIu64 ") in RPC map", + // hg_core_handle->core_handle.info.id); + + /* Cache RPC info */ + hg_core_handle->core_handle.rpc_info = hg_core_rpc_info; } - // HG_CHECK_SUBSYS_ERROR(rpc, hg_core_rpc_info == NULL, error, ret, - // HG_NOENTRY, - // "Could not find RPC ID (%" PRIu64 ") in RPC map", - // hg_core_handle->core_handle.info.id); HG_CHECK_SUBSYS_ERROR(rpc, hg_core_rpc_info->rpc_cb == NULL, error, ret, HG_INVALID_ARG, "No RPC callback registered"); - /* Cache RPC info */ - hg_core_handle->core_handle.rpc_info = hg_core_rpc_info; - /* Increment ref count here so that a call to HG_Destroy in user's RPC * callback does not free the handle but only schedules its completion */ @@ -4896,7 +4936,7 @@ hg_core_complete_op(struct hg_core_private_handle *hg_core_handle) op_expected_count = hg_atomic_get32(&hg_core_handle->op_expected_count); - HG_LOG_SUBSYS_DEBUG(rpc, + HG_LOG_SUBSYS_DEBUG(rpc_ref, "Completed %" PRId32 "/%" PRId32 " NA operations for handle (%p)", op_completed_count, op_expected_count, (void *) hg_core_handle); @@ -4924,7 +4964,8 @@ hg_core_complete(struct hg_core_private_handle *hg_core_handle, hg_return_t ret) (hg_core_handle_t) hg_core_handle; hg_core_completion_add(hg_core_handle->core_handle.info.context, - &hg_core_handle->hg_completion_entry, hg_core_handle->is_self); + &hg_core_handle->hg_completion_entry, + hg_atomic_get32(&hg_core_handle->flags) & HG_CORE_SELF_FORWARD); } /*---------------------------------------------------------------------------*/ @@ -5336,17 +5377,13 @@ hg_core_trigger(struct hg_core_private_context *context, /* Trigger entry */ switch (hg_completion_entry->op_type) { case HG_ADDR: - ret = hg_core_trigger_lookup_entry( + hg_core_trigger_lookup_entry( hg_completion_entry->op_id.hg_core_op_id); - HG_CHECK_SUBSYS_HG_ERROR( - poll, done, ret, "Could not trigger addr completion entry"); break; case HG_RPC: - ret = hg_core_trigger_entry( + hg_core_trigger_entry( (struct hg_core_private_handle *) hg_completion_entry->op_id.hg_core_handle); - HG_CHECK_SUBSYS_HG_ERROR( - poll, done, ret, "Could not trigger RPC completion entry"); break; case HG_BULK: ret = hg_bulk_trigger_entry( @@ -5371,125 +5408,183 @@ hg_core_trigger(struct hg_core_private_context *context, } /*---------------------------------------------------------------------------*/ -static hg_return_t +static HG_INLINE void hg_core_trigger_lookup_entry(struct hg_core_op_id *hg_core_op_id) { /* Execute callback */ if (hg_core_op_id->callback) { - struct hg_core_cb_info hg_core_cb_info; - - hg_core_cb_info.arg = hg_core_op_id->arg; - hg_core_cb_info.ret = HG_SUCCESS; - hg_core_cb_info.type = HG_CB_LOOKUP; - hg_core_cb_info.info.lookup.addr = - (hg_core_addr_t) hg_core_op_id->info.lookup.hg_core_addr; + struct hg_core_cb_info hg_core_cb_info = {.arg = hg_core_op_id->arg, + .ret = HG_SUCCESS, + .type = HG_CB_LOOKUP, + .info.lookup.addr = + (hg_core_addr_t) hg_core_op_id->info.lookup.hg_core_addr}; hg_core_op_id->callback(&hg_core_cb_info); } /* NB. OK to free after callback execution, op ID is not re-used */ free(hg_core_op_id); - - return HG_SUCCESS; } /*---------------------------------------------------------------------------*/ -static hg_return_t +static HG_INLINE void hg_core_trigger_entry(struct hg_core_private_handle *hg_core_handle) { - hg_return_t ret; - hg_atomic_and32(&hg_core_handle->status, ~HG_CORE_OP_QUEUED); - if (hg_core_handle->op_type == HG_CORE_PROCESS) { - int32_t HG_DEBUG_LOG_USED ref_count; + HG_LOG_SUBSYS_DEBUG(rpc, "Triggering callback type %s", + hg_core_op_type_to_string(hg_core_handle->op_type)); - /* Simply exit if error occurred */ - if (hg_core_handle->ret != HG_SUCCESS) - HG_GOTO_DONE(done, ret, HG_SUCCESS); + hg_core_handle->ops.trigger(hg_core_handle); - /* Take another reference to make sure the handle only gets freed - * after the response is sent */ - ref_count = hg_atomic_incr32(&hg_core_handle->ref_count); + /* Reuse handle if we were listening, otherwise destroy it */ + (void) hg_core_destroy(hg_core_handle); +} + +/*---------------------------------------------------------------------------*/ +static HG_INLINE void +hg_core_trigger_self(struct hg_core_private_handle *hg_core_handle) +{ + switch (hg_core_handle->op_type) { + case HG_CORE_PROCESS: + hg_core_trigger_process(hg_core_handle); + break; + case HG_CORE_FORWARD: + hg_core_trigger_forward_cb(hg_core_handle); + break; + case HG_CORE_RESPOND: + hg_core_trigger_self_respond_cb(hg_core_handle); + break; + default: + HG_LOG_SUBSYS_ERROR(rpc, "Invalid core operation type"); + break; + } +} + +/*---------------------------------------------------------------------------*/ +static HG_INLINE void +hg_core_trigger_na(struct hg_core_private_handle *hg_core_handle) +{ + switch (hg_core_handle->op_type) { + case HG_CORE_PROCESS: + hg_core_trigger_process(hg_core_handle); + break; + case HG_CORE_FORWARD: + hg_core_trigger_forward_cb(hg_core_handle); + break; + case HG_CORE_RESPOND: + hg_core_trigger_respond_cb(hg_core_handle); + break; + default: + HG_LOG_SUBSYS_ERROR(rpc, "Invalid core operation type"); + break; + } +} + +/*---------------------------------------------------------------------------*/ +static void +hg_core_trigger_process(struct hg_core_private_handle *hg_core_handle) +{ + hg_return_t ret; + int32_t flags; + + /* Silently exit if error occurred */ + if (hg_core_handle->ret != HG_SUCCESS) + return; + + flags = hg_atomic_get32(&hg_core_handle->flags); + /* Take another reference to make sure the handle only gets freed + * after the response is sent (when forwarding to self, always take a + * refcount to make sure the handle does not get re-used too early) */ + if (!(flags & HG_CORE_NO_RESPONSE) || (flags & HG_CORE_SELF_FORWARD)) { + int32_t HG_DEBUG_LOG_USED ref_count = + hg_atomic_incr32(&hg_core_handle->ref_count); HG_LOG_SUBSYS_DEBUG(rpc_ref, "Handle (%p) ref_count incr to %" PRId32, (void *) hg_core_handle, ref_count); + } - /* Save ref_count, when sending to self and no response is being sent, - * we can only use that refcount to determine that the RPC callback has - * been fully executed. */ - if (hg_core_handle->is_self && hg_core_handle->no_response) - hg_atomic_set32(&hg_core_handle->no_response_done, ref_count); - - /* Run RPC callback */ - ret = hg_core_process(hg_core_handle); - if (ret != HG_SUCCESS && !hg_core_handle->no_response) { - hg_size_t header_size = - hg_core_header_response_get_size() + - hg_core_handle->core_handle.na_out_header_offset; - - /* Respond in case of error */ - ret = hg_core_respond( - hg_core_handle, NULL, NULL, 0, header_size, ret); - HG_CHECK_SUBSYS_HG_ERROR(rpc, done, ret, "Could not respond"); - } + /* Run RPC callback */ + ret = hg_core_process(hg_core_handle); + if (ret != HG_SUCCESS && !(flags & HG_CORE_NO_RESPONSE)) { + hg_size_t header_size = + hg_core_header_response_get_size() + + hg_core_handle->core_handle.na_out_header_offset; - /* No response callback */ - if (hg_core_handle->no_response && !hg_core_handle->is_self) { - ret = hg_core_handle->ops.no_respond(hg_core_handle); - HG_CHECK_SUBSYS_HG_ERROR( - rpc, done, ret, "Could not complete handle"); - } - } else { - hg_core_cb_t hg_cb = NULL; - struct hg_core_cb_info hg_core_cb_info; - - hg_core_cb_info.ret = hg_core_handle->ret; - - switch (hg_core_handle->op_type) { - case HG_CORE_FORWARD_SELF: - case HG_CORE_FORWARD: - hg_cb = hg_core_handle->request_callback; - hg_core_cb_info.arg = hg_core_handle->request_arg; - hg_core_cb_info.type = HG_CB_FORWARD; - hg_core_cb_info.info.forward.handle = - (hg_core_handle_t) hg_core_handle; - break; - case HG_CORE_RESPOND: - hg_cb = hg_core_handle->response_callback; - hg_core_cb_info.arg = hg_core_handle->response_arg; - hg_core_cb_info.type = HG_CB_RESPOND; - hg_core_cb_info.info.respond.handle = - (hg_core_handle_t) hg_core_handle; - break; - case HG_CORE_RESPOND_SELF: - hg_cb = hg_core_self_cb; - hg_core_cb_info.arg = hg_core_handle->response_arg; - hg_core_cb_info.type = HG_CB_RESPOND; - hg_core_cb_info.info.respond.handle = - (hg_core_handle_t) hg_core_handle; - break; - case HG_CORE_NO_RESPOND: - /* Nothing */ - break; - case HG_CORE_PROCESS: - default: - HG_GOTO_SUBSYS_ERROR(rpc, done, ret, HG_OPNOTSUPPORTED, - "Invalid core operation type"); - } + /* Respond in case of error */ + (void) hg_core_respond(hg_core_handle, NULL, NULL, 0, header_size, ret); + } +} + +/*---------------------------------------------------------------------------*/ +static HG_INLINE void +hg_core_trigger_forward_cb(struct hg_core_private_handle *hg_core_handle) +{ + if (hg_core_handle->request_callback) { + struct hg_core_cb_info hg_core_cb_info = { + .arg = hg_core_handle->request_arg, + .ret = hg_core_handle->ret, + .type = HG_CB_FORWARD, + .info.forward.handle = (hg_core_handle_t) hg_core_handle}; - /* Execute user callback. - * NB. The handle cannot be destroyed before the callback execution - * as the user may carry the handle in the callback. */ - if (hg_cb) - hg_cb(&hg_core_cb_info); + (void) hg_core_handle->request_callback(&hg_core_cb_info); } +} -done: - /* Reuse handle if we were listening, otherwise destroy it */ - ret = hg_core_destroy(hg_core_handle); - HG_CHECK_SUBSYS_HG_ERROR(rpc, done, ret, "Could not destroy handle"); +/*---------------------------------------------------------------------------*/ +static HG_INLINE void +hg_core_trigger_respond_cb(struct hg_core_private_handle *hg_core_handle) +{ + if (hg_core_handle->response_callback) { + struct hg_core_cb_info hg_core_cb_info = { + .arg = hg_core_handle->response_arg, + .ret = hg_core_handle->ret, + .type = HG_CB_RESPOND, + .info.respond.handle = (hg_core_handle_t) hg_core_handle}; - return ret; + (void) hg_core_handle->response_callback(&hg_core_cb_info); + } +} + +/*---------------------------------------------------------------------------*/ +static void +hg_core_trigger_self_respond_cb(struct hg_core_private_handle *hg_core_handle) +{ + int32_t HG_DEBUG_LOG_USED ref_count, HG_DEBUG_LOG_USED expected_count; + hg_return_t ret; + + /* Increment number of expected completions */ + expected_count = hg_atomic_incr32(&hg_core_handle->op_expected_count); + HG_LOG_SUBSYS_DEBUG(rpc_ref, "Handle (%p) expected_count incr to %" PRId32, + (void *) hg_core_handle, expected_count); + + /* Increment refcount and push handle back to completion queue */ + ref_count = hg_atomic_incr32(&hg_core_handle->ref_count); + HG_LOG_SUBSYS_DEBUG(rpc_ref, "Handle (%p) ref_count incr to %" PRId32, + (void *) hg_core_handle, ref_count); + + /* First execute response callback */ + if (hg_core_handle->response_callback) { + struct hg_core_cb_info hg_core_cb_info = { + .arg = hg_core_handle->response_arg, + .ret = HG_SUCCESS, /* Keep return as SUCCESS */ + .type = HG_CB_RESPOND, + .info.respond.handle = (hg_core_handle_t) hg_core_handle}; + + (void) hg_core_handle->response_callback(&hg_core_cb_info); + } + + /* Assign forward callback back to handle */ + hg_core_handle->op_type = HG_CORE_FORWARD; + + /* Process output */ + ret = hg_core_process_output(hg_core_handle, hg_core_more_data_complete); + if (ret != HG_SUCCESS) { + HG_LOG_SUBSYS_ERROR(rpc, "Could not process output"); + hg_atomic_set32(&hg_core_handle->ret_status, (int32_t) ret); + } + + /* Mark as completed */ + hg_core_complete_op(hg_core_handle); } /*---------------------------------------------------------------------------*/ @@ -5499,8 +5594,9 @@ hg_core_cancel(struct hg_core_private_handle *hg_core_handle) hg_return_t ret; int32_t status; - HG_CHECK_SUBSYS_ERROR(rpc, hg_core_handle->is_self, error, ret, - HG_OPNOTSUPPORTED, "Local cancellation is not supported"); + HG_CHECK_SUBSYS_ERROR(rpc, + hg_atomic_get32(&hg_core_handle->flags) & HG_CORE_SELF_FORWARD, error, + ret, HG_OPNOTSUPPORTED, "Local cancellation is not supported"); /* Exit if op has already completed */ status = hg_atomic_get32(&hg_core_handle->status); @@ -5940,6 +6036,58 @@ HG_Core_registered_data(hg_core_class_t *hg_core_class, hg_id_t id) return NULL; } +/*---------------------------------------------------------------------------*/ +hg_return_t +HG_Core_registered_disable_response( + hg_core_class_t *hg_core_class, hg_id_t id, hg_bool_t disable) +{ + struct hg_core_private_class *private_class = + (struct hg_core_private_class *) hg_core_class; + struct hg_core_rpc_info *hg_core_rpc_info = NULL; + hg_return_t ret; + + HG_CHECK_SUBSYS_ERROR(cls, hg_core_class == NULL, error, ret, + HG_INVALID_ARG, "NULL HG core class"); + + hg_core_rpc_info = hg_core_map_lookup(&private_class->rpc_map, &id); + HG_CHECK_SUBSYS_ERROR(cls, hg_core_rpc_info == NULL, error, ret, HG_NOENTRY, + "Could not find RPC ID (%" PRIu64 ") in RPC map", id); + + hg_core_rpc_info->no_response = disable; + + return HG_SUCCESS; + +error: + return ret; +} + +/*---------------------------------------------------------------------------*/ +hg_return_t +HG_Core_registered_disabled_response( + hg_core_class_t *hg_core_class, hg_id_t id, hg_bool_t *disabled_p) +{ + struct hg_core_private_class *private_class = + (struct hg_core_private_class *) hg_core_class; + struct hg_core_rpc_info *hg_core_rpc_info = NULL; + hg_return_t ret; + + HG_CHECK_SUBSYS_ERROR(cls, hg_core_class == NULL, error, ret, + HG_INVALID_ARG, "NULL HG core class"); + HG_CHECK_SUBSYS_ERROR(cls, disabled_p == NULL, error, ret, HG_INVALID_ARG, + "NULL pointer to disabled flag"); + + hg_core_rpc_info = hg_core_map_lookup(&private_class->rpc_map, &id); + HG_CHECK_SUBSYS_ERROR(cls, hg_core_rpc_info == NULL, error, ret, HG_NOENTRY, + "Could not find RPC ID (%" PRIu64 ") in RPC map", id); + + *disabled_p = hg_core_rpc_info->no_response; + + return HG_SUCCESS; + +error: + return ret; +} + /*---------------------------------------------------------------------------*/ hg_return_t HG_Core_addr_lookup1(hg_core_context_t *context, hg_core_cb_t callback, @@ -6240,7 +6388,7 @@ HG_Core_create(hg_core_context_t *context, hg_core_addr_t addr, hg_id_t id, HG_CHECK_SUBSYS_ERROR(rpc, handle_p == NULL, error, ret, HG_INVALID_ARG, "NULL pointer to HG core handle"); - HG_LOG_SUBSYS_DEBUG(rpc, + HG_LOG_SUBSYS_DEBUG(rpc_ref, "Creating new handle with ID=%" PRIu64 ", address=%p", id, (void *) addr); @@ -6263,14 +6411,14 @@ HG_Core_create(hg_core_context_t *context, hg_core_addr_t addr, hg_id_t id, "Could not set new RPC info to handle %p", (void *) hg_core_handle); HG_LOG_SUBSYS_DEBUG( - rpc, "Created new handle (%p)", (void *) hg_core_handle); + rpc_ref, "Created new handle (%p)", (void *) hg_core_handle); *handle_p = (hg_core_handle_t) hg_core_handle; return HG_SUCCESS; error: - hg_core_destroy(hg_core_handle); + (void) hg_core_destroy(hg_core_handle); return ret; } diff --git a/src/mercury_core.h b/src/mercury_core.h index 01c39a40..a1c14801 100644 --- a/src/mercury_core.h +++ b/src/mercury_core.h @@ -72,8 +72,7 @@ typedef hg_return_t (*hg_core_cb_t)( #define HG_CORE_OP_ID_IGNORE ((hg_core_op_id_t *) 1) /* Flags */ -#define HG_CORE_MORE_DATA (1 << 0) /* More data required */ -#define HG_CORE_NO_RESPONSE (1 << 1) /* No response required */ +#define HG_CORE_MORE_DATA (1 << 0) /* More data required */ /*********************/ /* Public Prototypes */ @@ -505,6 +504,39 @@ HG_Core_register_data(hg_core_class_t *hg_core_class, hg_id_t id, void *data, HG_PUBLIC void * HG_Core_registered_data(hg_core_class_t *hg_core_class, hg_id_t id); +/** + * Disable response for a given RPC ID. This allows an origin process to send an + * RPC to a target without waiting for a response. The RPC completes locally and + * the callback on the origin is therefore pushed to the completion queue once + * the RPC send is completed. By default, all RPCs expect a response to + * be sent back. + * + * \param hg_core_class [IN] pointer to HG core class + * \param id [IN] registered function ID + * \param disable [IN] boolean (HG_TRUE to disable + * HG_FALSE to re-enable) + * + * \return HG_SUCCESS or corresponding HG error code + */ +HG_PUBLIC hg_return_t +HG_Core_registered_disable_response( + hg_core_class_t *hg_core_class, hg_id_t id, hg_bool_t disable); + +/** + * Check if response is disabled for a given RPC ID + * (i.e., HG_Registered_disable_response() has been called for this RPC ID). + * + * \param hg_core_class [IN] pointer to HG core class + * \param id [IN] registered function ID + * \param disabled_p [OUT] boolean (HG_TRUE if disabled + * HG_FALSE if enabled) + * + * \return HG_SUCCESS or corresponding HG error code + */ +HG_PUBLIC hg_return_t +HG_Core_registered_disabled_response( + hg_core_class_t *hg_core_class, hg_id_t id, hg_bool_t *disabled_p); + /** * Lookup an addr from a peer address/name. Addresses need to be * freed by calling HG_Core_addr_free(). After completion, user callback is @@ -967,6 +999,7 @@ struct hg_core_rpc_info { void *data; /* User data */ void (*free_callback)(void *); /* User data free callback */ hg_id_t id; /* RPC ID */ + hg_bool_t no_response; /* RPC response not expected */ }; /* HG core handle */