From ee206566a9e4f45ec403f872e6806698ccf00d80 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Wed, 18 Sep 2024 10:54:48 -0400 Subject: [PATCH 1/2] Simplify future-related code and enable concurrency --- cpp/src/io/parquet/reader_impl.hpp | 2 +- cpp/src/io/parquet/reader_impl_preprocess.cu | 36 +++++++++++--------- 2 files changed, 20 insertions(+), 18 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl.hpp b/cpp/src/io/parquet/reader_impl.hpp index 2d46da14bec..1d8d05d0766 100644 --- a/cpp/src/io/parquet/reader_impl.hpp +++ b/cpp/src/io/parquet/reader_impl.hpp @@ -191,7 +191,7 @@ class reader::impl { * @return pair of boolean indicating if compressed chunks were found and a vector of futures for * read completion */ - std::pair<bool, std::vector<std::future<void>>> read_column_chunks(); + std::pair<bool, std::future<void>> read_column_chunks(); /** * @brief Read compressed data and page information for the current pass. diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu index 8e67f233213..df7b5025200 100644 --- a/cpp/src/io/parquet/reader_impl_preprocess.cu +++ b/cpp/src/io/parquet/reader_impl_preprocess.cu @@ -276,8 +276,14 @@ void generate_depth_remappings( } } auto sync_fn = [](decltype(read_tasks) read_tasks) { - for (auto& task : read_tasks) { - task.wait(); + std::vector<std::thread> threads; + for (std::size_t task_idx = 0; task_idx < read_tasks.size(); ++task_idx) { + threads.emplace_back([](std::future<void>& task) { task.wait(); }, + std::ref(read_tasks[task_idx])); + } + + for (auto&& thread : threads) { + thread.join(); } }; return std::async(std::launch::deferred, sync_fn, std::move(read_tasks)); @@ -964,7 +970,7 @@ void reader::impl::allocate_level_decode_space() } } -std::pair<bool, std::vector<std::future<void>>> reader::impl::read_column_chunks() +std::pair<bool, std::future<void>> reader::impl::read_column_chunks() { auto const& row_groups_info = _pass_itm_data->row_groups; @@ -989,7 +995,6 @@ std::pair<bool, std::vector<std::future<void>>> reader::impl::read_column_chunks // TODO: make this respect the pass-wide skip_rows/num_rows instead of the file-wide // 
skip_rows/num_rows // auto remaining_rows = num_rows; - std::vector<std::future<void>> read_chunk_tasks; size_type chunk_count = 0; for (auto const& rg : row_groups_info) { auto const& row_group = _metadata->get_row_group(rg.index, rg.source_index); @@ -1018,16 +1023,15 @@ std::pair<bool, std::vector<std::future<void>>> reader::impl::read_column_chunks } // Read compressed chunk data to device memory - read_chunk_tasks.push_back(read_column_chunks_async(_sources, - raw_page_data, - chunks, - 0, - chunks.size(), - column_chunk_offsets, - chunk_source_map, - _stream)); - - return {total_decompressed_size > 0, std::move(read_chunk_tasks)}; + return {total_decompressed_size > 0, + read_column_chunks_async(_sources, + raw_page_data, + chunks, + 0, + chunks.size(), + column_chunk_offsets, + chunk_source_map, + _stream)}; } void reader::impl::read_compressed_data() @@ -1042,9 +1046,7 @@ void reader::impl::read_compressed_data() auto const [has_compressed_data, read_chunks_tasks] = read_column_chunks(); pass.has_compressed_data = has_compressed_data; - for (auto& task : read_chunks_tasks) { - task.wait(); - } + read_chunks_tasks.wait(); // Process dataset chunk pages into output columns auto const total_pages = _has_page_index ? count_page_headers_with_pgidx(chunks, _stream) From 2fcd21360de0808c6bc70190af81677544fc628a Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Mon, 23 Sep 2024 23:02:43 -0400 Subject: [PATCH 2/2] Revert some changes --- cpp/src/io/parquet/reader_impl.hpp | 2 +- cpp/src/io/parquet/reader_impl_preprocess.cu | 10 ++-------- 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl.hpp b/cpp/src/io/parquet/reader_impl.hpp index 1d8d05d0766..62ffc4d3077 100644 --- a/cpp/src/io/parquet/reader_impl.hpp +++ b/cpp/src/io/parquet/reader_impl.hpp @@ -188,7 +188,7 @@ class reader::impl { * * Does not decompress the chunk data. 
* - * @return pair of boolean indicating if compressed chunks were found and a vector of futures for + * @return pair of boolean indicating if compressed chunks were found and a future for * read completion */ std::pair<bool, std::future<void>> read_column_chunks(); diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu index df7b5025200..3763c2e8e6d 100644 --- a/cpp/src/io/parquet/reader_impl_preprocess.cu +++ b/cpp/src/io/parquet/reader_impl_preprocess.cu @@ -276,14 +276,8 @@ void generate_depth_remappings( } } auto sync_fn = [](decltype(read_tasks) read_tasks) { - std::vector<std::thread> threads; - for (std::size_t task_idx = 0; task_idx < read_tasks.size(); ++task_idx) { - threads.emplace_back([](std::future<void>& task) { task.wait(); }, - std::ref(read_tasks[task_idx])); - } - - for (auto&& thread : threads) { - thread.join(); + for (auto& task : read_tasks) { + task.wait(); } }; return std::async(std::launch::deferred, sync_fn, std::move(read_tasks));