Skip to content

Commit

Permalink
Add more details to the example
Browse files Browse the repository at this point in the history
  • Loading branch information
mhaseeb123 committed Sep 18, 2024
1 parent 12adeeb commit a8ae50a
Showing 1 changed file with 64 additions and 17 deletions.
81 changes: 64 additions & 17 deletions cpp/examples/parquet_io/parquet_io_multithreaded.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@

#include "common.hpp"

#include <rmm/cuda_stream_view.hpp>
#include <rmm/mr/device/statistics_resource_adaptor.hpp>

#include <fmt/chrono.h>

#include <filesystem>

/**
Expand Down Expand Up @@ -136,15 +139,15 @@ int main(int argc, char const** argv)
}

// Process and extract all input files
auto const input_files = [&]() {
auto const extract_input_files = [thread_count = thread_count](std::string const& paths) {
std::vector<std::string> parquet_files;
std::vector<std::string> delimited_paths = [&]() {
std::vector<std::string> paths_list;
std::stringstream stream{input_paths};
std::stringstream stream{paths};
std::string path;
// extract the delimited paths.
// Extract the delimited paths.
while (std::getline(stream, path, char{','})) {
paths_list.push_back(path); // Add each token to the vector
paths_list.push_back(path);
}
return paths_list;
}();
Expand Down Expand Up @@ -175,13 +178,37 @@ int main(int argc, char const** argv)
}

return parquet_files;
}();
};

// Concatenate a vector of tables and return
auto const concatenate_tables = [](std::vector<table_t>& tables, rmm::cuda_stream_view stream) {
// Construct the final table
auto table = std::move(tables[0]);
std::for_each(tables.begin() + 1, tables.end(), [&](auto& tbl) {
std::vector<cudf::table_view> const table_views{table->view(), tbl->view()};
table = cudf::concatenate(table_views, stream);
});
return table;
};

// make input files from the input_paths string.
auto const input_files = extract_input_files(input_paths);

// Exit early if nothing to do.
if (not input_files.size()) { return 0; }
if (not input_files.size()) {
std::cerr << "No input files to read. Exiting early.\n";
return 0;
}

// Check if output path is a directory.
if (not std::filesystem::is_directory(std::filesystem::path{output_path})) {
// Check if output path is a valid
if (std::filesystem::is_directory({output_path})) {
// Create a new directory in output path if not empty.
if (not std::filesystem::is_empty({output_path})) {
output_path +=
"/output_" + fmt::format("{:%Y-%m-%d-%H-%M-%S}", std::chrono::system_clock::now());
std::filesystem::create_directory({output_path});
}
} else {
throw std::runtime_error("The provided output path is not a directory\n");
}

Expand Down Expand Up @@ -275,20 +302,40 @@ int main(int argc, char const** argv)
{
std::cout << "Reading for the second time using " << thread_count << " threads..." << std::endl;
cudf::examples::timer timer;
auto tables = read_parquet_multithreaded(input_files);
// Construct the final table
auto table = std::move(tables[0]);
std::for_each(tables.begin() + 1, tables.end(), [&](auto& tbl) {
std::vector<cudf::table_view> const table_views{table->view(), tbl->view()};
table = cudf::concatenate(table_views, default_stream);
});

auto tables = read_parquet_multithreaded(input_files);
auto const table = concatenate_tables(tables, default_stream);
// In case some kernels are still running on the default stream
default_stream.synchronize();
// Print elapsed time and peak memory
timer.print_elapsed_millis();

std::cout << "Reading transcoded files using " << thread_count << " threads..." << std::endl;
timer.reset();
auto transcoded_tables = read_parquet_multithreaded(extract_input_files(output_path));
auto const transcoded_table = concatenate_tables(transcoded_tables, default_stream);
// Print elapsed time and peak memory
timer.print_elapsed_millis();
std::cout << "Peak memory: " << (stats_mr.get_bytes_counter().peak / 1048576.0) << " MB\n";

// In case some kernels are still running on the default stream
default_stream.synchronize();

std::cout << "Peak memory: " << (stats_mr.get_bytes_counter().peak / 1048576.0) << " MB\n\n";

// Check for validity
try {
// Left anti-join the original and transcoded tables
// identical tables should not throw an exception and
// return an empty indices vector
auto const indices = cudf::left_anti_join(
table->view(), transcoded_table->view(), cudf::null_equality::EQUAL, resource.get());

// No exception thrown, check indices
auto const valid = indices->size() == 0;
std::cout << "Transcoding valid: " << std::boolalpha << valid << std::endl;
} catch (std::exception& e) {
std::cerr << e.what() << std::endl << std::endl;
std::cout << "Transcoding valid: false" << std::endl;
}
}

return 0;
Expand Down

0 comments on commit a8ae50a

Please sign in to comment.