Skip to content

Commit

Permalink
Merge pull request #7138 from hzhou/2409_am_data
Browse files Browse the repository at this point in the history
ch4/ofi: use cq_data to carry message size

Approved-by: Ken Raffenetti
  • Loading branch information
hzhou authored Nov 6, 2024
2 parents 634b17d + 59c67ed commit e38d557
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 33 deletions.
23 changes: 0 additions & 23 deletions src/mpid/ch4/netmod/ofi/ofi_events.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ static int send_huge_event(int vci, struct fi_cq_tagged_entry *wc, MPIR_Request
static int ssend_ack_event(int vci, struct fi_cq_tagged_entry *wc, MPIR_Request * sreq);
static int chunk_done_event(int vci, struct fi_cq_tagged_entry *wc, MPIR_Request * req);
static int inject_emu_event(int vci, struct fi_cq_tagged_entry *wc, MPIR_Request * req);
static int accept_probe_event(int vci, struct fi_cq_tagged_entry *wc, MPIR_Request * rreq);
static int dynproc_done_event(int vci, struct fi_cq_tagged_entry *wc, MPIR_Request * rreq);
static int am_isend_event(int vci, struct fi_cq_tagged_entry *wc, MPIR_Request * sreq);
static int am_isend_rdma_event(int vci, struct fi_cq_tagged_entry *wc, MPIR_Request * sreq);
Expand Down Expand Up @@ -58,7 +57,6 @@ static int peek_event(int vci, struct fi_cq_tagged_entry *wc, MPIR_Request * rre
static int peek_empty_event(int vci, struct fi_cq_tagged_entry *wc, MPIR_Request * rreq)
{
MPIR_FUNC_ENTER;
MPIDI_OFI_dynamic_process_request_t *ctrl;

switch (MPIDI_OFI_REQUEST(rreq, event_id)) {
case MPIDI_OFI_EVENT_PEEK:
Expand All @@ -70,11 +68,6 @@ static int peek_empty_event(int vci, struct fi_cq_tagged_entry *wc, MPIR_Request
MPIDI_OFI_PEEK_NOT_FOUND);
break;

case MPIDI_OFI_EVENT_ACCEPT_PROBE:
ctrl = (MPIDI_OFI_dynamic_process_request_t *) rreq;
ctrl->done = MPIDI_OFI_PEEK_NOT_FOUND;
break;

default:
MPIR_Assert(0);
break;
Expand Down Expand Up @@ -333,18 +326,6 @@ static int inject_emu_event(int vci, struct fi_cq_tagged_entry *wc, MPIR_Request
return MPI_SUCCESS;
}

static int accept_probe_event(int vci, struct fi_cq_tagged_entry *wc, MPIR_Request * rreq)
{
MPIR_FUNC_ENTER;
MPIDI_OFI_dynamic_process_request_t *ctrl = (MPIDI_OFI_dynamic_process_request_t *) rreq;
ctrl->source = MPIDI_OFI_cqe_get_source(wc, false);
ctrl->tag = MPIDI_OFI_init_get_tag(wc->tag);
ctrl->msglen = wc->len;
ctrl->done = MPIDI_OFI_PEEK_FOUND;
MPIR_FUNC_EXIT;
return MPI_SUCCESS;
}

static int dynproc_done_event(int vci, struct fi_cq_tagged_entry *wc, MPIR_Request * rreq)
{
MPIR_FUNC_ENTER;
Expand Down Expand Up @@ -703,10 +684,6 @@ int MPIDI_OFI_dispatch_function(int vci, struct fi_cq_tagged_entry *wc, MPIR_Req
mpi_errno = dynproc_done_event(vci, wc, req);
break;

case MPIDI_OFI_EVENT_ACCEPT_PROBE:
mpi_errno = accept_probe_event(vci, wc, req);
break;

case MPIDI_OFI_EVENT_ABORT:
default:
mpi_errno = MPI_SUCCESS;
Expand Down
1 change: 1 addition & 0 deletions src/mpid/ch4/netmod/ofi/ofi_init.c
Original file line number Diff line number Diff line change
Expand Up @@ -1603,6 +1603,7 @@ static int update_global_limits(struct fi_info *prov)
} else {
MPIDI_OFI_global.max_msg_size = MPL_MIN(prov->ep_attr->max_msg_size, MPIR_AINT_MAX);
}
MPIDI_OFI_global.cq_data_size = prov->domain_attr->cq_data_size;
MPIDI_OFI_global.stripe_threshold = MPIR_CVAR_CH4_OFI_MULTI_NIC_STRIPING_THRESHOLD;
if (prov->ep_attr->max_order_raw_size > MPIR_AINT_MAX) {
MPIDI_OFI_global.max_order_raw = -1;
Expand Down
24 changes: 16 additions & 8 deletions src/mpid/ch4/netmod/ofi/ofi_rndv.c
Original file line number Diff line number Diff line change
Expand Up @@ -123,15 +123,23 @@ int MPIDI_OFI_peek_rndv_event(int vci, struct fi_cq_tagged_entry *wc, MPIR_Reque
mpi_errno = rndv_event_common(vci, rreq, &vci_src, &vci_dst);
MPIR_ERR_CHECK(mpi_errno);

/* prepare rndv_cts */
struct rndv_cts hdr;
hdr.rreq = rreq;
hdr.am_tag = -1; /* don't issue am_tag_recv yet */
hdr.flag = MPIDI_OFI_CTS_FLAG__PROBE;
MPI_Aint data_sz;
data_sz = MPIDI_OFI_idata_get_size(wc->data);

/* send cts */
mpi_errno = MPIDI_OFI_send_ack(rreq, context_id, &hdr, sizeof(hdr));
MPIR_ERR_CHECK(mpi_errno);
if (data_sz > 0) {
/* complete probe */
MPIR_STATUS_SET_COUNT(rreq->status, data_sz);
MPL_atomic_release_store_int(&(MPIDI_OFI_REQUEST(rreq, peek_status)), MPIDI_OFI_PEEK_FOUND);
} else {
/* ask sender for data_sz */
struct rndv_cts hdr;
hdr.rreq = rreq;
hdr.am_tag = -1; /* don't issue am_tag_recv yet */
hdr.flag = MPIDI_OFI_CTS_FLAG__PROBE;

mpi_errno = MPIDI_OFI_send_ack(rreq, context_id, &hdr, sizeof(hdr));
MPIR_ERR_CHECK(mpi_errno);
}

fn_exit:
MPIR_FUNC_EXIT;
Expand Down
3 changes: 2 additions & 1 deletion src/mpid/ch4/netmod/ofi/ofi_send.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
- name : MPIR_CVAR_CH4_OFI_EAGER_THRESHOLD
category : CH4_OFI
type : int
default : -1
default : 16384
class : none
verbosity : MPI_T_VERBOSITY_USER_BASIC
scope : MPI_T_SCOPE_LOCAL
Expand Down Expand Up @@ -641,6 +641,7 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_send(const void *buf, MPI_Aint count, MPI
MPIR_ERR_CHECK(mpi_errno);
/* inject a zero-size message with MPIDI_OFI_RNDV_SEND in match_bits */
match_bits |= MPIDI_OFI_RNDV_SEND;
MPIDI_OFI_idata_set_size(&cq_data, data_sz); /* optionally use cq_data to carry data_sz */
mpi_errno = MPIDI_OFI_send_lightweight(NULL, 0, cq_data, dst_rank, tag, comm,
match_bits, addr,
vci_src, vci_dst, sender_nic, receiver_nic);
Expand Down
19 changes: 18 additions & 1 deletion src/mpid/ch4/netmod/ofi/ofi_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,6 @@ enum {
MPIDI_OFI_EVENT_HUGE_CHUNK_DONE,
MPIDI_OFI_EVENT_INJECT_EMU,
MPIDI_OFI_EVENT_DYNPROC_DONE,
MPIDI_OFI_EVENT_ACCEPT_PROBE
};

enum {
Expand Down Expand Up @@ -474,6 +473,7 @@ typedef struct {
uint64_t max_mr_key_size;
uint64_t max_rma_key_bits;
uint64_t max_huge_rmas;
int cq_data_size;
int rma_key_type_bits;
int context_shift;
MPI_Aint tx_iov_limit;
Expand Down Expand Up @@ -675,4 +675,21 @@ extern MPIDI_OFI_global_t MPIDI_OFI_global;

extern MPIDI_OFI_capabilities_t MPIDI_OFI_caps_list[MPIDI_OFI_NUM_SETS];

static inline void MPIDI_OFI_idata_set_size(uint64_t * data_field, MPI_Aint data_sz)
{
*data_field &= 0xffffffff;
if (MPIDI_OFI_global.cq_data_size == 8 && data_sz <= INT32_MAX) {
*data_field |= (data_sz << 32);
}
}

static inline uint32_t MPIDI_OFI_idata_get_size(uint64_t idata)
{
if (MPIDI_OFI_global.cq_data_size == 8) {
return idata >> 32;
} else {
return 0;
}
}

#endif /* OFI_TYPES_H_INCLUDED */

0 comments on commit e38d557

Please sign in to comment.