From 21ce7c7cc42596880ca82de4cd56d24323275122 Mon Sep 17 00:00:00 2001
From: Muhammad Haseeb <mhaseeb@nvidia.com>
Date: Wed, 18 Sep 2024 23:09:32 +0000
Subject: [PATCH] Minor improvements

---
 cpp/examples/parquet_io/common.hpp            |  8 +++-
 cpp/examples/parquet_io/parquet_io.cpp        | 14 +++---
 .../parquet_io/parquet_io_multithreaded.cpp   | 44 ++++++++++++-------
 3 files changed, 39 insertions(+), 27 deletions(-)

diff --git a/cpp/examples/parquet_io/common.hpp b/cpp/examples/parquet_io/common.hpp
index f4e5757412a..25f81022d07 100644
--- a/cpp/examples/parquet_io/common.hpp
+++ b/cpp/examples/parquet_io/common.hpp
@@ -31,6 +31,8 @@
 #include <rmm/mr/device/owning_wrapper.hpp>
 #include <rmm/mr/device/pool_memory_resource.hpp>
 
+#include <fmt/color.h>
+
 #include <chrono>
 #include <iostream>
 #include <optional>
@@ -146,9 +148,11 @@ inline void check_identical_tables(cudf::table_view const& lhs_table,
 
     // No exception thrown, check indices
     auto const valid = indices->size() == 0;
-    std::cout << "Transcoding valid: " << std::boolalpha << valid << std::endl;
+    fmt::print(
+      fmt::emphasis::bold | fg(fmt::color::green_yellow), "Transcoding valid: {}\n", valid);
   } catch (std::exception& e) {
     std::cerr << e.what() << std::endl << std::endl;
-    throw std::runtime_error("Transcoding valid: false\n");
+    throw std::runtime_error(
+      fmt::format(fmt::emphasis::bold | fg(fmt::color::red), "Transcoding valid: false\n"));
   }
 }
diff --git a/cpp/examples/parquet_io/parquet_io.cpp b/cpp/examples/parquet_io/parquet_io.cpp
index c981928e8f2..06505016ab9 100644
--- a/cpp/examples/parquet_io/parquet_io.cpp
+++ b/cpp/examples/parquet_io/parquet_io.cpp
@@ -126,18 +126,16 @@ int main(int argc, char const** argv)
   // Read input parquet file
   // We do not want to time the initial read time as it may include
   // time for nvcomp, cufile loading and RMM growth
-  std::cout << std::endl << "Reading " << input_filepath << "..." << std::endl;
-  std::cout << "Note: Not timing the initial parquet read as it may include\n"
-               "times for nvcomp, cufile loading and RMM growth."
-            << std::endl
-            << std::endl;
+  fmt::print("\nReading {}...", input_filepath);
+  fmt::print(
+    "Note: Not timing the initial parquet read as it may include\n"
+    "times for nvcomp, cufile loading and RMM growth.\n\n");
   auto [input, metadata] = read_parquet(input_filepath);
 
   // Status string to indicate if page stats are set to be written or not
   auto page_stat_string = (page_stats.has_value()) ? "page stats" : "no page stats";
   // Write parquet file with the specified encoding and compression
-  std::cout << "Writing " << output_filepath << " with encoding, compression and "
-            << page_stat_string << ".." << std::endl;
+  fmt::print("Writing {} with encoding, compression and {}..\n", output_filepath, page_stat_string);
 
   // `timer` is automatically started here
   cudf::examples::timer timer;
@@ -145,7 +143,7 @@ int main(int argc, char const** argv)
   timer.print_elapsed_millis();
 
   // Read the parquet file written with encoding and compression
-  std::cout << "Reading " << output_filepath << "..." << std::endl;
+  fmt::print("Reading {}...\n", output_filepath);
 
   // Reset the timer
   timer.reset();
