Merge pull request rapidsai#14223 from rapidsai/branch-23.10
Forward-merge branch-23.10 to branch-23.12
GPUtester authored Sep 28, 2023
2 parents d01b69e + 53f0f74 commit dea0df0
Showing 11 changed files with 561 additions and 203 deletions.
39 changes: 34 additions & 5 deletions cpp/include/cudf/io/detail/parquet.hpp
@@ -91,7 +91,8 @@ class reader {
class chunked_reader : private reader {
public:
/**
* @brief Constructor from an output size memory limit, an input size memory limit, and an
* array of data sources with reader options.
*
* The typical usage should be similar to this:
* ```
@@ -102,17 +103,45 @@ class chunked_reader : private reader {
*
* ```
*
* If `chunk_read_limit == 0` (i.e., no output limit) and `pass_read_limit == 0` (i.e., no input
* temporary memory size limit), a call to `read_chunk()` will read the whole file and return a
* table containing all rows.
*
* The chunk_read_limit parameter controls the size of the output chunks produced. If the user
* specifies 100 MB of data, the reader will attempt to return chunks containing tables that have
* a total byte size (over all columns) of 100 MB or less. This is a soft limit and the code
* will not fail if it cannot satisfy the limit; it will make a best-effort attempt only.
*
* The pass_read_limit parameter controls how much temporary memory is used in the process of
* decoding the file. The primary contributors to this memory usage are the compressed size of
* the data read out of the file and the decompressed (but not yet decoded) size of the data. The
* granularity of a given pass is the row group: the reader will not attempt to read at the
* sub-row-group level.
*
* Combined, the way to visualize passes and chunks is as follows:
*
* @code{.pseudo}
* for(each pass){
* for(each output chunk within a pass){
* return a table that fits within the output chunk limit
* }
* }
* @endcode
*
* With a pass_read_limit of `0`, you are simply saying you have one pass that reads the entire
* file as normal.
*
* @param chunk_read_limit Limit on total number of bytes to be returned per read,
* or `0` if there is no limit
* @param pass_read_limit Limit on total amount of memory used for temporary computations during
* loading, or `0` if there is no limit
* @param sources Input `datasource` objects to read the dataset from
* @param options Settings for controlling reading behavior
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource to use for device memory allocation
*/
explicit chunked_reader(std::size_t chunk_read_limit,
std::size_t pass_read_limit,
std::vector<std::unique_ptr<cudf::io::datasource>>&& sources,
parquet_reader_options const& options,
rmm::cuda_stream_view stream,
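For context, the pass/chunk split described in the doc comment above can be made concrete with a small, self-contained sketch. The sizes and the `make_passes` helper below are illustrative only, not code from this PR: whole row groups are packed into a pass until the next one would exceed `pass_read_limit`, and an oversized row group still gets its own pass.

```cpp
#include <cstddef>
#include <iostream>
#include <vector>

// Illustrative only: pack whole row groups into passes. A pass never splits a
// row group, and a row group larger than the limit still forms its own pass,
// since pass_read_limit is a hint rather than a hard cap.
std::vector<std::vector<std::size_t>> make_passes(std::vector<std::size_t> const& rg_sizes,
                                                  std::size_t pass_read_limit)
{
  std::vector<std::vector<std::size_t>> passes(1);
  std::size_t pass_bytes = 0;
  for (auto size : rg_sizes) {
    if (!passes.back().empty() && pass_bytes + size > pass_read_limit) {
      passes.emplace_back();  // start a new pass at a row-group boundary
      pass_bytes = 0;
    }
    passes.back().push_back(size);
    pass_bytes += size;
  }
  return passes;
}

int main()
{
  // Hypothetical decompressed row-group sizes (MB) with a 100 MB pass limit:
  // yields passes {60, 30}, {150}, {40, 40}.
  for (auto const& pass : make_passes({60, 30, 150, 40, 40}, 100)) {
    for (auto size : pass) std::cout << size << "MB ";
    std::cout << '\n';
  }
}
```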
24 changes: 24 additions & 0 deletions cpp/include/cudf/io/parquet.hpp
@@ -445,6 +445,30 @@ class chunked_parquet_reader {
parquet_reader_options const& options,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @brief Constructor for chunked reader.
*
* This constructor requires the same `parquet_reader_options` parameter as in
* `cudf::read_parquet()`, with additional parameters to specify a byte size limit on the
* output table returned by each read, and a byte limit on the amount of temporary memory to
* use when reading. pass_read_limit affects how many row groups can be read at a time by
* limiting the amount of memory dedicated to decompression space. pass_read_limit is a hint,
* not an absolute limit; if a single row group cannot fit within the given limit, it will
* still be loaded.
*
* @param chunk_read_limit Limit on total number of bytes to be returned per read,
* or `0` if there is no limit
* @param pass_read_limit Limit on the amount of memory used for reading and decompressing data,
* or `0` if there is no limit
* @param options The options used to read the Parquet file
* @param mr Device memory resource to use for device memory allocation
*/
chunked_parquet_reader(
std::size_t chunk_read_limit,
std::size_t pass_read_limit,
parquet_reader_options const& options,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @brief Destructor, destroying the internal reader instance.
*
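A minimal usage sketch of the new two-limit constructor follows. The file name and byte limits are hypothetical; `has_next()` and `read_chunk()` are the existing chunked-reader accessors.

```cpp
#include <cudf/io/parquet.hpp>

int main()
{
  auto const options =
    cudf::io::parquet_reader_options::builder(cudf::io::source_info{"example.parquet"}).build();

  // Soft-cap each returned table at ~100 MB and temporary decode memory at ~512 MB.
  auto reader = cudf::io::chunked_parquet_reader(100 * 1024 * 1024,  // chunk_read_limit
                                                 512 * 1024 * 1024,  // pass_read_limit
                                                 options);

  while (reader.has_next()) {
    auto chunk = reader.read_chunk();  // cudf::io::table_with_metadata
    // ... process chunk.tbl ...
  }
}
```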
17 changes: 17 additions & 0 deletions cpp/src/io/functions.cpp
@@ -562,6 +562,23 @@ chunked_parquet_reader::chunked_parquet_reader(std::size_t chunk_read_limit,
parquet_reader_options const& options,
rmm::mr::device_memory_resource* mr)
: reader{std::make_unique<detail_parquet::chunked_reader>(chunk_read_limit,
0,
make_datasources(options.get_source()),
options,
cudf::get_default_stream(),
mr)}
{
}

/**
* @copydoc cudf::io::chunked_parquet_reader::chunked_parquet_reader
*/
chunked_parquet_reader::chunked_parquet_reader(std::size_t chunk_read_limit,
std::size_t pass_read_limit,
parquet_reader_options const& options,
rmm::mr::device_memory_resource* mr)
: reader{std::make_unique<detail_parquet::chunked_reader>(chunk_read_limit,
pass_read_limit,
make_datasources(options.get_source()),
options,
cudf::get_default_stream(),
69 changes: 55 additions & 14 deletions cpp/src/io/parquet/parquet_gpu.hpp
@@ -321,33 +321,74 @@ struct ColumnChunkDesc {
};

/**
* @brief Information about a single row group: its index, starting row, and source file index.
*/
struct row_group_info {
size_type index; // row group index within a file. aggregate_reader_metadata::get_row_group() is
// called with index and source_index
size_t start_row;
size_type source_index; // file index.

row_group_info() = default;

row_group_info(size_type index, size_t start_row, size_type source_index)
: index{index}, start_row{start_row}, source_index{source_index}
{
}
};

/**
* @brief Struct to store file-level data that remains constant for
* all passes/chunks for the file.
*/
struct file_intermediate_data {
// all row groups to read
std::vector<row_group_info> row_groups{};

// all chunks from the selected row groups. We may end up reading these chunks progressively
// instead of all at once
std::vector<gpu::ColumnChunkDesc> chunks{};

// skip_rows/num_rows values for the entire file. these need to be adjusted per-pass because we
// may not be visiting every row group that contains these bounds
size_t global_skip_rows;
size_t global_num_rows;
};

/**
* @brief Struct to identify the reading row range for each chunk of rows in chunked reading.
*/
struct chunk_read_info {
size_t skip_rows;
size_t num_rows;
};

/**
* @brief Struct to store pass-level data that remains constant for a single pass.
*/
struct pass_intermediate_data {
std::vector<std::unique_ptr<datasource::buffer>> raw_page_data;
rmm::device_buffer decomp_page_data;

// rowgroup, chunk and page information for the current pass.
std::vector<row_group_info> row_groups{};
cudf::detail::hostdevice_vector<gpu::ColumnChunkDesc> chunks{};
cudf::detail::hostdevice_vector<gpu::PageInfo> pages_info{};
cudf::detail::hostdevice_vector<gpu::PageNestingInfo> page_nesting_info{};
cudf::detail::hostdevice_vector<gpu::PageNestingDecodeInfo> page_nesting_decode_info{};

rmm::device_uvector<int32_t> page_keys{0, rmm::cuda_stream_default};
rmm::device_uvector<int32_t> page_index{0, rmm::cuda_stream_default};
rmm::device_uvector<string_index_pair> str_dict_index{0, rmm::cuda_stream_default};

std::vector<gpu::chunk_read_info> output_chunk_read_info;
std::size_t current_output_chunk{0};

rmm::device_buffer level_decode_data{};
int level_type_size{0};

// skip_rows and num_rows values for this particular pass. these may be adjusted values from the
// global values stored in file_intermediate_data.
size_t skip_rows;
size_t num_rows;
};
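To illustrate the relationship between the file-level `global_skip_rows`/`global_num_rows` and the per-pass `skip_rows`/`num_rows` noted in the comments above, here is a hedged sketch; the `bounds_for_pass` helper and its row ranges are hypothetical, not part of this PR.

```cpp
#include <algorithm>
#include <cstddef>

// Hypothetical illustration: clamp the file-level row window to the rows
// covered by one pass, giving that pass's adjusted skip_rows/num_rows.
struct pass_bounds {
  std::size_t skip_rows;
  std::size_t num_rows;
};

pass_bounds bounds_for_pass(std::size_t global_skip_rows,
                            std::size_t global_num_rows,
                            std::size_t pass_first_row,  // first row covered by the pass
                            std::size_t pass_end_row)    // one past the last row in the pass
{
  auto const begin = std::max(global_skip_rows, pass_first_row);
  auto const end   = std::min(global_skip_rows + global_num_rows, pass_end_row);
  return {begin, end > begin ? end - begin : 0};
}
```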
4 changes: 3 additions & 1 deletion cpp/src/io/parquet/reader.cpp
@@ -43,12 +43,14 @@ table_with_metadata reader::read(parquet_reader_options const& options)
}

chunked_reader::chunked_reader(std::size_t chunk_read_limit,
std::size_t pass_read_limit,
std::vector<std::unique_ptr<datasource>>&& sources,
parquet_reader_options const& options,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
_impl = std::make_unique<impl>(
chunk_read_limit, pass_read_limit, std::move(sources), options, stream, mr);
}

chunked_reader::~chunked_reader() = default;
(The remaining 6 changed files are not shown.)
