Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an example to demonstrate multithreaded read_parquet pipelines #16828

Merged
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
ff2480b
Add the new multithreaded parquet example
mhaseeb123 Sep 18, 2024
d06f7f2
Set the default output path to the current path
mhaseeb123 Sep 18, 2024
c13a408
Style fix
mhaseeb123 Sep 18, 2024
12adeeb
Use stream pool for parquet write as well
mhaseeb123 Sep 18, 2024
a8ae50a
Add more details to the example
mhaseeb123 Sep 18, 2024
6679f89
Minor improvements
mhaseeb123 Sep 18, 2024
e04602c
Minor improvement
mhaseeb123 Sep 18, 2024
21ce7c7
Minor improvements
mhaseeb123 Sep 18, 2024
b649530
Merge branch 'branch-24.10' into fea-parquet-multithreaded-example
mhaseeb123 Sep 18, 2024
b8b8bb9
Move the vector to concatenate tables
mhaseeb123 Sep 19, 2024
188ce11
Minor improvement
mhaseeb123 Sep 23, 2024
1827654
Merge branch 'branch-24.10' into fea-parquet-multithreaded-example
mhaseeb123 Sep 23, 2024
e14916f
Merge branch 'branch-24.12' into fea-parquet-multithreaded-example
mhaseeb123 Sep 23, 2024
a528eb3
Merge branch 'branch-24.12' into fea-parquet-multithreaded-example
mhaseeb123 Sep 26, 2024
990f2bb
Make multithreaded parquet io example more sophisticated
mhaseeb123 Sep 27, 2024
06817d0
Minor updates
mhaseeb123 Sep 27, 2024
9dc8a1e
Merge branch 'branch-24.12' into fea-parquet-multithreaded-example
mhaseeb123 Sep 27, 2024
af8ec6a
Minor improvements
mhaseeb123 Sep 27, 2024
d3778cc
Set default thread count = 1 instead of 2
mhaseeb123 Sep 27, 2024
c2b39cc
Minor improvement
mhaseeb123 Sep 27, 2024
8f39fb2
Add io source types
mhaseeb123 Oct 1, 2024
d0c2a62
Minor comment updates
mhaseeb123 Oct 1, 2024
945c0c0
Style fix and add to CI.
mhaseeb123 Oct 1, 2024
f30c801
Minor improvement
mhaseeb123 Oct 1, 2024
f701e34
Merge branch 'branch-24.12' into fea-parquet-multithreaded-example
mhaseeb123 Oct 1, 2024
719bfb6
Updates
mhaseeb123 Oct 1, 2024
2ade064
Style fix.
mhaseeb123 Oct 1, 2024
b559eaf
Print message when skipping a subdirectory
mhaseeb123 Oct 2, 2024
73de5bc
Update cpp/examples/parquet_io/io_source.hpp
mhaseeb123 Oct 2, 2024
52e6953
Update cpp/examples/parquet_io/common_utils.cpp
mhaseeb123 Oct 2, 2024
6194a50
Do not use `fmtlib`
mhaseeb123 Oct 2, 2024
3420c3f
Minor style fix
mhaseeb123 Oct 2, 2024
85d906d
Minor change
mhaseeb123 Oct 2, 2024
70ec6fd
Address minor nits from reviews
mhaseeb123 Oct 3, 2024
00390cd
Update cpp/examples/parquet_io/parquet_io_multithreaded.cpp
mhaseeb123 Oct 3, 2024
5ad8ecd
Move code to cpp files and minor refactoring
mhaseeb123 Oct 3, 2024
74763b0
Minor style fix
mhaseeb123 Oct 3, 2024
06afb49
Minor updates
mhaseeb123 Oct 4, 2024
db9aace
Merge branch 'branch-24.12' into fea-parquet-multithreaded-example
mhaseeb123 Oct 4, 2024
1a04409
Nits from code reviews
mhaseeb123 Oct 4, 2024
2fb523f
Merge branch 'fea-parquet-multithreaded-example' of https://github.co…
mhaseeb123 Oct 4, 2024
8be6710
Merge branch 'branch-24.12' into fea-parquet-multithreaded-example
mhaseeb123 Oct 7, 2024
2a6db5d
Merge branch 'branch-24.12' into fea-parquet-multithreaded-example
mhaseeb123 Oct 7, 2024
cc6242c
Merge branch 'branch-24.12' into fea-parquet-multithreaded-example
mhaseeb123 Oct 7, 2024
3a59027
Minor arg setting
mhaseeb123 Oct 10, 2024
60e5d75
Merge branch 'branch-24.12' into fea-parquet-multithreaded-example
mhaseeb123 Oct 10, 2024
7cfd7ae
Adjust spacing
mhaseeb123 Oct 11, 2024
d1fbad8
Merge branch 'branch-24.12' into fea-parquet-multithreaded-example
mhaseeb123 Oct 11, 2024
d9102f0
Apply suggestion
mhaseeb123 Oct 11, 2024
174e6c9
Merge branch 'fea-parquet-multithreaded-example' of https://github.co…
mhaseeb123 Oct 11, 2024
b61f18e
Minor
mhaseeb123 Oct 11, 2024
803e8c9
Merge branch 'branch-24.12' into fea-parquet-multithreaded-example
mhaseeb123 Oct 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions cpp/examples/parquet_io/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,17 @@ project(

include(../fetch_dependencies.cmake)

# Configure your project here
# Build and install parquet_io
add_executable(parquet_io parquet_io.cpp)
target_link_libraries(parquet_io PRIVATE cudf::cudf)
target_compile_features(parquet_io PRIVATE cxx_std_17)

install(TARGETS parquet_io DESTINATION bin/examples/libcudf)

# Build and install parquet_io_multithreaded
add_executable(parquet_io_multithreaded parquet_io_multithreaded.cpp)
target_link_libraries(parquet_io_multithreaded PRIVATE cudf::cudf)
target_compile_features(parquet_io_multithreaded PRIVATE cxx_std_17)
install(TARGETS parquet_io_multithreaded DESTINATION bin/examples/libcudf)

# Install the example.parquet file
install(FILES ${CMAKE_CURRENT_LIST_DIR}/example.parquet DESTINATION bin/examples/libcudf)
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,23 @@

#pragma once

#include "../utilities/timer.hpp"

#include <cudf/concatenate.hpp>
#include <cudf/io/parquet.hpp>
#include <cudf/io/types.hpp>
#include <cudf/join.hpp>
#include <cudf/table/table_view.hpp>

#include <rmm/cuda_device.hpp>
#include <rmm/cuda_stream_pool.hpp>
#include <rmm/mr/device/cuda_memory_resource.hpp>
#include <rmm/mr/device/device_memory_resource.hpp>
#include <rmm/mr/device/owning_wrapper.hpp>
#include <rmm/mr/device/pool_memory_resource.hpp>

#include <fmt/color.h>

#include <chrono>
#include <iostream>
#include <optional>
Expand Down Expand Up @@ -124,3 +130,29 @@ std::shared_ptr<rmm::mr::device_memory_resource> create_memory_resource(bool is_

return std::nullopt;
}

/**
* @brief Check if two tables are identical, throw an error otherwise
*
* @param lhs_table View to lhs table
* @param rhs_table View to rhs table
*/
inline void check_identical_tables(cudf::table_view const& lhs_table,
cudf::table_view const& rhs_table)
{
try {
// Left anti-join the original and transcoded tables
// identical tables should not throw an exception and
// return an empty indices vector
auto const indices = cudf::left_anti_join(lhs_table, rhs_table, cudf::null_equality::EQUAL);

// No exception thrown, check indices
auto const valid = indices->size() == 0;
fmt::print(
fmt::emphasis::bold | fg(fmt::color::green_yellow), "Transcoding valid: {}\n", valid);
} catch (std::exception& e) {
std::cerr << e.what() << std::endl << std::endl;
throw std::runtime_error(
fmt::format(fmt::emphasis::bold | fg(fmt::color::red), "Transcoding valid: false\n"));
}
}
36 changes: 8 additions & 28 deletions cpp/examples/parquet_io/parquet_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@
* limitations under the License.
*/

#include "parquet_io.hpp"

#include "../utilities/timer.hpp"
#include "common.hpp"

#include <cudf/utilities/default_stream.hpp>

Expand Down Expand Up @@ -130,50 +128,32 @@ int main(int argc, char const** argv)
// Read input parquet file
// We do not want to time the initial read time as it may include
// time for nvcomp, cufile loading and RMM growth
std::cout << std::endl << "Reading " << input_filepath << "..." << std::endl;
std::cout << "Note: Not timing the initial parquet read as it may include\n"
"times for nvcomp, cufile loading and RMM growth."
<< std::endl
<< std::endl;
fmt::print("\nReading {}...", input_filepath);
fmt::print(
"Note: Not timing the initial parquet read as it may include\n"
"times for nvcomp, cufile loading and RMM growth.\n\n");
auto [input, metadata] = read_parquet(input_filepath);

// Status string to indicate if page stats are set to be written or not
auto page_stat_string = (page_stats.has_value()) ? "page stats" : "no page stats";
// Write parquet file with the specified encoding and compression
std::cout << "Writing " << output_filepath << " with encoding, compression and "
<< page_stat_string << ".." << std::endl;
fmt::print("Writing {} with encoding, compression and {}..\n", output_filepath, page_stat_string);

// `timer` is automatically started here
cudf::examples::timer timer;
write_parquet(input->view(), metadata, output_filepath, encoding, compression, page_stats);
timer.print_elapsed_millis();

// Read the parquet file written with encoding and compression
std::cout << "Reading " << output_filepath << "..." << std::endl;
fmt::print("Reading {}...\n", output_filepath);

// Reset the timer
timer.reset();
auto [transcoded_input, transcoded_metadata] = read_parquet(output_filepath);
timer.print_elapsed_millis();

// Check for validity
try {
// Left anti-join the original and transcoded tables
// identical tables should not throw an exception and
// return an empty indices vector
auto const indices = cudf::left_anti_join(input->view(),
transcoded_input->view(),
cudf::null_equality::EQUAL,
cudf::get_default_stream(),
resource.get());

// No exception thrown, check indices
auto const valid = indices->size() == 0;
std::cout << "Transcoding valid: " << std::boolalpha << valid << std::endl;
} catch (std::exception& e) {
std::cerr << e.what() << std::endl << std::endl;
std::cout << "Transcoding valid: false" << std::endl;
}
check_identical_tables(input->view(), transcoded_input->view());
mhaseeb123 marked this conversation as resolved.
Show resolved Hide resolved

return 0;
}
Loading
Loading