Skip to content

Commit

Permalink
Implement grouped product scan (#15254)
Browse files Browse the repository at this point in the history
Although cumulative products are implemented for whole-frame scans, they were not for grouped aggregations.
Plumb through the necessary machinery to enable this. Only enabled for floating and integral types: the units make no sense for durations.

As for the whole-frame product aggregation, it is very easy to overflow the output type. For floating types this will result in `+/- inf` as the result. For signed integral types, behaviour is undefined on overflow.

- Closes #15253

Authors:
  - Lawrence Mitchell (https://github.com/wence-)
  - Bradley Dice (https://github.com/bdice)

Approvers:
  - David Wendt (https://github.com/davidwendt)
  - Bradley Dice (https://github.com/bdice)

URL: #15254
  • Loading branch information
wence- authored Mar 11, 2024
1 parent 8ed3e20 commit 241825a
Show file tree
Hide file tree
Showing 12 changed files with 226 additions and 4 deletions.
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ add_library(
src/groupby/sort/group_count_scan.cu
src/groupby/sort/group_max_scan.cu
src/groupby/sort/group_min_scan.cu
src/groupby/sort/group_product_scan.cu
src/groupby/sort/group_rank_scan.cu
src/groupby/sort/group_replace_nulls.cu
src/groupby/sort/group_sum_scan.cu
Expand Down
1 change: 1 addition & 0 deletions cpp/include/cudf/detail/aggregation/aggregation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ class sum_aggregation final : public rolling_aggregation,
* @brief Derived class for specifying a product aggregation
*/
class product_aggregation final : public groupby_aggregation,
public groupby_scan_aggregation,
public reduce_aggregation,
public scan_aggregation,
public segmented_reduce_aggregation {
Expand Down
4 changes: 3 additions & 1 deletion cpp/src/aggregation/aggregation.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2023, NVIDIA CORPORATION.
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -429,6 +429,8 @@ std::unique_ptr<Base> make_product_aggregation()
}
template std::unique_ptr<aggregation> make_product_aggregation<aggregation>();
template std::unique_ptr<groupby_aggregation> make_product_aggregation<groupby_aggregation>();
template std::unique_ptr<groupby_scan_aggregation>
make_product_aggregation<groupby_scan_aggregation>();
template std::unique_ptr<reduce_aggregation> make_product_aggregation<reduce_aggregation>();
template std::unique_ptr<scan_aggregation> make_product_aggregation<scan_aggregation>();
template std::unique_ptr<segmented_reduce_aggregation>
Expand Down
41 changes: 41 additions & 0 deletions cpp/src/groupby/sort/group_product_scan.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "groupby/sort/group_scan_util.cuh"

#include <rmm/cuda_stream_view.hpp>

namespace cudf {
namespace groupby {
namespace detail {
std::unique_ptr<column> product_scan(column_view const& values,
size_type num_groups,
cudf::device_span<size_type const> group_labels,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
return type_dispatcher(values.type(),
group_scan_dispatcher<aggregation::PRODUCT>{},
values,
num_groups,
group_labels,
stream,
mr);
}

} // namespace detail
} // namespace groupby
} // namespace cudf
19 changes: 18 additions & 1 deletion cpp/src/groupby/sort/group_scan.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2022, NVIDIA CORPORATION.
* Copyright (c) 2021-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -42,6 +42,23 @@ std::unique_ptr<column> sum_scan(column_view const& values,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

/**
* @brief Internal API to calculate groupwise cumulative product
*
* Behaviour is undefined for signed integral types if any groupwise product overflows the type.
*
* @param values Grouped values to get product of
* @param num_groups Number of groups
* @param group_labels ID of group that the corresponding value belongs to
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned column's device memory
*/
std::unique_ptr<column> product_scan(column_view const& values,
size_type num_groups,
device_span<size_type const> group_labels,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

/**
* @brief Internal API to calculate groupwise cumulative minimum value
*
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/groupby/sort/group_scan_util.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ static constexpr bool is_group_scan_supported()
{
if (K == aggregation::SUM)
return cudf::is_numeric<T>() || cudf::is_duration<T>() || cudf::is_fixed_point<T>();
else if (K == aggregation::PRODUCT)
return cudf::is_numeric<T>();
else if (K == aggregation::MIN or K == aggregation::MAX)
return not cudf::is_dictionary<T>() and
(is_relationally_comparable<T, T>() or std::is_same_v<T, cudf::struct_view>);
Expand Down
12 changes: 12 additions & 0 deletions cpp/src/groupby/sort/scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,18 @@ void scan_result_functor::operator()<aggregation::SUM>(aggregation const& agg)
get_grouped_values(), helper.num_groups(stream), helper.group_labels(stream), stream, mr));
}

template <>
void scan_result_functor::operator()<aggregation::PRODUCT>(aggregation const& agg)
{
if (cache.has_result(values, agg)) return;

cache.add_result(
values,
agg,
detail::product_scan(
get_grouped_values(), helper.num_groups(stream), helper.group_labels(stream), stream, mr));
}

template <>
void scan_result_functor::operator()<aggregation::MIN>(aggregation const& agg)
{
Expand Down
1 change: 1 addition & 0 deletions cpp/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ ConfigureTest(
groupby/min_scan_tests.cpp
groupby/nth_element_tests.cpp
groupby/nunique_tests.cpp
groupby/product_scan_tests.cpp
groupby/product_tests.cpp
groupby/quantile_tests.cpp
groupby/rank_scan_tests.cpp
Expand Down
142 changes: 142 additions & 0 deletions cpp/tests/groupby/product_scan_tests.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <tests/groupby/groupby_test_util.hpp>

#include <cudf_test/base_fixture.hpp>
#include <cudf_test/column_wrapper.hpp>
#include <cudf_test/iterator_utilities.hpp>
#include <cudf_test/type_lists.hpp>

#include <cudf/detail/aggregation/aggregation.hpp>

using key_wrapper = cudf::test::fixed_width_column_wrapper<int32_t>;

template <typename T>
struct groupby_product_scan_test : public cudf::test::BaseFixture {
using V = T;
using R = cudf::detail::target_type_t<V, cudf::aggregation::PRODUCT>;
using value_wrapper = cudf::test::fixed_width_column_wrapper<V, int32_t>;
using result_wrapper = cudf::test::fixed_width_column_wrapper<R, int32_t>;
};

using supported_types =
cudf::test::Concat<cudf::test::Types<int8_t, int16_t, int32_t, int64_t, float, double>>;

TYPED_TEST_SUITE(groupby_product_scan_test, supported_types);

TYPED_TEST(groupby_product_scan_test, basic)
{
using value_wrapper = typename TestFixture::value_wrapper;
using result_wrapper = typename TestFixture::result_wrapper;

// clang-format off
key_wrapper keys {1, 2, 3, 1, 2, 2, 1, 3, 3, 2};
value_wrapper vals{0, 1, 2, 3, 4, 5, 6, 7, 8, 9};

key_wrapper expect_keys {1, 1, 1, 2, 2, 2, 2, 3, 3, 3};
// {0, 3, 6, 1, 4, 5, 9, 2, 7, 8}
result_wrapper expect_vals{0, 0, 0, 1, 4, 20, 180, 2, 14, 112};
// clang-format on
auto agg = cudf::make_product_aggregation<cudf::groupby_scan_aggregation>();
test_single_scan(keys, vals, expect_keys, expect_vals, std::move(agg));
}

TYPED_TEST(groupby_product_scan_test, pre_sorted)
{
using value_wrapper = typename TestFixture::value_wrapper;
using result_wrapper = typename TestFixture::result_wrapper;

// clang-format off
key_wrapper keys {1, 1, 1, 2, 2, 2, 2, 3, 3, 3};
value_wrapper vals{0, 3, 6, 1, 4, 5, 9, 2, 7, 8};

key_wrapper expect_keys {1, 1, 1, 2, 2, 2, 2, 3, 3, 3};
result_wrapper expect_vals{0, 0, 0, 1, 4, 20, 180, 2, 14, 112};
// clang-format on

auto agg = cudf::make_product_aggregation<cudf::groupby_scan_aggregation>();
test_single_scan(keys,
vals,
expect_keys,
expect_vals,
std::move(agg),
cudf::null_policy::EXCLUDE,
cudf::sorted::YES);
}

TYPED_TEST(groupby_product_scan_test, empty_cols)
{
using value_wrapper = typename TestFixture::value_wrapper;
using result_wrapper = typename TestFixture::result_wrapper;

key_wrapper keys{};
value_wrapper vals{};

key_wrapper expect_keys{};
result_wrapper expect_vals{};

auto agg = cudf::make_product_aggregation<cudf::groupby_scan_aggregation>();
test_single_scan(keys, vals, expect_keys, expect_vals, std::move(agg));
}

TYPED_TEST(groupby_product_scan_test, zero_valid_keys)
{
using value_wrapper = typename TestFixture::value_wrapper;
using result_wrapper = typename TestFixture::result_wrapper;

key_wrapper keys({1, 2, 3}, cudf::test::iterators::all_nulls());
value_wrapper vals{3, 4, 5};
key_wrapper expect_keys{};
result_wrapper expect_vals{};

auto agg = cudf::make_product_aggregation<cudf::groupby_scan_aggregation>();
test_single_scan(keys, vals, expect_keys, expect_vals, std::move(agg));
}

TYPED_TEST(groupby_product_scan_test, zero_valid_values)
{
using value_wrapper = typename TestFixture::value_wrapper;
using result_wrapper = typename TestFixture::result_wrapper;

key_wrapper keys{1, 1, 1};
value_wrapper vals({3, 4, 5}, cudf::test::iterators::all_nulls());
key_wrapper expect_keys{1, 1, 1};
result_wrapper expect_vals({3, 4, 5}, cudf::test::iterators::all_nulls());

auto agg = cudf::make_product_aggregation<cudf::groupby_scan_aggregation>();
test_single_scan(keys, vals, expect_keys, expect_vals, std::move(agg));
}

TYPED_TEST(groupby_product_scan_test, null_keys_and_values)
{
using value_wrapper = typename TestFixture::value_wrapper;
using result_wrapper = typename TestFixture::result_wrapper;

// clang-format off
key_wrapper keys( {1, 2, 3, 1, 2, 2, 1, 3, 3, 2, 4}, {1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1});
value_wrapper vals({0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 4}, {0, 1, 1, 1, 1, 0, 1, 1, 1, 1, 0});

// { 1, 1, 1, 2, 2, 2, 2, 3, *, 3, 4};
key_wrapper expect_keys( { 1, 1, 1, 2, 2, 2, 2, 3, 3, 4}, cudf::test::iterators::no_nulls());
// { -, 3, 6, 1, 4, -, 9, 2, _, 8, -}
result_wrapper expect_vals({-1, 3, 18, 1, 4, -1, 36, 2, 16, -1},
{ 0, 1, 1, 1, 1, 0, 1, 1, 1, 0});
// clang-format on

auto agg = cudf::make_product_aggregation<cudf::groupby_scan_aggregation>();
test_single_scan(keys, vals, expect_keys, expect_vals, std::move(agg));
}
1 change: 1 addition & 0 deletions python/cudf/cudf/_lib/aggregation.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ class Aggregation:
cumsum = sum
cummin = min
cummax = max
cumprod = product

@classmethod
def rank(cls, method, ascending, na_option, pct):
Expand Down
2 changes: 1 addition & 1 deletion python/cudf/cudf/_lib/groupby.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ cdef class GroupBy:
return columns_from_pylibcudf_table(replaced)


_GROUPBY_SCANS = {"cumcount", "cumsum", "cummin", "cummax", "rank"}
_GROUPBY_SCANS = {"cumcount", "cumsum", "cummin", "cummax", "cumprod", "rank"}


def _is_all_scan_aggregate(all_aggs):
Expand Down
4 changes: 3 additions & 1 deletion python/cudf/cudf/tests/test_groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -2319,7 +2319,9 @@ def test_groupby_unique(by, data, dtype):


@pytest.mark.parametrize("nelem", [2, 3, 100, 1000])
@pytest.mark.parametrize("func", ["cummin", "cummax", "cumcount", "cumsum"])
@pytest.mark.parametrize(
"func", ["cummin", "cummax", "cumcount", "cumsum", "cumprod"]
)
def test_groupby_2keys_scan(nelem, func):
pdf = make_frame(pd.DataFrame, nelem=nelem)
expect_df = pdf.groupby(["x", "y"], sort=True).agg(func)
Expand Down

0 comments on commit 241825a

Please sign in to comment.