From 3d07509deb9f589e1f986dc7f822392467ffcdde Mon Sep 17 00:00:00 2001
From: Vukasin Milovanovic
Date: Fri, 1 Nov 2024 19:50:57 -0700
Subject: [PATCH] Add `num_iterations` axis to the multi-threaded Parquet
 benchmarks (#17231)

Added an axis that controls the number of times each thread reads its input.
Running with a higher number of iterations should better show how work from
different threads pipelines.
The new axis, "num_iterations", is added to all multi-threaded Parquet reader
benchmarks.

Authors:
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - Muhammad Haseeb (https://github.com/mhaseeb123)
  - Paul Mattione (https://github.com/pmattione-nvidia)

URL: https://github.com/rapidsai/cudf/pull/17231
---
 .../io/parquet/parquet_reader_multithread.cpp | 57 ++++++++++++-------
 1 file changed, 38 insertions(+), 19 deletions(-)

diff --git a/cpp/benchmarks/io/parquet/parquet_reader_multithread.cpp b/cpp/benchmarks/io/parquet/parquet_reader_multithread.cpp
index 7121cb9f034..bf7039269bc 100644
--- a/cpp/benchmarks/io/parquet/parquet_reader_multithread.cpp
+++ b/cpp/benchmarks/io/parquet/parquet_reader_multithread.cpp
@@ -45,6 +45,7 @@ std::string get_label(std::string const& test_name, nvbench::state const& state)
   auto const num_cols       = state.get_int64("num_cols");
   size_t const read_size_mb = get_read_size(state) / (1024 * 1024);
   return {test_name + ", " + std::to_string(num_cols) + " columns, " +
+          std::to_string(state.get_int64("num_iterations")) + " iterations, " +
           std::to_string(state.get_int64("num_threads")) + " threads " + " (" +
           std::to_string(read_size_mb) + " MB each)"};
 }
@@ -90,9 +91,10 @@ void BM_parquet_multithreaded_read_common(nvbench::state& state,
                                           std::vector<cudf::type_id> const& d_types,
                                           std::string const& label)
 {
-  size_t const data_size = state.get_int64("total_data_size");
-  auto const num_threads = state.get_int64("num_threads");
-  auto const source_type = retrieve_io_type_enum(state.get_string("io_type"));
+  size_t const data_size    = state.get_int64("total_data_size");
+  auto const num_threads    = state.get_int64("num_threads");
+  auto const num_iterations = state.get_int64("num_iterations");
+  auto const source_type    = retrieve_io_type_enum(state.get_string("io_type"));
 
   auto streams = cudf::detail::fork_streams(cudf::get_default_stream(), num_threads);
   BS::thread_pool threads(num_threads);
@@ -109,12 +111,15 @@ void BM_parquet_multithreaded_read_common(nvbench::state& state,
 
   nvtxRangePushA(("(read) " + label).c_str());
   state.exec(nvbench::exec_tag::sync | nvbench::exec_tag::timer,
-             [&](nvbench::launch& launch, auto& timer) {
+             [&, num_files = num_files](nvbench::launch& launch, auto& timer) {
               auto read_func = [&](int index) {
                 auto const stream = streams[index % num_threads];
                 cudf::io::parquet_reader_options read_opts =
                   cudf::io::parquet_reader_options::builder(source_info_vector[index]);
-                cudf::io::read_parquet(read_opts, stream, cudf::get_current_device_resource_ref());
+                for (int i = 0; i < num_iterations; ++i) {
+                  cudf::io::read_parquet(
+                    read_opts, stream, cudf::get_current_device_resource_ref());
+                }
               };
 
               threads.pause();
@@ -128,7 +133,8 @@ void BM_parquet_multithreaded_read_common(nvbench::state& state,
   nvtxRangePop();
 
   auto const time = state.get_summary("nv/cold/time/gpu/mean").get_float64("value");
-  state.add_element_count(static_cast<double>(data_size) / time, "bytes_per_second");
+  state.add_element_count(num_iterations * static_cast<double>(data_size) / time,
+                          "bytes_per_second");
   state.add_buffer_size(
     mem_stats_logger.peak_memory_usage(), "peak_memory_usage", "peak_memory_usage");
   state.add_buffer_size(total_file_size, "encoded_file_size", "encoded_file_size");
@@ -173,6 +179,7 @@ void BM_parquet_multithreaded_read_chunked_common(nvbench::state& state,
 {
   size_t const data_size    = state.get_int64("total_data_size");
   auto const num_threads    = state.get_int64("num_threads");
+  auto const num_iterations = state.get_int64("num_iterations");
   size_t const input_limit  = state.get_int64("input_limit");
   size_t const output_limit = state.get_int64("output_limit");
   auto const source_type    = retrieve_io_type_enum(state.get_string("io_type"));
@@ -192,22 +199,25 @@ void BM_parquet_multithreaded_read_chunked_common(nvbench::state& state,
   nvtxRangePushA(("(read) " + label).c_str());
   std::vector<cudf::io::table_with_metadata> chunks;
   state.exec(nvbench::exec_tag::sync | nvbench::exec_tag::timer,
-             [&](nvbench::launch& launch, auto& timer) {
+             [&, num_files = num_files](nvbench::launch& launch, auto& timer) {
               auto read_func = [&](int index) {
                 auto const stream = streams[index % num_threads];
                 cudf::io::parquet_reader_options read_opts =
                   cudf::io::parquet_reader_options::builder(source_info_vector[index]);
-                // divide chunk limits by number of threads so the number of chunks produced is the
-                // same for all cases. this seems better than the alternative, which is to keep the
-                // limits the same. if we do that, as the number of threads goes up, the number of
-                // chunks goes down - so are actually benchmarking the same thing in that case?
-                auto reader = cudf::io::chunked_parquet_reader(
-                  output_limit / num_threads, input_limit / num_threads, read_opts, stream);
-
-                // read all the chunks
-                do {
-                  auto table = reader.read_chunk();
-                } while (reader.has_next());
+                for (int i = 0; i < num_iterations; ++i) {
+                  // divide chunk limits by number of threads so the number of chunks produced is
+                  // the same for all cases. this seems better than the alternative, which is to
+                  // keep the limits the same. if we do that, as the number of threads goes up, the
+                  // number of chunks goes down - so are actually benchmarking the same thing in
+                  // that case?
+                  auto reader = cudf::io::chunked_parquet_reader(
+                    output_limit / num_threads, input_limit / num_threads, read_opts, stream);
+
+                  // read all the chunks
+                  do {
+                    auto table = reader.read_chunk();
+                  } while (reader.has_next());
+                }
               };
 
               threads.pause();
@@ -221,7 +231,8 @@ void BM_parquet_multithreaded_read_chunked_common(nvbench::state& state,
   nvtxRangePop();
 
   auto const time = state.get_summary("nv/cold/time/gpu/mean").get_float64("value");
-  state.add_element_count(static_cast<double>(data_size) / time, "bytes_per_second");
+  state.add_element_count(num_iterations * static_cast<double>(data_size) / time,
+                          "bytes_per_second");
   state.add_buffer_size(
     mem_stats_logger.peak_memory_usage(), "peak_memory_usage", "peak_memory_usage");
   state.add_buffer_size(total_file_size, "encoded_file_size", "encoded_file_size");
@@ -267,6 +278,7 @@ NVBENCH_BENCH(BM_parquet_multithreaded_read_mixed)
   .add_int64_axis("cardinality", {1000})
   .add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024})
   .add_int64_axis("num_threads", {1, 2, 4, 8})
+  .add_int64_axis("num_iterations", {1})
   .add_int64_axis("num_cols", {4})
   .add_int64_axis("run_length", {8})
   .add_string_axis("io_type", {"PINNED_BUFFER"});
@@ -277,6 +289,7 @@ NVBENCH_BENCH(BM_parquet_multithreaded_read_fixed_width)
   .add_int64_axis("cardinality", {1000})
   .add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024})
   .add_int64_axis("num_threads", {1, 2, 4, 8})
+  .add_int64_axis("num_iterations", {1})
   .add_int64_axis("num_cols", {4})
   .add_int64_axis("run_length", {8})
   .add_string_axis("io_type", {"PINNED_BUFFER"});
@@ -287,6 +300,7 @@ NVBENCH_BENCH(BM_parquet_multithreaded_read_string)
   .add_int64_axis("cardinality", {1000})
   .add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024})
   .add_int64_axis("num_threads", {1, 2, 4, 8})
+  .add_int64_axis("num_iterations", {1})
   .add_int64_axis("num_cols", {4})
   .add_int64_axis("run_length", {8})
   .add_string_axis("io_type", {"PINNED_BUFFER"});
@@ -297,6 +311,7 @@ NVBENCH_BENCH(BM_parquet_multithreaded_read_list)
   .add_int64_axis("cardinality", {1000})
   .add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024})
   .add_int64_axis("num_threads", {1, 2, 4, 8})
+  .add_int64_axis("num_iterations", {1})
   .add_int64_axis("num_cols", {4})
   .add_int64_axis("run_length", {8})
   .add_string_axis("io_type", {"PINNED_BUFFER"});
@@ -308,6 +323,7 @@ NVBENCH_BENCH(BM_parquet_multithreaded_read_chunked_mixed)
   .add_int64_axis("cardinality", {1000})
   .add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024})
   .add_int64_axis("num_threads", {1, 2, 4, 8})
+  .add_int64_axis("num_iterations", {1})
   .add_int64_axis("num_cols", {4})
   .add_int64_axis("run_length", {8})
   .add_int64_axis("input_limit", {640 * 1024 * 1024})
@@ -320,6 +336,7 @@ NVBENCH_BENCH(BM_parquet_multithreaded_read_chunked_fixed_width)
   .add_int64_axis("cardinality", {1000})
   .add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024})
   .add_int64_axis("num_threads", {1, 2, 4, 8})
+  .add_int64_axis("num_iterations", {1})
   .add_int64_axis("num_cols", {4})
   .add_int64_axis("run_length", {8})
   .add_int64_axis("input_limit", {640 * 1024 * 1024})
@@ -332,6 +349,7 @@ NVBENCH_BENCH(BM_parquet_multithreaded_read_chunked_string)
   .add_int64_axis("cardinality", {1000})
   .add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024})
   .add_int64_axis("num_threads", {1, 2, 4, 8})
+  .add_int64_axis("num_iterations", {1})
   .add_int64_axis("num_cols", {4})
   .add_int64_axis("run_length", {8})
   .add_int64_axis("input_limit", {640 * 1024 * 1024})
@@ -344,6 +362,7 @@ NVBENCH_BENCH(BM_parquet_multithreaded_read_chunked_list)
   .add_int64_axis("cardinality", {1000})
   .add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024})
   .add_int64_axis("num_threads", {1, 2, 4, 8})
+  .add_int64_axis("num_iterations", {1})
   .add_int64_axis("num_cols", {4})
   .add_int64_axis("run_length", {8})
   .add_int64_axis("input_limit", {640 * 1024 * 1024})
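
The pattern the new "num_iterations" axis exercises is repeated per-thread reads issued on independent CUDA streams, so that decode work submitted by different threads can pipeline on the GPU rather than run back to back. Below is a minimal standalone sketch of that pattern, not part of the patch: it uses plain std::thread instead of the benchmark's BS::thread_pool, and the function name, file paths, and one-file-per-thread layout are illustrative assumptions.

// Sketch only; header paths may vary between cudf versions.
#include <cudf/io/parquet.hpp>
#include <cudf/utilities/memory_resource.hpp>

#include <rmm/cuda_stream.hpp>

#include <cstddef>
#include <string>
#include <thread>
#include <vector>

// Hypothetical helper, not part of the benchmark: each thread owns one stream and one
// input file, and issues `num_iterations` back-to-back reads on that stream.
void read_in_parallel(std::vector<std::string> const& paths, int num_iterations)
{
  std::vector<rmm::cuda_stream> streams(paths.size());
  std::vector<std::thread> workers;
  for (std::size_t t = 0; t < paths.size(); ++t) {
    workers.emplace_back([&, t] {
      auto const opts =
        cudf::io::parquet_reader_options::builder(cudf::io::source_info{paths[t]}).build();
      for (int i = 0; i < num_iterations; ++i) {
        // Reads from different threads target different streams, so their GPU work can
        // overlap instead of serializing on the default stream.
        cudf::io::read_parquet(opts, streams[t].view(), cudf::get_current_device_resource_ref());
      }
    });
  }
  for (auto& w : workers) { w.join(); }
}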
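
For the chunked benchmarks, the in-diff comment explains why both chunk limits are divided by the thread count: it keeps the number of chunks produced per reader comparable as "num_threads" grows. A sketch of that per-thread loop, again outside the patch, with a hypothetical function name and illustrative parameters; the chunked_parquet_reader construction and the read_chunk/has_next loop mirror the code in the diff.

#include <cudf/io/parquet.hpp>

#include <rmm/cuda_stream_view.hpp>

#include <cstddef>

// Hypothetical helper mirroring the per-thread body of the chunked benchmarks.
void read_chunked_repeatedly(cudf::io::parquet_reader_options const& read_opts,
                             std::size_t output_limit,
                             std::size_t input_limit,
                             int num_threads,
                             int num_iterations,
                             rmm::cuda_stream_view stream)
{
  for (int i = 0; i < num_iterations; ++i) {
    // Divide both limits by the thread count so the per-reader chunk count does not
    // shrink as more threads are added.
    auto reader = cudf::io::chunked_parquet_reader(
      output_limit / num_threads, input_limit / num_threads, read_opts, stream);
    // Drain all chunks; the tables are discarded because only decode time matters here.
    do {
      auto table = reader.read_chunk();
    } while (reader.has_next());
  }
}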