stream: add stream synchronize to non-stream operations #7023

Open
wants to merge 3 commits into base: main
11 changes: 11 additions & 0 deletions maint/local_python/binding_c.py
@@ -1474,6 +1474,16 @@ def dump_function_normal(func):
# ----
def dump_body_of_routine():
do_threadcomm = False
if RE.search(r'streamsync', func['extra'], re.IGNORECASE):
if '_has_comm' in func:
G.out.append("MPIR_Stream *stream = MPIR_stream_comm_get_local_stream(%s_ptr);" % func['_has_comm'])
elif '_has_win' in func:
G.out.append("MPIR_Stream *stream = MPIR_stream_comm_get_local_stream(%s_ptr->comm_ptr);" % func['_has_win'])
else:
raise Exception("streamsync not supported in %s" % func['name'])
dump_if_open("stream && stream->type == MPIR_STREAM_GPU")
G.out.append("MPL_gpu_stream_synchronize(stream->u.gpu_stream);")
dump_if_close()
if RE.search(r'threadcomm', func['extra'], re.IGNORECASE):
do_threadcomm = True
G.out.append("#ifdef ENABLE_THREADCOMM")
@@ -1506,6 +1516,7 @@ def dump_body_of_routine():

if do_threadcomm:
dump_if_close()

# ----
G.out.append("/* ... body of routine ... */")

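For reference, here is a sketch of the C code this hook would cause the generator to emit inside a communicator-based routine tagged with streamsync (for example MPI_Allreduce). It is assembled from the generator lines above; the exact brace placement produced by dump_if_open/dump_if_close and the surrounding error handling are assumptions, and comm_ptr stands in for the routine's converted communicator object.

/* Emitted near the top of the routine body, before the usual
 * "body of routine" section (sketch, not the literal generator output). */
MPIR_Stream *stream = MPIR_stream_comm_get_local_stream(comm_ptr);
if (stream && stream->type == MPIR_STREAM_GPU) {
    /* Block until all work already queued on the attached GPU stream has
     * completed before running this non-enqueue MPI operation. */
    MPL_gpu_stream_synchronize(stream->u.gpu_stream);
}

For window-based routines, the elif branch above generates the same code using the window's comm_ptr instead.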
56 changes: 38 additions & 18 deletions src/binding/c/coll_api.txt
@@ -2,7 +2,7 @@

MPI_Allgather:
.desc: Gathers data from all tasks and distribute the combined data to all tasks
.extra: threadcomm
.extra: threadcomm, streamsync
/*
Notes:
The MPI standard (1.0 and 1.1) says that
@@ -35,7 +35,7 @@ MPI_Allgather_init:

MPI_Allgatherv:
.desc: Gathers data from all tasks and deliver the combined data to all tasks
.extra: threadcomm
.extra: threadcomm, streamsync
/*
Notes:
The MPI standard (1.0 and 1.1) says that
@@ -75,7 +75,7 @@ MPI_Allgatherv_init:
MPI_Allreduce:
.desc: Combines values from all processes and distributes the result back to all processes
.docnotes: collops
.extra: threadcomm
.extra: threadcomm, streamsync
{ -- early_return --
if (MPIR_is_self_comm(comm_ptr)) {
if (sendbuf != MPI_IN_PLACE) {
@@ -90,7 +90,7 @@ MPI_Allreduce_init:

MPI_Alltoall:
.desc: Sends data from all to all processes
.extra: threadcomm
.extra: threadcomm, streamsync
{ -- early_return --
if (comm_ptr->comm_kind == MPIR_COMM_KIND__INTRACOMM && recvcount == 0) {
goto fn_exit;
@@ -102,21 +102,21 @@ MPI_Alltoall_init:

MPI_Alltoallv:
.desc: Sends data from all to all processes; each process may send a different amount of data and provide displacements for the input and output data.
.extra: threadcomm
.extra: threadcomm, streamsync

MPI_Alltoallv_init:
.desc: Create a persistent request for alltoallv.

MPI_Alltoallw:
.desc: Generalized all-to-all communication allowing different datatypes, counts, and displacements for each partner
.extra: threadcomm
.extra: threadcomm, streamsync

MPI_Alltoallw_init:
.desc: Create a persistent request for alltoallw.

MPI_Barrier:
.desc: Blocks until all processes in the communicator have reached this routine.
.extra: threadcomm
.extra: threadcomm, streamsync
/*
Notes:
Blocks the caller until all processes in the communicator have called it;
@@ -134,7 +134,7 @@ MPI_Barrier_init:

MPI_Bcast:
.desc: Broadcasts a message from the process with rank "root" to all other processes of the communicator
.extra: threadcomm
.extra: threadcomm, streamsync
{ -- early_return --
if (count == 0 || MPIR_is_self_comm(comm_ptr)) {
goto fn_exit;
@@ -165,7 +165,7 @@ MPI_Exscan_init:

MPI_Gather:
.desc: Gathers together values from a group of processes
.extra: threadcomm
.extra: threadcomm, streamsync
{ -- early_return --
if (comm_ptr->comm_kind == MPIR_COMM_KIND__INTRACOMM) {
if ((MPIR_Comm_rank(comm_ptr) == root && recvcount == 0) || (MPIR_Comm_rank(comm_ptr) != root && sendcount == 0)) {
@@ -179,13 +179,14 @@ MPI_Gather_init:

MPI_Gatherv:
.desc: Gathers into specified locations from all processes in a group
.extra: threadcomm
.extra: threadcomm, streamsync

MPI_Gatherv_init:
.desc: Create a persistent request for gatherv.

MPI_Iallgather:
.desc: Gathers data from all tasks and distribute the combined data to all tasks in a nonblocking way
.extra: streamsync
{ -- early_return --
if (comm_ptr->comm_kind == MPIR_COMM_KIND__INTRACOMM) {
if ((sendcount == 0 && sendbuf != MPI_IN_PLACE) || recvcount == 0) {
@@ -198,6 +199,7 @@ MPI_Iallgather:

MPI_Iallgatherv:
.desc: Gathers data from all tasks and deliver the combined data to all tasks in a nonblocking way
.extra: streamsync
{ -- early_return --
if (MPIR_is_self_comm(comm_ptr)) {
if (sendbuf != MPI_IN_PLACE) {
@@ -214,6 +216,7 @@ MPI_Iallgatherv:

MPI_Iallreduce:
.desc: Combines values from all processes and distributes the result back to all processes in a nonblocking way
.extra: streamsync
{ -- early_return --
if (MPIR_is_self_comm(comm_ptr)) {
if (sendbuf != MPI_IN_PLACE) {
@@ -227,6 +230,7 @@ MPI_Iallreduce:

MPI_Ialltoall:
.desc: Sends data from all to all processes in a nonblocking way
.extra: streamsync
{ -- early_return --
if (comm_ptr->comm_kind == MPIR_COMM_KIND__INTRACOMM && recvcount == 0) {
MPIR_Request *request_ptr = MPIR_Request_create_complete(MPIR_REQUEST_KIND__COLL);
@@ -237,12 +241,15 @@ MPI_Ialltoall:

MPI_Ialltoallv:
.desc: Sends data from all to all processes in a nonblocking way; each process may send a different amount of data and provide displacements for the input and output data.
.extra: streamsync

MPI_Ialltoallw:
.desc: Nonblocking generalized all-to-all communication allowing different datatypes, counts, and displacements for each partner
.extra: streamsync

MPI_Ibarrier:
.desc: Notifies the process that it has reached the barrier and returns immediately
.extra: streamsync
/*
Notes:
MPI_Ibarrier is a nonblocking version of MPI_barrier. By calling MPI_Ibarrier,
@@ -264,6 +271,7 @@ MPI_Ibarrier:

MPI_Ibcast:
.desc: Broadcasts a message from the process with rank "root" to all other processes of the communicator in a nonblocking way
.extra: streamsync
{ -- early_return --
if (count == 0 || MPIR_is_self_comm(comm_ptr)) {
MPIR_Request *request_ptr = MPIR_Request_create_complete(MPIR_REQUEST_KIND__COLL);
@@ -275,7 +283,7 @@ MPI_Ibcast:
MPI_Iexscan:
.desc: Computes the exclusive scan (partial reductions) of data on a collection of processes in a nonblocking way
.docnotes: collops
.extra: errtest_comm_intra
.extra: errtest_comm_intra, streamsync
{ -- early_return --
if (comm_ptr->comm_kind == MPIR_COMM_KIND__INTRACOMM && count == 0) {
MPIR_Request *request_ptr = MPIR_Request_create_complete(MPIR_REQUEST_KIND__COLL);
@@ -286,6 +294,7 @@ MPI_Iexscan:

MPI_Igather:
.desc: Gathers together values from a group of processes in a nonblocking way
.extra: streamsync
{ -- early_return --
if (comm_ptr->comm_kind == MPIR_COMM_KIND__INTRACOMM) {
if ((MPIR_Comm_rank(comm_ptr) == root && recvcount == 0) || (MPIR_Comm_rank(comm_ptr) != root && sendcount == 0)) {
@@ -298,24 +307,31 @@ MPI_Igather:

MPI_Igatherv:
.desc: Gathers into specified locations from all processes in a group in a nonblocking way
.extra: streamsync

MPI_Ineighbor_allgather:
.desc: Nonblocking version of MPI_Neighbor_allgather.
.extra: streamsync

MPI_Ineighbor_allgatherv:
.desc: Nonblocking version of MPI_Neighbor_allgatherv.
.extra: streamsync

MPI_Ineighbor_alltoall:
.desc: Nonblocking version of MPI_Neighbor_alltoall.
.extra: streamsync

MPI_Ineighbor_alltoallv:
.desc: Nonblocking version of MPI_Neighbor_alltoallv.
.extra: streamsync

MPI_Ineighbor_alltoallw:
.desc: Nonblocking version of MPI_Neighbor_alltoallw.
.extra: streamsync

MPI_Ireduce:
.desc: Reduces values on all processes to a single value in a nonblocking way
.extra: streamsync
{ -- early_return --
if (comm_ptr->comm_kind == MPIR_COMM_KIND__INTRACOMM && (count == 0 || MPIR_is_self_comm(comm_ptr))) {
if (sendbuf != MPI_IN_PLACE) {
@@ -329,6 +345,7 @@ MPI_Ireduce:

MPI_Ireduce_scatter:
.desc: Combines values and scatters the results in a nonblocking way
.extra: streamsync
{ -- early_return --
if (MPIR_is_self_comm(comm_ptr)) {
if (sendbuf != MPI_IN_PLACE) {
@@ -342,6 +359,7 @@ MPI_Ireduce_scatter:

MPI_Ireduce_scatter_block:
.desc: Combines values and scatters the results in a nonblocking way
.extra: streamsync
{ -- early_return --
if (comm_ptr->comm_kind == MPIR_COMM_KIND__INTRACOMM && (recvcount == 0 || MPIR_is_self_comm(comm_ptr))) {
if (sendbuf != MPI_IN_PLACE) {
@@ -356,7 +374,7 @@ MPI_Ireduce_scatter_block:
MPI_Iscan:
.desc: Computes the scan (partial reductions) of data on a collection of processes in a nonblocking way
.docnotes: collops
.extra: errtest_comm_intra
.extra: errtest_comm_intra, streamsync
{ -- early_return --
if (comm_ptr->comm_kind == MPIR_COMM_KIND__INTRACOMM && count == 0) {
MPIR_Request *request_ptr = MPIR_Request_create_complete(MPIR_REQUEST_KIND__COLL);
@@ -367,6 +385,7 @@ MPI_Iscan:

MPI_Iscatter:
.desc: Sends data from one process to all other processes in a communicator in a nonblocking way
.extra: streamsync
{ -- early_return --
if (comm_ptr->comm_kind == MPIR_COMM_KIND__INTRACOMM) {
if ((MPIR_Comm_rank(comm_ptr) == root && sendcount == 0) || (MPIR_Comm_rank(comm_ptr) != root && recvcount == 0)) {
@@ -379,6 +398,7 @@ MPI_Iscatter:

MPI_Iscatterv:
.desc: Scatters a buffer in parts to all processes in a communicator in a nonblocking way
.extra: streamsync

MPI_Neighbor_allgather:
.desc: Gathers data from all neighboring processes and distribute the combined data to all neighboring processes
@@ -398,7 +418,7 @@ MPI_Neighbor_allgatherv_init:
.desc: Create a persistent request for neighbor_allgatherv.

MPI_Neighbor_alltoall:
.desc: Sends and Receivs data from all neighboring processes
.desc: Sends and Receives data from all neighboring processes
/*
Notes:
In this function, each process i receives data items from each process j if an edge (j,i) exists in the topology graph or Cartesian topology. Similarly, each process i sends data items to all processes j where an edge (i,j) exists. This call is more general than MPI_NEIGHBOR_ALLGATHER in that different data items can be sent to each neighbor. The k-th block in send buffer is sent to the k-th neighboring process and the l-th block in the receive buffer is received from the l-th neighbor.
@@ -422,7 +442,7 @@ MPI_Neighbor_alltoallw_init:
MPI_Reduce:
.desc: Reduces values on all processes to a single value
.docnotes: collops
.extra: threadcomm
.extra: threadcomm, streamsync
{ -- early_return --
if (count == 0 && comm_ptr->comm_kind == MPIR_COMM_KIND__INTRACOMM) {
goto fn_exit;
@@ -456,7 +476,7 @@ MPI_Reduce_local:
MPI_Reduce_scatter:
.desc: Combines values and scatters the results
.docnotes: collops
.extra: threadcomm
.extra: threadcomm, streamsync
{ -- early_return --
if (MPIR_is_self_comm(comm_ptr)) {
if (sendbuf != MPI_IN_PLACE) {
@@ -472,7 +492,7 @@ MPI_Reduce_scatter_init:
MPI_Reduce_scatter_block:
.desc: Combines values and scatters the results
.docnotes: collops
.extra: threadcomm
.extra: threadcomm, streamsync
{ -- early_return --
if (comm_ptr->comm_kind == MPIR_COMM_KIND__INTRACOMM && (recvcount == 0 || MPIR_is_self_comm(comm_ptr))) {
if (sendbuf != MPI_IN_PLACE) {
@@ -500,7 +520,7 @@ MPI_Scan_init:

MPI_Scatter:
.desc: Sends data from one process to all other processes in a communicator
.extra: threadcomm
.extra: threadcomm, streamsync
{ -- early_return --
if (comm_ptr->comm_kind == MPIR_COMM_KIND__INTRACOMM) {
if ((MPIR_Comm_rank(comm_ptr) == root && sendcount == 0) || (MPIR_Comm_rank(comm_ptr) != root && recvcount == 0)) {
@@ -514,7 +534,7 @@ MPI_Scatter_init:

MPI_Scatterv:
.desc: Scatters a buffer in parts to all processes in a communicator
.extra: threadcomm
.extra: threadcomm, streamsync

MPI_Scatterv_init:
.desc: Create a persistent request for scatterv.
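To show what the streamsync tag changes for callers, below is a minimal usage sketch, assuming MPICH's MPIX stream extension on a CUDA build. The info keys ("type", "value") and the helper MPIX_Info_set_hex follow MPICH's stream examples but should be treated as assumptions here; buffer setup, kernel launches, and error checking are omitted. The point is that a plain blocking collective on a stream communicator now synchronizes the attached GPU stream before it runs.

#include <mpi.h>
#include <cuda_runtime.h>

/* Sketch only: run a regular (non-enqueue) blocking collective on a GPU
 * stream communicator. */
void allreduce_on_stream_comm(float *d_sendbuf, float *d_recvbuf, int count)
{
    cudaStream_t cuda_stream;
    cudaStreamCreate(&cuda_stream);

    MPI_Info info;
    MPI_Info_create(&info);
    MPI_Info_set(info, "type", "cudaStream_t");
    MPIX_Info_set_hex(info, "value", &cuda_stream, sizeof(cuda_stream));

    MPIX_Stream stream;
    MPIX_Stream_create(info, &stream);

    MPI_Comm stream_comm;
    MPIX_Stream_comm_create(MPI_COMM_WORLD, stream, &stream_comm);

    /* ... launch kernels on cuda_stream that fill d_sendbuf ... */

    /* With the streamsync tag from this PR, MPI_Allreduce first synchronizes
     * cuda_stream, so the kernel output in d_sendbuf is complete before the
     * reduction reads it. */
    MPI_Allreduce(d_sendbuf, d_recvbuf, count, MPI_FLOAT, MPI_SUM, stream_comm);

    MPI_Comm_free(&stream_comm);
    MPIX_Stream_free(&stream);
    MPI_Info_free(&info);
    cudaStreamDestroy(cuda_stream);
}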