diff --git a/src/bvals/comms/boundary_communication.cpp b/src/bvals/comms/boundary_communication.cpp index 1afa32dd68c5..96fb8b9916b5 100644 --- a/src/bvals/comms/boundary_communication.cpp +++ b/src/bvals/comms/boundary_communication.cpp @@ -64,7 +64,8 @@ TaskStatus SendBoundBufs(std::shared_ptr> &md) { return TaskStatus::complete; } - bool can_write_combined = pmesh->pcombined_buffers->IsAvailableForWrite(md->partition, bound_type); + bool can_write_combined = + pmesh->pcombined_buffers->IsAvailableForWrite(md->partition, bound_type); if (other_communication_unfinished || !can_write_combined) { return TaskStatus::incomplete; } diff --git a/src/bvals/comms/combined_buffers.cpp b/src/bvals/comms/combined_buffers.cpp index b0e6338ec75d..d3ff16e37c23 100644 --- a/src/bvals/comms/combined_buffers.cpp +++ b/src/bvals/comms/combined_buffers.cpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -278,17 +279,18 @@ void CombinedBuffersRank::CompareReceivedBuffers(int partition) { }); } -void CombinedBuffers::AddSendBuffer(int partition, MeshBlock *pmb, const NeighborBlock &nb, - const std::shared_ptr> &var, BoundaryType b_type) { +void CombinedBuffers::AddSendBuffer(int partition, MeshBlock *pmb, + const NeighborBlock &nb, + const std::shared_ptr> &var, + BoundaryType b_type) { if (combined_send_buffers.count({nb.rank, b_type}) == 0) - combined_send_buffers[{nb.rank, b_type}] = - CombinedBuffersRank(nb.rank, b_type, true); - combined_send_buffers[{nb.rank, b_type}].AddSendBuffer(partition, pmb, nb, var, - b_type); + combined_send_buffers[{nb.rank, b_type}] = CombinedBuffersRank(nb.rank, b_type, true); + combined_send_buffers[{nb.rank, b_type}].AddSendBuffer(partition, pmb, nb, var, b_type); } void CombinedBuffers::AddRecvBuffer(MeshBlock *pmb, const NeighborBlock &nb, - const std::shared_ptr>, BoundaryType b_type) { + const std::shared_ptr>, + BoundaryType b_type) { // We don't actually know enough here to register this particular buffer, but we do // know that it's existence implies that we need to receive a message from the // neighbor block rank eventually telling us the details @@ -322,7 +324,8 @@ bool CombinedBuffers::IsAvailableForWrite(int partition, BoundaryType b_type) { bool available{true}; for (int rank = 0; rank < Globals::nranks; ++rank) { if (combined_send_buffers.count({rank, b_type})) { - available = available && combined_send_buffers[{rank, b_type}].IsAvailableForWrite(partition); + available = available && + combined_send_buffers[{rank, b_type}].IsAvailableForWrite(partition); } } return available; @@ -336,14 +339,16 @@ void CombinedBuffers::PackAndSend(int partition, BoundaryType b_type) { } } -void CombinedBuffers::RepointSendBuffers(Mesh *pmesh, int partition, BoundaryType b_type) { +void CombinedBuffers::RepointSendBuffers(Mesh *pmesh, int partition, + BoundaryType b_type) { for (int rank = 0; rank < Globals::nranks; ++rank) { if (combined_send_buffers.count({rank, b_type})) combined_send_buffers[{rank, b_type}].RepointBuffers(pmesh, partition); } } -void CombinedBuffers::RepointRecvBuffers(Mesh *pmesh, int partition, BoundaryType b_type) { +void CombinedBuffers::RepointRecvBuffers(Mesh *pmesh, int partition, + BoundaryType b_type) { for (int rank = 0; rank < Globals::nranks; ++rank) { if (combined_recv_buffers.count({rank, b_type})) combined_recv_buffers[{rank, b_type}].RepointBuffers(pmesh, partition); @@ -360,22 +365,23 @@ void CombinedBuffers::TryReceiveAny(Mesh *pmesh, BoundaryType b_type) { if (flag) { const int rank = status.MPI_SOURCE; const int partition = status.MPI_TAG - 913; - bool finished = combined_recv_buffers[{rank, b_type}].TryReceiveAndUnpack(pmesh, partition); + bool finished = + combined_recv_buffers[{rank, b_type}].TryReceiveAndUnpack(pmesh, partition); if (!finished) processing_messages.insert({rank, partition}); } } while (flag); // Process in flight messages - std::set> finished_messages; - for (auto &[rank, partition] : processing_messages) { - bool finished = combined_recv_buffers[{rank, b_type}].TryReceiveAndUnpack(pmesh, partition); + std::set> finished_messages; + for (auto &[rank, partition] : processing_messages) { + bool finished = + combined_recv_buffers[{rank, b_type}].TryReceiveAndUnpack(pmesh, partition); if (finished) finished_messages.insert({rank, partition}); } for (auto &m : finished_messages) processing_messages.erase(m); - #endif } diff --git a/src/bvals/comms/combined_buffers.hpp b/src/bvals/comms/combined_buffers.hpp index 35a66a7a37c0..6ceac43d753d 100644 --- a/src/bvals/comms/combined_buffers.hpp +++ b/src/bvals/comms/combined_buffers.hpp @@ -16,6 +16,7 @@ #include #include +#include #include #include #include @@ -91,7 +92,7 @@ struct CombinedBuffers { // Combined buffers for each rank std::map, CombinedBuffersRank> combined_send_buffers; std::map, CombinedBuffersRank> combined_recv_buffers; - + std::set> processing_messages; void clear() { diff --git a/src/utils/communication_buffer.hpp b/src/utils/communication_buffer.hpp index 49688cfbb00b..a3f4d9a97dde 100644 --- a/src/utils/communication_buffer.hpp +++ b/src/utils/communication_buffer.hpp @@ -465,8 +465,8 @@ template void CommBuffer::Stale() { // PARTHENON_REQUIRE(*comm_type_ != BuffCommType::sender, "Should never get here."); - //if (!(*state_ == BufferState::received || *state_ == BufferState::received_null)) - // PARTHENON_DEBUG_WARN("Staling buffer not in the received state."); + // if (!(*state_ == BufferState::received || *state_ == BufferState::received_null)) + // PARTHENON_DEBUG_WARN("Staling buffer not in the received state."); #ifdef MPI_PARALLEL if (MPI_REQUEST_NULL != *my_request_) PARTHENON_WARN("Staling buffer with pending request.");