Skip to content

Commit

Permalink
Backport multigpu interface changes [for 0.2.1]
Browse files Browse the repository at this point in the history
  • Loading branch information
G-071 committed Aug 23, 2023
1 parent c922bcc commit fa15cbe
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 38 deletions.
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ if (CPPUDDLE_WITH_TESTS)
find_program(VALGRIND_COMMAND valgrind)
if (VALGRIND_COMMAND)
add_test(allocator_memcheck.valgrind
${VALGRIND_COMMAND} --trace-children=yes --leak-check=full ./allocator_test --arraysize 5000000 --passes 200)
${VALGRIND_COMMAND} --trace-children=yes --leak-check=full --undef-value-errors=no --show-error-list=yes ./allocator_test --arraysize 5000000 --passes 200)
set_tests_properties(allocator_memcheck.valgrind PROPERTIES
PASS_REGULAR_EXPRESSION "ERROR SUMMARY: 0 errors from 0 contexts"
)
Expand Down Expand Up @@ -327,7 +327,7 @@ if (CPPUDDLE_WITH_TESTS)
find_program(VALGRIND_COMMAND valgrind)
if (VALGRIND_COMMAND)
add_test(allocator_memcheck.valgrind
${VALGRIND_COMMAND} --trace-children=yes --leak-check=full ./allocator_aligned_test --arraysize 5000000 --passes 200)
${VALGRIND_COMMAND} --trace-children=yes --leak-check=full --undef-value-errors=no --show-error-list=yes ./allocator_aligned_test --arraysize 5000000 --passes 200)
set_tests_properties(allocator_memcheck.valgrind PROPERTIES
PASS_REGULAR_EXPRESSION "ERROR SUMMARY: 0 errors from 0 contexts"
)
Expand Down
7 changes: 5 additions & 2 deletions include/aggregation_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ template <typename Executor> class Aggregated_Executor {
Executor &executor;

public:
const size_t gpu_id{0};
// Subclasses

/// Slice class - meant as a scope interface to the aggregated executor
Expand Down Expand Up @@ -895,7 +896,7 @@ template <typename Executor> class Aggregated_Executor {

Aggregated_Executor(const size_t number_slices,
Aggregated_Executor_Modes mode)
: max_slices(number_slices), current_slices(0), slices_exhausted(false),dealloc_counter(0),
: gpu_id(0), max_slices(number_slices), current_slices(0), slices_exhausted(false), dealloc_counter(0),
mode(mode), executor_slices_alive(false), buffers_in_use(false),
executor_tuple(
stream_pool::get_interface<Executor, round_robin_pool<Executor>>()),
Expand Down Expand Up @@ -988,7 +989,9 @@ class aggregation_pool {
/// interface
template <typename... Ts>
static void init(size_t number_of_executors, size_t slices_per_executor,
Aggregated_Executor_Modes mode) {
Aggregated_Executor_Modes mode, size_t num_devices = 1) {
if (num_devices > 1)
throw std::runtime_error("Got num_devices > 1. MultiGPU not yet supported in v0.2.1");
std::lock_guard<aggregation_mutex_t> guard(instance.pool_mutex);
assert(instance.aggregation_executor_pool.empty());
for (int i = 0; i < number_of_executors; i++) {
Expand Down
30 changes: 28 additions & 2 deletions include/buffer_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,16 @@
#include <mutex>
#include <type_traits>
#include <unordered_map>
#include <optional>

#ifdef CPPUDDLE_HAVE_COUNTERS
#include <boost/core/demangle.hpp>
#endif


namespace recycler {
constexpr size_t number_instances = 1;
constexpr size_t max_number_gpus = 1;
namespace detail {

namespace util {
Expand Down Expand Up @@ -51,7 +55,12 @@ class buffer_recycler {
/// Returns and allocated buffer of the requested size - this may be a reused
/// buffer
template <typename T, typename Host_Allocator>
static T *get(size_t number_elements, bool manage_content_lifetime = false) {
static T *get(size_t number_elements, bool manage_content_lifetime = false,
std::optional<size_t> localtion_id = std::nullopt, std::optional<size_t> device_id = std::nullopt) {
if (device_id) {
if (*device_id > 0)
throw std::runtime_error("Got device_id > 1. MultiGPU not yet supported in v0.2.1");
}
std::lock_guard<std::mutex> guard(mut);
if (!recycler_instance) {
// NOLINTNEXTLINE(cppcoreguidelines-owning-memory)
Expand All @@ -62,13 +71,21 @@ class buffer_recycler {
}
/// Marks a buffer as unused and fit for reuse
template <typename T, typename Host_Allocator>
static void mark_unused(T *p, size_t number_elements) {
static void mark_unused(T *p, size_t number_elements,
std::optional<size_t> localtion_id = std::nullopt, std::optional<size_t> device_id = std::nullopt) {
std::lock_guard<std::mutex> guard(mut);
if (recycler_instance) { // if the instance was already destroyed all
// buffers are destroyed anyway
return buffer_manager<T, Host_Allocator>::mark_unused(p, number_elements);
}
}

/// Dummy for interface compatibility with the 0.3.0 API: HPX performance
/// counters are not available in CPPuddle v0.2.1, so this only emits a
/// warning on stderr and otherwise does nothing.
template <typename T, typename Host_Allocator>
static void register_allocator_counters_with_hpx(void) {
std::cerr << "Warning: CPPuddle v0.2.1 does not yet support HPX counters "
"-- this operation will be ignored!"
<< std::endl;
}
/// Increase the reference counter of a buffer
template <typename T, typename Host_Allocator>
static void increase_usage_counter(T *p, size_t number_elements) noexcept {
Expand Down Expand Up @@ -590,6 +607,8 @@ std::unique_ptr<buffer_recycler::mutexless_buffer_manager<T, Host_Allocator>>

template <typename T, typename Host_Allocator> struct recycle_allocator {
using value_type = T;
using underlying_allocator_type = Host_Allocator;
static_assert(std::is_same_v<value_type, typename underlying_allocator_type::value_type>);
recycle_allocator() noexcept = default;
template <typename U>
explicit recycle_allocator(
Expand Down Expand Up @@ -627,6 +646,9 @@ operator!=(recycle_allocator<T, Host_Allocator> const &,
template <typename T, typename Host_Allocator>
struct aggressive_recycle_allocator {
using value_type = T;
using underlying_allocator_type = Host_Allocator;
static_assert(std::is_same_v<value_type, typename underlying_allocator_type::value_type>);

aggressive_recycle_allocator() noexcept = default;
template <typename U>
explicit aggressive_recycle_allocator(
Expand Down Expand Up @@ -675,6 +697,10 @@ using aggressive_recycle_std =
/// Deletes all buffers (even ones still marked as used), delete the buffer
/// managers and the recycler itself
inline void force_cleanup() { detail::buffer_recycler::clean_all(); }
/// Dummy method that maps to clean_all for now - ensures interface
/// compatibility with 0.3.0 where finalize is a smarter cleanup that ensures no
/// further buffers can be added and static buffers are properly cleaned
inline void finalize() { detail::buffer_recycler::clean_all(); }
/// Deletes all buffers currently marked as unused
inline void cleanup() { detail::buffer_recycler::clean_unused_buffers(); }

Expand Down
20 changes: 6 additions & 14 deletions include/cuda_buffer_util.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,29 +160,21 @@ struct cuda_aggregated_device_buffer {
device_side_buffer =
recycle_allocator_cuda_device<T>{}.allocate(number_of_elements);
}
explicit cuda_aggregated_device_buffer(size_t number_of_elements, Host_Allocator &alloc)
: number_of_elements(number_of_elements), alloc(alloc) {
device_side_buffer =
alloc.allocate(number_of_elements);
}
explicit cuda_aggregated_device_buffer(size_t number_of_elements, size_t gpu_id, Host_Allocator &alloc)
: gpu_id(gpu_id), number_of_elements(number_of_elements), set_id(true), alloc(alloc) {
#if defined(CPPUDDLE_HAVE_MULTIGPU)
cudaSetDevice(gpu_id);
#else
// TODO It would be better to have separate method for this but it would change the interface
// This will have to do for some testing. If it's worth it, add separate method without cudaSetDevice
// This will have to do for some testing. If it's worth it, add separate method without cudaSetDevice
// Allows for testing without any changes to other projects
assert(gpu_id == 0);
#endif
device_side_buffer =
alloc.allocate(number_of_elements);
}
~cuda_aggregated_device_buffer() {
#if defined(CPPUDDLE_HAVE_MULTIGPU)
if (set_id)
cudaSetDevice(gpu_id);
#else
// TODO It would be better to have separate method for this but it would change the interface
// This will have to do for some testing. If it's worth it, add separate method without cudaSetDevice
// Allows for testing without any changes to other projects
assert(gpu_id == 0);
#endif
alloc.deallocate(device_side_buffer,
number_of_elements);
}
Expand Down
14 changes: 5 additions & 9 deletions include/hip_buffer_util.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,29 +155,25 @@ struct hip_aggregated_device_buffer {
device_side_buffer =
recycle_allocator_hip_device<T>{}.allocate(number_of_elements);
}
explicit hip_aggregated_device_buffer(size_t number_of_elements, Host_Allocator &alloc)
: number_of_elements(number_of_elements), alloc(alloc) {
device_side_buffer =
alloc.allocate(number_of_elements);
}
explicit hip_aggregated_device_buffer(size_t number_of_elements, size_t gpu_id, Host_Allocator &alloc)
: gpu_id(gpu_id), number_of_elements(number_of_elements), set_id(true), alloc(alloc) {
#if defined(CPPUDDLE_HAVE_MULTIGPU)
hipSetDevice(gpu_id);
#else
// TODO It would be better to have separate method for this but it would change the interface
// This will have to do for some testing. If it's worth it, add separate method without hipSetDevice
// Allows for testing without any changes to other projects
assert(gpu_id == 0);
#endif
device_side_buffer =
alloc.allocate(number_of_elements);
}
~hip_aggregated_device_buffer() {
#if defined(CPPUDDLE_HAVE_MULTIGPU)
if (set_id)
hipSetDevice(gpu_id);
#else
// TODO It would be better to have separate method for this but it would change the interface
// This will have to do for some testing. If it's worth it, add separate method without hipSetDevice
// Allows for testing without any changes to other projects
assert(gpu_id == 0);
#endif
alloc.deallocate(device_side_buffer,
number_of_elements);
}
Expand Down
24 changes: 23 additions & 1 deletion include/kokkos_buffer_util.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ class recycled_view : public kokkos_type {

public:
using view_type = kokkos_type;
template <class... Args>
template <typename... Args,
std::enable_if_t<sizeof...(Args) == kokkos_type::rank, bool> = true>
explicit recycled_view(Args... args)
: kokkos_type(
allocator.allocate(kokkos_type::required_allocation_size(args...) /
Expand All @@ -93,6 +94,26 @@ class recycled_view : public kokkos_type {
total_elements(kokkos_type::required_allocation_size(args...) /
sizeof(element_type)) {}

// Interface-compatibility overload: accepts a leading device_id like the
// 0.3.0 API but ignores it (only a single device exists in v0.2.1). The
// extents in args... determine the allocation size; the backing memory
// comes from the recycling allocator rather than a fresh allocation.
template <typename... Args,
std::enable_if_t<sizeof...(Args) == kokkos_type::rank, bool> = true>
recycled_view(const size_t device_id, Args... args)
: kokkos_type(
allocator.allocate(kokkos_type::required_allocation_size(args...) /
sizeof(element_type)),
args...),
total_elements(kokkos_type::required_allocation_size(args...) /
sizeof(element_type)) {}

// Interface-compatibility overload taking an explicit Kokkos array layout
// instead of individual extents; device_id is ignored in v0.2.1
// (single-device build). Enabled only for genuine Kokkos layout types so
// this does not shadow the extent-based constructors.
template <typename layout_t,
std::enable_if_t<Kokkos::is_array_layout<layout_t>::value, bool> = true>
recycled_view(std::size_t device_id, layout_t layout)
: kokkos_type(
allocator.allocate(kokkos_type::required_allocation_size(layout) /
sizeof(element_type)),
layout),
total_elements(kokkos_type::required_allocation_size(layout) /
sizeof(element_type)) {}

recycled_view(
const recycled_view<kokkos_type, alloc_type, element_type> &other)
: kokkos_type(other) {
Expand All @@ -101,6 +122,7 @@ class recycled_view : public kokkos_type {
allocator.increase_usage_counter(this->data(), this->total_elements);
}


recycled_view<kokkos_type, alloc_type, element_type> &
operator=(const recycled_view<kokkos_type, alloc_type, element_type> &other) {
allocator.deallocate(this->data(), total_elements);
Expand Down
71 changes: 63 additions & 8 deletions include/stream_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <type_traits>

//#include <cuda_runtime.h>
Expand Down Expand Up @@ -221,35 +222,86 @@ class stream_pool {
stream_pool_implementation<Interface, Pool>::init(
number_of_streams, std::forward<Ts>(executor_args)...);
}
// Dummy for interface compatibility with the future 0.3.0 release.
// Works the same as the init method here: lazily creates the pool
// singleton (creation guarded by mut) and forwards the executor
// arguments to the per-Interface/Pool implementation.
template <class Interface, class Pool, typename... Ts>
static void init_all_executor_pools(size_t number_of_streams, Ts &&... executor_args) {
std::lock_guard<std::mutex> guard(mut);
if (!access_instance) {
// NOLINTNEXTLINE(cppcoreguidelines-owning-memory)
access_instance.reset(new stream_pool());
}
assert(access_instance);
stream_pool_implementation<Interface, Pool>::init(
number_of_streams, std::forward<Ts>(executor_args)...);
}
// Dummy for interface compatibility with the future 0.3.0 release.
// Works the same as the init method here, except that it validates the
// requested device: only device 0 exists in v0.2.1.
// Throws std::runtime_error if device_id > 0 (MultiGPU unsupported here).
template <class Interface, class Pool, typename... Ts>
static void init_executor_pool(size_t device_id, size_t number_of_streams, Ts &&... executor_args) {
if (device_id > 0)
throw std::runtime_error("Got device_id > 0. MultiGPU not yet supported in cppuddle v0.2.1");
std::lock_guard<std::mutex> guard(mut);
if (!access_instance) {
// NOLINTNEXTLINE(cppcoreguidelines-owning-memory)
access_instance.reset(new stream_pool());
}
assert(access_instance);
stream_pool_implementation<Interface, Pool>::init(
number_of_streams, std::forward<Ts>(executor_args)...);
}
template <class Interface, class Pool> static void cleanup() {
assert(access_instance); // should already be initialized
stream_pool_implementation<Interface, Pool>::cleanup();
}
template <class Interface, class Pool>
static std::tuple<Interface &, size_t> get_interface() {
static std::tuple<Interface &, size_t> get_interface(size_t device_id = 0) {
if (device_id > 0)
throw std::runtime_error("Got device_id > 0. MultiGPU not yet supported in cppuddle v0.2.1");
assert(access_instance); // should already be initialized
return stream_pool_implementation<Interface, Pool>::get_interface();
}
template <class Interface, class Pool>
static void release_interface(size_t index) noexcept {
static void release_interface(size_t index, size_t device_id = 0) {
if (device_id > 0)
throw std::runtime_error("Got device_id > 0. MultiGPU not yet supported in cppuddle v0.2.1");
assert(access_instance); // should already be initialized
stream_pool_implementation<Interface, Pool>::release_interface(index);
}
template <class Interface, class Pool>
static bool interface_available(size_t load_limit) noexcept {
static bool interface_available(size_t load_limit, size_t device_id = 0) {
if (device_id > 0)
throw std::runtime_error("Got device_id > 0. MultiGPU not yet supported in cppuddle v0.2.1");
assert(access_instance); // should already be initialized
return stream_pool_implementation<Interface, Pool>::interface_available(
load_limit);
}
template <class Interface, class Pool>
static size_t get_current_load() noexcept {
static size_t get_current_load(size_t device_id = 0) {
if (device_id > 0)
throw std::runtime_error("Got device_id > 0. MultiGPU not yet supported in cppuddle v0.2.1");
assert(access_instance); // should already be initialized
return stream_pool_implementation<Interface, Pool>::get_current_load();
}
template <class Interface, class Pool>
static size_t get_next_device_id() noexcept {
static size_t get_next_device_id(size_t num_devices = 1) {
if (num_devices > 1)
throw std::runtime_error("Got num_devices > 1. MultiGPU not yet supported in cppuddle v0.2.1");
assert(access_instance); // should already be initialized
return stream_pool_implementation<Interface, Pool>::get_next_device_id();
return 0;
}

// Interface-compatibility stub: always "selects" device 0 and returns it.
// Any other device_id throws, since MultiGPU is not supported in v0.2.1.
template <class Interface, class Pool>
static size_t select_device(size_t device_id = 0) {
if (device_id > 0)
throw std::runtime_error("Got device_id > 0. MultiGPU not yet supported in cppuddle v0.2.1");
return 0;
}

// Dummy for interface compatibility with the future 0.3.0 release.
// Intentionally a no-op: with only one device in v0.2.1 there is no
// device selection to customize, so select_gpu_function is ignored.
template <class Interface, class Pool>
static void set_device_selector(std::function<void(size_t)> select_gpu_function) {

}

private:
Expand Down Expand Up @@ -347,9 +399,12 @@ std::unique_ptr<stream_pool::stream_pool_implementation<Interface, Pool>>

template <class Interface, class Pool> class stream_interface {
public:
explicit stream_interface()
explicit stream_interface(size_t device_id = 0)
: t(stream_pool::get_interface<Interface, Pool>()),
interface(std::get<0>(t)), interface_index(std::get<1>(t)) {}
interface(std::get<0>(t)), interface_index(std::get<1>(t)) {
if (device_id > 0)
throw std::runtime_error("Got device_id > 0. MultiGPU not yet supported in cppuddle v0.2.1");
}

stream_interface(const stream_interface &other) = delete;
stream_interface &operator=(const stream_interface &other) = delete;
Expand Down

0 comments on commit fa15cbe

Please sign in to comment.