Skip to content

Commit

Permalink
ch4/ofi: merge pipeline_info in MPIDI_OFI_request_t
Browse files Browse the repository at this point in the history
The paths of pipeline code is not intercepting with other paths, thus
pipeline_info can be part of the same union.
  • Loading branch information
hzhou committed Feb 6, 2024
1 parent b52d242 commit ed2c544
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 56 deletions.
94 changes: 47 additions & 47 deletions src/mpid/ch4/netmod/ofi/ofi_gpu_pipeline.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,13 @@ int MPIDI_OFI_gpu_pipeline_send(MPIR_Request * sreq, const void *send_buf,
}
}

MPIDI_OFI_REQUEST(sreq, pipeline_info.send.num_remain) = n_chunks;
MPIDI_OFI_REQUEST(sreq, pipeline_info.send.cq_data) = cq_data;
MPIDI_OFI_REQUEST(sreq, pipeline_info.send.remote_addr) = remote_addr;
MPIDI_OFI_REQUEST(sreq, pipeline_info.send.vci_local) = vci_local;
MPIDI_OFI_REQUEST(sreq, pipeline_info.send.ctx_idx) = ctx_idx;
MPIDI_OFI_REQUEST(sreq, pipeline_info.send.match_bits) = match_bits;
MPIDI_OFI_REQUEST(sreq, pipeline_info.send.pipeline_tag) = pipeline_tag;
MPIDI_OFI_REQUEST(sreq, u.pipeline_send.num_remain) = n_chunks;
MPIDI_OFI_REQUEST(sreq, u.pipeline_send.cq_data) = cq_data;
MPIDI_OFI_REQUEST(sreq, u.pipeline_send.remote_addr) = remote_addr;
MPIDI_OFI_REQUEST(sreq, u.pipeline_send.vci_local) = vci_local;
MPIDI_OFI_REQUEST(sreq, u.pipeline_send.ctx_idx) = ctx_idx;
MPIDI_OFI_REQUEST(sreq, u.pipeline_send.match_bits) = match_bits;
MPIDI_OFI_REQUEST(sreq, u.pipeline_send.pipeline_tag) = pipeline_tag;

struct pipeline_header hdr;
hdr.n_chunks = n_chunks;
Expand Down Expand Up @@ -193,7 +193,7 @@ static int send_copy_poll(MPIR_Async_thing * thing)
static void send_copy_complete(MPIR_Request * sreq, const void *buf, MPI_Aint chunk_sz)
{
int mpi_errno = MPI_SUCCESS;
int vci_local = MPIDI_OFI_REQUEST(sreq, pipeline_info.send.vci_local);
int vci_local = MPIDI_OFI_REQUEST(sreq, u.pipeline_send.vci_local);

struct chunk_req *chunk_req = MPL_malloc(sizeof(struct chunk_req), MPL_MEM_BUFFER);
MPIR_Assertp(chunk_req);
Expand All @@ -202,10 +202,10 @@ static void send_copy_complete(MPIR_Request * sreq, const void *buf, MPI_Aint ch
chunk_req->event_id = MPIDI_OFI_EVENT_SEND_GPU_PIPELINE;
chunk_req->buf = (void *) buf;

int ctx_idx = MPIDI_OFI_REQUEST(sreq, pipeline_info.send.ctx_idx);
fi_addr_t remote_addr = MPIDI_OFI_REQUEST(sreq, pipeline_info.send.remote_addr);
uint64_t cq_data = MPIDI_OFI_REQUEST(sreq, pipeline_info.send.cq_data);
uint64_t match_bits = MPIDI_OFI_REQUEST(sreq, pipeline_info.send.pipeline_tag) |
int ctx_idx = MPIDI_OFI_REQUEST(sreq, u.pipeline_send.ctx_idx);
fi_addr_t remote_addr = MPIDI_OFI_REQUEST(sreq, u.pipeline_send.remote_addr);
uint64_t cq_data = MPIDI_OFI_REQUEST(sreq, u.pipeline_send.cq_data);
uint64_t match_bits = MPIDI_OFI_REQUEST(sreq, u.pipeline_send.pipeline_tag) |
MPIDI_OFI_GPU_PIPELINE_SEND;
MPID_THREAD_CS_ENTER(VCI, MPIDI_VCI(vci_local).lock);
MPIDI_OFI_CALL_RETRY(fi_tsenddata(MPIDI_OFI_global.ctx[ctx_idx].tx,
Expand Down Expand Up @@ -234,8 +234,8 @@ int MPIDI_OFI_gpu_pipeline_send_event(struct fi_cq_tagged_entry *wc, MPIR_Reques

MPIDU_genq_private_pool_free_cell(MPIDI_OFI_global.gpu_pipeline_send_pool, host_buf);

MPIDI_OFI_REQUEST(sreq, pipeline_info.send.num_remain) -= 1;
if (MPIDI_OFI_REQUEST(sreq, pipeline_info.send.num_remain) == 0) {
MPIDI_OFI_REQUEST(sreq, u.pipeline_send.num_remain) -= 1;
if (MPIDI_OFI_REQUEST(sreq, u.pipeline_send.num_remain) == 0) {
MPIR_Datatype_release_if_not_builtin(MPIDI_OFI_REQUEST(sreq, datatype));
MPIDI_Request_complete_fast(sreq);
}
Expand Down Expand Up @@ -263,20 +263,20 @@ int MPIDI_OFI_gpu_pipeline_recv(MPIR_Request * rreq,
int mpi_errno = MPI_SUCCESS;

/* The 1st recv is an empty chunk for matching. We need initialize rreq. */
MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.offset) = 0;
MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.num_inrecv) = 0;
MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.num_remain) = 0;
MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.is_sync) = false;
MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.remote_addr) = remote_addr;
MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.vci_local) = vci_local;
MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.match_bits) = match_bits;
MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.mask_bits) = mask_bits;
MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.ctx_idx) = ctx_idx;
MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.offset) = 0;
MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.num_inrecv) = 0;
MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.num_remain) = 0;
MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.is_sync) = false;
MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.remote_addr) = remote_addr;
MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.vci_local) = vci_local;
MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.match_bits) = match_bits;
MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.mask_bits) = mask_bits;
MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.ctx_idx) = ctx_idx;