diff --git a/cpp/examples/parquet_io/parquet_io_multithreaded.cpp b/cpp/examples/parquet_io/parquet_io_multithreaded.cpp
index 5c9be0892cb..361683c0e9e 100644
--- a/cpp/examples/parquet_io/parquet_io_multithreaded.cpp
+++ b/cpp/examples/parquet_io/parquet_io_multithreaded.cpp
@@ -73,7 +73,7 @@ struct read_fn {
 
 struct write_fn {
   std::string const& output_path;
-  std::vector<table_t> const& tables;
+  std::vector<cudf::table_view> const& table_views;
   cudf::io::column_encoding const encoding;
   cudf::io::compression_type const compression;
   std::optional<cudf::io::statistics_freq> const stats_level;
@@ -85,10 +85,10 @@ struct write_fn {
     // write the data for inspection
     auto sink_info =
       cudf::io::sink_info(output_path + "/table_" + std::to_string(thread_id) + ".parquet");
-    auto builder = cudf::io::parquet_writer_options::builder(sink_info, tables[thread_id]->view())
+    auto builder = cudf::io::parquet_writer_options::builder(sink_info, table_views[thread_id])
                      .compression(compression)
                      .stats_level(stats_level.value_or(cudf::io::statistics_freq::STATISTICS_NONE));
-    auto table_metadata = cudf::io::table_input_metadata{tables[thread_id]->view()};
+    auto table_metadata = cudf::io::table_input_metadata{table_views[thread_id]};
 
     std::for_each(table_metadata.column_metadata.begin(),
                   table_metadata.column_metadata.end(),
@@ -246,8 +246,8 @@ int main(int argc, char const** argv)
     return tables;
   };
 
-  // Lambda function to setup and launch multithread parquet write
-  auto const write_parquet_multithreaded = [&](std::vector<table_t> const& tables) {
+  // Lambda function to setup and launch multithreaded parquet writes
+  auto const write_parquet_multithreaded = [&](std::vector<cudf::table_view> const& tables) {
     // Tasks to read each parquet file
     std::vector<write_fn> write_tasks;
     write_tasks.reserve(thread_count);
@@ -271,23 +271,33 @@ int main(int argc, char const** argv)
 
   // Read the parquet files with multiple threads
   {
-    std::cout << "Note: Not timing the initial parquet read as it may include\n"
-                 "times for nvcomp, cufile loading and RMM growth."
-              << std::endl
-              << std::endl;
+    fmt::print(
+      "Note: Not timing the initial parquet read as it may include\n"
+      "times for nvcomp, cufile loading and RMM growth.\n\n");
     // Tables read by each thread
     auto const tables = read_parquet_multithreaded(input_files);
     // In case some kernels are still running on the default stre
     default_stream.synchronize();
 
-    // Write parquet file with the specified encoding and compression
+    // Construct a vector of table views for write_parquet_multithreaded
+    auto const table_views = [&tables]() {
+      std::vector<cudf::table_view> table_views;
+      table_views.reserve(tables.size());
+
+      std::transform(
+        tables.cbegin(), tables.cend(), std::back_inserter(table_views), [](auto const& tbl) {
+          return tbl->view();
+        });
+      return table_views;
+    }();
+
+    // Write tables to parquet with the specified encoding and compression
     auto const page_stat_string = (page_stats.has_value()) ? "page stats" : "no page stats";
-    std::cout << "Writing at: " << output_path << " with encoding, compression and "
-              << page_stat_string << ".." << std::endl;
+    fmt::print(
+      "Writing at: {} with encoding, compression and {}..\n", output_path, page_stat_string);
 
-    // Write tables using multiple threads
     cudf::examples::timer timer;
-    write_parquet_multithreaded(tables);
+    write_parquet_multithreaded(table_views);
     // In case some kernels are still running on the default stream
     default_stream.synchronize();
     // Print elapsed time
@@ -296,7 +306,7 @@ int main(int argc, char const** argv)
 
   // Re-read the same parquet files with multiple threads
   {
-    std::cout << "Reading for the second time using " << thread_count << " threads..." << std::endl;
+    fmt::print("Reading for the second time using {} threads...\n", thread_count);
     cudf::examples::timer timer;
     auto const input_table =
       concatenate_tables(read_parquet_multithreaded(input_files), default_stream);
@@ -305,7 +315,7 @@ int main(int argc, char const** argv)
     // Print elapsed time and peak memory
     timer.print_elapsed_millis();
 
-    std::cout << "Reading transcoded files using " << thread_count << " threads..." << std::endl;
+    fmt::print("Reading transcoded files using {} threads...\n", thread_count);
     timer.reset();
     auto const transcoded_table = concatenate_tables(
       read_parquet_multithreaded(extract_input_files(output_path)), default_stream);
@@ -314,7 +324,7 @@ int main(int argc, char const** argv)
     // Print elapsed time and peak memory
     timer.print_elapsed_millis();
 
-    std::cout << "Peak memory: " << (stats_mr.get_bytes_counter().peak / 1048576.0) << " MB\n\n";
+    fmt::print("Peak memory: {} MB\n\n", (stats_mr.get_bytes_counter().peak / 1048576.0));
 
     // Check for validity
     check_identical_tables(input_table->view(), transcoded_table->view());