Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HG: add HG_BULK_NO_CONNECT op flag #771

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 36 additions & 69 deletions src/mercury_bulk.c
Original file line number Diff line number Diff line change
Expand Up @@ -95,29 +95,26 @@

/* Check permission flags */
#define HG_BULK_CHECK_FLAGS(op, origin_flags, local_flags, label, ret) \
switch (op) { \
case HG_BULK_PUSH: \
HG_CHECK_SUBSYS_ERROR(bulk, \
!(origin_flags & HG_BULK_WRITE_ONLY) || \
!(local_flags & HG_BULK_READ_ONLY), \
label, ret, HG_PERMISSION, \
"Invalid permission flags for PUSH operation " \
"(origin=0x%x, local=0x%x)", \
origin_flags, local_flags); \
break; \
case HG_BULK_PULL: \
do { \
HG_CHECK_SUBSYS_ERROR(bulk, op > (HG_BULK_NO_CONNECT | HG_BULK_PULL), \
label, ret, HG_INVALID_ARG, "Unknown bulk operation"); \
if (op & HG_BULK_PULL) \
HG_CHECK_SUBSYS_ERROR(bulk, \
!(origin_flags & HG_BULK_READ_ONLY) || \
!(local_flags & HG_BULK_WRITE_ONLY), \
label, ret, HG_PERMISSION, \
"Invalid permission flags for PULL operation " \
"(origin=%d, local=%d)", \
origin_flags, local_flags); \
break; \
default: \
HG_GOTO_SUBSYS_ERROR( \
bulk, label, ret, HG_INVALID_ARG, "Unknown bulk operation"); \
}
else \
HG_CHECK_SUBSYS_ERROR(bulk, \
!(origin_flags & HG_BULK_WRITE_ONLY) || \
!(local_flags & HG_BULK_READ_ONLY), \
label, ret, HG_PERMISSION, \
"Invalid permission flags for PUSH operation " \
"(origin=0x%x, local=0x%x)", \
origin_flags, local_flags); \
} while (0)

/************************************/
/* Local Type and Struct Definition */
Expand Down Expand Up @@ -231,7 +228,7 @@ typedef na_return_t (*na_bulk_op_t)(na_class_t *na_class, na_context_t *context,
na_cb_t callback, void *arg, na_mem_handle_t *local_mem_handle,
na_offset_t local_offset, na_mem_handle_t *remote_mem_handle,
na_offset_t remote_offset, size_t data_size, na_addr_t *remote_addr,
uint8_t remote_id, na_op_id_t *op_id);
uint8_t remote_id, uint8_t flags, na_op_id_t *op_id);

