
Support for progressive parquet chunked reading. #14079

Merged
39 changes: 34 additions & 5 deletions cpp/include/cudf/io/detail/parquet.hpp
@@ -91,7 +91,8 @@ class reader {
class chunked_reader : private reader {
public:
/**
* @brief Constructor from a read size limit and an array of data sources with reader options.
* @brief Constructor from an output size memory limit, an input size memory limit, and an array
* of data sources with reader options.
*
* The typical usage should be similar to this:
* ```
@@ -102,17 +103,45 @@ class chunked_reader : private reader {
*
* ```
*
* If `chunk_read_limit == 0` (i.e., no reading limit), a call to `read_chunk()` will read the
* whole file and return a table containing all rows.
* If `chunk_read_limit == 0` (i.e., no output limit) and `pass_read_limit == 0` (no input
* temporary memory size limit), a call to `read_chunk()` will read the whole file and return a
* table containing all rows.
*
* The chunk_read_limit parameter controls the size of the output chunks produced. If the user
* specifies 100 MB of data, the reader will attempt to return chunks containing tables that have
* a total byte size (over all columns) of 100 MB or less. This is a soft limit and the code
* will not fail if it cannot satisfy the limit; it will make a best-effort attempt only.
*
* The pass_read_limit parameter controls how much temporary memory is used in the process of
* decoding the file. The primary contributor to this memory usage is the uncompressed size of
* the data read out of the file and the decompressed (but not yet decoded) size of the data. The
* granularity of a given pass is the row group; the reader will not attempt to read at the
* sub-row-group level.
*
* Combined, the way to visualize passes and chunks is as follows:
*
* @code{.pseudo}
* for(each pass){
* for(each output chunk within a pass){
* return a table that fits within the output chunk limit
* }
* }
* @endcode
*
* A pass_read_limit of `0` simply means a single pass that reads the entire file as normal.
*
* @param chunk_read_limit Limit on total number of bytes to be returned per read,
* or `0` if there is no limit
* or `0` if there is no limit
* @param pass_read_limit Limit on total amount of memory used for temporary computations during
* loading, or `0` if there is no limit
* @param sources Input `datasource` objects to read the dataset from
* @param options Settings for controlling reading behavior
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource to use for device memory allocation
*/
explicit chunked_reader(std::size_t chunk_read_limit,
std::size_t pass_read_limit,
std::vector<std::unique_ptr<cudf::io::datasource>>&& sources,
parquet_reader_options const& options,
rmm::cuda_stream_view stream,
24 changes: 24 additions & 0 deletions cpp/include/cudf/io/parquet.hpp
@@ -445,6 +445,30 @@ class chunked_parquet_reader {
parquet_reader_options const& options,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @brief Constructor for chunked reader.
*
* This constructor requires the same `parquet_reader_options` parameter as in
* `cudf::read_parquet()`, with additional parameters to specify a byte size limit on the
* output table for each read, and a byte limit on the amount of temporary memory to use
* when reading. pass_read_limit affects how many row groups can be read at a time by limiting
* the amount of memory dedicated to decompression space. pass_read_limit is a hint, not an
* absolute limit; if a single row group cannot fit within the limit given, it will still be
* loaded.
*
* @param chunk_read_limit Limit on total number of bytes to be returned per read,
* or `0` if there is no limit
* @param pass_read_limit Limit on the amount of memory used for reading and decompressing data, or

Reviewer (Contributor): Above it says decompression limit, but here it says decompress and reading. I am also concerned by the soft limit; this seems like the kind of thing that would have a hard limit. Should it explode when over the limit, or is the OOM the explosion, and is that why it is considered a soft limit?

Author (Contributor): This is a soft limit because it will attempt to continue even if it can't meet the limit. For example, if the user specified 1 MB and it can't fit even one row group (say, 50 MB) into that size, it will still attempt to read/decompress one row group at a time.

* `0` if there is no limit
* @param options The options used to read Parquet file
* @param mr Device memory resource to use for device memory allocation
*/
chunked_parquet_reader(
std::size_t chunk_read_limit,
std::size_t pass_read_limit,
parquet_reader_options const& options,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @brief Destructor, destroying the internal reader instance.
*
17 changes: 17 additions & 0 deletions cpp/src/io/functions.cpp
@@ -562,6 +562,23 @@ chunked_parquet_reader::chunked_parquet_reader(std::size_t chunk_read_limit,
parquet_reader_options const& options,
rmm::mr::device_memory_resource* mr)
: reader{std::make_unique<detail_parquet::chunked_reader>(chunk_read_limit,
0,
make_datasources(options.get_source()),
options,
cudf::get_default_stream(),
mr)}
{
}

/**
* @copydoc cudf::io::chunked_parquet_reader::chunked_parquet_reader
*/
chunked_parquet_reader::chunked_parquet_reader(std::size_t chunk_read_limit,
std::size_t pass_read_limit,
parquet_reader_options const& options,
rmm::mr::device_memory_resource* mr)
: reader{std::make_unique<detail_parquet::chunked_reader>(chunk_read_limit,
pass_read_limit,
make_datasources(options.get_source()),
options,
cudf::get_default_stream(),
69 changes: 55 additions & 14 deletions cpp/src/io/parquet/parquet_gpu.hpp
@@ -306,33 +306,74 @@ struct ColumnChunkDesc {
};

/**
* @brief Struct to store raw/intermediate file data before parsing.
* @brief Information about a row group to read: its index, starting global row, and source file index.
*/
struct row_group_info {
size_type index; // row group index within a file. aggregate_reader_metadata::get_row_group() is
// called with index and source_index
size_t start_row;
size_type source_index; // file index.

row_group_info() = default;

row_group_info(size_type index, size_t start_row, size_type source_index)
: index{index}, start_row{start_row}, source_index{source_index}
{
}
};

/**
* @brief Struct to store file-level data that remains constant for
* all passes/chunks for the file.
*/
struct file_intermediate_data {
// all row groups to read
std::vector<row_group_info> row_groups{};

// all chunks from the selected row groups. We may end up reading these chunks progressively
// instead of all at once
std::vector<gpu::ColumnChunkDesc> chunks{};

// skip_rows/num_rows values for the entire file. These need to be adjusted per-pass because we
// may not be visiting every row group that contains these bounds
size_t global_skip_rows;
size_t global_num_rows;
};

/**
* @brief Struct to identify the reading row range for each chunk of rows in chunked reading.
*/
struct chunk_read_info {
size_t skip_rows;
size_t num_rows;
};

/**
* @brief Struct to store pass-level data that remains constant for a single pass.
*/
struct pass_intermediate_data {
std::vector<std::unique_ptr<datasource::buffer>> raw_page_data;
rmm::device_buffer decomp_page_data;

// rowgroup, chunk and page information for the current pass.
std::vector<row_group_info> row_groups{};
cudf::detail::hostdevice_vector<gpu::ColumnChunkDesc> chunks{};
cudf::detail::hostdevice_vector<gpu::PageInfo> pages_info{};
cudf::detail::hostdevice_vector<gpu::PageNestingInfo> page_nesting_info{};
cudf::detail::hostdevice_vector<gpu::PageNestingDecodeInfo> page_nesting_decode_info{};

rmm::device_buffer level_decode_data;
int level_type_size;
};

/**
* @brief Struct to store intermediate page data for parsing each chunk of rows in chunked reading.
*/
struct chunk_intermediate_data {
rmm::device_uvector<int32_t> page_keys{0, rmm::cuda_stream_default};
rmm::device_uvector<int32_t> page_index{0, rmm::cuda_stream_default};
rmm::device_uvector<string_index_pair> str_dict_index{0, rmm::cuda_stream_default};
};

/**
* @brief Structs to identify the reading row range for each chunk of rows in chunked reading.
*/
struct chunk_read_info {
std::vector<gpu::chunk_read_info> output_chunk_read_info;
std::size_t current_output_chunk{0};

rmm::device_buffer level_decode_data;
int level_type_size;

// skip_rows and num_rows values for this particular pass. These may be adjusted from the
// global values stored in file_intermediate_data.
size_t skip_rows;
size_t num_rows;
};
4 changes: 3 additions & 1 deletion cpp/src/io/parquet/reader.cpp
@@ -43,12 +43,14 @@ table_with_metadata reader::read(parquet_reader_options const& options)
}

chunked_reader::chunked_reader(std::size_t chunk_read_limit,
std::size_t pass_read_limit,
std::vector<std::unique_ptr<datasource>>&& sources,
parquet_reader_options const& options,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
_impl = std::make_unique<impl>(chunk_read_limit, std::move(sources), options, stream, mr);
_impl = std::make_unique<impl>(
chunk_read_limit, pass_read_limit, std::move(sources), options, stream, mr);
Comment on lines +52 to +53
Reviewer (Contributor): Can we move this into the initializer list of this constructor?

Author (Contributor): Can't initialize a parent's member as part of your own initialization list.
}

chunked_reader::~chunked_reader() = default;