diff --git a/cpp/benchmarks/CMakeLists.txt b/cpp/benchmarks/CMakeLists.txt index abc6f74fccf..4113e38dcf4 100644 --- a/cpp/benchmarks/CMakeLists.txt +++ b/cpp/benchmarks/CMakeLists.txt @@ -230,6 +230,11 @@ ConfigureNVBench(STRUCT_CREATION_NVBENCH structs/create_structs.cpp) # -------------------------------------------------------------------------------- ConfigureBench(QUANTILES_BENCH quantiles/quantiles.cpp) +# ################################################################################################## +# * tdigest benchmark +# -------------------------------------------------------------------------------- +ConfigureNVBench(TDIGEST_NVBENCH quantiles/tdigest.cu) + # ################################################################################################## # * type_dispatcher benchmark --------------------------------------------------------------------- ConfigureBench(TYPE_DISPATCHER_BENCH type_dispatcher/type_dispatcher.cu) diff --git a/cpp/benchmarks/quantiles/tdigest.cu b/cpp/benchmarks/quantiles/tdigest.cu new file mode 100644 index 00000000000..9d37dbc9a26 --- /dev/null +++ b/cpp/benchmarks/quantiles/tdigest.cu @@ -0,0 +1,123 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include +#include + +#include + +#include +#include +#include + +#include + +void bm_tdigest_merge(nvbench::state& state) +{ + auto const num_tdigests = static_cast(state.get_int64("num_tdigests")); + auto const tdigest_size = static_cast(state.get_int64("tdigest_size")); + auto const tdigests_per_group = + static_cast(state.get_int64("tdigests_per_group")); + auto const max_centroids = static_cast(state.get_int64("max_centroids")); + auto const num_groups = num_tdigests / tdigests_per_group; + auto const total_centroids = num_tdigests * tdigest_size; + + auto stream = cudf::get_default_stream(); + auto mr = rmm::mr::get_current_device_resource(); + + constexpr int base_value = 5; + + // construct inner means/weights + auto val_iter = cudf::detail::make_counting_transform_iterator( + 0, cuda::proclaim_return_type([tdigest_size](cudf::size_type i) { + return static_cast(base_value + (i % tdigest_size)); + })); + auto one_iter = thrust::make_constant_iterator(1); + cudf::test::fixed_width_column_wrapper means(val_iter, val_iter + total_centroids); + cudf::test::fixed_width_column_wrapper weights(one_iter, one_iter + total_centroids); + std::vector> inner_struct_children; + inner_struct_children.push_back(means.release()); + inner_struct_children.push_back(weights.release()); + cudf::test::structs_column_wrapper inner_struct(std::move(inner_struct_children)); + + // construct the tdigest lists themselves + auto offset_iter = cudf::detail::make_counting_transform_iterator( + 0, cuda::proclaim_return_type([tdigest_size](cudf::size_type i) { + return i * tdigest_size; + })); + cudf::test::fixed_width_column_wrapper offsets(offset_iter, offset_iter + num_tdigests + 1); + auto list_col = cudf::make_lists_column( + num_tdigests, offsets.release(), inner_struct.release(), 0, {}, stream, mr); + + // min and max columns + auto min_iter = thrust::make_constant_iterator(base_value); + auto max_iter = thrust::make_constant_iterator(base_value + (tdigest_size - 1)); + cudf::test::fixed_width_column_wrapper mins(min_iter, min_iter + num_tdigests); + cudf::test::fixed_width_column_wrapper maxes(max_iter, max_iter + num_tdigests); + + // assemble the whole thing + std::vector> tdigest_children; + tdigest_children.push_back(std::move(list_col)); + tdigest_children.push_back(mins.release()); + tdigest_children.push_back(maxes.release()); + cudf::test::structs_column_wrapper tdigest(std::move(tdigest_children)); + + rmm::device_uvector group_offsets(num_groups + 1, stream, mr); + rmm::device_uvector group_labels(num_tdigests, stream, mr); + auto group_offset_iter = cudf::detail::make_counting_transform_iterator( + 0, + cuda::proclaim_return_type( + [tdigests_per_group] __device__(cudf::size_type i) { return i * tdigests_per_group; })); + thrust::copy(rmm::exec_policy_nosync(stream, mr), + group_offset_iter, + group_offset_iter + num_groups + 1, + group_offsets.begin()); + auto group_label_iter = cudf::detail::make_counting_transform_iterator( + 0, + cuda::proclaim_return_type( + [tdigests_per_group] __device__(cudf::size_type i) { return i / tdigests_per_group; })); + thrust::copy(rmm::exec_policy_nosync(stream, mr), + group_label_iter, + group_label_iter + num_tdigests, + group_labels.begin()); + + state.add_element_count(total_centroids); + + state.set_cuda_stream(nvbench::make_cuda_stream_view(stream.value())); + state.exec(nvbench::exec_tag::timer | nvbench::exec_tag::sync, + [&](nvbench::launch& launch, auto& timer) { + timer.start(); + auto result = cudf::tdigest::detail::group_merge_tdigest( + tdigest, group_offsets, group_labels, num_groups, max_centroids, stream, mr); + timer.stop(); + }); +} + +NVBENCH_BENCH(bm_tdigest_merge) + .set_name("TDigest many tiny groups") + .add_int64_axis("num_tdigests", {500'000}) + .add_int64_axis("tdigest_size", {1, 1000}) + .add_int64_axis("tdigests_per_group", {1}) + .add_int64_axis("max_centroids", {10000, 1000}); + +NVBENCH_BENCH(bm_tdigest_merge) + .set_name("TDigest many small groups") + .add_int64_axis("num_tdigests", {500'000}) + .add_int64_axis("tdigest_size", {1, 1000}) + .add_int64_axis("tdigests_per_group", {3}) + .add_int64_axis("max_centroids", {10000, 1000}); diff --git a/cpp/src/quantiles/tdigest/tdigest_aggregation.cu b/cpp/src/quantiles/tdigest/tdigest_aggregation.cu index 2dd25a7b890..e1c1d2e3002 100644 --- a/cpp/src/quantiles/tdigest/tdigest_aggregation.cu +++ b/cpp/src/quantiles/tdigest/tdigest_aggregation.cu @@ -1021,6 +1021,76 @@ struct group_key_func { } }; +// merges all the tdigests within each group. returns a table containing 2 columns: +// the sorted means and weights. +template +std::pair, rmm::device_uvector> generate_merged_centroids( + tdigest_column_view const& tdv, + GroupOffsetIter group_offsets, + size_type num_groups, + rmm::cuda_stream_view stream) +{ + auto temp_mr = cudf::get_current_device_resource_ref(); + + auto const total_merged_centroids = tdv.means().size(); + + // output is the merged centroids (means, weights) + rmm::device_uvector output_means(total_merged_centroids, stream, temp_mr); + rmm::device_uvector output_weights(total_merged_centroids, stream, temp_mr); + + // each group represents a collection of tdigest columns. each row is 1 tdigest. + // within each group, we want to sort all the centroids within all the tdigests + // in that group, using the means as the key. the "outer offsets" represent the indices of the + // tdigests, and the "inner offsets" represents the list of centroids for a particular tdigest. + // + // rows + // ---- centroid 0 --------- + // tdigest 0 centroid 1 + // ---- centroid 2 group 0 + // tdigest 1 centroid 3 + // ---- centroid 4 --------- + // tdigest 2 centroid 5 + // ---- centroid 6 group 1 + // tdigest 3 centroid 7 + // centroid 8 + // ---- centroid 9 -------- + auto inner_offsets = tdv.centroids().offsets(); + auto centroid_offsets = cudf::detail::make_counting_transform_iterator( + 0, + cuda::proclaim_return_type( + [group_offsets, inner_offsets = tdv.centroids().offsets().begin()] __device__( + size_type i) { return inner_offsets[group_offsets[i]]; })); + + // perform the sort using the means as the key + size_t temp_size; + CUDF_CUDA_TRY(cub::DeviceSegmentedSort::SortPairs(nullptr, + temp_size, + tdv.means().begin(), + output_means.begin(), + tdv.weights().begin(), + output_weights.begin(), + total_merged_centroids, + num_groups, + centroid_offsets, + centroid_offsets + 1, + stream.value())); + + rmm::device_buffer temp_mem(temp_size, stream, temp_mr); + CUDF_CUDA_TRY(cub::DeviceSegmentedSort::SortPairs(temp_mem.data(), + temp_size, + tdv.means().begin(), + output_means.begin(), + tdv.weights().begin(), + output_weights.begin(), + total_merged_centroids, + num_groups, + centroid_offsets, + centroid_offsets + 1, + stream.value())); + + return {std::move(output_means), std::move(output_weights)}; +} + template std::unique_ptr merge_tdigests(tdigest_column_view const& tdv, HGroupOffsetIter h_outer_offsets, @@ -1032,59 +1102,6 @@ std::unique_ptr merge_tdigests(tdigest_column_view const& tdv, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { - // thrust::merge and thrust::merge_by_key don't provide what we need. What we would need is an - // algorithm like a super-merge that takes two layers of keys: one which identifies the outer - // grouping of tdigests, and one which identifies the inner groupings of the tdigests within the - // outer groups. - // TODO: investigate replacing the iterative merge with a single stable_sort_by_key. - - // bring tdigest offsets back to the host - auto tdigest_offsets = tdv.centroids().offsets(); - std::vector h_inner_offsets(tdigest_offsets.size()); - cudaMemcpyAsync(h_inner_offsets.data(), - tdigest_offsets.begin(), - sizeof(size_type) * tdigest_offsets.size(), - cudaMemcpyDefault, - stream); - - stream.synchronize(); - - // extract all means and weights into a table - cudf::table_view tdigests_unsliced({tdv.means(), tdv.weights()}); - - // generate the merged (but not yet compressed) tdigests for each group. - std::vector> tdigests; - tdigests.reserve(num_groups); - std::transform(h_outer_offsets, - h_outer_offsets + num_groups, - std::next(h_outer_offsets), - std::back_inserter(tdigests), - [&](auto tdigest_start, auto tdigest_end) { - // the range of tdigests in this group - auto const num_tdigests = tdigest_end - tdigest_start; - - // slice each tdigest from the input - std::vector unmerged_tdigests; - unmerged_tdigests.reserve(num_tdigests); - auto offset_iter = std::next(h_inner_offsets.begin(), tdigest_start); - std::transform( - offset_iter, - offset_iter + num_tdigests, - std::next(offset_iter), - std::back_inserter(unmerged_tdigests), - [&](size_type start, size_type end) { - return cudf::detail::slice(tdigests_unsliced, {start, end}, stream); - }); - - // merge - return cudf::detail::merge(unmerged_tdigests, - {0}, - {order::ASCENDING}, - {}, - stream, - cudf::get_current_device_resource_ref()); - }); - // generate min and max values auto merged_min_col = cudf::make_numeric_column( data_type{type_id::FLOAT64}, num_groups, mask_state::UNALLOCATED, stream, mr); @@ -1121,7 +1138,7 @@ std::unique_ptr merge_tdigests(tdigest_column_view const& tdv, auto group_num_weights = cudf::detail::make_counting_transform_iterator( 0, group_num_weights_func{group_offsets, - tdigest_offsets.begin()}); + tdv.centroids().offsets().begin()}); thrust::replace_if(rmm::exec_policy(stream), merged_min_col->mutable_view().begin(), merged_min_col->mutable_view().end(), @@ -1135,29 +1152,33 @@ std::unique_ptr merge_tdigests(tdigest_column_view const& tdv, group_is_empty{}, 0); - // concatenate all the merged tdigests back into one table. - std::vector tdigest_views; - tdigest_views.reserve(num_groups); - std::transform(tdigests.begin(), - tdigests.end(), - std::back_inserter(tdigest_views), - [](std::unique_ptr const& t) { return t->view(); }); - auto merged = - cudf::detail::concatenate(tdigest_views, stream, cudf::get_current_device_resource_ref()); + auto temp_mr = cudf::get_current_device_resource_ref(); + + // merge the centroids + auto [merged_means, merged_weights] = + generate_merged_centroids(tdv, group_offsets, num_groups, stream); + size_t const num_centroids = tdv.means().size(); + CUDF_EXPECTS(merged_means.size() == num_centroids, + "Unexpected number of centroids in merged result"); // generate cumulative weights - auto merged_weights = merged->get_column(1).view(); - auto cumulative_weights = cudf::make_numeric_column( - data_type{type_id::FLOAT64}, merged_weights.size(), mask_state::UNALLOCATED, stream); - auto keys = cudf::detail::make_counting_transform_iterator( - 0, - group_key_func{ - group_labels, tdigest_offsets.begin(), tdigest_offsets.size()}); + rmm::device_uvector cumulative_weights(merged_weights.size(), stream, temp_mr); + + // generate group keys for all centroids in the entire column + rmm::device_uvector group_keys(num_centroids, stream, temp_mr); + auto iter = thrust::make_counting_iterator(0); + auto inner_offsets = tdv.centroids().offsets(); + thrust::transform(rmm::exec_policy(stream), + iter, + iter + num_centroids, + group_keys.begin(), + group_key_func{ + group_labels, inner_offsets.begin(), inner_offsets.size()}); thrust::inclusive_scan_by_key(rmm::exec_policy(stream), - keys, - keys + cumulative_weights->size(), - merged_weights.begin(), - cumulative_weights->mutable_view().begin()); + group_keys.begin(), + group_keys.begin() + num_centroids, + merged_weights.begin(), + cumulative_weights.begin()); auto const delta = max_centroids; @@ -1166,37 +1187,32 @@ std::unique_ptr merge_tdigests(tdigest_column_view const& tdv, delta, num_groups, nearest_value_centroid_weights{ - cumulative_weights->view().begin(), - group_offsets, - tdigest_offsets.begin()}, - centroid_group_info{cumulative_weights->view().begin(), - group_offsets, - tdigest_offsets.begin()}, + cumulative_weights.begin(), group_offsets, inner_offsets.begin()}, + centroid_group_info{ + cumulative_weights.begin(), group_offsets, inner_offsets.begin()}, cumulative_centroid_weight{ - cumulative_weights->view().begin(), + cumulative_weights.begin(), group_labels, group_offsets, - {tdigest_offsets.begin(), static_cast(tdigest_offsets.size())}}, + {inner_offsets.begin(), static_cast(inner_offsets.size())}}, false, stream, mr); // input centroid values auto centroids = cudf::detail::make_counting_transform_iterator( - 0, - make_weighted_centroid{merged->get_column(0).view().begin(), - merged_weights.begin()}); + 0, make_weighted_centroid{merged_means.begin(), merged_weights.begin()}); // compute the tdigest return compute_tdigests( delta, centroids, - centroids + merged->num_rows(), + centroids + merged_means.size(), cumulative_centroid_weight{ - cumulative_weights->view().begin(), + cumulative_weights.begin(), group_labels, group_offsets, - {tdigest_offsets.begin(), static_cast(tdigest_offsets.size())}}, + {inner_offsets.begin(), static_cast(inner_offsets.size())}}, std::move(merged_min_col), std::move(merged_max_col), group_cluster_wl,