Skip to content

Commit

Permalink
Fix empty cluster handling in tdigest merge
Browse files Browse the repository at this point in the history
  • Loading branch information
jihoonson committed Sep 24, 2024
1 parent f8db575 commit 182843a
Show file tree
Hide file tree
Showing 6 changed files with 233 additions and 50 deletions.
17 changes: 9 additions & 8 deletions cpp/include/cudf/detail/tdigest/tdigest.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,28 +143,29 @@ std::unique_ptr<column> make_tdigest_column(size_type num_rows,
rmm::device_async_resource_ref mr);

/**
* @brief Create an empty tdigest column.
* @brief Create a tdigest column of empty clusters.
*
* An empty tdigest column contains a single row of length 0
* The column created contains the specified number of rows of empty clusters.
*
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned column's device memory.
*
* @returns An empty tdigest column.
* @returns A tdigest column of empty clusters.
*/
CUDF_EXPORT
std::unique_ptr<column> make_empty_tdigest_column(rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);
std::unique_ptr<column> make_tdigest_column_of_empty_clusters(size_type num_rows,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);

/**
* @brief Create an empty tdigest scalar.
* @brief Create a scalar of an empty tdigest cluster.
*
* An empty tdigest scalar is a struct_scalar that contains a single row of length 0
* The returned scalar is a struct_scalar that contains a single row of an empty cluster.
*
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned column's device memory.
*
* @returns An empty tdigest scalar.
* @returns A scalar of an empty tdigest cluster.
*/
std::unique_ptr<scalar> make_empty_tdigest_scalar(rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);
Expand Down
20 changes: 10 additions & 10 deletions cpp/include/cudf_test/tdigest_utilities.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,8 @@ void tdigest_simple_all_nulls_aggregation(Func op)
static_cast<column_view>(values).type(), tdigest_gen{}, op, values, delta);

// NOTE: an empty tdigest column still has 1 row.
auto expected = cudf::tdigest::detail::make_empty_tdigest_column(
cudf::get_default_stream(), cudf::get_current_device_resource_ref());
auto expected = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(
1, cudf::get_default_stream(), cudf::get_current_device_resource_ref());

CUDF_TEST_EXPECT_COLUMNS_EQUAL(*result, *expected);
}
Expand Down Expand Up @@ -562,12 +562,12 @@ template <typename MergeFunc>
void tdigest_merge_empty(MergeFunc merge_op)
{
// 3 empty tdigests all in the same group
auto a = cudf::tdigest::detail::make_empty_tdigest_column(
cudf::get_default_stream(), cudf::get_current_device_resource_ref());
auto b = cudf::tdigest::detail::make_empty_tdigest_column(
cudf::get_default_stream(), cudf::get_current_device_resource_ref());
auto c = cudf::tdigest::detail::make_empty_tdigest_column(
cudf::get_default_stream(), cudf::get_current_device_resource_ref());
auto a = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(
1, cudf::get_default_stream(), cudf::get_current_device_resource_ref());
auto b = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(
1, cudf::get_default_stream(), cudf::get_current_device_resource_ref());
auto c = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(
1, cudf::get_default_stream(), cudf::get_current_device_resource_ref());
std::vector<column_view> cols;
cols.push_back(*a);
cols.push_back(*b);
Expand All @@ -577,8 +577,8 @@ void tdigest_merge_empty(MergeFunc merge_op)
auto const delta = 1000;
auto result = merge_op(*values, delta);

auto expected = cudf::tdigest::detail::make_empty_tdigest_column(
cudf::get_default_stream(), cudf::get_current_device_resource_ref());
auto expected = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(
1, cudf::get_default_stream(), cudf::get_current_device_resource_ref());

CUDF_TEST_EXPECT_COLUMNS_EQUAL(*expected, *result);
}
Expand Down
23 changes: 12 additions & 11 deletions cpp/src/quantiles/tdigest/tdigest.cu
Original file line number Diff line number Diff line change
Expand Up @@ -292,32 +292,33 @@ std::unique_ptr<column> make_tdigest_column(size_type num_rows,
return make_structs_column(num_rows, std::move(children), 0, {}, stream, mr);
}

