diff --git a/.gitignore b/.gitignore index 619e1464b2a..180a6a286e2 100644 --- a/.gitignore +++ b/.gitignore @@ -80,7 +80,6 @@ build/ cpp/build/ cpp/examples/*/install/ cpp/examples/*/build/ -cpp/examples/tpch/datagen/datafusion cpp/include/cudf/ipc_generated/*.h cpp/thirdparty/googletest/ diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 1040fcb7b91..7bc01e64441 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -384,6 +384,7 @@ add_library( src/io/json/nested_json_gpu.cu src/io/json/read_json.cu src/io/json/parser_features.cpp + src/io/json/process_tokens.cu src/io/json/write_json.cu src/io/orc/aggregate_orc_metadata.cpp src/io/orc/dict_enc.cu diff --git a/cpp/benchmarks/CMakeLists.txt b/cpp/benchmarks/CMakeLists.txt index d2c22b788cb..3bf9d02b384 100644 --- a/cpp/benchmarks/CMakeLists.txt +++ b/cpp/benchmarks/CMakeLists.txt @@ -36,25 +36,25 @@ target_include_directories( ) add_library( - tpch_data_generator STATIC - common/tpch_data_generator/tpch_data_generator.cpp common/tpch_data_generator/table_helpers.cpp - common/tpch_data_generator/random_column_generator.cu + ndsh_data_generator STATIC + common/ndsh_data_generator/ndsh_data_generator.cpp common/ndsh_data_generator/table_helpers.cpp + common/ndsh_data_generator/random_column_generator.cu ) -target_compile_features(tpch_data_generator PUBLIC cxx_std_17 cuda_std_17) +target_compile_features(ndsh_data_generator PUBLIC cxx_std_17 cuda_std_17) target_compile_options( - tpch_data_generator PUBLIC "$<$:${CUDF_CXX_FLAGS}>" + ndsh_data_generator PUBLIC "$<$:${CUDF_CXX_FLAGS}>" "$<$:${CUDF_CUDA_FLAGS}>" ) target_link_libraries( - tpch_data_generator + ndsh_data_generator PUBLIC cudf cudftestutil nvtx3::nvtx3-cpp PRIVATE $ ) target_include_directories( - tpch_data_generator + ndsh_data_generator PUBLIC "$" "$" "$" ) @@ -127,8 +127,8 @@ function(ConfigureNVBench CMAKE_BENCH_NAME) INSTALL_RPATH "\$ORIGIN/../../../lib" ) target_link_libraries( - ${CMAKE_BENCH_NAME} PRIVATE cudf_benchmark_common cudf_datagen nvbench::nvbench - $ + ${CMAKE_BENCH_NAME} PRIVATE cudf_benchmark_common ndsh_data_generator cudf_datagen + nvbench::nvbench $ ) install( TARGETS ${CMAKE_BENCH_NAME} @@ -175,6 +175,14 @@ ConfigureBench(COPY_IF_ELSE_BENCH copying/copy_if_else.cpp) # * transpose benchmark --------------------------------------------------------------------------- ConfigureBench(TRANSPOSE_BENCH transpose/transpose.cpp) +# ################################################################################################## +# * nds-h benchmark -------------------------------------------------------------------------------- +ConfigureNVBench(NDSH_Q1 ndsh/q01.cpp ndsh/utilities.cpp) +ConfigureNVBench(NDSH_Q5 ndsh/q05.cpp ndsh/utilities.cpp) +ConfigureNVBench(NDSH_Q6 ndsh/q06.cpp ndsh/utilities.cpp) +ConfigureNVBench(NDSH_Q9 ndsh/q09.cpp ndsh/utilities.cpp) +ConfigureNVBench(NDSH_Q10 ndsh/q10.cpp ndsh/utilities.cpp) + # ################################################################################################## # * stream_compaction benchmark ------------------------------------------------------------------- ConfigureNVBench( diff --git a/cpp/benchmarks/common/tpch_data_generator/tpch_data_generator.cpp b/cpp/benchmarks/common/ndsh_data_generator/ndsh_data_generator.cpp similarity index 97% rename from cpp/benchmarks/common/tpch_data_generator/tpch_data_generator.cpp rename to cpp/benchmarks/common/ndsh_data_generator/ndsh_data_generator.cpp index 236fe8095ad..fa7edd225ba 100644 --- 
a/cpp/benchmarks/common/tpch_data_generator/tpch_data_generator.cpp +++ b/cpp/benchmarks/common/ndsh_data_generator/ndsh_data_generator.cpp @@ -14,7 +14,7 @@ * limitations under the License. */ -#include "tpch_data_generator.hpp" +#include "ndsh_data_generator.hpp" #include "random_column_generator.hpp" #include "table_helpers.hpp" @@ -435,46 +435,37 @@ std::unique_ptr generate_lineitem_partial(cudf::table_view const& o columns.push_back(std::move(l_quantity)); columns.push_back(std::move(l_discount)); columns.push_back(std::move(l_tax)); + columns.push_back(std::move(l_returnflag)); + columns.push_back(std::move(l_linestatus)); columns.push_back(std::move(l_shipdate_ts)); columns.push_back(std::move(l_commitdate_ts)); columns.push_back(std::move(l_receiptdate_ts)); - columns.push_back(std::move(l_returnflag)); - columns.push_back(std::move(l_linestatus)); columns.push_back(std::move(l_shipinstruct)); columns.push_back(std::move(l_shipmode)); columns.push_back(std::move(l_comment)); return std::make_unique(std::move(columns)); } -std::unique_ptr generate_orders_dependent(cudf::table_view const& lineitem, +/** + * @brief Generate the part of the `orders` table dependent on the `lineitem` table + * + * @param lineitem_partial The partially generated `lineitem` table + * @param stream CUDA stream used for device memory operations and kernel launches + * @param mr Device memory resource used to allocate the returned column's device memory + */ +std::unique_ptr generate_orders_dependent(cudf::table_view const& lineitem_partial, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); - auto const l_linestatus_mask = lineitem.column(0); - auto const l_orderkey = lineitem.column(1); - auto const l_discount = lineitem.column(6); - auto const l_tax = lineitem.column(7); - auto const l_extendedprice = lineitem.column(16); + auto const l_linestatus_mask = lineitem_partial.column(0); + auto const l_orderkey = lineitem_partial.column(1); + auto const l_extendedprice = lineitem_partial.column(6); + auto const l_discount = lineitem_partial.column(7); + auto const l_tax = lineitem_partial.column(8); std::vector> orders_dependent_columns; - // Generate the `o_totalprice` column - // We calculate the `charge` column, which is a function of `l_extendedprice`, - // `l_tax`, and `l_discount` and then group by `l_orderkey` and sum the `charge` - auto const l_charge = calculate_charge(l_extendedprice, l_tax, l_discount, stream, mr); - auto o_totalprice = [&]() { - auto const keys = cudf::table_view({l_orderkey}); - cudf::groupby::groupby gb(keys); - std::vector requests; - requests.push_back(cudf::groupby::aggregation_request()); - requests[0].aggregations.push_back(cudf::make_sum_aggregation()); - requests[0].values = l_charge->view(); - auto agg_result = gb.aggregate(requests); - return cudf::round(agg_result.second[0].results[0]->view(), 2); - }(); - orders_dependent_columns.push_back(std::move(o_totalprice)); - // Generate the `o_orderstatus` column auto o_orderstatus = [&]() { auto const keys = cudf::table_view({l_orderkey}); @@ -529,6 +520,22 @@ std::unique_ptr generate_orders_dependent(cudf::table_view const& l cudf::string_scalar("P"), o_orderstatus_intermediate->view(), mask_b->view()); }(); orders_dependent_columns.push_back(std::move(o_orderstatus)); + + // Generate the `o_totalprice` column + // We calculate the `charge` column, which is a function of `l_extendedprice`, + // `l_tax`, and `l_discount` and then group by `l_orderkey` and sum the `charge` + auto const 
l_charge = calculate_charge(l_extendedprice, l_tax, l_discount, stream, mr); + auto o_totalprice = [&]() { + auto const keys = cudf::table_view({l_orderkey}); + cudf::groupby::groupby gb(keys); + std::vector requests; + requests.push_back(cudf::groupby::aggregation_request()); + requests[0].aggregations.push_back(cudf::make_sum_aggregation()); + requests[0].values = l_charge->view(); + auto agg_result = gb.aggregate(requests); + return cudf::round(agg_result.second[0].results[0]->view(), 2); + }(); + orders_dependent_columns.push_back(std::move(o_totalprice)); return std::make_unique(std::move(orders_dependent_columns)); } @@ -730,9 +737,7 @@ generate_orders_lineitem_part(double scale_factor, // Generate the `part` table auto part = generate_part(scale_factor, stream, mr); - // Join the `part` and partial `lineitem` tables, then calculate the `l_extendedprice` column, - // add the column to the `lineitem` table, and write the `lineitem` table to a parquet file - + // Join the `part` and partial `lineitem` tables, then calculate the `l_extendedprice` column auto l_extendedprice = [&]() { auto const left = cudf::table_view( {lineitem_partial->get_column(2).view(), lineitem_partial->get_column(5).view()}); @@ -752,8 +757,9 @@ generate_orders_lineitem_part(double scale_factor, return cudf::round(col->view(), 2); }(); + // Insert the `l_extendedprice` column into the partial columns of the `lineitem` table auto lineitem_partial_columns = lineitem_partial->release(); - lineitem_partial_columns.push_back(std::move(l_extendedprice)); + lineitem_partial_columns.insert(lineitem_partial_columns.begin() + 6, std::move(l_extendedprice)); auto lineitem_temp = std::make_unique(std::move(lineitem_partial_columns)); // Generate the dependent columns of the `orders` table @@ -762,7 +768,7 @@ generate_orders_lineitem_part(double scale_factor, auto orders_independent_columns = orders_independent->release(); auto orders_dependent_columns = orders_dependent->release(); - orders_independent_columns.insert(orders_independent_columns.end(), + orders_independent_columns.insert(orders_independent_columns.begin() + 2, std::make_move_iterator(orders_dependent_columns.begin()), std::make_move_iterator(orders_dependent_columns.end())); diff --git a/cpp/benchmarks/common/tpch_data_generator/tpch_data_generator.hpp b/cpp/benchmarks/common/ndsh_data_generator/ndsh_data_generator.hpp similarity index 100% rename from cpp/benchmarks/common/tpch_data_generator/tpch_data_generator.hpp rename to cpp/benchmarks/common/ndsh_data_generator/ndsh_data_generator.hpp diff --git a/cpp/benchmarks/common/tpch_data_generator/random_column_generator.cu b/cpp/benchmarks/common/ndsh_data_generator/random_column_generator.cu similarity index 100% rename from cpp/benchmarks/common/tpch_data_generator/random_column_generator.cu rename to cpp/benchmarks/common/ndsh_data_generator/random_column_generator.cu diff --git a/cpp/benchmarks/common/tpch_data_generator/random_column_generator.hpp b/cpp/benchmarks/common/ndsh_data_generator/random_column_generator.hpp similarity index 100% rename from cpp/benchmarks/common/tpch_data_generator/random_column_generator.hpp rename to cpp/benchmarks/common/ndsh_data_generator/random_column_generator.hpp diff --git a/cpp/benchmarks/common/tpch_data_generator/table_helpers.cpp b/cpp/benchmarks/common/ndsh_data_generator/table_helpers.cpp similarity index 100% rename from cpp/benchmarks/common/tpch_data_generator/table_helpers.cpp rename to cpp/benchmarks/common/ndsh_data_generator/table_helpers.cpp diff --git 
a/cpp/benchmarks/common/tpch_data_generator/table_helpers.hpp b/cpp/benchmarks/common/ndsh_data_generator/table_helpers.hpp similarity index 100% rename from cpp/benchmarks/common/tpch_data_generator/table_helpers.hpp rename to cpp/benchmarks/common/ndsh_data_generator/table_helpers.hpp diff --git a/cpp/benchmarks/ndsh/README.md b/cpp/benchmarks/ndsh/README.md new file mode 100644 index 00000000000..0a462e1684e --- /dev/null +++ b/cpp/benchmarks/ndsh/README.md @@ -0,0 +1,11 @@ +# NDS-H Benchmarks for `libcudf` + +## Disclaimer + +NDS-H is derived from the TPC-H Benchmarks and as such any results obtained using NDS-H are not +comparable to published TPC-H Benchmark results, as the results obtained from using NDS-H do not +comply with the TPC-H Benchmarks. + +## Current Status + +For now, only Q1, Q5, Q6, Q9, and Q10 have been implemented. diff --git a/cpp/examples/tpch/q1.cpp b/cpp/benchmarks/ndsh/q01.cpp similarity index 82% rename from cpp/examples/tpch/q1.cpp rename to cpp/benchmarks/ndsh/q01.cpp index 87b7e613766..ef709926ae9 100644 --- a/cpp/examples/tpch/q1.cpp +++ b/cpp/benchmarks/ndsh/q01.cpp @@ -14,17 +14,19 @@ * limitations under the License. */ -#include "../utilities/timer.hpp" -#include "utils.hpp" +#include "utilities.hpp" #include +#include #include #include #include +#include + /** - * @file q1.cpp - * @brief Implement query 1 of the TPC-H benchmark. + * @file q01.cpp + * @brief Implement query 1 of the NDS-H benchmark. * * create view lineitem as select * from '/tables/scale-1/lineitem.parquet'; * * @@ -59,7 +61,7 @@ * @param stream The CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory. */ -[[nodiscard]] std::unique_ptr calc_disc_price( +[[nodiscard]] std::unique_ptr calculate_disc_price( cudf::column_view const& discount, cudf::column_view const& extendedprice, rmm::cuda_stream_view stream = cudf::get_default_stream(), @@ -86,7 +88,7 @@ * @param stream The CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory. 
*/ -[[nodiscard]] std::unique_ptr calc_charge( +[[nodiscard]] std::unique_ptr calculate_charge( cudf::column_view const& tax, cudf::column_view const& disc_price, rmm::cuda_stream_view stream = cudf::get_default_stream(), @@ -101,16 +103,9 @@ return charge; } -int main(int argc, char const** argv) +void run_ndsh_q1(nvbench::state& state, + std::unordered_map& sources) { - auto const args = parse_args(argc, argv); - - // Use a memory pool - auto resource = create_memory_resource(args.memory_resource_type); - cudf::set_current_device_resource(resource.get()); - - cudf::examples::timer timer; - // Define the column projections and filter predicate for `lineitem` table std::vector const lineitem_cols = {"l_returnflag", "l_linestatus", @@ -130,12 +125,12 @@ int main(int argc, char const** argv) // Read out the `lineitem` table from parquet file auto lineitem = - read_parquet(args.dataset_dir + "/lineitem.parquet", lineitem_cols, std::move(lineitem_pred)); + read_parquet(sources["lineitem"].make_source_info(), lineitem_cols, std::move(lineitem_pred)); // Calculate the discount price and charge columns and append to lineitem table auto disc_price = - calc_disc_price(lineitem->column("l_discount"), lineitem->column("l_extendedprice")); - auto charge = calc_charge(lineitem->column("l_tax"), disc_price->view()); + calculate_disc_price(lineitem->column("l_discount"), lineitem->column("l_extendedprice")); + auto charge = calculate_charge(lineitem->column("l_tax"), disc_price->view()); (*lineitem).append(disc_price, "disc_price").append(charge, "charge"); // Perform the group by operation @@ -167,9 +162,21 @@ int main(int argc, char const** argv) {"l_returnflag", "l_linestatus"}, {cudf::order::ASCENDING, cudf::order::ASCENDING}); - timer.print_elapsed_millis(); - // Write query result to a parquet file orderedby_table->to_parquet("q1.parquet"); - return 0; } + +void ndsh_q1(nvbench::state& state) +{ + // Generate the required parquet files in device buffers + double const scale_factor = state.get_float64("scale_factor"); + std::unordered_map sources; + generate_parquet_data_sources(scale_factor, {"lineitem"}, sources); + + auto stream = cudf::get_default_stream(); + state.set_cuda_stream(nvbench::make_cuda_stream_view(stream.value())); + state.exec(nvbench::exec_tag::sync, + [&](nvbench::launch& launch) { run_ndsh_q1(state, sources); }); +} + +NVBENCH_BENCH(ndsh_q1).set_name("ndsh_q1").add_float64_axis("scale_factor", {0.01, 0.1, 1}); diff --git a/cpp/examples/tpch/q5.cpp b/cpp/benchmarks/ndsh/q05.cpp similarity index 80% rename from cpp/examples/tpch/q5.cpp rename to cpp/benchmarks/ndsh/q05.cpp index 12c186db10e..522bc4789c2 100644 --- a/cpp/examples/tpch/q5.cpp +++ b/cpp/benchmarks/ndsh/q05.cpp @@ -14,17 +14,19 @@ * limitations under the License. */ -#include "../utilities/timer.hpp" -#include "utils.hpp" +#include "utilities.hpp" #include +#include #include #include #include +#include + /** - * @file q5.cpp - * @brief Implement query 5 of the TPC-H benchmark. + * @file q05.cpp + * @brief Implement query 5 of the NDS-H benchmark. * * create view customer as select * from '/tables/scale-1/customer.parquet'; * create view orders as select * from '/tables/scale-1/orders.parquet'; @@ -67,7 +69,7 @@ * @param stream The CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory. 
*/ -[[nodiscard]] std::unique_ptr calc_revenue( +[[nodiscard]] std::unique_ptr calculate_revenue( cudf::column_view const& extendedprice, cudf::column_view const& discount, rmm::cuda_stream_view stream = cudf::get_default_stream(), @@ -86,16 +88,9 @@ return revenue; } -int main(int argc, char const** argv) +void run_ndsh_q5(nvbench::state& state, + std::unordered_map& sources) { - auto const args = parse_args(argc, argv); - - // Use a memory pool - auto resource = create_memory_resource(args.memory_resource_type); - cudf::set_current_device_resource(resource.get()); - - cudf::examples::timer timer; - // Define the column projection and filter predicate for the `orders` table std::vector const orders_cols = {"o_custkey", "o_orderkey", "o_orderdate"}; auto const o_orderdate_ref = cudf::ast::column_reference(std::distance( @@ -125,17 +120,17 @@ int main(int argc, char const** argv) // Read out the tables from parquet files // while pushing down the column projections and filter predicates auto const customer = - read_parquet(args.dataset_dir + "/customer.parquet", {"c_custkey", "c_nationkey"}); + read_parquet(sources["customer"].make_source_info(), {"c_custkey", "c_nationkey"}); auto const orders = - read_parquet(args.dataset_dir + "/orders.parquet", orders_cols, std::move(orders_pred)); - auto const lineitem = read_parquet(args.dataset_dir + "/lineitem.parquet", + read_parquet(sources["orders"].make_source_info(), orders_cols, std::move(orders_pred)); + auto const lineitem = read_parquet(sources["lineitem"].make_source_info(), {"l_orderkey", "l_suppkey", "l_extendedprice", "l_discount"}); auto const supplier = - read_parquet(args.dataset_dir + "/supplier.parquet", {"s_suppkey", "s_nationkey"}); + read_parquet(sources["supplier"].make_source_info(), {"s_suppkey", "s_nationkey"}); auto const nation = - read_parquet(args.dataset_dir + "/nation.parquet", {"n_nationkey", "n_regionkey", "n_name"}); + read_parquet(sources["nation"].make_source_info(), {"n_nationkey", "n_regionkey", "n_name"}); auto const region = - read_parquet(args.dataset_dir + "/region.parquet", region_cols, std::move(region_pred)); + read_parquet(sources["region"].make_source_info(), region_cols, std::move(region_pred)); // Perform the joins auto const join_a = apply_inner_join(region, nation, {"r_regionkey"}, {"n_regionkey"}); @@ -147,7 +142,7 @@ int main(int argc, char const** argv) // Calculate and append the `revenue` column auto revenue = - calc_revenue(joined_table->column("l_extendedprice"), joined_table->column("l_discount")); + calculate_revenue(joined_table->column("l_extendedprice"), joined_table->column("l_discount")); (*joined_table).append(revenue, "revenue"); // Perform the groupby operation @@ -162,9 +157,22 @@ int main(int argc, char const** argv) auto const orderedby_table = apply_orderby(groupedby_table, {"revenue"}, {cudf::order::DESCENDING}); - timer.print_elapsed_millis(); - // Write query result to a parquet file orderedby_table->to_parquet("q5.parquet"); - return 0; } + +void ndsh_q5(nvbench::state& state) +{ + // Generate the required parquet files in device buffers + double const scale_factor = state.get_float64("scale_factor"); + std::unordered_map sources; + generate_parquet_data_sources( + scale_factor, {"customer", "orders", "lineitem", "supplier", "nation", "region"}, sources); + + auto stream = cudf::get_default_stream(); + state.set_cuda_stream(nvbench::make_cuda_stream_view(stream.value())); + state.exec(nvbench::exec_tag::sync, + [&](nvbench::launch& launch) { run_ndsh_q5(state, sources); }); 
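+  // Note: exec_tag::sync marks the lambda as self-synchronizing, so nvbench
+  // times each full query run individually instead of batching iterations.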
+} + +NVBENCH_BENCH(ndsh_q5).set_name("ndsh_q5").add_float64_axis("scale_factor", {0.01, 0.1, 1}); diff --git a/cpp/examples/tpch/q6.cpp b/cpp/benchmarks/ndsh/q06.cpp similarity index 79% rename from cpp/examples/tpch/q6.cpp rename to cpp/benchmarks/ndsh/q06.cpp index 92dac40c768..04078547973 100644 --- a/cpp/examples/tpch/q6.cpp +++ b/cpp/benchmarks/ndsh/q06.cpp @@ -14,17 +14,20 @@ * limitations under the License. */ -#include "../utilities/timer.hpp" -#include "utils.hpp" +#include "utilities.hpp" #include +#include #include #include +#include #include +#include + /** - * @file q6.cpp - * @brief Implement query 6 of the TPC-H benchmark. + * @file q06.cpp + * @brief Implement query 6 of the NDS-H benchmark. * * create view lineitem as select * from '/tables/scale-1/lineitem.parquet'; * @@ -48,7 +51,7 @@ * @param stream The CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory. */ -[[nodiscard]] std::unique_ptr calc_revenue( +[[nodiscard]] std::unique_ptr calculate_revenue( cudf::column_view const& extendedprice, cudf::column_view const& discount, rmm::cuda_stream_view stream = cudf::get_default_stream(), @@ -60,16 +63,9 @@ return revenue; } -int main(int argc, char const** argv) +void run_ndsh_q6(nvbench::state& state, + std::unordered_map& sources) { - auto const args = parse_args(argc, argv); - - // Use a memory pool - auto resource = create_memory_resource(args.memory_resource_type); - cudf::set_current_device_resource(resource.get()); - - cudf::examples::timer timer; - // Read out the `lineitem` table from parquet file std::vector const lineitem_cols = { "l_extendedprice", "l_discount", "l_shipdate", "l_quantity"}; @@ -88,7 +84,7 @@ int main(int argc, char const** argv) auto const lineitem_pred = std::make_unique( cudf::ast::ast_operator::LOGICAL_AND, shipdate_pred_a, shipdate_pred_b); auto lineitem = - read_parquet(args.dataset_dir + "/lineitem.parquet", lineitem_cols, std::move(lineitem_pred)); + read_parquet(sources["lineitem"].make_source_info(), lineitem_cols, std::move(lineitem_pred)); // Cast the discount and quantity columns to float32 and append to lineitem table auto discout_float = @@ -99,8 +95,8 @@ int main(int argc, char const** argv) (*lineitem).append(discout_float, "l_discount_float").append(quantity_float, "l_quantity_float"); // Apply the filters - auto const discount_ref = cudf::ast::column_reference(lineitem->col_id("l_discount_float")); - auto const quantity_ref = cudf::ast::column_reference(lineitem->col_id("l_quantity_float")); + auto const discount_ref = cudf::ast::column_reference(lineitem->column_id("l_discount_float")); + auto const quantity_ref = cudf::ast::column_reference(lineitem->column_id("l_quantity_float")); auto discount_lower = cudf::numeric_scalar(0.05); auto const discount_lower_literal = cudf::ast::literal(discount_lower); @@ -123,16 +119,28 @@ int main(int argc, char const** argv) auto const filtered_table = apply_filter(lineitem, discount_quantity_pred); // Calculate the `revenue` column - auto revenue = - calc_revenue(filtered_table->column("l_extendedprice"), filtered_table->column("l_discount")); + auto revenue = calculate_revenue(filtered_table->column("l_extendedprice"), + filtered_table->column("l_discount")); // Sum the `revenue` column auto const revenue_view = revenue->view(); auto const result_table = apply_reduction(revenue_view, cudf::aggregation::Kind::SUM, "revenue"); - timer.print_elapsed_millis(); - // Write query result to a 
parquet file result_table->to_parquet("q6.parquet"); - return 0; } + +void ndsh_q6(nvbench::state& state) +{ + // Generate the required parquet files in device buffers + double const scale_factor = state.get_float64("scale_factor"); + std::unordered_map sources; + generate_parquet_data_sources(scale_factor, {"lineitem"}, sources); + + auto stream = cudf::get_default_stream(); + state.set_cuda_stream(nvbench::make_cuda_stream_view(stream.value())); + state.exec(nvbench::exec_tag::sync, + [&](nvbench::launch& launch) { run_ndsh_q6(state, sources); }); +} + +NVBENCH_BENCH(ndsh_q6).set_name("ndsh_q6").add_float64_axis("scale_factor", {0.01, 0.1, 1}); diff --git a/cpp/examples/tpch/q9.cpp b/cpp/benchmarks/ndsh/q09.cpp similarity index 78% rename from cpp/examples/tpch/q9.cpp rename to cpp/benchmarks/ndsh/q09.cpp index 2882182aa2b..59218ab8912 100644 --- a/cpp/examples/tpch/q9.cpp +++ b/cpp/benchmarks/ndsh/q09.cpp @@ -14,9 +14,9 @@ * limitations under the License. */ -#include "../utilities/timer.hpp" -#include "utils.hpp" +#include "utilities.hpp" +#include #include #include #include @@ -24,9 +24,11 @@ #include #include +#include + /** - * @file q9.cpp - * @brief Implement query 9 of the TPC-H benchmark. + * @file q09.cpp + * @brief Implement query 9 of the NDS-H benchmark. * * create view part as select * from '/tables/scale-1/part.parquet'; * create view supplier as select * from '/tables/scale-1/supplier.parquet'; @@ -79,7 +81,7 @@ * @param stream The CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory. */ -[[nodiscard]] std::unique_ptr calc_amount( +[[nodiscard]] std::unique_ptr calculate_amount( cudf::column_view const& discount, cudf::column_view const& extendedprice, cudf::column_view const& supplycost, @@ -109,28 +111,21 @@ return amount; } -int main(int argc, char const** argv) +void run_ndsh_q9(nvbench::state& state, + std::unordered_map& sources) { - auto const args = parse_args(argc, argv); - - // Use a memory pool - auto resource = create_memory_resource(args.memory_resource_type); - cudf::set_current_device_resource(resource.get()); - - cudf::examples::timer timer; - // Read out the table from parquet files auto const lineitem = read_parquet( - args.dataset_dir + "/lineitem.parquet", + sources["lineitem"].make_source_info(), {"l_suppkey", "l_partkey", "l_orderkey", "l_extendedprice", "l_discount", "l_quantity"}); - auto const nation = read_parquet(args.dataset_dir + "/nation.parquet", {"n_nationkey", "n_name"}); + auto const nation = read_parquet(sources["nation"].make_source_info(), {"n_nationkey", "n_name"}); auto const orders = - read_parquet(args.dataset_dir + "/orders.parquet", {"o_orderkey", "o_orderdate"}); - auto const part = read_parquet(args.dataset_dir + "/part.parquet", {"p_partkey", "p_name"}); - auto const partsupp = read_parquet(args.dataset_dir + "/partsupp.parquet", + read_parquet(sources["orders"].make_source_info(), {"o_orderkey", "o_orderdate"}); + auto const part = read_parquet(sources["part"].make_source_info(), {"p_partkey", "p_name"}); + auto const partsupp = read_parquet(sources["partsupp"].make_source_info(), {"ps_suppkey", "ps_partkey", "ps_supplycost"}); auto const supplier = - read_parquet(args.dataset_dir + "/supplier.parquet", {"s_suppkey", "s_nationkey"}); + read_parquet(sources["supplier"].make_source_info(), {"s_suppkey", "s_nationkey"}); // Generating the `profit` table // Filter the part table using `p_name like '%green%'` @@ -150,10 +145,10 @@ 
int main(int argc, char const** argv) // Calculate the `nation`, `o_year`, and `amount` columns auto n_name = std::make_unique(joined_table->column("n_name")); auto o_year = cudf::datetime::extract_year(joined_table->column("o_orderdate")); - auto amount = calc_amount(joined_table->column("l_discount"), - joined_table->column("l_extendedprice"), - joined_table->column("ps_supplycost"), - joined_table->column("l_quantity")); + auto amount = calculate_amount(joined_table->column("l_discount"), + joined_table->column("l_extendedprice"), + joined_table->column("ps_supplycost"), + joined_table->column("l_quantity")); // Put together the `profit` table std::vector> profit_columns; @@ -175,9 +170,22 @@ int main(int argc, char const** argv) auto const orderedby_table = apply_orderby( groupedby_table, {"nation", "o_year"}, {cudf::order::ASCENDING, cudf::order::DESCENDING}); - timer.print_elapsed_millis(); - // Write query result to a parquet file orderedby_table->to_parquet("q9.parquet"); - return 0; } + +void ndsh_q9(nvbench::state& state) +{ + // Generate the required parquet files in device buffers + double const scale_factor = state.get_float64("scale_factor"); + std::unordered_map sources; + generate_parquet_data_sources( + scale_factor, {"part", "supplier", "lineitem", "partsupp", "orders", "nation"}, sources); + + auto stream = cudf::get_default_stream(); + state.set_cuda_stream(nvbench::make_cuda_stream_view(stream.value())); + state.exec(nvbench::exec_tag::sync, + [&](nvbench::launch& launch) { run_ndsh_q9(state, sources); }); +} + +NVBENCH_BENCH(ndsh_q9).set_name("ndsh_q9").add_float64_axis("scale_factor", {0.01, 0.1, 1}); diff --git a/cpp/examples/tpch/q10.cpp b/cpp/benchmarks/ndsh/q10.cpp similarity index 81% rename from cpp/examples/tpch/q10.cpp rename to cpp/benchmarks/ndsh/q10.cpp index fdf147b50e0..a520480020a 100644 --- a/cpp/examples/tpch/q10.cpp +++ b/cpp/benchmarks/ndsh/q10.cpp @@ -14,17 +14,19 @@ * limitations under the License. */ -#include "../utilities/timer.hpp" -#include "utils.hpp" +#include "utilities.hpp" #include +#include #include #include #include +#include + /** * @file q10.cpp - * @brief Implement query 10 of the TPC-H benchmark. + * @brief Implement query 10 of the NDS-H benchmark. * * create view customer as select * from '/tables/scale-1/customer.parquet'; * create view orders as select * from '/tables/scale-1/orders.parquet'; @@ -72,7 +74,7 @@ * @param stream The CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory. 
*/ -[[nodiscard]] std::unique_ptr calc_revenue( +[[nodiscard]] std::unique_ptr calculate_revenue( cudf::column_view const& extendedprice, cudf::column_view const& discount, rmm::cuda_stream_view stream = cudf::get_default_stream(), @@ -90,16 +92,10 @@ mr); return revenue; } -int main(int argc, char const** argv) -{ - auto const args = parse_args(argc, argv); - - // Use a memory pool - auto resource = create_memory_resource(args.memory_resource_type); - cudf::set_current_device_resource(resource.get()); - - cudf::examples::timer timer; +void run_ndsh_q10(nvbench::state& state, + std::unordered_map& sources) +{ // Define the column projection and filter predicate for the `orders` table std::vector const orders_cols = {"o_custkey", "o_orderkey", "o_orderdate"}; auto const o_orderdate_ref = cudf::ast::column_reference(std::distance( @@ -126,15 +122,15 @@ int main(int argc, char const** argv) // Read out the tables from parquet files // while pushing down the column projections and filter predicates auto const customer = read_parquet( - args.dataset_dir + "/customer.parquet", + sources["customer"].make_source_info(), {"c_custkey", "c_name", "c_nationkey", "c_acctbal", "c_address", "c_phone", "c_comment"}); auto const orders = - read_parquet(args.dataset_dir + "/orders.parquet", orders_cols, std::move(orders_pred)); + read_parquet(sources["orders"].make_source_info(), orders_cols, std::move(orders_pred)); auto const lineitem = - read_parquet(args.dataset_dir + "/lineitem.parquet", + read_parquet(sources["lineitem"].make_source_info(), {"l_extendedprice", "l_discount", "l_orderkey", "l_returnflag"}, std::move(lineitem_pred)); - auto const nation = read_parquet(args.dataset_dir + "/nation.parquet", {"n_name", "n_nationkey"}); + auto const nation = read_parquet(sources["nation"].make_source_info(), {"n_name", "n_nationkey"}); // Perform the joins auto const join_a = apply_inner_join(customer, nation, {"c_nationkey"}, {"n_nationkey"}); @@ -143,7 +139,7 @@ int main(int argc, char const** argv) // Calculate and append the `revenue` column auto revenue = - calc_revenue(joined_table->column("l_extendedprice"), joined_table->column("l_discount")); + calculate_revenue(joined_table->column("l_extendedprice"), joined_table->column("l_discount")); (*joined_table).append(revenue, "revenue"); // Perform the groupby operation @@ -159,9 +155,22 @@ int main(int argc, char const** argv) auto const orderedby_table = apply_orderby(groupedby_table, {"revenue"}, {cudf::order::DESCENDING}); - timer.print_elapsed_millis(); - // Write query result to a parquet file orderedby_table->to_parquet("q10.parquet"); - return 0; } + +void ndsh_q10(nvbench::state& state) +{ + // Generate the required parquet files in device buffers + double const scale_factor = state.get_float64("scale_factor"); + std::unordered_map sources; + generate_parquet_data_sources( + scale_factor, {"customer", "orders", "lineitem", "nation"}, sources); + + auto stream = cudf::get_default_stream(); + state.set_cuda_stream(nvbench::make_cuda_stream_view(stream.value())); + state.exec(nvbench::exec_tag::sync, + [&](nvbench::launch& launch) { run_ndsh_q10(state, sources); }); +} + +NVBENCH_BENCH(ndsh_q10).set_name("ndsh_q10").add_float64_axis("scale_factor", {0.01, 0.1, 1}); diff --git a/cpp/benchmarks/ndsh/utilities.cpp b/cpp/benchmarks/ndsh/utilities.cpp new file mode 100644 index 00000000000..2d514764fc2 --- /dev/null +++ b/cpp/benchmarks/ndsh/utilities.cpp @@ -0,0 +1,400 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "utilities.hpp" + +#include "common/ndsh_data_generator/ndsh_data_generator.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +namespace { + +std::vector const ORDERS_SCHEMA = {"o_orderkey", + "o_custkey", + "o_orderstatus", + "o_totalprice", + "o_orderdate", + "o_orderpriority", + "o_clerk", + "o_shippriority", + "o_comment"}; +std::vector const LINEITEM_SCHEMA = {"l_orderkey", + "l_partkey", + "l_suppkey", + "l_linenumber", + "l_quantity", + "l_extendedprice", + "l_discount", + "l_tax", + "l_returnflag", + "l_linestatus", + "l_shipdate", + "l_commitdate", + "l_receiptdate", + "l_shipinstruct", + "l_shipmode", + "l_comment"}; +std::vector const PART_SCHEMA = {"p_partkey", + "p_name", + "p_mfgr", + "p_brand", + "p_type", + "p_size", + "p_container", + "p_retailprice", + "p_comment"}; +std::vector const PARTSUPP_SCHEMA = { + "ps_partkey", "ps_suppkey", "ps_availqty", "ps_supplycost", "ps_comment"}; +std::vector const SUPPLIER_SCHEMA = { + "s_suppkey", "s_name", "s_address", "s_nationkey", "s_phone", "s_acctbal", "s_comment"}; +std::vector const CUSTOMER_SCHEMA = {"c_custkey", + "c_name", + "c_address", + "c_nationkey", + "c_phone", + "c_acctbal", + "c_mktsegment", + "c_comment"}; +std::vector const NATION_SCHEMA = { + "n_nationkey", "n_name", "n_regionkey", "n_comment"}; +std::vector const REGION_SCHEMA = {"r_regionkey", "r_name", "r_comment"}; + +} // namespace + +cudf::table_view table_with_names::table() const { return tbl->view(); } + +cudf::column_view table_with_names::column(std::string const& col_name) const +{ + return tbl->view().column(column_id(col_name)); +} + +std::vector const& table_with_names::column_names() const { return col_names; } + +cudf::size_type table_with_names::column_id(std::string const& col_name) const +{ + auto it = std::find(col_names.begin(), col_names.end(), col_name); + if (it == col_names.end()) { + std::string err_msg = "Column `" + col_name + "` not found"; + throw std::runtime_error(err_msg); + } + return std::distance(col_names.begin(), it); +} + +table_with_names& table_with_names::append(std::unique_ptr& col, + std::string const& col_name) +{ + auto cols = tbl->release(); + cols.push_back(std::move(col)); + tbl = std::make_unique(std::move(cols)); + col_names.push_back(col_name); + return (*this); +} + +cudf::table_view table_with_names::select(std::vector const& col_names) const +{ + CUDF_FUNC_RANGE(); + std::vector col_indices; + for (auto const& col_name : col_names) { + col_indices.push_back(column_id(col_name)); + } + return tbl->select(col_indices); +} + +void table_with_names::to_parquet(std::string const& filepath) const +{ + CUDF_FUNC_RANGE(); + auto const sink_info = cudf::io::sink_info(filepath); + cudf::io::table_metadata metadata; + metadata.schema_info = + std::vector(col_names.begin(), col_names.end()); + auto const table_input_metadata = cudf::io::table_input_metadata{metadata}; + 
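// Attach the column names via table_input_metadata so the written parquet file preserves the table's schema. +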
auto builder = cudf::io::parquet_writer_options::builder(sink_info, tbl->view()); + builder.metadata(table_input_metadata); + auto const options = builder.build(); + cudf::io::write_parquet(options); +} + +std::unique_ptr join_and_gather(cudf::table_view const& left_input, + cudf::table_view const& right_input, + std::vector const& left_on, + std::vector const& right_on, + cudf::null_equality compare_nulls) +{ + CUDF_FUNC_RANGE(); + constexpr auto oob_policy = cudf::out_of_bounds_policy::DONT_CHECK; + auto const left_selected = left_input.select(left_on); + auto const right_selected = right_input.select(right_on); + auto const [left_join_indices, right_join_indices] = cudf::inner_join( + left_selected, right_selected, compare_nulls, cudf::get_current_device_resource_ref()); + + auto const left_indices_span = cudf::device_span{*left_join_indices}; + auto const right_indices_span = cudf::device_span{*right_join_indices}; + + auto const left_indices_col = cudf::column_view{left_indices_span}; + auto const right_indices_col = cudf::column_view{right_indices_span}; + + auto const left_result = cudf::gather(left_input, left_indices_col, oob_policy); + auto const right_result = cudf::gather(right_input, right_indices_col, oob_policy); + + auto joined_cols = left_result->release(); + auto right_cols = right_result->release(); + joined_cols.insert(joined_cols.end(), + std::make_move_iterator(right_cols.begin()), + std::make_move_iterator(right_cols.end())); + return std::make_unique(std::move(joined_cols)); +} + +std::unique_ptr apply_inner_join( + std::unique_ptr const& left_input, + std::unique_ptr const& right_input, + std::vector const& left_on, + std::vector const& right_on, + cudf::null_equality compare_nulls) +{ + CUDF_FUNC_RANGE(); + std::vector left_on_indices; + std::vector right_on_indices; + std::transform( + left_on.begin(), left_on.end(), std::back_inserter(left_on_indices), [&](auto const& col_name) { + return left_input->column_id(col_name); + }); + std::transform(right_on.begin(), + right_on.end(), + std::back_inserter(right_on_indices), + [&](auto const& col_name) { return right_input->column_id(col_name); }); + auto table = join_and_gather( + left_input->table(), right_input->table(), left_on_indices, right_on_indices, compare_nulls); + std::vector merged_column_names; + merged_column_names.reserve(left_input->column_names().size() + + right_input->column_names().size()); + std::copy(left_input->column_names().begin(), + left_input->column_names().end(), + std::back_inserter(merged_column_names)); + std::copy(right_input->column_names().begin(), + right_input->column_names().end(), + std::back_inserter(merged_column_names)); + return std::make_unique(std::move(table), merged_column_names); +} + +std::unique_ptr apply_filter(std::unique_ptr const& table, + cudf::ast::operation const& predicate) +{ + CUDF_FUNC_RANGE(); + auto const boolean_mask = cudf::compute_column(table->table(), predicate); + auto result_table = cudf::apply_boolean_mask(table->table(), boolean_mask->view()); + return std::make_unique(std::move(result_table), table->column_names()); +} + +std::unique_ptr apply_mask(std::unique_ptr const& table, + std::unique_ptr const& mask) +{ + CUDF_FUNC_RANGE(); + auto result_table = cudf::apply_boolean_mask(table->table(), mask->view()); + return std::make_unique(std::move(result_table), table->column_names()); +} + +std::unique_ptr apply_groupby(std::unique_ptr const& table, + groupby_context_t const& 
ctx) +{ + CUDF_FUNC_RANGE(); + auto const keys = table->select(ctx.keys); + cudf::groupby::groupby groupby_obj(keys); + std::vector result_column_names; + result_column_names.insert(result_column_names.end(), ctx.keys.begin(), ctx.keys.end()); + std::vector requests; + for (auto& [value_col, aggregations] : ctx.values) { + requests.emplace_back(cudf::groupby::aggregation_request()); + for (auto& agg : aggregations) { + if (agg.first == cudf::aggregation::Kind::SUM) { + requests.back().aggregations.push_back( + cudf::make_sum_aggregation()); + } else if (agg.first == cudf::aggregation::Kind::MEAN) { + requests.back().aggregations.push_back( + cudf::make_mean_aggregation()); + } else if (agg.first == cudf::aggregation::Kind::COUNT_ALL) { + requests.back().aggregations.push_back( + cudf::make_count_aggregation()); + } else { + throw std::runtime_error("Unsupported aggregation"); + } + result_column_names.push_back(agg.second); + } + requests.back().values = table->column(value_col); + } + auto agg_results = groupby_obj.aggregate(requests); + std::vector> result_columns; + for (auto i = 0; i < agg_results.first->num_columns(); i++) { + auto col = std::make_unique(agg_results.first->get_column(i)); + result_columns.push_back(std::move(col)); + } + for (size_t i = 0; i < agg_results.second.size(); i++) { + for (size_t j = 0; j < agg_results.second[i].results.size(); j++) { + result_columns.push_back(std::move(agg_results.second[i].results[j])); + } + } + auto result_table = std::make_unique(std::move(result_columns)); + return std::make_unique(std::move(result_table), result_column_names); +} + +std::unique_ptr apply_orderby(std::unique_ptr const& table, + std::vector const& sort_keys, + std::vector const& sort_key_orders) +{ + CUDF_FUNC_RANGE(); + std::vector column_views; + for (auto& key : sort_keys) { + column_views.push_back(table->column(key)); + } + auto result_table = + cudf::sort_by_key(table->table(), cudf::table_view{column_views}, sort_key_orders); + return std::make_unique(std::move(result_table), table->column_names()); +} + +std::unique_ptr apply_reduction(cudf::column_view const& column, + cudf::aggregation::Kind const& agg_kind, + std::string const& col_name) +{ + CUDF_FUNC_RANGE(); + auto const agg = cudf::make_sum_aggregation(); + auto const result = cudf::reduce(column, *agg, column.type()); + cudf::size_type const len = 1; + auto col = cudf::make_column_from_scalar(*result, len); + std::vector> columns; + columns.push_back(std::move(col)); + auto result_table = std::make_unique(std::move(columns)); + std::vector col_names = {col_name}; + return std::make_unique(std::move(result_table), col_names); +} + +std::unique_ptr read_parquet( + cudf::io::source_info const& source_info, + std::vector const& columns, + std::unique_ptr const& predicate) +{ + CUDF_FUNC_RANGE(); + auto builder = cudf::io::parquet_reader_options_builder(source_info); + if (!columns.empty()) { builder.columns(columns); } + if (predicate) { builder.filter(*predicate); } + auto const options = builder.build(); + auto table_with_metadata = cudf::io::read_parquet(options); + std::vector column_names; + for (auto const& col_info : table_with_metadata.metadata.schema_info) { + column_names.push_back(col_info.name); + } + return std::make_unique(std::move(table_with_metadata.tbl), column_names); +} + +std::tm make_tm(int year, int month, int day) +{ + std::tm tm{}; + tm.tm_year = year - 1900; + tm.tm_mon = month - 1; + tm.tm_mday = day; + return tm; +} + +int32_t days_since_epoch(int year, int month, int day) +{ + 
std::tm tm = make_tm(year, month, day); + std::tm epoch = make_tm(1970, 1, 1); + std::time_t time = std::mktime(&tm); + std::time_t epoch_time = std::mktime(&epoch); + double diff = std::difftime(time, epoch_time) / (60 * 60 * 24); + return static_cast(diff); +} + +void write_to_parquet_device_buffer(std::unique_ptr const& table, + std::vector const& col_names, + parquet_device_buffer& source) +{ + CUDF_FUNC_RANGE(); + auto const stream = cudf::get_default_stream(); + + // Prepare the table metadata + cudf::io::table_metadata metadata; + std::vector col_name_infos; + for (auto& col_name : col_names) { + col_name_infos.push_back(cudf::io::column_name_info(col_name)); + } + metadata.schema_info = col_name_infos; + auto const table_input_metadata = cudf::io::table_input_metadata{metadata}; + + // Declare a host staging buffer (the device buffer is owned by `source`) + std::vector h_buffer; + + // Write parquet data to host buffer + auto builder = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info(&h_buffer), table->view()); + builder.metadata(table_input_metadata); + auto const options = builder.build(); + cudf::io::write_parquet(options); + + // Copy host buffer to device buffer + source.d_buffer.resize(h_buffer.size(), stream); + CUDF_CUDA_TRY(cudaMemcpyAsync( + source.d_buffer.data(), h_buffer.data(), h_buffer.size(), cudaMemcpyDefault, stream.value())); +} + +void generate_parquet_data_sources(double scale_factor, + std::vector const& table_names, + std::unordered_map& sources) +{ + CUDF_FUNC_RANGE(); + std::for_each(table_names.begin(), table_names.end(), [&](auto const& table_name) { + sources[table_name] = parquet_device_buffer(); + }); + + auto [orders, lineitem, part] = cudf::datagen::generate_orders_lineitem_part( + scale_factor, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + + auto partsupp = cudf::datagen::generate_partsupp( + scale_factor, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + + auto supplier = cudf::datagen::generate_supplier( + scale_factor, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + + auto customer = cudf::datagen::generate_customer( + scale_factor, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + + auto nation = cudf::datagen::generate_nation(cudf::get_default_stream(), + cudf::get_current_device_resource_ref()); + + auto region = cudf::datagen::generate_region(cudf::get_default_stream(), + cudf::get_current_device_resource_ref()); + + write_to_parquet_device_buffer(std::move(orders), ORDERS_SCHEMA, sources["orders"]); + write_to_parquet_device_buffer(std::move(lineitem), LINEITEM_SCHEMA, sources["lineitem"]); + write_to_parquet_device_buffer(std::move(part), PART_SCHEMA, sources["part"]); + write_to_parquet_device_buffer(std::move(partsupp), PARTSUPP_SCHEMA, sources["partsupp"]); + write_to_parquet_device_buffer(std::move(customer), CUSTOMER_SCHEMA, sources["customer"]); + write_to_parquet_device_buffer(std::move(supplier), SUPPLIER_SCHEMA, sources["supplier"]); + write_to_parquet_device_buffer(std::move(nation), NATION_SCHEMA, sources["nation"]); + write_to_parquet_device_buffer(std::move(region), REGION_SCHEMA, sources["region"]); +} diff --git a/cpp/benchmarks/ndsh/utilities.hpp b/cpp/benchmarks/ndsh/utilities.hpp new file mode 100644 index 00000000000..762e43deccf --- /dev/null +++ b/cpp/benchmarks/ndsh/utilities.hpp @@ -0,0 +1,227 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. 
* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include + +/** + * @brief A class to represent a table with column names attached + */ +class table_with_names { + public: + table_with_names(std::unique_ptr tbl, std::vector col_names) + : tbl(std::move(tbl)), col_names(col_names){}; + /** + * @brief Return the table view + */ + [[nodiscard]] cudf::table_view table() const; + /** + * @brief Return the column view for a given column name + * + * @param col_name The name of the column + */ + [[nodiscard]] cudf::column_view column(std::string const& col_name) const; + /** + * @brief Return the column names of the table + */ + [[nodiscard]] std::vector const& column_names() const; + /** + * @brief Translate a column name to a column index + * + * @param col_name The name of the column + */ + [[nodiscard]] cudf::size_type column_id(std::string const& col_name) const; + /** + * @brief Append a column to the table + * + * @param col The column to append + * @param col_name The name of the appended column + */ + table_with_names& append(std::unique_ptr& col, std::string const& col_name); + /** + * @brief Select a subset of columns from the table + * + * @param col_names The names of the columns to select + */ + [[nodiscard]] cudf::table_view select(std::vector const& col_names) const; + /** + * @brief Write the table to a parquet file + * + * @param filepath The path to the parquet file + */ + void to_parquet(std::string const& filepath) const; + + private: + std::unique_ptr tbl; + std::vector col_names; +}; + +/** + * @brief Inner join two tables and gather the result + * + * @param left_input The left input table + * @param right_input The right input table + * @param left_on The columns to join on in the left table + * @param right_on The columns to join on in the right table + * @param compare_nulls The null equality policy + */ +[[nodiscard]] std::unique_ptr join_and_gather( + cudf::table_view const& left_input, + cudf::table_view const& right_input, + std::vector const& left_on, + std::vector const& right_on, + cudf::null_equality compare_nulls); + +/** + * @brief Apply an inner join operation to two tables + * + * @param left_input The left input table + * @param right_input The right input table + * @param left_on The columns to join on in the left table + * @param right_on The columns to join on in the right table + * @param compare_nulls The null equality policy + */ +[[nodiscard]] std::unique_ptr apply_inner_join( + std::unique_ptr const& left_input, + std::unique_ptr const& right_input, + std::vector const& left_on, + std::vector const& right_on, + cudf::null_equality compare_nulls = cudf::null_equality::EQUAL); + +/** + * @brief Apply a filter predicate to a table + * + * @param table The input table + * @param predicate The filter predicate + */ +[[nodiscard]] std::unique_ptr apply_filter( + std::unique_ptr const& table, cudf::ast::operation const& predicate); + +/** + * @brief Apply a boolean mask to a table + * + * @param table The 
input table + * @param mask The boolean mask + */ +[[nodiscard]] std::unique_ptr apply_mask( + std::unique_ptr const& table, std::unique_ptr const& mask); + +/** + * @brief Struct representing group by key columns, value columns, and the type of aggregations to + * perform on the value columns + */ +struct groupby_context_t { + std::vector keys; + std::unordered_map>> + values; +}; + +/** + * @brief Apply a groupby operation to a table + * + * @param table The input table + * @param ctx The groupby context + */ +[[nodiscard]] std::unique_ptr apply_groupby( + std::unique_ptr const& table, groupby_context_t const& ctx); + +/** + * @brief Apply an order by operation to a table + * + * @param table The input table + * @param sort_keys The sort keys + * @param sort_key_orders The sort key orders + */ +[[nodiscard]] std::unique_ptr apply_orderby( + std::unique_ptr const& table, + std::vector const& sort_keys, + std::vector const& sort_key_orders); + +/** + * @brief Apply a reduction operation to a column + * + * @param column The input column + * @param agg_kind The aggregation kind + * @param col_name The name of the output column + */ +[[nodiscard]] std::unique_ptr apply_reduction( + cudf::column_view const& column, + cudf::aggregation::Kind const& agg_kind, + std::string const& col_name); + +/** + * @brief Read a parquet file into a table + * + * @param source_info The source of the parquet file + * @param columns The columns to read + * @param predicate The filter predicate to pushdown + */ +[[nodiscard]] std::unique_ptr read_parquet( + cudf::io::source_info const& source_info, + std::vector const& columns = {}, + std::unique_ptr const& predicate = nullptr); + +/** + * @brief Generate the `std::tm` structure from year, month, and day + * + * @param year The year + * @param month The month + * @param day The day + */ +std::tm make_tm(int year, int month, int day); + +/** + * @brief Calculate the number of days since the UNIX epoch + * + * @param year The year + * @param month The month + * @param day The day + */ +int32_t days_since_epoch(int year, int month, int day); + +/** + * @brief Struct representing a parquet device buffer + */ +struct parquet_device_buffer { + parquet_device_buffer() : d_buffer{0, cudf::get_default_stream()} {}; + cudf::io::source_info make_source_info() { return cudf::io::source_info(d_buffer); } + rmm::device_uvector d_buffer; +}; + +/** + * @brief Write a `cudf::table` to a parquet device buffer + * + * @param table The `cudf::table` to write + * @param col_names The column names of the table + * @param source The parquet device buffer to write the table to + */ +void write_to_parquet_device_buffer(std::unique_ptr const& table, + std::vector const& col_names, + parquet_device_buffer& source); + +/** + * @brief Generate NDS-H tables and write to parquet device buffers + * + * @param scale_factor The scale factor of NDS-H tables to generate + * @param table_names The names of the tables to generate + * @param sources The parquet data sources to populate + */ +void generate_parquet_data_sources(double scale_factor, + std::vector const& table_names, + std::unordered_map& sources); diff --git a/cpp/examples/build.sh b/cpp/examples/build.sh index 8e8d8bd0b78..25984df1b60 100755 --- a/cpp/examples/build.sh +++ b/cpp/examples/build.sh @@ -57,7 +57,6 @@ build_example() { } build_example basic -build_example tpch build_example strings build_example nested_types build_example parquet_io diff --git a/cpp/examples/tpch/CMakeLists.txt b/cpp/examples/tpch/CMakeLists.txt 
deleted file mode 100644 index 373a6d72d56..00000000000 --- a/cpp/examples/tpch/CMakeLists.txt +++ /dev/null @@ -1,36 +0,0 @@ -# Copyright (c) 2024, NVIDIA CORPORATION. - -cmake_minimum_required(VERSION 3.26.4) - -include(../set_cuda_architecture.cmake) - -rapids_cuda_init_architectures(tpch_example) -rapids_cuda_set_architectures(RAPIDS) - -project( - tpch_example - VERSION 0.0.1 - LANGUAGES CXX CUDA -) - -include(../fetch_dependencies.cmake) - -add_executable(tpch_q1 q1.cpp) -target_link_libraries(tpch_q1 PRIVATE cudf::cudf) -target_compile_features(tpch_q1 PRIVATE cxx_std_17) - -add_executable(tpch_q5 q5.cpp) -target_link_libraries(tpch_q5 PRIVATE cudf::cudf) -target_compile_features(tpch_q5 PRIVATE cxx_std_17) - -add_executable(tpch_q6 q6.cpp) -target_link_libraries(tpch_q6 PRIVATE cudf::cudf) -target_compile_features(tpch_q6 PRIVATE cxx_std_17) - -add_executable(tpch_q9 q9.cpp) -target_link_libraries(tpch_q9 PRIVATE cudf::cudf) -target_compile_features(tpch_q9 PRIVATE cxx_std_17) - -add_executable(tpch_q10 q10.cpp) -target_link_libraries(tpch_q10 PRIVATE cudf::cudf) -target_compile_features(tpch_q10 PRIVATE cxx_std_17) diff --git a/cpp/examples/tpch/README.md b/cpp/examples/tpch/README.md deleted file mode 100644 index 8c046c3f1e8..00000000000 --- a/cpp/examples/tpch/README.md +++ /dev/null @@ -1,39 +0,0 @@ -# TPC-H Derived Examples - -Implements TPC-H queries using `libcudf`. We leverage the data generator (wrapper around official TPC-H datagen) from [Apache Datafusion](https://github.com/apache/datafusion) for generating data in Parquet format. - -## Requirements - -- Rust -- [libcudf](https://github.com/rapidsai/cudf/blob/branch-24.08/CONTRIBUTING.md#setting-up-your-build-environment) - -## Running Queries - -1. Build the `libcudf` examples. -```bash -cd cudf/cpp/examples -./build.sh -``` -The TPC-H query binaries would be built inside `tpch/build`. - -2. Generate the dataset. -```bash -cd tpch/datagen -./datagen.sh [scale factor (1/10)] -``` - -The parquet files will be generated in `tpch/datagen/datafusion/benchmarks/data/tpch_sf[scale factor]`. - -3. Set these environment variables for optimized runtimes. -```bash -export KVIKIO_COMPAT_MODE="on" -export LIBCUDF_CUFILE_POLICY="KVIKIO" -export CUDA_MODULE_LOADING="EAGER" -``` - -4. Execute the queries. -```bash -./tpch/build/tpch_q[query no] [path to dataset] [memory resource type (cuda/pool/managed/managed_pool)] -``` - -A parquet file named `q[query no].parquet` would be generated containing the results of the query. diff --git a/cpp/examples/tpch/datagen/correct_datatypes.py b/cpp/examples/tpch/datagen/correct_datatypes.py deleted file mode 100644 index 8564774647b..00000000000 --- a/cpp/examples/tpch/datagen/correct_datatypes.py +++ /dev/null @@ -1,60 +0,0 @@ -# Copyright (c) 2024, NVIDIA CORPORATION. 
- -import os -import sys - -import pyarrow as pa -import pyarrow.parquet as pq -import pandas as pd - -if __name__ == "__main__": - dataset_path = str(sys.argv[1]) - tables = ["lineitem", "part", "partsupp", "orders", "supplier", "customer", "nation", "region"] - for table in tables: - filepath = os.path.join(dataset_path, f"{table}.parquet") - print("Reading file ", filepath) - - if filepath.endswith("lineitem.parquet"): - df = pd.read_parquet(filepath) - df["l_linenumber"] = df["l_linenumber"].astype("int64") - df["l_quantity"] = df["l_quantity"].astype("int64") - df["l_extendedprice"] = df["l_extendedprice"].astype("float64") - df["l_discount"] = df["l_discount"].astype("float64") - df["l_tax"] = df["l_tax"].astype("float64") - pq.write_table(pa.Table.from_pandas(df), filepath, compression="snappy") - - elif filepath.endswith("part.parquet"): - df = pd.read_parquet(filepath) - df["p_size"] = df["p_size"].astype("int64") - df["p_retailprice"] = df["p_retailprice"].astype("float64") - pq.write_table(pa.Table.from_pandas(df), filepath, compression="snappy") - - elif filepath.endswith("partsupp.parquet"): - df = pd.read_parquet(filepath) - df["ps_availqty"] = df["ps_availqty"].astype("int64") - df["ps_supplycost"] = df["ps_supplycost"].astype("float64") - pq.write_table(pa.Table.from_pandas(df), filepath, compression="snappy") - - elif filepath.endswith("orders.parquet"): - df = pd.read_parquet(filepath) - df["o_totalprice"] = df["o_totalprice"].astype("float64") - df["o_shippriority"] = df["o_shippriority"].astype("int64") - pq.write_table(pa.Table.from_pandas(df), filepath, compression="snappy") - - elif filepath.endswith("supplier.parquet"): - df = pd.read_parquet(filepath) - df["s_acctbal"] = df["s_acctbal"].astype("float64") - pq.write_table(pa.Table.from_pandas(df), filepath, compression="snappy") - - elif filepath.endswith("customer.parquet"): - df = pd.read_parquet(filepath) - df["c_acctbal"] = df["c_acctbal"].astype("float64") - pq.write_table(pa.Table.from_pandas(df), filepath, compression="snappy") - - elif filepath.endswith("nation.parquet"): - df = pd.read_parquet(filepath) - pq.write_table(pa.Table.from_pandas(df), filepath, compression="snappy") - - elif filepath.endswith("region.parquet"): - df = pd.read_parquet(filepath) - pq.write_table(pa.Table.from_pandas(df), filepath, compression="snappy") diff --git a/cpp/examples/tpch/datagen/datagen.sh b/cpp/examples/tpch/datagen/datagen.sh deleted file mode 100755 index 0b03753daea..00000000000 --- a/cpp/examples/tpch/datagen/datagen.sh +++ /dev/null @@ -1,31 +0,0 @@ -#!/bin/bash -# Copyright (c) 2024, NVIDIA CORPORATION. 
- -set -e - -scale_factor=$1 -script_dir=$(pwd) - -# Clone the datafusion repository and apply a patch -# for single threaded data generation so that a -# single parquet file is generated for each table -rm -rf datafusion -git clone https://github.com/apache/datafusion.git datafusion -cd datafusion/ -git checkout 679a85f -git apply ${script_dir}/tpch.patch -cd benchmarks/ - -# Generate the data -# Currently, we support only scale factor 1 and 10 -if [ ${scale_factor} -eq 1 ]; then - ./bench.sh data tpch -elif [ ${scale_factor} -eq 10 ]; then - ./bench.sh data tpch10 -else - echo "Unsupported scale factor" - exit 1 -fi - -# Correct the datatypes of the parquet files -python3 ${script_dir}/correct_datatypes.py data/tpch_sf${scale_factor} diff --git a/cpp/examples/tpch/datagen/tpch.patch b/cpp/examples/tpch/datagen/tpch.patch deleted file mode 100644 index 42727aa9904..00000000000 --- a/cpp/examples/tpch/datagen/tpch.patch +++ /dev/null @@ -1,33 +0,0 @@ -diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh -index 3b854f6dc..f000f09c0 100755 ---- a/benchmarks/bench.sh -+++ b/benchmarks/bench.sh -@@ -311,6 +311,15 @@ data_tpch() { - $CARGO_COMMAND --bin tpch -- convert --input "${TPCH_DIR}" --output "${TPCH_DIR}" --format parquet - popd > /dev/null - fi -+ -+ cp ${TPCH_DIR}/lineitem/part-0.parquet ${TPCH_DIR}/lineitem.parquet -+ cp ${TPCH_DIR}/orders/part-0.parquet ${TPCH_DIR}/orders.parquet -+ cp ${TPCH_DIR}/part/part-0.parquet ${TPCH_DIR}/part.parquet -+ cp ${TPCH_DIR}/partsupp/part-0.parquet ${TPCH_DIR}/partsupp.parquet -+ cp ${TPCH_DIR}/customer/part-0.parquet ${TPCH_DIR}/customer.parquet -+ cp ${TPCH_DIR}/supplier/part-0.parquet ${TPCH_DIR}/supplier.parquet -+ cp ${TPCH_DIR}/nation/part-0.parquet ${TPCH_DIR}/nation.parquet -+ cp ${TPCH_DIR}/region/part-0.parquet ${TPCH_DIR}/region.parquet - } - - # Runs the tpch benchmark -diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs -index b5204b343..84fd2e78d 100644 ---- a/datafusion/common/src/config.rs -+++ b/datafusion/common/src/config.rs -@@ -250,7 +250,7 @@ config_namespace! { - /// concurrency. - /// - /// Defaults to the number of CPU cores on the system -- pub target_partitions: usize, default = num_cpus::get() -+ pub target_partitions: usize, default = 1 - - /// The default time zone - /// diff --git a/cpp/examples/tpch/utils.hpp b/cpp/examples/tpch/utils.hpp deleted file mode 100644 index 8102fa8f976..00000000000 --- a/cpp/examples/tpch/utils.hpp +++ /dev/null @@ -1,458 +0,0 @@ -/* - * Copyright (c) 2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include - -#include - -// RMM memory resource creation utilities -inline auto make_cuda() { return std::make_shared(); } -inline auto make_pool() -{ - return rmm::mr::make_owning_wrapper( - make_cuda(), rmm::percent_of_free_device_memory(50)); -} -inline auto make_managed() { return std::make_shared(); } -inline auto make_managed_pool() -{ - return rmm::mr::make_owning_wrapper( - make_managed(), rmm::percent_of_free_device_memory(50)); -} -inline std::shared_ptr create_memory_resource( - std::string const& mode) -{ - if (mode == "cuda") return make_cuda(); - if (mode == "pool") return make_pool(); - if (mode == "managed") return make_managed(); - if (mode == "managed_pool") return make_managed_pool(); - CUDF_FAIL("Unknown rmm_mode parameter: " + mode + - "\nExpecting: cuda, pool, managed, or managed_pool"); -} - -/** - * @brief A class to represent a table with column names attached - */ -class table_with_names { - public: - table_with_names(std::unique_ptr tbl, std::vector col_names) - : tbl(std::move(tbl)), col_names(col_names) - { - } - /** - * @brief Return the table view - */ - [[nodiscard]] cudf::table_view table() const { return tbl->view(); } - /** - * @brief Return the column view for a given column name - * - * @param col_name The name of the column - */ - [[nodiscard]] cudf::column_view column(std::string const& col_name) const - { - return tbl->view().column(col_id(col_name)); - } - /** - * @param Return the column names of the table - */ - [[nodiscard]] std::vector column_names() const { return col_names; } - /** - * @brief Translate a column name to a column index - * - * @param col_name The name of the column - */ - [[nodiscard]] cudf::size_type col_id(std::string const& col_name) const - { - CUDF_FUNC_RANGE(); - auto it = std::find(col_names.begin(), col_names.end(), col_name); - if (it == col_names.end()) { throw std::runtime_error("Column not found"); } - return std::distance(col_names.begin(), it); - } - /** - * @brief Append a column to the table - * - * @param col The column to append - * @param col_name The name of the appended column - */ - table_with_names& append(std::unique_ptr& col, std::string const& col_name) - { - CUDF_FUNC_RANGE(); - auto cols = tbl->release(); - cols.push_back(std::move(col)); - tbl = std::make_unique(std::move(cols)); - col_names.push_back(col_name); - return (*this); - } - /** - * @brief Select a subset of columns from the table - * - * @param col_names The names of the columns to select - */ - [[nodiscard]] cudf::table_view select(std::vector const& col_names) const - { - CUDF_FUNC_RANGE(); - std::vector col_indices; - for (auto const& col_name : col_names) { - col_indices.push_back(col_id(col_name)); - } - return tbl->select(col_indices); - } - /** - * @brief Write the table to a parquet file - * - * @param filepath The path to the parquet file - */ - void to_parquet(std::string const& filepath) const - { - CUDF_FUNC_RANGE(); - auto const sink_info = cudf::io::sink_info(filepath); - cudf::io::table_metadata metadata; - metadata.schema_info = - std::vector(col_names.begin(), col_names.end()); - auto const table_input_metadata = cudf::io::table_input_metadata{metadata}; - auto builder = cudf::io::parquet_writer_options::builder(sink_info, tbl->view()); - builder.metadata(table_input_metadata); - auto const options = builder.build(); - 
cudf::io::write_parquet(options); - } - - private: - std::unique_ptr tbl; - std::vector col_names; -}; - -/** - * @brief Concatenate two vectors - * - * @param lhs The left vector - * @param rhs The right vector - */ -template -std::vector concat(std::vector const& lhs, std::vector const& rhs) -{ - std::vector result; - result.reserve(lhs.size() + rhs.size()); - std::copy(lhs.begin(), lhs.end(), std::back_inserter(result)); - std::copy(rhs.begin(), rhs.end(), std::back_inserter(result)); - return result; -} - -/** - * @brief Inner join two tables and gather the result - * - * @param left_input The left input table - * @param right_input The right input table - * @param left_on The columns to join on in the left table - * @param right_on The columns to join on in the right table - * @param compare_nulls The null equality policy - */ -[[nodiscard]] std::unique_ptr join_and_gather( - cudf::table_view const& left_input, - cudf::table_view const& right_input, - std::vector const& left_on, - std::vector const& right_on, - cudf::null_equality compare_nulls) -{ - CUDF_FUNC_RANGE(); - constexpr auto oob_policy = cudf::out_of_bounds_policy::DONT_CHECK; - auto const left_selected = left_input.select(left_on); - auto const right_selected = right_input.select(right_on); - auto const [left_join_indices, right_join_indices] = cudf::inner_join( - left_selected, right_selected, compare_nulls, cudf::get_current_device_resource_ref()); - - auto const left_indices_span = cudf::device_span{*left_join_indices}; - auto const right_indices_span = cudf::device_span{*right_join_indices}; - - auto const left_indices_col = cudf::column_view{left_indices_span}; - auto const right_indices_col = cudf::column_view{right_indices_span}; - - auto const left_result = cudf::gather(left_input, left_indices_col, oob_policy); - auto const right_result = cudf::gather(right_input, right_indices_col, oob_policy); - - auto joined_cols = left_result->release(); - auto right_cols = right_result->release(); - joined_cols.insert(joined_cols.end(), - std::make_move_iterator(right_cols.begin()), - std::make_move_iterator(right_cols.end())); - return std::make_unique(std::move(joined_cols)); -} - -/** - * @brief Apply an inner join operation to two tables - * - * @param left_input The left input table - * @param right_input The right input table - * @param left_on The columns to join on in the left table - * @param right_on The columns to join on in the right table - * @param compare_nulls The null equality policy - */ -[[nodiscard]] std::unique_ptr apply_inner_join( - std::unique_ptr const& left_input, - std::unique_ptr const& right_input, - std::vector const& left_on, - std::vector const& right_on, - cudf::null_equality compare_nulls = cudf::null_equality::EQUAL) -{ - CUDF_FUNC_RANGE(); - std::vector left_on_indices; - std::vector right_on_indices; - std::transform( - left_on.begin(), left_on.end(), std::back_inserter(left_on_indices), [&](auto const& col_name) { - return left_input->col_id(col_name); - }); - std::transform(right_on.begin(), - right_on.end(), - std::back_inserter(right_on_indices), - [&](auto const& col_name) { return right_input->col_id(col_name); }); - auto table = join_and_gather( - left_input->table(), right_input->table(), left_on_indices, right_on_indices, compare_nulls); - return std::make_unique( - std::move(table), concat(left_input->column_names(), right_input->column_names())); -} - -/** - * @brief Apply a filter predicated to a table - * - * @param table The input table - * @param predicate The filter 
predicate - */ -[[nodiscard]] std::unique_ptr apply_filter( - std::unique_ptr const& table, cudf::ast::operation const& predicate) -{ - CUDF_FUNC_RANGE(); - auto const boolean_mask = cudf::compute_column(table->table(), predicate); - auto result_table = cudf::apply_boolean_mask(table->table(), boolean_mask->view()); - return std::make_unique(std::move(result_table), table->column_names()); -} - -/** - * @brief Apply a boolean mask to a table - * - * @param table The input table - * @param mask The boolean mask - */ -[[nodiscard]] std::unique_ptr apply_mask( - std::unique_ptr const& table, std::unique_ptr const& mask) -{ - CUDF_FUNC_RANGE(); - auto result_table = cudf::apply_boolean_mask(table->table(), mask->view()); - return std::make_unique(std::move(result_table), table->column_names()); -} - -struct groupby_context_t { - std::vector keys; - std::unordered_map>> - values; -}; - -/** - * @brief Apply a groupby operation to a table - * - * @param table The input table - * @param ctx The groupby context - */ -[[nodiscard]] std::unique_ptr apply_groupby( - std::unique_ptr const& table, groupby_context_t const& ctx) -{ - CUDF_FUNC_RANGE(); - auto const keys = table->select(ctx.keys); - cudf::groupby::groupby groupby_obj(keys); - std::vector result_column_names; - result_column_names.insert(result_column_names.end(), ctx.keys.begin(), ctx.keys.end()); - std::vector requests; - for (auto& [value_col, aggregations] : ctx.values) { - requests.emplace_back(cudf::groupby::aggregation_request()); - for (auto& agg : aggregations) { - if (agg.first == cudf::aggregation::Kind::SUM) { - requests.back().aggregations.push_back( - cudf::make_sum_aggregation()); - } else if (agg.first == cudf::aggregation::Kind::MEAN) { - requests.back().aggregations.push_back( - cudf::make_mean_aggregation()); - } else if (agg.first == cudf::aggregation::Kind::COUNT_ALL) { - requests.back().aggregations.push_back( - cudf::make_count_aggregation()); - } else { - throw std::runtime_error("Unsupported aggregation"); - } - result_column_names.push_back(agg.second); - } - requests.back().values = table->column(value_col); - } - auto agg_results = groupby_obj.aggregate(requests); - std::vector> result_columns; - for (size_t i = 0; i < agg_results.first->num_columns(); i++) { - auto col = std::make_unique(agg_results.first->get_column(i)); - result_columns.push_back(std::move(col)); - } - for (size_t i = 0; i < agg_results.second.size(); i++) { - for (size_t j = 0; j < agg_results.second[i].results.size(); j++) { - result_columns.push_back(std::move(agg_results.second[i].results[j])); - } - } - auto result_table = std::make_unique(std::move(result_columns)); - return std::make_unique(std::move(result_table), result_column_names); -} - -/** - * @brief Apply an order by operation to a table - * - * @param table The input table - * @param sort_keys The sort keys - * @param sort_key_orders The sort key orders - */ -[[nodiscard]] std::unique_ptr apply_orderby( - std::unique_ptr const& table, - std::vector const& sort_keys, - std::vector const& sort_key_orders) -{ - CUDF_FUNC_RANGE(); - std::vector column_views; - for (auto& key : sort_keys) { - column_views.push_back(table->column(key)); - } - auto result_table = - cudf::sort_by_key(table->table(), cudf::table_view{column_views}, sort_key_orders); - return std::make_unique(std::move(result_table), table->column_names()); -} - -/** - * @brief Apply a reduction operation to a column - * - * @param column The input column - * @param agg_kind The aggregation kind - * @param col_name The 
name of the output column - */ -[[nodiscard]] std::unique_ptr apply_reduction( - cudf::column_view const& column, - cudf::aggregation::Kind const& agg_kind, - std::string const& col_name) -{ - CUDF_FUNC_RANGE(); - auto const agg = cudf::make_sum_aggregation(); - auto const result = cudf::reduce(column, *agg, column.type()); - cudf::size_type const len = 1; - auto col = cudf::make_column_from_scalar(*result, len); - std::vector> columns; - columns.push_back(std::move(col)); - auto result_table = std::make_unique(std::move(columns)); - std::vector col_names = {col_name}; - return std::make_unique(std::move(result_table), col_names); -} - -/** - * @brief Read a parquet file into a table - * - * @param filename The path to the parquet file - * @param columns The columns to read - * @param predicate The filter predicate to pushdown - */ -[[nodiscard]] std::unique_ptr read_parquet( - std::string const& filename, - std::vector const& columns = {}, - std::unique_ptr const& predicate = nullptr) -{ - CUDF_FUNC_RANGE(); - auto const source = cudf::io::source_info(filename); - auto builder = cudf::io::parquet_reader_options_builder(source); - if (!columns.empty()) { builder.columns(columns); } - if (predicate) { builder.filter(*predicate); } - auto const options = builder.build(); - auto table_with_metadata = cudf::io::read_parquet(options); - std::vector column_names; - for (auto const& col_info : table_with_metadata.metadata.schema_info) { - column_names.push_back(col_info.name); - } - return std::make_unique(std::move(table_with_metadata.tbl), column_names); -} - -/** - * @brief Generate the `std::tm` structure from year, month, and day - * - * @param year The year - * @param month The month - * @param day The day - */ -std::tm make_tm(int year, int month, int day) -{ - std::tm tm{}; - tm.tm_year = year - 1900; - tm.tm_mon = month - 1; - tm.tm_mday = day; - return tm; -} - -/** - * @brief Calculate the number of days since the UNIX epoch - * - * @param year The year - * @param month The month - * @param day The day - */ -int32_t days_since_epoch(int year, int month, int day) -{ - std::tm tm = make_tm(year, month, day); - std::tm epoch = make_tm(1970, 1, 1); - std::time_t time = std::mktime(&tm); - std::time_t epoch_time = std::mktime(&epoch); - double diff = std::difftime(time, epoch_time) / (60 * 60 * 24); - return static_cast(diff); -} - -struct tpch_example_args { - std::string dataset_dir; - std::string memory_resource_type; -}; - -/** - * @brief Parse command line arguments into a struct - * - * @param argc The number of command line arguments - * @param argv The command line arguments - */ -tpch_example_args parse_args(int argc, char const** argv) -{ - if (argc < 3) { - std::string usage_message = "Usage: " + std::string(argv[0]) + - " \n The query result will be " - "saved to a parquet file named q{query_no}.parquet in the current " - "working directory "; - throw std::runtime_error(usage_message); - } - tpch_example_args args; - args.dataset_dir = argv[1]; - args.memory_resource_type = argv[2]; - return args; -} diff --git a/cpp/include/cudf/detail/tdigest/tdigest.hpp b/cpp/include/cudf/detail/tdigest/tdigest.hpp index 80a4460023f..672b95e2d01 100644 --- a/cpp/include/cudf/detail/tdigest/tdigest.hpp +++ b/cpp/include/cudf/detail/tdigest/tdigest.hpp @@ -143,28 +143,29 @@ std::unique_ptr make_tdigest_column(size_type num_rows, rmm::device_async_resource_ref mr); /** - * @brief Create an empty tdigest column. + * @brief Create a tdigest column of empty clusters. 
* - * An empty tdigest column contains a single row of length 0 + * The column created contains the specified number of rows of empty clusters. * * @param stream CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory. * - * @returns An empty tdigest column. + * @returns A tdigest column of empty clusters. */ CUDF_EXPORT -std::unique_ptr make_empty_tdigest_column(rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr); +std::unique_ptr make_tdigest_column_of_empty_clusters(size_type num_rows, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr); /** - * @brief Create an empty tdigest scalar. + * @brief Create a scalar of an empty tdigest cluster. * - * An empty tdigest scalar is a struct_scalar that contains a single row of length 0 + * The returned scalar is a struct_scalar that contains a single row of an empty cluster. * * @param stream CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory. * - * @returns An empty tdigest scalar. + * @returns A scalar of an empty tdigest cluster. */ std::unique_ptr make_empty_tdigest_scalar(rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr); diff --git a/cpp/include/cudf/io/json.hpp b/cpp/include/cudf/io/json.hpp index a3d6533705e..ff25a5bacae 100644 --- a/cpp/include/cudf/io/json.hpp +++ b/cpp/include/cudf/io/json.hpp @@ -20,6 +20,7 @@ #include #include +#include #include #include @@ -128,6 +129,19 @@ class json_reader_options { // Whether to recover after an invalid JSON line json_recovery_mode_t _recovery_mode = json_recovery_mode_t::FAIL; + // Validation checks for spark + // Should the json validation be strict or not + // Note: strict validation enforces the JSON specification https://www.json.org/json-en.html + bool _strict_validation = false; + // Allow leading zeros for numeric values. + bool _allow_numeric_leading_zeros = true; + // Allow non-numeric numbers: NaN, +INF, -INF, +Infinity, Infinity, -Infinity + bool _allow_nonnumeric_numbers = true; + // Allow unquoted control characters + bool _allow_unquoted_control_chars = true; + // Additional values to recognize as null values + std::vector _na_values; + /** * @brief Constructor from source info. * @@ -298,6 +312,55 @@ class json_reader_options { */ [[nodiscard]] json_recovery_mode_t recovery_mode() const { return _recovery_mode; } + /** + * @brief Whether json validation should be enforced strictly or not. + * + * @return true if it should be. + */ + [[nodiscard]] bool is_strict_validation() const { return _strict_validation; } + + /** + * @brief Whether leading zeros are allowed in numeric values. + * + * @note: This validation is enforced only if strict validation is enabled. + * + * @return true if leading zeros are allowed in numeric values + */ + [[nodiscard]] bool is_allowed_numeric_leading_zeros() const + { + return _allow_numeric_leading_zeros; + } + + /** + * @brief Whether unquoted number values should be allowed NaN, +INF, -INF, +Infinity, Infinity, + * and -Infinity. + * + * @note: This validation is enforced only if strict validation is enabled. 
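+ * For example, with strict validation on and this flag enabled, the unquoted token NaN is accepted; with the flag disabled, NaN is marked as an error token.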
+ * + * @return true if unquoted non-numeric numbers (NaN and the infinity forms) are allowed + */ + [[nodiscard]] bool is_allowed_nonnumeric_numbers() const { return _allow_nonnumeric_numbers; } + + /** + * @brief Whether characters in the range [0, 32) are allowed unescaped inside a quoted string. + * + * @note: This validation is enforced only if strict validation is enabled. + * + * @return true if unquoted control chars are allowed. + */ + [[nodiscard]] bool is_allowed_unquoted_control_chars() const + { + return _allow_unquoted_control_chars; + } + + /** + * @brief Returns additional values to recognize as null values. + * + * @return Additional values to recognize as null values + */ + [[nodiscard]] std::vector const& get_na_values() const { return _na_values; } + /** * @brief Set data types for columns to be read. * @@ -427,6 +490,63 @@ class json_reader_options { * @param val An enum value to indicate the JSON reader's behavior on invalid JSON lines. */ void set_recovery_mode(json_recovery_mode_t val) { _recovery_mode = val; } + + /** + * @brief Set whether strict validation is enabled or not. + * + * @param val Boolean value to indicate whether strict validation is enabled. + */ + void set_strict_validation(bool val) { _strict_validation = val; } + + /** + * @brief Set whether leading zeros are allowed in numeric values. Strict validation + * must be enabled for this to work. + * + * @throw cudf::logic_error if `strict_validation` is not enabled before setting this option. + * + * @param val Boolean value to indicate whether leading zeros are allowed in numeric values + */ + void allow_numeric_leading_zeros(bool val) + { + CUDF_EXPECTS(_strict_validation, "Strict validation must be enabled for this to work."); + _allow_numeric_leading_zeros = val; + } + + /** + * @brief Set whether unquoted number values should be allowed NaN, +INF, -INF, +Infinity, + * Infinity, and -Infinity. Strict validation must be enabled for this to work. + * + * @throw cudf::logic_error if `strict_validation` is not enabled before setting this option. + * + * @param val Boolean value to indicate whether unquoted non-numeric number values are allowed + */ + void allow_nonnumeric_numbers(bool val) + { + CUDF_EXPECTS(_strict_validation, "Strict validation must be enabled for this to work."); + _allow_nonnumeric_numbers = val; + } + + /** + * @brief Set whether characters in the range [0, 32) are allowed unescaped inside a quoted + * string. Strict validation must be enabled for this to work. + * + * @throw cudf::logic_error if `strict_validation` is not enabled before setting this option. + * + * @param val true to indicate whether unquoted control chars are allowed. + */ + void allow_unquoted_control_chars(bool val) + { + CUDF_EXPECTS(_strict_validation, "Strict validation must be enabled for this to work."); + _allow_unquoted_control_chars = val; + } + + /** + * @brief Sets additional values to recognize as null values. + * + * @param vals Vector of values to be considered to be null + */ + void set_na_values(std::vector vals) { _na_values = std::move(vals); } }; /** @@ -638,6 +758,76 @@ class json_reader_options_builder { return *this; } + /** + * @brief Set whether json validation should be strict or not. + * + * @param val Boolean value to indicate whether json validation should be strict or not. 
+ * @return this for chaining + */ + json_reader_options_builder& strict_validation(bool val) + { + options.set_strict_validation(val); + return *this; + } + + /** + * @brief Set Whether leading zeros are allowed in numeric values. Strict validation must + * be enabled for this to have any effect. + * + * @throw cudf::logic_error if `strict_validation` is not enabled before setting this option. + * + * @param val Boolean value to indicate whether leading zeros are allowed in numeric values + * @return this for chaining + */ + json_reader_options_builder& numeric_leading_zeros(bool val) + { + options.allow_numeric_leading_zeros(val); + return *this; + } + + /** + * @brief Set whether specific unquoted number values are valid JSON. The values are NaN, + * +INF, -INF, +Infinity, Infinity, and -Infinity. + * Strict validation must be enabled for this to have any effect. + * + * @throw cudf::logic_error if `strict_validation` is not enabled before setting this option. + * + * @param val Boolean value to indicate if unquoted nonnumeric values are valid json or not. + * @return this for chaining + */ + json_reader_options_builder& nonnumeric_numbers(bool val) + { + options.allow_nonnumeric_numbers(val); + return *this; + } + + /** + * @brief Set whether chars >= 0 and < 32 are allowed in a quoted string without + * some form of escaping. Strict validation must be enabled for this to have any effect. + * + * @throw cudf::logic_error if `strict_validation` is not enabled before setting this option. + * + * @param val Boolean value to indicate if unquoted control chars are allowed or not. + * @return this for chaining + */ + json_reader_options_builder& unquoted_control_chars(bool val) + { + options.allow_unquoted_control_chars(val); + return *this; + } + + /** + * @brief Sets additional values to recognize as null values. + * + * @param vals Vector of values to be considered to be null + * @return this for chaining + */ + json_reader_options_builder& na_values(std::vector vals) + { + options.set_na_values(std::move(vals)); + return *this; + } + /** * @brief move json_reader_options member once it's built. */ diff --git a/cpp/include/cudf/io/nvcomp_adapter.hpp b/cpp/include/cudf/io/nvcomp_adapter.hpp index e7fe3cc7214..0d74a4158ad 100644 --- a/cpp/include/cudf/io/nvcomp_adapter.hpp +++ b/cpp/include/cudf/io/nvcomp_adapter.hpp @@ -24,7 +24,7 @@ namespace CUDF_EXPORT cudf { namespace io::nvcomp { -enum class compression_type { SNAPPY, ZSTD, DEFLATE, LZ4 }; +enum class compression_type { SNAPPY, ZSTD, DEFLATE, LZ4, GZIP }; /** * @brief Set of parameters that impact whether nvCOMP features are enabled. diff --git a/cpp/include/cudf_test/tdigest_utilities.cuh b/cpp/include/cudf_test/tdigest_utilities.cuh index 1758790cd64..be7d19b2227 100644 --- a/cpp/include/cudf_test/tdigest_utilities.cuh +++ b/cpp/include/cudf_test/tdigest_utilities.cuh @@ -270,8 +270,8 @@ void tdigest_simple_all_nulls_aggregation(Func op) static_cast(values).type(), tdigest_gen{}, op, values, delta); // NOTE: an empty tdigest column still has 1 row. 
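Taken together, the json_reader_options additions above give Spark-style control over validation. Because the setters throw unless strict validation is already enabled, a builder chain must set strict_validation before the dependent flags. A hypothetical sketch; the file path, flag values, and variable names are illustrative, not from this change:

    #include <cudf/io/json.hpp>

    // Hypothetical: read JSON lines with Spark-style strict validation;
    // "records.jsonl" is a placeholder path.
    auto opts = cudf::io::json_reader_options::builder(cudf::io::source_info{"records.jsonl"})
                  .lines(true)
                  .strict_validation(true)        // must precede the dependent flags below
                  .numeric_leading_zeros(false)   // e.g. reject 007 as a number
                  .nonnumeric_numbers(true)       // accept NaN, Infinity, -INF, ...
                  .unquoted_control_chars(false)  // reject raw control characters in strings
                  .na_values({"NA"})              // recognized as null alongside "" and "null"
                  .build();
    auto result = cudf::io::read_json(opts);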
- auto expected = cudf::tdigest::detail::make_empty_tdigest_column( - cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + auto expected = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( + 1, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); CUDF_TEST_EXPECT_COLUMNS_EQUAL(*result, *expected); } @@ -562,12 +562,12 @@ template void tdigest_merge_empty(MergeFunc merge_op) { // 3 empty tdigests all in the same group - auto a = cudf::tdigest::detail::make_empty_tdigest_column( - cudf::get_default_stream(), cudf::get_current_device_resource_ref()); - auto b = cudf::tdigest::detail::make_empty_tdigest_column( - cudf::get_default_stream(), cudf::get_current_device_resource_ref()); - auto c = cudf::tdigest::detail::make_empty_tdigest_column( - cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + auto a = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( + 1, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + auto b = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( + 1, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + auto c = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( + 1, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); std::vector cols; cols.push_back(*a); cols.push_back(*b); @@ -577,8 +577,8 @@ void tdigest_merge_empty(MergeFunc merge_op) auto const delta = 1000; auto result = merge_op(*values, delta); - auto expected = cudf::tdigest::detail::make_empty_tdigest_column( - cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + auto expected = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( + 1, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); CUDF_TEST_EXPECT_COLUMNS_EQUAL(*expected, *result); } diff --git a/cpp/src/io/comp/nvcomp_adapter.cpp b/cpp/src/io/comp/nvcomp_adapter.cpp index 261a8eb401d..c3187f73a95 100644 --- a/cpp/src/io/comp/nvcomp_adapter.cpp +++ b/cpp/src/io/comp/nvcomp_adapter.cpp @@ -23,6 +23,7 @@ #include #include +#include #include #include #include @@ -44,6 +45,8 @@ auto batched_decompress_get_temp_size_ex(compression_type compression, Args&&... return nvcompBatchedLZ4DecompressGetTempSizeEx(std::forward(args)...); case compression_type::DEFLATE: return nvcompBatchedDeflateDecompressGetTempSizeEx(std::forward(args)...); + case compression_type::GZIP: + return nvcompBatchedGzipDecompressGetTempSizeEx(std::forward(args)...); default: CUDF_FAIL("Unsupported compression type"); } } @@ -73,6 +76,8 @@ auto batched_decompress_async(compression_type compression, Args&&... 
args) case compression_type::DEFLATE: return nvcompBatchedDeflateDecompressAsync(std::forward(args)...); case compression_type::LZ4: return nvcompBatchedLZ4DecompressAsync(std::forward(args)...); + case compression_type::GZIP: + return nvcompBatchedGzipDecompressAsync(std::forward(args)...); default: CUDF_FAIL("Unsupported compression type"); } } @@ -84,6 +89,7 @@ std::string compression_type_name(compression_type compression) case compression_type::ZSTD: return "Zstandard"; case compression_type::DEFLATE: return "Deflate"; case compression_type::LZ4: return "LZ4"; + case compression_type::GZIP: return "GZIP"; } return "compression_type(" + std::to_string(static_cast(compression)) + ")"; } @@ -359,8 +365,8 @@ std::optional is_compression_disabled_impl(compression_type compres return "nvCOMP use is disabled through the `LIBCUDF_NVCOMP_POLICY` environment variable."; } return std::nullopt; + default: return "Unsupported compression type"; } - return "Unsupported compression type"; } std::optional is_compression_disabled(compression_type compression, @@ -396,7 +402,8 @@ std::optional is_decompression_disabled_impl(compression_type compr feature_status_parameters params) { switch (compression) { - case compression_type::DEFLATE: { + case compression_type::DEFLATE: + case compression_type::GZIP: { if (not params.are_all_integrations_enabled) { return "DEFLATE decompression is experimental, you can enable it through " "`LIBCUDF_NVCOMP_POLICY` environment variable."; @@ -447,6 +454,7 @@ std::optional is_decompression_disabled(compression_type compressio size_t required_alignment(compression_type compression) { switch (compression) { + case compression_type::GZIP: case compression_type::DEFLATE: return nvcompDeflateRequiredAlignment; case compression_type::SNAPPY: return nvcompSnappyRequiredAlignment; case compression_type::ZSTD: return nvcompZstdRequiredAlignment; @@ -462,7 +470,7 @@ std::optional compress_max_allowed_chunk_size(compression_type compressi case compression_type::SNAPPY: return nvcompSnappyCompressionMaxAllowedChunkSize; case compression_type::ZSTD: return nvcompZstdCompressionMaxAllowedChunkSize; case compression_type::LZ4: return nvcompLZ4CompressionMaxAllowedChunkSize; - default: return std::nullopt; + default: CUDF_FAIL("Unsupported compression type"); } } diff --git a/cpp/src/io/json/json_normalization.cu b/cpp/src/io/json/json_normalization.cu index 7899ea7bac4..97d5884fef1 100644 --- a/cpp/src/io/json/json_normalization.cu +++ b/cpp/src/io/json/json_normalization.cu @@ -16,6 +16,7 @@ #include "io/fst/lookup_tables.cuh" +#include #include #include #include @@ -302,6 +303,7 @@ void normalize_single_quotes(datasource::owning_buffer& inda rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { + CUDF_FUNC_RANGE(); static constexpr std::int32_t min_out = 0; static constexpr std::int32_t max_out = 2; auto parser = @@ -330,6 +332,7 @@ void normalize_whitespace(datasource::owning_buffer& indata, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { + CUDF_FUNC_RANGE(); static constexpr std::int32_t min_out = 0; static constexpr std::int32_t max_out = 2; auto parser = diff --git a/cpp/src/io/json/nested_json.hpp b/cpp/src/io/json/nested_json.hpp index b06458e1a8e..75639a0438f 100644 --- a/cpp/src/io/json/nested_json.hpp +++ b/cpp/src/io/json/nested_json.hpp @@ -225,6 +225,21 @@ std::pair, rmm::device_uvector> pr device_span token_indices, rmm::cuda_stream_view stream); +/** + * @brief Validate the tokens conforming to behavior given in options. 
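+ * An invalid value or string does not throw; instead the scan in process_tokens.cu (added below) overwrites the offending record's final token with ErrorBegin, which the existing error-handling path then treats like any other parse error.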
+ * + * @param d_input The string of input characters + * @param tokens The tokens to be post-processed + * @param token_indices The tokens' corresponding indices that are post-processed + * @param options Parsing options specifying the parsing behaviour + * @param stream The cuda stream to dispatch GPU kernels to + */ +void validate_token_stream(device_span d_input, + device_span tokens, + device_span token_indices, + cudf::io::json_reader_options const& options, + rmm::cuda_stream_view stream); + /** * @brief Parses the given JSON string and generates a tree representation of the given input. * diff --git a/cpp/src/io/json/nested_json_gpu.cu b/cpp/src/io/json/nested_json_gpu.cu index d76e5447c30..4e513d3495c 100644 --- a/cpp/src/io/json/nested_json_gpu.cu +++ b/cpp/src/io/json/nested_json_gpu.cu @@ -1660,6 +1660,7 @@ std::pair, rmm::device_uvector> ge if (delimiter_offset == 1) { tokens.set_element(0, token_t::LineEnd, stream); + validate_token_stream(json_in, tokens, tokens_indices, options, stream); auto [filtered_tokens, filtered_tokens_indices] = process_token_stream(tokens, tokens_indices, stream); tokens = std::move(filtered_tokens); @@ -2082,7 +2083,9 @@ cudf::io::parse_options parsing_options(cudf::io::json_reader_options const& opt parse_opts.keepquotes = options.is_enabled_keep_quotes(); parse_opts.trie_true = cudf::detail::create_serialized_trie({"true"}, stream); parse_opts.trie_false = cudf::detail::create_serialized_trie({"false"}, stream); - parse_opts.trie_na = cudf::detail::create_serialized_trie({"", "null"}, stream); + std::vector na_values{"", "null"}; + na_values.insert(na_values.end(), options.get_na_values().begin(), options.get_na_values().end()); + parse_opts.trie_na = cudf::detail::create_serialized_trie(na_values, stream); return parse_opts; } diff --git a/cpp/src/io/json/process_tokens.cu b/cpp/src/io/json/process_tokens.cu new file mode 100644 index 00000000000..83c7b663980 --- /dev/null +++ b/cpp/src/io/json/process_tokens.cu @@ -0,0 +1,310 @@ + +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "io/utilities/trie.cuh" +#include "nested_json.hpp" +#include "tabulate_output_iterator.cuh" + +#include +#include +#include + +#include +#include + +#include +#include + +namespace cudf::io::json { +namespace detail { + +struct write_if { + using token_t = cudf::io::json::token_t; + using scan_type = thrust::pair; + PdaTokenT* tokens; + size_t n; + // Index, value + __device__ void operator()(size_type i, scan_type x) + { + if (i == n - 1 or tokens[i + 1] == token_t::LineEnd) { + if (x.first == token_t::ErrorBegin and tokens[i] != token_t::ErrorBegin) { + tokens[i] = token_t::ErrorBegin; + } + } + } +}; + +enum class number_state { + START = 0, + SAW_NEG, // not a complete state + LEADING_ZERO, + WHOLE, + SAW_RADIX, // not a complete state + FRACTION, + START_EXPONENT, // not a complete state + AFTER_SIGN_EXPONENT, // not a complete state + EXPONENT +}; + +enum class string_state { + NORMAL = 0, + ESCAPED, // not a complete state + ESCAPED_U // not a complete state +}; + +__device__ inline bool substr_eq(const char* data, + SymbolOffsetT const start, + SymbolOffsetT const end, + SymbolOffsetT const expected_len, + const char* expected) +{ + if (end - start != expected_len) { return false; } + for (auto idx = 0; idx < expected_len; idx++) { + if (data[start + idx] != expected[idx]) { return false; } + } + return true; +} + +void validate_token_stream(device_span d_input, + device_span tokens, + device_span token_indices, + cudf::io::json_reader_options const& options, + rmm::cuda_stream_view stream) +{ + CUDF_FUNC_RANGE(); + if (!options.is_strict_validation()) { return; } + using token_t = cudf::io::json::token_t; + cudf::detail::optional_trie trie_na = + cudf::detail::create_serialized_trie(options.get_na_values(), stream); + auto trie_na_view = cudf::detail::make_trie_view(trie_na); + auto validate_values = cuda::proclaim_return_type( + [data = d_input.data(), + trie_na = trie_na_view, + allow_numeric_leading_zeros = options.is_allowed_numeric_leading_zeros(), + allow_nonnumeric = + options.is_allowed_nonnumeric_numbers()] __device__(SymbolOffsetT start, + SymbolOffsetT end) -> bool { + // This validates an unquoted value. 
A value must match https://www.json.org/json-en.html + // but the leading and trailing whitespace should already have been removed, and it is not + // a string + auto c = data[start]; + auto is_null_literal = serialized_trie_contains(trie_na, {data + start, end - start}); + if (is_null_literal) { + return true; + } else if ('n' == c) { + return substr_eq(data, start, end, 4, "null"); + } else if ('t' == c) { + return substr_eq(data, start, end, 4, "true"); + } else if ('f' == c) { + return substr_eq(data, start, end, 5, "false"); + } else if (allow_nonnumeric && c == 'N') { + return substr_eq(data, start, end, 3, "NaN"); + } else if (allow_nonnumeric && c == 'I') { + return substr_eq(data, start, end, 8, "Infinity"); + } else if (allow_nonnumeric && c == '+') { + return substr_eq(data, start, end, 4, "+INF") || + substr_eq(data, start, end, 9, "+Infinity"); + } else if ('-' == c || (c >= '0' && c <= '9')) { + // number + auto num_state = number_state::START; + for (auto at = start; at < end; at++) { + c = data[at]; + switch (num_state) { + case number_state::START: + if ('-' == c) { + num_state = number_state::SAW_NEG; + } else if ('0' == c) { + num_state = number_state::LEADING_ZERO; + } else if (c >= '1' && c <= '9') { + num_state = number_state::WHOLE; + } else { + return false; + } + break; + case number_state::SAW_NEG: + if ('0' == c) { + num_state = number_state::LEADING_ZERO; + } else if (c >= '1' && c <= '9') { + num_state = number_state::WHOLE; + } else if (allow_nonnumeric && 'I' == c) { + return substr_eq(data, start, end, 4, "-INF") || + substr_eq(data, start, end, 9, "-Infinity"); + } else { + return false; + } + break; + case number_state::LEADING_ZERO: + if (allow_numeric_leading_zeros && c >= '0' && c <= '9') { + num_state = number_state::WHOLE; + } else if ('.' == c) { + num_state = number_state::SAW_RADIX; + } else if ('e' == c || 'E' == c) { + num_state = number_state::START_EXPONENT; + } else { + return false; + } + break; + case number_state::WHOLE: + if (c >= '0' && c <= '9') { + num_state = number_state::WHOLE; + } else if ('.'
== c) { + num_state = number_state::SAW_RADIX; + } else if ('e' == c || 'E' == c) { + num_state = number_state::START_EXPONENT; + } else { + return false; + } + break; + case number_state::SAW_RADIX: + if (c >= '0' && c <= '9') { + num_state = number_state::FRACTION; + } else if ('e' == c || 'E' == c) { + num_state = number_state::START_EXPONENT; + } else { + return false; + } + break; + case number_state::FRACTION: + if (c >= '0' && c <= '9') { + num_state = number_state::FRACTION; + } else if ('e' == c || 'E' == c) { + num_state = number_state::START_EXPONENT; + } else { + return false; + } + break; + case number_state::START_EXPONENT: + if ('+' == c || '-' == c) { + num_state = number_state::AFTER_SIGN_EXPONENT; + } else if (c >= '0' && c <= '9') { + num_state = number_state::EXPONENT; + } else { + return false; + } + break; + case number_state::AFTER_SIGN_EXPONENT: + if (c >= '0' && c <= '9') { + num_state = number_state::EXPONENT; + } else { + return false; + } + break; + case number_state::EXPONENT: + if (c >= '0' && c <= '9') { + num_state = number_state::EXPONENT; + } else { + return false; + } + break; + } + } + return num_state != number_state::AFTER_SIGN_EXPONENT && + num_state != number_state::START_EXPONENT && num_state != number_state::SAW_NEG && + num_state != number_state::SAW_RADIX; + } else { + return false; + } + }); + + auto validate_strings = cuda::proclaim_return_type( + [data = d_input.data(), + allow_unquoted_control_chars = + options.is_allowed_unquoted_control_chars()] __device__(SymbolOffsetT start, + SymbolOffsetT end) -> bool { + // This validates a quoted string. A string must match https://www.json.org/json-en.html + // but we already know that it has a starting and ending " and all white space has been + // stripped out. Also the base CUDF validation makes sure escaped chars are correct + // so we only need to worry about unquoted control chars + + auto state = string_state::NORMAL; + auto u_count = 0; + for (SymbolOffsetT idx = start + 1; idx < end; idx++) { + auto c = data[idx]; + if (!allow_unquoted_control_chars && static_cast(c) >= 0 && static_cast(c) < 32) { + return false; + } + + switch (state) { + case string_state::NORMAL: + if (c == '\\') { state = string_state::ESCAPED; } + break; + case string_state::ESCAPED: + // in Spark you can allow any char to be escaped, but CUDF + // validates it in some cases so we need to also validate it. 
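+ // The escapes accepted here are the JSON set: \" \\ \/ \b \f \n \r \t, plus \u + // followed by exactly four hex digits (tracked via u_count).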
+ if (c == 'u') { + state = string_state::ESCAPED_U; + u_count = 0; + } else if (c == '"' || c == '\\' || c == '/' || c == 'b' || c == 'f' || c == 'n' || + c == 'r' || c == 't') { + state = string_state::NORMAL; + } else { + return false; + } + break; + case string_state::ESCAPED_U: + if ((c >= '0' && c <= '9') || (c >= 'a' && c <= 'f') || (c >= 'A' && c <= 'F')) { + u_count++; + if (u_count == 4) { + state = string_state::NORMAL; + u_count = 0; + } + } else { + return false; + } + break; + } + } + return string_state::NORMAL == state; + }); + + auto num_tokens = tokens.size(); + auto count_it = thrust::make_counting_iterator(0); + auto predicate = [tokens = tokens.begin(), + token_indices = token_indices.begin(), + validate_values, + validate_strings] __device__(auto i) -> bool { + if (tokens[i] == token_t::ValueEnd) { + return !validate_values(token_indices[i - 1], token_indices[i]); + } else if (tokens[i] == token_t::FieldNameEnd || tokens[i] == token_t::StringEnd) { + return !validate_strings(token_indices[i - 1], token_indices[i]); + } + return false; + }; + + using scan_type = write_if::scan_type; + auto conditional_write = write_if{tokens.begin(), num_tokens}; + auto conditional_output_it = cudf::detail::make_tabulate_output_iterator(conditional_write); + auto transform_op = cuda::proclaim_return_type( + [predicate, tokens = tokens.begin()] __device__(auto i) -> scan_type { + if (predicate(i)) return {token_t::ErrorBegin, tokens[i] == token_t::LineEnd}; + return {static_cast(tokens[i]), tokens[i] == token_t::LineEnd}; + }); + auto binary_op = cuda::proclaim_return_type( + [] __device__(scan_type prev, scan_type curr) -> scan_type { + auto op_result = (prev.first == token_t::ErrorBegin ? prev.first : curr.first); + return scan_type((curr.second ? curr.first : op_result), prev.second | curr.second); + }); + + thrust::transform_inclusive_scan(rmm::exec_policy(stream), + count_it, + count_it + num_tokens, + conditional_output_it, + transform_op, + binary_op); // in-place scan +} +} // namespace detail +} // namespace cudf::io::json diff --git a/cpp/src/io/json/tabulate_output_iterator.cuh b/cpp/src/io/json/tabulate_output_iterator.cuh new file mode 100644 index 00000000000..7cf3655e259 --- /dev/null +++ b/cpp/src/io/json/tabulate_output_iterator.cuh @@ -0,0 +1,132 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include +#include + +namespace cudf { +namespace detail { + +// Proxy reference that calls BinaryFunction with index value and the rhs of assignment operator +template +class tabulate_output_iterator_proxy { + public: + __host__ __device__ tabulate_output_iterator_proxy(const IndexT index, BinaryFunction fun) + : index(index), fun(fun) + { + } + template + __host__ __device__ tabulate_output_iterator_proxy operator=(const T& rhs_value) + { + fun(index, rhs_value); + return *this; + } + + private: + IndexT index; + BinaryFunction fun; +}; + +/** + * @brief Tabulate output iterator with custom binary function which takes index and value. + * + * @code {.cpp} + * #include "tabulate_output_iterator.cuh" + * #include + * #include + * #include + * + * struct set_bits_field { + * int* bitfield; + * __device__ inline void set_bit(size_t bit_index) + * { + * atomicOr(&bitfield[bit_index/32], (int{1} << (bit_index % 32))); + * } + * __device__ inline void clear_bit(size_t bit_index) + * { + * atomicAnd(&bitfield[bit_index / 32], ~(int{1} << (bit_index % 32))); + * } + * // Index, value + * __device__ void operator()(size_t i, bool x) + * { + * if (x) + * set_bit(i); + * else + * clear_bit(i); + * } + * }; + * + * thrust::device_vector v(1, 0x00000000); + * auto result_begin = thrust::make_tabulate_output_iterator(set_bits_field{v.data().get()}); + * auto value = thrust::make_transform_iterator(thrust::make_counting_iterator(0), + * [] __device__ (int x) { return x%2; }); + * thrust::copy(thrust::device, value, value+32, result_begin); + * assert(v[0] == 0xaaaaaaaa); + * @endcode + * + * + * @tparam BinaryFunction Binary function to be called with the Iterator value and the rhs of + * assignment operator. + * @tparam Iterator iterator type that acts as index of the output. + */ +template +class tabulate_output_iterator + : public thrust::iterator_adaptor, + thrust::counting_iterator, + thrust::use_default, + thrust::use_default, + thrust::use_default, + tabulate_output_iterator_proxy> { + public: + // parent class. + using super_t = thrust::iterator_adaptor, + thrust::counting_iterator, + thrust::use_default, + thrust::use_default, + thrust::use_default, + tabulate_output_iterator_proxy>; + // friend thrust::iterator_core_access to allow it access to the private interface dereference() + friend class thrust::iterator_core_access; + __host__ __device__ tabulate_output_iterator(BinaryFunction fun) : fun(fun) {} + + private: + BinaryFunction fun; + + // thrust::iterator_core_access accesses this function + __host__ __device__ typename super_t::reference dereference() const + { + return tabulate_output_iterator_proxy(*this->base(), fun); + } +}; + +template +tabulate_output_iterator __host__ __device__ +make_tabulate_output_iterator(BinaryFunction fun) +{ + return tabulate_output_iterator(fun); +} // end make_tabulate_output_iterator + +} // namespace detail +} // namespace cudf + +// Register tabulate_output_iterator_proxy with 'is_proxy_reference' from +// type_traits to enable its use with algorithms. 
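+// Without this trait, Thrust algorithms that write through the iterator would attempt to bind a +// real reference to the temporary proxy; registering it tells Thrust that assignment through the +// proxy is itself the write operation.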
+template +struct thrust::detail::is_proxy_reference< + cudf::detail::tabulate_output_iterator_proxy> + : public thrust::detail::true_type {}; diff --git a/cpp/src/io/parquet/reader_impl_chunking.cu b/cpp/src/io/parquet/reader_impl_chunking.cu index 84f0dab0d8b..245e1829c72 100644 --- a/cpp/src/io/parquet/reader_impl_chunking.cu +++ b/cpp/src/io/parquet/reader_impl_chunking.cu @@ -865,8 +865,18 @@ std::vector compute_page_splits_by_row(device_span make_tdigest_column(size_type num_rows, return make_structs_column(num_rows, std::move(children), 0, {}, stream, mr); } -std::unique_ptr make_empty_tdigest_column(rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) +std::unique_ptr make_tdigest_column_of_empty_clusters(size_type num_rows, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) { auto offsets = cudf::make_fixed_width_column( - data_type(type_id::INT32), 2, mask_state::UNALLOCATED, stream, mr); + data_type(type_id::INT32), num_rows + 1, mask_state::UNALLOCATED, stream, mr); thrust::fill(rmm::exec_policy(stream), offsets->mutable_view().begin(), offsets->mutable_view().end(), 0); - auto min_col = - cudf::make_numeric_column(data_type(type_id::FLOAT64), 1, mask_state::UNALLOCATED, stream, mr); + auto min_col = cudf::make_numeric_column( + data_type(type_id::FLOAT64), num_rows, mask_state::UNALLOCATED, stream, mr); thrust::fill(rmm::exec_policy(stream), min_col->mutable_view().begin(), min_col->mutable_view().end(), 0); - auto max_col = - cudf::make_numeric_column(data_type(type_id::FLOAT64), 1, mask_state::UNALLOCATED, stream, mr); + auto max_col = cudf::make_numeric_column( + data_type(type_id::FLOAT64), num_rows, mask_state::UNALLOCATED, stream, mr); thrust::fill(rmm::exec_policy(stream), max_col->mutable_view().begin(), max_col->mutable_view().end(), 0); - return make_tdigest_column(1, - make_empty_column(type_id::FLOAT64), - make_empty_column(type_id::FLOAT64), + return make_tdigest_column(num_rows, + cudf::make_empty_column(type_id::FLOAT64), + cudf::make_empty_column(type_id::FLOAT64), std::move(offsets), std::move(min_col), std::move(max_col), @@ -338,7 +339,7 @@ std::unique_ptr make_empty_tdigest_column(rmm::cuda_stream_view stream, std::unique_ptr make_empty_tdigest_scalar(rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { - auto contents = make_empty_tdigest_column(stream, mr)->release(); + auto contents = make_tdigest_column_of_empty_clusters(1, stream, mr)->release(); return std::make_unique( std::move(*std::make_unique(std::move(contents.children))), true, stream, mr); } diff --git a/cpp/src/quantiles/tdigest/tdigest_aggregation.cu b/cpp/src/quantiles/tdigest/tdigest_aggregation.cu index 2dd25a7b890..d591fb5c171 100644 --- a/cpp/src/quantiles/tdigest/tdigest_aggregation.cu +++ b/cpp/src/quantiles/tdigest/tdigest_aggregation.cu @@ -366,8 +366,8 @@ std::unique_ptr to_tdigest_scalar(std::unique_ptr&& tdigest, * @param group_cluster_wl Output. The set of cluster weight limits for each group. * @param group_num_clusters Output. The number of output clusters for each input group. * @param group_cluster_offsets Offsets per-group to the start of it's clusters - * @param has_nulls Whether or not the input contains nulls - * + * @param may_have_empty_clusters Whether or not there could be empty clusters. Must only be + * set to false when there is no empty cluster, true otherwise. 
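+ * (The merge path always passes true, since deciding whether any input tdigest is empty would + * require reading the data up front, which could be expensive.)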
*/ template @@ -379,7 +379,7 @@ CUDF_KERNEL void generate_cluster_limits_kernel(int delta, double* group_cluster_wl, size_type* group_num_clusters, size_type const* group_cluster_offsets, - bool has_nulls) + bool may_have_empty_clusters) { int const tid = threadIdx.x + blockIdx.x * blockDim.x; @@ -399,11 +399,12 @@ CUDF_KERNEL void generate_cluster_limits_kernel(int delta, // a group with nothing in it. group_num_clusters[group_index] = 0; if (total_weight <= 0) { - // if the input contains nulls we can potentially have a group that generates no - // clusters because -all- of the input values are null. in that case, the reduce_by_key call - // in the tdigest generation step will need a location to store the unused reduction value for - // that group of nulls. these "stubs" will be postprocessed out afterwards. - if (has_nulls) { group_num_clusters[group_index] = 1; } + // If the input contains empty clusters, we can potentially have a group that also generates + // empty clusters because -all- of the input values are null or empty cluster. In that case, the + // `reduce_by_key` call in the tdigest generation step will need a location to store the unused + // reduction value for that group of nulls and empty clusters. These "stubs" will be + // postprocessed out afterwards. + if (may_have_empty_clusters) { group_num_clusters[group_index] = 1; } return; } @@ -502,7 +503,8 @@ CUDF_KERNEL void generate_cluster_limits_kernel(int delta, * stream that falls before our current cluster limit * @param group_info A functor which returns the info for the specified group (total weight, * size and start offset) - * @param has_nulls Whether or not the input data contains nulls + * @param may_have_empty_clusters Whether or not there could be empty clusters. It should be + * set to false only when there is no empty cluster. * @param stream CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory * @@ -516,7 +518,7 @@ generate_group_cluster_info(int delta, NearestWeight nearest_weight, GroupInfo group_info, CumulativeWeight cumulative_weight, - bool has_nulls, + bool may_have_empty_clusters, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { @@ -535,7 +537,7 @@ generate_group_cluster_info(int delta, nullptr, group_num_clusters.begin(), nullptr, - has_nulls); + may_have_empty_clusters); // generate group cluster offsets (where the clusters for a given group start and end) auto group_cluster_offsets = cudf::make_numeric_column( @@ -567,7 +569,7 @@ generate_group_cluster_info(int delta, group_cluster_wl.begin(), group_num_clusters.begin(), group_cluster_offsets->view().begin(), - has_nulls); + may_have_empty_clusters); return {std::move(group_cluster_wl), std::move(group_cluster_offsets), @@ -580,7 +582,7 @@ std::unique_ptr build_output_column(size_type num_rows, std::unique_ptr&& offsets, std::unique_ptr&& min_col, std::unique_ptr&& max_col, - bool has_nulls, + bool may_have_empty_clusters, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { @@ -595,7 +597,7 @@ std::unique_ptr build_output_column(size_type num_rows, size_type i) { return is_stub_weight(offsets[i]) ? 
   size_type const num_stubs = [&]() {
-    if (!has_nulls) { return 0; }
+    if (!may_have_empty_clusters) { return 0; }
     auto iter = cudf::detail::make_counting_transform_iterator(
       0, cuda::proclaim_return_type<size_type>(is_stub_digest));
     return thrust::reduce(rmm::exec_policy(stream), iter, iter + num_rows);
@@ -661,6 +663,10 @@ std::unique_ptr<column> build_output_column(size_type num_rows,
                                   mr);
 }
+/**
+ * @brief A functor which returns the cluster index within a group that the value at
+ * the given value index falls into.
+ */
 template
 struct compute_tdigests_keys_fn {
   int const delta;
@@ -706,8 +712,8 @@ struct compute_tdigests_keys_fn {
  * boundaries.
  *
  * @param delta tdigest compression level
- * @param values_begin Beginning of the range of input values.
- * @param values_end End of the range of input values.
+ * @param centroids_begin Beginning of the range of centroids.
+ * @param centroids_end End of the range of centroids.
  * @param cumulative_weight Functor which returns cumulative weight and group information for
  * an absolute input value index.
  * @param min_col Column containing the minimum value per group.
@@ -715,7 +721,8 @@ struct compute_tdigests_keys_fn {
  * @param group_cluster_wl Cluster weight limits for each group.
  * @param group_cluster_offsets R-value reference of offsets into the cluster weight limits.
  * @param total_clusters Total number of clusters in all groups.
- * @param has_nulls Whether or not the input contains nulls
+ * @param may_have_empty_clusters Whether the input may contain empty clusters. Set this to false
+ * only when the input is guaranteed to contain none.
  * @param stream CUDA stream used for device memory operations and kernel launches.
  * @param mr Device memory resource used to allocate the returned column's device memory
 *
@@ -731,7 +738,7 @@ std::unique_ptr<column> compute_tdigests(int delta,
                                          rmm::device_uvector<double> const& group_cluster_wl,
                                          std::unique_ptr<column>&& group_cluster_offsets,
                                          size_type total_clusters,
-                                         bool has_nulls,
+                                         bool may_have_empty_clusters,
                                          rmm::cuda_stream_view stream,
                                          rmm::device_async_resource_ref mr)
 {
@@ -750,7 +757,9 @@ std::unique_ptr<column> compute_tdigests(int delta,
   //   double       // max
   // }
   //
-  if (total_clusters == 0) { return cudf::tdigest::detail::make_empty_tdigest_column(stream, mr); }
+  if (total_clusters == 0) {
+    return cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(1, stream, mr);
+  }
   // each input group represents an individual tdigest. within each tdigest, we want the keys
   // to represent cluster indices (for example, if a tdigest had 100 clusters, the keys should fall
@@ -793,7 +802,7 @@ std::unique_ptr<column> compute_tdigests(int delta,
                              std::move(group_cluster_offsets),
                              std::move(min_col),
                              std::move(max_col),
-                             has_nulls,
+                             may_have_empty_clusters,
                              stream,
                              mr);
 }
@@ -1145,8 +1154,13 @@ std::unique_ptr<column> merge_tdigests(tdigest_column_view const& tdv,
   auto merged =
     cudf::detail::concatenate(tdigest_views, stream, cudf::get_current_device_resource_ref());
+  auto merged_weights = merged->get_column(1).view();
+  // If there are no values, we can simply return a column of empty tdigests, one per group.
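+  // (Each of the `num_groups` rows produced below is a tdigest struct with an empty centroid
+  //  list, zero-filled offsets, and min == max == 0; see
+  //  `make_tdigest_column_of_empty_clusters` above.)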
+  if (merged_weights.size() == 0) {
+    return cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(num_groups, stream, mr);
+  }
+
   // generate cumulative weights
-  auto merged_weights = merged->get_column(1).view();
   auto cumulative_weights = cudf::make_numeric_column(
     data_type{type_id::FLOAT64}, merged_weights.size(), mask_state::UNALLOCATED, stream);
   auto keys = cudf::detail::make_counting_transform_iterator(
@@ -1161,6 +1175,10 @@ std::unique_ptr<column> merge_tdigests(tdigest_column_view const& tdv,
   auto const delta = max_centroids;
+  // We do not know, without actually reading the data (which could be expensive), whether the
+  // input contains any empty clusters, so we conservatively assume that it may.
+  auto const may_have_empty_clusters = true;
+
   // generate cluster info
   auto [group_cluster_wl, group_cluster_offsets, total_clusters] = generate_group_cluster_info(
     delta,
@@ -1177,7 +1195,7 @@ std::unique_ptr<column> merge_tdigests(tdigest_column_view const& tdv,
       group_labels,
       group_offsets,
       {tdigest_offsets.begin<size_type>(), static_cast<size_t>(tdigest_offsets.size())}},
-    false,
+    may_have_empty_clusters,
     stream,
     mr);
@@ -1202,7 +1220,7 @@ std::unique_ptr<column> merge_tdigests(tdigest_column_view const& tdv,
                            group_cluster_wl,
                            std::move(group_cluster_offsets),
                            total_clusters,
-                           false,
+                           may_have_empty_clusters,
                            stream,
                            mr);
 }
@@ -1267,7 +1285,9 @@ std::unique_ptr<column> group_tdigest(column_view const& col,
                                       rmm::cuda_stream_view stream,
                                       rmm::device_async_resource_ref mr)
 {
-  if (col.size() == 0) { return cudf::tdigest::detail::make_empty_tdigest_column(stream, mr); }
+  if (col.size() == 0) {
+    return cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(1, stream, mr);
+  }
   auto const delta = max_centroids;
   return cudf::type_dispatcher(col.type(),
@@ -1293,7 +1313,7 @@ std::unique_ptr<column> group_merge_tdigest(column_view const& input,
   tdigest_column_view tdv(input);
   if (num_groups == 0 || input.size() == 0) {
-    return cudf::tdigest::detail::make_empty_tdigest_column(stream, mr);
+    return cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(1, stream, mr);
   }
   // bring group offsets back to the host
diff --git a/cpp/tests/groupby/tdigest_tests.cu b/cpp/tests/groupby/tdigest_tests.cu
index baa59026b07..3780dbb1d95 100644
--- a/cpp/tests/groupby/tdigest_tests.cu
+++ b/cpp/tests/groupby/tdigest_tests.cu
@@ -469,16 +469,16 @@ TEST_F(TDigestMergeTest, EmptyGroups)
   cudf::test::fixed_width_column_wrapper<int> keys{0, 0, 0, 0, 0, 0, 0};
   int const delta = 1000;
-  auto a = cudf::tdigest::detail::make_empty_tdigest_column(
-    cudf::get_default_stream(), cudf::get_current_device_resource_ref());
+  auto a = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(
+    1, cudf::get_default_stream(), cudf::get_current_device_resource_ref());
   auto b = cudf::type_dispatcher(
     static_cast<cudf::column_view>(values_b).type(), tdigest_gen_grouped{}, keys, values_b, delta);
-  auto c = cudf::tdigest::detail::make_empty_tdigest_column(
-    cudf::get_default_stream(), cudf::get_current_device_resource_ref());
+  auto c = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(
+    1, cudf::get_default_stream(), cudf::get_current_device_resource_ref());
   auto d = cudf::type_dispatcher(
     static_cast<cudf::column_view>(values_d).type(), tdigest_gen_grouped{}, keys, values_d, delta);
-  auto e = cudf::tdigest::detail::make_empty_tdigest_column(
-    cudf::get_default_stream(), cudf::get_current_device_resource_ref());
+  auto e = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(
+    1, cudf::get_default_stream(), cudf::get_current_device_resource_ref());
   std::vector<cudf::column_view> cols;
   cols.push_back(*a);
@@ -507,3 +507,81 @@ TEST_F(TDigestMergeTest, EmptyGroups)
   CUDF_TEST_EXPECT_COLUMNS_EQUAL(*expected, *result.second[0].results[0]);
 }
+
+std::unique_ptr<cudf::table> do_agg(
+  cudf::column_view key,
+  cudf::column_view val,
+  std::function<std::unique_ptr<cudf::groupby_aggregation>()> make_agg)
+{
+  std::vector<cudf::column_view> keys;
+  keys.push_back(key);
+  cudf::table_view const key_table(keys);
+
+  cudf::groupby::groupby gb(key_table);
+  std::vector<cudf::groupby::aggregation_request> requests;
+  cudf::groupby::aggregation_request req;
+  req.values = val;
+  req.aggregations.push_back(make_agg());
+  requests.push_back(std::move(req));
+
+  auto result = gb.aggregate(std::move(requests));
+
+  std::vector<std::unique_ptr<cudf::column>> result_columns;
+  for (auto&& c : result.first->release()) {
+    result_columns.push_back(std::move(c));
+  }
+
+  EXPECT_EQ(result.second.size(), 1);
+  EXPECT_EQ(result.second[0].results.size(), 1);
+  result_columns.push_back(std::move(result.second[0].results[0]));
+
+  return std::make_unique<cudf::table>(std::move(result_columns));
+}
+
+TEST_F(TDigestMergeTest, AllGroupsHaveEmptyClusters)
+{
+  // The input must be sorted by the key.
+  // See `aggregate_result_functor::operator()` for details.
+  auto const keys      = cudf::test::fixed_width_column_wrapper<int>{{0, 0, 1, 1, 2}};
+  auto const keys_view = cudf::column_view(keys);
+  auto val_elems  = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return i; });
+  auto val_valids = cudf::detail::make_counting_transform_iterator(0, [](auto i) {
+    // All values are null
+    return false;
+  });
+  auto const vals = cudf::test::fixed_width_column_wrapper<double>{
+    val_elems, val_elems + keys_view.size(), val_valids};
+
+  auto const delta = 10000;
+
+  // Compute tdigest. The result should have 3 empty clusters, one per group.
+  auto const compute_result = do_agg(keys_view, cudf::column_view(vals), [&delta]() {
+    return cudf::make_tdigest_aggregation<cudf::groupby_aggregation>(delta);
+  });
+
+  auto const expected_computed_keys = cudf::test::fixed_width_column_wrapper<int>{{0, 1, 2}};
+  cudf::column_view const expected_computed_keys_view{expected_computed_keys};
+  auto const expected_computed_vals = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(
+    expected_computed_keys_view.size(),
+    cudf::get_default_stream(),
+    rmm::mr::get_current_device_resource());
+  CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_computed_keys_view, compute_result->get_column(0).view());
+  // The computed values are nullable even though the input values are not.
+  CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_computed_vals->view(),
+                                 compute_result->get_column(1).view());
+
+  // Merge tdigest. The result should have 3 empty clusters, one per group.
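+  // (All input tdigests are empty, so this exercises the new early-out in `merge_tdigests`
+  //  where the concatenated weights column has zero rows.)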
+  auto const merge_result =
+    do_agg(compute_result->get_column(0).view(), compute_result->get_column(1).view(), [&delta]() {
+      return cudf::make_merge_tdigest_aggregation<cudf::groupby_aggregation>(delta);
+    });
+
+  auto const expected_merged_keys = cudf::test::fixed_width_column_wrapper<int>{{0, 1, 2}};
+  cudf::column_view const expected_merged_keys_view{expected_merged_keys};
+  auto const expected_merged_vals = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(
+    expected_merged_keys_view.size(),
+    cudf::get_default_stream(),
+    rmm::mr::get_current_device_resource());
+  CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_merged_keys_view, merge_result->get_column(0).view());
+  CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_merged_vals->view(), merge_result->get_column(1).view());
+}
diff --git a/cpp/tests/io/json/json_test.cpp b/cpp/tests/io/json/json_test.cpp
index c26e5ca3edb..960c19fce2e 100644
--- a/cpp/tests/io/json/json_test.cpp
+++ b/cpp/tests/io/json/json_test.cpp
@@ -2180,6 +2180,86 @@ TEST_F(JsonReaderTest, JSONLinesRecoveringSync)
   cudf::set_pinned_memory_resource(last_mr);
 }
+// Validation
+TEST_F(JsonReaderTest, ValueValidation)
+{
+  // parsing error as null rows
+  std::string data =
+    // 0 -> a: -2 (valid)
+    R"({"a":-2 }{})"
+    "\n"
+    // 1 -> (invalid)
+    R"({"b":{}should_be_invalid})"
+    "\n"
+    // 2 -> b (valid)
+    R"({"b":{"a":3} })"
+    "\n"
+    // 3 -> c: (valid/null based on option)
+    R"({"a": 1, "c":nan, "d": "null" } )"
+    "\n"
+    "\n"
+    // 4 -> (valid/null based on option)
+    R"({"a":04, "c": 1.23, "d": "abc"} 123)"
+    "\n"
+    // 5 -> (valid)
+    R"({"a":5}//Comment after record)"
+    "\n"
+    // 6 -> (valid/null based on option)
+    R"({"a":06} //Comment after whitespace)"
+    "\n"
+    // 7 -> (invalid)
+    R"({"a":5 //Invalid Comment within record})";
+
+  // leadingZeros allowed
+  // na_values,
+  {
+    cudf::io::json_reader_options in_options =
+      cudf::io::json_reader_options::builder(cudf::io::source_info{data.data(), data.size()})
+        .lines(true)
+        .recovery_mode(cudf::io::json_recovery_mode_t::RECOVER_WITH_NULL)
+        .strict_validation(true);
+    cudf::io::table_with_metadata result = cudf::io::read_json(in_options);
+
+    EXPECT_EQ(result.tbl->num_columns(), 4);
+    EXPECT_EQ(result.tbl->num_rows(), 8);
+    auto b_a_col  = int64_wrapper({0, 0, 3, 0, 0, 0, 0, 0});
+    auto a_column = int64_wrapper{{-2, 0, 0, 0, 4, 5, 6, 0},
+                                  {true, false, false, false, true, true, true, false}};
+    auto b_column = cudf::test::structs_column_wrapper(
+      {b_a_col}, {false, false, true, false, false, false, false, false});
+    auto c_column = float64_wrapper({0.0, 0.0, 0.0, 0.0, 1.23, 0.0, 0.0, 0.0},
+                                    {false, false, false, false, true, false, false, false});
+    CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(0), a_column);
+    CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(1), b_column);
+    CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(2), c_column);
+  }
+  // leadingZeros not allowed, NaN allowed
+  {
+    cudf::io::json_reader_options in_options =
+      cudf::io::json_reader_options::builder(cudf::io::source_info{data.data(), data.size()})
+        .lines(true)
+        .recovery_mode(cudf::io::json_recovery_mode_t::RECOVER_WITH_NULL)
+        .strict_validation(true)
+        .numeric_leading_zeros(false)
+        .na_values({"nan"});
+    cudf::io::table_with_metadata result = cudf::io::read_json(in_options);
+
+    EXPECT_EQ(result.tbl->num_columns(), 4);
+    EXPECT_EQ(result.tbl->num_rows(), 8);
+    EXPECT_EQ(result.tbl->get_column(2).type().id(), cudf::type_id::INT8);  // empty column
+    auto b_a_col  = int64_wrapper({0, 0, 3, 0, 0, 0, 0, 0});
+    auto a_column = int64_wrapper{{-2, 0, 0, 1, 4, 5,
6, 0}, + {true, false, false, true, false, true, false, false}}; + auto b_column = cudf::test::structs_column_wrapper( + {b_a_col}, {false, false, true, false, false, false, false, false}); + auto c_column = int8_wrapper({0, 0, 0, 0, 0, 0, 0, 0}, + {false, false, false, false, false, false, false, false}); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(0), a_column); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(1), b_column); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(2), c_column); + } +} + TEST_F(JsonReaderTest, MixedTypes) { using LCWS = cudf::test::lists_column_wrapper; diff --git a/cpp/tests/quantiles/percentile_approx_test.cpp b/cpp/tests/quantiles/percentile_approx_test.cpp index 915717713df..7359f0406fc 100644 --- a/cpp/tests/quantiles/percentile_approx_test.cpp +++ b/cpp/tests/quantiles/percentile_approx_test.cpp @@ -371,8 +371,8 @@ struct PercentileApproxTest : public cudf::test::BaseFixture {}; TEST_F(PercentileApproxTest, EmptyInput) { - auto empty_ = cudf::tdigest::detail::make_empty_tdigest_column( - cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + auto empty_ = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( + 1, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); cudf::test::fixed_width_column_wrapper percentiles{0.0, 0.25, 0.3}; std::vector input; diff --git a/docs/cudf/source/user_guide/io/io.md b/docs/cudf/source/user_guide/io/io.md index adcdaa51e7e..97b961b455b 100644 --- a/docs/cudf/source/user_guide/io/io.md +++ b/docs/cudf/source/user_guide/io/io.md @@ -75,7 +75,6 @@ IO format. - **Notes:** - \[¹\] - Not all orientations are GPU-accelerated. @@ -177,4 +176,9 @@ If no value is set, behavior will be the same as the "STABLE" option. +-----------------------+--------+--------+--------------+--------------+---------+--------+--------------+--------------+--------+ | DEFLATE | ❌ | ❌ | ❌ | ❌ | ❌ | ❌ | Experimental | Experimental | ❌ | +-----------------------+--------+--------+--------------+--------------+---------+--------+--------------+--------------+--------+ + | LZ4 | ❌ | ❌ | Stable | Stable | ❌ | ❌ | Stable | Stable | ❌ | + +-----------------------+--------+--------+--------------+--------------+---------+--------+--------------+--------------+--------+ + | GZIP | ❌ | ❌ | Experimental | Experimental | ❌ | ❌ | ❌ | ❌ | ❌ | + +-----------------------+--------+--------+--------------+--------------+---------+--------+--------------+--------------+--------+ + ``` diff --git a/java/src/main/java/ai/rapids/cudf/JSONOptions.java b/java/src/main/java/ai/rapids/cudf/JSONOptions.java index b37d0d88ec9..c8308ca17ec 100644 --- a/java/src/main/java/ai/rapids/cudf/JSONOptions.java +++ b/java/src/main/java/ai/rapids/cudf/JSONOptions.java @@ -34,6 +34,10 @@ public final class JSONOptions extends ColumnFilterOptions { private final boolean normalizeWhitespace; private final boolean mixedTypesAsStrings; private final boolean keepStringQuotes; + private final boolean strictValidation; + private final boolean allowLeadingZeros; + private final boolean allowNonNumericNumbers; + private final boolean allowUnquotedControlChars; private JSONOptions(Builder builder) { super(builder); @@ -44,6 +48,10 @@ private JSONOptions(Builder builder) { normalizeWhitespace = builder.normalizeWhitespace; mixedTypesAsStrings = builder.mixedTypesAsStrings; keepStringQuotes = builder.keepQuotes; + strictValidation = builder.strictValidation; + allowLeadingZeros = builder.allowLeadingZeros; + allowNonNumericNumbers = 
builder.allowNonNumericNumbers; + allowUnquotedControlChars = builder.allowUnquotedControlChars; } public boolean isDayFirst() { @@ -75,6 +83,22 @@ public boolean keepStringQuotes() { return keepStringQuotes; } + public boolean strictValidation() { + return strictValidation; + } + + public boolean leadingZerosAllowed() { + return allowLeadingZeros; + } + + public boolean nonNumericNumbersAllowed() { + return allowNonNumericNumbers; + } + + public boolean unquotedControlChars() { + return allowUnquotedControlChars; + } + @Override String[] getIncludeColumnNames() { throw new UnsupportedOperationException("JSON reader didn't support column prune"); @@ -85,6 +109,10 @@ public static Builder builder() { } public static final class Builder extends ColumnFilterOptions.Builder { + private boolean strictValidation = false; + private boolean allowUnquotedControlChars = true; + private boolean allowNonNumericNumbers = false; + private boolean allowLeadingZeros = false; private boolean dayFirst = false; private boolean lines = true; @@ -95,10 +123,45 @@ public static final class Builder extends ColumnFilterOptions.Builder(normalize_single_quotes)) .normalize_whitespace(static_cast(normalize_whitespace)) .mixed_types_as_string(mixed_types_as_string) + .strict_validation(strict_validation) .keep_quotes(keep_quotes); - + if (strict_validation) { + opts.numeric_leading_zeros(allow_leading_zeros) + .nonnumeric_numbers(allow_nonnumeric_numbers) + .unquoted_control_chars(allow_unquoted_control); + } auto result = std::make_unique(cudf::io::read_json(opts.build())); @@ -1652,17 +1661,22 @@ Java_ai_rapids_cudf_Table_readAndInferJSONFromDataSource(JNIEnv* env, CATCH_STD(env, 0); } -JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_Table_readAndInferJSON(JNIEnv* env, - jclass, - jlong buffer, - jlong buffer_length, - jboolean day_first, - jboolean lines, - jboolean recover_with_null, - jboolean normalize_single_quotes, - jboolean normalize_whitespace, - jboolean mixed_types_as_string, - jboolean keep_quotes) +JNIEXPORT jlong JNICALL +Java_ai_rapids_cudf_Table_readAndInferJSON(JNIEnv* env, + jclass, + jlong buffer, + jlong buffer_length, + jboolean day_first, + jboolean lines, + jboolean recover_with_null, + jboolean normalize_single_quotes, + jboolean normalize_whitespace, + jboolean mixed_types_as_string, + jboolean keep_quotes, + jboolean strict_validation, + jboolean allow_leading_zeros, + jboolean allow_nonnumeric_numbers, + jboolean allow_unquoted_control) { JNI_NULL_CHECK(env, buffer, "buffer cannot be null", 0); if (buffer_length <= 0) { @@ -1684,8 +1698,14 @@ JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_Table_readAndInferJSON(JNIEnv* env, .recovery_mode(recovery_mode) .normalize_single_quotes(static_cast(normalize_single_quotes)) .normalize_whitespace(static_cast(normalize_whitespace)) + .strict_validation(strict_validation) .mixed_types_as_string(mixed_types_as_string) .keep_quotes(keep_quotes); + if (strict_validation) { + opts.numeric_leading_zeros(allow_leading_zeros) + .nonnumeric_numbers(allow_nonnumeric_numbers) + .unquoted_control_chars(allow_unquoted_control); + } auto result = std::make_unique(cudf::io::read_json(opts.build())); @@ -1790,6 +1810,10 @@ Java_ai_rapids_cudf_Table_readJSONFromDataSource(JNIEnv* env, jboolean normalize_whitespace, jboolean mixed_types_as_string, jboolean keep_quotes, + jboolean strict_validation, + jboolean allow_leading_zeros, + jboolean allow_nonnumeric_numbers, + jboolean allow_unquoted_control, jlong ds_handle) { JNI_NULL_CHECK(env, ds_handle, "no data source handle 
given", 0); @@ -1824,7 +1848,13 @@ Java_ai_rapids_cudf_Table_readJSONFromDataSource(JNIEnv* env, .normalize_single_quotes(static_cast(normalize_single_quotes)) .normalize_whitespace(static_cast(normalize_whitespace)) .mixed_types_as_string(mixed_types_as_string) + .strict_validation(strict_validation) .keep_quotes(keep_quotes); + if (strict_validation) { + opts.numeric_leading_zeros(allow_leading_zeros) + .nonnumeric_numbers(allow_nonnumeric_numbers) + .unquoted_control_chars(allow_unquoted_control); + } if (!n_types.is_null()) { if (n_types.size() != n_scales.size()) { @@ -1874,7 +1904,11 @@ JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_Table_readJSON(JNIEnv* env, jboolean normalize_single_quotes, jboolean normalize_whitespace, jboolean mixed_types_as_string, - jboolean keep_quotes) + jboolean keep_quotes, + jboolean strict_validation, + jboolean allow_leading_zeros, + jboolean allow_nonnumeric_numbers, + jboolean allow_unquoted_control) { bool read_buffer = true; if (buffer == 0) { @@ -1923,7 +1957,13 @@ JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_Table_readJSON(JNIEnv* env, .normalize_single_quotes(static_cast(normalize_single_quotes)) .normalize_whitespace(static_cast(normalize_whitespace)) .mixed_types_as_string(mixed_types_as_string) + .strict_validation(strict_validation) .keep_quotes(keep_quotes); + if (strict_validation) { + opts.numeric_leading_zeros(allow_leading_zeros) + .nonnumeric_numbers(allow_nonnumeric_numbers) + .unquoted_control_chars(allow_unquoted_control); + } if (!n_types.is_null()) { if (n_types.size() != n_scales.size()) { diff --git a/java/src/test/java/ai/rapids/cudf/TableTest.java b/java/src/test/java/ai/rapids/cudf/TableTest.java index 050bcbb268f..56fe63598d9 100644 --- a/java/src/test/java/ai/rapids/cudf/TableTest.java +++ b/java/src/test/java/ai/rapids/cudf/TableTest.java @@ -437,6 +437,7 @@ void testReadWhitespacesJSONFile() throws IOException { } } + @Test void testReadSingleQuotesJSONFileKeepQuotes() throws IOException { Schema schema = Schema.builder() .column(DType.STRING, "A") @@ -455,6 +456,206 @@ void testReadSingleQuotesJSONFileKeepQuotes() throws IOException { } } + private static final byte[] JSON_VALIDATION_BUFFER = ( + "{\"a\":true}\n" + + "{\"a\":false}\n" + + "{\"a\":null}\n" + + "{\"a\":true, \"b\":truee}\n" + + "{\"a\":true, \"b\":\"nulll\"}\n" + + "{\"a\": 1}\n" + + "{\"a\": 0}\n" + + "{\"a\": -}\n" + + "{\"a\": -0}\n" + + "{\"a\": -01}\n" + + + "{\"a\": 01}\n" + + "{\"a\": -0.1}\n" + + "{\"a\": -00.1}\n" + + "{\"a\": NaN}\n" + + "{\"a\": INF}\n" + + "{\"a\": +INF}\n" + + "{\"a\": -INF}\n" + + "{\"a\": +Infinity}\n" + + "{\"a\": Infinity}\n" + + "{\"a\": -Infinity}\n" + + + "{\"a\": INFinity}\n" + + "{\"a\":\"3710-11-10T02:46:58.732Z\"}\n" + + "{\"a\":12.}\n" + + "{\"a\": -3.4e+38}\n" + + "{\"a\": -3.4e-38}\n" + + "{\"a\": 1.4e38}\n" + + "{\"a\": -3.4E+38}\n" + + "{\"a\": -3.4E-38}\n" + + "{\"a\": 1.4E38}\n" + + "{\"a\": -3.4E+}\n" + + + "{\"a\": -3.4E-}\n" + + "{\"a\": \"A\u0000B\"}\n" + + "{\"a\": \"A\\u0000B\"}\n" + + "{\"a\": \"A\u0001B\"}\n" + + "{\"a\": \"A\\u0001B\"}\n" + + "{\"a\": \"A\u001FB\"}\n" + + "{\"a\": \"A\\u001FB\"}\n" + + "{\"a\": \"A\u0020B\"}\n" + + "{\"a\": \"A\\u0020B\"}\n" + + "{\"a\": \"\\u12\"}\n" + + + "{\"a\": \"\\z\"}\n" + + "{\"a\": \"\\r\"}\n" + + "{\"a\": \"something\", \"b\": \"\\z\"}\n" + ).getBytes(StandardCharsets.UTF_8); + + @Test + void testJSONValidationNoStrict() { + Schema schema = Schema.builder() + .column(DType.STRING, "a") + .build(); + JSONOptions opts = JSONOptions.builder() + 
.withRecoverWithNull(true) + .withMixedTypesAsStrings(true) + .withNormalizeWhitespace(true) + .withKeepQuotes(true) + .withNormalizeSingleQuotes(true) + .withStrictValidation(false) + .withLeadingZeros(false) + .withNonNumericNumbers(false) + .withUnquotedControlChars(true) + .build(); + try (Table expected = new Table.TestBuilder() + .column( + "true", "false", null, "true", "true", "1", "0", "-", "-0", "-01", + "01", "-0.1", "-00.1", "NaN", "INF", "+INF", "-INF", "+Infinity", "Infinity", "-Infinity", + "INFinity", "\"3710-11-10T02:46:58.732Z\"", "12.", "-3.4e+38", "-3.4e-38", "1.4e38", "-3.4E+38", "-3.4E-38", "1.4E38", "-3.4E+", + "-3.4E-", "\"A\u0000B\"", "\"A\u0000B\"", "\"A\u0001B\"", "\"A\u0001B\"", "\"A\u001FB\"", "\"A\u001FB\"", "\"A B\"", "\"A B\"", null, + null, "\"\r\"", "\"something\"") + .build(); + MultiBufferDataSource source = sourceFrom(JSON_VALIDATION_BUFFER); + Table table = Table.readJSON(schema, opts, source, (int)expected.getRowCount())) { + assertTablesAreEqual(expected, table); + } + } + + @Test + void testJSONValidation() { + Schema schema = Schema.builder() + .column(DType.STRING, "a") + .build(); + JSONOptions opts = JSONOptions.builder() + .withRecoverWithNull(true) + .withMixedTypesAsStrings(true) + .withNormalizeWhitespace(true) + .withKeepQuotes(true) + .withNormalizeSingleQuotes(true) + .withStrictValidation(true) + .withLeadingZeros(false) + .withNonNumericNumbers(false) + .withUnquotedControlChars(true) + .build(); + try (Table expected = new Table.TestBuilder() + .column( + "true", "false", null, null, "true", "1", "0", null, "-0", null, + null, "-0.1", null, null, null, null, null, null, null, null, + null, "\"3710-11-10T02:46:58.732Z\"", null, "-3.4e+38", "-3.4e-38", "1.4e38", "-3.4E+38", "-3.4E-38", "1.4E38", null, + null, "\"A\u0000B\"", "\"A\u0000B\"", "\"A\u0001B\"", "\"A\u0001B\"", "\"A\u001FB\"", "\"A\u001FB\"", "\"A B\"", "\"A B\"", null, + null, "\"\r\"", null) + .build(); + MultiBufferDataSource source = sourceFrom(JSON_VALIDATION_BUFFER); + Table table = Table.readJSON(schema, opts, source, (int)expected.getRowCount())) { + assertTablesAreEqual(expected, table); + } + } + + @Test + void testJSONValidationLeadingZeros() { + Schema schema = Schema.builder() + .column(DType.STRING, "a") + .build(); + JSONOptions opts = JSONOptions.builder() + .withRecoverWithNull(true) + .withMixedTypesAsStrings(true) + .withNormalizeWhitespace(true) + .withKeepQuotes(true) + .withNormalizeSingleQuotes(true) + .withStrictValidation(true) + .withLeadingZeros(true) + .withNonNumericNumbers(false) + .withUnquotedControlChars(true) + .build(); + try (Table expected = new Table.TestBuilder() + .column( + "true", "false", null, null, "true", "1", "0", null, "-0", "-01", + "01", "-0.1", "-00.1", null, null, null, null, null, null, null, + null, "\"3710-11-10T02:46:58.732Z\"", null, "-3.4e+38", "-3.4e-38", "1.4e38", "-3.4E+38", "-3.4E-38", "1.4E38", null, + null, "\"A\u0000B\"", "\"A\u0000B\"", "\"A\u0001B\"", "\"A\u0001B\"", "\"A\u001FB\"", "\"A\u001FB\"", "\"A B\"", "\"A B\"", null, + null, "\"\r\"", null) + .build(); + MultiBufferDataSource source = sourceFrom(JSON_VALIDATION_BUFFER); + Table table = Table.readJSON(schema, opts, source, (int)expected.getRowCount())) { + assertTablesAreEqual(expected, table); + } + } + + @Test + void testJSONValidationNonNumeric() { + Schema schema = Schema.builder() + .column(DType.STRING, "a") + .build(); + JSONOptions opts = JSONOptions.builder() + .withRecoverWithNull(true) + .withMixedTypesAsStrings(true) + 
.withNormalizeWhitespace(true) + .withKeepQuotes(true) + .withNormalizeSingleQuotes(true) + .withStrictValidation(true) + .withLeadingZeros(false) + .withNonNumericNumbers(true) + .withUnquotedControlChars(true) + .build(); + try (Table expected = new Table.TestBuilder() + .column( + "true", "false", null, null, "true", "1", "0", null, "-0", null, + null, "-0.1", null, "NaN", null, "+INF", "-INF", "+Infinity", "Infinity", "-Infinity", + null, "\"3710-11-10T02:46:58.732Z\"", null, "-3.4e+38", "-3.4e-38", "1.4e38", "-3.4E+38", "-3.4E-38", "1.4E38", null, + null, "\"A\u0000B\"", "\"A\u0000B\"", "\"A\u0001B\"", "\"A\u0001B\"", "\"A\u001FB\"", "\"A\u001FB\"", "\"A B\"", "\"A B\"", null, + null, "\"\r\"", null) + .build(); + MultiBufferDataSource source = sourceFrom(JSON_VALIDATION_BUFFER); + Table table = Table.readJSON(schema, opts, source, (int)expected.getRowCount())) { + assertTablesAreEqual(expected, table); + } + } + + @Test + void testJSONValidationUnquotedControl() { + Schema schema = Schema.builder() + .column(DType.STRING, "a") + .build(); + JSONOptions opts = JSONOptions.builder() + .withRecoverWithNull(true) + .withMixedTypesAsStrings(true) + .withNormalizeWhitespace(true) + .withKeepQuotes(true) + .withNormalizeSingleQuotes(true) + .withStrictValidation(true) + .withLeadingZeros(false) + .withNonNumericNumbers(false) + .withUnquotedControlChars(false) + .build(); + try (Table expected = new Table.TestBuilder() + .column( + "true", "false", null, null, "true", "1", "0", null, "-0", null, + null, "-0.1", null, null, null, null, null, null, null, null, + null, "\"3710-11-10T02:46:58.732Z\"", null, "-3.4e+38", "-3.4e-38", "1.4e38", "-3.4E+38", "-3.4E-38", "1.4E38", null, + null, null, "\"A\u0000B\"", null, "\"A\u0001B\"", null, "\"A\u001FB\"", "\"A B\"", "\"A B\"", null, + null, "\"\r\"", null) + .build(); + MultiBufferDataSource source = sourceFrom(JSON_VALIDATION_BUFFER); + Table table = Table.readJSON(schema, opts, source, (int)expected.getRowCount())) { + assertTablesAreEqual(expected, table); + } + } + private static final byte[] NESTED_JSON_DATA_BUFFER = ("{\"a\":{\"c\":\"C1\"}}\n" + "{\"a\":{\"c\":\"C2\", \"b\":\"B2\"}}\n" + "{\"d\":[1,2,3]}\n" + diff --git a/python/cudf/cudf/__init__.py b/python/cudf/cudf/__init__.py index d7da42a1708..99b759e2166 100644 --- a/python/cudf/cudf/__init__.py +++ b/python/cudf/cudf/__init__.py @@ -46,7 +46,7 @@ ListDtype, StructDtype, ) -from cudf.core.groupby import Grouper +from cudf.core.groupby import Grouper, NamedAgg from cudf.core.index import ( BaseIndex, CategoricalIndex, diff --git a/python/cudf/cudf/core/groupby/__init__.py b/python/cudf/cudf/core/groupby/__init__.py index 4375ed3e3da..621edb316cf 100644 --- a/python/cudf/cudf/core/groupby/__init__.py +++ b/python/cudf/cudf/core/groupby/__init__.py @@ -1,8 +1,9 @@ -# Copyright (c) 2020-2023, NVIDIA CORPORATION. +# Copyright (c) 2020-2024, NVIDIA CORPORATION. -from cudf.core.groupby.groupby import GroupBy, Grouper +from cudf.core.groupby.groupby import GroupBy, Grouper, NamedAgg __all__ = [ "GroupBy", "Grouper", + "NamedAgg", ] diff --git a/python/cudf/cudf/core/groupby/groupby.py b/python/cudf/cudf/core/groupby/groupby.py index 4f283d41b17..6424c8af877 100644 --- a/python/cudf/cudf/core/groupby/groupby.py +++ b/python/cudf/cudf/core/groupby/groupby.py @@ -76,6 +76,34 @@ def _is_row_of(chunk, obj): ) +NamedAgg = pd.NamedAgg + + +NamedAgg.__doc__ = """ +Helper for column specific aggregation with control over output column names. + +Subclass of typing.NamedTuple. 
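+This is an alias of :class:`pandas.NamedAgg`, re-exported so the same spelling is
+available as ``cudf.NamedAgg``.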
+ +Parameters +---------- +column : Hashable + Column label in the DataFrame to apply aggfunc. +aggfunc : function or str + Function to apply to the provided column. + +Examples +-------- +>>> df = cudf.DataFrame({"key": [1, 1, 2], "a": [-1, 0, 1], 1: [10, 11, 12]}) +>>> agg_a = cudf.NamedAgg(column="a", aggfunc="min") +>>> agg_1 = cudf.NamedAgg(column=1, aggfunc=lambda x: x.mean()) +>>> df.groupby("key").agg(result_a=agg_a, result_1=agg_1) + result_a result_1 +key +1 -1 10.5 +2 1 12.0 +""" + + groupby_doc_template = textwrap.dedent( """Group using a mapper or by a Series of columns. @@ -1296,9 +1324,21 @@ def _normalize_aggs( columns = values._columns aggs_per_column = (aggs,) * len(columns) elif not aggs and kwargs: - column_names, aggs_per_column = kwargs.keys(), kwargs.values() - columns = tuple(self.obj._data[x[0]] for x in kwargs.values()) - aggs_per_column = tuple(x[1] for x in kwargs.values()) + column_names = kwargs.keys() + + def _raise_invalid_type(x): + raise TypeError( + f"Invalid keyword argument {x} of type {type(x)} was passed to agg" + ) + + columns, aggs_per_column = zip( + *( + (self.obj._data[x[0]], x[1]) + if isinstance(x, tuple) + else _raise_invalid_type(x) + for x in kwargs.values() + ) + ) else: raise TypeError("Must provide at least one aggregation function.") diff --git a/python/cudf/cudf/tests/groupby/test_agg.py b/python/cudf/cudf/tests/groupby/test_agg.py index 99e7523031b..dc20a27177a 100644 --- a/python/cudf/cudf/tests/groupby/test_agg.py +++ b/python/cudf/cudf/tests/groupby/test_agg.py @@ -56,3 +56,19 @@ def test_dataframe_agg(attr, func): ) assert_eq(agg, pd_agg) + + agg = getattr(df.groupby("a"), attr)( + foo=cudf.NamedAgg(column="b", aggfunc=func), + bar=cudf.NamedAgg(column="a", aggfunc=func), + ) + pd_agg = getattr(pdf.groupby(["a"]), attr)( + foo=("b", func), bar=("a", func) + ) + + assert_eq(agg, pd_agg) + + +def test_dataframe_agg_with_invalid_kwarg(): + with pytest.raises(TypeError, match="Invalid keyword argument"): + df = cudf.DataFrame({"a": [1, 2, 1, 2], "b": [0, 0, 0, 0]}) + df.groupby("a").agg(foo=set())
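For reference, a minimal usage sketch of the new `cudf.NamedAgg` re-export and the stricter keyword validation in `GroupBy.agg` added above (not part of the diff; assumes a cudf build containing this change):

```python
import cudf

df = cudf.DataFrame({"a": [1, 2, 1, 2], "b": [0, 3, 6, 9]})

# Keyword aggregations must be NamedAgg instances (or plain (column, aggfunc)
# tuples); each keyword becomes the name of an output column.
out = df.groupby("a").agg(
    foo=cudf.NamedAgg(column="b", aggfunc="max"),
    bar=cudf.NamedAgg(column="a", aggfunc="min"),
)
print(out)

# Non-tuple specs now raise a clear TypeError up front.
try:
    df.groupby("a").agg(foo=set())
except TypeError as err:
    print(err)  # Invalid keyword argument set() of type <class 'set'> was passed to agg
```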