From eb7b50a293f47afac8ba4166c7bb0059d940b1c9 Mon Sep 17 00:00:00 2001
From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com>
Date: Mon, 20 May 2024 18:33:02 -0700
Subject: [PATCH] Support filtered I/O in `chunked_parquet_reader` and simplify the use of `parquet_reader_options` (#15764)

This PR does the following:

1. It enables support for filtered I/O in the chunked parquet reader.
2. It simplifies the use of `parquet_reader_options` in `parquet::readers` by taking and saving the options at reader construction for later use, instead of passing `options` around as arguments from `read()`, `has_next()` and `chunked_read()` to `prepare_data()`, `read_chunk_internal()` and several other internal APIs.

Authors:
  - Muhammad Haseeb (https://github.com/mhaseeb123)
  - GALI PREM SAGAR (https://github.com/galipremsagar)

Approvers:
  - GALI PREM SAGAR (https://github.com/galipremsagar)
  - Vukasin Milovanovic (https://github.com/vuule)
  - Karthikeyan (https://github.com/karthikeyann)
  - Nghia Truong (https://github.com/ttnghia)

URL: https://github.com/rapidsai/cudf/pull/15764
---
 cpp/include/cudf/io/detail/parquet.hpp | 4 +-
 cpp/src/io/functions.cpp | 2 +-
 cpp/src/io/parquet/reader.cpp | 12 +-
 cpp/src/io/parquet/reader_impl.cpp | 89 ++++++--------
 cpp/src/io/parquet/reader_impl.hpp | 118 +++++++++----------
 cpp/src/io/parquet/reader_impl_chunking.cu | 20 ++--
 cpp/src/io/parquet/reader_impl_preprocess.cu | 29 ++---
 7 files changed, 116 insertions(+), 158 deletions(-)

diff --git a/cpp/include/cudf/io/detail/parquet.hpp b/cpp/include/cudf/io/detail/parquet.hpp index fcf5f0d9290..978216d971e 100644 --- a/cpp/include/cudf/io/detail/parquet.hpp +++ b/cpp/include/cudf/io/detail/parquet.hpp @@ -76,11 +76,9 @@ class reader { /** * @brief Reads the dataset as per given options.
* - * @param options Settings for controlling reading behavior - * * @return The set of columns along with table metadata */ - table_with_metadata read(parquet_reader_options const& options); + table_with_metadata read(); }; /** diff --git a/cpp/src/io/functions.cpp b/cpp/src/io/functions.cpp index 0358a1a6b86..3ba2facf276 100644 --- a/cpp/src/io/functions.cpp +++ b/cpp/src/io/functions.cpp @@ -548,7 +548,7 @@ table_with_metadata read_parquet(parquet_reader_options const& options, auto reader = std::make_unique<detail_parquet::reader>(std::move(datasources), options, stream, mr); - return reader->read(options); + return reader->read(); } parquet_metadata read_parquet_metadata(source_info const& src_info) diff --git a/cpp/src/io/parquet/reader.cpp b/cpp/src/io/parquet/reader.cpp index 170f7503134..8dfd68cd9b8 100644 --- a/cpp/src/io/parquet/reader.cpp +++ b/cpp/src/io/parquet/reader.cpp @@ -32,17 +32,7 @@ reader::reader(std::vector<std::unique_ptr<datasource>>&& sources, reader::~reader() = default; -table_with_metadata reader::read(parquet_reader_options const& options) -{ - // if the user has specified custom row bounds - bool const uses_custom_row_bounds = - options.get_num_rows().has_value() || options.get_skip_rows() != 0; - return _impl->read(options.get_skip_rows(), - options.get_num_rows(), - uses_custom_row_bounds, - options.get_row_groups(), - options.get_filter()); -} +table_with_metadata reader::read() { return _impl->read(); } chunked_reader::chunked_reader(std::size_t chunk_read_limit, std::size_t pass_read_limit, diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index fba95093c9c..1bd2fae281c 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -46,7 +46,7 @@ inline bool is_treat_fixed_length_as_string(thrust::optional<LogicalType> const& } // namespace -void reader::impl::decode_page_data(bool uses_custom_row_bounds, size_t skip_rows, size_t num_rows) +void reader::impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num_rows) { auto& pass = *_pass_itm_data; auto& subpass = *pass.subpass; @@ -88,7 +88,7 @@ void reader::impl::decode_page_data(bool uses_custom_row_bounds, size_t skip_row is_treat_fixed_length_as_string(chunk.logical_type); }); - if (!_has_page_index || uses_custom_row_bounds || has_flba) { + if (!_has_page_index || uses_custom_row_bounds(mode) || has_flba) { ComputePageStringSizes(subpass.pages, pass.chunks, delta_temp_buf, @@ -419,6 +419,10 @@ reader::impl::impl(std::size_t chunk_read_limit, rmm::device_async_resource_ref mr) : _stream{stream}, _mr{mr}, + _options{options.get_timestamp_type(), + options.get_skip_rows(), + options.get_num_rows(), + options.get_row_groups()}, _sources{std::move(sources)}, _output_chunk_read_limit{chunk_read_limit}, _input_pass_read_limit{pass_read_limit} { // Open and parse the source dataset metadata _metadata = std::make_unique<aggregate_reader_metadata>(_sources, options.is_enabled_use_arrow_schema()); - // Override output timestamp resolution if requested - if (options.get_timestamp_type().id() != type_id::EMPTY) { - _timestamp_type = options.get_timestamp_type(); - } - // Strings may be returned as either string or categorical columns _strings_to_categorical = options.is_enabled_convert_strings_to_categories(); @@ -452,19 +451,21 @@ reader::impl::impl(std::size_t chunk_read_limit, filter_columns_names, options.is_enabled_use_pandas_metadata(), _strings_to_categorical, - _timestamp_type.id()); + _options.timestamp_type.id()); // Save the states of the output buffers for reuse in `chunk_read()`.
for (auto const& buff : _output_buffers) { _output_buffers_template.emplace_back(cudf::io::detail::inline_column_buffer::empty_like(buff)); } + + // Save the name-to-reference converter for extracting the output filter AST in + // `preprocess_file()` and `finalize_output()` + table_metadata metadata; + populate_metadata(metadata); + _expr_conv = named_to_reference_converter(options.get_filter(), metadata); } -void reader::impl::prepare_data(int64_t skip_rows, - std::optional<size_type> const& num_rows, - bool uses_custom_row_bounds, - host_span<std::vector<size_type> const> row_group_indices, - std::optional<std::reference_wrapper<ast::expression const>> filter) +void reader::impl::prepare_data(read_mode mode) { // if we have not preprocessed at the whole-file level, do that now if (!_file_preprocessed) { @@ -472,14 +473,12 @@ void reader::impl::prepare_data(int64_t skip_rows, // - read row group information // - setup information on (parquet) chunks // - compute schedule of input passes - preprocess_file(skip_rows, num_rows, row_group_indices, filter); + preprocess_file(mode); } // handle any chunking work (ratcheting through the subpasses and chunks within // our current pass) if in bounds - if (_file_itm_data._current_input_pass < _file_itm_data.num_passes()) { - handle_chunking(uses_custom_row_bounds); - } + if (_file_itm_data._current_input_pass < _file_itm_data.num_passes()) { handle_chunking(mode); } } void reader::impl::populate_metadata(table_metadata& out_metadata) @@ -498,8 +497,7 @@ void reader::impl::populate_metadata(table_metadata& out_metadata) out_metadata.per_file_user_data[0].end()}; } -table_with_metadata reader::impl::read_chunk_internal( - bool uses_custom_row_bounds, std::optional<std::reference_wrapper<ast::expression const>> filter) +table_with_metadata reader::impl::read_chunk_internal(read_mode mode) { // If `_output_metadata` has been constructed, just copy it over. auto out_metadata = _output_metadata ? table_metadata{*_output_metadata} : table_metadata{}; @@ -510,17 +508,17 @@ table_with_metadata reader::impl::read_chunk_internal( out_columns.reserve(_output_buffers.size()); // no work to do (this can happen on the first pass if we have no rows to read) - if (!has_more_work()) { return finalize_output(out_metadata, out_columns, filter); } + if (!has_more_work()) { return finalize_output(out_metadata, out_columns); } auto& pass = *_pass_itm_data; auto& subpass = *pass.subpass; auto const& read_info = subpass.output_chunk_read_info[subpass.current_output_chunk]; // Allocate memory buffers for the output columns. - allocate_columns(read_info.skip_rows, read_info.num_rows, uses_custom_row_bounds); + allocate_columns(mode, read_info.skip_rows, read_info.num_rows); // Parse data into the output buffers. - decode_page_data(uses_custom_row_bounds, read_info.skip_rows, read_info.num_rows); + decode_page_data(mode, read_info.skip_rows, read_info.num_rows); // Create the final output cudf columns. for (size_t i = 0; i < _output_buffers.size(); ++i) { @@ -547,13 +545,11 @@ table_with_metadata reader::impl::read_chunk_internal( } // Add empty columns if needed. Filter output columns based on filter.
- return finalize_output(out_metadata, out_columns, filter); + return finalize_output(out_metadata, out_columns); } -table_with_metadata reader::impl::finalize_output( - table_metadata& out_metadata, - std::vector<std::unique_ptr<column>>& out_columns, - std::optional<std::reference_wrapper<ast::expression const>> filter) +table_with_metadata reader::impl::finalize_output(table_metadata& out_metadata, + std::vector<std::unique_ptr<column>>& out_columns) { // Create empty columns as needed (this can happen if we've ended up with no actual data to read) for (size_t i = out_columns.size(); i < _output_buffers.size(); ++i) { @@ -581,10 +577,13 @@ table_with_metadata reader::impl::finalize_output( // increment the output chunk count _file_itm_data._output_chunk_count++; - if (filter.has_value()) { + // check if the output filter AST expression (i.e., `_expr_conv.get_converted_expr()`) exists + if (_expr_conv.get_converted_expr().has_value()) { auto read_table = std::make_unique<table>(std::move(out_columns)); - auto predicate = cudf::detail::compute_column( - *read_table, filter.value().get(), _stream, rmm::mr::get_current_device_resource()); + auto predicate = cudf::detail::compute_column(*read_table, + _expr_conv.get_converted_expr().value().get(), + _stream, + rmm::mr::get_current_device_resource()); CUDF_EXPECTS(predicate->view().type().id() == type_id::BOOL8, "Predicate filter should return a boolean"); // Exclude columns present in filter only in output @@ -598,22 +597,13 @@ table_with_metadata reader::impl::finalize_output( return {std::make_unique<table>(std::move(out_columns)), std::move(out_metadata)}; } -table_with_metadata reader::impl::read( - int64_t skip_rows, - std::optional<size_type> const& num_rows, - bool uses_custom_row_bounds, - host_span<std::vector<size_type> const> row_group_indices, - std::optional<std::reference_wrapper<ast::expression const>> filter) +table_with_metadata reader::impl::read() { CUDF_EXPECTS(_output_chunk_read_limit == 0, "Reading the whole file must not have non-zero byte_limit."); - table_metadata metadata; - populate_metadata(metadata); - auto expr_conv = named_to_reference_converter(filter, metadata); - auto output_filter = expr_conv.get_converted_expr(); - prepare_data(skip_rows, num_rows, uses_custom_row_bounds, row_group_indices, output_filter); - return read_chunk_internal(uses_custom_row_bounds, output_filter); + prepare_data(read_mode::READ_ALL); + return read_chunk_internal(read_mode::READ_ALL); } table_with_metadata reader::impl::read_chunk() { @@ -628,22 +618,13 @@ table_with_metadata reader::impl::read_chunk() } } - prepare_data(0 /*skip_rows*/, - std::nullopt /*num_rows, `nullopt` means unlimited*/, - true /*uses_custom_row_bounds*/, - {} /*row_group_indices, empty means read all row groups*/, - std::nullopt /*filter*/); - - return read_chunk_internal(true, std::nullopt); + prepare_data(read_mode::CHUNKED_READ); + return read_chunk_internal(read_mode::CHUNKED_READ); } bool reader::impl::has_next() { - prepare_data(0 /*skip_rows*/, - std::nullopt /*num_rows, `nullopt` means unlimited*/, - true /*uses_custom_row_bounds*/, - {} /*row_group_indices, empty means read all row groups*/, - std::nullopt /*filter*/); + prepare_data(read_mode::CHUNKED_READ); // current_input_pass will only be incremented to be == num_passes after // the last chunk in the last subpass in the last pass has been returned diff --git a/cpp/src/io/parquet/reader_impl.hpp b/cpp/src/io/parquet/reader_impl.hpp index 04da8eed591..3b8e80a29e6 100644 --- a/cpp/src/io/parquet/reader_impl.hpp +++ b/cpp/src/io/parquet/reader_impl.hpp @@ -64,20 +64,9 @@ class reader::impl { /** * @brief Read an entire set or a subset of data and returns a set of columns * - * @param skip_rows Number of rows to skip from the start - * @param num_rows Number of rows to read - * @param uses_custom_row_bounds Whether or not num_rows and skip_rows represents user-specific - * bounds - * @param row_group_indices Lists of row groups to read, one per source - * @param filter Optional AST expression to filter output rows - * * @return The set of columns along with metadata */ - table_with_metadata read(int64_t skip_rows, - std::optional<size_type> const& num_rows, - bool uses_custom_row_bounds, - host_span<std::vector<size_type> const> row_group_indices, - std::optional<std::reference_wrapper<ast::expression const>> filter); + table_with_metadata read(); /** * @brief Constructor from a chunk read limit and an array of dataset sources with reader options. @@ -132,21 +121,17 @@ class reader::impl { // top level functions involved with ratcheting through the passes, subpasses // and output chunks of the read process private: + /** + * @brief The enum indicating whether the data sources are read all at once or chunk by chunk. + */ + enum class read_mode { READ_ALL, CHUNKED_READ }; + /** * @brief Perform the necessary data preprocessing for parsing file later on.
* - * @param skip_rows Number of rows to skip from the start - * @param num_rows Number of rows to read, or `std::nullopt` to read all rows - * @param uses_custom_row_bounds Whether or not num_rows and skip_rows represents user-specific - * bounds - * @param row_group_indices Lists of row groups to read (one per source), or empty if read all - * @param filter Optional AST expression to filter row groups based on column chunk statistics + * @param mode Value indicating if the data sources are read all at once or chunk by chunk */ - void prepare_data(int64_t skip_rows, - std::optional<size_type> const& num_rows, - bool uses_custom_row_bounds, - host_span<std::vector<size_type> const> row_group_indices, - std::optional<std::reference_wrapper<ast::expression const>> filter); + void prepare_data(read_mode mode); /** * @brief Preprocess step for the entire file. * Only ever called once. This function reads in rowgroup and associated chunk * information and computes the schedule of top level passes (see `pass_intermediate_data`). * - * @param skip_rows The number of rows to skip in the requested set of rowgroups to be read - * @param num_rows The total number of rows to read out of the selected rowgroups - * @param row_group_indices Lists of row groups to read, one per source - * @param filter Optional AST expression to filter output rows + * @param mode Value indicating if the data sources are read all at once or chunk by chunk */ - void preprocess_file(int64_t skip_rows, - std::optional<size_type> const& num_rows, - host_span<std::vector<size_type> const> row_group_indices, - std::optional<std::reference_wrapper<ast::expression const>> filter); + void preprocess_file(read_mode mode); /** * @brief Ratchet the pass/subpass/chunk process forward. * - * @param uses_custom_row_bounds Whether or not num_rows and skip_rows represents user-specified - * bounds + * @param mode Value indicating if the data sources are read all at once or chunk by chunk */ - void handle_chunking(bool uses_custom_row_bounds); + void handle_chunking(read_mode mode); /** * @brief Setup step for the next input read pass. * A 'pass' is defined as a subset of row groups read out of the globally * requested set of all row groups. * - * @param uses_custom_row_bounds Whether or not num_rows and skip_rows represents user-specific - * bounds + * @param mode Value indicating if the data sources are read all at once or chunk by chunk */ - void setup_next_pass(bool uses_custom_row_bounds); + void setup_next_pass(read_mode mode); /** * @brief Setup step for the next decompression subpass. * - * @param uses_custom_row_bounds Whether or not num_rows and skip_rows represents user-specific - * bounds - * * A 'subpass' is defined as a subset of pages within a pass that are * decompressed and decoded as a batch. Subpasses may be further subdivided * into output chunks. + * + * @param mode Value indicating if the data sources are read all at once or chunk by chunk + * */ - void setup_next_subpass(bool uses_custom_row_bounds); + void setup_next_subpass(read_mode mode); /** * @brief Read a chunk of data and return an output table. * * This function is called internally and expects all preprocessing steps have already been done.
* - * @param uses_custom_row_bounds Whether or not num_rows and skip_rows represents user-specific - * bounds - * @param filter Optional AST expression to filter output rows + * @param mode Value indicating if the data sources are read all at once or chunk by chunk * @return The output table along with columns' metadata */ - table_with_metadata read_chunk_internal( - bool uses_custom_row_bounds, - std::optional<std::reference_wrapper<ast::expression const>> filter); + table_with_metadata read_chunk_internal(read_mode mode); // utility functions private: @@ -253,12 +226,11 @@ class reader::impl { * * For flat schemas, these values are computed during header decoding (see gpuDecodePageHeaders). * - * @param uses_custom_row_bounds Whether or not num_rows and skip_rows represents user-specific - * bounds + * @param mode Value indicating if the data sources are read all at once or chunk by chunk * @param chunk_read_limit Limit on total number of bytes to be returned per read, * or `0` if there is no limit */ - void preprocess_subpass_pages(bool uses_custom_row_bounds, size_t chunk_read_limit); + void preprocess_subpass_pages(read_mode mode, size_t chunk_read_limit); /** * @brief Allocate nesting information storage for all pages and set pointers to it. @@ -292,23 +264,19 @@ class reader::impl { * * @param out_metadata The output table metadata * @param out_columns The columns for building the output table - * @param filter Optional AST expression to filter output rows * @return The output table along with columns' metadata */ - table_with_metadata finalize_output( - table_metadata& out_metadata, - std::vector<std::unique_ptr<column>>& out_columns, - std::optional<std::reference_wrapper<ast::expression const>> filter); + table_with_metadata finalize_output(table_metadata& out_metadata, + std::vector<std::unique_ptr<column>>& out_columns); /** * @brief Allocate data buffers for the output columns. * + * @param mode Value indicating if the data sources are read all at once or chunk by chunk * @param skip_rows Crop all rows below skip_rows * @param num_rows Maximum number of rows to read - * @param uses_custom_row_bounds Whether or not num_rows and skip_rows represents user-specific - * bounds */ - void allocate_columns(size_t skip_rows, size_t num_rows, bool uses_custom_row_bounds); + void allocate_columns(read_mode mode, size_t skip_rows, size_t num_rows); /** * @brief Calculate per-page offsets for string data @@ -320,12 +288,11 @@ class reader::impl { /** * @brief Converts the page data and outputs to columns. * - * @param uses_custom_row_bounds Whether or not num_rows and skip_rows represents user-specific - * bounds + * @param mode Value indicating if the data sources are read all at once or chunk by chunk * @param skip_rows Minimum number of rows from start * @param num_rows Number of rows to output */ - void decode_page_data(bool uses_custom_row_bounds, size_t skip_rows, size_t num_rows); + void decode_page_data(read_mode mode, size_t skip_rows, size_t num_rows); /** * @brief Creates file-wide parquet chunk information. @@ -354,6 +321,21 @@ class reader::impl { } private: + /** + * @brief Check if the user has specified custom row bounds + * + * @param mode Value indicating if the data sources are read all at once or chunk by chunk + * @return True if the user has specified custom row bounds + */ + [[nodiscard]] bool uses_custom_row_bounds(read_mode mode) const + { + // TODO: the result is hardcoded to `true` for `read_mode::CHUNKED_READ` to force + // `ComputePageSizes()` computation for all remaining chunks. + return (mode == read_mode::READ_ALL) ? (_options.num_rows.has_value() or _options.skip_rows != 0) : true; + } + [[nodiscard]] bool is_first_output_chunk() const { return _file_itm_data._output_chunk_count == 0; @@ -362,6 +344,19 @@ rmm::cuda_stream_view _stream; rmm::device_async_resource_ref _mr{rmm::mr::get_current_device_resource()}; + // Reader configs. + struct { + // User-specified output timestamp type. + data_type timestamp_type{type_id::EMPTY}; + // User-specified row and row-group selection. + int64_t const skip_rows; + std::optional<size_type> num_rows; + std::vector<std::vector<size_type>> row_group_indices; + } const _options; + + // Name-to-reference converter for extracting the output filter AST + named_to_reference_converter _expr_conv{std::nullopt, table_metadata{}}; + std::vector<std::unique_ptr<datasource>> _sources; std::unique_ptr<aggregate_reader_metadata> _metadata; @@ -389,7 +384,6 @@ class reader::impl { bool _has_page_index = false; std::optional<std::vector<reader_column_schema>> _reader_column_schema; - data_type _timestamp_type{type_id::EMPTY}; // chunked reading happens in 2 parts: // diff --git a/cpp/src/io/parquet/reader_impl_chunking.cu b/cpp/src/io/parquet/reader_impl_chunking.cu index 6824d72cf04..d3f321af0bd 100644 --- a/cpp/src/io/parquet/reader_impl_chunking.cu +++ b/cpp/src/io/parquet/reader_impl_chunking.cu @@ -1148,12 +1148,12 @@ void include_decompression_scratch_size(device_span<ColumnChunkDesc const> chunk } // anonymous namespace -void reader::impl::handle_chunking(bool uses_custom_row_bounds) +void reader::impl::handle_chunking(read_mode mode) { // if this is our first time in here, setup the first pass. if (!_pass_itm_data) { // setup the next pass - setup_next_pass(uses_custom_row_bounds); + setup_next_pass(mode); } auto& pass = *_pass_itm_data; @@ -1181,15 +1181,15 @@ void reader::impl::handle_chunking(bool uses_custom_row_bounds) if (_file_itm_data._current_input_pass == _file_itm_data.num_passes()) { return; } // setup the next pass - setup_next_pass(uses_custom_row_bounds); + setup_next_pass(mode); } } // setup the next sub pass - setup_next_subpass(uses_custom_row_bounds); + setup_next_subpass(mode); } -void reader::impl::setup_next_pass(bool uses_custom_row_bounds) +void reader::impl::setup_next_pass(read_mode mode) { auto const num_passes = _file_itm_data.num_passes(); @@ -1260,7 +1260,7 @@ void reader::impl::setup_next_pass(bool uses_custom_row_bounds) detect_malformed_pages( pass.pages, pass.chunks, - uses_custom_row_bounds ? std::nullopt : std::make_optional(pass.num_rows), + uses_custom_row_bounds(mode) ? std::nullopt : std::make_optional(pass.num_rows), _stream); // decompress dictionary data if applicable.
@@ -1309,7 +1309,7 @@ } } -void reader::impl::setup_next_subpass(bool uses_custom_row_bounds) +void reader::impl::setup_next_subpass(read_mode mode) { auto& pass = *_pass_itm_data; pass.subpass = std::make_unique<subpass_intermediate_data>(); @@ -1444,7 +1444,7 @@ void reader::impl::setup_next_subpass(bool uses_custom_row_bounds) // preprocess pages (computes row counts for lists, computes output chunks and computes // the actual row counts we will be able to load out of this subpass) - preprocess_subpass_pages(uses_custom_row_bounds, _output_chunk_read_limit); + preprocess_subpass_pages(mode, _output_chunk_read_limit); #if defined(PARQUET_CHUNK_LOGGING) printf("\tSubpass: skip_rows(%'lu), num_rows(%'lu), remaining read limit(%'lu)\n", @@ -1519,8 +1519,8 @@ void reader::impl::create_global_chunk_info() auto& schema = _metadata->get_schema(col.schema_idx); auto [clock_rate, logical_type] = - conversion_info(to_type_id(schema, _strings_to_categorical, _timestamp_type.id()), - _timestamp_type.id(), + conversion_info(to_type_id(schema, _strings_to_categorical, _options.timestamp_type.id()), + _options.timestamp_type.id(), schema.type, schema.logical_type); diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu index 084f82a2ca0..f533f04e427 100644 --- a/cpp/src/io/parquet/reader_impl_preprocess.cu +++ b/cpp/src/io/parquet/reader_impl_preprocess.cu @@ -873,7 +873,7 @@ void reader::impl::allocate_nesting_info() nesting_info[cur_depth].max_def_level = cur_schema.max_definition_level; pni[cur_depth].size = 0; pni[cur_depth].type = - to_type_id(cur_schema, _strings_to_categorical, _timestamp_type.id()); + to_type_id(cur_schema, _strings_to_categorical, _options.timestamp_type.id()); pni[cur_depth].nullable = cur_schema.repetition_type == OPTIONAL; } @@ -1221,17 +1221,14 @@ struct update_pass_num_rows { } // anonymous namespace -void reader::impl::preprocess_file( - int64_t skip_rows, - std::optional<size_type> const& num_rows, - host_span<std::vector<size_type> const> row_group_indices, - std::optional<std::reference_wrapper<ast::expression const>> filter) +void reader::impl::preprocess_file(read_mode mode) { CUDF_EXPECTS(!_file_preprocessed, "Attempted to preprocess file more than once"); // if filter is not empty, then create output types as vector and pass for filtering.
+ std::vector<data_type> output_dtypes; - if (filter.has_value()) { + if (_expr_conv.get_converted_expr().has_value()) { std::transform(_output_buffers_template.cbegin(), _output_buffers_template.cend(), std::back_inserter(output_dtypes), @@ -1240,12 +1237,12 @@ void reader::impl::preprocess_file( std::tie( _file_itm_data.global_skip_rows, _file_itm_data.global_num_rows, _file_itm_data.row_groups) = - _metadata->select_row_groups(row_group_indices, - skip_rows, - num_rows, + _metadata->select_row_groups(_options.row_group_indices, + _options.skip_rows, + _options.num_rows, output_dtypes, _output_column_schemas, - filter, + _expr_conv.get_converted_expr(), _stream); // check for page indexes @@ -1276,7 +1273,7 @@ void reader::impl::preprocess_file( printf("# Input columns: %'lu\n", _input_columns.size()); for (size_t idx = 0; idx < _input_columns.size(); idx++) { auto const& schema = _metadata->get_schema(_input_columns[idx].schema_idx); - auto const type_id = to_type_id(schema, _strings_to_categorical, _timestamp_type.id()); + auto const type_id = to_type_id(schema, _strings_to_categorical, _options.timestamp_type.id()); printf("\tC(%'lu, %s): %s\n", idx, _input_columns[idx].name.c_str(), @@ -1330,7 +1327,7 @@ void reader::impl::generate_list_column_row_count_estimates() _stream.synchronize(); } -void reader::impl::preprocess_subpass_pages(bool uses_custom_row_bounds, size_t chunk_read_limit) +void reader::impl::preprocess_subpass_pages(read_mode mode, size_t chunk_read_limit) { auto& pass = *_pass_itm_data; auto& subpass = *pass.subpass; @@ -1457,7 +1454,7 @@ void reader::impl::preprocess_subpass_pages(bool uses_custom_row_bounds, size_t compute_output_chunks_for_subpass(); } -void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses_custom_row_bounds) +void reader::impl::allocate_columns(read_mode mode, size_t skip_rows, size_t num_rows) { auto& pass = *_pass_itm_data; auto& subpass = *pass.subpass; @@ -1470,7 +1467,7 @@ void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses // account. PageInfo::skipped_values, which tells us where to start decoding in the input to // respect the user bounds. It is only necessary to do this second pass if uses_custom_row_bounds // is set (if the user has specified artificial bounds). - if (uses_custom_row_bounds) { + if (uses_custom_row_bounds(mode)) { ComputePageSizes(subpass.pages, pass.chunks, skip_rows, @@ -1479,8 +1476,6 @@ void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses false, // no need to compute string sizes pass.level_type_size, _stream); - - // print_pages(pages, _stream); } // iterate over all input columns and allocate any associated output
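
Usage sketch (illustrative, not part of the patch): after this change a filter is supplied once through `parquet_reader_options` and is honored by the chunked reader as well, since the reader saves the options at construction. A minimal example against the public libcudf API follows; the file name, column name, threshold, and chunk limit are hypothetical.

#include <cudf/ast/expressions.hpp>
#include <cudf/io/parquet.hpp>
#include <cudf/scalar/scalar.hpp>

#include <cstddef>

int main()
{
  // Hypothetical filter: keep rows where column "x" is less than 100.
  auto col    = cudf::ast::column_name_reference{"x"};
  auto value  = cudf::numeric_scalar<int32_t>{100};
  auto lit    = cudf::ast::literal{value};
  auto filter = cudf::ast::operation{cudf::ast::ast_operator::LESS, col, lit};

  // The filter travels inside the options; the reader captures the options at
  // construction, so has_next()/read_chunk() take no arguments.
  auto options =
    cudf::io::parquet_reader_options::builder(cudf::io::source_info{"input.parquet"})
      .filter(filter)
      .build();

  // Chunked reader with a ~1 GB output chunk limit; filtered I/O now works here too.
  auto reader = cudf::io::chunked_parquet_reader{std::size_t{1} << 30, options};
  while (reader.has_next()) {
    auto chunk = reader.read_chunk();  // table_with_metadata with the filter applied
    // ... consume chunk.tbl ...
  }
  return 0;
}

Note that the filter expression must outlive the options object, since `parquet_reader_options` stores only a reference to it; internally, `named_to_reference_converter` is now built once in the reader constructor and reused by `preprocess_file()` and `finalize_output()`.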