Add explicit instantiations for compute_aggregations
PointKernel committed Sep 30, 2024
1 parent bb7187d commit 5f05ca7
Showing 4 changed files with 342 additions and 301 deletions.
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
@@ -315,6 +315,7 @@ add_library(
src/filling/sequence.cu
src/groupby/groupby.cu
src/groupby/hash/compute_aggregations.cu
src/groupby/hash/compute_aggregations_null.cu
src/groupby/hash/compute_groupby.cu
src/groupby/hash/compute_single_pass_aggs.cu
src/groupby/hash/compute_single_pass_aggs_null.cu
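Based on the build change above and the instantiation removed from compute_aggregations.cu below, the new cpp/src/groupby/hash/compute_aggregations_null.cu (its contents are not shown in this diff) presumably pulls in the shared template definition from compute_aggregations.cuh and keeps only the nullable instantiation, so the two explicit instantiations of compute_aggregations compile in separate translation units. A minimal sketch under that assumption:

// compute_aggregations_null.cu -- hypothetical sketch, not the actual file contents
#include "compute_aggregations.cuh"
#include "compute_aggregations.hpp"

namespace cudf::groupby::detail::hash {

// Explicit instantiation for the nullable key set; the global_set_t
// instantiation remains in compute_aggregations.cu.
template std::pair<cudaError_t, cudf::table> compute_aggregations<nullable_global_set_t>(
  int grid_size,
  cudf::size_type num_input_rows,
  bitmask_type const* row_bitmask,
  bool skip_rows_with_nulls,
  cudf::size_type* local_mapping_index,
  cudf::size_type* global_mapping_index,
  cudf::size_type* block_cardinality,
  cudf::table_device_view input_values,
  cudf::table_view const& flattened_values,
  cudf::aggregation::Kind const* d_agg_kinds,
  std::vector<cudf::aggregation::Kind> const& agg_kinds,
  bool direct_aggregations,
  nullable_global_set_t& global_set,
  rmm::device_uvector<cudf::size_type>& populated_keys,
  rmm::cuda_stream_view stream);

}  // namespace cudf::groupby::detail::hash
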
302 changes: 1 addition & 301 deletions cpp/src/groupby/hash/compute_aggregations.cu
@@ -14,292 +14,10 @@
* limitations under the License.
*/

#include "compute_aggregations.cuh"
#include "compute_aggregations.hpp"
#include "create_sparse_results_table.hpp"
#include "global_memory_aggregator.cuh"
#include "helpers.cuh"
#include "shared_memory_aggregator.cuh"
#include "single_pass_functors.cuh"

#include <cudf/aggregation.hpp>
#include <cudf/detail/utilities/cuda.cuh>
#include <cudf/detail/utilities/cuda.hpp>
#include <cudf/detail/utilities/integer_utils.hpp>
#include <cudf/table/table_device_view.cuh>
#include <cudf/types.hpp>

#include <rmm/cuda_stream_view.hpp>

#include <cooperative_groups.h>

#include <cstddef>

namespace cudf::groupby::detail::hash {
namespace {
__device__ void calculate_columns_to_aggregate(int& col_start,
int& col_end,
cudf::mutable_table_device_view output_values,
int num_input_cols,
std::byte** s_aggregates_pointer,
bool** s_aggregates_valid_pointer,
std::byte* shared_set_aggregates,
cudf::size_type cardinality,
int total_agg_size)
{
if (threadIdx.x == 0) {
col_start = col_end;
int bytes_allocated = 0;
int valid_col_size = round_to_multiple_of_8(sizeof(bool) * cardinality);
while ((bytes_allocated < total_agg_size) && (col_end < num_input_cols)) {
int next_col_size =
round_to_multiple_of_8(sizeof(output_values.column(col_end).type()) * cardinality);
int next_col_total_size = valid_col_size + next_col_size;
if (bytes_allocated + next_col_total_size > total_agg_size) { break; }
s_aggregates_pointer[col_end] = shared_set_aggregates + bytes_allocated;
s_aggregates_valid_pointer[col_end] =
reinterpret_cast<bool*>(shared_set_aggregates + bytes_allocated + next_col_size);
bytes_allocated += next_col_total_size;
col_end++;
}
}
}

__device__ void initialize_shared_memory_aggregates(int col_start,
int col_end,
cudf::mutable_table_device_view output_values,
std::byte** s_aggregates_pointer,
bool** s_aggregates_valid_pointer,
cudf::size_type cardinality,
cudf::aggregation::Kind const* d_agg_kinds)
{
for (auto col_idx = col_start; col_idx < col_end; col_idx++) {
for (auto idx = threadIdx.x; idx < cardinality; idx += blockDim.x) {
cudf::detail::dispatch_type_and_aggregation(output_values.column(col_idx).type(),
d_agg_kinds[col_idx],
initialize_shmem{},
s_aggregates_pointer[col_idx],
idx,
s_aggregates_valid_pointer[col_idx]);
}
}
}

__device__ void compute_pre_aggregrates(int col_start,
int col_end,
bitmask_type const* row_bitmask,
bool skip_rows_with_nulls,
cudf::table_device_view input_values,
cudf::size_type num_input_rows,
cudf::size_type* local_mapping_index,
std::byte** s_aggregates_pointer,
bool** s_aggregates_valid_pointer,
cudf::aggregation::Kind const* d_agg_kinds)
{
// TODO grid_1d utility
for (auto cur_idx = blockDim.x * blockIdx.x + threadIdx.x; cur_idx < num_input_rows;
cur_idx += blockDim.x * gridDim.x) {
if (not skip_rows_with_nulls or cudf::bit_is_set(row_bitmask, cur_idx)) {
auto map_idx = local_mapping_index[cur_idx];

for (auto col_idx = col_start; col_idx < col_end; col_idx++) {
auto input_col = input_values.column(col_idx);

cudf::detail::dispatch_type_and_aggregation(input_col.type(),
d_agg_kinds[col_idx],
shmem_element_aggregator{},
s_aggregates_pointer[col_idx],
map_idx,
s_aggregates_valid_pointer[col_idx],
input_col,
cur_idx);
}
}
}
}

__device__ void compute_final_aggregates(int col_start,
int col_end,
cudf::table_device_view input_values,
cudf::mutable_table_device_view output_values,
cudf::size_type cardinality,
cudf::size_type* global_mapping_index,
std::byte** s_aggregates_pointer,
bool** s_aggregates_valid_pointer,
cudf::aggregation::Kind const* d_agg_kinds)
{
for (auto cur_idx = threadIdx.x; cur_idx < cardinality; cur_idx += blockDim.x) {
auto out_idx = global_mapping_index[blockIdx.x * GROUPBY_SHM_MAX_ELEMENTS + cur_idx];
for (auto col_idx = col_start; col_idx < col_end; col_idx++) {
auto output_col = output_values.column(col_idx);

cudf::detail::dispatch_type_and_aggregation(input_values.column(col_idx).type(),
d_agg_kinds[col_idx],
gmem_element_aggregator{},
output_col,
out_idx,
input_values.column(col_idx),
s_aggregates_pointer[col_idx],
cur_idx,
s_aggregates_valid_pointer[col_idx]);
}
}
}

/* Takes the local_mapping_index and global_mapping_index to compute
* pre (shared) and final (global) aggregates*/
CUDF_KERNEL void compute_d_agg_kinds_kernel(cudf::size_type num_rows,
bitmask_type const* row_bitmask,
bool skip_rows_with_nulls,
cudf::size_type* local_mapping_index,
cudf::size_type* global_mapping_index,
cudf::size_type* block_cardinality,
cudf::table_device_view input_values,
cudf::mutable_table_device_view output_values,
cudf::aggregation::Kind const* d_agg_kinds,
int total_agg_size,
int pointer_size)
{
auto const block = cooperative_groups::this_thread_block();
auto const cardinality = block_cardinality[block.group_index().x];
if (cardinality >= GROUPBY_CARDINALITY_THRESHOLD) { return; }

auto const num_cols = output_values.num_columns();

__shared__ int col_start;
__shared__ int col_end;
extern __shared__ std::byte shared_set_aggregates[];
std::byte** s_aggregates_pointer =
reinterpret_cast<std::byte**>(shared_set_aggregates + total_agg_size);
bool** s_aggregates_valid_pointer =
reinterpret_cast<bool**>(shared_set_aggregates + total_agg_size + pointer_size);

if (block.thread_rank() == 0) {
col_start = 0;
col_end = 0;
}
block.sync();

while (col_end < num_cols) {
calculate_columns_to_aggregate(col_start,
col_end,
output_values,
num_cols,
s_aggregates_pointer,
s_aggregates_valid_pointer,
shared_set_aggregates,
cardinality,
total_agg_size);
block.sync();
initialize_shared_memory_aggregates(col_start,
col_end,
output_values,
s_aggregates_pointer,
s_aggregates_valid_pointer,
cardinality,
d_agg_kinds);
block.sync();
compute_pre_aggregrates(col_start,
col_end,
row_bitmask,
skip_rows_with_nulls,
input_values,
num_rows,
local_mapping_index,
s_aggregates_pointer,
s_aggregates_valid_pointer,
d_agg_kinds);
block.sync();
compute_final_aggregates(col_start,
col_end,
input_values,
output_values,
cardinality,
global_mapping_index,
s_aggregates_pointer,
s_aggregates_valid_pointer,
d_agg_kinds);
block.sync();
}
}

constexpr size_t get_previous_multiple_of_8(size_t number) { return number / 8 * 8; }

template <typename Kernel>
constexpr std::pair<cudaError_t, size_t> compute_shared_memory_size(Kernel kernel,
int grid_size) noexcept
{
auto const active_blocks_per_sm =
cudf::util::div_rounding_up_safe(grid_size, cudf::detail::num_multiprocessors());

size_t dynamic_shmem_size = 0;

auto const status = cudaOccupancyAvailableDynamicSMemPerBlock(
&dynamic_shmem_size, kernel, active_blocks_per_sm, GROUPBY_BLOCK_SIZE);
if (status != cudaSuccess) { cudaGetLastError(); }
return {status, get_previous_multiple_of_8(0.5 * dynamic_shmem_size)};
}
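// Worked example of the sizing above (illustrative numbers, not taken from this
// commit): with grid_size = 216 on a device reporting 108 multiprocessors,
// active_blocks_per_sm = div_rounding_up_safe(216, 108) = 2. If
// cudaOccupancyAvailableDynamicSMemPerBlock then reports 65536 bytes of dynamic
// shared memory available per block at that occupancy and block size, the
// function returns get_previous_multiple_of_8(0.5 * 65536) = 32768, i.e. half
// of the available budget rounded down to an 8-byte multiple.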

} // namespace

template <typename SetType>
std::pair<cudaError_t, cudf::table> compute_aggregations(
int grid_size,
cudf::size_type num_input_rows,
bitmask_type const* row_bitmask,
bool skip_rows_with_nulls,
cudf::size_type* local_mapping_index,
cudf::size_type* global_mapping_index,
cudf::size_type* block_cardinality,
cudf::table_device_view input_values,
cudf::table_view const& flattened_values,
cudf::aggregation::Kind const* d_agg_kinds,
std::vector<cudf::aggregation::Kind> const& agg_kinds,
bool direct_aggregations,
SetType& global_set,
rmm::device_uvector<cudf::size_type>& populated_keys,
rmm::cuda_stream_view stream)
{
auto const [status, shmem_size] =
compute_shared_memory_size(compute_d_agg_kinds_kernel, grid_size);

if (status != cudaSuccess) { direct_aggregations = true; }

// make table that will hold sparse results
cudf::table sparse_table = create_sparse_results_table(flattened_values,
d_agg_kinds,
agg_kinds,
direct_aggregations,
global_set,
populated_keys,
stream);

if (status != cudaSuccess) { return {status, sparse_table}; }

auto d_sparse_table = mutable_table_device_view::create(sparse_table, stream);
auto output_values = *d_sparse_table;

// For each aggregation, need two pointers to arrays in shmem
// One where the aggregation is performed, one indicating the validity of the aggregation
auto const shmem_agg_pointer_size =
round_to_multiple_of_8(sizeof(std::byte*) * output_values.num_columns());
// The rest of shmem is utilized for the actual arrays in shmem
auto const shmem_agg_size = shmem_size - shmem_agg_pointer_size * 2;
compute_d_agg_kinds_kernel<<<grid_size, GROUPBY_BLOCK_SIZE, shmem_size, stream>>>(
num_input_rows,
row_bitmask,
skip_rows_with_nulls,
local_mapping_index,
global_mapping_index,
block_cardinality,
input_values,
output_values,
d_agg_kinds,
shmem_agg_size,
shmem_agg_pointer_size);

return {status, sparse_table};
}
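
// Worked example of the shared-memory split above (illustrative numbers): if
// compute_shared_memory_size() returned shmem_size = 32768 bytes and the sparse
// output table has 8 columns, then
//   shmem_agg_pointer_size = round_to_multiple_of_8(sizeof(std::byte*) * 8) = 64
//   shmem_agg_size         = 32768 - 2 * 64 = 32640
// and the kernel lays out its dynamic shared memory as
//   [0, 32640)      per-block aggregate values and validity flags
//   [32640, 32704)  s_aggregates_pointer  (std::byte*[8])
//   [32704, 32768)  s_aggregates_valid_pointer (bool*[8])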

template std::pair<cudaError_t, cudf::table> compute_aggregations<global_set_t>(
int grid_size,
cudf::size_type num_input_rows,
@@ -316,22 +34,4 @@ template std::pair<cudaError_t, cudf::table> compute_aggregations<global_set_t>(
global_set_t& global_set,
rmm::device_uvector<cudf::size_type>& populated_keys,
rmm::cuda_stream_view stream);

template std::pair<cudaError_t, cudf::table> compute_aggregations<nullable_global_set_t>(
int grid_size,
cudf::size_type num_input_rows,
bitmask_type const* row_bitmask,
bool skip_rows_with_nulls,
cudf::size_type* local_mapping_index,
cudf::size_type* global_mapping_index,
cudf::size_type* block_cardinality,
cudf::table_device_view input_values,
cudf::table_view const& flattened_values,
cudf::aggregation::Kind const* d_agg_kinds,
std::vector<cudf::aggregation::Kind> const& agg_kinds,
bool direct_aggregations,
nullable_global_set_t& global_set,
rmm::device_uvector<cudf::size_type>& populated_keys,
rmm::cuda_stream_view stream);

} // namespace cudf::groupby::detail::hash