Skip to content

Commit

Permalink
Fix empty cluster handling in tdigest merge (rapidsai#16675)
Browse files Browse the repository at this point in the history
This PR fixes an edge case bug in the tdigest merge. When there are multiple distinct keys but all values are empty clusters, the value column is currently merged into a single empty cluster after merge, which leads to an error while creating a result table because of the mismatching number of rows in the key and value columns. This bug can be reproduced only when all values are empty clusters. If some values are empty but some are not, the current implementation returns a valid result. This bug was originally reported in NVIDIA/spark-rapids#11367.

The bug exists in `merge_tdigests()` as it assumes that there is no empty cluster in the merge stage even when there are (`has_nulls` are fixed to `false`). It is rather safe to assume that always there could be empty clusters. This PR fixes the flag by fixing it to true. Also, `has_nulls` has been renamed to a more descriptive name, `may_have_empty_clusters`. 

The tdigest reduce does not have the same issue as it does not call `merge_tdigests()`.

Authors:
  - Jihoon Son (https://github.com/jihoonson)
  - Muhammad Haseeb (https://github.com/mhaseeb123)

Approvers:
  - Muhammad Haseeb (https://github.com/mhaseeb123)
  - https://github.com/nvdbaranec

URL: rapidsai#16675
  • Loading branch information
jihoonson committed Sep 23, 2024
1 parent 3dbc33a commit 8c839bc
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 62 deletions.
17 changes: 9 additions & 8 deletions cpp/include/cudf/detail/tdigest/tdigest.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,28 +143,29 @@ std::unique_ptr<column> make_tdigest_column(size_type num_rows,
rmm::device_async_resource_ref mr);

/**
* @brief Create an empty tdigest column.
* @brief Create a tdigest column of empty clusters.
*
* An empty tdigest column contains a single row of length 0
* The column created contains the specified number of rows of empty clusters.
*
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned column's device memory.
*
* @returns An empty tdigest column.
* @returns A tdigest column of empty clusters.
*/
CUDF_EXPORT
std::unique_ptr<column> make_empty_tdigest_column(rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);
std::unique_ptr<column> make_tdigest_column_of_empty_clusters(size_type num_rows,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);

/**
* @brief Create an empty tdigest scalar.
* @brief Create a scalar of an empty tdigest cluster.
*
* An empty tdigest scalar is a struct_scalar that contains a single row of length 0
* The returned scalar is a struct_scalar that contains a single row of an empty cluster.
*
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned column's device memory.
*
* @returns An empty tdigest scalar.
* @returns A scalar of an empty tdigest cluster.
*/
std::unique_ptr<scalar> make_empty_tdigest_scalar(rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);
Expand Down
20 changes: 10 additions & 10 deletions cpp/include/cudf_test/tdigest_utilities.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,8 @@ void tdigest_simple_all_nulls_aggregation(Func op)
static_cast<column_view>(values).type(), tdigest_gen{}, op, values, delta);

// NOTE: an empty tdigest column still has 1 row.
auto expected = cudf::tdigest::detail::make_empty_tdigest_column(
cudf::get_default_stream(), cudf::get_current_device_resource_ref());
auto expected = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(
1, cudf::get_default_stream(), cudf::get_current_device_resource_ref());

CUDF_TEST_EXPECT_COLUMNS_EQUAL(*result, *expected);
}
Expand Down Expand Up @@ -562,12 +562,12 @@ template <typename MergeFunc>
void tdigest_merge_empty(MergeFunc merge_op)
{
// 3 empty tdigests all in the same group
auto a = cudf::tdigest::detail::make_empty_tdigest_column(
cudf::get_default_stream(), cudf::get_current_device_resource_ref());
auto b = cudf::tdigest::detail::make_empty_tdigest_column(
cudf::get_default_stream(), cudf::get_current_device_resource_ref());
auto c = cudf::tdigest::detail::make_empty_tdigest_column(
cudf::get_default_stream(), cudf::get_current_device_resource_ref());
auto a = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(
1, cudf::get_default_stream(), cudf::get_current_device_resource_ref());
auto b = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(
1, cudf::get_default_stream(), cudf::get_current_device_resource_ref());
auto c = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(
1, cudf::get_default_stream(), cudf::get_current_device_resource_ref());
std::vector<column_view> cols;
cols.push_back(*a);
cols.push_back(*b);
Expand All @@ -577,8 +577,8 @@ void tdigest_merge_empty(MergeFunc merge_op)
auto const delta = 1000;
auto result = merge_op(*values, delta);

auto expected = cudf::tdigest::detail::make_empty_tdigest_column(
cudf::get_default_stream(), cudf::get_current_device_resource_ref());
auto expected = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(
1, cudf::get_default_stream(), cudf::get_current_device_resource_ref());

CUDF_TEST_EXPECT_COLUMNS_EQUAL(*expected, *result);
}
Expand Down
23 changes: 12 additions & 11 deletions cpp/src/quantiles/tdigest/tdigest.cu
Original file line number Diff line number Diff line change
Expand Up @@ -292,32 +292,33 @@ std::unique_ptr<column> make_tdigest_column(size_type num_rows,
return make_structs_column(num_rows, std::move(children), 0, {}, stream, mr);
}

std::unique_ptr<column> make_empty_tdigest_column(rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
std::unique_ptr<column> make_tdigest_column_of_empty_clusters(size_type num_rows,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
auto offsets = cudf::make_fixed_width_column(
data_type(type_id::INT32), 2, mask_state::UNALLOCATED, stream, mr);
data_type(type_id::INT32), num_rows + 1, mask_state::UNALLOCATED, stream, mr);
thrust::fill(rmm::exec_policy(stream),
offsets->mutable_view().begin<size_type>(),
offsets->mutable_view().end<size_type>(),
0);

auto min_col =
cudf::make_numeric_column(data_type(type_id::FLOAT64), 1, mask_state::UNALLOCATED, stream, mr);
auto min_col = cudf::make_numeric_column(
data_type(type_id::FLOAT64), num_rows, mask_state::UNALLOCATED, stream, mr);
thrust::fill(rmm::exec_policy(stream),
min_col->mutable_view().begin<double>(),
min_col->mutable_view().end<double>(),
0);
auto max_col =
cudf::make_numeric_column(data_type(type_id::FLOAT64), 1, mask_state::UNALLOCATED, stream, mr);
auto max_col = cudf::make_numeric_column(
data_type(type_id::FLOAT64), num_rows, mask_state::UNALLOCATED, stream, mr);
thrust::fill(rmm::exec_policy(stream),
max_col->mutable_view().begin<double>(),
max_col->mutable_view().end<double>(),
0);

return make_tdigest_column(1,
make_empty_column(type_id::FLOAT64),
make_empty_column(type_id::FLOAT64),
return make_tdigest_column(num_rows,
cudf::make_empty_column(type_id::FLOAT64),
cudf::make_empty_column(type_id::FLOAT64),
std::move(offsets),
std::move(min_col),
std::move(max_col),
Expand All @@ -338,7 +339,7 @@ std::unique_ptr<column> make_empty_tdigest_column(rmm::cuda_stream_view stream,
std::unique_ptr<scalar> make_empty_tdigest_scalar(rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
auto contents = make_empty_tdigest_column(stream, mr)->release();
auto contents = make_tdigest_column_of_empty_clusters(1, stream, mr)->release();
return std::make_unique<struct_scalar>(
std::move(*std::make_unique<table>(std::move(contents.children))), true, stream, mr);
}
Expand Down
70 changes: 45 additions & 25 deletions cpp/src/quantiles/tdigest/tdigest_aggregation.cu
Original file line number Diff line number Diff line change
Expand Up @@ -366,8 +366,8 @@ std::unique_ptr<scalar> to_tdigest_scalar(std::unique_ptr<column>&& tdigest,
* @param group_cluster_wl Output. The set of cluster weight limits for each group.
* @param group_num_clusters Output. The number of output clusters for each input group.
* @param group_cluster_offsets Offsets per-group to the start of it's clusters
* @param has_nulls Whether or not the input contains nulls
*
* @param may_have_empty_clusters Whether or not there could be empty clusters. Must only be
* set to false when there is no empty cluster, true otherwise.
*/

template <typename GroupInfo, typename NearestWeightFunc, typename CumulativeWeight>
Expand All @@ -379,7 +379,7 @@ CUDF_KERNEL void generate_cluster_limits_kernel(int delta,
double* group_cluster_wl,
size_type* group_num_clusters,
size_type const* group_cluster_offsets,
bool has_nulls)
bool may_have_empty_clusters)
{
int const tid = threadIdx.x + blockIdx.x * blockDim.x;

Expand All @@ -399,11 +399,12 @@ CUDF_KERNEL void generate_cluster_limits_kernel(int delta,
// a group with nothing in it.
group_num_clusters[group_index] = 0;
if (total_weight <= 0) {
// if the input contains nulls we can potentially have a group that generates no
// clusters because -all- of the input values are null. in that case, the reduce_by_key call
// in the tdigest generation step will need a location to store the unused reduction value for
// that group of nulls. these "stubs" will be postprocessed out afterwards.
if (has_nulls) { group_num_clusters[group_index] = 1; }
// If the input contains empty clusters, we can potentially have a group that also generates
// empty clusters because -all- of the input values are null or empty cluster. In that case, the
// `reduce_by_key` call in the tdigest generation step will need a location to store the unused
// reduction value for that group of nulls and empty clusters. These "stubs" will be
// postprocessed out afterwards.
if (may_have_empty_clusters) { group_num_clusters[group_index] = 1; }
return;
}

Expand Down Expand Up @@ -502,7 +503,8 @@ CUDF_KERNEL void generate_cluster_limits_kernel(int delta,
* stream that falls before our current cluster limit
* @param group_info A functor which returns the info for the specified group (total weight,
* size and start offset)
* @param has_nulls Whether or not the input data contains nulls
* @param may_have_empty_clusters Whether or not there could be empty clusters. It should be
* set to false only when there is no empty cluster.
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned column's device memory
*
Expand All @@ -516,7 +518,7 @@ generate_group_cluster_info(int delta,
NearestWeight nearest_weight,
GroupInfo group_info,
CumulativeWeight cumulative_weight,
bool has_nulls,
bool may_have_empty_clusters,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
Expand All @@ -535,7 +537,7 @@ generate_group_cluster_info(int delta,
nullptr,
group_num_clusters.begin(),
nullptr,
has_nulls);
may_have_empty_clusters);

// generate group cluster offsets (where the clusters for a given group start and end)
auto group_cluster_offsets = cudf::make_numeric_column(
Expand Down Expand Up @@ -567,7 +569,7 @@ generate_group_cluster_info(int delta,
group_cluster_wl.begin(),
group_num_clusters.begin(),
group_cluster_offsets->view().begin<size_type>(),
has_nulls);
may_have_empty_clusters);

return {std::move(group_cluster_wl),
std::move(group_cluster_offsets),
Expand All @@ -580,7 +582,7 @@ std::unique_ptr<column> build_output_column(size_type num_rows,
std::unique_ptr<column>&& offsets,
std::unique_ptr<column>&& min_col,
std::unique_ptr<column>&& max_col,
bool has_nulls,
bool may_have_empty_clusters,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
Expand All @@ -595,7 +597,7 @@ std::unique_ptr<column> build_output_column(size_type num_rows,
size_type i) { return is_stub_weight(offsets[i]) ? 1 : 0; };

size_type const num_stubs = [&]() {
if (!has_nulls) { return 0; }
if (!may_have_empty_clusters) { return 0; }
auto iter = cudf::detail::make_counting_transform_iterator(
0, cuda::proclaim_return_type<size_type>(is_stub_digest));
return thrust::reduce(rmm::exec_policy(stream), iter, iter + num_rows);
Expand Down Expand Up @@ -661,6 +663,10 @@ std::unique_ptr<column> build_output_column(size_type num_rows,
mr);
}

/**
* @brief A functor which returns the cluster index within a group that the value at
* the given value index falls into.
*/
template <typename CumulativeWeight>
struct compute_tdigests_keys_fn {
int const delta;
Expand Down Expand Up @@ -706,16 +712,17 @@ struct compute_tdigests_keys_fn {
* boundaries.
*
* @param delta tdigest compression level
* @param values_begin Beginning of the range of input values.
* @param values_end End of the range of input values.
* @param centroids_begin Beginning of the range of centroids.
* @param centroids_end End of the range of centroids.
* @param cumulative_weight Functor which returns cumulative weight and group information for
* an absolute input value index.
* @param min_col Column containing the minimum value per group.
* @param max_col Column containing the maximum value per group.
* @param group_cluster_wl Cluster weight limits for each group.
* @param group_cluster_offsets R-value reference of offsets into the cluster weight limits.
* @param total_clusters Total number of clusters in all groups.
* @param has_nulls Whether or not the input contains nulls
* @param may_have_empty_clusters Whether or not there could be empty clusters. It should be
* set to false only when there is no empty cluster.
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned column's device memory
*
Expand All @@ -731,7 +738,7 @@ std::unique_ptr<column> compute_tdigests(int delta,
rmm::device_uvector<double> const& group_cluster_wl,
std::unique_ptr<column>&& group_cluster_offsets,
size_type total_clusters,
bool has_nulls,
bool may_have_empty_clusters,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
Expand All @@ -750,7 +757,9 @@ std::unique_ptr<column> compute_tdigests(int delta,
// double // max
// }
//
if (total_clusters == 0) { return cudf::tdigest::detail::make_empty_tdigest_column(stream, mr); }
if (total_clusters == 0) {
return cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(1, stream, mr);
}

// each input group represents an individual tdigest. within each tdigest, we want the keys
// to represent cluster indices (for example, if a tdigest had 100 clusters, the keys should fall
Expand Down Expand Up @@ -793,7 +802,7 @@ std::unique_ptr<column> compute_tdigests(int delta,
std::move(group_cluster_offsets),
std::move(min_col),
std::move(max_col),
has_nulls,
may_have_empty_clusters,
stream,
mr);
}
Expand Down Expand Up @@ -1145,8 +1154,13 @@ std::unique_ptr<column> merge_tdigests(tdigest_column_view const& tdv,
auto merged =
cudf::detail::concatenate(tdigest_views, stream, cudf::get_current_device_resource_ref());

auto merged_weights = merged->get_column(1).view();
// If there are no values, we can simply return a column that has only empty tdigests.
if (merged_weights.size() == 0) {
return cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(num_groups, stream, mr);
}

// generate cumulative weights
auto merged_weights = merged->get_column(1).view();
auto cumulative_weights = cudf::make_numeric_column(
data_type{type_id::FLOAT64}, merged_weights.size(), mask_state::UNALLOCATED, stream);
auto keys = cudf::detail::make_counting_transform_iterator(
Expand All @@ -1161,6 +1175,10 @@ std::unique_ptr<column> merge_tdigests(tdigest_column_view const& tdv,

auto const delta = max_centroids;

// We do not know whether there is any empty cluster in the input without actually reading the
// data, which could be expensive. So, we just assume that there could be empty clusters.
auto const may_have_empty_clusters = true;

// generate cluster info
auto [group_cluster_wl, group_cluster_offsets, total_clusters] = generate_group_cluster_info(
delta,
Expand All @@ -1177,7 +1195,7 @@ std::unique_ptr<column> merge_tdigests(tdigest_column_view const& tdv,
group_labels,
group_offsets,
{tdigest_offsets.begin<size_type>(), static_cast<size_t>(tdigest_offsets.size())}},
false,
may_have_empty_clusters,
stream,
mr);

Expand All @@ -1202,7 +1220,7 @@ std::unique_ptr<column> merge_tdigests(tdigest_column_view const& tdv,
group_cluster_wl,
std::move(group_cluster_offsets),
total_clusters,
false,
may_have_empty_clusters,
stream,
mr);
}
Expand Down Expand Up @@ -1267,7 +1285,9 @@ std::unique_ptr<column> group_tdigest(column_view const& col,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
if (col.size() == 0) { return cudf::tdigest::detail::make_empty_tdigest_column(stream, mr); }
if (col.size() == 0) {
return cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(1, stream, mr);
}

auto const delta = max_centroids;
return cudf::type_dispatcher(col.type(),
Expand All @@ -1293,7 +1313,7 @@ std::unique_ptr<column> group_merge_tdigest(column_view const& input,
tdigest_column_view tdv(input);

if (num_groups == 0 || input.size() == 0) {
return cudf::tdigest::detail::make_empty_tdigest_column(stream, mr);
return cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(1, stream, mr);
}

// bring group offsets back to the host
Expand Down
Loading

0 comments on commit 8c839bc

Please sign in to comment.