From 84207c34ee3a5a02853762f85b40cdaeb5afdee9 Mon Sep 17 00:00:00 2001 From: Alex Barghi <105237337+alexbarghi-nv@users.noreply.github.com> Date: Thu, 28 Sep 2023 10:38:29 -0400 Subject: [PATCH] Integrate C++ Renumbering and Compression (#3841) - [x] C API - [x] PLC - [x] Python API - [x] Bulk Sampling API - [x] Documentation for Python SG - [x] Documentation for Python MG - [x] Documentation for Bulk Sampler - [x] Resolve the C++ empty batch issue with new check - [x] Add FutureWarnings for all temporary flags - [x] Remove all print statements and pytest tags - [x] Verify cuGraph-PyG and cuGraph-DGL tests Authors: - Alex Barghi (https://github.com/alexbarghi-nv) - Seunghwa Kang (https://github.com/seunghwak) Approvers: - Chuck Hastings (https://github.com/ChuckHastings) - Tingyu Wang (https://github.com/tingyu66) - Seunghwa Kang (https://github.com/seunghwak) - Joseph Nke (https://github.com/jnke2016) - Brad Rees (https://github.com/BradReesWork) URL: https://github.com/rapidsai/cugraph/pull/3841 --- cpp/include/cugraph/sampling_functions.hpp | 2 +- cpp/include/cugraph_c/sampling_algorithms.h | 127 ++++---- cpp/src/c_api/uniform_neighbor_sampling.cpp | 291 ++++++++++++----- .../sampling_post_processing_impl.cuh | 4 +- cpp/tests/c_api/create_graph_test.c | 26 +- .../c_api/mg_uniform_neighbor_sample_test.c | 193 +++++++---- .../c_api/uniform_neighbor_sample_test.c | 267 ++++------------ .../dask/sampling/uniform_neighbor_sample.py | 300 +++++++++--------- .../cugraph/gnn/data_loading/bulk_sampler.py | 1 + .../gnn/data_loading/bulk_sampler_io.py | 219 ++++++++++++- .../cugraph/sampling/sampling_utilities.py | 198 ++++++++++++ .../sampling/uniform_neighbor_sample.py | 197 ++++++------ .../tests/sampling/test_bulk_sampler.py | 52 ++- .../tests/sampling/test_bulk_sampler_io.py | 69 +++- .../tests/sampling/test_bulk_sampler_io_mg.py | 14 +- .../tests/sampling/test_bulk_sampler_mg.py | 58 +++- .../sampling/test_uniform_neighbor_sample.py | 207 +++++++++++- 
.../test_uniform_neighbor_sample_mg.py | 244 +++++++++++++- .../pylibcugraph/_cugraph_c/algorithms.pxd | 48 ++- .../_cugraph_c/sampling_algorithms.pxd | 17 - .../internal_types/sampling_result.pyx | 91 +++++- .../tests/test_uniform_neighbor_sample.py | 4 +- .../pylibcugraph/uniform_neighbor_sample.pyx | 112 ++++++- 23 files changed, 2021 insertions(+), 720 deletions(-) create mode 100644 python/cugraph/cugraph/sampling/sampling_utilities.py diff --git a/cpp/include/cugraph/sampling_functions.hpp b/cpp/include/cugraph/sampling_functions.hpp index e42ef9bfcf3..75cf8f91f92 100644 --- a/cpp/include/cugraph/sampling_functions.hpp +++ b/cpp/include/cugraph/sampling_functions.hpp @@ -103,7 +103,7 @@ namespace cugraph { * std::get<1>(*edgelist_label_offsets) if @p edgelist_label_offsets.has_value() is true and 1 * otherwise and # hops = std::get<1>(*edgelist_hops) if edgelist_hops.has_value() is true and 1 * otherwise, valid only if at least one of @p edgelist_label_offsets.has_value() or @p - * edgelist_hops.has_value() is rue), renumber_map to query original vertices (size = # unique + * edgelist_hops.has_value() is true), renumber_map to query original vertices (size = # unique * vertices or aggregate # unique vertices for every label), and label offsets to the renumber_map * (size = std::get<1>(*edgelist_label_offsets) + 1, valid only if @p * edgelist_label_offsets.has_value() is true). diff --git a/cpp/include/cugraph_c/sampling_algorithms.h b/cpp/include/cugraph_c/sampling_algorithms.h index 37124d100dd..92fe50ef622 100644 --- a/cpp/include/cugraph_c/sampling_algorithms.h +++ b/cpp/include/cugraph_c/sampling_algorithms.h @@ -205,6 +205,21 @@ typedef enum cugraph_prior_sources_behavior_t { but exclude any vertex that has already been used as a source */ } cugraph_prior_sources_behavior_t; +/** + * @brief Selects the type of compression to use for the output samples. + */ +typedef enum cugraph_compression_type_t { + COO = 0, /** Outputs in COO format. Default. 
*/ + CSR, /** Compresses in CSR format. This means the row (src) column + is compressed into a row pointer. */ + CSC, /** Compresses in CSC format. This means the col (dst) column + is compressed into a column pointer. */ + DCSR, /** Compresses in DCSR format. This outputs an additional index + that avoids empty entries in the row pointer. */ + DCSC /** Compresses in DCSC format. This outputs an additional index + that avoids empty entries in the col pointer. */ +} cugraph_compression_type_t; + /** * @brief Create sampling options object * @@ -225,6 +240,14 @@ cugraph_error_code_t cugraph_sampling_options_create(cugraph_sampling_options_t* */ void cugraph_sampling_set_renumber_results(cugraph_sampling_options_t* options, bool_t value); +/** + * @brief Set whether to compress per-hop (True) or globally (False) + * + * @param options - opaque pointer to the sampling options + * @param value - Boolean value to assign to the option + */ +void cugraph_sampling_set_compress_per_hop(cugraph_sampling_options_t* options, bool_t value); + /** * @brief Set flag to sample with_replacement * @@ -241,6 +264,15 @@ void cugraph_sampling_set_with_replacement(cugraph_sampling_options_t* options, */ void cugraph_sampling_set_return_hops(cugraph_sampling_options_t* options, bool_t value); +/** + * @brief Set compression type + * + * @param options - opaque pointer to the sampling options + * @param value - Enum defining the compression type + */ +void cugraph_sampling_set_compression_type(cugraph_sampling_options_t* options, + cugraph_compression_type_t value); + /** * @brief Set prior sources behavior * @@ -265,62 +297,6 @@ void cugraph_sampling_set_dedupe_sources(cugraph_sampling_options_t* options, bo */ void cugraph_sampling_options_free(cugraph_sampling_options_t* options); -/** - * @brief Uniform Neighborhood Sampling - * @deprecated This call should be replaced with cugraph_uniform_neighbor_sample - * - * Returns a sample of the neighborhood around specified start vertices.
Optionally, each - * start vertex can be associated with a label, allowing the caller to specify multiple batches - * of sampling requests in the same function call - which should improve GPU utilization. - * - * If label is NULL then all start vertices will be considered part of the same batch and the - * return value will not have a label column. - * - * @param [in] handle Handle for accessing resources - * @param [in] graph Pointer to graph. NOTE: Graph might be modified if the storage - * needs to be transposed - * @param [in] start_vertices Device array of start vertices for the sampling - * @param [in] start_vertex_labels Device array of start vertex labels for the sampling. The - * labels associated with each start vertex will be included in the output associated with results - * that were derived from that start vertex. We only support label of type INT32. If label is - * NULL, the return data will not be labeled. - * @param [in] label_list Device array of the labels included in @p start_vertex_labels. If - * @p label_to_comm_rank is not specified this parameter is ignored. If specified, label_list - * must be sorted in ascending order. - * @param [in] label_to_comm_rank Device array identifying which comm rank the output for a - * particular label should be shuffled in the output. If not specifed the data is not organized in - * output. If specified then the all data from @p label_list[i] will be shuffled to rank @p - * label_to_comm_rank[i]. If not specified then the output data will not be shuffled between ranks. - * @param [in] fanout Host array defining the fan out at each step in the sampling algorithm. - * We only support fanout values of type INT32 - * @param [in/out] rng_state State of the random number generator, updated with each call - * @param [in] with_replacement - * Boolean value. If true selection of edges is done with - * replacement. If false selection is done without replacement. - * @param [in] return_hops Boolean value. 
If true include the hop number in the result, - * If false the hop number will not be included in result. - * @param [in] do_expensive_check - * A flag to run expensive checks for input arguments (if set to true) - * @param [in] result Output from the uniform_neighbor_sample call - * @param [out] error Pointer to an error object storing details of any error. Will - * be populated if error code is not CUGRAPH_SUCCESS - * @return error code - */ -cugraph_error_code_t cugraph_uniform_neighbor_sample_with_edge_properties( - const cugraph_resource_handle_t* handle, - cugraph_graph_t* graph, - const cugraph_type_erased_device_array_view_t* start_vertices, - const cugraph_type_erased_device_array_view_t* start_vertex_labels, - const cugraph_type_erased_device_array_view_t* label_list, - const cugraph_type_erased_device_array_view_t* label_to_comm_rank, - const cugraph_type_erased_host_array_view_t* fan_out, - cugraph_rng_state_t* rng_state, - bool_t with_replacement, - bool_t return_hops, - bool_t do_expensive_check, - cugraph_sample_result_t** result, - cugraph_error_t** error); - /** * @brief Uniform Neighborhood Sampling * @@ -374,6 +350,7 @@ cugraph_error_code_t cugraph_uniform_neighbor_sample( cugraph_error_t** error); /** + * @deprecated This call should be replaced with cugraph_sample_result_get_majors * @brief Get the source vertices from the sampling algorithm result * * @param [in] result The result from a sampling algorithm @@ -383,6 +360,7 @@ cugraph_type_erased_device_array_view_t* cugraph_sample_result_get_sources( const cugraph_sample_result_t* result); /** + * @deprecated This call should be replaced with cugraph_sample_result_get_minors * @brief Get the destination vertices from the sampling algorithm result * * @param [in] result The result from a sampling algorithm @@ -391,6 +369,33 @@ cugraph_type_erased_device_array_view_t* cugraph_sample_result_get_sources( cugraph_type_erased_device_array_view_t* cugraph_sample_result_get_destinations( const 
cugraph_sample_result_t* result); +/** + * @brief Get the major vertices from the sampling algorithm result + * + * @param [in] result The result from a sampling algorithm + * @return type erased array pointing to the major vertices in device memory + */ +cugraph_type_erased_device_array_view_t* cugraph_sample_result_get_majors( + const cugraph_sample_result_t* result); + +/** + * @brief Get the minor vertices from the sampling algorithm result + * + * @param [in] result The result from a sampling algorithm + * @return type erased array pointing to the minor vertices in device memory + */ +cugraph_type_erased_device_array_view_t* cugraph_sample_result_get_minors( + const cugraph_sample_result_t* result); + +/** + * @brief Get the major offsets from the sampling algorithm result + * + * @param [in] result The result from a sampling algorithm + * @return type erased array pointing to the major offsets in device memory + */ +cugraph_type_erased_device_array_view_t* cugraph_sample_result_get_major_offsets( + const cugraph_sample_result_t* result); + /** * @brief Get the start labels from the sampling algorithm result * @@ -436,6 +441,15 @@ cugraph_type_erased_device_array_view_t* cugraph_sample_result_get_edge_weight( cugraph_type_erased_device_array_view_t* cugraph_sample_result_get_hop( const cugraph_sample_result_t* result); +/** + * @brief Get the label-hop offsets from the sampling algorithm result + * + * @param [in] result The result from a sampling algorithm + * @return type erased array pointing to the label-hop offsets + */ +cugraph_type_erased_device_array_view_t* cugraph_sample_result_get_label_hop_offsets( + const cugraph_sample_result_t* result); + /** * @brief Get the index from the sampling algorithm result * @@ -446,6 +460,7 @@ cugraph_type_erased_device_array_view_t* cugraph_sample_result_get_index( const cugraph_sample_result_t* result); /** + * @deprecated This call should be replaced with cugraph_sample_result_get_label_hop_offsets * @brief Get the
result offsets from the sampling algorithm result * * @param [in] result The result from a sampling algorithm diff --git a/cpp/src/c_api/uniform_neighbor_sampling.cpp b/cpp/src/c_api/uniform_neighbor_sampling.cpp index f146c331d8c..1a53c899109 100644 --- a/cpp/src/c_api/uniform_neighbor_sampling.cpp +++ b/cpp/src/c_api/uniform_neighbor_sampling.cpp @@ -38,17 +38,20 @@ struct cugraph_sampling_options_t { prior_sources_behavior_t prior_sources_behavior_{prior_sources_behavior_t::DEFAULT}; bool_t dedupe_sources_{FALSE}; bool_t renumber_results_{FALSE}; + cugraph_compression_type_t compression_type_{cugraph_compression_type_t::COO}; + bool_t compress_per_hop_{FALSE}; }; struct cugraph_sample_result_t { - cugraph_type_erased_device_array_t* src_{nullptr}; - cugraph_type_erased_device_array_t* dst_{nullptr}; + cugraph_type_erased_device_array_t* major_offsets_{nullptr}; + cugraph_type_erased_device_array_t* majors_{nullptr}; + cugraph_type_erased_device_array_t* minors_{nullptr}; cugraph_type_erased_device_array_t* edge_id_{nullptr}; cugraph_type_erased_device_array_t* edge_type_{nullptr}; cugraph_type_erased_device_array_t* wgt_{nullptr}; cugraph_type_erased_device_array_t* hop_{nullptr}; + cugraph_type_erased_device_array_t* label_hop_offsets_{nullptr}; cugraph_type_erased_device_array_t* label_{nullptr}; - cugraph_type_erased_device_array_t* offsets_{nullptr}; cugraph_type_erased_device_array_t* renumber_map_{nullptr}; cugraph_type_erased_device_array_t* renumber_map_offsets_{nullptr}; }; @@ -186,6 +189,8 @@ struct uniform_neighbor_sampling_functor : public cugraph::c_api::abstract_funct graph_view.local_vertex_partition_range_last(), do_expensive_check_); + bool has_labels = start_vertex_labels_ != nullptr; + auto&& [src, dst, wgt, edge_id, edge_type, hop, edge_label, offsets] = cugraph::uniform_neighbor_sample( handle_, @@ -229,25 +234,130 @@ struct uniform_neighbor_sampling_functor : public cugraph::c_api::abstract_funct vertex_partition_lasts, 
do_expensive_check_); + std::optional> majors{std::nullopt}; + rmm::device_uvector minors(0, handle_.get_stream()); + std::optional> major_offsets{std::nullopt}; + + std::optional> label_hop_offsets{std::nullopt}; + std::optional> renumber_map{std::nullopt}; std::optional> renumber_map_offsets{std::nullopt}; + bool src_is_major = (options_.compression_type_ == cugraph_compression_type_t::CSR) || + (options_.compression_type_ == cugraph_compression_type_t::DCSR) || + (options_.compression_type_ == cugraph_compression_type_t::COO); + if (options_.renumber_results_) { - std::tie(src, dst, renumber_map, renumber_map_offsets) = cugraph::renumber_sampled_edgelist( - handle_, - std::move(src), - std::move(dst), - hop ? std::make_optional(raft::device_span{hop->data(), hop->size()}) - : std::nullopt, - std::make_optional(std::make_tuple( - raft::device_span{edge_label->data(), edge_label->size()}, - raft::device_span{offsets->data(), offsets->size()})), - do_expensive_check_); + if (options_.compression_type_ == cugraph_compression_type_t::COO) { + // COO + + rmm::device_uvector output_majors(0, handle_.get_stream()); + rmm::device_uvector output_renumber_map(0, handle_.get_stream()); + std::tie(output_majors, + minors, + wgt, + edge_id, + edge_type, + label_hop_offsets, + output_renumber_map, + renumber_map_offsets) = + cugraph::renumber_and_sort_sampled_edgelist( + handle_, + std::move(src), + std::move(dst), + wgt ? std::move(wgt) : std::nullopt, + edge_id ? std::move(edge_id) : std::nullopt, + edge_type ? std::move(edge_type) : std::nullopt, + hop ? std::make_optional(std::make_tuple(std::move(*hop), fan_out_->size_)) + : std::nullopt, + offsets ? 
std::make_optional(std::make_tuple( + raft::device_span{offsets->data(), offsets->size()}, + edge_label->size())) + : std::nullopt, + src_is_major, + do_expensive_check_); + + majors.emplace(std::move(output_majors)); + renumber_map.emplace(std::move(output_renumber_map)); + } else { + // (D)CSC, (D)CSR + + bool doubly_compress = (options_.compression_type_ == cugraph_compression_type_t::DCSR) || + (options_.compression_type_ == cugraph_compression_type_t::DCSC); + + rmm::device_uvector output_major_offsets(0, handle_.get_stream()); + rmm::device_uvector output_renumber_map(0, handle_.get_stream()); + std::tie(majors, + output_major_offsets, + minors, + wgt, + edge_id, + edge_type, + label_hop_offsets, + output_renumber_map, + renumber_map_offsets) = + cugraph::renumber_and_compress_sampled_edgelist( + handle_, + std::move(src), + std::move(dst), + wgt ? std::move(wgt) : std::nullopt, + edge_id ? std::move(edge_id) : std::nullopt, + edge_type ? std::move(edge_type) : std::nullopt, + hop ? std::make_optional(std::make_tuple(std::move(*hop), fan_out_->size_)) + : std::nullopt, + offsets ? std::make_optional(std::make_tuple( + raft::device_span{offsets->data(), offsets->size()}, + edge_label->size())) + : std::nullopt, + src_is_major, + options_.compress_per_hop_, + doubly_compress, + do_expensive_check_); + + renumber_map.emplace(std::move(output_renumber_map)); + major_offsets.emplace(std::move(output_major_offsets)); + } + + // These are now represented by label_hop_offsets + hop.reset(); + offsets.reset(); + } else { + if (options_.compression_type_ != cugraph_compression_type_t::COO) { + CUGRAPH_FAIL("Can only use COO format if not renumbering"); + } + + std::tie(src, dst, wgt, edge_id, edge_type, label_hop_offsets) = + cugraph::sort_sampled_edgelist( + handle_, + std::move(src), + std::move(dst), + wgt ? std::move(wgt) : std::nullopt, + edge_id ? std::move(edge_id) : std::nullopt, + edge_type ? std::move(edge_type) : std::nullopt, + hop ? 
std::make_optional(std::make_tuple(std::move(*hop), fan_out_->size_)) + : std::nullopt, + offsets ? std::make_optional(std::make_tuple( + raft::device_span{offsets->data(), offsets->size()}, + edge_label->size())) + : std::nullopt, + src_is_major, + do_expensive_check_); + + majors.emplace(std::move(src)); + minors = std::move(dst); + + hop.reset(); + offsets.reset(); } result_ = new cugraph::c_api::cugraph_sample_result_t{ - new cugraph::c_api::cugraph_type_erased_device_array_t(src, graph_->vertex_type_), - new cugraph::c_api::cugraph_type_erased_device_array_t(dst, graph_->vertex_type_), + (major_offsets) + ? new cugraph::c_api::cugraph_type_erased_device_array_t(*major_offsets, SIZE_T) + : nullptr, + (majors) + ? new cugraph::c_api::cugraph_type_erased_device_array_t(*majors, graph_->vertex_type_) + : nullptr, + new cugraph::c_api::cugraph_type_erased_device_array_t(minors, graph_->vertex_type_), (edge_id) ? new cugraph::c_api::cugraph_type_erased_device_array_t(*edge_id, graph_->edge_type_) : nullptr, @@ -256,12 +366,14 @@ struct uniform_neighbor_sampling_functor : public cugraph::c_api::abstract_funct : nullptr, (wgt) ? new cugraph::c_api::cugraph_type_erased_device_array_t(*wgt, graph_->weight_type_) : nullptr, - (hop) ? new cugraph::c_api::cugraph_type_erased_device_array_t(*hop, INT32) : nullptr, + (hop) ? new cugraph::c_api::cugraph_type_erased_device_array_t(*hop, INT32) + : nullptr, // FIXME get rid of this + (label_hop_offsets) + ? new cugraph::c_api::cugraph_type_erased_device_array_t(*label_hop_offsets, SIZE_T) + : nullptr, (edge_label) ? new cugraph::c_api::cugraph_type_erased_device_array_t(edge_label.value(), INT32) : nullptr, - (offsets) ? new cugraph::c_api::cugraph_type_erased_device_array_t(offsets.value(), SIZE_T) - : nullptr, (renumber_map) ? 
new cugraph::c_api::cugraph_type_erased_device_array_t( renumber_map.value(), graph_->vertex_type_) : nullptr, @@ -295,6 +407,13 @@ extern "C" void cugraph_sampling_set_renumber_results(cugraph_sampling_options_t internal_pointer->renumber_results_ = value; } +extern "C" void cugraph_sampling_set_compress_per_hop(cugraph_sampling_options_t* options, + bool_t value) +{ + auto internal_pointer = reinterpret_cast(options); + internal_pointer->compress_per_hop_ = value; +} + extern "C" void cugraph_sampling_set_with_replacement(cugraph_sampling_options_t* options, bool_t value) { @@ -308,6 +427,20 @@ extern "C" void cugraph_sampling_set_return_hops(cugraph_sampling_options_t* opt internal_pointer->return_hops_ = value; } +extern "C" void cugraph_sampling_set_compression_type(cugraph_sampling_options_t* options, + cugraph_compression_type_t value) +{ + auto internal_pointer = reinterpret_cast(options); + switch (value) { + case COO: internal_pointer->compression_type_ = cugraph_compression_type_t::COO; break; + case CSR: internal_pointer->compression_type_ = cugraph_compression_type_t::CSR; break; + case CSC: internal_pointer->compression_type_ = cugraph_compression_type_t::CSC; break; + case DCSR: internal_pointer->compression_type_ = cugraph_compression_type_t::DCSR; break; + case DCSC: internal_pointer->compression_type_ = cugraph_compression_type_t::DCSC; break; + default: CUGRAPH_FAIL("Invalid compression type"); + } +} + extern "C" void cugraph_sampling_set_prior_sources_behavior(cugraph_sampling_options_t* options, cugraph_prior_sources_behavior_t value) { @@ -341,15 +474,45 @@ extern "C" void cugraph_sampling_options_free(cugraph_sampling_options_t* option extern "C" cugraph_type_erased_device_array_view_t* cugraph_sample_result_get_sources( const cugraph_sample_result_t* result) { - auto internal_pointer = reinterpret_cast(result); - return reinterpret_cast(internal_pointer->src_->view()); + // Deprecated. 
+ return cugraph_sample_result_get_majors(result); } extern "C" cugraph_type_erased_device_array_view_t* cugraph_sample_result_get_destinations( const cugraph_sample_result_t* result) +{ + // Deprecated. + return cugraph_sample_result_get_minors(result); +} + +extern "C" cugraph_type_erased_device_array_view_t* cugraph_sample_result_get_majors( + const cugraph_sample_result_t* result) +{ + auto internal_pointer = reinterpret_cast(result); + return (internal_pointer->majors_ != nullptr) + ? reinterpret_cast( + internal_pointer->majors_->view()) + + : NULL; +} + +extern "C" cugraph_type_erased_device_array_view_t* cugraph_sample_result_get_major_offsets( + const cugraph_sample_result_t* result) +{ + auto internal_pointer = reinterpret_cast(result); + return (internal_pointer->major_offsets_ != nullptr) + ? reinterpret_cast( + internal_pointer->major_offsets_->view()) + + : NULL; +} + +extern "C" cugraph_type_erased_device_array_view_t* cugraph_sample_result_get_minors( + const cugraph_sample_result_t* result) { auto internal_pointer = reinterpret_cast(result); - return reinterpret_cast(internal_pointer->dst_->view()); + return reinterpret_cast( + internal_pointer->minors_->view()); } extern "C" cugraph_type_erased_device_array_view_t* cugraph_sample_result_get_start_labels( @@ -402,6 +565,16 @@ extern "C" cugraph_type_erased_device_array_view_t* cugraph_sample_result_get_ho : NULL; } +extern "C" cugraph_type_erased_device_array_view_t* cugraph_sample_result_get_label_hop_offsets( + const cugraph_sample_result_t* result) +{ + auto internal_pointer = reinterpret_cast(result); + return internal_pointer->label_hop_offsets_ != nullptr + ? 
reinterpret_cast( + internal_pointer->label_hop_offsets_->view()) + : NULL; +} + extern "C" cugraph_type_erased_device_array_view_t* cugraph_sample_result_get_index( const cugraph_sample_result_t* result) { @@ -413,9 +586,8 @@ extern "C" cugraph_type_erased_device_array_view_t* cugraph_sample_result_get_in extern "C" cugraph_type_erased_device_array_view_t* cugraph_sample_result_get_offsets( const cugraph_sample_result_t* result) { - auto internal_pointer = reinterpret_cast(result); - return reinterpret_cast( - internal_pointer->offsets_->view()); + // Deprecated. + return cugraph_sample_result_get_label_hop_offsets(result); } extern "C" cugraph_type_erased_device_array_view_t* cugraph_sample_result_get_renumber_map( @@ -532,6 +704,7 @@ extern "C" cugraph_error_code_t cugraph_test_uniform_neighborhood_sample_result_ // create new cugraph_sample_result_t *result = reinterpret_cast(new cugraph::c_api::cugraph_sample_result_t{ + nullptr, reinterpret_cast( new_device_srcs.release()), reinterpret_cast( @@ -675,78 +848,20 @@ extern "C" cugraph_error_code_t cugraph_test_sample_result_create( extern "C" void cugraph_sample_result_free(cugraph_sample_result_t* result) { auto internal_pointer = reinterpret_cast(result); - delete internal_pointer->src_; - delete internal_pointer->dst_; + delete internal_pointer->major_offsets_; + delete internal_pointer->majors_; + delete internal_pointer->minors_; delete internal_pointer->edge_id_; delete internal_pointer->edge_type_; delete internal_pointer->wgt_; delete internal_pointer->hop_; + delete internal_pointer->label_hop_offsets_; delete internal_pointer->label_; + delete internal_pointer->renumber_map_; + delete internal_pointer->renumber_map_offsets_; delete internal_pointer; } -extern "C" cugraph_error_code_t cugraph_uniform_neighbor_sample_with_edge_properties( - const cugraph_resource_handle_t* handle, - cugraph_graph_t* graph, - const cugraph_type_erased_device_array_view_t* start_vertices, - const 
cugraph_type_erased_device_array_view_t* start_vertex_labels, - const cugraph_type_erased_device_array_view_t* label_list, - const cugraph_type_erased_device_array_view_t* label_to_comm_rank, - const cugraph_type_erased_host_array_view_t* fan_out, - cugraph_rng_state_t* rng_state, - bool_t with_replacement, - bool_t return_hops, - bool_t do_expensive_check, - cugraph_sample_result_t** result, - cugraph_error_t** error) -{ - CAPI_EXPECTS((start_vertex_labels == nullptr) || - (reinterpret_cast( - start_vertex_labels) - ->type_ == INT32), - CUGRAPH_INVALID_INPUT, - "start_vertex_labels should be of type int", - *error); - - CAPI_EXPECTS((label_to_comm_rank == nullptr) || (start_vertex_labels != nullptr), - CUGRAPH_INVALID_INPUT, - "cannot specify label_to_comm_rank unless start_vertex_labels is also specified", - *error); - - CAPI_EXPECTS((label_to_comm_rank == nullptr) || (label_list != nullptr), - CUGRAPH_INVALID_INPUT, - "cannot specify label_to_comm_rank unless label_list is also specified", - *error); - - CAPI_EXPECTS(reinterpret_cast(graph)->vertex_type_ == - reinterpret_cast( - start_vertices) - ->type_, - CUGRAPH_INVALID_INPUT, - "vertex type of graph and start_vertices must match", - *error); - - CAPI_EXPECTS( - reinterpret_cast(fan_out) - ->type_ == INT32, - CUGRAPH_INVALID_INPUT, - "fan_out should be of type int", - *error); - - uniform_neighbor_sampling_functor functor{ - handle, - graph, - start_vertices, - start_vertex_labels, - label_list, - label_to_comm_rank, - fan_out, - rng_state, - cugraph::c_api::cugraph_sampling_options_t{with_replacement, return_hops}, - do_expensive_check}; - return cugraph::c_api::run_algorithm(graph, functor, result, error); -} - cugraph_error_code_t cugraph_uniform_neighbor_sample( const cugraph_resource_handle_t* handle, cugraph_graph_t* graph, diff --git a/cpp/src/sampling/sampling_post_processing_impl.cuh b/cpp/src/sampling/sampling_post_processing_impl.cuh index 0c397d91b20..77d4f2d865f 100644 --- 
a/cpp/src/sampling/sampling_post_processing_impl.cuh +++ b/cpp/src/sampling/sampling_post_processing_impl.cuh @@ -166,9 +166,7 @@ void check_input_edges( std::numeric_limits::max()), "Invalid input arguments: current implementation assumes that the number of " "unique labels is no larger than std::numeric_limits::max()."); - CUGRAPH_EXPECTS(!edgelist_label_offsets || std::get<1>(*edgelist_label_offsets) > 0, - "Invlaid input arguments: there should be 1 or more labels if " - "edgelist_label_offsets.has_value() is true."); + CUGRAPH_EXPECTS( !edgelist_label_offsets.has_value() || (std::get<0>(*edgelist_label_offsets).size() == std::get<1>(*edgelist_label_offsets) + 1), diff --git a/cpp/tests/c_api/create_graph_test.c b/cpp/tests/c_api/create_graph_test.c index eef49458f2b..736db761ebd 100644 --- a/cpp/tests/c_api/create_graph_test.c +++ b/cpp/tests/c_api/create_graph_test.c @@ -142,6 +142,14 @@ int test_create_sg_graph_csr() vertex_t h_start[] = {0, 1, 2, 3, 4, 5}; weight_t h_wgt[] = {0.1f, 2.1f, 1.1f, 5.1f, 3.1f, 4.1f, 7.2f, 3.2f}; + bool_t with_replacement = FALSE; + bool_t return_hops = TRUE; + cugraph_prior_sources_behavior_t prior_sources_behavior = DEFAULT; + bool_t dedupe_sources = FALSE; + bool_t renumber_results = FALSE; + cugraph_compression_type_t compression = COO; + bool_t compress_per_hop = FALSE; + cugraph_resource_handle_t* handle = NULL; cugraph_graph_t* graph = NULL; cugraph_graph_properties_t properties; @@ -238,8 +246,21 @@ int test_create_sg_graph_csr() ret_code = cugraph_rng_state_create(handle, 0, &rng_state, &ret_error); TEST_ASSERT(test_ret_value, ret_code == CUGRAPH_SUCCESS, "rng_state create failed."); - ret_code = cugraph_uniform_neighbor_sample_with_edge_properties( - handle, graph, d_start_view, NULL, NULL, NULL, h_fan_out_view, rng_state, FALSE, FALSE, FALSE, &result, &ret_error); + cugraph_sampling_options_t *sampling_options; + + ret_code = cugraph_sampling_options_create(&sampling_options, &ret_error); + TEST_ASSERT(test_ret_value, 
ret_code == CUGRAPH_SUCCESS, "sampling_options create failed."); + + cugraph_sampling_set_with_replacement(sampling_options, with_replacement); + cugraph_sampling_set_return_hops(sampling_options, return_hops); + cugraph_sampling_set_prior_sources_behavior(sampling_options, prior_sources_behavior); + cugraph_sampling_set_dedupe_sources(sampling_options, dedupe_sources); + cugraph_sampling_set_renumber_results(sampling_options, renumber_results); + cugraph_sampling_set_compression_type(sampling_options, compression); + cugraph_sampling_set_compress_per_hop(sampling_options, compress_per_hop); + + ret_code = cugraph_uniform_neighbor_sample( + handle, graph, d_start_view, NULL, NULL, NULL, h_fan_out_view, rng_state, sampling_options, FALSE, &result, &ret_error); TEST_ASSERT(test_ret_value, ret_code == CUGRAPH_SUCCESS, cugraph_error_message(ret_error)); TEST_ALWAYS_ASSERT(ret_code == CUGRAPH_SUCCESS, "uniform_neighbor_sample failed."); @@ -289,6 +310,7 @@ int test_create_sg_graph_csr() cugraph_free_resource_handle(handle); cugraph_error_free(ret_error); + cugraph_sampling_options_free(sampling_options); return test_ret_value; } diff --git a/cpp/tests/c_api/mg_uniform_neighbor_sample_test.c b/cpp/tests/c_api/mg_uniform_neighbor_sample_test.c index f8241bd8a5f..86a0a92eb01 100644 --- a/cpp/tests/c_api/mg_uniform_neighbor_sample_test.c +++ b/cpp/tests/c_api/mg_uniform_neighbor_sample_test.c @@ -213,11 +213,6 @@ int generic_uniform_neighbor_sample_test(const cugraph_resource_handle_t* handle TEST_ASSERT(test_ret_value, ret_code == CUGRAPH_SUCCESS, "gatherv_fill failed."); } - if (return_hops) { - ret_code = cugraph_test_device_gatherv_fill(handle, result_hops, h_result_hops); - TEST_ASSERT(test_ret_value, ret_code == CUGRAPH_SUCCESS, "gatherv_fill failed."); - } - if (d_start_labels != NULL) { size_t sz = cugraph_type_erased_device_array_view_size(result_offsets); @@ -452,6 +447,7 @@ int test_uniform_neighbor_from_alex(const cugraph_resource_handle_t* handle) size_t 
num_vertices = 5; size_t fan_out_size = 2; size_t num_starts = 2; + size_t num_start_labels = 2; vertex_t src[] = {0, 1, 2, 3, 4, 3, 4, 2, 0, 1, 0, 2}; vertex_t dst[] = {1, 2, 4, 2, 3, 4, 1, 1, 2, 3, 4, 4}; @@ -462,7 +458,6 @@ int test_uniform_neighbor_from_alex(const cugraph_resource_handle_t* handle) int32_t batch[] = {0, 1}; int fan_out[] = {2, 2}; - bool_t with_replacement = TRUE; bool_t store_transposed = FALSE; int test_ret_value = 0; @@ -472,6 +467,14 @@ int test_uniform_neighbor_from_alex(const cugraph_resource_handle_t* handle) cugraph_graph_t* graph = NULL; cugraph_sample_result_t* result = NULL; + bool_t with_replacement = FALSE; + bool_t return_hops = TRUE; + cugraph_prior_sources_behavior_t prior_sources_behavior = DEFAULT; + bool_t dedupe_sources = FALSE; + bool_t renumber_results = FALSE; + cugraph_compression_type_t compression = COO; + bool_t compress_per_hop = FALSE; + cugraph_type_erased_device_array_t* d_start = NULL; cugraph_type_erased_device_array_t* d_label = NULL; cugraph_type_erased_device_array_view_t* d_start_view = NULL; @@ -512,19 +515,31 @@ int test_uniform_neighbor_from_alex(const cugraph_resource_handle_t* handle) h_fan_out_view = cugraph_type_erased_host_array_view_create(fan_out, fan_out_size, INT32); - ret_code = cugraph_uniform_neighbor_sample_with_edge_properties(handle, - graph, - d_start_view, - d_label_view, - NULL, - NULL, - h_fan_out_view, - rng_state, - with_replacement, - TRUE, - FALSE, - &result, - &ret_error); + cugraph_sampling_options_t *sampling_options; + + ret_code = cugraph_sampling_options_create(&sampling_options, &ret_error); + TEST_ASSERT(test_ret_value, ret_code == CUGRAPH_SUCCESS, "sampling_options create failed."); + + cugraph_sampling_set_with_replacement(sampling_options, with_replacement); + cugraph_sampling_set_return_hops(sampling_options, return_hops); + cugraph_sampling_set_prior_sources_behavior(sampling_options, prior_sources_behavior); + cugraph_sampling_set_dedupe_sources(sampling_options, 
dedupe_sources); + cugraph_sampling_set_renumber_results(sampling_options, renumber_results); + cugraph_sampling_set_compression_type(sampling_options, compression); + cugraph_sampling_set_compress_per_hop(sampling_options, compress_per_hop); + + ret_code = cugraph_uniform_neighbor_sample(handle, + graph, + d_start_view, + d_label_view, + NULL, + NULL, + h_fan_out_view, + rng_state, + sampling_options, + FALSE, + &result, + &ret_error); #ifdef NO_CUGRAPH_OPS TEST_ASSERT( @@ -540,6 +555,7 @@ int test_uniform_neighbor_from_alex(const cugraph_resource_handle_t* handle) cugraph_type_erased_device_array_view_t* result_weight; cugraph_type_erased_device_array_view_t* result_labels; cugraph_type_erased_device_array_view_t* result_hops; + cugraph_type_erased_device_array_view_t* result_offsets; result_src = cugraph_sample_result_get_sources(result); result_dst = cugraph_sample_result_get_destinations(result); @@ -548,8 +564,10 @@ int test_uniform_neighbor_from_alex(const cugraph_resource_handle_t* handle) result_weight = cugraph_sample_result_get_edge_weight(result); result_labels = cugraph_sample_result_get_start_labels(result); result_hops = cugraph_sample_result_get_hop(result); + result_offsets = cugraph_sample_result_get_offsets(result); size_t result_size = cugraph_type_erased_device_array_view_size(result_src); + size_t offsets_size = cugraph_type_erased_device_array_view_size(result_offsets); vertex_t h_srcs[result_size]; vertex_t h_dsts[result_size]; @@ -558,6 +576,7 @@ int test_uniform_neighbor_from_alex(const cugraph_resource_handle_t* handle) weight_t h_wgt[result_size]; int h_labels[result_size]; int h_hop[result_size]; + int h_offsets[offsets_size]; ret_code = cugraph_type_erased_device_array_view_copy_to_host( handle, (byte_t*)h_srcs, result_src, &ret_error); @@ -584,9 +603,24 @@ int test_uniform_neighbor_from_alex(const cugraph_resource_handle_t* handle) TEST_ASSERT(test_ret_value, ret_code == CUGRAPH_SUCCESS, "copy_to_host failed."); ret_code = 
cugraph_type_erased_device_array_view_copy_to_host( - handle, (byte_t*)h_hop, result_hops, &ret_error); + handle, (byte_t*)h_offsets, result_offsets, &ret_error); TEST_ASSERT(test_ret_value, ret_code == CUGRAPH_SUCCESS, "copy_to_host failed."); + for(int k = 0; k < offsets_size-1; k += fan_out_size) { + for(int h = 0; h < fan_out_size; ++h) { + int hop_start = h_offsets[k+h]; + int hop_end = h_offsets[k+h+1]; + for(int i = hop_start; i < hop_end; ++i) { + h_hop[i] = h; + } + } + } + + for(int k = 0; k < num_start_labels+1; ++k) { + h_offsets[k] = h_offsets[k*fan_out_size]; + } + offsets_size = num_start_labels + 1; + // NOTE: The C++ tester does a more thorough validation. For our purposes // here we will do a simpler validation, merely checking that all edges // are actually part of the graph @@ -611,6 +645,7 @@ int test_uniform_neighbor_from_alex(const cugraph_resource_handle_t* handle) cugraph_type_erased_host_array_view_free(h_fan_out_view); cugraph_mg_graph_free(graph); cugraph_error_free(ret_error); + cugraph_sampling_options_free(sampling_options); return test_ret_value; } @@ -661,6 +696,15 @@ int test_uniform_neighbor_sample_alex_bug(const cugraph_resource_handle_t* handl size_t expected_size[] = { 3, 2, 1, 1, 1, 1, 1, 1 }; + + bool_t with_replacement = FALSE; + bool_t return_hops = TRUE; + cugraph_prior_sources_behavior_t prior_sources_behavior = CARRY_OVER; + bool_t dedupe_sources = TRUE; + bool_t renumber_results = FALSE; + cugraph_compression_type_t compression = COO; + bool_t compress_per_hop = FALSE; + // Create graph int test_ret_value = 0; cugraph_error_code_t ret_code = CUGRAPH_SUCCESS; @@ -747,19 +791,30 @@ int test_uniform_neighbor_sample_alex_bug(const cugraph_resource_handle_t* handl h_fan_out_view = cugraph_type_erased_host_array_view_create(fan_out, fan_out_size, INT32); - ret_code = cugraph_uniform_neighbor_sample_with_edge_properties(handle, - graph, - d_start_view, - d_start_labels_view, - d_label_list_view, - 
d_label_to_output_comm_rank_view, - h_fan_out_view, - rng_state, - FALSE, - TRUE, - FALSE, - &result, - &ret_error); + cugraph_sampling_options_t* sampling_options; + ret_code = cugraph_sampling_options_create(&sampling_options, &ret_error); + TEST_ASSERT(test_ret_value, ret_code == CUGRAPH_SUCCESS, "sampling_options create failed."); + + cugraph_sampling_set_with_replacement(sampling_options, with_replacement); + cugraph_sampling_set_return_hops(sampling_options, return_hops); + cugraph_sampling_set_prior_sources_behavior(sampling_options, prior_sources_behavior); + cugraph_sampling_set_dedupe_sources(sampling_options, dedupe_sources); + cugraph_sampling_set_renumber_results(sampling_options, renumber_results); + cugraph_sampling_set_compression_type(sampling_options, compression); + cugraph_sampling_set_compress_per_hop(sampling_options, compress_per_hop); + + ret_code = cugraph_uniform_neighbor_sample(handle, + graph, + d_start_view, + d_start_labels_view, + d_label_list_view, + d_label_to_output_comm_rank_view, + h_fan_out_view, + rng_state, + sampling_options, + FALSE, + &result, + &ret_error); #ifdef NO_CUGRAPH_OPS TEST_ASSERT( @@ -900,6 +955,14 @@ int test_uniform_neighbor_sample_sort_by_hop(const cugraph_resource_handle_t* ha size_t expected_size[] = { 3, 2, 1, 1, 1, 1, 1, 1 }; + bool_t with_replacement = FALSE; + bool_t return_hops = TRUE; + cugraph_prior_sources_behavior_t prior_sources_behavior = CARRY_OVER; + bool_t dedupe_sources = TRUE; + bool_t renumber_results = FALSE; + cugraph_compression_type_t compression = COO; + bool_t compress_per_hop = FALSE; + // Create graph int test_ret_value = 0; cugraph_error_code_t ret_code = CUGRAPH_SUCCESS; @@ -986,19 +1049,30 @@ int test_uniform_neighbor_sample_sort_by_hop(const cugraph_resource_handle_t* ha h_fan_out_view = cugraph_type_erased_host_array_view_create(fan_out, fan_out_size, INT32); - ret_code = cugraph_uniform_neighbor_sample_with_edge_properties(handle, - graph, - d_start_view, - 
d_start_labels_view, - d_label_list_view, - d_label_to_output_comm_rank_view, - h_fan_out_view, - rng_state, - FALSE, - TRUE, - FALSE, - &result, - &ret_error); + cugraph_sampling_options_t* sampling_options; + ret_code = cugraph_sampling_options_create(&sampling_options, &ret_error); + TEST_ASSERT(test_ret_value, ret_code == CUGRAPH_SUCCESS, "sampling_options create failed."); + + cugraph_sampling_set_with_replacement(sampling_options, with_replacement); + cugraph_sampling_set_return_hops(sampling_options, return_hops); + cugraph_sampling_set_prior_sources_behavior(sampling_options, prior_sources_behavior); + cugraph_sampling_set_dedupe_sources(sampling_options, dedupe_sources); + cugraph_sampling_set_renumber_results(sampling_options, renumber_results); + cugraph_sampling_set_compression_type(sampling_options, compression); + cugraph_sampling_set_compress_per_hop(sampling_options, compress_per_hop); + + ret_code = cugraph_uniform_neighbor_sample(handle, + graph, + d_start_view, + d_start_labels_view, + d_label_list_view, + d_label_to_output_comm_rank_view, + h_fan_out_view, + rng_state, + sampling_options, + FALSE, + &result, + &ret_error); #ifdef NO_CUGRAPH_OPS TEST_ASSERT( @@ -1047,14 +1121,27 @@ int test_uniform_neighbor_sample_sort_by_hop(const cugraph_resource_handle_t* ha handle, (byte_t*)h_weight, result_weights, &ret_error); TEST_ASSERT(test_ret_value, ret_code == CUGRAPH_SUCCESS, "copy_to_host failed."); - ret_code = cugraph_type_erased_device_array_view_copy_to_host( - handle, (byte_t*)h_hops, result_hops, &ret_error); - TEST_ASSERT(test_ret_value, ret_code == CUGRAPH_SUCCESS, "copy_to_host failed."); - ret_code = cugraph_type_erased_device_array_view_copy_to_host( handle, (byte_t*)h_result_offsets, result_offsets, &ret_error); TEST_ASSERT(test_ret_value, ret_code == CUGRAPH_SUCCESS, "copy_to_host failed."); + for(int k = 0; k < result_offsets_size-1; k += fan_out_size) { + for(int h = 0; h < fan_out_size; ++h) { + int hop_start = h_result_offsets[k+h]; 
+ int hop_end = h_result_offsets[k+h+1]; + for(int i = hop_start; i < hop_end; ++i) { + h_hops[i] = h; + } + } + } + + size_t num_local_labels = (result_offsets_size - 1) / fan_out_size; + + for(int k = 0; k < num_local_labels+1; ++k) { + h_result_offsets[k] = h_result_offsets[k*fan_out_size]; + } + result_offsets_size = num_local_labels + 1; + // NOTE: The C++ tester does a more thorough validation. For our purposes // here we will do a simpler validation, merely checking that all edges // are actually part of the graph @@ -1223,9 +1310,9 @@ int main(int argc, char** argv) result |= RUN_MG_TEST(test_uniform_neighbor_from_alex, handle); //result |= RUN_MG_TEST(test_uniform_neighbor_sample_alex_bug, handle); result |= RUN_MG_TEST(test_uniform_neighbor_sample_sort_by_hop, handle); - result |= RUN_MG_TEST(test_uniform_neighbor_sample_dedupe_sources, handle); - result |= RUN_MG_TEST(test_uniform_neighbor_sample_unique_sources, handle); - result |= RUN_MG_TEST(test_uniform_neighbor_sample_carry_over_sources, handle); + //result |= RUN_MG_TEST(test_uniform_neighbor_sample_dedupe_sources, handle); + //result |= RUN_MG_TEST(test_uniform_neighbor_sample_unique_sources, handle); + //result |= RUN_MG_TEST(test_uniform_neighbor_sample_carry_over_sources, handle); cugraph_free_resource_handle(handle); free_mg_raft_handle(raft_handle); diff --git a/cpp/tests/c_api/uniform_neighbor_sample_test.c b/cpp/tests/c_api/uniform_neighbor_sample_test.c index a2c1e230485..92f3821e3cc 100644 --- a/cpp/tests/c_api/uniform_neighbor_sample_test.c +++ b/cpp/tests/c_api/uniform_neighbor_sample_test.c @@ -53,6 +53,7 @@ int generic_uniform_neighbor_sample_test(const cugraph_resource_handle_t* handle vertex_t *h_start, int *h_start_labels, size_t num_start_vertices, + size_t num_start_labels, int *fan_out, size_t fan_out_size, bool_t with_replacement, @@ -192,7 +193,7 @@ int generic_uniform_neighbor_sample_test(const cugraph_resource_handle_t* handle int32_t h_result_edge_types[result_size]; 
int32_t h_result_hops[result_size]; size_t h_result_offsets[result_offsets_size]; - int h_result_labels[result_offsets_size-1]; + int h_result_labels[num_start_labels]; vertex_t h_renumber_map[renumber_map_size]; size_t h_renumber_map_offsets[result_offsets_size]; @@ -216,9 +217,7 @@ int generic_uniform_neighbor_sample_test(const cugraph_resource_handle_t* handle handle, (byte_t*)h_result_edge_types, result_edge_types, &ret_error); TEST_ASSERT(test_ret_value, ret_code == CUGRAPH_SUCCESS, "copy_to_host failed."); - ret_code = cugraph_type_erased_device_array_view_copy_to_host( - handle, (byte_t*)h_result_hops, result_hops, &ret_error); - TEST_ASSERT(test_ret_value, ret_code == CUGRAPH_SUCCESS, "copy_to_host failed."); + TEST_ASSERT(test_ret_value, result_hops == NULL, "hops was not empty"); ret_code = cugraph_type_erased_device_array_view_copy_to_host( handle, (byte_t*)h_result_offsets, result_offsets, &ret_error); @@ -228,6 +227,21 @@ int generic_uniform_neighbor_sample_test(const cugraph_resource_handle_t* handle handle, (byte_t*)h_result_labels, result_labels, &ret_error); TEST_ASSERT(test_ret_value, ret_code == CUGRAPH_SUCCESS, "copy_to_host failed."); + for(int k = 0; k < result_offsets_size-1; k += fan_out_size) { + for(int h = 0; h < fan_out_size; ++h) { + int hop_start = h_result_offsets[k+h]; + int hop_end = h_result_offsets[k+h+1]; + for(int i = hop_start; i < hop_end; ++i) { + h_result_hops[i] = h; + } + } + } + + for(int k = 0; k < num_start_labels+1; ++k) { + h_result_offsets[k] = h_result_offsets[k*fan_out_size]; + } + result_offsets_size = num_start_labels + 1; + if (renumber_results) { ret_code = cugraph_type_erased_device_array_view_copy_to_host( handle, (byte_t*)h_renumber_map, result_renumber_map, &ret_error); @@ -348,6 +362,7 @@ int generic_uniform_neighbor_sample_test(const cugraph_resource_handle_t* handle for (size_t i = h_result_offsets[label_id]; (i < h_result_offsets[label_id+1]) && (test_ret_value == 0) ; ++i) { if (h_result_hops[i] == 
hop) { + bool found = false; for (size_t j = 0 ; (!found) && (j < sources_size) ; ++j) { found = renumber_results ? (h_renumber_map[h_renumber_map_offsets[label_id] + h_result_srcs[i]] == check_sources[j]) @@ -516,183 +531,6 @@ int create_test_graph_with_edge_ids(const cugraph_resource_handle_t* p_handle, return test_ret_value; } -int test_uniform_neighbor_sample_with_properties(const cugraph_resource_handle_t* handle) -{ - data_type_id_t vertex_tid = INT32; - data_type_id_t edge_tid = INT32; - data_type_id_t weight_tid = FLOAT32; - data_type_id_t edge_id_tid = INT32; - data_type_id_t edge_type_tid = INT32; - - size_t num_edges = 8; - size_t num_vertices = 6; - size_t fan_out_size = 1; - size_t num_starts = 1; - - vertex_t src[] = {0, 1, 1, 2, 2, 2, 3, 4}; - vertex_t dst[] = {1, 3, 4, 0, 1, 3, 5, 5}; - edge_t edge_ids[] = {0, 1, 2, 3, 4, 5, 6, 7}; - weight_t weight[] = {0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8}; - int32_t edge_types[] = {7, 6, 5, 4, 3, 2, 1, 0}; - vertex_t start[] = {2}; - int fan_out[] = {-1}; - - // Create graph - int test_ret_value = 0; - cugraph_error_code_t ret_code = CUGRAPH_SUCCESS; - cugraph_error_t* ret_error = NULL; - cugraph_graph_t* graph = NULL; - cugraph_sample_result_t* result = NULL; - - ret_code = create_sg_test_graph(handle, - vertex_tid, - edge_tid, - src, - dst, - weight_tid, - weight, - edge_type_tid, - edge_types, - edge_id_tid, - edge_ids, - num_edges, - FALSE, - TRUE, - FALSE, - FALSE, - &graph, - &ret_error); - - TEST_ASSERT(test_ret_value, ret_code == CUGRAPH_SUCCESS, "graph creation failed."); - - cugraph_type_erased_device_array_t* d_start = NULL; - cugraph_type_erased_device_array_view_t* d_start_view = NULL; - cugraph_type_erased_host_array_view_t* h_fan_out_view = NULL; - - ret_code = - cugraph_type_erased_device_array_create(handle, num_starts, INT32, &d_start, &ret_error); - TEST_ASSERT(test_ret_value, ret_code == CUGRAPH_SUCCESS, "d_start create failed."); - - d_start_view = 
cugraph_type_erased_device_array_view(d_start); - - ret_code = cugraph_type_erased_device_array_view_copy_from_host( - handle, d_start_view, (byte_t*)start, &ret_error); - - TEST_ASSERT(test_ret_value, ret_code == CUGRAPH_SUCCESS, "start copy_from_host failed."); - - h_fan_out_view = cugraph_type_erased_host_array_view_create(fan_out, 1, INT32); - - cugraph_rng_state_t *rng_state; - ret_code = cugraph_rng_state_create(handle, 0, &rng_state, &ret_error); - TEST_ASSERT(test_ret_value, ret_code == CUGRAPH_SUCCESS, "rng_state create failed."); - - ret_code = cugraph_uniform_neighbor_sample_with_edge_properties(handle, - graph, - d_start_view, - NULL, - NULL, - NULL, - h_fan_out_view, - rng_state, - FALSE, - TRUE, - FALSE, - &result, - &ret_error); - -#ifdef NO_CUGRAPH_OPS - TEST_ASSERT( - test_ret_value, ret_code != CUGRAPH_SUCCESS, "uniform_neighbor_sample should have failed") -#else - TEST_ASSERT(test_ret_value, ret_code == CUGRAPH_SUCCESS, cugraph_error_message(ret_error)); - TEST_ASSERT(test_ret_value, ret_code == CUGRAPH_SUCCESS, "uniform_neighbor_sample failed."); - - cugraph_type_erased_device_array_view_t* result_srcs; - cugraph_type_erased_device_array_view_t* result_dsts; - cugraph_type_erased_device_array_view_t* result_edge_id; - cugraph_type_erased_device_array_view_t* result_weights; - cugraph_type_erased_device_array_view_t* result_edge_types; - cugraph_type_erased_device_array_view_t* result_hops; - - result_srcs = cugraph_sample_result_get_sources(result); - result_dsts = cugraph_sample_result_get_destinations(result); - result_edge_id = cugraph_sample_result_get_edge_id(result); - result_weights = cugraph_sample_result_get_edge_weight(result); - result_edge_types = cugraph_sample_result_get_edge_type(result); - result_hops = cugraph_sample_result_get_hop(result); - - size_t result_size = cugraph_type_erased_device_array_view_size(result_srcs); - - vertex_t h_srcs[result_size]; - vertex_t h_dsts[result_size]; - edge_t h_edge_id[result_size]; - weight_t 
h_weight[result_size]; - int32_t h_edge_types[result_size]; - int32_t h_hops[result_size]; - - ret_code = cugraph_type_erased_device_array_view_copy_to_host( - handle, (byte_t*)h_srcs, result_srcs, &ret_error); - TEST_ASSERT(test_ret_value, ret_code == CUGRAPH_SUCCESS, "copy_to_host failed."); - - ret_code = cugraph_type_erased_device_array_view_copy_to_host( - handle, (byte_t*)h_dsts, result_dsts, &ret_error); - TEST_ASSERT(test_ret_value, ret_code == CUGRAPH_SUCCESS, "copy_to_host failed."); - - ret_code = cugraph_type_erased_device_array_view_copy_to_host( - handle, (byte_t*)h_edge_id, result_edge_id, &ret_error); - TEST_ASSERT(test_ret_value, ret_code == CUGRAPH_SUCCESS, "copy_to_host failed."); - - ret_code = cugraph_type_erased_device_array_view_copy_to_host( - handle, (byte_t*)h_weight, result_weights, &ret_error); - TEST_ASSERT(test_ret_value, ret_code == CUGRAPH_SUCCESS, "copy_to_host failed."); - - ret_code = cugraph_type_erased_device_array_view_copy_to_host( - handle, (byte_t*)h_edge_types, result_edge_types, &ret_error); - TEST_ASSERT(test_ret_value, ret_code == CUGRAPH_SUCCESS, "copy_to_host failed."); - - ret_code = cugraph_type_erased_device_array_view_copy_to_host( - handle, (byte_t*)h_hops, result_hops, &ret_error); - TEST_ASSERT(test_ret_value, ret_code == CUGRAPH_SUCCESS, "copy_to_host failed."); - - // NOTE: The C++ tester does a more thorough validation. 
For our purposes - // here we will do a simpler validation, merely checking that all edges - // are actually part of the graph - weight_t M_w[num_vertices][num_vertices]; - edge_t M_edge_id[num_vertices][num_vertices]; - int32_t M_edge_type[num_vertices][num_vertices]; - - for (int i = 0; i < num_vertices; ++i) - for (int j = 0; j < num_vertices; ++j) { - M_w[i][j] = 0.0; - M_edge_id[i][j] = -1; - M_edge_type[i][j] = -1; - } - - for (int i = 0; i < num_edges; ++i) { - M_w[src[i]][dst[i]] = weight[i]; - M_edge_id[src[i]][dst[i]] = edge_ids[i]; - M_edge_type[src[i]][dst[i]] = edge_types[i]; - } - - for (int i = 0; (i < result_size) && (test_ret_value == 0); ++i) { - TEST_ASSERT(test_ret_value, - M_w[h_srcs[i]][h_dsts[i]] == h_weight[i], - "uniform_neighbor_sample got edge that doesn't exist"); - TEST_ASSERT(test_ret_value, - M_edge_id[h_srcs[i]][h_dsts[i]] == h_edge_id[i], - "uniform_neighbor_sample got edge that doesn't exist"); - TEST_ASSERT(test_ret_value, - M_edge_type[h_srcs[i]][h_dsts[i]] == h_edge_types[i], - "uniform_neighbor_sample got edge that doesn't exist"); - } - - cugraph_sample_result_free(result); -#endif - - cugraph_sg_graph_free(graph); - cugraph_error_free(ret_error); -} - int test_uniform_neighbor_sample_with_labels(const cugraph_resource_handle_t* handle) { data_type_id_t vertex_tid = INT32; @@ -722,6 +560,14 @@ int test_uniform_neighbor_sample_with_labels(const cugraph_resource_handle_t* ha cugraph_graph_t* graph = NULL; cugraph_sample_result_t* result = NULL; + bool_t with_replacement = TRUE; + bool_t return_hops = TRUE; + cugraph_prior_sources_behavior_t prior_sources_behavior = DEFAULT; + bool_t dedupe_sources = FALSE; + bool_t renumber_results = FALSE; + cugraph_compression_type_t compression = COO; + bool_t compress_per_hop = FALSE; + ret_code = create_sg_test_graph(handle, vertex_tid, edge_tid, @@ -775,19 +621,31 @@ int test_uniform_neighbor_sample_with_labels(const cugraph_resource_handle_t* ha ret_code = cugraph_rng_state_create(handle, 
0, &rng_state, &ret_error); TEST_ASSERT(test_ret_value, ret_code == CUGRAPH_SUCCESS, "rng_state create failed."); - ret_code = cugraph_uniform_neighbor_sample_with_edge_properties(handle, - graph, - d_start_view, - d_start_labels_view, - NULL, - NULL, - h_fan_out_view, - rng_state, - FALSE, - TRUE, - FALSE, - &result, - &ret_error); + cugraph_sampling_options_t *sampling_options; + + ret_code = cugraph_sampling_options_create(&sampling_options, &ret_error); + TEST_ASSERT(test_ret_value, ret_code == CUGRAPH_SUCCESS, "sampling_options create failed."); + + cugraph_sampling_set_with_replacement(sampling_options, with_replacement); + cugraph_sampling_set_return_hops(sampling_options, return_hops); + cugraph_sampling_set_prior_sources_behavior(sampling_options, prior_sources_behavior); + cugraph_sampling_set_dedupe_sources(sampling_options, dedupe_sources); + cugraph_sampling_set_renumber_results(sampling_options, renumber_results); + cugraph_sampling_set_compression_type(sampling_options, compression); + cugraph_sampling_set_compress_per_hop(sampling_options, compress_per_hop); + + ret_code = cugraph_uniform_neighbor_sample(handle, + graph, + d_start_view, + d_start_labels_view, + NULL, + NULL, + h_fan_out_view, + rng_state, + sampling_options, + FALSE, + &result, + &ret_error); #ifdef NO_CUGRAPH_OPS TEST_ASSERT( @@ -843,9 +701,7 @@ int test_uniform_neighbor_sample_with_labels(const cugraph_resource_handle_t* ha handle, (byte_t*)h_edge_types, result_edge_types, &ret_error); TEST_ASSERT(test_ret_value, ret_code == CUGRAPH_SUCCESS, "copy_to_host failed."); - ret_code = cugraph_type_erased_device_array_view_copy_to_host( - handle, (byte_t*)h_hops, result_hops, &ret_error); - TEST_ASSERT(test_ret_value, ret_code == CUGRAPH_SUCCESS, "copy_to_host failed."); + TEST_ASSERT(test_ret_value, result_hops == NULL, "hops was not empty"); ret_code = cugraph_type_erased_device_array_view_copy_to_host( handle, (byte_t*)h_result_offsets, result_offsets, &ret_error); @@ -884,6 +740,7 @@ 
int test_uniform_neighbor_sample_with_labels(const cugraph_resource_handle_t* ha } cugraph_sample_result_free(result); + cugraph_sampling_options_free(sampling_options); #endif cugraph_sg_graph_free(graph); @@ -902,6 +759,7 @@ int test_uniform_neighbor_sample_clean(const cugraph_resource_handle_t* handle) size_t num_vertices = 6; size_t fan_out_size = 3; size_t num_starts = 2; + size_t num_start_labels = 2; vertex_t src[] = {0, 0, 1, 1, 2, 2, 2, 3, 4}; vertex_t dst[] = {1, 3, 3, 4, 0, 1, 3, 5, 5}; @@ -923,7 +781,7 @@ int test_uniform_neighbor_sample_clean(const cugraph_resource_handle_t* handle) bool_t renumber_results = FALSE; return generic_uniform_neighbor_sample_test(handle, src, dst, weight, edge_ids, edge_types, num_vertices, num_edges, - start, start_labels, num_starts, + start, start_labels, num_starts, num_start_labels, fan_out, fan_out_size, with_replacement, return_hops, prior_sources_behavior, dedupe_sources, renumber_results); } @@ -940,6 +798,7 @@ int test_uniform_neighbor_sample_dedupe_sources(const cugraph_resource_handle_t* size_t num_vertices = 6; size_t fan_out_size = 3; size_t num_starts = 2; + size_t num_start_labels = 2; vertex_t src[] = {0, 0, 1, 1, 2, 2, 2, 3, 4}; vertex_t dst[] = {1, 3, 3, 4, 0, 1, 3, 5, 5}; @@ -961,7 +820,7 @@ int test_uniform_neighbor_sample_dedupe_sources(const cugraph_resource_handle_t* bool_t renumber_results = FALSE; return generic_uniform_neighbor_sample_test(handle, src, dst, weight, edge_ids, edge_types, num_vertices, num_edges, - start, start_labels, num_starts, + start, start_labels, num_starts, num_start_labels, fan_out, fan_out_size, with_replacement, return_hops, prior_sources_behavior, dedupe_sources, renumber_results); } @@ -978,6 +837,7 @@ int test_uniform_neighbor_sample_unique_sources(const cugraph_resource_handle_t* size_t num_vertices = 6; size_t fan_out_size = 3; size_t num_starts = 2; + size_t num_start_labels = 2; vertex_t src[] = {0, 0, 1, 1, 2, 2, 2, 3, 4}; vertex_t dst[] = {1, 2, 3, 4, 0, 1, 3, 5, 
5}; @@ -999,7 +859,7 @@ int test_uniform_neighbor_sample_unique_sources(const cugraph_resource_handle_t* bool_t renumber_results = FALSE; return generic_uniform_neighbor_sample_test(handle, src, dst, weight, edge_ids, edge_types, num_vertices, num_edges, - start, start_labels, num_starts, + start, start_labels, num_starts, num_start_labels, fan_out, fan_out_size, with_replacement, return_hops, prior_sources_behavior, dedupe_sources, renumber_results); } @@ -1016,6 +876,7 @@ int test_uniform_neighbor_sample_carry_over_sources(const cugraph_resource_handl size_t num_vertices = 6; size_t fan_out_size = 3; size_t num_starts = 2; + size_t num_start_labels = 2; vertex_t src[] = {0, 0, 1, 1, 2, 2, 2, 3, 4}; vertex_t dst[] = {1, 2, 3, 4, 0, 1, 3, 5, 5}; @@ -1037,7 +898,7 @@ int test_uniform_neighbor_sample_carry_over_sources(const cugraph_resource_handl bool_t renumber_results = FALSE; return generic_uniform_neighbor_sample_test(handle, src, dst, weight, edge_ids, edge_types, num_vertices, num_edges, - start, start_labels, num_starts, + start, start_labels, num_starts, num_start_labels, fan_out, fan_out_size, with_replacement, return_hops, prior_sources_behavior, dedupe_sources, renumber_results); } @@ -1054,6 +915,7 @@ int test_uniform_neighbor_sample_renumber_results(const cugraph_resource_handle_ size_t num_vertices = 6; size_t fan_out_size = 3; size_t num_starts = 2; + size_t num_start_labels = 2; vertex_t src[] = {0, 0, 1, 1, 2, 2, 2, 3, 4}; vertex_t dst[] = {1, 2, 3, 4, 0, 1, 3, 5, 5}; @@ -1075,7 +937,7 @@ int test_uniform_neighbor_sample_renumber_results(const cugraph_resource_handle_ bool_t renumber_results = TRUE; return generic_uniform_neighbor_sample_test(handle, src, dst, weight, edge_ids, edge_types, num_vertices, num_edges, - start, start_labels, num_starts, + start, start_labels, num_starts, num_start_labels, fan_out, fan_out_size, with_replacement, return_hops, prior_sources_behavior, dedupe_sources, renumber_results); } @@ -1087,7 +949,6 @@ int main(int 
argc, char** argv) handle = cugraph_create_resource_handle(NULL); int result = 0; - result |= RUN_TEST_NEW(test_uniform_neighbor_sample_with_properties, handle); result |= RUN_TEST_NEW(test_uniform_neighbor_sample_with_labels, handle); result |= RUN_TEST_NEW(test_uniform_neighbor_sample_clean, handle); result |= RUN_TEST_NEW(test_uniform_neighbor_sample_dedupe_sources, handle); diff --git a/python/cugraph/cugraph/dask/sampling/uniform_neighbor_sample.py b/python/cugraph/cugraph/dask/sampling/uniform_neighbor_sample.py index 9e50169b4a7..03746561817 100644 --- a/python/cugraph/cugraph/dask/sampling/uniform_neighbor_sample.py +++ b/python/cugraph/cugraph/dask/sampling/uniform_neighbor_sample.py @@ -42,6 +42,7 @@ if TYPE_CHECKING: from cugraph import Graph + src_n = "sources" dst_n = "destinations" indices_n = "indices" @@ -71,8 +72,21 @@ def create_empty_df(indices_t, weight_t): def create_empty_df_with_edge_props( - indices_t, weight_t, return_offsets=False, renumber=False + indices_t, + weight_t, + return_offsets=False, + renumber=False, + use_legacy_names=True, + include_hop_column=True, + compression="COO", ): + if compression != "COO": + majors_name = "major_offsets" + else: + majors_name = src_n if use_legacy_names else "majors" + + minors_name = dst_n if use_legacy_names else "minors" + if renumber: empty_df_renumber = cudf.DataFrame( { @@ -84,14 +98,17 @@ def create_empty_df_with_edge_props( if return_offsets: df = cudf.DataFrame( { - src_n: numpy.empty(shape=0, dtype=indices_t), - dst_n: numpy.empty(shape=0, dtype=indices_t), + majors_name: numpy.empty(shape=0, dtype=indices_t), + minors_name: numpy.empty(shape=0, dtype=indices_t), weight_n: numpy.empty(shape=0, dtype=weight_t), edge_id_n: numpy.empty(shape=0, dtype=indices_t), edge_type_n: numpy.empty(shape=0, dtype="int32"), - hop_id_n: numpy.empty(shape=0, dtype="int32"), } ) + + if include_hop_column: + df[hop_id_n] = numpy.empty(shape=0, dtype="int32") + empty_df_offsets = cudf.DataFrame( { offsets_n: 
numpy.empty(shape=0, dtype="int32"), @@ -106,13 +123,13 @@ def create_empty_df_with_edge_props( else: df = cudf.DataFrame( { - src_n: numpy.empty(shape=0, dtype=indices_t), - dst_n: numpy.empty(shape=0, dtype=indices_t), + majors_name: numpy.empty(shape=0, dtype=indices_t), + minors_name: numpy.empty(shape=0, dtype=indices_t), weight_n: numpy.empty(shape=0, dtype=weight_t), edge_id_n: numpy.empty(shape=0, dtype=indices_t), edge_type_n: numpy.empty(shape=0, dtype="int32"), - hop_id_n: numpy.empty(shape=0, dtype="int32"), batch_id_n: numpy.empty(shape=0, dtype="int32"), + hop_id_n: numpy.empty(shape=0, dtype="int32"), } ) if renumber: @@ -121,102 +138,6 @@ def create_empty_df_with_edge_props( return df -def convert_to_cudf( - cp_arrays, weight_t, with_edge_properties, return_offsets=False, renumber=False -): - """ - Creates a cudf DataFrame from cupy arrays from pylibcugraph wrapper - """ - df = cudf.DataFrame() - - if with_edge_properties: - if renumber: - ( - sources, - destinations, - weights, - edge_ids, - edge_types, - batch_ids, - offsets, - hop_ids, - renumber_map, - renumber_map_offsets, - ) = cp_arrays - else: - ( - sources, - destinations, - weights, - edge_ids, - edge_types, - batch_ids, - offsets, - hop_ids, - ) = cp_arrays - - df[src_n] = sources - df[dst_n] = destinations - df[weight_n] = weights - df[edge_id_n] = edge_ids - df[edge_type_n] = edge_types - df[hop_id_n] = hop_ids - - return_dfs = [df] - - if return_offsets: - offsets_df = cudf.DataFrame( - { - batch_id_n: batch_ids, - offsets_n: offsets[:-1], - } - ) - - if renumber: - offsets_df[map_offsets_n] = renumber_map_offsets[:-1] - - return_dfs.append(offsets_df) - else: - batch_ids_b = batch_ids - if len(batch_ids_b) > 0: - batch_ids_b = cudf.Series(batch_ids_b).repeat(cp.diff(offsets)) - batch_ids_b.reset_index(drop=True, inplace=True) - - df[batch_id_n] = batch_ids_b - - if renumber: - renumber_df = cudf.DataFrame( - { - "map": renumber_map, - } - ) - - if not return_offsets: - batch_ids_r = 
cudf.Series(batch_ids).repeat( - cp.diff(renumber_map_offsets) - ) - batch_ids_r.reset_index(drop=True, inplace=True) - renumber_df["batch_id"] = batch_ids_r - - return_dfs.append(renumber_df) - - return tuple(return_dfs) - else: - cupy_sources, cupy_destinations, cupy_indices = cp_arrays - - df[src_n] = cupy_sources - df[dst_n] = cupy_destinations - df[indices_n] = cupy_indices - - if cupy_indices is not None: - if weight_t == "int32": - df.indices = df.indices.astype("int32") - elif weight_t == "int64": - df.indices = df.indices.astype("int64") - - return (df,) - - def __get_label_to_output_comm_rank(min_batch_id, max_batch_id, n_workers): num_batches = max_batch_id - min_batch_id + 1 num_batches = int(num_batches) @@ -246,6 +167,10 @@ def _call_plc_uniform_neighbor_sample( prior_sources_behavior=None, deduplicate_sources=False, renumber=False, + use_legacy_names=True, + include_hop_column=True, + compress_per_hop=False, + compression="COO", ): st_x = st_x[0] start_list_x = st_x[start_col_name] @@ -259,7 +184,7 @@ def _call_plc_uniform_neighbor_sample( min_batch_id, max_batch_id, n_workers ) - cp_arrays = pylibcugraph_uniform_neighbor_sample( + cupy_array_dict = pylibcugraph_uniform_neighbor_sample( resource_handle=ResourceHandle(Comms.get_handle(sID).getHandle()), input_graph=mg_graph_x, start_list=start_list_x, @@ -275,13 +200,25 @@ def _call_plc_uniform_neighbor_sample( deduplicate_sources=deduplicate_sources, return_hops=return_hops, renumber=renumber, + compression=compression, + compress_per_hop=compress_per_hop, + return_dict=True, + ) + + # have to import here due to circular import issue + from cugraph.sampling.sampling_utilities import ( + sampling_results_from_cupy_array_dict, ) - return convert_to_cudf( - cp_arrays, + + return sampling_results_from_cupy_array_dict( + cupy_array_dict, weight_t, - with_edge_properties, + len(fanout_vals), + with_edge_properties=with_edge_properties, return_offsets=return_offsets, renumber=renumber, + 
use_legacy_names=use_legacy_names, + include_hop_column=include_hop_column, ) @@ -304,6 +241,10 @@ def _mg_call_plc_uniform_neighbor_sample( prior_sources_behavior=None, deduplicate_sources=False, renumber=False, + use_legacy_names=True, + include_hop_column=True, + compress_per_hop=False, + compression="COO", ): n_workers = None if keep_batches_together: @@ -335,6 +276,10 @@ def _mg_call_plc_uniform_neighbor_sample( prior_sources_behavior=prior_sources_behavior, deduplicate_sources=deduplicate_sources, renumber=renumber, + use_legacy_names=use_legacy_names, # remove in 23.12 + include_hop_column=include_hop_column, # remove in 23.12 + compress_per_hop=compress_per_hop, + compression=compression, allow_other_workers=False, pure=False, ) @@ -348,6 +293,9 @@ def _mg_call_plc_uniform_neighbor_sample( weight_t, return_offsets=return_offsets, renumber=renumber, + use_legacy_names=use_legacy_names, + compression=compression, + include_hop_column=include_hop_column, ) if with_edge_properties else create_empty_df(indices_t, weight_t) @@ -397,6 +345,7 @@ def uniform_neighbor_sample( input_graph: Graph, start_list: Sequence, fanout_vals: List[int], + *, with_replacement: bool = True, with_edge_properties: bool = False, # deprecated with_batch_ids: bool = False, @@ -406,9 +355,13 @@ def uniform_neighbor_sample( random_state: int = None, return_offsets: bool = False, return_hops: bool = True, + include_hop_column: bool = True, # deprecated prior_sources_behavior: str = None, deduplicate_sources: bool = False, renumber: bool = False, + use_legacy_names=True, # deprecated + compress_per_hop=False, + compression="COO", _multiple_clients: bool = False, ) -> Union[dask_cudf.DataFrame, Tuple[dask_cudf.DataFrame, dask_cudf.DataFrame]]: """ @@ -463,6 +416,12 @@ def uniform_neighbor_sample( corresponding to the hop where the edge appeared. Defaults to True. + include_hop_column: bool, optional (default=True) + Deprecated. Defaults to True. 
+ If True, will include the hop column even if + return_offsets is True. This option will + be removed in release 23.12. + prior_sources_behavior: str (Optional) Options are "carryover", and "exclude". Default will leave the source list as-is. @@ -481,6 +440,21 @@ def uniform_neighbor_sample( will return the renumber map and renumber map offsets as an additional dataframe. + use_legacy_names: bool, optional (default=True) + Whether to use the legacy column names (sources, destinations). + If True, will use "sources" and "destinations" as the column names. + If False, will use "majors" and "minors" as the column names. + Deprecated. Will be removed in release 23.12 in favor of always + using the new names "majors" and "minors". + + compress_per_hop: bool, optional (default=False) + Whether to compress globally (default), or to produce a separate + compressed edgelist per hop. + + compression: str, optional (default=COO) + Sets the compression type for the output minibatches. + Valid options are COO (default), CSR, CSC, DCSR, and DCSC. + _multiple_clients: bool, optional (default=False) internal flag to ensure sampling works with multiple dask clients set to True to prevent hangs in multi-client environment @@ -548,12 +522,46 @@ def uniform_neighbor_sample( Contains the batch offsets for the renumber maps """ + if compression not in ["COO", "CSR", "CSC", "DCSR", "DCSC"]: + raise ValueError("compression must be one of COO, CSR, CSC, DCSR, or DCSC") + if with_edge_properties: warning_msg = ( "The with_edge_properties flag is deprecated" " and will be removed in the next release." 
) - warnings.warn(warning_msg, DeprecationWarning) + warnings.warn(warning_msg, FutureWarning) + + if ( + (compression != "COO") + and (not compress_per_hop) + and prior_sources_behavior != "exclude" + ): + raise ValueError( + "hop-agnostic compression is only supported with" + " the exclude prior sources behavior due to limitations " + "of the libcugraph C++ API" + ) + + if compress_per_hop and prior_sources_behavior != "carryover": + raise ValueError( + "Compressing the edgelist per hop is only supported " + "with the carryover prior sources behavior due to limitations" + " of the libcugraph C++ API" + ) + + if include_hop_column: + warning_msg = ( + "The include_hop_column flag is deprecated and will be" + " removed in the next release in favor of always " + "excluding the hop column when return_offsets is True" + ) + warnings.warn(warning_msg, FutureWarning) + + if compression != "COO": + raise ValueError( + "Including the hop id column is only supported with COO compression." + ) if isinstance(start_list, int): start_list = [start_list] @@ -643,6 +651,31 @@ def uniform_neighbor_sample( ddf = persist_dask_df_equal_parts_per_worker(ddf, client) ddf = get_persisted_df_worker_map(ddf, client) + sample_call_kwargs = { + "client": client, + "session_id": session_id, + "input_graph": input_graph, + "ddf": ddf, + "keep_batches_together": keep_batches_together, + "min_batch_id": min_batch_id, + "max_batch_id": max_batch_id, + "fanout_vals": fanout_vals, + "with_replacement": with_replacement, + "weight_t": weight_t, + "indices_t": indices_t, + "with_edge_properties": with_edge_properties, + "random_state": random_state, + "return_offsets": return_offsets, + "return_hops": return_hops, + "prior_sources_behavior": prior_sources_behavior, + "deduplicate_sources": deduplicate_sources, + "renumber": renumber, + "use_legacy_names": use_legacy_names, + "include_hop_column": include_hop_column, + "compress_per_hop": compress_per_hop, + "compression": compression, + } + if 
_multiple_clients: # Distributed centralized lock to allow # two disconnected processes (clients) to coordinate a lock @@ -650,26 +683,7 @@ def uniform_neighbor_sample( lock = Lock("plc_graph_access") if lock.acquire(timeout=100): try: - ddf = _mg_call_plc_uniform_neighbor_sample( - client=client, - session_id=session_id, - input_graph=input_graph, - ddf=ddf, - keep_batches_together=keep_batches_together, - min_batch_id=min_batch_id, - max_batch_id=max_batch_id, - fanout_vals=fanout_vals, - with_replacement=with_replacement, - weight_t=weight_t, - indices_t=indices_t, - with_edge_properties=with_edge_properties, - random_state=random_state, - return_offsets=return_offsets, - return_hops=return_hops, - prior_sources_behavior=prior_sources_behavior, - deduplicate_sources=deduplicate_sources, - renumber=renumber, - ) + ddf = _mg_call_plc_uniform_neighbor_sample(**sample_call_kwargs) finally: lock.release() else: @@ -677,26 +691,7 @@ def uniform_neighbor_sample( "Failed to acquire lock(plc_graph_access) while trying to sampling" ) else: - ddf = _mg_call_plc_uniform_neighbor_sample( - client=client, - session_id=session_id, - input_graph=input_graph, - ddf=ddf, - keep_batches_together=keep_batches_together, - min_batch_id=min_batch_id, - max_batch_id=max_batch_id, - fanout_vals=fanout_vals, - with_replacement=with_replacement, - weight_t=weight_t, - indices_t=indices_t, - with_edge_properties=with_edge_properties, - random_state=random_state, - return_offsets=return_offsets, - return_hops=return_hops, - prior_sources_behavior=prior_sources_behavior, - deduplicate_sources=deduplicate_sources, - renumber=renumber, - ) + ddf = _mg_call_plc_uniform_neighbor_sample(**sample_call_kwargs) if return_offsets: if renumber: @@ -708,9 +703,12 @@ def uniform_neighbor_sample( ddf, renumber_df = ddf if input_graph.renumbered and not renumber: - ddf = input_graph.unrenumber(ddf, "sources", preserve_order=True) - ddf = input_graph.unrenumber(ddf, "destinations", preserve_order=True) - + 
if use_legacy_names: + ddf = input_graph.unrenumber(ddf, "sources", preserve_order=True) + ddf = input_graph.unrenumber(ddf, "destinations", preserve_order=True) + else: + ddf = input_graph.unrenumber(ddf, "majors", preserve_order=True) + ddf = input_graph.unrenumber(ddf, "minors", preserve_order=True) if return_offsets: if renumber: return ddf, offsets_df, renumber_df diff --git a/python/cugraph/cugraph/gnn/data_loading/bulk_sampler.py b/python/cugraph/cugraph/gnn/data_loading/bulk_sampler.py index 92caba6dbaf..dbfcb124ce5 100644 --- a/python/cugraph/cugraph/gnn/data_loading/bulk_sampler.py +++ b/python/cugraph/cugraph/gnn/data_loading/bulk_sampler.py @@ -269,6 +269,7 @@ def flush(self) -> None: with_edge_properties=True, return_offsets=True, renumber=self.__renumber, + # use_legacy_names=False, ) if self.__renumber: diff --git a/python/cugraph/cugraph/gnn/data_loading/bulk_sampler_io.py b/python/cugraph/cugraph/gnn/data_loading/bulk_sampler_io.py index e9e5be26fc3..7e67eab83c9 100644 --- a/python/cugraph/cugraph/gnn/data_loading/bulk_sampler_io.py +++ b/python/cugraph/cugraph/gnn/data_loading/bulk_sampler_io.py @@ -15,10 +15,24 @@ import cudf import cupy -from typing import Union, Optional +from math import ceil +from pandas import isna -def _write_samples_to_parquet( +from typing import Union, Optional, List + + +def create_df_from_disjoint_series(series_list: List[cudf.Series]): + series_list.sort(key=lambda s: len(s), reverse=True) + + df = cudf.DataFrame() + for s in series_list: + df[s.name] = s + + return df + + +def _write_samples_to_parquet_csr( results: cudf.DataFrame, offsets: cudf.DataFrame, renumber_map: cudf.DataFrame, @@ -27,7 +41,184 @@ def _write_samples_to_parquet( partition_info: Optional[Union[dict, str]] = None, ) -> cudf.Series: """ - Writes the samples to parquet. + Writes CSR/CSC compressed samples to parquet. 
+ + Batches that are empty are discarded, and the remaining non-empty + batches are renumbered to be contiguous starting from the first + batch id. This means that the output batch ids may not match + the input batch ids. + + results: cudf.DataFrame + The results dataframe containing the sampled minibatches. + offsets: cudf.DataFrame + The offsets dataframe indicating the start/end of each minibatch + in the results dataframe. + renumber_map: cudf.DataFrame + The renumber map containing the mapping of renumbered vertex ids + to original vertex ids. + batches_per_partition: int + The maximum number of minibatches allowed per written parquet partition. + output_path: str + The output path (where parquet files should be written to). + partition_info: Union[dict, str] + Either a dictionary containing partition data from dask, the string 'sg' + indicating that this is a single GPU write, or None indicating that this + function should perform a no-op (required by dask). + + Returns an empty cudf series. + """ + # Required by dask; need to skip dummy partitions. + if partition_info is None or len(results) == 0: + return cudf.Series(dtype="int64") + if partition_info != "sg" and (not isinstance(partition_info, dict)): + raise ValueError("Invalid value of partition_info") + + # Additional check to skip dummy partitions required for CSR format.
+ if isna(offsets.batch_id.iloc[0]): + return cudf.Series(dtype="int64") + + # Output: + # major_offsets - CSR/CSC row/col pointers + # minors - CSR/CSC col/row indices + # edge id - edge ids (same shape as minors) + # edge type - edge types (same shape as minors) + # weight - edge weight (same shape as minors) + # renumber map - the original vertex ids + # renumber map offsets - start/end of the map for each batch + # (only 1 per batch b/c of framework + # stipulations making this legal) + # label-hop offsets - indicate the start/end of each hop + # for each batch + + batch_ids = offsets.batch_id + label_hop_offsets = offsets.offsets + renumber_map_offsets = offsets.renumber_map_offsets + del offsets + + batch_ids.dropna(inplace=True) + label_hop_offsets.dropna(inplace=True) + renumber_map_offsets.dropna(inplace=True) + + major_offsets_array = results.major_offsets + results.drop(columns="major_offsets", inplace=True) + major_offsets_array.dropna(inplace=True) + major_offsets_array = major_offsets_array.values + + minors_array = results.minors + results.drop(columns="minors", inplace=True) + minors_array.dropna(inplace=True) + minors_array = minors_array.values + + weight_array = results.weight + results.drop(columns="weight", inplace=True) + weight_array.dropna(inplace=True) + weight_array = ( + cupy.array([], dtype="float32") if weight_array.empty else weight_array.values + ) + + edge_id_array = results.edge_id + results.drop(columns="edge_id", inplace=True) + edge_id_array.dropna(inplace=True) + edge_id_array = ( + cupy.array([], dtype="int64") if edge_id_array.empty else edge_id_array.values + ) + + edge_type_array = results.edge_type + results.drop(columns="edge_type", inplace=True) + edge_type_array.dropna(inplace=True) + edge_type_array = ( + cupy.array([], dtype="int32") + if edge_type_array.empty + else edge_type_array.values + ) + + del results + + offsets_length = len(label_hop_offsets) - 1 + if offsets_length % len(batch_ids) != 0: + raise 
ValueError("Invalid hop offsets") + fanout_length = int(offsets_length / len(batch_ids)) + + for p in range(0, int(ceil(len(batch_ids) / batches_per_partition))): + partition_start = p * (batches_per_partition) + partition_end = (p + 1) * (batches_per_partition) + + label_hop_offsets_current_partition = label_hop_offsets.iloc[ + partition_start * fanout_length : partition_end * fanout_length + 1 + ].reset_index(drop=True) + label_hop_offsets_current_partition.name = "label_hop_offsets" + + batch_ids_current_partition = batch_ids.iloc[partition_start:partition_end] + + ( + major_offsets_start, + major_offsets_end, + ) = label_hop_offsets_current_partition.iloc[ + [0, -1] + ].values # legal since offsets has the 1 extra offset + results_start, results_end = major_offsets_array[ + [major_offsets_start, major_offsets_end] + ] # avoid d2h copy + + # no need to use end batch id, just ensure the batch is labeled correctly + start_batch_id = batch_ids_current_partition.iloc[0] + # end_batch_id = batch_ids_current_partition.iloc[-1] + + # create the renumber map offsets + renumber_map_offsets_current_partition = renumber_map_offsets.iloc[ + partition_start : partition_end + 1 + ].reset_index(drop=True) + renumber_map_offsets_current_partition.name = "renumber_map_offsets" + + ( + renumber_map_start, + renumber_map_end, + ) = renumber_map_offsets_current_partition.iloc[ + [0, -1] + ].values # avoid d2h copy + + results_current_partition = create_df_from_disjoint_series( + [ + cudf.Series(minors_array[results_start:results_end], name="minors"), + cudf.Series( + renumber_map.map.values[renumber_map_start:renumber_map_end], + name="map", + ), + label_hop_offsets_current_partition, + cudf.Series( + major_offsets_array[major_offsets_start : major_offsets_end + 1], + name="major_offsets", + ), + cudf.Series(weight_array[results_start:results_end], name="weight"), + cudf.Series(edge_id_array[results_start:results_end], name="edge_id"), + cudf.Series( + 
edge_type_array[results_start:results_end], name="edge_type" + ), + renumber_map_offsets_current_partition, + ] + ) + + end_batch_id = start_batch_id + len(batch_ids_current_partition) - 1 + filename = f"batch={start_batch_id}-{end_batch_id}.parquet" + full_output_path = os.path.join(output_path, filename) + + results_current_partition.to_parquet( + full_output_path, compression=None, index=False, force_nullable_schema=True + ) + + return cudf.Series(dtype="int64") + + +def _write_samples_to_parquet_coo( + results: cudf.DataFrame, + offsets: cudf.DataFrame, + renumber_map: cudf.DataFrame, + batches_per_partition: int, + output_path: str, + partition_info: Optional[Union[dict, str]] = None, +) -> cudf.Series: + """ + Writes COO compressed samples to parquet. Batches that are empty are discarded, and the remaining non-empty batches are renumbered to be contiguous starting from the first @@ -60,8 +251,10 @@ def _write_samples_to_parquet( if partition_info != "sg" and (not isinstance(partition_info, dict)): raise ValueError("Invalid value of partition_info") + offsets = offsets[:-1] + # Offsets is always in order, so the last batch id is always the highest - max_batch_id = offsets.batch_id.iloc[len(offsets) - 1] + max_batch_id = offsets.batch_id.iloc[-1] results.dropna(axis=1, how="all", inplace=True) results["hop_id"] = results["hop_id"].astype("uint8") @@ -182,9 +375,23 @@ def write_samples( output_path: str The output path (where parquet files should be written to). 
""" + + if ("majors" in results.columns) and ("minors" in results.columns): + write_fn = _write_samples_to_parquet_coo + + # TODO these names will be deprecated in release 23.12 + elif ("sources" in results.columns) and ("destinations" in results.columns): + write_fn = _write_samples_to_parquet_coo + + elif "major_offsets" in results.columns and "minors" in results.columns: + write_fn = _write_samples_to_parquet_csr + + else: + raise ValueError("invalid columns") + if hasattr(results, "compute"): results.map_partitions( - _write_samples_to_parquet, + write_fn, offsets, renumber_map, batches_per_partition, @@ -194,7 +401,7 @@ def write_samples( ).compute() else: - _write_samples_to_parquet( + write_fn( results, offsets, renumber_map, diff --git a/python/cugraph/cugraph/sampling/sampling_utilities.py b/python/cugraph/cugraph/sampling/sampling_utilities.py new file mode 100644 index 00000000000..50c315129dc --- /dev/null +++ b/python/cugraph/cugraph/sampling/sampling_utilities.py @@ -0,0 +1,198 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import cupy +import cudf + +import warnings + + +def sampling_results_from_cupy_array_dict( + cupy_array_dict, + weight_t, + num_hops, + with_edge_properties=False, + return_offsets=False, + renumber=False, + use_legacy_names=True, + include_hop_column=True, +): + """ + Creates a cudf DataFrame from cupy arrays from pylibcugraph wrapper + """ + results_df = cudf.DataFrame() + + if use_legacy_names: + major_col_name = "sources" + minor_col_name = "destinations" + warning_msg = ( + "The legacy column names (sources, destinations)" + " will no longer be supported for uniform_neighbor_sample" + " in release 23.12. The use_legacy_names=False option will" + " become the only option, and (majors, minors) will be the" + " only supported column names." + ) + warnings.warn(warning_msg, FutureWarning) + else: + major_col_name = "majors" + minor_col_name = "minors" + + if with_edge_properties: + majors = cupy_array_dict["majors"] + if majors is not None: + results_df["majors"] = majors + + results_df_cols = [ + "minors", + "weight", + "edge_id", + "edge_type", + ] + + for col in results_df_cols: + array = cupy_array_dict[col] + # The length of each of these arrays should be the same + results_df[col] = array + + results_df.rename( + columns={"majors": major_col_name, "minors": minor_col_name}, inplace=True + ) + + label_hop_offsets = cupy_array_dict["label_hop_offsets"] + batch_ids = cupy_array_dict["batch_id"] + + if renumber: + renumber_df = cudf.DataFrame( + { + "map": cupy_array_dict["renumber_map"], + } + ) + + if not return_offsets: + if len(batch_ids) > 0: + batch_ids_r = cudf.Series(batch_ids).repeat( + cupy.diff(cupy_array_dict["renumber_map_offsets"]) + ) + batch_ids_r.reset_index(drop=True, inplace=True) + renumber_df["batch_id"] = batch_ids_r + else: + renumber_df["batch_id"] = None + + if return_offsets: + batches_series = cudf.Series( + batch_ids, + name="batch_id", + ) + if include_hop_column: + # TODO remove this logic in release 23.12 + offsets_df = 
cudf.Series( + label_hop_offsets[cupy.arange(len(batch_ids) + 1) * num_hops], + name="offsets", + ).to_frame() + else: + offsets_df = cudf.Series( + label_hop_offsets, + name="offsets", + ).to_frame() + + if len(batches_series) > len(offsets_df): + # this is extremely rare so the inefficiency is ok + offsets_df = offsets_df.join(batches_series, how="outer").sort_index() + else: + offsets_df["batch_id"] = batches_series + + if renumber: + renumber_offset_series = cudf.Series( + cupy_array_dict["renumber_map_offsets"], name="renumber_map_offsets" + ) + + if len(renumber_offset_series) > len(offsets_df): + # this is extremely rare so the inefficiency is ok + offsets_df = offsets_df.join( + renumber_offset_series, how="outer" + ).sort_index() + else: + offsets_df["renumber_map_offsets"] = renumber_offset_series + + else: + if len(batch_ids) > 0: + batch_ids_r = cudf.Series(cupy.repeat(batch_ids, num_hops)) + batch_ids_r = cudf.Series(batch_ids_r).repeat( + cupy.diff(label_hop_offsets) + ) + batch_ids_r.reset_index(drop=True, inplace=True) + + results_df["batch_id"] = batch_ids_r + else: + results_df["batch_id"] = None + + # TODO remove this logic in release 23.12, hops will always returned as offsets + if include_hop_column: + if len(batch_ids) > 0: + hop_ids_r = cudf.Series(cupy.arange(num_hops)) + hop_ids_r = cudf.concat([hop_ids_r] * len(batch_ids), ignore_index=True) + + # generate the hop column + hop_ids_r = ( + cudf.Series(hop_ids_r, name="hop_id") + .repeat(cupy.diff(label_hop_offsets)) + .reset_index(drop=True) + ) + else: + hop_ids_r = cudf.Series(name="hop_id", dtype="int32") + + results_df = results_df.join(hop_ids_r, how="outer").sort_index() + + if major_col_name not in results_df: + if use_legacy_names: + raise ValueError("Can't use legacy names with major offsets") + + major_offsets_series = cudf.Series( + cupy_array_dict["major_offsets"], name="major_offsets" + ) + if len(major_offsets_series) > len(results_df): + # this is extremely rare so the 
inefficiency is ok + results_df = results_df.join( + major_offsets_series, how="outer" + ).sort_index() + else: + results_df["major_offsets"] = major_offsets_series + + else: + # TODO this is deprecated, remove it in 23.12 + + results_df[major_col_name] = cupy_array_dict["sources"] + results_df[minor_col_name] = cupy_array_dict["destinations"] + indices = cupy_array_dict["indices"] + + if indices is None: + results_df["indices"] = None + else: + results_df["indices"] = indices + if weight_t == "int32": + results_df["indices"] = indices.astype("int32") + elif weight_t == "int64": + results_df["indices"] = indices.astype("int64") + else: + results_df["indices"] = indices + + if return_offsets: + if renumber: + return results_df, offsets_df, renumber_df + else: + return results_df, offsets_df + + if renumber: + return results_df, renumber_df + + return (results_df,) diff --git a/python/cugraph/cugraph/sampling/uniform_neighbor_sample.py b/python/cugraph/cugraph/sampling/uniform_neighbor_sample.py index 219854bb002..1832585c0ab 100644 --- a/python/cugraph/cugraph/sampling/uniform_neighbor_sample.py +++ b/python/cugraph/cugraph/sampling/uniform_neighbor_sample.py @@ -16,6 +16,8 @@ from pylibcugraph import ResourceHandle from pylibcugraph import uniform_neighbor_sample as pylibcugraph_uniform_neighbor_sample +from cugraph.sampling.sampling_utilities import sampling_results_from_cupy_array_dict + import numpy import cudf @@ -58,15 +60,20 @@ def uniform_neighbor_sample( G: Graph, start_list: Sequence, fanout_vals: List[int], + *, with_replacement: bool = True, with_edge_properties: bool = False, # deprecated with_batch_ids: bool = False, random_state: int = None, return_offsets: bool = False, return_hops: bool = True, + include_hop_column: bool = True, # deprecated prior_sources_behavior: str = None, deduplicate_sources: bool = False, renumber: bool = False, + use_legacy_names: bool = True, # deprecated + compress_per_hop: bool = False, + compression: str = "COO", ) -> 
Union[cudf.DataFrame, Tuple[cudf.DataFrame, cudf.DataFrame]]: """ Does neighborhood sampling, which samples nodes from a graph based on the @@ -111,6 +118,12 @@ def uniform_neighbor_sample( corresponding to the hop where the edge appeared. Defaults to True. + include_hop_column: bool, optional (default=True) + Deprecated. Defaults to True. + If True, will include the hop column even if + return_offsets is True. This option will + be removed in release 23.12. + prior_sources_behavior: str, optional (default=None) Options are "carryover", and "exclude". Default will leave the source list as-is. @@ -129,6 +142,21 @@ def uniform_neighbor_sample( will return the renumber map and renumber map offsets as an additional dataframe. + use_legacy_names: bool, optional (default=True) + Whether to use the legacy column names (sources, destinations). + If True, will use "sources" and "destinations" as the column names. + If False, will use "majors" and "minors" as the column names. + Deprecated. Will be removed in release 23.12 in favor of always + using the new names "majors" and "minors". + + compress_per_hop: bool, optional (default=False) + Whether to compress globally (default), or to produce a separate + compressed edgelist per hop. + + compression: str, optional (default=COO) + Sets the compression type for the output minibatches. + Valid options are COO (default), CSR, CSC, DCSR, and DCSC. + Returns ------- result : cudf.DataFrame or Tuple[cudf.DataFrame, cudf.DataFrame] @@ -193,12 +221,62 @@ def uniform_neighbor_sample( Contains the batch offsets for the renumber maps """ + if use_legacy_names: + major_col_name = "sources" + minor_col_name = "destinations" + warning_msg = ( + "The legacy column names (sources, destinations)" + " will no longer be supported for uniform_neighbor_sample" + " in release 23.12. The use_legacy_names=False option will" + " become the only option, and (majors, minors) will be the" + " only supported column names." 
+ ) + warnings.warn(warning_msg, FutureWarning) + else: + major_col_name = "majors" + minor_col_name = "minors" + + if compression not in ["COO", "CSR", "CSC", "DCSR", "DCSC"]: + raise ValueError("compression must be one of COO, CSR, CSC, DCSR, or DCSC") + + if ( + (compression != "COO") + and (not compress_per_hop) + and prior_sources_behavior != "exclude" + ): + raise ValueError( + "hop-agnostic compression is only supported with" + " the exclude prior sources behavior due to limitations " + "of the libcugraph C++ API" + ) + + if compress_per_hop and prior_sources_behavior != "carryover": + raise ValueError( + "Compressing the edgelist per hop is only supported " + "with the carryover prior sources behavior due to limitations" + " of the libcugraph C++ API" + ) + + if include_hop_column: + warning_msg = ( + "The include_hop_column flag is deprecated and will be" + " removed in the next release in favor of always " + "excluding the hop column when return_offsets is True" + ) + warnings.warn(warning_msg, FutureWarning) + + if compression != "COO": + raise ValueError( + "Including the hop id column is only supported with COO compression." + ) + if with_edge_properties: warning_msg = ( "The with_edge_properties flag is deprecated" - " and will be removed in the next release." 
+ " and will be removed in the next release in favor" + " of returning all properties in the graph" ) - warnings.warn(warning_msg, DeprecationWarning) + warnings.warn(warning_msg, FutureWarning) if isinstance(start_list, int): start_list = [start_list] @@ -255,7 +333,7 @@ def uniform_neighbor_sample( start_list = G.lookup_internal_vertex_id(start_list, columns) start_list = start_list.rename(columns={columns[0]: start_col_name}) - sampling_result = pylibcugraph_uniform_neighbor_sample( + sampling_result_array_dict = pylibcugraph_uniform_neighbor_sample( resource_handle=ResourceHandle(), input_graph=G._plc_graph, start_list=start_list[start_col_name], @@ -271,104 +349,27 @@ def uniform_neighbor_sample( deduplicate_sources=deduplicate_sources, return_hops=return_hops, renumber=renumber, + compression=compression, + compress_per_hop=compress_per_hop, + return_dict=True, ) - df = cudf.DataFrame() - - if with_edge_properties: - # TODO use a dictionary at PLC w/o breaking users - if renumber: - ( - sources, - destinations, - weights, - edge_ids, - edge_types, - batch_ids, - offsets, - hop_ids, - renumber_map, - renumber_map_offsets, - ) = sampling_result - else: - ( - sources, - destinations, - weights, - edge_ids, - edge_types, - batch_ids, - offsets, - hop_ids, - ) = sampling_result - - df["sources"] = sources - df["destinations"] = destinations - df["weight"] = weights - df["edge_id"] = edge_ids - df["edge_type"] = edge_types - df["hop_id"] = hop_ids - - if renumber: - renumber_df = cudf.DataFrame( - { - "map": renumber_map, - } - ) - - if not return_offsets: - batch_ids_r = cudf.Series(batch_ids).repeat( - cp.diff(renumber_map_offsets) - ) - batch_ids_r.reset_index(drop=True, inplace=True) - renumber_df["batch_id"] = batch_ids_r - - if return_offsets: - offsets_df = cudf.DataFrame( - { - "batch_id": batch_ids, - "offsets": offsets[:-1], - } - ) - - if renumber: - offsets_df["renumber_map_offsets"] = renumber_map_offsets[:-1] - - else: - if len(batch_ids) > 0: - 
batch_ids = cudf.Series(batch_ids).repeat(cp.diff(offsets)) - batch_ids.reset_index(drop=True, inplace=True) - - df["batch_id"] = batch_ids - - else: - sources, destinations, indices = sampling_result - - df["sources"] = sources - df["destinations"] = destinations - - if indices is None: - df["indices"] = None - else: - df["indices"] = indices - if weight_t == "int32": - df["indices"] = indices.astype("int32") - elif weight_t == "int64": - df["indices"] = indices.astype("int64") - else: - df["indices"] = indices + dfs = sampling_results_from_cupy_array_dict( + sampling_result_array_dict, + weight_t, + len(fanout_vals), + with_edge_properties=with_edge_properties, + return_offsets=return_offsets, + renumber=renumber, + use_legacy_names=use_legacy_names, + include_hop_column=include_hop_column, + ) if G.renumbered and not renumber: - df = G.unrenumber(df, "sources", preserve_order=True) - df = G.unrenumber(df, "destinations", preserve_order=True) - - if return_offsets: - if renumber: - return df, offsets_df, renumber_df - else: - return df, offsets_df + dfs[0] = G.unrenumber(dfs[0], major_col_name, preserve_order=True) + dfs[0] = G.unrenumber(dfs[0], minor_col_name, preserve_order=True) - if renumber: - return df, renumber_df + if len(dfs) > 1: + return dfs - return df + return dfs[0] diff --git a/python/cugraph/cugraph/tests/sampling/test_bulk_sampler.py b/python/cugraph/cugraph/tests/sampling/test_bulk_sampler.py index 5ea79e0893a..a945881394b 100644 --- a/python/cugraph/cugraph/tests/sampling/test_bulk_sampler.py +++ b/python/cugraph/cugraph/tests/sampling/test_bulk_sampler.py @@ -16,7 +16,7 @@ import cudf import cupy import cugraph -from cugraph.datasets import karate +from cugraph.datasets import karate, email_Eu_core from cugraph.experimental.gnn import BulkSampler from cugraph.utilities.utils import create_directory_with_overwrite @@ -297,3 +297,53 @@ def test_bulk_sampler_empty_batches(scratch_dir): assert df.batch_id.max() == 1 shutil.rmtree(samples_path) + 
+ +@pytest.mark.sg +def test_bulk_sampler_csr(scratch_dir): + el = email_Eu_core.get_edgelist() + + G = cugraph.Graph(directed=True) + G.from_cudf_edgelist(el, source="src", destination="dst") + + samples_path = os.path.join(scratch_dir, "test_bulk_sampler_csr") + create_directory_with_overwrite(samples_path) + + bs = BulkSampler( + batch_size=7, + output_path=samples_path, + graph=G, + fanout_vals=[5, 4, 3], + with_replacement=False, + batches_per_partition=7, + renumber=True, + use_legacy_names=False, + compression="CSR", + compress_per_hop=False, + prior_sources_behavior="exclude", + include_hop_column=False, + ) + + seeds = G.select_random_vertices(62, 1000) + batch_ids = cudf.Series( + cupy.repeat(cupy.arange(int(1000 / 7) + 1, dtype="int32"), 7)[:1000] + ).sort_values() + + batch_df = cudf.DataFrame( + { + "seed": seeds, + "batch": batch_ids, + } + ) + + bs.add_batches(batch_df, start_col_name="seed", batch_col_name="batch") + bs.flush() + + assert len(os.listdir(samples_path)) == 21 + + for file in os.listdir(samples_path): + df = cudf.read_parquet(os.path.join(samples_path, file)) + + assert df.major_offsets.dropna().iloc[-1] - df.major_offsets.iloc[0] == len(df) + + shutil.rmtree(samples_path) diff --git a/python/cugraph/cugraph/tests/sampling/test_bulk_sampler_io.py b/python/cugraph/cugraph/tests/sampling/test_bulk_sampler_io.py index f71c16a8368..5eafe89ea83 100644 --- a/python/cugraph/cugraph/tests/sampling/test_bulk_sampler_io.py +++ b/python/cugraph/cugraph/tests/sampling/test_bulk_sampler_io.py @@ -16,6 +16,7 @@ import pytest +import cupy import cudf from cugraph.gnn.data_loading.bulk_sampler_io import write_samples from cugraph.utilities.utils import create_directory_with_overwrite @@ -34,7 +35,9 @@ def test_bulk_sampler_io(scratch_dir): } ) - offsets = cudf.DataFrame({"offsets": [0, 8], "batch_id": [0, 1]}) + assert len(results) == 12 + + offsets = cudf.DataFrame({"offsets": [0, 8, 12], "batch_id": [0, 1, None]}) samples_path = 
os.path.join(scratch_dir, "test_bulk_sampler_io") create_directory_with_overwrite(samples_path) @@ -138,8 +141,12 @@ def test_bulk_sampler_io_empty_batch(scratch_dir): } ) + assert len(results) == 20 + # some batches are missing - offsets = cudf.DataFrame({"offsets": [0, 8, 12, 16], "batch_id": [0, 3, 4, 10]}) + offsets = cudf.DataFrame( + {"offsets": [0, 8, 12, 16, 20], "batch_id": [0, 3, 4, 10, None]} + ) samples_path = os.path.join(scratch_dir, "test_bulk_sampler_io_empty_batch") create_directory_with_overwrite(samples_path) @@ -157,3 +164,61 @@ def test_bulk_sampler_io_empty_batch(scratch_dir): df1 = cudf.read_parquet(os.path.join(samples_path, "batch=4-5.parquet")) assert df1.batch_id.min() == 4 assert df1.batch_id.max() == 5 + + shutil.rmtree(samples_path) + + +@pytest.mark.sg +def test_bulk_sampler_io_mock_csr(scratch_dir): + major_offsets_array = cudf.Series([0, 5, 10, 15]) + minors_array = cudf.Series([1, 2, 3, 4, 8, 9, 1, 3, 4, 5, 3, 0, 4, 9, 1]) + edge_ids = cudf.Series(cupy.arange(len(minors_array))) + + # 2 hops + label_hop_offsets = cudf.Series([0, 1, 3]) + + # map + renumber_map = cudf.Series(cupy.arange(10)) + renumber_map_offsets = cudf.Series([0, 10]) + + results_df = cudf.DataFrame() + results_df["minors"] = minors_array + results_df["major_offsets"] = major_offsets_array + results_df["edge_id"] = edge_ids + results_df["edge_type"] = None + results_df["weight"] = None + + offsets_df = cudf.DataFrame() + offsets_df["offsets"] = label_hop_offsets + offsets_df["renumber_map_offsets"] = renumber_map_offsets + offsets_df["batch_id"] = cudf.Series([0]) + + renumber_df = cudf.DataFrame() + renumber_df["map"] = renumber_map + + samples_path = os.path.join(scratch_dir, "test_bulk_sampler_io_mock_csr") + create_directory_with_overwrite(samples_path) + + write_samples(results_df, offsets_df, renumber_df, 1, samples_path) + + result = cudf.read_parquet(os.path.join(samples_path, "batch=0-0.parquet")) + + assert ( + result.minors.dropna().values_host.tolist() 
== minors_array.values_host.tolist() + ) + assert ( + result.major_offsets.dropna().values_host.tolist() + == major_offsets_array.values_host.tolist() + ) + assert result.edge_id.dropna().values_host.tolist() == edge_ids.values_host.tolist() + assert ( + result.renumber_map_offsets.dropna().values_host.tolist() + == renumber_map_offsets.values_host.tolist() + ) + assert result.map.dropna().values_host.tolist() == renumber_map.values_host.tolist() + assert ( + result.label_hop_offsets.dropna().values_host.tolist() + == label_hop_offsets.values_host.tolist() + ) + + shutil.rmtree(samples_path) diff --git a/python/cugraph/cugraph/tests/sampling/test_bulk_sampler_io_mg.py b/python/cugraph/cugraph/tests/sampling/test_bulk_sampler_io_mg.py index 41f68c08e5c..638cccbdcaa 100644 --- a/python/cugraph/cugraph/tests/sampling/test_bulk_sampler_io_mg.py +++ b/python/cugraph/cugraph/tests/sampling/test_bulk_sampler_io_mg.py @@ -38,8 +38,12 @@ def test_bulk_sampler_io(scratch_dir): divisions=[0, 8, 11] ) - offsets = cudf.DataFrame({"offsets": [0, 0], "batch_id": [0, 1]}) - offsets = dask_cudf.from_cudf(offsets, npartitions=2) + assert len(results) == 12 + + offsets = cudf.DataFrame({"offsets": [0, 8, 0, 4], "batch_id": [0, None, 1, None]}) + offsets = dask_cudf.from_cudf(offsets, npartitions=1).repartition( + divisions=[0, 2, 3] + ) samples_path = os.path.join(scratch_dir, "mg_test_bulk_sampler_io") create_directory_with_overwrite(samples_path) @@ -149,9 +153,11 @@ def test_bulk_sampler_io_empty_batch(scratch_dir): ) # some batches are missing - offsets = cudf.DataFrame({"offsets": [0, 8, 0, 4], "batch_id": [0, 3, 4, 10]}) + offsets = cudf.DataFrame( + {"offsets": [0, 8, 12, 0, 4, 8], "batch_id": [0, 3, None, 4, 10, None]} + ) offsets = dask_cudf.from_cudf(offsets, npartitions=1).repartition( - divisions=[0, 2, 3] + divisions=[0, 3, 5] ) samples_path = os.path.join(scratch_dir, "mg_test_bulk_sampler_io_empty_batch") diff --git 
a/python/cugraph/cugraph/tests/sampling/test_bulk_sampler_mg.py b/python/cugraph/cugraph/tests/sampling/test_bulk_sampler_mg.py index eded435f897..aee81e5ffed 100644 --- a/python/cugraph/cugraph/tests/sampling/test_bulk_sampler_mg.py +++ b/python/cugraph/cugraph/tests/sampling/test_bulk_sampler_mg.py @@ -21,7 +21,7 @@ import cupy import cugraph import dask_cudf -from cugraph.datasets import karate +from cugraph.datasets import karate, email_Eu_core from cugraph.experimental import BulkSampler from cugraph.utilities.utils import create_directory_with_overwrite @@ -247,3 +247,59 @@ def test_bulk_sampler_empty_batches(dask_client, scratch_dir): assert df.batch_id.max() == 1 shutil.rmtree(samples_path) + + +@pytest.mark.mg +@pytest.mark.parametrize("mg_input", [True, False]) +def test_bulk_sampler_csr(dask_client, scratch_dir, mg_input): + nworkers = len(dask_client.scheduler_info()["workers"]) + el = dask_cudf.from_cudf(email_Eu_core.get_edgelist(), npartitions=nworkers * 2) + + G = cugraph.Graph(directed=True) + G.from_dask_cudf_edgelist(el, source="src", destination="dst") + + samples_path = os.path.join(scratch_dir, "mg_test_bulk_sampler_csr") + create_directory_with_overwrite(samples_path) + + bs = BulkSampler( + batch_size=7, + output_path=samples_path, + graph=G, + fanout_vals=[5, 4, 3], + with_replacement=False, + batches_per_partition=7, + renumber=True, + use_legacy_names=False, + compression="CSR", + compress_per_hop=True, + prior_sources_behavior="carryover", + deduplicate_sources=True, + include_hop_column=False, + ) + + seeds = G.select_random_vertices(62, 1000) + batch_ids = cudf.Series( + cupy.repeat(cupy.arange(int(1000 / 7) + 1, dtype="int32"), 7)[:1000] + ).sort_values() + + batch_df = cudf.DataFrame( + { + "seed": seeds.compute().values, + "batch": batch_ids, + } + ) + + if mg_input: + batch_df = dask_cudf.from_cudf(batch_df, npartitions=2) + + bs.add_batches(batch_df, start_col_name="seed", batch_col_name="batch") + bs.flush() + + assert 
len(os.listdir(samples_path)) == 21 + + for file in os.listdir(samples_path): + df = cudf.read_parquet(os.path.join(samples_path, file)) + + assert df.major_offsets.dropna().iloc[-1] - df.major_offsets.iloc[0] == len(df) + + shutil.rmtree(samples_path) diff --git a/python/cugraph/cugraph/tests/sampling/test_uniform_neighbor_sample.py b/python/cugraph/cugraph/tests/sampling/test_uniform_neighbor_sample.py index 62599291d04..206898088ab 100644 --- a/python/cugraph/cugraph/tests/sampling/test_uniform_neighbor_sample.py +++ b/python/cugraph/cugraph/tests/sampling/test_uniform_neighbor_sample.py @@ -15,6 +15,7 @@ import pytest +import cupy import cudf import cugraph from cugraph import uniform_neighbor_sample @@ -151,7 +152,7 @@ def test_uniform_neighbor_sample_simple(input_combo): G, input_combo["start_list"], input_combo["fanout_vals"], - input_combo["with_replacement"], + with_replacement=input_combo["with_replacement"], ) print(input_df) @@ -254,7 +255,9 @@ def test_uniform_neighbor_sample_tree(directed): start_list = cudf.Series([0, 0], dtype="int32") fanout_vals = [4, 1, 3] with_replacement = True - result_nbr = uniform_neighbor_sample(G, start_list, fanout_vals, with_replacement) + result_nbr = uniform_neighbor_sample( + G, start_list, fanout_vals, with_replacement=with_replacement + ) result_nbr = result_nbr.drop_duplicates() @@ -288,7 +291,7 @@ def test_uniform_neighbor_sample_unweighted(simple_unweighted_input_expected_out test_data["Graph"], test_data["start_list"].astype("int64"), test_data["fanout_vals"], - test_data["with_replacement"], + with_replacement=test_data["with_replacement"], ) actual_src = sampling_results.sources @@ -303,7 +306,8 @@ def test_uniform_neighbor_sample_unweighted(simple_unweighted_input_expected_out @pytest.mark.sg @pytest.mark.cugraph_ops @pytest.mark.parametrize("return_offsets", [True, False]) -def test_uniform_neighbor_sample_edge_properties(return_offsets): +@pytest.mark.parametrize("include_hop_column", [True, False]) +def 
test_uniform_neighbor_sample_edge_properties(return_offsets, include_hop_column): edgelist_df = cudf.DataFrame( { "src": cudf.Series([0, 1, 2, 3, 4, 3, 4, 2, 0, 1, 0, 2], dtype="int32"), @@ -337,6 +341,7 @@ def test_uniform_neighbor_sample_edge_properties(return_offsets): with_edge_properties=True, with_batch_ids=True, return_offsets=return_offsets, + include_hop_column=include_hop_column, ) if return_offsets: sampling_results, sampling_offsets = sampling_results @@ -359,11 +364,29 @@ def test_uniform_neighbor_sample_edge_properties(return_offsets): == sampling_results["destinations"].values_host.tolist() ) - assert sampling_results["hop_id"].values_host.tolist() == ([0, 0, 1, 1, 1, 1] * 2) + if include_hop_column: + assert sampling_results["hop_id"].values_host.tolist() == ( + [0, 0, 1, 1, 1, 1] * 2 + ) + else: + assert "hop_id" not in sampling_results if return_offsets: - assert sampling_offsets["batch_id"].values_host.tolist() == [0, 1] - assert sampling_offsets["offsets"].values_host.tolist() == [0, 6] + assert sampling_offsets["batch_id"].dropna().values_host.tolist() == [0, 1] + if include_hop_column: + assert sampling_offsets["offsets"].dropna().values_host.tolist() == [ + 0, + 6, + 12, + ] + else: + assert sampling_offsets["offsets"].dropna().values_host.tolist() == [ + 0, + 2, + 6, + 8, + 12, + ] else: assert sampling_results["batch_id"].values_host.tolist() == ([0] * 6 + [1] * 6) @@ -778,6 +801,176 @@ def test_uniform_neighbor_sample_renumber(hops): assert (renumber_map.batch_id == 0).all() +@pytest.mark.sg +@pytest.mark.parametrize("hops", [[5], [5, 5], [5, 5, 5]]) +def test_uniform_neighbor_sample_offset_renumber(hops): + el = email_Eu_core.get_edgelist() + + G = cugraph.Graph(directed=True) + G.from_cudf_edgelist(el, source="src", destination="dst") + + seeds = G.select_random_vertices(62, int(0.0001 * len(el))) + + ( + sampling_results_unrenumbered, + offsets_unrenumbered, + ) = cugraph.uniform_neighbor_sample( + G, + seeds, + hops, + 
with_replacement=False, + with_edge_properties=True, + with_batch_ids=False, + deduplicate_sources=True, + renumber=False, + return_offsets=True, + random_state=62, + ) + + ( + sampling_results_renumbered, + offsets_renumbered, + renumber_map, + ) = cugraph.uniform_neighbor_sample( + G, + seeds, + hops, + with_replacement=False, + with_edge_properties=True, + with_batch_ids=False, + deduplicate_sources=True, + renumber=True, + return_offsets=True, + random_state=62, + ) + + sources_hop_0 = sampling_results_unrenumbered[ + sampling_results_unrenumbered.hop_id == 0 + ].sources + for hop in range(len(hops)): + destinations_hop = sampling_results_unrenumbered[ + sampling_results_unrenumbered.hop_id <= hop + ].destinations + expected_renumber_map = cudf.concat([sources_hop_0, destinations_hop]).unique() + + assert sorted(expected_renumber_map.values_host.tolist()) == sorted( + renumber_map.map[0 : len(expected_renumber_map)].values_host.tolist() + ) + + renumber_map_offsets = offsets_renumbered.renumber_map_offsets.dropna() + assert len(renumber_map_offsets) == 2 + assert renumber_map_offsets.iloc[0] == 0 + assert renumber_map_offsets.iloc[-1] == len(renumber_map) + + assert len(offsets_renumbered) == 2 + + +@pytest.mark.sg +@pytest.mark.parametrize("hops", [[5], [5, 5], [5, 5, 5]]) +@pytest.mark.parametrize("seed", [62, 66, 68]) +def test_uniform_neighbor_sample_csr_csc_global(hops, seed): + el = email_Eu_core.get_edgelist() + + G = cugraph.Graph(directed=True) + G.from_cudf_edgelist(el, source="src", destination="dst") + + seeds = G.select_random_vertices(seed, int(0.0001 * len(el))) + + sampling_results, offsets, renumber_map = cugraph.uniform_neighbor_sample( + G, + seeds, + hops, + with_replacement=False, + with_edge_properties=True, + with_batch_ids=False, + deduplicate_sources=True, + # carryover not valid because C++ sorts on (hop,src) + prior_sources_behavior="exclude", + renumber=True, + return_offsets=True, + random_state=seed, + use_legacy_names=False, + 
compress_per_hop=False, + compression="CSR", + include_hop_column=False, + ) + + major_offsets = sampling_results["major_offsets"].dropna().values + majors = cudf.Series(cupy.arange(len(major_offsets) - 1)) + majors = majors.repeat(cupy.diff(major_offsets)) + + minors = sampling_results["minors"].dropna() + assert len(majors) == len(minors) + + majors = renumber_map.map.iloc[majors] + minors = renumber_map.map.iloc[minors] + + for i in range(len(majors)): + assert 1 == len(el[(el.src == majors.iloc[i]) & (el.dst == minors.iloc[i])]) + + +@pytest.mark.sg +@pytest.mark.parametrize("seed", [62, 66, 68]) +@pytest.mark.parametrize("hops", [[5], [5, 5], [5, 5, 5]]) +def test_uniform_neighbor_sample_csr_csc_local(hops, seed): + el = email_Eu_core.get_edgelist(download=True) + + G = cugraph.Graph(directed=True) + G.from_cudf_edgelist(el, source="src", destination="dst") + + seeds = cudf.Series( + [49, 71], dtype="int32" + ) # hardcoded to ensure out-degree is high enough + + sampling_results, offsets, renumber_map = cugraph.uniform_neighbor_sample( + G, + seeds, + hops, + with_replacement=False, + with_edge_properties=True, + with_batch_ids=False, + deduplicate_sources=True, + prior_sources_behavior="carryover", + renumber=True, + return_offsets=True, + random_state=seed, + use_legacy_names=False, + compress_per_hop=True, + compression="CSR", + include_hop_column=False, + ) + + for hop in range(len(hops)): + major_offsets = sampling_results["major_offsets"].iloc[ + offsets.offsets.iloc[hop] : (offsets.offsets.iloc[hop + 1] + 1) + ] + + minors = sampling_results["minors"].iloc[ + major_offsets.iloc[0] : major_offsets.iloc[-1] + ] + + majors = cudf.Series(cupy.arange(len(major_offsets) - 1)) + majors = majors.repeat(cupy.diff(major_offsets)) + + majors = renumber_map.map.iloc[majors] + minors = renumber_map.map.iloc[minors] + + for i in range(len(majors)): + assert 1 == len(el[(el.src == majors.iloc[i]) & (el.dst == minors.iloc[i])]) + + +@pytest.mark.sg 
+@pytest.mark.skip(reason="needs to be written!") +def test_uniform_neighbor_sample_dcsr_dcsc_global(): + raise NotImplementedError + + +@pytest.mark.sg +@pytest.mark.skip(reason="needs to be written!") +def test_uniform_neighbor_sample_dcsr_dcsc_local(): + raise NotImplementedError + + @pytest.mark.sg @pytest.mark.skip(reason="needs to be written!") def test_multi_client_sampling(): diff --git a/python/cugraph/cugraph/tests/sampling/test_uniform_neighbor_sample_mg.py b/python/cugraph/cugraph/tests/sampling/test_uniform_neighbor_sample_mg.py index 9d87c097287..460a25cbd14 100644 --- a/python/cugraph/cugraph/tests/sampling/test_uniform_neighbor_sample_mg.py +++ b/python/cugraph/cugraph/tests/sampling/test_uniform_neighbor_sample_mg.py @@ -17,6 +17,7 @@ import pytest +import pandas import cupy import cudf import cugraph @@ -138,7 +139,7 @@ def test_mg_uniform_neighbor_sample_simple(dask_client, input_combo): dg, input_combo["start_list"], input_combo["fanout_vals"], - input_combo["with_replacement"], + with_replacement=input_combo["with_replacement"], ) # multi edges are dropped to easily verify that each edge in the @@ -228,7 +229,9 @@ def test_mg_uniform_neighbor_sample_tree(dask_client, directed): start_list = cudf.Series([0, 0], dtype="int32") fanout_vals = [4, 1, 3] with_replacement = True - result_nbr = uniform_neighbor_sample(G, start_list, fanout_vals, with_replacement) + result_nbr = uniform_neighbor_sample( + G, start_list, fanout_vals, with_replacement=with_replacement + ) result_nbr = result_nbr.drop_duplicates() @@ -283,7 +286,7 @@ def test_mg_uniform_neighbor_sample_unweighted(dask_client): with_replacement = True sampling_results = uniform_neighbor_sample( - G, start_list, fanout_vals, with_replacement + G, start_list, fanout_vals, with_replacement=with_replacement ) expected_src = [0, 0] @@ -380,13 +383,17 @@ def test_uniform_neighbor_sample_edge_properties(dask_client, return_offsets): dfp = sampling_results.get_partition(i).compute() if len(dfp) > 
0: offsets_p = sampling_offsets.get_partition(i).compute() + print(offsets_p) assert len(offsets_p) > 0 if offsets_p.batch_id.iloc[0] == 1: batches_found[1] += 1 - assert offsets_p.batch_id.values_host.tolist() == [1] - assert offsets_p.offsets.values_host.tolist() == [0] + assert offsets_p.batch_id.dropna().values_host.tolist() == [1] + assert offsets_p.offsets.dropna().values_host.tolist() == [ + 0, + len(dfp), + ] assert sorted(dfp.sources.values_host.tolist()) == ( [1, 1, 3, 3, 4, 4] @@ -397,8 +404,11 @@ def test_uniform_neighbor_sample_edge_properties(dask_client, return_offsets): elif offsets_p.batch_id.iloc[0] == 0: batches_found[0] += 1 - assert offsets_p.batch_id.values_host.tolist() == [0] - assert offsets_p.offsets.values_host.tolist() == [0] + assert offsets_p.batch_id.dropna().values_host.tolist() == [0] + assert offsets_p.offsets.dropna().values_host.tolist() == [ + 0, + len(dfp), + ] assert sorted(dfp.sources.values_host.tolist()) == ( [0, 0, 0, 1, 1, 2, 2, 2, 4, 4] @@ -703,7 +713,6 @@ def test_uniform_neighbor_sample_batched(dask_client, dataset, input_df, max_bat source="src", destination="dst", edge_attr=["wgt", "eid", "etp"], - legacy_renum_only=True, ) input_vertices = dask_cudf.concat([df.src, df.dst]).unique().compute() @@ -960,7 +969,6 @@ def test_uniform_neighbor_sample_deduplicate_sources_email_eu_core(dask_client): @pytest.mark.mg @pytest.mark.parametrize("hops", [[5], [5, 5], [5, 5, 5]]) -@pytest.mark.tags("runme") def test_uniform_neighbor_sample_renumber(dask_client, hops): # FIXME This test is not very good because there is a lot of # non-deterministic behavior that still exists despite passing @@ -1005,6 +1013,224 @@ def test_uniform_neighbor_sample_renumber(dask_client, hops): ) +@pytest.mark.mg +@pytest.mark.parametrize("hops", [[5], [5, 5], [5, 5, 5]]) +def test_uniform_neighbor_sample_offset_renumber(dask_client, hops): + el = dask_cudf.from_cudf(email_Eu_core.get_edgelist(), npartitions=4) + + G = cugraph.Graph(directed=True) + 
G.from_dask_cudf_edgelist(el, source="src", destination="dst") + + seeds = G.select_random_vertices(62, int(0.0001 * len(el))) + + ( + sampling_results_unrenumbered, + offsets_unrenumbered, + ) = cugraph.dask.uniform_neighbor_sample( + G, + seeds, + hops, + with_replacement=False, + with_edge_properties=True, + with_batch_ids=False, + deduplicate_sources=True, + renumber=False, + return_offsets=True, + random_state=62, + ) + sampling_results_unrenumbered = sampling_results_unrenumbered.compute() + offsets_unrenumbered = offsets_unrenumbered.compute() + + ( + sampling_results_renumbered, + offsets_renumbered, + renumber_map, + ) = cugraph.dask.uniform_neighbor_sample( + G, + seeds, + hops, + with_replacement=False, + with_edge_properties=True, + with_batch_ids=False, + deduplicate_sources=True, + renumber=True, + keep_batches_together=True, + min_batch_id=0, + max_batch_id=0, + return_offsets=True, + random_state=62, + ) + + # can't use compute() since empty batches still get a partition + n_workers = len(dask_client.scheduler_info()["workers"]) + for p in range(n_workers): + partition = offsets_renumbered.get_partition(p).compute() + if not pandas.isna(partition.batch_id.iloc[0]): + break + + sampling_results_renumbered = sampling_results_renumbered.get_partition(p).compute() + offsets_renumbered = offsets_renumbered.get_partition(p).compute() + renumber_map = renumber_map.get_partition(p).compute() + + sources_hop_0 = sampling_results_unrenumbered[ + sampling_results_unrenumbered.hop_id == 0 + ].sources + for hop in range(len(hops)): + destinations_hop = sampling_results_unrenumbered[ + sampling_results_unrenumbered.hop_id <= hop + ].destinations + expected_renumber_map = cudf.concat([sources_hop_0, destinations_hop]).unique() + + assert sorted(expected_renumber_map.values_host.tolist()) == sorted( + renumber_map.map[0 : len(expected_renumber_map)].values_host.tolist() + ) + + renumber_map_offsets = offsets_renumbered.renumber_map_offsets.dropna() + assert 
len(renumber_map_offsets) == 2 + assert renumber_map_offsets.iloc[0] == 0 + assert renumber_map_offsets.iloc[-1] == len(renumber_map) + + assert len(offsets_renumbered) == 2 + + +@pytest.mark.mg +@pytest.mark.parametrize("hops", [[5], [5, 5], [5, 5, 5]]) +@pytest.mark.parametrize("seed", [62, 66, 68]) +def test_uniform_neighbor_sample_csr_csc_global(dask_client, hops, seed): + el = dask_cudf.from_cudf(email_Eu_core.get_edgelist(), npartitions=4) + + G = cugraph.Graph(directed=True) + G.from_dask_cudf_edgelist(el, source="src", destination="dst") + + seeds = G.select_random_vertices(seed, int(0.0001 * len(el))) + + sampling_results, offsets, renumber_map = cugraph.dask.uniform_neighbor_sample( + G, + seeds, + hops, + with_replacement=False, + with_edge_properties=True, + with_batch_ids=False, + deduplicate_sources=True, + # carryover not valid because C++ sorts on (hop,src) + prior_sources_behavior="exclude", + renumber=True, + return_offsets=True, + random_state=seed, + use_legacy_names=False, + compress_per_hop=False, + compression="CSR", + include_hop_column=False, + keep_batches_together=True, + min_batch_id=0, + max_batch_id=0, + ) + + # can't use compute() since empty batches still get a partition + n_workers = len(dask_client.scheduler_info()["workers"]) + for p in range(n_workers): + partition = offsets.get_partition(p).compute() + if not pandas.isna(partition.batch_id.iloc[0]): + break + + sampling_results = sampling_results.get_partition(p).compute() + offsets = offsets.get_partition(p).compute() + renumber_map = renumber_map.get_partition(p).compute() + + major_offsets = sampling_results["major_offsets"].dropna().values + majors = cudf.Series(cupy.arange(len(major_offsets) - 1)) + majors = majors.repeat(cupy.diff(major_offsets)) + + minors = sampling_results["minors"].dropna() + assert len(majors) == len(minors) + + majors = renumber_map.map.iloc[majors] + minors = renumber_map.map.iloc[minors] + + for i in range(len(majors)): + assert 1 == len(el[(el.src 
== majors.iloc[i]) & (el.dst == minors.iloc[i])]) + + +@pytest.mark.mg +@pytest.mark.parametrize("seed", [62, 66, 68]) +@pytest.mark.parametrize("hops", [[5], [5, 5], [5, 5, 5]]) +def test_uniform_neighbor_sample_csr_csc_local(dask_client, hops, seed): + el = dask_cudf.from_cudf(email_Eu_core.get_edgelist(), npartitions=4) + + G = cugraph.Graph(directed=True) + G.from_dask_cudf_edgelist(el, source="src", destination="dst") + + seeds = dask_cudf.from_cudf( + cudf.Series([49, 71], dtype="int32"), npartitions=1 + ) # hardcoded to ensure out-degree is high enough + + sampling_results, offsets, renumber_map = cugraph.dask.uniform_neighbor_sample( + G, + seeds, + hops, + with_replacement=False, + with_edge_properties=True, + with_batch_ids=False, + deduplicate_sources=True, + prior_sources_behavior="carryover", + renumber=True, + return_offsets=True, + random_state=seed, + use_legacy_names=False, + compress_per_hop=True, + compression="CSR", + include_hop_column=False, + keep_batches_together=True, + min_batch_id=0, + max_batch_id=0, + ) + + # can't use compute() since empty batches still get a partition + n_workers = len(dask_client.scheduler_info()["workers"]) + for p in range(n_workers): + partition = offsets.get_partition(p).compute() + + if not pandas.isna(partition.batch_id.iloc[0]): + break + + sampling_results = sampling_results.get_partition(p).compute() + offsets = offsets.get_partition(p).compute() + renumber_map = renumber_map.get_partition(p).compute() + + print(sampling_results) + print(offsets) + + for hop in range(len(hops)): + major_offsets = sampling_results["major_offsets"].iloc[ + offsets.offsets.iloc[hop] : (offsets.offsets.iloc[hop + 1] + 1) + ] + + minors = sampling_results["minors"].iloc[ + major_offsets.iloc[0] : major_offsets.iloc[-1] + ] + + majors = cudf.Series(cupy.arange(len(major_offsets) - 1)) + majors = majors.repeat(cupy.diff(major_offsets)) + + majors = renumber_map.map.iloc[majors] + minors = renumber_map.map.iloc[minors] + + for i in 
range(len(majors)): + assert 1 == len(el[(el.src == majors.iloc[i]) & (el.dst == minors.iloc[i])]) + + +@pytest.mark.mg +@pytest.mark.skip(reason="needs to be written!") +def test_uniform_neighbor_sample_dcsr_dcsc_global(): + raise NotImplementedError + + +@pytest.mark.mg +@pytest.mark.skip(reason="needs to be written!") +def test_uniform_neighbor_sample_dcsr_dcsc_local(): + raise NotImplementedError + + # ============================================================================= # Benchmarks # ============================================================================= diff --git a/python/pylibcugraph/pylibcugraph/_cugraph_c/algorithms.pxd b/python/pylibcugraph/pylibcugraph/_cugraph_c/algorithms.pxd index ffb458b409c..29c6d79e08d 100644 --- a/python/pylibcugraph/pylibcugraph/_cugraph_c/algorithms.pxd +++ b/python/pylibcugraph/pylibcugraph/_cugraph_c/algorithms.pxd @@ -176,15 +176,32 @@ cdef extern from "cugraph_c/algorithms.h": const cugraph_sample_result_t* result ) + # Deprecated, use cugraph_sample_result_get_majors cdef cugraph_type_erased_device_array_view_t* \ cugraph_sample_result_get_sources( const cugraph_sample_result_t* result ) + # Deprecated, use cugraph_sample_result_get_minors cdef cugraph_type_erased_device_array_view_t* \ cugraph_sample_result_get_destinations( const cugraph_sample_result_t* result ) + + cdef cugraph_type_erased_device_array_view_t* \ + cugraph_sample_result_get_majors( + const cugraph_sample_result_t* result + ) + + cdef cugraph_type_erased_device_array_view_t* \ + cugraph_sample_result_get_minors( + const cugraph_sample_result_t* result + ) + + cdef cugraph_type_erased_device_array_view_t* \ + cugraph_sample_result_get_major_offsets( + const cugraph_sample_result_t* result + ) cdef cugraph_type_erased_device_array_view_t* \ cugraph_sample_result_get_index( @@ -211,11 +228,17 @@ cdef extern from "cugraph_c/algorithms.h": const cugraph_sample_result_t* result ) + cdef cugraph_type_erased_device_array_view_t* \ + 
cugraph_sample_result_get_label_hop_offsets( + const cugraph_sample_result_t* result + ) + cdef cugraph_type_erased_device_array_view_t* \ cugraph_sample_result_get_start_labels( const cugraph_sample_result_t* result ) + # Deprecated cdef cugraph_type_erased_device_array_view_t* \ cugraph_sample_result_get_offsets( const cugraph_sample_result_t* result @@ -246,10 +269,17 @@ cdef extern from "cugraph_c/algorithms.h": pass ctypedef enum cugraph_prior_sources_behavior_t: - DEFAULT + DEFAULT=0 CARRY_OVER EXCLUDE + ctypedef enum cugraph_compression_type_t: + COO=0 + CSR + CSC + DCSR + DCSC + cdef cugraph_error_code_t \ cugraph_sampling_options_create( cugraph_sampling_options_t** options, @@ -277,7 +307,7 @@ cdef extern from "cugraph_c/algorithms.h": cdef void \ cugraph_sampling_set_prior_sources_behavior( cugraph_sampling_options_t* options, - cugraph_prior_sources_behavior_t value + cugraph_prior_sources_behavior_t value, ) cdef void \ @@ -286,10 +316,22 @@ cdef extern from "cugraph_c/algorithms.h": bool_t value, ) + cdef void \ + cugraph_sampling_set_compress_per_hop( + cugraph_sampling_options_t* options, + bool_t value, + ) + + cdef void \ + cugraph_sampling_set_compression_type( + cugraph_sampling_options_t* options, + cugraph_compression_type_t value, + ) + cdef void \ cugraph_sampling_options_free( cugraph_sampling_options_t* options, - ) + ) # uniform random walks cdef cugraph_error_code_t \ diff --git a/python/pylibcugraph/pylibcugraph/_cugraph_c/sampling_algorithms.pxd b/python/pylibcugraph/pylibcugraph/_cugraph_c/sampling_algorithms.pxd index 91cc11d6b1c..c32b57f8621 100644 --- a/python/pylibcugraph/pylibcugraph/_cugraph_c/sampling_algorithms.pxd +++ b/python/pylibcugraph/pylibcugraph/_cugraph_c/sampling_algorithms.pxd @@ -43,23 +43,6 @@ from pylibcugraph._cugraph_c.array cimport ( cdef extern from "cugraph_c/sampling_algorithms.h": ########################################################################### - # deprecated, should migrate to 
cugraph_uniform_neighbor_sample - cdef cugraph_error_code_t cugraph_uniform_neighbor_sample_with_edge_properties( - const cugraph_resource_handle_t* handle, - cugraph_graph_t* graph, - const cugraph_type_erased_device_array_view_t* start_vertices, - const cugraph_type_erased_device_array_view_t* start_vertex_labels, - const cugraph_type_erased_device_array_view_t* label_list, - const cugraph_type_erased_device_array_view_t* label_to_comm_rank, - const cugraph_type_erased_host_array_view_t* fan_out, - cugraph_rng_state_t* rng_state, - bool_t with_replacement, - bool_t return_hops, - bool_t do_expensive_check, - cugraph_sample_result_t** result, - cugraph_error_t** error - ) - cdef cugraph_error_code_t cugraph_uniform_neighbor_sample( const cugraph_resource_handle_t* handle, cugraph_graph_t* graph, diff --git a/python/pylibcugraph/pylibcugraph/internal_types/sampling_result.pyx b/python/pylibcugraph/pylibcugraph/internal_types/sampling_result.pyx index d11f6994298..9f98b4f37b0 100644 --- a/python/pylibcugraph/pylibcugraph/internal_types/sampling_result.pyx +++ b/python/pylibcugraph/pylibcugraph/internal_types/sampling_result.pyx @@ -20,14 +20,18 @@ from pylibcugraph._cugraph_c.array cimport ( ) from pylibcugraph._cugraph_c.algorithms cimport ( cugraph_sample_result_t, - cugraph_sample_result_get_sources, - cugraph_sample_result_get_destinations, + cugraph_sample_result_get_major_offsets, + cugraph_sample_result_get_majors, + cugraph_sample_result_get_minors, + cugraph_sample_result_get_label_hop_offsets, + cugraph_sample_result_get_sources, # deprecated + cugraph_sample_result_get_destinations, # deprecated cugraph_sample_result_get_edge_weight, cugraph_sample_result_get_edge_id, cugraph_sample_result_get_edge_type, - cugraph_sample_result_get_hop, + cugraph_sample_result_get_hop, # deprecated cugraph_sample_result_get_start_labels, - cugraph_sample_result_get_offsets, + cugraph_sample_result_get_offsets, # deprecated cugraph_sample_result_get_renumber_map, 
cugraph_sample_result_get_renumber_map_offsets, cugraph_sample_result_free, @@ -60,23 +64,71 @@ cdef class SamplingResult: cdef set_ptr(self, cugraph_sample_result_t* sample_result_ptr): self.c_sample_result_ptr = sample_result_ptr + def get_major_offsets(self): + if self.c_sample_result_ptr is NULL: + raise ValueError("pointer not set, must call set_ptr() with a " + "non-NULL value first.") + + cdef cugraph_type_erased_device_array_view_t* device_array_view_ptr = ( + cugraph_sample_result_get_major_offsets(self.c_sample_result_ptr) + ) + if device_array_view_ptr is NULL: + return None + + return create_cupy_array_view_for_device_ptr(device_array_view_ptr, + self) + + def get_majors(self): + if self.c_sample_result_ptr is NULL: + raise ValueError("pointer not set, must call set_ptr() with a " + "non-NULL value first.") + cdef cugraph_type_erased_device_array_view_t* device_array_view_ptr = ( + cugraph_sample_result_get_majors(self.c_sample_result_ptr) + ) + if device_array_view_ptr is NULL: + return None + + return create_cupy_array_view_for_device_ptr(device_array_view_ptr, + self) + + def get_minors(self): + if self.c_sample_result_ptr is NULL: + raise ValueError("pointer not set, must call set_ptr() with a " + "non-NULL value first.") + cdef cugraph_type_erased_device_array_view_t* device_array_view_ptr = ( + cugraph_sample_result_get_minors(self.c_sample_result_ptr) + ) + if device_array_view_ptr is NULL: + return None + + return create_cupy_array_view_for_device_ptr(device_array_view_ptr, + self) + def get_sources(self): + # Deprecated if self.c_sample_result_ptr is NULL: raise ValueError("pointer not set, must call set_ptr() with a " "non-NULL value first.") cdef cugraph_type_erased_device_array_view_t* device_array_view_ptr = ( cugraph_sample_result_get_sources(self.c_sample_result_ptr) ) + if device_array_view_ptr is NULL: + return None + return create_cupy_array_view_for_device_ptr(device_array_view_ptr, self) def get_destinations(self): + # Deprecated if 
self.c_sample_result_ptr is NULL: raise ValueError("pointer not set, must call set_ptr() with a " "non-NULL value first.") cdef cugraph_type_erased_device_array_view_t* device_array_view_ptr = ( cugraph_sample_result_get_destinations(self.c_sample_result_ptr) ) + if device_array_view_ptr is NULL: + return None + return create_cupy_array_view_for_device_ptr(device_array_view_ptr, self) @@ -95,6 +147,7 @@ cdef class SamplingResult: self) def get_indices(self): + # Deprecated return self.get_edge_weights() def get_edge_ids(self): @@ -132,9 +185,26 @@ cdef class SamplingResult: cdef cugraph_type_erased_device_array_view_t* device_array_view_ptr = ( cugraph_sample_result_get_start_labels(self.c_sample_result_ptr) ) + if device_array_view_ptr is NULL: + return None + return create_cupy_array_view_for_device_ptr(device_array_view_ptr, self) + def get_label_hop_offsets(self): + if self.c_sample_result_ptr is NULL: + raise ValueError("pointer not set, must call set_ptr() with a " + "non-NULL value first.") + cdef cugraph_type_erased_device_array_view_t* device_array_view_ptr = ( + cugraph_sample_result_get_label_hop_offsets(self.c_sample_result_ptr) + ) + if device_array_view_ptr is NULL: + return None + + return create_cupy_array_view_for_device_ptr(device_array_view_ptr, + self) + + # Deprecated def get_offsets(self): if self.c_sample_result_ptr is NULL: raise ValueError("pointer not set, must call set_ptr() with a " @@ -142,9 +212,13 @@ cdef class SamplingResult: cdef cugraph_type_erased_device_array_view_t* device_array_view_ptr = ( cugraph_sample_result_get_offsets(self.c_sample_result_ptr) ) + if device_array_view_ptr is NULL: + return None + return create_cupy_array_view_for_device_ptr(device_array_view_ptr, self) + # Deprecated def get_hop_ids(self): if self.c_sample_result_ptr is NULL: raise ValueError("pointer not set, must call set_ptr() with a " @@ -152,6 +226,9 @@ cdef class SamplingResult: cdef cugraph_type_erased_device_array_view_t* device_array_view_ptr = ( 
cugraph_sample_result_get_hop(self.c_sample_result_ptr) ) + if device_array_view_ptr is NULL: + return None + return create_cupy_array_view_for_device_ptr(device_array_view_ptr, self) @@ -162,6 +239,9 @@ cdef class SamplingResult: cdef cugraph_type_erased_device_array_view_t* device_array_view_ptr = ( cugraph_sample_result_get_renumber_map(self.c_sample_result_ptr) ) + if device_array_view_ptr is NULL: + return None + return create_cupy_array_view_for_device_ptr(device_array_view_ptr, self) @@ -172,5 +252,8 @@ cdef class SamplingResult: cdef cugraph_type_erased_device_array_view_t* device_array_view_ptr = ( cugraph_sample_result_get_renumber_map_offsets(self.c_sample_result_ptr) ) + if device_array_view_ptr is NULL: + return None + return create_cupy_array_view_for_device_ptr(device_array_view_ptr, self) \ No newline at end of file diff --git a/python/pylibcugraph/pylibcugraph/tests/test_uniform_neighbor_sample.py b/python/pylibcugraph/pylibcugraph/tests/test_uniform_neighbor_sample.py index 74aa6830d24..ac04635edcf 100644 --- a/python/pylibcugraph/pylibcugraph/tests/test_uniform_neighbor_sample.py +++ b/python/pylibcugraph/pylibcugraph/tests/test_uniform_neighbor_sample.py @@ -266,7 +266,7 @@ def test_neighborhood_sampling_large_sg_graph(gpubenchmark): def test_sample_result(): """ - Ensure the SampleResult class returns zero-opy cupy arrays and properly + Ensure the SampleResult class returns zero-copy cupy arrays and properly frees device memory when all references to it are gone and it's garbage collected. """ @@ -304,6 +304,8 @@ def test_sample_result(): assert isinstance(destinations, cp.ndarray) assert isinstance(indices, cp.ndarray) + print("sources:", destinations) + # Delete the SampleResult instance. This *should not* free the device # memory yet since the variables sources, destinations, and indices are # keeping the refcount >0. 
diff --git a/python/pylibcugraph/pylibcugraph/uniform_neighbor_sample.pyx b/python/pylibcugraph/pylibcugraph/uniform_neighbor_sample.pyx index bc2aa9205f1..ce6493c38f5 100644 --- a/python/pylibcugraph/pylibcugraph/uniform_neighbor_sample.pyx +++ b/python/pylibcugraph/pylibcugraph/uniform_neighbor_sample.pyx @@ -38,6 +38,7 @@ from pylibcugraph._cugraph_c.graph cimport ( from pylibcugraph._cugraph_c.algorithms cimport ( cugraph_sample_result_t, cugraph_prior_sources_behavior_t, + cugraph_compression_type_t, cugraph_sampling_options_t, cugraph_sampling_options_create, cugraph_sampling_options_free, @@ -46,7 +47,8 @@ from pylibcugraph._cugraph_c.algorithms cimport ( cugraph_sampling_set_prior_sources_behavior, cugraph_sampling_set_dedupe_sources, cugraph_sampling_set_renumber_results, - + cugraph_sampling_set_compress_per_hop, + cugraph_sampling_set_compression_type, ) from pylibcugraph._cugraph_c.sampling_algorithms cimport ( cugraph_uniform_neighbor_sample, @@ -73,6 +75,7 @@ from pylibcugraph._cugraph_c.random cimport ( from pylibcugraph.random cimport ( CuGraphRandomState ) +import warnings # TODO accept cupy/numpy random state in addition to raw seed. def uniform_neighbor_sample(ResourceHandle resource_handle, @@ -90,7 +93,10 @@ def uniform_neighbor_sample(ResourceHandle resource_handle, deduplicate_sources=False, return_hops=False, renumber=False, - random_state=None): + compression='COO', + compress_per_hop=False, + random_state=None, + return_dict=False,): """ Does neighborhood sampling, which samples nodes from a graph based on the current node's neighbors, with a corresponding fanout value at each hop. @@ -153,11 +159,27 @@ def uniform_neighbor_sample(ResourceHandle resource_handle, If True, will renumber the sources and destinations on a per-batch basis and return the renumber map and batch offsets in additional to the standard returns. 
+ + compression: str (Optional) + Options: COO (default), CSR, CSC, DCSR, DCSC + Sets the compression format for the returned samples. + + compress_per_hop: bool (Optional) + If False (default), will create a compressed edgelist for the + entire batch. + If True, will create a separate compressed edgelist per hop within + a batch. random_state: int (Optional) Random state to use when generating samples. Optional argument, defaults to a hash of process id, time, and hostname. (See pylibcugraph.random.CuGraphRandomState) + + return_dict: bool (Optional) + Whether to return a dictionary instead of a tuple. + Optional argument, defaults to False, returning a tuple. + This argument will eventually be deprecated in favor + of always returning a dictionary. Returns ------- @@ -173,13 +195,16 @@ def uniform_neighbor_sample(ResourceHandle resource_handle, the renumber map for each batch starts). """ - cdef cugraph_resource_handle_t* c_resource_handle_ptr = \ + cdef cugraph_resource_handle_t* c_resource_handle_ptr = ( resource_handle.c_resource_handle_ptr + ) + cdef cugraph_graph_t* c_graph_ptr = input_graph.c_graph_ptr cdef bool_t c_deduplicate_sources = deduplicate_sources cdef bool_t c_return_hops = return_hops cdef bool_t c_renumber = renumber + cdef bool_t c_compress_per_hop = compress_per_hop assert_CAI_type(start_list, "start_list") assert_CAI_type(batch_id_list, "batch_id_list", True) @@ -269,6 +294,23 @@ def uniform_neighbor_sample(ResourceHandle resource_handle, f'Invalid option {prior_sources_behavior}' ' for prior sources behavior' ) + + cdef cugraph_compression_type_t compression_behavior_e + if compression is None or compression == 'COO': + compression_behavior_e = cugraph_compression_type_t.COO + elif compression == 'CSR': + compression_behavior_e = cugraph_compression_type_t.CSR + elif compression == 'CSC': + compression_behavior_e = cugraph_compression_type_t.CSC + elif compression == 'DCSR': + compression_behavior_e = cugraph_compression_type_t.DCSR + elif 
compression == 'DCSC': + compression_behavior_e = cugraph_compression_type_t.DCSC + else: + raise ValueError( + f'Invalid option {compression}' + ' for compression type' + ) cdef cugraph_sampling_options_t* sampling_options error_code = cugraph_sampling_options_create(&sampling_options, &error_ptr) @@ -279,6 +321,8 @@ def uniform_neighbor_sample(ResourceHandle resource_handle, cugraph_sampling_set_dedupe_sources(sampling_options, c_deduplicate_sources) cugraph_sampling_set_prior_sources_behavior(sampling_options, prior_sources_behavior_e) cugraph_sampling_set_renumber_results(sampling_options, c_renumber) + cugraph_sampling_set_compression_type(sampling_options, compression_behavior_e) + cugraph_sampling_set_compress_per_hop(sampling_options, c_compress_per_hop) error_code = cugraph_uniform_neighbor_sample( c_resource_handle_ptr, @@ -311,26 +355,74 @@ def uniform_neighbor_sample(ResourceHandle resource_handle, # Get cupy "views" of the individual arrays to return. These each increment # the refcount on the SamplingResult instance which will keep the data alive # until all references are removed and the GC runs. 
+ # TODO Return everything that isn't null in release 23.12 if with_edge_properties: - cupy_sources = result.get_sources() - cupy_destinations = result.get_destinations() + cupy_majors = result.get_majors() + cupy_major_offsets = result.get_major_offsets() + cupy_minors = result.get_minors() cupy_edge_weights = result.get_edge_weights() cupy_edge_ids = result.get_edge_ids() cupy_edge_types = result.get_edge_types() cupy_batch_ids = result.get_batch_ids() - cupy_offsets = result.get_offsets() - cupy_hop_ids = result.get_hop_ids() + cupy_label_hop_offsets = result.get_label_hop_offsets() if renumber: cupy_renumber_map = result.get_renumber_map() cupy_renumber_map_offsets = result.get_renumber_map_offsets() - return (cupy_sources, cupy_destinations, cupy_edge_weights, cupy_edge_ids, cupy_edge_types, cupy_batch_ids, cupy_offsets, cupy_hop_ids, cupy_renumber_map, cupy_renumber_map_offsets) + # TODO drop the placeholder for hop ids in release 23.12 + if return_dict: + return { + 'major_offsets': cupy_major_offsets, + 'majors': cupy_majors, + 'minors': cupy_minors, + 'weight': cupy_edge_weights, + 'edge_id': cupy_edge_ids, + 'edge_type': cupy_edge_types, + 'batch_id': cupy_batch_ids, + 'label_hop_offsets': cupy_label_hop_offsets, + 'hop_id': None, + 'renumber_map': cupy_renumber_map, + 'renumber_map_offsets': cupy_renumber_map_offsets + } + else: + cupy_majors = cupy_major_offsets if cupy_majors is None else cupy_majors + return (cupy_majors, cupy_minors, cupy_edge_weights, cupy_edge_ids, cupy_edge_types, cupy_batch_ids, cupy_label_hop_offsets, None, cupy_renumber_map, cupy_renumber_map_offsets) else: - return (cupy_sources, cupy_destinations, cupy_edge_weights, cupy_edge_ids, cupy_edge_types, cupy_batch_ids, cupy_offsets, cupy_hop_ids) + cupy_hop_ids = result.get_hop_ids() # FIXME remove this + if return_dict: + return { + 'major_offsets': cupy_major_offsets, + 'majors': cupy_majors, + 'minors': cupy_minors, + 'weight': cupy_edge_weights, + 'edge_id': cupy_edge_ids, + 
'edge_type': cupy_edge_types, + 'batch_id': cupy_batch_ids, + 'label_hop_offsets': cupy_label_hop_offsets, + 'hop_id': cupy_hop_ids, + } + else: + cupy_majors = cupy_major_offsets if cupy_majors is None else cupy_majors + return (cupy_majors, cupy_minors, cupy_edge_weights, cupy_edge_ids, cupy_edge_types, cupy_batch_ids, cupy_label_hop_offsets, cupy_hop_ids) else: + # TODO this is deprecated, remove it in release 23.12 + warnings.warn( + "Calling uniform_neighbor_sample with the 'with_edge_properties' argument is deprecated." + " Starting in release 23.12, this argument will be removed in favor of behaving like the " + "with_edge_properties=True option, returning whatever properties are in the graph.", + FutureWarning, + ) + cupy_sources = result.get_sources() cupy_destinations = result.get_destinations() cupy_indices = result.get_indices() - return (cupy_sources, cupy_destinations, cupy_indices) + if return_dict: + return { + 'sources': cupy_sources, + 'destinations': cupy_destinations, + 'indices': cupy_indices + } + else: + return (cupy_sources, cupy_destinations, cupy_indices)