From 167e664b01b0c0cbfb6406a99860065b8e306681 Mon Sep 17 00:00:00 2001
From: db
Date: Wed, 27 Sep 2023 17:48:32 -0500
Subject: [PATCH] PR review changes. Explicitly force the writer to not use
 dictionary encoding to keep the hardcoded uncompressed size predictable.

---
 cpp/src/io/parquet/parquet_gpu.hpp           |  4 ++--
 cpp/src/io/parquet/reader_impl_preprocess.cu | 24 +++++++++++---------
 cpp/tests/io/parquet_chunked_reader_test.cpp | 15 +++++++-----
 3 files changed, 24 insertions(+), 19 deletions(-)

diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp
index 4e7b4cd68d8..572fe5c2820 100644
--- a/cpp/src/io/parquet/parquet_gpu.hpp
+++ b/cpp/src/io/parquet/parquet_gpu.hpp
@@ -369,8 +369,8 @@ struct pass_intermediate_data {
   std::vector output_chunk_read_info;
   std::size_t current_output_chunk{0};
 
-  rmm::device_buffer level_decode_data;
-  int level_type_size;
+  rmm::device_buffer level_decode_data{};
+  int level_type_size{0};
 
   // skip_rows and num_rows values for this particular pass. these may be adjusted values from the
   // global values stored in file_intermediate_data.
diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu
index 59a5785e80d..c731c467f2c 100644
--- a/cpp/src/io/parquet/reader_impl_preprocess.cu
+++ b/cpp/src/io/parquet/reader_impl_preprocess.cu
@@ -866,9 +866,9 @@ void reader::impl::compute_input_pass_row_group_info()
   //
   std::size_t const read_limit =
     _input_pass_read_limit > 0 ? _input_pass_read_limit : std::numeric_limits::max();
-  std::size_t cur_read      = 0;
-  std::size_t cur_rg_start  = 0;
-  std::size_t cur_row_count = 0;
+  std::size_t cur_pass_byte_size = 0;
+  std::size_t cur_rg_start       = 0;
+  std::size_t cur_row_count      = 0;
 
   _input_pass_row_group_offsets.push_back(0);
   _input_pass_row_count.push_back(0);
@@ -877,23 +877,25 @@
     auto const& row_group = _metadata->get_row_group(rgi.index, rgi.source_index);
 
     // can we add this row group
-    if (cur_read + row_group.total_byte_size >= read_limit) {
-      // always need to add at least 1 row group, so add ourselves
+    if (cur_pass_byte_size + row_group.total_byte_size >= read_limit) {
+      // A single row group (the current one) is larger than the read limit:
+      // We always need to include at least one row group, so end the pass at the end of the current
+      // row group
       if (cur_rg_start == cur_rg_index) {
         _input_pass_row_group_offsets.push_back(cur_rg_index + 1);
         _input_pass_row_count.push_back(cur_row_count + row_group.num_rows);
-        cur_rg_start = cur_rg_index + 1;
-        cur_read     = 0;
+        cur_rg_start       = cur_rg_index + 1;
+        cur_pass_byte_size = 0;
       }
-      // add the previous group
+      // End the pass at the end of the previous row group
       else {
         _input_pass_row_group_offsets.push_back(cur_rg_index);
         _input_pass_row_count.push_back(cur_row_count);
-        cur_rg_start = cur_rg_index;
-        cur_read     = row_group.total_byte_size;
+        cur_rg_start       = cur_rg_index;
+        cur_pass_byte_size = row_group.total_byte_size;
       }
     } else {
-      cur_read += row_group.total_byte_size;
+      cur_pass_byte_size += row_group.total_byte_size;
     }
     cur_row_count += row_group.num_rows;
   }
diff --git a/cpp/tests/io/parquet_chunked_reader_test.cpp b/cpp/tests/io/parquet_chunked_reader_test.cpp
index 744c1d94527..05fb9a3ec48 100644
--- a/cpp/tests/io/parquet_chunked_reader_test.cpp
+++ b/cpp/tests/io/parquet_chunked_reader_test.cpp
@@ -961,13 +961,16 @@ TEST_F(ParquetChunkedReaderTest, InputLimitSimple)
   constexpr int num_rows = 25'000'000;
   auto value_iter = cudf::detail::make_counting_transform_iterator(0, [](int i) { return i; });
   cudf::test::fixed_width_column_wrapper expected(value_iter, value_iter + num_rows);
-  cudf::io::parquet_writer_options opts = cudf::io::parquet_writer_options::builder(
-    cudf::io::sink_info{filepath}, cudf::table_view{{expected}});
-  cudf::io::write_parquet(opts);
+  cudf::io::parquet_writer_options opts =
+    cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath},
+                                              cudf::table_view{{expected}})
+      // note: it is unnecessary to force compression to NONE here because the size we are using in
+      // the row group is the uncompressed data size. But forcing the dictionary policy to
+      // dictionary_policy::NEVER is necessary to prevent changes in the
+      // decompressed-but-not-yet-decoded data.
+      .dictionary_policy(cudf::io::dictionary_policy::NEVER);
 
-  // Note: some of these tests make explicit assumptions that the compressed size of the data in
-  // each row group will be 4001150. Changes to compression or other defaults may cause them to
-  // break (just requiring some tweaks).
+  cudf::io::write_parquet(opts);
 
   {
     // no chunking
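
Note (illustration, not part of the patch): the reader_impl_preprocess.cu hunk
above splits row groups into input passes greedily by accumulated byte size. A
minimal standalone sketch of that logic, using hypothetical names
(row_group_size, split_into_passes) rather than cudf's internal types:

    #include <cstddef>
    #include <limits>
    #include <vector>

    // Hypothetical stand-in for the per-row-group metadata the reader consults.
    struct row_group_size {
      std::size_t total_byte_size;
    };

    // Greedily accumulate row groups into passes until the next row group would
    // push the running total past `read_limit`. Each pass holds at least one row
    // group, so a single row group larger than the limit becomes its own pass.
    // Returns pass boundaries as offsets into `row_groups` (N passes -> N+1 offsets).
    std::vector<std::size_t> split_into_passes(std::vector<row_group_size> const& row_groups,
                                               std::size_t read_limit)
    {
      if (read_limit == 0) { read_limit = std::numeric_limits<std::size_t>::max(); }

      std::vector<std::size_t> offsets{0};
      std::size_t cur_pass_byte_size = 0;
      std::size_t cur_rg_start       = 0;

      for (std::size_t i = 0; i < row_groups.size(); ++i) {
        auto const rg_bytes = row_groups[i].total_byte_size;
        if (cur_pass_byte_size + rg_bytes >= read_limit) {
          if (cur_rg_start == i) {
            // The current row group alone hits the limit: close the pass after it.
            offsets.push_back(i + 1);
            cur_rg_start       = i + 1;
            cur_pass_byte_size = 0;
          } else {
            // Close the pass before the current row group; it starts the next pass.
            offsets.push_back(i);
            cur_rg_start       = i;
            cur_pass_byte_size = rg_bytes;
          }
        } else {
          cur_pass_byte_size += rg_bytes;
        }
      }
      // Close the final pass if the last row group did not already do so.
      if (offsets.back() != row_groups.size()) { offsets.push_back(row_groups.size()); }
      return offsets;
    }

With read_limit = 10 and total_byte_size values {4, 4, 4, 12, 4}, the sketch
returns offsets {0, 2, 3, 4, 5}: the oversized fourth row group becomes a pass
of its own, matching the "always include at least one row group" rule above.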
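
Similarly, a minimal writer-side sketch of the option the test now sets,
assuming a pre-built cudf::table_view named `table` and a caller-chosen output
path; disabling dictionary encoding keeps each row group's uncompressed data
size a simple function of row count and column width:

    #include <cudf/io/parquet.hpp>
    #include <cudf/table/table_view.hpp>

    #include <string>

    // Write `table` to `path` with dictionary encoding disabled, so the
    // decompressed-but-not-yet-decoded size of each row group stays predictable.
    void write_without_dictionary(cudf::table_view const& table, std::string const& path)
    {
      auto const opts = cudf::io::parquet_writer_options::builder(cudf::io::sink_info{path}, table)
                          .dictionary_policy(cudf::io::dictionary_policy::NEVER)
                          .build();
      cudf::io::write_parquet(opts);
    }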