Skip to content

Commit

Permalink
add more docs and comments
Browse files Browse the repository at this point in the history
  • Loading branch information
jihoonson committed Sep 24, 2024
1 parent 182843a commit 0cb319e
Showing 1 changed file with 70 additions and 49 deletions.
119 changes: 70 additions & 49 deletions cpp/src/quantiles/tdigest/tdigest_aggregation.cu
Original file line number Diff line number Diff line change
Expand Up @@ -169,19 +169,19 @@ struct nearest_value_scalar_weights {
*/
template <typename GroupOffsetsIter>
struct nearest_value_centroid_weights {
double const* cumulative_weights;
GroupOffsetsIter outer_offsets; // groups
size_type const* inner_offsets; // tdigests within a group
double const* cumulative_weights; // cumulative weights of non-empty clusters
GroupOffsetsIter group_offsets; // groups
size_type const* tdigest_offsets; // tdigests within a group

thrust::pair<double, int> operator() __device__(double next_limit, size_type group_index) const
{
auto const tdigest_begin = outer_offsets[group_index];
auto const tdigest_end = outer_offsets[group_index + 1];
auto const num_weights = inner_offsets[tdigest_end] - inner_offsets[tdigest_begin];
auto const tdigest_begin = group_offsets[group_index];
auto const tdigest_end = group_offsets[group_index + 1];
auto const num_weights = tdigest_offsets[tdigest_end] - tdigest_offsets[tdigest_begin];
// NOTE: as it is today, this functor will never be called for any digests that are empty, but
// I'll leave this check here for safety.
if (num_weights == 0) { return thrust::pair<double, int>{0, 0}; }
double const* group_cumulative_weights = cumulative_weights + inner_offsets[tdigest_begin];
double const* group_cumulative_weights = cumulative_weights + tdigest_offsets[tdigest_begin];

auto const index = ((thrust::lower_bound(thrust::seq,
group_cumulative_weights,
Expand Down Expand Up @@ -235,21 +235,26 @@ struct cumulative_scalar_weight {
*/
template <typename GroupLabelsIter, typename GroupOffsetsIter>
struct cumulative_centroid_weight {
double const* cumulative_weights;
GroupLabelsIter group_labels;
GroupOffsetsIter outer_offsets; // groups
cudf::device_span<size_type const> inner_offsets; // tdigests with a group

double const* cumulative_weights; // cumulative weights of non-empty clusters
GroupLabelsIter group_labels; // group labels for each tdigest including empty ones
GroupOffsetsIter group_offsets; // groups
cudf::device_span<size_type const> tdigest_offsets; // tdigests with a group

/**
* @brief Returns the cumulative weight for a given value index. The index `n` is the index of
* `n`-th non-empty cluster.
*/
std::tuple<size_type, size_type, double> operator() __device__(size_type value_index) const
{
auto const tdigest_index =
static_cast<size_type>(
thrust::upper_bound(thrust::seq, inner_offsets.begin(), inner_offsets.end(), value_index) -
inner_offsets.begin()) -
thrust::upper_bound(
thrust::seq, tdigest_offsets.begin(), tdigest_offsets.end(), value_index) -
tdigest_offsets.begin()) -
1;
auto const group_index = group_labels[tdigest_index];
auto const first_tdigest_index = outer_offsets[group_index];
auto const first_weight_index = inner_offsets[first_tdigest_index];
auto const first_tdigest_index = group_offsets[group_index];
auto const first_weight_index = tdigest_offsets[first_tdigest_index];
auto const relative_value_index = value_index - first_weight_index;
double const* group_cumulative_weights = cumulative_weights + first_weight_index;

Expand Down Expand Up @@ -284,15 +289,15 @@ struct scalar_group_info {
// retrieve group info of centroid inputs by group index
template <typename GroupOffsetsIter>
struct centroid_group_info {
double const* cumulative_weights;
GroupOffsetsIter outer_offsets;
size_type const* inner_offsets;
double const* cumulative_weights; // cumulative weights of non-empty clusters
GroupOffsetsIter group_offsets;
size_type const* tdigest_offsets;

__device__ thrust::tuple<double, size_type, size_type> operator()(size_type group_index) const
{
// if there's no weights in this group of digests at all, return 0.
auto const group_start = inner_offsets[outer_offsets[group_index]];
auto const group_end = inner_offsets[outer_offsets[group_index + 1]];
auto const group_start = tdigest_offsets[group_offsets[group_index]];
auto const group_end = tdigest_offsets[group_offsets[group_index + 1]];
auto const num_weights = group_end - group_start;
auto const last_weight_index = group_end - 1;
return num_weights == 0
Expand Down Expand Up @@ -988,38 +993,54 @@ struct typed_reduce_tdigest {
}
};

// utility for merge_tdigests.
/**
* @brief Functor to compute the number of clusters in each group.
*
* Used in `merge_tdigests`.
*/
template <typename GroupOffsetsIter>
struct group_num_weights_func {
GroupOffsetsIter outer_offsets;
size_type const* inner_offsets;
struct group_num_clusters_func {
GroupOffsetsIter group_offsets;
size_type const* tdigest_offsets;

__device__ size_type operator()(size_type group_index)
{
auto const tdigest_begin = outer_offsets[group_index];
auto const tdigest_end = outer_offsets[group_index + 1];
return inner_offsets[tdigest_end] - inner_offsets[tdigest_begin];
auto const tdigest_begin = group_offsets[group_index];
auto const tdigest_end = group_offsets[group_index + 1];
return tdigest_offsets[tdigest_end] - tdigest_offsets[tdigest_begin];
}
};

// utility for merge_tdigests.
/**
* @brief Function to determine if a group is empty.
*
* Used in `merge_tdigests`.
*/
struct group_is_empty {
__device__ bool operator()(size_type group_size) { return group_size == 0; }
};

// utility for merge_tdigests.
/**
* @brief Functor that returns the grouping key for each tdigest cluster.
*
* Used in `merge_tdigests`.
*/
template <typename GroupLabelsIter>
struct group_key_func {
GroupLabelsIter group_labels;
size_type const* inner_offsets;
size_type num_inner_offsets;
size_type const* tdigest_offsets;
size_type num_tdigest_offsets;

/**
* @brief Returns the group index for an absolute cluster index. The index `n` is the index of the
* `n`-th non-empty cluster.
*/
__device__ size_type operator()(size_type index)
{
// what -original- tdigest index this absolute index corresponds to
auto const iter = thrust::prev(
thrust::upper_bound(thrust::seq, inner_offsets, inner_offsets + num_inner_offsets, index));
auto const tdigest_index = thrust::distance(inner_offsets, iter);
auto const iter = thrust::prev(thrust::upper_bound(
thrust::seq, tdigest_offsets, tdigest_offsets + num_tdigest_offsets, index));
auto const tdigest_index = thrust::distance(tdigest_offsets, iter);

// what group index the original tdigest belongs to
return group_labels[tdigest_index];
Expand All @@ -1042,12 +1063,12 @@ struct group_key_func {
*
* @param tdv input tdigests. These should have been sorted by the key, but may have not by the
* centroid mean within each group.
* @param h_outer_offsets a host iterator of the offsets to the start of each group. A group is
* @param h_group_offsets a host iterator of the offsets to the start of each group. A group is
* counted as one even when the cluster is empty in it. The offsets should have the same values as
* the ones in `group_offsets`.
* @param group_offsets a device iterator of the offsets to the start of each group. A group is
* counted as one even when the cluster is empty in it. The offsets should have the same values as
* the ones in `h_outer_offsets`.
* the ones in `h_group_offsets`.
* @param group_labels a device iterator of the the group label for each tdigest cluster including
* empty clusters.
* @param num_group_labels the number of unique group labels.
Expand All @@ -1060,7 +1081,7 @@ struct group_key_func {
*/
template <typename HGroupOffsetIter, typename GroupOffsetIter, typename GroupLabelIter>
std::unique_ptr<column> merge_tdigests(tdigest_column_view const& tdv,
HGroupOffsetIter h_outer_offsets,
HGroupOffsetIter h_group_offsets,
GroupOffsetIter group_offsets,
GroupLabelIter group_labels,
size_t num_group_labels,
Expand All @@ -1084,8 +1105,8 @@ std::unique_ptr<column> merge_tdigests(tdigest_column_view const& tdv,

// bring tdigest offsets back to the host
auto tdigest_offsets = tdv.centroids().offsets();
std::vector<size_type> h_inner_offsets(tdigest_offsets.size());
cudaMemcpyAsync(h_inner_offsets.data(),
std::vector<size_type> h_tdigest_offsets(tdigest_offsets.size());
cudaMemcpyAsync(h_tdigest_offsets.data(),
tdigest_offsets.begin<size_type>(),
sizeof(size_type) * tdigest_offsets.size(),
cudaMemcpyDefault,
Expand All @@ -1099,9 +1120,9 @@ std::unique_ptr<column> merge_tdigests(tdigest_column_view const& tdv,
// generate the merged (but not yet compressed) tdigests for each group.
std::vector<std::unique_ptr<table>> tdigests;
tdigests.reserve(num_groups);
std::transform(h_outer_offsets,
h_outer_offsets + num_groups,
std::next(h_outer_offsets),
std::transform(h_group_offsets,
h_group_offsets + num_groups,
std::next(h_group_offsets),
std::back_inserter(tdigests),
[&](auto tdigest_start, auto tdigest_end) {
// the range of tdigests in this group
Expand All @@ -1110,7 +1131,7 @@ std::unique_ptr<column> merge_tdigests(tdigest_column_view const& tdv,
// slice each tdigest from the input
std::vector<table_view> unmerged_tdigests;
unmerged_tdigests.reserve(num_tdigests);
auto offset_iter = std::next(h_inner_offsets.begin(), tdigest_start);
auto offset_iter = std::next(h_tdigest_offsets.begin(), tdigest_start);
std::transform(
offset_iter,
offset_iter + num_tdigests,
Expand Down Expand Up @@ -1162,20 +1183,20 @@ std::unique_ptr<column> merge_tdigests(tdigest_column_view const& tdv,

// for any empty groups, set the min and max to be 0. not technically necessary but it makes
// testing simpler.
auto group_num_weights = cudf::detail::make_counting_transform_iterator(
auto group_num_clusters = cudf::detail::make_counting_transform_iterator(
0,
group_num_weights_func<decltype(group_offsets)>{group_offsets,
tdigest_offsets.begin<size_type>()});
group_num_clusters_func<decltype(group_offsets)>{group_offsets,
tdigest_offsets.begin<size_type>()});
thrust::replace_if(rmm::exec_policy(stream),
merged_min_col->mutable_view().begin<double>(),
merged_min_col->mutable_view().end<double>(),
group_num_weights,
group_num_clusters,
group_is_empty{},
0);
thrust::replace_if(rmm::exec_policy(stream),
merged_max_col->mutable_view().begin<double>(),
merged_max_col->mutable_view().end<double>(),
group_num_weights,
group_num_clusters,
group_is_empty{},
0);

Expand Down

0 comments on commit 0cb319e

Please sign in to comment.