Support filtered I/O in chunked_parquet_reader and simplify the use of `parquet_reader_options` (#15764)

This PR does the following:

1. It enables support for filtered I/O in the chunked Parquet reader (a usage sketch follows this list).
2. It simplifies the use of `parquet_reader_options` in the Parquet readers by taking and saving the relevant options at reader construction for later use, instead of passing `options` as arguments from `read()`, `has_next()`, and `read_chunk()` down to `prepare_data()`, `read_chunk_internal()`, and several other internal APIs.
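
The sketch below shows how a caller might combine the two after this change: a filter supplied through `parquet_reader_options` is now honored by `chunked_parquet_reader` as well. The file name `input.parquet`, the column `col0`, and the threshold are illustrative, not taken from this PR.

```cpp
#include <cudf/ast/expressions.hpp>
#include <cudf/io/parquet.hpp>
#include <cudf/scalar/scalar.hpp>

// Read only rows where col0 < 100, in output chunks of at most ~1 MB.
void filtered_chunked_read()
{
  auto col_ref   = cudf::ast::column_name_reference("col0");  // hypothetical column
  auto threshold = cudf::numeric_scalar<int32_t>(100);
  auto literal   = cudf::ast::literal(threshold);
  auto filter    = cudf::ast::operation(cudf::ast::ast_operator::LESS, col_ref, literal);

  auto options = cudf::io::parquet_reader_options::builder(
                   cudf::io::source_info{"input.parquet"})  // hypothetical path
                   .filter(filter)
                   .build();

  // The filter saved in the options is applied to every returned chunk.
  auto reader = cudf::io::chunked_parquet_reader(1024 * 1024 /*chunk_read_limit*/, options);
  while (reader.has_next()) {
    auto chunk = reader.read_chunk();
    // ... consume chunk.tbl ...
  }
}
```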

Authors:
  - Muhammad Haseeb (https://github.com/mhaseeb123)
  - GALI PREM SAGAR (https://github.com/galipremsagar)

Approvers:
  - GALI PREM SAGAR (https://github.com/galipremsagar)
  - Vukasin Milovanovic (https://github.com/vuule)
  - Karthikeyan (https://github.com/karthikeyann)
  - Nghia Truong (https://github.com/ttnghia)

URL: #15764
mhaseeb123 authored May 21, 2024
1 parent 1dd1910 commit eb7b50a
Showing 7 changed files with 116 additions and 158 deletions.
4 changes: 1 addition & 3 deletions cpp/include/cudf/io/detail/parquet.hpp
@@ -76,11 +76,9 @@ class reader {
/**
* @brief Reads the dataset as per given options.
*
* @param options Settings for controlling reading behavior
*
* @return The set of columns along with table metadata
*/
table_with_metadata read(parquet_reader_options const& options);
table_with_metadata read();
};

/**
2 changes: 1 addition & 1 deletion cpp/src/io/functions.cpp
@@ -548,7 +548,7 @@ table_with_metadata read_parquet(parquet_reader_options const& options,
auto reader =
std::make_unique<detail_parquet::reader>(std::move(datasources), options, stream, mr);

return reader->read(options);
return reader->read();
}

parquet_metadata read_parquet_metadata(source_info const& src_info)
12 changes: 1 addition & 11 deletions cpp/src/io/parquet/reader.cpp
@@ -32,17 +32,7 @@ reader::reader(std::vector<std::unique_ptr<datasource>>&& sources,

reader::~reader() = default;

table_with_metadata reader::read(parquet_reader_options const& options)
{
// if the user has specified custom row bounds
bool const uses_custom_row_bounds =
options.get_num_rows().has_value() || options.get_skip_rows() != 0;
return _impl->read(options.get_skip_rows(),
options.get_num_rows(),
uses_custom_row_bounds,
options.get_row_groups(),
options.get_filter());
}
table_with_metadata reader::read() { return _impl->read(); }

chunked_reader::chunked_reader(std::size_t chunk_read_limit,
std::size_t pass_read_limit,
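
This hunk shows the net effect of change 2: the public `reader::read()` no longer takes options because the implementation captures what it needs at construction. A minimal sketch of the pattern (simplified; not the actual `reader::impl` layout, and `reader_impl_sketch` is an illustrative name):

```cpp
#include <cudf/io/parquet.hpp>
#include <cudf/types.hpp>

#include <cstdint>
#include <optional>
#include <vector>

// Sketch of construction-time capture: the members mirror the new _options
// aggregate added to reader::impl in the next file.
class reader_impl_sketch {
 public:
  explicit reader_impl_sketch(cudf::io::parquet_reader_options const& options)
    : _timestamp_type{options.get_timestamp_type()},
      _skip_rows{options.get_skip_rows()},
      _num_rows{options.get_num_rows()},
      _row_groups{options.get_row_groups()}
  {
  }

  // Entry points take no arguments; they consult the saved members instead.
  // table_with_metadata read();

 private:
  cudf::data_type _timestamp_type;
  int64_t _skip_rows;
  std::optional<cudf::size_type> _num_rows;
  std::vector<std::vector<cudf::size_type>> _row_groups;
};
```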
89 changes: 35 additions & 54 deletions cpp/src/io/parquet/reader_impl.cpp
@@ -46,7 +46,7 @@ inline bool is_treat_fixed_length_as_string(thrust::optional<LogicalType> const&

} // namespace

void reader::impl::decode_page_data(bool uses_custom_row_bounds, size_t skip_rows, size_t num_rows)
void reader::impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num_rows)
{
auto& pass = *_pass_itm_data;
auto& subpass = *pass.subpass;
@@ -88,7 +88,7 @@ void reader::impl::decode_page_data(bool uses_custom_row
is_treat_fixed_length_as_string(chunk.logical_type);
});

if (!_has_page_index || uses_custom_row_bounds || has_flba) {
if (!_has_page_index || uses_custom_row_bounds(mode) || has_flba) {
ComputePageStringSizes(subpass.pages,
pass.chunks,
delta_temp_buf,
@@ -419,6 +419,10 @@ reader::impl::impl(std::size_t chunk_read_limit,
rmm::device_async_resource_ref mr)
: _stream{stream},
_mr{mr},
_options{options.get_timestamp_type(),
options.get_skip_rows(),
options.get_num_rows(),
options.get_row_groups()},
_sources{std::move(sources)},
_output_chunk_read_limit{chunk_read_limit},
_input_pass_read_limit{pass_read_limit}
@@ -427,11 +431,6 @@
_metadata =
std::make_unique<aggregate_reader_metadata>(_sources, options.is_enabled_use_arrow_schema());

// Override output timestamp resolution if requested
if (options.get_timestamp_type().id() != type_id::EMPTY) {
_timestamp_type = options.get_timestamp_type();
}

// Strings may be returned as either string or categorical columns
_strings_to_categorical = options.is_enabled_convert_strings_to_categories();

@@ -452,34 +451,34 @@
filter_columns_names,
options.is_enabled_use_pandas_metadata(),
_strings_to_categorical,
_timestamp_type.id());
_options.timestamp_type.id());

// Save the states of the output buffers for reuse in `chunk_read()`.
for (auto const& buff : _output_buffers) {
_output_buffers_template.emplace_back(cudf::io::detail::inline_column_buffer::empty_like(buff));
}

// Save the name to reference converter to extract output filter AST in
// `preprocess_file()` and `finalize_output()`
table_metadata metadata;
populate_metadata(metadata);
_expr_conv = named_to_reference_converter(options.get_filter(), metadata);
}

void reader::impl::prepare_data(int64_t skip_rows,
std::optional<size_type> const& num_rows,
bool uses_custom_row_bounds,
host_span<std::vector<size_type> const> row_group_indices,
std::optional<std::reference_wrapper<ast::expression const>> filter)
void reader::impl::prepare_data(read_mode mode)
{
// if we have not preprocessed at the whole-file level, do that now
if (!_file_preprocessed) {
// setup file level information
// - read row group information
// - setup information on (parquet) chunks
// - compute schedule of input passes
preprocess_file(skip_rows, num_rows, row_group_indices, filter);
preprocess_file(mode);
}

// handle any chunking work (ratcheting through the subpasses and chunks within
// our current pass) if in bounds
if (_file_itm_data._current_input_pass < _file_itm_data.num_passes()) {
handle_chunking(uses_custom_row_bounds);
}
if (_file_itm_data._current_input_pass < _file_itm_data.num_passes()) { handle_chunking(mode); }
}

void reader::impl::populate_metadata(table_metadata& out_metadata)
@@ -498,8 +497,7 @@ void reader::impl::populate_metadata(table_metadata& out_metadata)
out_metadata.per_file_user_data[0].end()};
}

table_with_metadata reader::impl::read_chunk_internal(
bool uses_custom_row_bounds, std::optional<std::reference_wrapper<ast::expression const>> filter)
table_with_metadata reader::impl::read_chunk_internal(read_mode mode)
{
// If `_output_metadata` has been constructed, just copy it over.
auto out_metadata = _output_metadata ? table_metadata{*_output_metadata} : table_metadata{};
@@ -510,17 +508,17 @@ table_with_metadata reader::impl::read_chunk_internal(
out_columns.reserve(_output_buffers.size());

// no work to do (this can happen on the first pass if we have no rows to read)
if (!has_more_work()) { return finalize_output(out_metadata, out_columns, filter); }
if (!has_more_work()) { return finalize_output(out_metadata, out_columns); }

auto& pass = *_pass_itm_data;
auto& subpass = *pass.subpass;
auto const& read_info = subpass.output_chunk_read_info[subpass.current_output_chunk];

// Allocate memory buffers for the output columns.
allocate_columns(read_info.skip_rows, read_info.num_rows, uses_custom_row_bounds);
allocate_columns(mode, read_info.skip_rows, read_info.num_rows);

// Parse data into the output buffers.
decode_page_data(uses_custom_row_bounds, read_info.skip_rows, read_info.num_rows);
decode_page_data(mode, read_info.skip_rows, read_info.num_rows);

// Create the final output cudf columns.
for (size_t i = 0; i < _output_buffers.size(); ++i) {
@@ -547,13 +545,11 @@ table_with_metadata reader::impl::read_chunk_internal(
}

// Add empty columns if needed. Filter output columns based on filter.
return finalize_output(out_metadata, out_columns, filter);
return finalize_output(out_metadata, out_columns);
}

table_with_metadata reader::impl::finalize_output(
table_metadata& out_metadata,
std::vector<std::unique_ptr<column>>& out_columns,
std::optional<std::reference_wrapper<ast::expression const>> filter)
table_with_metadata reader::impl::finalize_output(table_metadata& out_metadata,
std::vector<std::unique_ptr<column>>& out_columns)
{
// Create empty columns as needed (this can happen if we've ended up with no actual data to read)
for (size_t i = out_columns.size(); i < _output_buffers.size(); ++i) {
@@ -581,10 +577,13 @@ table_with_metadata reader::impl::finalize_output(
// increment the output chunk count
_file_itm_data._output_chunk_count++;

if (filter.has_value()) {
// check if the output filter AST expression (= _expr_conv.get_converted_expr()) exists
if (_expr_conv.get_converted_expr().has_value()) {
auto read_table = std::make_unique<table>(std::move(out_columns));
auto predicate = cudf::detail::compute_column(
*read_table, filter.value().get(), _stream, rmm::mr::get_current_device_resource());
auto predicate = cudf::detail::compute_column(*read_table,
_expr_conv.get_converted_expr().value().get(),
_stream,
rmm::mr::get_current_device_resource());
CUDF_EXPECTS(predicate->view().type().id() == type_id::BOOL8,
"Predicate filter should return a boolean");
// Exclude columns present in filter only in output
@@ -598,22 +597,13 @@ void reader::impl::populate_metadata(table_metadata& out_metadata)
return {std::make_unique<table>(std::move(out_columns)), std::move(out_metadata)};
}

table_with_metadata reader::impl::read(
int64_t skip_rows,
std::optional<size_type> const& num_rows,
bool uses_custom_row_bounds,
host_span<std::vector<size_type> const> row_group_indices,
std::optional<std::reference_wrapper<ast::expression const>> filter)
table_with_metadata reader::impl::read()
{
CUDF_EXPECTS(_output_chunk_read_limit == 0,
"Reading the whole file must not have non-zero byte_limit.");
table_metadata metadata;
populate_metadata(metadata);
auto expr_conv = named_to_reference_converter(filter, metadata);
auto output_filter = expr_conv.get_converted_expr();

prepare_data(skip_rows, num_rows, uses_custom_row_bounds, row_group_indices, output_filter);
return read_chunk_internal(uses_custom_row_bounds, output_filter);
prepare_data(read_mode::READ_ALL);
return read_chunk_internal(read_mode::READ_ALL);
}

table_with_metadata reader::impl::read_chunk()
@@ -628,22 +618,13 @@ table_with_metadata reader::impl::read_chunk()
}
}

prepare_data(0 /*skip_rows*/,
std::nullopt /*num_rows, `nullopt` means unlimited*/,
true /*uses_custom_row_bounds*/,
{} /*row_group_indices, empty means read all row groups*/,
std::nullopt /*filter*/);

return read_chunk_internal(true, std::nullopt);
prepare_data(read_mode::CHUNKED_READ);
return read_chunk_internal(read_mode::CHUNKED_READ);
}

bool reader::impl::has_next()
{
prepare_data(0 /*skip_rows*/,
std::nullopt /*num_rows, `nullopt` means unlimited*/,
true /*uses_custom_row_bounds*/,
{} /*row_group_indices, empty means read all row groups*/,
std::nullopt /*filter*/);
prepare_data(read_mode::CHUNKED_READ);

// current_input_pass will only be incremented to be == num_passes after
// the last chunk in the last subpass in the last pass has been returned
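
The `read_mode` enum introduced in this file replaces both the `uses_custom_row_bounds` boolean and the per-call row-bounds arguments. Below is a plausible sketch of the dispatch, not the verbatim cudf source, assuming the helper consults the options saved at construction; `saved_options` and `opts` are illustrative names.

```cpp
#include <cstdint>
#include <optional>

enum class read_mode { READ_ALL, CHUNKED_READ };

// Illustrative stand-in for the subset of parquet_reader_options that the
// reader now saves at construction.
struct saved_options {
  int64_t skip_rows = 0;
  std::optional<int32_t> num_rows;  // cudf::size_type in the real code
};

// Chunked reads always step through custom row bounds (each output chunk is
// a sub-range of rows); a whole-file read only does so when the user asked
// to skip rows or cap the row count.
bool uses_custom_row_bounds(read_mode mode, saved_options const& opts)
{
  return mode == read_mode::CHUNKED_READ || opts.num_rows.has_value() ||
         opts.skip_rows != 0;
}
```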