/* Save original buf, datatype and count */
MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.buf) = recv_buf;
MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.count) = count;
MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.datatype) = datatype;
MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.buf) = recv_buf;
MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.count) = count;
MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.datatype) = datatype;

struct recv_alloc *p;
p = MPL_malloc(sizeof(*p), MPL_MEM_OTHER);
Expand Down Expand Up @@ -313,7 +313,7 @@ static int recv_alloc_poll(MPIR_Async_thing * thing)
MPIR_Request *rreq = p->rreq;

/* arbitrary threshold */
if (MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.num_inrecv) > 1) {
if (MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.num_inrecv) > 1) {
return MPIR_ASYNC_THING_NOPROGRESS;
}

Expand All @@ -323,10 +323,10 @@ static int recv_alloc_poll(MPIR_Async_thing * thing)
return MPIR_ASYNC_THING_NOPROGRESS;
}

fi_addr_t remote_addr = MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.remote_addr);
int ctx_idx = MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.ctx_idx);
fi_addr_t remote_addr = MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.remote_addr);
int ctx_idx = MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.ctx_idx);
int vci = MPIDI_Request_get_vci(rreq);
uint64_t mask_bits = MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.mask_bits);
uint64_t mask_bits = MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.mask_bits);

struct chunk_req *chunk_req;
chunk_req = MPL_malloc(sizeof(*chunk_req), MPL_MEM_BUFFER);
Expand All @@ -337,10 +337,10 @@ static int recv_alloc_poll(MPIR_Async_thing * thing)

uint64_t match_bits;
if (p->n_chunks == -1) {
match_bits = MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.match_bits);
match_bits = MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.match_bits);
chunk_req->event_id = MPIDI_OFI_EVENT_RECV_GPU_PIPELINE_INIT;
} else {
match_bits = MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.pipeline_tag) |
match_bits = MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.pipeline_tag) |
MPIDI_OFI_GPU_PIPELINE_SEND;
chunk_req->event_id = MPIDI_OFI_EVENT_RECV_GPU_PIPELINE;
}
Expand All @@ -350,7 +350,7 @@ static int recv_alloc_poll(MPIR_Async_thing * thing)
match_bits, mask_bits, (void *) &chunk_req->context);
MPID_THREAD_CS_EXIT(VCI, MPIDI_VCI(vci).lock);
if (ret == 0) {
MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.num_inrecv) += 1;
MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.num_inrecv) += 1;
free(p);
/* chunk_req and host_buf will be freed in recv_events */
return MPIR_ASYNC_THING_DONE;
Expand Down Expand Up @@ -378,30 +378,30 @@ int MPIDI_OFI_gpu_pipeline_recv_event(struct fi_cq_tagged_entry *wc, MPIR_Reques

MPL_free(chunk_req);

void *recv_buf = MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.buf);
size_t recv_count = MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.count);
MPI_Datatype datatype = MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.datatype);
void *recv_buf = MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.buf);
size_t recv_count = MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.count);
MPI_Datatype datatype = MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.datatype);

if (event_id == MPIDI_OFI_EVENT_RECV_GPU_PIPELINE_INIT) {
rreq->status.MPI_SOURCE = MPIDI_OFI_cqe_get_source(wc, true);
rreq->status.MPI_ERROR = MPIDI_OFI_idata_get_error_bits(wc->data);
rreq->status.MPI_TAG = MPIDI_OFI_init_get_tag(wc->tag);

if (unlikely(MPIDI_OFI_is_tag_sync(wc->tag))) {
MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.is_sync) = true;
MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.is_sync) = true;
}

