Add num_iterations axis to the multi-threaded Parquet benchmarks (rapidsai#17231)

Added an axis that controls the number of times each thread reads its input. Running with a higher iteration count should better show how the work from different threads pipelines.
The new axis, "num_iterations", is added to all multi-threaded Parquet reader benchmarks.
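
The axis can be swept from the command line like any other nvbench axis. A minimal sketch of a run, assuming the benchmark binary and benchmark names below (the -b/-a flags are standard nvbench options):

    # binary and benchmark names here are assumptions for illustration
    ./PARQUET_MULTITHREADED_READER_NVBENCH \
      -b parquet_multithreaded_read_decode_mixed \
      -a num_threads=[1,2,4,8] -a num_iterations=[1,4,16]

Note that the reported "bytes_per_second" multiplies total_data_size by num_iterations before dividing by the mean GPU time, so throughput numbers remain comparable across iteration counts.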

Authors:
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - Muhammad Haseeb (https://github.com/mhaseeb123)
  - Paul Mattione (https://github.com/pmattione-nvidia)

URL: rapidsai#17231
vuule authored Nov 2, 2024
1 parent 6ce9ea4 commit 3d07509
57 changes: 38 additions & 19 deletions cpp/benchmarks/io/parquet/parquet_reader_multithread.cpp
@@ -45,6 +45,7 @@ std::string get_label(std::string const& test_name, nvbench::state const& state)
   auto const num_cols       = state.get_int64("num_cols");
   size_t const read_size_mb = get_read_size(state) / (1024 * 1024);
   return {test_name + ", " + std::to_string(num_cols) + " columns, " +
+          std::to_string(state.get_int64("num_iterations")) + " iterations, " +
           std::to_string(state.get_int64("num_threads")) + " threads " + " (" +
           std::to_string(read_size_mb) + " MB each)"};
 }
@@ -90,9 +91,10 @@ void BM_parquet_multithreaded_read_common(nvbench::state& state,
                                           std::vector<cudf::type_id> const& d_types,
                                           std::string const& label)
 {
-  size_t const data_size = state.get_int64("total_data_size");
-  auto const num_threads = state.get_int64("num_threads");
-  auto const source_type = retrieve_io_type_enum(state.get_string("io_type"));
+  size_t const data_size    = state.get_int64("total_data_size");
+  auto const num_threads    = state.get_int64("num_threads");
+  auto const num_iterations = state.get_int64("num_iterations");
+  auto const source_type    = retrieve_io_type_enum(state.get_string("io_type"));
 
   auto streams = cudf::detail::fork_streams(cudf::get_default_stream(), num_threads);
   BS::thread_pool threads(num_threads);
@@ -109,12 +111,15 @@ void BM_parquet_multithreaded_read_common(nvbench::state& state,
 
   nvtxRangePushA(("(read) " + label).c_str());
   state.exec(nvbench::exec_tag::sync | nvbench::exec_tag::timer,
-             [&](nvbench::launch& launch, auto& timer) {
+             [&, num_files = num_files](nvbench::launch& launch, auto& timer) {
               auto read_func = [&](int index) {
                 auto const stream = streams[index % num_threads];
                 cudf::io::parquet_reader_options read_opts =
                   cudf::io::parquet_reader_options::builder(source_info_vector[index]);
-                cudf::io::read_parquet(read_opts, stream, cudf::get_current_device_resource_ref());
+                for (int i = 0; i < num_iterations; ++i) {
+                  cudf::io::read_parquet(
+                    read_opts, stream, cudf::get_current_device_resource_ref());
+                }
               };
 
               threads.pause();
@@ -128,7 +133,8 @@ void BM_parquet_multithreaded_read_common(nvbench::state& state,
   nvtxRangePop();
 
   auto const time = state.get_summary("nv/cold/time/gpu/mean").get_float64("value");
-  state.add_element_count(static_cast<double>(data_size) / time, "bytes_per_second");
+  state.add_element_count(num_iterations * static_cast<double>(data_size) / time,
+                          "bytes_per_second");
   state.add_buffer_size(
     mem_stats_logger.peak_memory_usage(), "peak_memory_usage", "peak_memory_usage");
   state.add_buffer_size(total_file_size, "encoded_file_size", "encoded_file_size");
@@ -173,6 +179,7 @@ void BM_parquet_multithreaded_read_chunked_common(nvbench::state& state,
 {
   size_t const data_size    = state.get_int64("total_data_size");
   auto const num_threads    = state.get_int64("num_threads");
+  auto const num_iterations = state.get_int64("num_iterations");
   size_t const input_limit  = state.get_int64("input_limit");
   size_t const output_limit = state.get_int64("output_limit");
   auto const source_type    = retrieve_io_type_enum(state.get_string("io_type"));
@@ -192,22 +199,25 @@ void BM_parquet_multithreaded_read_chunked_common(nvbench::state& state,
   nvtxRangePushA(("(read) " + label).c_str());
   std::vector<cudf::io::table_with_metadata> chunks;
   state.exec(nvbench::exec_tag::sync | nvbench::exec_tag::timer,
-             [&](nvbench::launch& launch, auto& timer) {
+             [&, num_files = num_files](nvbench::launch& launch, auto& timer) {
               auto read_func = [&](int index) {
                 auto const stream = streams[index % num_threads];
                 cudf::io::parquet_reader_options read_opts =
                   cudf::io::parquet_reader_options::builder(source_info_vector[index]);
-                // divide chunk limits by number of threads so the number of chunks produced is the
-                // same for all cases. this seems better than the alternative, which is to keep the
-                // limits the same. if we do that, as the number of threads goes up, the number of
-                // chunks goes down - so are actually benchmarking the same thing in that case?
-                auto reader = cudf::io::chunked_parquet_reader(
-                  output_limit / num_threads, input_limit / num_threads, read_opts, stream);
-
-                // read all the chunks
-                do {
-                  auto table = reader.read_chunk();
-                } while (reader.has_next());
+                for (int i = 0; i < num_iterations; ++i) {
+                  // divide chunk limits by number of threads so the number of chunks produced is
+                  // the same for all cases. this seems better than the alternative, which is to
+                  // keep the limits the same. if we do that, as the number of threads goes up, the
+                  // number of chunks goes down - so are actually benchmarking the same thing in
+                  // that case?
+                  auto reader = cudf::io::chunked_parquet_reader(
+                    output_limit / num_threads, input_limit / num_threads, read_opts, stream);
+
+                  // read all the chunks
+                  do {
+                    auto table = reader.read_chunk();
+                  } while (reader.has_next());
+                }
               };
 
               threads.pause();
@@ -221,7 +231,8 @@ void BM_parquet_multithreaded_read_chunked_common(nvbench::state& state,
   nvtxRangePop();
 
   auto const time = state.get_summary("nv/cold/time/gpu/mean").get_float64("value");
-  state.add_element_count(static_cast<double>(data_size) / time, "bytes_per_second");
+  state.add_element_count(num_iterations * static_cast<double>(data_size) / time,
+                          "bytes_per_second");
   state.add_buffer_size(
     mem_stats_logger.peak_memory_usage(), "peak_memory_usage", "peak_memory_usage");
   state.add_buffer_size(total_file_size, "encoded_file_size", "encoded_file_size");
@@ -267,6 +278,7 @@ NVBENCH_BENCH(BM_parquet_multithreaded_read_mixed)
   .add_int64_axis("cardinality", {1000})
   .add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024})
   .add_int64_axis("num_threads", {1, 2, 4, 8})
+  .add_int64_axis("num_iterations", {1})
   .add_int64_axis("num_cols", {4})
   .add_int64_axis("run_length", {8})
   .add_string_axis("io_type", {"PINNED_BUFFER"});
@@ -277,6 +289,7 @@ NVBENCH_BENCH(BM_parquet_multithreaded_read_fixed_width)
   .add_int64_axis("cardinality", {1000})
   .add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024})
   .add_int64_axis("num_threads", {1, 2, 4, 8})
+  .add_int64_axis("num_iterations", {1})
   .add_int64_axis("num_cols", {4})
   .add_int64_axis("run_length", {8})
   .add_string_axis("io_type", {"PINNED_BUFFER"});
@@ -287,6 +300,7 @@ NVBENCH_BENCH(BM_parquet_multithreaded_read_string)
   .add_int64_axis("cardinality", {1000})
   .add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024})
   .add_int64_axis("num_threads", {1, 2, 4, 8})
+  .add_int64_axis("num_iterations", {1})
   .add_int64_axis("num_cols", {4})
   .add_int64_axis("run_length", {8})
   .add_string_axis("io_type", {"PINNED_BUFFER"});
@@ -297,6 +311,7 @@ NVBENCH_BENCH(BM_parquet_multithreaded_read_list)
   .add_int64_axis("cardinality", {1000})
   .add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024})
   .add_int64_axis("num_threads", {1, 2, 4, 8})
+  .add_int64_axis("num_iterations", {1})
   .add_int64_axis("num_cols", {4})
   .add_int64_axis("run_length", {8})
   .add_string_axis("io_type", {"PINNED_BUFFER"});
@@ -308,6 +323,7 @@ NVBENCH_BENCH(BM_parquet_multithreaded_read_chunked_mixed)
   .add_int64_axis("cardinality", {1000})
   .add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024})
   .add_int64_axis("num_threads", {1, 2, 4, 8})
+  .add_int64_axis("num_iterations", {1})
   .add_int64_axis("num_cols", {4})
   .add_int64_axis("run_length", {8})
   .add_int64_axis("input_limit", {640 * 1024 * 1024})
@@ -320,6 +336,7 @@ NVBENCH_BENCH(BM_parquet_multithreaded_read_chunked_fixed_width)
   .add_int64_axis("cardinality", {1000})
   .add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024})
   .add_int64_axis("num_threads", {1, 2, 4, 8})
+  .add_int64_axis("num_iterations", {1})
   .add_int64_axis("num_cols", {4})
   .add_int64_axis("run_length", {8})
   .add_int64_axis("input_limit", {640 * 1024 * 1024})
@@ -332,6 +349,7 @@ NVBENCH_BENCH(BM_parquet_multithreaded_read_chunked_string)
   .add_int64_axis("cardinality", {1000})
   .add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024})
   .add_int64_axis("num_threads", {1, 2, 4, 8})
+  .add_int64_axis("num_iterations", {1})
   .add_int64_axis("num_cols", {4})
   .add_int64_axis("run_length", {8})
   .add_int64_axis("input_limit", {640 * 1024 * 1024})
@@ -344,6 +362,7 @@ NVBENCH_BENCH(BM_parquet_multithreaded_read_chunked_list)
   .add_int64_axis("cardinality", {1000})
   .add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024})
   .add_int64_axis("num_threads", {1, 2, 4, 8})
+  .add_int64_axis("num_iterations", {1})
   .add_int64_axis("num_cols", {4})
   .add_int64_axis("run_length", {8})
   .add_int64_axis("input_limit", {640 * 1024 * 1024})