diff --git a/cpp/include/cudf/detail/tdigest/tdigest.hpp b/cpp/include/cudf/detail/tdigest/tdigest.hpp index 80a4460023f..672b95e2d01 100644 --- a/cpp/include/cudf/detail/tdigest/tdigest.hpp +++ b/cpp/include/cudf/detail/tdigest/tdigest.hpp @@ -143,28 +143,29 @@ std::unique_ptr make_tdigest_column(size_type num_rows, rmm::device_async_resource_ref mr); /** - * @brief Create an empty tdigest column. + * @brief Create a tdigest column of empty clusters. * - * An empty tdigest column contains a single row of length 0 + * The column created contains the specified number of rows of empty clusters. * * @param stream CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory. * - * @returns An empty tdigest column. + * @returns A tdigest column of empty clusters. */ CUDF_EXPORT -std::unique_ptr make_empty_tdigest_column(rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr); +std::unique_ptr make_tdigest_column_of_empty_clusters(size_type num_rows, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr); /** - * @brief Create an empty tdigest scalar. + * @brief Create a scalar of an empty tdigest cluster. * - * An empty tdigest scalar is a struct_scalar that contains a single row of length 0 + * The returned scalar is a struct_scalar that contains a single row of an empty cluster. * * @param stream CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory. * - * @returns An empty tdigest scalar. + * @returns A scalar of an empty tdigest cluster. */ std::unique_ptr make_empty_tdigest_scalar(rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr); diff --git a/cpp/include/cudf_test/tdigest_utilities.cuh b/cpp/include/cudf_test/tdigest_utilities.cuh index 1758790cd64..be7d19b2227 100644 --- a/cpp/include/cudf_test/tdigest_utilities.cuh +++ b/cpp/include/cudf_test/tdigest_utilities.cuh @@ -270,8 +270,8 @@ void tdigest_simple_all_nulls_aggregation(Func op) static_cast(values).type(), tdigest_gen{}, op, values, delta); // NOTE: an empty tdigest column still has 1 row. - auto expected = cudf::tdigest::detail::make_empty_tdigest_column( - cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + auto expected = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( + 1, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); CUDF_TEST_EXPECT_COLUMNS_EQUAL(*result, *expected); } @@ -562,12 +562,12 @@ template void tdigest_merge_empty(MergeFunc merge_op) { // 3 empty tdigests all in the same group - auto a = cudf::tdigest::detail::make_empty_tdigest_column( - cudf::get_default_stream(), cudf::get_current_device_resource_ref()); - auto b = cudf::tdigest::detail::make_empty_tdigest_column( - cudf::get_default_stream(), cudf::get_current_device_resource_ref()); - auto c = cudf::tdigest::detail::make_empty_tdigest_column( - cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + auto a = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( + 1, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + auto b = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( + 1, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + auto c = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( + 1, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); std::vector cols; cols.push_back(*a); cols.push_back(*b); @@ -577,8 +577,8 @@ void tdigest_merge_empty(MergeFunc merge_op) auto const delta = 1000; auto result = merge_op(*values, delta); - auto expected = cudf::tdigest::detail::make_empty_tdigest_column( - cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + auto expected = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( + 1, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); CUDF_TEST_EXPECT_COLUMNS_EQUAL(*expected, *result); } diff --git a/cpp/src/quantiles/tdigest/tdigest.cu b/cpp/src/quantiles/tdigest/tdigest.cu index 0d017cf1f13..76cd55bf994 100644 --- a/cpp/src/quantiles/tdigest/tdigest.cu +++ b/cpp/src/quantiles/tdigest/tdigest.cu @@ -292,32 +292,33 @@ std::unique_ptr make_tdigest_column(size_type num_rows, return make_structs_column(num_rows, std::move(children), 0, {}, stream, mr); } -std::unique_ptr make_empty_tdigest_column(rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) +std::unique_ptr make_tdigest_column_of_empty_clusters(size_type num_rows, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) { auto offsets = cudf::make_fixed_width_column( - data_type(type_id::INT32), 2, mask_state::UNALLOCATED, stream, mr); + data_type(type_id::INT32), num_rows + 1, mask_state::UNALLOCATED, stream, mr); thrust::fill(rmm::exec_policy(stream), offsets->mutable_view().begin(), offsets->mutable_view().end(), 0); - auto min_col = - cudf::make_numeric_column(data_type(type_id::FLOAT64), 1, mask_state::UNALLOCATED, stream, mr); + auto min_col = cudf::make_numeric_column( + data_type(type_id::FLOAT64), num_rows, mask_state::UNALLOCATED, stream, mr); thrust::fill(rmm::exec_policy(stream), min_col->mutable_view().begin(), min_col->mutable_view().end(), 0); - auto max_col = - cudf::make_numeric_column(data_type(type_id::FLOAT64), 1, mask_state::UNALLOCATED, stream, mr); + auto max_col = cudf::make_numeric_column( + data_type(type_id::FLOAT64), num_rows, mask_state::UNALLOCATED, stream, mr); thrust::fill(rmm::exec_policy(stream), max_col->mutable_view().begin(), max_col->mutable_view().end(), 0); - return make_tdigest_column(1, - make_empty_column(type_id::FLOAT64), - make_empty_column(type_id::FLOAT64), + return make_tdigest_column(num_rows, + cudf::make_empty_column(type_id::FLOAT64), + cudf::make_empty_column(type_id::FLOAT64), std::move(offsets), std::move(min_col), std::move(max_col), @@ -338,7 +339,7 @@ std::unique_ptr make_empty_tdigest_column(rmm::cuda_stream_view stream, std::unique_ptr make_empty_tdigest_scalar(rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { - auto contents = make_empty_tdigest_column(stream, mr)->release(); + auto contents = make_tdigest_column_of_empty_clusters(1, stream, mr)->release(); return std::make_unique( std::move(*std::make_unique(std::move(contents.children))), true, stream, mr); } diff --git a/cpp/src/quantiles/tdigest/tdigest_aggregation.cu b/cpp/src/quantiles/tdigest/tdigest_aggregation.cu index 2dd25a7b890..d591fb5c171 100644 --- a/cpp/src/quantiles/tdigest/tdigest_aggregation.cu +++ b/cpp/src/quantiles/tdigest/tdigest_aggregation.cu @@ -366,8 +366,8 @@ std::unique_ptr to_tdigest_scalar(std::unique_ptr&& tdigest, * @param group_cluster_wl Output. The set of cluster weight limits for each group. * @param group_num_clusters Output. The number of output clusters for each input group. * @param group_cluster_offsets Offsets per-group to the start of it's clusters - * @param has_nulls Whether or not the input contains nulls - * + * @param may_have_empty_clusters Whether or not there could be empty clusters. Must only be + * set to false when there is no empty cluster, true otherwise. */ template @@ -379,7 +379,7 @@ CUDF_KERNEL void generate_cluster_limits_kernel(int delta, double* group_cluster_wl, size_type* group_num_clusters, size_type const* group_cluster_offsets, - bool has_nulls) + bool may_have_empty_clusters) { int const tid = threadIdx.x + blockIdx.x * blockDim.x; @@ -399,11 +399,12 @@ CUDF_KERNEL void generate_cluster_limits_kernel(int delta, // a group with nothing in it. group_num_clusters[group_index] = 0; if (total_weight <= 0) { - // if the input contains nulls we can potentially have a group that generates no - // clusters because -all- of the input values are null. in that case, the reduce_by_key call - // in the tdigest generation step will need a location to store the unused reduction value for - // that group of nulls. these "stubs" will be postprocessed out afterwards. - if (has_nulls) { group_num_clusters[group_index] = 1; } + // If the input contains empty clusters, we can potentially have a group that also generates + // empty clusters because -all- of the input values are null or empty cluster. In that case, the + // `reduce_by_key` call in the tdigest generation step will need a location to store the unused + // reduction value for that group of nulls and empty clusters. These "stubs" will be + // postprocessed out afterwards. + if (may_have_empty_clusters) { group_num_clusters[group_index] = 1; } return; } @@ -502,7 +503,8 @@ CUDF_KERNEL void generate_cluster_limits_kernel(int delta, * stream that falls before our current cluster limit * @param group_info A functor which returns the info for the specified group (total weight, * size and start offset) - * @param has_nulls Whether or not the input data contains nulls + * @param may_have_empty_clusters Whether or not there could be empty clusters. It should be + * set to false only when there is no empty cluster. * @param stream CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory * @@ -516,7 +518,7 @@ generate_group_cluster_info(int delta, NearestWeight nearest_weight, GroupInfo group_info, CumulativeWeight cumulative_weight, - bool has_nulls, + bool may_have_empty_clusters, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { @@ -535,7 +537,7 @@ generate_group_cluster_info(int delta, nullptr, group_num_clusters.begin(), nullptr, - has_nulls); + may_have_empty_clusters); // generate group cluster offsets (where the clusters for a given group start and end) auto group_cluster_offsets = cudf::make_numeric_column( @@ -567,7 +569,7 @@ generate_group_cluster_info(int delta, group_cluster_wl.begin(), group_num_clusters.begin(), group_cluster_offsets->view().begin(), - has_nulls); + may_have_empty_clusters); return {std::move(group_cluster_wl), std::move(group_cluster_offsets), @@ -580,7 +582,7 @@ std::unique_ptr build_output_column(size_type num_rows, std::unique_ptr&& offsets, std::unique_ptr&& min_col, std::unique_ptr&& max_col, - bool has_nulls, + bool may_have_empty_clusters, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { @@ -595,7 +597,7 @@ std::unique_ptr build_output_column(size_type num_rows, size_type i) { return is_stub_weight(offsets[i]) ? 1 : 0; }; size_type const num_stubs = [&]() { - if (!has_nulls) { return 0; } + if (!may_have_empty_clusters) { return 0; } auto iter = cudf::detail::make_counting_transform_iterator( 0, cuda::proclaim_return_type(is_stub_digest)); return thrust::reduce(rmm::exec_policy(stream), iter, iter + num_rows); @@ -661,6 +663,10 @@ std::unique_ptr build_output_column(size_type num_rows, mr); } +/** + * @brief A functor which returns the cluster index within a group that the value at + * the given value index falls into. + */ template struct compute_tdigests_keys_fn { int const delta; @@ -706,8 +712,8 @@ struct compute_tdigests_keys_fn { * boundaries. * * @param delta tdigest compression level - * @param values_begin Beginning of the range of input values. - * @param values_end End of the range of input values. + * @param centroids_begin Beginning of the range of centroids. + * @param centroids_end End of the range of centroids. * @param cumulative_weight Functor which returns cumulative weight and group information for * an absolute input value index. * @param min_col Column containing the minimum value per group. @@ -715,7 +721,8 @@ struct compute_tdigests_keys_fn { * @param group_cluster_wl Cluster weight limits for each group. * @param group_cluster_offsets R-value reference of offsets into the cluster weight limits. * @param total_clusters Total number of clusters in all groups. - * @param has_nulls Whether or not the input contains nulls + * @param may_have_empty_clusters Whether or not there could be empty clusters. It should be + * set to false only when there is no empty cluster. * @param stream CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory * @@ -731,7 +738,7 @@ std::unique_ptr compute_tdigests(int delta, rmm::device_uvector const& group_cluster_wl, std::unique_ptr&& group_cluster_offsets, size_type total_clusters, - bool has_nulls, + bool may_have_empty_clusters, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { @@ -750,7 +757,9 @@ std::unique_ptr compute_tdigests(int delta, // double // max // } // - if (total_clusters == 0) { return cudf::tdigest::detail::make_empty_tdigest_column(stream, mr); } + if (total_clusters == 0) { + return cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(1, stream, mr); + } // each input group represents an individual tdigest. within each tdigest, we want the keys // to represent cluster indices (for example, if a tdigest had 100 clusters, the keys should fall @@ -793,7 +802,7 @@ std::unique_ptr compute_tdigests(int delta, std::move(group_cluster_offsets), std::move(min_col), std::move(max_col), - has_nulls, + may_have_empty_clusters, stream, mr); } @@ -1145,8 +1154,13 @@ std::unique_ptr merge_tdigests(tdigest_column_view const& tdv, auto merged = cudf::detail::concatenate(tdigest_views, stream, cudf::get_current_device_resource_ref()); + auto merged_weights = merged->get_column(1).view(); + // If there are no values, we can simply return a column that has only empty tdigests. + if (merged_weights.size() == 0) { + return cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(num_groups, stream, mr); + } + // generate cumulative weights - auto merged_weights = merged->get_column(1).view(); auto cumulative_weights = cudf::make_numeric_column( data_type{type_id::FLOAT64}, merged_weights.size(), mask_state::UNALLOCATED, stream); auto keys = cudf::detail::make_counting_transform_iterator( @@ -1161,6 +1175,10 @@ std::unique_ptr merge_tdigests(tdigest_column_view const& tdv, auto const delta = max_centroids; + // We do not know whether there is any empty cluster in the input without actually reading the + // data, which could be expensive. So, we just assume that there could be empty clusters. + auto const may_have_empty_clusters = true; + // generate cluster info auto [group_cluster_wl, group_cluster_offsets, total_clusters] = generate_group_cluster_info( delta, @@ -1177,7 +1195,7 @@ std::unique_ptr merge_tdigests(tdigest_column_view const& tdv, group_labels, group_offsets, {tdigest_offsets.begin(), static_cast(tdigest_offsets.size())}}, - false, + may_have_empty_clusters, stream, mr); @@ -1202,7 +1220,7 @@ std::unique_ptr merge_tdigests(tdigest_column_view const& tdv, group_cluster_wl, std::move(group_cluster_offsets), total_clusters, - false, + may_have_empty_clusters, stream, mr); } @@ -1267,7 +1285,9 @@ std::unique_ptr group_tdigest(column_view const& col, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { - if (col.size() == 0) { return cudf::tdigest::detail::make_empty_tdigest_column(stream, mr); } + if (col.size() == 0) { + return cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(1, stream, mr); + } auto const delta = max_centroids; return cudf::type_dispatcher(col.type(), @@ -1293,7 +1313,7 @@ std::unique_ptr group_merge_tdigest(column_view const& input, tdigest_column_view tdv(input); if (num_groups == 0 || input.size() == 0) { - return cudf::tdigest::detail::make_empty_tdigest_column(stream, mr); + return cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(1, stream, mr); } // bring group offsets back to the host diff --git a/cpp/tests/groupby/tdigest_tests.cu b/cpp/tests/groupby/tdigest_tests.cu index baa59026b07..3780dbb1d95 100644 --- a/cpp/tests/groupby/tdigest_tests.cu +++ b/cpp/tests/groupby/tdigest_tests.cu @@ -469,16 +469,16 @@ TEST_F(TDigestMergeTest, EmptyGroups) cudf::test::fixed_width_column_wrapper keys{0, 0, 0, 0, 0, 0, 0}; int const delta = 1000; - auto a = cudf::tdigest::detail::make_empty_tdigest_column( - cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + auto a = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( + 1, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); auto b = cudf::type_dispatcher( static_cast(values_b).type(), tdigest_gen_grouped{}, keys, values_b, delta); - auto c = cudf::tdigest::detail::make_empty_tdigest_column( - cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + auto c = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( + 1, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); auto d = cudf::type_dispatcher( static_cast(values_d).type(), tdigest_gen_grouped{}, keys, values_d, delta); - auto e = cudf::tdigest::detail::make_empty_tdigest_column( - cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + auto e = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( + 1, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); std::vector cols; cols.push_back(*a); @@ -507,3 +507,81 @@ TEST_F(TDigestMergeTest, EmptyGroups) CUDF_TEST_EXPECT_COLUMNS_EQUAL(*expected, *result.second[0].results[0]); } + +std::unique_ptr do_agg( + cudf::column_view key, + cudf::column_view val, + std::function()> make_agg) +{ + std::vector keys; + keys.push_back(key); + cudf::table_view const key_table(keys); + + cudf::groupby::groupby gb(key_table); + std::vector requests; + cudf::groupby::aggregation_request req; + req.values = val; + req.aggregations.push_back(make_agg()); + requests.push_back(std::move(req)); + + auto result = gb.aggregate(std::move(requests)); + + std::vector> result_columns; + for (auto&& c : result.first->release()) { + result_columns.push_back(std::move(c)); + } + + EXPECT_EQ(result.second.size(), 1); + EXPECT_EQ(result.second[0].results.size(), 1); + result_columns.push_back(std::move(result.second[0].results[0])); + + return std::make_unique(std::move(result_columns)); +} + +TEST_F(TDigestMergeTest, AllGroupsHaveEmptyClusters) +{ + // The input must be sorted by the key. + // See `aggregate_result_functor::operator()` for details. + auto const keys = cudf::test::fixed_width_column_wrapper{{0, 0, 1, 1, 2}}; + auto const keys_view = cudf::column_view(keys); + auto val_elems = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return i; }); + auto val_valids = cudf::detail::make_counting_transform_iterator(0, [](auto i) { + // All values are null + return false; + }); + auto const vals = cudf::test::fixed_width_column_wrapper{ + val_elems, val_elems + keys_view.size(), val_valids}; + + auto const delta = 10000; + + // Compute tdigest. The result should have 3 empty clusters, one per group. + auto const compute_result = do_agg(keys_view, cudf::column_view(vals), [&delta]() { + return cudf::make_tdigest_aggregation(delta); + }); + + auto const expected_computed_keys = cudf::test::fixed_width_column_wrapper{{0, 1, 2}}; + cudf::column_view const expected_computed_keys_view{expected_computed_keys}; + auto const expected_computed_vals = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( + expected_computed_keys_view.size(), + cudf::get_default_stream(), + rmm::mr::get_current_device_resource()); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_computed_keys_view, compute_result->get_column(0).view()); + // The computed values are nullable even though the input values are not. + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_computed_vals->view(), + compute_result->get_column(1).view()); + + // Merge tdigest. The result should have 3 empty clusters, one per group. + auto const merge_result = + do_agg(compute_result->get_column(0).view(), compute_result->get_column(1).view(), [&delta]() { + return cudf::make_merge_tdigest_aggregation(delta); + }); + + auto const expected_merged_keys = cudf::test::fixed_width_column_wrapper{{0, 1, 2}}; + cudf::column_view const expected_merged_keys_view{expected_merged_keys}; + auto const expected_merged_vals = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( + expected_merged_keys_view.size(), + cudf::get_default_stream(), + rmm::mr::get_current_device_resource()); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_merged_keys_view, merge_result->get_column(0).view()); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_merged_vals->view(), merge_result->get_column(1).view()); +} diff --git a/cpp/tests/quantiles/percentile_approx_test.cpp b/cpp/tests/quantiles/percentile_approx_test.cpp index 915717713df..7359f0406fc 100644 --- a/cpp/tests/quantiles/percentile_approx_test.cpp +++ b/cpp/tests/quantiles/percentile_approx_test.cpp @@ -371,8 +371,8 @@ struct PercentileApproxTest : public cudf::test::BaseFixture {}; TEST_F(PercentileApproxTest, EmptyInput) { - auto empty_ = cudf::tdigest::detail::make_empty_tdigest_column( - cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + auto empty_ = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( + 1, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); cudf::test::fixed_width_column_wrapper percentiles{0.0, 0.25, 0.3}; std::vector input;