Skip to content

Commit

Permalink
change_buckets_for_multigpu
Browse files Browse the repository at this point in the history
  • Loading branch information
G-071 committed Aug 15, 2023
1 parent 073f4da commit ffdbb7b
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 52 deletions.
193 changes: 146 additions & 47 deletions include/buffer_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,41 +74,48 @@ For better performance configure CPPuddle with CPPUDDLE_DEACTIVATE_BUFFER_RECYCL

template <typename T, typename Host_Allocator>
static T *get(size_t number_elements, bool manage_content_lifetime = false,
std::optional<size_t> location_hint = std::nullopt) {
std::optional<size_t> location_hint = std::nullopt,
std::optional<size_t> device_id = std::nullopt) {

return Host_Allocator{}.allocate(number_elements);
}
/// Marks an buffer as unused and fit for reusage
template <typename T, typename Host_Allocator>
static void mark_unused(T *p, size_t number_elements,
std::optional<size_t> location_hint = std::nullopt) {
std::optional<size_t> location_hint = std::nullopt,
std::optional<size_t> device_id = std::nullopt) {
return Host_Allocator{}.deallocate(p, number_elements);
}
#else
/// Returns and allocated buffer of the requested size - this may be a reused
/// buffer
template <typename T, typename Host_Allocator>
static T *get(size_t number_elements, bool manage_content_lifetime = false,
std::optional<size_t> location_hint = std::nullopt) {
return buffer_manager<T, Host_Allocator>::get(number_elements,
manage_content_lifetime, location_hint);
std::optional<size_t> location_hint = std::nullopt,
std::optional<size_t> device_id = std::nullopt) {
return buffer_manager<T, Host_Allocator>::get(
number_elements, manage_content_lifetime, location_hint, device_id);
}
/// Marks an buffer as unused and fit for reusage
template <typename T, typename Host_Allocator>
static void mark_unused(T *p, size_t number_elements,
std::optional<size_t> location_hint = std::nullopt) {
return buffer_manager<T, Host_Allocator>::mark_unused(p, number_elements);
std::optional<size_t> location_hint = std::nullopt,
std::optional<size_t> device_id = std::nullopt) {
return buffer_manager<T, Host_Allocator>::mark_unused(p, number_elements,
location_hint, device_id);
}
#endif
template <typename T, typename Host_Allocator>
static void register_allocator_counters_with_hpx(void) {
#ifdef CPPUDDLE_HAVE_COUNTERS
buffer_manager<T, Host_Allocator>::register_counters_with_hpx();
#else
std::cerr << "Warning: Trying to register allocator performance counters with HPX but CPPuddle was built "
std::cerr << "Warning: Trying to register allocator performance counters "
"with HPX but CPPuddle was built "
"without CPPUDDLE_WITH_COUNTERS -- operation will be ignored!"
<< std::endl;
#endif
}
}

