diff --git a/README.md b/README.md index f62f7885d63..8f8c2adac2f 100644 --- a/README.md +++ b/README.md @@ -79,7 +79,7 @@ pip install --extra-index-url=https://pypi.nvidia.com cudf-cu12 ### Conda -cuDF can be installed with conda (via [miniconda](https://docs.conda.io/projects/miniconda/en/latest/) or the full [Anaconda distribution](https://www.anaconda.com/download) from the `rapidsai` channel: +cuDF can be installed with conda (via [miniforge](https://github.com/conda-forge/miniforge)) from the `rapidsai` channel: ```bash conda install -c rapidsai -c conda-forge -c nvidia \ diff --git a/cpp/include/cudf/detail/tdigest/tdigest.hpp b/cpp/include/cudf/detail/tdigest/tdigest.hpp index 672b95e2d01..80a4460023f 100644 --- a/cpp/include/cudf/detail/tdigest/tdigest.hpp +++ b/cpp/include/cudf/detail/tdigest/tdigest.hpp @@ -143,29 +143,28 @@ std::unique_ptr make_tdigest_column(size_type num_rows, rmm::device_async_resource_ref mr); /** - * @brief Create a tdigest column of empty clusters. + * @brief Create an empty tdigest column. * - * The column created contains the specified number of rows of empty clusters. + * An empty tdigest column contains a single row of length 0 * * @param stream CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory. * - * @returns A tdigest column of empty clusters. + * @returns An empty tdigest column. */ CUDF_EXPORT -std::unique_ptr make_tdigest_column_of_empty_clusters(size_type num_rows, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr); +std::unique_ptr make_empty_tdigest_column(rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr); /** - * @brief Create a scalar of an empty tdigest cluster. + * @brief Create an empty tdigest scalar. * - * The returned scalar is a struct_scalar that contains a single row of an empty cluster. 
+ * An empty tdigest scalar is a struct_scalar that contains a single row of length 0 * * @param stream CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory. * - * @returns A scalar of an empty tdigest cluster. + * @returns An empty tdigest scalar. */ std::unique_ptr make_empty_tdigest_scalar(rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr); diff --git a/cpp/include/cudf_test/tdigest_utilities.cuh b/cpp/include/cudf_test/tdigest_utilities.cuh index be7d19b2227..1758790cd64 100644 --- a/cpp/include/cudf_test/tdigest_utilities.cuh +++ b/cpp/include/cudf_test/tdigest_utilities.cuh @@ -270,8 +270,8 @@ void tdigest_simple_all_nulls_aggregation(Func op) static_cast(values).type(), tdigest_gen{}, op, values, delta); // NOTE: an empty tdigest column still has 1 row. - auto expected = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( - 1, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + auto expected = cudf::tdigest::detail::make_empty_tdigest_column( + cudf::get_default_stream(), cudf::get_current_device_resource_ref()); CUDF_TEST_EXPECT_COLUMNS_EQUAL(*result, *expected); } @@ -562,12 +562,12 @@ template void tdigest_merge_empty(MergeFunc merge_op) { // 3 empty tdigests all in the same group - auto a = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( - 1, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); - auto b = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( - 1, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); - auto c = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( - 1, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + auto a = cudf::tdigest::detail::make_empty_tdigest_column( + cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + auto b = cudf::tdigest::detail::make_empty_tdigest_column( + 
cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + auto c = cudf::tdigest::detail::make_empty_tdigest_column( + cudf::get_default_stream(), cudf::get_current_device_resource_ref()); std::vector cols; cols.push_back(*a); cols.push_back(*b); @@ -577,8 +577,8 @@ void tdigest_merge_empty(MergeFunc merge_op) auto const delta = 1000; auto result = merge_op(*values, delta); - auto expected = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( - 1, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + auto expected = cudf::tdigest::detail::make_empty_tdigest_column( + cudf::get_default_stream(), cudf::get_current_device_resource_ref()); CUDF_TEST_EXPECT_COLUMNS_EQUAL(*expected, *result); } diff --git a/cpp/src/io/parquet/page_decode.cuh b/cpp/src/io/parquet/page_decode.cuh index a3f91f6859b..9ed2929a70e 100644 --- a/cpp/src/io/parquet/page_decode.cuh +++ b/cpp/src/io/parquet/page_decode.cuh @@ -893,7 +893,7 @@ __device__ void gpuDecodeLevels(page_state_s* s, { bool has_repetition = s->col.max_level[level_type::REPETITION] > 0; - constexpr int batch_size = 32; + constexpr int batch_size = cudf::detail::warp_size; int cur_leaf_count = target_leaf_count; while (s->error == 0 && s->nz_count < target_leaf_count && s->input_value_count < s->num_input_values) { diff --git a/cpp/src/io/parquet/parquet.hpp b/cpp/src/io/parquet/parquet.hpp index 5d10472b0ae..7c985643887 100644 --- a/cpp/src/io/parquet/parquet.hpp +++ b/cpp/src/io/parquet/parquet.hpp @@ -203,10 +203,9 @@ struct SchemaElement { bool operator==(SchemaElement const& other) const { return type == other.type && converted_type == other.converted_type && - type_length == other.type_length && repetition_type == other.repetition_type && - name == other.name && num_children == other.num_children && - decimal_scale == other.decimal_scale && decimal_precision == other.decimal_precision && - field_id == other.field_id; + type_length == other.type_length && name == other.name && + 
num_children == other.num_children && decimal_scale == other.decimal_scale && + decimal_precision == other.decimal_precision && field_id == other.field_id; } // the parquet format is a little squishy when it comes to interpreting diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index 125d35f6499..1390339c1ae 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -400,7 +400,8 @@ struct ColumnChunkDesc { int32_t src_col_schema_, column_chunk_info const* chunk_info_, float list_bytes_per_row_est_, - bool strings_to_categorical_) + bool strings_to_categorical_, + int32_t src_file_idx_) : compressed_data(compressed_data_), compressed_size(compressed_size_), num_values(num_values_), @@ -419,7 +420,8 @@ struct ColumnChunkDesc { src_col_schema(src_col_schema_), h_chunk_info(chunk_info_), list_bytes_per_row_est(list_bytes_per_row_est_), - is_strings_to_cat(strings_to_categorical_) + is_strings_to_cat(strings_to_categorical_), + src_file_idx(src_file_idx_) { } @@ -456,6 +458,7 @@ struct ColumnChunkDesc { bool is_strings_to_cat{}; // convert strings to hashes bool is_large_string_col{}; // `true` if string data uses 64-bit offsets + int32_t src_file_idx{}; // source file index }; /** diff --git a/cpp/src/io/parquet/reader_impl_chunking.cu b/cpp/src/io/parquet/reader_impl_chunking.cu index 245e1829c72..c588fedb85c 100644 --- a/cpp/src/io/parquet/reader_impl_chunking.cu +++ b/cpp/src/io/parquet/reader_impl_chunking.cu @@ -1511,10 +1511,13 @@ void reader::impl::create_global_chunk_info() std::transform( _input_columns.begin(), _input_columns.end(), column_mapping.begin(), [&](auto const& col) { // translate schema_idx into something we can use for the page indexes - if (auto it = std::find_if( - columns.begin(), - columns.end(), - [&col](auto const& col_chunk) { return col_chunk.schema_idx == col.schema_idx; }); + if (auto it = std::find_if(columns.begin(), + columns.end(), + [&](auto const& col_chunk) { + 
return col_chunk.schema_idx == + _metadata->map_schema_index(col.schema_idx, + rg.source_index); + }); it != columns.end()) { return std::distance(columns.begin(), it); } @@ -1535,7 +1538,8 @@ void reader::impl::create_global_chunk_info() auto col = _input_columns[i]; // look up metadata auto& col_meta = _metadata->get_column_metadata(rg.index, rg.source_index, col.schema_idx); - auto& schema = _metadata->get_schema(col.schema_idx); + auto& schema = _metadata->get_schema( + _metadata->map_schema_index(col.schema_idx, rg.source_index), rg.source_index); auto [clock_rate, logical_type] = conversion_info(to_type_id(schema, _strings_to_categorical, _options.timestamp_type.id()), @@ -1574,9 +1578,9 @@ void reader::impl::create_global_chunk_info() col.schema_idx, chunk_info, list_bytes_per_row_est, - schema.type == BYTE_ARRAY and _strings_to_categorical)); + schema.type == BYTE_ARRAY and _strings_to_categorical, + rg.source_index)); } - // Adjust for skip_rows when updating the remaining rows after the first group remaining_rows -= (skip_rows) ? std::min(rg.start_row + row_group.num_rows - skip_rows, remaining_rows) diff --git a/cpp/src/io/parquet/reader_impl_helpers.cpp b/cpp/src/io/parquet/reader_impl_helpers.cpp index 8b5678f202b..6d566b5815e 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.cpp +++ b/cpp/src/io/parquet/reader_impl_helpers.cpp @@ -423,8 +423,13 @@ void aggregate_reader_metadata::column_info_for_row_group(row_group_info& rg_inf std::vector chunks(rg.columns.size()); for (size_t col_idx = 0; col_idx < rg.columns.size(); col_idx++) { - auto const& col_chunk = rg.columns[col_idx]; - auto& schema = get_schema(col_chunk.schema_idx); + auto const& col_chunk = rg.columns[col_idx]; + auto const is_schema_idx_mapped = + is_schema_index_mapped(col_chunk.schema_idx, rg_info.source_index); + auto const mapped_schema_idx = is_schema_idx_mapped + ? 
map_schema_index(col_chunk.schema_idx, rg_info.source_index) + : col_chunk.schema_idx; + auto& schema = get_schema(mapped_schema_idx, is_schema_idx_mapped ? rg_info.source_index : 0); auto const max_def_level = schema.max_definition_level; auto const max_rep_level = schema.max_repetition_level; @@ -559,22 +564,40 @@ aggregate_reader_metadata::aggregate_reader_metadata( num_rows(calc_num_rows()), num_row_groups(calc_num_row_groups()) { - // Validate that all sources have the same schema unless we are reading select columns - // from mismatched sources, in which case, we will only check the projected columns later. - if (per_file_metadata.size() > 1 and not has_cols_from_mismatched_srcs) { - auto const& first_meta = per_file_metadata.front(); + if (per_file_metadata.size() > 1) { + auto& first_meta = per_file_metadata.front(); auto const num_cols = first_meta.row_groups.size() > 0 ? first_meta.row_groups.front().columns.size() : 0; - auto const& schema = first_meta.schema; - - // Verify that the input files have matching numbers of columns and schema. - for (auto const& pfm : per_file_metadata) { - if (pfm.row_groups.size() > 0) { - CUDF_EXPECTS(num_cols == pfm.row_groups.front().columns.size(), - "All sources must have the same number of columns"); + auto& schema = first_meta.schema; + + // Validate that all sources have the same schema unless we are reading select columns + // from mismatched sources, in which case, we will only check the projected columns later. + if (not has_cols_from_mismatched_srcs) { + // Verify that the input files have matching numbers of columns and schema. 
+ for (auto const& pfm : per_file_metadata) { + if (pfm.row_groups.size() > 0) { + CUDF_EXPECTS(num_cols == pfm.row_groups.front().columns.size(), + "All sources must have the same number of columns"); + } + CUDF_EXPECTS(schema == pfm.schema, "All sources must have the same schema"); } - CUDF_EXPECTS(schema == pfm.schema, "All sources must have the same schema"); } + + // Mark the column schema in the first (default) source as nullable if it is nullable in any of + // the input sources. This avoids recomputing this within build_column() and + // populate_metadata(). + std::for_each( + thrust::make_counting_iterator(static_cast(1)), + thrust::make_counting_iterator(schema.size()), + [&](auto const schema_idx) { + if (schema[schema_idx].repetition_type == REQUIRED and + std::any_of( + per_file_metadata.begin() + 1, per_file_metadata.end(), [&](auto const& pfm) { + return pfm.schema[schema_idx].repetition_type != REQUIRED; + })) { + schema[schema_idx].repetition_type = OPTIONAL; + } + }); } // Collect and apply arrow:schema from Parquet's key value metadata section @@ -884,15 +907,8 @@ ColumnChunkMetaData const& aggregate_reader_metadata::get_column_metadata(size_t size_type src_idx, int schema_idx) const { - // schema_idx_maps will only have > 0 size when we are reading matching column projection from - // mismatched Parquet sources. 
- if (src_idx and not schema_idx_maps.empty()) { - auto const& schema_idx_map = schema_idx_maps[src_idx - 1]; - CUDF_EXPECTS(schema_idx_map.find(schema_idx) != schema_idx_map.end(), - "Unmapped schema index encountered in the specified source tree", - std::range_error); - schema_idx = schema_idx_map.at(schema_idx); - } + // Map schema index to the provided source file index + schema_idx = map_schema_index(schema_idx, src_idx); auto col = std::find_if(per_file_metadata[src_idx].row_groups[row_group_index].columns.begin(), @@ -924,6 +940,46 @@ aggregate_reader_metadata::get_rowgroup_metadata() const return rg_metadata; } +bool aggregate_reader_metadata::is_schema_index_mapped(int schema_idx, int pfm_idx) const +{ + // Check if schema_idx or pfm_idx is invalid + CUDF_EXPECTS( + schema_idx >= 0 and pfm_idx >= 0 and pfm_idx < static_cast(per_file_metadata.size()), + "Parquet reader encountered an invalid schema_idx or pfm_idx", + std::out_of_range); + + // True if root index requested or zeroth file index or schema_idx maps doesn't exist. (i.e. + // schemas are identical). + if (schema_idx == 0 or pfm_idx == 0 or schema_idx_maps.empty()) { return true; } + + // Check if mapped + auto const& schema_idx_map = schema_idx_maps[pfm_idx - 1]; + return schema_idx_map.find(schema_idx) != schema_idx_map.end(); +} + +int aggregate_reader_metadata::map_schema_index(int schema_idx, int pfm_idx) const +{ + // Check if schema_idx or pfm_idx is invalid + CUDF_EXPECTS( + schema_idx >= 0 and pfm_idx >= 0 and pfm_idx < static_cast(per_file_metadata.size()), + "Parquet reader encountered an invalid schema_idx or pfm_idx", + std::out_of_range); + + // Check if pfm_idx is zero or root index requested or schema_idx_maps doesn't exist (i.e. + // schemas are identical). + if (schema_idx == 0 or pfm_idx == 0 or schema_idx_maps.empty()) { return schema_idx; } + + // schema_idx_maps will only have > 0 size when we are reading matching column projection from + // mismatched Parquet sources. 
+ auto const& schema_idx_map = schema_idx_maps[pfm_idx - 1]; + CUDF_EXPECTS(schema_idx_map.find(schema_idx) != schema_idx_map.end(), + "Unmapped schema index encountered in the specified source tree", + std::out_of_range); + + // Return the mapped schema idx. + return schema_idx_map.at(schema_idx); +} + std::string aggregate_reader_metadata::get_pandas_index() const { // Assumes that all input files have the same metadata @@ -1185,8 +1241,8 @@ aggregate_reader_metadata::select_columns( // Compares two schema elements to be equal except their number of children auto const equal_to_except_num_children = [](SchemaElement const& lhs, SchemaElement const& rhs) { return lhs.type == rhs.type and lhs.converted_type == rhs.converted_type and - lhs.type_length == rhs.type_length and lhs.repetition_type == rhs.repetition_type and - lhs.name == rhs.name and lhs.decimal_scale == rhs.decimal_scale and + lhs.type_length == rhs.type_length and lhs.name == rhs.name and + lhs.decimal_scale == rhs.decimal_scale and lhs.decimal_precision == rhs.decimal_precision and lhs.field_id == rhs.field_id; }; @@ -1209,6 +1265,11 @@ aggregate_reader_metadata::select_columns( "the selected path", std::invalid_argument); + // Get the schema_idx_map for this data source (pfm) + auto& schema_idx_map = schema_idx_maps[pfm_idx - 1]; + // Map the schema index from 0th tree (src) to the one in the current (dst) tree. + schema_idx_map[src_schema_idx] = dst_schema_idx; + // If src_schema_elem is a stub, it does not exist in the column_name_info and column_buffer // hierarchy. So continue on with mapping. if (src_schema_elem.is_stub()) { @@ -1262,15 +1323,6 @@ aggregate_reader_metadata::select_columns( pfm_idx); }); } - - // We're at a leaf and this is an input column (one with actual data stored) so map it. 
- if (src_schema_elem.num_children == 0) { - // Get the schema_idx_map for this data source (pfm) - auto& schema_idx_map = schema_idx_maps[pfm_idx - 1]; - - // Map the schema index from 0th tree (src) to the one in the current (dst) tree. - schema_idx_map[src_schema_idx] = dst_schema_idx; - } }; std::vector output_column_schemas; diff --git a/cpp/src/io/parquet/reader_impl_helpers.hpp b/cpp/src/io/parquet/reader_impl_helpers.hpp index 6f2863136b2..6487c92f48f 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.hpp +++ b/cpp/src/io/parquet/reader_impl_helpers.hpp @@ -234,6 +234,26 @@ class aggregate_reader_metadata { [[nodiscard]] auto get_num_row_groups() const { return num_row_groups; } + /** + * @brief Checks if a schema index from 0th source is mapped to the specified file index + * + * @param schema_idx The index of the SchemaElement in the zeroth file. + * @param pfm_idx The index of the file (per_file_metadata) to check mappings for. + * + * @return True if schema index is mapped + */ + [[nodiscard]] bool is_schema_index_mapped(int schema_idx, int pfm_idx) const; + + /** + * @brief Maps schema index from 0th source file to the specified file index + * + * @param schema_idx The index of the SchemaElement in the zeroth file. + * @param pfm_idx The index of the file (per_file_metadata) to map the schema_idx to. 
+ * + * @return Mapped schema index + */ + [[nodiscard]] int map_schema_index(int schema_idx, int pfm_idx) const; + /** * @brief Extracts the schema_idx'th SchemaElement from the pfm_idx'th file * @@ -248,7 +268,7 @@ class aggregate_reader_metadata { CUDF_EXPECTS( schema_idx >= 0 and pfm_idx >= 0 and pfm_idx < static_cast(per_file_metadata.size()), "Parquet reader encountered an invalid schema_idx or pfm_idx", - std::invalid_argument); + std::out_of_range); return per_file_metadata[pfm_idx].schema[schema_idx]; } @@ -256,7 +276,10 @@ class aggregate_reader_metadata { [[nodiscard]] auto&& get_key_value_metadata() && { return std::move(keyval_maps); } /** - * @brief Gets the concrete nesting depth of output cudf columns + * @brief Gets the concrete nesting depth of output cudf columns. + * + * Gets the nesting depth of the output cudf column for the given schema. + * The nesting depth must be equal for the given schema_index across all sources. * * @param schema_index Schema index of the input column * diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu index 52918f5bc80..8e67f233213 100644 --- a/cpp/src/io/parquet/reader_impl_preprocess.cu +++ b/cpp/src/io/parquet/reader_impl_preprocess.cu @@ -79,23 +79,30 @@ void print_pages(cudf::detail::hostdevice_vector& pages, rmm::cuda_str * is indicated when adding new values. 
This function generates the mappings of * the R/D levels to those start/end bounds * - * @param remap Maps column schema index to the R/D remapping vectors for that column - * @param src_col_schema The column schema to generate the new mapping for + * @param remap Maps column schema index to the R/D remapping vectors for that column for a + * particular input source file + * @param src_col_schema The source column schema to generate the new mapping for + * @param mapped_src_col_schema Mapped column schema for src_file_idx'th file + * @param src_file_idx The input source file index for the column schema * @param md File metadata information */ -void generate_depth_remappings(std::map, std::vector>>& remap, - int src_col_schema, - aggregate_reader_metadata const& md) +void generate_depth_remappings( + std::map, std::pair, std::vector>>& remap, + int const src_col_schema, + int const mapped_src_col_schema, + int const src_file_idx, + aggregate_reader_metadata const& md) { // already generated for this level - if (remap.find(src_col_schema) != remap.end()) { return; } - auto schema = md.get_schema(src_col_schema); - int max_depth = md.get_output_nesting_depth(src_col_schema); + if (remap.find({src_col_schema, src_file_idx}) != remap.end()) { return; } + auto const& schema = md.get_schema(mapped_src_col_schema, src_file_idx); + auto const max_depth = md.get_output_nesting_depth(src_col_schema); - CUDF_EXPECTS(remap.find(src_col_schema) == remap.end(), + CUDF_EXPECTS(remap.find({src_col_schema, src_file_idx}) == remap.end(), "Attempting to remap a schema more than once"); auto inserted = - remap.insert(std::pair, std::vector>>{src_col_schema, {}}); + remap.insert(std::pair, std::pair, std::vector>>{ + {src_col_schema, src_file_idx}, {}}); auto& depth_remap = inserted.first->second; std::vector& rep_depth_remap = (depth_remap.first); @@ -136,15 +143,15 @@ void generate_depth_remappings(std::map, std::ve auto find_shallowest = [&](int r) { int shallowest = -1; int 
cur_depth = max_depth - 1; - int schema_idx = src_col_schema; + int schema_idx = mapped_src_col_schema; while (schema_idx > 0) { - auto cur_schema = md.get_schema(schema_idx); + auto& cur_schema = md.get_schema(schema_idx, src_file_idx); if (cur_schema.max_repetition_level == r) { // if this is a repeated field, map it one level deeper shallowest = cur_schema.is_stub() ? cur_depth + 1 : cur_depth; } // if it's one-level encoding list - else if (cur_schema.is_one_level_list(md.get_schema(cur_schema.parent_idx))) { + else if (cur_schema.is_one_level_list(md.get_schema(cur_schema.parent_idx, src_file_idx))) { shallowest = cur_depth - 1; } if (!cur_schema.is_stub()) { cur_depth--; } @@ -159,10 +166,10 @@ void generate_depth_remappings(std::map, std::ve for (int s_idx = schema.max_definition_level; s_idx >= 0; s_idx--) { auto find_deepest = [&](int d) { SchemaElement prev_schema; - int schema_idx = src_col_schema; + int schema_idx = mapped_src_col_schema; int r1 = 0; while (schema_idx > 0) { - SchemaElement cur_schema = md.get_schema(schema_idx); + SchemaElement cur_schema = md.get_schema(schema_idx, src_file_idx); if (cur_schema.max_definition_level == d) { // if this is a repeated field, map it one level deeper r1 = cur_schema.is_stub() ? prev_schema.max_repetition_level @@ -175,10 +182,10 @@ void generate_depth_remappings(std::map, std::ve // we now know R1 from above. return the deepest nesting level that has the // same repetition level - schema_idx = src_col_schema; + schema_idx = mapped_src_col_schema; int depth = max_depth - 1; while (schema_idx > 0) { - SchemaElement cur_schema = md.get_schema(schema_idx); + SchemaElement cur_schema = md.get_schema(schema_idx, src_file_idx); if (cur_schema.max_repetition_level == r1) { // if this is a repeated field, map it one level deeper depth = cur_schema.is_stub() ? 
depth + 1 : depth; @@ -783,9 +790,20 @@ void reader::impl::allocate_nesting_info() std::vector per_page_nesting_info_size(num_columns); auto iter = thrust::make_counting_iterator(size_type{0}); std::transform(iter, iter + num_columns, per_page_nesting_info_size.begin(), [&](size_type i) { + // Schema index of the current input column auto const schema_idx = _input_columns[i].schema_idx; - auto const& schema = _metadata->get_schema(schema_idx); - return max(schema.max_definition_level + 1, _metadata->get_output_nesting_depth(schema_idx)); + // Get the max_definition_level of this column across all sources. + auto max_definition_level = _metadata->get_schema(schema_idx).max_definition_level + 1; + std::for_each(thrust::make_counting_iterator(static_cast(1)), + thrust::make_counting_iterator(_sources.size()), + [&](auto const src_file_idx) { + auto const& schema = _metadata->get_schema( + _metadata->map_schema_index(schema_idx, src_file_idx), src_file_idx); + max_definition_level = + std::max(max_definition_level, schema.max_definition_level + 1); + }); + + return std::max(max_definition_level, _metadata->get_output_nesting_depth(schema_idx)); }); // compute total # of page_nesting infos needed and allocate space. doing this in one @@ -813,6 +831,8 @@ void reader::impl::allocate_nesting_info() page_nesting_decode_info.device_ptr() + src_info_index; pages[target_page_index + p_idx].nesting_info_size = per_page_nesting_info_size[idx]; + // Set the number of output nesting levels from the zeroth source as nesting must be + // identical across sources. 
pages[target_page_index + p_idx].num_output_nesting_levels = _metadata->get_output_nesting_depth(src_col_schema); @@ -821,25 +841,36 @@ void reader::impl::allocate_nesting_info() target_page_index += subpass.column_page_count[idx]; } + // Reset the target_page_index + target_page_index = 0; + // fill in int nesting_info_index = 0; - std::map, std::vector>> depth_remapping; for (size_t idx = 0; idx < _input_columns.size(); idx++) { auto const src_col_schema = _input_columns[idx].schema_idx; - // schema of the input column - auto& schema = _metadata->get_schema(src_col_schema); // real depth of the output cudf column hierarchy (1 == no nesting, 2 == 1 level, etc) + // nesting depth must be same across sources so getting it from the zeroth source is ok int const max_output_depth = _metadata->get_output_nesting_depth(src_col_schema); + // Map to store depths if this column has lists + std::map, std::pair, std::vector>> depth_remapping; // if this column has lists, generate depth remapping - std::map, std::vector>> depth_remapping; - if (schema.max_repetition_level > 0) { - generate_depth_remappings(depth_remapping, src_col_schema, *_metadata); - } + std::for_each( + thrust::make_counting_iterator(static_cast(0)), + thrust::make_counting_iterator(_sources.size()), + [&](auto const src_file_idx) { + auto const mapped_schema_idx = _metadata->map_schema_index(src_col_schema, src_file_idx); + if (_metadata->get_schema(mapped_schema_idx, src_file_idx).max_repetition_level > 0) { + generate_depth_remappings( + depth_remapping, src_col_schema, mapped_schema_idx, src_file_idx, *_metadata); + } + }); // fill in host-side nesting info - int schema_idx = src_col_schema; + int schema_idx = src_col_schema; + // This is okay as we only use this to check stubness of cur_schema and + // to get its parent's indices, both of which are one to one mapped. 
auto cur_schema = _metadata->get_schema(schema_idx); int cur_depth = max_output_depth - 1; while (schema_idx > 0) { @@ -848,6 +879,9 @@ void reader::impl::allocate_nesting_info() if (!cur_schema.is_stub()) { // initialize each page within the chunk for (size_t p_idx = 0; p_idx < subpass.column_page_count[idx]; p_idx++) { + // Source file index for the current page. + auto const src_file_idx = + pass.chunks[pages[target_page_index + p_idx].chunk_idx].src_file_idx; PageNestingInfo* pni = &page_nesting_info[nesting_info_index + (p_idx * per_page_nesting_info_size[idx])]; @@ -855,9 +889,11 @@ void reader::impl::allocate_nesting_info() &page_nesting_decode_info[nesting_info_index + (p_idx * per_page_nesting_info_size[idx])]; + auto const mapped_src_col_schema = + _metadata->map_schema_index(src_col_schema, src_file_idx); // if we have lists, set our start and end depth remappings - if (schema.max_repetition_level > 0) { - auto remap = depth_remapping.find(src_col_schema); + if (_metadata->get_schema(mapped_src_col_schema, src_file_idx).max_repetition_level > 0) { + auto remap = depth_remapping.find({src_col_schema, src_file_idx}); CUDF_EXPECTS(remap != depth_remapping.end(), "Could not find depth remapping for schema"); std::vector const& rep_depth_remap = (remap->second.first); @@ -871,11 +907,15 @@ void reader::impl::allocate_nesting_info() } } + // Get the schema from the current input source. 
+ auto& actual_cur_schema = _metadata->get_schema( + _metadata->map_schema_index(schema_idx, src_file_idx), src_file_idx); + // values indexed by output column index - nesting_info[cur_depth].max_def_level = cur_schema.max_definition_level; + nesting_info[cur_depth].max_def_level = actual_cur_schema.max_definition_level; pni[cur_depth].size = 0; pni[cur_depth].type = - to_type_id(cur_schema, _strings_to_categorical, _options.timestamp_type.id()); + to_type_id(actual_cur_schema, _strings_to_categorical, _options.timestamp_type.id()); pni[cur_depth].nullable = cur_schema.repetition_type == OPTIONAL; } @@ -888,6 +928,8 @@ void reader::impl::allocate_nesting_info() cur_schema = _metadata->get_schema(schema_idx); } + // Offset the page and nesting info indices + target_page_index += subpass.column_page_count[idx]; nesting_info_index += (per_page_nesting_info_size[idx] * subpass.column_page_count[idx]); } diff --git a/cpp/src/quantiles/tdigest/tdigest.cu b/cpp/src/quantiles/tdigest/tdigest.cu index 76cd55bf994..0d017cf1f13 100644 --- a/cpp/src/quantiles/tdigest/tdigest.cu +++ b/cpp/src/quantiles/tdigest/tdigest.cu @@ -292,33 +292,32 @@ std::unique_ptr make_tdigest_column(size_type num_rows, return make_structs_column(num_rows, std::move(children), 0, {}, stream, mr); } -std::unique_ptr make_tdigest_column_of_empty_clusters(size_type num_rows, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) +std::unique_ptr make_empty_tdigest_column(rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) { auto offsets = cudf::make_fixed_width_column( - data_type(type_id::INT32), num_rows + 1, mask_state::UNALLOCATED, stream, mr); + data_type(type_id::INT32), 2, mask_state::UNALLOCATED, stream, mr); thrust::fill(rmm::exec_policy(stream), offsets->mutable_view().begin(), offsets->mutable_view().end(), 0); - auto min_col = cudf::make_numeric_column( - data_type(type_id::FLOAT64), num_rows, mask_state::UNALLOCATED, stream, mr); + auto min_col = + 
cudf::make_numeric_column(data_type(type_id::FLOAT64), 1, mask_state::UNALLOCATED, stream, mr); thrust::fill(rmm::exec_policy(stream), min_col->mutable_view().begin(), min_col->mutable_view().end(), 0); - auto max_col = cudf::make_numeric_column( - data_type(type_id::FLOAT64), num_rows, mask_state::UNALLOCATED, stream, mr); + auto max_col = + cudf::make_numeric_column(data_type(type_id::FLOAT64), 1, mask_state::UNALLOCATED, stream, mr); thrust::fill(rmm::exec_policy(stream), max_col->mutable_view().begin(), max_col->mutable_view().end(), 0); - return make_tdigest_column(num_rows, - cudf::make_empty_column(type_id::FLOAT64), - cudf::make_empty_column(type_id::FLOAT64), + return make_tdigest_column(1, + make_empty_column(type_id::FLOAT64), + make_empty_column(type_id::FLOAT64), std::move(offsets), std::move(min_col), std::move(max_col), @@ -339,7 +338,7 @@ std::unique_ptr make_tdigest_column_of_empty_clusters(size_type num_rows std::unique_ptr make_empty_tdigest_scalar(rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { - auto contents = make_tdigest_column_of_empty_clusters(1, stream, mr)->release(); + auto contents = make_empty_tdigest_column(stream, mr)->release(); return std::make_unique( std::move(*std::make_unique(std::move(contents.children))), true, stream, mr); } diff --git a/cpp/src/quantiles/tdigest/tdigest_aggregation.cu b/cpp/src/quantiles/tdigest/tdigest_aggregation.cu index d591fb5c171..2dd25a7b890 100644 --- a/cpp/src/quantiles/tdigest/tdigest_aggregation.cu +++ b/cpp/src/quantiles/tdigest/tdigest_aggregation.cu @@ -366,8 +366,8 @@ std::unique_ptr to_tdigest_scalar(std::unique_ptr&& tdigest, * @param group_cluster_wl Output. The set of cluster weight limits for each group. * @param group_num_clusters Output. The number of output clusters for each input group. * @param group_cluster_offsets Offsets per-group to the start of it's clusters - * @param may_have_empty_clusters Whether or not there could be empty clusters. 
Must only be - * set to false when there is no empty cluster, true otherwise. + * @param has_nulls Whether or not the input contains nulls + * */ template @@ -379,7 +379,7 @@ CUDF_KERNEL void generate_cluster_limits_kernel(int delta, double* group_cluster_wl, size_type* group_num_clusters, size_type const* group_cluster_offsets, - bool may_have_empty_clusters) + bool has_nulls) { int const tid = threadIdx.x + blockIdx.x * blockDim.x; @@ -399,12 +399,11 @@ CUDF_KERNEL void generate_cluster_limits_kernel(int delta, // a group with nothing in it. group_num_clusters[group_index] = 0; if (total_weight <= 0) { - // If the input contains empty clusters, we can potentially have a group that also generates - // empty clusters because -all- of the input values are null or empty cluster. In that case, the - // `reduce_by_key` call in the tdigest generation step will need a location to store the unused - // reduction value for that group of nulls and empty clusters. These "stubs" will be - // postprocessed out afterwards. - if (may_have_empty_clusters) { group_num_clusters[group_index] = 1; } + // if the input contains nulls we can potentially have a group that generates no + // clusters because -all- of the input values are null. in that case, the reduce_by_key call + // in the tdigest generation step will need a location to store the unused reduction value for + // that group of nulls. these "stubs" will be postprocessed out afterwards. + if (has_nulls) { group_num_clusters[group_index] = 1; } return; } @@ -503,8 +502,7 @@ CUDF_KERNEL void generate_cluster_limits_kernel(int delta, * stream that falls before our current cluster limit * @param group_info A functor which returns the info for the specified group (total weight, * size and start offset) - * @param may_have_empty_clusters Whether or not there could be empty clusters. It should be - * set to false only when there is no empty cluster. 
+ * @param has_nulls Whether or not the input data contains nulls * @param stream CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory * @@ -518,7 +516,7 @@ generate_group_cluster_info(int delta, NearestWeight nearest_weight, GroupInfo group_info, CumulativeWeight cumulative_weight, - bool may_have_empty_clusters, + bool has_nulls, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { @@ -537,7 +535,7 @@ generate_group_cluster_info(int delta, nullptr, group_num_clusters.begin(), nullptr, - may_have_empty_clusters); + has_nulls); // generate group cluster offsets (where the clusters for a given group start and end) auto group_cluster_offsets = cudf::make_numeric_column( @@ -569,7 +567,7 @@ generate_group_cluster_info(int delta, group_cluster_wl.begin(), group_num_clusters.begin(), group_cluster_offsets->view().begin(), - may_have_empty_clusters); + has_nulls); return {std::move(group_cluster_wl), std::move(group_cluster_offsets), @@ -582,7 +580,7 @@ std::unique_ptr build_output_column(size_type num_rows, std::unique_ptr&& offsets, std::unique_ptr&& min_col, std::unique_ptr&& max_col, - bool may_have_empty_clusters, + bool has_nulls, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { @@ -597,7 +595,7 @@ std::unique_ptr build_output_column(size_type num_rows, size_type i) { return is_stub_weight(offsets[i]) ? 1 : 0; }; size_type const num_stubs = [&]() { - if (!may_have_empty_clusters) { return 0; } + if (!has_nulls) { return 0; } auto iter = cudf::detail::make_counting_transform_iterator( 0, cuda::proclaim_return_type(is_stub_digest)); return thrust::reduce(rmm::exec_policy(stream), iter, iter + num_rows); @@ -663,10 +661,6 @@ std::unique_ptr build_output_column(size_type num_rows, mr); } -/** - * @brief A functor which returns the cluster index within a group that the value at - * the given value index falls into. 
- */ template struct compute_tdigests_keys_fn { int const delta; @@ -712,8 +706,8 @@ struct compute_tdigests_keys_fn { * boundaries. * * @param delta tdigest compression level - * @param centroids_begin Beginning of the range of centroids. - * @param centroids_end End of the range of centroids. + * @param values_begin Beginning of the range of input values. + * @param values_end End of the range of input values. * @param cumulative_weight Functor which returns cumulative weight and group information for * an absolute input value index. * @param min_col Column containing the minimum value per group. @@ -721,8 +715,7 @@ struct compute_tdigests_keys_fn { * @param group_cluster_wl Cluster weight limits for each group. * @param group_cluster_offsets R-value reference of offsets into the cluster weight limits. * @param total_clusters Total number of clusters in all groups. - * @param may_have_empty_clusters Whether or not there could be empty clusters. It should be - * set to false only when there is no empty cluster. + * @param has_nulls Whether or not the input contains nulls * @param stream CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory * @@ -738,7 +731,7 @@ std::unique_ptr compute_tdigests(int delta, rmm::device_uvector const& group_cluster_wl, std::unique_ptr&& group_cluster_offsets, size_type total_clusters, - bool may_have_empty_clusters, + bool has_nulls, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { @@ -757,9 +750,7 @@ std::unique_ptr compute_tdigests(int delta, // double // max // } // - if (total_clusters == 0) { - return cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(1, stream, mr); - } + if (total_clusters == 0) { return cudf::tdigest::detail::make_empty_tdigest_column(stream, mr); } // each input group represents an individual tdigest. 
within each tdigest, we want the keys // to represent cluster indices (for example, if a tdigest had 100 clusters, the keys should fall @@ -802,7 +793,7 @@ std::unique_ptr compute_tdigests(int delta, std::move(group_cluster_offsets), std::move(min_col), std::move(max_col), - may_have_empty_clusters, + has_nulls, stream, mr); } @@ -1154,13 +1145,8 @@ std::unique_ptr merge_tdigests(tdigest_column_view const& tdv, auto merged = cudf::detail::concatenate(tdigest_views, stream, cudf::get_current_device_resource_ref()); - auto merged_weights = merged->get_column(1).view(); - // If there are no values, we can simply return a column that has only empty tdigests. - if (merged_weights.size() == 0) { - return cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(num_groups, stream, mr); - } - // generate cumulative weights + auto merged_weights = merged->get_column(1).view(); auto cumulative_weights = cudf::make_numeric_column( data_type{type_id::FLOAT64}, merged_weights.size(), mask_state::UNALLOCATED, stream); auto keys = cudf::detail::make_counting_transform_iterator( @@ -1175,10 +1161,6 @@ std::unique_ptr merge_tdigests(tdigest_column_view const& tdv, auto const delta = max_centroids; - // We do not know whether there is any empty cluster in the input without actually reading the - // data, which could be expensive. So, we just assume that there could be empty clusters. 
- auto const may_have_empty_clusters = true; - // generate cluster info auto [group_cluster_wl, group_cluster_offsets, total_clusters] = generate_group_cluster_info( delta, @@ -1195,7 +1177,7 @@ std::unique_ptr merge_tdigests(tdigest_column_view const& tdv, group_labels, group_offsets, {tdigest_offsets.begin(), static_cast(tdigest_offsets.size())}}, - may_have_empty_clusters, + false, stream, mr); @@ -1220,7 +1202,7 @@ std::unique_ptr merge_tdigests(tdigest_column_view const& tdv, group_cluster_wl, std::move(group_cluster_offsets), total_clusters, - may_have_empty_clusters, + false, stream, mr); } @@ -1285,9 +1267,7 @@ std::unique_ptr group_tdigest(column_view const& col, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { - if (col.size() == 0) { - return cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(1, stream, mr); - } + if (col.size() == 0) { return cudf::tdigest::detail::make_empty_tdigest_column(stream, mr); } auto const delta = max_centroids; return cudf::type_dispatcher(col.type(), @@ -1313,7 +1293,7 @@ std::unique_ptr group_merge_tdigest(column_view const& input, tdigest_column_view tdv(input); if (num_groups == 0 || input.size() == 0) { - return cudf::tdigest::detail::make_tdigest_column_of_empty_clusters(1, stream, mr); + return cudf::tdigest::detail::make_empty_tdigest_column(stream, mr); } // bring group offsets back to the host diff --git a/cpp/tests/groupby/tdigest_tests.cu b/cpp/tests/groupby/tdigest_tests.cu index 3780dbb1d95..baa59026b07 100644 --- a/cpp/tests/groupby/tdigest_tests.cu +++ b/cpp/tests/groupby/tdigest_tests.cu @@ -469,16 +469,16 @@ TEST_F(TDigestMergeTest, EmptyGroups) cudf::test::fixed_width_column_wrapper keys{0, 0, 0, 0, 0, 0, 0}; int const delta = 1000; - auto a = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( - 1, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + auto a = cudf::tdigest::detail::make_empty_tdigest_column( + cudf::get_default_stream(), 
cudf::get_current_device_resource_ref()); auto b = cudf::type_dispatcher( static_cast(values_b).type(), tdigest_gen_grouped{}, keys, values_b, delta); - auto c = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( - 1, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + auto c = cudf::tdigest::detail::make_empty_tdigest_column( + cudf::get_default_stream(), cudf::get_current_device_resource_ref()); auto d = cudf::type_dispatcher( static_cast(values_d).type(), tdigest_gen_grouped{}, keys, values_d, delta); - auto e = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( - 1, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + auto e = cudf::tdigest::detail::make_empty_tdigest_column( + cudf::get_default_stream(), cudf::get_current_device_resource_ref()); std::vector cols; cols.push_back(*a); @@ -507,81 +507,3 @@ TEST_F(TDigestMergeTest, EmptyGroups) CUDF_TEST_EXPECT_COLUMNS_EQUAL(*expected, *result.second[0].results[0]); } - -std::unique_ptr do_agg( - cudf::column_view key, - cudf::column_view val, - std::function()> make_agg) -{ - std::vector keys; - keys.push_back(key); - cudf::table_view const key_table(keys); - - cudf::groupby::groupby gb(key_table); - std::vector requests; - cudf::groupby::aggregation_request req; - req.values = val; - req.aggregations.push_back(make_agg()); - requests.push_back(std::move(req)); - - auto result = gb.aggregate(std::move(requests)); - - std::vector> result_columns; - for (auto&& c : result.first->release()) { - result_columns.push_back(std::move(c)); - } - - EXPECT_EQ(result.second.size(), 1); - EXPECT_EQ(result.second[0].results.size(), 1); - result_columns.push_back(std::move(result.second[0].results[0])); - - return std::make_unique(std::move(result_columns)); -} - -TEST_F(TDigestMergeTest, AllGroupsHaveEmptyClusters) -{ - // The input must be sorted by the key. - // See `aggregate_result_functor::operator()` for details. 
- auto const keys = cudf::test::fixed_width_column_wrapper{{0, 0, 1, 1, 2}}; - auto const keys_view = cudf::column_view(keys); - auto val_elems = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return i; }); - auto val_valids = cudf::detail::make_counting_transform_iterator(0, [](auto i) { - // All values are null - return false; - }); - auto const vals = cudf::test::fixed_width_column_wrapper{ - val_elems, val_elems + keys_view.size(), val_valids}; - - auto const delta = 10000; - - // Compute tdigest. The result should have 3 empty clusters, one per group. - auto const compute_result = do_agg(keys_view, cudf::column_view(vals), [&delta]() { - return cudf::make_tdigest_aggregation(delta); - }); - - auto const expected_computed_keys = cudf::test::fixed_width_column_wrapper{{0, 1, 2}}; - cudf::column_view const expected_computed_keys_view{expected_computed_keys}; - auto const expected_computed_vals = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( - expected_computed_keys_view.size(), - cudf::get_default_stream(), - rmm::mr::get_current_device_resource()); - CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_computed_keys_view, compute_result->get_column(0).view()); - // The computed values are nullable even though the input values are not. - CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_computed_vals->view(), - compute_result->get_column(1).view()); - - // Merge tdigest. The result should have 3 empty clusters, one per group. 
- auto const merge_result = - do_agg(compute_result->get_column(0).view(), compute_result->get_column(1).view(), [&delta]() { - return cudf::make_merge_tdigest_aggregation(delta); - }); - - auto const expected_merged_keys = cudf::test::fixed_width_column_wrapper{{0, 1, 2}}; - cudf::column_view const expected_merged_keys_view{expected_merged_keys}; - auto const expected_merged_vals = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( - expected_merged_keys_view.size(), - cudf::get_default_stream(), - rmm::mr::get_current_device_resource()); - CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_merged_keys_view, merge_result->get_column(0).view()); - CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_merged_vals->view(), merge_result->get_column(1).view()); -} diff --git a/cpp/tests/quantiles/percentile_approx_test.cpp b/cpp/tests/quantiles/percentile_approx_test.cpp index 7359f0406fc..915717713df 100644 --- a/cpp/tests/quantiles/percentile_approx_test.cpp +++ b/cpp/tests/quantiles/percentile_approx_test.cpp @@ -371,8 +371,8 @@ struct PercentileApproxTest : public cudf::test::BaseFixture {}; TEST_F(PercentileApproxTest, EmptyInput) { - auto empty_ = cudf::tdigest::detail::make_tdigest_column_of_empty_clusters( - 1, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + auto empty_ = cudf::tdigest::detail::make_empty_tdigest_column( + cudf::get_default_stream(), cudf::get_current_device_resource_ref()); cudf::test::fixed_width_column_wrapper percentiles{0.0, 0.25, 0.3}; std::vector input; diff --git a/docs/cudf/source/user_guide/10min.ipynb b/docs/cudf/source/user_guide/10min.ipynb index 2eaa75b3189..95f5f9734dd 100644 --- a/docs/cudf/source/user_guide/10min.ipynb +++ b/docs/cudf/source/user_guide/10min.ipynb @@ -5,9 +5,9 @@ "id": "4c6c548b", "metadata": {}, "source": [ - "# 10 Minutes to cuDF and Dask-cuDF\n", + "# 10 Minutes to cuDF and Dask cuDF\n", "\n", - "Modelled after 10 Minutes to Pandas, this is a short introduction to cuDF and Dask-cuDF, geared mainly 
towards new users.\n", + "Modelled after 10 Minutes to Pandas, this is a short introduction to cuDF and Dask cuDF, geared mainly towards new users.\n", "\n", "## What are these Libraries?\n", "\n", @@ -18,13 +18,14 @@ "[Dask cuDF](https://github.com/rapidsai/cudf/tree/main/python/dask_cudf) extends Dask where necessary to allow its DataFrame partitions to be processed using cuDF GPU DataFrames instead of Pandas DataFrames. For instance, when you call `dask_cudf.read_csv(...)`, your cluster's GPUs do the work of parsing the CSV file(s) by calling [`cudf.read_csv()`](https://docs.rapids.ai/api/cudf/stable/api_docs/api/cudf.read_csv.html).\n", "\n", "\n", - "> [!NOTE] \n", - "> This notebook uses the explicit Dask cuDF API (`dask_cudf`) for clarity. However, we strongly recommend that you use Dask's [configuration infrastructure](https://docs.dask.org/en/latest/configuration.html) to set the `\"dataframe.backend\"` to `\"cudf\"`, and work with the `dask.dataframe` API directly. Please see the [Dask cuDF documentation](https://github.com/rapidsai/cudf/tree/main/python/dask_cudf) for more information.\n", + "
\n", + "Note: This notebook uses the explicit Dask cuDF API (dask_cudf) for clarity. However, we strongly recommend that you use Dask's configuration infrastructure to set the \"dataframe.backend\" option to \"cudf\", and work with the Dask DataFrame API directly. Please see the Dask cuDF documentation for more information.\n", + "
\n", "\n", "\n", - "## When to use cuDF and Dask-cuDF\n", + "## When to use cuDF and Dask cuDF\n", "\n", - "If your workflow is fast enough on a single GPU or your data comfortably fits in memory on a single GPU, you would want to use cuDF. If you want to distribute your workflow across multiple GPUs, have more data than you can fit in memory on a single GPU, or want to analyze data spread across many files at once, you would want to use Dask-cuDF." + "If your workflow is fast enough on a single GPU or your data comfortably fits in memory on a single GPU, you would want to use cuDF. If you want to distribute your workflow across multiple GPUs, have more data than you can fit in memory on a single GPU, or want to analyze data spread across many files at once, you would want to use Dask cuDF." ] }, { @@ -115,7 +116,7 @@ "source": [ "ds = dask_cudf.from_cudf(s, npartitions=2)\n", "# Note the call to head here to show the first few entries, unlike\n", - "# cuDF objects, dask-cuDF objects do not have a printing\n", + "# cuDF objects, Dask-cuDF objects do not have a printing\n", "# representation that shows values since they may not be in local\n", "# memory.\n", "ds.head(n=3)" @@ -331,11 +332,11 @@ "id": "b17db919", "metadata": {}, "source": [ - "Now we will convert our cuDF dataframe into a dask-cuDF equivalent. Here we call out a key difference: to inspect the data we must call a method (here `.head()` to look at the first few values). In the general case (see the end of this notebook), the data in `ddf` will be distributed across multiple GPUs.\n", + "Now we will convert our cuDF dataframe into a Dask-cuDF equivalent. Here we call out a key difference: to inspect the data we must call a method (here `.head()` to look at the first few values). In the general case (see the end of this notebook), the data in `ddf` will be distributed across multiple GPUs.\n", "\n", - "In this small case, we could call `ddf.compute()` to obtain a cuDF object from the dask-cuDF object. 
In general, we should avoid calling `.compute()` on large dataframes, and restrict ourselves to using it when we have some (relatively) small postprocessed result that we wish to inspect. Hence, throughout this notebook we will generally call `.head()` to inspect the first few values of a dask-cuDF dataframe, occasionally calling out places where we use `.compute()` and why.\n", + "In this small case, we could call `ddf.compute()` to obtain a cuDF object from the Dask-cuDF object. In general, we should avoid calling `.compute()` on large dataframes, and restrict ourselves to using it when we have some (relatively) small postprocessed result that we wish to inspect. Hence, throughout this notebook we will generally call `.head()` to inspect the first few values of a Dask-cuDF dataframe, occasionally calling out places where we use `.compute()` and why.\n", "\n", - "*To understand more of the differences between how cuDF and dask-cuDF behave here, visit the [10 Minutes to Dask](https://docs.dask.org/en/stable/10-minutes-to-dask.html) tutorial after this one.*" + "*To understand more of the differences between how cuDF and Dask cuDF behave here, visit the [10 Minutes to Dask](https://docs.dask.org/en/stable/10-minutes-to-dask.html) tutorial after this one.*" ] }, { @@ -1680,7 +1681,7 @@ "id": "7aa0089f", "metadata": {}, "source": [ - "Note here we call `compute()` rather than `head()` on the dask-cuDF dataframe since we are happy that the number of matching rows will be small (and hence it is reasonable to bring the entire result back)." + "Note here we call `compute()` rather than `head()` on the Dask-cuDF dataframe since we are happy that the number of matching rows will be small (and hence it is reasonable to bring the entire result back)." ] }, { @@ -2393,7 +2394,7 @@ "id": "f6094cbe", "metadata": {}, "source": [ - "Applying functions to a `Series`. Note that applying user defined functions directly with Dask-cuDF is not yet implemented. 
For now, you can use [map_partitions](http://docs.dask.org/en/stable/generated/dask.dataframe.DataFrame.map_partitions.html) to apply a function to each partition of the distributed dataframe." + "Applying functions to a `Series`. Note that applying user defined functions directly with Dask cuDF is not yet implemented. For now, you can use [map_partitions](http://docs.dask.org/en/stable/generated/dask.dataframe.DataFrame.map_partitions.html) to apply a function to each partition of the distributed dataframe." ] }, { @@ -3492,7 +3493,7 @@ "id": "5ac3b004", "metadata": {}, "source": [ - "Transposing a dataframe, using either the `transpose` method or `T` property. Currently, all columns must have the same type. Transposing is not currently implemented in Dask-cuDF." + "Transposing a dataframe, using either the `transpose` method or `T` property. Currently, all columns must have the same type. Transposing is not currently implemented in Dask cuDF." ] }, { @@ -4181,7 +4182,7 @@ "id": "aa8a445b", "metadata": {}, "source": [ - "To convert the first few entries to pandas, we similarly call `.head()` on the dask-cuDF dataframe to obtain a local cuDF dataframe, which we can then convert." + "To convert the first few entries to pandas, we similarly call `.head()` on the Dask-cuDF dataframe to obtain a local cuDF dataframe, which we can then convert." ] }, { @@ -4899,7 +4900,7 @@ "id": "787eae14", "metadata": {}, "source": [ - "Note that for the dask-cuDF case, we use `dask_cudf.read_csv` in preference to `dask_cudf.from_cudf(cudf.read_csv)` since the former can parallelize across multiple GPUs and handle larger CSV files that would fit in memory on a single GPU." + "Note that for the Dask-cuDF case, we use `dask_cudf.read_csv` in preference to `dask_cudf.from_cudf(cudf.read_csv)` since the former can parallelize across multiple GPUs and handle larger CSV files that would fit in memory on a single GPU." 
] }, { diff --git a/docs/dask_cudf/source/index.rst b/docs/dask_cudf/source/index.rst index 9a216690384..7fe6cbd45fa 100644 --- a/docs/dask_cudf/source/index.rst +++ b/docs/dask_cudf/source/index.rst @@ -3,39 +3,42 @@ You can adapt this file completely to your liking, but it should at least contain the root `toctree` directive. -Welcome to dask-cudf's documentation! +Welcome to Dask cuDF's documentation! ===================================== -**Dask-cuDF** (pronounced "DASK KOO-dee-eff") is an extension +**Dask cuDF** (pronounced "DASK KOO-dee-eff") is an extension library for the `Dask `__ parallel computing -framework that provides a `cuDF -`__-backed distributed -dataframe with the same API as `Dask dataframes -`__. +framework. When installed, Dask cuDF is automatically registered +as the ``"cudf"`` dataframe backend for +`Dask DataFrame `__. + +.. note:: + Neither Dask cuDF nor Dask DataFrame provide support for multi-GPU + or multi-node execution on their own. You must also deploy a + `dask.distributed ` cluster + to leverage multiple GPUs. We strongly recommend using `Dask-CUDA + `__ to simplify the + setup of the cluster, taking advantage of all features of the GPU + and networking hardware. If you are familiar with Dask and `pandas `__ or -`cuDF `__, then Dask-cuDF +`cuDF `__, then Dask cuDF should feel familiar to you. If not, we recommend starting with `10 minutes to Dask `__ followed -by `10 minutes to cuDF and Dask-cuDF +by `10 minutes to cuDF and Dask cuDF `__. -When running on multi-GPU systems, `Dask-CUDA -`__ is recommended to -simplify the setup of the cluster, taking advantage of all features of -the GPU and networking hardware. -Using Dask-cuDF +Using Dask cuDF --------------- -When installed, Dask-cuDF registers itself as a dataframe backend for -Dask. This means that in many cases, using cuDF-backed dataframes requires -only small changes to an existing workflow. 
The minimal change is to -select cuDF as the dataframe backend in :doc:`Dask's -configuration `. To do so, we must set the option -``dataframe.backend`` to ``cudf``. From Python, this can be achieved -like so:: +The Dask DataFrame API (Recommended) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Simply use the `Dask configuration ` system to +set the ``"dataframe.backend"`` option to ``"cudf"``. From Python, +this can be achieved like so:: import dask @@ -44,52 +47,157 @@ like so:: Alternatively, you can set ``DASK_DATAFRAME__BACKEND=cudf`` in the environment before running your code. -Dataframe creation from on-disk formats -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -If your workflow creates Dask dataframes from on-disk formats -(for example using :func:`dask.dataframe.read_parquet`), then setting -the backend may well be enough to migrate your workflow. - -For example, consider reading a dataframe from parquet:: +Once this is done, the public Dask DataFrame API will leverage +``cudf`` automatically when a new DataFrame collection is created +from an on-disk format using any of the following ``dask.dataframe`` +functions: - import dask.dataframe as dd +* :func:`dask.dataframe.read_parquet` +* :func:`dask.dataframe.read_json` +* :func:`dask.dataframe.read_csv` +* :func:`dask.dataframe.read_orc` +* :func:`dask.dataframe.read_hdf` +* :func:`dask.dataframe.from_dict` - # By default, we obtain a pandas-backed dataframe - df = dd.read_parquet("data.parquet", ...) +For example:: + import dask.dataframe as dd -To obtain a cuDF-backed dataframe, we must set the -``dataframe.backend`` configuration option:: + # By default, we obtain a pandas-backed dataframe + df = dd.read_parquet("data.parquet", ...) import dask - import dask.dataframe as dd dask.config.set({"dataframe.backend": "cudf"}) - # This gives us a cuDF-backed dataframe + # This now gives us a cuDF-backed dataframe df = dd.read_parquet("data.parquet", ...) 
-This code will use cuDF's GPU-accelerated :func:`parquet reader -` to read partitions of the data. +When other functions are used to create a new collection +(e.g. :func:`from_map`, :func:`from_pandas`, :func:`from_delayed`, +and :func:`from_array`), the backend of the new collection will +depend on the inputs to those functions. For example:: + + import pandas as pd + import cudf + + # This gives us a pandas-backed dataframe + dd.from_pandas(pd.DataFrame({"a": range(10)})) + + # This gives us a cuDF-backed dataframe + dd.from_pandas(cudf.DataFrame({"a": range(10)})) + +An existing collection can always be moved to a specific backend +using the :func:`dask.dataframe.DataFrame.to_backend` API:: + + # This ensures that we have a cuDF-backed dataframe + df = df.to_backend("cudf") + + # This ensures that we have a pandas-backed dataframe + df = df.to_backend("pandas") + +The explicit Dask cuDF API +~~~~~~~~~~~~~~~~~~~~~~~~~~ + +In addition to providing the ``"cudf"`` backend for Dask DataFrame, +Dask cuDF also provides an explicit ``dask_cudf`` API:: + + import dask_cudf + + # This always gives us a cuDF-backed dataframe + df = dask_cudf.read_parquet("data.parquet", ...) + +This API is used implicitly by the Dask DataFrame API when the ``"cudf"`` +backend is enabled. Therefore, using it directly will not provide any +performance benefit over the CPU/GPU-portable ``dask.dataframe`` API. +Also, some parts of the explicit API are incompatible with +automatic query planning (see the next section). + +Query Planning +~~~~~~~~~~~~~~ + +Dask cuDF now provides automatic query planning by default (RAPIDS 24.06+). +As long as the ``"dataframe.query-planning"`` configuration is set to +``True`` (the default) when ``dask.dataframe`` is first imported, `Dask +Expressions `__ will be used under the hood. 
+ +For example, the following code will automatically benefit from column-projection +pushdown when the result is computed:: + + df = dd.read_parquet("/my/parquet/dataset/") + result = df.sort_values('B')['A'] + +Unoptimized expression graph (``result.pprint()``):: + + Projection: columns='A' + SortValues: by=['B'] shuffle_method='tasks' options={} + ReadParquetFSSpec: path='/my/parquet/dataset/' ... + +Simplified expression graph (``result.simplify().pprint()``):: + + Projection: columns='A' + SortValues: by=['B'] shuffle_method='tasks' options={} + ReadParquetFSSpec: path='/my/parquet/dataset/' columns=['A', 'B'] ... + +.. note:: + Dask will automatically simplify the expression graph (within + :func:`optimize`) when the result is converted to a task graph + (via :func:`compute` or :func:`persist`). You do not need to call + :func:`simplify` yourself. + + +Using Multiple GPUs and Multiple Nodes +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Whenever possible, Dask cuDF (i.e. Dask DataFrame) will automatically try +to partition your data into small-enough tasks to fit comfortably in the +memory of a single GPU. This means the necessary compute tasks needed to +compute a query can often be streamed to a single GPU process for +out-of-core computing. This also means that the compute tasks can be +executed in parallel over a multi-GPU cluster. + +In order to execute your Dask workflow on multiple GPUs, you will +typically need to use `Dask-CUDA `__ +to deploy a distributed Dask cluster, and +`Distributed `__ +to define a client object. 
For example:: + + from dask_cuda import LocalCUDACluster + from distributed import Client + + if __name__ == "__main__": + + client = Client( + LocalCUDACluster( + CUDA_VISIBLE_DEVICES="0,1", # Use two workers (on devices 0 and 1) + rmm_pool_size=0.9, # Use 90% of GPU memory as a pool for faster allocations + enable_cudf_spill=True, # Improve device memory stability + local_directory="/fast/scratch/", # Use fast local storage for spilling + ) + ) + + df = dd.read_parquet("/my/parquet/dataset/") + agg = df.groupby('B').sum() + agg.compute() # This will use the cluster defined above + +.. note:: + This example uses :func:`compute` to materialize a concrete + ``cudf.DataFrame`` object in local memory. Never call :func:`compute` + on a large collection that cannot fit comfortably in the memory of a + single GPU! See Dask's `documentation on managing computation + `__ + for more details. -Dataframe creation from in-memory formats -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Please see the `Dask-CUDA `__ +documentation for more information about deploying GPU-aware clusters +(including `best practices +`__). -If you already have a dataframe in memory and want to convert it to a -cuDF-backend one, there are two options depending on whether the -dataframe is already a Dask one or not. If you have a Dask dataframe, -then you can call :func:`dask.dataframe.to_backend` passing ``"cudf"`` -as the backend; if you have a pandas dataframe then you can either -call :func:`dask.dataframe.from_pandas` followed by -:func:`~dask.dataframe.to_backend` or first convert the dataframe with -:func:`cudf.from_pandas` and then parallelise this with -:func:`dask_cudf.from_cudf`. API Reference ------------- -Generally speaking, Dask-cuDF tries to offer exactly the same API as -Dask itself. There are, however, some minor differences mostly because +Generally speaking, Dask cuDF tries to offer exactly the same API as +Dask DataFrame. 
There are, however, some minor differences mostly because cuDF does not :doc:`perfectly mirror ` the pandas API, or because cuDF provides additional configuration flags (these mostly occur in data reading and writing interfaces). @@ -97,7 +205,7 @@ flags (these mostly occur in data reading and writing interfaces). As a result, straightforward workflows can be migrated without too much trouble, but more complex ones that utilise more features may need a bit of tweaking. The API documentation describes details of the -differences and all functionality that Dask-cuDF supports. +differences and all functionality that Dask cuDF supports. .. toctree:: :maxdepth: 2 diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index 8b59a7eef08..7f1b0b1cd46 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -3822,8 +3822,8 @@ def test_parquet_reader_with_mismatched_tables(store_schema): df1 = cudf.DataFrame( { "i32": cudf.Series([None, None, None], dtype="int32"), - "i64": cudf.Series([1234, None, 123], dtype="int64"), - "list": list([[1, 2], [None, 4], [5, 6]]), + "i64": cudf.Series([1234, 467, 123], dtype="int64"), + "list": list([[1, 2], None, [None, 6]]), "time": cudf.Series([1234, 123, 4123], dtype="datetime64[ms]"), "str": ["vfd", None, "ghu"], "d_list": list( @@ -3838,14 +3838,14 @@ def test_parquet_reader_with_mismatched_tables(store_schema): df2 = cudf.DataFrame( { - "str": ["abc", "def", None], + "str": ["abc", "def", "ghi"], "i64": cudf.Series([None, 65, 98], dtype="int64"), "times": cudf.Series([1234, None, 4123], dtype="datetime64[us]"), - "list": list([[7, 8], [9, 10], [None, 12]]), + "list": list([[7, 8], [9, 10], [11, 12]]), "d_list": list( [ [pd.Timedelta(minutes=4), None], - [None, None], + None, [pd.Timedelta(minutes=6), None], ] ), @@ -3900,38 +3900,27 @@ def test_parquet_reader_with_mismatched_structs(): { "a": 1, "b": { - "inner_a": 10, - "inner_b": {"inner_inner_b": 1, 
"inner_inner_a": 2}, + "a_a": 10, + "b_b": {"b_b_b": 1, "b_b_a": 2}, }, "c": 2, }, { "a": 3, - "b": {"inner_a": 30, "inner_b": {"inner_inner_a": 210}}, + "b": {"b_a": 30, "b_b": {"b_b_a": 210}}, "c": 4, }, - {"a": 5, "b": {"inner_a": 50, "inner_b": None}, "c": 6}, + {"a": 5, "b": {"b_a": 50, "b_b": None}, "c": 6}, {"a": 7, "b": None, "c": 8}, - {"a": None, "b": {"inner_a": None, "inner_b": None}, "c": None}, - None, - { - "a": None, - "b": { - "inner_a": None, - "inner_b": {"inner_inner_b": None, "inner_inner_a": 10}, - }, - "c": 10, - }, + {"a": 5, "b": {"b_a": None, "b_b": None}, "c": None}, ] data2 = [ - {"a": 1, "b": {"inner_b": {"inner_inner_a": None}}}, - {"a": 3, "b": {"inner_b": {"inner_inner_a": 1}}}, - {"a": 5, "b": {"inner_b": None}}, - {"a": 7, "b": {"inner_b": {"inner_inner_b": 1, "inner_inner_a": 0}}}, - {"a": None, "b": {"inner_b": None}}, + {"a": 1, "b": {"b_b": {"b_b_a": None}}}, + {"a": 5, "b": {"b_b": None}}, + {"a": 7, "b": {"b_b": {"b_b_b": 1, "b_b_a": 0}}}, + {"a": None, "b": {"b_b": None}}, None, - {"a": None, "b": {"inner_b": {"inner_inner_a": 1}}}, ] # cuDF tables from struct data @@ -3949,20 +3938,20 @@ def test_parquet_reader_with_mismatched_structs(): # Read the struct.b.inner_b.inner_inner_a column from parquet got = cudf.read_parquet( [buf1, buf2], - columns=["struct.b.inner_b.inner_inner_a"], + columns=["struct.b.b_b.b_b_a"], allow_mismatched_pq_schemas=True, ) got = ( cudf.Series(got["struct"]) .struct.field("b") - .struct.field("inner_b") - .struct.field("inner_inner_a") + .struct.field("b_b") + .struct.field("b_b_a") ) # Read with chunked reader got_chunked = read_parquet_chunked( [buf1, buf2], - columns=["struct.b.inner_b.inner_inner_a"], + columns=["struct.b.b_b.b_b_a"], chunk_read_limit=240, pass_read_limit=240, allow_mismatched_pq_schemas=True, @@ -3970,8 +3959,8 @@ def test_parquet_reader_with_mismatched_structs(): got_chunked = ( cudf.Series(got_chunked["struct"]) .struct.field("b") - .struct.field("inner_b") - 
.struct.field("inner_inner_a") + .struct.field("b_b") + .struct.field("b_b_a") ) # Construct the expected series @@ -3979,12 +3968,12 @@ def test_parquet_reader_with_mismatched_structs(): [ cudf.Series(df1["struct"]) .struct.field("b") - .struct.field("inner_b") - .struct.field("inner_inner_a"), + .struct.field("b_b") + .struct.field("b_b_a"), cudf.Series(df2["struct"]) .struct.field("b") - .struct.field("inner_b") - .struct.field("inner_inner_a"), + .struct.field("b_b") + .struct.field("b_b_a"), ] ).reset_index(drop=True) @@ -4023,12 +4012,12 @@ def test_parquet_reader_with_mismatched_schemas_error(): ) data1 = [ - {"a": 1, "b": {"inner_a": 1, "inner_b": 6}}, - {"a": 3, "b": {"inner_a": None, "inner_b": 2}}, + {"a": 1, "b": {"b_a": 1, "b_b": 6}}, + {"a": 3, "b": {"b_a": None, "b_b": 2}}, ] data2 = [ - {"b": {"inner_a": 1}, "c": "str"}, - {"b": {"inner_a": None}, "c": None}, + {"b": {"b_a": 1}, "c": "str"}, + {"b": {"b_a": None}, "c": None}, ] # cuDF tables from struct data @@ -4059,6 +4048,191 @@ def test_parquet_reader_with_mismatched_schemas_error(): ): cudf.read_parquet( [buf1, buf2], - columns=["struct.b.inner_b"], + columns=["struct.b.b_b"], allow_mismatched_pq_schemas=True, ) + + +def test_parquet_reader_mismatched_nullability(): + # Ensure that we can faithfully read the tables with mismatched nullabilities + df1 = cudf.DataFrame( + { + "timedelta": cudf.Series([12, 54, 1231], dtype="timedelta64[ms]"), + "duration_list": list( + [ + [ + [ + [pd.Timedelta(minutes=1), pd.Timedelta(minutes=2)], + None, + [pd.Timedelta(minutes=8), None], + ], + None, + ], + None, + [ + [ + [pd.Timedelta(minutes=1), pd.Timedelta(minutes=2)], + [pd.Timedelta(minutes=5), pd.Timedelta(minutes=3)], + [pd.Timedelta(minutes=8), pd.Timedelta(minutes=4)], + ] + ], + ] + ), + "int64": cudf.Series([1234, None, 4123], dtype="int64"), + "int32": cudf.Series([1234, 123, 4123], dtype="int32"), + "list": list([[1, 2], [1, 2], [1, 2]]), + "datetime": cudf.Series([1234, 123, 4123], 
dtype="datetime64[ms]"), + "string": cudf.Series(["kitten", "puppy", "cub"]), + } + ) + + df2 = cudf.DataFrame( + { + "timedelta": cudf.Series( + [None, None, None], dtype="timedelta64[ms]" + ), + "duration_list": list( + [ + [ + [ + [pd.Timedelta(minutes=1), pd.Timedelta(minutes=2)], + [pd.Timedelta(minutes=8), pd.Timedelta(minutes=1)], + ], + ], + [ + [ + [pd.Timedelta(minutes=1), pd.Timedelta(minutes=2)], + [pd.Timedelta(minutes=5), pd.Timedelta(minutes=3)], + [pd.Timedelta(minutes=8), pd.Timedelta(minutes=4)], + ] + ], + [ + [ + [pd.Timedelta(minutes=1), pd.Timedelta(minutes=2)], + [pd.Timedelta(minutes=5), pd.Timedelta(minutes=3)], + [pd.Timedelta(minutes=8), pd.Timedelta(minutes=4)], + ] + ], + ] + ), + "int64": cudf.Series([1234, 123, 4123], dtype="int64"), + "int32": cudf.Series([1234, None, 4123], dtype="int32"), + "list": list([[1, 2], None, [1, 2]]), + "datetime": cudf.Series( + [1234, None, 4123], dtype="datetime64[ms]" + ), + "string": cudf.Series(["kitten", None, "cub"]), + } + ) + + # Write tables to parquet with arrow schema for compatibility for duration column(s) + fname1 = BytesIO() + df1.to_parquet(fname1, store_schema=True) + fname2 = BytesIO() + df2.to_parquet(fname2, store_schema=True) + + # Read tables back with cudf in either order and compare + assert_eq( + cudf.read_parquet([fname1, fname2]), + cudf.concat([df1, df2]).reset_index(drop=True), + ) + assert_eq( + cudf.read_parquet([fname2, fname1]), + cudf.concat([df2, df1]).reset_index(drop=True), + ) + + +def test_parquet_reader_mismatched_nullability_structs(tmpdir): + data1 = [ + { + "a": "a", + "b": { + "b_a": 10, + "b_b": {"b_b_b": 1, "b_b_a": 12}, + }, + "c": [1, 2], + }, + { + "a": "b", + "b": { + "b_a": 30, + "b_b": {"b_b_b": 2, "b_b_a": 2}, + }, + "c": [3, 4], + }, + { + "a": "c", + "b": { + "b_a": 50, + "b_b": {"b_b_b": 4, "b_b_a": 5}, + }, + "c": [5, 6], + }, + { + "a": "d", + "b": { + "b_a": 135, + "b_b": {"b_b_b": 12, "b_b_a": 32}, + }, + "c": [7, 8], + }, + { + "a": 
"e", + "b": { + "b_a": 1, + "b_b": {"b_b_b": 1, "b_b_a": 5}, + }, + "c": [9, 10], + }, + { + "a": "f", + "b": { + "b_a": 32, + "b_b": {"b_b_b": 1, "b_b_a": 6}, + }, + "c": [11, 12], + }, + ] + + data2 = [ + { + "a": "g", + "b": { + "b_a": 10, + "b_b": {"b_b_b": None, "b_b_a": 2}, + }, + "c": None, + }, + {"a": None, "b": {"b_a": None, "b_b": None}, "c": [15, 16]}, + {"a": "j", "b": None, "c": [8, 10]}, + {"a": None, "b": {"b_a": None, "b_b": None}, "c": None}, + None, + { + "a": None, + "b": {"b_a": None, "b_b": {"b_b_b": 1}}, + "c": [18, 19], + }, + {"a": None, "b": None, "c": None}, + ] + + pa_table1 = pa.Table.from_pydict({"struct": data1}) + df1 = cudf.DataFrame.from_arrow(pa_table1) + + pa_table2 = pa.Table.from_pydict({"struct": data2}) + df2 = cudf.DataFrame.from_arrow(pa_table2) + + # Write tables to parquet + buf1 = BytesIO() + df1.to_parquet(buf1) + buf2 = BytesIO() + df2.to_parquet(buf2) + + # Read tables back with cudf and compare with expected. + assert_eq( + cudf.read_parquet([buf1, buf2]), + cudf.concat([df1, df2]).reset_index(drop=True), + ) + assert_eq( + cudf.read_parquet([buf2, buf1]), + cudf.concat([df2, df1]).reset_index(drop=True), + ) diff --git a/python/custreamz/README.md b/python/custreamz/README.md index 1509dac9e61..8da17ef09dc 100644 --- a/python/custreamz/README.md +++ b/python/custreamz/README.md @@ -54,7 +54,7 @@ Please see the [Demo Docker Repository](https://hub.docker.com/r/rapidsai/rapids ### Conda -cuStreamz is installed with conda ([miniconda](https://conda.io/miniconda.html), or the full [Anaconda distribution](https://www.anaconda.com/download)) from the `rapidsai` or `rapidsai-nightly` channel: +cuStreamz can be installed with conda (via [miniforge](https://github.com/conda-forge/miniforge)) from the `rapidsai` channel: Release: ```bash diff --git a/python/dask_cudf/README.md b/python/dask_cudf/README.md index 6edb9f87d48..4655d2165f0 100644 --- a/python/dask_cudf/README.md +++ b/python/dask_cudf/README.md @@ -1,135 +1,63 @@ 
#
 Dask cuDF - A GPU Backend for Dask DataFrame
-Dask cuDF (a.k.a. dask-cudf or `dask_cudf`) is an extension library for [Dask DataFrame](https://docs.dask.org/en/stable/dataframe.html). When installed, Dask cuDF is automatically registered as the `"cudf"` [dataframe backend](https://docs.dask.org/en/stable/how-to/selecting-the-collection-backend.html) for Dask DataFrame. - -## Using Dask cuDF - -### The Dask DataFrame API (Recommended) - -Simply set the `"dataframe.backend"` [configuration](https://docs.dask.org/en/stable/configuration.html) to `"cudf"` in Dask, and the public Dask DataFrame API will leverage `cudf` automatically: - -```python -import dask -dask.config.set({"dataframe.backend": "cudf"}) - -import dask.dataframe as dd -# This gives us a cuDF-backed dataframe -df = dd.read_parquet("data.parquet", ...) -``` +Dask cuDF (a.k.a. dask-cudf or `dask_cudf`) is an extension library for [Dask DataFrame](https://docs.dask.org/en/stable/dataframe.html) that provides a Pandas-like API for parallel and larger-than-memory DataFrame computing on GPUs. When installed, Dask cuDF is automatically registered as the `"cudf"` [dataframe backend](https://docs.dask.org/en/stable/how-to/selecting-the-collection-backend.html) for Dask DataFrame. > [!IMPORTANT] -> The `"dataframe.backend"` configuration will only be used for collection creation when the following APIs are used: `read_parquet`, `read_json`, `read_csv`, `read_orc`, `read_hdf`, and `from_dict`. 
For example, if `from_map`, `from_pandas`, `from_delayed`, or `from_array` are used, the backend of the new collection will depend on the input to the function: - -```python -import pandas as pd -import cudf - -# This gives us a Pandas-backed dataframe -dd.from_pandas(pd.DataFrame({"a": range(10)})) - -# This gives us a cuDF-backed dataframe -dd.from_pandas(cudf.DataFrame({"a": range(10)})) -``` - -A cuDF-backed DataFrame collection can be moved to the `"pandas"` backend: - -```python -df = df.to_backend("pandas") -``` - -Similarly, a Pandas-backed DataFrame collection can be moved to the `"cudf"` backend: - -```python -df = df.to_backend("cudf") -``` - -### The Explicit Dask cuDF API - -In addition to providing the `"cudf"` backend for Dask DataFrame, Dask cuDF also provides an explicit `dask_cudf` API: - -```python -import dask_cudf - -# This always gives us a cuDF-backed dataframe -df = dask_cudf.read_parquet("data.parquet", ...) -``` - -> [!NOTE] -> This API is used implicitly by the Dask DataFrame API when the `"cudf"` backend is enabled. Therefore, using it directly will not provide any performance benefit over the CPU/GPU-portable `dask.dataframe` API. Also, using some parts of the explicit API are incompatible with automatic query planning (see the next section). +> Dask cuDF does not provide support for multi-GPU or multi-node execution on its own. You must also deploy a distributed cluster (ideally with [Dask-CUDA](https://docs.rapids.ai/api/dask-cuda/stable/)) to leverage multiple GPUs efficiently. -See the [Dask cuDF's API documentation](https://docs.rapids.ai/api/dask-cudf/stable/) for further information. - -## Query Planning - -Dask cuDF now provides automatic query planning by default (RAPIDS 24.06+). As long as the `"dataframe.query-planning"` configuration is set to `True` (the default) when `dask.dataframe` is first imported, [Dask Expressions](https://github.com/dask/dask-expr) will be used under the hood. 
- -For example, the following user code will automatically benefit from predicate pushdown when the result is computed. - -```python -df = dd.read_parquet("/my/parquet/dataset/") -result = df.sort_values('B')['A'] -``` - -Unoptimized expression graph (`df.pprint()`): -``` -Projection: columns='A' - SortValues: by=['B'] shuffle_method='tasks' options={} - ReadParquetFSSpec: path='/my/parquet/dataset/' ... -``` +## Using Dask cuDF -Simplified expression graph (`df.simplify().pprint()`): -``` -Projection: columns='A' - SortValues: by=['B'] shuffle_method='tasks' options={} - ReadParquetFSSpec: path='/my/parquet/dataset/' columns=['A', 'B'] ... -``` +Please visit [the official documentation page](https://docs.rapids.ai/api/dask-cudf/stable/) for detailed information about using Dask cuDF. -> [!NOTE] -> Dask will automatically simplify the expression graph (within `optimize`) when the result is converted to a task graph (via `compute` or `persist`). The user does not need to call `simplify` themself. +## Installation +See the [RAPIDS install page](https://docs.rapids.ai/install) for the most up-to-date information and commands for installing Dask cuDF and other RAPIDS packages. -## Using Multiple GPUs and Multiple Nodes +## Resources -Whenever possible, Dask cuDF (i.e. Dask DataFrame) will automatically try to partition your data into small-enough tasks to fit comfortably in the memory of a single GPU. This means the necessary compute tasks needed to compute a query can often be streamed to a single GPU process for out-of-core computing. This also means that the compute tasks can be executed in parallel over a multi-GPU cluster. 
+- [Dask cuDF documentation](https://docs.rapids.ai/api/dask-cudf/stable/) +- [cuDF documentation](https://docs.rapids.ai/api/cudf/stable/) +- [10 Minutes to cuDF and Dask cuDF](https://docs.rapids.ai/api/cudf/stable/user_guide/10min/) +- [Dask-CUDA documentation](https://docs.rapids.ai/api/dask-cuda/stable/) +- [Deployment](https://docs.rapids.ai/deployment/stable/) +- [RAPIDS Community](https://rapids.ai/learn-more/#get-involved): Get help, contribute, and collaborate. -> [!IMPORTANT] -> Neither Dask cuDF nor Dask DataFrame provide support for multi-GPU or multi-node execution on their own. You must deploy a distributed cluster (ideally with [Dask CUDA](https://docs.rapids.ai/api/dask-cuda/stable/)) to leverage multiple GPUs. +### Quick-start example -In order to execute your Dask workflow on multiple GPUs, you will typically need to use [Dask CUDA](https://docs.rapids.ai/api/dask-cuda/stable/) to deploy distributed Dask cluster, and [Distributed](https://distributed.dask.org/en/stable/client.html) to define a `client` object. For example: +A very common Dask cuDF use case is single-node multi-GPU data processing. 
These workflows typically use the following pattern: ```python - +import dask +import dask.dataframe as dd from dask_cuda import LocalCUDACluster from distributed import Client -client = Client( +if __name__ == "__main__": + + # Define a GPU-aware cluster to leverage multiple GPUs + client = Client( LocalCUDACluster( - CUDA_VISIBLE_DEVICES="0,1", # Use two workers (on devices 0 and 1) - rmm_pool_size=0.9, # Use 90% of GPU memory as a pool for faster allocations - enable_cudf_spill=True, # Improve device memory stability - local_directory="/fast/scratch/", # Use fast local storage for spilling + CUDA_VISIBLE_DEVICES="0,1", # Use two workers (on devices 0 and 1) + rmm_pool_size=0.9, # Use 90% of GPU memory as a pool for faster allocations + enable_cudf_spill=True, # Improve device memory stability + local_directory="/fast/scratch/", # Use fast local storage for spilling ) -) + ) -df = dd.read_parquet("/my/parquet/dataset/") -agg = df.groupby('B').sum() -agg.compute() # This will use the cluster defined above -``` + # Set the default dataframe backend to "cudf" + dask.config.set({"dataframe.backend": "cudf"}) -> [!NOTE] -> This example uses `compute` to materialize a concrete `cudf.DataFrame` object in local memory. Never call `compute` on a large collection that cannot fit comfortably in the memory of a single GPU! See Dask's [documentation on managing computation](https://distributed.dask.org/en/stable/manage-computation.html) for more details. + # Create your DataFrame collection from on-disk + # or in-memory data + df = dd.read_parquet("/my/parquet/dataset/") -Please see the [Dask CUDA](https://docs.rapids.ai/api/dask-cuda/stable/) documentation for more information about deploying GPU-aware clusters (including [best practices](https://docs.rapids.ai/api/dask-cuda/stable/examples/best-practices/)). 
+ # Use cudf-like syntax to transform and/or query your data + query = df.groupby('item')['price'].mean() -## Install - -See the [RAPIDS install page](https://docs.rapids.ai/install) for the most up-to-date information and commands for installing Dask cuDF and other RAPIDS packages. + # Compute, persist, or write out the result + query.head() +``` -## Resources +If you do not have multiple GPUs available, using `LocalCUDACluster` is optional. However, it is still a good idea to [enable cuDF spilling](https://docs.rapids.ai/api/cudf/stable/developer_guide/library_design/#spilling-to-host-memory). -- [Dask cuDF API documentation](https://docs.rapids.ai/api/dask-cudf/stable/) -- [cuDF API documentation](https://docs.rapids.ai/api/cudf/stable/) -- [10 Minutes to cuDF and Dask cuDF](https://docs.rapids.ai/api/cudf/stable/user_guide/10min/) -- [Dask CUDA documentation](https://docs.rapids.ai/api/dask-cuda/stable/) -- [Deployment](https://docs.rapids.ai/deployment/stable/) -- [RAPIDS Community](https://rapids.ai/learn-more/#get-involved): Get help, contribute, and collaborate. +If you wish to scale across multiple nodes, you will need to use a different mechanism to deploy your Dask-CUDA workers. Please see [the RAPIDS deployment documentation](https://docs.rapids.ai/deployment/stable/) for more instructions.