From 4e87069bd43ee969797265eaed00f82eda255dd4 Mon Sep 17 00:00:00 2001
From: nvdbaranec <56695930+nvdbaranec@users.noreply.github.com>
Date: Wed, 15 May 2024 22:14:30 -0500
Subject: [PATCH] Cap the absolute row index per pass in parquet chunked
 reader. (#15735)

Fixes https://github.com/rapidsai/cudf/issues/15690

There was an issue when computing page row counts/indices at the pass level
in the chunked reader. Because we estimate list row counts for pages we have
not yet decompressed, this can sometimes lead to estimated row counts that
are larger than the actual (known) number of rows for a pass. This caused an
out-of-bounds read down the line. We were already handling this at the
subpass level, just not at the pass level.

Also includes some fixes in debug logging code that is #ifdef'd out.

Authors:
  - https://github.com/nvdbaranec
  - David Wendt (https://github.com/davidwendt)
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - David Wendt (https://github.com/davidwendt)
  - Vyas Ramasubramani (https://github.com/vyasr)

URL: https://github.com/rapidsai/cudf/pull/15735
---
 cpp/src/io/parquet/reader_impl_chunking.cu | 31 +++++++++++++---------
 1 file changed, 19 insertions(+), 12 deletions(-)

diff --git a/cpp/src/io/parquet/reader_impl_chunking.cu b/cpp/src/io/parquet/reader_impl_chunking.cu
index 912f53a8277..f4fb6bc57e6 100644
--- a/cpp/src/io/parquet/reader_impl_chunking.cu
+++ b/cpp/src/io/parquet/reader_impl_chunking.cu
@@ -101,7 +101,7 @@ void print_cumulative_page_info(device_span d_pages,
     printf("\tP %s: {%lu, %lu, %lu}\n",
            is_list ? "(L)" : "",
            pidx,
-           c_info[pidx].row_index,
+           c_info[pidx].end_row_index,
            c_info[pidx].size_bytes);
   }
 }
@@ -121,16 +121,17 @@ void print_cumulative_row_info(host_span sizes,
   printf("------------\nCumulative sizes %s (index, row_index, size_bytes, page_key)\n",
          label.c_str());
   for (size_t idx = 0; idx < sizes.size(); idx++) {
-    printf("{%lu, %lu, %lu, %d}", idx, sizes[idx].row_index, sizes[idx].size_bytes, sizes[idx].key);
+    printf(
+      "{%lu, %lu, %lu, %d}", idx, sizes[idx].end_row_index, sizes[idx].size_bytes, sizes[idx].key);
     if (splits.has_value()) {
       // if we have a split at this row count and this is the last instance of this row count
       auto start = thrust::make_transform_iterator(splits->begin(),
                                                    [](row_range const& i) { return i.skip_rows; });
       auto end   = start + splits->size();
-      auto split = std::find(start, end, sizes[idx].row_index);
+      auto split = std::find(start, end, sizes[idx].end_row_index);
       auto const split_index = [&]() -> int {
-        if (split != end &&
-            ((idx == sizes.size() - 1) || (sizes[idx + 1].row_index > sizes[idx].row_index))) {
+        if (split != end && ((idx == sizes.size() - 1) ||
+                             (sizes[idx + 1].end_row_index > sizes[idx].end_row_index))) {
           return static_cast<int>(std::distance(start, split));
         }
         return idx == 0 ? 0 : -1;
@@ -259,8 +260,9 @@ struct set_row_index {
     auto const& page          = pages[i];
     auto const& chunk         = chunks[page.chunk_idx];
     size_t const page_end_row = chunk.start_row + page.chunk_row + page.num_rows;
-    // if we have been passed in a cap, apply it
-    c_info[i].end_row_index = max_row > 0 ? min(max_row, page_end_row) : page_end_row;
+    // this cap is necessary because in the chunked reader, we use estimations for the row
+    // counts for list columns, which can result in values > than the absolute number of rows.
+    c_info[i].end_row_index = min(max_row, page_end_row);
   }
 };

@@ -461,6 +463,7 @@ adjust_cumulative_sizes(device_span c_info,
                           thrust::make_discard_iterator(),
                           key_offsets.begin())
       .second;
+  size_t const num_unique_keys = key_offsets_end - key_offsets.begin();
   thrust::exclusive_scan(
     rmm::exec_policy_nosync(stream), key_offsets.begin(), key_offsets.end(), key_offsets.begin());

@@ -1292,10 +1295,12 @@ void reader::impl::setup_next_pass(bool uses_custom_row_bounds)
   printf("\tnum_rows: %'lu\n", pass.num_rows);
   printf("\tbase mem usage: %'lu\n", pass.base_mem_size);
   auto const num_columns = _input_columns.size();
+  std::vector h_page_offsets =
+    cudf::detail::make_std_vector_sync(pass.page_offsets, _stream);
   for (size_t c_idx = 0; c_idx < num_columns; c_idx++) {
     printf("\t\tColumn %'lu: num_pages(%'d)\n",
            c_idx,
-           pass.page_offsets[c_idx + 1] - pass.page_offsets[c_idx]);
+           h_page_offsets[c_idx + 1] - h_page_offsets[c_idx]);
   }
 #endif

@@ -1362,11 +1367,12 @@ void reader::impl::setup_next_subpass(bool uses_custom_row_bounds)
   // can be considerable.
   include_decompression_scratch_size(pass.chunks, pass.pages, c_info, _stream);

-  auto iter = thrust::make_counting_iterator(0);
+  auto iter               = thrust::make_counting_iterator(0);
+  auto const pass_max_row = pass.skip_rows + pass.num_rows;
   thrust::for_each(rmm::exec_policy_nosync(_stream),
                    iter,
                    iter + pass.pages.size(),
-                   set_row_index{pass.chunks, pass.pages, c_info, 0});
+                   set_row_index{pass.chunks, pass.pages, c_info, pass_max_row});
   // print_cumulative_page_info(pass.pages, pass.chunks, c_info, _stream);

   // get the next batch of pages
@@ -1448,11 +1454,12 @@ void reader::impl::setup_next_subpass(bool uses_custom_row_bounds)
   printf("\t\tTotal expected usage: %'lu\n",
          total_expected_size == 0 ? subpass.decomp_page_data.size() + pass.base_mem_size
                                   : total_expected_size + pass.base_mem_size);
+  std::vector h_page_indices = cudf::detail::make_std_vector_sync(page_indices, _stream);
   for (size_t c_idx = 0; c_idx < num_columns; c_idx++) {
     printf("\t\tColumn %'lu: pages(%'lu - %'lu)\n",
            c_idx,
-           page_indices[c_idx].start,
-           page_indices[c_idx].end);
+           h_page_indices[c_idx].start,
+           h_page_indices[c_idx].end);
   }
   printf("\t\tOutput chunks:\n");
   for (size_t idx = 0; idx < subpass.output_chunk_read_info.size(); idx++) {
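
The core of the fix is the unconditional clamp in set_row_index: a page's computed end row, which may come from an estimated list row count, is never allowed to exceed the absolute last row of the pass. Below is a minimal, self-contained host-side C++ sketch of that clamping idea; page_info and the pass_* values are simplified stand-ins, not cudf's actual types, and the real code performs the equivalent computation on the device via thrust::for_each with the set_row_index functor shown in the diff above.

#include <algorithm>
#include <cstddef>
#include <cstdio>
#include <vector>

// Simplified stand-in for the per-page fields the cap needs (not cudf's PageInfo).
struct page_info {
  std::size_t chunk_start_row;  // start row of the owning column chunk
  std::size_t chunk_row;        // row offset of the page within its chunk
  std::size_t num_rows;         // row count; only an estimate for not-yet-decoded list pages
};

int main()
{
  // The pass is known to cover rows [0, 100), but the last (list) page's row
  // count is an estimate that overshoots the end of the pass.
  std::size_t const pass_skip_rows = 0;
  std::size_t const pass_num_rows  = 100;
  std::size_t const pass_max_row   = pass_skip_rows + pass_num_rows;

  std::vector<page_info> pages{
    {0, 0, 40}, {0, 40, 40}, {0, 80, 35 /* estimated; only 20 rows actually remain */}};

  std::vector<std::size_t> end_row_index(pages.size());
  for (std::size_t i = 0; i < pages.size(); i++) {
    std::size_t const page_end_row =
      pages[i].chunk_start_row + pages[i].chunk_row + pages[i].num_rows;
    // Clamp to the absolute last row of the pass so downstream size/split logic
    // never indexes past rows that actually exist in this pass.
    end_row_index[i] = std::min(pass_max_row, page_end_row);
  }

  for (std::size_t i = 0; i < pages.size(); i++) {
    std::printf("page %zu: end_row_index = %zu\n", i, end_row_index[i]);  // prints 40, 80, 100
  }
  return 0;
}

The subpass path already applied this cap; the patch extends it to the pass level by passing pass.skip_rows + pass.num_rows as max_row instead of 0, which is why the old "if we have been passed in a cap" branch could be dropped.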