Skip to content

Commit

Permalink
Minor improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
mhaseeb123 committed Sep 27, 2024
1 parent 9dc8a1e commit af8ec6a
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 72 deletions.
4 changes: 2 additions & 2 deletions cpp/examples/parquet_io/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,11 @@ inline void check_identical_tables(cudf::table_view const& lhs_table,
// No exception thrown, check indices
auto const valid = indices->size() == 0;
fmt::print(
fmt::emphasis::bold | fg(fmt::color::green_yellow), "Transcoding valid: {}\n\n", valid);
fmt::emphasis::bold | fg(fmt::color::green_yellow), "Tables identical: {}\n\n", valid);
} catch (std::exception& e) {
std::cerr << e.what() << std::endl << std::endl;
throw std::runtime_error(
fmt::format(fmt::emphasis::bold | fg(fmt::color::red), "Transcoding valid: false\n\n"));
fmt::format(fmt::emphasis::bold | fg(fmt::color::red), "Tables identical: false\n\n"));
}
}

Expand Down
134 changes: 64 additions & 70 deletions cpp/examples/parquet_io/parquet_io_multithreaded.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,16 +179,19 @@ int32_t main(int argc, char const** argv)
// Set arguments to defaults
std::string input_paths = "example.parquet";
int32_t input_multiplier = 1;
int32_t num_reads = 1;
int32_t thread_count = 2;
std::optional<cudf::io::io_type> io_type = std::nullopt;
bool validate_output = false;

