From 25693ac76604b6e4d2c09d6c0f08a2d4beccb92b Mon Sep 17 00:00:00 2001 From: Jerome Soumagne Date: Mon, 18 Nov 2024 18:19:25 -0600 Subject: [PATCH] HG: add HG_BULK_NO_CONNECT op flag NA: update NA_Put/NA_Get() to pass optional flags NA OFI: update to use FI_TCP_NO_CONNECT flag to prevent connection on RMA. This is to prevent potential connection timeouts when RMA target operates behind a firewall and connection cannot be established. --- src/mercury_bulk.c | 105 +++++++++++++++----------------------------- src/mercury_types.h | 6 ++- src/na/na.h | 99 +++++++++++++++++++++++++++++++++++++++-- src/na/na_bmi.c | 8 ++-- src/na/na_mpi.c | 8 ++-- src/na/na_ofi.c | 25 ++++++++--- src/na/na_psm.c | 6 +-- src/na/na_sm.c | 8 ++-- src/na/na_types.h | 3 ++ src/na/na_ucx.c | 8 ++-- 10 files changed, 175 insertions(+), 101 deletions(-) diff --git a/src/mercury_bulk.c b/src/mercury_bulk.c index 57ce9333..9e60dd40 100644 --- a/src/mercury_bulk.c +++ b/src/mercury_bulk.c @@ -95,17 +95,10 @@ /* Check permission flags */ #define HG_BULK_CHECK_FLAGS(op, origin_flags, local_flags, label, ret) \ - switch (op) { \ - case HG_BULK_PUSH: \ - HG_CHECK_SUBSYS_ERROR(bulk, \ - !(origin_flags & HG_BULK_WRITE_ONLY) || \ - !(local_flags & HG_BULK_READ_ONLY), \ - label, ret, HG_PERMISSION, \ - "Invalid permission flags for PUSH operation " \ - "(origin=0x%x, local=0x%x)", \ - origin_flags, local_flags); \ - break; \ - case HG_BULK_PULL: \ + do { \ + HG_CHECK_SUBSYS_ERROR(bulk, op > (HG_BULK_NO_CONNECT | HG_BULK_PULL), \ + label, ret, HG_INVALID_ARG, "Unknown bulk operation"); \ + if (op & HG_BULK_PULL) \ HG_CHECK_SUBSYS_ERROR(bulk, \ !(origin_flags & HG_BULK_READ_ONLY) || \ !(local_flags & HG_BULK_WRITE_ONLY), \ @@ -113,11 +106,15 @@ "Invalid permission flags for PULL operation " \ "(origin=%d, local=%d)", \ origin_flags, local_flags); \ - break; \ - default: \ - HG_GOTO_SUBSYS_ERROR( \ - bulk, label, ret, HG_INVALID_ARG, "Unknown bulk operation"); \ - } + else \ + HG_CHECK_SUBSYS_ERROR(bulk, \ + !(origin_flags & HG_BULK_WRITE_ONLY) || \ + !(local_flags & HG_BULK_READ_ONLY), \ + label, ret, HG_PERMISSION, \ + "Invalid permission flags for PUSH operation " \ + "(origin=0x%x, local=0x%x)", \ + origin_flags, local_flags); \ + } while (0) /************************************/ /* Local Type and Struct Definition */ @@ -231,7 +228,7 @@ typedef na_return_t (*na_bulk_op_t)(na_class_t *na_class, na_context_t *context, na_cb_t callback, void *arg, na_mem_handle_t *local_mem_handle, na_offset_t local_offset, na_mem_handle_t *remote_mem_handle, na_offset_t remote_offset, size_t data_size, na_addr_t *remote_addr, - uint8_t remote_id, na_op_id_t *op_id); + uint8_t remote_id, uint8_t flags, na_op_id_t *op_id); /********************/ /* Local Prototypes */ @@ -382,7 +379,7 @@ hg_bulk_transfer(hg_core_context_t *core_context, hg_cb_t callback, void *arg, /** * Bulk transfer to self. */ -static hg_return_t +static void hg_bulk_transfer_self(hg_bulk_op_t op, const struct hg_bulk_segment *origin_segments, uint32_t origin_count, hg_size_t origin_offset, const struct hg_bulk_segment *local_segments, @@ -458,7 +455,7 @@ hg_bulk_transfer_segments_na(na_class_t *na_class, na_context_t *na_context, hg_size_t origin_segment_start_offset, const struct hg_bulk_segment *local_segments, uint32_t local_count, na_mem_handle_t **local_mem_handles, hg_size_t local_segment_start_index, - hg_size_t local_segment_start_offset, hg_size_t size, + hg_size_t local_segment_start_offset, hg_size_t size, uint8_t flags, na_op_id_t *na_op_ids[], uint32_t na_op_count); /** @@ -468,12 +465,12 @@ static HG_INLINE na_return_t hg_bulk_na_put(na_class_t *na_class, na_context_t *context, na_cb_t callback, void *arg, na_mem_handle_t *local_mem_handle, na_offset_t local_offset, na_mem_handle_t *remote_mem_handle, na_offset_t remote_offset, - size_t data_size, na_addr_t *remote_addr, uint8_t remote_id, + size_t data_size, na_addr_t *remote_addr, uint8_t remote_id, uint8_t flags, na_op_id_t *op_id) { - return NA_Put(na_class, context, callback, arg, local_mem_handle, + return NA_Put2(na_class, context, callback, arg, local_mem_handle, local_offset, remote_mem_handle, remote_offset, data_size, remote_addr, - remote_id, op_id); + remote_id, flags, op_id); } /** @@ -483,12 +480,12 @@ static HG_INLINE na_return_t hg_bulk_na_get(na_class_t *na_class, na_context_t *context, na_cb_t callback, void *arg, na_mem_handle_t *local_mem_handle, na_offset_t local_offset, na_mem_handle_t *remote_mem_handle, na_offset_t remote_offset, - size_t data_size, na_addr_t *remote_addr, uint8_t remote_id, + size_t data_size, na_addr_t *remote_addr, uint8_t remote_id, uint8_t flags, na_op_id_t *op_id) { - return NA_Get(na_class, context, callback, arg, local_mem_handle, + return NA_Get2(na_class, context, callback, arg, local_mem_handle, local_offset, remote_mem_handle, remote_offset, data_size, remote_addr, - remote_id, op_id); + remote_id, flags, op_id); } /** @@ -1971,15 +1968,13 @@ hg_bulk_transfer(hg_core_context_t *core_context, hg_cb_t callback, void *arg, /* Complete immediately */ hg_bulk_complete(hg_bulk_op_id, HG_SUCCESS, true); } else if (HG_Core_addr_is_self(origin_addr) || - ((origin_flags & HG_BULK_EAGER) && (op != HG_BULK_PUSH))) { + ((origin_flags & HG_BULK_EAGER) && (op & HG_BULK_PULL))) { hg_bulk_op_id->na_class = NULL; hg_bulk_op_id->na_context = NULL; - /* When doing eager transfers, use self code path to copy data locally - */ - ret = hg_bulk_transfer_self(op, origin_segments, origin_count, - origin_offset, local_segments, local_count, local_offset, size, - hg_bulk_op_id); + /* For eager transfers, use self code path to copy data locally */ + hg_bulk_transfer_self(op, origin_segments, origin_count, origin_offset, + local_segments, local_count, local_offset, size, hg_bulk_op_id); } else { struct hg_bulk_na_mem_desc *origin_mem_descs, *local_mem_descs; na_mem_handle_t **origin_mem_handles, **local_mem_handles; @@ -2034,7 +2029,7 @@ hg_bulk_transfer(hg_core_context_t *core_context, hg_cb_t callback, void *arg, } /*---------------------------------------------------------------------------*/ -static hg_return_t +static void hg_bulk_transfer_self(hg_bulk_op_t op, const struct hg_bulk_segment *origin_segments, uint32_t origin_count, hg_size_t origin_offset, const struct hg_bulk_segment *local_segments, @@ -2043,20 +2038,8 @@ hg_bulk_transfer_self(hg_bulk_op_t op, { uint32_t origin_segment_start_index = 0, local_segment_start_index = 0; hg_size_t origin_segment_start_offset = 0, local_segment_start_offset = 0; - hg_bulk_copy_op_t copy_op; - hg_return_t ret; - - switch (op) { - case HG_BULK_PUSH: - copy_op = hg_bulk_memcpy_put; - break; - case HG_BULK_PULL: - copy_op = hg_bulk_memcpy_get; - break; - default: - HG_GOTO_SUBSYS_ERROR( - bulk, error, ret, HG_INVALID_ARG, "Unknown bulk operation"); - } + hg_bulk_copy_op_t copy_op = + (op & HG_BULK_PULL) ? hg_bulk_memcpy_get : hg_bulk_memcpy_put; HG_LOG_SUBSYS_DEBUG(bulk, "Transferring data through self"); @@ -2078,11 +2061,6 @@ hg_bulk_transfer_self(hg_bulk_op_t op, /* Complete immediately */ hg_bulk_complete(hg_bulk_op_id, HG_SUCCESS, true); - - return HG_SUCCESS; - -error: - return ret; } /*---------------------------------------------------------------------------*/ @@ -2149,22 +2127,11 @@ hg_bulk_transfer_na(hg_bulk_op_t op, na_addr_t *na_origin_addr, hg_size_t local_offset, hg_size_t size, struct hg_bulk_op_id *hg_bulk_op_id) { hg_bulk_na_op_id_t *hg_bulk_na_op_ids; - na_bulk_op_t na_bulk_op; + na_bulk_op_t na_bulk_op = + (op & HG_BULK_PULL) ? hg_bulk_na_get : hg_bulk_na_put; + uint8_t opt_flags = (op & HG_BULK_NO_CONNECT) ? NA_RMA_NO_CONNECT : 0; hg_return_t ret; - /* Map op to NA op */ - switch (op) { - case HG_BULK_PUSH: - na_bulk_op = hg_bulk_na_put; - break; - case HG_BULK_PULL: - na_bulk_op = hg_bulk_na_get; - break; - default: - HG_GOTO_SUBSYS_ERROR( - bulk, error, ret, HG_INVALID_ARG, "Unknown bulk operation"); - } - #ifdef NA_HAS_SM /* Use NA SM op IDs if needed */ if (origin_flags & HG_BULK_SM) @@ -2183,7 +2150,7 @@ hg_bulk_transfer_na(hg_bulk_op_t op, na_addr_t *na_origin_addr, na_ret = na_bulk_op(hg_bulk_op_id->na_class, hg_bulk_op_id->na_context, hg_bulk_transfer_cb, hg_bulk_op_id, local_mem_handles[0], local_offset, origin_mem_handles[0], origin_offset, size, - na_origin_addr, origin_id, hg_bulk_na_op_ids->s[0]); + na_origin_addr, origin_id, opt_flags, hg_bulk_na_op_ids->s[0]); HG_CHECK_SUBSYS_ERROR(bulk, na_ret != NA_SUCCESS, error, ret, (hg_return_t) na_ret, "Could not transfer data (%s)", NA_Error_to_string(na_ret)); @@ -2245,7 +2212,7 @@ hg_bulk_transfer_na(hg_bulk_op_t op, na_addr_t *na_origin_addr, origin_count, origin_mem_handles, origin_segment_start_index, origin_segment_start_offset, local_segments, local_count, local_mem_handles, local_segment_start_index, - local_segment_start_offset, size, na_op_ids, + local_segment_start_offset, size, opt_flags, na_op_ids, hg_bulk_op_id->op_count); HG_CHECK_SUBSYS_HG_ERROR( bulk, error, ret, "Could not transfer data segments"); @@ -2321,7 +2288,7 @@ hg_bulk_transfer_segments_na(na_class_t *na_class, na_context_t *na_context, hg_size_t origin_segment_start_offset, const struct hg_bulk_segment *local_segments, uint32_t local_count, na_mem_handle_t **local_mem_handles, hg_size_t local_segment_start_index, - hg_size_t local_segment_start_offset, hg_size_t size, + hg_size_t local_segment_start_offset, hg_size_t size, uint8_t flags, na_op_id_t *na_op_ids[], uint32_t na_op_count) { hg_size_t origin_segment_index = origin_segment_start_index; @@ -2346,7 +2313,7 @@ hg_bulk_transfer_segments_na(na_class_t *na_class, na_context_t *na_context, na_ret = na_bulk_op(na_class, na_context, callback, arg, local_mem_handles[local_segment_index], local_segment_offset, origin_mem_handles[origin_segment_index], origin_segment_offset, - transfer_size, origin_addr, origin_id, na_op_ids[count]); + transfer_size, origin_addr, origin_id, flags, na_op_ids[count]); HG_CHECK_SUBSYS_ERROR(bulk, na_ret != NA_SUCCESS, error, ret, (hg_return_t) na_ret, "Could not transfer data (%s)", NA_Error_to_string(na_ret)); diff --git a/src/mercury_types.h b/src/mercury_types.h index 88e85e68..8a786d35 100644 --- a/src/mercury_types.h +++ b/src/mercury_types.h @@ -50,8 +50,10 @@ struct hg_bulk_attr { * Bulk transfer operators. */ typedef enum hg_bulk_op { - HG_BULK_PUSH, /*!< push data to origin */ - HG_BULK_PULL /*!< pull data from origin */ + HG_BULK_PUSH, /*!< push data to origin */ + HG_BULK_PULL, /*!< pull data from origin */ + HG_BULK_NO_CONNECT /*!< for transports emulating RMA, do not attempt to + connect */ } hg_bulk_op_t; /* Callback info structs */ diff --git a/src/na/na.h b/src/na/na.h index 779f3ef3..780dec02 100644 --- a/src/na/na.h +++ b/src/na/na.h @@ -834,6 +834,39 @@ NA_Put(na_class_t *na_class, na_context_t *context, na_cb_t callback, void *arg, size_t data_size, na_addr_t *remote_addr, uint8_t remote_id, na_op_id_t *op_id); +/** + * Put data to remote address. + * Initiate a put to the registered memory regions with the given offset/size. + * After completion, the user callback is placed into a completion queue and + * can be triggered using NA_Trigger(). + * \remark Memory must be registered and handles exchanged between peers. + * + * Users must manually create an operation ID through NA_Op_create() and pass + * it through op_id for future use and prevent multiple ID creation. + * + * \param na_class [IN/OUT] pointer to NA class + * \param context [IN/OUT] pointer to context of execution + * \param callback [IN] pointer to function callback + * \param arg [IN] pointer to data passed to callback + * \param local_mem_handle [IN] NA local memory handle + * \param local_offset [IN] local offset + * \param remote_mem_handle [IN] NA remote memory handle + * \param remote_offset [IN] remote offset + * \param data_size [IN] size of data that needs to be transferred + * \param remote_addr [IN] NA address of remote destination + * \param remote_id [IN] target ID of remote destination + * \param flags [IN] optional flags + * \param op_id [IN/OUT] pointer to operation ID + * + * \return NA_SUCCESS or corresponding NA error code + */ +static NA_INLINE na_return_t +NA_Put2(na_class_t *na_class, na_context_t *context, na_cb_t callback, + void *arg, na_mem_handle_t *local_mem_handle, na_offset_t local_offset, + na_mem_handle_t *remote_mem_handle, na_offset_t remote_offset, + size_t data_size, na_addr_t *remote_addr, uint8_t remote_id, uint8_t flags, + na_op_id_t *op_id); + /** * Get data from remote address. * Initiate a get to the registered memory regions with the given offset/size. @@ -865,6 +898,38 @@ NA_Get(na_class_t *na_class, na_context_t *context, na_cb_t callback, void *arg, size_t data_size, na_addr_t *remote_addr, uint8_t remote_id, na_op_id_t *op_id); +/** + * Get data from remote address. + * Initiate a get to the registered memory regions with the given offset/size. + * After completion, the user callback is placed into a completion queue and + * can be triggered using NA_Trigger(). + * + * Users must manually create an operation ID through NA_Op_create() and pass + * it through op_id for future use and prevent multiple ID creation. + * + * \param na_class [IN/OUT] pointer to NA class + * \param context [IN/OUT] pointer to context of execution + * \param callback [IN] pointer to function callback + * \param arg [IN] pointer to data passed to callback + * \param local_mem_handle [IN] NA local memory handle + * \param local_offset [IN] local offset + * \param remote_mem_handle [IN] NA remote memory handle + * \param remote_offset [IN] remote offset + * \param data_size [IN] size of data that needs to be transferred + * \param remote_addr [IN] NA address of remote source + * \param remote_id [IN] target ID of remote source + * \param flags [IN] optional flags + * \param op_id [IN/OUT] pointer to operation ID + * + * \return NA_SUCCESS or corresponding NA error code + */ +static NA_INLINE na_return_t +NA_Get2(na_class_t *na_class, na_context_t *context, na_cb_t callback, + void *arg, na_mem_handle_t *local_mem_handle, na_offset_t local_offset, + na_mem_handle_t *remote_mem_handle, na_offset_t remote_offset, + size_t data_size, na_addr_t *remote_addr, uint8_t remote_id, uint8_t flags, + na_op_id_t *op_id); + /** * Retrieve file descriptor from NA plugin when supported. The descriptor * can be used by upper layers for manual polling through the usual @@ -1090,12 +1155,12 @@ struct na_class_ops { na_cb_t callback, void *arg, na_mem_handle_t *local_mem_handle, na_offset_t local_offset, na_mem_handle_t *remote_mem_handle, na_offset_t remote_offset, size_t length, na_addr_t *remote_addr, - uint8_t remote_id, na_op_id_t *op_id); + uint8_t remote_id, uint8_t flags, na_op_id_t *op_id); na_return_t (*get)(na_class_t *na_class, na_context_t *context, na_cb_t callback, void *arg, na_mem_handle_t *local_mem_handle, na_offset_t local_offset, na_mem_handle_t *remote_mem_handle, na_offset_t remote_offset, size_t length, na_addr_t *remote_addr, - uint8_t remote_id, na_op_id_t *op_id); + uint8_t remote_id, uint8_t flags, na_op_id_t *op_id); int (*poll_get_fd)(na_class_t *na_class, na_context_t *context); bool (*poll_try_wait)(na_class_t *na_class, na_context_t *context); na_return_t (*poll)( @@ -1280,7 +1345,20 @@ NA_Put(na_class_t *na_class, na_context_t *context, na_cb_t callback, void *arg, { return na_class->ops->put(na_class, context, callback, arg, local_mem_handle, local_offset, remote_mem_handle, remote_offset, - data_size, remote_addr, remote_id, op_id); + data_size, remote_addr, remote_id, 0, op_id); +} + +/*---------------------------------------------------------------------------*/ +static NA_INLINE na_return_t +NA_Put2(na_class_t *na_class, na_context_t *context, na_cb_t callback, + void *arg, na_mem_handle_t *local_mem_handle, na_offset_t local_offset, + na_mem_handle_t *remote_mem_handle, na_offset_t remote_offset, + size_t data_size, na_addr_t *remote_addr, uint8_t remote_id, uint8_t flags, + na_op_id_t *op_id) +{ + return na_class->ops->put(na_class, context, callback, arg, + local_mem_handle, local_offset, remote_mem_handle, remote_offset, + data_size, remote_addr, remote_id, flags, op_id); } /*---------------------------------------------------------------------------*/ @@ -1293,7 +1371,20 @@ NA_Get(na_class_t *na_class, na_context_t *context, na_cb_t callback, void *arg, { return na_class->ops->get(na_class, context, callback, arg, local_mem_handle, local_offset, remote_mem_handle, remote_offset, - data_size, remote_addr, remote_id, op_id); + data_size, remote_addr, remote_id, 0, op_id); +} + +/*---------------------------------------------------------------------------*/ +static NA_INLINE na_return_t +NA_Get2(na_class_t *na_class, na_context_t *context, na_cb_t callback, + void *arg, na_mem_handle_t *local_mem_handle, na_offset_t local_offset, + na_mem_handle_t *remote_mem_handle, na_offset_t remote_offset, + size_t data_size, na_addr_t *remote_addr, uint8_t remote_id, uint8_t flags, + na_op_id_t *op_id) +{ + return na_class->ops->get(na_class, context, callback, arg, + local_mem_handle, local_offset, remote_mem_handle, remote_offset, + data_size, remote_addr, remote_id, flags, op_id); } /*---------------------------------------------------------------------------*/ diff --git a/src/na/na_bmi.c b/src/na/na_bmi.c index 5d6a8ecf..529f4f0c 100644 --- a/src/na/na_bmi.c +++ b/src/na/na_bmi.c @@ -422,7 +422,7 @@ static na_return_t na_bmi_put(na_class_t *na_class, na_context_t *context, na_cb_t callback, void *arg, na_mem_handle_t *local_mem_handle, na_offset_t local_offset, na_mem_handle_t *remote_mem_handle, na_offset_t remote_offset, - size_t length, na_addr_t *remote_addr, uint8_t remote_id, + size_t length, na_addr_t *remote_addr, uint8_t remote_id, uint8_t flags, na_op_id_t *op_id); /* get */ @@ -430,7 +430,7 @@ static na_return_t na_bmi_get(na_class_t *na_class, na_context_t *context, na_cb_t callback, void *arg, na_mem_handle_t *local_mem_handle, na_offset_t local_offset, na_mem_handle_t *remote_mem_handle, na_offset_t remote_offset, - size_t length, na_addr_t *remote_addr, uint8_t remote_id, + size_t length, na_addr_t *remote_addr, uint8_t remote_id, uint8_t flags, na_op_id_t *op_id); /* poll */ @@ -2061,7 +2061,7 @@ na_bmi_put(na_class_t *na_class, na_context_t *context, na_cb_t callback, void *arg, na_mem_handle_t *local_mem_handle, na_offset_t local_offset, na_mem_handle_t *remote_mem_handle, na_offset_t remote_offset, size_t length, na_addr_t *remote_addr, uint8_t NA_UNUSED remote_id, - na_op_id_t *op_id) + uint8_t NA_UNUSED flags, na_op_id_t *op_id) { struct na_bmi_op_id *na_bmi_op_id = (struct na_bmi_op_id *) op_id; struct na_bmi_mem_handle *na_bmi_mem_handle_local = @@ -2193,7 +2193,7 @@ na_bmi_get(na_class_t *na_class, na_context_t *context, na_cb_t callback, void *arg, na_mem_handle_t *local_mem_handle, na_offset_t local_offset, na_mem_handle_t *remote_mem_handle, na_offset_t remote_offset, size_t length, na_addr_t *remote_addr, uint8_t NA_UNUSED remote_id, - na_op_id_t *op_id) + uint8_t NA_UNUSED flags, na_op_id_t *op_id) { struct na_bmi_op_id *na_bmi_op_id = (struct na_bmi_op_id *) op_id; struct na_bmi_mem_handle *na_bmi_mem_handle_local = diff --git a/src/na/na_mpi.c b/src/na/na_mpi.c index 60903a17..53a8ec2e 100644 --- a/src/na/na_mpi.c +++ b/src/na/na_mpi.c @@ -332,7 +332,7 @@ static na_return_t na_mpi_put(na_class_t *na_class, na_context_t *context, na_cb_t callback, void *arg, na_mem_handle_t *local_mem_handle, na_offset_t local_offset, na_mem_handle_t *remote_mem_handle, na_offset_t remote_offset, - size_t length, na_addr_t *remote_addr, uint8_t remote_id, + size_t length, na_addr_t *remote_addr, uint8_t remote_id, uint8_t flags, na_op_id_t *op_id); /* get */ @@ -340,7 +340,7 @@ static na_return_t na_mpi_get(na_class_t *na_class, na_context_t *context, na_cb_t callback, void *arg, na_mem_handle_t *local_mem_handle, na_offset_t local_offset, na_mem_handle_t *remote_mem_handle, na_offset_t remote_offset, - size_t length, na_addr_t *remote_addr, uint8_t remote_id, + size_t length, na_addr_t *remote_addr, uint8_t remote_id, uint8_t flags, na_op_id_t *op_id); /* poll */ @@ -1613,7 +1613,7 @@ na_mpi_put(na_class_t *na_class, na_context_t *context, na_cb_t callback, void *arg, na_mem_handle_t *local_mem_handle, na_offset_t local_offset, na_mem_handle_t *remote_mem_handle, na_offset_t remote_offset, size_t length, na_addr_t *remote_addr, uint8_t NA_UNUSED remote_id, - na_op_id_t *op_id) + uint8_t NA_UNUSED flags, na_op_id_t *op_id) { struct na_mpi_mem_handle *mpi_local_mem_handle = (struct na_mpi_mem_handle *) local_mem_handle; @@ -1709,7 +1709,7 @@ na_mpi_get(na_class_t *na_class, na_context_t *context, na_cb_t callback, void *arg, na_mem_handle_t *local_mem_handle, na_offset_t local_offset, na_mem_handle_t *remote_mem_handle, na_offset_t remote_offset, size_t length, na_addr_t *remote_addr, uint8_t NA_UNUSED remote_id, - na_op_id_t *op_id) + uint8_t NA_UNUSED flags, na_op_id_t *op_id) { struct na_mpi_mem_handle *mpi_local_mem_handle = (struct na_mpi_mem_handle *) local_mem_handle; diff --git a/src/na/na_ofi.c b/src/na/na_ofi.c index 7542a657..78aa587c 100644 --- a/src/na/na_ofi.c +++ b/src/na/na_ofi.c @@ -1921,7 +1921,7 @@ static na_return_t na_ofi_put(na_class_t *na_class, na_context_t *context, na_cb_t callback, void *arg, na_mem_handle_t *local_mem_handle, na_offset_t local_offset, na_mem_handle_t *remote_mem_handle, na_offset_t remote_offset, - size_t length, na_addr_t *remote_addr, uint8_t remote_id, + size_t length, na_addr_t *remote_addr, uint8_t remote_id, uint8_t flags, na_op_id_t *op_id); /* get */ @@ -1929,7 +1929,7 @@ static na_return_t na_ofi_get(na_class_t *na_class, na_context_t *context, na_cb_t callback, void *arg, na_mem_handle_t *local_mem_handle, na_offset_t local_offset, na_mem_handle_t *remote_mem_handle, na_offset_t remote_offset, - size_t length, na_addr_t *remote_addr, uint8_t remote_id, + size_t length, na_addr_t *remote_addr, uint8_t remote_id, uint8_t flags, na_op_id_t *op_id); /* poll_get_fd */ @@ -9009,11 +9009,16 @@ static na_return_t na_ofi_put(na_class_t *na_class, na_context_t *context, na_cb_t callback, void *arg, na_mem_handle_t *local_mem_handle, na_offset_t local_offset, na_mem_handle_t *remote_mem_handle, na_offset_t remote_offset, - size_t length, na_addr_t *remote_addr, uint8_t remote_id, na_op_id_t *op_id) + size_t length, na_addr_t *remote_addr, uint8_t remote_id, uint8_t flags, + na_op_id_t *op_id) { + uint64_t fi_rma_flags = FI_COMPLETION | FI_DELIVERY_COMPLETE; + if ((flags & NA_RMA_NO_CONNECT) && + (NA_OFI_CLASS(na_class)->fabric->prov_type == NA_OFI_PROV_TCP)) + fi_rma_flags |= FI_TCP_NO_CONNECT; + return na_ofi_rma_common(NA_OFI_CLASS(na_class), context, NA_CB_PUT, - callback, arg, fi_writemsg, "fi_writemsg", - FI_COMPLETION | FI_DELIVERY_COMPLETE, + callback, arg, fi_writemsg, "fi_writemsg", fi_rma_flags, (struct na_ofi_mem_handle *) local_mem_handle, local_offset, (struct na_ofi_mem_handle *) remote_mem_handle, remote_offset, length, (struct na_ofi_addr *) remote_addr, remote_id, @@ -9025,10 +9030,16 @@ static na_return_t na_ofi_get(na_class_t *na_class, na_context_t *context, na_cb_t callback, void *arg, na_mem_handle_t *local_mem_handle, na_offset_t local_offset, na_mem_handle_t *remote_mem_handle, na_offset_t remote_offset, - size_t length, na_addr_t *remote_addr, uint8_t remote_id, na_op_id_t *op_id) + size_t length, na_addr_t *remote_addr, uint8_t remote_id, uint8_t flags, + na_op_id_t *op_id) { + uint64_t fi_rma_flags = FI_COMPLETION; + if ((flags & NA_RMA_NO_CONNECT) && + (NA_OFI_CLASS(na_class)->fabric->prov_type == NA_OFI_PROV_TCP)) + fi_rma_flags |= FI_TCP_NO_CONNECT; + return na_ofi_rma_common(NA_OFI_CLASS(na_class), context, NA_CB_GET, - callback, arg, fi_readmsg, "fi_readmsg", FI_COMPLETION, + callback, arg, fi_readmsg, "fi_readmsg", fi_rma_flags, (struct na_ofi_mem_handle *) local_mem_handle, local_offset, (struct na_ofi_mem_handle *) remote_mem_handle, remote_offset, length, (struct na_ofi_addr *) remote_addr, remote_id, diff --git a/src/na/na_psm.c b/src/na/na_psm.c index 5d73e914..6e14f9b6 100644 --- a/src/na/na_psm.c +++ b/src/na/na_psm.c @@ -369,7 +369,7 @@ na_psm_op_destroy(na_class_t *na_class, na_op_id_t *op_id); * * note: the psm library timeouts do not sleep, they spin in a poll loop */ -#define SEC_TO_NSEC(X) ((X) *1000000000LL) +#define SEC_TO_NSEC(X) ((X) * 1000000000LL) /* * psm_enc64: break a uint64 up into two uint32s and encode each in network @@ -2160,7 +2160,7 @@ na_psm_put(na_class_t *na_class, na_context_t *context, na_cb_t callback, void *arg, na_mem_handle_t *local_mem_handle, na_offset_t local_offset, na_mem_handle_t *remote_mem_handle, na_offset_t remote_offset, size_t length, na_addr_t *remote_addr, uint8_t NA_UNUSED remote_id, - na_op_id_t *op_id) + uint8_t NA_UNUSED flags, na_op_id_t *op_id) { struct na_psm_class *pc; struct na_psm_addr *naddr; @@ -2339,7 +2339,7 @@ na_psm_get(na_class_t *na_class, na_context_t *context, na_cb_t callback, void *arg, na_mem_handle_t *local_mem_handle, na_offset_t local_offset, na_mem_handle_t *remote_mem_handle, na_offset_t remote_offset, size_t length, na_addr_t *remote_addr, uint8_t NA_UNUSED remote_id, - na_op_id_t *op_id) + uint8_t NA_UNUSED flags, na_op_id_t *op_id) { struct na_psm_class *pc; struct na_psm_addr *naddr; diff --git a/src/na/na_sm.c b/src/na/na_sm.c index daa3ea14..3e4c3453 100644 --- a/src/na/na_sm.c +++ b/src/na/na_sm.c @@ -1048,7 +1048,7 @@ static NA_INLINE na_return_t na_sm_put(na_class_t *na_class, na_context_t *context, na_cb_t callback, void *arg, na_mem_handle_t *local_mem_handle, na_offset_t local_offset, na_mem_handle_t *remote_mem_handle, na_offset_t remote_offset, - size_t length, na_addr_t *remote_addr, uint8_t remote_id, + size_t length, na_addr_t *remote_addr, uint8_t remote_id, uint8_t flags, na_op_id_t *op_id); /* get */ @@ -1056,7 +1056,7 @@ static NA_INLINE na_return_t na_sm_get(na_class_t *na_class, na_context_t *context, na_cb_t callback, void *arg, na_mem_handle_t *local_mem_handle, na_offset_t local_offset, na_mem_handle_t *remote_mem_handle, na_offset_t remote_offset, - size_t length, na_addr_t *remote_addr, uint8_t remote_id, + size_t length, na_addr_t *remote_addr, uint8_t remote_id, uint8_t flags, na_op_id_t *op_id); /* poll_get_fd */ @@ -5012,7 +5012,7 @@ na_sm_put(na_class_t *na_class, na_context_t *context, na_cb_t callback, void *arg, na_mem_handle_t *local_mem_handle, na_offset_t local_offset, na_mem_handle_t *remote_mem_handle, na_offset_t remote_offset, size_t length, na_addr_t *remote_addr, uint8_t NA_UNUSED remote_id, - na_op_id_t *op_id) + uint8_t NA_UNUSED flags, na_op_id_t *op_id) { return na_sm_rma(NA_SM_CLASS(na_class), context, NA_CB_PUT, callback, arg, na_sm_process_vm_writev, (struct na_sm_mem_handle *) local_mem_handle, @@ -5027,7 +5027,7 @@ na_sm_get(na_class_t *na_class, na_context_t *context, na_cb_t callback, void *arg, na_mem_handle_t *local_mem_handle, na_offset_t local_offset, na_mem_handle_t *remote_mem_handle, na_offset_t remote_offset, size_t length, na_addr_t *remote_addr, uint8_t NA_UNUSED remote_id, - na_op_id_t *op_id) + uint8_t NA_UNUSED flags, na_op_id_t *op_id) { return na_sm_rma(NA_SM_CLASS(na_class), context, NA_CB_GET, callback, arg, na_sm_process_vm_readv, (struct na_sm_mem_handle *) local_mem_handle, diff --git a/src/na/na_types.h b/src/na/na_types.h index 351087cb..1142b4c9 100644 --- a/src/na/na_types.h +++ b/src/na/na_types.h @@ -249,6 +249,9 @@ typedef void (*na_cb_t)(const struct na_cb_info *callback_info); #define NA_MEM_WRITE_ONLY 0x02 #define NA_MEM_READWRITE 0x03 +/* RMA optional flags */ +#define NA_RMA_NO_CONNECT (1 << 0) + /* Progress modes */ #define NA_NO_BLOCK 0x01 /*!< no blocking progress */ #define NA_NO_RETRY 0x02 /*!< no retry of operations in progress */ diff --git a/src/na/na_ucx.c b/src/na/na_ucx.c index f8fbb236..89808e8b 100644 --- a/src/na/na_ucx.c +++ b/src/na/na_ucx.c @@ -884,7 +884,7 @@ static na_return_t na_ucx_put(na_class_t *na_class, na_context_t *context, na_cb_t callback, void *arg, na_mem_handle_t *local_mem_handle, na_offset_t local_offset, na_mem_handle_t *remote_mem_handle, na_offset_t remote_offset, - size_t length, na_addr_t *remote_addr, uint8_t remote_id, + size_t length, na_addr_t *remote_addr, uint8_t remote_id, uint8_t flags, na_op_id_t *op_id); /* get */ @@ -892,7 +892,7 @@ static na_return_t na_ucx_get(na_class_t *na_class, na_context_t *context, na_cb_t callback, void *arg, na_mem_handle_t *local_mem_handle, na_offset_t local_offset, na_mem_handle_t *remote_mem_handle, na_offset_t remote_offset, - size_t length, na_addr_t *remote_addr, uint8_t remote_id, + size_t length, na_addr_t *remote_addr, uint8_t remote_id, uint8_t flags, na_op_id_t *op_id); /* poll_get_fd */ @@ -4219,7 +4219,7 @@ na_ucx_put(na_class_t *na_class, na_context_t *context, na_cb_t callback, void *arg, na_mem_handle_t *local_mem_handle, na_offset_t local_offset, na_mem_handle_t *remote_mem_handle, na_offset_t remote_offset, size_t length, na_addr_t *remote_addr, uint8_t NA_UNUSED remote_id, - na_op_id_t *op_id) + uint8_t NA_UNUSED flags, na_op_id_t *op_id) { return na_ucx_rma(NA_UCX_CLASS(na_class), context, NA_CB_PUT, callback, arg, (struct na_ucx_mem_handle *) local_mem_handle, local_offset, @@ -4233,7 +4233,7 @@ na_ucx_get(na_class_t *na_class, na_context_t *context, na_cb_t callback, void *arg, na_mem_handle_t *local_mem_handle, na_offset_t local_offset, na_mem_handle_t *remote_mem_handle, na_offset_t remote_offset, size_t length, na_addr_t *remote_addr, uint8_t NA_UNUSED remote_id, - na_op_id_t *op_id) + uint8_t NA_UNUSED flags, na_op_id_t *op_id) { return na_ucx_rma(NA_UCX_CLASS(na_class), context, NA_CB_GET, callback, arg, (struct na_ucx_mem_handle *) local_mem_handle, local_offset,