bool is_pipeline = (wc->data & MPIDI_OFI_IDATA_PIPELINE);
if (!is_pipeline) {
/* message from a normal send */
MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.num_remain) = 1;
MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.num_remain) = 1;
mpi_errno = start_recv_copy(rreq, host_buf, wc->len, recv_buf, recv_count, datatype);
MPIR_ERR_CHECK(mpi_errno);
} else {
struct pipeline_header *p_hdr = host_buf;
MPIR_Assert(p_hdr->n_chunks > 0);
MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.num_remain) = p_hdr->n_chunks;
MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.pipeline_tag) = p_hdr->pipeline_tag;
MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.num_remain) = p_hdr->n_chunks;
MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.pipeline_tag) = p_hdr->pipeline_tag;
/* There is no data in the init chunk, free the buffer */
MPIDU_genq_private_pool_free_cell(MPIDI_OFI_global.gpu_pipeline_recv_pool, host_buf);
/* Post recv for the remaining chunks. */
Expand All @@ -412,7 +412,7 @@ int MPIDI_OFI_gpu_pipeline_recv_event(struct fi_cq_tagged_entry *wc, MPIR_Reques
}
} else {
MPIR_Assert(event_id == MPIDI_OFI_EVENT_RECV_GPU_PIPELINE);
MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.num_inrecv) -= 1;
MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.num_inrecv) -= 1;
mpi_errno = start_recv_copy(rreq, host_buf, wc->len, recv_buf, recv_count, datatype);
MPIR_ERR_CHECK(mpi_errno);
}
Expand Down Expand Up @@ -443,7 +443,7 @@ static int start_recv_copy(MPIR_Request * rreq, void *buf, MPI_Aint chunk_sz,
{
int mpi_errno = MPI_SUCCESS;

MPI_Aint offset = MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.offset);
MPI_Aint offset = MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.offset);
int engine_type = MPIR_CVAR_CH4_OFI_GPU_PIPELINE_H2D_ENGINE_TYPE;

/* FIXME: current design unpacks all bytes from host buffer, overflow check is missing. */
Expand All @@ -453,7 +453,7 @@ static int start_recv_copy(MPIR_Request * rreq, void *buf, MPI_Aint chunk_sz,
MPL_GPU_COPY_H2D, engine_type, 1, &async_req);
MPIR_ERR_CHECK(mpi_errno);

MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.offset) += chunk_sz;
MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.offset) += chunk_sz;

struct recv_copy *p;
p = MPL_malloc(sizeof(*p), MPL_MEM_OTHER);
Expand Down Expand Up @@ -490,10 +490,10 @@ static int recv_copy_poll(MPIR_Async_thing * thing)
static void recv_copy_complete(MPIR_Request * rreq, void *buf)
{
int mpi_errno = MPI_SUCCESS;
MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.num_remain) -= 1;
if (MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.num_remain) == 0) {
MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.num_remain) -= 1;
if (MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.num_remain) == 0) {
/* all chunks arrived and copied */
if (unlikely(MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.is_sync))) {
if (unlikely(MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.is_sync))) {
MPIR_Comm *comm = rreq->comm;
uint64_t ss_bits =
MPIDI_OFI_init_sendtag(MPL_atomic_relaxed_load_int
Expand All @@ -520,7 +520,7 @@ static void recv_copy_complete(MPIR_Request * rreq, void *buf)

MPIR_Datatype_release_if_not_builtin(MPIDI_OFI_REQUEST(rreq, datatype));
/* Set number of bytes in status. */
MPIR_STATUS_SET_COUNT(rreq->status, MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.offset));
MPIR_STATUS_SET_COUNT(rreq->status, MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.offset));

MPIDI_Request_complete_fast(rreq);
}
Expand Down
16 changes: 7 additions & 9 deletions src/mpid/ch4/netmod/ofi/ofi_pre.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,12 +220,6 @@ typedef struct {
struct {
struct iovec *iovs;
} nopack_recv;
} u;
union {
struct iovec iov;
void *inject_buf; /* Internal buffer for inject emulation */
} util;
union {
struct {
int vci_local;
int ctx_idx;
Expand All @@ -234,7 +228,7 @@ typedef struct {
uint64_t match_bits;
int pipeline_tag;
int num_remain;
} send;
} pipeline_send;
struct {
int vci_local;
int ctx_idx;
Expand All @@ -249,8 +243,12 @@ typedef struct {
void *buf;
MPI_Aint count;
MPI_Datatype datatype;
} recv;
} pipeline_info; /* GPU pipeline */
} pipeline_recv;
} u;
union {
struct iovec iov;
void *inject_buf; /* Internal buffer for inject emulation */
} util;
} MPIDI_OFI_request_t;

typedef struct {
Expand Down

0 comments on commit ed2c544

Please sign in to comment.