diff --git a/cpp/examples/parquet_io/common.hpp b/cpp/examples/parquet_io/common.hpp
index c4cbd6a589a..37eb138640a 100644
--- a/cpp/examples/parquet_io/common.hpp
+++ b/cpp/examples/parquet_io/common.hpp
@@ -145,11 +145,11 @@ inline void check_identical_tables(cudf::table_view const& lhs_table,
     // No exception thrown, check indices
     auto const valid = indices->size() == 0;
     fmt::print(
-      fmt::emphasis::bold | fg(fmt::color::green_yellow), "Transcoding valid: {}\n\n", valid);
+      fmt::emphasis::bold | fg(fmt::color::green_yellow), "Tables identical: {}\n\n", valid);
   } catch (std::exception& e) {
     std::cerr << e.what() << std::endl << std::endl;
     throw std::runtime_error(
-      fmt::format(fmt::emphasis::bold | fg(fmt::color::red), "Transcoding valid: false\n\n"));
+      fmt::format(fmt::emphasis::bold | fg(fmt::color::red), "Tables identical: false\n\n"));
   }
 }
diff --git a/cpp/examples/parquet_io/parquet_io_multithreaded.cpp b/cpp/examples/parquet_io/parquet_io_multithreaded.cpp
index 02e4c772e5b..7728c91cbb7 100644
--- a/cpp/examples/parquet_io/parquet_io_multithreaded.cpp
+++ b/cpp/examples/parquet_io/parquet_io_multithreaded.cpp
@@ -179,16 +179,19 @@ int32_t main(int argc, char const** argv)
   // Set arguments to defaults
   std::string input_paths  = "example.parquet";
   int32_t input_multiplier = 1;
