Skip to content

Commit

Permalink
Minor improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
mhaseeb123 committed Sep 18, 2024
1 parent e04602c commit 21ce7c7
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 27 deletions.
8 changes: 6 additions & 2 deletions cpp/examples/parquet_io/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
#include <rmm/mr/device/owning_wrapper.hpp>
#include <rmm/mr/device/pool_memory_resource.hpp>

#include <fmt/color.h>

#include <chrono>
#include <iostream>
#include <optional>
Expand Down Expand Up @@ -146,9 +148,11 @@ inline void check_identical_tables(cudf::table_view const& lhs_table,

// No exception thrown, check indices
auto const valid = indices->size() == 0;
std::cout << "Transcoding valid: " << std::boolalpha << valid << std::endl;
fmt::print(
fmt::emphasis::bold | fg(fmt::color::green_yellow), "Transcoding valid: {}\n", valid);
} catch (std::exception& e) {
std::cerr << e.what() << std::endl << std::endl;
throw std::runtime_error("Transcoding valid: false\n");
throw std::runtime_error(
fmt::format(fmt::emphasis::bold | fg(fmt::color::red), "Transcoding valid: false\n"));
}
}
14 changes: 6 additions & 8 deletions cpp/examples/parquet_io/parquet_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,26 +126,24 @@ int main(int argc, char const** argv)
// Read input parquet file
// We do not want to time the initial read time as it may include
// time for nvcomp, cufile loading and RMM growth
std::cout << std::endl << "Reading " << input_filepath << "..." << std::endl;
std::cout << "Note: Not timing the initial parquet read as it may include\n"
"times for nvcomp, cufile loading and RMM growth."
<< std::endl
<< std::endl;
fmt::print("\nReading {}...", input_filepath);
fmt::print(
"Note: Not timing the initial parquet read as it may include\n"
"times for nvcomp, cufile loading and RMM growth.\n\n");
auto [input, metadata] = read_parquet(input_filepath);

// Status string to indicate if page stats are set to be written or not
auto page_stat_string = (page_stats.has_value()) ? "page stats" : "no page stats";
// Write parquet file with the specified encoding and compression
std::cout << "Writing " << output_filepath << " with encoding, compression and "
<< page_stat_string << ".." << std::endl;
fmt::print("Writing {} with encoding, compression and {}..\n", output_filepath, page_stat_string);

// `timer` is automatically started here
cudf::examples::timer timer;
write_parquet(input->view(), metadata, output_filepath, encoding, compression, page_stats);
timer.print_elapsed_millis();

// Read the parquet file written with encoding and compression
std::cout << "Reading " << output_filepath << "..." << std::endl;
fmt::print("Reading {}...\n", output_filepath);

// Reset the timer
timer.reset();
Expand Down
44 changes: 27 additions & 17 deletions cpp/examples/parquet_io/parquet_io_multithreaded.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ struct read_fn {

struct write_fn {
std::string const& output_path;
std::vector<table_t> const& tables;
std::vector<cudf::table_view> const& table_views;
cudf::io::column_encoding const encoding;
cudf::io::compression_type const compression;
std::optional<cudf::io::statistics_freq> const stats_level;
Expand All @@ -85,10 +85,10 @@ struct write_fn {
// write the data for inspection
auto sink_info =
cudf::io::sink_info(output_path + "/table_" + std::to_string(thread_id) + ".parquet");
auto builder = cudf::io::parquet_writer_options::builder(sink_info, tables[thread_id]->view())
auto builder = cudf::io::parquet_writer_options::builder(sink_info, table_views[thread_id])
.compression(compression)
.stats_level(stats_level.value_or(cudf::io::statistics_freq::STATISTICS_NONE));
auto table_metadata = cudf::io::table_input_metadata{tables[thread_id]->view()};
auto table_metadata = cudf::io::table_input_metadata{table_views[thread_id]};

std::for_each(table_metadata.column_metadata.begin(),
table_metadata.column_metadata.end(),
Expand Down Expand Up @@ -246,8 +246,8 @@ int main(int argc, char const** argv)
return tables;
};

// Lambda function to setup and launch multithread parquet write
auto const write_parquet_multithreaded = [&](std::vector<table_t> const& tables) {
// Lambda function to setup and launch multithreaded parquet writes
auto const write_parquet_multithreaded = [&](std::vector<cudf::table_view> const& tables) {
// Tasks to read each parquet file
std::vector<write_fn> write_tasks;
write_tasks.reserve(thread_count);
Expand All @@ -271,23 +271,33 @@ int main(int argc, char const** argv)

// Read the parquet files with multiple threads
{
std::cout << "Note: Not timing the initial parquet read as it may include\n"
"times for nvcomp, cufile loading and RMM growth."
<< std::endl
<< std::endl;
fmt::print(
"Note: Not timing the initial parquet read as it may include\n"
"times for nvcomp, cufile loading and RMM growth.\n\n");
// Tables read by each thread
auto const tables = read_parquet_multithreaded(input_files);
// In case some kernels are still running on the default stre
default_stream.synchronize();

// Write parquet file with the specified encoding and compression
// Construct a vector of table views for write_parquet_multithreaded
auto const table_views = [&tables]() {
std::vector<cudf::table_view> table_views;
table_views.reserve(tables.size());

std::transform(
tables.cbegin(), tables.cend(), std::back_inserter(table_views), [](auto const& tbl) {
return tbl->view();
});
return table_views;
}();

// Write tables to parquet with the specified encoding and compression
auto const page_stat_string = (page_stats.has_value()) ? "page stats" : "no page stats";
std::cout << "Writing at: " << output_path << " with encoding, compression and "
<< page_stat_string << ".." << std::endl;
fmt::print(
"Writing at: {} with encoding, compression and {}..\n", output_path, page_stat_string);

// Write tables using multiple threads
cudf::examples::timer timer;
write_parquet_multithreaded(tables);
write_parquet_multithreaded(table_views);
// In case some kernels are still running on the default stream
default_stream.synchronize();
// Print elapsed time
Expand All @@ -296,7 +306,7 @@ int main(int argc, char const** argv)

// Re-read the same parquet files with multiple threads
{
std::cout << "Reading for the second time using " << thread_count << " threads..." << std::endl;
fmt::print("Reading for the second time using {} threads...\n", thread_count);
cudf::examples::timer timer;
auto const input_table =
concatenate_tables(read_parquet_multithreaded(input_files), default_stream);
Expand All @@ -305,7 +315,7 @@ int main(int argc, char const** argv)
// Print elapsed time and peak memory
timer.print_elapsed_millis();

std::cout << "Reading transcoded files using " << thread_count << " threads..." << std::endl;
fmt::print("Reading transcoded files using {} threads...\n", thread_count);
timer.reset();
auto const transcoded_table = concatenate_tables(
read_parquet_multithreaded(extract_input_files(output_path)), default_stream);
Expand All @@ -314,7 +324,7 @@ int main(int argc, char const** argv)
// Print elapsed time and peak memory
timer.print_elapsed_millis();

std::cout << "Peak memory: " << (stats_mr.get_bytes_counter().peak / 1048576.0) << " MB\n\n";
fmt::print("Peak memory: {} MB\n\n", (stats_mr.get_bytes_counter().peak / 1048576.0));

// Check for validity
check_identical_tables(input_table->view(), transcoded_table->view());
Expand Down

0 comments on commit 21ce7c7

Please sign in to comment.