Skip to content

Commit

Permalink
coll: check coll_group in MPIR_Sched_next_tag
Browse files Browse the repository at this point in the history
All subgroup collectives should use the same tag within the parent
collectives. This is because all processes in the communicator has to
agree on the tag to use, but group collectives may not involve all
processes. It is okay to use the same tag as long as the group
collectives are always issued in order. This is the case since all group
collectives are spawned under a parent collective, which has to obey the
non-overlapping rule.
  • Loading branch information
hzhou committed Aug 24, 2024
1 parent 477ac52 commit 794bc65
Show file tree
Hide file tree
Showing 47 changed files with 67 additions and 53 deletions.
4 changes: 4 additions & 0 deletions maint/gen_coll.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ def dump_allcomm_sched_auto(name):
dump_split(0, "int MPIR_%s_allcomm_sched_auto(%s)" % (Name, func_params))
dump_open('{')
G.out.append("int mpi_errno = MPI_SUCCESS;")
if re.match(r'Ineighbor_', Name):
G.out.append("int coll_group = MPIR_SUBGROUP_NONE;")
G.out.append("")

# -- Csel_search
Expand Down Expand Up @@ -367,6 +369,8 @@ def dump_cases(commkind):
dump_split(0, "int MPIR_%s_sched_impl(%s)" % (Name, func_params))
dump_open('{')
G.out.append("int mpi_errno = MPI_SUCCESS;")
if re.match(r'Ineighbor_', Name):
G.out.append("int coll_group = MPIR_SUBGROUP_NONE;")
G.out.append("")

dump_open("if (comm_ptr->comm_kind == MPIR_COMM_KIND__INTRACOMM) {")
Expand Down
2 changes: 1 addition & 1 deletion src/include/mpir_nbc.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
/* Open question: should tag allocation be rolled into Sched_start? Keeping it
* separate potentially allows more parallelism in the future, but it also
* pushes more work onto the clients of this interface. */
int MPIR_Sched_next_tag(MPIR_Comm * comm_ptr, int *tag);
int MPIR_Sched_next_tag(MPIR_Comm * comm_ptr, int coll_group, int *tag);
void MPIR_Sched_set_tag(MPIR_Sched_t s, int tag);