/// Deallocate all buffers, no matter whether they are marked as used or not
static void clean_all() {
Expand Down Expand Up @@ -215,22 +222,22 @@ For better performance configure CPPuddle with CPPUDDLE_DEACTIVATE_BUFFER_RECYCL
/// Cleanup and delete this singleton
static void clean() {
assert(instance() && !is_finalized);
for (auto i = 0; i < number_instances; i++) {
for (auto i = 0; i < number_instances * max_number_gpus; i++) {
std::lock_guard<mutex_t> guard(instance()[i].mut);
instance()[i].clean_all_buffers();
}
}
static void print_performance_counters() {
assert(instance() && !is_finalized);
for (auto i = 0; i < number_instances; i++) {
for (auto i = 0; i < number_instances * max_number_gpus; i++) {
std::lock_guard<mutex_t> guard(instance()[i].mut);
instance()[i].print_counters();
}
}
static void finalize() {
assert(instance() && !is_finalized);
is_finalized = true;
for (auto i = 0; i < number_instances; i++) {
for (auto i = 0; i < number_instances * max_number_gpus; i++) {
std::lock_guard<mutex_t> guard(instance()[i].mut);
instance()[i].clean_all_buffers();
}
Expand All @@ -239,7 +246,7 @@ For better performance configure CPPuddle with CPPUDDLE_DEACTIVATE_BUFFER_RECYCL
/// Cleanup all buffers not currently in use
static void clean_unused_buffers_only() {
assert(instance() && !is_finalized);
for (auto i = 0; i < number_instances; i++) {
for (auto i = 0; i < number_instances * max_number_gpus; i++) {
std::lock_guard<mutex_t> guard(instance()[i].mut);
for (auto &buffer_tuple : instance()[i].unused_buffer_list) {
Host_Allocator alloc;
Expand Down Expand Up @@ -319,7 +326,8 @@ For better performance configure CPPuddle with CPPUDDLE_DEACTIVATE_BUFFER_RECYCL

/// Tries to recycle or create a buffer of type T and size number_elements.
static T *get(size_t number_of_elements, bool manage_content_lifetime,
std::optional<size_t> location_hint = std::nullopt) {
std::optional<size_t> location_hint = std::nullopt,
std::optional<size_t> gpu_device_id = std::nullopt) {
init_callbacks_once();
if (is_finalized) {
throw std::runtime_error("Tried allocation after finalization");
Expand All @@ -328,11 +336,22 @@ For better performance configure CPPuddle with CPPUDDLE_DEACTIVATE_BUFFER_RECYCL

size_t location_id = 0;
if (location_hint) {
location_id = location_hint.value();
location_id = *location_hint;
}
if (location_id >= number_instances) {
throw std::runtime_error("Tried to create buffer with invalid location_id [get]");
}
size_t device_id = 0;
if (gpu_device_id) {
device_id = *gpu_device_id;
}
if (device_id >= max_number_gpus) {
throw std::runtime_error("Tried to create buffer with invalid device id [get]! "
"Is multigpu support enabled with the correct number "
"of GPUs?");
}

location_id = location_id + device_id * number_instances;
std::lock_guard<mutex_t> guard(instance()[location_id].mut);


Expand Down Expand Up @@ -369,7 +388,7 @@ For better performance configure CPPuddle with CPPUDDLE_DEACTIVATE_BUFFER_RECYCL
// No unused buffer found -> Create new one and return it
try {
recycler::device_selection::select_device_functor<T, Host_Allocator>{}(
location_id / instances_per_gpu);
device_id);
Host_Allocator alloc;
T *buffer = alloc.allocate(number_of_elements);
instance()[location_id].buffer_map.insert(
Expand All @@ -395,7 +414,7 @@ For better performance configure CPPuddle with CPPUDDLE_DEACTIVATE_BUFFER_RECYCL
// We've done all we can in here
Host_Allocator alloc;
recycler::device_selection::select_device_functor<T, Host_Allocator>{}(
location_id / instances_per_gpu);
device_id);
T *buffer = alloc.allocate(number_of_elements);
instance()[location_id].buffer_map.insert(
{buffer, std::make_tuple(buffer, number_of_elements, 1,
Expand All @@ -415,17 +434,32 @@ For better performance configure CPPuddle with CPPUDDLE_DEACTIVATE_BUFFER_RECYCL
}

static void mark_unused(T *memory_location, size_t number_of_elements,
std::optional<size_t> location_hint = std::nullopt) {
std::optional<size_t> location_hint = std::nullopt,
std::optional<size_t> device_hint = std::nullopt) {
if (is_finalized)
return;
assert(instance() && !is_finalized);

size_t location_id = 0;
if (location_hint) {
size_t location_id = location_hint.value();
location_id = *location_hint;
if (location_id >= number_instances) {
throw std::runtime_error(
"Buffer recylcer received invalid location hint [mark_unused]");
}
}
size_t device_id = 0;
if (device_hint) {
device_id = *device_hint;
if (device_id >= max_number_gpus) {
throw std::runtime_error(
"Buffer recylcer received invalid devce hint [mark_unused]");
}
}

// Attempt 1 to find the correct bucket/location: Look at provided hint:
if (location_hint) {
size_t location_id = location_hint.value() + device_id * number_instances;
std::lock_guard<mutex_t> guard(instance()[location_id].mut);
if (instance()[location_id].buffer_map.find(memory_location) !=
instance()[location_id].buffer_map.end()) {
Expand All @@ -443,19 +477,20 @@ For better performance configure CPPuddle with CPPUDDLE_DEACTIVATE_BUFFER_RECYCL
instance()[location_id].buffer_map.erase(memory_location);
return; // Success
}
// hint was wrong - note that, and continue on with all other buffer
// managers
// hint was wrong
#ifdef CPPUDDLE_HAVE_COUNTERS
instance()[location_id].number_wrong_hints++;
sum_number_wrong_hints++;
#endif
}

for(size_t location_id = 0; location_id < number_instances; location_id++) {
// Failed to find buffer in the specified localtion/device!
// Attempt 2 - Look for buffer other locations on the same device...
for (size_t location_id = device_id * number_instances;
location_id < (device_id + 1) * number_instances; location_id++) {
if (location_hint) {
if (location_hint.value() == location_id) {
continue; // already tried this -> skip
}
if (*location_hint + device_id * max_number_gpus == location_id) {
continue; // already tried this -> skip
}
}
std::lock_guard<mutex_t> guard(instance()[location_id].mut);
if (instance()[location_id].buffer_map.find(memory_location) !=
Expand All @@ -475,6 +510,64 @@ For better performance configure CPPuddle with CPPUDDLE_DEACTIVATE_BUFFER_RECYCL
return; // Success
}
}
// Failed to find buffer on the specified device!
// Attempt 3 - Look for buffer on other devices...
for (size_t local_device_id = 0; local_device_id < max_number_gpus;
local_device_id++) {
if (local_device_id == device_id)
continue; // aldready tried this device

// Try hint localtion first yet again (though on different device)
if (location_hint) {
size_t location_id = location_hint.value() + local_device_id * number_instances;
std::lock_guard<mutex_t> guard(instance()[location_id].mut);
if (instance()[location_id].buffer_map.find(memory_location) !=
instance()[location_id].buffer_map.end()) {
#ifdef CPPUDDLE_HAVE_COUNTERS
instance()[location_id].number_deallocation++;
sum_number_deallocation++;
#endif
auto it = instance()[location_id].buffer_map.find(memory_location);
assert(it != instance()[location_id].buffer_map.end());
auto &tuple = it->second;
// sanity checks:
assert(std::get<1>(tuple) == number_of_elements);
// move to the unused_buffer list
instance()[location_id].unused_buffer_list.push_front(tuple);
instance()[location_id].buffer_map.erase(memory_location);
return; // Success
}
}
// Failed - check all other localtions on device
for (size_t location_id = local_device_id * number_instances;
location_id < (local_device_id + 1) * number_instances; location_id++) {
if (location_hint) {
if (*location_hint + local_device_id * max_number_gpus == location_id) {
continue; // already tried this -> skip
}
}
std::lock_guard<mutex_t> guard(instance()[location_id].mut);
if (instance()[location_id].buffer_map.find(memory_location) !=
instance()[location_id].buffer_map.end()) {
#ifdef CPPUDDLE_HAVE_COUNTERS
instance()[location_id].number_deallocation++;
sum_number_deallocation++;
#endif
auto it = instance()[location_id].buffer_map.find(memory_location);
assert(it != instance()[location_id].buffer_map.end());
auto &tuple = it->second;
// sanity checks:
assert(std::get<1>(tuple) == number_of_elements);
// move to the unused_buffer list
instance()[location_id].unused_buffer_list.push_front(tuple);
instance()[location_id].buffer_map.erase(memory_location);
return; // Success
}
}
}
// Buffer that is to be deleted is nowhere to be found - we looked everywhere!
// =>
// Failure! Handle here...

// TODO Throw exception instead in the futures, as soon as the recycler finalize is
// in all user codes
Expand All @@ -488,7 +581,7 @@ For better performance configure CPPuddle with CPPUDDLE_DEACTIVATE_BUFFER_RECYCL
<< "Warning! Tried to delete non-existing buffer within CPPuddle!"
<< std::endl;
std::cerr << "Did you forget to call recycler::finalize?" << std::endl;
}
}

private:
/// List with all buffers still in usage
Expand Down Expand Up @@ -516,7 +609,7 @@ For better performance configure CPPuddle with CPPUDDLE_DEACTIVATE_BUFFER_RECYCL
operator=(buffer_manager<T, Host_Allocator> &&other) = delete;
static std::unique_ptr<buffer_manager[]>& instance(void) {
static std::unique_ptr<buffer_manager[]> instances{
new buffer_manager[number_instances]};
new buffer_manager[number_instances * max_number_gpus]};
return instances;
}
static void init_callbacks_once(void) {
Expand Down Expand Up @@ -544,6 +637,8 @@ For better performance configure CPPuddle with CPPUDDLE_DEACTIVATE_BUFFER_RECYCL

#ifdef CPPUDDLE_HAVE_COUNTERS
void print_counters(void) {
if (number_allocation == 0)
return;
// Print performance counters
size_t number_cleaned = unused_buffer_list.size() + buffer_map.size();
std::cout << "\nBuffer manager destructor for (Alloc: "
Expand Down Expand Up @@ -642,15 +737,16 @@ template <typename T, typename Host_Allocator> struct recycle_allocator {
using underlying_allocator_type = Host_Allocator;
static_assert(std::is_same_v<value_type, typename underlying_allocator_type::value_type>);
const std::optional<size_t> dealloc_hint;
const std::optional<size_t> device_id;

#ifndef CPPUDDLE_HAVE_HPX_AWARE_ALLOCATORS
recycle_allocator() noexcept
: dealloc_hint(std::nullopt) {}
: dealloc_hint(std::nullopt), device_id(std::nullopt) {}
explicit recycle_allocator(size_t hint) noexcept
: dealloc_hint(std::nullopt) {}
: dealloc_hint(std::nullopt), device_id(std::nullopt) {}
explicit recycle_allocator(
recycle_allocator<T, Host_Allocator> const &other) noexcept
: dealloc_hint(std::nullopt) {}
: dealloc_hint(std::nullopt), device_id(std::nullopt) {}
T *allocate(std::size_t n) {
T *data = buffer_recycler::get<T, Host_Allocator>(n);
return data;
Expand All @@ -660,19 +756,20 @@ template <typename T, typename Host_Allocator> struct recycle_allocator {
}
#else
recycle_allocator() noexcept
: dealloc_hint(hpx::get_worker_thread_num()) {}
explicit recycle_allocator(size_t hint) noexcept
: dealloc_hint(hint) {}
: dealloc_hint(hpx::get_worker_thread_num()), device_id(0) {}
explicit recycle_allocator(const size_t device_id) noexcept
: dealloc_hint(hint), device_id(device_id) {}
explicit recycle_allocator(
recycle_allocator<T, Host_Allocator> const &other) noexcept
: dealloc_hint(other.dealloc_hint) {}
: dealloc_hint(other.dealloc_hint), device_id(other.device_id) {}
T *allocate(std::size_t n) {
T *data = buffer_recycler::get<T, Host_Allocator>(
n, false, hpx::get_worker_thread_num());
n, false, hpx::get_worker_thread_num(), device_id);
return data;
}
void deallocate(T *p, std::size_t n) {
buffer_recycler::mark_unused<T, Host_Allocator>(p, n, dealloc_hint);
buffer_recycler::mark_unused<T, Host_Allocator>(p, n, dealloc_hint,
device_id);
}
#endif

Expand Down Expand Up @@ -707,16 +804,17 @@ struct aggressive_recycle_allocator {
using value_type = T;
using underlying_allocator_type = Host_Allocator;
static_assert(std::is_same_v<value_type, typename underlying_allocator_type::value_type>);
std::optional<size_t> dealloc_hint;
const std::optional<size_t> dealloc_hint;
const std::optional<size_t> device_id;

#ifndef CPPUDDLE_HAVE_HPX_AWARE_ALLOCATORS
aggressive_recycle_allocator() noexcept
: dealloc_hint(std::nullopt) {}
: dealloc_hint(std::nullopt), device_id(std::nullopt) {}
explicit aggressive_recycle_allocator(size_t hint) noexcept
: dealloc_hint(std::nullopt) {}
: dealloc_hint(std::nullopt), device_id(std::nullopt) {}
explicit aggressive_recycle_allocator(
aggressive_recycle_allocator<T, Host_Allocator> const &) noexcept
: dealloc_hint(std::nullopt) {}
: dealloc_hint(std::nullopt), device_id(std::nullopt) {}
T *allocate(std::size_t n) {
T *data = buffer_recycler::get<T, Host_Allocator>(
n, true); // also initializes the buffer if it isn't reused
Expand All @@ -727,20 +825,21 @@ struct aggressive_recycle_allocator {
}
#else
aggressive_recycle_allocator() noexcept
: dealloc_hint(hpx::get_worker_thread_num()) {}
explicit aggressive_recycle_allocator(size_t hint) noexcept
: dealloc_hint(hint) {}
: dealloc_hint(hpx::get_worker_thread_num()), device_id(0) {}
explicit aggressive_recycle_allocator(const size_t device_id) noexcept
: device_id(device_id) {}
explicit aggressive_recycle_allocator(
recycle_allocator<T, Host_Allocator> const &other) noexcept
: dealloc_hint(other.dealloc_hint) {}
: dealloc_hint(other.dealloc_hint), device_id(other.device_id) {}
T *allocate(std::size_t n) {
T *data = buffer_recycler::get<T, Host_Allocator>(
n, true, hpx::get_worker_thread_num()); // also initializes the buffer
n, true, dealloc_hint, device_id); // also initializes the buffer
// if it isn't reused
return data;
}
void deallocate(T *p, std::size_t n) {
buffer_recycler::mark_unused<T, Host_Allocator>(p, n, dealloc_hint);
buffer_recycler::mark_unused<T, Host_Allocator>(p, n, dealloc_hint,
device_id);
}
#endif

Expand Down
Loading

0 comments on commit ffdbb7b

Please sign in to comment.