+  int32_t num_reads        = 1;
   int32_t thread_count     = 2;
   std::optional<cudf::io::io_type> io_type = std::nullopt;
   bool validate_output     = false;
 
   // Function to print example usage
   auto const print_usage = [] {
-    fmt::print(fg(fmt::color::yellow),
-               "\nUsage: parquet_io_multithreaded <comma delimited list of dirs and/or files> \n"
-               "                                  <input multiplier> <thread count> \n"
-               "                                  <io sink type> <validate output: yes/no>\n\n");
+    fmt::print(
+      fg(fmt::color::yellow),
+      "\nUsage: parquet_io_multithreaded <comma delimited list of dirs and/or files> \n"
+      "                                  <input multiplier> <number of times to read> \n"
+      "                                  <thread count> <io sink type> \n"
+      "                                  <validate output: yes/no>\n\n");
     fmt::print(
       fg(fmt::color::light_sky_blue),
       "Note: Provide as many arguments as you like in the above order. Default values\n"
@@ -198,9 +201,10 @@
 
   // Set to the provided args
   switch (argc) {
-    case 6: validate_output = get_boolean(argv[5]); [[fallthrough]];
-    case 5: io_type = get_io_sink_type(argv[4]); [[fallthrough]];
-    case 4: thread_count = std::max(thread_count, std::stoi(std::string{argv[3]})); [[fallthrough]];
+    case 7: validate_output = get_boolean(argv[6]); [[fallthrough]];
+    case 6: io_type = get_io_sink_type(argv[5]); [[fallthrough]];
+    case 5: thread_count = std::max(thread_count, std::stoi(std::string{argv[4]})); [[fallthrough]];
+    case 4: num_reads = std::max(1, std::stoi(std::string{argv[3]})); [[fallthrough]];
     case 3:
       input_multiplier = std::max(input_multiplier, std::stoi(std::string{argv[2]}));
       [[fallthrough]];
@@ -308,82 +312,72 @@
     return 0;
   }
 
-  // Read the parquet files with multiple threads
-  {
-    fmt::print(fg(fmt::color::yellow),
-               "\nReading {} input files using {} threads without timing it as \n"
-               "it may include times for nvcomp, cufile loading and RMM growth.\n\n",
-               input_files.size(),
-               thread_count);
-
-    // If we are writing output then read with CONCATENATE_THREAD
-    if (io_type.has_value()) {
-      // Launch
-      auto const tables = read_parquet_multithreaded<read_mode::CONCATENATE_THREAD>(
-        input_files, thread_count, stream_pool);
-      default_stream.synchronize();
-      // Initialize the default output path to avoid race condition with multiple writer threads.
-      std::ignore = get_default_output_path();
-
-      // Construct a vector of table views for write_parquet_multithreaded
-      auto const table_views = [&tables]() {
-        std::vector<cudf::table_view> table_views;
-        table_views.reserve(tables.size());
-
-        std::transform(
-          tables.cbegin(), tables.cend(), std::back_inserter(table_views), [](auto const& tbl) {
-            return tbl->view();
-          });
-        return table_views;
-      }();
-
-      // Write tables to parquet
-      fmt::print("Writing parquet output to sink type: {}..\n", std::string{argv[4]});
-      cudf::examples::timer timer;
-      write_parquet_multithreaded(table_views, thread_count, stream_pool);
-      default_stream.synchronize();
-      timer.print_elapsed_millis();
-    }
-    // Else simply read with NOWORK mode
-    else {
-      std::ignore =
-        read_parquet_multithreaded<read_mode::NOWORK>(input_files, thread_count, stream_pool);
-      default_stream.synchronize();
-    }
-  }
-
-  // Re-read the same parquet files with multiple threads and discard the read tables
+  // Read the same parquet files specified times with multiple threads and discard the read tables
   {
     fmt::print(
-      "Re-reading {} input files using {} threads and discarding output "
+      "\nReading {} input files {} times using {} threads and discarding output "
      "tables..\n",
       input_files.size(),
+      num_reads,
       thread_count);
+    fmt::print(
+      fg(fmt::color::yellow),
+      "Note that the first read may include times for nvcomp, cufile loading and RMM growth.\n\n");
     cudf::examples::timer timer;
-    // Read parquet files and discard the tables
-    std::ignore =
-      read_parquet_multithreaded<read_mode::NOWORK>(input_files, thread_count, stream_pool);
+    std::for_each(thrust::make_counting_iterator(0),
+                  thrust::make_counting_iterator(num_reads),
+                  [&](auto i) {  // Read parquet files and discard the tables
+                    std::ignore = read_parquet_multithreaded<read_mode::NOWORK>(
+                      input_files, thread_count, stream_pool);
+                  });
     default_stream.synchronize();
     timer.print_elapsed_millis();
   }
 
-  // Verify the output files if requested
-  if (validate_output and io_type.has_value()) {
-    fmt::print("Verifying output..\n");
-
-    // CONCATENATE_ALL returns a vector of 1 table
-    auto const input_table = std::move(
-      read_parquet_multithreaded<read_mode::CONCATENATE_ALL>(input_files, thread_count, stream_pool)
-        .back());
+  // Do we need to write parquet as well?
+  if (io_type.has_value()) {
+    // Read input files with CONCATENATE_THREADS mode
+    auto const tables = read_parquet_multithreaded<read_mode::CONCATENATE_THREAD>(
+      input_files, thread_count, stream_pool);
+    default_stream.synchronize();
+    // Initialize the default output path to avoid race condition with multiple writer threads.
+    std::ignore = get_default_output_path();
+
+    // Construct a vector of table views for write_parquet_multithreaded
+    auto const table_views = [&tables]() {
+      std::vector<cudf::table_view> table_views;
+      table_views.reserve(tables.size());
+
+      std::transform(
+        tables.cbegin(), tables.cend(), std::back_inserter(table_views), [](auto const& tbl) {
+          return tbl->view();
+        });
+      return table_views;
+    }();
 
-    auto const transcoded_table =
-      std::move(read_parquet_multithreaded<read_mode::CONCATENATE_ALL>(
-                  extract_input_files(get_default_output_path()), thread_count, stream_pool)
-                  .back());
+    // Write tables to parquet
+    fmt::print("Writing parquet output to sink type: {}..\n", std::string{argv[5]});
+    cudf::examples::timer timer;
+    write_parquet_multithreaded(table_views, thread_count, stream_pool);
     default_stream.synchronize();
+    timer.print_elapsed_millis();
+
+    // Verify the output if requested
+    if (validate_output) {
+      fmt::print("Verifying output..\n");
+
+      // CONCATENATE_ALL returns a vector of 1 table
+      auto const input_table = cudf::concatenate(table_views, default_stream);
 
-    // Check for validity
-    check_identical_tables(input_table->view(), transcoded_table->view());
+      auto const transcoded_table =
+        std::move(read_parquet_multithreaded<read_mode::CONCATENATE_ALL>(
+                    extract_input_files(get_default_output_path()), thread_count, stream_pool)
+                    .back());
+      default_stream.synchronize();
+
+      // Check if the tables are identical
+      check_identical_tables(input_table->view(), transcoded_table->view());
+    }
   }
 
   // Print peak memory