Skip to content

Commit

Permalink
Optimization of tdigest merge aggregation. (#16780)
Browse files Browse the repository at this point in the history
Fixes #16625

This PR fixes a slow implementation of the centroid merging step during the tdigest merge aggregation.  Previously it was doing a linear march over the individual tdigests per group and merging them one by one.  This led to terrible performance for large numbers of groups.  In principle though, all this really was doing was a segmented sort of centroid values. So that's what this PR changes it to.  Speedup for 1,000,000 input tidests with 1,000,000 individual groups is ~1000x,

```
Old
---------------------------------------------------------------------------------------------------------------
Benchmark                                                                     Time             CPU   Iterations
---------------------------------------------------------------------------------------------------------------
TDigest/many_tiny_groups/1000000/1/1/10000/iterations:8/manual_time        7473 ms         7472 ms            8
TDigest/many_tiny_groups2/1000000/1/1/1000/iterations:8/manual_time        7433 ms         7431 ms            8
```


```
New
---------------------------------------------------------------------------------------------------------------
Benchmark                                                                     Time             CPU   Iterations
---------------------------------------------------------------------------------------------------------------
TDigest/many_tiny_groups/1000000/1/1/10000/iterations:8/manual_time        6.72 ms         6.79 ms            8
TDigest/many_tiny_groups2/1000000/1/1/1000/iterations:8/manual_time        1.24 ms         1.32 ms            8
```

Authors:
  - https://github.com/nvdbaranec
  - Muhammad Haseeb (https://github.com/mhaseeb123)
  - GALI PREM SAGAR (https://github.com/galipremsagar)

Approvers:
  - Muhammad Haseeb (https://github.com/mhaseeb123)
  - Nghia Truong (https://github.com/ttnghia)
  - Mike Wilson (https://github.com/hyperbolic2346)

URL: #16780
  • Loading branch information
nvdbaranec authored Sep 25, 2024
1 parent d11ec7a commit 8e78424
Show file tree
Hide file tree
Showing 3 changed files with 232 additions and 88 deletions.
5 changes: 5 additions & 0 deletions cpp/benchmarks/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,11 @@ ConfigureNVBench(STRUCT_CREATION_NVBENCH structs/create_structs.cpp)
# --------------------------------------------------------------------------------
ConfigureBench(QUANTILES_BENCH quantiles/quantiles.cpp)

# ##################################################################################################
# * tdigest benchmark
# --------------------------------------------------------------------------------
ConfigureNVBench(TDIGEST_NVBENCH quantiles/tdigest.cu)

# ##################################################################################################
# * type_dispatcher benchmark ---------------------------------------------------------------------
ConfigureBench(TYPE_DISPATCHER_BENCH type_dispatcher/type_dispatcher.cu)
Expand Down
123 changes: 123 additions & 0 deletions cpp/benchmarks/quantiles/tdigest.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <cudf_test/column_wrapper.hpp>

#include <cudf/detail/tdigest/tdigest.hpp>
#include <cudf/utilities/default_stream.hpp>

#include <rmm/exec_policy.hpp>

#include <cuda/functional>
#include <thrust/copy.h>
#include <thrust/execution_policy.h>

#include <nvbench/nvbench.cuh>

void bm_tdigest_merge(nvbench::state& state)
{
auto const num_tdigests = static_cast<cudf::size_type>(state.get_int64("num_tdigests"));
auto const tdigest_size = static_cast<cudf::size_type>(state.get_int64("tdigest_size"));
auto const tdigests_per_group =
static_cast<cudf::size_type>(state.get_int64("tdigests_per_group"));
auto const max_centroids = static_cast<cudf::size_type>(state.get_int64("max_centroids"));
auto const num_groups = num_tdigests / tdigests_per_group;
auto const total_centroids = num_tdigests * tdigest_size;

auto stream = cudf::get_default_stream();
auto mr = rmm::mr::get_current_device_resource();

constexpr int base_value = 5;

// construct inner means/weights
auto val_iter = cudf::detail::make_counting_transform_iterator(
0, cuda::proclaim_return_type<double>([tdigest_size](cudf::size_type i) {
return static_cast<double>(base_value + (i % tdigest_size));
}));
auto one_iter = thrust::make_constant_iterator(1);
cudf::test::fixed_width_column_wrapper<double> means(val_iter, val_iter + total_centroids);
cudf::test::fixed_width_column_wrapper<double> weights(one_iter, one_iter + total_centroids);
std::vector<std::unique_ptr<cudf::column>> inner_struct_children;
inner_struct_children.push_back(means.release());
inner_struct_children.push_back(weights.release());
cudf::test::structs_column_wrapper inner_struct(std::move(inner_struct_children));

// construct the tdigest lists themselves
auto offset_iter = cudf::detail::make_counting_transform_iterator(
0, cuda::proclaim_return_type<cudf::size_type>([tdigest_size](cudf::size_type i) {
return i * tdigest_size;
}));
cudf::test::fixed_width_column_wrapper<int> offsets(offset_iter, offset_iter + num_tdigests + 1);
auto list_col = cudf::make_lists_column(
num_tdigests, offsets.release(), inner_struct.release(), 0, {}, stream, mr);

// min and max columns
auto min_iter = thrust::make_constant_iterator(base_value);
auto max_iter = thrust::make_constant_iterator(base_value + (tdigest_size - 1));
cudf::test::fixed_width_column_wrapper<double> mins(min_iter, min_iter + num_tdigests);
cudf::test::fixed_width_column_wrapper<double> maxes(max_iter, max_iter + num_tdigests);

// assemble the whole thing
std::vector<std::unique_ptr<cudf::column>> tdigest_children;
tdigest_children.push_back(std::move(list_col));
tdigest_children.push_back(mins.release());
tdigest_children.push_back(maxes.release());
cudf::test::structs_column_wrapper tdigest(std::move(tdigest_children));

rmm::device_uvector<cudf::size_type> group_offsets(num_groups + 1, stream, mr);
rmm::device_uvector<cudf::size_type> group_labels(num_tdigests, stream, mr);
auto group_offset_iter = cudf::detail::make_counting_transform_iterator(
0,
cuda::proclaim_return_type<cudf::size_type>(
[tdigests_per_group] __device__(cudf::size_type i) { return i * tdigests_per_group; }));
thrust::copy(rmm::exec_policy_nosync(stream, mr),
group_offset_iter,
group_offset_iter + num_groups + 1,
group_offsets.begin());
auto group_label_iter = cudf::detail::make_counting_transform_iterator(
0,
cuda::proclaim_return_type<cudf::size_type>(
[tdigests_per_group] __device__(cudf::size_type i) { return i / tdigests_per_group; }));
thrust::copy(rmm::exec_policy_nosync(stream, mr),
group_label_iter,
group_label_iter + num_tdigests,
group_labels.begin());

state.add_element_count(total_centroids);

state.set_cuda_stream(nvbench::make_cuda_stream_view(stream.value()));
state.exec(nvbench::exec_tag::timer | nvbench::exec_tag::sync,
[&](nvbench::launch& launch, auto& timer) {
timer.start();
auto result = cudf::tdigest::detail::group_merge_tdigest(
tdigest, group_offsets, group_labels, num_groups, max_centroids, stream, mr);
timer.stop();
});
}

NVBENCH_BENCH(bm_tdigest_merge)
.set_name("TDigest many tiny groups")
.add_int64_axis("num_tdigests", {500'000})
.add_int64_axis("tdigest_size", {1, 1000})
.add_int64_axis("tdigests_per_group", {1})
.add_int64_axis("max_centroids", {10000, 1000});

NVBENCH_BENCH(bm_tdigest_merge)
.set_name("TDigest many small groups")
.add_int64_axis("num_tdigests", {500'000})
.add_int64_axis("tdigest_size", {1, 1000})
.add_int64_axis("tdigests_per_group", {3})
.add_int64_axis("max_centroids", {10000, 1000});
192 changes: 104 additions & 88 deletions cpp/src/quantiles/tdigest/tdigest_aggregation.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1021,6 +1021,76 @@ struct group_key_func {
}
};

// merges all the tdigests within each group. returns a table containing 2 columns:
// the sorted means and weights.
template <typename GroupOffsetIter>
std::pair<rmm::device_uvector<double>, rmm::device_uvector<double>> generate_merged_centroids(
tdigest_column_view const& tdv,
GroupOffsetIter group_offsets,
size_type num_groups,
rmm::cuda_stream_view stream)
{
auto temp_mr = cudf::get_current_device_resource_ref();

auto const total_merged_centroids = tdv.means().size();

// output is the merged centroids (means, weights)
rmm::device_uvector<double> output_means(total_merged_centroids, stream, temp_mr);
rmm::device_uvector<double> output_weights(total_merged_centroids, stream, temp_mr);

// each group represents a collection of tdigest columns. each row is 1 tdigest.
// within each group, we want to sort all the centroids within all the tdigests
// in that group, using the means as the key. the "outer offsets" represent the indices of the
// tdigests, and the "inner offsets" represents the list of centroids for a particular tdigest.
//
// rows
// ---- centroid 0 ---------
// tdigest 0 centroid 1
// ---- centroid 2 group 0
// tdigest 1 centroid 3
// ---- centroid 4 ---------
// tdigest 2 centroid 5
// ---- centroid 6 group 1
// tdigest 3 centroid 7
// centroid 8
// ---- centroid 9 --------
auto inner_offsets = tdv.centroids().offsets();
auto centroid_offsets = cudf::detail::make_counting_transform_iterator(
0,
cuda::proclaim_return_type<size_type>(
[group_offsets, inner_offsets = tdv.centroids().offsets().begin<size_type>()] __device__(
size_type i) { return inner_offsets[group_offsets[i]]; }));

// perform the sort using the means as the key
size_t temp_size;
CUDF_CUDA_TRY(cub::DeviceSegmentedSort::SortPairs(nullptr,
temp_size,
tdv.means().begin<double>(),
output_means.begin(),
tdv.weights().begin<double>(),
output_weights.begin(),
total_merged_centroids,
num_groups,
centroid_offsets,
centroid_offsets + 1,
stream.value()));

rmm::device_buffer temp_mem(temp_size, stream, temp_mr);
CUDF_CUDA_TRY(cub::DeviceSegmentedSort::SortPairs(temp_mem.data(),
temp_size,
tdv.means().begin<double>(),
output_means.begin(),
tdv.weights().begin<double>(),
output_weights.begin(),
total_merged_centroids,
num_groups,
centroid_offsets,
centroid_offsets + 1,
stream.value()));

return {std::move(output_means), std::move(output_weights)};
}

template <typename HGroupOffsetIter, typename GroupOffsetIter, typename GroupLabelIter>
std::unique_ptr<column> merge_tdigests(tdigest_column_view const& tdv,
HGroupOffsetIter h_outer_offsets,
Expand All @@ -1032,59 +1102,6 @@ std::unique_ptr<column> merge_tdigests(tdigest_column_view const& tdv,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
// thrust::merge and thrust::merge_by_key don't provide what we need. What we would need is an
// algorithm like a super-merge that takes two layers of keys: one which identifies the outer
// grouping of tdigests, and one which identifies the inner groupings of the tdigests within the
// outer groups.
// TODO: investigate replacing the iterative merge with a single stable_sort_by_key.

// bring tdigest offsets back to the host
auto tdigest_offsets = tdv.centroids().offsets();
std::vector<size_type> h_inner_offsets(tdigest_offsets.size());
cudaMemcpyAsync(h_inner_offsets.data(),
tdigest_offsets.begin<size_type>(),
sizeof(size_type) * tdigest_offsets.size(),
cudaMemcpyDefault,
stream);

stream.synchronize();

// extract all means and weights into a table
cudf::table_view tdigests_unsliced({tdv.means(), tdv.weights()});

// generate the merged (but not yet compressed) tdigests for each group.
std::vector<std::unique_ptr<table>> tdigests;
tdigests.reserve(num_groups);
std::transform(h_outer_offsets,
h_outer_offsets + num_groups,
std::next(h_outer_offsets),
std::back_inserter(tdigests),
[&](auto tdigest_start, auto tdigest_end) {
// the range of tdigests in this group
auto const num_tdigests = tdigest_end - tdigest_start;

// slice each tdigest from the input
std::vector<table_view> unmerged_tdigests;
unmerged_tdigests.reserve(num_tdigests);
auto offset_iter = std::next(h_inner_offsets.begin(), tdigest_start);
std::transform(
offset_iter,
offset_iter + num_tdigests,
std::next(offset_iter),
std::back_inserter(unmerged_tdigests),
[&](size_type start, size_type end) {
return cudf::detail::slice(tdigests_unsliced, {start, end}, stream);
});

// merge
return cudf::detail::merge(unmerged_tdigests,
{0},
{order::ASCENDING},
{},
stream,
cudf::get_current_device_resource_ref());
});

// generate min and max values
auto merged_min_col = cudf::make_numeric_column(
data_type{type_id::FLOAT64}, num_groups, mask_state::UNALLOCATED, stream, mr);
Expand Down Expand Up @@ -1121,7 +1138,7 @@ std::unique_ptr<column> merge_tdigests(tdigest_column_view const& tdv,
auto group_num_weights = cudf::detail::make_counting_transform_iterator(
0,
group_num_weights_func<decltype(group_offsets)>{group_offsets,
tdigest_offsets.begin<size_type>()});
tdv.centroids().offsets().begin<size_type>()});
thrust::replace_if(rmm::exec_policy(stream),
merged_min_col->mutable_view().begin<double>(),
merged_min_col->mutable_view().end<double>(),
Expand All @@ -1135,29 +1152,33 @@ std::unique_ptr<column> merge_tdigests(tdigest_column_view const& tdv,
group_is_empty{},
0);

// concatenate all the merged tdigests back into one table.
std::vector<table_view> tdigest_views;
tdigest_views.reserve(num_groups);
std::transform(tdigests.begin(),
tdigests.end(),
std::back_inserter(tdigest_views),
[](std::unique_ptr<table> const& t) { return t->view(); });
auto merged =
cudf::detail::concatenate(tdigest_views, stream, cudf::get_current_device_resource_ref());
auto temp_mr = cudf::get_current_device_resource_ref();

// merge the centroids
auto [merged_means, merged_weights] =
generate_merged_centroids(tdv, group_offsets, num_groups, stream);
size_t const num_centroids = tdv.means().size();
CUDF_EXPECTS(merged_means.size() == num_centroids,
"Unexpected number of centroids in merged result");

// generate cumulative weights
auto merged_weights = merged->get_column(1).view();
auto cumulative_weights = cudf::make_numeric_column(
data_type{type_id::FLOAT64}, merged_weights.size(), mask_state::UNALLOCATED, stream);
auto keys = cudf::detail::make_counting_transform_iterator(
0,
group_key_func<decltype(group_labels)>{
group_labels, tdigest_offsets.begin<size_type>(), tdigest_offsets.size()});
rmm::device_uvector<double> cumulative_weights(merged_weights.size(), stream, temp_mr);

// generate group keys for all centroids in the entire column
rmm::device_uvector<size_type> group_keys(num_centroids, stream, temp_mr);
auto iter = thrust::make_counting_iterator(0);
auto inner_offsets = tdv.centroids().offsets();
thrust::transform(rmm::exec_policy(stream),
iter,
iter + num_centroids,
group_keys.begin(),
group_key_func<decltype(group_labels)>{
group_labels, inner_offsets.begin<size_type>(), inner_offsets.size()});
thrust::inclusive_scan_by_key(rmm::exec_policy(stream),
keys,
keys + cumulative_weights->size(),
merged_weights.begin<double>(),
cumulative_weights->mutable_view().begin<double>());
group_keys.begin(),
group_keys.begin() + num_centroids,
merged_weights.begin(),
cumulative_weights.begin());

auto const delta = max_centroids;

Expand All @@ -1166,37 +1187,32 @@ std::unique_ptr<column> merge_tdigests(tdigest_column_view const& tdv,
delta,
num_groups,
nearest_value_centroid_weights<decltype(group_offsets)>{
cumulative_weights->view().begin<double>(),
group_offsets,
tdigest_offsets.begin<size_type>()},
centroid_group_info<decltype(group_offsets)>{cumulative_weights->view().begin<double>(),
group_offsets,
tdigest_offsets.begin<size_type>()},
cumulative_weights.begin(), group_offsets, inner_offsets.begin<size_type>()},
centroid_group_info<decltype(group_offsets)>{
cumulative_weights.begin(), group_offsets, inner_offsets.begin<size_type>()},
cumulative_centroid_weight<decltype(group_labels), decltype(group_offsets)>{
cumulative_weights->view().begin<double>(),
cumulative_weights.begin(),
group_labels,
group_offsets,
{tdigest_offsets.begin<size_type>(), static_cast<size_t>(tdigest_offsets.size())}},
{inner_offsets.begin<size_type>(), static_cast<size_t>(inner_offsets.size())}},
false,
stream,
mr);

// input centroid values
auto centroids = cudf::detail::make_counting_transform_iterator(
0,
make_weighted_centroid{merged->get_column(0).view().begin<double>(),
merged_weights.begin<double>()});
0, make_weighted_centroid{merged_means.begin(), merged_weights.begin()});

// compute the tdigest
return compute_tdigests(
delta,
centroids,
centroids + merged->num_rows(),
centroids + merged_means.size(),
cumulative_centroid_weight<decltype(group_labels), decltype(group_offsets)>{
cumulative_weights->view().begin<double>(),
cumulative_weights.begin(),
group_labels,
group_offsets,
{tdigest_offsets.begin<size_type>(), static_cast<size_t>(tdigest_offsets.size())}},
{inner_offsets.begin<size_type>(), static_cast<size_t>(inner_offsets.size())}},
std::move(merged_min_col),
std::move(merged_max_col),
group_cluster_wl,
Expand Down

0 comments on commit 8e78424

Please sign in to comment.