diff --git a/.gitignore b/.gitignore index 619e1464b2a..180a6a286e2 100644 --- a/.gitignore +++ b/.gitignore @@ -80,7 +80,6 @@ build/ cpp/build/ cpp/examples/*/install/ cpp/examples/*/build/ -cpp/examples/tpch/datagen/datafusion cpp/include/cudf/ipc_generated/*.h cpp/thirdparty/googletest/ diff --git a/cpp/benchmarks/CMakeLists.txt b/cpp/benchmarks/CMakeLists.txt index d2c22b788cb..3bf9d02b384 100644 --- a/cpp/benchmarks/CMakeLists.txt +++ b/cpp/benchmarks/CMakeLists.txt @@ -36,25 +36,25 @@ target_include_directories( ) add_library( - tpch_data_generator STATIC - common/tpch_data_generator/tpch_data_generator.cpp common/tpch_data_generator/table_helpers.cpp - common/tpch_data_generator/random_column_generator.cu + ndsh_data_generator STATIC + common/ndsh_data_generator/ndsh_data_generator.cpp common/ndsh_data_generator/table_helpers.cpp + common/ndsh_data_generator/random_column_generator.cu ) -target_compile_features(tpch_data_generator PUBLIC cxx_std_17 cuda_std_17) +target_compile_features(ndsh_data_generator PUBLIC cxx_std_17 cuda_std_17) target_compile_options( - tpch_data_generator PUBLIC "$<$:${CUDF_CXX_FLAGS}>" + ndsh_data_generator PUBLIC "$<$:${CUDF_CXX_FLAGS}>" "$<$:${CUDF_CUDA_FLAGS}>" ) target_link_libraries( - tpch_data_generator + ndsh_data_generator PUBLIC cudf cudftestutil nvtx3::nvtx3-cpp PRIVATE $ ) target_include_directories( - tpch_data_generator + ndsh_data_generator PUBLIC "$" "$" "$" ) @@ -127,8 +127,8 @@ function(ConfigureNVBench CMAKE_BENCH_NAME) INSTALL_RPATH "\$ORIGIN/../../../lib" ) target_link_libraries( - ${CMAKE_BENCH_NAME} PRIVATE cudf_benchmark_common cudf_datagen nvbench::nvbench - $ + ${CMAKE_BENCH_NAME} PRIVATE cudf_benchmark_common ndsh_data_generator cudf_datagen + nvbench::nvbench $ ) install( TARGETS ${CMAKE_BENCH_NAME} @@ -175,6 +175,14 @@ ConfigureBench(COPY_IF_ELSE_BENCH copying/copy_if_else.cpp) # * transpose benchmark --------------------------------------------------------------------------- 
ConfigureBench(TRANSPOSE_BENCH transpose/transpose.cpp) +# ################################################################################################## +# * nds-h benchmark -------------------------------------------------------------------------------- +ConfigureNVBench(NDSH_Q1 ndsh/q01.cpp ndsh/utilities.cpp) +ConfigureNVBench(NDSH_Q5 ndsh/q05.cpp ndsh/utilities.cpp) +ConfigureNVBench(NDSH_Q6 ndsh/q06.cpp ndsh/utilities.cpp) +ConfigureNVBench(NDSH_Q9 ndsh/q09.cpp ndsh/utilities.cpp) +ConfigureNVBench(NDSH_Q10 ndsh/q10.cpp ndsh/utilities.cpp) + # ################################################################################################## # * stream_compaction benchmark ------------------------------------------------------------------- ConfigureNVBench( diff --git a/cpp/benchmarks/common/tpch_data_generator/tpch_data_generator.cpp b/cpp/benchmarks/common/ndsh_data_generator/ndsh_data_generator.cpp similarity index 97% rename from cpp/benchmarks/common/tpch_data_generator/tpch_data_generator.cpp rename to cpp/benchmarks/common/ndsh_data_generator/ndsh_data_generator.cpp index 236fe8095ad..fa7edd225ba 100644 --- a/cpp/benchmarks/common/tpch_data_generator/tpch_data_generator.cpp +++ b/cpp/benchmarks/common/ndsh_data_generator/ndsh_data_generator.cpp @@ -14,7 +14,7 @@ * limitations under the License. 
*/ -#include "tpch_data_generator.hpp" +#include "ndsh_data_generator.hpp" #include "random_column_generator.hpp" #include "table_helpers.hpp" @@ -435,46 +435,37 @@ std::unique_ptr generate_lineitem_partial(cudf::table_view const& o columns.push_back(std::move(l_quantity)); columns.push_back(std::move(l_discount)); columns.push_back(std::move(l_tax)); + columns.push_back(std::move(l_returnflag)); + columns.push_back(std::move(l_linestatus)); columns.push_back(std::move(l_shipdate_ts)); columns.push_back(std::move(l_commitdate_ts)); columns.push_back(std::move(l_receiptdate_ts)); - columns.push_back(std::move(l_returnflag)); - columns.push_back(std::move(l_linestatus)); columns.push_back(std::move(l_shipinstruct)); columns.push_back(std::move(l_shipmode)); columns.push_back(std::move(l_comment)); return std::make_unique(std::move(columns)); } -std::unique_ptr generate_orders_dependent(cudf::table_view const& lineitem, +/** + * @brief Generate the part of the `orders` table dependent on the `lineitem` table + * + * @param lineitem_partial The partially generated `lineitem` table + * @param stream CUDA stream used for device memory operations and kernel launches + * @param mr Device memory resource used to allocate the returned column's device memory + */ +std::unique_ptr generate_orders_dependent(cudf::table_view const& lineitem_partial, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); - auto const l_linestatus_mask = lineitem.column(0); - auto const l_orderkey = lineitem.column(1); - auto const l_discount = lineitem.column(6); - auto const l_tax = lineitem.column(7); - auto const l_extendedprice = lineitem.column(16); + auto const l_linestatus_mask = lineitem_partial.column(0); + auto const l_orderkey = lineitem_partial.column(1); + auto const l_extendedprice = lineitem_partial.column(6); + auto const l_discount = lineitem_partial.column(7); + auto const l_tax = lineitem_partial.column(8); std::vector> orders_dependent_columns; - 
// Generate the `o_totalprice` column - // We calculate the `charge` column, which is a function of `l_extendedprice`, - // `l_tax`, and `l_discount` and then group by `l_orderkey` and sum the `charge` - auto const l_charge = calculate_charge(l_extendedprice, l_tax, l_discount, stream, mr); - auto o_totalprice = [&]() { - auto const keys = cudf::table_view({l_orderkey}); - cudf::groupby::groupby gb(keys); - std::vector requests; - requests.push_back(cudf::groupby::aggregation_request()); - requests[0].aggregations.push_back(cudf::make_sum_aggregation()); - requests[0].values = l_charge->view(); - auto agg_result = gb.aggregate(requests); - return cudf::round(agg_result.second[0].results[0]->view(), 2); - }(); - orders_dependent_columns.push_back(std::move(o_totalprice)); - // Generate the `o_orderstatus` column auto o_orderstatus = [&]() { auto const keys = cudf::table_view({l_orderkey}); @@ -529,6 +520,22 @@ std::unique_ptr generate_orders_dependent(cudf::table_view const& l cudf::string_scalar("P"), o_orderstatus_intermediate->view(), mask_b->view()); }(); orders_dependent_columns.push_back(std::move(o_orderstatus)); + + // Generate the `o_totalprice` column + // We calculate the `charge` column, which is a function of `l_extendedprice`, + // `l_tax`, and `l_discount` and then group by `l_orderkey` and sum the `charge` + auto const l_charge = calculate_charge(l_extendedprice, l_tax, l_discount, stream, mr); + auto o_totalprice = [&]() { + auto const keys = cudf::table_view({l_orderkey}); + cudf::groupby::groupby gb(keys); + std::vector requests; + requests.push_back(cudf::groupby::aggregation_request()); + requests[0].aggregations.push_back(cudf::make_sum_aggregation()); + requests[0].values = l_charge->view(); + auto agg_result = gb.aggregate(requests); + return cudf::round(agg_result.second[0].results[0]->view(), 2); + }(); + orders_dependent_columns.push_back(std::move(o_totalprice)); return std::make_unique(std::move(orders_dependent_columns)); } @@ -730,9 
+737,7 @@ generate_orders_lineitem_part(double scale_factor, // Generate the `part` table auto part = generate_part(scale_factor, stream, mr); - // Join the `part` and partial `lineitem` tables, then calculate the `l_extendedprice` column, - // add the column to the `lineitem` table, and write the `lineitem` table to a parquet file - + // Join the `part` and partial `lineitem` tables, then calculate the `l_extendedprice` column auto l_extendedprice = [&]() { auto const left = cudf::table_view( {lineitem_partial->get_column(2).view(), lineitem_partial->get_column(5).view()}); @@ -752,8 +757,9 @@ generate_orders_lineitem_part(double scale_factor, return cudf::round(col->view(), 2); }(); + // Insert the `l_extendedprice` column into the partial columns of the `lineitem` table auto lineitem_partial_columns = lineitem_partial->release(); - lineitem_partial_columns.push_back(std::move(l_extendedprice)); + lineitem_partial_columns.insert(lineitem_partial_columns.begin() + 6, std::move(l_extendedprice)); auto lineitem_temp = std::make_unique(std::move(lineitem_partial_columns)); // Generate the dependent columns of the `orders` table @@ -762,7 +768,7 @@ generate_orders_lineitem_part(double scale_factor, auto orders_independent_columns = orders_independent->release(); auto orders_dependent_columns = orders_dependent->release(); - orders_independent_columns.insert(orders_independent_columns.end(), + orders_independent_columns.insert(orders_independent_columns.begin() + 2, std::make_move_iterator(orders_dependent_columns.begin()), std::make_move_iterator(orders_dependent_columns.end())); diff --git a/cpp/benchmarks/common/tpch_data_generator/tpch_data_generator.hpp b/cpp/benchmarks/common/ndsh_data_generator/ndsh_data_generator.hpp similarity index 100% rename from cpp/benchmarks/common/tpch_data_generator/tpch_data_generator.hpp rename to cpp/benchmarks/common/ndsh_data_generator/ndsh_data_generator.hpp diff --git 
a/cpp/benchmarks/common/tpch_data_generator/random_column_generator.cu b/cpp/benchmarks/common/ndsh_data_generator/random_column_generator.cu similarity index 100% rename from cpp/benchmarks/common/tpch_data_generator/random_column_generator.cu rename to cpp/benchmarks/common/ndsh_data_generator/random_column_generator.cu diff --git a/cpp/benchmarks/common/tpch_data_generator/random_column_generator.hpp b/cpp/benchmarks/common/ndsh_data_generator/random_column_generator.hpp similarity index 100% rename from cpp/benchmarks/common/tpch_data_generator/random_column_generator.hpp rename to cpp/benchmarks/common/ndsh_data_generator/random_column_generator.hpp diff --git a/cpp/benchmarks/common/tpch_data_generator/table_helpers.cpp b/cpp/benchmarks/common/ndsh_data_generator/table_helpers.cpp similarity index 100% rename from cpp/benchmarks/common/tpch_data_generator/table_helpers.cpp rename to cpp/benchmarks/common/ndsh_data_generator/table_helpers.cpp diff --git a/cpp/benchmarks/common/tpch_data_generator/table_helpers.hpp b/cpp/benchmarks/common/ndsh_data_generator/table_helpers.hpp similarity index 100% rename from cpp/benchmarks/common/tpch_data_generator/table_helpers.hpp rename to cpp/benchmarks/common/ndsh_data_generator/table_helpers.hpp diff --git a/cpp/benchmarks/ndsh/README.md b/cpp/benchmarks/ndsh/README.md new file mode 100644 index 00000000000..0a462e1684e --- /dev/null +++ b/cpp/benchmarks/ndsh/README.md @@ -0,0 +1,11 @@ +# NDS-H Benchmarks for `libcudf` + +## Disclaimer + +NDS-H is derived from the TPC-H Benchmarks and as such any results obtained using NDS-H are not +comparable to published TPC-H Benchmark results, as the results obtained from using NDS-H do not +comply with the TPC-H Benchmarks. 
+ +## Current Status + +For now, only Q1, Q5, Q6, Q9, and Q10 have been implemented diff --git a/cpp/examples/tpch/q1.cpp b/cpp/benchmarks/ndsh/q01.cpp similarity index 82% rename from cpp/examples/tpch/q1.cpp rename to cpp/benchmarks/ndsh/q01.cpp index 87b7e613766..ef709926ae9 100644 --- a/cpp/examples/tpch/q1.cpp +++ b/cpp/benchmarks/ndsh/q01.cpp @@ -14,17 +14,19 @@ * limitations under the License. */ -#include "../utilities/timer.hpp" -#include "utils.hpp" +#include "utilities.hpp" #include +#include #include #include #include +#include + /** - * @file q1.cpp - * @brief Implement query 1 of the TPC-H benchmark. + * @file q01.cpp + * @brief Implement query 1 of the NDS-H benchmark. * * create view lineitem as select * from '/tables/scale-1/lineitem.parquet'; * @@ -59,7 +61,7 @@ * @param stream The CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory. */ -[[nodiscard]] std::unique_ptr calc_disc_price( +[[nodiscard]] std::unique_ptr calculate_disc_price( cudf::column_view const& discount, cudf::column_view const& extendedprice, rmm::cuda_stream_view stream = cudf::get_default_stream(), @@ -86,7 +88,7 @@ * @param stream The CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory. 
*/ -[[nodiscard]] std::unique_ptr calc_charge( +[[nodiscard]] std::unique_ptr calculate_charge( cudf::column_view const& tax, cudf::column_view const& disc_price, rmm::cuda_stream_view stream = cudf::get_default_stream(), @@ -101,16 +103,9 @@ return charge; } -int main(int argc, char const** argv) +void run_ndsh_q1(nvbench::state& state, + std::unordered_map& sources) { - auto const args = parse_args(argc, argv); - - // Use a memory pool - auto resource = create_memory_resource(args.memory_resource_type); - cudf::set_current_device_resource(resource.get()); - - cudf::examples::timer timer; - // Define the column projections and filter predicate for `lineitem` table std::vector const lineitem_cols = {"l_returnflag", "l_linestatus", @@ -130,12 +125,12 @@ int main(int argc, char const** argv) // Read out the `lineitem` table from parquet file auto lineitem = - read_parquet(args.dataset_dir + "/lineitem.parquet", lineitem_cols, std::move(lineitem_pred)); + read_parquet(sources["lineitem"].make_source_info(), lineitem_cols, std::move(lineitem_pred)); // Calculate the discount price and charge columns and append to lineitem table auto disc_price = - calc_disc_price(lineitem->column("l_discount"), lineitem->column("l_extendedprice")); - auto charge = calc_charge(lineitem->column("l_tax"), disc_price->view()); + calculate_disc_price(lineitem->column("l_discount"), lineitem->column("l_extendedprice")); + auto charge = calculate_charge(lineitem->column("l_tax"), disc_price->view()); (*lineitem).append(disc_price, "disc_price").append(charge, "charge"); // Perform the group by operation @@ -167,9 +162,21 @@ int main(int argc, char const** argv) {"l_returnflag", "l_linestatus"}, {cudf::order::ASCENDING, cudf::order::ASCENDING}); - timer.print_elapsed_millis(); - // Write query result to a parquet file orderedby_table->to_parquet("q1.parquet"); - return 0; } + +void ndsh_q1(nvbench::state& state) +{ + // Generate the required parquet files in device buffers + double const 
scale_factor = state.get_float64("scale_factor"); + std::unordered_map sources; + generate_parquet_data_sources(scale_factor, {"lineitem"}, sources); + + auto stream = cudf::get_default_stream(); + state.set_cuda_stream(nvbench::make_cuda_stream_view(stream.value())); + state.exec(nvbench::exec_tag::sync, + [&](nvbench::launch& launch) { run_ndsh_q1(state, sources); }); +} + +NVBENCH_BENCH(ndsh_q1).set_name("ndsh_q1").add_float64_axis("scale_factor", {0.01, 0.1, 1}); diff --git a/cpp/examples/tpch/q5.cpp b/cpp/benchmarks/ndsh/q05.cpp similarity index 80% rename from cpp/examples/tpch/q5.cpp rename to cpp/benchmarks/ndsh/q05.cpp index 12c186db10e..522bc4789c2 100644 --- a/cpp/examples/tpch/q5.cpp +++ b/cpp/benchmarks/ndsh/q05.cpp @@ -14,17 +14,19 @@ * limitations under the License. */ -#include "../utilities/timer.hpp" -#include "utils.hpp" +#include "utilities.hpp" #include +#include #include #include #include +#include + /** - * @file q5.cpp - * @brief Implement query 5 of the TPC-H benchmark. + * @file q05.cpp + * @brief Implement query 5 of the NDS-H benchmark. * * create view customer as select * from '/tables/scale-1/customer.parquet'; * create view orders as select * from '/tables/scale-1/orders.parquet'; @@ -67,7 +69,7 @@ * @param stream The CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory. 
*/ -[[nodiscard]] std::unique_ptr calc_revenue( +[[nodiscard]] std::unique_ptr calculate_revenue( cudf::column_view const& extendedprice, cudf::column_view const& discount, rmm::cuda_stream_view stream = cudf::get_default_stream(), @@ -86,16 +88,9 @@ return revenue; } -int main(int argc, char const** argv) +void run_ndsh_q5(nvbench::state& state, + std::unordered_map& sources) { - auto const args = parse_args(argc, argv); - - // Use a memory pool - auto resource = create_memory_resource(args.memory_resource_type); - cudf::set_current_device_resource(resource.get()); - - cudf::examples::timer timer; - // Define the column projection and filter predicate for the `orders` table std::vector const orders_cols = {"o_custkey", "o_orderkey", "o_orderdate"}; auto const o_orderdate_ref = cudf::ast::column_reference(std::distance( @@ -125,17 +120,17 @@ int main(int argc, char const** argv) // Read out the tables from parquet files // while pushing down the column projections and filter predicates auto const customer = - read_parquet(args.dataset_dir + "/customer.parquet", {"c_custkey", "c_nationkey"}); + read_parquet(sources["customer"].make_source_info(), {"c_custkey", "c_nationkey"}); auto const orders = - read_parquet(args.dataset_dir + "/orders.parquet", orders_cols, std::move(orders_pred)); - auto const lineitem = read_parquet(args.dataset_dir + "/lineitem.parquet", + read_parquet(sources["orders"].make_source_info(), orders_cols, std::move(orders_pred)); + auto const lineitem = read_parquet(sources["lineitem"].make_source_info(), {"l_orderkey", "l_suppkey", "l_extendedprice", "l_discount"}); auto const supplier = - read_parquet(args.dataset_dir + "/supplier.parquet", {"s_suppkey", "s_nationkey"}); + read_parquet(sources["supplier"].make_source_info(), {"s_suppkey", "s_nationkey"}); auto const nation = - read_parquet(args.dataset_dir + "/nation.parquet", {"n_nationkey", "n_regionkey", "n_name"}); + read_parquet(sources["nation"].make_source_info(), {"n_nationkey", 
"n_regionkey", "n_name"}); auto const region = - read_parquet(args.dataset_dir + "/region.parquet", region_cols, std::move(region_pred)); + read_parquet(sources["region"].make_source_info(), region_cols, std::move(region_pred)); // Perform the joins auto const join_a = apply_inner_join(region, nation, {"r_regionkey"}, {"n_regionkey"}); @@ -147,7 +142,7 @@ int main(int argc, char const** argv) // Calculate and append the `revenue` column auto revenue = - calc_revenue(joined_table->column("l_extendedprice"), joined_table->column("l_discount")); + calculate_revenue(joined_table->column("l_extendedprice"), joined_table->column("l_discount")); (*joined_table).append(revenue, "revenue"); // Perform the groupby operation @@ -162,9 +157,22 @@ int main(int argc, char const** argv) auto const orderedby_table = apply_orderby(groupedby_table, {"revenue"}, {cudf::order::DESCENDING}); - timer.print_elapsed_millis(); - // Write query result to a parquet file orderedby_table->to_parquet("q5.parquet"); - return 0; } + +void ndsh_q5(nvbench::state& state) +{ + // Generate the required parquet files in device buffers + double const scale_factor = state.get_float64("scale_factor"); + std::unordered_map sources; + generate_parquet_data_sources( + scale_factor, {"customer", "orders", "lineitem", "supplier", "nation", "region"}, sources); + + auto stream = cudf::get_default_stream(); + state.set_cuda_stream(nvbench::make_cuda_stream_view(stream.value())); + state.exec(nvbench::exec_tag::sync, + [&](nvbench::launch& launch) { run_ndsh_q5(state, sources); }); +} + +NVBENCH_BENCH(ndsh_q5).set_name("ndsh_q5").add_float64_axis("scale_factor", {0.01, 0.1, 1}); diff --git a/cpp/examples/tpch/q6.cpp b/cpp/benchmarks/ndsh/q06.cpp similarity index 79% rename from cpp/examples/tpch/q6.cpp rename to cpp/benchmarks/ndsh/q06.cpp index 92dac40c768..04078547973 100644 --- a/cpp/examples/tpch/q6.cpp +++ b/cpp/benchmarks/ndsh/q06.cpp @@ -14,17 +14,20 @@ * limitations under the License. 
*/ -#include "../utilities/timer.hpp" -#include "utils.hpp" +#include "utilities.hpp" #include +#include #include #include +#include #include +#include + /** - * @file q6.cpp - * @brief Implement query 6 of the TPC-H benchmark. + * @file q06.cpp + * @brief Implement query 6 of the NDS-H benchmark. * * create view lineitem as select * from '/tables/scale-1/lineitem.parquet'; * @@ -48,7 +51,7 @@ * @param stream The CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory. */ -[[nodiscard]] std::unique_ptr calc_revenue( +[[nodiscard]] std::unique_ptr calculate_revenue( cudf::column_view const& extendedprice, cudf::column_view const& discount, rmm::cuda_stream_view stream = cudf::get_default_stream(), @@ -60,16 +63,9 @@ return revenue; } -int main(int argc, char const** argv) +void run_ndsh_q6(nvbench::state& state, + std::unordered_map& sources) { - auto const args = parse_args(argc, argv); - - // Use a memory pool - auto resource = create_memory_resource(args.memory_resource_type); - cudf::set_current_device_resource(resource.get()); - - cudf::examples::timer timer; - // Read out the `lineitem` table from parquet file std::vector const lineitem_cols = { "l_extendedprice", "l_discount", "l_shipdate", "l_quantity"}; @@ -88,7 +84,7 @@ int main(int argc, char const** argv) auto const lineitem_pred = std::make_unique( cudf::ast::ast_operator::LOGICAL_AND, shipdate_pred_a, shipdate_pred_b); auto lineitem = - read_parquet(args.dataset_dir + "/lineitem.parquet", lineitem_cols, std::move(lineitem_pred)); + read_parquet(sources["lineitem"].make_source_info(), lineitem_cols, std::move(lineitem_pred)); // Cast the discount and quantity columns to float32 and append to lineitem table auto discout_float = @@ -99,8 +95,8 @@ int main(int argc, char const** argv) (*lineitem).append(discout_float, "l_discount_float").append(quantity_float, "l_quantity_float"); // Apply the filters - auto 
const discount_ref = cudf::ast::column_reference(lineitem->col_id("l_discount_float")); - auto const quantity_ref = cudf::ast::column_reference(lineitem->col_id("l_quantity_float")); + auto const discount_ref = cudf::ast::column_reference(lineitem->column_id("l_discount_float")); + auto const quantity_ref = cudf::ast::column_reference(lineitem->column_id("l_quantity_float")); auto discount_lower = cudf::numeric_scalar(0.05); auto const discount_lower_literal = cudf::ast::literal(discount_lower); @@ -123,16 +119,28 @@ int main(int argc, char const** argv) auto const filtered_table = apply_filter(lineitem, discount_quantity_pred); // Calculate the `revenue` column - auto revenue = - calc_revenue(filtered_table->column("l_extendedprice"), filtered_table->column("l_discount")); + auto revenue = calculate_revenue(filtered_table->column("l_extendedprice"), + filtered_table->column("l_discount")); // Sum the `revenue` column auto const revenue_view = revenue->view(); auto const result_table = apply_reduction(revenue_view, cudf::aggregation::Kind::SUM, "revenue"); - timer.print_elapsed_millis(); - // Write query result to a parquet file result_table->to_parquet("q6.parquet"); - return 0; } + +void ndsh_q6(nvbench::state& state) +{ + // Generate the required parquet files in device buffers + double const scale_factor = state.get_float64("scale_factor"); + std::unordered_map sources; + generate_parquet_data_sources(scale_factor, {"lineitem"}, sources); + + auto stream = cudf::get_default_stream(); + state.set_cuda_stream(nvbench::make_cuda_stream_view(stream.value())); + state.exec(nvbench::exec_tag::sync, + [&](nvbench::launch& launch) { run_ndsh_q6(state, sources); }); +} + +NVBENCH_BENCH(ndsh_q6).set_name("ndsh_q6").add_float64_axis("scale_factor", {0.01, 0.1, 1}); diff --git a/cpp/examples/tpch/q9.cpp b/cpp/benchmarks/ndsh/q09.cpp similarity index 78% rename from cpp/examples/tpch/q9.cpp rename to cpp/benchmarks/ndsh/q09.cpp index 2882182aa2b..59218ab8912 100644 --- 
a/cpp/examples/tpch/q9.cpp +++ b/cpp/benchmarks/ndsh/q09.cpp @@ -14,9 +14,9 @@ * limitations under the License. */ -#include "../utilities/timer.hpp" -#include "utils.hpp" +#include "utilities.hpp" +#include #include #include #include @@ -24,9 +24,11 @@ #include #include +#include + /** - * @file q9.cpp - * @brief Implement query 9 of the TPC-H benchmark. + * @file q09.cpp + * @brief Implement query 9 of the NDS-H benchmark. * * create view part as select * from '/tables/scale-1/part.parquet'; * create view supplier as select * from '/tables/scale-1/supplier.parquet'; @@ -79,7 +81,7 @@ * @param stream The CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory. */ -[[nodiscard]] std::unique_ptr calc_amount( +[[nodiscard]] std::unique_ptr calculate_amount( cudf::column_view const& discount, cudf::column_view const& extendedprice, cudf::column_view const& supplycost, @@ -109,28 +111,21 @@ return amount; } -int main(int argc, char const** argv) +void run_ndsh_q9(nvbench::state& state, + std::unordered_map& sources) { - auto const args = parse_args(argc, argv); - - // Use a memory pool - auto resource = create_memory_resource(args.memory_resource_type); - cudf::set_current_device_resource(resource.get()); - - cudf::examples::timer timer; - // Read out the table from parquet files auto const lineitem = read_parquet( - args.dataset_dir + "/lineitem.parquet", + sources["lineitem"].make_source_info(), {"l_suppkey", "l_partkey", "l_orderkey", "l_extendedprice", "l_discount", "l_quantity"}); - auto const nation = read_parquet(args.dataset_dir + "/nation.parquet", {"n_nationkey", "n_name"}); + auto const nation = read_parquet(sources["nation"].make_source_info(), {"n_nationkey", "n_name"}); auto const orders = - read_parquet(args.dataset_dir + "/orders.parquet", {"o_orderkey", "o_orderdate"}); - auto const part = read_parquet(args.dataset_dir + "/part.parquet", {"p_partkey", 
"p_name"}); - auto const partsupp = read_parquet(args.dataset_dir + "/partsupp.parquet", + read_parquet(sources["orders"].make_source_info(), {"o_orderkey", "o_orderdate"}); + auto const part = read_parquet(sources["part"].make_source_info(), {"p_partkey", "p_name"}); + auto const partsupp = read_parquet(sources["partsupp"].make_source_info(), {"ps_suppkey", "ps_partkey", "ps_supplycost"}); auto const supplier = - read_parquet(args.dataset_dir + "/supplier.parquet", {"s_suppkey", "s_nationkey"}); + read_parquet(sources["supplier"].make_source_info(), {"s_suppkey", "s_nationkey"}); // Generating the `profit` table // Filter the part table using `p_name like '%green%'` @@ -150,10 +145,10 @@ int main(int argc, char const** argv) // Calculate the `nation`, `o_year`, and `amount` columns auto n_name = std::make_unique(joined_table->column("n_name")); auto o_year = cudf::datetime::extract_year(joined_table->column("o_orderdate")); - auto amount = calc_amount(joined_table->column("l_discount"), - joined_table->column("l_extendedprice"), - joined_table->column("ps_supplycost"), - joined_table->column("l_quantity")); + auto amount = calculate_amount(joined_table->column("l_discount"), + joined_table->column("l_extendedprice"), + joined_table->column("ps_supplycost"), + joined_table->column("l_quantity")); // Put together the `profit` table std::vector> profit_columns; @@ -175,9 +170,22 @@ int main(int argc, char const** argv) auto const orderedby_table = apply_orderby( groupedby_table, {"nation", "o_year"}, {cudf::order::ASCENDING, cudf::order::DESCENDING}); - timer.print_elapsed_millis(); - // Write query result to a parquet file orderedby_table->to_parquet("q9.parquet"); - return 0; } + +void ndsh_q9(nvbench::state& state) +{ + // Generate the required parquet files in device buffers + double const scale_factor = state.get_float64("scale_factor"); + std::unordered_map sources; + generate_parquet_data_sources( + scale_factor, {"part", "supplier", "lineitem", "partsupp", 
"orders", "nation"}, sources); + + auto stream = cudf::get_default_stream(); + state.set_cuda_stream(nvbench::make_cuda_stream_view(stream.value())); + state.exec(nvbench::exec_tag::sync, + [&](nvbench::launch& launch) { run_ndsh_q9(state, sources); }); +} + +NVBENCH_BENCH(ndsh_q9).set_name("ndsh_q9").add_float64_axis("scale_factor", {0.01, 0.1, 1}); diff --git a/cpp/examples/tpch/q10.cpp b/cpp/benchmarks/ndsh/q10.cpp similarity index 81% rename from cpp/examples/tpch/q10.cpp rename to cpp/benchmarks/ndsh/q10.cpp index fdf147b50e0..a520480020a 100644 --- a/cpp/examples/tpch/q10.cpp +++ b/cpp/benchmarks/ndsh/q10.cpp @@ -14,17 +14,19 @@ * limitations under the License. */ -#include "../utilities/timer.hpp" -#include "utils.hpp" +#include "utilities.hpp" #include +#include #include #include #include +#include + /** * @file q10.cpp - * @brief Implement query 10 of the TPC-H benchmark. + * @brief Implement query 10 of the NDS-H benchmark. * * create view customer as select * from '/tables/scale-1/customer.parquet'; * create view orders as select * from '/tables/scale-1/orders.parquet'; @@ -72,7 +74,7 @@ * @param stream The CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory. 
*/ -[[nodiscard]] std::unique_ptr calc_revenue( +[[nodiscard]] std::unique_ptr calculate_revenue( cudf::column_view const& extendedprice, cudf::column_view const& discount, rmm::cuda_stream_view stream = cudf::get_default_stream(), @@ -90,16 +92,10 @@ mr); return revenue; } -int main(int argc, char const** argv) -{ - auto const args = parse_args(argc, argv); - - // Use a memory pool - auto resource = create_memory_resource(args.memory_resource_type); - cudf::set_current_device_resource(resource.get()); - - cudf::examples::timer timer; +void run_ndsh_q10(nvbench::state& state, + std::unordered_map& sources) +{ // Define the column projection and filter predicate for the `orders` table std::vector const orders_cols = {"o_custkey", "o_orderkey", "o_orderdate"}; auto const o_orderdate_ref = cudf::ast::column_reference(std::distance( @@ -126,15 +122,15 @@ int main(int argc, char const** argv) // Read out the tables from parquet files // while pushing down the column projections and filter predicates auto const customer = read_parquet( - args.dataset_dir + "/customer.parquet", + sources["customer"].make_source_info(), {"c_custkey", "c_name", "c_nationkey", "c_acctbal", "c_address", "c_phone", "c_comment"}); auto const orders = - read_parquet(args.dataset_dir + "/orders.parquet", orders_cols, std::move(orders_pred)); + read_parquet(sources["orders"].make_source_info(), orders_cols, std::move(orders_pred)); auto const lineitem = - read_parquet(args.dataset_dir + "/lineitem.parquet", + read_parquet(sources["lineitem"].make_source_info(), {"l_extendedprice", "l_discount", "l_orderkey", "l_returnflag"}, std::move(lineitem_pred)); - auto const nation = read_parquet(args.dataset_dir + "/nation.parquet", {"n_name", "n_nationkey"}); + auto const nation = read_parquet(sources["nation"].make_source_info(), {"n_name", "n_nationkey"}); // Perform the joins auto const join_a = apply_inner_join(customer, nation, {"c_nationkey"}, {"n_nationkey"}); @@ -143,7 +139,7 @@ int main(int argc, 
char const** argv) // Calculate and append the `revenue` column auto revenue = - calc_revenue(joined_table->column("l_extendedprice"), joined_table->column("l_discount")); + calculate_revenue(joined_table->column("l_extendedprice"), joined_table->column("l_discount")); (*joined_table).append(revenue, "revenue"); // Perform the groupby operation @@ -159,9 +155,22 @@ int main(int argc, char const** argv) auto const orderedby_table = apply_orderby(groupedby_table, {"revenue"}, {cudf::order::DESCENDING}); - timer.print_elapsed_millis(); - // Write query result to a parquet file orderedby_table->to_parquet("q10.parquet"); - return 0; } + +void ndsh_q10(nvbench::state& state) +{ + // Generate the required parquet files in device buffers + double const scale_factor = state.get_float64("scale_factor"); + std::unordered_map sources; + generate_parquet_data_sources( + scale_factor, {"customer", "orders", "lineitem", "nation"}, sources); + + auto stream = cudf::get_default_stream(); + state.set_cuda_stream(nvbench::make_cuda_stream_view(stream.value())); + state.exec(nvbench::exec_tag::sync, + [&](nvbench::launch& launch) { run_ndsh_q10(state, sources); }); +} + +NVBENCH_BENCH(ndsh_q10).set_name("ndsh_q10").add_float64_axis("scale_factor", {0.01, 0.1, 1}); diff --git a/cpp/benchmarks/ndsh/utilities.cpp b/cpp/benchmarks/ndsh/utilities.cpp new file mode 100644 index 00000000000..2d514764fc2 --- /dev/null +++ b/cpp/benchmarks/ndsh/utilities.cpp @@ -0,0 +1,400 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "utilities.hpp" + +#include "common/ndsh_data_generator/ndsh_data_generator.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +namespace { + +std::vector const ORDERS_SCHEMA = {"o_orderkey", + "o_custkey", + "o_orderstatus", + "o_totalprice", + "o_orderdate", + "o_orderpriority", + "o_clerk", + "o_shippriority", + "o_comment"}; +std::vector const LINEITEM_SCHEMA = {"l_orderkey", + "l_partkey", + "l_suppkey", + "l_linenumber", + "l_quantity", + "l_extendedprice", + "l_discount", + "l_tax", + "l_returnflag", + "l_linestatus", + "l_shipdate", + "l_commitdate", + "l_receiptdate", + "l_shipinstruct", + "l_shipmode", + "l_comment"}; +std::vector const PART_SCHEMA = {"p_partkey", + "p_name", + "p_mfgr", + "p_brand", + "p_type", + "p_size", + "p_container", + "p_retailprice", + "p_comment"}; +std::vector const PARTSUPP_SCHEMA = { + "ps_partkey", "ps_suppkey", "ps_availqty", "ps_supplycost", "ps_comment"}; +std::vector const SUPPLIER_SCHEMA = { + "s_suppkey", "s_name", "s_address", "s_nationkey", "s_phone", "s_acctbal", "s_comment"}; +std::vector const CUSTOMER_SCHEMA = {"c_custkey", + "c_name", + "c_address", + "c_nationkey", + "c_phone", + "c_acctbal", + "c_mktsegment", + "c_comment"}; +std::vector const NATION_SCHEMA = { + "n_nationkey", "n_name", "n_regionkey", "n_comment"}; +std::vector const REGION_SCHEMA = {"r_regionkey", "r_name", "r_comment"}; + +} // namespace + +cudf::table_view table_with_names::table() const { return tbl->view(); } + +cudf::column_view table_with_names::column(std::string const& col_name) const +{ + return tbl->view().column(column_id(col_name)); +} + +std::vector const& table_with_names::column_names() const { return col_names; } + +cudf::size_type table_with_names::column_id(std::string const& col_name) const +{ + auto it = 
std::find(col_names.begin(), col_names.end(), col_name); + if (it == col_names.end()) { + std::string err_msg = "Column `" + col_name + "` not found"; + throw std::runtime_error(err_msg); + } + return std::distance(col_names.begin(), it); +} + +table_with_names& table_with_names::append(std::unique_ptr& col, + std::string const& col_name) +{ + auto cols = tbl->release(); + cols.push_back(std::move(col)); + tbl = std::make_unique(std::move(cols)); + col_names.push_back(col_name); + return (*this); +} + +cudf::table_view table_with_names::select(std::vector const& col_names) const +{ + CUDF_FUNC_RANGE(); + std::vector col_indices; + for (auto const& col_name : col_names) { + col_indices.push_back(column_id(col_name)); + } + return tbl->select(col_indices); +} + +void table_with_names::to_parquet(std::string const& filepath) const +{ + CUDF_FUNC_RANGE(); + auto const sink_info = cudf::io::sink_info(filepath); + cudf::io::table_metadata metadata; + metadata.schema_info = + std::vector(col_names.begin(), col_names.end()); + auto const table_input_metadata = cudf::io::table_input_metadata{metadata}; + auto builder = cudf::io::parquet_writer_options::builder(sink_info, tbl->view()); + builder.metadata(table_input_metadata); + auto const options = builder.build(); + cudf::io::write_parquet(options); +} + +std::unique_ptr join_and_gather(cudf::table_view const& left_input, + cudf::table_view const& right_input, + std::vector const& left_on, + std::vector const& right_on, + cudf::null_equality compare_nulls) +{ + CUDF_FUNC_RANGE(); + constexpr auto oob_policy = cudf::out_of_bounds_policy::DONT_CHECK; + auto const left_selected = left_input.select(left_on); + auto const right_selected = right_input.select(right_on); + auto const [left_join_indices, right_join_indices] = cudf::inner_join( + left_selected, right_selected, compare_nulls, cudf::get_current_device_resource_ref()); + + auto const left_indices_span = cudf::device_span{*left_join_indices}; + auto const 
right_indices_span = cudf::device_span{*right_join_indices}; + + auto const left_indices_col = cudf::column_view{left_indices_span}; + auto const right_indices_col = cudf::column_view{right_indices_span}; + + auto const left_result = cudf::gather(left_input, left_indices_col, oob_policy); + auto const right_result = cudf::gather(right_input, right_indices_col, oob_policy); + + auto joined_cols = left_result->release(); + auto right_cols = right_result->release(); + joined_cols.insert(joined_cols.end(), + std::make_move_iterator(right_cols.begin()), + std::make_move_iterator(right_cols.end())); + return std::make_unique(std::move(joined_cols)); +} + +/* Inner-join two named tables on the given key column names; the result holds every left column followed by every right column. */ +std::unique_ptr apply_inner_join( + std::unique_ptr const& left_input, + std::unique_ptr const& right_input, + std::vector const& left_on, + std::vector const& right_on, + cudf::null_equality compare_nulls) +{ + CUDF_FUNC_RANGE(); + /* Translate the key column names into column indices (column_id throws if a name is missing) */ + std::vector left_on_indices; + std::vector right_on_indices; + std::transform( + left_on.begin(), left_on.end(), std::back_inserter(left_on_indices), [&](auto const& col_name) { + return left_input->column_id(col_name); + }); + std::transform(right_on.begin(), + right_on.end(), + std::back_inserter(right_on_indices), + [&](auto const& col_name) { return right_input->column_id(col_name); }); + auto table = join_and_gather( + left_input->table(), right_input->table(), left_on_indices, right_on_indices, compare_nulls); + /* Column names follow the same left-then-right order in which join_and_gather concatenates the columns */ + std::vector merged_column_names; + merged_column_names.reserve(left_input->column_names().size() + + right_input->column_names().size()); + std::copy(left_input->column_names().begin(), + left_input->column_names().end(), + std::back_inserter(merged_column_names)); + std::copy(right_input->column_names().begin(), + right_input->column_names().end(), + std::back_inserter(merged_column_names)); + return std::make_unique(std::move(table), merged_column_names); +} + +std::unique_ptr apply_filter(std::unique_ptr 
const& table, + cudf::ast::operation const& predicate) +{ + CUDF_FUNC_RANGE(); + auto const boolean_mask = cudf::compute_column(table->table(), predicate); + auto result_table = cudf::apply_boolean_mask(table->table(), boolean_mask->view()); + return std::make_unique(std::move(result_table), table->column_names()); +} + +std::unique_ptr apply_mask(std::unique_ptr const& table, + std::unique_ptr const& mask) +{ + CUDF_FUNC_RANGE(); + auto result_table = cudf::apply_boolean_mask(table->table(), mask->view()); + return std::make_unique(std::move(result_table), table->column_names()); +} + +std::unique_ptr apply_groupby(std::unique_ptr const& table, + groupby_context_t const& ctx) +{ + CUDF_FUNC_RANGE(); + auto const keys = table->select(ctx.keys); + cudf::groupby::groupby groupby_obj(keys); + std::vector result_column_names; + result_column_names.insert(result_column_names.end(), ctx.keys.begin(), ctx.keys.end()); + std::vector requests; + for (auto& [value_col, aggregations] : ctx.values) { + requests.emplace_back(cudf::groupby::aggregation_request()); + for (auto& agg : aggregations) { + if (agg.first == cudf::aggregation::Kind::SUM) { + requests.back().aggregations.push_back( + cudf::make_sum_aggregation()); + } else if (agg.first == cudf::aggregation::Kind::MEAN) { + requests.back().aggregations.push_back( + cudf::make_mean_aggregation()); + } else if (agg.first == cudf::aggregation::Kind::COUNT_ALL) { + requests.back().aggregations.push_back( + cudf::make_count_aggregation()); + } else { + throw std::runtime_error("Unsupported aggregation"); + } + result_column_names.push_back(agg.second); + } + requests.back().values = table->column(value_col); + } + auto agg_results = groupby_obj.aggregate(requests); + std::vector> result_columns; + for (auto i = 0; i < agg_results.first->num_columns(); i++) { + auto col = std::make_unique(agg_results.first->get_column(i)); + result_columns.push_back(std::move(col)); + } + for (size_t i = 0; i < agg_results.second.size(); i++) 
{ + for (size_t j = 0; j < agg_results.second[i].results.size(); j++) { + result_columns.push_back(std::move(agg_results.second[i].results[j])); + } + } + auto result_table = std::make_unique(std::move(result_columns)); + return std::make_unique(std::move(result_table), result_column_names); +} + +/* Sort the rows of `table` by the named key columns; the column names are carried over unchanged. */ +std::unique_ptr apply_orderby(std::unique_ptr const& table, + std::vector const& sort_keys, + std::vector const& sort_key_orders) +{ + CUDF_FUNC_RANGE(); + std::vector column_views; + for (auto& key : sort_keys) { + column_views.push_back(table->column(key)); + } + auto result_table = + cudf::sort_by_key(table->table(), cudf::table_view{column_views}, sort_key_orders); + return std::make_unique(std::move(result_table), table->column_names()); +} + +/* Reduce `column` to a single value and wrap it in a one-row, one-column table named `col_name`. */ +std::unique_ptr apply_reduction(cudf::column_view const& column, + cudf::aggregation::Kind const& agg_kind, + std::string const& col_name) +{ + CUDF_FUNC_RANGE(); + /* NOTE(review): agg_kind is never read in this body; a sum aggregation is always built, whatever kind the caller requests. */ + auto const agg = cudf::make_sum_aggregation(); + auto const result = cudf::reduce(column, *agg, column.type()); + /* The reduction yields one scalar, so the output column is materialized with exactly one row. */ + cudf::size_type const len = 1; + auto col = cudf::make_column_from_scalar(*result, len); + std::vector> columns; + columns.push_back(std::move(col)); + auto result_table = std::make_unique(std::move(columns)); + std::vector col_names = {col_name}; + return std::make_unique(std::move(result_table), col_names); +} + +std::unique_ptr read_parquet( + cudf::io::source_info const& source_info, + std::vector const& columns, + std::unique_ptr const& predicate) +{ + CUDF_FUNC_RANGE(); + auto builder = cudf::io::parquet_reader_options_builder(source_info); + if (!columns.empty()) { builder.columns(columns); } + if (predicate) { builder.filter(*predicate); } + auto const options = builder.build(); + auto table_with_metadata = cudf::io::read_parquet(options); + std::vector column_names; + for (auto const& col_info : table_with_metadata.metadata.schema_info) { + column_names.push_back(col_info.name); + } + return std::make_unique(std::move(table_with_metadata.tbl), 
column_names); +} + +/* Build a std::tm for a calendar date; std::tm stores years relative to 1900 and months starting at 0. */ +std::tm make_tm(int year, int month, int day) +{ + std::tm tm{}; + tm.tm_year = year - 1900; + tm.tm_mon = month - 1; + tm.tm_mday = day; + return tm; +} + +/* Days elapsed from 1970-01-01 to the given calendar date (difference in seconds divided by 86400, then cast to an integer). */ +int32_t days_since_epoch(int year, int month, int day) +{ + std::tm tm = make_tm(year, month, day); + std::tm epoch = make_tm(1970, 1, 1); + /* NOTE(review): std::mktime interprets both std::tm values as local time, so the difference depends on the process time zone rules - confirm that is acceptable here. */ + std::time_t time = std::mktime(&tm); + std::time_t epoch_time = std::mktime(&epoch); + double diff = std::difftime(time, epoch_time) / (60 * 60 * 24); + return static_cast(diff); +} + +/* Encode `table` as parquet into a host vector, then copy those bytes into source.d_buffer on the default stream. */ +void write_to_parquet_device_buffer(std::unique_ptr const& table, + std::vector const& col_names, + parquet_device_buffer& source) +{ + CUDF_FUNC_RANGE(); + auto const stream = cudf::get_default_stream(); + + // Prepare the table metadata + cudf::io::table_metadata metadata; + std::vector col_name_infos; + for (auto& col_name : col_names) { + col_name_infos.push_back(cudf::io::column_name_info(col_name)); + } + metadata.schema_info = col_name_infos; + auto const table_input_metadata = cudf::io::table_input_metadata{metadata}; + + // Declare the host buffer that receives the encoded parquet bytes + std::vector h_buffer; + + // Write parquet data to host buffer + auto builder = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info(&h_buffer), table->view()); + builder.metadata(table_input_metadata); + auto const options = builder.build(); + cudf::io::write_parquet(options); + + // Copy host buffer to device buffer + source.d_buffer.resize(h_buffer.size(), stream); + CUDF_CUDA_TRY(cudaMemcpyAsync( + source.d_buffer.data(), h_buffer.data(), h_buffer.size(), cudaMemcpyDefault, stream.value())); +} + +void generate_parquet_data_sources(double scale_factor, + std::vector const& table_names, + std::unordered_map& sources) +{ + CUDF_FUNC_RANGE(); + std::for_each(table_names.begin(), table_names.end(), [&](auto const& table_name) { + sources[table_name] = parquet_device_buffer(); + }); + + auto [orders, lineitem, part] = cudf::datagen::generate_orders_lineitem_part( + scale_factor, 
cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + + auto partsupp = cudf::datagen::generate_partsupp( + scale_factor, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + + auto supplier = cudf::datagen::generate_supplier( + scale_factor, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + + auto customer = cudf::datagen::generate_customer( + scale_factor, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + + auto nation = cudf::datagen::generate_nation(cudf::get_default_stream(), + cudf::get_current_device_resource_ref()); + + auto region = cudf::datagen::generate_region(cudf::get_default_stream(), + cudf::get_current_device_resource_ref()); + + write_to_parquet_device_buffer(std::move(orders), ORDERS_SCHEMA, sources["orders"]); + write_to_parquet_device_buffer(std::move(lineitem), LINEITEM_SCHEMA, sources["lineitem"]); + write_to_parquet_device_buffer(std::move(part), PART_SCHEMA, sources["part"]); + write_to_parquet_device_buffer(std::move(partsupp), PARTSUPP_SCHEMA, sources["partsupp"]); + write_to_parquet_device_buffer(std::move(customer), CUSTOMER_SCHEMA, sources["customer"]); + write_to_parquet_device_buffer(std::move(supplier), SUPPLIER_SCHEMA, sources["supplier"]); + write_to_parquet_device_buffer(std::move(nation), NATION_SCHEMA, sources["nation"]); + write_to_parquet_device_buffer(std::move(region), REGION_SCHEMA, sources["region"]); +} diff --git a/cpp/benchmarks/ndsh/utilities.hpp b/cpp/benchmarks/ndsh/utilities.hpp new file mode 100644 index 00000000000..762e43deccf --- /dev/null +++ b/cpp/benchmarks/ndsh/utilities.hpp @@ -0,0 +1,227 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include + +/** + * @brief A class to represent a table with column names attached + */ +class table_with_names { + public: + /** + * @brief Take ownership of a table and store the given column names alongside it + * + * @param tbl The table to own + * @param col_names The column names to attach + */ + table_with_names(std::unique_ptr tbl, std::vector col_names) + : tbl(std::move(tbl)), col_names(col_names){}; + /** + * @brief Return the table view + */ + [[nodiscard]] cudf::table_view table() const; + /** + * @brief Return the column view for a given column name + * + * @param col_name The name of the column + */ + [[nodiscard]] cudf::column_view column(std::string const& col_name) const; + /** + * @brief Return the column names of the table + */ + [[nodiscard]] std::vector const& column_names() const; + /** + * @brief Translate a column name to a column index + * + * @param col_name The name of the column + */ + [[nodiscard]] cudf::size_type column_id(std::string const& col_name) const; + /** + * @brief Append a column to the table + * + * @param col The column to append + * @param col_name The name of the appended column + */ + table_with_names& append(std::unique_ptr& col, std::string const& col_name); + /** + * @brief Select a subset of columns from the table + * + * @param col_names The names of the columns to select + */ + [[nodiscard]] cudf::table_view select(std::vector const& col_names) const; + /** + * @brief Write the table to a parquet file + * + * @param filepath The path to the parquet file + */ + void to_parquet(std::string const& filepath) const; + + private: + std::unique_ptr tbl; + std::vector col_names; +}; + +/** + * @brief Inner join two tables and gather the 
result + * + * @param left_input The left input table + * @param right_input The right input table + * @param left_on The columns to join on in the left table + * @param right_on The columns to join on in the right table + * @param compare_nulls The null equality policy + */ +[[nodiscard]] std::unique_ptr join_and_gather( + cudf::table_view const& left_input, + cudf::table_view const& right_input, + std::vector const& left_on, + std::vector const& right_on, + cudf::null_equality compare_nulls); + +/** + * @brief Apply an inner join operation to two tables + * + * @param left_input The left input table + * @param right_input The right input table + * @param left_on The columns to join on in the left table + * @param right_on The columns to join on in the right table + * @param compare_nulls The null equality policy + */ +[[nodiscard]] std::unique_ptr apply_inner_join( + std::unique_ptr const& left_input, + std::unique_ptr const& right_input, + std::vector const& left_on, + std::vector const& right_on, + cudf::null_equality compare_nulls = cudf::null_equality::EQUAL); + +/** + * @brief Apply a filter predicate to a table + * + * @param table The input table + * @param predicate The filter predicate + */ +[[nodiscard]] std::unique_ptr apply_filter( + std::unique_ptr const& table, cudf::ast::operation const& predicate); + +/** + * @brief Apply a boolean mask to a table + * + * @param table The input table + * @param mask The boolean mask + */ +[[nodiscard]] std::unique_ptr apply_mask( + std::unique_ptr const& table, std::unique_ptr const& mask); + +/** + * Struct representing group by key columns, value columns, and the type of aggregations to perform + * on the value columns + */ +struct groupby_context_t { + std::vector keys; + std::unordered_map>> + values; +}; + +/** + * @brief Apply a groupby operation to a table + * + * @param table The input table + * @param ctx The groupby context + */ +[[nodiscard]] std::unique_ptr apply_groupby( + std::unique_ptr const& 
table, groupby_context_t const& ctx); + +/** + * @brief Apply an order by operation to a table + * + * @param table The input table + * @param sort_keys The sort keys + * @param sort_key_orders The sort key orders + */ +[[nodiscard]] std::unique_ptr apply_orderby( + std::unique_ptr const& table, + std::vector const& sort_keys, + std::vector const& sort_key_orders); + +/** + * @brief Apply a reduction operation to a column + * + * @param column The input column + * @param agg_kind The aggregation kind + * @param col_name The name of the output column + */ +[[nodiscard]] std::unique_ptr apply_reduction( + cudf::column_view const& column, + cudf::aggregation::Kind const& agg_kind, + std::string const& col_name); + +/** + * @brief Read a parquet file into a table + * + * @param source_info The source of the parquet file + * @param columns The columns to read + * @param predicate The filter predicate to pushdown + */ +[[nodiscard]] std::unique_ptr read_parquet( + cudf::io::source_info const& source_info, + std::vector const& columns = {}, + std::unique_ptr const& predicate = nullptr); + +/** + * @brief Generate the `std::tm` structure from year, month, and day + * + * @param year The year + * @param month The month + * @param day The day + */ +std::tm make_tm(int year, int month, int day); + +/** + * @brief Calculate the number of days since the UNIX epoch + * + * @param year The year + * @param month The month + * @param day The day + */ +int32_t days_since_epoch(int year, int month, int day); + +/** + * @brief Struct representing a parquet device buffer + */ +struct parquet_device_buffer { + parquet_device_buffer() : d_buffer{0, cudf::get_default_stream()} {}; + cudf::io::source_info make_source_info() { return cudf::io::source_info(d_buffer); } + rmm::device_uvector d_buffer; +}; + +/** + * @brief Write a `cudf::table` to a parquet device buffer + * + * @param table The `cudf::table` to write + * @param col_names The column names of the table + * @param 
source The parquet device buffer to write the table to + */ +void write_to_parquet_device_buffer(std::unique_ptr const& table, + std::vector const& col_names, + parquet_device_buffer& source); + +/** + * @brief Generate NDS-H tables and write to parquet device buffers + * + * @param scale_factor The scale factor of NDS-H tables to generate + * @param table_names The names of the tables to generate + * @param sources The parquet data sources to populate + */ +void generate_parquet_data_sources(double scale_factor, + std::vector const& table_names, + std::unordered_map& sources); diff --git a/cpp/examples/build.sh b/cpp/examples/build.sh index 8e8d8bd0b78..25984df1b60 100755 --- a/cpp/examples/build.sh +++ b/cpp/examples/build.sh @@ -57,7 +57,6 @@ build_example() { } build_example basic -build_example tpch build_example strings build_example nested_types build_example parquet_io diff --git a/cpp/examples/tpch/CMakeLists.txt b/cpp/examples/tpch/CMakeLists.txt deleted file mode 100644 index 373a6d72d56..00000000000 --- a/cpp/examples/tpch/CMakeLists.txt +++ /dev/null @@ -1,36 +0,0 @@ -# Copyright (c) 2024, NVIDIA CORPORATION. 
- -cmake_minimum_required(VERSION 3.26.4) - -include(../set_cuda_architecture.cmake) - -rapids_cuda_init_architectures(tpch_example) -rapids_cuda_set_architectures(RAPIDS) - -project( - tpch_example - VERSION 0.0.1 - LANGUAGES CXX CUDA -) - -include(../fetch_dependencies.cmake) - -add_executable(tpch_q1 q1.cpp) -target_link_libraries(tpch_q1 PRIVATE cudf::cudf) -target_compile_features(tpch_q1 PRIVATE cxx_std_17) - -add_executable(tpch_q5 q5.cpp) -target_link_libraries(tpch_q5 PRIVATE cudf::cudf) -target_compile_features(tpch_q5 PRIVATE cxx_std_17) - -add_executable(tpch_q6 q6.cpp) -target_link_libraries(tpch_q6 PRIVATE cudf::cudf) -target_compile_features(tpch_q6 PRIVATE cxx_std_17) - -add_executable(tpch_q9 q9.cpp) -target_link_libraries(tpch_q9 PRIVATE cudf::cudf) -target_compile_features(tpch_q9 PRIVATE cxx_std_17) - -add_executable(tpch_q10 q10.cpp) -target_link_libraries(tpch_q10 PRIVATE cudf::cudf) -target_compile_features(tpch_q10 PRIVATE cxx_std_17) diff --git a/cpp/examples/tpch/README.md b/cpp/examples/tpch/README.md deleted file mode 100644 index 8c046c3f1e8..00000000000 --- a/cpp/examples/tpch/README.md +++ /dev/null @@ -1,39 +0,0 @@ -# TPC-H Derived Examples - -Implements TPC-H queries using `libcudf`. We leverage the data generator (wrapper around official TPC-H datagen) from [Apache Datafusion](https://github.com/apache/datafusion) for generating data in Parquet format. - -## Requirements - -- Rust -- [libcudf](https://github.com/rapidsai/cudf/blob/branch-24.08/CONTRIBUTING.md#setting-up-your-build-environment) - -## Running Queries - -1. Build the `libcudf` examples. -```bash -cd cudf/cpp/examples -./build.sh -``` -The TPC-H query binaries would be built inside `tpch/build`. - -2. Generate the dataset. -```bash -cd tpch/datagen -./datagen.sh [scale factor (1/10)] -``` - -The parquet files will be generated in `tpch/datagen/datafusion/benchmarks/data/tpch_sf[scale factor]`. - -3. Set these environment variables for optimized runtimes. 
-```bash -export KVIKIO_COMPAT_MODE="on" -export LIBCUDF_CUFILE_POLICY="KVIKIO" -export CUDA_MODULE_LOADING="EAGER" -``` - -4. Execute the queries. -```bash -./tpch/build/tpch_q[query no] [path to dataset] [memory resource type (cuda/pool/managed/managed_pool)] -``` - -A parquet file named `q[query no].parquet` would be generated containing the results of the query. diff --git a/cpp/examples/tpch/datagen/correct_datatypes.py b/cpp/examples/tpch/datagen/correct_datatypes.py deleted file mode 100644 index 8564774647b..00000000000 --- a/cpp/examples/tpch/datagen/correct_datatypes.py +++ /dev/null @@ -1,60 +0,0 @@ -# Copyright (c) 2024, NVIDIA CORPORATION. - -import os -import sys - -import pyarrow as pa -import pyarrow.parquet as pq -import pandas as pd - -if __name__ == "__main__": - dataset_path = str(sys.argv[1]) - tables = ["lineitem", "part", "partsupp", "orders", "supplier", "customer", "nation", "region"] - for table in tables: - filepath = os.path.join(dataset_path, f"{table}.parquet") - print("Reading file ", filepath) - - if filepath.endswith("lineitem.parquet"): - df = pd.read_parquet(filepath) - df["l_linenumber"] = df["l_linenumber"].astype("int64") - df["l_quantity"] = df["l_quantity"].astype("int64") - df["l_extendedprice"] = df["l_extendedprice"].astype("float64") - df["l_discount"] = df["l_discount"].astype("float64") - df["l_tax"] = df["l_tax"].astype("float64") - pq.write_table(pa.Table.from_pandas(df), filepath, compression="snappy") - - elif filepath.endswith("part.parquet"): - df = pd.read_parquet(filepath) - df["p_size"] = df["p_size"].astype("int64") - df["p_retailprice"] = df["p_retailprice"].astype("float64") - pq.write_table(pa.Table.from_pandas(df), filepath, compression="snappy") - - elif filepath.endswith("partsupp.parquet"): - df = pd.read_parquet(filepath) - df["ps_availqty"] = df["ps_availqty"].astype("int64") - df["ps_supplycost"] = df["ps_supplycost"].astype("float64") - pq.write_table(pa.Table.from_pandas(df), filepath, 
compression="snappy") - - elif filepath.endswith("orders.parquet"): - df = pd.read_parquet(filepath) - df["o_totalprice"] = df["o_totalprice"].astype("float64") - df["o_shippriority"] = df["o_shippriority"].astype("int64") - pq.write_table(pa.Table.from_pandas(df), filepath, compression="snappy") - - elif filepath.endswith("supplier.parquet"): - df = pd.read_parquet(filepath) - df["s_acctbal"] = df["s_acctbal"].astype("float64") - pq.write_table(pa.Table.from_pandas(df), filepath, compression="snappy") - - elif filepath.endswith("customer.parquet"): - df = pd.read_parquet(filepath) - df["c_acctbal"] = df["c_acctbal"].astype("float64") - pq.write_table(pa.Table.from_pandas(df), filepath, compression="snappy") - - elif filepath.endswith("nation.parquet"): - df = pd.read_parquet(filepath) - pq.write_table(pa.Table.from_pandas(df), filepath, compression="snappy") - - elif filepath.endswith("region.parquet"): - df = pd.read_parquet(filepath) - pq.write_table(pa.Table.from_pandas(df), filepath, compression="snappy") diff --git a/cpp/examples/tpch/datagen/datagen.sh b/cpp/examples/tpch/datagen/datagen.sh deleted file mode 100755 index 0b03753daea..00000000000 --- a/cpp/examples/tpch/datagen/datagen.sh +++ /dev/null @@ -1,31 +0,0 @@ -#!/bin/bash -# Copyright (c) 2024, NVIDIA CORPORATION. 
- -set -e - -scale_factor=$1 -script_dir=$(pwd) - -# Clone the datafusion repository and apply a patch -# for single threaded data generation so that a -# single parquet file is generated for each table -rm -rf datafusion -git clone https://github.com/apache/datafusion.git datafusion -cd datafusion/ -git checkout 679a85f -git apply ${script_dir}/tpch.patch -cd benchmarks/ - -# Generate the data -# Currently, we support only scale factor 1 and 10 -if [ ${scale_factor} -eq 1 ]; then - ./bench.sh data tpch -elif [ ${scale_factor} -eq 10 ]; then - ./bench.sh data tpch10 -else - echo "Unsupported scale factor" - exit 1 -fi - -# Correct the datatypes of the parquet files -python3 ${script_dir}/correct_datatypes.py data/tpch_sf${scale_factor} diff --git a/cpp/examples/tpch/datagen/tpch.patch b/cpp/examples/tpch/datagen/tpch.patch deleted file mode 100644 index 42727aa9904..00000000000 --- a/cpp/examples/tpch/datagen/tpch.patch +++ /dev/null @@ -1,33 +0,0 @@ -diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh -index 3b854f6dc..f000f09c0 100755 ---- a/benchmarks/bench.sh -+++ b/benchmarks/bench.sh -@@ -311,6 +311,15 @@ data_tpch() { - $CARGO_COMMAND --bin tpch -- convert --input "${TPCH_DIR}" --output "${TPCH_DIR}" --format parquet - popd > /dev/null - fi -+ -+ cp ${TPCH_DIR}/lineitem/part-0.parquet ${TPCH_DIR}/lineitem.parquet -+ cp ${TPCH_DIR}/orders/part-0.parquet ${TPCH_DIR}/orders.parquet -+ cp ${TPCH_DIR}/part/part-0.parquet ${TPCH_DIR}/part.parquet -+ cp ${TPCH_DIR}/partsupp/part-0.parquet ${TPCH_DIR}/partsupp.parquet -+ cp ${TPCH_DIR}/customer/part-0.parquet ${TPCH_DIR}/customer.parquet -+ cp ${TPCH_DIR}/supplier/part-0.parquet ${TPCH_DIR}/supplier.parquet -+ cp ${TPCH_DIR}/nation/part-0.parquet ${TPCH_DIR}/nation.parquet -+ cp ${TPCH_DIR}/region/part-0.parquet ${TPCH_DIR}/region.parquet - } - - # Runs the tpch benchmark -diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs -index b5204b343..84fd2e78d 100644 ---- 
a/datafusion/common/src/config.rs -+++ b/datafusion/common/src/config.rs -@@ -250,7 +250,7 @@ config_namespace! { - /// concurrency. - /// - /// Defaults to the number of CPU cores on the system -- pub target_partitions: usize, default = num_cpus::get() -+ pub target_partitions: usize, default = 1 - - /// The default time zone - /// diff --git a/cpp/examples/tpch/utils.hpp b/cpp/examples/tpch/utils.hpp deleted file mode 100644 index 8102fa8f976..00000000000 --- a/cpp/examples/tpch/utils.hpp +++ /dev/null @@ -1,458 +0,0 @@ -/* - * Copyright (c) 2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include - -#include - -// RMM memory resource creation utilities -inline auto make_cuda() { return std::make_shared(); } -inline auto make_pool() -{ - return rmm::mr::make_owning_wrapper( - make_cuda(), rmm::percent_of_free_device_memory(50)); -} -inline auto make_managed() { return std::make_shared(); } -inline auto make_managed_pool() -{ - return rmm::mr::make_owning_wrapper( - make_managed(), rmm::percent_of_free_device_memory(50)); -} -inline std::shared_ptr create_memory_resource( - std::string const& mode) -{ - if (mode == "cuda") return make_cuda(); - if (mode == "pool") return make_pool(); - if (mode == "managed") return make_managed(); - if (mode == "managed_pool") return make_managed_pool(); - CUDF_FAIL("Unknown rmm_mode parameter: " + mode + - "\nExpecting: cuda, pool, managed, or managed_pool"); -} - -/** - * @brief A class to represent a table with column names attached - */ -class table_with_names { - public: - table_with_names(std::unique_ptr tbl, std::vector col_names) - : tbl(std::move(tbl)), col_names(col_names) - { - } - /** - * @brief Return the table view - */ - [[nodiscard]] cudf::table_view table() const { return tbl->view(); } - /** - * @brief Return the column view for a given column name - * - * @param col_name The name of the column - */ - [[nodiscard]] cudf::column_view column(std::string const& col_name) const - { - return tbl->view().column(col_id(col_name)); - } - /** - * @param Return the column names of the table - */ - [[nodiscard]] std::vector column_names() const { return col_names; } - /** - * @brief Translate a column name to a column index - * - * @param col_name The name of the column - */ - [[nodiscard]] cudf::size_type col_id(std::string const& col_name) const - { - CUDF_FUNC_RANGE(); - auto it = std::find(col_names.begin(), 
col_names.end(), col_name); - if (it == col_names.end()) { throw std::runtime_error("Column not found"); } - return std::distance(col_names.begin(), it); - } - /** - * @brief Append a column to the table - * - * @param col The column to append - * @param col_name The name of the appended column - */ - table_with_names& append(std::unique_ptr& col, std::string const& col_name) - { - CUDF_FUNC_RANGE(); - auto cols = tbl->release(); - cols.push_back(std::move(col)); - tbl = std::make_unique(std::move(cols)); - col_names.push_back(col_name); - return (*this); - } - /** - * @brief Select a subset of columns from the table - * - * @param col_names The names of the columns to select - */ - [[nodiscard]] cudf::table_view select(std::vector const& col_names) const - { - CUDF_FUNC_RANGE(); - std::vector col_indices; - for (auto const& col_name : col_names) { - col_indices.push_back(col_id(col_name)); - } - return tbl->select(col_indices); - } - /** - * @brief Write the table to a parquet file - * - * @param filepath The path to the parquet file - */ - void to_parquet(std::string const& filepath) const - { - CUDF_FUNC_RANGE(); - auto const sink_info = cudf::io::sink_info(filepath); - cudf::io::table_metadata metadata; - metadata.schema_info = - std::vector(col_names.begin(), col_names.end()); - auto const table_input_metadata = cudf::io::table_input_metadata{metadata}; - auto builder = cudf::io::parquet_writer_options::builder(sink_info, tbl->view()); - builder.metadata(table_input_metadata); - auto const options = builder.build(); - cudf::io::write_parquet(options); - } - - private: - std::unique_ptr tbl; - std::vector col_names; -}; - -/** - * @brief Concatenate two vectors - * - * @param lhs The left vector - * @param rhs The right vector - */ -template -std::vector concat(std::vector const& lhs, std::vector const& rhs) -{ - std::vector result; - result.reserve(lhs.size() + rhs.size()); - std::copy(lhs.begin(), lhs.end(), std::back_inserter(result)); - 
std::copy(rhs.begin(), rhs.end(), std::back_inserter(result)); - return result; -} - -/** - * @brief Inner join two tables and gather the result - * - * @param left_input The left input table - * @param right_input The right input table - * @param left_on The columns to join on in the left table - * @param right_on The columns to join on in the right table - * @param compare_nulls The null equality policy - */ -[[nodiscard]] std::unique_ptr join_and_gather( - cudf::table_view const& left_input, - cudf::table_view const& right_input, - std::vector const& left_on, - std::vector const& right_on, - cudf::null_equality compare_nulls) -{ - CUDF_FUNC_RANGE(); - constexpr auto oob_policy = cudf::out_of_bounds_policy::DONT_CHECK; - auto const left_selected = left_input.select(left_on); - auto const right_selected = right_input.select(right_on); - auto const [left_join_indices, right_join_indices] = cudf::inner_join( - left_selected, right_selected, compare_nulls, cudf::get_current_device_resource_ref()); - - auto const left_indices_span = cudf::device_span{*left_join_indices}; - auto const right_indices_span = cudf::device_span{*right_join_indices}; - - auto const left_indices_col = cudf::column_view{left_indices_span}; - auto const right_indices_col = cudf::column_view{right_indices_span}; - - auto const left_result = cudf::gather(left_input, left_indices_col, oob_policy); - auto const right_result = cudf::gather(right_input, right_indices_col, oob_policy); - - auto joined_cols = left_result->release(); - auto right_cols = right_result->release(); - joined_cols.insert(joined_cols.end(), - std::make_move_iterator(right_cols.begin()), - std::make_move_iterator(right_cols.end())); - return std::make_unique(std::move(joined_cols)); -} - -/** - * @brief Apply an inner join operation to two tables - * - * @param left_input The left input table - * @param right_input The right input table - * @param left_on The columns to join on in the left table - * @param right_on The columns 
to join on in the right table - * @param compare_nulls The null equality policy - */ -[[nodiscard]] std::unique_ptr apply_inner_join( - std::unique_ptr const& left_input, - std::unique_ptr const& right_input, - std::vector const& left_on, - std::vector const& right_on, - cudf::null_equality compare_nulls = cudf::null_equality::EQUAL) -{ - CUDF_FUNC_RANGE(); - std::vector left_on_indices; - std::vector right_on_indices; - std::transform( - left_on.begin(), left_on.end(), std::back_inserter(left_on_indices), [&](auto const& col_name) { - return left_input->col_id(col_name); - }); - std::transform(right_on.begin(), - right_on.end(), - std::back_inserter(right_on_indices), - [&](auto const& col_name) { return right_input->col_id(col_name); }); - auto table = join_and_gather( - left_input->table(), right_input->table(), left_on_indices, right_on_indices, compare_nulls); - return std::make_unique( - std::move(table), concat(left_input->column_names(), right_input->column_names())); -} - -/** - * @brief Apply a filter predicated to a table - * - * @param table The input table - * @param predicate The filter predicate - */ -[[nodiscard]] std::unique_ptr apply_filter( - std::unique_ptr const& table, cudf::ast::operation const& predicate) -{ - CUDF_FUNC_RANGE(); - auto const boolean_mask = cudf::compute_column(table->table(), predicate); - auto result_table = cudf::apply_boolean_mask(table->table(), boolean_mask->view()); - return std::make_unique(std::move(result_table), table->column_names()); -} - -/** - * @brief Apply a boolean mask to a table - * - * @param table The input table - * @param mask The boolean mask - */ -[[nodiscard]] std::unique_ptr apply_mask( - std::unique_ptr const& table, std::unique_ptr const& mask) -{ - CUDF_FUNC_RANGE(); - auto result_table = cudf::apply_boolean_mask(table->table(), mask->view()); - return std::make_unique(std::move(result_table), table->column_names()); -} - -struct groupby_context_t { - std::vector keys; - std::unordered_map>> - 
values; -}; - -/** - * @brief Apply a groupby operation to a table - * - * @param table The input table - * @param ctx The groupby context - */ -[[nodiscard]] std::unique_ptr apply_groupby( - std::unique_ptr const& table, groupby_context_t const& ctx) -{ - CUDF_FUNC_RANGE(); - auto const keys = table->select(ctx.keys); - cudf::groupby::groupby groupby_obj(keys); - std::vector result_column_names; - result_column_names.insert(result_column_names.end(), ctx.keys.begin(), ctx.keys.end()); - std::vector requests; - for (auto& [value_col, aggregations] : ctx.values) { - requests.emplace_back(cudf::groupby::aggregation_request()); - for (auto& agg : aggregations) { - if (agg.first == cudf::aggregation::Kind::SUM) { - requests.back().aggregations.push_back( - cudf::make_sum_aggregation()); - } else if (agg.first == cudf::aggregation::Kind::MEAN) { - requests.back().aggregations.push_back( - cudf::make_mean_aggregation()); - } else if (agg.first == cudf::aggregation::Kind::COUNT_ALL) { - requests.back().aggregations.push_back( - cudf::make_count_aggregation()); - } else { - throw std::runtime_error("Unsupported aggregation"); - } - result_column_names.push_back(agg.second); - } - requests.back().values = table->column(value_col); - } - auto agg_results = groupby_obj.aggregate(requests); - std::vector> result_columns; - for (size_t i = 0; i < agg_results.first->num_columns(); i++) { - auto col = std::make_unique(agg_results.first->get_column(i)); - result_columns.push_back(std::move(col)); - } - for (size_t i = 0; i < agg_results.second.size(); i++) { - for (size_t j = 0; j < agg_results.second[i].results.size(); j++) { - result_columns.push_back(std::move(agg_results.second[i].results[j])); - } - } - auto result_table = std::make_unique(std::move(result_columns)); - return std::make_unique(std::move(result_table), result_column_names); -} - -/** - * @brief Apply an order by operation to a table - * - * @param table The input table - * @param sort_keys The sort keys - * 
@param sort_key_orders The sort key orders - */ -[[nodiscard]] std::unique_ptr apply_orderby( - std::unique_ptr const& table, - std::vector const& sort_keys, - std::vector const& sort_key_orders) -{ - CUDF_FUNC_RANGE(); - std::vector column_views; - for (auto& key : sort_keys) { - column_views.push_back(table->column(key)); - } - auto result_table = - cudf::sort_by_key(table->table(), cudf::table_view{column_views}, sort_key_orders); - return std::make_unique(std::move(result_table), table->column_names()); -} - -/** - * @brief Apply a reduction operation to a column - * - * @param column The input column - * @param agg_kind The aggregation kind - * @param col_name The name of the output column - */ -[[nodiscard]] std::unique_ptr apply_reduction( - cudf::column_view const& column, - cudf::aggregation::Kind const& agg_kind, - std::string const& col_name) -{ - CUDF_FUNC_RANGE(); - auto const agg = cudf::make_sum_aggregation(); - auto const result = cudf::reduce(column, *agg, column.type()); - cudf::size_type const len = 1; - auto col = cudf::make_column_from_scalar(*result, len); - std::vector> columns; - columns.push_back(std::move(col)); - auto result_table = std::make_unique(std::move(columns)); - std::vector col_names = {col_name}; - return std::make_unique(std::move(result_table), col_names); -} - -/** - * @brief Read a parquet file into a table - * - * @param filename The path to the parquet file - * @param columns The columns to read - * @param predicate The filter predicate to pushdown - */ -[[nodiscard]] std::unique_ptr read_parquet( - std::string const& filename, - std::vector const& columns = {}, - std::unique_ptr const& predicate = nullptr) -{ - CUDF_FUNC_RANGE(); - auto const source = cudf::io::source_info(filename); - auto builder = cudf::io::parquet_reader_options_builder(source); - if (!columns.empty()) { builder.columns(columns); } - if (predicate) { builder.filter(*predicate); } - auto const options = builder.build(); - auto table_with_metadata = 
cudf::io::read_parquet(options); - std::vector column_names; - for (auto const& col_info : table_with_metadata.metadata.schema_info) { - column_names.push_back(col_info.name); - } - return std::make_unique(std::move(table_with_metadata.tbl), column_names); -} - -/** - * @brief Generate the `std::tm` structure from year, month, and day - * - * @param year The year - * @param month The month - * @param day The day - */ -std::tm make_tm(int year, int month, int day) -{ - std::tm tm{}; - tm.tm_year = year - 1900; - tm.tm_mon = month - 1; - tm.tm_mday = day; - return tm; -} - -/** - * @brief Calculate the number of days since the UNIX epoch - * - * @param year The year - * @param month The month - * @param day The day - */ -int32_t days_since_epoch(int year, int month, int day) -{ - std::tm tm = make_tm(year, month, day); - std::tm epoch = make_tm(1970, 1, 1); - std::time_t time = std::mktime(&tm); - std::time_t epoch_time = std::mktime(&epoch); - double diff = std::difftime(time, epoch_time) / (60 * 60 * 24); - return static_cast(diff); -} - -struct tpch_example_args { - std::string dataset_dir; - std::string memory_resource_type; -}; - -/** - * @brief Parse command line arguments into a struct - * - * @param argc The number of command line arguments - * @param argv The command line arguments - */ -tpch_example_args parse_args(int argc, char const** argv) -{ - if (argc < 3) { - std::string usage_message = "Usage: " + std::string(argv[0]) + - " \n The query result will be " - "saved to a parquet file named q{query_no}.parquet in the current " - "working directory "; - throw std::runtime_error(usage_message); - } - tpch_example_args args; - args.dataset_dir = argv[1]; - args.memory_resource_type = argv[2]; - return args; -}