Optimization of tdigest merge aggregation. #16780

Merged: 23 commits merged into branch-24.10 from tdigest_merge_opt on Sep 25, 2024
Changes shown are from 12 of the 23 commits.

Commits
b6aea93
Optimize the merging of tdigest groups in the tdigest merge aggregation.
nvdbaranec Sep 9, 2024
73a6360
Formatting.
nvdbaranec Sep 9, 2024
996b0cc
Add tdigest merge benchmark.
nvdbaranec Sep 11, 2024
b02d780
Merge branch 'branch-24.10' into tdigest_merge_opt
nvdbaranec Sep 11, 2024
00edf5b
Formatting.
nvdbaranec Sep 11, 2024
e80154d
Merge branch 'branch-24.10' into tdigest_merge_opt
nvdbaranec Sep 17, 2024
46dd1f0
Merge branch 'branch-24.10' into tdigest_merge_opt
nvdbaranec Sep 18, 2024
c19bab6
Switch to using NVBench for the benchmarks. Added an axis for larger …
nvdbaranec Sep 19, 2024
84d71da
Use device_uvectors instead of full columns in several places. Use cu…
nvdbaranec Sep 19, 2024
72192b8
Formatting
nvdbaranec Sep 19, 2024
eb345ca
Add some static casts to the state reading code in the benchmark.
nvdbaranec Sep 20, 2024
cc8324f
Merge branch 'branch-24.10' into tdigest_merge_opt
nvdbaranec Sep 20, 2024
68cd231
Update cpp/benchmarks/quantiles/tdigest.cu
mhaseeb123 Sep 20, 2024
6c5bc4b
Style fix
mhaseeb123 Sep 20, 2024
c3180a4
Minor style fix
mhaseeb123 Sep 20, 2024
88a092a
Merge branch 'branch-24.10' into tdigest_merge_opt
mhaseeb123 Sep 20, 2024
e0bfb37
Merge branch 'branch-24.10' into tdigest_merge_opt
nvdbaranec Sep 23, 2024
7cc7570
Merge branch 'tdigest_merge_opt' of github.com:nvdbaranec/cudf into t…
nvdbaranec Sep 23, 2024
6e21bd7
Merge branch 'branch-24.10' into tdigest_merge_opt
nvdbaranec Sep 23, 2024
cc62cb6
Merge branch 'branch-24.10' into tdigest_merge_opt
galipremsagar Sep 23, 2024
77512e8
Merge branch 'branch-24.10' into tdigest_merge_opt
nvdbaranec Sep 24, 2024
7d21093
Merge branch 'tdigest_merge_opt' of github.com:nvdbaranec/cudf into t…
nvdbaranec Sep 24, 2024
743da41
Merge branch 'branch-24.10' into tdigest_merge_opt
nvdbaranec Sep 25, 2024
5 changes: 5 additions & 0 deletions cpp/benchmarks/CMakeLists.txt
@@ -230,6 +230,11 @@ ConfigureNVBench(STRUCT_CREATION_NVBENCH structs/create_structs.cpp)
# --------------------------------------------------------------------------------
ConfigureBench(QUANTILES_BENCH quantiles/quantiles.cpp)

# ##################################################################################################
# * tdigest benchmark
# --------------------------------------------------------------------------------
ConfigureNVBench(TDIGEST_NVBENCH quantiles/tdigest.cu)

# ##################################################################################################
# * type_dispatcher benchmark ---------------------------------------------------------------------
ConfigureBench(TYPE_DISPATCHER_BENCH type_dispatcher/type_dispatcher.cu)
126 changes: 126 additions & 0 deletions cpp/benchmarks/quantiles/tdigest.cu
@@ -0,0 +1,126 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <cudf_test/column_wrapper.hpp>

#include <cudf/detail/tdigest/tdigest.hpp>
#include <cudf/utilities/default_stream.hpp>

#include <rmm/exec_policy.hpp>

#include <cuda/functional>
#include <thrust/copy.h>
#include <thrust/execution_policy.h>

#include <nvbench/nvbench.cuh>

void bm_tdigest_merge(nvbench::state& state)
{
cudf::size_type const num_tdigests =
static_cast<cudf::size_type>(state.get_int64("num_tdigests"));
cudf::size_type const tdigest_size =
static_cast<cudf::size_type>(state.get_int64("tdigest_size"));
cudf::size_type const tdigests_per_group =
static_cast<cudf::size_type>(state.get_int64("tdigests_per_group"));
cudf::size_type const max_centroids =
static_cast<cudf::size_type>(state.get_int64("max_centroids"));
auto const num_groups = num_tdigests / tdigests_per_group;
auto const total_centroids = num_tdigests * tdigest_size;

auto stream = cudf::get_default_stream();
auto mr = rmm::mr::get_current_device_resource();

constexpr int base_value = 5;

// construct inner means/weights
auto val_iter = cudf::detail::make_counting_transform_iterator(
0, cuda::proclaim_return_type<double>([tdigest_size](cudf::size_type i) {
return static_cast<double>(base_value + (i % tdigest_size));
}));
auto one_iter = thrust::make_constant_iterator(1);
cudf::test::fixed_width_column_wrapper<double> means(val_iter, val_iter + total_centroids);
cudf::test::fixed_width_column_wrapper<double> weights(one_iter, one_iter + total_centroids);
std::vector<std::unique_ptr<cudf::column>> inner_struct_children;
inner_struct_children.push_back(means.release());
inner_struct_children.push_back(weights.release());
cudf::test::structs_column_wrapper inner_struct(std::move(inner_struct_children));

// construct the tdigest lists themselves
auto offset_iter = cudf::detail::make_counting_transform_iterator(
0, cuda::proclaim_return_type<cudf::size_type>([tdigest_size](cudf::size_type i) {
return i * tdigest_size;
}));
cudf::test::fixed_width_column_wrapper<int> offsets(offset_iter, offset_iter + num_tdigests + 1);
auto list_col = cudf::make_lists_column(
num_tdigests, offsets.release(), inner_struct.release(), 0, {}, stream, mr);

// min and max columns
auto min_iter = thrust::make_constant_iterator(base_value);
auto max_iter = thrust::make_constant_iterator(base_value + (tdigest_size - 1));
cudf::test::fixed_width_column_wrapper<double> mins(min_iter, min_iter + num_tdigests);
cudf::test::fixed_width_column_wrapper<double> maxes(max_iter, max_iter + num_tdigests);

// assemble the whole thing
std::vector<std::unique_ptr<cudf::column>> tdigest_children;
tdigest_children.push_back(std::move(list_col));
tdigest_children.push_back(mins.release());
tdigest_children.push_back(maxes.release());
cudf::test::structs_column_wrapper tdigest(std::move(tdigest_children));

rmm::device_uvector<cudf::size_type> group_offsets(num_groups + 1, stream, mr);
rmm::device_uvector<cudf::size_type> group_labels(num_tdigests, stream, mr);
auto group_offset_iter = cudf::detail::make_counting_transform_iterator(
0,
cuda::proclaim_return_type<cudf::size_type>(
[tdigests_per_group] __device__(cudf::size_type i) { return i * tdigests_per_group; }));
thrust::copy(rmm::exec_policy_nosync(stream, mr),
group_offset_iter,
group_offset_iter + num_groups + 1,
group_offsets.begin());
auto group_label_iter = cudf::detail::make_counting_transform_iterator(
0,
cuda::proclaim_return_type<cudf::size_type>(
[tdigests_per_group] __device__(cudf::size_type i) { return i / tdigests_per_group; }));
thrust::copy(rmm::exec_policy_nosync(stream, mr),
group_label_iter,
group_label_iter + num_tdigests,
group_labels.begin());

state.add_element_count(total_centroids);

state.set_cuda_stream(nvbench::make_cuda_stream_view(stream.value()));
state.exec(nvbench::exec_tag::timer | nvbench::exec_tag::sync,
[&](nvbench::launch& launch, auto& timer) {
timer.start();
auto result = cudf::tdigest::detail::group_merge_tdigest(
tdigest, group_offsets, group_labels, num_groups, max_centroids, stream, mr);
timer.stop();
});
}

NVBENCH_BENCH(bm_tdigest_merge)
.set_name("TDigest many tiny groups")
.add_int64_axis("num_tdigests", {500'000})
.add_int64_axis("tdigest_size", {1, 1000})
.add_int64_axis("tdigests_per_group", {1})
.add_int64_axis("max_centroids", {10000, 1000});

NVBENCH_BENCH(bm_tdigest_merge)
.set_name("TDigest many small groups")
.add_int64_axis("num_tdigests", {500'000})
.add_int64_axis("tdigest_size", {1, 1000})
.add_int64_axis("tdigests_per_group", {3})
.add_int64_axis("max_centroids", {10000, 1000});
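As a quick aside on the grouping inputs the benchmark builds (group_offsets and group_labels): the host-only sketch below, using made-up sizes rather than the benchmark's axis values, shows what those two counting-transform iterators produce. It is illustrative only and not part of this PR.

#include <cstdio>
#include <vector>

// Hypothetical host-side illustration of the grouping scheme: with
// num_tdigests = 6 and tdigests_per_group = 3, the iterators above yield
// group_offsets = {0, 3, 6} and group_labels = {0, 0, 0, 1, 1, 1}.
int main()
{
  int const num_tdigests       = 6;
  int const tdigests_per_group = 3;
  int const num_groups         = num_tdigests / tdigests_per_group;

  // group_offsets: first tdigest index of each group, plus a terminating entry.
  std::vector<int> group_offsets(num_groups + 1);
  for (int i = 0; i <= num_groups; ++i) { group_offsets[i] = i * tdigests_per_group; }

  // group_labels: which group each tdigest belongs to.
  std::vector<int> group_labels(num_tdigests);
  for (int i = 0; i < num_tdigests; ++i) { group_labels[i] = i / tdigests_per_group; }

  for (int v : group_offsets) { std::printf("%d ", v); }  // prints: 0 3 6
  std::printf("\n");
  for (int v : group_labels) { std::printf("%d ", v); }   // prints: 0 0 0 1 1 1
  std::printf("\n");
  return 0;
}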
192 changes: 104 additions & 88 deletions cpp/src/quantiles/tdigest/tdigest_aggregation.cu
@@ -1021,6 +1021,76 @@ struct group_key_func {
}
};

// merges all the tdigests within each group. returns a pair of device vectors containing
// the sorted means and weights.
template <typename GroupOffsetIter>
std::pair<rmm::device_uvector<double>, rmm::device_uvector<double>> generate_merged_centroids(
tdigest_column_view const& tdv,
GroupOffsetIter group_offsets,
size_type num_groups,
rmm::cuda_stream_view stream)
{
auto temp_mr = cudf::get_current_device_resource_ref();

auto const total_merged_centroids = tdv.means().size();

// output is the merged centroids (means, weights)
rmm::device_uvector<double> output_means(total_merged_centroids, stream, temp_mr);
rmm::device_uvector<double> output_weights(total_merged_centroids, stream, temp_mr);

// each group represents a collection of tdigest columns. each row is 1 tdigest.
// within each group, we want to sort all the centroids within all the tdigests
// in that group, using the means as the key. the "outer offsets" represent the indices of the
// tdigests, and the "inner offsets" represent the list of centroids for a particular tdigest.
//
//  rows
//  ----          centroid 0  ---------
//  tdigest 0     centroid 1
//  ----          centroid 2   group 0
//  tdigest 1     centroid 3
//  ----          centroid 4  ---------
//  tdigest 2     centroid 5
//  ----          centroid 6   group 1
//  tdigest 3     centroid 7
//                centroid 8
//  ----          centroid 9  ---------
auto inner_offsets = tdv.centroids().offsets();
auto centroid_offsets = cudf::detail::make_counting_transform_iterator(
0,
cuda::proclaim_return_type<size_type>(
[group_offsets, inner_offsets = tdv.centroids().offsets().begin<size_type>()] __device__(
size_type i) { return inner_offsets[group_offsets[i]]; }));

// perform the sort using the means as the key
size_t temp_size;
CUDF_CUDA_TRY(cub::DeviceSegmentedSort::SortPairs(nullptr,
temp_size,
tdv.means().begin<double>(),
output_means.begin(),
tdv.weights().begin<double>(),
output_weights.begin(),
total_merged_centroids,
num_groups,
centroid_offsets,
centroid_offsets + 1,
stream.value()));

rmm::device_buffer temp_mem(temp_size, stream, temp_mr);
CUDF_CUDA_TRY(cub::DeviceSegmentedSort::SortPairs(temp_mem.data(),
temp_size,
tdv.means().begin<double>(),
output_means.begin(),
tdv.weights().begin<double>(),
output_weights.begin(),
total_merged_centroids,
num_groups,
centroid_offsets,
centroid_offsets + 1,
stream.value()));

return {std::move(output_means), std::move(output_weights)};
}
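As an aside, the null-pointer call followed by the real call above is the standard two-pass CUB idiom: the first SortPairs invocation only reports the temporary storage it needs, and the second performs the segmented sort. Below is a minimal, self-contained sketch of that idiom; the data, segment layout, and names are made up for illustration and are not taken from this PR.

// Standalone sketch of cub::DeviceSegmentedSort::SortPairs (illustrative only).
// Sorts (mean, weight) pairs within each segment: two segments of 3 and 2 items.
#include <cub/device/device_segmented_sort.cuh>
#include <thrust/device_vector.h>

#include <cstddef>
#include <vector>

int main()
{
  std::vector<double> h_means{5.0, 3.0, 4.0, 2.0, 1.0};
  std::vector<double> h_weights{1.0, 2.0, 3.0, 4.0, 5.0};
  std::vector<int> h_offsets{0, 3, 5};  // segment begin/end offsets

  thrust::device_vector<double> means(h_means.begin(), h_means.end());
  thrust::device_vector<double> weights(h_weights.begin(), h_weights.end());
  thrust::device_vector<int> offsets(h_offsets.begin(), h_offsets.end());
  thrust::device_vector<double> out_means(means.size());
  thrust::device_vector<double> out_weights(weights.size());

  // Pass 1: a null temp-storage pointer only computes the required scratch size.
  std::size_t temp_bytes = 0;
  cub::DeviceSegmentedSort::SortPairs(nullptr,
                                      temp_bytes,
                                      means.data().get(),
                                      out_means.data().get(),
                                      weights.data().get(),
                                      out_weights.data().get(),
                                      static_cast<int>(means.size()),
                                      static_cast<int>(offsets.size()) - 1,
                                      offsets.data().get(),
                                      offsets.data().get() + 1);

  thrust::device_vector<unsigned char> temp(temp_bytes);

  // Pass 2: the actual segmented sort. Each segment's means come out ascending,
  // with the weights reordered alongside them.
  cub::DeviceSegmentedSort::SortPairs(temp.data().get(),
                                      temp_bytes,
                                      means.data().get(),
                                      out_means.data().get(),
                                      weights.data().get(),
                                      out_weights.data().get(),
                                      static_cast<int>(means.size()),
                                      static_cast<int>(offsets.size()) - 1,
                                      offsets.data().get(),
                                      offsets.data().get() + 1);

  // out_means is now {3, 4, 5, 1, 2}; out_weights follows as {2, 3, 1, 5, 4}.
  return 0;
}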

template <typename HGroupOffsetIter, typename GroupOffsetIter, typename GroupLabelIter>
std::unique_ptr<column> merge_tdigests(tdigest_column_view const& tdv,
HGroupOffsetIter h_outer_offsets,
@@ -1032,59 +1102,6 @@ std::unique_ptr<column> merge_tdigests(tdigest_column_view const& tdv,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
// thrust::merge and thrust::merge_by_key don't provide what we need. What we would need is an
// algorithm like a super-merge that takes two layers of keys: one which identifies the outer
// grouping of tdigests, and one which identifies the inner groupings of the tdigests within the
// outer groups.
// TODO: investigate replacing the iterative merge with a single stable_sort_by_key.

// bring tdigest offsets back to the host
auto tdigest_offsets = tdv.centroids().offsets();
std::vector<size_type> h_inner_offsets(tdigest_offsets.size());
cudaMemcpyAsync(h_inner_offsets.data(),
tdigest_offsets.begin<size_type>(),
sizeof(size_type) * tdigest_offsets.size(),
cudaMemcpyDefault,
stream);

stream.synchronize();

// extract all means and weights into a table
cudf::table_view tdigests_unsliced({tdv.means(), tdv.weights()});

// generate the merged (but not yet compressed) tdigests for each group.
std::vector<std::unique_ptr<table>> tdigests;
tdigests.reserve(num_groups);
std::transform(h_outer_offsets,
h_outer_offsets + num_groups,
std::next(h_outer_offsets),
std::back_inserter(tdigests),
[&](auto tdigest_start, auto tdigest_end) {
// the range of tdigests in this group
auto const num_tdigests = tdigest_end - tdigest_start;

// slice each tdigest from the input
std::vector<table_view> unmerged_tdigests;
unmerged_tdigests.reserve(num_tdigests);
auto offset_iter = std::next(h_inner_offsets.begin(), tdigest_start);
std::transform(
offset_iter,
offset_iter + num_tdigests,
std::next(offset_iter),
std::back_inserter(unmerged_tdigests),
[&](size_type start, size_type end) {
return cudf::detail::slice(tdigests_unsliced, {start, end}, stream);
});

// merge
return cudf::detail::merge(unmerged_tdigests,
{0},
{order::ASCENDING},
{},
stream,
cudf::get_current_device_resource_ref());
});

// generate min and max values
auto merged_min_col = cudf::make_numeric_column(
data_type{type_id::FLOAT64}, num_groups, mask_state::UNALLOCATED, stream, mr);
@@ -1121,7 +1138,7 @@ std::unique_ptr<column> merge_tdigests(tdigest_column_view const& tdv,
auto group_num_weights = cudf::detail::make_counting_transform_iterator(
0,
group_num_weights_func<decltype(group_offsets)>{group_offsets,
tdigest_offsets.begin<size_type>()});
tdv.centroids().offsets().begin<size_type>()});
thrust::replace_if(rmm::exec_policy(stream),
merged_min_col->mutable_view().begin<double>(),
merged_min_col->mutable_view().end<double>(),
Expand All @@ -1135,29 +1152,33 @@ std::unique_ptr<column> merge_tdigests(tdigest_column_view const& tdv,
group_is_empty{},
0);

// concatenate all the merged tdigests back into one table.
std::vector<table_view> tdigest_views;
tdigest_views.reserve(num_groups);
std::transform(tdigests.begin(),
tdigests.end(),
std::back_inserter(tdigest_views),
[](std::unique_ptr<table> const& t) { return t->view(); });
auto merged =
cudf::detail::concatenate(tdigest_views, stream, cudf::get_current_device_resource_ref());
auto temp_mr = cudf::get_current_device_resource_ref();

// merge the centroids
auto [merged_means, merged_weights] =
generate_merged_centroids(tdv, group_offsets, num_groups, stream);
size_t const num_centroids = tdv.means().size();
CUDF_EXPECTS(merged_means.size() == num_centroids,
"Unexpected number of centroids in merged result");

// generate cumulative weights
auto merged_weights = merged->get_column(1).view();
auto cumulative_weights = cudf::make_numeric_column(
data_type{type_id::FLOAT64}, merged_weights.size(), mask_state::UNALLOCATED, stream);
auto keys = cudf::detail::make_counting_transform_iterator(
0,
group_key_func<decltype(group_labels)>{
group_labels, tdigest_offsets.begin<size_type>(), tdigest_offsets.size()});
rmm::device_uvector<double> cumulative_weights(merged_weights.size(), stream, temp_mr);

// generate group keys for all centroids in the entire column
rmm::device_uvector<size_type> group_keys(num_centroids, stream, temp_mr);
auto iter = thrust::make_counting_iterator(0);
auto inner_offsets = tdv.centroids().offsets();
thrust::transform(rmm::exec_policy(stream),
iter,
iter + num_centroids,
group_keys.begin(),
group_key_func<decltype(group_labels)>{
group_labels, inner_offsets.begin<size_type>(), inner_offsets.size()});
thrust::inclusive_scan_by_key(rmm::exec_policy(stream),
keys,
keys + cumulative_weights->size(),
merged_weights.begin<double>(),
cumulative_weights->mutable_view().begin<double>());
group_keys.begin(),
group_keys.begin() + num_centroids,
merged_weights.begin(),
cumulative_weights.begin());

auto const delta = max_centroids;

@@ -1166,37 +1187,32 @@ std::unique_ptr<column> merge_tdigests(tdigest_column_view const& tdv,
delta,
num_groups,
nearest_value_centroid_weights<decltype(group_offsets)>{
cumulative_weights->view().begin<double>(),
group_offsets,
tdigest_offsets.begin<size_type>()},
centroid_group_info<decltype(group_offsets)>{cumulative_weights->view().begin<double>(),
group_offsets,
tdigest_offsets.begin<size_type>()},
cumulative_weights.begin(), group_offsets, inner_offsets.begin<size_type>()},
centroid_group_info<decltype(group_offsets)>{
cumulative_weights.begin(), group_offsets, inner_offsets.begin<size_type>()},
cumulative_centroid_weight<decltype(group_labels), decltype(group_offsets)>{
cumulative_weights->view().begin<double>(),
cumulative_weights.begin(),
group_labels,
group_offsets,
{tdigest_offsets.begin<size_type>(), static_cast<size_t>(tdigest_offsets.size())}},
{inner_offsets.begin<size_type>(), static_cast<size_t>(inner_offsets.size())}},
false,
stream,
mr);

// input centroid values
auto centroids = cudf::detail::make_counting_transform_iterator(
0,
make_weighted_centroid{merged->get_column(0).view().begin<double>(),
merged_weights.begin<double>()});
0, make_weighted_centroid{merged_means.begin(), merged_weights.begin()});

// compute the tdigest
return compute_tdigests(
delta,
centroids,
centroids + merged->num_rows(),
centroids + merged_means.size(),
cumulative_centroid_weight<decltype(group_labels), decltype(group_offsets)>{
cumulative_weights->view().begin<double>(),
cumulative_weights.begin(),
group_labels,
group_offsets,
{tdigest_offsets.begin<size_type>(), static_cast<size_t>(tdigest_offsets.size())}},
{inner_offsets.begin<size_type>(), static_cast<size_t>(inner_offsets.size())}},
std::move(merged_min_col),
std::move(merged_max_col),
group_cluster_wl,
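One final illustrative aside on the cumulative-weight step in tdigest_aggregation.cu above: thrust::inclusive_scan_by_key restarts its running sum whenever the key changes, so with the per-centroid group keys as keys it turns the sorted weights into per-group cumulative weights. A minimal sketch with made-up data, not the PR's columns:

// Standalone sketch of thrust::inclusive_scan_by_key (illustrative only).
#include <thrust/device_vector.h>
#include <thrust/execution_policy.h>
#include <thrust/scan.h>

#include <vector>

int main()
{
  // Two groups of centroids; keys mark which group each weight belongs to.
  std::vector<int> h_keys{0, 0, 0, 1, 1};
  std::vector<double> h_weights{1.0, 2.0, 3.0, 4.0, 5.0};
  thrust::device_vector<int> keys(h_keys.begin(), h_keys.end());
  thrust::device_vector<double> weights(h_weights.begin(), h_weights.end());
  thrust::device_vector<double> cumulative(weights.size());

  // The running sum restarts at each key change: result is {1, 3, 6, 4, 9}.
  thrust::inclusive_scan_by_key(
    thrust::device, keys.begin(), keys.end(), weights.begin(), cumulative.begin());
  return 0;
}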