Skip to content

Commit

Permalink
Cap the absolute row index per pass in parquet chunked reader. (#15735)
Browse files Browse the repository at this point in the history
Fixes  #15690

There was an issue when computing page row counts/indices at the pass level in the chunked reader.  Because we estimate list row counts for pages we have not yet decompressed, this can sometimes lead to estimates row counts that are larger than the actual (known) number of rows for a pass.  This caused an out-of-bounds read down the line.  We were already handling this at the subpass level, just not at the pass level.

Also includes some fixes in debug logging code that is #ifdef'd out.

Authors:
  - https://github.com/nvdbaranec
  - David Wendt (https://github.com/davidwendt)
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - David Wendt (https://github.com/davidwendt)
  - Vyas Ramasubramani (https://github.com/vyasr)

URL: #15735
  • Loading branch information
nvdbaranec authored May 16, 2024
1 parent ec07927 commit 4e87069
Showing 1 changed file with 19 additions and 12 deletions.
31 changes: 19 additions & 12 deletions cpp/src/io/parquet/reader_impl_chunking.cu
Original file line number Diff line number Diff line change
@@ -101,7 +101,7 @@ void print_cumulative_page_info(device_span<PageInfo const> d_pages,
printf("\tP %s: {%lu, %lu, %lu}\n",
is_list ? "(L)" : "",
pidx,
c_info[pidx].row_index,
c_info[pidx].end_row_index,
c_info[pidx].size_bytes);
}
}
@@ -121,16 +121,17 @@ void print_cumulative_row_info(host_span<cumulative_page_info const> sizes,
printf("------------\nCumulative sizes %s (index, row_index, size_bytes, page_key)\n",
label.c_str());
for (size_t idx = 0; idx < sizes.size(); idx++) {
printf("{%lu, %lu, %lu, %d}", idx, sizes[idx].row_index, sizes[idx].size_bytes, sizes[idx].key);
printf(
"{%lu, %lu, %lu, %d}", idx, sizes[idx].end_row_index, sizes[idx].size_bytes, sizes[idx].key);
if (splits.has_value()) {
// if we have a split at this row count and this is the last instance of this row count
auto start = thrust::make_transform_iterator(splits->begin(),
[](row_range const& i) { return i.skip_rows; });
auto end = start + splits->size();
auto split = std::find(start, end, sizes[idx].row_index);
auto split = std::find(start, end, sizes[idx].end_row_index);
auto const split_index = [&]() -> int {
if (split != end &&
((idx == sizes.size() - 1) || (sizes[idx + 1].row_index > sizes[idx].row_index))) {
if (split != end && ((idx == sizes.size() - 1) ||
(sizes[idx + 1].end_row_index > sizes[idx].end_row_index))) {
return static_cast<int>(std::distance(start, split));
}
return idx == 0 ? 0 : -1;
@@ -259,8 +260,9 @@ struct set_row_index {
auto const& page = pages[i];
auto const& chunk = chunks[page.chunk_idx];
size_t const page_end_row = chunk.start_row + page.chunk_row + page.num_rows;
// if we have been passed in a cap, apply it
c_info[i].end_row_index = max_row > 0 ? min(max_row, page_end_row) : page_end_row;
// this cap is necessary because in the chunked reader, we use estimations for the row
// counts for list columns, which can result in values > than the absolute number of rows.
c_info[i].end_row_index = min(max_row, page_end_row);
}
};

@@ -461,6 +463,7 @@ adjust_cumulative_sizes(device_span<cumulative_page_info const> c_info,
thrust::make_discard_iterator(),
key_offsets.begin())
.second;

size_t const num_unique_keys = key_offsets_end - key_offsets.begin();
thrust::exclusive_scan(
rmm::exec_policy_nosync(stream), key_offsets.begin(), key_offsets.end(), key_offsets.begin());
@@ -1292,10 +1295,12 @@ void reader::impl::setup_next_pass(bool uses_custom_row_bounds)
printf("\tnum_rows: %'lu\n", pass.num_rows);
printf("\tbase mem usage: %'lu\n", pass.base_mem_size);
auto const num_columns = _input_columns.size();
std::vector<size_type> h_page_offsets =
cudf::detail::make_std_vector_sync(pass.page_offsets, _stream);
for (size_t c_idx = 0; c_idx < num_columns; c_idx++) {
printf("\t\tColumn %'lu: num_pages(%'d)\n",
c_idx,
pass.page_offsets[c_idx + 1] - pass.page_offsets[c_idx]);
h_page_offsets[c_idx + 1] - h_page_offsets[c_idx]);
}
#endif

@@ -1362,11 +1367,12 @@ void reader::impl::setup_next_subpass(bool uses_custom_row_bounds)
// can be considerable.
include_decompression_scratch_size(pass.chunks, pass.pages, c_info, _stream);

auto iter = thrust::make_counting_iterator(0);
auto iter = thrust::make_counting_iterator(0);
auto const pass_max_row = pass.skip_rows + pass.num_rows;
thrust::for_each(rmm::exec_policy_nosync(_stream),
iter,
iter + pass.pages.size(),
set_row_index{pass.chunks, pass.pages, c_info, 0});
set_row_index{pass.chunks, pass.pages, c_info, pass_max_row});
// print_cumulative_page_info(pass.pages, pass.chunks, c_info, _stream);

// get the next batch of pages
@@ -1448,11 +1454,12 @@ void reader::impl::setup_next_subpass(bool uses_custom_row_bounds)
printf("\t\tTotal expected usage: %'lu\n",
total_expected_size == 0 ? subpass.decomp_page_data.size() + pass.base_mem_size
: total_expected_size + pass.base_mem_size);
std::vector<page_span> h_page_indices = cudf::detail::make_std_vector_sync(page_indices, _stream);
for (size_t c_idx = 0; c_idx < num_columns; c_idx++) {
printf("\t\tColumn %'lu: pages(%'lu - %'lu)\n",
c_idx,
page_indices[c_idx].start,
page_indices[c_idx].end);
h_page_indices[c_idx].start,
h_page_indices[c_idx].end);
}
printf("\t\tOutput chunks:\n");
for (size_t idx = 0; idx < subpass.output_chunk_read_info.size(); idx++) {

0 comments on commit 4e87069

Please sign in to comment.