From 5192b885bba82039823da687bc0a013ee74566a7 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 10 Sep 2024 14:19:27 -0700 Subject: [PATCH 1/4] Fix empty cluster handling in tdigest merge (#16675) This PR fixes an edge case bug in the tdigest merge. When there are multiple distinct keys but all values are empty clusters, the value column is currently merged into a single empty cluster, which leads to an error while creating the result table because of the mismatched number of rows in the key and value columns. This bug can be reproduced only when all values are empty clusters; if some values are empty but some are not, the current implementation returns a valid result. This bug was originally reported in https://github.com/NVIDIA/spark-rapids/issues/11367. The bug exists in `merge_tdigests()`, which assumes that there are no empty clusters in the merge stage even when there are (`has_nulls` is hard-coded to `false`). It is safer to always assume that there could be empty clusters, so this PR fixes the bug by setting the flag to `true`. `has_nulls` has also been renamed to the more descriptive `may_have_empty_clusters`. The tdigest reduce does not have the same issue, as it does not call `merge_tdigests()`. Authors: - Jihoon Son (https://github.com/jihoonson) - Muhammad Haseeb (https://github.com/mhaseeb123) Approvers: - Muhammad Haseeb (https://github.com/mhaseeb123) - https://github.com/nvdbaranec URL: https://github.com/rapidsai/cudf/pull/16675 --- cpp/include/cudf/detail/tdigest/tdigest.hpp | 17 ++-- cpp/include/cudf_test/tdigest_utilities.cuh | 20 ++--- cpp/src/quantiles/tdigest/tdigest.cu | 23 ++--- .../quantiles/tdigest/tdigest_aggregation.cu | 70 +++++++++------ cpp/tests/groupby/tdigest_tests.cu | 90 +++++++++++++++++-- .../quantiles/percentile_approx_test.cpp | 4 +- 6 files changed, 162 insertions(+), 62 deletions(-) diff --git a/cpp/include/cudf/detail/tdigest/tdigest.hpp b/cpp/include/cudf/detail/tdigest/tdigest.hpp index 80a4460023f..672b95e2d01 100644 --- a/cpp/include/cudf/detail/tdigest/tdigest.hpp +++ b/cpp/include/cudf/detail/tdigest/tdigest.hpp @@ -143,28 +143,29 @@ std::unique_ptr make_tdigest_column(size_type num_rows, rmm::device_async_resource_ref mr); /** - * @brief Create an empty tdigest column. + * @brief Create a tdigest column of empty clusters. * - * An empty tdigest column contains a single row of length 0 + * The created column contains the specified number of rows of empty clusters. * * @param stream CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory. * - * @returns An empty tdigest column. + * @returns A tdigest column of empty clusters. */ CUDF_EXPORT -std::unique_ptr make_empty_tdigest_column(rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr); +std::unique_ptr make_tdigest_column_of_empty_clusters(size_type num_rows, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr); /** - * @brief Create an empty tdigest scalar. + * @brief Create a scalar of an empty tdigest cluster. * - * An empty tdigest scalar is a struct_scalar that contains a single row of length 0 + * The returned scalar is a struct_scalar that contains a single row of an empty cluster. * * @param stream CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory. * - * @returns An empty tdigest scalar. + * @returns A scalar of an empty tdigest cluster. 
*/ std::unique_ptr make_empty_tdigest_scalar(rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr); diff --git a/cpp/include/cudf_test/tdigest_utilities.cuh b/cpp/include/cudf_test/tdigest_utilities.cuh index 1758790cd64..be7d19b2227 100644 --- a/cpp/include/cudf_test/tdigest_utilities.cuh +++ b/cpp/include/cudf_test/tdigest_utilities.cuh @@ -270,8 +270,8 @@ void tdigest_simple_all_nulls_aggregation(Func op) static_cast(values).type(), tdigest_gen{}, op, values, delta); // NOTE: an empty tdigest column still has 1 row. - auto expected = cudf::tdigest::detail::make_empty_tdigest_column( - cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + auto expected = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( + 1, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); CUDF_TEST_EXPECT_COLUMNS_EQUAL(*result, *expected); } @@ -562,12 +562,12 @@ template void tdigest_merge_empty(MergeFunc merge_op) { // 3 empty tdigests all in the same group - auto a = cudf::tdigest::detail::make_empty_tdigest_column( - cudf::get_default_stream(), cudf::get_current_device_resource_ref()); - auto b = cudf::tdigest::detail::make_empty_tdigest_column( - cudf::get_default_stream(), cudf::get_current_device_resource_ref()); - auto c = cudf::tdigest::detail::make_empty_tdigest_column( - cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + auto a = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( + 1, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + auto b = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( + 1, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + auto c = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( + 1, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); std::vector cols; cols.push_back(*a); cols.push_back(*b); @@ -577,8 +577,8 @@ void tdigest_merge_empty(MergeFunc merge_op) auto const delta = 1000; auto result = merge_op(*values, delta); - auto expected = cudf::tdigest::detail::make_empty_tdigest_column( - cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + auto expected = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( + 1, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); CUDF_TEST_EXPECT_COLUMNS_EQUAL(*expected, *result); } diff --git a/cpp/src/quantiles/tdigest/tdigest.cu b/cpp/src/quantiles/tdigest/tdigest.cu index 0d017cf1f13..76cd55bf994 100644 --- a/cpp/src/quantiles/tdigest/tdigest.cu +++ b/cpp/src/quantiles/tdigest/tdigest.cu @@ -292,32 +292,33 @@ std::unique_ptr make_tdigest_column(size_type num_rows, return make_structs_column(num_rows, std::move(children), 0, {}, stream, mr); } -std::unique_ptr make_empty_tdigest_column(rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) +std::unique_ptr make_tdigest_column_of_empty_clusters(size_type num_rows, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) { auto offsets = cudf::make_fixed_width_column( - data_type(type_id::INT32), 2, mask_state::UNALLOCATED, stream, mr); + data_type(type_id::INT32), num_rows + 1, mask_state::UNALLOCATED, stream, mr); thrust::fill(rmm::exec_policy(stream), offsets->mutable_view().begin(), offsets->mutable_view().end(), 0); - auto min_col = - cudf::make_numeric_column(data_type(type_id::FLOAT64), 1, mask_state::UNALLOCATED, stream, mr); + auto min_col = cudf::make_numeric_column( + data_type(type_id::FLOAT64), num_rows, mask_state::UNALLOCATED, stream, mr); 
thrust::fill(rmm::exec_policy(stream), min_col->mutable_view().begin(), min_col->mutable_view().end(), 0); - auto max_col = - cudf::make_numeric_column(data_type(type_id::FLOAT64), 1, mask_state::UNALLOCATED, stream, mr); + auto max_col = cudf::make_numeric_column( + data_type(type_id::FLOAT64), num_rows, mask_state::UNALLOCATED, stream, mr); thrust::fill(rmm::exec_policy(stream), max_col->mutable_view().begin(), max_col->mutable_view().end(), 0); - return make_tdigest_column(1, - make_empty_column(type_id::FLOAT64), - make_empty_column(type_id::FLOAT64), + return make_tdigest_column(num_rows, + cudf::make_empty_column(type_id::FLOAT64), + cudf::make_empty_column(type_id::FLOAT64), std::move(offsets), std::move(min_col), std::move(max_col), @@ -338,7 +339,7 @@ std::unique_ptr make_empty_tdigest_column(rmm::cuda_stream_view stream, std::unique_ptr make_empty_tdigest_scalar(rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { - auto contents = make_empty_tdigest_column(stream, mr)->release(); + auto contents = make_tdigest_column_of_empty_clusters(1, stream, mr)->release(); return std::make_unique( std::move(*std::make_unique(std::move(contents.children))), true, stream, mr); } diff --git a/cpp/src/quantiles/tdigest/tdigest_aggregation.cu b/cpp/src/quantiles/tdigest/tdigest_aggregation.cu index 2dd25a7b890..d591fb5c171 100644 --- a/cpp/src/quantiles/tdigest/tdigest_aggregation.cu +++ b/cpp/src/quantiles/tdigest/tdigest_aggregation.cu @@ -366,8 +366,8 @@ std::unique_ptr to_tdigest_scalar(std::unique_ptr&& tdigest, * @param group_cluster_wl Output. The set of cluster weight limits for each group. * @param group_num_clusters Output. The number of output clusters for each input group. * @param group_cluster_offsets Offsets per-group to the start of its clusters - * @param has_nulls Whether or not the input contains nulls - * + * @param may_have_empty_clusters Whether or not there could be empty clusters. Must be set to + * false only when there are no empty clusters, true otherwise. */ template @@ -379,7 +379,7 @@ CUDF_KERNEL void generate_cluster_limits_kernel(int delta, double* group_cluster_wl, size_type* group_num_clusters, size_type const* group_cluster_offsets, - bool has_nulls) + bool may_have_empty_clusters) { int const tid = threadIdx.x + blockIdx.x * blockDim.x; @@ -399,11 +399,12 @@ CUDF_KERNEL void generate_cluster_limits_kernel(int delta, // a group with nothing in it. group_num_clusters[group_index] = 0; if (total_weight <= 0) { - // if the input contains nulls we can potentially have a group that generates no - // clusters because -all- of the input values are null. in that case, the reduce_by_key call - // in the tdigest generation step will need a location to store the unused reduction value for - // that group of nulls. these "stubs" will be postprocessed out afterwards. - if (has_nulls) { group_num_clusters[group_index] = 1; } + // If the input contains empty clusters, we can potentially have a group that also generates + // empty clusters because -all- of the input values are null or empty clusters. In that case, the + // `reduce_by_key` call in the tdigest generation step will need a location to store the unused + // reduction value for that group of nulls and empty clusters. These "stubs" will be + // postprocessed out afterwards. 
+ if (may_have_empty_clusters) { group_num_clusters[group_index] = 1; } return; } @@ -502,7 +503,8 @@ CUDF_KERNEL void generate_cluster_limits_kernel(int delta, * stream that falls before our current cluster limit * @param group_info A functor which returns the info for the specified group (total weight, * size and start offset) - * @param has_nulls Whether or not the input data contains nulls + * @param may_have_empty_clusters Whether or not there could be empty clusters. It should be + * set to false only when there is no empty cluster. * @param stream CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory * @@ -516,7 +518,7 @@ generate_group_cluster_info(int delta, NearestWeight nearest_weight, GroupInfo group_info, CumulativeWeight cumulative_weight, - bool has_nulls, + bool may_have_empty_clusters, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { @@ -535,7 +537,7 @@ generate_group_cluster_info(int delta, nullptr, group_num_clusters.begin(), nullptr, - has_nulls); + may_have_empty_clusters); // generate group cluster offsets (where the clusters for a given group start and end) auto group_cluster_offsets = cudf::make_numeric_column( @@ -567,7 +569,7 @@ generate_group_cluster_info(int delta, group_cluster_wl.begin(), group_num_clusters.begin(), group_cluster_offsets->view().begin(), - has_nulls); + may_have_empty_clusters); return {std::move(group_cluster_wl), std::move(group_cluster_offsets), @@ -580,7 +582,7 @@ std::unique_ptr build_output_column(size_type num_rows, std::unique_ptr&& offsets, std::unique_ptr&& min_col, std::unique_ptr&& max_col, - bool has_nulls, + bool may_have_empty_clusters, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { @@ -595,7 +597,7 @@ std::unique_ptr build_output_column(size_type num_rows, size_type i) { return is_stub_weight(offsets[i]) ? 1 : 0; }; size_type const num_stubs = [&]() { - if (!has_nulls) { return 0; } + if (!may_have_empty_clusters) { return 0; } auto iter = cudf::detail::make_counting_transform_iterator( 0, cuda::proclaim_return_type(is_stub_digest)); return thrust::reduce(rmm::exec_policy(stream), iter, iter + num_rows); @@ -661,6 +663,10 @@ std::unique_ptr build_output_column(size_type num_rows, mr); } +/** + * @brief A functor which returns the cluster index within a group that the value at + * the given value index falls into. + */ template struct compute_tdigests_keys_fn { int const delta; @@ -706,8 +712,8 @@ struct compute_tdigests_keys_fn { * boundaries. * * @param delta tdigest compression level - * @param values_begin Beginning of the range of input values. - * @param values_end End of the range of input values. + * @param centroids_begin Beginning of the range of centroids. + * @param centroids_end End of the range of centroids. * @param cumulative_weight Functor which returns cumulative weight and group information for * an absolute input value index. * @param min_col Column containing the minimum value per group. @@ -715,7 +721,8 @@ struct compute_tdigests_keys_fn { * @param group_cluster_wl Cluster weight limits for each group. * @param group_cluster_offsets R-value reference of offsets into the cluster weight limits. * @param total_clusters Total number of clusters in all groups. - * @param has_nulls Whether or not the input contains nulls + * @param may_have_empty_clusters Whether or not there could be empty clusters. It should be + * set to false only when there is no empty cluster. 
* @param stream CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory * @@ -731,7 +738,7 @@ std::unique_ptr compute_tdigests(int delta, rmm::device_uvector const& group_cluster_wl, std::unique_ptr&& group_cluster_offsets, size_type total_clusters, - bool has_nulls, + bool may_have_empty_clusters, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { @@ -750,7 +757,9 @@ std::unique_ptr compute_tdigests(int delta, // double // max // } // - if (total_clusters == 0) { return cudf::tdigest::detail::make_empty_tdigest_column(stream, mr); } + if (total_clusters == 0) { + return cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(1, stream, mr); + } // each input group represents an individual tdigest. within each tdigest, we want the keys // to represent cluster indices (for example, if a tdigest had 100 clusters, the keys should fall @@ -793,7 +802,7 @@ std::unique_ptr compute_tdigests(int delta, std::move(group_cluster_offsets), std::move(min_col), std::move(max_col), - has_nulls, + may_have_empty_clusters, stream, mr); } @@ -1145,8 +1154,13 @@ std::unique_ptr merge_tdigests(tdigest_column_view const& tdv, auto merged = cudf::detail::concatenate(tdigest_views, stream, cudf::get_current_device_resource_ref()); + auto merged_weights = merged->get_column(1).view(); + // If there are no values, we can simply return a column that has only empty tdigests. + if (merged_weights.size() == 0) { + return cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(num_groups, stream, mr); + } + // generate cumulative weights - auto merged_weights = merged->get_column(1).view(); auto cumulative_weights = cudf::make_numeric_column( data_type{type_id::FLOAT64}, merged_weights.size(), mask_state::UNALLOCATED, stream); auto keys = cudf::detail::make_counting_transform_iterator( @@ -1161,6 +1175,10 @@ std::unique_ptr merge_tdigests(tdigest_column_view const& tdv, auto const delta = max_centroids; + // We do not know whether there is any empty cluster in the input without actually reading the + // data, which could be expensive. So, we just assume that there could be empty clusters. 
+ auto const may_have_empty_clusters = true; + // generate cluster info auto [group_cluster_wl, group_cluster_offsets, total_clusters] = generate_group_cluster_info( delta, @@ -1177,7 +1195,7 @@ std::unique_ptr merge_tdigests(tdigest_column_view const& tdv, group_labels, group_offsets, {tdigest_offsets.begin(), static_cast(tdigest_offsets.size())}}, - false, + may_have_empty_clusters, stream, mr); @@ -1202,7 +1220,7 @@ std::unique_ptr merge_tdigests(tdigest_column_view const& tdv, group_cluster_wl, std::move(group_cluster_offsets), total_clusters, - false, + may_have_empty_clusters, stream, mr); } @@ -1267,7 +1285,9 @@ std::unique_ptr group_tdigest(column_view const& col, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { - if (col.size() == 0) { return cudf::tdigest::detail::make_empty_tdigest_column(stream, mr); } + if (col.size() == 0) { + return cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(1, stream, mr); + } auto const delta = max_centroids; return cudf::type_dispatcher(col.type(), @@ -1293,7 +1313,7 @@ std::unique_ptr group_merge_tdigest(column_view const& input, tdigest_column_view tdv(input); if (num_groups == 0 || input.size() == 0) { - return cudf::tdigest::detail::make_empty_tdigest_column(stream, mr); + return cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(1, stream, mr); } // bring group offsets back to the host diff --git a/cpp/tests/groupby/tdigest_tests.cu b/cpp/tests/groupby/tdigest_tests.cu index baa59026b07..3780dbb1d95 100644 --- a/cpp/tests/groupby/tdigest_tests.cu +++ b/cpp/tests/groupby/tdigest_tests.cu @@ -469,16 +469,16 @@ TEST_F(TDigestMergeTest, EmptyGroups) cudf::test::fixed_width_column_wrapper keys{0, 0, 0, 0, 0, 0, 0}; int const delta = 1000; - auto a = cudf::tdigest::detail::make_empty_tdigest_column( - cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + auto a = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( + 1, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); auto b = cudf::type_dispatcher( static_cast(values_b).type(), tdigest_gen_grouped{}, keys, values_b, delta); - auto c = cudf::tdigest::detail::make_empty_tdigest_column( - cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + auto c = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( + 1, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); auto d = cudf::type_dispatcher( static_cast(values_d).type(), tdigest_gen_grouped{}, keys, values_d, delta); - auto e = cudf::tdigest::detail::make_empty_tdigest_column( - cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + auto e = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( + 1, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); std::vector cols; cols.push_back(*a); @@ -507,3 +507,81 @@ TEST_F(TDigestMergeTest, EmptyGroups) CUDF_TEST_EXPECT_COLUMNS_EQUAL(*expected, *result.second[0].results[0]); } + +std::unique_ptr do_agg( + cudf::column_view key, + cudf::column_view val, + std::function()> make_agg) +{ + std::vector keys; + keys.push_back(key); + cudf::table_view const key_table(keys); + + cudf::groupby::groupby gb(key_table); + std::vector requests; + cudf::groupby::aggregation_request req; + req.values = val; + req.aggregations.push_back(make_agg()); + requests.push_back(std::move(req)); + + auto result = gb.aggregate(std::move(requests)); + + std::vector> result_columns; + for (auto&& c : result.first->release()) { + 
result_columns.push_back(std::move(c)); + } + + EXPECT_EQ(result.second.size(), 1); + EXPECT_EQ(result.second[0].results.size(), 1); + result_columns.push_back(std::move(result.second[0].results[0])); + + return std::make_unique(std::move(result_columns)); +} + +TEST_F(TDigestMergeTest, AllGroupsHaveEmptyClusters) +{ + // The input must be sorted by the key. + // See `aggregate_result_functor::operator()` for details. + auto const keys = cudf::test::fixed_width_column_wrapper{{0, 0, 1, 1, 2}}; + auto const keys_view = cudf::column_view(keys); + auto val_elems = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return i; }); + auto val_valids = cudf::detail::make_counting_transform_iterator(0, [](auto i) { + // All values are null + return false; + }); + auto const vals = cudf::test::fixed_width_column_wrapper{ + val_elems, val_elems + keys_view.size(), val_valids}; + + auto const delta = 10000; + + // Compute tdigest. The result should have 3 empty clusters, one per group. + auto const compute_result = do_agg(keys_view, cudf::column_view(vals), [&delta]() { + return cudf::make_tdigest_aggregation(delta); + }); + + auto const expected_computed_keys = cudf::test::fixed_width_column_wrapper{{0, 1, 2}}; + cudf::column_view const expected_computed_keys_view{expected_computed_keys}; + auto const expected_computed_vals = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( + expected_computed_keys_view.size(), + cudf::get_default_stream(), + rmm::mr::get_current_device_resource()); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_computed_keys_view, compute_result->get_column(0).view()); + // The computed values are nullable even though the input values are not. + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_computed_vals->view(), + compute_result->get_column(1).view()); + + // Merge tdigest. The result should have 3 empty clusters, one per group. 
+ auto const merge_result = + do_agg(compute_result->get_column(0).view(), compute_result->get_column(1).view(), [&delta]() { + return cudf::make_merge_tdigest_aggregation(delta); + }); + + auto const expected_merged_keys = cudf::test::fixed_width_column_wrapper{{0, 1, 2}}; + cudf::column_view const expected_merged_keys_view{expected_merged_keys}; + auto const expected_merged_vals = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( + expected_merged_keys_view.size(), + cudf::get_default_stream(), + rmm::mr::get_current_device_resource()); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_merged_keys_view, merge_result->get_column(0).view()); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_merged_vals->view(), merge_result->get_column(1).view()); +} diff --git a/cpp/tests/quantiles/percentile_approx_test.cpp b/cpp/tests/quantiles/percentile_approx_test.cpp index 915717713df..7359f0406fc 100644 --- a/cpp/tests/quantiles/percentile_approx_test.cpp +++ b/cpp/tests/quantiles/percentile_approx_test.cpp @@ -371,8 +371,8 @@ struct PercentileApproxTest : public cudf::test::BaseFixture {}; TEST_F(PercentileApproxTest, EmptyInput) { - auto empty_ = cudf::tdigest::detail::make_empty_tdigest_column( - cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + auto empty_ = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( + 1, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); cudf::test::fixed_width_column_wrapper percentiles{0.0, 0.25, 0.3}; std::vector input; From c3d323df1df5ba4c5377374b5b4ffdc06829c02b Mon Sep 17 00:00:00 2001 From: Jayjeet Chakraborty Date: Tue, 10 Sep 2024 17:14:28 -0700 Subject: [PATCH 2/4] Move NDS-H examples into benchmarks (#16663) This moves the TPC-H examples into benchmarks by converting each of them into NVBench benchmarks, following the registration pattern sketched below.
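Each converted query keeps its query logic in a `run_ndsh_qN` helper and registers a thin NVBench wrapper around it. As a rough, self-contained sketch of that registration pattern (the `ndsh_query_skeleton` name is hypothetical; the real wrappers in the diff below additionally generate the NDS-H parquet inputs via `generate_parquet_data_sources` before timing the query body):

```cpp
#include <nvbench/nvbench.cuh>

// Minimal sketch of the per-query NVBench wrapper: one function per query,
// swept across a "scale_factor" axis.
static void ndsh_query_skeleton(nvbench::state& state)
{
  // Input size is controlled by the scale_factor axis value
  double const scale_factor = state.get_float64("scale_factor");
  (void)scale_factor;  // the real wrappers use this to generate parquet inputs

  // exec_tag::sync tells NVBench that the body synchronizes the stream itself
  state.exec(nvbench::exec_tag::sync, [&](nvbench::launch& launch) {
    // run_ndsh_qN(state, sources) goes here in the real benchmarks
  });
}

NVBENCH_BENCH(ndsh_query_skeleton)
  .set_name("ndsh_query_skeleton")
  .add_float64_axis("scale_factor", {0.01, 0.1, 1});
```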
The benchmarks can be built by ```bash ./build.sh libcudf benchmarks ``` Also, addresses #16711 Authors: - Jayjeet Chakraborty (https://github.com/JayjeetAtGithub) Approvers: - Mark Harris (https://github.com/harrism) URL: https://github.com/rapidsai/cudf/pull/16663 --- .gitignore | 1 - cpp/benchmarks/CMakeLists.txt | 26 +- .../ndsh_data_generator.cpp} | 66 +-- .../ndsh_data_generator.hpp} | 0 .../random_column_generator.cu | 0 .../random_column_generator.hpp | 0 .../table_helpers.cpp | 0 .../table_helpers.hpp | 0 cpp/benchmarks/ndsh/README.md | 11 + .../tpch/q1.cpp => benchmarks/ndsh/q01.cpp} | 49 +- .../tpch/q5.cpp => benchmarks/ndsh/q05.cpp} | 56 ++- .../tpch/q6.cpp => benchmarks/ndsh/q06.cpp} | 52 +- .../tpch/q9.cpp => benchmarks/ndsh/q09.cpp} | 62 +-- .../tpch => benchmarks/ndsh}/q10.cpp | 51 +- cpp/benchmarks/ndsh/utilities.cpp | 400 +++++++++++++++ cpp/benchmarks/ndsh/utilities.hpp | 227 +++++++++ cpp/examples/build.sh | 1 - cpp/examples/tpch/CMakeLists.txt | 36 -- cpp/examples/tpch/README.md | 39 -- .../tpch/datagen/correct_datatypes.py | 60 --- cpp/examples/tpch/datagen/datagen.sh | 31 -- cpp/examples/tpch/datagen/tpch.patch | 33 -- cpp/examples/tpch/utils.hpp | 458 ------------------ 23 files changed, 846 insertions(+), 813 deletions(-) rename cpp/benchmarks/common/{tpch_data_generator/tpch_data_generator.cpp => ndsh_data_generator/ndsh_data_generator.cpp} (97%) rename cpp/benchmarks/common/{tpch_data_generator/tpch_data_generator.hpp => ndsh_data_generator/ndsh_data_generator.hpp} (100%) rename cpp/benchmarks/common/{tpch_data_generator => ndsh_data_generator}/random_column_generator.cu (100%) rename cpp/benchmarks/common/{tpch_data_generator => ndsh_data_generator}/random_column_generator.hpp (100%) rename cpp/benchmarks/common/{tpch_data_generator => ndsh_data_generator}/table_helpers.cpp (100%) rename cpp/benchmarks/common/{tpch_data_generator => ndsh_data_generator}/table_helpers.hpp (100%) create mode 100644 cpp/benchmarks/ndsh/README.md rename cpp/{examples/tpch/q1.cpp => benchmarks/ndsh/q01.cpp} (82%) rename cpp/{examples/tpch/q5.cpp => benchmarks/ndsh/q05.cpp} (80%) rename cpp/{examples/tpch/q6.cpp => benchmarks/ndsh/q06.cpp} (79%) rename cpp/{examples/tpch/q9.cpp => benchmarks/ndsh/q09.cpp} (78%) rename cpp/{examples/tpch => benchmarks/ndsh}/q10.cpp (81%) create mode 100644 cpp/benchmarks/ndsh/utilities.cpp create mode 100644 cpp/benchmarks/ndsh/utilities.hpp delete mode 100644 cpp/examples/tpch/CMakeLists.txt delete mode 100644 cpp/examples/tpch/README.md delete mode 100644 cpp/examples/tpch/datagen/correct_datatypes.py delete mode 100755 cpp/examples/tpch/datagen/datagen.sh delete mode 100644 cpp/examples/tpch/datagen/tpch.patch delete mode 100644 cpp/examples/tpch/utils.hpp diff --git a/.gitignore b/.gitignore index 619e1464b2a..180a6a286e2 100644 --- a/.gitignore +++ b/.gitignore @@ -80,7 +80,6 @@ build/ cpp/build/ cpp/examples/*/install/ cpp/examples/*/build/ -cpp/examples/tpch/datagen/datafusion cpp/include/cudf/ipc_generated/*.h cpp/thirdparty/googletest/ diff --git a/cpp/benchmarks/CMakeLists.txt b/cpp/benchmarks/CMakeLists.txt index d2c22b788cb..3bf9d02b384 100644 --- a/cpp/benchmarks/CMakeLists.txt +++ b/cpp/benchmarks/CMakeLists.txt @@ -36,25 +36,25 @@ target_include_directories( ) add_library( - tpch_data_generator STATIC - common/tpch_data_generator/tpch_data_generator.cpp common/tpch_data_generator/table_helpers.cpp - common/tpch_data_generator/random_column_generator.cu + ndsh_data_generator STATIC + common/ndsh_data_generator/ndsh_data_generator.cpp 
common/ndsh_data_generator/table_helpers.cpp + common/ndsh_data_generator/random_column_generator.cu ) -target_compile_features(tpch_data_generator PUBLIC cxx_std_17 cuda_std_17) +target_compile_features(ndsh_data_generator PUBLIC cxx_std_17 cuda_std_17) target_compile_options( - tpch_data_generator PUBLIC "$<$:${CUDF_CXX_FLAGS}>" + ndsh_data_generator PUBLIC "$<$:${CUDF_CXX_FLAGS}>" "$<$:${CUDF_CUDA_FLAGS}>" ) target_link_libraries( - tpch_data_generator + ndsh_data_generator PUBLIC cudf cudftestutil nvtx3::nvtx3-cpp PRIVATE $ ) target_include_directories( - tpch_data_generator + ndsh_data_generator PUBLIC "$" "$" "$" ) @@ -127,8 +127,8 @@ function(ConfigureNVBench CMAKE_BENCH_NAME) INSTALL_RPATH "\$ORIGIN/../../../lib" ) target_link_libraries( - ${CMAKE_BENCH_NAME} PRIVATE cudf_benchmark_common cudf_datagen nvbench::nvbench - $ + ${CMAKE_BENCH_NAME} PRIVATE cudf_benchmark_common ndsh_data_generator cudf_datagen + nvbench::nvbench $ ) install( TARGETS ${CMAKE_BENCH_NAME} @@ -175,6 +175,14 @@ ConfigureBench(COPY_IF_ELSE_BENCH copying/copy_if_else.cpp) # * transpose benchmark --------------------------------------------------------------------------- ConfigureBench(TRANSPOSE_BENCH transpose/transpose.cpp) +# ################################################################################################## +# * nds-h benchmark -------------------------------------------------------------------------------- +ConfigureNVBench(NDSH_Q1 ndsh/q01.cpp ndsh/utilities.cpp) +ConfigureNVBench(NDSH_Q5 ndsh/q05.cpp ndsh/utilities.cpp) +ConfigureNVBench(NDSH_Q6 ndsh/q06.cpp ndsh/utilities.cpp) +ConfigureNVBench(NDSH_Q9 ndsh/q09.cpp ndsh/utilities.cpp) +ConfigureNVBench(NDSH_Q10 ndsh/q10.cpp ndsh/utilities.cpp) + # ################################################################################################## # * stream_compaction benchmark ------------------------------------------------------------------- ConfigureNVBench( diff --git a/cpp/benchmarks/common/tpch_data_generator/tpch_data_generator.cpp b/cpp/benchmarks/common/ndsh_data_generator/ndsh_data_generator.cpp similarity index 97% rename from cpp/benchmarks/common/tpch_data_generator/tpch_data_generator.cpp rename to cpp/benchmarks/common/ndsh_data_generator/ndsh_data_generator.cpp index 236fe8095ad..fa7edd225ba 100644 --- a/cpp/benchmarks/common/tpch_data_generator/tpch_data_generator.cpp +++ b/cpp/benchmarks/common/ndsh_data_generator/ndsh_data_generator.cpp @@ -14,7 +14,7 @@ * limitations under the License. 
*/ -#include "tpch_data_generator.hpp" +#include "ndsh_data_generator.hpp" #include "random_column_generator.hpp" #include "table_helpers.hpp" @@ -435,46 +435,37 @@ std::unique_ptr generate_lineitem_partial(cudf::table_view const& o columns.push_back(std::move(l_quantity)); columns.push_back(std::move(l_discount)); columns.push_back(std::move(l_tax)); + columns.push_back(std::move(l_returnflag)); + columns.push_back(std::move(l_linestatus)); columns.push_back(std::move(l_shipdate_ts)); columns.push_back(std::move(l_commitdate_ts)); columns.push_back(std::move(l_receiptdate_ts)); - columns.push_back(std::move(l_returnflag)); - columns.push_back(std::move(l_linestatus)); columns.push_back(std::move(l_shipinstruct)); columns.push_back(std::move(l_shipmode)); columns.push_back(std::move(l_comment)); return std::make_unique(std::move(columns)); } -std::unique_ptr generate_orders_dependent(cudf::table_view const& lineitem, +/** + * @brief Generate the part of the `orders` table dependent on the `lineitem` table + * + * @param lineitem_partial The partially generated `lineitem` table + * @param stream CUDA stream used for device memory operations and kernel launches + * @param mr Device memory resource used to allocate the returned column's device memory + */ +std::unique_ptr generate_orders_dependent(cudf::table_view const& lineitem_partial, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); - auto const l_linestatus_mask = lineitem.column(0); - auto const l_orderkey = lineitem.column(1); - auto const l_discount = lineitem.column(6); - auto const l_tax = lineitem.column(7); - auto const l_extendedprice = lineitem.column(16); + auto const l_linestatus_mask = lineitem_partial.column(0); + auto const l_orderkey = lineitem_partial.column(1); + auto const l_extendedprice = lineitem_partial.column(6); + auto const l_discount = lineitem_partial.column(7); + auto const l_tax = lineitem_partial.column(8); std::vector> orders_dependent_columns; - // Generate the `o_totalprice` column - // We calculate the `charge` column, which is a function of `l_extendedprice`, - // `l_tax`, and `l_discount` and then group by `l_orderkey` and sum the `charge` - auto const l_charge = calculate_charge(l_extendedprice, l_tax, l_discount, stream, mr); - auto o_totalprice = [&]() { - auto const keys = cudf::table_view({l_orderkey}); - cudf::groupby::groupby gb(keys); - std::vector requests; - requests.push_back(cudf::groupby::aggregation_request()); - requests[0].aggregations.push_back(cudf::make_sum_aggregation()); - requests[0].values = l_charge->view(); - auto agg_result = gb.aggregate(requests); - return cudf::round(agg_result.second[0].results[0]->view(), 2); - }(); - orders_dependent_columns.push_back(std::move(o_totalprice)); - // Generate the `o_orderstatus` column auto o_orderstatus = [&]() { auto const keys = cudf::table_view({l_orderkey}); @@ -529,6 +520,22 @@ std::unique_ptr generate_orders_dependent(cudf::table_view const& l cudf::string_scalar("P"), o_orderstatus_intermediate->view(), mask_b->view()); }(); orders_dependent_columns.push_back(std::move(o_orderstatus)); + + // Generate the `o_totalprice` column + // We calculate the `charge` column, which is a function of `l_extendedprice`, + // `l_tax`, and `l_discount` and then group by `l_orderkey` and sum the `charge` + auto const l_charge = calculate_charge(l_extendedprice, l_tax, l_discount, stream, mr); + auto o_totalprice = [&]() { + auto const keys = cudf::table_view({l_orderkey}); + cudf::groupby::groupby gb(keys); + 
std::vector requests; + requests.push_back(cudf::groupby::aggregation_request()); + requests[0].aggregations.push_back(cudf::make_sum_aggregation()); + requests[0].values = l_charge->view(); + auto agg_result = gb.aggregate(requests); + return cudf::round(agg_result.second[0].results[0]->view(), 2); + }(); + orders_dependent_columns.push_back(std::move(o_totalprice)); return std::make_unique(std::move(orders_dependent_columns)); } @@ -730,9 +737,7 @@ generate_orders_lineitem_part(double scale_factor, // Generate the `part` table auto part = generate_part(scale_factor, stream, mr); - // Join the `part` and partial `lineitem` tables, then calculate the `l_extendedprice` column, - // add the column to the `lineitem` table, and write the `lineitem` table to a parquet file - + // Join the `part` and partial `lineitem` tables, then calculate the `l_extendedprice` column auto l_extendedprice = [&]() { auto const left = cudf::table_view( {lineitem_partial->get_column(2).view(), lineitem_partial->get_column(5).view()}); @@ -752,8 +757,9 @@ generate_orders_lineitem_part(double scale_factor, return cudf::round(col->view(), 2); }(); + // Insert the `l_extendedprice` column into the partial columns of the `lineitem` table auto lineitem_partial_columns = lineitem_partial->release(); - lineitem_partial_columns.push_back(std::move(l_extendedprice)); + lineitem_partial_columns.insert(lineitem_partial_columns.begin() + 6, std::move(l_extendedprice)); auto lineitem_temp = std::make_unique(std::move(lineitem_partial_columns)); // Generate the dependent columns of the `orders` table @@ -762,7 +768,7 @@ generate_orders_lineitem_part(double scale_factor, auto orders_independent_columns = orders_independent->release(); auto orders_dependent_columns = orders_dependent->release(); - orders_independent_columns.insert(orders_independent_columns.end(), + orders_independent_columns.insert(orders_independent_columns.begin() + 2, std::make_move_iterator(orders_dependent_columns.begin()), std::make_move_iterator(orders_dependent_columns.end())); diff --git a/cpp/benchmarks/common/tpch_data_generator/tpch_data_generator.hpp b/cpp/benchmarks/common/ndsh_data_generator/ndsh_data_generator.hpp similarity index 100% rename from cpp/benchmarks/common/tpch_data_generator/tpch_data_generator.hpp rename to cpp/benchmarks/common/ndsh_data_generator/ndsh_data_generator.hpp diff --git a/cpp/benchmarks/common/tpch_data_generator/random_column_generator.cu b/cpp/benchmarks/common/ndsh_data_generator/random_column_generator.cu similarity index 100% rename from cpp/benchmarks/common/tpch_data_generator/random_column_generator.cu rename to cpp/benchmarks/common/ndsh_data_generator/random_column_generator.cu diff --git a/cpp/benchmarks/common/tpch_data_generator/random_column_generator.hpp b/cpp/benchmarks/common/ndsh_data_generator/random_column_generator.hpp similarity index 100% rename from cpp/benchmarks/common/tpch_data_generator/random_column_generator.hpp rename to cpp/benchmarks/common/ndsh_data_generator/random_column_generator.hpp diff --git a/cpp/benchmarks/common/tpch_data_generator/table_helpers.cpp b/cpp/benchmarks/common/ndsh_data_generator/table_helpers.cpp similarity index 100% rename from cpp/benchmarks/common/tpch_data_generator/table_helpers.cpp rename to cpp/benchmarks/common/ndsh_data_generator/table_helpers.cpp diff --git a/cpp/benchmarks/common/tpch_data_generator/table_helpers.hpp b/cpp/benchmarks/common/ndsh_data_generator/table_helpers.hpp similarity index 100% rename from 
cpp/benchmarks/common/tpch_data_generator/table_helpers.hpp rename to cpp/benchmarks/common/ndsh_data_generator/table_helpers.hpp diff --git a/cpp/benchmarks/ndsh/README.md b/cpp/benchmarks/ndsh/README.md new file mode 100644 index 00000000000..0a462e1684e --- /dev/null +++ b/cpp/benchmarks/ndsh/README.md @@ -0,0 +1,11 @@ +# NDS-H Benchmarks for `libcudf` + +## Disclaimer + +NDS-H is derived from the TPC-H Benchmarks and as such any results obtained using NDS-H are not +comparable to published TPC-H Benchmark results, as the results obtained from using NDS-H do not +comply with the TPC-H Benchmarks. + +## Current Status + +For now, only Q1, Q5, Q6, Q9, and Q10 have been implemented diff --git a/cpp/examples/tpch/q1.cpp b/cpp/benchmarks/ndsh/q01.cpp similarity index 82% rename from cpp/examples/tpch/q1.cpp rename to cpp/benchmarks/ndsh/q01.cpp index 87b7e613766..ef709926ae9 100644 --- a/cpp/examples/tpch/q1.cpp +++ b/cpp/benchmarks/ndsh/q01.cpp @@ -14,17 +14,19 @@ * limitations under the License. */ -#include "../utilities/timer.hpp" -#include "utils.hpp" +#include "utilities.hpp" #include +#include #include #include #include +#include + /** - * @file q1.cpp - * @brief Implement query 1 of the TPC-H benchmark. + * @file q01.cpp + * @brief Implement query 1 of the NDS-H benchmark. * * create view lineitem as select * from '/tables/scale-1/lineitem.parquet'; * @@ -59,7 +61,7 @@ * @param stream The CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory. */ -[[nodiscard]] std::unique_ptr calc_disc_price( +[[nodiscard]] std::unique_ptr calculate_disc_price( cudf::column_view const& discount, cudf::column_view const& extendedprice, rmm::cuda_stream_view stream = cudf::get_default_stream(), @@ -86,7 +88,7 @@ * @param stream The CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory. 
*/ -[[nodiscard]] std::unique_ptr calc_charge( +[[nodiscard]] std::unique_ptr calculate_charge( cudf::column_view const& tax, cudf::column_view const& disc_price, rmm::cuda_stream_view stream = cudf::get_default_stream(), @@ -101,16 +103,9 @@ return charge; } -int main(int argc, char const** argv) +void run_ndsh_q1(nvbench::state& state, + std::unordered_map& sources) { - auto const args = parse_args(argc, argv); - - // Use a memory pool - auto resource = create_memory_resource(args.memory_resource_type); - cudf::set_current_device_resource(resource.get()); - - cudf::examples::timer timer; - // Define the column projections and filter predicate for `lineitem` table std::vector const lineitem_cols = {"l_returnflag", "l_linestatus", @@ -130,12 +125,12 @@ int main(int argc, char const** argv) // Read out the `lineitem` table from parquet file auto lineitem = - read_parquet(args.dataset_dir + "/lineitem.parquet", lineitem_cols, std::move(lineitem_pred)); + read_parquet(sources["lineitem"].make_source_info(), lineitem_cols, std::move(lineitem_pred)); // Calculate the discount price and charge columns and append to lineitem table auto disc_price = - calc_disc_price(lineitem->column("l_discount"), lineitem->column("l_extendedprice")); - auto charge = calc_charge(lineitem->column("l_tax"), disc_price->view()); + calculate_disc_price(lineitem->column("l_discount"), lineitem->column("l_extendedprice")); + auto charge = calculate_charge(lineitem->column("l_tax"), disc_price->view()); (*lineitem).append(disc_price, "disc_price").append(charge, "charge"); // Perform the group by operation @@ -167,9 +162,21 @@ int main(int argc, char const** argv) {"l_returnflag", "l_linestatus"}, {cudf::order::ASCENDING, cudf::order::ASCENDING}); - timer.print_elapsed_millis(); - // Write query result to a parquet file orderedby_table->to_parquet("q1.parquet"); - return 0; } + +void ndsh_q1(nvbench::state& state) +{ + // Generate the required parquet files in device buffers + double const scale_factor = state.get_float64("scale_factor"); + std::unordered_map sources; + generate_parquet_data_sources(scale_factor, {"lineitem"}, sources); + + auto stream = cudf::get_default_stream(); + state.set_cuda_stream(nvbench::make_cuda_stream_view(stream.value())); + state.exec(nvbench::exec_tag::sync, + [&](nvbench::launch& launch) { run_ndsh_q1(state, sources); }); +} + +NVBENCH_BENCH(ndsh_q1).set_name("ndsh_q1").add_float64_axis("scale_factor", {0.01, 0.1, 1}); diff --git a/cpp/examples/tpch/q5.cpp b/cpp/benchmarks/ndsh/q05.cpp similarity index 80% rename from cpp/examples/tpch/q5.cpp rename to cpp/benchmarks/ndsh/q05.cpp index 12c186db10e..522bc4789c2 100644 --- a/cpp/examples/tpch/q5.cpp +++ b/cpp/benchmarks/ndsh/q05.cpp @@ -14,17 +14,19 @@ * limitations under the License. */ -#include "../utilities/timer.hpp" -#include "utils.hpp" +#include "utilities.hpp" #include +#include #include #include #include +#include + /** - * @file q5.cpp - * @brief Implement query 5 of the TPC-H benchmark. + * @file q05.cpp + * @brief Implement query 5 of the NDS-H benchmark. * * create view customer as select * from '/tables/scale-1/customer.parquet'; * create view orders as select * from '/tables/scale-1/orders.parquet'; @@ -67,7 +69,7 @@ * @param stream The CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory. 
*/ -[[nodiscard]] std::unique_ptr calc_revenue( +[[nodiscard]] std::unique_ptr calculate_revenue( cudf::column_view const& extendedprice, cudf::column_view const& discount, rmm::cuda_stream_view stream = cudf::get_default_stream(), @@ -86,16 +88,9 @@ return revenue; } -int main(int argc, char const** argv) +void run_ndsh_q5(nvbench::state& state, + std::unordered_map& sources) { - auto const args = parse_args(argc, argv); - - // Use a memory pool - auto resource = create_memory_resource(args.memory_resource_type); - cudf::set_current_device_resource(resource.get()); - - cudf::examples::timer timer; - // Define the column projection and filter predicate for the `orders` table std::vector const orders_cols = {"o_custkey", "o_orderkey", "o_orderdate"}; auto const o_orderdate_ref = cudf::ast::column_reference(std::distance( @@ -125,17 +120,17 @@ int main(int argc, char const** argv) // Read out the tables from parquet files // while pushing down the column projections and filter predicates auto const customer = - read_parquet(args.dataset_dir + "/customer.parquet", {"c_custkey", "c_nationkey"}); + read_parquet(sources["customer"].make_source_info(), {"c_custkey", "c_nationkey"}); auto const orders = - read_parquet(args.dataset_dir + "/orders.parquet", orders_cols, std::move(orders_pred)); - auto const lineitem = read_parquet(args.dataset_dir + "/lineitem.parquet", + read_parquet(sources["orders"].make_source_info(), orders_cols, std::move(orders_pred)); + auto const lineitem = read_parquet(sources["lineitem"].make_source_info(), {"l_orderkey", "l_suppkey", "l_extendedprice", "l_discount"}); auto const supplier = - read_parquet(args.dataset_dir + "/supplier.parquet", {"s_suppkey", "s_nationkey"}); + read_parquet(sources["supplier"].make_source_info(), {"s_suppkey", "s_nationkey"}); auto const nation = - read_parquet(args.dataset_dir + "/nation.parquet", {"n_nationkey", "n_regionkey", "n_name"}); + read_parquet(sources["nation"].make_source_info(), {"n_nationkey", "n_regionkey", "n_name"}); auto const region = - read_parquet(args.dataset_dir + "/region.parquet", region_cols, std::move(region_pred)); + read_parquet(sources["region"].make_source_info(), region_cols, std::move(region_pred)); // Perform the joins auto const join_a = apply_inner_join(region, nation, {"r_regionkey"}, {"n_regionkey"}); @@ -147,7 +142,7 @@ int main(int argc, char const** argv) // Calculate and append the `revenue` column auto revenue = - calc_revenue(joined_table->column("l_extendedprice"), joined_table->column("l_discount")); + calculate_revenue(joined_table->column("l_extendedprice"), joined_table->column("l_discount")); (*joined_table).append(revenue, "revenue"); // Perform the groupby operation @@ -162,9 +157,22 @@ int main(int argc, char const** argv) auto const orderedby_table = apply_orderby(groupedby_table, {"revenue"}, {cudf::order::DESCENDING}); - timer.print_elapsed_millis(); - // Write query result to a parquet file orderedby_table->to_parquet("q5.parquet"); - return 0; } + +void ndsh_q5(nvbench::state& state) +{ + // Generate the required parquet files in device buffers + double const scale_factor = state.get_float64("scale_factor"); + std::unordered_map sources; + generate_parquet_data_sources( + scale_factor, {"customer", "orders", "lineitem", "supplier", "nation", "region"}, sources); + + auto stream = cudf::get_default_stream(); + state.set_cuda_stream(nvbench::make_cuda_stream_view(stream.value())); + state.exec(nvbench::exec_tag::sync, + [&](nvbench::launch& launch) { run_ndsh_q5(state, sources); }); 
+} + +NVBENCH_BENCH(ndsh_q5).set_name("ndsh_q5").add_float64_axis("scale_factor", {0.01, 0.1, 1}); diff --git a/cpp/examples/tpch/q6.cpp b/cpp/benchmarks/ndsh/q06.cpp similarity index 79% rename from cpp/examples/tpch/q6.cpp rename to cpp/benchmarks/ndsh/q06.cpp index 92dac40c768..04078547973 100644 --- a/cpp/examples/tpch/q6.cpp +++ b/cpp/benchmarks/ndsh/q06.cpp @@ -14,17 +14,20 @@ * limitations under the License. */ -#include "../utilities/timer.hpp" -#include "utils.hpp" +#include "utilities.hpp" #include +#include #include #include +#include #include +#include + /** - * @file q6.cpp - * @brief Implement query 6 of the TPC-H benchmark. + * @file q06.cpp + * @brief Implement query 6 of the NDS-H benchmark. * * create view lineitem as select * from '/tables/scale-1/lineitem.parquet'; * @@ -48,7 +51,7 @@ * @param stream The CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory. */ -[[nodiscard]] std::unique_ptr calc_revenue( +[[nodiscard]] std::unique_ptr calculate_revenue( cudf::column_view const& extendedprice, cudf::column_view const& discount, rmm::cuda_stream_view stream = cudf::get_default_stream(), @@ -60,16 +63,9 @@ return revenue; } -int main(int argc, char const** argv) +void run_ndsh_q6(nvbench::state& state, + std::unordered_map& sources) { - auto const args = parse_args(argc, argv); - - // Use a memory pool - auto resource = create_memory_resource(args.memory_resource_type); - cudf::set_current_device_resource(resource.get()); - - cudf::examples::timer timer; - // Read out the `lineitem` table from parquet file std::vector const lineitem_cols = { "l_extendedprice", "l_discount", "l_shipdate", "l_quantity"}; @@ -88,7 +84,7 @@ int main(int argc, char const** argv) auto const lineitem_pred = std::make_unique( cudf::ast::ast_operator::LOGICAL_AND, shipdate_pred_a, shipdate_pred_b); auto lineitem = - read_parquet(args.dataset_dir + "/lineitem.parquet", lineitem_cols, std::move(lineitem_pred)); + read_parquet(sources["lineitem"].make_source_info(), lineitem_cols, std::move(lineitem_pred)); // Cast the discount and quantity columns to float32 and append to lineitem table auto discout_float = @@ -99,8 +95,8 @@ int main(int argc, char const** argv) (*lineitem).append(discout_float, "l_discount_float").append(quantity_float, "l_quantity_float"); // Apply the filters - auto const discount_ref = cudf::ast::column_reference(lineitem->col_id("l_discount_float")); - auto const quantity_ref = cudf::ast::column_reference(lineitem->col_id("l_quantity_float")); + auto const discount_ref = cudf::ast::column_reference(lineitem->column_id("l_discount_float")); + auto const quantity_ref = cudf::ast::column_reference(lineitem->column_id("l_quantity_float")); auto discount_lower = cudf::numeric_scalar(0.05); auto const discount_lower_literal = cudf::ast::literal(discount_lower); @@ -123,16 +119,28 @@ int main(int argc, char const** argv) auto const filtered_table = apply_filter(lineitem, discount_quantity_pred); // Calculate the `revenue` column - auto revenue = - calc_revenue(filtered_table->column("l_extendedprice"), filtered_table->column("l_discount")); + auto revenue = calculate_revenue(filtered_table->column("l_extendedprice"), + filtered_table->column("l_discount")); // Sum the `revenue` column auto const revenue_view = revenue->view(); auto const result_table = apply_reduction(revenue_view, cudf::aggregation::Kind::SUM, "revenue"); - timer.print_elapsed_millis(); - // Write query result to a 
parquet file result_table->to_parquet("q6.parquet"); - return 0; } + +void ndsh_q6(nvbench::state& state) +{ + // Generate the required parquet files in device buffers + double const scale_factor = state.get_float64("scale_factor"); + std::unordered_map sources; + generate_parquet_data_sources(scale_factor, {"lineitem"}, sources); + + auto stream = cudf::get_default_stream(); + state.set_cuda_stream(nvbench::make_cuda_stream_view(stream.value())); + state.exec(nvbench::exec_tag::sync, + [&](nvbench::launch& launch) { run_ndsh_q6(state, sources); }); +} + +NVBENCH_BENCH(ndsh_q6).set_name("ndsh_q6").add_float64_axis("scale_factor", {0.01, 0.1, 1}); diff --git a/cpp/examples/tpch/q9.cpp b/cpp/benchmarks/ndsh/q09.cpp similarity index 78% rename from cpp/examples/tpch/q9.cpp rename to cpp/benchmarks/ndsh/q09.cpp index 2882182aa2b..59218ab8912 100644 --- a/cpp/examples/tpch/q9.cpp +++ b/cpp/benchmarks/ndsh/q09.cpp @@ -14,9 +14,9 @@ * limitations under the License. */ -#include "../utilities/timer.hpp" -#include "utils.hpp" +#include "utilities.hpp" +#include #include #include #include @@ -24,9 +24,11 @@ #include #include +#include + /** - * @file q9.cpp - * @brief Implement query 9 of the TPC-H benchmark. + * @file q09.cpp + * @brief Implement query 9 of the NDS-H benchmark. * * create view part as select * from '/tables/scale-1/part.parquet'; * create view supplier as select * from '/tables/scale-1/supplier.parquet'; @@ -79,7 +81,7 @@ * @param stream The CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory. */ -[[nodiscard]] std::unique_ptr calc_amount( +[[nodiscard]] std::unique_ptr calculate_amount( cudf::column_view const& discount, cudf::column_view const& extendedprice, cudf::column_view const& supplycost, @@ -109,28 +111,21 @@ return amount; } -int main(int argc, char const** argv) +void run_ndsh_q9(nvbench::state& state, + std::unordered_map& sources) { - auto const args = parse_args(argc, argv); - - // Use a memory pool - auto resource = create_memory_resource(args.memory_resource_type); - cudf::set_current_device_resource(resource.get()); - - cudf::examples::timer timer; - // Read out the table from parquet files auto const lineitem = read_parquet( - args.dataset_dir + "/lineitem.parquet", + sources["lineitem"].make_source_info(), {"l_suppkey", "l_partkey", "l_orderkey", "l_extendedprice", "l_discount", "l_quantity"}); - auto const nation = read_parquet(args.dataset_dir + "/nation.parquet", {"n_nationkey", "n_name"}); + auto const nation = read_parquet(sources["nation"].make_source_info(), {"n_nationkey", "n_name"}); auto const orders = - read_parquet(args.dataset_dir + "/orders.parquet", {"o_orderkey", "o_orderdate"}); - auto const part = read_parquet(args.dataset_dir + "/part.parquet", {"p_partkey", "p_name"}); - auto const partsupp = read_parquet(args.dataset_dir + "/partsupp.parquet", + read_parquet(sources["orders"].make_source_info(), {"o_orderkey", "o_orderdate"}); + auto const part = read_parquet(sources["part"].make_source_info(), {"p_partkey", "p_name"}); + auto const partsupp = read_parquet(sources["partsupp"].make_source_info(), {"ps_suppkey", "ps_partkey", "ps_supplycost"}); auto const supplier = - read_parquet(args.dataset_dir + "/supplier.parquet", {"s_suppkey", "s_nationkey"}); + read_parquet(sources["supplier"].make_source_info(), {"s_suppkey", "s_nationkey"}); // Generating the `profit` table // Filter the part table using `p_name like '%green%'` @@ -150,10 +145,10 @@ 
int main(int argc, char const** argv) // Calculate the `nation`, `o_year`, and `amount` columns auto n_name = std::make_unique(joined_table->column("n_name")); auto o_year = cudf::datetime::extract_year(joined_table->column("o_orderdate")); - auto amount = calc_amount(joined_table->column("l_discount"), - joined_table->column("l_extendedprice"), - joined_table->column("ps_supplycost"), - joined_table->column("l_quantity")); + auto amount = calculate_amount(joined_table->column("l_discount"), + joined_table->column("l_extendedprice"), + joined_table->column("ps_supplycost"), + joined_table->column("l_quantity")); // Put together the `profit` table std::vector> profit_columns; @@ -175,9 +170,22 @@ int main(int argc, char const** argv) auto const orderedby_table = apply_orderby( groupedby_table, {"nation", "o_year"}, {cudf::order::ASCENDING, cudf::order::DESCENDING}); - timer.print_elapsed_millis(); - // Write query result to a parquet file orderedby_table->to_parquet("q9.parquet"); - return 0; } + +void ndsh_q9(nvbench::state& state) +{ + // Generate the required parquet files in device buffers + double const scale_factor = state.get_float64("scale_factor"); + std::unordered_map sources; + generate_parquet_data_sources( + scale_factor, {"part", "supplier", "lineitem", "partsupp", "orders", "nation"}, sources); + + auto stream = cudf::get_default_stream(); + state.set_cuda_stream(nvbench::make_cuda_stream_view(stream.value())); + state.exec(nvbench::exec_tag::sync, + [&](nvbench::launch& launch) { run_ndsh_q9(state, sources); }); +} + +NVBENCH_BENCH(ndsh_q9).set_name("ndsh_q9").add_float64_axis("scale_factor", {0.01, 0.1, 1}); diff --git a/cpp/examples/tpch/q10.cpp b/cpp/benchmarks/ndsh/q10.cpp similarity index 81% rename from cpp/examples/tpch/q10.cpp rename to cpp/benchmarks/ndsh/q10.cpp index fdf147b50e0..a520480020a 100644 --- a/cpp/examples/tpch/q10.cpp +++ b/cpp/benchmarks/ndsh/q10.cpp @@ -14,17 +14,19 @@ * limitations under the License. */ -#include "../utilities/timer.hpp" -#include "utils.hpp" +#include "utilities.hpp" #include +#include #include #include #include +#include + /** * @file q10.cpp - * @brief Implement query 10 of the TPC-H benchmark. + * @brief Implement query 10 of the NDS-H benchmark. * * create view customer as select * from '/tables/scale-1/customer.parquet'; * create view orders as select * from '/tables/scale-1/orders.parquet'; @@ -72,7 +74,7 @@ * @param stream The CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory. 
*/ -[[nodiscard]] std::unique_ptr calc_revenue( +[[nodiscard]] std::unique_ptr calculate_revenue( cudf::column_view const& extendedprice, cudf::column_view const& discount, rmm::cuda_stream_view stream = cudf::get_default_stream(), @@ -90,16 +92,10 @@ mr); return revenue; } -int main(int argc, char const** argv) -{ - auto const args = parse_args(argc, argv); - - // Use a memory pool - auto resource = create_memory_resource(args.memory_resource_type); - cudf::set_current_device_resource(resource.get()); - - cudf::examples::timer timer; +void run_ndsh_q10(nvbench::state& state, + std::unordered_map& sources) +{ // Define the column projection and filter predicate for the `orders` table std::vector const orders_cols = {"o_custkey", "o_orderkey", "o_orderdate"}; auto const o_orderdate_ref = cudf::ast::column_reference(std::distance( @@ -126,15 +122,15 @@ int main(int argc, char const** argv) // Read out the tables from parquet files // while pushing down the column projections and filter predicates auto const customer = read_parquet( - args.dataset_dir + "/customer.parquet", + sources["customer"].make_source_info(), {"c_custkey", "c_name", "c_nationkey", "c_acctbal", "c_address", "c_phone", "c_comment"}); auto const orders = - read_parquet(args.dataset_dir + "/orders.parquet", orders_cols, std::move(orders_pred)); + read_parquet(sources["orders"].make_source_info(), orders_cols, std::move(orders_pred)); auto const lineitem = - read_parquet(args.dataset_dir + "/lineitem.parquet", + read_parquet(sources["lineitem"].make_source_info(), {"l_extendedprice", "l_discount", "l_orderkey", "l_returnflag"}, std::move(lineitem_pred)); - auto const nation = read_parquet(args.dataset_dir + "/nation.parquet", {"n_name", "n_nationkey"}); + auto const nation = read_parquet(sources["nation"].make_source_info(), {"n_name", "n_nationkey"}); // Perform the joins auto const join_a = apply_inner_join(customer, nation, {"c_nationkey"}, {"n_nationkey"}); @@ -143,7 +139,7 @@ int main(int argc, char const** argv) // Calculate and append the `revenue` column auto revenue = - calc_revenue(joined_table->column("l_extendedprice"), joined_table->column("l_discount")); + calculate_revenue(joined_table->column("l_extendedprice"), joined_table->column("l_discount")); (*joined_table).append(revenue, "revenue"); // Perform the groupby operation @@ -159,9 +155,22 @@ int main(int argc, char const** argv) auto const orderedby_table = apply_orderby(groupedby_table, {"revenue"}, {cudf::order::DESCENDING}); - timer.print_elapsed_millis(); - // Write query result to a parquet file orderedby_table->to_parquet("q10.parquet"); - return 0; } + +void ndsh_q10(nvbench::state& state) +{ + // Generate the required parquet files in device buffers + double const scale_factor = state.get_float64("scale_factor"); + std::unordered_map sources; + generate_parquet_data_sources( + scale_factor, {"customer", "orders", "lineitem", "nation"}, sources); + + auto stream = cudf::get_default_stream(); + state.set_cuda_stream(nvbench::make_cuda_stream_view(stream.value())); + state.exec(nvbench::exec_tag::sync, + [&](nvbench::launch& launch) { run_ndsh_q10(state, sources); }); +} + +NVBENCH_BENCH(ndsh_q10).set_name("ndsh_q10").add_float64_axis("scale_factor", {0.01, 0.1, 1}); diff --git a/cpp/benchmarks/ndsh/utilities.cpp b/cpp/benchmarks/ndsh/utilities.cpp new file mode 100644 index 00000000000..2d514764fc2 --- /dev/null +++ b/cpp/benchmarks/ndsh/utilities.cpp @@ -0,0 +1,400 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. 
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utilities.hpp"
+
+#include "common/ndsh_data_generator/ndsh_data_generator.hpp"
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+
+namespace {
+
+std::vector<std::string> const ORDERS_SCHEMA = {"o_orderkey",
+                                                "o_custkey",
+                                                "o_orderstatus",
+                                                "o_totalprice",
+                                                "o_orderdate",
+                                                "o_orderpriority",
+                                                "o_clerk",
+                                                "o_shippriority",
+                                                "o_comment"};
+std::vector<std::string> const LINEITEM_SCHEMA = {"l_orderkey",
+                                                  "l_partkey",
+                                                  "l_suppkey",
+                                                  "l_linenumber",
+                                                  "l_quantity",
+                                                  "l_extendedprice",
+                                                  "l_discount",
+                                                  "l_tax",
+                                                  "l_returnflag",
+                                                  "l_linestatus",
+                                                  "l_shipdate",
+                                                  "l_commitdate",
+                                                  "l_receiptdate",
+                                                  "l_shipinstruct",
+                                                  "l_shipmode",
+                                                  "l_comment"};
+std::vector<std::string> const PART_SCHEMA = {"p_partkey",
+                                              "p_name",
+                                              "p_mfgr",
+                                              "p_brand",
+                                              "p_type",
+                                              "p_size",
+                                              "p_container",
+                                              "p_retailprice",
+                                              "p_comment"};
+std::vector<std::string> const PARTSUPP_SCHEMA = {
+  "ps_partkey", "ps_suppkey", "ps_availqty", "ps_supplycost", "ps_comment"};
+std::vector<std::string> const SUPPLIER_SCHEMA = {
+  "s_suppkey", "s_name", "s_address", "s_nationkey", "s_phone", "s_acctbal", "s_comment"};
+std::vector<std::string> const CUSTOMER_SCHEMA = {"c_custkey",
+                                                  "c_name",
+                                                  "c_address",
+                                                  "c_nationkey",
+                                                  "c_phone",
+                                                  "c_acctbal",
+                                                  "c_mktsegment",
+                                                  "c_comment"};
+std::vector<std::string> const NATION_SCHEMA = {
+  "n_nationkey", "n_name", "n_regionkey", "n_comment"};
+std::vector<std::string> const REGION_SCHEMA = {"r_regionkey", "r_name", "r_comment"};
+
+}  // namespace
+
+cudf::table_view table_with_names::table() const { return tbl->view(); }
+
+cudf::column_view table_with_names::column(std::string const& col_name) const
+{
+  return tbl->view().column(column_id(col_name));
+}
+
+std::vector<std::string> const& table_with_names::column_names() const { return col_names; }
+
+cudf::size_type table_with_names::column_id(std::string const& col_name) const
+{
+  auto it = std::find(col_names.begin(), col_names.end(), col_name);
+  if (it == col_names.end()) {
+    std::string err_msg = "Column `" + col_name + "` not found";
+    throw std::runtime_error(err_msg);
+  }
+  return std::distance(col_names.begin(), it);
+}
+
+table_with_names& table_with_names::append(std::unique_ptr<cudf::column>& col,
+                                           std::string const& col_name)
+{
+  auto cols = tbl->release();
+  cols.push_back(std::move(col));
+  tbl = std::make_unique<cudf::table>(std::move(cols));
+  col_names.push_back(col_name);
+  return (*this);
+}
+
+cudf::table_view table_with_names::select(std::vector<std::string> const& col_names) const
+{
+  CUDF_FUNC_RANGE();
+  std::vector<cudf::size_type> col_indices;
+  for (auto const& col_name : col_names) {
+    col_indices.push_back(column_id(col_name));
+  }
+  return tbl->select(col_indices);
+}
+
+void table_with_names::to_parquet(std::string const& filepath) const
+{
+  CUDF_FUNC_RANGE();
+  auto const sink_info = cudf::io::sink_info(filepath);
+  cudf::io::table_metadata metadata;
+  metadata.schema_info =
+    std::vector<cudf::io::column_name_info>(col_names.begin(), col_names.end());
+  auto const table_input_metadata = cudf::io::table_input_metadata{metadata};
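+  // The builder below attaches the stored column names through
+  // table_input_metadata so the written parquet file preserves the schema.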
+  auto builder = cudf::io::parquet_writer_options::builder(sink_info, tbl->view());
+  builder.metadata(table_input_metadata);
+  auto const options = builder.build();
+  cudf::io::write_parquet(options);
+}
+
+std::unique_ptr<cudf::table> join_and_gather(cudf::table_view const& left_input,
+                                             cudf::table_view const& right_input,
+                                             std::vector<cudf::size_type> const& left_on,
+                                             std::vector<cudf::size_type> const& right_on,
+                                             cudf::null_equality compare_nulls)
+{
+  CUDF_FUNC_RANGE();
+  constexpr auto oob_policy = cudf::out_of_bounds_policy::DONT_CHECK;
+  auto const left_selected  = left_input.select(left_on);
+  auto const right_selected = right_input.select(right_on);
+  auto const [left_join_indices, right_join_indices] = cudf::inner_join(
+    left_selected, right_selected, compare_nulls, cudf::get_current_device_resource_ref());
+
+  auto const left_indices_span  = cudf::device_span<cudf::size_type const>{*left_join_indices};
+  auto const right_indices_span = cudf::device_span<cudf::size_type const>{*right_join_indices};
+
+  auto const left_indices_col  = cudf::column_view{left_indices_span};
+  auto const right_indices_col = cudf::column_view{right_indices_span};
+
+  auto const left_result  = cudf::gather(left_input, left_indices_col, oob_policy);
+  auto const right_result = cudf::gather(right_input, right_indices_col, oob_policy);
+
+  auto joined_cols = left_result->release();
+  auto right_cols  = right_result->release();
+  joined_cols.insert(joined_cols.end(),
+                     std::make_move_iterator(right_cols.begin()),
+                     std::make_move_iterator(right_cols.end()));
+  return std::make_unique<cudf::table>(std::move(joined_cols));
+}
+
+std::unique_ptr<table_with_names> apply_inner_join(
+  std::unique_ptr<table_with_names> const& left_input,
+  std::unique_ptr<table_with_names> const& right_input,
+  std::vector<std::string> const& left_on,
+  std::vector<std::string> const& right_on,
+  cudf::null_equality compare_nulls)
+{
+  CUDF_FUNC_RANGE();
+  std::vector<cudf::size_type> left_on_indices;
+  std::vector<cudf::size_type> right_on_indices;
+  std::transform(
+    left_on.begin(), left_on.end(), std::back_inserter(left_on_indices), [&](auto const& col_name) {
+      return left_input->column_id(col_name);
+    });
+  std::transform(right_on.begin(),
+                 right_on.end(),
+                 std::back_inserter(right_on_indices),
+                 [&](auto const& col_name) { return right_input->column_id(col_name); });
+  auto table = join_and_gather(
+    left_input->table(), right_input->table(), left_on_indices, right_on_indices, compare_nulls);
+  std::vector<std::string> merged_column_names;
+  merged_column_names.reserve(left_input->column_names().size() +
+                              right_input->column_names().size());
+  std::copy(left_input->column_names().begin(),
+            left_input->column_names().end(),
+            std::back_inserter(merged_column_names));
+  std::copy(right_input->column_names().begin(),
+            right_input->column_names().end(),
+            std::back_inserter(merged_column_names));
+  return std::make_unique<table_with_names>(std::move(table), merged_column_names);
+}
+
+std::unique_ptr<table_with_names> apply_filter(std::unique_ptr<table_with_names> const& table,
+                                               cudf::ast::operation const& predicate)
+{
+  CUDF_FUNC_RANGE();
+  auto const boolean_mask = cudf::compute_column(table->table(), predicate);
+  auto result_table = cudf::apply_boolean_mask(table->table(), boolean_mask->view());
+  return std::make_unique<table_with_names>(std::move(result_table), table->column_names());
+}
+
+std::unique_ptr<table_with_names> apply_mask(std::unique_ptr<table_with_names> const& table,
+                                             std::unique_ptr<cudf::column> const& mask)
+{
+  CUDF_FUNC_RANGE();
+  auto result_table = cudf::apply_boolean_mask(table->table(), mask->view());
+  return std::make_unique<table_with_names>(std::move(result_table), table->column_names());
+}
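+
+// A minimal usage sketch of the helpers above (names are illustrative only,
+// assuming `customer` and `nation` were loaded via read_parquet() and that
+// `pred` is a cudf::ast::operation over the joined table):
+//
+//   auto joined   = apply_inner_join(customer, nation, {"c_nationkey"}, {"n_nationkey"});
+//   auto filtered = apply_filter(joined, pred);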
+
+std::unique_ptr<table_with_names> apply_groupby(std::unique_ptr<table_with_names> const& table,
+                                                groupby_context_t const& ctx)
+{
+  CUDF_FUNC_RANGE();
+  auto const keys = table->select(ctx.keys);
+  cudf::groupby::groupby groupby_obj(keys);
+  std::vector<std::string> result_column_names;
+  result_column_names.insert(result_column_names.end(), ctx.keys.begin(), ctx.keys.end());
+  std::vector<cudf::groupby::aggregation_request> requests;
+  for (auto& [value_col, aggregations] : ctx.values) {
+    requests.emplace_back(cudf::groupby::aggregation_request());
+    for (auto& agg : aggregations) {
+      if (agg.first == cudf::aggregation::Kind::SUM) {
+        requests.back().aggregations.push_back(
+          cudf::make_sum_aggregation<cudf::groupby_aggregation>());
+      } else if (agg.first == cudf::aggregation::Kind::MEAN) {
+        requests.back().aggregations.push_back(
+          cudf::make_mean_aggregation<cudf::groupby_aggregation>());
+      } else if (agg.first == cudf::aggregation::Kind::COUNT_ALL) {
+        requests.back().aggregations.push_back(
+          cudf::make_count_aggregation<cudf::groupby_aggregation>());
+      } else {
+        throw std::runtime_error("Unsupported aggregation");
+      }
+      result_column_names.push_back(agg.second);
+    }
+    requests.back().values = table->column(value_col);
+  }
+  auto agg_results = groupby_obj.aggregate(requests);
+  std::vector<std::unique_ptr<cudf::column>> result_columns;
+  for (auto i = 0; i < agg_results.first->num_columns(); i++) {
+    auto col = std::make_unique<cudf::column>(agg_results.first->get_column(i));
+    result_columns.push_back(std::move(col));
+  }
+  for (size_t i = 0; i < agg_results.second.size(); i++) {
+    for (size_t j = 0; j < agg_results.second[i].results.size(); j++) {
+      result_columns.push_back(std::move(agg_results.second[i].results[j]));
+    }
+  }
+  auto result_table = std::make_unique<cudf::table>(std::move(result_columns));
+  return std::make_unique<table_with_names>(std::move(result_table), result_column_names);
+}
+
+std::unique_ptr<table_with_names> apply_orderby(std::unique_ptr<table_with_names> const& table,
+                                                std::vector<std::string> const& sort_keys,
+                                                std::vector<cudf::order> const& sort_key_orders)
+{
+  CUDF_FUNC_RANGE();
+  std::vector<cudf::column_view> column_views;
+  for (auto& key : sort_keys) {
+    column_views.push_back(table->column(key));
+  }
+  auto result_table =
+    cudf::sort_by_key(table->table(), cudf::table_view{column_views}, sort_key_orders);
+  return std::make_unique<table_with_names>(std::move(result_table), table->column_names());
+}
+
+std::unique_ptr<table_with_names> apply_reduction(cudf::column_view const& column,
+                                                  cudf::aggregation::Kind const& agg_kind,
+                                                  std::string const& col_name)
+{
+  CUDF_FUNC_RANGE();
+  // NOTE: only SUM reductions are used by these queries; `agg_kind` is accepted
+  // for symmetry with apply_groupby, but a sum aggregation is always performed.
+  auto const agg    = cudf::make_sum_aggregation<cudf::reduce_aggregation>();
+  auto const result = cudf::reduce(column, *agg, column.type());
+  cudf::size_type const len = 1;
+  auto col = cudf::make_column_from_scalar(*result, len);
+  std::vector<std::unique_ptr<cudf::column>> columns;
+  columns.push_back(std::move(col));
+  auto result_table = std::make_unique<cudf::table>(std::move(columns));
+  std::vector<std::string> col_names = {col_name};
+  return std::make_unique<table_with_names>(std::move(result_table), col_names);
+}
+
+std::unique_ptr<table_with_names> read_parquet(
+  cudf::io::source_info const& source_info,
+  std::vector<std::string> const& columns,
+  std::unique_ptr<cudf::ast::operation> const& predicate)
+{
+  CUDF_FUNC_RANGE();
+  auto builder = cudf::io::parquet_reader_options_builder(source_info);
+  if (!columns.empty()) { builder.columns(columns); }
+  if (predicate) { builder.filter(*predicate); }
+  auto const options = builder.build();
+  auto table_with_metadata = cudf::io::read_parquet(options);
+  std::vector<std::string> column_names;
+  for (auto const& col_info : table_with_metadata.metadata.schema_info) {
+    column_names.push_back(col_info.name);
+  }
+  return std::make_unique<table_with_names>(std::move(table_with_metadata.tbl), column_names);
+}
+
+std::tm make_tm(int year, int month, int day)
+{
+  std::tm tm{};
+  tm.tm_year = year - 1900;
+  tm.tm_mon  = month - 1;
+  tm.tm_mday = day;
+  return tm;
+}
+
+int32_t days_since_epoch(int year, int month, int day)
+{
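+  // Convert both dates to time_t via std::mktime and divide the difference in
+  // seconds by 86400 (60 * 60 * 24) to obtain whole days; for example,
+  // days_since_epoch(1995, 1, 1) evaluates to 9131.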
+  std::tm tm             = make_tm(year, month, day);
+  std::tm epoch          = make_tm(1970, 1, 1);
+  std::time_t time       = std::mktime(&tm);
+  std::time_t epoch_time = std::mktime(&epoch);
+  double diff = std::difftime(time, epoch_time) / (60 * 60 * 24);
+  return static_cast<int32_t>(diff);
+}
+
+void write_to_parquet_device_buffer(std::unique_ptr<cudf::table> const& table,
+                                    std::vector<std::string> const& col_names,
+                                    parquet_device_buffer& source)
+{
+  CUDF_FUNC_RANGE();
+  auto const stream = cudf::get_default_stream();
+
+  // Prepare the table metadata
+  cudf::io::table_metadata metadata;
+  std::vector<cudf::io::column_name_info> col_name_infos;
+  for (auto& col_name : col_names) {
+    col_name_infos.push_back(cudf::io::column_name_info(col_name));
+  }
+  metadata.schema_info = col_name_infos;
+  auto const table_input_metadata = cudf::io::table_input_metadata{metadata};
+
+  // Declare a host and device buffer
+  std::vector<char> h_buffer;
+
+  // Write parquet data to host buffer
+  auto builder =
+    cudf::io::parquet_writer_options::builder(cudf::io::sink_info(&h_buffer), table->view());
+  builder.metadata(table_input_metadata);
+  auto const options = builder.build();
+  cudf::io::write_parquet(options);
+
+  // Copy host buffer to device buffer
+  source.d_buffer.resize(h_buffer.size(), stream);
+  CUDF_CUDA_TRY(cudaMemcpyAsync(
+    source.d_buffer.data(), h_buffer.data(), h_buffer.size(), cudaMemcpyDefault, stream.value()));
+}
+
+void generate_parquet_data_sources(double scale_factor,
+                                   std::vector<std::string> const& table_names,
+                                   std::unordered_map<std::string, parquet_device_buffer>& sources)
+{
+  CUDF_FUNC_RANGE();
+  std::for_each(table_names.begin(), table_names.end(), [&](auto const& table_name) {
+    sources[table_name] = parquet_device_buffer();
+  });
+
+  auto [orders, lineitem, part] = cudf::datagen::generate_orders_lineitem_part(
+    scale_factor, cudf::get_default_stream(), cudf::get_current_device_resource_ref());
+
+  auto partsupp = cudf::datagen::generate_partsupp(
+    scale_factor, cudf::get_default_stream(), cudf::get_current_device_resource_ref());
+
+  auto supplier = cudf::datagen::generate_supplier(
+    scale_factor, cudf::get_default_stream(), cudf::get_current_device_resource_ref());
+
+  auto customer = cudf::datagen::generate_customer(
+    scale_factor, cudf::get_default_stream(), cudf::get_current_device_resource_ref());
+
+  auto nation = cudf::datagen::generate_nation(cudf::get_default_stream(),
+                                               cudf::get_current_device_resource_ref());
+
+  auto region = cudf::datagen::generate_region(cudf::get_default_stream(),
+                                               cudf::get_current_device_resource_ref());
+
+  write_to_parquet_device_buffer(std::move(orders), ORDERS_SCHEMA, sources["orders"]);
+  write_to_parquet_device_buffer(std::move(lineitem), LINEITEM_SCHEMA, sources["lineitem"]);
+  write_to_parquet_device_buffer(std::move(part), PART_SCHEMA, sources["part"]);
+  write_to_parquet_device_buffer(std::move(partsupp), PARTSUPP_SCHEMA, sources["partsupp"]);
+  write_to_parquet_device_buffer(std::move(customer), CUSTOMER_SCHEMA, sources["customer"]);
+  write_to_parquet_device_buffer(std::move(supplier), SUPPLIER_SCHEMA, sources["supplier"]);
+  write_to_parquet_device_buffer(std::move(nation), NATION_SCHEMA, sources["nation"]);
+  write_to_parquet_device_buffer(std::move(region), REGION_SCHEMA, sources["region"]);
+}
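+
+// A minimal sketch of how a benchmark consumes the sources generated above
+// (the table names are those requested via generate_parquet_data_sources):
+//
+//   std::unordered_map<std::string, parquet_device_buffer> sources;
+//   generate_parquet_data_sources(0.01, {"nation"}, sources);
+//   auto nation = read_parquet(sources["nation"].make_source_info(), {"n_name", "n_nationkey"});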
diff --git a/cpp/benchmarks/ndsh/utilities.hpp b/cpp/benchmarks/ndsh/utilities.hpp
new file mode 100644
index 00000000000..762e43deccf
--- /dev/null
+++ b/cpp/benchmarks/ndsh/utilities.hpp
@@ -0,0 +1,227 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include
+#include
+#include
+
+#include
+
+/**
+ * @brief A class to represent a table with column names attached
+ */
+class table_with_names {
+ public:
+  table_with_names(std::unique_ptr<cudf::table> tbl, std::vector<std::string> col_names)
+    : tbl(std::move(tbl)), col_names(col_names){};
+  /**
+   * @brief Return the table view
+   */
+  [[nodiscard]] cudf::table_view table() const;
+  /**
+   * @brief Return the column view for a given column name
+   *
+   * @param col_name The name of the column
+   */
+  [[nodiscard]] cudf::column_view column(std::string const& col_name) const;
+  /**
+   * @brief Return the column names of the table
+   */
+  [[nodiscard]] std::vector<std::string> const& column_names() const;
+  /**
+   * @brief Translate a column name to a column index
+   *
+   * @param col_name The name of the column
+   */
+  [[nodiscard]] cudf::size_type column_id(std::string const& col_name) const;
+  /**
+   * @brief Append a column to the table
+   *
+   * @param col The column to append
+   * @param col_name The name of the appended column
+   */
+  table_with_names& append(std::unique_ptr<cudf::column>& col, std::string const& col_name);
+  /**
+   * @brief Select a subset of columns from the table
+   *
+   * @param col_names The names of the columns to select
+   */
+  [[nodiscard]] cudf::table_view select(std::vector<std::string> const& col_names) const;
+  /**
+   * @brief Write the table to a parquet file
+   *
+   * @param filepath The path to the parquet file
+   */
+  void to_parquet(std::string const& filepath) const;
+
+ private:
+  std::unique_ptr<cudf::table> tbl;
+  std::vector<std::string> col_names;
+};
+
+/**
+ * @brief Inner join two tables and gather the result
+ *
+ * @param left_input The left input table
+ * @param right_input The right input table
+ * @param left_on The columns to join on in the left table
+ * @param right_on The columns to join on in the right table
+ * @param compare_nulls The null equality policy
+ */
+[[nodiscard]] std::unique_ptr<cudf::table> join_and_gather(
+  cudf::table_view const& left_input,
+  cudf::table_view const& right_input,
+  std::vector<cudf::size_type> const& left_on,
+  std::vector<cudf::size_type> const& right_on,
+  cudf::null_equality compare_nulls);
+
+/**
+ * @brief Apply an inner join operation to two tables
+ *
+ * @param left_input The left input table
+ * @param right_input The right input table
+ * @param left_on The columns to join on in the left table
+ * @param right_on The columns to join on in the right table
+ * @param compare_nulls The null equality policy
+ */
+[[nodiscard]] std::unique_ptr<table_with_names> apply_inner_join(
+  std::unique_ptr<table_with_names> const& left_input,
+  std::unique_ptr<table_with_names> const& right_input,
+  std::vector<std::string> const& left_on,
+  std::vector<std::string> const& right_on,
+  cudf::null_equality compare_nulls = cudf::null_equality::EQUAL);
+
+/**
+ * @brief Apply a filter predicate to a table
+ *
+ * @param table The input table
+ * @param predicate The filter predicate
+ */
+[[nodiscard]] std::unique_ptr<table_with_names> apply_filter(
+  std::unique_ptr<table_with_names> const& table, cudf::ast::operation const& predicate);
+
+/**
+ * @brief Apply a boolean mask to a table
+ *
+ * @param table The input table
+ * @param mask The boolean mask
+ */
+[[nodiscard]] std::unique_ptr<table_with_names> apply_mask(
+  std::unique_ptr<table_with_names> const& table, std::unique_ptr<cudf::column> const& mask);
+
+/**
+ * Struct representing group by key columns, value columns, and the type of aggregations to perform
+ * on the value columns
+ */
+struct groupby_context_t {
+  std::vector<std::string> keys;
+  std::unordered_map<std::string,
+                     std::vector<std::pair<cudf::aggregation::Kind, std::string>>>
+    values;
+};
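+
+// For illustration only: the groupby in, e.g., Q9 can be expressed with a
+// context that keys on nation and year and sums the amount column (a sketch,
+// not the benchmark code itself):
+//
+//   groupby_context_t ctx{
+//     {"nation", "o_year"},
+//     {{"amount", {{cudf::aggregation::Kind::SUM, "amount"}}}}};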
+
+/**
+ * @brief Apply a groupby operation to a table
+ *
+ * @param table The input table
+ * @param ctx The groupby context
+ */
+[[nodiscard]] std::unique_ptr<table_with_names> apply_groupby(
+  std::unique_ptr<table_with_names> const& table, groupby_context_t const& ctx);
+
+/**
+ * @brief Apply an order by operation to a table
+ *
+ * @param table The input table
+ * @param sort_keys The sort keys
+ * @param sort_key_orders The sort key orders
+ */
+[[nodiscard]] std::unique_ptr<table_with_names> apply_orderby(
+  std::unique_ptr<table_with_names> const& table,
+  std::vector<std::string> const& sort_keys,
+  std::vector<cudf::order> const& sort_key_orders);
+
+/**
+ * @brief Apply a reduction operation to a column
+ *
+ * @param column The input column
+ * @param agg_kind The aggregation kind
+ * @param col_name The name of the output column
+ */
+[[nodiscard]] std::unique_ptr<table_with_names> apply_reduction(
+  cudf::column_view const& column,
+  cudf::aggregation::Kind const& agg_kind,
+  std::string const& col_name);
+
+/**
+ * @brief Read a parquet file into a table
+ *
+ * @param source_info The source of the parquet file
+ * @param columns The columns to read
+ * @param predicate The filter predicate to push down
+ */
+[[nodiscard]] std::unique_ptr<table_with_names> read_parquet(
+  cudf::io::source_info const& source_info,
+  std::vector<std::string> const& columns = {},
+  std::unique_ptr<cudf::ast::operation> const& predicate = nullptr);
+
+/**
+ * @brief Generate the `std::tm` structure from year, month, and day
+ *
+ * @param year The year
+ * @param month The month
+ * @param day The day
+ */
+std::tm make_tm(int year, int month, int day);
+
+/**
+ * @brief Calculate the number of days since the UNIX epoch
+ *
+ * @param year The year
+ * @param month The month
+ * @param day The day
+ */
+int32_t days_since_epoch(int year, int month, int day);
+
+/**
+ * @brief Struct representing a parquet device buffer
+ */
+struct parquet_device_buffer {
+  parquet_device_buffer() : d_buffer{0, cudf::get_default_stream()} {};
+  cudf::io::source_info make_source_info() { return cudf::io::source_info(d_buffer); }
+  rmm::device_uvector<std::byte> d_buffer;
+};
+
+/**
+ * @brief Write a `cudf::table` to a parquet device buffer
+ *
+ * @param table The `cudf::table` to write
+ * @param col_names The column names of the table
+ * @param source The parquet device buffer to write the table to
+ */
+void write_to_parquet_device_buffer(std::unique_ptr<cudf::table> const& table,
+                                    std::vector<std::string> const& col_names,
+                                    parquet_device_buffer& source);
+
+/**
+ * @brief Generate NDS-H tables and write to parquet device buffers
+ *
+ * @param scale_factor The scale factor of NDS-H tables to generate
+ * @param table_names The names of the tables to generate
+ * @param sources The parquet data sources to populate
+ */
+void generate_parquet_data_sources(double scale_factor,
+                                   std::vector<std::string> const& table_names,
+                                   std::unordered_map<std::string, parquet_device_buffer>& sources);
diff --git a/cpp/examples/build.sh b/cpp/examples/build.sh
index 8e8d8bd0b78..25984df1b60 100755
--- a/cpp/examples/build.sh
+++ b/cpp/examples/build.sh
@@ -57,7 +57,6 @@ build_example() {
 }
 
 build_example basic
-build_example tpch
build_example strings
 build_example nested_types
 build_example parquet_io
diff --git a/cpp/examples/tpch/CMakeLists.txt b/cpp/examples/tpch/CMakeLists.txt
deleted file mode 100644 index 373a6d72d56..00000000000 --- a/cpp/examples/tpch/CMakeLists.txt +++ /dev/null @@ -1,36 +0,0 @@ -# Copyright (c) 2024, NVIDIA CORPORATION. - -cmake_minimum_required(VERSION 3.26.4) - -include(../set_cuda_architecture.cmake) - -rapids_cuda_init_architectures(tpch_example) -rapids_cuda_set_architectures(RAPIDS) - -project( - tpch_example - VERSION 0.0.1 - LANGUAGES CXX CUDA -) - -include(../fetch_dependencies.cmake) - -add_executable(tpch_q1 q1.cpp) -target_link_libraries(tpch_q1 PRIVATE cudf::cudf) -target_compile_features(tpch_q1 PRIVATE cxx_std_17) - -add_executable(tpch_q5 q5.cpp) -target_link_libraries(tpch_q5 PRIVATE cudf::cudf) -target_compile_features(tpch_q5 PRIVATE cxx_std_17) - -add_executable(tpch_q6 q6.cpp) -target_link_libraries(tpch_q6 PRIVATE cudf::cudf) -target_compile_features(tpch_q6 PRIVATE cxx_std_17) - -add_executable(tpch_q9 q9.cpp) -target_link_libraries(tpch_q9 PRIVATE cudf::cudf) -target_compile_features(tpch_q9 PRIVATE cxx_std_17) - -add_executable(tpch_q10 q10.cpp) -target_link_libraries(tpch_q10 PRIVATE cudf::cudf) -target_compile_features(tpch_q10 PRIVATE cxx_std_17) diff --git a/cpp/examples/tpch/README.md b/cpp/examples/tpch/README.md deleted file mode 100644 index 8c046c3f1e8..00000000000 --- a/cpp/examples/tpch/README.md +++ /dev/null @@ -1,39 +0,0 @@ -# TPC-H Derived Examples - -Implements TPC-H queries using `libcudf`. We leverage the data generator (wrapper around official TPC-H datagen) from [Apache Datafusion](https://github.com/apache/datafusion) for generating data in Parquet format. - -## Requirements - -- Rust -- [libcudf](https://github.com/rapidsai/cudf/blob/branch-24.08/CONTRIBUTING.md#setting-up-your-build-environment) - -## Running Queries - -1. Build the `libcudf` examples. -```bash -cd cudf/cpp/examples -./build.sh -``` -The TPC-H query binaries would be built inside `tpch/build`. - -2. Generate the dataset. -```bash -cd tpch/datagen -./datagen.sh [scale factor (1/10)] -``` - -The parquet files will be generated in `tpch/datagen/datafusion/benchmarks/data/tpch_sf[scale factor]`. - -3. Set these environment variables for optimized runtimes. -```bash -export KVIKIO_COMPAT_MODE="on" -export LIBCUDF_CUFILE_POLICY="KVIKIO" -export CUDA_MODULE_LOADING="EAGER" -``` - -4. Execute the queries. -```bash -./tpch/build/tpch_q[query no] [path to dataset] [memory resource type (cuda/pool/managed/managed_pool)] -``` - -A parquet file named `q[query no].parquet` would be generated containing the results of the query. diff --git a/cpp/examples/tpch/datagen/correct_datatypes.py b/cpp/examples/tpch/datagen/correct_datatypes.py deleted file mode 100644 index 8564774647b..00000000000 --- a/cpp/examples/tpch/datagen/correct_datatypes.py +++ /dev/null @@ -1,60 +0,0 @@ -# Copyright (c) 2024, NVIDIA CORPORATION. 
- -import os -import sys - -import pyarrow as pa -import pyarrow.parquet as pq -import pandas as pd - -if __name__ == "__main__": - dataset_path = str(sys.argv[1]) - tables = ["lineitem", "part", "partsupp", "orders", "supplier", "customer", "nation", "region"] - for table in tables: - filepath = os.path.join(dataset_path, f"{table}.parquet") - print("Reading file ", filepath) - - if filepath.endswith("lineitem.parquet"): - df = pd.read_parquet(filepath) - df["l_linenumber"] = df["l_linenumber"].astype("int64") - df["l_quantity"] = df["l_quantity"].astype("int64") - df["l_extendedprice"] = df["l_extendedprice"].astype("float64") - df["l_discount"] = df["l_discount"].astype("float64") - df["l_tax"] = df["l_tax"].astype("float64") - pq.write_table(pa.Table.from_pandas(df), filepath, compression="snappy") - - elif filepath.endswith("part.parquet"): - df = pd.read_parquet(filepath) - df["p_size"] = df["p_size"].astype("int64") - df["p_retailprice"] = df["p_retailprice"].astype("float64") - pq.write_table(pa.Table.from_pandas(df), filepath, compression="snappy") - - elif filepath.endswith("partsupp.parquet"): - df = pd.read_parquet(filepath) - df["ps_availqty"] = df["ps_availqty"].astype("int64") - df["ps_supplycost"] = df["ps_supplycost"].astype("float64") - pq.write_table(pa.Table.from_pandas(df), filepath, compression="snappy") - - elif filepath.endswith("orders.parquet"): - df = pd.read_parquet(filepath) - df["o_totalprice"] = df["o_totalprice"].astype("float64") - df["o_shippriority"] = df["o_shippriority"].astype("int64") - pq.write_table(pa.Table.from_pandas(df), filepath, compression="snappy") - - elif filepath.endswith("supplier.parquet"): - df = pd.read_parquet(filepath) - df["s_acctbal"] = df["s_acctbal"].astype("float64") - pq.write_table(pa.Table.from_pandas(df), filepath, compression="snappy") - - elif filepath.endswith("customer.parquet"): - df = pd.read_parquet(filepath) - df["c_acctbal"] = df["c_acctbal"].astype("float64") - pq.write_table(pa.Table.from_pandas(df), filepath, compression="snappy") - - elif filepath.endswith("nation.parquet"): - df = pd.read_parquet(filepath) - pq.write_table(pa.Table.from_pandas(df), filepath, compression="snappy") - - elif filepath.endswith("region.parquet"): - df = pd.read_parquet(filepath) - pq.write_table(pa.Table.from_pandas(df), filepath, compression="snappy") diff --git a/cpp/examples/tpch/datagen/datagen.sh b/cpp/examples/tpch/datagen/datagen.sh deleted file mode 100755 index 0b03753daea..00000000000 --- a/cpp/examples/tpch/datagen/datagen.sh +++ /dev/null @@ -1,31 +0,0 @@ -#!/bin/bash -# Copyright (c) 2024, NVIDIA CORPORATION. 
- -set -e - -scale_factor=$1 -script_dir=$(pwd) - -# Clone the datafusion repository and apply a patch -# for single threaded data generation so that a -# single parquet file is generated for each table -rm -rf datafusion -git clone https://github.com/apache/datafusion.git datafusion -cd datafusion/ -git checkout 679a85f -git apply ${script_dir}/tpch.patch -cd benchmarks/ - -# Generate the data -# Currently, we support only scale factor 1 and 10 -if [ ${scale_factor} -eq 1 ]; then - ./bench.sh data tpch -elif [ ${scale_factor} -eq 10 ]; then - ./bench.sh data tpch10 -else - echo "Unsupported scale factor" - exit 1 -fi - -# Correct the datatypes of the parquet files -python3 ${script_dir}/correct_datatypes.py data/tpch_sf${scale_factor} diff --git a/cpp/examples/tpch/datagen/tpch.patch b/cpp/examples/tpch/datagen/tpch.patch deleted file mode 100644 index 42727aa9904..00000000000 --- a/cpp/examples/tpch/datagen/tpch.patch +++ /dev/null @@ -1,33 +0,0 @@ -diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh -index 3b854f6dc..f000f09c0 100755 ---- a/benchmarks/bench.sh -+++ b/benchmarks/bench.sh -@@ -311,6 +311,15 @@ data_tpch() { - $CARGO_COMMAND --bin tpch -- convert --input "${TPCH_DIR}" --output "${TPCH_DIR}" --format parquet - popd > /dev/null - fi -+ -+ cp ${TPCH_DIR}/lineitem/part-0.parquet ${TPCH_DIR}/lineitem.parquet -+ cp ${TPCH_DIR}/orders/part-0.parquet ${TPCH_DIR}/orders.parquet -+ cp ${TPCH_DIR}/part/part-0.parquet ${TPCH_DIR}/part.parquet -+ cp ${TPCH_DIR}/partsupp/part-0.parquet ${TPCH_DIR}/partsupp.parquet -+ cp ${TPCH_DIR}/customer/part-0.parquet ${TPCH_DIR}/customer.parquet -+ cp ${TPCH_DIR}/supplier/part-0.parquet ${TPCH_DIR}/supplier.parquet -+ cp ${TPCH_DIR}/nation/part-0.parquet ${TPCH_DIR}/nation.parquet -+ cp ${TPCH_DIR}/region/part-0.parquet ${TPCH_DIR}/region.parquet - } - - # Runs the tpch benchmark -diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs -index b5204b343..84fd2e78d 100644 ---- a/datafusion/common/src/config.rs -+++ b/datafusion/common/src/config.rs -@@ -250,7 +250,7 @@ config_namespace! { - /// concurrency. - /// - /// Defaults to the number of CPU cores on the system -- pub target_partitions: usize, default = num_cpus::get() -+ pub target_partitions: usize, default = 1 - - /// The default time zone - /// diff --git a/cpp/examples/tpch/utils.hpp b/cpp/examples/tpch/utils.hpp deleted file mode 100644 index 8102fa8f976..00000000000 --- a/cpp/examples/tpch/utils.hpp +++ /dev/null @@ -1,458 +0,0 @@ -/* - * Copyright (c) 2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include - -#include - -// RMM memory resource creation utilities -inline auto make_cuda() { return std::make_shared(); } -inline auto make_pool() -{ - return rmm::mr::make_owning_wrapper( - make_cuda(), rmm::percent_of_free_device_memory(50)); -} -inline auto make_managed() { return std::make_shared(); } -inline auto make_managed_pool() -{ - return rmm::mr::make_owning_wrapper( - make_managed(), rmm::percent_of_free_device_memory(50)); -} -inline std::shared_ptr create_memory_resource( - std::string const& mode) -{ - if (mode == "cuda") return make_cuda(); - if (mode == "pool") return make_pool(); - if (mode == "managed") return make_managed(); - if (mode == "managed_pool") return make_managed_pool(); - CUDF_FAIL("Unknown rmm_mode parameter: " + mode + - "\nExpecting: cuda, pool, managed, or managed_pool"); -} - -/** - * @brief A class to represent a table with column names attached - */ -class table_with_names { - public: - table_with_names(std::unique_ptr tbl, std::vector col_names) - : tbl(std::move(tbl)), col_names(col_names) - { - } - /** - * @brief Return the table view - */ - [[nodiscard]] cudf::table_view table() const { return tbl->view(); } - /** - * @brief Return the column view for a given column name - * - * @param col_name The name of the column - */ - [[nodiscard]] cudf::column_view column(std::string const& col_name) const - { - return tbl->view().column(col_id(col_name)); - } - /** - * @param Return the column names of the table - */ - [[nodiscard]] std::vector column_names() const { return col_names; } - /** - * @brief Translate a column name to a column index - * - * @param col_name The name of the column - */ - [[nodiscard]] cudf::size_type col_id(std::string const& col_name) const - { - CUDF_FUNC_RANGE(); - auto it = std::find(col_names.begin(), col_names.end(), col_name); - if (it == col_names.end()) { throw std::runtime_error("Column not found"); } - return std::distance(col_names.begin(), it); - } - /** - * @brief Append a column to the table - * - * @param col The column to append - * @param col_name The name of the appended column - */ - table_with_names& append(std::unique_ptr& col, std::string const& col_name) - { - CUDF_FUNC_RANGE(); - auto cols = tbl->release(); - cols.push_back(std::move(col)); - tbl = std::make_unique(std::move(cols)); - col_names.push_back(col_name); - return (*this); - } - /** - * @brief Select a subset of columns from the table - * - * @param col_names The names of the columns to select - */ - [[nodiscard]] cudf::table_view select(std::vector const& col_names) const - { - CUDF_FUNC_RANGE(); - std::vector col_indices; - for (auto const& col_name : col_names) { - col_indices.push_back(col_id(col_name)); - } - return tbl->select(col_indices); - } - /** - * @brief Write the table to a parquet file - * - * @param filepath The path to the parquet file - */ - void to_parquet(std::string const& filepath) const - { - CUDF_FUNC_RANGE(); - auto const sink_info = cudf::io::sink_info(filepath); - cudf::io::table_metadata metadata; - metadata.schema_info = - std::vector(col_names.begin(), col_names.end()); - auto const table_input_metadata = cudf::io::table_input_metadata{metadata}; - auto builder = cudf::io::parquet_writer_options::builder(sink_info, tbl->view()); - builder.metadata(table_input_metadata); - auto const options = builder.build(); - 
cudf::io::write_parquet(options); - } - - private: - std::unique_ptr tbl; - std::vector col_names; -}; - -/** - * @brief Concatenate two vectors - * - * @param lhs The left vector - * @param rhs The right vector - */ -template -std::vector concat(std::vector const& lhs, std::vector const& rhs) -{ - std::vector result; - result.reserve(lhs.size() + rhs.size()); - std::copy(lhs.begin(), lhs.end(), std::back_inserter(result)); - std::copy(rhs.begin(), rhs.end(), std::back_inserter(result)); - return result; -} - -/** - * @brief Inner join two tables and gather the result - * - * @param left_input The left input table - * @param right_input The right input table - * @param left_on The columns to join on in the left table - * @param right_on The columns to join on in the right table - * @param compare_nulls The null equality policy - */ -[[nodiscard]] std::unique_ptr join_and_gather( - cudf::table_view const& left_input, - cudf::table_view const& right_input, - std::vector const& left_on, - std::vector const& right_on, - cudf::null_equality compare_nulls) -{ - CUDF_FUNC_RANGE(); - constexpr auto oob_policy = cudf::out_of_bounds_policy::DONT_CHECK; - auto const left_selected = left_input.select(left_on); - auto const right_selected = right_input.select(right_on); - auto const [left_join_indices, right_join_indices] = cudf::inner_join( - left_selected, right_selected, compare_nulls, cudf::get_current_device_resource_ref()); - - auto const left_indices_span = cudf::device_span{*left_join_indices}; - auto const right_indices_span = cudf::device_span{*right_join_indices}; - - auto const left_indices_col = cudf::column_view{left_indices_span}; - auto const right_indices_col = cudf::column_view{right_indices_span}; - - auto const left_result = cudf::gather(left_input, left_indices_col, oob_policy); - auto const right_result = cudf::gather(right_input, right_indices_col, oob_policy); - - auto joined_cols = left_result->release(); - auto right_cols = right_result->release(); - joined_cols.insert(joined_cols.end(), - std::make_move_iterator(right_cols.begin()), - std::make_move_iterator(right_cols.end())); - return std::make_unique(std::move(joined_cols)); -} - -/** - * @brief Apply an inner join operation to two tables - * - * @param left_input The left input table - * @param right_input The right input table - * @param left_on The columns to join on in the left table - * @param right_on The columns to join on in the right table - * @param compare_nulls The null equality policy - */ -[[nodiscard]] std::unique_ptr apply_inner_join( - std::unique_ptr const& left_input, - std::unique_ptr const& right_input, - std::vector const& left_on, - std::vector const& right_on, - cudf::null_equality compare_nulls = cudf::null_equality::EQUAL) -{ - CUDF_FUNC_RANGE(); - std::vector left_on_indices; - std::vector right_on_indices; - std::transform( - left_on.begin(), left_on.end(), std::back_inserter(left_on_indices), [&](auto const& col_name) { - return left_input->col_id(col_name); - }); - std::transform(right_on.begin(), - right_on.end(), - std::back_inserter(right_on_indices), - [&](auto const& col_name) { return right_input->col_id(col_name); }); - auto table = join_and_gather( - left_input->table(), right_input->table(), left_on_indices, right_on_indices, compare_nulls); - return std::make_unique( - std::move(table), concat(left_input->column_names(), right_input->column_names())); -} - -/** - * @brief Apply a filter predicated to a table - * - * @param table The input table - * @param predicate The filter 
predicate - */ -[[nodiscard]] std::unique_ptr apply_filter( - std::unique_ptr const& table, cudf::ast::operation const& predicate) -{ - CUDF_FUNC_RANGE(); - auto const boolean_mask = cudf::compute_column(table->table(), predicate); - auto result_table = cudf::apply_boolean_mask(table->table(), boolean_mask->view()); - return std::make_unique(std::move(result_table), table->column_names()); -} - -/** - * @brief Apply a boolean mask to a table - * - * @param table The input table - * @param mask The boolean mask - */ -[[nodiscard]] std::unique_ptr apply_mask( - std::unique_ptr const& table, std::unique_ptr const& mask) -{ - CUDF_FUNC_RANGE(); - auto result_table = cudf::apply_boolean_mask(table->table(), mask->view()); - return std::make_unique(std::move(result_table), table->column_names()); -} - -struct groupby_context_t { - std::vector keys; - std::unordered_map>> - values; -}; - -/** - * @brief Apply a groupby operation to a table - * - * @param table The input table - * @param ctx The groupby context - */ -[[nodiscard]] std::unique_ptr apply_groupby( - std::unique_ptr const& table, groupby_context_t const& ctx) -{ - CUDF_FUNC_RANGE(); - auto const keys = table->select(ctx.keys); - cudf::groupby::groupby groupby_obj(keys); - std::vector result_column_names; - result_column_names.insert(result_column_names.end(), ctx.keys.begin(), ctx.keys.end()); - std::vector requests; - for (auto& [value_col, aggregations] : ctx.values) { - requests.emplace_back(cudf::groupby::aggregation_request()); - for (auto& agg : aggregations) { - if (agg.first == cudf::aggregation::Kind::SUM) { - requests.back().aggregations.push_back( - cudf::make_sum_aggregation()); - } else if (agg.first == cudf::aggregation::Kind::MEAN) { - requests.back().aggregations.push_back( - cudf::make_mean_aggregation()); - } else if (agg.first == cudf::aggregation::Kind::COUNT_ALL) { - requests.back().aggregations.push_back( - cudf::make_count_aggregation()); - } else { - throw std::runtime_error("Unsupported aggregation"); - } - result_column_names.push_back(agg.second); - } - requests.back().values = table->column(value_col); - } - auto agg_results = groupby_obj.aggregate(requests); - std::vector> result_columns; - for (size_t i = 0; i < agg_results.first->num_columns(); i++) { - auto col = std::make_unique(agg_results.first->get_column(i)); - result_columns.push_back(std::move(col)); - } - for (size_t i = 0; i < agg_results.second.size(); i++) { - for (size_t j = 0; j < agg_results.second[i].results.size(); j++) { - result_columns.push_back(std::move(agg_results.second[i].results[j])); - } - } - auto result_table = std::make_unique(std::move(result_columns)); - return std::make_unique(std::move(result_table), result_column_names); -} - -/** - * @brief Apply an order by operation to a table - * - * @param table The input table - * @param sort_keys The sort keys - * @param sort_key_orders The sort key orders - */ -[[nodiscard]] std::unique_ptr apply_orderby( - std::unique_ptr const& table, - std::vector const& sort_keys, - std::vector const& sort_key_orders) -{ - CUDF_FUNC_RANGE(); - std::vector column_views; - for (auto& key : sort_keys) { - column_views.push_back(table->column(key)); - } - auto result_table = - cudf::sort_by_key(table->table(), cudf::table_view{column_views}, sort_key_orders); - return std::make_unique(std::move(result_table), table->column_names()); -} - -/** - * @brief Apply a reduction operation to a column - * - * @param column The input column - * @param agg_kind The aggregation kind - * @param col_name The 
name of the output column - */ -[[nodiscard]] std::unique_ptr apply_reduction( - cudf::column_view const& column, - cudf::aggregation::Kind const& agg_kind, - std::string const& col_name) -{ - CUDF_FUNC_RANGE(); - auto const agg = cudf::make_sum_aggregation(); - auto const result = cudf::reduce(column, *agg, column.type()); - cudf::size_type const len = 1; - auto col = cudf::make_column_from_scalar(*result, len); - std::vector> columns; - columns.push_back(std::move(col)); - auto result_table = std::make_unique(std::move(columns)); - std::vector col_names = {col_name}; - return std::make_unique(std::move(result_table), col_names); -} - -/** - * @brief Read a parquet file into a table - * - * @param filename The path to the parquet file - * @param columns The columns to read - * @param predicate The filter predicate to pushdown - */ -[[nodiscard]] std::unique_ptr read_parquet( - std::string const& filename, - std::vector const& columns = {}, - std::unique_ptr const& predicate = nullptr) -{ - CUDF_FUNC_RANGE(); - auto const source = cudf::io::source_info(filename); - auto builder = cudf::io::parquet_reader_options_builder(source); - if (!columns.empty()) { builder.columns(columns); } - if (predicate) { builder.filter(*predicate); } - auto const options = builder.build(); - auto table_with_metadata = cudf::io::read_parquet(options); - std::vector column_names; - for (auto const& col_info : table_with_metadata.metadata.schema_info) { - column_names.push_back(col_info.name); - } - return std::make_unique(std::move(table_with_metadata.tbl), column_names); -} - -/** - * @brief Generate the `std::tm` structure from year, month, and day - * - * @param year The year - * @param month The month - * @param day The day - */ -std::tm make_tm(int year, int month, int day) -{ - std::tm tm{}; - tm.tm_year = year - 1900; - tm.tm_mon = month - 1; - tm.tm_mday = day; - return tm; -} - -/** - * @brief Calculate the number of days since the UNIX epoch - * - * @param year The year - * @param month The month - * @param day The day - */ -int32_t days_since_epoch(int year, int month, int day) -{ - std::tm tm = make_tm(year, month, day); - std::tm epoch = make_tm(1970, 1, 1); - std::time_t time = std::mktime(&tm); - std::time_t epoch_time = std::mktime(&epoch); - double diff = std::difftime(time, epoch_time) / (60 * 60 * 24); - return static_cast(diff); -} - -struct tpch_example_args { - std::string dataset_dir; - std::string memory_resource_type; -}; - -/** - * @brief Parse command line arguments into a struct - * - * @param argc The number of command line arguments - * @param argv The command line arguments - */ -tpch_example_args parse_args(int argc, char const** argv) -{ - if (argc < 3) { - std::string usage_message = "Usage: " + std::string(argv[0]) + - " \n The query result will be " - "saved to a parquet file named q{query_no}.parquet in the current " - "working directory "; - throw std::runtime_error(usage_message); - } - tpch_example_args args; - args.dataset_dir = argv[1]; - args.memory_resource_type = argv[2]; - return args; -} From 4cdb1bf9cf7ad4f19b8abd034513172902d187a3 Mon Sep 17 00:00:00 2001 From: Matthew Murray <41342305+Matt711@users.noreply.github.com> Date: Tue, 10 Sep 2024 20:20:29 -0400 Subject: [PATCH 3/4] [FEA] Add support for `cudf.NamedAgg` (#16744) Closes #15118 Authors: - Matthew Murray (https://github.com/Matt711) Approvers: - Matthew Roeschke (https://github.com/mroeschke) URL: https://github.com/rapidsai/cudf/pull/16744 --- python/cudf/cudf/__init__.py | 2 +- 
python/cudf/cudf/core/groupby/__init__.py | 5 ++- python/cudf/cudf/core/groupby/groupby.py | 46 ++++++++++++++++++++-- python/cudf/cudf/tests/groupby/test_agg.py | 16 ++++++++ 4 files changed, 63 insertions(+), 6 deletions(-) diff --git a/python/cudf/cudf/__init__.py b/python/cudf/cudf/__init__.py index d7da42a1708..99b759e2166 100644 --- a/python/cudf/cudf/__init__.py +++ b/python/cudf/cudf/__init__.py @@ -46,7 +46,7 @@ ListDtype, StructDtype, ) -from cudf.core.groupby import Grouper +from cudf.core.groupby import Grouper, NamedAgg from cudf.core.index import ( BaseIndex, CategoricalIndex, diff --git a/python/cudf/cudf/core/groupby/__init__.py b/python/cudf/cudf/core/groupby/__init__.py index 4375ed3e3da..621edb316cf 100644 --- a/python/cudf/cudf/core/groupby/__init__.py +++ b/python/cudf/cudf/core/groupby/__init__.py @@ -1,8 +1,9 @@ -# Copyright (c) 2020-2023, NVIDIA CORPORATION. +# Copyright (c) 2020-2024, NVIDIA CORPORATION. -from cudf.core.groupby.groupby import GroupBy, Grouper +from cudf.core.groupby.groupby import GroupBy, Grouper, NamedAgg __all__ = [ "GroupBy", "Grouper", + "NamedAgg", ] diff --git a/python/cudf/cudf/core/groupby/groupby.py b/python/cudf/cudf/core/groupby/groupby.py index 4f283d41b17..6424c8af877 100644 --- a/python/cudf/cudf/core/groupby/groupby.py +++ b/python/cudf/cudf/core/groupby/groupby.py @@ -76,6 +76,34 @@ def _is_row_of(chunk, obj): ) +NamedAgg = pd.NamedAgg + + +NamedAgg.__doc__ = """ +Helper for column specific aggregation with control over output column names. + +Subclass of typing.NamedTuple. + +Parameters +---------- +column : Hashable + Column label in the DataFrame to apply aggfunc. +aggfunc : function or str + Function to apply to the provided column. + +Examples +-------- +>>> df = cudf.DataFrame({"key": [1, 1, 2], "a": [-1, 0, 1], 1: [10, 11, 12]}) +>>> agg_a = cudf.NamedAgg(column="a", aggfunc="min") +>>> agg_1 = cudf.NamedAgg(column=1, aggfunc=lambda x: x.mean()) +>>> df.groupby("key").agg(result_a=agg_a, result_1=agg_1) + result_a result_1 +key +1 -1 10.5 +2 1 12.0 +""" + + groupby_doc_template = textwrap.dedent( """Group using a mapper or by a Series of columns. 
@@ -1296,9 +1324,21 @@ def _normalize_aggs( columns = values._columns aggs_per_column = (aggs,) * len(columns) elif not aggs and kwargs: - column_names, aggs_per_column = kwargs.keys(), kwargs.values() - columns = tuple(self.obj._data[x[0]] for x in kwargs.values()) - aggs_per_column = tuple(x[1] for x in kwargs.values()) + column_names = kwargs.keys() + + def _raise_invalid_type(x): + raise TypeError( + f"Invalid keyword argument {x} of type {type(x)} was passed to agg" + ) + + columns, aggs_per_column = zip( + *( + (self.obj._data[x[0]], x[1]) + if isinstance(x, tuple) + else _raise_invalid_type(x) + for x in kwargs.values() + ) + ) else: raise TypeError("Must provide at least one aggregation function.") diff --git a/python/cudf/cudf/tests/groupby/test_agg.py b/python/cudf/cudf/tests/groupby/test_agg.py index 99e7523031b..dc20a27177a 100644 --- a/python/cudf/cudf/tests/groupby/test_agg.py +++ b/python/cudf/cudf/tests/groupby/test_agg.py @@ -56,3 +56,19 @@ def test_dataframe_agg(attr, func): ) assert_eq(agg, pd_agg) + + agg = getattr(df.groupby("a"), attr)( + foo=cudf.NamedAgg(column="b", aggfunc=func), + bar=cudf.NamedAgg(column="a", aggfunc=func), + ) + pd_agg = getattr(pdf.groupby(["a"]), attr)( + foo=("b", func), bar=("a", func) + ) + + assert_eq(agg, pd_agg) + + +def test_dataframe_agg_with_invalid_kwarg(): + with pytest.raises(TypeError, match="Invalid keyword argument"): + df = cudf.DataFrame({"a": [1, 2, 1, 2], "b": [0, 0, 0, 0]}) + df.groupby("a").agg(foo=set()) From 750adca4e4cc7b18ef80ba39950ed1d250919016 Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Tue, 10 Sep 2024 17:40:49 -0700 Subject: [PATCH 4/4] nvCOMP GZIP integration (#16770) nvCOMP GZIP integration. Opt-in for now. Authors: - Vukasin Milovanovic (https://github.com/vuule) Approvers: - Mark Harris (https://github.com/harrism) - Nghia Truong (https://github.com/ttnghia) - Muhammad Haseeb (https://github.com/mhaseeb123) URL: https://github.com/rapidsai/cudf/pull/16770 --- cpp/include/cudf/io/nvcomp_adapter.hpp | 2 +- cpp/src/io/comp/nvcomp_adapter.cpp | 14 +++++++++++--- cpp/src/io/parquet/reader_impl_chunking.cu | 14 ++++++++++++-- docs/cudf/source/user_guide/io/io.md | 6 +++++- 4 files changed, 29 insertions(+), 7 deletions(-) diff --git a/cpp/include/cudf/io/nvcomp_adapter.hpp b/cpp/include/cudf/io/nvcomp_adapter.hpp index e7fe3cc7214..0d74a4158ad 100644 --- a/cpp/include/cudf/io/nvcomp_adapter.hpp +++ b/cpp/include/cudf/io/nvcomp_adapter.hpp @@ -24,7 +24,7 @@ namespace CUDF_EXPORT cudf { namespace io::nvcomp { -enum class compression_type { SNAPPY, ZSTD, DEFLATE, LZ4 }; +enum class compression_type { SNAPPY, ZSTD, DEFLATE, LZ4, GZIP }; /** * @brief Set of parameters that impact whether nvCOMP features are enabled. diff --git a/cpp/src/io/comp/nvcomp_adapter.cpp b/cpp/src/io/comp/nvcomp_adapter.cpp index 261a8eb401d..c3187f73a95 100644 --- a/cpp/src/io/comp/nvcomp_adapter.cpp +++ b/cpp/src/io/comp/nvcomp_adapter.cpp @@ -23,6 +23,7 @@ #include #include +#include #include #include #include @@ -44,6 +45,8 @@ auto batched_decompress_get_temp_size_ex(compression_type compression, Args&&... return nvcompBatchedLZ4DecompressGetTempSizeEx(std::forward(args)...); case compression_type::DEFLATE: return nvcompBatchedDeflateDecompressGetTempSizeEx(std::forward(args)...); + case compression_type::GZIP: + return nvcompBatchedGzipDecompressGetTempSizeEx(std::forward(args)...); default: CUDF_FAIL("Unsupported compression type"); } } @@ -73,6 +76,8 @@ auto batched_decompress_async(compression_type compression, Args&&... 
args) case compression_type::DEFLATE: return nvcompBatchedDeflateDecompressAsync(std::forward(args)...); case compression_type::LZ4: return nvcompBatchedLZ4DecompressAsync(std::forward(args)...); + case compression_type::GZIP: + return nvcompBatchedGzipDecompressAsync(std::forward(args)...); default: CUDF_FAIL("Unsupported compression type"); } } @@ -84,6 +89,7 @@ std::string compression_type_name(compression_type compression) case compression_type::ZSTD: return "Zstandard"; case compression_type::DEFLATE: return "Deflate"; case compression_type::LZ4: return "LZ4"; + case compression_type::GZIP: return "GZIP"; } return "compression_type(" + std::to_string(static_cast(compression)) + ")"; } @@ -359,8 +365,8 @@ std::optional is_compression_disabled_impl(compression_type compres return "nvCOMP use is disabled through the `LIBCUDF_NVCOMP_POLICY` environment variable."; } return std::nullopt; + default: return "Unsupported compression type"; } - return "Unsupported compression type"; } std::optional is_compression_disabled(compression_type compression, @@ -396,7 +402,8 @@ std::optional is_decompression_disabled_impl(compression_type compr feature_status_parameters params) { switch (compression) { - case compression_type::DEFLATE: { + case compression_type::DEFLATE: + case compression_type::GZIP: { if (not params.are_all_integrations_enabled) { return "DEFLATE decompression is experimental, you can enable it through " "`LIBCUDF_NVCOMP_POLICY` environment variable."; @@ -447,6 +454,7 @@ std::optional is_decompression_disabled(compression_type compressio size_t required_alignment(compression_type compression) { switch (compression) { + case compression_type::GZIP: case compression_type::DEFLATE: return nvcompDeflateRequiredAlignment; case compression_type::SNAPPY: return nvcompSnappyRequiredAlignment; case compression_type::ZSTD: return nvcompZstdRequiredAlignment; @@ -462,7 +470,7 @@ std::optional compress_max_allowed_chunk_size(compression_type compressi case compression_type::SNAPPY: return nvcompSnappyCompressionMaxAllowedChunkSize; case compression_type::ZSTD: return nvcompZstdCompressionMaxAllowedChunkSize; case compression_type::LZ4: return nvcompLZ4CompressionMaxAllowedChunkSize; - default: return std::nullopt; + default: CUDF_FAIL("Unsupported compression type"); } } diff --git a/cpp/src/io/parquet/reader_impl_chunking.cu b/cpp/src/io/parquet/reader_impl_chunking.cu index 84f0dab0d8b..245e1829c72 100644 --- a/cpp/src/io/parquet/reader_impl_chunking.cu +++ b/cpp/src/io/parquet/reader_impl_chunking.cu @@ -865,8 +865,18 @@ std::vector compute_page_splits_by_row(device_span - **Notes:** - \[¹\] - Not all orientations are GPU-accelerated. @@ -177,4 +176,9 @@ If no value is set, behavior will be the same as the "STABLE" option. +-----------------------+--------+--------+--------------+--------------+---------+--------+--------------+--------------+--------+ | DEFLATE | ❌ | ❌ | ❌ | ❌ | ❌ | ❌ | Experimental | Experimental | ❌ | +-----------------------+--------+--------+--------------+--------------+---------+--------+--------------+--------------+--------+ + | LZ4 | ❌ | ❌ | Stable | Stable | ❌ | ❌ | Stable | Stable | ❌ | + +-----------------------+--------+--------+--------------+--------------+---------+--------+--------------+--------------+--------+ + | GZIP | ❌ | ❌ | Experimental | Experimental | ❌ | ❌ | ❌ | ❌ | ❌ | + +-----------------------+--------+--------+--------------+--------------+---------+--------+--------------+--------------+--------+ + ```
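+
+To opt in to the experimental integrations listed above (for example, GZIP
+decompression), set the `LIBCUDF_NVCOMP_POLICY` environment variable to
+`"ALWAYS"` before starting the application; leaving it unset keeps the default
+`"STABLE"` behavior described above.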