/********************/
/* Local Prototypes */
Expand Down Expand Up @@ -382,7 +379,7 @@ hg_bulk_transfer(hg_core_context_t *core_context, hg_cb_t callback, void *arg,
/**
* Bulk transfer to self.
*/
static hg_return_t
static void
hg_bulk_transfer_self(hg_bulk_op_t op,
const struct hg_bulk_segment *origin_segments, uint32_t origin_count,
hg_size_t origin_offset, const struct hg_bulk_segment *local_segments,
Expand Down Expand Up @@ -458,7 +455,7 @@ hg_bulk_transfer_segments_na(na_class_t *na_class, na_context_t *na_context,
hg_size_t origin_segment_start_offset,
const struct hg_bulk_segment *local_segments, uint32_t local_count,
na_mem_handle_t **local_mem_handles, hg_size_t local_segment_start_index,
hg_size_t local_segment_start_offset, hg_size_t size,
hg_size_t local_segment_start_offset, hg_size_t size, uint8_t flags,
na_op_id_t *na_op_ids[], uint32_t na_op_count);

/**
Expand All @@ -468,12 +465,12 @@ static HG_INLINE na_return_t
hg_bulk_na_put(na_class_t *na_class, na_context_t *context, na_cb_t callback,
void *arg, na_mem_handle_t *local_mem_handle, na_offset_t local_offset,
na_mem_handle_t *remote_mem_handle, na_offset_t remote_offset,
size_t data_size, na_addr_t *remote_addr, uint8_t remote_id,
size_t data_size, na_addr_t *remote_addr, uint8_t remote_id, uint8_t flags,
na_op_id_t *op_id)
{
return NA_Put(na_class, context, callback, arg, local_mem_handle,
return NA_Put2(na_class, context, callback, arg, local_mem_handle,
local_offset, remote_mem_handle, remote_offset, data_size, remote_addr,
remote_id, op_id);
remote_id, flags, op_id);
}

/**
Expand All @@ -483,12 +480,12 @@ static HG_INLINE na_return_t
hg_bulk_na_get(na_class_t *na_class, na_context_t *context, na_cb_t callback,
void *arg, na_mem_handle_t *local_mem_handle, na_offset_t local_offset,
na_mem_handle_t *remote_mem_handle, na_offset_t remote_offset,
size_t data_size, na_addr_t *remote_addr, uint8_t remote_id,
size_t data_size, na_addr_t *remote_addr, uint8_t remote_id, uint8_t flags,
na_op_id_t *op_id)
{
return NA_Get(na_class, context, callback, arg, local_mem_handle,
return NA_Get2(na_class, context, callback, arg, local_mem_handle,
local_offset, remote_mem_handle, remote_offset, data_size, remote_addr,
remote_id, op_id);
remote_id, flags, op_id);
}

/**
Expand Down Expand Up @@ -1971,15 +1968,13 @@ hg_bulk_transfer(hg_core_context_t *core_context, hg_cb_t callback, void *arg,
/* Complete immediately */
hg_bulk_complete(hg_bulk_op_id, HG_SUCCESS, true);
} else if (HG_Core_addr_is_self(origin_addr) ||
((origin_flags & HG_BULK_EAGER) && (op != HG_BULK_PUSH))) {
((origin_flags & HG_BULK_EAGER) && (op & HG_BULK_PULL))) {
hg_bulk_op_id->na_class = NULL;
hg_bulk_op_id->na_context = NULL;

/* When doing eager transfers, use self code path to copy data locally
*/
ret = hg_bulk_transfer_self(op, origin_segments, origin_count,
origin_offset, local_segments, local_count, local_offset, size,
hg_bulk_op_id);
/* For eager transfers, use self code path to copy data locally */
hg_bulk_transfer_self(op, origin_segments, origin_count, origin_offset,
local_segments, local_count, local_offset, size, hg_bulk_op_id);
} else {
struct hg_bulk_na_mem_desc *origin_mem_descs, *local_mem_descs;
na_mem_handle_t **origin_mem_handles, **local_mem_handles;
Expand Down Expand Up @@ -2034,7 +2029,7 @@ hg_bulk_transfer(hg_core_context_t *core_context, hg_cb_t callback, void *arg,
}

/*---------------------------------------------------------------------------*/
static hg_return_t
static void
hg_bulk_transfer_self(hg_bulk_op_t op,
const struct hg_bulk_segment *origin_segments, uint32_t origin_count,
hg_size_t origin_offset, const struct hg_bulk_segment *local_segments,
Expand All @@ -2043,20 +2038,8 @@ hg_bulk_transfer_self(hg_bulk_op_t op,
{
uint32_t origin_segment_start_index = 0, local_segment_start_index = 0;
hg_size_t origin_segment_start_offset = 0, local_segment_start_offset = 0;
hg_bulk_copy_op_t copy_op;
hg_return_t ret;

switch (op) {
case HG_BULK_PUSH:
copy_op = hg_bulk_memcpy_put;
break;
case HG_BULK_PULL:
copy_op = hg_bulk_memcpy_get;
break;
default:
HG_GOTO_SUBSYS_ERROR(
bulk, error, ret, HG_INVALID_ARG, "Unknown bulk operation");
}
hg_bulk_copy_op_t copy_op =
(op & HG_BULK_PULL) ? hg_bulk_memcpy_get : hg_bulk_memcpy_put;

HG_LOG_SUBSYS_DEBUG(bulk, "Transferring data through self");

Expand All @@ -2078,11 +2061,6 @@ hg_bulk_transfer_self(hg_bulk_op_t op,

/* Complete immediately */
hg_bulk_complete(hg_bulk_op_id, HG_SUCCESS, true);

return HG_SUCCESS;

error:
return ret;
}

/*---------------------------------------------------------------------------*/
Expand Down Expand Up @@ -2149,22 +2127,11 @@ hg_bulk_transfer_na(hg_bulk_op_t op, na_addr_t *na_origin_addr,
hg_size_t local_offset, hg_size_t size, struct hg_bulk_op_id *hg_bulk_op_id)
{
hg_bulk_na_op_id_t *hg_bulk_na_op_ids;
na_bulk_op_t na_bulk_op;
na_bulk_op_t na_bulk_op =
(op & HG_BULK_PULL) ? hg_bulk_na_get : hg_bulk_na_put;
uint8_t opt_flags = (op & HG_BULK_NO_CONNECT) ? NA_RMA_NO_CONNECT : 0;
hg_return_t ret;

/* Map op to NA op */
switch (op) {
case HG_BULK_PUSH:
na_bulk_op = hg_bulk_na_put;
break;
case HG_BULK_PULL:
na_bulk_op = hg_bulk_na_get;
break;
default:
HG_GOTO_SUBSYS_ERROR(
bulk, error, ret, HG_INVALID_ARG, "Unknown bulk operation");
}

#ifdef NA_HAS_SM
/* Use NA SM op IDs if needed */
if (origin_flags & HG_BULK_SM)
Expand All @@ -2183,7 +2150,7 @@ hg_bulk_transfer_na(hg_bulk_op_t op, na_addr_t *na_origin_addr,
na_ret = na_bulk_op(hg_bulk_op_id->na_class, hg_bulk_op_id->na_context,
hg_bulk_transfer_cb, hg_bulk_op_id, local_mem_handles[0],
local_offset, origin_mem_handles[0], origin_offset, size,
na_origin_addr, origin_id, hg_bulk_na_op_ids->s[0]);
na_origin_addr, origin_id, opt_flags, hg_bulk_na_op_ids->s[0]);
HG_CHECK_SUBSYS_ERROR(bulk, na_ret != NA_SUCCESS, error, ret,
(hg_return_t) na_ret, "Could not transfer data (%s)",
NA_Error_to_string(na_ret));
Expand Down Expand Up @@ -2245,7 +2212,7 @@ hg_bulk_transfer_na(hg_bulk_op_t op, na_addr_t *na_origin_addr,
origin_count, origin_mem_handles, origin_segment_start_index,
origin_segment_start_offset, local_segments, local_count,
local_mem_handles, local_segment_start_index,
local_segment_start_offset, size, na_op_ids,
local_segment_start_offset, size, opt_flags, na_op_ids,
hg_bulk_op_id->op_count);
HG_CHECK_SUBSYS_HG_ERROR(
bulk, error, ret, "Could not transfer data segments");
Expand Down Expand Up @@ -2321,7 +2288,7 @@ hg_bulk_transfer_segments_na(na_class_t *na_class, na_context_t *na_context,
hg_size_t origin_segment_start_offset,
const struct hg_bulk_segment *local_segments, uint32_t local_count,
na_mem_handle_t **local_mem_handles, hg_size_t local_segment_start_index,
hg_size_t local_segment_start_offset, hg_size_t size,
hg_size_t local_segment_start_offset, hg_size_t size, uint8_t flags,
na_op_id_t *na_op_ids[], uint32_t na_op_count)
{
hg_size_t origin_segment_index = origin_segment_start_index;
Expand All @@ -2346,7 +2313,7 @@ hg_bulk_transfer_segments_na(na_class_t *na_class, na_context_t *na_context,
na_ret = na_bulk_op(na_class, na_context, callback, arg,
local_mem_handles[local_segment_index], local_segment_offset,
origin_mem_handles[origin_segment_index], origin_segment_offset,
transfer_size, origin_addr, origin_id, na_op_ids[count]);
transfer_size, origin_addr, origin_id, flags, na_op_ids[count]);
HG_CHECK_SUBSYS_ERROR(bulk, na_ret != NA_SUCCESS, error, ret,
(hg_return_t) na_ret, "Could not transfer data (%s)",
NA_Error_to_string(na_ret));
Expand Down
6 changes: 4 additions & 2 deletions src/mercury_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@ struct hg_bulk_attr {
* Bulk transfer operators.
*/
typedef enum hg_bulk_op {
HG_BULK_PUSH, /*!< push data to origin */
HG_BULK_PULL /*!< pull data from origin */
HG_BULK_PUSH, /*!< push data to origin */
HG_BULK_PULL, /*!< pull data from origin */
HG_BULK_NO_CONNECT /*!< for transports emulating RMA, do not attempt to
connect */
} hg_bulk_op_t;

/* Callback info structs */
Expand Down
99 changes: 95 additions & 4 deletions src/na/na.h
Original file line number Diff line number Diff line change
Expand Up @@ -834,6 +834,39 @@ NA_Put(na_class_t *na_class, na_context_t *context, na_cb_t callback, void *arg,
size_t data_size, na_addr_t *remote_addr, uint8_t remote_id,
na_op_id_t *op_id);

/**
* Put data to remote address.
* Initiate a put to the registered memory regions with the given offset/size.
* After completion, the user callback is placed into a completion queue and
* can be triggered using NA_Trigger().
* \remark Memory must be registered and handles exchanged between peers.
*
* Users must manually create an operation ID through NA_Op_create() and pass
* it through op_id for future use and prevent multiple ID creation.
*
* \param na_class [IN/OUT] pointer to NA class
* \param context [IN/OUT] pointer to context of execution
* \param callback [IN] pointer to function callback
* \param arg [IN] pointer to data passed to callback
* \param local_mem_handle [IN] NA local memory handle
* \param local_offset [IN] local offset
* \param remote_mem_handle [IN] NA remote memory handle
* \param remote_offset [IN] remote offset
* \param data_size [IN] size of data that needs to be transferred
* \param remote_addr [IN] NA address of remote destination
* \param remote_id [IN] target ID of remote destination
* \param flags [IN] optional flags
* \param op_id [IN/OUT] pointer to operation ID
*
* \return NA_SUCCESS or corresponding NA error code
*/
static NA_INLINE na_return_t
NA_Put2(na_class_t *na_class, na_context_t *context, na_cb_t callback,
void *arg, na_mem_handle_t *local_mem_handle, na_offset_t local_offset,
na_mem_handle_t *remote_mem_handle, na_offset_t remote_offset,
size_t data_size, na_addr_t *remote_addr, uint8_t remote_id, uint8_t flags,
na_op_id_t *op_id);

/**
* Get data from remote address.
* Initiate a get to the registered memory regions with the given offset/size.
Expand Down Expand Up @@ -865,6 +898,38 @@ NA_Get(na_class_t *na_class, na_context_t *context, na_cb_t callback, void *arg,
size_t data_size, na_addr_t *remote_addr, uint8_t remote_id,
na_op_id_t *op_id);

/**
* Get data from remote address.
* Initiate a get to the registered memory regions with the given offset/size.
* After completion, the user callback is placed into a completion queue and
* can be triggered using NA_Trigger().
*
* Users must manually create an operation ID through NA_Op_create() and pass
* it through op_id for future use and prevent multiple ID creation.
*
* \param na_class [IN/OUT] pointer to NA class
* \param context [IN/OUT] pointer to context of execution
* \param callback [IN] pointer to function callback
* \param arg [IN] pointer to data passed to callback
* \param local_mem_handle [IN] NA local memory handle
* \param local_offset [IN] local offset
* \param remote_mem_handle [IN] NA remote memory handle
* \param remote_offset [IN] remote offset
* \param data_size [IN] size of data that needs to be transferred
* \param remote_addr [IN] NA address of remote source
* \param remote_id [IN] target ID of remote source
* \param flags [IN] optional flags
* \param op_id [IN/OUT] pointer to operation ID
*
* \return NA_SUCCESS or corresponding NA error code
*/
static NA_INLINE na_return_t
NA_Get2(na_class_t *na_class, na_context_t *context, na_cb_t callback,
void *arg, na_mem_handle_t *local_mem_handle, na_offset_t local_offset,
na_mem_handle_t *remote_mem_handle, na_offset_t remote_offset,
size_t data_size, na_addr_t *remote_addr, uint8_t remote_id, uint8_t flags,
na_op_id_t *op_id);

/**
* Retrieve file descriptor from NA plugin when supported. The descriptor
* can be used by upper layers for manual polling through the usual
Expand Down Expand Up @@ -1090,12 +1155,12 @@ struct na_class_ops {
na_cb_t callback, void *arg, na_mem_handle_t *local_mem_handle,
na_offset_t local_offset, na_mem_handle_t *remote_mem_handle,
na_offset_t remote_offset, size_t length, na_addr_t *remote_addr,
uint8_t remote_id, na_op_id_t *op_id);
uint8_t remote_id, uint8_t flags, na_op_id_t *op_id);
na_return_t (*get)(na_class_t *na_class, na_context_t *context,
na_cb_t callback, void *arg, na_mem_handle_t *local_mem_handle,
na_offset_t local_offset, na_mem_handle_t *remote_mem_handle,
na_offset_t remote_offset, size_t length, na_addr_t *remote_addr,
uint8_t remote_id, na_op_id_t *op_id);
uint8_t remote_id, uint8_t flags, na_op_id_t *op_id);
int (*poll_get_fd)(na_class_t *na_class, na_context_t *context);
bool (*poll_try_wait)(na_class_t *na_class, na_context_t *context);
na_return_t (*poll)(
Expand Down Expand Up @@ -1280,7 +1345,20 @@ NA_Put(na_class_t *na_class, na_context_t *context, na_cb_t callback, void *arg,
{
return na_class->ops->put(na_class, context, callback, arg,
local_mem_handle, local_offset, remote_mem_handle, remote_offset,
data_size, remote_addr, remote_id, op_id);
data_size, remote_addr, remote_id, 0, op_id);
}

/*---------------------------------------------------------------------------*/
static NA_INLINE na_return_t
NA_Put2(na_class_t *na_class, na_context_t *context, na_cb_t callback,
void *arg, na_mem_handle_t *local_mem_handle, na_offset_t local_offset,
na_mem_handle_t *remote_mem_handle, na_offset_t remote_offset,
size_t data_size, na_addr_t *remote_addr, uint8_t remote_id, uint8_t flags,
na_op_id_t *op_id)
{
return na_class->ops->put(na_class, context, callback, arg,
local_mem_handle, local_offset, remote_mem_handle, remote_offset,
data_size, remote_addr, remote_id, flags, op_id);
}

/*---------------------------------------------------------------------------*/
Expand All @@ -1293,7 +1371,20 @@ NA_Get(na_class_t *na_class, na_context_t *context, na_cb_t callback, void *arg,
{
return na_class->ops->get(na_class, context, callback, arg,
local_mem_handle, local_offset, remote_mem_handle, remote_offset,
data_size, remote_addr, remote_id, op_id);
data_size, remote_addr, remote_id, 0, op_id);
}

/*---------------------------------------------------------------------------*/
static NA_INLINE na_return_t
NA_Get2(na_class_t *na_class, na_context_t *context, na_cb_t callback,
void *arg, na_mem_handle_t *local_mem_handle, na_offset_t local_offset,
na_mem_handle_t *remote_mem_handle, na_offset_t remote_offset,
size_t data_size, na_addr_t *remote_addr, uint8_t remote_id, uint8_t flags,
na_op_id_t *op_id)
{
return na_class->ops->get(na_class, context, callback, arg,
local_mem_handle, local_offset, remote_mem_handle, remote_offset,
data_size, remote_addr, remote_id, flags, op_id);
}

/*---------------------------------------------------------------------------*/
Expand Down
Loading
Loading