Skip to content

Commit

Permalink
Merge branch 'branch-24.10' into tdigest_merge_opt
Browse files Browse the repository at this point in the history
  • Loading branch information
nvdbaranec committed Sep 17, 2024
2 parents 00edf5b + 27c29eb commit e80154d
Show file tree
Hide file tree
Showing 29 changed files with 850 additions and 498 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ pip install --extra-index-url=https://pypi.nvidia.com cudf-cu12

### Conda

cuDF can be installed with conda (via [miniconda](https://docs.conda.io/projects/miniconda/en/latest/) or the full [Anaconda distribution](https://www.anaconda.com/download) from the `rapidsai` channel:
cuDF can be installed with conda (via [miniforge](https://github.com/conda-forge/miniforge)) from the `rapidsai` channel:

```bash
conda install -c rapidsai -c conda-forge -c nvidia \
Expand Down
17 changes: 8 additions & 9 deletions cpp/include/cudf/detail/tdigest/tdigest.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,29 +143,28 @@ std::unique_ptr<column> make_tdigest_column(size_type num_rows,
rmm::device_async_resource_ref mr);

/**
* @brief Create a tdigest column of empty clusters.
* @brief Create an empty tdigest column.
*
* The column created contains the specified number of rows of empty clusters.
* An empty tdigest column contains a single row of length 0
*
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned column's device memory.
*
* @returns A tdigest column of empty clusters.
* @returns An empty tdigest column.
*/
CUDF_EXPORT
std::unique_ptr<column> make_tdigest_column_of_empty_clusters(size_type num_rows,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);
std::unique_ptr<column> make_empty_tdigest_column(rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);

/**
* @brief Create a scalar of an empty tdigest cluster.
* @brief Create an empty tdigest scalar.
*
* The returned scalar is a struct_scalar that contains a single row of an empty cluster.
* An empty tdigest scalar is a struct_scalar that contains a single row of length 0
*
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned column's device memory.
*
* @returns A scalar of an empty tdigest cluster.
* @returns An empty tdigest scalar.
*/
std::unique_ptr<scalar> make_empty_tdigest_scalar(rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);
Expand Down
20 changes: 10 additions & 10 deletions cpp/include/cudf_test/tdigest_utilities.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,8 @@ void tdigest_simple_all_nulls_aggregation(Func op)
static_cast<column_view>(values).type(), tdigest_gen{}, op, values, delta);

// NOTE: an empty tdigest column still has 1 row.
auto expected = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(
1, cudf::get_default_stream(), cudf::get_current_device_resource_ref());
auto expected = cudf::tdigest::detail::make_empty_tdigest_column(
cudf::get_default_stream(), cudf::get_current_device_resource_ref());

CUDF_TEST_EXPECT_COLUMNS_EQUAL(*result, *expected);
}
Expand Down Expand Up @@ -562,12 +562,12 @@ template <typename MergeFunc>
void tdigest_merge_empty(MergeFunc merge_op)
{
// 3 empty tdigests all in the same group
auto a = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(
1, cudf::get_default_stream(), cudf::get_current_device_resource_ref());
auto b = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(
1, cudf::get_default_stream(), cudf::get_current_device_resource_ref());
auto c = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(
1, cudf::get_default_stream(), cudf::get_current_device_resource_ref());
auto a = cudf::tdigest::detail::make_empty_tdigest_column(
cudf::get_default_stream(), cudf::get_current_device_resource_ref());
auto b = cudf::tdigest::detail::make_empty_tdigest_column(
cudf::get_default_stream(), cudf::get_current_device_resource_ref());
auto c = cudf::tdigest::detail::make_empty_tdigest_column(
cudf::get_default_stream(), cudf::get_current_device_resource_ref());
std::vector<column_view> cols;
cols.push_back(*a);
cols.push_back(*b);
Expand All @@ -577,8 +577,8 @@ void tdigest_merge_empty(MergeFunc merge_op)
auto const delta = 1000;
auto result = merge_op(*values, delta);

auto expected = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(
1, cudf::get_default_stream(), cudf::get_current_device_resource_ref());
auto expected = cudf::tdigest::detail::make_empty_tdigest_column(
cudf::get_default_stream(), cudf::get_current_device_resource_ref());

CUDF_TEST_EXPECT_COLUMNS_EQUAL(*expected, *result);
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/io/parquet/page_decode.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -893,7 +893,7 @@ __device__ void gpuDecodeLevels(page_state_s* s,
{
bool has_repetition = s->col.max_level[level_type::REPETITION] > 0;

constexpr int batch_size = 32;
constexpr int batch_size = cudf::detail::warp_size;
int cur_leaf_count = target_leaf_count;
while (s->error == 0 && s->nz_count < target_leaf_count &&
s->input_value_count < s->num_input_values) {
Expand Down
7 changes: 3 additions & 4 deletions cpp/src/io/parquet/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,9 @@ struct SchemaElement {
bool operator==(SchemaElement const& other) const
{
return type == other.type && converted_type == other.converted_type &&
type_length == other.type_length && repetition_type == other.repetition_type &&
name == other.name && num_children == other.num_children &&
decimal_scale == other.decimal_scale && decimal_precision == other.decimal_precision &&
field_id == other.field_id;
type_length == other.type_length && name == other.name &&
num_children == other.num_children && decimal_scale == other.decimal_scale &&
decimal_precision == other.decimal_precision && field_id == other.field_id;
}

// the parquet format is a little squishy when it comes to interpreting
Expand Down
7 changes: 5 additions & 2 deletions cpp/src/io/parquet/parquet_gpu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,8 @@ struct ColumnChunkDesc {
int32_t src_col_schema_,
column_chunk_info const* chunk_info_,
float list_bytes_per_row_est_,
bool strings_to_categorical_)
bool strings_to_categorical_,
int32_t src_file_idx_)
: compressed_data(compressed_data_),
compressed_size(compressed_size_),
num_values(num_values_),
Expand All @@ -419,7 +420,8 @@ struct ColumnChunkDesc {
src_col_schema(src_col_schema_),
h_chunk_info(chunk_info_),
list_bytes_per_row_est(list_bytes_per_row_est_),
is_strings_to_cat(strings_to_categorical_)
is_strings_to_cat(strings_to_categorical_),
src_file_idx(src_file_idx_)

{
}
Expand Down Expand Up @@ -456,6 +458,7 @@ struct ColumnChunkDesc {

bool is_strings_to_cat{}; // convert strings to hashes
bool is_large_string_col{}; // `true` if string data uses 64-bit offsets
int32_t src_file_idx{}; // source file index
};

/**
Expand Down
18 changes: 11 additions & 7 deletions cpp/src/io/parquet/reader_impl_chunking.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1511,10 +1511,13 @@ void reader::impl::create_global_chunk_info()
std::transform(
_input_columns.begin(), _input_columns.end(), column_mapping.begin(), [&](auto const& col) {
// translate schema_idx into something we can use for the page indexes
if (auto it = std::find_if(
columns.begin(),
columns.end(),
[&col](auto const& col_chunk) { return col_chunk.schema_idx == col.schema_idx; });
if (auto it = std::find_if(columns.begin(),
columns.end(),
[&](auto const& col_chunk) {
return col_chunk.schema_idx ==
_metadata->map_schema_index(col.schema_idx,
rg.source_index);
});
it != columns.end()) {
return std::distance(columns.begin(), it);
}
Expand All @@ -1535,7 +1538,8 @@ void reader::impl::create_global_chunk_info()
auto col = _input_columns[i];
// look up metadata
auto& col_meta = _metadata->get_column_metadata(rg.index, rg.source_index, col.schema_idx);
auto& schema = _metadata->get_schema(col.schema_idx);
auto& schema = _metadata->get_schema(
_metadata->map_schema_index(col.schema_idx, rg.source_index), rg.source_index);

auto [clock_rate, logical_type] =
conversion_info(to_type_id(schema, _strings_to_categorical, _options.timestamp_type.id()),
Expand Down Expand Up @@ -1574,9 +1578,9 @@ void reader::impl::create_global_chunk_info()
col.schema_idx,
chunk_info,
list_bytes_per_row_est,
schema.type == BYTE_ARRAY and _strings_to_categorical));
schema.type == BYTE_ARRAY and _strings_to_categorical,
rg.source_index));
}

// Adjust for skip_rows when updating the remaining rows after the first group
remaining_rows -=
(skip_rows) ? std::min<int>(rg.start_row + row_group.num_rows - skip_rows, remaining_rows)
Expand Down
120 changes: 86 additions & 34 deletions cpp/src/io/parquet/reader_impl_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -423,8 +423,13 @@ void aggregate_reader_metadata::column_info_for_row_group(row_group_info& rg_inf
std::vector<column_chunk_info> chunks(rg.columns.size());

for (size_t col_idx = 0; col_idx < rg.columns.size(); col_idx++) {
auto const& col_chunk = rg.columns[col_idx];
auto& schema = get_schema(col_chunk.schema_idx);
auto const& col_chunk = rg.columns[col_idx];
auto const is_schema_idx_mapped =
is_schema_index_mapped(col_chunk.schema_idx, rg_info.source_index);
auto const mapped_schema_idx = is_schema_idx_mapped
? map_schema_index(col_chunk.schema_idx, rg_info.source_index)
: col_chunk.schema_idx;
auto& schema = get_schema(mapped_schema_idx, is_schema_idx_mapped ? rg_info.source_index : 0);
auto const max_def_level = schema.max_definition_level;
auto const max_rep_level = schema.max_repetition_level;

Expand Down Expand Up @@ -559,22 +564,40 @@ aggregate_reader_metadata::aggregate_reader_metadata(
num_rows(calc_num_rows()),
num_row_groups(calc_num_row_groups())
{
// Validate that all sources have the same schema unless we are reading select columns
// from mismatched sources, in which case, we will only check the projected columns later.
if (per_file_metadata.size() > 1 and not has_cols_from_mismatched_srcs) {
auto const& first_meta = per_file_metadata.front();
if (per_file_metadata.size() > 1) {
auto& first_meta = per_file_metadata.front();
auto const num_cols =
first_meta.row_groups.size() > 0 ? first_meta.row_groups.front().columns.size() : 0;
auto const& schema = first_meta.schema;

// Verify that the input files have matching numbers of columns and schema.
for (auto const& pfm : per_file_metadata) {
if (pfm.row_groups.size() > 0) {
CUDF_EXPECTS(num_cols == pfm.row_groups.front().columns.size(),
"All sources must have the same number of columns");
auto& schema = first_meta.schema;

// Validate that all sources have the same schema unless we are reading select columns
// from mismatched sources, in which case, we will only check the projected columns later.
if (not has_cols_from_mismatched_srcs) {
// Verify that the input files have matching numbers of columns and schema.
for (auto const& pfm : per_file_metadata) {
if (pfm.row_groups.size() > 0) {
CUDF_EXPECTS(num_cols == pfm.row_groups.front().columns.size(),
"All sources must have the same number of columns");
}
CUDF_EXPECTS(schema == pfm.schema, "All sources must have the same schema");
}
CUDF_EXPECTS(schema == pfm.schema, "All sources must have the same schema");
}

// Mark the column schema in the first (default) source as nullable if it is nullable in any of
// the input sources. This avoids recomputing this within build_column() and
// populate_metadata().
std::for_each(
thrust::make_counting_iterator(static_cast<size_t>(1)),
thrust::make_counting_iterator(schema.size()),
[&](auto const schema_idx) {
if (schema[schema_idx].repetition_type == REQUIRED and
std::any_of(
per_file_metadata.begin() + 1, per_file_metadata.end(), [&](auto const& pfm) {
return pfm.schema[schema_idx].repetition_type != REQUIRED;
})) {
schema[schema_idx].repetition_type = OPTIONAL;
}
});
}

// Collect and apply arrow:schema from Parquet's key value metadata section
Expand Down Expand Up @@ -884,15 +907,8 @@ ColumnChunkMetaData const& aggregate_reader_metadata::get_column_metadata(size_t
size_type src_idx,
int schema_idx) const
{
// schema_idx_maps will only have > 0 size when we are reading matching column projection from
// mismatched Parquet sources.
if (src_idx and not schema_idx_maps.empty()) {
auto const& schema_idx_map = schema_idx_maps[src_idx - 1];
CUDF_EXPECTS(schema_idx_map.find(schema_idx) != schema_idx_map.end(),
"Unmapped schema index encountered in the specified source tree",
std::range_error);
schema_idx = schema_idx_map.at(schema_idx);
}
// Map schema index to the provided source file index
schema_idx = map_schema_index(schema_idx, src_idx);

auto col =
std::find_if(per_file_metadata[src_idx].row_groups[row_group_index].columns.begin(),
Expand Down Expand Up @@ -924,6 +940,46 @@ aggregate_reader_metadata::get_rowgroup_metadata() const
return rg_metadata;
}

bool aggregate_reader_metadata::is_schema_index_mapped(int schema_idx, int pfm_idx) const
{
// Check if schema_idx or pfm_idx is invalid
CUDF_EXPECTS(
schema_idx >= 0 and pfm_idx >= 0 and pfm_idx < static_cast<int>(per_file_metadata.size()),
"Parquet reader encountered an invalid schema_idx or pfm_idx",
std::out_of_range);

// True if root index requested or zeroth file index or schema_idx maps doesn't exist. (i.e.
// schemas are identical).
if (schema_idx == 0 or pfm_idx == 0 or schema_idx_maps.empty()) { return true; }

// Check if mapped
auto const& schema_idx_map = schema_idx_maps[pfm_idx - 1];
return schema_idx_map.find(schema_idx) != schema_idx_map.end();
}

int aggregate_reader_metadata::map_schema_index(int schema_idx, int pfm_idx) const
{
// Check if schema_idx or pfm_idx is invalid
CUDF_EXPECTS(
schema_idx >= 0 and pfm_idx >= 0 and pfm_idx < static_cast<int>(per_file_metadata.size()),
"Parquet reader encountered an invalid schema_idx or pfm_idx",
std::out_of_range);

// Check if pfm_idx is zero or root index requested or schema_idx_maps doesn't exist (i.e.
// schemas are identical).
if (schema_idx == 0 or pfm_idx == 0 or schema_idx_maps.empty()) { return schema_idx; }

// schema_idx_maps will only have > 0 size when we are reading matching column projection from
// mismatched Parquet sources.
auto const& schema_idx_map = schema_idx_maps[pfm_idx - 1];
CUDF_EXPECTS(schema_idx_map.find(schema_idx) != schema_idx_map.end(),
"Unmapped schema index encountered in the specified source tree",
std::out_of_range);

// Return the mapped schema idx.
return schema_idx_map.at(schema_idx);
}

std::string aggregate_reader_metadata::get_pandas_index() const
{
// Assumes that all input files have the same metadata
Expand Down Expand Up @@ -1185,8 +1241,8 @@ aggregate_reader_metadata::select_columns(
// Compares two schema elements to be equal except their number of children
auto const equal_to_except_num_children = [](SchemaElement const& lhs, SchemaElement const& rhs) {
return lhs.type == rhs.type and lhs.converted_type == rhs.converted_type and
lhs.type_length == rhs.type_length and lhs.repetition_type == rhs.repetition_type and
lhs.name == rhs.name and lhs.decimal_scale == rhs.decimal_scale and
lhs.type_length == rhs.type_length and lhs.name == rhs.name and
lhs.decimal_scale == rhs.decimal_scale and
lhs.decimal_precision == rhs.decimal_precision and lhs.field_id == rhs.field_id;
};

Expand All @@ -1209,6 +1265,11 @@ aggregate_reader_metadata::select_columns(
"the selected path",
std::invalid_argument);

// Get the schema_idx_map for this data source (pfm)
auto& schema_idx_map = schema_idx_maps[pfm_idx - 1];
// Map the schema index from 0th tree (src) to the one in the current (dst) tree.
schema_idx_map[src_schema_idx] = dst_schema_idx;

// If src_schema_elem is a stub, it does not exist in the column_name_info and column_buffer
// hierarchy. So continue on with mapping.
if (src_schema_elem.is_stub()) {
Expand Down Expand Up @@ -1262,15 +1323,6 @@ aggregate_reader_metadata::select_columns(
pfm_idx);
});
}

// We're at a leaf and this is an input column (one with actual data stored) so map it.
if (src_schema_elem.num_children == 0) {
// Get the schema_idx_map for this data source (pfm)
auto& schema_idx_map = schema_idx_maps[pfm_idx - 1];

// Map the schema index from 0th tree (src) to the one in the current (dst) tree.
schema_idx_map[src_schema_idx] = dst_schema_idx;
}
};

std::vector<int> output_column_schemas;
Expand Down
27 changes: 25 additions & 2 deletions cpp/src/io/parquet/reader_impl_helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,26 @@ class aggregate_reader_metadata {

[[nodiscard]] auto get_num_row_groups() const { return num_row_groups; }

/**
* @brief Checks if a schema index from 0th source is mapped to the specified file index
*
* @param schema_idx The index of the SchemaElement in the zeroth file.
* @param pfm_idx The index of the file (per_file_metadata) to check mappings for.
*
* @return True if schema index is mapped
*/
[[nodiscard]] bool is_schema_index_mapped(int schema_idx, int pfm_idx) const;

/**
* @brief Maps schema index from 0th source file to the specified file index
*
* @param schema_idx The index of the SchemaElement in the zeroth file.
* @param pfm_idx The index of the file (per_file_metadata) to map the schema_idx to.
*
* @return Mapped schema index
*/
[[nodiscard]] int map_schema_index(int schema_idx, int pfm_idx) const;

/**
* @brief Extracts the schema_idx'th SchemaElement from the pfm_idx'th file
*
Expand All @@ -248,15 +268,18 @@ class aggregate_reader_metadata {
CUDF_EXPECTS(
schema_idx >= 0 and pfm_idx >= 0 and pfm_idx < static_cast<int>(per_file_metadata.size()),
"Parquet reader encountered an invalid schema_idx or pfm_idx",
std::invalid_argument);
std::out_of_range);
return per_file_metadata[pfm_idx].schema[schema_idx];
}

[[nodiscard]] auto const& get_key_value_metadata() const& { return keyval_maps; }
[[nodiscard]] auto&& get_key_value_metadata() && { return std::move(keyval_maps); }

/**
* @brief Gets the concrete nesting depth of output cudf columns
* @brief Gets the concrete nesting depth of output cudf columns.
*
* Gets the nesting depth of the output cudf column for the given schema.
* The nesting depth must be equal for the given schema_index across all sources.
*
* @param schema_index Schema index of the input column
*
Expand Down
Loading

0 comments on commit e80154d

Please sign in to comment.