diff --git a/CMakeLists.txt b/CMakeLists.txt
index d88a3d27..24be243c 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -251,6 +251,38 @@ install(
 install(FILES cppuddle-config.cmake DESTINATION ${CMAKE_INSTALL_PREFIX}/lib/cmake/CPPuddle/)
 install(EXPORT CPPuddle NAMESPACE CPPuddle:: DESTINATION ${CMAKE_INSTALL_PREFIX}/lib/cmake/CPPuddle/)
 
+#Boost::boost Boost::program_options HPX::hpx Kokkos::kokkos HPXKokkos::hpx_kokkos buffer_manager stream_manager
+if (CPPUDDLE_WITH_TESTS)
+  if (CPPUDDLE_WITH_CUDA)
+    add_hpx_executable(
+      recycling-with-hpx-cuda
+      DEPENDENCIES
+        Boost::boost Boost::program_options HPX::hpx buffer_manager stream_manager
+      COMPONENT_DEPENDENCIES iostreams
+      SOURCES
+      examples/recycling-with-hpx-cuda.cu
+    )
+
+    if (CPPUDDLE_WITH_KOKKOS)
+      add_hpx_executable(
+        recycling-with-hpx-kokkos
+        DEPENDENCIES
+          Boost::boost Boost::program_options Kokkos::kokkos HPXKokkos::hpx_kokkos HPX::hpx buffer_manager stream_manager
+        COMPONENT_DEPENDENCIES iostreams
+        SOURCES
+        examples/recycling-with-hpx-kokkos.cpp
+      )
+      add_hpx_executable(
+        kernel-aggregation-with-hpx-kokkos
+        DEPENDENCIES
+          Boost::boost Boost::program_options Kokkos::kokkos HPXKokkos::hpx_kokkos HPX::hpx buffer_manager stream_manager
+        COMPONENT_DEPENDENCIES iostreams
+        SOURCES
+        examples/kernel-aggregation-with-hpx-kokkos.cpp
+      )
+    endif()
+  endif()
+endif()
 
 #------------------------------------------------------------------------------------------------------------
 # Define cmake targets for all tests/example executables
diff --git a/examples/kernel-aggregation-with-hpx-kokkos.cpp b/examples/kernel-aggregation-with-hpx-kokkos.cpp
new file mode 100644
index 00000000..a0695b7b
--- /dev/null
+++ b/examples/kernel-aggregation-with-hpx-kokkos.cpp
@@ -0,0 +1,551 @@
+// Copyright (c) 2024 Gregor Daiß
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+
+// Developer TODOs regarding CPPuddle usability:
+// TODO(daissgr) Improve type accessibility (user should not worry about the
+// activated Kokkos backend like below to pick the correct view types)
+// TODO(daissgr) Add unified CPPuddle finalize that also cleans up all executor
+// pools (and avoids having to use the cleanup methods of the individual pools)
+
+#include
+#include
+
+#include
+#include
+#include
+#include
+#include
+
+#include
+
+#include
+
+#include
+
+#include
+#include
+#include
+
+#include
+#include
+
+#include
+#include
+#include
+
+/** \file Work aggregation example using Kokkos with HPX and CPPuddle. Like the
+ * other examples, we are still using a mere vector-add kernel for simplicity,
+ * allowing us to focus on how the aggregation actually works.
Notably, it works + * similarly for much more complicated application (see Octo-Tiger for an + * example of this) + * + * The example has three parts: First the GPU part (using the kernel + * aggregation feature with both host and device code), then the HPX task graph + * management and lastly the remaining initialization/boilerplate code + */ + +//================================================================================================= +// PART I: The Kokkos kernel and how to launch it with CPPuddle + HPX whilst avoid +// any CPU/GPU barriers +//================================================================================================= + +// Define types: A lot of this can be done automatically, however, here we want to show the manual +// approach (as using different types/ifdefs can allow us to specialize kernels for specific hardware +// if required. +// +using float_t = float; +// TODO(daissgr) No reason not to have this in a cppuddle header defining a generic type +// +// Use correct device exeuction space and memory spaces depending on the activated device +// execution space +#ifdef KOKKOS_ENABLE_CUDA +#include +// Pick executor type +using device_executor_t = hpx::kokkos::cuda_executor; +// Define Kokkos View types to be used! Must be using MemoryUnmanaged to allow memory recycling +using kokkos_device_view_t = Kokkos::View; +using kokkos_host_view_t = Kokkos::View; +// Define CPPuddle recycling allocators to be used with the views +using device_allocator_t = cppuddle::memory_recycling::recycle_allocator_cuda_device; +using host_allocator_t = cppuddle::memory_recycling::recycle_allocator_cuda_host; +#elif KOKKOS_ENABLE_HIP +#include +// Pick executor type +using device_executor_t = hpx::kokkos::hip_executor; +// Define Kokkos View types to be used! Must be using MemoryUnmanaged to allow memory recycling +using kokkos_device_view_t = Kokkos::View; +using kokkos_host_view_t = Kokkos::View; +// Define CPPuddle recycling allocators to be used with the views +using device_allocator_t = cppuddle::memory_recycling::recycle_allocator_hip_device; +using host_allocator_t = cppuddle::memory_recycling::recycle_allocator_hip_host; +#elif KOKKOS_ENABLE_SYCL +#include +// Pick executor type +using device_executor_t = hpx::kokkos::sycl_executor; +// Define Kokkos View types to be used! Must be using MemoryUnmanaged to allow memory recycling +using kokkos_device_view_t = Kokkos::View; +using kokkos_host_view_t = Kokkos::View; +// Define CPPuddle recycling allocators to be used with the views +using device_allocator_t = cppuddle::memory_recycling::recycle_allocator_sycl_device; +using host_allocator_t = cppuddle::memory_recycling::recycle_allocator_sycl_host; +#else +#error "Example assumes both a host and a device Kokkos execution space are available" +#endif +// Plug together the defined Kokkos views with the recycling CPPuddle allocators +// This yields a new type that can be used just like a normal Kokkos View but gets its memory from +// CPPuddle. 
+using recycling_device_view_t = + cppuddle::memory_recycling::recycling_view; +using recycling_host_view_t = + cppuddle::memory_recycling::recycling_view; +using aggregated_device_view_t = + cppuddle::memory_recycling::aggregated_recycling_view< + kokkos_device_view_t, + cppuddle::kernel_aggregation::allocator_slice< + float_t, device_allocator_t, device_executor_t>, + float_t>; +using aggregated_host_view_t = + cppuddle::memory_recycling::aggregated_recycling_view< + kokkos_host_view_t, + cppuddle::kernel_aggregation::allocator_slice, + float_t>; + +// Run host kernels on HPX execution space: +using host_executor_t = hpx::kokkos::hpx_executor; +// Serial executor can actually work well, too when interleaving multiple Kokkos kernels to +// achieve multicore usage. However, be aware that this only works for Kokkos kernels that are not +// using team policies / scratch memory (those use a global barrier across all Serial execution +// spaces): +// using host_executor_t = hpx::kokkos::serial_executor; + +// The actual compute kernel: Simply defines a exeuction policy with the given executor and runs the +// kernel with a Kokkos::parallel_for +template +void kernel_add(executor_t &executor, const size_t entries_per_task, + const size_t number_slices, const size_t max_kernels_fused, + const view_t &input_a, const view_t &input_b, + view_t &output_c) { + // Define exeuction policy + auto execution_policy = Kokkos::Experimental::require( + Kokkos::RangePolicy( + executor.instance(), 0, entries_per_task * number_slices), + Kokkos::Experimental::WorkItemProperty::HintLightWeight); + + // Run Kernel with execution policy (and give it some name ideally) + // NOTE: Since this kernel may be launched in a way where we combine multiple + // kernel launches into one kernel, it contains another index: the slice ID + // (telling us which kernel a workitem belongs to in case multiple kernels + // were fused. For this simpple kernel there is no difference compute-wise for + // each workitem, however this is not always the case (for instance when + // fusing together multiple reduce kernels), hence we show how to simply + // obtain the slice ID if required here: + Kokkos::parallel_for( + "sample vector add kernel", execution_policy, + KOKKOS_LAMBDA(size_t index) { + // Get slice id (kernel ID in case multiple kernels were fused) + const size_t slice_id = index / entries_per_task; + const size_t entry_index = index % entries_per_task; + // Obtain correct subviews for current team member (slice) + auto [a_slice, b_slice, c_slice] = + cppuddle::kernel_aggregation::map_views_to_slice(slice_id, max_kernels_fused, input_a, + input_b, output_c); + + // Run the actual compute kernel on the mapped subviews + c_slice[entry_index] = a_slice[entry_index] + b_slice[entry_index]; + }); +} + +/** Method that shows how the CPPuddle kernel aggregation feature may be used + * to + * define aggregation regions where teams of tasks can form on-the-fly to + * facilitate a single larger GPU kernel launch than they would individually. */ +void launch_gpu_kernel_task( + const size_t task_id, const size_t entries_per_task, + const size_t max_queue_length, const size_t gpu_id, + const size_t max_kernels_fused, + std::atomic &number_aggregated_kernel_launches, + std::atomic &number_kernel_launches) { + // 3. 
Check GPU utilization - Method will return true if there is an
+  // executor in the pool that does not currently exceed its queue limit
+  // (tracked by RAII, no CUDA/HIP/SYCL API calls involved)
+  bool device_executor_available =
+      cppuddle::executor_recycling::executor_pool::interface_available<
+          device_executor_t, cppuddle::executor_recycling::
+                                 round_robin_pool_impl>(
+          max_queue_length, gpu_id);
+
+  // Defines an aggregation region
+  // -----------------------------
+  // The lambda within will be executed by a team of threads (between 1 and
+  // max_kernels_fused). The "threads" correspond to the HPX tasks that
+  // originally hit this aggregation region. How many of the tasks are fused
+  // together into this team depends on the utilization of the underlying
+  // resource (GPU stream in this case). If it is not busy, a task will
+  // immediately get to work on launching its own kernel (teamsize 1). If the
+  // resource is busy however, the current task will be combined with other
+  // arriving tasks and launched as an aggregated/fused task once either
+  // max_kernels_fused tasks have arrived or the resource becomes available.
+  //
+  // Note that the worker threads are never blocked during this. If a task is
+  // waiting to be combined with other tasks it will simply be suspended by HPX.
+  static const char aggregation_region_name[] = "vector_add_aggregation";
+  auto kernel_done_future = cppuddle::kernel_aggregation::aggregation_region<
+      aggregation_region_name, device_executor_t,
+      void>(max_kernels_fused, [&](auto slice_id, auto number_slices,
+                                   auto &aggregation_executor) {
+    // Within the aggregation region we have multiple extra parameters
+    // available:
+    // --> slice_id: The team ID of the current thread
+    // --> number_slices: The number of participating threads (1 <= number_slices <= max_kernels_fused)
+    // --> aggregation executor: This special-purpose executor
+    // enables the communication between the team members. It does this in three
+    // ways: 1) function calls wrapped by it are only called once (by the last
+    // team member encountering the call; the previous members visiting it just
+    // signal their readiness), 2) by providing an allocator that creates data
+    // structures that are shared between the threads (they are only created
+    // once by the first thread visiting the allocation, subsequent team members
+    // use the same allocation) and 3) by providing some primitives that allow
+    // us to conditionally execute some commands only by the final team member
+    // (just as if we had wrapped them).
+
+    // Demonstrate how to execute something once per team (by the last team thread visiting)...
+    if (aggregation_executor.sync_aggregation_slices()) {
+      // Only executed once per team
+      number_aggregated_kernel_launches++;
+    }
+    // ...and how to execute something for each team member
+    number_kernel_launches++;
+
+
+    // 1. Create recycled Kokkos host views
+    // 1a) obtain aggregated host allocator from aggregation executor
+    auto alloc_host = aggregation_executor. template make_allocator();
+    // 1b) create aggregated views (shared by all team members, hence larger than entries_per_task)
+    aggregated_host_view_t host_a(alloc_host, entries_per_task * max_kernels_fused);
+    aggregated_host_view_t host_b(alloc_host, entries_per_task * max_kernels_fused);
+    aggregated_host_view_t host_c(alloc_host, entries_per_task * max_kernels_fused);
+    // 1c) use the aggregation_executor to obtain subviews that map to the current team member
+    auto [host_a_slice, host_b_slice, host_c_slice] =
+        cppuddle::kernel_aggregation::map_views_to_slice(
+            aggregation_executor, host_a, host_b, host_c);
+
+    // 2. Use per-team-member subviews for host-side preprocessing (usually:
+    // communication, here fill dummy input)
+    for (size_t i = 0; i < entries_per_task; i++) {
+      host_a_slice[i] = 1.0;
+      host_b_slice[i] = 2.0;
+    }
+
+    // 3. Create aggregated device views
+    // 3a) obtain aggregated device allocator from aggregation executor
+    auto alloc_device = aggregation_executor. template make_allocator();
+    // 3b) create aggregated views (shared by all team members, hence larger than entries_per_task)
+    aggregated_device_view_t device_a(alloc_device,
+                                      entries_per_task * max_kernels_fused);
+    aggregated_device_view_t device_b(alloc_device, entries_per_task * max_kernels_fused);
+    aggregated_device_view_t device_c(alloc_device, entries_per_task * max_kernels_fused);
+
+    // 4. Launch data transfers and kernel. Only executed once per team (by
+    // using the aggregated views and the aggregated_deep_copy wrapper that is
+    // only executed by the last thread). Alternatively, one could have used the
+    // sync_aggregation_slices if branch again.
+    cppuddle::kernel_aggregation::aggregated_deep_copy(aggregation_executor,
+                                                       device_a, host_a);
+    cppuddle::kernel_aggregation::aggregated_deep_copy(aggregation_executor,
+                                                       device_b, host_b);
+
+    // 5. Launch the kernel. We could have wrapped this with the aggregation
+    // executor but this time we use the sync_aggregation_slices if branch again
+    // to only launch it once
+    if (aggregation_executor
+            .sync_aggregation_slices()) { // Only launch once per team
+      kernel_add(aggregation_executor.get_underlying_executor(),
+                 entries_per_task, number_slices, max_kernels_fused, device_a, device_b, device_c);
+    }
+
+    // 6. Future of the last data copy will be ready once the data transfers for the entire team are done
+    auto transfer_fut =
+        cppuddle::kernel_aggregation::aggregrated_deep_copy_async<
+            device_executor_t>(aggregation_executor, host_c, device_c);
+    transfer_fut.get();
+
+    // 7. Now each team member has gotten its results. Hence we can proceed with
+    // some example post processing (usually: communication, here checking
+    // correctness)
+    for (size_t i = 0; i < entries_per_task; i++) {
+      if (host_c_slice[i] != 1.0 + 2.0) {
+        std::cerr << "Task " << task_id << " contained wrong results!!"
+                  << std::endl;
+        break;
+      }
+    }
+  });
+  kernel_done_future.get();
+}
+
+//=================================================================================================
+// PART II: How to build the dependency graph with HPX and the GPU launches
+//=================================================================================================
+
+/** This method demonstrates how one might build the HPX task graph
+ * asynchronously, using the launch_gpu_kernel_task method to launch the GPU
+ * kernels inside the tasks.
+ * To illustrate how one can chain together tasks, we support two modes for
+ * building the task tree: One keeps the dependencies between the repetitions
+ * (keeping them in order) and one does not, allowing repetitions to
+ * interleave. */
+hpx::future
+build_task_graph(const size_t number_repetitions, const size_t number_tasks,
+                 const size_t entries_per_task, const bool in_order_repetitions,
+                 const size_t max_queue_length, const size_t gpu_id,
+                 const size_t max_kernels_fused,
+                 std::atomic &number_aggregated_kernel_launches,
+                 std::atomic &number_kernel_launches) {
+  // Launch tasks
+  hpx::shared_future previous_iteration_fut = hpx::make_ready_future();
+  std::vector> repetition_futs(number_repetitions);
+  for (size_t repetition = 0; repetition < number_repetitions; repetition++) {
+    std::vector> futs(number_tasks);
+    for (size_t task_id = 0; task_id < number_tasks; task_id++) {
+      // Schedule task either in order (one repetition after another) or out of order
+      if (in_order_repetitions) {
+        futs[task_id] = previous_iteration_fut.then(
+            [task_id, entries_per_task, max_queue_length, gpu_id,
+             max_kernels_fused, &number_aggregated_kernel_launches,
+             &number_kernel_launches](auto &&fut) {
+              launch_gpu_kernel_task(
+                  task_id, entries_per_task, max_queue_length, gpu_id,
+                  max_kernels_fused, number_aggregated_kernel_launches,
+                  number_kernel_launches);
+            });
+      } else {
+        futs[task_id] = hpx::async([task_id, entries_per_task, max_queue_length,
+                                    gpu_id, max_kernels_fused,
+                                    &number_aggregated_kernel_launches,
+                                    &number_kernel_launches]() {
+          launch_gpu_kernel_task(task_id, entries_per_task, max_queue_length,
+                                 gpu_id, max_kernels_fused,
+                                 number_aggregated_kernel_launches,
+                                 number_kernel_launches);
+        });
+      }
+    }
+    // Schedule output task to run once each repetition is done
+    auto repetition_finished = hpx::when_all(futs);
+    if (in_order_repetitions) {
+      previous_iteration_fut =
+          repetition_finished.then([repetition](auto &&fut) {
+            hpx::cout << "Repetition " << repetition << " done!" << std::endl;
+          });
+    } else {
+      repetition_futs.emplace_back(
+          repetition_finished.then([repetition](auto &&fut) {
+            hpx::cout << "Repetition " << repetition << " done!" << std::endl;
+          }));
+    }
+  }
+  // Schedule final output task to run once all other tasks are done and return future
+  if (in_order_repetitions) {
+    return previous_iteration_fut
+        .then([&number_aggregated_kernel_launches,
+               &number_kernel_launches](auto &&fut) {
+          hpx::cout << "All tasks are done! [in-order repetitions version]" << std::endl;
+          hpx::cout << " => " << number_kernel_launches
+                    << " were scheduled (before kernel fusion)" << std::endl;
+          hpx::cout << " => " << number_aggregated_kernel_launches
+                    << " fused kernels were launched" << std::endl
+                    << std::endl;
+        });
+  } else {
+    return hpx::when_all(repetition_futs)
+        .then([&number_aggregated_kernel_launches,
+               &number_kernel_launches](auto &&fut) {
+          hpx::cout << "All tasks are done!
[out-of-order repetitions version]" << std::endl; + hpx::cout << " => " << number_kernel_launches + << " were scheduled (before kernel fusion)" << std::endl; + hpx::cout << " => " << number_aggregated_kernel_launches + << " fused kernels were launched" << std::endl + << std::endl; + }); + } +} + +//================================================================================================= +// PART III: Initialization / Boilerplate and Main +//================================================================================================= + +/** HPX uses either callbacks or event polling to implement its HPX-Kokkos futures. + * Polling usually has the superior performance, however, it requires that the + * polling is initialized at startup (or at least before the HPX-Kokkos futures are + * used). The CPPuddle executor pool also needs initialzing as we need to set it + * to a specified number of executors (which CPPuddle cannot know without the + * number_gpu_executors parameter). We will use the round_robin_pool_impl for + * simplicity. A priority_pool_impl is also available. + */ +void init_executor_pool_and_polling(const size_t number_gpu_executors, + const size_t number_cpu_executors, + const size_t gpu_id) { + assert(gpu_id == 0); // MultiGPU not used in this example + // Init polling + hpx::cuda::experimental::detail::register_polling(hpx::resource::get_thread_pool(0)); + // Init device executors + cppuddle::executor_recycling::executor_pool::init_executor_pool< + device_executor_t, + cppuddle::executor_recycling::round_robin_pool_impl>( + gpu_id, number_gpu_executors, hpx::kokkos::execution_space_mode::independent); + /* // Init host executors (fixed to 256) */ + cppuddle::executor_recycling::executor_pool::init_all_executor_pools< + host_executor_t, + cppuddle::executor_recycling::round_robin_pool_impl>( + number_cpu_executors, hpx::kokkos::execution_space_mode::independent); +} + +/// Processes the CLI options via boost program_options to configure the example +bool process_cli_options(int argc, char *argv[], size_t &entries_per_task, + size_t &number_tasks, bool &in_order_repetitions, + size_t &number_repetitions, size_t &number_gpu_executors, + size_t &max_queue_length, size_t &max_kernels_fused) { + try { + boost::program_options::options_description desc{"Options"}; + desc.add_options()("help", "Help screen")( + "elements_per_task", + boost::program_options::value(&entries_per_task) + ->default_value(1024), + "Number of elements added per task (corresponds to the number of CUDA " + "workitems used per kernel)")( + "tasks_per_repetition", + boost::program_options::value(&number_tasks) + ->default_value(100), + "Number of tasks per repetition")( + "in_order_repetitions", + boost::program_options::value(&in_order_repetitions) + ->default_value(false), + "Execute repetitions in-order")( + "number_repetitions", + boost::program_options::value(&number_repetitions) + ->default_value(20), + "Sets the number of repetitions")( + "number_gpu_executors", + boost::program_options::value(&number_gpu_executors) + ->default_value(32), + "Number of GPU executors in the pool")( + "max_queue_length_per_executor", + boost::program_options::value(&max_queue_length) + ->default_value(5), + "Maximum numbers of kernels queued per GPU executor")( + "max_kernels_fused", + boost::program_options::value(&max_kernels_fused) + ->default_value(4), + "The maximum amount of kernels being fused together (keep below 128 ideally)"); + + boost::program_options::variables_map vm; + 
boost::program_options::parsed_options options = + parse_command_line(argc, argv, desc); + boost::program_options::store(options, vm); + boost::program_options::notify(vm); + + if (entries_per_task % 128 != 0) { + std::cerr << "ERROR: --entries_per_task needs to be divisble by 128." << std::endl; + return false; + } + + std::cout << "CPPuddle Aggregation Sample (Vector-Add / Kokkos edition)" << std::endl; + std::cout << "=====================================================" << std::endl; + if (vm.count("help") == 0u) { + hpx::cout << "Running with parameters:" << std::endl + << " --elements_per_task = " << entries_per_task << std::endl + << " --tasks_per_repetition = " << number_tasks << std::endl + << " --number_repetitions = " << number_repetitions << std::endl + << " --in_order_repetitions = " << in_order_repetitions << std::endl + << " --number_gpu_executors = " << number_gpu_executors << std::endl + << " --max_queue_length_per_executor = " << max_queue_length << std::endl + << " --max_kernels_fused = " << max_kernels_fused << std::endl + << " --hpx:threads = " << hpx::get_os_thread_count() + << std::endl << std::endl; + } else { + std::cout << desc << std::endl; + return false; + } + } catch (const boost::program_options::error &ex) { + std::cerr << "CLI argument problem found: " << ex.what() << '\n'; + return false; + } + return true; +} + +int hpx_main(int argc, char *argv[]) { + // Init Kokkos + Kokkos::initialize(); + // Init/Finalize Kokkos alternative using RAII: + /* hpx::kokkos::ScopeGuard g(argc, argv); */ + // Launch counters + std::atomic number_aggregated_kernel_launches = 0; + std::atomic number_kernel_launches = 0; + + // Runtime options + size_t entries_per_task = 1024; + size_t number_tasks = 100; + size_t number_repetitions = 20; + bool in_order_repetitions = false; + size_t max_queue_length = 5; + size_t number_gpu_executors = 1; + size_t number_cpu_executors = 128; + size_t max_kernels_fused = 4; + size_t gpu_id = 0; + if(!process_cli_options(argc, argv, entries_per_task, number_tasks, + in_order_repetitions, number_repetitions, + number_gpu_executors, max_queue_length, max_kernels_fused)) { + return hpx::finalize(); // problem with CLI parameters detected -> exiting.. + } + + // Init HPX CUDA polling + executor pool + hpx::cout << "Start initializing CUDA polling and executor pool..." << std::endl; + init_executor_pool_and_polling(number_gpu_executors, number_cpu_executors, + gpu_id); + hpx::cout << "Init done!" << std::endl << std::endl; + + // Build task graph / Launch all tasks + auto start = std::chrono::high_resolution_clock::now(); + hpx::cout << "Start launching tasks..." << std::endl; + auto all_tasks_done_fut = + build_task_graph(number_repetitions, number_tasks, entries_per_task, + in_order_repetitions, max_queue_length, gpu_id, max_kernels_fused, + number_aggregated_kernel_launches, number_kernel_launches); + hpx::cout << "All tasks launched asynchronously!" << std::endl; + // Only continue once all tasks are done! + all_tasks_done_fut.get(); + auto elapsed = std::chrono::high_resolution_clock::now() - start; + long long microseconds = + std::chrono::duration_cast(elapsed).count(); + hpx::cout << "Launching and running all tasks took " << microseconds + << " microseconds!" << std::endl + << std::endl; + + // Finalize HPX (CPPuddle finalizes automatically) + hpx::cout << "Finalizing..." 
<< std::endl; + hpx::cuda::experimental::detail::unregister_polling( + hpx::resource::get_thread_pool(0)); + + // Cleanup (executor_pool cleanup required to deallocate all Kokkos execution + // spaces before Kokkos finalize is called) + cppuddle::executor_recycling::executor_pool::cleanup< + device_executor_t, + cppuddle::executor_recycling::round_robin_pool_impl>(); + cppuddle::executor_recycling::executor_pool::cleanup< + host_executor_t, + cppuddle::executor_recycling::round_robin_pool_impl>(); + cppuddle::memory_recycling::finalize(); + Kokkos::finalize(); // only required if hpx-kokkos Scope Guard is not used + return hpx::finalize(); +} + +int main(int argc, char *argv[]) { + hpx::init_params p; + p.cfg = {"hpx.commandline.allow_unknown=1"}; + return hpx::init(argc, argv, p); +} diff --git a/examples/recycling-with-hpx-cuda.cu b/examples/recycling-with-hpx-cuda.cu new file mode 100644 index 00000000..81670172 --- /dev/null +++ b/examples/recycling-with-hpx-cuda.cu @@ -0,0 +1,385 @@ +// Copyright (c) 2024 Gregor Daiß +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +// Developer TODOs regarding CPPuddle usability: +// TODO(daissgr) Simplify specifying an executor pool (at least when using the +// default round_robin_pool_impl). The current way seems awfully verbose + +#include +#include +#include +#include +#include +#include +#include + +#include + +#include +#include +#include +#include +#include + +#include +#include +#include + + +/** \file This example shows how to use HPX + CPPuddle with GPU-accelerated + * applications. Particulary we focus on how to use a) recycled pinned host + * memory, b) recycled device memory, c) the executor pool, d) the HPX-CUDA + * futures and the basic CPU/GPU load balancing based on executor usage in an + * HPX application. To demonstrate these features we just use the simplest of + * kernels: a vector addition that is repeated over a multitude of tasks (with + * varying, artifical dependencies inbetween). So while the compute kernel is + * basic, we still get to see how the CPPuddle/HPX features may be used with + * it. + * + * The example has three parts: First the GPU part, then the HPX task graph + * management and lastly the remaining initialization/boilerplate code + */ + +//================================================================================================= +// PART I: The (CUDA) GPU kernel and how to launch it with CPPuddle + HPX whilst avoid +// any CPU/GPU barriers +//================================================================================================= + +// Compile-time options: float type... +using float_t = float; +// ... and we will use the HPX CUDA executor inside the executor pool later on +using device_executor_t = hpx::cuda::experimental::cuda_executor; + +/** Just some example CUDA kernel. For simplicity it just adds two vectors. */ +__global__ void kernel_add(const float_t *input_a, const float_t *input_b, float_t *output_c) { + const int index = blockIdx.x * blockDim.x + threadIdx.x; + output_c[index] = input_a[index] + input_b[index]; +} + +/** Method that demonstrates how one might launch a CUDA kernel with HPX and + * CPPuddle recycled memory/executor! 
By using CPPuddle allocators to avoid + * allocating GPU memory and HPX futures to track the status of the + * kernel/memory transfers, this method is expected to be non-blocking both on + * the launching CPU thread and on the GPU (non malloc barriers). Hence, this + * launch method is suitable to quickly launch a multitude of GPU kernels if + * required. + * + * This method uses the following features: + * - Recycled pinned host memory. + * - Recycled device memory. + * - Draws GPU executor from the CPPuddle executor pool. + * - CPU-GPU load balancing based on the number of GPU executors and their queue length. + * - Asynchronous data-transfers and lauching of the kernel. + * - HPX futures to suspend the HPX task until kernel and data-transfers are done. + * - Includes (sample) pre- and post-processing. */ +void launch_gpu_kernel_task(const size_t task_id, const size_t entries_per_task, + const size_t max_queue_length, const size_t gpu_id, + std::atomic &number_cpu_kernel_launches, + std::atomic &number_gpu_kernel_launches) { + // 1. Create required per task host-side buffers using CPPuddle recycled + // pinned memory + std::vector> + host_a(entries_per_task); + std::vector> + host_b(entries_per_task); + std::vector> + host_c(entries_per_task); + + // 2. Host-side preprocessing (usually: communication, here fill dummy input) + std::fill(host_a.begin(), host_a.end(), 1.0); + std::fill(host_b.begin(), host_b.end(), 2.0); + + // 3. Check GPU utilization - Method will return true if there is an executor + // in the pool that does currently not exceed its queue limit (tracked by + // RAII, no CUDA API calls involved) + bool device_executor_available = + cppuddle::executor_recycling::executor_pool::interface_available< + device_executor_t, cppuddle::executor_recycling:: + round_robin_pool_impl>( + max_queue_length, gpu_id); + + // 4. Run Kernel on either CPU or GPU + if (!device_executor_available) { + number_cpu_kernel_launches++; + // 4a. Launch CPU Fallback Version + for (size_t entry_id = 0; entry_id < entries_per_task; entry_id++) { + host_c[entry_id] = host_a[entry_id] + host_b[entry_id]; + } + } else { + number_gpu_kernel_launches++; + // 4b. Create per_task device-side buffers (using recylced device memory) + // and draw GPU executor from CPPuddle executor pool + cppuddle::executor_recycling::executor_interface< + device_executor_t, + cppuddle::executor_recycling::round_robin_pool_impl> + executor(gpu_id); // Wrapper that draws executor from the pool + cppuddle::memory_recycling::cuda_device_buffer device_a( + entries_per_task); + cppuddle::memory_recycling::cuda_device_buffer device_b( + entries_per_task); + cppuddle::memory_recycling::cuda_device_buffer device_c( + entries_per_task); + + // 4c. 
Launch data transfers and kernel + hpx::apply(static_cast(executor), cudaMemcpyAsync, + device_a.device_side_buffer, host_a.data(), + entries_per_task * sizeof(float_t), cudaMemcpyHostToDevice); + hpx::apply(static_cast(executor), cudaMemcpyAsync, + device_b.device_side_buffer, host_b.data(), + entries_per_task * sizeof(float_t), cudaMemcpyHostToDevice); + void *args[] = {&device_a.device_side_buffer, &device_b.device_side_buffer, + &device_c.device_side_buffer}; + hpx::apply(static_cast(executor), + cudaLaunchKernel, kernel_add, + entries_per_task / 128, 128, args, 0); + auto fut = + hpx::async(static_cast(executor), cudaMemcpyAsync, + host_c.data(), device_c.device_side_buffer, + entries_per_task * sizeof(float_t), cudaMemcpyDeviceToHost); + fut.get(); // Allow worker thread to jump away until the kernel and + // data-transfers are done + } + + // 5. Host-side postprocessing (usually: communication, here: check + // correctness) + if (!std::all_of(host_c.begin(), host_c.end(), + [](float_t i) { return i == 1.0 + 2.0; })) { + std::cerr << "Task " << task_id << " contained wrong results!!" + << std::endl; + } +} + +//================================================================================================= +// PART II: How to build the dependency graph with HPX and the GPU launches +//================================================================================================= + +/** This methods demonstrates how one might build the HPX task graph + * asynchronously, using the launch_gpu_kernel_task method to launch the GPU + * kernels inside the tasks. To illustrate how one can chain together tasks, we + * support two modes for building the task tree: One keeps the dependencies + * between the repetitions (keeping them in order) and one does not and allows + * to interleave repetitions. */ +hpx::future +build_task_graph(const size_t number_repetitions, const size_t number_tasks, + const size_t entries_per_task, const bool in_order_repetitions, + const size_t max_queue_length, const size_t gpu_id, + std::atomic &number_cpu_kernel_launches, + std::atomic &number_gpu_kernel_launches) { + // Launch tasks + hpx::shared_future previous_iteration_fut = hpx::make_ready_future(); + std::vector> repetition_futs(number_repetitions); + for (size_t repetition = 0; repetition < number_repetitions; repetition++) { + std::vector> futs(number_tasks); + for (size_t task_id = 0; task_id < number_tasks; task_id++) { + // Schedule task either in order (one repetition after another) or out of order + if (in_order_repetitions) { + futs[task_id] = previous_iteration_fut.then( + [task_id, entries_per_task, max_queue_length, gpu_id, + &number_cpu_kernel_launches, + &number_gpu_kernel_launches](auto &&fut) { + launch_gpu_kernel_task( + task_id, entries_per_task, max_queue_length, gpu_id, + number_cpu_kernel_launches, number_gpu_kernel_launches); + }); + } else { + futs[task_id] = hpx::async([task_id, entries_per_task, max_queue_length, + gpu_id, &number_cpu_kernel_launches, + &number_gpu_kernel_launches]() { + launch_gpu_kernel_task(task_id, entries_per_task, max_queue_length, + gpu_id, number_cpu_kernel_launches, + number_gpu_kernel_launches); + }); + } + } + // Schedule output task to run once each repetition is done + auto repetition_finished = hpx::when_all(futs); + if (in_order_repetitions) { + previous_iteration_fut = + repetition_finished.then([repetition](auto &&fut) { + hpx::cout << "Repetition " << repetition << " done!" 
<< std::endl; + }); + } else { + repetition_futs.emplace_back( + repetition_finished.then([repetition](auto &&fut) { + hpx::cout << "Repetition " << repetition << " done!" << std::endl; + })); + } + } + // Schedule final output task to run once all other tasks are done and return future + if (in_order_repetitions) { + return previous_iteration_fut + .then([&number_cpu_kernel_launches, + &number_gpu_kernel_launches](auto &&fut) { + hpx::cout << "All tasks are done! [in-order repetitions version]" << std::endl; + hpx::cout << " => " << number_gpu_kernel_launches + << " kernels were run on the GPU" << std::endl; + hpx::cout << " => " << number_cpu_kernel_launches + << " kernels were using the CPU fallback" << std::endl + << std::endl; + }); + } else { + return hpx::when_all(repetition_futs) + .then([&number_cpu_kernel_launches, + &number_gpu_kernel_launches](auto &&fut) { + hpx::cout << "All tasks are done! [out-of-order repetitions version]" << std::endl; + hpx::cout << " => " << number_gpu_kernel_launches + << " kernels were run on the GPU" << std::endl; + hpx::cout << " => " << number_cpu_kernel_launches + << " kernels were using the CPU fallback" << std::endl + << std::endl; + }); + } +} + +//================================================================================================= +// PART III: Initialization / Boilerplate and Main +//================================================================================================= + +/** HPX uses either callbacks or event polling to implement its CUDA futures. + * Polling usually has the superior performance, however, it requires that the + * polling is initialized at startup (or at least before the CUDA futures are + * used). The CPPuddle executor pool also needs initialzing as we need to set it + * to a specified number of executors (which CPPuddle cannot know without the + * number_gpu_executors parameter). We will use the round_robin_pool_impl for + * simplicity. A priority_pool_impl is also available. 
+ */ +void init_executor_pool_and_polling(const size_t number_gpu_executors, const size_t gpu_id) { + assert(gpu_id == 0); // MultiGPU not used in this example + hpx::cuda::experimental::detail::register_polling(hpx::resource::get_thread_pool(0)); + cppuddle::executor_recycling::executor_pool::init_executor_pool< + device_executor_t, + cppuddle::executor_recycling::round_robin_pool_impl>( + gpu_id, number_gpu_executors, gpu_id, true); +} + +/// Processes the CLI options via boost program_options to configure the example +bool process_cli_options(int argc, char *argv[], size_t &entries_per_task, + size_t &number_tasks, bool &in_order_repetitions, + size_t &number_repetitions, size_t &number_gpu_executors, + size_t &max_queue_length) { + try { + boost::program_options::options_description desc{"Options"}; + desc.add_options()("help", "Help screen")( + "elements_per_task", + boost::program_options::value(&entries_per_task) + ->default_value(1024), + "Number of elements added per task (corresponds to the number of CUDA " + "workitems used per kernel)")( + "tasks_per_repetition", + boost::program_options::value(&number_tasks) + ->default_value(100), + "Number of tasks per repetition")( + "in_order_repetitions", + boost::program_options::value(&in_order_repetitions) + ->default_value(false), + "Execute repetitions in-order")( + "number_repetitions", + boost::program_options::value(&number_repetitions) + ->default_value(20), + "Sets the number of repetitions")( + "number_gpu_executors", + boost::program_options::value(&number_gpu_executors) + ->default_value(32), + "Number of GPU executors in the pool")( + "max_queue_length_per_executor", + boost::program_options::value(&max_queue_length) + ->default_value(5), + "Maximum numbers of kernels queued per GPU executor"); + + boost::program_options::variables_map vm; + boost::program_options::parsed_options options = + parse_command_line(argc, argv, desc); + boost::program_options::store(options, vm); + boost::program_options::notify(vm); + + if (entries_per_task % 128 != 0) { + std::cerr << "ERROR: --entries_per_task needs to be divisble by 128." 
<< std::endl; + return false; + } + + std::cout << "CPPuddle Recycling Sample (Vector-Add / CUDA edition)" << std::endl; + std::cout << "=====================================================" << std::endl; + if (vm.count("help") == 0u) { + hpx::cout << "Running with parameters:" << std::endl + << " --elements_per_task = " << entries_per_task << std::endl + << " --tasks_per_repetition = " << number_tasks << std::endl + << " --number_repetitions = " << number_repetitions << std::endl + << " --in_order_repetitions = " << in_order_repetitions << std::endl + << " --number_gpu_executors = " << number_gpu_executors << std::endl + << " --max_queue_length_per_executor = " << max_queue_length << std::endl + << " --hpx:threads = " << hpx::get_os_thread_count() + << std::endl << std::endl; + } else { + std::cout << desc << std::endl; + return false; + } + } catch (const boost::program_options::error &ex) { + std::cerr << "CLI argument problem found: " << ex.what() << '\n'; + return false; + } + return true; +} + +int hpx_main(int argc, char *argv[]) { + // Launch counters + std::atomic number_cpu_kernel_launches = 0; + std::atomic number_gpu_kernel_launches = 0; + + // Runtime options + size_t entries_per_task = 1024; + size_t number_tasks = 100; + size_t number_repetitions = 20; + bool in_order_repetitions = false; + size_t max_queue_length = 5; + size_t number_gpu_executors = 1; + size_t gpu_id = 0; + if(!process_cli_options(argc, argv, entries_per_task, number_tasks, + in_order_repetitions, number_repetitions, + number_gpu_executors, max_queue_length)) { + return hpx::finalize(); // problem with CLI parameters detected -> exiting.. + } + + // Init HPX CUDA polling + executor pool + hpx::cout << "Start initializing CUDA polling and executor pool..." << std::endl; + init_executor_pool_and_polling(number_gpu_executors, gpu_id); + hpx::cout << "Init done!" << std::endl << std::endl; + + + // Build task graph / Launch all tasks + auto start = std::chrono::high_resolution_clock::now(); + hpx::cout << "Start launching tasks..." << std::endl; + auto all_tasks_done_fut = + build_task_graph(number_repetitions, number_tasks, entries_per_task, + in_order_repetitions, max_queue_length, gpu_id, + number_cpu_kernel_launches, number_gpu_kernel_launches); + hpx::cout << "All tasks launched asynchronously!" << std::endl; + // Only continue once all tasks are done! + all_tasks_done_fut.get(); + auto elapsed = std::chrono::high_resolution_clock::now() - start; + long long microseconds = + std::chrono::duration_cast(elapsed).count(); + hpx::cout << "Launching and running all tasks took " << microseconds + << " microseconds!" << std::endl + << std::endl; + + // Finalize HPX (CPPuddle finalizes automatically) + hpx::cout << "Finalizing..." << std::endl; + // Deallocates all CPPuddle everything and prevent further usage. Technically + // not required as long as static variables with CPPuddle-managed memory are + // not used, however, it does not hurt either. 
+ cppuddle::memory_recycling::finalize(); + hpx::cuda::experimental::detail::unregister_polling( + hpx::resource::get_thread_pool(0)); + return hpx::finalize(); +} + +int main(int argc, char *argv[]) { + hpx::init_params p; + p.cfg = {"hpx.commandline.allow_unknown=1"}; + return hpx::init(argc, argv, p); +} diff --git a/examples/recycling-with-hpx-kokkos.cpp b/examples/recycling-with-hpx-kokkos.cpp new file mode 100644 index 00000000..1a3db086 --- /dev/null +++ b/examples/recycling-with-hpx-kokkos.cpp @@ -0,0 +1,476 @@ +// Copyright (c) 2024 Gregor Daiß +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +// Developer TODOs regarding CPPuddle usability: +// TODO(daissgr) Improve type accessiblity (user should not worry about the +// activated Kokkos backend like belew to pick the correct view types +// TODO(daissgr) Add unified CPPuddle finalize that also cleans up all executor +// pool (and avoids having to use the cleanup methds of the individual pools + +#include +#include + +#include +#include +#include +#include +#include + +#include + +#include + +#include + +#include +#include +#include + +#include +#include +#include + + +/** \file This example shows how to use HPX + Kokkos + CPPuddle with GPU-accelerated + * applications. The example is extremly similary to its CUDA counterpart, however, uses + * Kokkos for implementation to showcase the required boilerplate and offered features. + * Particulary we focus on how to use a) recycled pinned host + * memory, b) recycled device memory, c) the executor pool, d) the HPX-Kokkos + * futures and the basic CPU/GPU load balancing based on executor usage in an + * HPX application. To demonstrate these features we just use the simplest of + * kernels: a vector add, that is repeated over a multitude of tasks (with + * varying, artifical dependencies inbetween). So while the compute kernel is + * basic, we still get to see how the CPPuddle/HPX features may be used. + * + * The example has three parts: First the GPU part, then the HPX task graph + * management and lastly the remaining initialization/boilerplate code + */ + +//================================================================================================= +// PART I: The Kokkos kernel and how to launch it with CPPuddle + HPX whilst avoid +// any CPU/GPU barriers +//================================================================================================= + +// Define types: A lot of this can be done automatically, however, here we want to show the manual +// approach (as using different types/ifdefs can allow us to specialize kernels for specific hardware +// if required. +// +using float_t = float; +// Use correct device exeuction space and memory spaces depending on the activated device +// execution space +#ifdef KOKKOS_ENABLE_CUDA +#include +// Pick executor type +using device_executor_t = hpx::kokkos::cuda_executor; +// Define Kokkos View types to be used! 
Must be using MemoryUnmanaged to allow memory recycling +using kokkos_device_view_t = Kokkos::View; +using kokkos_host_view_t = Kokkos::View; +// Define CPPuddle recycling allocators to be used with the views +using device_allocator_t = cppuddle::memory_recycling::recycle_allocator_cuda_device; +using host_allocator_t = cppuddle::memory_recycling::recycle_allocator_cuda_host; +#elif KOKKOS_ENABLE_HIP +#include +// Pick executor type +using device_executor_t = hpx::kokkos::hip_executor; +// Define Kokkos View types to be used! Must be using MemoryUnmanaged to allow memory recycling +using kokkos_device_view_t = Kokkos::View; +using kokkos_host_view_t = Kokkos::View; +// Define CPPuddle recycling allocators to be used with the views +using device_allocator_t = cppuddle::memory_recycling::recycle_allocator_hip_device; +using host_allocator_t = cppuddle::memory_recycling::recycle_allocator_hip_host; +#elif KOKKOS_ENABLE_SYCL +#include +// Pick executor type +using device_executor_t = hpx::kokkos::sycl_executor; +// Define Kokkos View types to be used! Must be using MemoryUnmanaged to allow memory recycling +using kokkos_device_view_t = Kokkos::View; +using kokkos_host_view_t = Kokkos::View; +// Define CPPuddle recycling allocators to be used with the views +using device_allocator_t = cppuddle::memory_recycling::recycle_allocator_sycl_device; +using host_allocator_t = cppuddle::memory_recycling::recycle_allocator_sycl_host; +#else +#error "Example assumes both a host and a device Kokkos execution space are available" +#endif +// Plug together the defined Kokkos views with the recycling CPPuddle allocators +// This yields a new type that can be used just like a normal Kokkos View but gets its memory from +// CPPuddle. +using recycling_device_view_t = + cppuddle::memory_recycling::recycling_view; +using recycling_host_view_t = + cppuddle::memory_recycling::recycling_view; + +// Run host kernels on HPX execution space: +using host_executor_t = hpx::kokkos::hpx_executor; +// Serial executor can actually work well, too when interleaving multiple Kokkos kernels to +// achieve multicore usage. However, be aware that this only works for Kokkos kernels that are not +// using team policies / scratch memory (those use a global barrier across all Serial execution +// spaces): +// using host_executor_t = hpx::kokkos::serial_executor; + +// The actual compute kernel: Simply defines a exeuction policy with the given executor and runs the +// kernel with a Kokkos::parallel_for +template +void kernel_add(executor_t &executor, const size_t entries_per_task, + const view_t &input_a, const view_t &input_b, view_t &output_c) +{ + // Define exeuction policy + auto execution_policy = Kokkos::Experimental::require( + Kokkos::RangePolicy( + executor.instance(), 0, entries_per_task), + Kokkos::Experimental::WorkItemProperty::HintLightWeight); + + // Run Kernel with execution policy (and give it some name ideally) + Kokkos::parallel_for( + "sample vector add kernel", execution_policy, + KOKKOS_LAMBDA(size_t index) { + output_c[index] = input_a[index] + input_b[index]; + }); +} + +/** Method that demonstrates how one might launch a Kokkos kernel with HPX and + * CPPuddle recycled memory/executors! By using CPPuddle allocators to avoid + * allocating GPU memory and HPX futures to track the status of the + * kernel/memory transfers, this method is expected to be non-blocking both on + * the launching CPU thread and on the GPU (non malloc barriers). 
Hence, this + * launch method is suitable to quickly launch a multitude of GPU kernels if + * required. + * + * This method uses the following features: + * - Recycled pinned host memory. + * - Recycled device memory. + * - Draws GPU executor from the CPPuddle executor pool. + * - CPU-GPU load balancing based on the number of GPU executors and their queue length. + * - Asynchronous data-transfers and lauching of the kernel. + * - HPX futures to suspend the HPX task until kernel and data-transfers are done. + * - Includes (sample) pre- and post-processing. */ +void launch_gpu_kernel_task(const size_t task_id, const size_t entries_per_task, + const size_t max_queue_length, const size_t gpu_id, + std::atomic &number_cpu_kernel_launches, + std::atomic &number_gpu_kernel_launches) { + // 1. Create recycled Kokkos host views + recycling_host_view_t host_a(entries_per_task); + recycling_host_view_t host_b(entries_per_task); + recycling_host_view_t host_c(entries_per_task); + + // 2. Host-side preprocessing (usually: communication, here fill dummy input) + for (size_t i = 0; i < entries_per_task; i++) { + host_a[i] = 1.0; + host_b[i] = 2.0; + } + + // 3. Check GPU utilization - Method will return true if there is an executor + // in the pool that does currently not exceed its queue limit (tracked by + // RAII, no CUDA/HIP/SYCL API calls involved) + bool device_executor_available = + cppuddle::executor_recycling::executor_pool::interface_available< + device_executor_t, cppuddle::executor_recycling:: + round_robin_pool_impl>( + max_queue_length, gpu_id); + + // 4. Run Kernel on either CPU or GPU + if (!device_executor_available) { + // 4a. Launch CPU Fallback Version + number_cpu_kernel_launches++; + // Draw host executor + cppuddle::executor_recycling::executor_interface< + host_executor_t, + cppuddle::executor_recycling::round_robin_pool_impl> + executor(gpu_id); // Wrapper that draws executor from the pool + + // Launch + kernel_add(static_cast(executor), entries_per_task, host_a, host_b, host_c); + + // Sync kernel + static_cast(executor).instance().fence(); + + } else { + number_gpu_kernel_launches++; + // 4b. Create per_task device-side views (using recylced device memory) + // and draw GPU executor from CPPuddle executor pool + // Draw host device + cppuddle::executor_recycling::executor_interface< + device_executor_t, + cppuddle::executor_recycling::round_robin_pool_impl> + executor(gpu_id); // Wrapper that draws executor from the pool + + recycling_device_view_t device_a(entries_per_task); + recycling_device_view_t device_b(entries_per_task); + recycling_device_view_t device_c(entries_per_task); + + // 4c. Launch data transfers and kernel + Kokkos::deep_copy(executor.interface.instance(), device_a, host_a); + Kokkos::deep_copy(executor.interface.instance(), device_b, host_b); + + kernel_add(static_cast(executor), entries_per_task, + device_a, device_b, device_c); + + auto transfer_fut = hpx::kokkos::deep_copy_async( + executor.interface.instance(), host_c, device_c); + transfer_fut.get(); + } + + // 5. Host-side postprocessing (usually: communication, here: check + // correctness) + for (size_t i = 0; i < entries_per_task; i++) { + if (host_c[i] != 1.0 + 2.0) { + std::cerr << "Task " << task_id << " contained wrong results!!" 
+ << std::endl; + break; + } + } +} + +//================================================================================================= +// PART II: How to build the dependency graph with HPX and the GPU launches +//================================================================================================= + +/** This methods demonstrates how one might build the HPX task graph + * asynchronously, using the launch_gpu_kernel_task method to launch the GPU + * kernels inside the tasks. To illustrate how one can chain together tasks, we + * support two modes for building the task tree: One keeps the dependencies + * between the repetitions (keeping them in order) and one does not and allows + * to interleave repetitions. */ +hpx::future +build_task_graph(const size_t number_repetitions, const size_t number_tasks, + const size_t entries_per_task, const bool in_order_repetitions, + const size_t max_queue_length, const size_t gpu_id, + std::atomic &number_cpu_kernel_launches, + std::atomic &number_gpu_kernel_launches) { + // Launch tasks + hpx::shared_future previous_iteration_fut = hpx::make_ready_future(); + std::vector> repetition_futs(number_repetitions); + for (size_t repetition = 0; repetition < number_repetitions; repetition++) { + std::vector> futs(number_tasks); + for (size_t task_id = 0; task_id < number_tasks; task_id++) { + // Schedule task either in order (one repetition after another) or out of order + if (in_order_repetitions) { + futs[task_id] = previous_iteration_fut.then( + [task_id, entries_per_task, max_queue_length, gpu_id, + &number_cpu_kernel_launches, + &number_gpu_kernel_launches](auto &&fut) { + launch_gpu_kernel_task( + task_id, entries_per_task, max_queue_length, gpu_id, + number_cpu_kernel_launches, number_gpu_kernel_launches); + }); + } else { + futs[task_id] = hpx::async([task_id, entries_per_task, max_queue_length, + gpu_id, &number_cpu_kernel_launches, + &number_gpu_kernel_launches]() { + launch_gpu_kernel_task(task_id, entries_per_task, max_queue_length, + gpu_id, number_cpu_kernel_launches, + number_gpu_kernel_launches); + }); + } + } + // Schedule output task to run once each repetition is done + auto repetition_finished = hpx::when_all(futs); + if (in_order_repetitions) { + previous_iteration_fut = + repetition_finished.then([repetition](auto &&fut) { + hpx::cout << "Repetition " << repetition << " done!" << std::endl; + }); + } else { + repetition_futs.emplace_back( + repetition_finished.then([repetition](auto &&fut) { + hpx::cout << "Repetition " << repetition << " done!" << std::endl; + })); + } + } + // Schedule final output task to run once all other tasks are done and return future + if (in_order_repetitions) { + return previous_iteration_fut + .then([&number_cpu_kernel_launches, + &number_gpu_kernel_launches](auto &&fut) { + hpx::cout << "All tasks are done! [in-order repetitions version]" << std::endl; + hpx::cout << " => " << number_gpu_kernel_launches + << " kernels were run on the GPU" << std::endl; + hpx::cout << " => " << number_cpu_kernel_launches + << " kernels were using the CPU fallback" << std::endl + << std::endl; + }); + } else { + return hpx::when_all(repetition_futs) + .then([&number_cpu_kernel_launches, + &number_gpu_kernel_launches](auto &&fut) { + hpx::cout << "All tasks are done! 
[out-of-order repetitions version]" << std::endl; + hpx::cout << " => " << number_gpu_kernel_launches + << " kernels were run on the GPU" << std::endl; + hpx::cout << " => " << number_cpu_kernel_launches + << " kernels were using the CPU fallback" << std::endl + << std::endl; + }); + } +} + +//================================================================================================= +// PART III: Initialization / Boilerplate and Main +//================================================================================================= + +/** HPX uses either callbacks or event polling to implement its HPX-Kokkos futures. + * Polling usually has the superior performance, however, it requires that the + * polling is initialized at startup (or at least before the HPX-Kokkos futures are + * used). The CPPuddle executor pool also needs initialzing as we need to set it + * to a specified number of executors (which CPPuddle cannot know without the + * number_gpu_executors parameter). We will use the round_robin_pool_impl for + * simplicity. A priority_pool_impl is also available. + */ +void init_executor_pool_and_polling(const size_t number_gpu_executors, + const size_t number_cpu_executors, + const size_t gpu_id) { + assert(gpu_id == 0); // MultiGPU not used in this example + // Init polling + hpx::cuda::experimental::detail::register_polling(hpx::resource::get_thread_pool(0)); + // Init device executors + cppuddle::executor_recycling::executor_pool::init_executor_pool< + device_executor_t, + cppuddle::executor_recycling::round_robin_pool_impl>( + gpu_id, number_gpu_executors, hpx::kokkos::execution_space_mode::independent); + /* // Init host executors (fixed to 256) */ + cppuddle::executor_recycling::executor_pool::init_all_executor_pools< + host_executor_t, + cppuddle::executor_recycling::round_robin_pool_impl>( + number_cpu_executors, hpx::kokkos::execution_space_mode::independent); +} + +/// Processes the CLI options via boost program_options to configure the example +bool process_cli_options(int argc, char *argv[], size_t &entries_per_task, + size_t &number_tasks, bool &in_order_repetitions, + size_t &number_repetitions, size_t &number_gpu_executors, + size_t &max_queue_length) { + try { + boost::program_options::options_description desc{"Options"}; + desc.add_options()("help", "Help screen")( + "elements_per_task", + boost::program_options::value(&entries_per_task) + ->default_value(1024), + "Number of elements added per task (corresponds to the number of CUDA " + "workitems used per kernel)")( + "tasks_per_repetition", + boost::program_options::value(&number_tasks) + ->default_value(100), + "Number of tasks per repetition")( + "in_order_repetitions", + boost::program_options::value(&in_order_repetitions) + ->default_value(false), + "Execute repetitions in-order")( + "number_repetitions", + boost::program_options::value(&number_repetitions) + ->default_value(20), + "Sets the number of repetitions")( + "number_gpu_executors", + boost::program_options::value(&number_gpu_executors) + ->default_value(32), + "Number of GPU executors in the pool")( + "max_queue_length_per_executor", + boost::program_options::value(&max_queue_length) + ->default_value(5), + "Maximum numbers of kernels queued per GPU executor"); + + boost::program_options::variables_map vm; + boost::program_options::parsed_options options = + parse_command_line(argc, argv, desc); + boost::program_options::store(options, vm); + boost::program_options::notify(vm); + + if (entries_per_task % 128 != 0) { + std::cerr << "ERROR: 
--entries_per_task needs to be divisble by 128." << std::endl; + return false; + } + + std::cout << "CPPuddle Recycling Sample (Vector-Add / Kokkos edition)" << std::endl; + std::cout << "=====================================================" << std::endl; + if (vm.count("help") == 0u) { + hpx::cout << "Running with parameters:" << std::endl + << " --elements_per_task = " << entries_per_task << std::endl + << " --tasks_per_repetition = " << number_tasks << std::endl + << " --number_repetitions = " << number_repetitions << std::endl + << " --in_order_repetitions = " << in_order_repetitions << std::endl + << " --number_gpu_executors = " << number_gpu_executors << std::endl + << " --max_queue_length_per_executor = " << max_queue_length << std::endl + << " --hpx:threads = " << hpx::get_os_thread_count() + << std::endl << std::endl; + } else { + std::cout << desc << std::endl; + return false; + } + } catch (const boost::program_options::error &ex) { + std::cerr << "CLI argument problem found: " << ex.what() << '\n'; + return false; + } + return true; +} + +int hpx_main(int argc, char *argv[]) { + // Init Kokkos + Kokkos::initialize(); + // Init/Finalize Kokkos alternative using RAII: + /* hpx::kokkos::ScopeGuard g(argc, argv); */ + // Launch counters + std::atomic number_cpu_kernel_launches = 0; + std::atomic number_gpu_kernel_launches = 0; + + // Runtime options + size_t entries_per_task = 1024; + size_t number_tasks = 100; + size_t number_repetitions = 20; + bool in_order_repetitions = false; + size_t max_queue_length = 5; + size_t number_gpu_executors = 1; + size_t number_cpu_executors = 128; + size_t gpu_id = 0; + if(!process_cli_options(argc, argv, entries_per_task, number_tasks, + in_order_repetitions, number_repetitions, + number_gpu_executors, max_queue_length)) { + return hpx::finalize(); // problem with CLI parameters detected -> exiting.. + } + + // Init HPX CUDA polling + executor pool + hpx::cout << "Start initializing CUDA polling and executor pool..." << std::endl; + init_executor_pool_and_polling(number_gpu_executors, number_cpu_executors, + gpu_id); + hpx::cout << "Init done!" << std::endl << std::endl; + + // Build task graph / Launch all tasks + auto start = std::chrono::high_resolution_clock::now(); + hpx::cout << "Start launching tasks..." << std::endl; + auto all_tasks_done_fut = + build_task_graph(number_repetitions, number_tasks, entries_per_task, + in_order_repetitions, max_queue_length, gpu_id, + number_cpu_kernel_launches, number_gpu_kernel_launches); + hpx::cout << "All tasks launched asynchronously!" << std::endl; + // Only continue once all tasks are done! + all_tasks_done_fut.get(); + auto elapsed = std::chrono::high_resolution_clock::now() - start; + long long microseconds = + std::chrono::duration_cast(elapsed).count(); + hpx::cout << "Launching and running all tasks took " << microseconds + << " microseconds!" << std::endl + << std::endl; + + // Finalize HPX (CPPuddle finalizes automatically) + hpx::cout << "Finalizing..." 
<< std::endl; + hpx::cuda::experimental::detail::unregister_polling( + hpx::resource::get_thread_pool(0)); + + // Cleanup (executor_pool cleanup required to deallocate all Kokkos execution + // spaces before Kokkos finalize is called) + cppuddle::executor_recycling::executor_pool::cleanup< + device_executor_t, + cppuddle::executor_recycling::round_robin_pool_impl>(); + cppuddle::executor_recycling::executor_pool::cleanup< + host_executor_t, + cppuddle::executor_recycling::round_robin_pool_impl>(); + cppuddle::memory_recycling::finalize(); + Kokkos::finalize(); // only required if hpx-kokkos Scope Guard is not used + return hpx::finalize(); +} + +int main(int argc, char *argv[]) { + hpx::init_params p; + p.cfg = {"hpx.commandline.allow_unknown=1"}; + return hpx::init(argc, argv, p); +} diff --git a/include/cppuddle/memory_recycling/util/recycling_kokkos_view.hpp b/include/cppuddle/memory_recycling/util/recycling_kokkos_view.hpp index 97626ebb..2696e705 100644 --- a/include/cppuddle/memory_recycling/util/recycling_kokkos_view.hpp +++ b/include/cppuddle/memory_recycling/util/recycling_kokkos_view.hpp @@ -35,6 +35,8 @@ struct view_deleter { /// Kokkos View that automatically uses a recycling allocator using /// alloc_type as an underlying allocator. Must be passed an existing allocator object /// (which should be an allocator_slice from the kernel aggregation functionality) +/** Requires the underlying Kokkos View (kokkos_type) to be a View using the + * MemoryUnmanaged trait! */ template class aggregated_recycling_view : public kokkos_type { private: @@ -95,6 +97,8 @@ class aggregated_recycling_view : public kokkos_type { /// Kokkos View that automatically uses a recycling allocator using /// alloc_type as an underlying allocator +/** Requires the underlying Kokkos View (kokkos_type) to be a View using the + * MemoryUnmanaged trait! */ template class recycling_view : public kokkos_type { private:
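
The MemoryUnmanaged requirement documented in the two doc comments of the final hunk is easiest to see in a minimal sketch. The snippet below is illustrative only and not part of the patch: it assumes a CUDA-enabled Kokkos build, a float element type, the same <view, allocator, element> parameter order used by the aggregated_recycling_view instantiations in the examples above, and it omits the backend-specific CPPuddle allocator include whose exact header name is not shown in this diff.

#include <Kokkos_Core.hpp>
#include <cppuddle/memory_recycling/util/recycling_kokkos_view.hpp>
// plus the CUDA recycling-allocator header of CPPuddle (exact name omitted here)

using element_t = float;
// The wrapped view must carry Kokkos::MemoryUnmanaged: CPPuddle owns the
// allocation and the view merely wraps the recycled pointer.
using unmanaged_device_view_t =
    Kokkos::View<element_t *, Kokkos::CudaSpace, Kokkos::MemoryUnmanaged>;
using device_allocator_t =
    cppuddle::memory_recycling::recycle_allocator_cuda_device<element_t>;
// Assumed to follow the same <view, allocator, element> order as
// aggregated_recycling_view in the examples above.
using recycling_device_view_t =
    cppuddle::memory_recycling::recycling_view<unmanaged_device_view_t,
                                               device_allocator_t, element_t>;

void make_recycled_buffer(const size_t entries) {
  // Constructed like a normal runtime-sized view; the backing memory is drawn
  // from the CPPuddle pool and returned to it when the view goes out of scope.
  recycling_device_view_t device_buffer(entries);
}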