Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test and fix lifetimes of transform_mpi values #1321

Merged
merged 14 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -237,9 +237,8 @@ namespace pika::mpi::experimental {
friend constexpr PIKA_FORCEINLINE auto
tag_fallback_invoke(dispatch_mpi_t, Sender&& sender, F&& f)
{
auto snd1 = detail::dispatch_mpi_sender<Sender, F>{
return detail::dispatch_mpi_sender<Sender, F>{
PIKA_FORWARD(Sender, sender), PIKA_FORWARD(F, f)};
return pika::execution::experimental::make_unique_any_sender(std::move(snd1));
}

template <typename F>
Expand Down
17 changes: 12 additions & 5 deletions libs/pika/async_mpi/include/pika/async_mpi/transform_mpi.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <pika/execution/algorithms/detail/partial_algorithm.hpp>
#include <pika/execution/algorithms/just.hpp>
#include <pika/execution/algorithms/let_value.hpp>
#include <pika/execution/algorithms/unpack.hpp>
#include <pika/execution_base/any_sender.hpp>
#include <pika/execution_base/receiver.hpp>
#include <pika/execution_base/sender.hpp>
Expand Down Expand Up @@ -52,6 +53,7 @@ namespace pika::mpi::experimental {
using pika::execution::experimental::just;
using pika::execution::experimental::let_value;
using pika::execution::experimental::unique_any_sender;
using pika::execution::experimental::unpack;

// get mpi completion mode settings
auto mode = get_completion_mode();
Expand All @@ -78,14 +80,19 @@ namespace pika::mpi::experimental {

if (requests_inline)
{
return dispatch_mpi_sender<Sender, F>{PIKA_MOVE(sender), PIKA_FORWARD(F, f)} |
let_value(completion_snd);
return std::forward<Sender>(sender) |
let_value([=, f = std::forward<F>(f)](auto&... args) mutable {
return just(std::forward_as_tuple(args...)) | ex::unpack() |
dispatch_mpi(std::move(f)) | let_value(completion_snd);
});
}
else
{
auto snd0 = PIKA_FORWARD(Sender, sender) | continues_on(mpi_pool_scheduler(p));
return dispatch_mpi_sender<decltype(snd0), F>{PIKA_MOVE(snd0), PIKA_FORWARD(F, f)} |
let_value(completion_snd);
return std::forward<Sender>(sender) | continues_on(mpi_pool_scheduler(p)) |
let_value([=, f = std::forward<F>(f)](auto&... args) mutable {
return just(std::forward_as_tuple(args...)) | ex::unpack() |
dispatch_mpi(std::move(f)) | let_value(completion_snd);
});
}
}

Expand Down
30 changes: 27 additions & 3 deletions libs/pika/async_mpi/tests/unit/algorithm_transform_mpi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,14 +200,38 @@ int pika_main()
PIKA_TEST_EQ(data, 42);
}

// Values passed to transform_mpi should be kept alive by transform_mpi itself
{
int count = 1 << 20;
auto s = ex::just(std::vector<int>{count, 0}, datatype, 0, comm) |
ex::drop_operation_state() |
mpi::transform_mpi([](auto& data, MPI_Datatype datatype, int i, MPI_Comm comm,
MPI_Request* request) {
MPI_Ibcast(data.data(), data.size(), datatype, i, comm, request);
});
tt::sync_wait(PIKA_MOVE(s));
}

{
auto s = ex::just(custom_type_non_default_constructible_non_copyable{42}, datatype,
0, comm) |
ex::drop_operation_state() |
mpi::transform_mpi([](auto& data, MPI_Datatype datatype, int i, MPI_Comm comm,
MPI_Request* request) {
MPI_Ibcast(&data.x, 1, datatype, i, comm, request);
});
tt::sync_wait(PIKA_MOVE(s));
}

// transform_mpi should be able to handle reference types (by copying
// them to the operation state)
{
int data = 0, count = 1;
if (rank == 0) { data = 42; }
auto s = mpi::transform_mpi(
const_reference_sender<int>{count}, [&](int& count, MPI_Request* request) {
MPI_Ibcast(&data, count, datatype, 0, comm, request);
auto s = mpi::transform_mpi(const_reference_sender<int>{count},
[&](int& count_transform_mpi, MPI_Request* request) {
PIKA_TEST(&count_transform_mpi != &count);
MPI_Ibcast(&data, count_transform_mpi, datatype, 0, comm, request);
});
tt::sync_wait(PIKA_MOVE(s));
PIKA_TEST_EQ(data, 42);
Expand Down
10 changes: 8 additions & 2 deletions libs/pika/config/include/pika/config/compiler_specific.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,14 +162,20 @@
#endif

// clang-format on
# if !defined(__has_feature)
# define PIKA_HAS_FEATURE(x) 0
# else
# define PIKA_HAS_FEATURE(x) __has_feature(x)
# endif

# if defined(PIKA_HAVE_SANITIZERS)
# if defined(__SANITIZE_ADDRESS__) || (defined(__has_feature) && __has_feature(address_sanitizer))
# if defined(__SANITIZE_ADDRESS__) || PIKA_HAS_FEATURE(address_sanitizer)
# define PIKA_HAVE_ADDRESS_SANITIZER
# if defined(PIKA_GCC_VERSION) || defined(PIKA_CLANG_VERSION)
# define PIKA_NO_SANITIZE_ADDRESS __attribute__((no_sanitize("address")))
# endif
# endif
# if defined(__SANITIZE_THREAD__) || (defined(__has_feature) && __has_feature(thread_sanitizer))
# if defined(__SANITIZE_THREAD__) || PIKA_HAS_FEATURE(thread_sanitizer)
# define PIKA_HAVE_THREAD_SANITIZER
# if defined(PIKA_GCC_VERSION) || defined(PIKA_CLANG_VERSION)
# define PIKA_NO_SANITIZE_THREAD __attribute__((no_sanitize("thread")))
Expand Down
Loading