diff --git a/cpp/examples/parquet_io/parquet_io_multithreaded.cpp b/cpp/examples/parquet_io/parquet_io_multithreaded.cpp index f39f86b7e08..f46f02966f1 100644 --- a/cpp/examples/parquet_io/parquet_io_multithreaded.cpp +++ b/cpp/examples/parquet_io/parquet_io_multithreaded.cpp @@ -16,8 +16,11 @@ #include "common.hpp" +#include #include +#include + #include /** @@ -136,15 +139,15 @@ int main(int argc, char const** argv) } // Process and extract all input files - auto const input_files = [&]() { + auto const extract_input_files = [thread_count = thread_count](std::string const& paths) { std::vector parquet_files; std::vector delimited_paths = [&]() { std::vector paths_list; - std::stringstream stream{input_paths}; + std::stringstream stream{paths}; std::string path; - // extract the delimited paths. + // Extract the delimited paths. while (std::getline(stream, path, char{','})) { - paths_list.push_back(path); // Add each token to the vector + paths_list.push_back(path); } return paths_list; }(); @@ -175,13 +178,37 @@ int main(int argc, char const** argv) } return parquet_files; - }(); + }; + + // Concatenate a vector of tables and return + auto const concatenate_tables = [](std::vector& tables, rmm::cuda_stream_view stream) { + // Construct the final table + auto table = std::move(tables[0]); + std::for_each(tables.begin() + 1, tables.end(), [&](auto& tbl) { + std::vector const table_views{table->view(), tbl->view()}; + table = cudf::concatenate(table_views, stream); + }); + return table; + }; + + // make input files from the input_paths string. + auto const input_files = extract_input_files(input_paths); // Exit early if nothing to do. - if (not input_files.size()) { return 0; } + if (not input_files.size()) { + std::cerr << "No input files to read. Exiting early.\n"; + return 0; + } - // Check if output path is a directory. - if (not std::filesystem::is_directory(std::filesystem::path{output_path})) { + // Check if output path is a valid + if (std::filesystem::is_directory({output_path})) { + // Create a new directory in output path if not empty. + if (not std::filesystem::is_empty({output_path})) { + output_path += + "/output_" + fmt::format("{:%Y-%m-%d-%H-%M-%S}", std::chrono::system_clock::now()); + std::filesystem::create_directory({output_path}); + } + } else { throw std::runtime_error("The provided output path is not a directory\n"); } @@ -275,20 +302,40 @@ int main(int argc, char const** argv) { std::cout << "Reading for the second time using " << thread_count << " threads..." << std::endl; cudf::examples::timer timer; - auto tables = read_parquet_multithreaded(input_files); - // Construct the final table - auto table = std::move(tables[0]); - std::for_each(tables.begin() + 1, tables.end(), [&](auto& tbl) { - std::vector const table_views{table->view(), tbl->view()}; - table = cudf::concatenate(table_views, default_stream); - }); - + auto tables = read_parquet_multithreaded(input_files); + auto const table = concatenate_tables(tables, default_stream); // In case some kernels are still running on the default stream default_stream.synchronize(); + // Print elapsed time and peak memory + timer.print_elapsed_millis(); + std::cout << "Reading transcoded files using " << thread_count << " threads..." << std::endl; + timer.reset(); + auto transcoded_tables = read_parquet_multithreaded(extract_input_files(output_path)); + auto const transcoded_table = concatenate_tables(transcoded_tables, default_stream); // Print elapsed time and peak memory timer.print_elapsed_millis(); - std::cout << "Peak memory: " << (stats_mr.get_bytes_counter().peak / 1048576.0) << " MB\n"; + + // In case some kernels are still running on the default stream + default_stream.synchronize(); + + std::cout << "Peak memory: " << (stats_mr.get_bytes_counter().peak / 1048576.0) << " MB\n\n"; + + // Check for validity + try { + // Left anti-join the original and transcoded tables + // identical tables should not throw an exception and + // return an empty indices vector + auto const indices = cudf::left_anti_join( + table->view(), transcoded_table->view(), cudf::null_equality::EQUAL, resource.get()); + + // No exception thrown, check indices + auto const valid = indices->size() == 0; + std::cout << "Transcoding valid: " << std::boolalpha << valid << std::endl; + } catch (std::exception& e) { + std::cerr << e.what() << std::endl << std::endl; + std::cout << "Transcoding valid: false" << std::endl; + } } return 0;