Skip to content

Commit

Permalink
Support for progressive parquet chunked reading. (#14079)
Browse files Browse the repository at this point in the history
Previously, the parquet chunked reader operated by controlling the size of output chunks only.  It would still ingest the entire input file and decompress it, which can take up a considerable amount of memory.  With this new 'progressive' support, we also 'chunk' at the input level.  Specifically, the user can pass a `pass_read_limit` value which controls how much memory is used for storing compressed/decompressed data.  The reader will make multiple passes over the file, reading as many row groups as it can to attempt to fit within this limit.  Within each pass, chunks are emitted as before. 

From the external user's perspective, the chunked read mechanism is the same.  You call `has_next()` and `read_chunk()`.  If the user has specified a value for `pass_read_limit` the set of chunks produced might end up being different (although the concatenation of all of them will still be the same). 

The core idea of the code change is to add the idea of the internal `pass`.  Previously we had a `file_intermediate_data` which held data across `read_chunk()` calls.   There is now a `pass_intermediate_data` struct which holds information specific to a given pass.  Many of the invariant things from the file level before (row groups and chunks to process) are now stored in the pass intermediate data.  As we begin each pass, we take the subset of global row groups and chunks that we are going to process for this pass, copy them to out intermediate data, and the remainder of the reader reference this instead of the file-level data. 

In order to avoid breaking pre-existing interfaces, there's a new contructor for the `chunked_parquet_reader` class:

```
  chunked_parquet_reader(
    std::size_t chunk_read_limit,
    std::size_t pass_read_limit,
    parquet_reader_options const& options,
    rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());
```

Authors:
  - https://github.com/nvdbaranec

Approvers:
  - Yunsong Wang (https://github.com/PointKernel)
  - Vukasin Milovanovic (https://github.com/vuule)

URL: #14079
  • Loading branch information
nvdbaranec authored Sep 28, 2023
1 parent 2c19bf3 commit 53f0f74
Show file tree
Hide file tree
Showing 11 changed files with 561 additions and 203 deletions.
39 changes: 34 additions & 5 deletions cpp/include/cudf/io/detail/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ class reader {
class chunked_reader : private reader {
public:
/**
* @brief Constructor from a read size limit and an array of data sources with reader options.
* @brief Constructor from an output size memory limit and an input size memory limit and an array
* of data sources with reader options.
*
* The typical usage should be similar to this:
* ```
Expand All @@ -102,17 +103,45 @@ class chunked_reader : private reader {
*
* ```
*
* If `chunk_read_limit == 0` (i.e., no reading limit), a call to `read_chunk()` will read the
* whole file and return a table containing all rows.
* If `chunk_read_limit == 0` (i.e., no output limit), and `pass_read_limit == 0` (no input
* temporary memory size limit) a call to `read_chunk()` will read the whole file and return a
* table containing all rows.
*
* The chunk_read_limit parameter controls the size of the output chunks produces. If the user
* specifies 100 MB of data, the reader will attempt to return chunks containing tables that have
* a total bytes size (over all columns) of 100 MB or less. This is a soft limit and the code
* will not fail if it cannot satisfy the limit. It will make a best-effort atttempt only.
*
* The pass_read_limit parameter controls how much temporary memory is used in the process of
* decoding the file. The primary contributor to this memory usage is the uncompressed size of
* the data read out of the file and the decompressed (but not yet decoded) size of the data. The
* granularity of a given pass is at the row group level. It will not attempt to read at the sub
* row-group level.
*
* Combined, the way to visualize passes and chunks is as follows:
*
* @code{.pseudo}
* for(each pass){
* for(each output chunk within a pass){
* return a table that fits within the output chunk limit
* }
* }
* @endcode
*
* With a pass_read_limit of `0` you are simply saying you have one pass that reads the entire
* file as normal.
*
* @param chunk_read_limit Limit on total number of bytes to be returned per read,
* or `0` if there is no limit
* or `0` if there is no limit
* @param pass_read_limit Limit on total amount of memory used for temporary computations during
* loading, or `0` if there is no limit
* @param sources Input `datasource` objects to read the dataset from
* @param options Settings for controlling reading behavior
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource to use for device memory allocation
*/
explicit chunked_reader(std::size_t chunk_read_limit,
std::size_t pass_read_limit,
std::vector<std::unique_ptr<cudf::io::datasource>>&& sources,
parquet_reader_options const& options,
rmm::cuda_stream_view stream,
Expand Down
24 changes: 24 additions & 0 deletions cpp/include/cudf/io/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,30 @@ class chunked_parquet_reader {
parquet_reader_options const& options,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @brief Constructor for chunked reader.
*
* This constructor requires the same `parquet_reader_option` parameter as in
* `cudf::read_parquet()`, with additional parameters to specify the size byte limit of the
* output table for each reading, and a byte limit on the amount of temporary memory to use
* when reading. pass_read_limit affects how many row groups we can read at a time by limiting
* the amount of memory dedicated to decompression space. pass_read_limit is a hint, not an
* absolute limit - if a single row group cannot fit within the limit given, it will still be
* loaded.
*
* @param chunk_read_limit Limit on total number of bytes to be returned per read,
* or `0` if there is no limit
* @param pass_read_limit Limit on the amount of memory used for reading and decompressing data or
* `0` if there is no limit
* @param options The options used to read Parquet file
* @param mr Device memory resource to use for device memory allocation
*/
chunked_parquet_reader(
std::size_t chunk_read_limit,
std::size_t pass_read_limit,
parquet_reader_options const& options,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @brief Destructor, destroying the internal reader instance.
*
Expand Down
17 changes: 17 additions & 0 deletions cpp/src/io/functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,23 @@ chunked_parquet_reader::chunked_parquet_reader(std::size_t chunk_read_limit,
parquet_reader_options const& options,
rmm::mr::device_memory_resource* mr)
: reader{std::make_unique<detail_parquet::chunked_reader>(chunk_read_limit,
0,
make_datasources(options.get_source()),
options,
cudf::get_default_stream(),
mr)}
{
}

/**
* @copydoc cudf::io::chunked_parquet_reader::chunked_parquet_reader
*/
chunked_parquet_reader::chunked_parquet_reader(std::size_t chunk_read_limit,
std::size_t pass_read_limit,
parquet_reader_options const& options,
rmm::mr::device_memory_resource* mr)
: reader{std::make_unique<detail_parquet::chunked_reader>(chunk_read_limit,
pass_read_limit,
make_datasources(options.get_source()),
options,
cudf::get_default_stream(),
Expand Down
69 changes: 55 additions & 14 deletions cpp/src/io/parquet/parquet_gpu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -321,33 +321,74 @@ struct ColumnChunkDesc {
};

/**
* @brief Struct to store raw/intermediate file data before parsing.
* @brief The row_group_info class
*/
struct row_group_info {
size_type index; // row group index within a file. aggregate_reader_metadata::get_row_group() is
// called with index and source_index
size_t start_row;
size_type source_index; // file index.

row_group_info() = default;

row_group_info(size_type index, size_t start_row, size_type source_index)
: index{index}, start_row{start_row}, source_index{source_index}
{
}
};

/**
* @brief Struct to store file-level data that remains constant for
* all passes/chunks for the file.
*/
struct file_intermediate_data {
// all row groups to read
std::vector<row_group_info> row_groups{};

// all chunks from the selected row groups. We may end up reading these chunks progressively
// instead of all at once
std::vector<gpu::ColumnChunkDesc> chunks{};

// skip_rows/num_rows values for the entire file. these need to be adjusted per-pass because we
// may not be visiting every row group that contains these bounds
size_t global_skip_rows;
size_t global_num_rows;
};

/**
* @brief Structs to identify the reading row range for each chunk of rows in chunked reading.
*/
struct chunk_read_info {
size_t skip_rows;
size_t num_rows;
};

/**
* @brief Struct to store pass-level data that remains constant for a single pass.
*/
struct pass_intermediate_data {
std::vector<std::unique_ptr<datasource::buffer>> raw_page_data;
rmm::device_buffer decomp_page_data;

// rowgroup, chunk and page information for the current pass.
std::vector<row_group_info> row_groups{};
cudf::detail::hostdevice_vector<gpu::ColumnChunkDesc> chunks{};
cudf::detail::hostdevice_vector<gpu::PageInfo> pages_info{};
cudf::detail::hostdevice_vector<gpu::PageNestingInfo> page_nesting_info{};
cudf::detail::hostdevice_vector<gpu::PageNestingDecodeInfo> page_nesting_decode_info{};

rmm::device_buffer level_decode_data;
int level_type_size;
};

/**
* @brief Struct to store intermediate page data for parsing each chunk of rows in chunked reading.
*/
struct chunk_intermediate_data {
rmm::device_uvector<int32_t> page_keys{0, rmm::cuda_stream_default};
rmm::device_uvector<int32_t> page_index{0, rmm::cuda_stream_default};
rmm::device_uvector<string_index_pair> str_dict_index{0, rmm::cuda_stream_default};
};

/**
* @brief Structs to identify the reading row range for each chunk of rows in chunked reading.
*/
struct chunk_read_info {
std::vector<gpu::chunk_read_info> output_chunk_read_info;
std::size_t current_output_chunk{0};

rmm::device_buffer level_decode_data{};
int level_type_size{0};

// skip_rows and num_rows values for this particular pass. these may be adjusted values from the
// global values stored in file_intermediate_data.
size_t skip_rows;
size_t num_rows;
};
Expand Down
4 changes: 3 additions & 1 deletion cpp/src/io/parquet/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,14 @@ table_with_metadata reader::read(parquet_reader_options const& options)
}

chunked_reader::chunked_reader(std::size_t chunk_read_limit,
std::size_t pass_read_limit,
std::vector<std::unique_ptr<datasource>>&& sources,
parquet_reader_options const& options,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
_impl = std::make_unique<impl>(chunk_read_limit, std::move(sources), options, stream, mr);
_impl = std::make_unique<impl>(
chunk_read_limit, pass_read_limit, std::move(sources), options, stream, mr);
}

chunked_reader::~chunked_reader() = default;
Expand Down
Loading

0 comments on commit 53f0f74

Please sign in to comment.