From fda951867a93a5f649fc7689637bd6400a19128a Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Wed, 11 Sep 2024 14:34:15 -0700 Subject: [PATCH] Revert "Fix empty cluster handling in tdigest merge (#16675)" This reverts commit 5192b885bba82039823da687bc0a013ee74566a7. --- cpp/include/cudf/detail/tdigest/tdigest.hpp | 17 ++-- cpp/include/cudf_test/tdigest_utilities.cuh | 20 ++--- cpp/src/quantiles/tdigest/tdigest.cu | 23 +++-- .../quantiles/tdigest/tdigest_aggregation.cu | 70 ++++++--------- cpp/tests/groupby/tdigest_tests.cu | 90 ++----------------- .../quantiles/percentile_approx_test.cpp | 4 +- 6 files changed, 62 insertions(+), 162 deletions(-) diff --git a/cpp/include/cudf/detail/tdigest/tdigest.hpp b/cpp/include/cudf/detail/tdigest/tdigest.hpp index 672b95e2d01..80a4460023f 100644 --- a/cpp/include/cudf/detail/tdigest/tdigest.hpp +++ b/cpp/include/cudf/detail/tdigest/tdigest.hpp @@ -143,29 +143,28 @@ std::unique_ptr make_tdigest_column(size_type num_rows, rmm::device_async_resource_ref mr); /** - * @brief Create a tdigest column of empty clusters. + * @brief Create an empty tdigest column. * - * The column created contains the specified number of rows of empty clusters. + * An empty tdigest column contains a single row of length 0 * * @param stream CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory. * - * @returns A tdigest column of empty clusters. + * @returns An empty tdigest column. */ CUDF_EXPORT -std::unique_ptr make_tdigest_column_of_empty_clusters(size_type num_rows, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr); +std::unique_ptr make_empty_tdigest_column(rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr); /** - * @brief Create a scalar of an empty tdigest cluster. + * @brief Create an empty tdigest scalar. * - * The returned scalar is a struct_scalar that contains a single row of an empty cluster. + * An empty tdigest scalar is a struct_scalar that contains a single row of length 0 * * @param stream CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory. * - * @returns A scalar of an empty tdigest cluster. + * @returns An empty tdigest scalar. */ std::unique_ptr make_empty_tdigest_scalar(rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr); diff --git a/cpp/include/cudf_test/tdigest_utilities.cuh b/cpp/include/cudf_test/tdigest_utilities.cuh index be7d19b2227..1758790cd64 100644 --- a/cpp/include/cudf_test/tdigest_utilities.cuh +++ b/cpp/include/cudf_test/tdigest_utilities.cuh @@ -270,8 +270,8 @@ void tdigest_simple_all_nulls_aggregation(Func op) static_cast(values).type(), tdigest_gen{}, op, values, delta); // NOTE: an empty tdigest column still has 1 row. - auto expected = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( - 1, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + auto expected = cudf::tdigest::detail::make_empty_tdigest_column( + cudf::get_default_stream(), cudf::get_current_device_resource_ref()); CUDF_TEST_EXPECT_COLUMNS_EQUAL(*result, *expected); } @@ -562,12 +562,12 @@ template void tdigest_merge_empty(MergeFunc merge_op) { // 3 empty tdigests all in the same group - auto a = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( - 1, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); - auto b = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( - 1, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); - auto c = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( - 1, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + auto a = cudf::tdigest::detail::make_empty_tdigest_column( + cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + auto b = cudf::tdigest::detail::make_empty_tdigest_column( + cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + auto c = cudf::tdigest::detail::make_empty_tdigest_column( + cudf::get_default_stream(), cudf::get_current_device_resource_ref()); std::vector cols; cols.push_back(*a); cols.push_back(*b); @@ -577,8 +577,8 @@ void tdigest_merge_empty(MergeFunc merge_op) auto const delta = 1000; auto result = merge_op(*values, delta); - auto expected = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( - 1, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + auto expected = cudf::tdigest::detail::make_empty_tdigest_column( + cudf::get_default_stream(), cudf::get_current_device_resource_ref()); CUDF_TEST_EXPECT_COLUMNS_EQUAL(*expected, *result); } diff --git a/cpp/src/quantiles/tdigest/tdigest.cu b/cpp/src/quantiles/tdigest/tdigest.cu index 76cd55bf994..0d017cf1f13 100644 --- a/cpp/src/quantiles/tdigest/tdigest.cu +++ b/cpp/src/quantiles/tdigest/tdigest.cu @@ -292,33 +292,32 @@ std::unique_ptr make_tdigest_column(size_type num_rows, return make_structs_column(num_rows, std::move(children), 0, {}, stream, mr); } -std::unique_ptr make_tdigest_column_of_empty_clusters(size_type num_rows, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) +std::unique_ptr make_empty_tdigest_column(rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) { auto offsets = cudf::make_fixed_width_column( - data_type(type_id::INT32), num_rows + 1, mask_state::UNALLOCATED, stream, mr); + data_type(type_id::INT32), 2, mask_state::UNALLOCATED, stream, mr); thrust::fill(rmm::exec_policy(stream), offsets->mutable_view().begin(), offsets->mutable_view().end(), 0); - auto min_col = cudf::make_numeric_column( - data_type(type_id::FLOAT64), num_rows, mask_state::UNALLOCATED, stream, mr); + auto min_col = + cudf::make_numeric_column(data_type(type_id::FLOAT64), 1, mask_state::UNALLOCATED, stream, mr); thrust::fill(rmm::exec_policy(stream), min_col->mutable_view().begin(), min_col->mutable_view().end(), 0); - auto max_col = cudf::make_numeric_column( - data_type(type_id::FLOAT64), num_rows, mask_state::UNALLOCATED, stream, mr); + auto max_col = + cudf::make_numeric_column(data_type(type_id::FLOAT64), 1, mask_state::UNALLOCATED, stream, mr); thrust::fill(rmm::exec_policy(stream), max_col->mutable_view().begin(), max_col->mutable_view().end(), 0); - return make_tdigest_column(num_rows, - cudf::make_empty_column(type_id::FLOAT64), - cudf::make_empty_column(type_id::FLOAT64), + return make_tdigest_column(1, + make_empty_column(type_id::FLOAT64), + make_empty_column(type_id::FLOAT64), std::move(offsets), std::move(min_col), std::move(max_col), @@ -339,7 +338,7 @@ std::unique_ptr make_tdigest_column_of_empty_clusters(size_type num_rows std::unique_ptr make_empty_tdigest_scalar(rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { - auto contents = make_tdigest_column_of_empty_clusters(1, stream, mr)->release(); + auto contents = make_empty_tdigest_column(stream, mr)->release(); return std::make_unique( std::move(*std::make_unique(std::move(contents.children))), true, stream, mr); } diff --git a/cpp/src/quantiles/tdigest/tdigest_aggregation.cu b/cpp/src/quantiles/tdigest/tdigest_aggregation.cu index d591fb5c171..2dd25a7b890 100644 --- a/cpp/src/quantiles/tdigest/tdigest_aggregation.cu +++ b/cpp/src/quantiles/tdigest/tdigest_aggregation.cu @@ -366,8 +366,8 @@ std::unique_ptr to_tdigest_scalar(std::unique_ptr&& tdigest, * @param group_cluster_wl Output. The set of cluster weight limits for each group. * @param group_num_clusters Output. The number of output clusters for each input group. * @param group_cluster_offsets Offsets per-group to the start of it's clusters - * @param may_have_empty_clusters Whether or not there could be empty clusters. Must only be - * set to false when there is no empty cluster, true otherwise. + * @param has_nulls Whether or not the input contains nulls + * */ template @@ -379,7 +379,7 @@ CUDF_KERNEL void generate_cluster_limits_kernel(int delta, double* group_cluster_wl, size_type* group_num_clusters, size_type const* group_cluster_offsets, - bool may_have_empty_clusters) + bool has_nulls) { int const tid = threadIdx.x + blockIdx.x * blockDim.x; @@ -399,12 +399,11 @@ CUDF_KERNEL void generate_cluster_limits_kernel(int delta, // a group with nothing in it. group_num_clusters[group_index] = 0; if (total_weight <= 0) { - // If the input contains empty clusters, we can potentially have a group that also generates - // empty clusters because -all- of the input values are null or empty cluster. In that case, the - // `reduce_by_key` call in the tdigest generation step will need a location to store the unused - // reduction value for that group of nulls and empty clusters. These "stubs" will be - // postprocessed out afterwards. - if (may_have_empty_clusters) { group_num_clusters[group_index] = 1; } + // if the input contains nulls we can potentially have a group that generates no + // clusters because -all- of the input values are null. in that case, the reduce_by_key call + // in the tdigest generation step will need a location to store the unused reduction value for + // that group of nulls. these "stubs" will be postprocessed out afterwards. + if (has_nulls) { group_num_clusters[group_index] = 1; } return; } @@ -503,8 +502,7 @@ CUDF_KERNEL void generate_cluster_limits_kernel(int delta, * stream that falls before our current cluster limit * @param group_info A functor which returns the info for the specified group (total weight, * size and start offset) - * @param may_have_empty_clusters Whether or not there could be empty clusters. It should be - * set to false only when there is no empty cluster. + * @param has_nulls Whether or not the input data contains nulls * @param stream CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory * @@ -518,7 +516,7 @@ generate_group_cluster_info(int delta, NearestWeight nearest_weight, GroupInfo group_info, CumulativeWeight cumulative_weight, - bool may_have_empty_clusters, + bool has_nulls, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { @@ -537,7 +535,7 @@ generate_group_cluster_info(int delta, nullptr, group_num_clusters.begin(), nullptr, - may_have_empty_clusters); + has_nulls); // generate group cluster offsets (where the clusters for a given group start and end) auto group_cluster_offsets = cudf::make_numeric_column( @@ -569,7 +567,7 @@ generate_group_cluster_info(int delta, group_cluster_wl.begin(), group_num_clusters.begin(), group_cluster_offsets->view().begin(), - may_have_empty_clusters); + has_nulls); return {std::move(group_cluster_wl), std::move(group_cluster_offsets), @@ -582,7 +580,7 @@ std::unique_ptr build_output_column(size_type num_rows, std::unique_ptr&& offsets, std::unique_ptr&& min_col, std::unique_ptr&& max_col, - bool may_have_empty_clusters, + bool has_nulls, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { @@ -597,7 +595,7 @@ std::unique_ptr build_output_column(size_type num_rows, size_type i) { return is_stub_weight(offsets[i]) ? 1 : 0; }; size_type const num_stubs = [&]() { - if (!may_have_empty_clusters) { return 0; } + if (!has_nulls) { return 0; } auto iter = cudf::detail::make_counting_transform_iterator( 0, cuda::proclaim_return_type(is_stub_digest)); return thrust::reduce(rmm::exec_policy(stream), iter, iter + num_rows); @@ -663,10 +661,6 @@ std::unique_ptr build_output_column(size_type num_rows, mr); } -/** - * @brief A functor which returns the cluster index within a group that the value at - * the given value index falls into. - */ template struct compute_tdigests_keys_fn { int const delta; @@ -712,8 +706,8 @@ struct compute_tdigests_keys_fn { * boundaries. * * @param delta tdigest compression level - * @param centroids_begin Beginning of the range of centroids. - * @param centroids_end End of the range of centroids. + * @param values_begin Beginning of the range of input values. + * @param values_end End of the range of input values. * @param cumulative_weight Functor which returns cumulative weight and group information for * an absolute input value index. * @param min_col Column containing the minimum value per group. @@ -721,8 +715,7 @@ struct compute_tdigests_keys_fn { * @param group_cluster_wl Cluster weight limits for each group. * @param group_cluster_offsets R-value reference of offsets into the cluster weight limits. * @param total_clusters Total number of clusters in all groups. - * @param may_have_empty_clusters Whether or not there could be empty clusters. It should be - * set to false only when there is no empty cluster. + * @param has_nulls Whether or not the input contains nulls * @param stream CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory * @@ -738,7 +731,7 @@ std::unique_ptr compute_tdigests(int delta, rmm::device_uvector const& group_cluster_wl, std::unique_ptr&& group_cluster_offsets, size_type total_clusters, - bool may_have_empty_clusters, + bool has_nulls, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { @@ -757,9 +750,7 @@ std::unique_ptr compute_tdigests(int delta, // double // max // } // - if (total_clusters == 0) { - return cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(1, stream, mr); - } + if (total_clusters == 0) { return cudf::tdigest::detail::make_empty_tdigest_column(stream, mr); } // each input group represents an individual tdigest. within each tdigest, we want the keys // to represent cluster indices (for example, if a tdigest had 100 clusters, the keys should fall @@ -802,7 +793,7 @@ std::unique_ptr compute_tdigests(int delta, std::move(group_cluster_offsets), std::move(min_col), std::move(max_col), - may_have_empty_clusters, + has_nulls, stream, mr); } @@ -1154,13 +1145,8 @@ std::unique_ptr merge_tdigests(tdigest_column_view const& tdv, auto merged = cudf::detail::concatenate(tdigest_views, stream, cudf::get_current_device_resource_ref()); - auto merged_weights = merged->get_column(1).view(); - // If there are no values, we can simply return a column that has only empty tdigests. - if (merged_weights.size() == 0) { - return cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(num_groups, stream, mr); - } - // generate cumulative weights + auto merged_weights = merged->get_column(1).view(); auto cumulative_weights = cudf::make_numeric_column( data_type{type_id::FLOAT64}, merged_weights.size(), mask_state::UNALLOCATED, stream); auto keys = cudf::detail::make_counting_transform_iterator( @@ -1175,10 +1161,6 @@ std::unique_ptr merge_tdigests(tdigest_column_view const& tdv, auto const delta = max_centroids; - // We do not know whether there is any empty cluster in the input without actually reading the - // data, which could be expensive. So, we just assume that there could be empty clusters. - auto const may_have_empty_clusters = true; - // generate cluster info auto [group_cluster_wl, group_cluster_offsets, total_clusters] = generate_group_cluster_info( delta, @@ -1195,7 +1177,7 @@ std::unique_ptr merge_tdigests(tdigest_column_view const& tdv, group_labels, group_offsets, {tdigest_offsets.begin(), static_cast(tdigest_offsets.size())}}, - may_have_empty_clusters, + false, stream, mr); @@ -1220,7 +1202,7 @@ std::unique_ptr merge_tdigests(tdigest_column_view const& tdv, group_cluster_wl, std::move(group_cluster_offsets), total_clusters, - may_have_empty_clusters, + false, stream, mr); } @@ -1285,9 +1267,7 @@ std::unique_ptr group_tdigest(column_view const& col, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { - if (col.size() == 0) { - return cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(1, stream, mr); - } + if (col.size() == 0) { return cudf::tdigest::detail::make_empty_tdigest_column(stream, mr); } auto const delta = max_centroids; return cudf::type_dispatcher(col.type(), @@ -1313,7 +1293,7 @@ std::unique_ptr group_merge_tdigest(column_view const& input, tdigest_column_view tdv(input); if (num_groups == 0 || input.size() == 0) { - return cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(1, stream, mr); + return cudf::tdigest::detail::make_empty_tdigest_column(stream, mr); } // bring group offsets back to the host diff --git a/cpp/tests/groupby/tdigest_tests.cu b/cpp/tests/groupby/tdigest_tests.cu index 3780dbb1d95..baa59026b07 100644 --- a/cpp/tests/groupby/tdigest_tests.cu +++ b/cpp/tests/groupby/tdigest_tests.cu @@ -469,16 +469,16 @@ TEST_F(TDigestMergeTest, EmptyGroups) cudf::test::fixed_width_column_wrapper keys{0, 0, 0, 0, 0, 0, 0}; int const delta = 1000; - auto a = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( - 1, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + auto a = cudf::tdigest::detail::make_empty_tdigest_column( + cudf::get_default_stream(), cudf::get_current_device_resource_ref()); auto b = cudf::type_dispatcher( static_cast(values_b).type(), tdigest_gen_grouped{}, keys, values_b, delta); - auto c = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( - 1, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + auto c = cudf::tdigest::detail::make_empty_tdigest_column( + cudf::get_default_stream(), cudf::get_current_device_resource_ref()); auto d = cudf::type_dispatcher( static_cast(values_d).type(), tdigest_gen_grouped{}, keys, values_d, delta); - auto e = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( - 1, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + auto e = cudf::tdigest::detail::make_empty_tdigest_column( + cudf::get_default_stream(), cudf::get_current_device_resource_ref()); std::vector cols; cols.push_back(*a); @@ -507,81 +507,3 @@ TEST_F(TDigestMergeTest, EmptyGroups) CUDF_TEST_EXPECT_COLUMNS_EQUAL(*expected, *result.second[0].results[0]); } - -std::unique_ptr do_agg( - cudf::column_view key, - cudf::column_view val, - std::function()> make_agg) -{ - std::vector keys; - keys.push_back(key); - cudf::table_view const key_table(keys); - - cudf::groupby::groupby gb(key_table); - std::vector requests; - cudf::groupby::aggregation_request req; - req.values = val; - req.aggregations.push_back(make_agg()); - requests.push_back(std::move(req)); - - auto result = gb.aggregate(std::move(requests)); - - std::vector> result_columns; - for (auto&& c : result.first->release()) { - result_columns.push_back(std::move(c)); - } - - EXPECT_EQ(result.second.size(), 1); - EXPECT_EQ(result.second[0].results.size(), 1); - result_columns.push_back(std::move(result.second[0].results[0])); - - return std::make_unique(std::move(result_columns)); -} - -TEST_F(TDigestMergeTest, AllGroupsHaveEmptyClusters) -{ - // The input must be sorted by the key. - // See `aggregate_result_functor::operator()` for details. - auto const keys = cudf::test::fixed_width_column_wrapper{{0, 0, 1, 1, 2}}; - auto const keys_view = cudf::column_view(keys); - auto val_elems = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return i; }); - auto val_valids = cudf::detail::make_counting_transform_iterator(0, [](auto i) { - // All values are null - return false; - }); - auto const vals = cudf::test::fixed_width_column_wrapper{ - val_elems, val_elems + keys_view.size(), val_valids}; - - auto const delta = 10000; - - // Compute tdigest. The result should have 3 empty clusters, one per group. - auto const compute_result = do_agg(keys_view, cudf::column_view(vals), [&delta]() { - return cudf::make_tdigest_aggregation(delta); - }); - - auto const expected_computed_keys = cudf::test::fixed_width_column_wrapper{{0, 1, 2}}; - cudf::column_view const expected_computed_keys_view{expected_computed_keys}; - auto const expected_computed_vals = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( - expected_computed_keys_view.size(), - cudf::get_default_stream(), - rmm::mr::get_current_device_resource()); - CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_computed_keys_view, compute_result->get_column(0).view()); - // The computed values are nullable even though the input values are not. - CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_computed_vals->view(), - compute_result->get_column(1).view()); - - // Merge tdigest. The result should have 3 empty clusters, one per group. - auto const merge_result = - do_agg(compute_result->get_column(0).view(), compute_result->get_column(1).view(), [&delta]() { - return cudf::make_merge_tdigest_aggregation(delta); - }); - - auto const expected_merged_keys = cudf::test::fixed_width_column_wrapper{{0, 1, 2}}; - cudf::column_view const expected_merged_keys_view{expected_merged_keys}; - auto const expected_merged_vals = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( - expected_merged_keys_view.size(), - cudf::get_default_stream(), - rmm::mr::get_current_device_resource()); - CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_merged_keys_view, merge_result->get_column(0).view()); - CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_merged_vals->view(), merge_result->get_column(1).view()); -} diff --git a/cpp/tests/quantiles/percentile_approx_test.cpp b/cpp/tests/quantiles/percentile_approx_test.cpp index 7359f0406fc..915717713df 100644 --- a/cpp/tests/quantiles/percentile_approx_test.cpp +++ b/cpp/tests/quantiles/percentile_approx_test.cpp @@ -371,8 +371,8 @@ struct PercentileApproxTest : public cudf::test::BaseFixture {}; TEST_F(PercentileApproxTest, EmptyInput) { - auto empty_ = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( - 1, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + auto empty_ = cudf::tdigest::detail::make_empty_tdigest_column( + cudf::get_default_stream(), cudf::get_current_device_resource_ref()); cudf::test::fixed_width_column_wrapper percentiles{0.0, 0.25, 0.3}; std::vector input;