Add more descriptions
G-071 committed May 31, 2024
1 parent 751adac commit 703dc23
Showing 1 changed file with 83 additions and 44 deletions.
127 changes: 83 additions & 44 deletions examples/kernel-aggregation-with-hpx-kokkos.cpp
@@ -35,19 +35,14 @@
#include <stdexcept>
#include <vector>


/** \file This example shows how to use HPX + Kokkos + CPPuddle with GPU-accelerated
 * applications. The example is extremely similar to its CUDA counterpart, however, it uses
 * Kokkos for the implementation to showcase the required boilerplate and offered features.
 * Particularly, we focus on how to use a) recycled pinned host
 * memory, b) recycled device memory, c) the executor pool, d) the HPX-Kokkos
 * futures and the basic CPU/GPU load balancing based on executor usage in an
 * HPX application. To demonstrate these features we just use the simplest of
 * kernels: a vector add that is repeated over a multitude of tasks (with
 * varying, artificial dependencies in between). So while the compute kernel is
 * basic, we still get to see how the CPPuddle/HPX features may be used.
/** \file Work aggregation example using Kokkos with HPX and CPPuddle. Like the
* other examples, we are still using a mere vector-add kernel for simplicity,
* allowing us to focus on how the aggregation actually works. Notably, it works
 * similarly for much more complicated applications (see Octo-Tiger for an
 * example of this).
*
 * The example has three parts: first the GPU part (using the kernel
 * aggregation feature with both host and device code), then the HPX task graph
 * management, and lastly the remaining initialization/boilerplate code.
*/
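
// At a glance, the aggregation pattern demonstrated in detail below boils down
// to the following condensed sketch (a hedged illustration; the exact
// signatures, setup, and error handling are in the full example code of this
// file):
//
//   auto fut = cppuddle::kernel_aggregation::aggregation_region<
//       aggregation_region_name, device_executor_t, void>(
//       max_kernels_fused,
//       [&](auto slice_id, auto number_slices, auto &aggregation_executor) {
//         // allocate team-shared (aggregated) buffers, map this task's slice,
//         // move data, and launch the fused kernel once per team
//       });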

@@ -61,6 +56,8 @@
// if required.
//
using float_t = float;
// TODO(daissgr) No reason not to have this in a cppuddle header defining a generic type
//
// Use the correct device execution space and memory spaces depending on the activated device
// execution space
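// (A hedged illustration only; the concrete aliases are in the collapsed part
// of this diff and may differ, so the names below are assumptions. The
// selection typically follows a pattern like:
//
//   #if defined(KOKKOS_ENABLE_CUDA)
//   using device_execution_space = Kokkos::Cuda;
//   using device_memory_space = Kokkos::CudaSpace;
//   #else
//   using device_execution_space = Kokkos::DefaultExecutionSpace;
//   using device_memory_space = device_execution_space::memory_space;
//   #endif
//   using host_memory_space = Kokkos::HostSpace;
// )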
#ifdef KOKKOS_ENABLE_CUDA
@@ -140,35 +137,33 @@ void kernel_add(executor_t &executor, const size_t entries_per_task,
Kokkos::Experimental::WorkItemProperty::HintLightWeight);

// Run the kernel with the execution policy (and ideally give it a name)
// NOTE: Since this kernel may be launched in a way where multiple kernel
// launches are combined into one kernel, it contains another index: the slice
// ID (telling us which kernel a work item belongs to in case multiple kernels
// were fused). For this simple kernel there is no compute-wise difference
// between work items; however, this is not always the case (for instance when
// fusing together multiple reduce kernels), hence we show how to obtain the
// slice ID if required:
Kokkos::parallel_for(
"sample vector add kernel", execution_policy,
KOKKOS_LAMBDA(size_t index) {
// Get slice id (kernel ID in case multiple kernels were fused)
const size_t slice_id = index / entries_per_task;
const size_t entry_index = index % entries_per_task;
// Obtain correct subviews for current team member (slice)
auto [a_slice, b_slice, c_slice] =
cppuddle::kernel_aggregation::map_views_to_slice(slice_id, max_kernels_fused, input_a,
input_b, output_c);

// Run the actual compute kernel on the mapped subviews
c_slice[entry_index] = a_slice[entry_index] + b_slice[entry_index];
});
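
// For intuition (illustrative numbers, not the example's defaults): with
// entries_per_task = 1000 and three fused kernels, the single launch above
// covers 3000 work items; the work item with global index 2345 then gets
// slice_id = 2345 / 1000 = 2 and entry_index = 2345 % 1000 = 345, i.e. it
// belongs to the third fused kernel and handles that kernel's entry 345.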
}

/** Method that demonstrates how one might launch a Kokkos kernel with HPX and
 * CPPuddle recycled memory/executors! By using CPPuddle allocators to avoid
 * repeatedly allocating GPU memory, and HPX futures to track the status of the
 * kernel/memory transfers, this method is expected to be non-blocking both on
 * the launching CPU thread and on the GPU (no malloc barriers). Hence, this
 * launch method is suitable for quickly launching a multitude of GPU kernels if
 * required.
 *
 * This method uses the following features:
 * - Recycled pinned host memory.
 * - Recycled device memory.
 * - Draws a GPU executor from the CPPuddle executor pool.
 * - CPU-GPU load balancing based on the number of GPU executors and their queue length.
 * - Asynchronous data transfers and launching of the kernel.
 * - HPX futures to suspend the HPX task until the kernel and data transfers are done.
 * - Includes (sample) pre- and post-processing. */
/** Method that shows how the CPPuddle kernel aggregation feature may be used
 * to define aggregation regions where teams of tasks can form on the fly,
 * enabling a single, larger GPU kernel launch instead of many small individual
 * ones. */
void launch_gpu_kernel_task(
const size_t task_id, const size_t entries_per_task,
const size_t max_queue_length, const size_t gpu_id,
@@ -184,61 +179,105 @@ void launch_gpu_kernel_task(
round_robin_pool_impl<device_executor_t>>(
max_queue_length, gpu_id);
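
// (Context, since the beginning of this statement is collapsed in the diff
// view: the call above draws a recycled device executor from the CPPuddle
// executor pool using the round_robin_pool_impl policy; the max_queue_length
// and gpu_id arguments steer the queue-length-based CPU/GPU load balancing
// described in the documentation comments above.)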

// Defines an aggregation region
// -----------------------------
// The lambda within will be executed by a team of threads (between 1 and
// max_kernels_fused). The "threads" correspond to the HPX tasks that
// originally hit this aggregation region. How many of the tasks are fused
// together into this team depends on the utilization of the underlying
// resource (a GPU stream in this case). If it is not busy, a task will
// immediately get to work launching its own kernel (team size 1). If the
// resource is busy, however, the current task will be combined with other
// arriving tasks and launched as an aggregated/fused task once either
// max_kernels_fused tasks have arrived or the resource becomes available.
//
// Note that the worker threads are never blocked during this. If a task is
// waiting to be combined with other tasks, it is simply suspended by HPX.
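//
// For intuition (illustrative numbers only; the actual team size depends on
// the runtime load): with max_kernels_fused = 4, four tasks hitting this
// region while the GPU stream is busy would typically be fused into a single
// aggregated launch covering 4 * entries_per_task elements (number_slices ==
// 4), whereas on an idle stream each task launches on its own (number_slices
// == 1).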
static const char aggregation_region_name[] = "vector_add_aggregation";
auto kernel_done_future = cppuddle::kernel_aggregation::aggregation_region<
aggregation_region_name, device_executor_t,
void>(max_kernels_fused, [&](auto slice_id, auto number_slices,
auto &aggregation_executor) {
assert(number_slices >= 1);
assert(number_slices <= max_kernels_fused);

// Within the aggregation region we have multiple extra parameters
// available:
// --> slice_id: the ID (slice) of the current thread within its team
// --> number_slices: the number of participating threads (1 <= number_slices <= max_kernels_fused)
// --> aggregation_executor: this special-purpose executor
// enables the communication between the team members. It does this in three
// ways: 1) function calls wrapped by it are only executed once (by the last
// team member encountering the call; previously visiting members just
// signal their readiness), 2) it provides an allocator that creates data
// structures shared between the threads (they are only created
// once, by the first thread reaching the allocation; subsequent team members
// use the same allocation), and 3) it provides primitives that allow
// us to conditionally execute some commands only in the final team member
// (just as if we had wrapped them).

// Demonstrate how to execute something once per team (by the last team thread visiting)...
if (aggregation_executor.sync_aggregation_slices()) {
// Only executed once per team
number_aggregated_kernel_launches++;
}
// ...and how to execute something in each team member
number_kernel_launches++;

// 1. Create recycled Kokkos host views
// 1a) obtain aggregated host allocator from aggregation executor
auto alloc_host = aggregation_executor. template make_allocator<float_t, host_allocator_t>();
// 1b) create aggregated views (shared by all team members, hence larger than entries_per_task)
aggregated_host_view_t host_a(alloc_host, entries_per_task * max_kernels_fused);
aggregated_host_view_t host_b(alloc_host, entries_per_task * max_kernels_fused);
aggregated_host_view_t host_c(alloc_host, entries_per_task * max_kernels_fused);

// 1c) use the aggregation_executor to obtain subviews that map just to the current team member's slice
auto [host_a_slice, host_b_slice, host_c_slice] =
cppuddle::kernel_aggregation::map_views_to_slice(
aggregation_executor, host_a, host_b, host_c);
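
// (Conceptually, and only as a hedged illustration since the actual mapping
// happens inside CPPuddle: each returned slice behaves roughly like
//   Kokkos::subview(host_a, std::make_pair(slice_id * entries_per_task,
//                                           (slice_id + 1) * entries_per_task))
// which matches the index calculation used inside the fused kernel above.)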

// 2. Use the per-team-member subviews for host-side preprocessing (usually:
// communication; here: fill dummy input)
for (size_t i = 0; i < entries_per_task; i++) {
host_a_slice[i] = 1.0;
host_b_slice[i] = 2.0;
}

// 3. Create recycled Kokkos device views
// 3a) obtain aggregated device allocator from aggregation executor
auto alloc_device = aggregation_executor. template make_allocator<float_t, device_allocator_t>();
// 3b) create aggregated views (shared by all team members, hence larger than entries_per_task)
aggregated_device_view_t device_a(alloc_device,
entries_per_task * max_kernels_fused);
aggregated_device_view_t device_b(alloc_device, entries_per_task * max_kernels_fused);
aggregated_device_view_t device_c(alloc_device, entries_per_task * max_kernels_fused);

// 4. Launch the host-to-device data transfers. These are only executed once
// per team, by using the aggregated views together with the
// aggregated_deep_copy wrapper, which only the last thread actually executes.
// Alternatively, one could have used sync_aggregation_slices in an if branch
// again.
cppuddle::kernel_aggregation::aggregated_deep_copy(aggregation_executor,
device_a, host_a);
cppuddle::kernel_aggregation::aggregated_deep_copy(aggregation_executor,
device_b, host_b);

// 5. Launch the kernel. We could have wrapped this with the aggregation
// executor, but this time we use the sync_aggregation_slices if branch again
// to only launch it once.
if (aggregation_executor
        .sync_aggregation_slices()) { // Only launch once per team
kernel_add(aggregation_executor.get_underlying_executor(),
entries_per_task, number_slices, max_kernels_fused, device_a, device_b, device_c);
}

// 6. The future of the last data copy will be ready once the data transfers for the entire team are done
auto transfer_fut =
cppuddle::kernel_aggregation::aggregrated_deep_copy_async<
device_executor_t>(aggregation_executor, host_c, device_c);
transfer_fut.get();

// 7. Now each team member has its results. Hence, we can proceed with some
// example host-side post-processing (usually: communication; here: checking
// correctness)
for (size_t i = 0; i < entries_per_task; i++) {
if (host_c_slice[i] != 1.0 + 2.0) {
std::cerr << "Task " << task_id << " contained wrong results!!"
