diff --git a/examples/kernel-aggregation-with-hpx-kokkos.cpp b/examples/kernel-aggregation-with-hpx-kokkos.cpp
index 51ffef9..d734f8f 100644
--- a/examples/kernel-aggregation-with-hpx-kokkos.cpp
+++ b/examples/kernel-aggregation-with-hpx-kokkos.cpp
@@ -35,19 +35,14 @@
 #include
 #include
-
-/** \file This example shows how to use HPX + Kokkos + CPPuddle with GPU-accelerated
- * applications. The example is extremly similary to its CUDA counterpart, however, uses
- * Kokkos for implementation to showcase the required boilerplate and offered features.
- * Particulary we focus on how to use a) recycled pinned host
- * memory, b) recycled device memory, c) the executor pool, d) the HPX-Kokkos
- * futures and the basic CPU/GPU load balancing based on executor usage in an
- * HPX application. To demonstrate these features we just use the simplest of
- * kernels: a vector add, that is repeated over a multitude of tasks (with
- * varying, artifical dependencies inbetween). So while the compute kernel is
- * basic, we still get to see how the CPPuddle/HPX features may be used.
+/** \file Work aggregation example using Kokkos with HPX and CPPuddle. Like the
+ * other examples, we still use a mere vector-add kernel for simplicity,
+ * allowing us to focus on how the aggregation actually works. Notably, it
+ * works similarly for much more complicated applications (see Octo-Tiger for
+ * an example of this).
 *
- * The example has three parts: First the GPU part, then the HPX task graph
+ * The example has three parts: First the GPU part (using the kernel
+ * aggregation feature with both host and device code), then the HPX task graph
 * management and lastly the remaining initialization/boilerplate code
 */
@@ -61,6 +56,8 @@
 // if required.
 //
 using float_t = float;
+// TODO(daissgr) No reason not to have this in a cppuddle header defining a generic type
+//
 // Use correct device exeuction space and memory spaces depending on the activated device
 // execution space
 #ifdef KOKKOS_ENABLE_CUDA
@@ -140,35 +137,33 @@ void kernel_add(executor_t &executor, const size_t entries_per_task,
       Kokkos::Experimental::WorkItemProperty::HintLightWeight);
   // Run Kernel with execution policy (and give it some name ideally)
+  // NOTE: Since this kernel may be launched in a way where we combine multiple
+  // kernel launches into one kernel, it contains another index: the slice ID
+  // (telling us which kernel a workitem belongs to in case multiple kernels
+  // were fused). For this simple kernel there is no difference compute-wise
+  // for each workitem, however this is not always the case (for instance when
+  // fusing together multiple reduce kernels), hence we show how to simply
+  // obtain the slice ID if required here:
   Kokkos::parallel_for(
       "sample vector add kernel", execution_policy,
       KOKKOS_LAMBDA(size_t index) {
+        // Get slice ID (kernel ID in case multiple kernels were fused)
         const size_t slice_id = index / entries_per_task;
         const size_t entry_index = index % entries_per_task;
+        // Obtain correct subviews for the current team member (slice)
         auto [a_slice, b_slice, c_slice] =
             cppuddle::kernel_aggregation::map_views_to_slice(slice_id, max_kernels_fused, input_a, input_b, output_c);
+        // Run the actual compute kernel on the mapped subviews
         c_slice[entry_index] = a_slice[entry_index] + b_slice[entry_index];
       });
 }
