From 8c839bcc39e9f5a2ae7b5c018d32940739cc74a8 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 10 Sep 2024 14:19:27 -0700 Subject: [PATCH] Fix empty cluster handling in tdigest merge (#16675) This PR fixes an edge case bug in the tdigest merge. When there are multiple distinct keys but all values are empty clusters, the value column is currently merged into a single empty cluster after merge, which leads to an error while creating a result table because of the mismatching number of rows in the key and value columns. This bug can be reproduced only when all values are empty clusters. If some values are empty but some are not, the current implementation returns a valid result. This bug was originally reported in https://github.com/NVIDIA/spark-rapids/issues/11367. The bug exists in `merge_tdigests()` as it assumes that there is no empty cluster in the merge stage even when there are (`has_nulls` are fixed to `false`). It is rather safe to assume that always there could be empty clusters. This PR fixes the flag by fixing it to true. Also, `has_nulls` has been renamed to a more descriptive name, `may_have_empty_clusters`. The tdigest reduce does not have the same issue as it does not call `merge_tdigests()`. Authors: - Jihoon Son (https://github.com/jihoonson) - Muhammad Haseeb (https://github.com/mhaseeb123) Approvers: - Muhammad Haseeb (https://github.com/mhaseeb123) - https://github.com/nvdbaranec URL: https://github.com/rapidsai/cudf/pull/16675 --- cpp/include/cudf/detail/tdigest/tdigest.hpp | 17 ++-- cpp/include/cudf_test/tdigest_utilities.cuh | 20 ++--- cpp/src/quantiles/tdigest/tdigest.cu | 23 ++--- .../quantiles/tdigest/tdigest_aggregation.cu | 70 +++++++++------ cpp/tests/groupby/tdigest_tests.cu | 90 +++++++++++++++++-- .../quantiles/percentile_approx_test.cpp | 4 +- 6 files changed, 162 insertions(+), 62 deletions(-) diff --git a/cpp/include/cudf/detail/tdigest/tdigest.hpp b/cpp/include/cudf/detail/tdigest/tdigest.hpp index 80a4460023f..672b95e2d01 100644 --- a/cpp/include/cudf/detail/tdigest/tdigest.hpp +++ b/cpp/include/cudf/detail/tdigest/tdigest.hpp @@ -143,28 +143,29 @@ std::unique_ptr make_tdigest_column(size_type num_rows, rmm::device_async_resource_ref mr); /** - * @brief Create an empty tdigest column. + * @brief Create a tdigest column of empty clusters. * - * An empty tdigest column contains a single row of length 0 + * The column created contains the specified number of rows of empty clusters. * * @param stream CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory. * - * @returns An empty tdigest column. + * @returns A tdigest column of empty clusters. */ CUDF_EXPORT -std::unique_ptr make_empty_tdigest_column(rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr); +std::unique_ptr make_tdigest_column_of_empty_clusters(size_type num_rows, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr); /** - * @brief Create an empty tdigest scalar. + * @brief Create a scalar of an empty tdigest cluster. * - * An empty tdigest scalar is a struct_scalar that contains a single row of length 0 + * The returned scalar is a struct_scalar that contains a single row of an empty cluster. * * @param stream CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory. * - * @returns An empty tdigest scalar. + * @returns A scalar of an empty tdigest cluster. */ std::unique_ptr make_empty_tdigest_scalar(rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr); diff --git a/cpp/include/cudf_test/tdigest_utilities.cuh b/cpp/include/cudf_test/tdigest_utilities.cuh index 1758790cd64..be7d19b2227 100644 --- a/cpp/include/cudf_test/tdigest_utilities.cuh +++ b/cpp/include/cudf_test/tdigest_utilities.cuh @@ -270,8 +270,8 @@ void tdigest_simple_all_nulls_aggregation(Func op) static_cast(values).type(), tdigest_gen{}, op, values, delta); // NOTE: an empty tdigest column still has 1 row. - auto expected = cudf::tdigest::detail::make_empty_tdigest_column( - cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + auto expected = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( + 1, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); CUDF_TEST_EXPECT_COLUMNS_EQUAL(*result, *expected); } @@ -562,12 +562,12 @@ template void tdigest_merge_empty(MergeFunc merge_op) { // 3 empty tdigests all in the same group - auto a = cudf::tdigest::detail::make_empty_tdigest_column( - cudf::get_default_stream(), cudf::get_current_device_resource_ref()); - auto b = cudf::tdigest::detail::make_empty_tdigest_column( - cudf::get_default_stream(), cudf::get_current_device_resource_ref()); - auto c = cudf::tdigest::detail::make_empty_tdigest_column( - cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + auto a = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( + 1, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + auto b = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( + 1, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + auto c = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( + 1, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); std::vector cols; cols.push_back(*a); cols.push_back(*b); @@ -577,8 +577,8 @@ void tdigest_merge_empty(MergeFunc merge_op) auto const delta = 1000; auto result = merge_op(*values, delta); - auto expected = cudf::tdigest::detail::make_empty_tdigest_column( - cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + auto expected = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( + 1, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); CUDF_TEST_EXPECT_COLUMNS_EQUAL(*expected, *result); } diff --git a/cpp/src/quantiles/tdigest/tdigest.cu b/cpp/src/quantiles/tdigest/tdigest.cu index 0d017cf1f13..76cd55bf994 100644 --- a/cpp/src/quantiles/tdigest/tdigest.cu +++ b/cpp/src/quantiles/tdigest/tdigest.cu @@ -292,32 +292,33 @@ std::unique_ptr make_tdigest_column(size_type num_rows, return make_structs_column(num_rows, std::move(children), 0, {}, stream, mr); } -std::unique_ptr make_empty_tdigest_column(rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) +std::unique_ptr make_tdigest_column_of_empty_clusters(size_type num_rows, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) { auto offsets = cudf::make_fixed_width_column( - data_type(type_id::INT32), 2, mask_state::UNALLOCATED, stream, mr); + data_type(type_id::INT32), num_rows + 1, mask_state::UNALLOCATED, stream, mr); thrust::fill(rmm::exec_policy(stream), offsets->mutable_view().begin(), offsets->mutable_view().end(), 0); - auto min_col = - cudf::make_numeric_column(data_type(type_id::FLOAT64), 1, mask_state::UNALLOCATED, stream, mr); + auto min_col = cudf::make_numeric_column( + data_type(type_id::FLOAT64), num_rows, mask_state::UNALLOCATED, stream, mr); thrust::fill(rmm::exec_policy(stream), min_col->mutable_view().begin(), min_col->mutable_view().end(), 0); - auto max_col = - cudf::make_numeric_column(data_type(type_id::FLOAT64), 1, mask_state::UNALLOCATED, stream, mr); + auto max_col = cudf::make_numeric_column( + data_type(type_id::FLOAT64), num_rows, mask_state::UNALLOCATED, stream, mr); thrust::fill(rmm::exec_policy(stream), max_col->mutable_view().begin(), max_col->mutable_view().end(), 0); - return make_tdigest_column(1, - make_empty_column(type_id::FLOAT64), - make_empty_column(type_id::FLOAT64), + return make_tdigest_column(num_rows, + cudf::make_empty_column(type_id::FLOAT64), + cudf::make_empty_column(type_id::FLOAT64), std::move(offsets), std::move(min_col), std::move(max_col), @@ -338,7 +339,7 @@ std::unique_ptr make_empty_tdigest_column(rmm::cuda_stream_view stream, std::unique_ptr make_empty_tdigest_scalar(rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { - auto contents = make_empty_tdigest_column(stream, mr)->release(); + auto contents = make_tdigest_column_of_empty_clusters(1, stream, mr)->release(); return std::make_unique( std::move(*std::make_unique(std::move(contents.children))), true, stream, mr); } diff --git a/cpp/src/quantiles/tdigest/tdigest_aggregation.cu b/cpp/src/quantiles/tdigest/tdigest_aggregation.cu index 2dd25a7b890..d591fb5c171 100644 --- a/cpp/src/quantiles/tdigest/tdigest_aggregation.cu +++ b/cpp/src/quantiles/tdigest/tdigest_aggregation.cu @@ -366,8 +366,8 @@ std::unique_ptr to_tdigest_scalar(std::unique_ptr&& tdigest, * @param group_cluster_wl Output. The set of cluster weight limits for each group. * @param group_num_clusters Output. The number of output clusters for each input group. * @param group_cluster_offsets Offsets per-group to the start of it's clusters - * @param has_nulls Whether or not the input contains nulls - * + * @param may_have_empty_clusters Whether or not there could be empty clusters. Must only be + * set to false when there is no empty cluster, true otherwise. */ template @@ -379,7 +379,7 @@ CUDF_KERNEL void generate_cluster_limits_kernel(int delta, double* group_cluster_wl, size_type* group_num_clusters, size_type const* group_cluster_offsets, - bool has_nulls) + bool may_have_empty_clusters) { int const tid = threadIdx.x + blockIdx.x * blockDim.x; @@ -399,11 +399,12 @@ CUDF_KERNEL void generate_cluster_limits_kernel(int delta, // a group with nothing in it. group_num_clusters[group_index] = 0; if (total_weight <= 0) { - // if the input contains nulls we can potentially have a group that generates no - // clusters because -all- of the input values are null. in that case, the reduce_by_key call - // in the tdigest generation step will need a location to store the unused reduction value for - // that group of nulls. these "stubs" will be postprocessed out afterwards. - if (has_nulls) { group_num_clusters[group_index] = 1; } + // If the input contains empty clusters, we can potentially have a group that also generates + // empty clusters because -all- of the input values are null or empty cluster. In that case, the + // `reduce_by_key` call in the tdigest generation step will need a location to store the unused + // reduction value for that group of nulls and empty clusters. These "stubs" will be + // postprocessed out afterwards. + if (may_have_empty_clusters) { group_num_clusters[group_index] = 1; } return; } @@ -502,7 +503,8 @@ CUDF_KERNEL void generate_cluster_limits_kernel(int delta, * stream that falls before our current cluster limit * @param group_info A functor which returns the info for the specified group (total weight, * size and start offset) - * @param has_nulls Whether or not the input data contains nulls + * @param may_have_empty_clusters Whether or not there could be empty clusters. It should be + * set to false only when there is no empty cluster. * @param stream CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory * @@ -516,7 +518,7 @@ generate_group_cluster_info(int delta, NearestWeight nearest_weight, GroupInfo group_info, CumulativeWeight cumulative_weight, - bool has_nulls, + bool may_have_empty_clusters, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { @@ -535,7 +537,7 @@ generate_group_cluster_info(int delta, nullptr, group_num_clusters.begin(), nullptr, - has_nulls); + may_have_empty_clusters); // generate group cluster offsets (where the clusters for a given group start and end) auto group_cluster_offsets = cudf::make_numeric_column( @@ -567,7 +569,7 @@ generate_group_cluster_info(int delta, group_cluster_wl.begin(), group_num_clusters.begin(), group_cluster_offsets->view().begin(), - has_nulls); + may_have_empty_clusters); return {std::move(group_cluster_wl), std::move(group_cluster_offsets), @@ -580,7 +582,7 @@ std::unique_ptr build_output_column(size_type num_rows, std::unique_ptr&& offsets, std::unique_ptr&& min_col, std::unique_ptr&& max_col, - bool has_nulls, + bool may_have_empty_clusters, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { @@ -595,7 +597,7 @@ std::unique_ptr build_output_column(size_type num_rows, size_type i) { return is_stub_weight(offsets[i]) ? 1 : 0; }; size_type const num_stubs = [&]() { - if (!has_nulls) { return 0; } + if (!may_have_empty_clusters) { return 0; } auto iter = cudf::detail::make_counting_transform_iterator( 0, cuda::proclaim_return_type(is_stub_digest)); return thrust::reduce(rmm::exec_policy(stream), iter, iter + num_rows); @@ -661,6 +663,10 @@ std::unique_ptr build_output_column(size_type num_rows, mr); } +/** + * @brief A functor which returns the cluster index within a group that the value at + * the given value index falls into. + */ template struct compute_tdigests_keys_fn { int const delta; @@ -706,8 +712,8 @@ struct compute_tdigests_keys_fn { * boundaries. * * @param delta tdigest compression level - * @param values_begin Beginning of the range of input values. - * @param values_end End of the range of input values. + * @param centroids_begin Beginning of the range of centroids. + * @param centroids_end End of the range of centroids. * @param cumulative_weight Functor which returns cumulative weight and group information for * an absolute input value index. * @param min_col Column containing the minimum value per group. @@ -715,7 +721,8 @@ struct compute_tdigests_keys_fn { * @param group_cluster_wl Cluster weight limits for each group. * @param group_cluster_offsets R-value reference of offsets into the cluster weight limits. * @param total_clusters Total number of clusters in all groups. - * @param has_nulls Whether or not the input contains nulls + * @param may_have_empty_clusters Whether or not there could be empty clusters. It should be + * set to false only when there is no empty cluster. * @param stream CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory * @@ -731,7 +738,7 @@ std::unique_ptr compute_tdigests(int delta, rmm::device_uvector const& group_cluster_wl, std::unique_ptr&& group_cluster_offsets, size_type total_clusters, - bool has_nulls, + bool may_have_empty_clusters, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { @@ -750,7 +757,9 @@ std::unique_ptr compute_tdigests(int delta, // double // max // } // - if (total_clusters == 0) { return cudf::tdigest::detail::make_empty_tdigest_column(stream, mr); } + if (total_clusters == 0) { + return cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(1, stream, mr); + } // each input group represents an individual tdigest. within each tdigest, we want the keys // to represent cluster indices (for example, if a tdigest had 100 clusters, the keys should fall @@ -793,7 +802,7 @@ std::unique_ptr compute_tdigests(int delta, std::move(group_cluster_offsets), std::move(min_col), std::move(max_col), - has_nulls, + may_have_empty_clusters, stream, mr); } @@ -1145,8 +1154,13 @@ std::unique_ptr merge_tdigests(tdigest_column_view const& tdv, auto merged = cudf::detail::concatenate(tdigest_views, stream, cudf::get_current_device_resource_ref()); + auto merged_weights = merged->get_column(1).view(); + // If there are no values, we can simply return a column that has only empty tdigests. + if (merged_weights.size() == 0) { + return cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(num_groups, stream, mr); + } + // generate cumulative weights - auto merged_weights = merged->get_column(1).view(); auto cumulative_weights = cudf::make_numeric_column( data_type{type_id::FLOAT64}, merged_weights.size(), mask_state::UNALLOCATED, stream); auto keys = cudf::detail::make_counting_transform_iterator( @@ -1161,6 +1175,10 @@ std::unique_ptr merge_tdigests(tdigest_column_view const& tdv, auto const delta = max_centroids; + // We do not know whether there is any empty cluster in the input without actually reading the + // data, which could be expensive. So, we just assume that there could be empty clusters. + auto const may_have_empty_clusters = true; + // generate cluster info auto [group_cluster_wl, group_cluster_offsets, total_clusters] = generate_group_cluster_info( delta, @@ -1177,7 +1195,7 @@ std::unique_ptr merge_tdigests(tdigest_column_view const& tdv, group_labels, group_offsets, {tdigest_offsets.begin(), static_cast(tdigest_offsets.size())}}, - false, + may_have_empty_clusters, stream, mr); @@ -1202,7 +1220,7 @@ std::unique_ptr merge_tdigests(tdigest_column_view const& tdv, group_cluster_wl, std::move(group_cluster_offsets), total_clusters, - false, + may_have_empty_clusters, stream, mr); } @@ -1267,7 +1285,9 @@ std::unique_ptr group_tdigest(column_view const& col, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { - if (col.size() == 0) { return cudf::tdigest::detail::make_empty_tdigest_column(stream, mr); } + if (col.size() == 0) { + return cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(1, stream, mr); + } auto const delta = max_centroids; return cudf::type_dispatcher(col.type(), @@ -1293,7 +1313,7 @@ std::unique_ptr group_merge_tdigest(column_view const& input, tdigest_column_view tdv(input); if (num_groups == 0 || input.size() == 0) { - return cudf::tdigest::detail::make_empty_tdigest_column(stream, mr); + return cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(1, stream, mr); } // bring group offsets back to the host diff --git a/cpp/tests/groupby/tdigest_tests.cu b/cpp/tests/groupby/tdigest_tests.cu index baa59026b07..3780dbb1d95 100644 --- a/cpp/tests/groupby/tdigest_tests.cu +++ b/cpp/tests/groupby/tdigest_tests.cu @@ -469,16 +469,16 @@ TEST_F(TDigestMergeTest, EmptyGroups) cudf::test::fixed_width_column_wrapper keys{0, 0, 0, 0, 0, 0, 0}; int const delta = 1000; - auto a = cudf::tdigest::detail::make_empty_tdigest_column( - cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + auto a = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( + 1, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); auto b = cudf::type_dispatcher( static_cast(values_b).type(), tdigest_gen_grouped{}, keys, values_b, delta); - auto c = cudf::tdigest::detail::make_empty_tdigest_column( - cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + auto c = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( + 1, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); auto d = cudf::type_dispatcher( static_cast(values_d).type(), tdigest_gen_grouped{}, keys, values_d, delta); - auto e = cudf::tdigest::detail::make_empty_tdigest_column( - cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + auto e = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( + 1, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); std::vector cols; cols.push_back(*a); @@ -507,3 +507,81 @@ TEST_F(TDigestMergeTest, EmptyGroups) CUDF_TEST_EXPECT_COLUMNS_EQUAL(*expected, *result.second[0].results[0]); } + +std::unique_ptr do_agg( + cudf::column_view key, + cudf::column_view val, + std::function()> make_agg) +{ + std::vector keys; + keys.push_back(key); + cudf::table_view const key_table(keys); + + cudf::groupby::groupby gb(key_table); + std::vector requests; + cudf::groupby::aggregation_request req; + req.values = val; + req.aggregations.push_back(make_agg()); + requests.push_back(std::move(req)); + + auto result = gb.aggregate(std::move(requests)); + + std::vector> result_columns; + for (auto&& c : result.first->release()) { + result_columns.push_back(std::move(c)); + } + + EXPECT_EQ(result.second.size(), 1); + EXPECT_EQ(result.second[0].results.size(), 1); + result_columns.push_back(std::move(result.second[0].results[0])); + + return std::make_unique(std::move(result_columns)); +} + +TEST_F(TDigestMergeTest, AllGroupsHaveEmptyClusters) +{ + // The input must be sorted by the key. + // See `aggregate_result_functor::operator()` for details. + auto const keys = cudf::test::fixed_width_column_wrapper{{0, 0, 1, 1, 2}}; + auto const keys_view = cudf::column_view(keys); + auto val_elems = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return i; }); + auto val_valids = cudf::detail::make_counting_transform_iterator(0, [](auto i) { + // All values are null + return false; + }); + auto const vals = cudf::test::fixed_width_column_wrapper{ + val_elems, val_elems + keys_view.size(), val_valids}; + + auto const delta = 10000; + + // Compute tdigest. The result should have 3 empty clusters, one per group. + auto const compute_result = do_agg(keys_view, cudf::column_view(vals), [&delta]() { + return cudf::make_tdigest_aggregation(delta); + }); + + auto const expected_computed_keys = cudf::test::fixed_width_column_wrapper{{0, 1, 2}}; + cudf::column_view const expected_computed_keys_view{expected_computed_keys}; + auto const expected_computed_vals = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( + expected_computed_keys_view.size(), + cudf::get_default_stream(), + rmm::mr::get_current_device_resource()); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_computed_keys_view, compute_result->get_column(0).view()); + // The computed values are nullable even though the input values are not. + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_computed_vals->view(), + compute_result->get_column(1).view()); + + // Merge tdigest. The result should have 3 empty clusters, one per group. + auto const merge_result = + do_agg(compute_result->get_column(0).view(), compute_result->get_column(1).view(), [&delta]() { + return cudf::make_merge_tdigest_aggregation(delta); + }); + + auto const expected_merged_keys = cudf::test::fixed_width_column_wrapper{{0, 1, 2}}; + cudf::column_view const expected_merged_keys_view{expected_merged_keys}; + auto const expected_merged_vals = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( + expected_merged_keys_view.size(), + cudf::get_default_stream(), + rmm::mr::get_current_device_resource()); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_merged_keys_view, merge_result->get_column(0).view()); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_merged_vals->view(), merge_result->get_column(1).view()); +} diff --git a/cpp/tests/quantiles/percentile_approx_test.cpp b/cpp/tests/quantiles/percentile_approx_test.cpp index 915717713df..7359f0406fc 100644 --- a/cpp/tests/quantiles/percentile_approx_test.cpp +++ b/cpp/tests/quantiles/percentile_approx_test.cpp @@ -371,8 +371,8 @@ struct PercentileApproxTest : public cudf::test::BaseFixture {}; TEST_F(PercentileApproxTest, EmptyInput) { - auto empty_ = cudf::tdigest::detail::make_empty_tdigest_column( - cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + auto empty_ = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( + 1, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); cudf::test::fixed_width_column_wrapper percentiles{0.0, 0.25, 0.3}; std::vector input;