some more stuff that doesn't work
lroberts36 committed Nov 13, 2024
1 parent 8670107 commit f2d8c0c
Showing 6 changed files with 137 additions and 195 deletions.
57 changes: 23 additions & 34 deletions src/bvals/comms/boundary_communication.cpp
@@ -151,26 +151,17 @@ TaskStatus SendBoundBufs(std::shared_ptr<MeshData<Real>> &md) {
if (bound_type == BoundaryType::any || bound_type == BoundaryType::nonlocal)
Kokkos::fence();
#endif
if (pmesh->do_combined_comms) {
for (int ibuf = 0; ibuf < cache.buf_vec.size(); ++ibuf) {
auto &buf = *cache.buf_vec[ibuf];
if (sending_nonzero_flags_h(ibuf) || !Globals::sparse_config.enabled)
buf.SendLocal();
else
buf.SendNullLocal();
}

// Send the combined buffers
pmesh->pcombined_buffers->PackAndSend(md.get(), bound_type);
} else {
for (int ibuf = 0; ibuf < cache.buf_vec.size(); ++ibuf) {
auto &buf = *cache.buf_vec[ibuf];
if (sending_nonzero_flags_h(ibuf) || !Globals::sparse_config.enabled)
buf.Send();
else
buf.SendNull();
}
const bool comb_comm = pmesh->do_combined_comms;
for (int ibuf = 0; ibuf < cache.buf_vec.size(); ++ibuf) {
auto &buf = *cache.buf_vec[ibuf];
if (sending_nonzero_flags_h(ibuf) || !Globals::sparse_config.enabled)
buf.Send(comb_comm);
else
buf.SendNull(comb_comm);
}
if (pmesh->do_combined_comms)
pmesh->pcombined_buffers->PackAndSend(md.get(), bound_type);

return TaskStatus::complete;
}

@@ -228,22 +219,16 @@ TaskStatus ReceiveBoundBufs(std::shared_ptr<MeshData<Real>> &md) {
bool all_received = true;
if (pmesh->do_combined_comms) {
// Receive any messages that are around
pmesh->pcombined_buffers->TryReceiveAny(md.get(), bound_type);

int nreceived{0};
std::for_each(std::begin(cache.buf_vec), std::end(cache.buf_vec),
[&all_received, &nreceived](auto pbuf) {
all_received = pbuf->TryReceiveLocal() && all_received;
nreceived += pbuf->TryReceiveLocal();
});
} else {
int nreceived{0};
std::for_each(std::begin(cache.buf_vec), std::end(cache.buf_vec),
[&all_received, &nreceived](auto pbuf) {
all_received = pbuf->TryReceive() && all_received;
nreceived += pbuf->TryReceive();
});
bool all_combined_received =
pmesh->pcombined_buffers->TryReceiveAny(md.get(), bound_type);
all_received = all_received && all_combined_received;
}
const bool comb_comm = pmesh->do_combined_comms;
std::for_each(std::begin(cache.buf_vec), std::end(cache.buf_vec),
[&all_received, comb_comm](auto pbuf) {
all_received = pbuf->TryReceive(comb_comm) && all_received;
});

int ibound = 0;
if (Globals::sparse_config.enabled && all_received) {
ForEachBoundary<bound_type>(
@@ -286,6 +271,10 @@ TaskStatus SetBounds(std::shared_ptr<MeshData<Real>> &md) {
Mesh *pmesh = md->GetMeshPointer();
auto &cache = md->GetBvarsCache().GetSubCache(bound_type, false);

// if (pmesh->do_combined_comms) {
// pmesh->pcombined_buffers->Compare(md.get(), bound_type);
// }

auto [rebuild, nbound] = CheckReceiveBufferCacheForRebuild<bound_type, false>(md);

if (rebuild) {
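The send path above no longer branches between separate Send()/SendLocal() calls: the do_combined_comms flag is passed straight into each buffer's Send/SendNull call, and the combined aggregate is packed and sent afterwards. A minimal standalone sketch of that calling pattern (toy Buffer type and names, not the actual Parthenon CommBuffer API):

#include <cstdio>
#include <memory>
#include <vector>

// Toy stand-in for a boundary communication buffer (hypothetical, for illustration).
struct Buffer {
  bool has_data;
  void Send(bool combined) {
    // With combined comms enabled, Send() would only stage the data; the actual
    // MPI send is issued later when the combined buffer is packed and sent.
    std::printf("Send(combined=%d)\n", static_cast<int>(combined));
  }
  void SendNull(bool combined) {
    std::printf("SendNull(combined=%d)\n", static_cast<int>(combined));
  }
};

int main() {
  const bool comb_comm = true;  // stands in for pmesh->do_combined_comms
  std::vector<std::shared_ptr<Buffer>> buf_vec{std::make_shared<Buffer>(Buffer{true}),
                                               std::make_shared<Buffer>(Buffer{false})};
  for (auto &pbuf : buf_vec) {
    if (pbuf->has_data)
      pbuf->Send(comb_comm);  // per-buffer call no longer selects a *Local() variant
    else
      pbuf->SendNull(comb_comm);
  }
  if (comb_comm) {
    // Mirrors pmesh->pcombined_buffers->PackAndSend(md.get(), bound_type) in the
    // diff: the aggregated message for this rank/partition goes out here.
    std::printf("PackAndSend combined buffers\n");
  }
  return 0;
}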
159 changes: 82 additions & 77 deletions src/bvals/comms/combined_buffers.cpp
@@ -127,7 +127,12 @@ void CombinedBuffersRankPartition::PackAndSend(const std::set<Uid_t> &vars) {
int idx{0};
for (auto uid : var_set) {
for (auto &[bnd_id, pvbbuf] : combined_info_buf.at(uid)) {
stat[idx] = (pvbbuf->GetState() == BufferState::sending);
const auto state = pvbbuf->GetState();
PARTHENON_REQUIRE(state == BufferState::sending ||
state == BufferState::sending_null,
"Bad state.");
int send_type = (state == BufferState::sending);
stat[idx] = send_type;
++idx;
}
}
@@ -142,8 +147,11 @@ }
}

//----------------------------------------------------------------------------------------
bool CombinedBuffersRankPartition::TryReceiveAndUnpack(mpi_message_t *message,
const std::set<Uid_t> &vars) {
bool CombinedBuffersRankPartition::TryReceiveAndUnpack(const std::set<Uid_t> &vars) {
if ((sparse_status_buffer.GetState() == BufferState::received) &&
(combined_comm_buffer.GetState() == BufferState::received))
return true;

const auto &var_set = vars.size() == 0 ? all_vars : vars;
// Make sure the var-boundary buffers are available to write to
int nbuf{0};
@@ -158,7 +166,7 @@ bool CombinedBuffersRankPartition::TryReceiveAndUnpack(mpi_message_t *message,
sparse_status_buffer.ConstructBuffer(nbuf);
}
auto received_sparse = sparse_status_buffer.TryReceive();
auto received = combined_comm_buffer.TryReceive(message);
auto received = combined_comm_buffer.TryReceive();
if (!received || !received_sparse) return false;

// Allocate and free buffers as required
@@ -197,6 +205,49 @@ bool CombinedBuffersRankPartition::TryReceiveAndUnpack(mpi_message_t *message,
return true;
}

//----------------------------------------------------------------------------------------
void CombinedBuffersRankPartition::Compare(const std::set<Uid_t> &vars) {
PARTHENON_REQUIRE(combined_comm_buffer.GetState() == BufferState::received,
"Combined buffer not in correct state");
PARTHENON_REQUIRE(sparse_status_buffer.GetState() == BufferState::received,
"Combined buffer not in correct state");
const auto &var_set = vars.size() == 0 ? all_vars : vars;
// Check that each sparse status entry agrees with the state of its var-boundary buffer
int idx{0};
auto &stat = sparse_status_buffer.buffer();
for (auto uid : var_set) {
for (auto &[bnd_id, pvbbuf] : combined_info_buf.at(uid)) {
if (stat[idx] == 1) {
PARTHENON_REQUIRE(pvbbuf->GetState() == BufferState::received,
"State doesn't agree.");
} else {
PARTHENON_REQUIRE(pvbbuf->GetState() == BufferState::received_null,
"State doesn't agree.");
}
idx++;
}
}

auto &bids = GetBndIdsOnDevice(vars);
Kokkos::parallel_for(
PARTHENON_AUTO_LABEL,
Kokkos::TeamPolicy<>(parthenon::DevExecSpace(), bids.size(), Kokkos::AUTO),
KOKKOS_LAMBDA(parthenon::team_mbr_t team_member) {
const int b = team_member.league_rank();
if (bids[b].buf_allocated) {
const int buf_size = bids[b].size();
Real *com_buf = &(bids[b].combined_buf(bids[b].start_idx()));
Real *buf = &(bids[b].buf(0));
Kokkos::parallel_for(Kokkos::TeamThreadRange<>(team_member, buf_size),
[&](const int idx) {
PARTHENON_REQUIRE(buf[idx] == com_buf[idx], "Bad value");
});
}
});
combined_comm_buffer.Stale();
sparse_status_buffer.Stale();
}

//----------------------------------------------------------------------------------------
void CombinedBuffersRankPartition::AddVarBoundary(BndId &bnd_id) {
auto key = GetChannelKey(bnd_id);
@@ -346,13 +397,12 @@ bool CombinedBuffersRank::IsAvailableForWrite(MeshData<Real> *pmd) {
}

//----------------------------------------------------------------------------------------
bool CombinedBuffersRank::TryReceiveAndUnpack(MeshData<Real> *pmd, int partition,
mpi_message_t *message) {
bool CombinedBuffersRank::TryReceiveAndUnpack(MeshData<Real> *pmd, int partition) {
PARTHENON_REQUIRE(buffers_built,
"Trying to recv combined buffers before they have been built");
PARTHENON_REQUIRE(combined_bufs.count(partition) > 0,
"Trying to receive on a non-existent combined receive buffer.");
return combined_bufs.at(partition).TryReceiveAndUnpack(message, pmd->GetUids());
return combined_bufs.at(partition).TryReceiveAndUnpack(pmd->GetUids());
}

//----------------------------------------------------------------------------------------
@@ -370,6 +420,7 @@ void CombinedBuffers::AddSendBuffer(int partition, MeshBlock *pmb,
combined_send_buffers.at({nb.rank, b_type}).AddSendBuffer(partition, pmb, nb, var);
}

//----------------------------------------------------------------------------------------
void CombinedBuffers::AddRecvBuffer(MeshBlock *pmb, const NeighborBlock &nb,
const std::shared_ptr<Variable<Real>>,
BoundaryType b_type) {
@@ -383,11 +434,13 @@ void CombinedBuffers::AddRecvBuffer(MeshBlock *pmb, const NeighborBlock &nb,
comms_[GetAssociatedSender(b_type)], pmesh)));
}

//----------------------------------------------------------------------------------------
void CombinedBuffers::ResolveAndSendSendBuffers() {
for (auto &[id, buf] : combined_send_buffers)
buf.ResolveSendBuffersAndSendInfo();
}

//----------------------------------------------------------------------------------------
void CombinedBuffers::ReceiveBufferInfo() {
constexpr std::int64_t max_it = 1e10;
std::vector<bool> received(combined_recv_buffers.size(), false);
@@ -404,6 +457,7 @@ void CombinedBuffers::ReceiveBufferInfo() {
"Too many iterations waiting to receive boundary communication buffers.");
}

//----------------------------------------------------------------------------------------
bool CombinedBuffers::IsAvailableForWrite(MeshData<Real> *pmd, BoundaryType b_type) {
bool available{true};
for (int rank = 0; rank < Globals::nranks; ++rank) {
@@ -415,6 +469,7 @@ bool CombinedBuffers::IsAvailableForWrite(MeshData<Real> *pmd, BoundaryType b_type) {
return available;
}

//----------------------------------------------------------------------------------------
void CombinedBuffers::PackAndSend(MeshData<Real> *pmd, BoundaryType b_type) {
for (int rank = 0; rank < Globals::nranks; ++rank) {
if (combined_send_buffers.count({rank, b_type})) {
@@ -423,82 +478,32 @@ void CombinedBuffers::PackAndSend(MeshData<Real> *pmd, BoundaryType b_type) {
}
}

void CombinedBuffers::TryReceiveAny(MeshData<Real> *pmd, BoundaryType b_type) {
#ifdef MPI_PARALLEL
// This was an attempt at another method for receiving, it seemed to work
// but was subject to the same problems as the Iprobe based code
if (pmesh->receive_type == "old") {
for (int rank = 0; rank < Globals::nranks; ++rank) {
if (combined_recv_buffers.count({rank, b_type})) {
auto &comb_bufs = combined_recv_buffers.at({rank, b_type});
for (auto &[partition, comb_buf] : comb_bufs.combined_bufs) {
comb_buf.TryReceiveAndUnpack(nullptr, pmd->GetUids());
}
}
}
} else if (pmesh->receive_type == "iprobe") {
MPI_Status status;
int flag;
do {
mpi_message_t message;
MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, comms_[GetAssociatedSender(b_type)], &flag,
&status);
if (flag) {
const int rank = status.MPI_SOURCE;
const int partition = status.MPI_TAG;
bool finished = combined_recv_buffers.at({rank, b_type})
.TryReceiveAndUnpack(pmd, partition, nullptr);
if (!finished)
processing_messages.insert(
std::make_pair(std::pair<int, int>{rank, partition}, message));
//----------------------------------------------------------------------------------------
void CombinedBuffers::Compare(MeshData<Real> *pmd, BoundaryType b_type) {
for (int rank = 0; rank < Globals::nranks; ++rank) {
if (combined_recv_buffers.count({rank, b_type})) {
auto &comb_bufs = combined_recv_buffers.at({rank, b_type});
for (auto &[partition, comb_buf] : comb_bufs.combined_bufs) {
comb_buf.Compare(pmd->GetUids());
}
} while (flag);

// Process in-flight messages
std::vector<std::pair<int, int>> finished_messages;
for (auto &[p, message] : processing_messages) {
int rank = p.first;
int partition = p.second;
bool finished = combined_recv_buffers.at({rank, b_type})
.TryReceiveAndUnpack(pmd, partition, nullptr);
if (finished) finished_messages.push_back({rank, partition});
}
}
}

for (auto &m : finished_messages)
processing_messages.erase(m);
} else if (pmesh->receive_type == "improbe") {
MPI_Status status;
int flag;
do {
mpi_message_t message;
MPI_Improbe(MPI_ANY_SOURCE, MPI_ANY_TAG, comms_[GetAssociatedSender(b_type)], &flag,
&message, &status);
if (flag) {
const int rank = status.MPI_SOURCE;
const int partition = status.MPI_TAG;
bool finished = combined_recv_buffers.at({rank, b_type})
.TryReceiveAndUnpack(pmd, partition, &message);
if (!finished)
processing_messages.insert(
std::make_pair(std::pair<int, int>{rank, partition}, message));
//----------------------------------------------------------------------------------------
bool CombinedBuffers::TryReceiveAny(MeshData<Real> *pmd, BoundaryType b_type) {
#ifdef MPI_PARALLEL
bool all_received = true;
for (int rank = 0; rank < Globals::nranks; ++rank) {
if (combined_recv_buffers.count({rank, b_type})) {
auto &comb_bufs = combined_recv_buffers.at({rank, b_type});
for (auto &[partition, comb_buf] : comb_bufs.combined_bufs) {
bool received = comb_buf.TryReceiveAndUnpack(pmd->GetUids());
all_received = all_received && received;
}
} while (flag);

// Process in-flight messages
std::vector<std::pair<int, int>> finished_messages;
for (auto &[p, message] : processing_messages) {
int rank = p.first;
int partition = p.second;
bool finished = combined_recv_buffers.at({rank, b_type})
.TryReceiveAndUnpack(pmd, partition, &message);
if (finished) finished_messages.push_back({rank, partition});
}

for (auto &m : finished_messages)
processing_messages.erase(m);
} else {
PARTHENON_FAIL("Unknown receiving strategy.");
}
return all_received;
#endif
}
} // namespace parthenon
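On the receive side, the Iprobe/Improbe message-driven strategies (and the processing_messages bookkeeping) are removed: TryReceiveAny now simply polls every known combined receive buffer and reports whether everything has arrived, leaving re-polling to the tasking loop. A toy sketch of that shape (hypothetical types, not the Parthenon API):

#include <map>
#include <utility>

// Hypothetical stand-in for one rank/partition combined receive buffer; the real
// class is CombinedBuffersRankPartition, whose TryReceiveAndUnpack() returns true
// once both the sparse-status and data messages have arrived and been unpacked.
struct ToyCombinedBuf {
  int polls_remaining = 2;
  bool TryReceiveAndUnpack() { return --polls_remaining <= 0; }
};

// Shape of the new receive strategy: poll every buffer, report whether all are done.
// The caller (ReceiveBoundBufs) keeps returning an incomplete task status until then.
bool TryReceiveAny(std::map<std::pair<int, int>, ToyCombinedBuf> &bufs) {
  bool all_received = true;
  for (auto &[rank_partition, buf] : bufs) {
    all_received = buf.TryReceiveAndUnpack() && all_received;
  }
  return all_received;
}

int main() {
  std::map<std::pair<int, int>, ToyCombinedBuf> bufs{{{0, 0}, {}}, {{1, 3}, {}}};
  while (!TryReceiveAny(bufs)) {
  }  // re-poll, as the tasking loop would
  return 0;
}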
15 changes: 8 additions & 7 deletions src/bvals/comms/combined_buffers.hpp
@@ -82,7 +82,9 @@ struct CombinedBuffersRankPartition {

void PackAndSend(const std::set<Uid_t> &vars);

bool TryReceiveAndUnpack(mpi_message_t *message, const std::set<Uid_t> &vars);
bool TryReceiveAndUnpack(const std::set<Uid_t> &vars);

void Compare(const std::set<Uid_t> &vars);
};

struct CombinedBuffersRank {
@@ -121,7 +123,7 @@ struct CombinedBuffersRank {

void PackAndSend(MeshData<Real> *pmd);

bool TryReceiveAndUnpack(MeshData<Real> *pmd, int partition, mpi_message_t *message);
bool TryReceiveAndUnpack(MeshData<Real> *pmd, int partition);

bool IsAvailableForWrite(MeshData<Real> *pmd);
};
@@ -131,8 +133,6 @@ struct CombinedBuffers {
std::map<std::pair<int, BoundaryType>, CombinedBuffersRank> combined_send_buffers;
std::map<std::pair<int, BoundaryType>, CombinedBuffersRank> combined_recv_buffers;

std::map<std::pair<int, int>, mpi_message_t> processing_messages;

std::map<BoundaryType, mpi_comm_t> comms_;

Mesh *pmesh;
@@ -167,10 +167,9 @@
can_delete = cbrp.IsAvailableForWrite() && can_delete;
}
}
} while (!can_delete);
} while (!can_delete);
combined_send_buffers.clear();
combined_recv_buffers.clear();
processing_messages.clear();
}

void AddSendBuffer(int partition, MeshBlock *pmb, const NeighborBlock &nb,
@@ -185,7 +184,9 @@

void PackAndSend(MeshData<Real> *pmd, BoundaryType b_type);

void TryReceiveAny(MeshData<Real> *pmd, BoundaryType b_type);
void Compare(MeshData<Real> *pmd, BoundaryType b_type);

bool TryReceiveAny(MeshData<Real> *pmd, BoundaryType b_type);

bool IsAvailableForWrite(MeshData<Real> *pmd, BoundaryType b_type);
};
1 change: 0 additions & 1 deletion src/mesh/mesh.cpp
@@ -88,7 +88,6 @@ Mesh::Mesh(ParameterInput *pin, ApplicationInput *app_in, Packages_t &packages,
ddisp(Globals::nranks), bnref(Globals::nranks), bnderef(Globals::nranks),
brdisp(Globals::nranks), bddisp(Globals::nranks),
pcombined_buffers(std::make_shared<CombinedBuffers>(this)),
receive_type{pin->GetOrAddString("parthenon/mesh", "receive_type", "iprobe")},
do_combined_comms{
pin->GetOrAddBoolean("parthenon/mesh", "do_combined_comms", false)} {
// Allow for user overrides to default Parthenon functions
2 changes: 0 additions & 2 deletions src/mesh/mesh.hpp
@@ -237,8 +237,6 @@ class Mesh {
comm_buf_map_t boundary_comm_map;
TagMap tag_map;

std::string
receive_type; // Defines how to structure the MPI receives for combined buffers
std::shared_ptr<CombinedBuffers> pcombined_buffers;

#ifdef MPI_PARALLEL