Skip to content

Commit

Permalink
Add io_type axis with PINNED_BUFFER default for PQ multithreaded reader nvbench
Browse files Browse the repository at this point in the history

Signed-off-by: Muhammad Haseeb <[email protected]>
  • Loading branch information
mhaseeb123 committed Sep 16, 2024
1 parent 124d3e3 commit 0bde532
Showing 1 changed file with 24 additions and 12 deletions.
36 changes: 24 additions & 12 deletions cpp/benchmarks/io/parquet/parquet_reader_multithread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ std::string get_label(std::string const& test_name, nvbench::state const& state)
}

std::tuple<std::vector<cuio_source_sink_pair>, size_t, size_t> write_file_data(
nvbench::state& state, std::vector<cudf::type_id> const& d_types)
nvbench::state& state, std::vector<cudf::type_id> const& d_types, io_type const io_source_type)
{
cudf::size_type const cardinality = state.get_int64("cardinality");
cudf::size_type const run_length = state.get_int64("run_length");
Expand All @@ -63,7 +63,7 @@ std::tuple<std::vector<cuio_source_sink_pair>, size_t, size_t> write_file_data(
size_t total_file_size = 0;

for (size_t i = 0; i < num_files; ++i) {
cuio_source_sink_pair source_sink{io_type::HOST_BUFFER};
cuio_source_sink_pair source_sink{io_source_type};

auto const tbl = create_random_table(
cycle_dtypes(d_types, num_cols),
Expand Down Expand Up @@ -92,11 +92,13 @@ void BM_parquet_multithreaded_read_common(nvbench::state& state,
{
size_t const data_size = state.get_int64("total_data_size");
auto const num_threads = state.get_int64("num_threads");
auto const source_type = retrieve_io_type_enum(state.get_string("io_type"));

auto streams = cudf::detail::fork_streams(cudf::get_default_stream(), num_threads);
BS::thread_pool threads(num_threads);

auto [source_sink_vector, total_file_size, num_files] = write_file_data(state, d_types);
auto [source_sink_vector, total_file_size, num_files] =
write_file_data(state, d_types, source_type);
std::vector<cudf::io::source_info> source_info_vector;
std::transform(source_sink_vector.begin(),
source_sink_vector.end(),
Expand Down Expand Up @@ -173,10 +175,12 @@ void BM_parquet_multithreaded_read_chunked_common(nvbench::state& state,
auto const num_threads = state.get_int64("num_threads");
size_t const input_limit = state.get_int64("input_limit");
size_t const output_limit = state.get_int64("output_limit");
auto const source_type = retrieve_io_type_enum(state.get_string("io_type"));

auto streams = cudf::detail::fork_streams(cudf::get_default_stream(), num_threads);
BS::thread_pool threads(num_threads);
auto [source_sink_vector, total_file_size, num_files] = write_file_data(state, d_types);
auto [source_sink_vector, total_file_size, num_files] =
write_file_data(state, d_types, source_type);
std::vector<cudf::io::source_info> source_info_vector;
std::transform(source_sink_vector.begin(),
source_sink_vector.end(),
Expand Down Expand Up @@ -264,7 +268,8 @@ NVBENCH_BENCH(BM_parquet_multithreaded_read_mixed)
.add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024})
.add_int64_axis("num_threads", {1, 2, 4, 8})
.add_int64_axis("num_cols", {4})
.add_int64_axis("run_length", {8});
.add_int64_axis("run_length", {8})
.add_string_axis("io_type", {"PINNED_BUFFER"});

NVBENCH_BENCH(BM_parquet_multithreaded_read_fixed_width)
.set_name("parquet_multithreaded_read_decode_fixed_width")
Expand All @@ -273,7 +278,8 @@ NVBENCH_BENCH(BM_parquet_multithreaded_read_fixed_width)
.add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024})
.add_int64_axis("num_threads", {1, 2, 4, 8})
.add_int64_axis("num_cols", {4})
.add_int64_axis("run_length", {8});
.add_int64_axis("run_length", {8})
.add_string_axis("io_type", {"PINNED_BUFFER"});

NVBENCH_BENCH(BM_parquet_multithreaded_read_string)
.set_name("parquet_multithreaded_read_decode_string")
Expand All @@ -282,7 +288,8 @@ NVBENCH_BENCH(BM_parquet_multithreaded_read_string)
.add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024})
.add_int64_axis("num_threads", {1, 2, 4, 8})
.add_int64_axis("num_cols", {4})
.add_int64_axis("run_length", {8});
.add_int64_axis("run_length", {8})
.add_string_axis("io_type", {"PINNED_BUFFER"});

NVBENCH_BENCH(BM_parquet_multithreaded_read_list)
.set_name("parquet_multithreaded_read_decode_list")
Expand All @@ -291,7 +298,8 @@ NVBENCH_BENCH(BM_parquet_multithreaded_read_list)
.add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024})
.add_int64_axis("num_threads", {1, 2, 4, 8})
.add_int64_axis("num_cols", {4})
.add_int64_axis("run_length", {8});
.add_int64_axis("run_length", {8})
.add_string_axis("io_type", {"PINNED_BUFFER"});

// mixed data types: fixed width, strings
NVBENCH_BENCH(BM_parquet_multithreaded_read_chunked_mixed)
Expand All @@ -303,7 +311,8 @@ NVBENCH_BENCH(BM_parquet_multithreaded_read_chunked_mixed)
.add_int64_axis("num_cols", {4})
.add_int64_axis("run_length", {8})
.add_int64_axis("input_limit", {640 * 1024 * 1024})
.add_int64_axis("output_limit", {640 * 1024 * 1024});
.add_int64_axis("output_limit", {640 * 1024 * 1024})
.add_string_axis("io_type", {"PINNED_BUFFER"});

NVBENCH_BENCH(BM_parquet_multithreaded_read_chunked_fixed_width)
.set_name("parquet_multithreaded_read_decode_chunked_fixed_width")
Expand All @@ -314,7 +323,8 @@ NVBENCH_BENCH(BM_parquet_multithreaded_read_chunked_fixed_width)
.add_int64_axis("num_cols", {4})
.add_int64_axis("run_length", {8})
.add_int64_axis("input_limit", {640 * 1024 * 1024})
.add_int64_axis("output_limit", {640 * 1024 * 1024});
.add_int64_axis("output_limit", {640 * 1024 * 1024})
.add_string_axis("io_type", {"PINNED_BUFFER"});

NVBENCH_BENCH(BM_parquet_multithreaded_read_chunked_string)
.set_name("parquet_multithreaded_read_decode_chunked_string")
Expand All @@ -325,7 +335,8 @@ NVBENCH_BENCH(BM_parquet_multithreaded_read_chunked_string)
.add_int64_axis("num_cols", {4})
.add_int64_axis("run_length", {8})
.add_int64_axis("input_limit", {640 * 1024 * 1024})
.add_int64_axis("output_limit", {640 * 1024 * 1024});
.add_int64_axis("output_limit", {640 * 1024 * 1024})
.add_string_axis("io_type", {"PINNED_BUFFER"});

NVBENCH_BENCH(BM_parquet_multithreaded_read_chunked_list)
.set_name("parquet_multithreaded_read_decode_chunked_list")
Expand All @@ -336,4 +347,5 @@ NVBENCH_BENCH(BM_parquet_multithreaded_read_chunked_list)
.add_int64_axis("num_cols", {4})
.add_int64_axis("run_length", {8})
.add_int64_axis("input_limit", {640 * 1024 * 1024})
.add_int64_axis("output_limit", {640 * 1024 * 1024});
.add_int64_axis("output_limit", {640 * 1024 * 1024})
.add_string_axis("io_type", {"PINNED_BUFFER"});

0 comments on commit 0bde532

Please sign in to comment.