// Function to print example usage
auto const print_usage = [] {
fmt::print(fg(fmt::color::yellow),
"\nUsage: parquet_io_multithreaded <comma delimited list of dirs and/or files>\n"
" <input files multiplier> <number of threads> \n"
" <io sink type> <validate output: yes/no>\n\n");
fmt::print(
fg(fmt::color::yellow),
"\nUsage: parquet_io_multithreaded <comma delimited list of dirs and/or files>\n"
" <input files multiplier> <number of times to reads>\n"
" <thread count> <io sink type> <validate output: "
"yes/no>\n\n");
fmt::print(
fg(fmt::color::light_sky_blue),
"Note: Provide as many arguments as you like in the above order. Default values\n"
Expand All @@ -198,9 +201,10 @@ int32_t main(int argc, char const** argv)

// Set to the provided args
switch (argc) {
case 6: validate_output = get_boolean(argv[5]); [[fallthrough]];
case 5: io_type = get_io_sink_type(argv[4]); [[fallthrough]];
case 4: thread_count = std::max(thread_count, std::stoi(std::string{argv[3]})); [[fallthrough]];
case 7: validate_output = get_boolean(argv[6]); [[fallthrough]];
case 6: io_type = get_io_sink_type(argv[5]); [[fallthrough]];
case 5: thread_count = std::max(thread_count, std::stoi(std::string{argv[4]})); [[fallthrough]];
case 4: num_reads = std::max(1, std::stoi(std::string{argv[3]})); [[fallthrough]];
case 3:
input_multiplier = std::max(input_multiplier, std::stoi(std::string{argv[2]}));
[[fallthrough]];
Expand Down Expand Up @@ -308,82 +312,72 @@ int32_t main(int argc, char const** argv)
return 0;
}

// Read the parquet files with multiple threads
{
fmt::print(fg(fmt::color::yellow),
"\nReading {} input files using {} threads without timing it as \n"
"it may include times for nvcomp, cufile loading and RMM growth.\n\n",
input_files.size(),
thread_count);

// If we are writing output then read with CONCATENATE_THREAD
if (io_type.has_value()) {
// Launch
auto const tables = read_parquet_multithreaded<read_mode::CONCATENATE_THREAD>(
input_files, thread_count, stream_pool);
default_stream.synchronize();
// Initialize the default output path to avoid race condition with multiple writer threads.
std::ignore = get_default_output_path();

// Construct a vector of table views for write_parquet_multithreaded
auto const table_views = [&tables]() {
std::vector<cudf::table_view> table_views;
table_views.reserve(tables.size());

std::transform(
tables.cbegin(), tables.cend(), std::back_inserter(table_views), [](auto const& tbl) {
return tbl->view();
});
return table_views;
}();

// Write tables to parquet
fmt::print("Writing parquet output to sink type: {}..\n", std::string{argv[4]});
cudf::examples::timer timer;
write_parquet_multithreaded(table_views, thread_count, stream_pool);
default_stream.synchronize();
timer.print_elapsed_millis();
}
// Else simply read with NOWORK mode
else {
std::ignore =
read_parquet_multithreaded<read_mode::NOWORK>(input_files, thread_count, stream_pool);
default_stream.synchronize();
}
}

// Re-read the same parquet files with multiple threads and discard the read tables
// Read the same parquet files the specified number of times with multiple threads and discard the read tables
{
fmt::print(
"Re-reading {} input files using {} threads and discarding output "
"\nReading {} input files {} times using {} threads and discarding output "
"tables..\n",
input_files.size(),
num_reads,
thread_count);
fmt::print(
fg(fmt::color::yellow),
"Note that the first read may include times for nvcomp, cufile loading and RMM growth.\n\n");
cudf::examples::timer timer;
// Read parquet files and discard the tables
std::ignore =
read_parquet_multithreaded<read_mode::NOWORK>(input_files, thread_count, stream_pool);
std::for_each(thrust::make_counting_iterator(0),
thrust::make_counting_iterator(num_reads),
[&](auto i) { // Read parquet files and discard the tables
std::ignore = read_parquet_multithreaded<read_mode::NOWORK>(
input_files, thread_count, stream_pool);
});
default_stream.synchronize();
timer.print_elapsed_millis();
}

// Verify the output files if requested
if (validate_output and io_type.has_value()) {
fmt::print("Verifying output..\n");

// CONCATENATE_ALL returns a vector of 1 table
auto const input_table = std::move(
read_parquet_multithreaded<read_mode::CONCATENATE_ALL>(input_files, thread_count, stream_pool)
.back());
// Do we need to write parquet as well?
if (io_type.has_value()) {
// Read input files with CONCATENATE_THREAD mode
auto const tables = read_parquet_multithreaded<read_mode::CONCATENATE_THREAD>(
input_files, thread_count, stream_pool);
default_stream.synchronize();
// Initialize the default output path to avoid race condition with multiple writer threads.
std::ignore = get_default_output_path();

// Construct a vector of table views for write_parquet_multithreaded
auto const table_views = [&tables]() {
std::vector<cudf::table_view> table_views;
table_views.reserve(tables.size());

std::transform(
tables.cbegin(), tables.cend(), std::back_inserter(table_views), [](auto const& tbl) {
return tbl->view();
});
return table_views;
}();

auto const transcoded_table =
std::move(read_parquet_multithreaded<read_mode::CONCATENATE_ALL>(
extract_input_files(get_default_output_path()), thread_count, stream_pool)
.back());
// Write tables to parquet
fmt::print("Writing parquet output to sink type: {}..\n", std::string{argv[5]});
cudf::examples::timer timer;
write_parquet_multithreaded(table_views, thread_count, stream_pool);
default_stream.synchronize();
timer.print_elapsed_millis();

// Verify the output if requested
if (validate_output) {
fmt::print("Verifying output..\n");

// CONCATENATE_ALL returns a vector of 1 table
auto const input_table = cudf::concatenate(table_views, default_stream);

// Check for validity
check_identical_tables(input_table->view(), transcoded_table->view());
auto const transcoded_table =
std::move(read_parquet_multithreaded<read_mode::CONCATENATE_ALL>(
extract_input_files(get_default_output_path()), thread_count, stream_pool)
.back());
default_stream.synchronize();

// Check if the tables are identical
check_identical_tables(input_table->view(), transcoded_table->view());
}
}

// Print peak memory
Expand Down

0 comments on commit af8ec6a

Please sign in to comment.