diff --git a/src/mpid/ch4/netmod/ofi/ofi_comm.c b/src/mpid/ch4/netmod/ofi/ofi_comm.c index 57b9cb131de..8936941498a 100644 --- a/src/mpid/ch4/netmod/ofi/ofi_comm.c +++ b/src/mpid/ch4/netmod/ofi/ofi_comm.c @@ -145,6 +145,9 @@ int MPIDI_OFI_mpi_comm_commit_pre_hook(MPIR_Comm * comm) MPIDI_OFI_COMM(comm).enable_hashing = 0; MPIDI_OFI_COMM(comm).pref_nic = NULL; + /* Initialize tag for gpu_pipeline chunks; incremented by sender. */ + MPIDI_OFI_COMM(comm).pipeline_tag = 0; + if (comm->hints[MPIR_COMM_HINT_ENABLE_MULTI_NIC_STRIPING] == -1) { comm->hints[MPIR_COMM_HINT_ENABLE_MULTI_NIC_STRIPING] = MPIR_CVAR_CH4_OFI_ENABLE_MULTI_NIC_STRIPING; diff --git a/src/mpid/ch4/netmod/ofi/ofi_gpu_pipeline.c b/src/mpid/ch4/netmod/ofi/ofi_gpu_pipeline.c index b2c11b00c8f..b442890775b 100644 --- a/src/mpid/ch4/netmod/ofi/ofi_gpu_pipeline.c +++ b/src/mpid/ch4/netmod/ofi/ofi_gpu_pipeline.c @@ -14,6 +14,11 @@ struct chunk_req { void *buf; }; +struct pipeline_header { + int n_chunks; + int pipeline_tag; +}; + static void spawn_send_copy(MPIR_Async_thing * thing, MPIR_Request * sreq, MPIR_async_req * areq, const void *buf, MPI_Aint chunk_sz); static int start_recv_chunk(MPIR_Request * rreq, int idx, int n_chunks); @@ -39,12 +44,11 @@ int MPIDI_OFI_gpu_pipeline_send(MPIR_Request * sreq, const void *send_buf, MPI_Aint count, MPI_Datatype datatype, MPL_pointer_attr_t attr, MPI_Aint data_sz, uint64_t cq_data, fi_addr_t remote_addr, - int vci_local, int ctx_idx, uint64_t match_bits) + int vci_local, int ctx_idx, uint64_t match_bits, int pipeline_tag) { int mpi_errno = MPI_SUCCESS; uint32_t n_chunks = 0; - uint64_t is_packed = 0; /* always 0 ? */ MPI_Aint chunk_sz = MPIR_CVAR_CH4_OFI_GPU_PIPELINE_BUFFER_SZ; if (data_sz <= chunk_sz) { /* data fits in a single chunk */ @@ -56,8 +60,6 @@ int MPIDI_OFI_gpu_pipeline_send(MPIR_Request * sreq, const void *send_buf, n_chunks++; } } - MPIDI_OFI_idata_set_gpuchunk_bits(&cq_data, n_chunks); - MPIDI_OFI_idata_set_gpu_packed_bit(&cq_data, is_packed); MPIDI_OFI_REQUEST(sreq, pipeline_info.send.num_remain) = n_chunks; MPIDI_OFI_REQUEST(sreq, pipeline_info.send.cq_data) = cq_data; @@ -65,9 +67,15 @@ int MPIDI_OFI_gpu_pipeline_send(MPIR_Request * sreq, const void *send_buf, MPIDI_OFI_REQUEST(sreq, pipeline_info.send.vci_local) = vci_local; MPIDI_OFI_REQUEST(sreq, pipeline_info.send.ctx_idx) = ctx_idx; MPIDI_OFI_REQUEST(sreq, pipeline_info.send.match_bits) = match_bits; + MPIDI_OFI_REQUEST(sreq, pipeline_info.send.pipeline_tag) = pipeline_tag; + + struct pipeline_header hdr; + hdr.n_chunks = n_chunks; + hdr.pipeline_tag = pipeline_tag; /* Send the initial empty packet for matching */ - MPIDI_OFI_CALL_RETRY(fi_tinjectdata(MPIDI_OFI_global.ctx[ctx_idx].tx, NULL, 0, cq_data, + MPIDI_OFI_CALL_RETRY(fi_tinjectdata(MPIDI_OFI_global.ctx[ctx_idx].tx, + &hdr, sizeof(hdr), cq_data | MPIDI_OFI_IDATA_PIPELINE, remote_addr, match_bits), vci_local, tinjectdata); struct send_alloc *p; @@ -197,7 +205,7 @@ static void send_copy_complete(MPIR_Request * sreq, const void *buf, MPI_Aint ch int ctx_idx = MPIDI_OFI_REQUEST(sreq, pipeline_info.send.ctx_idx); fi_addr_t remote_addr = MPIDI_OFI_REQUEST(sreq, pipeline_info.send.remote_addr); uint64_t cq_data = MPIDI_OFI_REQUEST(sreq, pipeline_info.send.cq_data); - uint64_t match_bits = MPIDI_OFI_REQUEST(sreq, pipeline_info.send.match_bits) | + uint64_t match_bits = MPIDI_OFI_REQUEST(sreq, pipeline_info.send.pipeline_tag) | MPIDI_OFI_GPU_PIPELINE_SEND; MPID_THREAD_CS_ENTER(VCI, MPIDI_VCI(vci_local).lock); MPIDI_OFI_CALL_RETRY(fi_tsenddata(MPIDI_OFI_global.ctx[ctx_idx].tx, @@ -318,7 +326,6 @@ static int recv_alloc_poll(MPIR_Async_thing * thing) fi_addr_t remote_addr = MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.remote_addr); int ctx_idx = MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.ctx_idx); int vci = MPIDI_Request_get_vci(rreq); - uint64_t match_bits = MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.match_bits); uint64_t mask_bits = MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.mask_bits); struct chunk_req *chunk_req; @@ -327,10 +334,14 @@ static int recv_alloc_poll(MPIR_Async_thing * thing) chunk_req->parent = rreq; chunk_req->buf = host_buf; + + uint64_t match_bits; if (p->n_chunks == -1) { + match_bits = MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.match_bits); chunk_req->event_id = MPIDI_OFI_EVENT_RECV_GPU_PIPELINE_INIT; } else { - match_bits |= MPIDI_OFI_GPU_PIPELINE_SEND; + match_bits = MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.pipeline_tag) | + MPIDI_OFI_GPU_PIPELINE_SEND; chunk_req->event_id = MPIDI_OFI_EVENT_RECV_GPU_PIPELINE; } MPID_THREAD_CS_ENTER(VCI, MPIDI_VCI(vci).lock); @@ -380,24 +391,22 @@ int MPIDI_OFI_gpu_pipeline_recv_event(struct fi_cq_tagged_entry *wc, MPIR_Reques MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.is_sync) = true; } - uint32_t packed = MPIDI_OFI_idata_get_gpu_packed_bit(wc->data); - uint32_t n_chunks = MPIDI_OFI_idata_get_gpuchunk_bits(wc->data); - /* ? - Not sure why sender cannot send packed data */ - MPIR_Assert(packed == 0); - if (wc->len > 0) { + bool is_pipeline = (wc->data & MPIDI_OFI_IDATA_PIPELINE); + if (!is_pipeline) { /* message from a normal send */ - MPIR_Assert(n_chunks == 0); MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.num_remain) = 1; mpi_errno = start_recv_copy(rreq, host_buf, wc->len, recv_buf, recv_count, datatype); MPIR_ERR_CHECK(mpi_errno); } else { - MPIR_Assert(n_chunks > 0); - MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.num_remain) = n_chunks; + struct pipeline_header *p_hdr = host_buf; + MPIR_Assert(p_hdr->n_chunks > 0); + MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.num_remain) = p_hdr->n_chunks; + MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.pipeline_tag) = p_hdr->pipeline_tag; /* There is no data in the init chunk, free the buffer */ MPIDU_genq_private_pool_free_cell(MPIDI_OFI_global.gpu_pipeline_recv_pool, host_buf); /* Post recv for the remaining chunks. */ - for (int i = 0; i < n_chunks; i++) { - mpi_errno = start_recv_chunk(rreq, i, n_chunks); + for (int i = 0; i < p_hdr->n_chunks; i++) { + mpi_errno = start_recv_chunk(rreq, i, p_hdr->n_chunks); MPIR_ERR_CHECK(mpi_errno); } } diff --git a/src/mpid/ch4/netmod/ofi/ofi_impl.h b/src/mpid/ch4/netmod/ofi/ofi_impl.h index b4c959b8f35..698d68fac8e 100644 --- a/src/mpid/ch4/netmod/ofi/ofi_impl.h +++ b/src/mpid/ch4/netmod/ofi/ofi_impl.h @@ -831,7 +831,7 @@ int MPIDI_OFI_gpu_pipeline_send(MPIR_Request * sreq, const void *send_buf, MPI_Aint count, MPI_Datatype datatype, MPL_pointer_attr_t attr, MPI_Aint data_sz, uint64_t cq_data, fi_addr_t remote_addr, - int vci_local, int ctx_idx, uint64_t match_bits); + int vci_local, int ctx_idx, uint64_t match_bits, int pipeline_tag); int MPIDI_OFI_gpu_pipeline_recv(MPIR_Request * rreq, void *recv_buf, MPI_Aint count, MPI_Datatype datatype, fi_addr_t remote_addr, int vci_local, diff --git a/src/mpid/ch4/netmod/ofi/ofi_pre.h b/src/mpid/ch4/netmod/ofi/ofi_pre.h index 07b999ca808..48beae17fe2 100644 --- a/src/mpid/ch4/netmod/ofi/ofi_pre.h +++ b/src/mpid/ch4/netmod/ofi/ofi_pre.h @@ -48,6 +48,7 @@ typedef struct { int enable_striping; /* Flag to enable striping per communicator. */ int enable_hashing; /* Flag to enable hashing per communicator. */ int *pref_nic; /* Array to specify the preferred NIC for each rank (if needed) */ + int pipeline_tag; /* match_bits for gpu_pipeline chunks */ } MPIDI_OFI_comm_t; enum { MPIDI_AMTYPE_NONE = 0, @@ -223,6 +224,7 @@ typedef struct { fi_addr_t remote_addr; uint64_t cq_data; uint64_t match_bits; + int pipeline_tag; int num_remain; } send; struct { @@ -232,6 +234,7 @@ typedef struct { uint64_t match_bits; uint64_t mask_bits; MPI_Aint offset; + int pipeline_tag; int num_inrecv; int num_remain; bool is_sync; diff --git a/src/mpid/ch4/netmod/ofi/ofi_send.h b/src/mpid/ch4/netmod/ofi/ofi_send.h index 855970e8927..dc30330a84c 100644 --- a/src/mpid/ch4/netmod/ofi/ofi_send.h +++ b/src/mpid/ch4/netmod/ofi/ofi_send.h @@ -275,9 +275,10 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_send_normal(const void *buf, MPI_Aint cou data_sz >= MPIR_CVAR_CH4_OFI_GPU_PIPELINE_THRESHOLD) { /* Pipeline path */ fi_addr_t remote_addr = MPIDI_OFI_av_to_phys(addr, receiver_nic, vci_remote); + MPIDI_OFI_COMM(comm).pipeline_tag += 1; mpi_errno = MPIDI_OFI_gpu_pipeline_send(sreq, buf, count, datatype, attr, data_sz, cq_data, remote_addr, vci_local, ctx_idx, - match_bits); + match_bits, MPIDI_OFI_COMM(comm).pipeline_tag); MPIR_ERR_CHECK(mpi_errno); MPIR_T_PVAR_COUNTER_INC(MULTINIC, nic_sent_bytes_count[sender_nic], data_sz); diff --git a/src/mpid/ch4/netmod/ofi/ofi_types.h b/src/mpid/ch4/netmod/ofi/ofi_types.h index 46f49ea4478..657fe08261a 100644 --- a/src/mpid/ch4/netmod/ofi/ofi_types.h +++ b/src/mpid/ch4/netmod/ofi/ofi_types.h @@ -37,14 +37,12 @@ #define MPIDI_OFI_IDATA_ERROR_BITS (2) /* The number of bits in the immediate data field allocated to the source rank and error propagation. */ #define MPIDI_OFI_IDATA_SRC_ERROR_BITS (MPIDI_OFI_IDATA_SRC_BITS + MPIDI_OFI_IDATA_ERROR_BITS) -/* The number of bits in the immediate data field allocated to MPI_Packed datatype for GPU. */ -#define MPIDI_OFI_IDATA_GPU_PACKED_BITS (1) -/* The offset of bits in the immediate data field allocated to number of message chunks. */ -#define MPIDI_OFI_IDATA_GPUCHUNK_OFFSET (MPIDI_OFI_IDATA_SRC_ERROR_BITS + MPIDI_OFI_IDATA_GPU_PACKED_BITS) /* Bit mask for MPIR_ERR_OTHER */ #define MPIDI_OFI_ERR_OTHER (0x1ULL) /* Bit mask for MPIR_PROC_FAILED */ #define MPIDI_OFI_ERR_PROC_FAILED (0x2ULL) +/* Bit mask for gpu pipeline */ +#define MPIDI_OFI_IDATA_PIPELINE (1ULL << 32) /* Set the error bits */ MPL_STATIC_INLINE_PREFIX void MPIDI_OFI_idata_set_error_bits(uint64_t * data_field, @@ -75,30 +73,6 @@ MPL_STATIC_INLINE_PREFIX int MPIDI_OFI_idata_get_error_bits(uint64_t idata) } } -/* Set the gpu packed bit */ -static inline void MPIDI_OFI_idata_set_gpu_packed_bit(uint64_t * data_field, uint64_t is_packed) -{ - *data_field = (*data_field) | (is_packed << MPIDI_OFI_IDATA_SRC_ERROR_BITS); -} - -/* Get the gpu packed bit from the OFI data field. */ -static inline uint32_t MPIDI_OFI_idata_get_gpu_packed_bit(uint64_t idata) -{ - return (idata >> MPIDI_OFI_IDATA_SRC_ERROR_BITS) & 0x1ULL; -} - -/* Set gpu chunk bits */ -static inline void MPIDI_OFI_idata_set_gpuchunk_bits(uint64_t * data_field, uint64_t n_chunks) -{ - *data_field = (*data_field) | (n_chunks << MPIDI_OFI_IDATA_GPUCHUNK_OFFSET); -} - -/* Get gpu chunks from the OFI data field. */ -static inline uint32_t MPIDI_OFI_idata_get_gpuchunk_bits(uint64_t idata) -{ - return (idata >> MPIDI_OFI_IDATA_GPUCHUNK_OFFSET); -} - /* There are 4 protocol bits: * - MPIDI_DYNPROC_SEND * - MPIDI_OFI_HUGE_SEND