std::unique_ptr<column> make_empty_tdigest_column(rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
std::unique_ptr<column> make_tdigest_column_of_empty_clusters(size_type num_rows,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
auto offsets = cudf::make_fixed_width_column(
data_type(type_id::INT32), 2, mask_state::UNALLOCATED, stream, mr);
data_type(type_id::INT32), num_rows + 1, mask_state::UNALLOCATED, stream, mr);
thrust::fill(rmm::exec_policy(stream),
offsets->mutable_view().begin<size_type>(),
offsets->mutable_view().end<size_type>(),
0);

auto min_col =
cudf::make_numeric_column(data_type(type_id::FLOAT64), 1, mask_state::UNALLOCATED, stream, mr);
auto min_col = cudf::make_numeric_column(
data_type(type_id::FLOAT64), num_rows, mask_state::UNALLOCATED, stream, mr);
thrust::fill(rmm::exec_policy(stream),
min_col->mutable_view().begin<double>(),
min_col->mutable_view().end<double>(),
0);
auto max_col =
cudf::make_numeric_column(data_type(type_id::FLOAT64), 1, mask_state::UNALLOCATED, stream, mr);
auto max_col = cudf::make_numeric_column(
data_type(type_id::FLOAT64), num_rows, mask_state::UNALLOCATED, stream, mr);
thrust::fill(rmm::exec_policy(stream),
max_col->mutable_view().begin<double>(),
max_col->mutable_view().end<double>(),
0);

return make_tdigest_column(1,
make_empty_column(type_id::FLOAT64),
make_empty_column(type_id::FLOAT64),
return make_tdigest_column(num_rows,
cudf::make_empty_column(type_id::FLOAT64),
cudf::make_empty_column(type_id::FLOAT64),
std::move(offsets),
std::move(min_col),
std::move(max_col),
Expand All @@ -338,7 +339,7 @@ std::unique_ptr<column> make_empty_tdigest_column(rmm::cuda_stream_view stream,
std::unique_ptr<scalar> make_empty_tdigest_scalar(rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
auto contents = make_empty_tdigest_column(stream, mr)->release();
auto contents = make_tdigest_column_of_empty_clusters(1, stream, mr)->release();
return std::make_unique<struct_scalar>(
std::move(*std::make_unique<table>(std::move(contents.children))), true, stream, mr);
}
Expand Down
84 changes: 71 additions & 13 deletions cpp/src/quantiles/tdigest/tdigest_aggregation.cu
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,6 @@ std::unique_ptr<scalar> to_tdigest_scalar(std::unique_ptr<column>&& tdigest,
* @param group_num_clusters Output. The number of output clusters for each input group.
* @param group_cluster_offsets Offsets per-group to the start of it's clusters
* @param has_nulls Whether or not the input contains nulls
*
*/

template <typename GroupInfo, typename NearestWeightFunc, typename CumulativeWeight>
Expand Down Expand Up @@ -661,6 +660,10 @@ std::unique_ptr<column> build_output_column(size_type num_rows,
mr);
}

/**
* @brief A functor which returns the cluster index within a group that the value at
* the given value index falls into.
*/
template <typename CumulativeWeight>
struct compute_tdigests_keys_fn {
int const delta;
Expand Down Expand Up @@ -706,8 +709,8 @@ struct compute_tdigests_keys_fn {
* boundaries.
*
* @param delta tdigest compression level
* @param values_begin Beginning of the range of input values.
* @param values_end End of the range of input values.
* @param centroids_begin Beginning of the range of centroids.
* @param centroids_end End of the range of centroids.
* @param cumulative_weight Functor which returns cumulative weight and group information for
* an absolute input value index.
* @param min_col Column containing the minimum value per group.
Expand Down Expand Up @@ -750,7 +753,9 @@ std::unique_ptr<column> compute_tdigests(int delta,
// double // max
// }
//
if (total_clusters == 0) { return cudf::tdigest::detail::make_empty_tdigest_column(stream, mr); }
if (total_clusters == 0) {
return cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(1, stream, mr);
}

// each input group represents an individual tdigest. within each tdigest, we want the keys
// to represent cluster indices (for example, if a tdigest had 100 clusters, the keys should fall
Expand Down Expand Up @@ -1021,6 +1026,38 @@ struct group_key_func {
}
};

/**
* @brief Perform a merge aggregation of tdigests. This function usually takes the input as the
* outputs of multiple `typed_group_tdigest` calls, and merges them.
*
* A tdigest cluster can be empty in the input, which means that there was no valid input data to
* generate that cluster. These empty clusters are currently stored differently in different parts
* of the tdigest column. They are "compressed" in the `means`, `weights`, and `offsets` columns,
* but not in the `min` and `max` columns.
* - The `means` and `weights` columns do not contain values for empty clusters.
* - The empty clusters are stored as two consecutive same values. For example, given an offsets
* column of (0, 1, 1, 2), the second cluster where its offset is 1 is empty. Note that the offsets
* are the offsets for the means and the weights.
* - The `min` and `max` columns contain 0s for the empty clusters.
*
* @param tdv input tdigests. These should have been sorted by the key, but may have not by the
* centroid mean within each group.
* @param h_outer_offsets a host iterator of the offsets to the start of each group. A group is
* counted as one even when the cluster is empty in it. The offsets should have the same values as
* the ones in `group_offsets`.
* @param group_offsets a device iterator of the offsets to the start of each group. A group is
* counted as one even when the cluster is empty in it. The offsets should have the same values as
* the ones in `h_outer_offsets`.
* @param group_labels a device iterator of the the group label for each tdigest cluster including
* empty clusters.
* @param num_group_labels the number of unique group labels.
* @param num_groups the number of groups.
* @param max_centroids the maximum number of centroids (clusters) in the tdigest.
* @param stream CUDA stream
* @param mr device memory resource
*
* @return A column containing the merged tdigests.
*/
template <typename HGroupOffsetIter, typename GroupOffsetIter, typename GroupLabelIter>
std::unique_ptr<column> merge_tdigests(tdigest_column_view const& tdv,
HGroupOffsetIter h_outer_offsets,
Expand All @@ -1032,10 +1069,17 @@ std::unique_ptr<column> merge_tdigests(tdigest_column_view const& tdv,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
// thrust::merge and thrust::merge_by_key don't provide what we need. What we would need is an
// algorithm like a super-merge that takes two layers of keys: one which identifies the outer
// grouping of tdigests, and one which identifies the inner groupings of the tdigests within the
// outer groups.
// Sort the tdigests by the centroid mean within each group and then pass them to
// `compute_tdigests()`. Note that the input has been sorted by the key, but has not by the mean
// yet within each group. For sorting by the key, see
// `store_result_functor::get_grouped_values()`.
//
// NOTE: the current implementation is quite complex and involves offset copy from the device to
// the host. `thrust::merge` and `thrust::merge_by_key` don't provide what we need. What we would
// need is an algorithm like a super-merge that takes two layers of keys: one which identifies the
// outer grouping of tdigests, and one which identifies the inner groupings of the tdigests within
// the outer groups.
//
// TODO: investigate replacing the iterative merge with a single stable_sort_by_key.

// bring tdigest offsets back to the host
Expand Down Expand Up @@ -1076,7 +1120,7 @@ std::unique_ptr<column> merge_tdigests(tdigest_column_view const& tdv,
return cudf::detail::slice(tdigests_unsliced, {start, end}, stream);
});

// merge
// merge sorted
return cudf::detail::merge(unmerged_tdigests,
{0},
{order::ASCENDING},
Expand Down Expand Up @@ -1161,6 +1205,10 @@ std::unique_ptr<column> merge_tdigests(tdigest_column_view const& tdv,

auto const delta = max_centroids;

// TDigest merge takes the output of typed_group_tdigest as its input, which must not have
// any nulls.
auto const has_nulls = false;

// generate cluster info
auto [group_cluster_wl, group_cluster_offsets, total_clusters] = generate_group_cluster_info(
delta,
Expand All @@ -1177,7 +1225,7 @@ std::unique_ptr<column> merge_tdigests(tdigest_column_view const& tdv,
group_labels,
group_offsets,
{tdigest_offsets.begin<size_type>(), static_cast<size_t>(tdigest_offsets.size())}},
false,
has_nulls,
stream,
mr);

Expand All @@ -1202,7 +1250,7 @@ std::unique_ptr<column> merge_tdigests(tdigest_column_view const& tdv,
group_cluster_wl,
std::move(group_cluster_offsets),
total_clusters,
false,
has_nulls,
stream,
mr);
}
Expand Down Expand Up @@ -1267,7 +1315,9 @@ std::unique_ptr<column> group_tdigest(column_view const& col,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
if (col.size() == 0) { return cudf::tdigest::detail::make_empty_tdigest_column(stream, mr); }
if (col.size() == 0) {
return cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(1, stream, mr);
}

auto const delta = max_centroids;
return cudf::type_dispatcher(col.type(),
Expand All @@ -1293,7 +1343,15 @@ std::unique_ptr<column> group_merge_tdigest(column_view const& input,
tdigest_column_view tdv(input);

if (num_groups == 0 || input.size() == 0) {
return cudf::tdigest::detail::make_empty_tdigest_column(stream, mr);
return cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(1, stream, mr);
}

if (tdv.means().size() == 0) {
// `group_merge_tdigest` takes the output of `typed_group_tdigest` as its input, which wipes
// out the means and weights for empty clusters. Thus, no mean here indicates that all clusters
// are empty in the input. Let's skip all complex computation in the below, but just return
// an empty tdigest per group.
return cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(num_groups, stream, mr);
}

// bring group offsets back to the host
Expand Down
Loading

0 comments on commit 182843a

Please sign in to comment.