Skip to content

Commit

Permalink
Use cuco::static_set in the hash-based groupby (#14813)
Browse files Browse the repository at this point in the history
Depends on #14849

Contributes to #12261

This PR migrates hash groupby to use the new `cuco::static_set` data structure. It doesn't change any existing libcudf behavior but uncovers the fact that the cudf python `value_counts` doesn't guarantee output orders thus the PR becomes a breaking change.

Authors:
  - Yunsong Wang (https://github.com/PointKernel)

Approvers:
  - David Wendt (https://github.com/davidwendt)
  - Vyas Ramasubramani (https://github.com/vyasr)

URL: #14813
  • Loading branch information
PointKernel authored Feb 29, 2024
1 parent a9e41e7 commit 200fc0b
Show file tree
Hide file tree
Showing 10 changed files with 125 additions and 129 deletions.
7 changes: 6 additions & 1 deletion cpp/benchmarks/groupby/group_max.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,6 +15,7 @@
*/

#include <benchmarks/common/generate_input.hpp>
#include <benchmarks/fixture/benchmark_fixture.hpp>

#include <cudf/groupby.hpp>

Expand Down Expand Up @@ -50,9 +51,13 @@ void bench_groupby_max(nvbench::state& state, nvbench::type_list<Type>)
requests[0].values = vals->view();
requests[0].aggregations.push_back(cudf::make_max_aggregation<cudf::groupby_aggregation>());

auto const mem_stats_logger = cudf::memory_stats_logger();
state.set_cuda_stream(nvbench::make_cuda_stream_view(cudf::get_default_stream().value()));
state.exec(nvbench::exec_tag::sync,
[&](nvbench::launch& launch) { auto const result = gb_obj.aggregate(requests); });

state.add_buffer_size(
mem_stats_logger.peak_memory_usage(), "peak_memory_usage", "peak_memory_usage");
}

NVBENCH_BENCH_TYPES(bench_groupby_max,
Expand Down
9 changes: 7 additions & 2 deletions cpp/benchmarks/groupby/group_struct_keys.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,6 +15,7 @@
*/

#include <benchmarks/common/generate_input.hpp>
#include <benchmarks/fixture/benchmark_fixture.hpp>

#include <cudf_test/column_wrapper.hpp>

Expand Down Expand Up @@ -80,11 +81,15 @@ void bench_groupby_struct_keys(nvbench::state& state)
requests[0].aggregations.push_back(cudf::make_min_aggregation<cudf::groupby_aggregation>());

// Set up nvbench default stream
auto stream = cudf::get_default_stream();
auto const mem_stats_logger = cudf::memory_stats_logger();
auto stream = cudf::get_default_stream();
state.set_cuda_stream(nvbench::make_cuda_stream_view(stream.value()));

state.exec(nvbench::exec_tag::sync,
[&](nvbench::launch& launch) { auto const result = gb_obj.aggregate(requests); });

state.add_buffer_size(
mem_stats_logger.peak_memory_usage(), "peak_memory_usage", "peak_memory_usage");
}

NVBENCH_BENCH(bench_groupby_struct_keys)
Expand Down
5 changes: 5 additions & 0 deletions cpp/include/cudf/detail/cuco_helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,16 @@

#pragma once

#include <cudf/types.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/mr/device/polymorphic_allocator.hpp>

namespace cudf::detail {

/// Sentinel value for `cudf::size_type`
static cudf::size_type constexpr CUDF_SIZE_TYPE_SENTINEL = -1;

/// Default load factor for cuco data structures
static double constexpr CUCO_DESIRED_LOAD_FACTOR = 0.5;

Expand Down
123 changes: 54 additions & 69 deletions cpp/src/groupby/hash/groupby.cu
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,19 @@
#include <cudf/column/column.hpp>
#include <cudf/column/column_factories.hpp>
#include <cudf/column/column_view.hpp>
#include <cudf/copying.hpp>
#include <cudf/detail/aggregation/aggregation.cuh>
#include <cudf/detail/aggregation/aggregation.hpp>
#include <cudf/detail/aggregation/result_cache.hpp>
#include <cudf/detail/binaryop.hpp>
#include <cudf/detail/cuco_helpers.hpp>
#include <cudf/detail/gather.hpp>
#include <cudf/detail/groupby.hpp>
#include <cudf/detail/null_mask.hpp>
#include <cudf/detail/replace.hpp>
#include <cudf/detail/unary.hpp>
#include <cudf/detail/utilities/algorithm.cuh>
#include <cudf/detail/utilities/cuda.cuh>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/dictionary/dictionary_column_view.hpp>
#include <cudf/groupby.hpp>
#include <cudf/hashing/detail/default_hash.cuh>
#include <cudf/scalar/scalar.hpp>
#include <cudf/table/experimental/row_operators.cuh>
#include <cudf/table/table.hpp>
#include <cudf/table/table_device_view.cuh>
Expand All @@ -49,12 +45,9 @@

#include <rmm/cuda_stream_view.hpp>

#include <cuda/functional>
#include <cuda/std/atomic>
#include <thrust/copy.h>
#include <cuco/static_set.cuh>
#include <thrust/for_each.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/iterator/transform_iterator.h>

#include <memory>
#include <unordered_set>
Expand All @@ -66,15 +59,12 @@ namespace detail {
namespace hash {
namespace {

// TODO: replace it with `cuco::static_map`
// https://github.com/rapidsai/cudf/issues/10401
template <typename ComparatorType>
using map_type = concurrent_unordered_map<
cudf::size_type,
cudf::size_type,
// TODO: similar to `contains_table`, using larger CG size like 2 or 4 for nested
// types and `cg_size = 1`for flat data to improve performance
using probing_scheme_type = cuco::linear_probing<
1, ///< Number of threads used to handle each input key
cudf::experimental::row::hash::device_row_hasher<cudf::hashing::detail::default_hash,
cudf::nullate::DYNAMIC>,
ComparatorType>;
cudf::nullate::DYNAMIC>>;

/**
* @brief List of aggregation operations that can be computed with a hash-based
Expand Down Expand Up @@ -190,14 +180,14 @@ class groupby_simple_aggregations_collector final
}
};

template <typename ComparatorType>
template <typename SetType>
class hash_compound_agg_finalizer final : public cudf::detail::aggregation_finalizer {
column_view col;
data_type result_type;
cudf::detail::result_cache* sparse_results;
cudf::detail::result_cache* dense_results;
device_span<size_type const> gather_map;
map_type<ComparatorType> const& map;
SetType set;
bitmask_type const* __restrict__ row_bitmask;
rmm::cuda_stream_view stream;
rmm::mr::device_memory_resource* mr;
Expand All @@ -209,15 +199,15 @@ class hash_compound_agg_finalizer final : public cudf::detail::aggregation_final
cudf::detail::result_cache* sparse_results,
cudf::detail::result_cache* dense_results,
device_span<size_type const> gather_map,
map_type<ComparatorType> const& map,
SetType set,
bitmask_type const* row_bitmask,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
: col(col),
sparse_results(sparse_results),
dense_results(dense_results),
gather_map(gather_map),
map(map),
set(set),
row_bitmask(row_bitmask),
stream(stream),
mr(mr)
Expand Down Expand Up @@ -340,8 +330,8 @@ class hash_compound_agg_finalizer final : public cudf::detail::aggregation_final
rmm::exec_policy(stream),
thrust::make_counting_iterator(0),
col.size(),
::cudf::detail::var_hash_functor<map_type<ComparatorType>>{
map, row_bitmask, *var_result_view, *values_view, *sum_view, *count_view, agg._ddof});
::cudf::detail::var_hash_functor{
set, row_bitmask, *var_result_view, *values_view, *sum_view, *count_view, agg._ddof});
sparse_results->add_result(col, agg, std::move(var_result));
dense_results->add_result(col, agg, to_dense_agg_result(agg));
}
Expand Down Expand Up @@ -398,13 +388,13 @@ flatten_single_pass_aggs(host_span<aggregation_request const> requests)
*
* @see groupby_null_templated()
*/
template <typename ComparatorType>
template <typename SetType>
void sparse_to_dense_results(table_view const& keys,
host_span<aggregation_request const> requests,
cudf::detail::result_cache* sparse_results,
cudf::detail::result_cache* dense_results,
device_span<size_type const> gather_map,
map_type<ComparatorType> const& map,
SetType set,
bool keys_have_nulls,
null_policy include_null_keys,
rmm::cuda_stream_view stream,
Expand All @@ -423,7 +413,7 @@ void sparse_to_dense_results(table_view const& keys,
// Given an aggregation, this will get the result from sparse_results and
// convert and return dense, compacted result
auto finalizer = hash_compound_agg_finalizer(
col, sparse_results, dense_results, gather_map, map, row_bitmask_ptr, stream, mr);
col, sparse_results, dense_results, gather_map, set, row_bitmask_ptr, stream, mr);
for (auto&& agg : agg_v) {
agg->finalize(finalizer);
}
Expand Down Expand Up @@ -467,11 +457,11 @@ auto create_sparse_results_table(table_view const& flattened_values,
* @brief Computes all aggregations from `requests` that require a single pass
* over the data and stores the results in `sparse_results`
*/
template <typename ComparatorType>
template <typename SetType>
void compute_single_pass_aggs(table_view const& keys,
host_span<aggregation_request const> requests,
cudf::detail::result_cache* sparse_results,
map_type<ComparatorType>& map,
SetType set,
bool keys_have_nulls,
null_policy include_null_keys,
rmm::cuda_stream_view stream)
Expand All @@ -494,16 +484,16 @@ void compute_single_pass_aggs(table_view const& keys,
? cudf::detail::bitmask_and(keys, stream, rmm::mr::get_current_device_resource()).first
: rmm::device_buffer{};

thrust::for_each_n(rmm::exec_policy(stream),
thrust::make_counting_iterator(0),
keys.num_rows(),
hash::compute_single_pass_aggs_fn<map_type<ComparatorType>>{
map,
*d_values,
*d_sparse_table,
d_aggs.data(),
static_cast<bitmask_type*>(row_bitmask.data()),
skip_key_rows_with_nulls});
thrust::for_each_n(
rmm::exec_policy(stream),
thrust::make_counting_iterator(0),
keys.num_rows(),
hash::compute_single_pass_aggs_fn{set,
*d_values,
*d_sparse_table,
d_aggs.data(),
static_cast<bitmask_type*>(row_bitmask.data()),
skip_key_rows_with_nulls});
// Add results back to sparse_results cache
auto sparse_result_cols = sparse_table.release();
for (size_t i = 0; i < aggs.size(); i++) {
Expand All @@ -517,23 +507,15 @@ void compute_single_pass_aggs(table_view const& keys,
* @brief Computes and returns a device vector containing all populated keys in
* `map`.
*/
template <typename ComparatorType>
rmm::device_uvector<size_type> extract_populated_keys(map_type<ComparatorType> const& map,
template <typename SetType>
rmm::device_uvector<size_type> extract_populated_keys(SetType const& key_set,
size_type num_keys,
rmm::cuda_stream_view stream)
{
rmm::device_uvector<size_type> populated_keys(num_keys, stream);
auto const keys_end = key_set.retrieve_all(populated_keys.begin(), stream.value());

auto const get_key = cuda::proclaim_return_type<typename map_type<ComparatorType>::key_type>(
[] __device__(auto const& element) { return element.first; }); // first = key
auto const key_used = [unused = map.get_unused_key()] __device__(auto key) {
return key != unused;
};
auto const key_itr = thrust::make_transform_iterator(map.data(), get_key);
auto const end_it = cudf::detail::copy_if_safe(
key_itr, key_itr + map.capacity(), populated_keys.begin(), key_used, stream);

populated_keys.resize(std::distance(populated_keys.begin(), end_it), stream);
populated_keys.resize(std::distance(populated_keys.begin(), keys_end), stream);
return populated_keys;
}

Expand Down Expand Up @@ -580,38 +562,41 @@ std::unique_ptr<table> groupby(table_view const& keys,
auto const row_hash = cudf::experimental::row::hash::row_hasher{std::move(preprocessed_keys)};
auto const d_row_hash = row_hash.device_hasher(has_null);

size_type constexpr unused_key{std::numeric_limits<size_type>::max()};
size_type constexpr unused_value{std::numeric_limits<size_type>::max()};

// Cache of sparse results where the location of aggregate value in each
// column is indexed by the hash map
// column is indexed by the hash set
cudf::detail::result_cache sparse_results(requests.size());

auto const comparator_helper = [&](auto const d_key_equal) {
using allocator_type = typename map_type<decltype(d_key_equal)>::allocator_type;

auto const map = map_type<decltype(d_key_equal)>::create(compute_hash_table_size(num_keys),
stream,
unused_key,
unused_value,
d_row_hash,
d_key_equal,
allocator_type());
// Compute all single pass aggs first
compute_single_pass_aggs(
keys, requests, &sparse_results, *map, keys_have_nulls, include_null_keys, stream);
auto const set = cuco::static_set{num_keys,
0.5, // desired load factor
cuco::empty_key{cudf::detail::CUDF_SIZE_TYPE_SENTINEL},
d_key_equal,
probing_scheme_type{d_row_hash},
cuco::thread_scope_device,
cuco::storage<1>{},
cudf::detail::cuco_allocator{stream},
stream.value()};

// Extract the populated indices from the hash map and create a gather map.
// Compute all single pass aggs first
compute_single_pass_aggs(keys,
requests,
&sparse_results,
set.ref(cuco::insert_and_find),
keys_have_nulls,
include_null_keys,
stream);

// Extract the populated indices from the hash set and create a gather map.
// Gathering using this map from sparse results will give dense results.
auto gather_map = extract_populated_keys(*map, keys.num_rows(), stream);
auto gather_map = extract_populated_keys(set, keys.num_rows(), stream);

// Compact all results from sparse_results and insert into cache
sparse_to_dense_results(keys,
requests,
&sparse_results,
cache,
gather_map,
*map,
set.ref(cuco::find),
keys_have_nulls,
include_null_keys,
stream,
Expand Down
Loading

0 comments on commit 200fc0b

Please sign in to comment.