Skip to content

Commit

Permalink
fix buffer bugs?
Browse files Browse the repository at this point in the history
  • Loading branch information
lroberts36 committed Nov 28, 2024
1 parent 6aa7dcd commit 0f3795d
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 46 deletions.
51 changes: 24 additions & 27 deletions src/bvals/comms/coalesced_buffers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,25 +33,23 @@
namespace parthenon {

//----------------------------------------------------------------------------------------
void CoalescedBuffer::AllocateCoalescedBuffer() {
int send_rank = sender ? Globals::my_rank : other_rank;
int recv_rank = sender ? other_rank : Globals::my_rank;
coalesced_comm_buffer = CommBuffer<buf_t>(
2 * partition, send_rank, recv_rank, comm,
[](int size) { return buf_t("Combined Buffer", 2 * size); }, true);

sparse_status_buffer =
CommBuffer<std::vector<int>>(2 * partition + 1, send_rank, recv_rank, comm);
sparse_status_buffer.ConstructBuffer(current_size + 1);
}
CoalescedBuffer::CoalescedBuffer(bool sender, int partition, int other_rank,
BoundaryType b_type, mpi_comm_t comm, Mesh *pmesh)
: sender(sender), partition(partition), other_rank(other_rank), b_type(b_type),
comm(comm), pmesh(pmesh), current_size(0),
coalesced_comm_buffer(
2 * partition, sender ? Globals::my_rank : other_rank,
sender ? other_rank : Globals::my_rank, comm,
[](int size) { return buf_t("Combined Buffer", 2 * size); }, true),
sparse_status_buffer(
2 * partition + 1, sender ? Globals::my_rank : other_rank,
sender ? other_rank : Globals::my_rank, comm,
[](int size) { return std::vector<int>(size); }, true) {}

//----------------------------------------------------------------------------------------
ParArray1DRaw<BndId> &CoalescedBuffer::GetBndIdsOnDevice(const std::set<Uid_t> &vars,
int *pcomb_size) {
const auto &var_set = vars.size() == 0 ? all_vars : vars;
auto &bnd_ids_device = bnd_ids_device_map[var_set];
auto &bnd_ids_host = bnd_ids_host_map[var_set];

int nbnd_id{0};
int comb_size{0};
for (auto uid : var_set) {
Expand All @@ -70,10 +68,19 @@ ParArray1DRaw<BndId> &CoalescedBuffer::GetBndIdsOnDevice(const std::set<Uid_t> &
if (comb_size > coalesced_comm_buffer.buffer().size()) {
PARTHENON_REQUIRE(
sender, "Something bad is going on if we are doing this on a receiving buffer.");
coalesced_comm_buffer.ConstructBuffer("combined send buffer", 2 * comb_size);
coalesced_comm_buffer.Allocate(comb_size);
updated = true;
}

if (bnd_ids_device_map.count(var_set) == 0)
bnd_ids_device_map.emplace(std::make_pair(
var_set, ParArray1DRaw<BndId>(parthenon::ViewOfViewAlloc("bnd_id"), nbnd_id)));
auto &bnd_ids_device = bnd_ids_device_map.at(var_set);
if (bnd_ids_host_map.count(var_set) == 0)
bnd_ids_host_map.emplace(
std::make_pair(var_set, create_view_of_view_mirror(bnd_ids_device)));
auto &bnd_ids_host = bnd_ids_host_map.at(var_set);

if (nbnd_id != bnd_ids_device.size()) {
bnd_ids_device = ParArray1DRaw<BndId>(parthenon::ViewOfViewAlloc("bnd_id"), nbnd_id);
bnd_ids_host = create_view_of_view_mirror(bnd_ids_device);
Expand Down Expand Up @@ -140,7 +147,7 @@ void CoalescedBuffer::PackAndSend(const std::set<Uid_t> &vars) {

// Send the sparse null info as well
if (bids.size() != sparse_status_buffer.buffer().size()) {
sparse_status_buffer.ConstructBuffer(bids.size());
sparse_status_buffer.Allocate(bids.size());
}

const auto &var_set = vars.size() == 0 ? all_vars : vars;
Expand Down Expand Up @@ -189,9 +196,6 @@ bool CoalescedBuffer::TryReceiveAndUnpack(const std::set<Uid_t> &vars) {
}
}

if (nbuf != sparse_status_buffer.buffer().size()) {
sparse_status_buffer.ConstructBuffer(nbuf);
}
auto received_sparse = sparse_status_buffer.TryReceive();
auto received = coalesced_comm_buffer.TryReceive();
if (!received || !received_sparse) return false;
Expand Down Expand Up @@ -316,10 +320,6 @@ bool CoalescedBuffersRank::TryReceiveBufInfo() {
}
message.Stale();

for (auto &[partition, com_buf] : coalesced_bufs) {
com_buf.AllocateCoalescedBuffer();
}

buffers_built = true;
return true;
}
Expand Down Expand Up @@ -357,9 +357,6 @@ void CoalescedBuffersRank::ResolveAndSendBufInfo() {

message.Send();

for (auto &[partition, com_buf] : coalesced_bufs)
com_buf.AllocateCoalescedBuffer();

buffers_built = true;
}

Expand Down Expand Up @@ -430,7 +427,7 @@ void CoalescedComms::clear() {
}
iter++;
} while (!can_delete && iter < max_iters);
if (iter == max_iters) PARTHENON_FAIL("Waited too long to clear CoalescedComms.");
if (iter >= max_iters) PARTHENON_FAIL("Waited too long to clear CoalescedComms.");

coalesced_send_buffers.clear();
coalesced_recv_buffers.clear();
Expand Down
6 changes: 1 addition & 5 deletions src/bvals/comms/coalesced_buffers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,7 @@ struct CoalescedBuffer {
int current_size;

CoalescedBuffer(bool sender, int partition, int other_rank, BoundaryType b_type,
mpi_comm_t comm, Mesh *pmesh)
: sender(sender), partition(partition), other_rank(other_rank), b_type(b_type),
comm(comm), pmesh(pmesh), current_size(0) {}
mpi_comm_t comm, Mesh *pmesh);

int TotalBuffers() const {
int total_buffers{0};
Expand All @@ -88,8 +86,6 @@ struct CoalescedBuffer {
void AddVarBoundary(MeshBlock *pmb, const NeighborBlock &nb,
const std::shared_ptr<Variable<Real>> &var);

void AllocateCoalescedBuffer();

bool IsAvailableForWrite() {
return sparse_status_buffer.IsAvailableForWrite() &&
coalesced_comm_buffer.IsAvailableForWrite();
Expand Down
16 changes: 2 additions & 14 deletions src/utils/communication_buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,8 @@ class CommBuffer {
{
}

CommBuffer(
int tag, int send_rank, int recv_rank, mpi_comm_t comm_,
get_resource_func_t get_resource =
[](int) {
PARTHENON_FAIL("Trying to use an uninitialized get_resource function.");
return T();
},
bool do_sparse_allocation = false);
CommBuffer(int tag, int send_rank, int recv_rank, mpi_comm_t comm_,
get_resource_func_t get_resource, bool do_sparse_allocation = false);

~CommBuffer();

Expand All @@ -108,12 +102,6 @@ class CommBuffer {
}
}

template <class... Args>
void ConstructBuffer(Args &&...args) {
buf_ = T(std::forward<Args>(args)...);
active_ = true;
}

void Free() {
buf_ = T();
active_ = false;
Expand Down

0 comments on commit 0f3795d

Please sign in to comment.