-/** Method that demonstrates how one might launch a Kokkos kernel with HPX and
- * CPPuddle recycled memory/executors! By using CPPuddle allocators to avoid
- * allocating GPU memory and HPX futures to track the status of the
- * kernel/memory transfers, this method is expected to be non-blocking both on
- * the launching CPU thread and on the GPU (non malloc barriers). Hence, this
- * launch method is suitable to quickly launch a multitude of GPU kernels if
- * required.
- *
- * This method uses the following features:
- * - Recycled pinned host memory.
- * - Recycled device memory.
- * - Draws GPU executor from the CPPuddle executor pool.
- * - CPU-GPU load balancing based on the number of GPU executors and their queue length.
- * - Asynchronous data-transfers and lauching of the kernel.
- * - HPX futures to suspend the HPX task until kernel and data-transfers are done.
- * - Includes (sample) pre- and post-processing. */
+/** Method that shows how the CPPuddle kernel aggregation feature may be used
+ * to define aggregation regions where teams of tasks can form on-the-fly to
+ * facilitate a single, larger GPU kernel launch than each task would achieve
+ * individually. */
 void launch_gpu_kernel_task(
     const size_t task_id, const size_t entries_per_task,
     const size_t max_queue_length, const size_t gpu_id,
@@ -184,61 +179,105 @@ void launch_gpu_kernel_task(
           round_robin_pool_impl>(
           max_queue_length, gpu_id);
+  // Defines an aggregation region
+  // -----------------------------
+  // The lambda within will be executed by a team of threads (between 1 and
+  // max_kernels_fused). The "threads" correspond to the HPX tasks that
+  // originally hit this aggregation region. How many of the tasks are fused
+  // together into this team depends on the utilization of the underlying
+  // resource (the GPU stream in this case). If it is not busy, a task will
+  // immediately get to work launching its own kernel (team size 1). If the
+  // resource is busy however, the current task will be combined with other
+  // arriving tasks and launch as an aggregated/fused task once either
+  // max_kernels_fused tasks have arrived or the resource becomes available.
+  //
+  // Note that the worker threads are never blocked during this. If a task is
+  // waiting to be combined with other tasks, it is simply suspended by HPX.
   static const char aggregation_region_name[] = "vector_add_aggregation";
   auto kernel_done_future = cppuddle::kernel_aggregation::aggregation_region<
       aggregation_region_name, device_executor_t, void>(max_kernels_fused,
       [&](auto slice_id, auto number_slices,
-          auto &aggregation_executor) {
-        auto alloc_host = aggregation_executor. template make_allocator();
-        assert(number_slices >= 1);
-        assert(number_slices < max_kernels_fused);
-
+          auto &aggregation_executor) {
+        // Within the aggregation region we have several extra parameters
+        // available:
+        // --> slice_id: the slice ID of the current task within the team
+        // --> number_slices: the number of participating tasks (1 <= number_slices <= max_kernels_fused)
+        // --> aggregation_executor: this special-purpose executor
+        // enables the communication between the team members. It does this in
+        // three ways: 1) function calls wrapped by it are only called once (by the
+        // last team member encountering the call, the previous members visiting just
+        // signal their readiness), 2) by providing an allocator that creates data
+        // structures that are shared between the threads (they are only created
+        // once by the first thread visiting the allocation, subsequent team members
+        // use the same allocation) and 3) by providing some primitives that allow
+        // us to conditionally execute some commands only by the final team member
+        // (just as if we had wrapped them).
+
+        // Demonstrate how to execute something once per team (by the last team member visiting)...
         if (aggregation_executor.sync_aggregation_slices()) {
-          // Only executed once per team
-          number_aggregated_kernel_launches++;
+          // Only executed once per team
+          number_aggregated_kernel_launches++;
         }
-        // Executed by each team member
+        // ...and how to execute something for each team member
         number_kernel_launches++;
-        // 1. Create recycled Kokkos host views
+
+
+        // 1. Create recycled Kokkos host views
+        // 1a) Obtain the aggregated host allocator from the aggregation executor
+        auto alloc_host = aggregation_executor. template make_allocator();
+        // 1b) Create aggregated views (shared by all team members, hence larger than entries_per_task)
         aggregated_host_view_t host_a(alloc_host, entries_per_task * max_kernels_fused);
         aggregated_host_view_t host_b(alloc_host, entries_per_task * max_kernels_fused);
         aggregated_host_view_t host_c(alloc_host, entries_per_task * max_kernels_fused);
-
+        // 1c) Use the aggregation_executor to obtain subviews that map to the current team member
         auto [host_a_slice, host_b_slice, host_c_slice] =
             cppuddle::kernel_aggregation::map_views_to_slice(
                 aggregation_executor, host_a, host_b, host_c);
-        // 2. Host-side preprocessing (usually: communication, here fill dummy
-        // input)
+        // 2. Use the per-team-member subviews for host-side preprocessing
+        // (usually: communication, here: fill dummy input)
         for (size_t i = 0; i < entries_per_task; i++) {
           host_a_slice[i] = 1.0;
           host_b_slice[i] = 2.0;
         }
+        // 3. Create recycled Kokkos device views
+        // 3a) Obtain the aggregated device allocator from the aggregation executor
         auto alloc_device = aggregation_executor. template make_allocator();
+        // 3b) Create aggregated views (shared by all team members, hence larger than entries_per_task)
         aggregated_device_view_t device_a(alloc_device, entries_per_task * max_kernels_fused);
         aggregated_device_view_t device_b(alloc_device, entries_per_task * max_kernels_fused);
         aggregated_device_view_t device_c(alloc_device, entries_per_task * max_kernels_fused);
-        // 4c. Launch data transfers and kernel
+        // 4. Launch the data transfers. Only executed once per team (using the
+        // aggregated views and the aggregated_deep_copy wrapper, which is only
+        // executed by the last thread). Alternatively, one could have used
+        // sync_aggregation_slices for an if branch again.
         cppuddle::kernel_aggregation::aggregated_deep_copy(aggregation_executor, device_a, host_a);
         cppuddle::kernel_aggregation::aggregated_deep_copy(aggregation_executor, device_b, host_b);
-        if (aggregation_executor.sync_aggregation_slices()) { // Only launch one per team
+        // 5. Launch the kernel. We could have wrapped this with the aggregation
+        // executor, but this time we use the sync_aggregation_slices if branch
+        // again to only launch it once
+        if (aggregation_executor
+                .sync_aggregation_slices()) { // Only launch once per team
           kernel_add(aggregation_executor.get_underlying_executor(),
                      entries_per_task, number_slices, max_kernels_fused, device_a, device_b, device_c);
         }
+        // 6. The future of the last data copy will be ready once the data transfers for the entire team are done
         auto transfer_fut = cppuddle::kernel_aggregation::aggregrated_deep_copy_async<
             device_executor_t>(aggregation_executor, host_c, device_c);
         transfer_fut.get();
-        // 5. Host-side postprocessing (usually: communication, here: check
-        // correctness)
+        // 7. Now each team member has gotten its results. Hence, we can
+        // proceed with some example post-processing (usually: communication,
+        // here: checking correctness)
         for (size_t i = 0; i < entries_per_task; i++) {
           if (host_c_slice[i] != 1.0 + 2.0) {
             std::cerr << "Task " << task_id << " contained wrong results!!"