Skip to content

Commit

Permalink
format and lint
Browse files: browse the repository at this point in the history
  • Loading branch information
lroberts36 committed Oct 22, 2024
1 parent 7f5b944 commit b0dd208
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 74 deletions.
2 changes: 1 addition & 1 deletion src/bvals/comms/bnd_info.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ struct BndId {
KOKKOS_FORCEINLINE_FUNCTION
int &start_idx() { return data[9]; }

buf_pool_t<Real>::weak_t buf; // comm buffer from pool
buf_pool_t<Real>::weak_t buf; // comm buffer from pool
BufArray1D<Real> combined_buf; // Combined buffer

KOKKOS_DEFAULTED_FUNCTION
Expand Down
16 changes: 8 additions & 8 deletions src/bvals/comms/boundary_communication.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ TaskStatus SendBoundBufs(std::shared_ptr<MeshData<Real>> &md) {
Kokkos::fence();
#endif

// Send the combined buffers
// Send the combined buffers
pmesh->pcombined_buffers->PackAndSend(md->partition, bound_type);

for (int ibuf = 0; ibuf < cache.buf_vec.size(); ++ibuf) {
Expand Down Expand Up @@ -211,18 +211,18 @@ TaskStatus ReceiveBoundBufs(std::shared_ptr<MeshData<Real>> &md) {
if (cache.buf_vec.size() == 0)
InitializeBufferCache<bound_type>(md, &(pmesh->boundary_comm_map), &cache, ReceiveKey,
false);

// Receive any messages that are around
pmesh->pcombined_buffers->TryReceiveAny(pmesh, bound_type);

bool all_received = true;
int nreceived = 0;
std::for_each(
std::begin(cache.buf_vec), std::end(cache.buf_vec),
[&all_received, &nreceived](auto pbuf) {
bool received = pbuf->TryReceiveLocal();
nreceived += received;
all_received = received && all_received; });
std::for_each(std::begin(cache.buf_vec), std::end(cache.buf_vec),
[&all_received, &nreceived](auto pbuf) {
bool received = pbuf->TryReceiveLocal();
nreceived += received;
all_received = received && all_received;
});
int ibound = 0;
if (Globals::sparse_config.enabled && all_received) {
ForEachBoundary<bound_type>(
Expand Down
71 changes: 32 additions & 39 deletions src/bvals/comms/combined_buffers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
// license in this material to reproduce, prepare derivative works, distribute copies to
// the public, perform publicly and display publicly, and to permit others to do so.
//========================================================================================
#include <cstdio>
#include <map>
#include <memory>
#include <string>
Expand Down Expand Up @@ -65,12 +66,10 @@ bool CombinedBuffersRank::TryReceiveBufInfo(Mesh *pmesh) {
// Unpack into per combined buffer information
int idx{nglobal};

printf("Expecting to receive %i partitions on rank %i from rank %i\n", npartitions, Globals::my_rank, other_rank);
for (int p = 0; p < npartitions; ++p) {
const int partition = mess_buf[idx++];
const int nbuf = mess_buf[idx++];
const int total_size = mess_buf[idx++];
printf("Building receive buffer for partition %i on rank %i\n", partition, Globals::my_rank);
combined_buffers[partition] =
CommBuffer<buf_t>(partition, other_rank, Globals::my_rank, comm_);
combined_buffers[partition].ConstructBuffer("combined recv buffer", total_size);
Expand Down Expand Up @@ -129,7 +128,7 @@ void CombinedBuffersRank::ResolveSendBuffersAndSendInfo(Mesh *pmesh) {
buf_struct.Serialize(&(mess_buf[idx]));
PARTHENON_REQUIRE(pmesh->boundary_comm_map.count(GetChannelKey(buf_struct)),
"Buffer doesn't exist.");

bufs.push_back(&(pmesh->boundary_comm_map[GetChannelKey(buf_struct)]));
idx += BndId::NDAT;
}
Expand All @@ -144,26 +143,25 @@ void CombinedBuffersRank::ResolveSendBuffersAndSendInfo(Mesh *pmesh) {
combined_buffers[partition].ConstructBuffer("combined send buffer", size);
}

// Point the BndId objects to the combined buffers
// Point the BndId objects to the combined buffers
for (auto &[partition, buf_struct_vec] : combined_info) {
for (auto &buf_struct : buf_struct_vec) {
buf_struct.combined_buf = combined_buffers[partition].buffer();
}
}

buffers_built = true;
}

void CombinedBuffersRank::RepointBuffers(Mesh *pmesh, int partition) {
printf("Repointing buffers on partition %i on rank %i to rank %i\n", partition, Globals::my_rank, other_rank);
if (combined_info.count(partition) == 0) return;
// Pull out the buffers and point them to the buf_struct
// Pull out the buffers and point them to the buf_struct
auto &buf_struct_vec = combined_info[partition];
for (auto &buf_struct : buf_struct_vec) {
buf_struct.buf = pmesh->boundary_comm_map[GetChannelKey(buf_struct)];
}

// Get the BndId objects on device
// Get the BndId objects on device
combined_info_device[partition] = ParArray1D<BndId>("bnd_id", buf_struct_vec.size());
auto ci_host = Kokkos::create_mirror_view(combined_info_device[partition]);
for (int i = 0; i < ci_host.size(); ++i)
Expand All @@ -172,28 +170,25 @@ void CombinedBuffersRank::RepointBuffers(Mesh *pmesh, int partition) {
}

void CombinedBuffersRank::PackAndSend(int partition) {
PARTHENON_REQUIRE(buffers_built, "Trying to send combined buffers before they have been built");
PARTHENON_REQUIRE(buffers_built,
"Trying to send combined buffers before they have been built");
if (combined_info_device.count(partition) == 0) return; // There is nothing to send here
auto &comb_info = combined_info_device[partition];
Kokkos::parallel_for(
PARTHENON_AUTO_LABEL,
Kokkos::TeamPolicy<>(parthenon::DevExecSpace(), combined_info[partition].size(), Kokkos::AUTO),
Kokkos::TeamPolicy<>(parthenon::DevExecSpace(), combined_info[partition].size(),
Kokkos::AUTO),
KOKKOS_LAMBDA(parthenon::team_mbr_t team_member) {
const int b = team_member.league_rank();
const int buf_size = comb_info[b].size();
Real *com_buf = &(comb_info[b].combined_buf(comb_info[b].start_idx()));
Real *buf = &(comb_info[b].buf(0));
printf("buf_size = %i (%i) combined_buf_size = %i start = %i\n", buf_size, comb_info[b].buf.size(), comb_info[b].combined_buf.size(), comb_info[b].start_idx());
Kokkos::parallel_for(
Kokkos::TeamThreadRange<>(team_member, buf_size),
[&](const int idx) {
com_buf[idx] = buf[idx];
});
Kokkos::parallel_for(Kokkos::TeamThreadRange<>(team_member, buf_size),
[&](const int idx) { com_buf[idx] = buf[idx]; });
});
#ifdef MPI_PARALLEL
Kokkos::fence();
#endif
printf("Sending combined buffer %i from rank %i with size %i (%i)\n", partition, Globals::my_rank, combined_buffers[partition].buffer().size(), current_size[partition]);
combined_buffers[partition].Send();
// Information in these send buffers is no longer required
for (auto &buf : buffers[partition])
Expand All @@ -204,7 +199,6 @@ bool CombinedBuffersRank::AllReceived() {
bool all_received{true};
for (auto &[partition, buf] : combined_buffers) {
bool received = buf.GetState() == BufferState::received;
if (!received) printf("partition %i not received on rank %i\n", partition, Globals::my_rank);
all_received = all_received && received;
}
return all_received;
Expand All @@ -217,8 +211,10 @@ void CombinedBuffersRank::StaleAllReceives() {
}

bool CombinedBuffersRank::TryReceiveAndUnpack(Mesh *pmesh, int partition) {
PARTHENON_REQUIRE(buffers_built, "Trying to recv combined buffers before they have been built");
PARTHENON_REQUIRE(combined_buffers.count(partition) > 0, "Trying to receive on a non-existent combined receive buffer.");
PARTHENON_REQUIRE(buffers_built,
"Trying to recv combined buffers before they have been built");
PARTHENON_REQUIRE(combined_buffers.count(partition) > 0,
"Trying to receive on a non-existent combined receive buffer.");
auto received = combined_buffers[partition].TryReceive();
if (!received) return false;

Expand All @@ -230,24 +226,21 @@ bool CombinedBuffersRank::TryReceiveAndUnpack(Mesh *pmesh, int partition) {
buf->Allocate();
}
}
if (!all_allocated) {
printf("Repoint receive\n");
if (!all_allocated) {
RepointBuffers(pmesh, partition);
}
auto &comb_info = combined_info_device[partition];
Kokkos::parallel_for(
PARTHENON_AUTO_LABEL,
Kokkos::TeamPolicy<>(parthenon::DevExecSpace(), combined_info[partition].size(), Kokkos::AUTO),
Kokkos::TeamPolicy<>(parthenon::DevExecSpace(), combined_info[partition].size(),
Kokkos::AUTO),
KOKKOS_LAMBDA(parthenon::team_mbr_t team_member) {
const int b = team_member.league_rank();
const int buf_size = comb_info[b].size();
Real *com_buf = &(comb_info[b].combined_buf(comb_info[b].start_idx()));
Real *buf = &(comb_info[b].buf(0));
Kokkos::parallel_for(
Kokkos::TeamThreadRange<>(team_member, buf_size),
[&](const int idx) {
buf[idx] = com_buf[idx];
});
Kokkos::parallel_for(Kokkos::TeamThreadRange<>(team_member, buf_size),
[&](const int idx) { buf[idx] = com_buf[idx]; });
});
combined_buffers[partition].Stale();
for (auto &buf : buffers[partition])
Expand All @@ -257,27 +250,27 @@ bool CombinedBuffersRank::TryReceiveAndUnpack(Mesh *pmesh, int partition) {

void CombinedBuffersRank::CompareReceivedBuffers(int partition) {
if (Globals::my_rank != 0) return; // don't crush us with output
PARTHENON_REQUIRE(buffers_built, "Trying to recv combined buffers before they have been built")
PARTHENON_REQUIRE(buffers_built,
"Trying to recv combined buffers before they have been built")
if (combined_info_device.count(partition) == 0) return;
printf("Comparing buffers received from partition %i on rank %i to rank %i\n", partition, other_rank, Globals::my_rank);
auto &comb_info = combined_info_device[partition];
Kokkos::parallel_for(
PARTHENON_AUTO_LABEL,
Kokkos::TeamPolicy<>(parthenon::DevExecSpace(), combined_info[partition].size(), Kokkos::AUTO),
Kokkos::TeamPolicy<>(parthenon::DevExecSpace(), combined_info[partition].size(),
Kokkos::AUTO),
KOKKOS_LAMBDA(parthenon::team_mbr_t team_member) {
const int b = team_member.league_rank();
const int buf_size = comb_info[b].size();
Real *com_buf = &(comb_info[b].combined_buf(comb_info[b].start_idx()));
Real *buf = &(comb_info[b].buf(0));
printf("Buffer [%i] start = %i size = %i\n", b, comb_info[b].start_idx(), buf_size);
Kokkos::parallel_for(
Kokkos::TeamThreadRange<>(team_member, buf_size),
[&](const int idx) {
if (buf[idx] != com_buf[idx])
printf(" [%i] %e %e\n", idx, buf[idx], com_buf[idx]);
});
printf("Buffer [%i] start = %i size = %i\n", b, comb_info[b].start_idx(),
buf_size);
Kokkos::parallel_for(Kokkos::TeamThreadRange<>(team_member, buf_size),
[&](const int idx) {
if (buf[idx] != com_buf[idx])
printf(" [%i] %e %e\n", idx, buf[idx], com_buf[idx]);
});
});
}


} // namespace parthenon
36 changes: 16 additions & 20 deletions src/bvals/comms/combined_buffers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ struct CombinedBuffersRank {
// partition id of the sender will be the mpi tag we use
bool buffers_built{false};
std::map<int, coalesced_message_structure_t> combined_info;
std::map<int, std::vector<CommBuffer<buf_pool_t<Real>::owner_t>*>> buffers;
std::map<int, std::vector<CommBuffer<buf_pool_t<Real>::owner_t> *>> buffers;
std::map<int, ParArray1D<BndId>> combined_info_device;
std::map<int, CommBuffer<buf_t>> combined_buffers;
std::map<int, int> current_size;
Expand Down Expand Up @@ -77,9 +77,9 @@ struct CombinedBuffersRank {
bool TryReceiveAndUnpack(Mesh *pmesh, int partition);

void RepointBuffers(Mesh *pmesh, int partition);

bool AllReceived();

void StaleAllReceives();

void CompareReceivedBuffers(int partition);
Expand Down Expand Up @@ -135,25 +135,23 @@ struct CombinedBuffers {
receive_iters < max_it,
"Too many iterations waiting to receive boundary communication buffers.");
}
void PackAndSend(int partition, BoundaryType b_type) {

void PackAndSend(int partition, BoundaryType b_type) {
for (int rank = 0; rank < Globals::nranks; ++rank) {
if (combined_send_buffers.count({rank, b_type})) {
printf("Sending from partition %i on rank %i\n", partition, Globals::my_rank);
combined_send_buffers[{rank, b_type}].PackAndSend(partition);
}
}
}

void RepointSendBuffers(Mesh *pmesh, int partition, BoundaryType b_type) {
printf("Repointing send buffers\n");
void RepointSendBuffers(Mesh *pmesh, int partition, BoundaryType b_type) {
for (int rank = 0; rank < Globals::nranks; ++rank) {
if (combined_send_buffers.count({rank, b_type}))
combined_send_buffers[{rank, b_type}].RepointBuffers(pmesh, partition);
}
}
void RepointRecvBuffers(Mesh *pmesh, int partition, BoundaryType b_type) {

void RepointRecvBuffers(Mesh *pmesh, int partition, BoundaryType b_type) {
for (int rank = 0; rank < Globals::nranks; ++rank) {
if (combined_recv_buffers.count({rank, b_type}))
combined_recv_buffers[{rank, b_type}].RepointBuffers(pmesh, partition);
Expand All @@ -170,39 +168,37 @@ struct CombinedBuffers {
if (flag) {
const int rank = status.MPI_SOURCE;
const int partition = status.MPI_TAG;
printf("Trying to receive combined from rank %i partition %i on rank %i\n", rank, partition, Globals::my_rank);
combined_recv_buffers[{rank, b_type}].TryReceiveAndUnpack(pmesh, partition);
}
} while(flag);
} while (flag);
#endif
}

bool AllReceived(BoundaryType b_type) {
bool all_received{true};
for (auto &[tag, bufs] : combined_recv_buffers) {
if (std::get<1>(tag) == b_type) {
if (std::get<1>(tag) == b_type) {
all_received = all_received && bufs.AllReceived();
}
}
}
return all_received;
}

void StaleAllReceives(BoundaryType b_type) {
for (auto &[tag, bufs] : combined_recv_buffers) {
if (std::get<1>(tag) == b_type) {
if (std::get<1>(tag) == b_type) {
bufs.StaleAllReceives();
}
}
}
}

void CompareReceivedBuffers(BoundaryType b_type) {
void CompareReceivedBuffers(BoundaryType b_type) {
for (auto &[tag, bufs] : combined_recv_buffers) {
if (std::get<1>(tag) == b_type) {
if (std::get<1>(tag) == b_type) {
bufs.CompareReceivedBuffers(0);
}
}
}
}

};

} // namespace parthenon
Expand Down
13 changes: 7 additions & 6 deletions src/utils/communication_buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,10 @@ class CommBuffer {
bool TryReceive() noexcept;
bool TryReceiveLocal() noexcept;
void SetReceived() noexcept {
PARTHENON_REQUIRE(*comm_type_ == BuffCommType::receiver ||
*comm_type_ == BuffCommType::sparse_receiver,
"This doesn't make sense for a non-receiver.");
*state_ = BufferState::received;
PARTHENON_REQUIRE(*comm_type_ == BuffCommType::receiver ||
*comm_type_ == BuffCommType::sparse_receiver,
"This doesn't make sense for a non-receiver.");
*state_ = BufferState::received;
}
bool IsSafeToDelete() {
if (*comm_type_ == BuffCommType::sparse_receiver ||
Expand Down Expand Up @@ -182,7 +182,8 @@ CommBuffer<T>::CommBuffer(const CommBuffer<U> &in)
: buf_(in.buf_), state_(in.state_), comm_type_(in.comm_type_),
started_irecv_(in.started_irecv_), nrecv_tries_(in.nrecv_tries_),
my_request_(in.my_request_), tag_(in.tag_), send_rank_(in.send_rank_),
recv_rank_(in.recv_rank_), comm_(in.comm_), active_(in.active_), get_resource_(in.get_resource_) {
recv_rank_(in.recv_rank_), comm_(in.comm_), active_(in.active_),
get_resource_(in.get_resource_) {
my_rank = Globals::my_rank;
}

Expand Down Expand Up @@ -462,7 +463,7 @@ bool CommBuffer<T>::TryReceive() noexcept {

template <class T>
void CommBuffer<T>::Stale() {
//PARTHENON_REQUIRE(*comm_type_ != BuffCommType::sender, "Should never get here.");
// PARTHENON_REQUIRE(*comm_type_ != BuffCommType::sender, "Should never get here.");

if (!(*state_ == BufferState::received || *state_ == BufferState::received_null))
PARTHENON_DEBUG_WARN("Staling buffer not in the received state.");
Expand Down

0 comments on commit b0dd208

Please sign in to comment.