/* the device must provide a typedef for MPIR_Sched_t in mpidpre.h */
Expand Down
2 changes: 1 addition & 1 deletion src/mpi/coll/allreduce/allreduce_intra_ring.c
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ int MPIR_Allreduce_intra_ring(const void *sendbuf, void *recvbuf, MPI_Aint count
send_rank = (nranks + rank - 1 - i) % nranks;

/* get a new tag to prevent out of order messages */
mpi_errno = MPIR_Sched_next_tag(comm, &tag);
mpi_errno = MPIR_Sched_next_tag(comm, coll_group, &tag);
MPIR_ERR_CHECK(mpi_errno);

mpi_errno =
Expand Down
2 changes: 1 addition & 1 deletion src/mpi/coll/allreduce/allreduce_intra_tree.c
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ int MPIR_Allreduce_intra_tree(const void *sendbuf,
void *reduce_address = (char *) reduce_buffer + offset * extent;
MPIR_ERR_CHKANDJUMP(!reduce_address, mpi_errno, MPI_ERR_OTHER, "**nomem");

mpi_errno = MPIR_Sched_next_tag(comm_ptr, &tag);
mpi_errno = MPIR_Sched_next_tag(comm_ptr, coll_group, &tag);
MPIR_ERR_CHECK(mpi_errno);

for (i = 0; i < num_children; i++) {
Expand Down
2 changes: 1 addition & 1 deletion src/mpi/coll/iallgather/iallgather_tsp_brucks.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ MPIR_TSP_Iallgather_sched_intra_brucks(const void *sendbuf, MPI_Aint sendcount,

/* For correctness, transport based collectives need to get the
* tag from the same pool as schedule based collectives */
mpi_errno = MPIR_Sched_next_tag(comm, &tag);
mpi_errno = MPIR_Sched_next_tag(comm, coll_group, &tag);
MPIR_ERR_CHECK(mpi_errno);

MPIR_FUNC_ENTER;
Expand Down
2 changes: 1 addition & 1 deletion src/mpi/coll/iallgather/iallgather_tsp_recexch.c
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ int MPIR_TSP_Iallgather_sched_intra_recexch(const void *sendbuf, MPI_Aint sendco

/* For correctness, transport based collectives need to get the
* tag from the same pool as schedule based collectives */
mpi_errno = MPIR_Sched_next_tag(comm, &tag);
mpi_errno = MPIR_Sched_next_tag(comm, coll_group, &tag);
MPIR_ERR_CHECK(mpi_errno);

is_inplace = (sendbuf == MPI_IN_PLACE);
Expand Down
2 changes: 1 addition & 1 deletion src/mpi/coll/iallgather/iallgather_tsp_ring.c
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ int MPIR_TSP_Iallgather_sched_intra_ring(const void *sendbuf, MPI_Aint sendcount
int recv_id[3] = { 0 }; /* warning fix: icc: maybe used before set */
for (i = 0; i < size - 1; i++) {
/* Get new tag for each cycle so that the send-recv pairs are matched correctly */
mpi_errno = MPIR_Sched_next_tag(comm, &tag);
mpi_errno = MPIR_Sched_next_tag(comm, coll_group, &tag);
MPIR_ERR_CHECK(mpi_errno);

int vtcs[3], nvtcs;
Expand Down
2 changes: 1 addition & 1 deletion src/mpi/coll/iallgatherv/iallgatherv_tsp_brucks.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ MPIR_TSP_Iallgatherv_sched_intra_brucks(const void *sendbuf, MPI_Aint sendcount,

/* For correctness, transport based collectives need to get the
* tag from the same pool as schedule based collectives */
mpi_errno = MPIR_Sched_next_tag(comm, &tag);
mpi_errno = MPIR_Sched_next_tag(comm, coll_group, &tag);
MPIR_ERR_CHECK(mpi_errno);

is_inplace = (sendbuf == MPI_IN_PLACE);
Expand Down
2 changes: 1 addition & 1 deletion src/mpi/coll/iallgatherv/iallgatherv_tsp_recexch.c
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ int MPIR_TSP_Iallgatherv_sched_intra_recexch(const void *sendbuf, MPI_Aint sendc

/* For correctness, transport based collectives need to get the
* tag from the same pool as schedule based collectives */
mpi_errno = MPIR_Sched_next_tag(comm, &tag);
mpi_errno = MPIR_Sched_next_tag(comm, coll_group, &tag);
MPIR_ERR_CHECK(mpi_errno);

/* get the neighbors, the function allocates the required memory */
Expand Down
2 changes: 1 addition & 1 deletion src/mpi/coll/iallgatherv/iallgatherv_tsp_ring.c
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ int MPIR_TSP_Iallgatherv_sched_intra_ring(const void *sendbuf, MPI_Aint sendcoun
send_rank = (rank - i + nranks) % nranks; /* Rank whose data you're sending */

/* New tag for each send-recv pair. */
mpi_errno = MPIR_Sched_next_tag(comm, &tag);
mpi_errno = MPIR_Sched_next_tag(comm, coll_group, &tag);
MPIR_ERR_CHECK(mpi_errno);

int nvtcs, vtcs[3];
Expand Down
2 changes: 1 addition & 1 deletion src/mpi/coll/iallreduce/iallreduce_tsp_recexch.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ int MPIR_TSP_Iallreduce_sched_intra_recexch(const void *sendbuf, void *recvbuf,

/* For correctness, transport based collectives need to get the
* tag from the same pool as schedule based collectives */
mpi_errno = MPIR_Sched_next_tag(comm, &tag);
mpi_errno = MPIR_Sched_next_tag(comm, coll_group, &tag);

/* get the neighbors, the function allocates the required memory */
MPII_Recexchalgo_get_neighbors(rank, nranks, &k, &step1_sendto,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ int MPIR_TSP_Iallreduce_sched_intra_recexch_reduce_scatter_recexch_allgatherv(co

/* For correctness, transport based collectives need to get the
* tag from the same pool as schedule based collectives */
mpi_errno = MPIR_Sched_next_tag(comm, &tag);
mpi_errno = MPIR_Sched_next_tag(comm, coll_group, &tag);
MPIR_ERR_CHECK(mpi_errno);

/* get the neighbors, the function allocates the required memory */
Expand Down
2 changes: 1 addition & 1 deletion src/mpi/coll/iallreduce/iallreduce_tsp_ring.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ int MPIR_TSP_Iallreduce_sched_intra_ring(const void *sendbuf, void *recvbuf, MPI
send_rank = (nranks + rank - 1 - i) % nranks;

/* get a new tag to prevent out of order messages */
mpi_errno = MPIR_Sched_next_tag(comm, &tag);
mpi_errno = MPIR_Sched_next_tag(comm, coll_group, &tag);
MPIR_ERR_CHECK(mpi_errno);

nvtcs = (i == 0) ? 0 : 1;
Expand Down
2 changes: 1 addition & 1 deletion src/mpi/coll/iallreduce/iallreduce_tsp_tree.c
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ int MPIR_TSP_Iallreduce_sched_intra_tree(const void *sendbuf, void *recvbuf, MPI

/* For correctness, transport based collectives need to get the
* tag from the same pool as schedule based collectives */
mpi_errno = MPIR_Sched_next_tag(comm, &tag);
mpi_errno = MPIR_Sched_next_tag(comm, coll_group, &tag);
MPIR_ERR_CHECK(mpi_errno);

for (i = 0; i < num_children; i++) {
Expand Down
2 changes: 1 addition & 1 deletion src/mpi/coll/ialltoall/ialltoall_tsp_brucks.c
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ MPIR_TSP_Ialltoall_sched_intra_brucks(const void *sendbuf, MPI_Aint sendcount,

/* For correctness, transport based collectives need to get the
* tag from the same pool as schedule based collectives */
mpi_errno = MPIR_Sched_next_tag(comm, &tag);
mpi_errno = MPIR_Sched_next_tag(comm, coll_group, &tag);
MPIR_ERR_CHECK(mpi_errno);

MPIR_CHKLMEM_MALLOC(pack_invtcs, int *, sizeof(int) * k, mpi_errno, "pack_invtcs",
Expand Down
2 changes: 1 addition & 1 deletion src/mpi/coll/ialltoall/ialltoall_tsp_ring.c
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ int MPIR_TSP_Ialltoall_sched_intra_ring(const void *sendbuf, MPI_Aint sendcount,
for (i = 0; i < size - 1; i++) {
/* For correctness, transport based collectives need to get the
* tag from the same pool as schedule based collectives */
mpi_errno = MPIR_Sched_next_tag(comm, &tag);
mpi_errno = MPIR_Sched_next_tag(comm, coll_group, &tag);
MPIR_ERR_CHECK(mpi_errno);

int vtcs[3], nvtcs;
Expand Down
2 changes: 1 addition & 1 deletion src/mpi/coll/ialltoall/ialltoall_tsp_scattered.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ int MPIR_TSP_Ialltoall_sched_intra_scattered(const void *sendbuf, MPI_Aint sendc

/* For correctness, transport based collectives need to get the
* tag from the same pool as schedule based collectives */
mpi_errno = MPIR_Sched_next_tag(comm, &tag);
mpi_errno = MPIR_Sched_next_tag(comm, coll_group, &tag);
MPIR_ERR_CHECK(mpi_errno);

MPIR_COLL_RANK_SIZE(comm, coll_group, rank, size);
Expand Down
2 changes: 1 addition & 1 deletion src/mpi/coll/ialltoallv/ialltoallv_tsp_blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ int MPIR_TSP_Ialltoallv_sched_intra_blocked(const void *sendbuf, const MPI_Aint

/* For correctness, transport based collectives need to get the
* tag from the same pool as schedule based collectives */
mpi_errno = MPIR_Sched_next_tag(comm, &tag);
mpi_errno = MPIR_Sched_next_tag(comm, coll_group, &tag);
MPIR_ERR_CHECK(mpi_errno);

MPIR_COLL_RANK_SIZE(comm, coll_group, rank, nranks);
Expand Down
2 changes: 1 addition & 1 deletion src/mpi/coll/ialltoallv/ialltoallv_tsp_inplace.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ int MPIR_TSP_Ialltoallv_sched_intra_inplace(const void *sendbuf, const MPI_Aint

/* For correctness, transport based collectives need to get the
* tag from the same pool as schedule based collectives */
mpi_errno = MPIR_Sched_next_tag(comm, &tag);
mpi_errno = MPIR_Sched_next_tag(comm, coll_group, &tag);
MPIR_ERR_CHECK(mpi_errno);

MPIR_COLL_RANK_SIZE(comm, coll_group, rank, nranks);
Expand Down
2 changes: 1 addition & 1 deletion src/mpi/coll/ialltoallv/ialltoallv_tsp_scattered.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ int MPIR_TSP_Ialltoallv_sched_intra_scattered(const void *sendbuf, const MPI_Ain
MPIR_Type_get_true_extent_impl(sendtype, &sendtype_lb, &sendtype_true_extent);
sendtype_extent = MPL_MAX(sendtype_extent, sendtype_true_extent);

mpi_errno = MPIR_Sched_next_tag(comm, &tag);
mpi_errno = MPIR_Sched_next_tag(comm, coll_group, &tag);
MPIR_ERR_CHECK(mpi_errno);

/* First, post bblock number of sends/recvs */
Expand Down
2 changes: 1 addition & 1 deletion src/mpi/coll/ialltoallw/ialltoallw_tsp_blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ int MPIR_TSP_Ialltoallw_sched_intra_blocked(const void *sendbuf, const MPI_Aint

/* For correctness, transport based collectives need to get the
* tag from the same pool as schedule based collectives */
mpi_errno = MPIR_Sched_next_tag(comm, &tag);
mpi_errno = MPIR_Sched_next_tag(comm, coll_group, &tag);
MPIR_ERR_CHECK(mpi_errno);

/* post only bblock isends/irecvs at a time as suggested by Tony Ladd */
Expand Down
2 changes: 1 addition & 1 deletion src/mpi/coll/ialltoallw/ialltoallw_tsp_inplace.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ int MPIR_TSP_Ialltoallw_sched_intra_inplace(const void *sendbuf, const MPI_Aint

/* For correctness, transport based collectives need to get the
* tag from the same pool as schedule based collectives */
mpi_errno = MPIR_Sched_next_tag(comm, &tag);
mpi_errno = MPIR_Sched_next_tag(comm, coll_group, &tag);
MPIR_ERR_CHECK(mpi_errno);

/* FIXME: Here we allocate tmp_buf using extent and send/recv with datatype directly,
Expand Down
2 changes: 1 addition & 1 deletion src/mpi/coll/ibarrier/ibarrier_intra_tsp_dissem.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ int MPIR_TSP_Ibarrier_sched_intra_k_dissemination(MPIR_Comm * comm, int coll_gro

MPIR_COLL_RANK_SIZE(comm, coll_group, rank, nranks);

mpi_errno = MPIR_Sched_next_tag(comm, &tag);
mpi_errno = MPIR_Sched_next_tag(comm, coll_group, &tag);
if (mpi_errno)
MPIR_ERR_POP(mpi_errno);

Expand Down
2 changes: 1 addition & 1 deletion src/mpi/coll/ibcast/ibcast_tsp_scatterv_allgatherv.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ int MPIR_TSP_Ibcast_sched_intra_scatterv_allgatherv(void *buffer, MPI_Aint count

/* For correctness, transport based collectives need to get the
* tag from the same pool as schedule based collectives */
mpi_errno = MPIR_Sched_next_tag(comm, &tag);
mpi_errno = MPIR_Sched_next_tag(comm, coll_group, &tag);
MPIR_ERR_CHECK(mpi_errno);

MPIR_FUNC_ENTER;
Expand Down
2 changes: 1 addition & 1 deletion src/mpi/coll/ibcast/ibcast_tsp_tree.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ int MPIR_TSP_Ibcast_sched_intra_tree(void *buffer, MPI_Aint count, MPI_Datatype

/* For correctness, transport based collectives need to get the
* tag from the same pool as schedule based collectives */
mpi_errno = MPIR_Sched_next_tag(comm, &tag);
mpi_errno = MPIR_Sched_next_tag(comm, coll_group, &tag);
MPIR_ERR_CHECK(mpi_errno);

/* Receive message from parent */
Expand Down
2 changes: 1 addition & 1 deletion src/mpi/coll/igather/igather_tsp_tree.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ int MPIR_TSP_Igather_sched_intra_tree(const void *sendbuf, MPI_Aint sendcount,

/* For correctness, transport based collectives need to get the
* tag from the same pool as schedule based collectives */
mpi_errno = MPIR_Sched_next_tag(comm, &tag);
mpi_errno = MPIR_Sched_next_tag(comm, coll_group, &tag);
MPIR_ERR_CHECK(mpi_errno);

if (rank == root && is_inplace) {
Expand Down
2 changes: 1 addition & 1 deletion src/mpi/coll/igatherv/igatherv_tsp_linear.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ int MPIR_TSP_Igatherv_sched_allcomm_linear(const void *sendbuf, MPI_Aint sendcou
comm_size = comm_ptr->remote_size;
}

mpi_errno = MPIR_Sched_next_tag(comm_ptr, &tag);
mpi_errno = MPIR_Sched_next_tag(comm_ptr, coll_group, &tag);
MPIR_ERR_CHECK(mpi_errno);

/* If rank == root, then I recv lots, otherwise I send */
Expand Down
2 changes: 1 addition & 1 deletion src/mpi/coll/include/coll_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ int MPII_Coll_finalize(void);
mpi_errno = MPIR_Sched_create(&s, sched_kind); \
MPIR_ERR_CHECK(mpi_errno); \
int tag = -1; \
mpi_errno = MPIR_Sched_next_tag(comm_ptr, &tag); \
mpi_errno = MPIR_Sched_next_tag(comm_ptr, coll_group, &tag); \
MPIR_ERR_CHECK(mpi_errno); \
MPIR_Sched_set_tag(s, tag); \
*sched_type_p = MPIR_SCHED_NORMAL; \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ int MPIR_TSP_Ineighbor_allgather_sched_allcomm_linear(const void *sendbuf, MPI_A

/* For correctness, transport based collectives need to get the
* tag from the same pool as schedule based collectives */
mpi_errno = MPIR_Sched_next_tag(comm_ptr, &tag);
mpi_errno = MPIR_Sched_next_tag(comm_ptr, MPIR_SUBGROUP_NONE, &tag);
MPIR_ERR_CHECK(mpi_errno);

for (k = 0; k < outdegree; ++k) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ int MPIR_TSP_Ineighbor_allgatherv_sched_allcomm_linear(const void *sendbuf, MPI_

/* For correctness, transport based collectives need to get the
* tag from the same pool as schedule based collectives */
mpi_errno = MPIR_Sched_next_tag(comm_ptr, &tag);
mpi_errno = MPIR_Sched_next_tag(comm_ptr, MPIR_SUBGROUP_NONE, &tag);
MPIR_ERR_CHECK(mpi_errno);

for (k = 0; k < outdegree; ++k) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ int MPIR_TSP_Ineighbor_alltoall_sched_allcomm_linear(const void *sendbuf, MPI_Ai

/* For correctness, transport based collectives need to get the
* tag from the same pool as schedule based collectives */
mpi_errno = MPIR_Sched_next_tag(comm_ptr, &tag);
mpi_errno = MPIR_Sched_next_tag(comm_ptr, MPIR_SUBGROUP_NONE, &tag);
MPIR_ERR_CHECK(mpi_errno);


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ int MPIR_TSP_Ineighbor_alltoallv_sched_allcomm_linear(const void *sendbuf,

/* For correctness, transport based collectives need to get the
* tag from the same pool as schedule based collectives */
mpi_errno = MPIR_Sched_next_tag(comm_ptr, &tag);
mpi_errno = MPIR_Sched_next_tag(comm_ptr, MPIR_SUBGROUP_NONE, &tag);
MPIR_ERR_CHECK(mpi_errno);

for (k = 0; k < outdegree; ++k) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ int MPIR_TSP_Ineighbor_alltoallw_sched_allcomm_linear(const void *sendbuf,

/* For correctness, transport based collectives need to get the
* tag from the same pool as schedule based collectives */
mpi_errno = MPIR_Sched_next_tag(comm_ptr, &tag);
mpi_errno = MPIR_Sched_next_tag(comm_ptr, MPIR_SUBGROUP_NONE, &tag);
MPIR_ERR_CHECK(mpi_errno);

for (k = 0; k < outdegree; ++k) {
Expand Down
2 changes: 1 addition & 1 deletion src/mpi/coll/ireduce/ireduce_tsp_tree.c
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ int MPIR_TSP_Ireduce_sched_intra_tree(const void *sendbuf, void *recvbuf, MPI_Ai

/* For correctness, transport based collectives need to get the
* tag from the same pool as schedule based collectives */
mpi_errno = MPIR_Sched_next_tag(comm, &tag);
mpi_errno = MPIR_Sched_next_tag(comm, coll_group, &tag);
MPIR_ERR_CHECK(mpi_errno);

for (i = 0; i < num_children; i++) {
Expand Down
2 changes: 1 addition & 1 deletion src/mpi/coll/ireduce_scatter/ireduce_scatter_tsp_recexch.c
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ int MPIR_TSP_Ireduce_scatter_sched_intra_recexch(const void *sendbuf, void *recv

/* For correctness, transport based collectives need to get the
* tag from the same pool as schedule based collectives */
mpi_errno = MPIR_Sched_next_tag(comm, &tag);
mpi_errno = MPIR_Sched_next_tag(comm, coll_group, &tag);

is_inplace = (sendbuf == MPI_IN_PLACE);
MPIR_COLL_RANK_SIZE(comm, coll_group, rank, nranks);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ int MPIR_TSP_Ireduce_scatter_block_sched_intra_recexch(const void *sendbuf, void

/* For correctness, transport based collectives need to get the
* tag from the same pool as schedule based collectives */
mpi_errno = MPIR_Sched_next_tag(comm, &tag);
mpi_errno = MPIR_Sched_next_tag(comm, coll_group, &tag);

is_inplace = (sendbuf == MPI_IN_PLACE);
MPIR_COLL_RANK_SIZE(comm, coll_group, rank, nranks);
Expand Down
2 changes: 1 addition & 1 deletion src/mpi/coll/iscan/iscan_tsp_recursive_doubling.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ int MPIR_TSP_Iscan_sched_intra_recursive_doubling(const void *sendbuf, void *rec

/* For correctness, transport based collectives need to get the
* tag from the same pool as schedule based collectives */
mpi_errno = MPIR_Sched_next_tag(comm, &tag);
mpi_errno = MPIR_Sched_next_tag(comm, coll_group, &tag);
MPIR_ERR_CHECK(mpi_errno);

MPIR_COLL_RANK_SIZE(comm, coll_group, rank, nranks);
Expand Down
2 changes: 1 addition & 1 deletion src/mpi/coll/iscatter/iscatter_tsp_tree.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ int MPIR_TSP_Iscatter_sched_intra_tree(const void *sendbuf, MPI_Aint sendcount,

/* For correctness, transport based collectives need to get the
* tag from the same pool as schedule based collectives */
mpi_errno = MPIR_Sched_next_tag(comm, &tag);
mpi_errno = MPIR_Sched_next_tag(comm, coll_group, &tag);
MPIR_ERR_CHECK(mpi_errno);

if (rank == root && is_inplace) {
Expand Down
2 changes: 1 addition & 1 deletion src/mpi/coll/iscatterv/iscatterv_tsp_linear.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ int MPIR_TSP_Iscatterv_sched_allcomm_linear(const void *sendbuf, const MPI_Aint

/* For correctness, transport based collectives need to get the
* tag from the same pool as schedule based collectives */
mpi_errno = MPIR_Sched_next_tag(comm_ptr, &tag);
mpi_errno = MPIR_Sched_next_tag(comm_ptr, coll_group, &tag);
MPIR_ERR_CHECK(mpi_errno);

/* If I'm the root, then scatter */
Expand Down
6 changes: 3 additions & 3 deletions src/mpi/comm/contextid.c
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,7 @@ static int sched_cb_gcn_allocate_cid(MPIR_Comm * comm, int tag, void *state)
* are not necessarily completed in the same order as they are issued, also on the
* same communicator. To avoid deadlocks, we cannot add the elements to the
* list bevfore the first iallreduce is completed. The "tag" is created for the
* scheduling - by calling MPIR_Sched_next_tag(comm_ptr, &tag) - and the same
* scheduling - by calling MPIR_Sched_next_tag(comm_ptr, MPIR_SUBGROUP_NONE, &tag) - and the same
* for a idup operation on all processes. So we use it here. */
/* FIXME I'm not sure if there can be an overflows for this tag */
st->tag = (uint64_t) tag + MPIR_Process.attrs.tag_ub;
Expand Down Expand Up @@ -945,7 +945,7 @@ int MPIR_Get_contextid_nonblock(MPIR_Comm * comm_ptr, MPIR_Comm * newcommp, MPIR
MPIR_FUNC_ENTER;

/* now create a schedule */
mpi_errno = MPIR_Sched_next_tag(comm_ptr, &tag);
mpi_errno = MPIR_Sched_next_tag(comm_ptr, MPIR_SUBGROUP_NONE, &tag);
MPIR_ERR_CHECK(mpi_errno);
mpi_errno = MPIR_Sched_create(&s, MPIR_SCHED_KIND_GENERALIZED);
MPIR_ERR_CHECK(mpi_errno);
Expand Down Expand Up @@ -986,7 +986,7 @@ int MPIR_Get_intercomm_contextid_nonblock(MPIR_Comm * comm_ptr, MPIR_Comm * newc
}

/* now create a schedule */
mpi_errno = MPIR_Sched_next_tag(comm_ptr, &tag);
mpi_errno = MPIR_Sched_next_tag(comm_ptr, MPIR_SUBGROUP_NONE, &tag);
MPIR_ERR_CHECK(mpi_errno);
mpi_errno = MPIR_Sched_create(&s, MPIR_SCHED_KIND_GENERALIZED);
MPIR_ERR_CHECK(mpi_errno);
Expand Down
Loading

0 comments on commit 794bc65

Please sign in to comment.