Skip to content

Commit

Permalink
Use stream interface instead of executor tuples
Browse files Browse the repository at this point in the history
  • Loading branch information
G-071 committed Oct 1, 2023
1 parent f94dbf2 commit 2e13ab3
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 26 deletions.
62 changes: 36 additions & 26 deletions include/aggregation_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -384,10 +384,10 @@ template <typename Executor> class Aggregated_Executor {
const Aggregated_Executor_Modes mode;
const size_t max_slices;
std::atomic<size_t> current_slices;
/// Executor reference and its ID in the exextutor pool
std::tuple<Executor &, size_t> executor_tuple;
/// Reference to the executor (presumably residing in the executor pool)
Executor &executor;
/// Wrapper to the executor interface from the stream pool
/// Automatically hooks into the stream_pools reference counting
/// for cpu/gpu load balancing
std::unique_ptr<stream_interface<Executor, round_robin_pool<Executor>>> executor_wrapper;

public:
size_t gpu_id;
Expand Down Expand Up @@ -516,7 +516,8 @@ template <typename Executor> class Aggregated_Executor {
}

Executor& get_underlying_executor(void) {
return parent.executor;
assert(parent.executor_wrapper);
return *(parent.executor_wrapper);
}
};

Expand Down Expand Up @@ -548,6 +549,7 @@ template <typename Executor> class Aggregated_Executor {
template <typename T, typename Host_Allocator>
T *get(const size_t size, const size_t slice_alloc_counter) {
assert(slices_exhausted == true);
assert(executor_wrapper);
assert(executor_slices_alive == true);
// Add aggreated buffer entry in case it hasn't happened yet for this call
// First: Check if it already has happened
Expand Down Expand Up @@ -624,6 +626,7 @@ template <typename Executor> class Aggregated_Executor {
template <typename T, typename Host_Allocator>
void mark_unused(T *p, const size_t size) {
assert(slices_exhausted == true);
assert(executor_wrapper);

void *ptr_key = static_cast<void*>(p);
size_t slice_alloc_counter = buffer_allocations_map[p];
Expand Down Expand Up @@ -663,8 +666,11 @@ template <typename Executor> class Aggregated_Executor {
if (current_deallocs == buffer_counter) {
std::lock_guard<recycler::aggregation_mutex_t> guard(mut);
buffers_in_use = false;
if (!executor_slices_alive && !buffers_in_use)
if (!executor_slices_alive && !buffers_in_use) {
slices_exhausted = false;
// Release executor
executor_wrapper.reset(nullptr);
}
}
}
}
Expand All @@ -681,11 +687,12 @@ template <typename Executor> class Aggregated_Executor {
bool sync_aggregation_slices(const size_t slice_launch_counter) {
std::lock_guard<recycler::aggregation_mutex_t> guard(mut);
assert(slices_exhausted == true);
assert(executor_wrapper);
// Add function call object in case it hasn't happened for this launch yet
if (overall_launch_counter <= slice_launch_counter) {
/* std::lock_guard<recycler::aggregation_mutex_t> guard(mut); */
if (overall_launch_counter <= slice_launch_counter) {
function_calls.emplace_back(current_slices, false, executor);
function_calls.emplace_back(current_slices, false, *executor_wrapper);
overall_launch_counter = function_calls.size();
return function_calls[slice_launch_counter].sync_aggregation_slices(
last_stream_launch_done);
Expand All @@ -701,11 +708,12 @@ template <typename Executor> class Aggregated_Executor {
void post(const size_t slice_launch_counter, F &&f, Ts &&...ts) {
std::lock_guard<recycler::aggregation_mutex_t> guard(mut);
assert(slices_exhausted == true);
assert(executor_wrapper);
// Add function call object in case it hasn't happened for this launch yet
if (overall_launch_counter <= slice_launch_counter) {
/* std::lock_guard<recycler::aggregation_mutex_t> guard(mut); */
if (overall_launch_counter <= slice_launch_counter) {
function_calls.emplace_back(current_slices, false, executor);
function_calls.emplace_back(current_slices, false, *executor_wrapper);
overall_launch_counter = function_calls.size();
function_calls[slice_launch_counter].post_when(
last_stream_launch_done, std::forward<F>(f), std::forward<Ts>(ts)...);
Expand All @@ -724,11 +732,12 @@ template <typename Executor> class Aggregated_Executor {
Ts &&...ts) {
std::lock_guard<recycler::aggregation_mutex_t> guard(mut);
assert(slices_exhausted == true);
assert(executor_wrapper);
// Add function call object in case it hasn't happened for this launch yet
if (overall_launch_counter <= slice_launch_counter) {
/* std::lock_guard<recycler::aggregation_mutex_t> guard(mut); */
if (overall_launch_counter <= slice_launch_counter) {
function_calls.emplace_back(current_slices, true, executor);
function_calls.emplace_back(current_slices, true, *executor_wrapper);
overall_launch_counter = function_calls.size();
return function_calls[slice_launch_counter].async_when(
last_stream_launch_done, std::forward<F>(f), std::forward<Ts>(ts)...);
Expand All @@ -744,11 +753,12 @@ template <typename Executor> class Aggregated_Executor {
Ts &&...ts) {
std::lock_guard<recycler::aggregation_mutex_t> guard(mut);
assert(slices_exhausted == true);
assert(executor_wrapper);
// Add function call object in case it hasn't happened for this launch yet
if (overall_launch_counter <= slice_launch_counter) {
/* std::lock_guard<recycler::aggregation_mutex_t> guard(mut); */
if (overall_launch_counter <= slice_launch_counter) {
function_calls.emplace_back(current_slices, true, executor);
function_calls.emplace_back(current_slices, true, *executor_wrapper);
overall_launch_counter = function_calls.size();
return function_calls[slice_launch_counter].wrap_async(
last_stream_launch_done, std::forward<F>(f), std::forward<Ts>(ts)...);
Expand Down Expand Up @@ -813,16 +823,22 @@ template <typename Executor> class Aggregated_Executor {
// Executor_Slice
// futures to ready if the launch conditions are met
if (local_slice_id == 1) {
// Renew promise that all slices will be ready as the primary launch criteria...
// Redraw executor
assert(!executor_wrapper);
stream_pool::select_device<Executor, round_robin_pool<Executor>>(gpu_id);
executor_wrapper.reset(
new stream_interface<Executor, round_robin_pool<Executor>>(gpu_id));
// Renew promise that all slices will be ready as the primary launch
// criteria...
hpx::lcos::shared_future<void> fut;
if (mode == Aggregated_Executor_Modes::EAGER ||
mode == Aggregated_Executor_Modes::ENDLESS) {
// Fallback launch condidtion: Launch as soon as the underlying stream
// is ready
/* auto slices_full_fut = slices_full_promise.get_future(); */
stream_pool::select_device<Executor, round_robin_pool<Executor>>(gpu_id);
auto exec_fut = executor.get_future();
/* fut = hpx::when_any(exec_fut, slices_full_fut); */
auto exec_fut = (*executor_wrapper).get_future();
/* auto fut = hpx::when_any(exec_fut, slices_full_fut); */
fut = std::move(exec_fut);
} else {
auto slices_full_fut = slices_full_promise.get_shared_future();
Expand Down Expand Up @@ -870,25 +886,20 @@ template <typename Executor> class Aggregated_Executor {
void reduce_usage_counter(void) {
/* std::lock_guard<recycler::aggregation_mutex_t> guard(mut); */
assert(slices_exhausted == true);
assert(executor_wrapper);
assert(executor_slices_alive == true);
assert(launched_slices >= 1);
assert(current_slices >= 0 && current_slices <= launched_slices);
const size_t local_slice_id = --current_slices;
// Last slice goes out scope?
if (local_slice_id == 0) {

// Draw new underlying executor TODO Test if it's better to redraw at
// the first slice request stream_pool::release_interface<Executor,
// round_robin_pool<Executor>>( std::get<1>(executor_tuple));
// executor_tuple = stream_pool::get_interface<Executor,
// round_robin_pool<Executor>>(); executor =
// std::get<0>(executor_tuple);
// Mark executor fit for reusage

std::lock_guard<recycler::aggregation_mutex_t> guard(mut);
executor_slices_alive = false;
if (!executor_slices_alive && !buffers_in_use) {
// Release executor
slices_exhausted = false;
executor_wrapper.reset(nullptr);
}
}
}
Expand Down Expand Up @@ -922,11 +933,10 @@ template <typename Executor> class Aggregated_Executor {

Aggregated_Executor(const size_t number_slices,
Aggregated_Executor_Modes mode, const size_t gpu_id = 0)
: max_slices(number_slices), current_slices(0), slices_exhausted(false),dealloc_counter(0),
mode(mode), executor_slices_alive(false), buffers_in_use(false), gpu_id(gpu_id),
executor_tuple(
stream_pool::get_interface<Executor, round_robin_pool<Executor>>(gpu_id)),
executor(std::get<0>(executor_tuple)),
: max_slices(number_slices), current_slices(0), slices_exhausted(false),
dealloc_counter(0), mode(mode), executor_slices_alive(false),
buffers_in_use(false), gpu_id(gpu_id),
executor_wrapper(nullptr),
current_continuation(hpx::make_ready_future()),
last_stream_launch_done(hpx::make_ready_future()) {}
// Not meant to be copied or moved
Expand Down
4 changes: 4 additions & 0 deletions include/stream_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,10 @@ template <class Interface, class Pool> class stream_interface {
return interface.async_execute(std::forward<F>(f), std::forward<Ts>(ts)...);
}

inline decltype(auto) get_future() {
return interface.get_future();
}

// allow implict conversion
operator Interface &() { // NOLINT
return interface;
Expand Down

0 comments on commit 2e13ab3

Please sign in to comment.