PR review changes. Explicitly force the writer to not use dictionary encoding to keep the hardcoded uncompressed size predictable.
nvdbaranec committed Sep 27, 2023
1 parent ab95c94 commit 167e664
Showing 3 changed files with 24 additions and 19 deletions.
4 changes: 2 additions & 2 deletions cpp/src/io/parquet/parquet_gpu.hpp
@@ -369,8 +369,8 @@ struct pass_intermediate_data {
   std::vector<gpu::chunk_read_info> output_chunk_read_info;
   std::size_t current_output_chunk{0};
 
-  rmm::device_buffer level_decode_data;
-  int level_type_size;
+  rmm::device_buffer level_decode_data{};
+  int level_type_size{0};
 
   // skip_rows and num_rows values for this particular pass. these may be adjusted values from the
   // global values stored in file_intermediate_data.
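The parquet_gpu.hpp change above simply gives the two pass members default member initializers, so a freshly constructed pass_intermediate_data starts from a well-defined state instead of holding indeterminate values. A minimal sketch of the same idiom, using a hypothetical stand-in struct rather than the real cudf type:

```cpp
#include <cstddef>
#include <vector>

// Hypothetical stand-in for pass_intermediate_data; not the real cudf struct.
// With default member initializers, a default-constructed instance is fully
// defined instead of holding indeterminate values.
struct pass_state {
  std::vector<int> row_group_offsets;   // empty by default
  std::size_t current_output_chunk{0};  // value-initialized
  int level_type_size{0};               // safe to read before the first pass sets it
};

int main()
{
  pass_state s;              // every member has a well-defined value
  return s.level_type_size;  // 0, never garbage
}
```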
24 changes: 13 additions & 11 deletions cpp/src/io/parquet/reader_impl_preprocess.cu
@@ -866,9 +866,9 @@ void reader::impl::compute_input_pass_row_group_info()
   //
   std::size_t const read_limit =
     _input_pass_read_limit > 0 ? _input_pass_read_limit : std::numeric_limits<std::size_t>::max();
-  std::size_t cur_read      = 0;
-  std::size_t cur_rg_start  = 0;
-  std::size_t cur_row_count = 0;
+  std::size_t cur_pass_byte_size = 0;
+  std::size_t cur_rg_start       = 0;
+  std::size_t cur_row_count      = 0;
   _input_pass_row_group_offsets.push_back(0);
   _input_pass_row_count.push_back(0);
 
@@ -877,23 +877,25 @@ void reader::impl::compute_input_pass_row_group_info()
     auto const& row_group = _metadata->get_row_group(rgi.index, rgi.source_index);
 
     // can we add this row group
-    if (cur_read + row_group.total_byte_size >= read_limit) {
-      // always need to add at least 1 row group, so add ourselves
+    if (cur_pass_byte_size + row_group.total_byte_size >= read_limit) {
+      // A single row group (the current one) is larger than the read limit:
+      // We always need to include at least one row group, so end the pass at the end of the current
+      // row group
       if (cur_rg_start == cur_rg_index) {
         _input_pass_row_group_offsets.push_back(cur_rg_index + 1);
         _input_pass_row_count.push_back(cur_row_count + row_group.num_rows);
-        cur_rg_start = cur_rg_index + 1;
-        cur_read     = 0;
+        cur_rg_start       = cur_rg_index + 1;
+        cur_pass_byte_size = 0;
       }
-      // add the previous group
+      // End the pass at the end of the previous row group
      else {
         _input_pass_row_group_offsets.push_back(cur_rg_index);
         _input_pass_row_count.push_back(cur_row_count);
-        cur_rg_start = cur_rg_index;
-        cur_read     = row_group.total_byte_size;
+        cur_rg_start       = cur_rg_index;
+        cur_pass_byte_size = row_group.total_byte_size;
       }
     } else {
-      cur_read += row_group.total_byte_size;
+      cur_pass_byte_size += row_group.total_byte_size;
     }
     cur_row_count += row_group.num_rows;
   }
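For reference, here is a simplified, standalone sketch of the pass-splitting logic in the hunk above. The signature, the plain size vector, and the final-offset handling are hypothetical stand-ins for the reader internals (the metadata lookup and _input_pass_row_group_offsets), not the actual cudf implementation:

```cpp
#include <cstddef>
#include <limits>
#include <vector>

// Split row groups into input passes so that each pass stays under
// input_pass_read_limit bytes (0 means "no limit"). A pass always contains at
// least one row group, so a single oversized row group becomes a pass of its own.
// Returns the index of the first row group of each pass, plus a final end offset.
std::vector<std::size_t> compute_pass_offsets(std::vector<std::size_t> const& row_group_sizes,
                                              std::size_t input_pass_read_limit)
{
  std::size_t const read_limit =
    input_pass_read_limit > 0 ? input_pass_read_limit : std::numeric_limits<std::size_t>::max();

  std::vector<std::size_t> pass_offsets{0};
  std::size_t cur_pass_byte_size = 0;
  std::size_t cur_rg_start       = 0;

  for (std::size_t cur_rg_index = 0; cur_rg_index < row_group_sizes.size(); ++cur_rg_index) {
    auto const rg_size = row_group_sizes[cur_rg_index];
    if (cur_pass_byte_size + rg_size >= read_limit) {
      if (cur_rg_start == cur_rg_index) {
        // The current row group alone exceeds the limit: end the pass right after it.
        pass_offsets.push_back(cur_rg_index + 1);
        cur_rg_start       = cur_rg_index + 1;
        cur_pass_byte_size = 0;
      } else {
        // End the pass at the previous row group; the current one starts the next pass.
        pass_offsets.push_back(cur_rg_index);
        cur_rg_start       = cur_rg_index;
        cur_pass_byte_size = rg_size;
      }
    } else {
      cur_pass_byte_size += rg_size;
    }
  }
  // Close the final pass (the code following the hunk above, which is not shown
  // in this diff, does the equivalent for the real reader state).
  if (pass_offsets.back() != row_group_sizes.size()) {
    pass_offsets.push_back(row_group_sizes.size());
  }
  return pass_offsets;
}
```

For example, with row group sizes {10, 10, 50, 10} and a limit of 25, this yields pass boundaries {0, 2, 3, 4}: the first two small groups share a pass, the oversized third group gets its own, and the last group forms the final pass.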
15 changes: 9 additions & 6 deletions cpp/tests/io/parquet_chunked_reader_test.cpp
@@ -961,13 +961,16 @@ TEST_F(ParquetChunkedReaderTest, InputLimitSimple)
   constexpr int num_rows = 25'000'000;
   auto value_iter = cudf::detail::make_counting_transform_iterator(0, [](int i) { return i; });
   cudf::test::fixed_width_column_wrapper<int> expected(value_iter, value_iter + num_rows);
-  cudf::io::parquet_writer_options opts = cudf::io::parquet_writer_options::builder(
-    cudf::io::sink_info{filepath}, cudf::table_view{{expected}});
-  cudf::io::write_parquet(opts);
+  cudf::io::parquet_writer_options opts =
+    cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath},
+                                              cudf::table_view{{expected}})
+      // note: it is unnecessary to force compression to NONE here because the size we are using in
+      // the row group is the uncompressed data size. But forcing the dictionary policy to
+      // dictionary_policy::NEVER is necessary to prevent changes in the
+      // decompressed-but-not-yet-decoded data.
+      .dictionary_policy(cudf::io::dictionary_policy::NEVER);
 
-  // Note: some of these tests make explicit assumptions that the compressed size of the data in
-  // each row group will be 4001150. Changes to compression or other defaults may cause them to
-  // break (just requiring some tweaks).
+  cudf::io::write_parquet(opts);
 
   {
     // no chunking
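The test change above disables dictionary encoding so the uncompressed byte count the reader sees per row group stays predictable regardless of compression or encoding defaults. For context, a hedged sketch of how a test like this might then drive the chunked reader against an input pass limit; the two-limit chunked_parquet_reader constructor (output chunk limit plus input pass limit) is assumed from cudf APIs of this era, so treat the exact signature as an assumption rather than the test's actual code:

```cpp
#include <cudf/concatenate.hpp>
#include <cudf/io/parquet.hpp>
#include <cudf/table/table.hpp>
#include <cudf/table/table_view.hpp>

#include <memory>
#include <string>
#include <vector>

// Read a Parquet file in passes bounded by pass_read_limit bytes of input,
// concatenating the output chunks into one table.
std::unique_ptr<cudf::table> read_in_passes(std::string const& filepath,
                                            std::size_t pass_read_limit)
{
  auto const read_opts =
    cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath}).build();

  // chunk_read_limit == 0 leaves the output chunk size unconstrained; only the
  // input pass limit bounds how much of the file is resident at once.
  cudf::io::chunked_parquet_reader reader(0 /*chunk_read_limit*/, pass_read_limit, read_opts);

  std::vector<std::unique_ptr<cudf::table>> chunks;
  while (reader.has_next()) {
    chunks.push_back(reader.read_chunk().tbl);
  }

  std::vector<cudf::table_view> views;
  for (auto const& c : chunks) {
    views.push_back(c->view());
  }
  return cudf::concatenate(views);
}
```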
