diff --git a/src/bvals/comms/boundary_communication.cpp b/src/bvals/comms/boundary_communication.cpp
index bbc4643b5fdf..6eee76c48eae 100644
--- a/src/bvals/comms/boundary_communication.cpp
+++ b/src/bvals/comms/boundary_communication.cpp
@@ -151,26 +151,17 @@ TaskStatus SendBoundBufs(std::shared_ptr<MeshData<Real>> &md) {
   if (bound_type == BoundaryType::any || bound_type == BoundaryType::nonlocal)
     Kokkos::fence();
 #endif
-  if (pmesh->do_combined_comms) {
-    for (int ibuf = 0; ibuf < cache.buf_vec.size(); ++ibuf) {
-      auto &buf = *cache.buf_vec[ibuf];
-      if (sending_nonzero_flags_h(ibuf) || !Globals::sparse_config.enabled)
-        buf.SendLocal();
-      else
-        buf.SendNullLocal();
-    }
-
-    // Send the combined buffers
-    pmesh->pcombined_buffers->PackAndSend(md.get(), bound_type);
-  } else {
-    for (int ibuf = 0; ibuf < cache.buf_vec.size(); ++ibuf) {
-      auto &buf = *cache.buf_vec[ibuf];
-      if (sending_nonzero_flags_h(ibuf) || !Globals::sparse_config.enabled)
-        buf.Send();
-      else
-        buf.SendNull();
-    }
+  const bool comb_comm = pmesh->do_combined_comms;
+  for (int ibuf = 0; ibuf < cache.buf_vec.size(); ++ibuf) {
+    auto &buf = *cache.buf_vec[ibuf];
+    if (sending_nonzero_flags_h(ibuf) || !Globals::sparse_config.enabled)
+      buf.Send(comb_comm);
+    else
+      buf.SendNull(comb_comm);
   }
+  if (pmesh->do_combined_comms)
+    pmesh->pcombined_buffers->PackAndSend(md.get(), bound_type);
+
   return TaskStatus::complete;
 }
 
@@ -228,22 +219,16 @@ TaskStatus ReceiveBoundBufs(std::shared_ptr<MeshData<Real>> &md) {
   bool all_received = true;
   if (pmesh->do_combined_comms) {
     // Receive any messages that are around
-    pmesh->pcombined_buffers->TryReceiveAny(md.get(), bound_type);
-
-    int nreceived{0};
-    std::for_each(std::begin(cache.buf_vec), std::end(cache.buf_vec),
-                  [&all_received, &nreceived](auto pbuf) {
-                    all_received = pbuf->TryReceiveLocal() && all_received;
-                    nreceived += pbuf->TryReceiveLocal();
-                  });
-  } else {
-    int nreceived{0};
-    std::for_each(std::begin(cache.buf_vec), std::end(cache.buf_vec),
-                  [&all_received, &nreceived](auto pbuf) {
-                    all_received = pbuf->TryReceive() && all_received;
-                    nreceived += pbuf->TryReceive();
-                  });
+    bool all_combined_received =
+        pmesh->pcombined_buffers->TryReceiveAny(md.get(), bound_type);
+    all_received = all_received && all_combined_received;
   }
+  const bool comb_comm = pmesh->do_combined_comms;
+  std::for_each(std::begin(cache.buf_vec), std::end(cache.buf_vec),
+                [&all_received, comb_comm](auto pbuf) {
+                  all_received = pbuf->TryReceive(comb_comm) && all_received;
+                });
+
   int ibound = 0;
   if (Globals::sparse_config.enabled && all_received) {
     ForEachBoundary<bound_type>(
@@ -286,6 +271,10 @@ TaskStatus SetBounds(std::shared_ptr<MeshData<Real>> &md) {
   Mesh *pmesh = md->GetMeshPointer();
   auto &cache = md->GetBvarsCache().GetSubCache(bound_type, false);
 
+  // if (pmesh->do_combined_comms) {
+  //   pmesh->pcombined_buffers->Compare(md.get(), bound_type);
+  // }
+
   auto [rebuild, nbound] = CheckReceiveBufferCacheForRebuild<bound_type, false>(md);
 
   if (rebuild) {
diff --git a/src/bvals/comms/combined_buffers.cpp b/src/bvals/comms/combined_buffers.cpp
index f5b4e911a0ca..75253339f1bc 100644
--- a/src/bvals/comms/combined_buffers.cpp
+++ b/src/bvals/comms/combined_buffers.cpp
@@ -127,7 +127,12 @@ void CombinedBuffersRankPartition::PackAndSend(const std::set<Uid_t> &vars) {
   int idx{0};
   for (auto uid : var_set) {
     for (auto &[bnd_id, pvbbuf] : combined_info_buf.at(uid)) {
-      stat[idx] = (pvbbuf->GetState() == BufferState::sending);
+      const auto state = pvbbuf->GetState();
+      PARTHENON_REQUIRE(state == BufferState::sending ||
+                            state == BufferState::sending_null,
+                        "Bad state.");
+      int send_type = (state == BufferState::sending);
+      stat[idx] = send_type;
       ++idx;
     }
   }
@@ -142,8 +147,11 @@ void CombinedBuffersRankPartition::PackAndSend(const std::set<Uid_t> &vars) {
 }
 
 //----------------------------------------------------------------------------------------
-bool CombinedBuffersRankPartition::TryReceiveAndUnpack(mpi_message_t *message,
-                                                       const std::set<Uid_t> &vars) {
+bool CombinedBuffersRankPartition::TryReceiveAndUnpack(const std::set<Uid_t> &vars) {
+  if ((sparse_status_buffer.GetState() == BufferState::received) &&
+      (combined_comm_buffer.GetState() == BufferState::received))
+    return true;
+
   const auto &var_set = vars.size() == 0 ? all_vars : vars;
   // Make sure the var-boundary buffers are available to write to
   int nbuf{0};
@@ -158,7 +166,7 @@ bool CombinedBuffersRankPartition::TryReceiveAndUnpack(mpi_message_t *message,
     sparse_status_buffer.ConstructBuffer(nbuf);
   }
   auto received_sparse = sparse_status_buffer.TryReceive();
-  auto received = combined_comm_buffer.TryReceive(message);
+  auto received = combined_comm_buffer.TryReceive();
   if (!received || !received_sparse) return false;
 
   // Allocate and free buffers as required
@@ -197,6 +205,49 @@ bool CombinedBuffersRankPartition::TryReceiveAndUnpack(mpi_message_t *message,
   return true;
 }
 
+//----------------------------------------------------------------------------------------
+void CombinedBuffersRankPartition::Compare(const std::set<Uid_t> &vars) {
+  PARTHENON_REQUIRE(combined_comm_buffer.GetState() == BufferState::received,
+                    "Combined buffer not in correct state");
+  PARTHENON_REQUIRE(sparse_status_buffer.GetState() == BufferState::received,
+                    "Combined buffer not in correct state");
+  const auto &var_set = vars.size() == 0 ? all_vars : vars;
+  // Allocate and free buffers as required
+  int idx{0};
+  auto &stat = sparse_status_buffer.buffer();
+  for (auto uid : var_set) {
+    for (auto &[bnd_id, pvbbuf] : combined_info_buf.at(uid)) {
+      if (stat[idx] == 1) {
+        PARTHENON_REQUIRE(pvbbuf->GetState() == BufferState::received,
+                          "State doesn't agree.");
+      } else {
+        PARTHENON_REQUIRE(pvbbuf->GetState() == BufferState::received_null,
+                          "State doesn't agree.");
+      }
+      idx++;
+    }
+  }
+
+  auto &bids = GetBndIdsOnDevice(vars);
+  Kokkos::parallel_for(
+      PARTHENON_AUTO_LABEL,
+      Kokkos::TeamPolicy<>(parthenon::DevExecSpace(), bids.size(), Kokkos::AUTO),
+      KOKKOS_LAMBDA(parthenon::team_mbr_t team_member) {
+        const int b = team_member.league_rank();
+        if (bids[b].buf_allocated) {
+          const int buf_size = bids[b].size();
+          Real *com_buf = &(bids[b].combined_buf(bids[b].start_idx()));
+          Real *buf = &(bids[b].buf(0));
+          Kokkos::parallel_for(Kokkos::TeamThreadRange<>(team_member, buf_size),
+                               [&](const int idx) {
+                                 PARTHENON_REQUIRE(buf[idx] == com_buf[idx], "Bad value");
+                               });
+        }
+      });
+  combined_comm_buffer.Stale();
+  sparse_status_buffer.Stale();
+}
+
 //----------------------------------------------------------------------------------------
 void CombinedBuffersRankPartition::AddVarBoundary(BndId &bnd_id) {
   auto key = GetChannelKey(bnd_id);
@@ -346,13 +397,12 @@ bool CombinedBuffersRank::IsAvailableForWrite(MeshData<Real> *pmd) {
 }
 
 //----------------------------------------------------------------------------------------
-bool CombinedBuffersRank::TryReceiveAndUnpack(MeshData<Real> *pmd, int partition,
-                                              mpi_message_t *message) {
+bool CombinedBuffersRank::TryReceiveAndUnpack(MeshData<Real> *pmd, int partition) {
   PARTHENON_REQUIRE(buffers_built,
                     "Trying to recv combined buffers before they have been built");
   PARTHENON_REQUIRE(combined_bufs.count(partition) > 0,
                     "Trying to receive on a non-existent combined receive buffer.");
-  return combined_bufs.at(partition).TryReceiveAndUnpack(message, pmd->GetUids());
+  return combined_bufs.at(partition).TryReceiveAndUnpack(pmd->GetUids());
 }
 
 //----------------------------------------------------------------------------------------
@@ -370,6 +420,7 @@ void CombinedBuffers::AddSendBuffer(int partition, MeshBlock *pmb,
   combined_send_buffers.at({nb.rank, b_type}).AddSendBuffer(partition, pmb, nb, var);
 }
 
+//----------------------------------------------------------------------------------------
 void CombinedBuffers::AddRecvBuffer(MeshBlock *pmb, const NeighborBlock &nb,
                                     const std::shared_ptr<Variable<Real>>,
                                     BoundaryType b_type) {
@@ -383,11 +434,13 @@ void CombinedBuffers::AddRecvBuffer(MeshBlock *pmb, const NeighborBlock &nb,
                              comms_[GetAssociatedSender(b_type)], pmesh)));
 }
 
+//----------------------------------------------------------------------------------------
 void CombinedBuffers::ResolveAndSendSendBuffers() {
   for (auto &[id, buf] : combined_send_buffers)
     buf.ResolveSendBuffersAndSendInfo();
 }
 
+//----------------------------------------------------------------------------------------
 void CombinedBuffers::ReceiveBufferInfo() {
   constexpr std::int64_t max_it = 1e10;
   std::vector<bool> received(combined_recv_buffers.size(), false);
@@ -404,6 +457,7 @@ void CombinedBuffers::ReceiveBufferInfo() {
       "Too many iterations waiting to receive boundary communication buffers.");
 }
 
+//----------------------------------------------------------------------------------------
 bool CombinedBuffers::IsAvailableForWrite(MeshData<Real> *pmd, BoundaryType b_type) {
   bool available{true};
   for (int rank = 0; rank < Globals::nranks; ++rank) {
@@ -415,6 +469,7 @@ bool CombinedBuffers::IsAvailableForWrite(MeshData<Real> *pmd, BoundaryType b_ty
   return available;
 }
 
+//----------------------------------------------------------------------------------------
 void CombinedBuffers::PackAndSend(MeshData<Real> *pmd, BoundaryType b_type) {
   for (int rank = 0; rank < Globals::nranks; ++rank) {
     if (combined_send_buffers.count({rank, b_type})) {
@@ -423,82 +478,32 @@ void CombinedBuffers::PackAndSend(MeshData<Real> *pmd, BoundaryType b_type) {
   }
 }
 
-void CombinedBuffers::TryReceiveAny(MeshData<Real> *pmd, BoundaryType b_type) {
-#ifdef MPI_PARALLEL
-  // This was an attempt at another method for receiving, it seemed to work
-  // but was subject to the same problems as the Iprobe based code
-  if (pmesh->receive_type == "old") {
-    for (int rank = 0; rank < Globals::nranks; ++rank) {
-      if (combined_recv_buffers.count({rank, b_type})) {
-        auto &comb_bufs = combined_recv_buffers.at({rank, b_type});
-        for (auto &[partition, comb_buf] : comb_bufs.combined_bufs) {
-          comb_buf.TryReceiveAndUnpack(nullptr, pmd->GetUids());
-        }
-      }
-    }
-  } else if (pmesh->receive_type == "iprobe") {
-    MPI_Status status;
-    int flag;
-    do {
-      mpi_message_t message;
-      MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, comms_[GetAssociatedSender(b_type)], &flag,
-                 &status);
-      if (flag) {
-        const int rank = status.MPI_SOURCE;
-        const int partition = status.MPI_TAG;
-        bool finished = combined_recv_buffers.at({rank, b_type})
-                            .TryReceiveAndUnpack(pmd, partition, nullptr);
-        if (!finished)
-          processing_messages.insert(
-              std::make_pair(std::pair{rank, partition}, message));
+//----------------------------------------------------------------------------------------
+void CombinedBuffers::Compare(MeshData<Real> *pmd, BoundaryType b_type) {
+  for (int rank = 0; rank < Globals::nranks; ++rank) {
+    if (combined_recv_buffers.count({rank, b_type})) {
+      auto &comb_bufs = combined_recv_buffers.at({rank, b_type});
+      for (auto &[partition, comb_buf] : comb_bufs.combined_bufs) {
+        comb_buf.Compare(pmd->GetUids());
       }
-    } while (flag);
-
-    // Process in-flight messages
-    std::vector<std::pair<int, int>> finished_messages;
-    for (auto &[p, message] : processing_messages) {
-      int rank = p.first;
-      int partition = p.second;
-      bool finished = combined_recv_buffers.at({rank, b_type})
-                          .TryReceiveAndUnpack(pmd, partition, nullptr);
-      if (finished) finished_messages.push_back({rank, partition});
     }
+  }
+}
-    for (auto &m : finished_messages)
-      processing_messages.erase(m);
-  } else if (pmesh->receive_type == "improbe") {
-    MPI_Status status;
-    int flag;
-    do {
-      mpi_message_t message;
-      MPI_Improbe(MPI_ANY_SOURCE, MPI_ANY_TAG, comms_[GetAssociatedSender(b_type)], &flag,
-                  &message, &status);
-      if (flag) {
-        const int rank = status.MPI_SOURCE;
-        const int partition = status.MPI_TAG;
-        bool finished = combined_recv_buffers.at({rank, b_type})
-                            .TryReceiveAndUnpack(pmd, partition, &message);
-        if (!finished)
-          processing_messages.insert(
-              std::make_pair(std::pair{rank, partition}, message));
+
+//----------------------------------------------------------------------------------------
+bool CombinedBuffers::TryReceiveAny(MeshData<Real> *pmd, BoundaryType b_type) {
+#ifdef MPI_PARALLEL
+  bool all_received = true;
+  for (int rank = 0; rank < Globals::nranks; ++rank) {
+    if (combined_recv_buffers.count({rank, b_type})) {
+      auto &comb_bufs = combined_recv_buffers.at({rank, b_type});
+      for (auto &[partition, comb_buf] : comb_bufs.combined_bufs) {
+        bool received = comb_buf.TryReceiveAndUnpack(pmd->GetUids());
+        all_received = all_received && received;
       }
-    } while (flag);
-
-    // Process in-flight messages
-    std::vector<std::pair<int, int>> finished_messages;
-    for (auto &[p, message] : processing_messages) {
-      int rank = p.first;
-      int partition = p.second;
-      bool finished = combined_recv_buffers.at({rank, b_type})
-                          .TryReceiveAndUnpack(pmd, partition, &message);
-      if (finished) finished_messages.push_back({rank, partition});
     }
-
-    for (auto &m : finished_messages)
-      processing_messages.erase(m);
-  } else {
-    PARTHENON_FAIL("Unknown receiving strategy.");
   }
+  return all_received;
 #endif
 }
 } // namespace parthenon
diff --git a/src/bvals/comms/combined_buffers.hpp b/src/bvals/comms/combined_buffers.hpp
index 79ecd8ecbb65..1235783168bf 100644
--- a/src/bvals/comms/combined_buffers.hpp
+++ b/src/bvals/comms/combined_buffers.hpp
@@ -82,7 +82,9 @@ struct CombinedBuffersRankPartition {
 
   void PackAndSend(const std::set<Uid_t> &vars);
 
-  bool TryReceiveAndUnpack(mpi_message_t *message, const std::set<Uid_t> &vars);
+  bool TryReceiveAndUnpack(const std::set<Uid_t> &vars);
+
+  void Compare(const std::set<Uid_t> &vars);
 };
 
 struct CombinedBuffersRank {
@@ -121,7 +123,7 @@ struct CombinedBuffersRank {
 
   void PackAndSend(MeshData<Real> *pmd);
 
-  bool TryReceiveAndUnpack(MeshData<Real> *pmd, int partition, mpi_message_t *message);
+  bool TryReceiveAndUnpack(MeshData<Real> *pmd, int partition);
 
   bool IsAvailableForWrite(MeshData<Real> *pmd);
 };
@@ -131,8 +133,6 @@ struct CombinedBuffers {
   std::map<std::pair<int, BoundaryType>, CombinedBuffersRank> combined_send_buffers;
   std::map<std::pair<int, BoundaryType>, CombinedBuffersRank> combined_recv_buffers;
 
-  std::map<std::pair<int, int>, mpi_message_t> processing_messages;
-
   std::map<BoundaryType, mpi_comm_t> comms_;
   Mesh *pmesh;
 
@@ -167,10 +167,9 @@ struct CombinedBuffers {
           can_delete = cbrp.IsAvailableForWrite() && can_delete;
         }
       }
-  } while (!can_delete);
+    } while (!can_delete);
     combined_send_buffers.clear();
     combined_recv_buffers.clear();
-    processing_messages.clear();
   }
 
   void AddSendBuffer(int partition, MeshBlock *pmb, const NeighborBlock &nb,
@@ -185,7 +184,9 @@ struct CombinedBuffers {
 
   void PackAndSend(MeshData<Real> *pmd, BoundaryType b_type);
 
-  void TryReceiveAny(MeshData<Real> *pmd, BoundaryType b_type);
+  void Compare(MeshData<Real> *pmd, BoundaryType b_type);
+
+  bool TryReceiveAny(MeshData<Real> *pmd, BoundaryType b_type);
 
   bool IsAvailableForWrite(MeshData<Real> *pmd, BoundaryType b_type);
 };
diff --git a/src/mesh/mesh.cpp b/src/mesh/mesh.cpp
index b597cc2ac23b..2fdc17a54240 100644
--- a/src/mesh/mesh.cpp
+++ b/src/mesh/mesh.cpp
@@ -88,7 +88,6 @@ Mesh::Mesh(ParameterInput *pin, ApplicationInput *app_in, Packages_t &packages,
       ddisp(Globals::nranks), bnref(Globals::nranks), bnderef(Globals::nranks),
       brdisp(Globals::nranks), bddisp(Globals::nranks),
       pcombined_buffers(std::make_shared<CombinedBuffers>(this)),
-      receive_type{pin->GetOrAddString("parthenon/mesh", "receive_type", "iprobe")},
       do_combined_comms{
           pin->GetOrAddBoolean("parthenon/mesh", "do_combined_comms", false)} {
   // Allow for user overrides to default Parthenon functions
diff --git a/src/mesh/mesh.hpp b/src/mesh/mesh.hpp
index 485ed28d9af1..38379bd851c3 100644
--- a/src/mesh/mesh.hpp
+++ b/src/mesh/mesh.hpp
@@ -237,8 +237,6 @@ class Mesh {
   comm_buf_map_t boundary_comm_map;
   TagMap tag_map;
 
-  std::string
-      receive_type; // Defines how to structure the MPI receives for combined buffers
   std::shared_ptr<CombinedBuffers> pcombined_buffers;
 
 #ifdef MPI_PARALLEL
diff --git a/src/utils/communication_buffer.hpp b/src/utils/communication_buffer.hpp
index e7d7b72539e8..47ee05bbe477 100644
--- a/src/utils/communication_buffer.hpp
+++ b/src/utils/communication_buffer.hpp
@@ -123,28 +123,26 @@ class CommBuffer {
 
   BufferState GetState() { return *state_; }
 
-  void Send() noexcept;
-  void SendLocal() noexcept;
-  void SendNull() noexcept;
-  void SendNullLocal() noexcept;
+  void Send(bool local = false) noexcept;
+  void SendNull(bool local = false) noexcept;
 
   bool IsAvailableForWrite();
 
-  void TryStartReceive(mpi_message_t *message_id = nullptr) noexcept;
-  bool TryReceive(mpi_message_t *message_id = nullptr) noexcept;
-  bool TryReceiveLocal() noexcept;
+  void TryStartReceive() noexcept;
+  bool TryReceive(bool local = false) noexcept;
   void SetReceived() noexcept {
     PARTHENON_REQUIRE(*comm_type_ == BuffCommType::receiver ||
                           *comm_type_ == BuffCommType::sparse_receiver,
                       "This doesn't make sense for a non-receiver.");
     *state_ = BufferState::received;
   }
+
   void SetReceivedNull() noexcept {
-    PARTHENON_REQUIRE(*comm_type_ == BuffCommType::receiver ||
-                          *comm_type_ == BuffCommType::sparse_receiver,
+    PARTHENON_REQUIRE(*comm_type_ == BuffCommType::sparse_receiver,
                       "This doesn't make sense for a non-receiver.");
     *state_ = BufferState::received_null;
   }
+
   bool IsSafeToDelete() {
     if (*comm_type_ == BuffCommType::sparse_receiver ||
         *comm_type_ == BuffCommType::receiver) {
@@ -237,16 +235,16 @@ CommBuffer<T> &CommBuffer<T>::operator=(const CommBuffer<T> &in) {
 }
 
 template <class T>
-void CommBuffer<T>::Send() noexcept {
+void CommBuffer<T>::Send(bool local) noexcept {
   if (!active_) {
-    SendNull();
+    SendNull(local);
     return;
   }
   PARTHENON_DEBUG_REQUIRE(*state_ == BufferState::stale,
                           "Trying to send from buffer that hasn't been staled.");
   *state_ = BufferState::sending;
-  if (*comm_type_ == BuffCommType::sender) {
+  if (*comm_type_ == BuffCommType::sender && !local) {
     // Make sure that this request isn't still out,
     // this could be blocking
 #ifdef MPI_PARALLEL
@@ -266,33 +264,11 @@ void CommBuffer<T>::Send() noexcept {
 }
 
 template <class T>
-void CommBuffer<T>::SendLocal() noexcept {
-  PARTHENON_DEBUG_REQUIRE(*state_ == BufferState::stale,
-                          "Trying to send from buffer that hasn't been staled.");
-  *state_ = BufferState::sending;
-  if (*comm_type_ == BuffCommType::receiver) {
-    // This is an error
-    PARTHENON_FAIL("Trying to send from a receiver");
-  }
-}
-
-template <class T>
-void CommBuffer<T>::SendNullLocal() noexcept {
-  PARTHENON_DEBUG_REQUIRE(*state_ == BufferState::stale,
-                          "Trying to send from buffer that hasn't been staled.");
-  *state_ = BufferState::sending_null;
-  if (*comm_type_ == BuffCommType::receiver) {
-    // This is an error
-    PARTHENON_FAIL("Trying to send from a receiver");
-  }
-}
-
-template <class T>
-void CommBuffer<T>::SendNull() noexcept {
+void CommBuffer<T>::SendNull(bool local) noexcept {
   PARTHENON_DEBUG_REQUIRE(*state_ == BufferState::stale,
                           "Trying to send_null from buffer that hasn't been staled.");
   *state_ = BufferState::sending_null;
-  if (*comm_type_ == BuffCommType::sender) {
+  if (*comm_type_ == BuffCommType::sender && !local) {
     // Make sure that this request isn't still out,
     // this could be blocking
 #ifdef MPI_PARALLEL
@@ -333,28 +309,19 @@ bool CommBuffer<T>::IsAvailableForWrite() {
 }
 
 template <class T>
-void CommBuffer<T>::TryStartReceive(mpi_message_t *message_id) noexcept {
+void CommBuffer<T>::TryStartReceive() noexcept {
 #ifdef MPI_PARALLEL
   if (*comm_type_ == BuffCommType::receiver && !*started_irecv_) {
     PARTHENON_REQUIRE(
         *my_request_ == MPI_REQUEST_NULL,
        "Cannot have another pending request in a buffer that is starting to receive.");
-    if (!IsActive())
-      Allocate(
-          -1); // For early start of Irecv, always need storage space even if not used
-    if (message_id != nullptr) {
-      PARTHENON_MPI_CHECK(MPI_Imrecv(buf_.data(), buf_.size(),
-                                     MPITypeMap<buf_base_t>::type(), message_id,
-                                     my_request_.get()));
-    } else {
-      PARTHENON_MPI_CHECK(MPI_Irecv(buf_.data(), buf_.size(),
-                                    MPITypeMap<buf_base_t>::type(), send_rank_, tag_,
-                                    comm_, my_request_.get()));
-    }
+    // For early start of Irecv, always need storage space even if not used
+    if (!IsActive()) Allocate(-1);
+    PARTHENON_MPI_CHECK(MPI_Irecv(buf_.data(), buf_.size(),
+                                  MPITypeMap<buf_base_t>::type(), send_rank_, tag_, comm_,
+                                  my_request_.get()));
     *started_irecv_ = true;
   } else if (*comm_type_ == BuffCommType::sparse_receiver && !*started_irecv_) {
-    PARTHENON_REQUIRE(message_id == nullptr,
-                      "Imrecv not yet implemented for sparse buffers.");
     int test;
     MPI_Status status;
     // Check if our message is available so that we can use the correct buffer size
@@ -381,36 +348,19 @@ void CommBuffer<T>::TryStartReceive(mpi_message_t *message_id) noexcept {
 }
 
 template <class T>
-bool CommBuffer<T>::TryReceiveLocal() noexcept {
-  if (*state_ == BufferState::received || *state_ == BufferState::received_null)
-    return true;
-  if (*comm_type_ == BuffCommType::both) {
-    if (*state_ == BufferState::sending) {
-      *state_ = BufferState::received;
-      // Memory should already be available, since both
-      // send and receive rank point at the same memory
-      return true;
-    } else if (*state_ == BufferState::sending_null) {
-      *state_ = BufferState::received_null;
-      return true;
-    }
-  }
-  return false;
-}
-
-template <class T>
-bool CommBuffer<T>::TryReceive(mpi_message_t *message_id) noexcept {
+bool CommBuffer<T>::TryReceive(bool local) noexcept {
   if (*state_ == BufferState::received || *state_ == BufferState::received_null)
     return true;
-  if (*comm_type_ == BuffCommType::receiver ||
-      *comm_type_ == BuffCommType::sparse_receiver) {
+  if ((*comm_type_ == BuffCommType::receiver ||
+       *comm_type_ == BuffCommType::sparse_receiver) &&
+      !local) {
 #ifdef MPI_PARALLEL
     (*nrecv_tries_)++;
     PARTHENON_REQUIRE(*nrecv_tries_ < 1e8,
                       "MPI probably hanging after 1e8 receive tries.");
-    TryStartReceive(message_id);
+    TryStartReceive();
     if (*started_irecv_) {
       MPI_Status status;
@@ -478,7 +428,7 @@ bool CommBuffer<T>::TryReceive(mpi_message_t *message_id) noexcept {
       return true;
     }
     return false;
-  } else {
+  } else if (*comm_type_ == BuffCommType::sender) {
     // This is an error since this is a purely send buffer
     PARTHENON_FAIL("Trying to receive on a sender");
   }
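
Not part of the patch above: the sketch below only illustrates the calling convention this diff converges on, where the old SendLocal()/SendNullLocal()/TryReceiveLocal() entry points collapse into a single "local" flag on Send(), SendNull(), and TryReceive(). When combined communication is enabled, the per-boundary buffers do state bookkeeping only and the payload travels through the combined buffers. The types below (ToyBuffer, BufState) are invented for the example and elide the null/sparse states and the real MPI paths in src/utils/communication_buffer.hpp.

// Illustrative toy model only; not Parthenon code.
#include <iostream>

enum class BufState { stale, sending, received };

struct ToyBuffer {
  BufState state = BufState::stale;

  // With local == true the buffer only records its state; the payload is
  // assumed to travel in an aggregated (combined) message elsewhere.
  void Send(bool local = false) {
    state = BufState::sending;
    if (!local) {
      // The real implementation would post an MPI_Isend here.
    }
  }

  // With local == true we never touch MPI; we only report whether the
  // combined receive (or a same-rank hand-off) has already delivered the data.
  bool TryReceive(bool local = false) {
    if (state == BufState::received) return true;
    if (local) {
      if (state == BufState::sending) {  // payload arrived via the combined path
        state = BufState::received;
        return true;
      }
      return false;
    }
    // The real implementation would start/test an MPI receive here.
    return false;
  }
};

int main() {
  ToyBuffer buf;
  const bool combined_comms = true;   // mirrors pmesh->do_combined_comms
  buf.Send(combined_comms);           // bookkeeping only; no MPI call issued
  const bool done = buf.TryReceive(combined_comms);
  std::cout << (done ? "received via combined path\n" : "still waiting\n");
  return done ? 0 : 1;
}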