diff --git a/cpp/include/cugraph_c/sampling_algorithms.h b/cpp/include/cugraph_c/sampling_algorithms.h index 5760d2098aa..859eaca7f3b 100644 --- a/cpp/include/cugraph_c/sampling_algorithms.h +++ b/cpp/include/cugraph_c/sampling_algorithms.h @@ -236,6 +236,15 @@ typedef enum cugraph_compression_type_t { cugraph_error_code_t cugraph_sampling_options_create(cugraph_sampling_options_t** options, cugraph_error_t** error); +/** + * @ingroup samplingC + * @brief Set flag to retain seeds (original sources) + * + * @param options - opaque pointer to the sampling options + * @param value - Boolean value to assign to the option + */ +void cugraph_sampling_set_retain_seeds(cugraph_sampling_options_t* options, bool_t value); + /** * @ingroup samplingC * @brief Set flag to renumber results @@ -335,6 +344,8 @@ void cugraph_sampling_options_free(cugraph_sampling_options_t* options); * output. If specified then the all data from @p label_list[i] will be shuffled to rank @p. This * cannot be specified unless @p start_vertex_labels is also specified * label_to_comm_rank[i]. If not specified then the output data will not be shuffled between ranks. + * @param [in] label_offsets Device array of the offsets for each label in the seed list. This + * parameter is only used with the retain_seeds option. * @param [in] fanout Host array defining the fan out at each step in the sampling algorithm. 
* We only support fanout values of type INT32 * @param [in/out] rng_state State of the random number generator, updated with each call @@ -354,6 +365,7 @@ cugraph_error_code_t cugraph_uniform_neighbor_sample( const cugraph_type_erased_device_array_view_t* start_vertex_labels, const cugraph_type_erased_device_array_view_t* label_list, const cugraph_type_erased_device_array_view_t* label_to_comm_rank, + const cugraph_type_erased_device_array_view_t* label_offsets, const cugraph_type_erased_host_array_view_t* fan_out, cugraph_rng_state_t* rng_state, const cugraph_sampling_options_t* options, diff --git a/cpp/src/c_api/uniform_neighbor_sampling.cpp b/cpp/src/c_api/uniform_neighbor_sampling.cpp index 100e81a5bd2..45609fc0e01 100644 --- a/cpp/src/c_api/uniform_neighbor_sampling.cpp +++ b/cpp/src/c_api/uniform_neighbor_sampling.cpp @@ -40,6 +40,7 @@ struct cugraph_sampling_options_t { bool_t renumber_results_{FALSE}; cugraph_compression_type_t compression_type_{cugraph_compression_type_t::COO}; bool_t compress_per_hop_{FALSE}; + bool_t retain_seeds_{FALSE}; }; struct cugraph_sample_result_t { @@ -68,6 +69,7 @@ struct uniform_neighbor_sampling_functor : public cugraph::c_api::abstract_funct cugraph::c_api::cugraph_type_erased_device_array_view_t const* start_vertex_labels_{nullptr}; cugraph::c_api::cugraph_type_erased_device_array_view_t const* label_list_{nullptr}; cugraph::c_api::cugraph_type_erased_device_array_view_t const* label_to_comm_rank_{nullptr}; + cugraph::c_api::cugraph_type_erased_device_array_view_t const* label_offsets_{nullptr}; cugraph::c_api::cugraph_type_erased_host_array_view_t const* fan_out_{nullptr}; cugraph::c_api::cugraph_rng_state_t* rng_state_{nullptr}; cugraph::c_api::cugraph_sampling_options_t options_{}; @@ -81,6 +83,7 @@ struct uniform_neighbor_sampling_functor : public cugraph::c_api::abstract_funct cugraph_type_erased_device_array_view_t const* start_vertex_labels, cugraph_type_erased_device_array_view_t const* label_list, 
cugraph_type_erased_device_array_view_t const* label_to_comm_rank, + cugraph_type_erased_device_array_view_t const* label_offsets, cugraph_type_erased_host_array_view_t const* fan_out, cugraph_rng_state_t* rng_state, cugraph::c_api::cugraph_sampling_options_t options, @@ -99,6 +102,9 @@ struct uniform_neighbor_sampling_functor : public cugraph::c_api::abstract_funct label_to_comm_rank_( reinterpret_cast( label_to_comm_rank)), + label_offsets_( + reinterpret_cast( + label_offsets)), fan_out_( reinterpret_cast(fan_out)), rng_state_(reinterpret_cast(rng_state)), @@ -267,8 +273,13 @@ struct uniform_neighbor_sampling_functor : public cugraph::c_api::abstract_funct std::move(edge_id), std::move(edge_type), std::move(hop), - std::nullopt, - std::nullopt, + options_.retain_seeds_ + ? std::make_optional(raft::device_span{ + start_vertices_->as_type(), start_vertices_->size_}) + : std::nullopt, + options_.retain_seeds_ ? std::make_optional(raft::device_span{ + label_offsets_->as_type(), label_offsets_->size_}) + : std::nullopt, offsets ? std::make_optional( raft::device_span{offsets->data(), offsets->size()}) : std::nullopt, @@ -304,8 +315,13 @@ struct uniform_neighbor_sampling_functor : public cugraph::c_api::abstract_funct std::move(edge_id), std::move(edge_type), std::move(hop), - std::nullopt, - std::nullopt, + options_.retain_seeds_ + ? std::make_optional(raft::device_span{ + start_vertices_->as_type(), start_vertices_->size_}) + : std::nullopt, + options_.retain_seeds_ ? std::make_optional(raft::device_span{ + label_offsets_->as_type(), label_offsets_->size_}) + : std::nullopt, offsets ? 
std::make_optional( raft::device_span{offsets->data(), offsets->size()}) : std::nullopt, @@ -402,6 +418,12 @@ extern "C" cugraph_error_code_t cugraph_sampling_options_create( return CUGRAPH_SUCCESS; } +extern "C" void cugraph_sampling_set_retain_seeds(cugraph_sampling_options_t* options, bool_t value) +{ + auto internal_pointer = reinterpret_cast(options); + internal_pointer->retain_seeds_ = value; +} + extern "C" void cugraph_sampling_set_renumber_results(cugraph_sampling_options_t* options, bool_t value) { @@ -871,6 +893,7 @@ cugraph_error_code_t cugraph_uniform_neighbor_sample( const cugraph_type_erased_device_array_view_t* start_vertex_labels, const cugraph_type_erased_device_array_view_t* label_list, const cugraph_type_erased_device_array_view_t* label_to_comm_rank, + const cugraph_type_erased_device_array_view_t* label_offsets, const cugraph_type_erased_host_array_view_t* fan_out, cugraph_rng_state_t* rng_state, const cugraph_sampling_options_t* options, @@ -878,6 +901,13 @@ cugraph_error_code_t cugraph_uniform_neighbor_sample( cugraph_sample_result_t** result, cugraph_error_t** error) { + auto options_cpp = *reinterpret_cast(options); + + CAPI_EXPECTS((!options_cpp.retain_seeds_) || (label_offsets != nullptr), + CUGRAPH_INVALID_INPUT, + "must specify label_offsets if retain_seeds is true", + *error); + CAPI_EXPECTS((start_vertex_labels == nullptr) || (reinterpret_cast( start_vertex_labels) @@ -911,16 +941,16 @@ cugraph_error_code_t cugraph_uniform_neighbor_sample( "fan_out should be of type int", *error); - uniform_neighbor_sampling_functor functor{ - handle, - graph, - start_vertices, - start_vertex_labels, - label_list, - label_to_comm_rank, - fan_out, - rng_state, - *reinterpret_cast(options), - do_expensive_check}; + uniform_neighbor_sampling_functor functor{handle, + graph, + start_vertices, + start_vertex_labels, + label_list, + label_to_comm_rank, + label_offsets, + fan_out, + rng_state, + std::move(options_cpp), + do_expensive_check}; return 
cugraph::c_api::run_algorithm(graph, functor, result, error); } diff --git a/cpp/src/traversal/bfs_impl.cuh b/cpp/src/traversal/bfs_impl.cuh index 1f6f29d8683..fb837484a14 100644 --- a/cpp/src/traversal/bfs_impl.cuh +++ b/cpp/src/traversal/bfs_impl.cuh @@ -31,8 +31,6 @@ #include -#include - #include #include #include @@ -149,11 +147,11 @@ void bfs(raft::handle_t const& handle, auto constexpr invalid_distance = std::numeric_limits::max(); auto constexpr invalid_vertex = invalid_vertex_id::value; - thrust::fill(rmm::exec_policy(handle.get_thrust_policy()), + thrust::fill(handle.get_thrust_policy(), distances, distances + push_graph_view.local_vertex_partition_range_size(), invalid_distance); - thrust::fill(rmm::exec_policy(handle.get_thrust_policy()), + thrust::fill(handle.get_thrust_policy(), predecessor_first, predecessor_first + push_graph_view.local_vertex_partition_range_size(), invalid_vertex); @@ -161,7 +159,7 @@ void bfs(raft::handle_t const& handle, push_graph_view.local_vertex_partition_view()); if (n_sources) { thrust::for_each( - rmm::exec_policy(handle.get_thrust_policy()), + handle.get_thrust_policy(), sources, sources + n_sources, [vertex_partition, distances, predecessor_first] __device__(auto v) { diff --git a/cpp/tests/c_api/create_graph_test.c b/cpp/tests/c_api/create_graph_test.c index 758624a89e9..41b8691e79c 100644 --- a/cpp/tests/c_api/create_graph_test.c +++ b/cpp/tests/c_api/create_graph_test.c @@ -268,6 +268,7 @@ int test_create_sg_graph_csr() NULL, NULL, NULL, + NULL, h_fan_out_view, rng_state, sampling_options, diff --git a/cpp/tests/c_api/mg_uniform_neighbor_sample_test.c b/cpp/tests/c_api/mg_uniform_neighbor_sample_test.c index a32056a9f15..3d8fb02ed46 100644 --- a/cpp/tests/c_api/mg_uniform_neighbor_sample_test.c +++ b/cpp/tests/c_api/mg_uniform_neighbor_sample_test.c @@ -133,6 +133,7 @@ int generic_uniform_neighbor_sample_test(const cugraph_resource_handle_t* handle d_start_labels_view, NULL, NULL, + NULL, h_fan_out_view, rng_state, 
sampling_options, @@ -565,6 +566,7 @@ int test_uniform_neighbor_from_alex(const cugraph_resource_handle_t* handle) d_label_view, NULL, NULL, + NULL, h_fan_out_view, rng_state, sampling_options, @@ -841,6 +843,7 @@ int test_uniform_neighbor_sample_alex_bug(const cugraph_resource_handle_t* handl d_start_labels_view, d_label_list_view, d_label_to_output_comm_rank_view, + NULL, h_fan_out_view, rng_state, sampling_options, @@ -1099,6 +1102,7 @@ int test_uniform_neighbor_sample_sort_by_hop(const cugraph_resource_handle_t* ha d_start_labels_view, d_label_list_view, d_label_to_output_comm_rank_view, + NULL, h_fan_out_view, rng_state, sampling_options, diff --git a/cpp/tests/c_api/uniform_neighbor_sample_test.c b/cpp/tests/c_api/uniform_neighbor_sample_test.c index 94f0f788354..451dbca51a7 100644 --- a/cpp/tests/c_api/uniform_neighbor_sample_test.c +++ b/cpp/tests/c_api/uniform_neighbor_sample_test.c @@ -140,6 +140,7 @@ int generic_uniform_neighbor_sample_test(const cugraph_resource_handle_t* handle d_start_labels_view, NULL, NULL, + NULL, h_fan_out_view, rng_state, sampling_options, @@ -661,6 +662,7 @@ int test_uniform_neighbor_sample_with_labels(const cugraph_resource_handle_t* ha d_start_labels_view, NULL, NULL, + NULL, h_fan_out_view, rng_state, sampling_options, diff --git a/python/cugraph/cugraph/sampling/uniform_neighbor_sample.py b/python/cugraph/cugraph/sampling/uniform_neighbor_sample.py index 86b33594ed7..eafadfa4ff0 100644 --- a/python/cugraph/cugraph/sampling/uniform_neighbor_sample.py +++ b/python/cugraph/cugraph/sampling/uniform_neighbor_sample.py @@ -71,6 +71,8 @@ def uniform_neighbor_sample( prior_sources_behavior: str = None, deduplicate_sources: bool = False, renumber: bool = False, + retain_seeds: bool = False, + label_offsets: Sequence = None, use_legacy_names: bool = True, # deprecated compress_per_hop: bool = False, compression: str = "COO", @@ -142,6 +144,15 @@ def uniform_neighbor_sample( will return the renumber map and renumber map offsets as an 
additional dataframe. + retain_seeds: bool, optional (default=False) + If True, will retain the original seeds (original source vertices) + in the output even if they do not have outgoing neighbors. + + label_offsets: integer sequence, optional (default=None) + Offsets of each label within the start vertex list. + Only used if retain_seeds is True. Required if retain_seeds + is True. + use_legacy_names: bool, optional (default=True) Whether to use the legacy column names (sources, destinations). If True, will use "sources" and "destinations" as the column names. @@ -342,13 +353,15 @@ def uniform_neighbor_sample( else None, h_fan_out=fanout_vals, with_replacement=with_replacement, do_expensive_check=False, with_edge_properties=with_edge_properties, random_state=random_state, prior_sources_behavior=prior_sources_behavior, deduplicate_sources=deduplicate_sources, return_hops=return_hops, renumber=renumber, + retain_seeds=retain_seeds, + label_offsets=label_offsets, compression=compression, compress_per_hop=compress_per_hop, return_dict=True, diff --git a/python/cugraph/cugraph/tests/centrality/test_batch_betweenness_centrality_mg.py b/python/cugraph/cugraph/tests/centrality/test_batch_betweenness_centrality_mg.py index 1c73ebb0216..9f0980d4199 100644 --- a/python/cugraph/cugraph/tests/centrality/test_batch_betweenness_centrality_mg.py +++ b/python/cugraph/cugraph/tests/centrality/test_batch_betweenness_centrality_mg.py @@ -93,6 +93,3 @@ def test_mg_betweenness_centrality( second_key="ref_bc", epsilon=DEFAULT_EPSILON, ) - - # Clean-up stored dataset edge-lists - dataset.unload() diff --git a/python/cugraph/cugraph/tests/centrality/test_batch_edge_betweenness_centrality_mg.py b/python/cugraph/cugraph/tests/centrality/test_batch_edge_betweenness_centrality_mg.py index 4530dd3da86..4764c01f0fc 100644 --- a/python/cugraph/cugraph/tests/centrality/test_batch_edge_betweenness_centrality_mg.py +++
b/python/cugraph/cugraph/tests/centrality/test_batch_edge_betweenness_centrality_mg.py @@ -84,5 +84,3 @@ def test_mg_edge_betweenness_centrality( second_key="ref_bc", epsilon=DEFAULT_EPSILON, ) - # Clean-up stored dataset edge-lists - dataset.unload() diff --git a/python/cugraph/cugraph/tests/centrality/test_betweenness_centrality.py b/python/cugraph/cugraph/tests/centrality/test_betweenness_centrality.py index db34c68a054..ff8859a01b1 100644 --- a/python/cugraph/cugraph/tests/centrality/test_betweenness_centrality.py +++ b/python/cugraph/cugraph/tests/centrality/test_betweenness_centrality.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2023, NVIDIA CORPORATION.: +# Copyright (c) 2020-2024, NVIDIA CORPORATION.: # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -111,11 +111,18 @@ def calc_betweenness_centrality( else: edge_attr = None - G = graph_file.get_graph( - download=True, - create_using=cugraph.Graph(directed=directed), - ignore_weights=not edgevals, - ) + G = None + if multi_gpu_batch: + G = graph_file.get_dask_graph( + create_using=cugraph.Graph(directed=directed), ignore_weights=not edgevals + ) + G.enable_batch() + else: + G = graph_file.get_graph( + download=True, + create_using=cugraph.Graph(directed=directed), + ignore_weights=not edgevals, + ) M = G.to_pandas_edgelist().rename( columns={"src": "0", "dst": "1", "wgt": edge_attr} @@ -130,8 +137,6 @@ def calc_betweenness_centrality( ) assert G is not None and Gnx is not None - if multi_gpu_batch: - G.enable_batch() calc_func = None if k is not None and seed is not None: diff --git a/python/cugraph/cugraph/tests/centrality/test_betweenness_centrality_mg.py b/python/cugraph/cugraph/tests/centrality/test_betweenness_centrality_mg.py index c94c2dcaff6..35e199093ce 100644 --- a/python/cugraph/cugraph/tests/centrality/test_betweenness_centrality_mg.py +++ 
b/python/cugraph/cugraph/tests/centrality/test_betweenness_centrality_mg.py @@ -49,14 +49,12 @@ def setup_function(): def get_sg_graph(dataset, directed): - dataset.unload() G = dataset.get_graph(create_using=cugraph.Graph(directed=directed)) return G def get_mg_graph(dataset, directed): - dataset.unload() ddf = dataset.get_dask_edgelist() dg = cugraph.Graph(directed=directed) dg.from_dask_cudf_edgelist( @@ -96,7 +94,6 @@ def test_dask_mg_betweenness_centrality( benchmark, ): g = get_sg_graph(dataset, directed) - dataset.unload() dg = get_mg_graph(dataset, directed) random_state = subset_seed @@ -143,6 +140,3 @@ def test_dask_mg_betweenness_centrality( diff = cupy.isclose(mg_bc_results, sg_bc_results) assert diff.all() - - # Clean-up stored dataset edge-lists - dataset.unload() diff --git a/python/cugraph/cugraph/tests/centrality/test_degree_centrality_mg.py b/python/cugraph/cugraph/tests/centrality/test_degree_centrality_mg.py index 68daff9238c..8606649c745 100644 --- a/python/cugraph/cugraph/tests/centrality/test_degree_centrality_mg.py +++ b/python/cugraph/cugraph/tests/centrality/test_degree_centrality_mg.py @@ -45,14 +45,12 @@ def setup_function(): def get_sg_graph(dataset, directed): - dataset.unload() G = dataset.get_graph(create_using=cugraph.Graph(directed=directed)) return G def get_mg_graph(dataset, directed): - dataset.unload() ddf = dataset.get_dask_edgelist() dg = cugraph.Graph(directed=directed) dg.from_dask_cudf_edgelist( @@ -118,6 +116,3 @@ def test_dask_mg_degree(dask_client, dataset, directed): check_names=False, check_dtype=False, ) - - # Clean-up stored dataset edge-lists - dataset.unload() diff --git a/python/cugraph/cugraph/tests/centrality/test_edge_betweenness_centrality_mg.py b/python/cugraph/cugraph/tests/centrality/test_edge_betweenness_centrality_mg.py index 80acfe1c4ad..5b83a05e2a2 100644 --- a/python/cugraph/cugraph/tests/centrality/test_edge_betweenness_centrality_mg.py +++ 
b/python/cugraph/cugraph/tests/centrality/test_edge_betweenness_centrality_mg.py @@ -47,7 +47,6 @@ def setup_function(): def get_sg_graph(dataset, directed, edge_ids): - dataset.unload() df = dataset.get_edgelist() if edge_ids: if not directed: @@ -71,7 +70,6 @@ def get_sg_graph(dataset, directed, edge_ids): def get_mg_graph(dataset, directed, edge_ids, weight): - dataset.unload() ddf = dataset.get_dask_edgelist() if weight: @@ -178,6 +176,3 @@ def test_dask_mg_edge_betweenness_centrality( assert len(edge_bc_diffs1) == 0 assert len(edge_bc_diffs2) == 0 - - # Clean-up stored dataset edge-lists - dataset.unload() diff --git a/python/cugraph/cugraph/tests/centrality/test_eigenvector_centrality_mg.py b/python/cugraph/cugraph/tests/centrality/test_eigenvector_centrality_mg.py index 8cd77fb5e24..3a840c82e95 100644 --- a/python/cugraph/cugraph/tests/centrality/test_eigenvector_centrality_mg.py +++ b/python/cugraph/cugraph/tests/centrality/test_eigenvector_centrality_mg.py @@ -52,7 +52,6 @@ def setup_function(): def test_dask_mg_eigenvector_centrality(dask_client, dataset, directed): input_data_path = dataset.get_path() print(f"dataset={input_data_path}") - dataset.unload() ddf = dataset.get_dask_edgelist() dg = cugraph.Graph(directed=True) dg.from_dask_cudf_edgelist(ddf, "src", "dst", store_transposed=True) @@ -89,15 +88,11 @@ def test_dask_mg_eigenvector_centrality(dask_client, dataset, directed): err = err + 1 assert err == 0 - # Clean-up stored dataset edge-lists - dataset.unload() - @pytest.mark.mg def test_dask_mg_eigenvector_centrality_transposed_false(dask_client): dataset = DATASETS[0] - dataset.unload() ddf = dataset.get_dask_edgelist() dg = cugraph.Graph(directed=True) dg.from_dask_cudf_edgelist(ddf, "src", "dst", store_transposed=False) @@ -110,6 +105,3 @@ def test_dask_mg_eigenvector_centrality_transposed_false(dask_client): with pytest.warns(UserWarning, match=warning_msg): dcg.eigenvector_centrality(dg) - - # Clean-up stored dataset edge-lists - 
dataset.unload() diff --git a/python/cugraph/cugraph/tests/centrality/test_katz_centrality_mg.py b/python/cugraph/cugraph/tests/centrality/test_katz_centrality_mg.py index ebbe5974814..5dcbd8173df 100644 --- a/python/cugraph/cugraph/tests/centrality/test_katz_centrality_mg.py +++ b/python/cugraph/cugraph/tests/centrality/test_katz_centrality_mg.py @@ -53,7 +53,6 @@ def test_dask_mg_katz_centrality(dask_client, dataset, directed): input_data_path = dataset.get_path() print(f"dataset={input_data_path}") - dataset.unload() ddf = dataset.get_dask_edgelist() dg = cugraph.Graph(directed=True) dg.from_dask_cudf_edgelist(ddf, "src", "dst", store_transposed=True) @@ -95,16 +94,12 @@ def test_dask_mg_katz_centrality(dask_client, dataset, directed): err = err + 1 assert err == 0 - # Clean-up stored dataset edge-lists - dataset.unload() - @pytest.mark.mg @pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system") @pytest.mark.parametrize("dataset", DATASETS) @pytest.mark.parametrize("directed", IS_DIRECTED) def test_dask_mg_katz_centrality_nstart(dask_client, dataset, directed): - dataset.unload() ddf = dataset.get_dask_edgelist() dg = cugraph.Graph(directed=True) dg.from_dask_cudf_edgelist(ddf, "src", "dst", store_transposed=True) @@ -136,14 +131,10 @@ def test_dask_mg_katz_centrality_nstart(dask_client, dataset, directed): err = err + 1 assert err == 0 - # Clean-up stored dataset edge-lists - dataset.unload() - @pytest.mark.mg @pytest.mark.parametrize("dataset", DATASETS) def test_dask_mg_katz_centrality_transposed_false(dask_client, dataset): - dataset.unload() ddf = dataset.get_dask_edgelist() dg = cugraph.Graph(directed=True) dg.from_dask_cudf_edgelist(ddf, "src", "dst", store_transposed=False) @@ -156,6 +147,3 @@ def test_dask_mg_katz_centrality_transposed_false(dask_client, dataset): with pytest.warns(UserWarning, match=warning_msg): dcg.katz_centrality(dg) - - # Clean-up stored dataset edge-lists - dataset.unload() diff --git 
a/python/cugraph/cugraph/tests/comms/test_comms_mg.py b/python/cugraph/cugraph/tests/comms/test_comms_mg.py index 75462924c9d..d096eb7e5c2 100644 --- a/python/cugraph/cugraph/tests/comms/test_comms_mg.py +++ b/python/cugraph/cugraph/tests/comms/test_comms_mg.py @@ -16,10 +16,9 @@ import pytest import cugraph.dask as dcg -import cudf -import dask_cudf import cugraph -from cugraph.testing.utils import RAPIDS_DATASET_ROOT_DIR_PATH +from cugraph.datasets import karate, dolphins + # ============================================================================= # Pytest Setup / Teardown - called for each test function @@ -30,12 +29,36 @@ def setup_function(): gc.collect() +# ============================================================================= +# Parameters +# ============================================================================= + + +DATASETS = [karate, dolphins] IS_DIRECTED = [True, False] -# @pytest.mark.skipif( -# is_single_gpu(), reason="skipping MG testing on Single GPU system" -# ) +# ============================================================================= +# Helper Functions +# ============================================================================= + + +def get_pagerank_result(dataset, is_mg): + """Return the cugraph.pagerank result for an MG or SG graph""" + + if is_mg: + dg = dataset.get_dask_graph(store_transposed=True) + return dcg.pagerank(dg).compute() + else: + g = dataset.get_graph(store_transposed=True) + return cugraph.pagerank(g) + + +# ============================================================================= +# Tests +# ============================================================================= + + @pytest.mark.mg @pytest.mark.parametrize("directed", IS_DIRECTED) def test_dask_mg_pagerank(dask_client, directed): @@ -43,62 +66,17 @@ def test_dask_mg_pagerank(dask_client, directed): # Initialize and run pagerank on two distributed graphs # with same communicator - input_data_path1 = (RAPIDS_DATASET_ROOT_DIR_PATH / 
"karate.csv").as_posix() + input_data_path1 = karate.get_path() print(f"dataset1={input_data_path1}") - chunksize1 = dcg.get_chunksize(input_data_path1) + result_pr1 = get_pagerank_result(karate, is_mg=True) - input_data_path2 = (RAPIDS_DATASET_ROOT_DIR_PATH / "dolphins.csv").as_posix() + input_data_path2 = dolphins.get_path() print(f"dataset2={input_data_path2}") - chunksize2 = dcg.get_chunksize(input_data_path2) - - ddf1 = dask_cudf.read_csv( - input_data_path1, - blocksize=chunksize1, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - - dg1 = cugraph.Graph(directed=directed) - dg1.from_dask_cudf_edgelist(ddf1, "src", "dst") - - result_pr1 = dcg.pagerank(dg1).compute() - - ddf2 = dask_cudf.read_csv( - input_data_path2, - blocksize=chunksize2, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - - dg2 = cugraph.Graph(directed=directed) - dg2.from_dask_cudf_edgelist(ddf2, "src", "dst") - - result_pr2 = dcg.pagerank(dg2).compute() + result_pr2 = get_pagerank_result(dolphins, is_mg=True) # Calculate single GPU pagerank for verification of results - df1 = cudf.read_csv( - input_data_path1, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - - g1 = cugraph.Graph(directed=directed) - g1.from_cudf_edgelist(df1, "src", "dst") - expected_pr1 = cugraph.pagerank(g1) - - df2 = cudf.read_csv( - input_data_path2, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - - g2 = cugraph.Graph(directed=directed) - g2.from_cudf_edgelist(df2, "src", "dst") - expected_pr2 = cugraph.pagerank(g2) + expected_pr1 = get_pagerank_result(karate, is_mg=False) + expected_pr2 = get_pagerank_result(dolphins, is_mg=False) # Compare and verify pagerank results diff --git a/python/cugraph/cugraph/tests/community/test_induced_subgraph_mg.py b/python/cugraph/cugraph/tests/community/test_induced_subgraph_mg.py index 
45ec8eca0e8..311fd7a24bc 100644 --- a/python/cugraph/cugraph/tests/community/test_induced_subgraph_mg.py +++ b/python/cugraph/cugraph/tests/community/test_induced_subgraph_mg.py @@ -17,7 +17,6 @@ import cugraph import cugraph.dask as dcg -import dask_cudf from cudf.testing.testing import assert_frame_equal from cugraph.dask.common.mg_utils import is_single_gpu from cugraph.datasets import karate, dolphins, email_Eu_core @@ -36,11 +35,13 @@ def setup_function(): # Parameters # ============================================================================= + DATASETS = [karate, dolphins, email_Eu_core] IS_DIRECTED = [True, False] NUM_VERTICES = [2, 5, 10, 20] OFFSETS = [None] + # ============================================================================= # Helper functions # ============================================================================= @@ -53,15 +54,7 @@ def get_sg_graph(dataset, directed): def get_mg_graph(dataset, directed): - input_data_path = dataset.get_path() - blocksize = dcg.get_chunksize(input_data_path) - ddf = dask_cudf.read_csv( - input_data_path, - blocksize=blocksize, - delimiter=dataset.metadata["delim"], - names=dataset.metadata["col_names"], - dtype=dataset.metadata["col_types"], - ) + ddf = dataset.get_dask_edgelist() dg = cugraph.Graph(directed=directed) dg.from_dask_cudf_edgelist( ddf, @@ -108,7 +101,7 @@ def test_mg_induced_subgraph( # FIXME: This parameter is not yet tested # mg_offsets = mg_offsets.compute().reset_index(drop=True) - mg_df, mg_offsets = result_induced_subgraph + mg_df, _ = result_induced_subgraph if mg_df is not None and sg_induced_subgraph is not None: # FIXME: 'edges()' or 'view_edgelist()' takes half the edges out if diff --git a/python/cugraph/cugraph/tests/community/test_leiden_mg.py b/python/cugraph/cugraph/tests/community/test_leiden_mg.py index b1908ae10a2..2904ecd12a2 100644 --- a/python/cugraph/cugraph/tests/community/test_leiden_mg.py +++ b/python/cugraph/cugraph/tests/community/test_leiden_mg.py @@ 
-13,123 +13,56 @@ import pytest - -import dask_cudf import cugraph import cugraph.dask as dcg -from cugraph.testing import utils - +from cugraph.datasets import karate_asymmetric, karate, dolphins -try: - from rapids_pytest_benchmark import setFixtureParamNames -except ImportError: - print( - "\n\nWARNING: rapids_pytest_benchmark is not installed, " - "falling back to pytest_benchmark fixtures.\n" - ) - # if rapids_pytest_benchmark is not available, just perfrom time-only - # benchmarking and replace the util functions with nops - import pytest_benchmark +# ============================================================================= +# Parameters +# ============================================================================= - gpubenchmark = pytest_benchmark.plugin.benchmark - def setFixtureParamNames(*args, **kwargs): - pass +DATASETS = [karate, dolphins] +DATASETS_ASYMMETRIC = [karate_asymmetric] # ============================================================================= -# Parameters +# Helper Functions # ============================================================================= -DATASETS_ASYMMETRIC = [utils.RAPIDS_DATASET_ROOT_DIR_PATH / "karate-asymmetric.csv"] - - -############################################################################### -# Fixtures -# @pytest.mark.skipif( -# is_single_gpu(), reason="skipping MG testing on Single GPU system" -# ) -@pytest.fixture( - scope="module", - params=DATASETS_ASYMMETRIC, - ids=[f"dataset={d.as_posix()}" for d in DATASETS_ASYMMETRIC], -) -def daskGraphFromDataset(request, dask_client): - """ - Returns a new dask dataframe created from the dataset file param. - This creates a directed Graph. - """ - # Since parameterized fixtures do not assign param names to param values, - # manually call the helper to do so. 
- setFixtureParamNames(request, ["dataset"]) - dataset = request.param - - chunksize = dcg.get_chunksize(dataset) - ddf = dask_cudf.read_csv( - dataset, - blocksize=chunksize, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - - dg = cugraph.Graph(directed=True) - dg.from_dask_cudf_edgelist(ddf, "src", "dst", "value") - return dg -@pytest.fixture( - scope="module", - params=utils.DATASETS_UNDIRECTED, - ids=[f"dataset={d.as_posix()}" for d in utils.DATASETS_UNDIRECTED], -) -def uddaskGraphFromDataset(request, dask_client): - """ - Returns a new dask dataframe created from the dataset file param. - This creates an undirected Graph. - """ - # Since parameterized fixtures do not assign param names to param - # values, manually call the helper to do so. - setFixtureParamNames(request, ["dataset"]) - dataset = request.param - - chunksize = dcg.get_chunksize(dataset) - ddf = dask_cudf.read_csv( - dataset, - blocksize=chunksize, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - - dg = cugraph.Graph(directed=False) - dg.from_dask_cudf_edgelist(ddf, "src", "dst", "value") +def get_mg_graph(dataset, directed): + """Returns an MG graph""" + ddf = dataset.get_dask_edgelist() + + dg = cugraph.Graph(directed=directed) + dg.from_dask_cudf_edgelist(ddf, "src", "dst", "wgt") + return dg -############################################################################### +# ============================================================================= # Tests -# @pytest.mark.skipif( -# is_single_gpu(), reason="skipping MG testing on Single GPU system" -# ) +# ============================================================================= # FIXME: Implement more robust tests + + @pytest.mark.mg -def test_mg_leiden_with_edgevals_directed_graph(daskGraphFromDataset): +@pytest.mark.parametrize("dataset", DATASETS_ASYMMETRIC) +def test_mg_leiden_with_edgevals_directed_graph(dask_client, dataset): + dg = 
get_mg_graph(dataset, directed=True) # Directed graphs are not supported by Leiden and a ValueError should be # raised with pytest.raises(ValueError): - parts, mod = dcg.leiden(daskGraphFromDataset) + parts, mod = dcg.leiden(dg) -############################################################################### -# Tests -# @pytest.mark.skipif( -# is_single_gpu(), reason="skipping MG testing on Single GPU system" -# ) -# FIXME: Implement more robust tests @pytest.mark.mg -def test_mg_leiden_with_edgevals_undirected_graph(uddaskGraphFromDataset): - parts, mod = dcg.leiden(uddaskGraphFromDataset) +@pytest.mark.parametrize("dataset", DATASETS) +def test_mg_leiden_with_edgevals_undirected_graph(dask_client, dataset): + dg = get_mg_graph(dataset, directed=False) + parts, mod = dcg.leiden(dg) # FIXME: either call Nx with the same dataset and compare results, or # hardcode golden results to compare to. diff --git a/python/cugraph/cugraph/tests/community/test_louvain_mg.py b/python/cugraph/cugraph/tests/community/test_louvain_mg.py index 19fffe96b5c..0dff7f1c8b0 100644 --- a/python/cugraph/cugraph/tests/community/test_louvain_mg.py +++ b/python/cugraph/cugraph/tests/community/test_louvain_mg.py @@ -14,122 +14,41 @@ import pytest import cugraph.dask as dcg +from cugraph.datasets import karate_asymmetric, karate, dolphins -import cugraph -import dask_cudf -from cugraph.testing import utils - - -try: - from rapids_pytest_benchmark import setFixtureParamNames -except ImportError: - print( - "\n\nWARNING: rapids_pytest_benchmark is not installed, " - "falling back to pytest_benchmark fixtures.\n" - ) - - # if rapids_pytest_benchmark is not available, just perfrom time-only - # benchmarking and replace the util functions with nops - import pytest_benchmark - - gpubenchmark = pytest_benchmark.plugin.benchmark - - def setFixtureParamNames(*args, **kwargs): - pass +from test_leiden_mg import get_mg_graph # ============================================================================= # 
Parameters # ============================================================================= -DATASETS_ASYMMETRIC = [utils.RAPIDS_DATASET_ROOT_DIR_PATH / "karate-asymmetric.csv"] - - -############################################################################### -# Fixtures -# @pytest.mark.skipif( -# is_single_gpu(), reason="skipping MG testing on Single GPU system" -# ) -@pytest.fixture( - scope="module", - params=DATASETS_ASYMMETRIC, - ids=[f"dataset={d.as_posix()}" for d in DATASETS_ASYMMETRIC], -) -def daskGraphFromDataset(request, dask_client): - """ - Returns a new dask dataframe created from the dataset file param. - This creates a directed Graph. - """ - # Since parameterized fixtures do not assign param names to param values, - # manually call the helper to do so. - setFixtureParamNames(request, ["dataset"]) - dataset = request.param - - chunksize = dcg.get_chunksize(dataset) - ddf = dask_cudf.read_csv( - dataset, - blocksize=chunksize, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - - dg = cugraph.Graph(directed=True) - dg.from_dask_cudf_edgelist(ddf, "src", "dst", "value") - return dg -@pytest.fixture( - scope="module", - params=utils.DATASETS_UNDIRECTED, - ids=[f"dataset={d.as_posix()}" for d in utils.DATASETS_UNDIRECTED], -) -def uddaskGraphFromDataset(request, dask_client): - """ - Returns a new dask dataframe created from the dataset file param. - This creates an undirected Graph. - """ - # Since parameterized fixtures do not assign param names to param - # values, manually call the helper to do so. 
- setFixtureParamNames(request, ["dataset"]) - dataset = request.param +DATASETS_ASYMMETRIC = DATASETS_ASYMMETRIC = [karate_asymmetric] +DATASETS = [karate, dolphins] - chunksize = dcg.get_chunksize(dataset) - ddf = dask_cudf.read_csv( - dataset, - blocksize=chunksize, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - dg = cugraph.Graph(directed=False) - dg.from_dask_cudf_edgelist(ddf, "src", "dst", "value") - return dg - - -############################################################################### +# ============================================================================= # Tests -# @pytest.mark.skipif( -# is_single_gpu(), reason="skipping MG testing on Single GPU system" -# ) +# ============================================================================= # FIXME: Implement more robust tests + + @pytest.mark.mg -def test_mg_louvain_with_edgevals_directed_graph(daskGraphFromDataset): +@pytest.mark.parametrize("dataset", DATASETS_ASYMMETRIC) +def test_mg_louvain_with_edgevals_directed_graph(dask_client, dataset): + dg = get_mg_graph(dataset, directed=True) # Directed graphs are not supported by Louvain and a ValueError should be # raised with pytest.raises(ValueError): - parts, mod = dcg.louvain(daskGraphFromDataset) + parts, mod = dcg.louvain(dg) -############################################################################### -# Tests -# @pytest.mark.skipif( -# is_single_gpu(), reason="skipping MG testing on Single GPU system" -# ) -# FIXME: Implement more robust tests @pytest.mark.mg -def test_mg_louvain_with_edgevals_undirected_graph(uddaskGraphFromDataset): - parts, mod = dcg.louvain(uddaskGraphFromDataset) +@pytest.mark.parametrize("dataset", DATASETS) +def test_mg_louvain_with_edgevals_undirected_graph(dask_client, dataset): + dg = get_mg_graph(dataset, directed=False) + parts, mod = dcg.louvain(dg) # FIXME: either call Nx with the same dataset and compare results, or # hardcode golden results to compare 
to. diff --git a/python/cugraph/cugraph/tests/community/test_triangle_count_mg.py b/python/cugraph/cugraph/tests/community/test_triangle_count_mg.py index 0a052845cf8..e2c47af8a1b 100644 --- a/python/cugraph/cugraph/tests/community/test_triangle_count_mg.py +++ b/python/cugraph/cugraph/tests/community/test_triangle_count_mg.py @@ -16,115 +16,81 @@ import random import pytest -import cudf -import dask_cudf import cugraph import cugraph.dask as dcg -from cugraph.testing import utils -from pylibcugraph.testing.utils import gen_fixture_params_product +from cugraph.datasets import karate, dolphins # ============================================================================= # Pytest Setup / Teardown - called for each test function # ============================================================================= + + def setup_function(): gc.collect() # ============================================================================= -# Pytest fixtures +# Parameters # ============================================================================= -datasets = utils.DATASETS_UNDIRECTED -fixture_params = gen_fixture_params_product( - (datasets, "graph_file"), - ([True, False], "start_list"), -) - - -@pytest.fixture(scope="module", params=fixture_params) -def input_combo(request): - """ - Simply return the current combination of params as a dictionary for use in - tests or other parameterized fixtures. - """ - parameters = dict(zip(("graph_file", "start_list", "edgevals"), request.param)) - - return parameters - - -@pytest.fixture(scope="module") -def input_expected_output(dask_client, input_combo): - """ - This fixture returns the inputs and expected results from the triangle - count algo. 
- """ - start_list = input_combo["start_list"] - input_data_path = input_combo["graph_file"] - G = utils.generate_cugraph_graph_from_file( - input_data_path, directed=False, edgevals=True - ) - input_combo["SGGraph"] = G - if start_list: +DATASETS = [karate, dolphins] +START_LIST = [True, False] + + +# ============================================================================= +# Helper Functions +# ============================================================================= + + +def get_sg_graph(dataset, directed, start): + G = dataset.get_graph(create_using=cugraph.Graph(directed=directed)) + if start: # sample k nodes from the cuGraph graph - k = random.randint(1, 10) - srcs = G.view_edge_list()[G.source_columns] - dsts = G.view_edge_list()[G.destination_columns] - nodes = cudf.concat([srcs, dsts]).drop_duplicates() - start_list = nodes.sample(k) + start = G.select_random_vertices(num_vertices=random.randint(1, 10)) else: - start_list = None + start = None - sg_triangle_results = cugraph.triangle_count(G, start_list) - sg_triangle_results = sg_triangle_results.sort_values("vertex").reset_index( - drop=True - ) + return G, start - input_combo["sg_triangle_results"] = sg_triangle_results - input_combo["start_list"] = start_list - - # Creating an edgelist from a dask cudf dataframe - chunksize = dcg.get_chunksize(input_data_path) - ddf = dask_cudf.read_csv( - input_data_path, - blocksize=chunksize, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - dg = cugraph.Graph(directed=False) +def get_mg_graph(dataset, directed): + ddf = dataset.get_dask_edgelist() + dg = cugraph.Graph(directed=directed) dg.from_dask_cudf_edgelist( - ddf, source="src", destination="dst", edge_attr="value", renumber=True + ddf, source="src", destination="dst", edge_attr="wgt", renumber=True ) - input_combo["MGGraph"] = dg - - return input_combo + return dg # ============================================================================= # Tests # 
============================================================================= + + @pytest.mark.mg -def test_sg_triangles(dask_client, benchmark, input_expected_output): +@pytest.mark.parametrize("dataset", DATASETS) +@pytest.mark.parametrize("start", START_LIST) +def test_sg_triangles(dask_client, dataset, start, benchmark): # This test is only for benchmark purposes. sg_triangle_results = None - G = input_expected_output["SGGraph"] - start_list = input_expected_output["start_list"] - sg_triangle_results = benchmark(cugraph.triangle_count, G, start_list) + G, start = get_sg_graph(dataset, False, start) + + sg_triangle_results = benchmark(cugraph.triangle_count, G, start) + sg_triangle_results.sort_values("vertex").reset_index(drop=True) assert sg_triangle_results is not None @pytest.mark.mg -def test_triangles(dask_client, benchmark, input_expected_output): - - dg = input_expected_output["MGGraph"] - start_list = input_expected_output["start_list"] - - result_counts = benchmark(dcg.triangle_count, dg, start_list) +@pytest.mark.parametrize("dataset", DATASETS) +@pytest.mark.parametrize("start", START_LIST) +def test_triangles(dask_client, dataset, start, benchmark): + G, start = get_sg_graph(dataset, False, start) + dg = get_mg_graph(dataset, False) + result_counts = benchmark(dcg.triangle_count, dg, start) result_counts = ( result_counts.drop_duplicates() .compute() @@ -132,8 +98,9 @@ def test_triangles(dask_client, benchmark, input_expected_output): .reset_index(drop=True) .rename(columns={"counts": "mg_counts"}) ) - - expected_output = input_expected_output["sg_triangle_results"] + expected_output = ( + cugraph.triangle_count(G, start).sort_values("vertex").reset_index(drop=True) + ) # Update the mg triangle count with sg triangle count results # for easy comparison using cuDF DataFrame methods. 
diff --git a/python/cugraph/cugraph/tests/components/test_connectivity_mg.py b/python/cugraph/cugraph/tests/components/test_connectivity_mg.py index 26e8ed17bcb..4ab251c0e29 100644 --- a/python/cugraph/cugraph/tests/components/test_connectivity_mg.py +++ b/python/cugraph/cugraph/tests/components/test_connectivity_mg.py @@ -15,11 +15,9 @@ import pytest -import cudf -import dask_cudf import cugraph import cugraph.dask as dcg -from cugraph.testing.utils import RAPIDS_DATASET_ROOT_DIR_PATH +from cugraph.datasets import netscience # ============================================================================= @@ -31,42 +29,47 @@ def setup_function(): gc.collect() +# ============================================================================= +# Parameters +# ============================================================================= + + +DATASETS = [netscience] # Directed graph is not currently supported IS_DIRECTED = [False, True] -# @pytest.mark.skipif( -# is_single_gpu(), reason="skipping MG testing on Single GPU system" -# ) +# ============================================================================= +# Helper +# ============================================================================= + + +def get_mg_graph(dataset, directed): + """Returns an MG graph""" + ddf = dataset.get_dask_edgelist() + + dg = cugraph.Graph(directed=directed) + dg.from_dask_cudf_edgelist(ddf, "src", "dst", "wgt") + + return dg + + +# ============================================================================= +# Tests +# ============================================================================= + + @pytest.mark.mg +@pytest.mark.parametrize("dataset", DATASETS) @pytest.mark.parametrize("directed", IS_DIRECTED) -def test_dask_mg_wcc(dask_client, directed): - - input_data_path = (RAPIDS_DATASET_ROOT_DIR_PATH / "netscience.csv").as_posix() +def test_dask_mg_wcc(dask_client, dataset, directed): + input_data_path = dataset.get_path() print(f"dataset={input_data_path}") - chunksize = 
dcg.get_chunksize(input_data_path) - - ddf = dask_cudf.read_csv( - input_data_path, - blocksize=chunksize, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - - df = cudf.read_csv( - input_data_path, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - - g = cugraph.Graph(directed=directed) - g.from_cudf_edgelist(df, "src", "dst", renumber=True) - dg = cugraph.Graph(directed=directed) - dg.from_dask_cudf_edgelist(ddf, "src", "dst") + g = dataset.get_graph(create_using=cugraph.Graph(directed=directed)) + dg = get_mg_graph(dataset, directed) + # breakpoint() if not directed: expected_dist = cugraph.weakly_connected_components(g) result_dist = dcg.weakly_connected_components(dg) diff --git a/python/cugraph/cugraph/tests/core/test_core_number_mg.py b/python/cugraph/cugraph/tests/core/test_core_number_mg.py index f771ce513eb..3d9a7bef5be 100644 --- a/python/cugraph/cugraph/tests/core/test_core_number_mg.py +++ b/python/cugraph/cugraph/tests/core/test_core_number_mg.py @@ -15,107 +15,64 @@ import pytest -import dask_cudf import cugraph import cugraph.dask as dcg -from cugraph.testing import utils -from pylibcugraph.testing.utils import gen_fixture_params_product +from cugraph.datasets import karate, dolphins, karate_asymmetric # ============================================================================= # Pytest Setup / Teardown - called for each test function # ============================================================================= + + def setup_function(): gc.collect() # ============================================================================= -# Pytest fixtures +# Parameters # ============================================================================= -datasets = utils.DATASETS_UNDIRECTED -degree_type = ["incoming", "outgoing", "bidirectional"] - -fixture_params = gen_fixture_params_product( - (datasets, "graph_file"), - (degree_type, "degree_type"), -) - - 
-@pytest.fixture(scope="module", params=fixture_params) -def input_combo(request): - """ - Simply return the current combination of params as a dictionary for use in - tests or other parameterized fixtures. - """ - parameters = dict(zip(("graph_file", "degree_type"), request.param)) - - return parameters - - -@pytest.fixture(scope="module") -def input_expected_output(dask_client, input_combo): - """ - This fixture returns the inputs and expected results from the Core number - algo. - """ - degree_type = input_combo["degree_type"] - input_data_path = input_combo["graph_file"] - G = utils.generate_cugraph_graph_from_file( - input_data_path, directed=False, edgevals=True - ) - input_combo["SGGraph"] = G - sg_core_number_results = cugraph.core_number(G, degree_type) - sg_core_number_results = sg_core_number_results.sort_values("vertex").reset_index( - drop=True - ) +DATASETS = [karate, dolphins] +DEGREE_TYPE = ["incoming", "outgoing", "bidirectional"] - input_combo["sg_core_number_results"] = sg_core_number_results - input_combo["degree_type"] = degree_type - - # Creating an edgelist from a dask cudf dataframe - chunksize = dcg.get_chunksize(input_data_path) - ddf = dask_cudf.read_csv( - input_data_path, - blocksize=chunksize, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - dg = cugraph.Graph(directed=False) - dg.from_dask_cudf_edgelist( - ddf, source="src", destination="dst", edge_attr="value", renumber=True - ) +# ============================================================================= +# Helper Functions +# ============================================================================= - input_combo["MGGraph"] = dg - return input_combo +def get_sg_results(dataset, degree_type): + G = dataset.get_graph(create_using=cugraph.Graph(directed=False)) + res = cugraph.core_number(G, degree_type) + res = res.sort_values("vertex").reset_index(drop=True) + return res # 
============================================================================= # Tests # ============================================================================= + + @pytest.mark.mg -def test_sg_core_number(dask_client, benchmark, input_expected_output): +@pytest.mark.parametrize("dataset", DATASETS) +@pytest.mark.parametrize("degree_type", DEGREE_TYPE) +def test_sg_core_number(dask_client, dataset, degree_type, benchmark): # This test is only for benchmark purposes. sg_core_number_results = None - G = input_expected_output["SGGraph"] - degree_type = input_expected_output["degree_type"] - + G = dataset.get_graph(create_using=cugraph.Graph(directed=False)) sg_core_number_results = benchmark(cugraph.core_number, G, degree_type) assert sg_core_number_results is not None @pytest.mark.mg -def test_core_number(dask_client, benchmark, input_expected_output): - - dg = input_expected_output["MGGraph"] - degree_type = input_expected_output["degree_type"] +@pytest.mark.parametrize("dataset", DATASETS) +@pytest.mark.parametrize("degree_type", DEGREE_TYPE) +def test_core_number(dask_client, dataset, degree_type, benchmark): + dg = dataset.get_dask_graph(create_using=cugraph.Graph(directed=False)) result_core_number = benchmark(dcg.core_number, dg, degree_type) - result_core_number = ( result_core_number.drop_duplicates() .compute() @@ -124,7 +81,7 @@ def test_core_number(dask_client, benchmark, input_expected_output): .rename(columns={"core_number": "mg_core_number"}) ) - expected_output = input_expected_output["sg_core_number_results"] + expected_output = get_sg_results(dataset, degree_type) # Update the mg core number with sg core number results # for easy comparison using cuDF DataFrame methods. 
@@ -135,30 +92,10 @@ def test_core_number(dask_client, benchmark, input_expected_output): @pytest.mark.mg -def test_core_number_invalid_input(input_expected_output): - input_data_path = ( - utils.RAPIDS_DATASET_ROOT_DIR_PATH / "karate-asymmetric.csv" - ).as_posix() - - chunksize = dcg.get_chunksize(input_data_path) - ddf = dask_cudf.read_csv( - input_data_path, - blocksize=chunksize, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - - dg = cugraph.Graph(directed=True) - dg.from_dask_cudf_edgelist( - ddf, - source="src", - destination="dst", - edge_attr="value", - renumber=True, - ) +def test_core_number_invalid_input(): + dg = karate_asymmetric.get_graph(create_using=cugraph.Graph(directed=True)) invalid_degree_type = 3 - dg = input_expected_output["MGGraph"] + with pytest.raises(ValueError): dcg.core_number(dg, invalid_degree_type) diff --git a/python/cugraph/cugraph/tests/core/test_k_core_mg.py b/python/cugraph/cugraph/tests/core/test_k_core_mg.py index b2ac18cf3a9..c7ad6d2d41d 100644 --- a/python/cugraph/cugraph/tests/core/test_k_core_mg.py +++ b/python/cugraph/cugraph/tests/core/test_k_core_mg.py @@ -1,4 +1,5 @@ -# Copyright (c) 2023-2024, NVIDIA CORPORATION. +# Copyright (c) 2024, NVIDIA CORPORATION. + # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
# You may obtain a copy of the License at @@ -15,58 +16,39 @@ import pytest -import dask_cudf import cugraph import cugraph.dask as dcg -from cugraph.testing import utils +from cugraph.datasets import karate, dolphins from cudf.testing.testing import assert_frame_equal from cugraph.structure.symmetrize import symmetrize_df -from pylibcugraph.testing import gen_fixture_params_product # ============================================================================= # Pytest Setup / Teardown - called for each test function # ============================================================================= + + def setup_function(): gc.collect() # ============================================================================= -# Pytest fixtures +# Parameters # ============================================================================= -datasets = utils.DATASETS_UNDIRECTED - -core_number = [True, False] -degree_type = ["bidirectional", "outgoing", "incoming"] - -fixture_params = gen_fixture_params_product( - (datasets, "graph_file"), (core_number, "core_number"), (degree_type, "degree_type") -) - - -@pytest.fixture(scope="module", params=fixture_params) -def input_combo(request): - """ - Simply return the current combination of params as a dictionary for use in - tests or other parameterized fixtures. - """ - parameters = dict(zip(("graph_file", "core_number", "degree_type"), request.param)) - - return parameters - - -@pytest.fixture(scope="module") -def input_expected_output(dask_client, input_combo): - """ - This fixture returns the inputs and expected results from the Core number - algo. 
- """ - core_number = input_combo["core_number"] - degree_type = input_combo["degree_type"] - input_data_path = input_combo["graph_file"] - G = utils.generate_cugraph_graph_from_file( - input_data_path, directed=False, edgevals=True - ) + + +DATASETS = [karate, dolphins] +CORE_NUMBER = [True, False] +DEGREE_TYPE = ["bidirectional", "outgoing", "incoming"] + + +# ============================================================================= +# Helper Functions +# ============================================================================= + + +def get_sg_results(dataset, core_number, degree_type): + G = dataset.get_graph(create_using=cugraph.Graph(directed=False)) if core_number: # compute the core_number @@ -74,62 +56,41 @@ def input_expected_output(dask_client, input_combo): else: core_number = None - input_combo["core_number"] = core_number - - input_combo["SGGraph"] = G - sg_k_core_graph = cugraph.k_core( G, core_number=core_number, degree_type=degree_type ) - sg_k_core_results = sg_k_core_graph.view_edge_list() + res = sg_k_core_graph.view_edge_list() # FIXME: The result will come asymetric. 
Symmetrize the results srcCol = sg_k_core_graph.source_columns dstCol = sg_k_core_graph.destination_columns wgtCol = sg_k_core_graph.weight_column - sg_k_core_results = ( - symmetrize_df(sg_k_core_results, srcCol, dstCol, wgtCol) + res = ( + symmetrize_df(res, srcCol, dstCol, wgtCol) .sort_values([srcCol, dstCol]) .reset_index(drop=True) ) - input_combo["sg_k_core_results"] = sg_k_core_results - - # Creating an edgelist from a dask cudf dataframe - chunksize = dcg.get_chunksize(input_data_path) - ddf = dask_cudf.read_csv( - input_data_path, - blocksize=chunksize, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - - dg = cugraph.Graph(directed=False) - # FIXME: False when renumbering (C++ and python renumbering) - dg.from_dask_cudf_edgelist( - ddf, - source="src", - destination="dst", - edge_attr="value", - renumber=True, - ) - - input_combo["MGGraph"] = dg - - return input_combo + return res, core_number # ============================================================================= # Tests # ============================================================================= + + @pytest.mark.mg -def test_sg_k_core(dask_client, benchmark, input_expected_output): +@pytest.mark.parametrize("dataset", DATASETS) +@pytest.mark.parametrize("core_number", CORE_NUMBER) +@pytest.mark.parametrize("degree_type", DEGREE_TYPE) +def test_sg_k_core(dask_client, dataset, core_number, degree_type, benchmark): # This test is only for benchmark purposes. 
sg_k_core = None - G = input_expected_output["SGGraph"] - core_number = input_expected_output["core_number"] - degree_type = input_expected_output["degree_type"] - + G = dataset.get_graph(create_using=cugraph.Graph(directed=False)) + if core_number: + # compute the core_number + core_number = cugraph.core_number(G, degree_type=degree_type) + else: + core_number = None sg_k_core = benchmark( cugraph.k_core, G, core_number=core_number, degree_type=degree_type ) @@ -137,15 +98,16 @@ def test_sg_k_core(dask_client, benchmark, input_expected_output): @pytest.mark.mg -def test_dask_mg_k_core(dask_client, benchmark, input_expected_output): - - dg = input_expected_output["MGGraph"] - core_number = input_expected_output["core_number"] +@pytest.mark.parametrize("dataset", DATASETS) +@pytest.mark.parametrize("core_number", CORE_NUMBER) +@pytest.mark.parametrize("degree_type", DEGREE_TYPE) +def test_dask_mg_k_core(dask_client, dataset, core_number, degree_type, benchmark): + expected_k_core_results, core_number = get_sg_results( + dataset, core_number, degree_type + ) + dg = dataset.get_dask_graph(create_using=cugraph.Graph(directed=False)) k_core_results = benchmark(dcg.k_core, dg, core_number=core_number) - - expected_k_core_results = input_expected_output["sg_k_core_results"] - k_core_results = ( k_core_results.compute() .sort_values(["src", "dst"]) @@ -160,36 +122,13 @@ def test_dask_mg_k_core(dask_client, benchmark, input_expected_output): @pytest.mark.mg def test_dask_mg_k_core_invalid_input(dask_client): - input_data_path = datasets[0] - chunksize = dcg.get_chunksize(input_data_path) - ddf = dask_cudf.read_csv( - input_data_path, - blocksize=chunksize, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) + dataset = DATASETS[0] + dg = dataset.get_dask_graph(create_using=cugraph.Graph(directed=True)) - dg = cugraph.Graph(directed=True) - dg.from_dask_cudf_edgelist( - ddf, - source="src", - destination="dst", - edge_attr="value", - 
renumber=True, - store_transposed=True, - ) with pytest.raises(ValueError): dcg.k_core(dg) - dg = cugraph.Graph(directed=False) - dg.from_dask_cudf_edgelist( - ddf, - source="src", - destination="dst", - edge_attr="value", - store_transposed=True, - ) + dg = dataset.get_dask_graph(create_using=cugraph.Graph(directed=False)) degree_type = "invalid" with pytest.raises(ValueError): diff --git a/python/cugraph/cugraph/tests/internals/test_renumber_mg.py b/python/cugraph/cugraph/tests/internals/test_renumber_mg.py index 45a3c46309d..64917d0c747 100644 --- a/python/cugraph/cugraph/tests/internals/test_renumber_mg.py +++ b/python/cugraph/cugraph/tests/internals/test_renumber_mg.py @@ -24,33 +24,61 @@ import dask_cudf import cugraph.dask as dcg import cugraph +from cugraph.datasets import karate, karate_disjoint from cugraph.testing import utils from cugraph.structure.number_map import NumberMap from cugraph.dask.common.mg_utils import is_single_gpu -from cugraph.testing.utils import RAPIDS_DATASET_ROOT_DIR_PATH from cudf.testing import assert_frame_equal, assert_series_equal # ============================================================================= # Pytest Setup / Teardown - called for each test function # ============================================================================= + + def setup_function(): gc.collect() +# ============================================================================= +# Parameters +# ============================================================================= + + +DATASETS = [karate] +DATASETS_UNRENUMBERED = [karate_disjoint] IS_DIRECTED = [True, False] +# ============================================================================= +# Helper Functions +# ============================================================================= + + +def get_sg_graph(dataset, directed): + dataset.unload() + g = dataset.get_graph(create_using=cugraph.Graph(directed=directed)) + + return g + + +def get_mg_graph(dataset, directed): + 
dataset.unload() + dg = dataset.get_dask_graph(create_using=cugraph.Graph(directed=directed)) + + return dg + + +# ============================================================================= +# Tests +# ============================================================================= + + @pytest.mark.mg @pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system") -@pytest.mark.parametrize( - "graph_file", - utils.DATASETS_UNRENUMBERED, - ids=[f"dataset={d.as_posix()}" for d in utils.DATASETS_UNRENUMBERED], -) -def test_mg_renumber(graph_file, dask_client): - - M = utils.read_csv_for_nx(graph_file) +@pytest.mark.parametrize("dataset", DATASETS_UNRENUMBERED) +def test_mg_renumber(dataset, dask_client): + M = utils.read_csv_for_nx(dataset.get_path()) sources = cudf.Series(M["0"]) destinations = cudf.Series(M["1"]) @@ -96,13 +124,9 @@ def test_mg_renumber(graph_file, dask_client): @pytest.mark.mg @pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system") -@pytest.mark.parametrize( - "graph_file", - utils.DATASETS_UNRENUMBERED, - ids=[f"dataset={d.as_posix()}" for d in utils.DATASETS_UNRENUMBERED], -) -def test_mg_renumber_add_internal_vertex_id(graph_file, dask_client): - M = utils.read_csv_for_nx(graph_file) +@pytest.mark.parametrize("dataset", DATASETS_UNRENUMBERED) +def test_mg_renumber_add_internal_vertex_id(dataset, dask_client): + M = utils.read_csv_for_nx(dataset.get_path()) sources = cudf.Series(M["0"]) destinations = cudf.Series(M["1"]) @@ -131,33 +155,13 @@ def test_mg_renumber_add_internal_vertex_id(graph_file, dask_client): @pytest.mark.mg @pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system") +@pytest.mark.parametrize("dataset", DATASETS) @pytest.mark.parametrize("directed", IS_DIRECTED) -def test_dask_mg_pagerank(dask_client, directed): +def test_dask_mg_pagerank(dask_client, dataset, directed): pandas.set_option("display.max_rows", 10000) - input_data_path = 
(RAPIDS_DATASET_ROOT_DIR_PATH / "karate.csv").as_posix() - chunksize = dcg.get_chunksize(input_data_path) - - ddf = dask_cudf.read_csv( - input_data_path, - blocksize=chunksize, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - - df = cudf.read_csv( - input_data_path, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - - g = cugraph.Graph(directed=directed) - g.from_cudf_edgelist(df, "src", "dst") - - dg = cugraph.Graph(directed=directed) - dg.from_dask_cudf_edgelist(ddf, "src", "dst") + g = get_sg_graph(dataset, directed) + dg = get_mg_graph(dataset, directed) expected_pr = cugraph.pagerank(g) result_pr = dcg.pagerank(dg).compute() @@ -178,20 +182,18 @@ def test_dask_mg_pagerank(dask_client, directed): print("Mismatches:", err) assert err == 0 + dataset.unload() + @pytest.mark.mg @pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system") -@pytest.mark.parametrize( - "graph_file", - utils.DATASETS_UNRENUMBERED, - ids=[f"dataset={d.as_posix()}" for d in utils.DATASETS_UNRENUMBERED], -) -def test_mg_renumber_common_col_names(graph_file, dask_client): +@pytest.mark.parametrize("dataset", DATASETS_UNRENUMBERED) +def test_mg_renumber_common_col_names(dataset, dask_client): """ Ensure that commonly-used column names in the input do not conflict with names used internally by NumberMap. """ - M = utils.read_csv_for_nx(graph_file) + M = utils.read_csv_for_nx(dataset.get_path()) sources = cudf.Series(M["0"]) destinations = cudf.Series(M["1"]) diff --git a/python/cugraph/cugraph/tests/internals/test_replicate_edgelist_mg.py b/python/cugraph/cugraph/tests/internals/test_replicate_edgelist_mg.py index 3bdb5c079ef..09936e954e8 100644 --- a/python/cugraph/cugraph/tests/internals/test_replicate_edgelist_mg.py +++ b/python/cugraph/cugraph/tests/internals/test_replicate_edgelist_mg.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022-2023, NVIDIA CORPORATION. 
+# Copyright (c) 2022-2024, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -17,73 +17,54 @@ import dask_cudf import numpy as np -from cugraph.testing import UNDIRECTED_DATASETS, karate_disjoint - +from cugraph.datasets import karate, dolphins, karate_disjoint from cugraph.structure.replicate_edgelist import replicate_edgelist from cudf.testing.testing import assert_frame_equal -from pylibcugraph.testing.utils import gen_fixture_params_product # ============================================================================= # Pytest Setup / Teardown - called for each test function # ============================================================================= + + def setup_function(): gc.collect() +# ============================================================================= +# Parameters +# ============================================================================= + + edgeWeightCol = "weights" edgeIdCol = "edge_id" edgeTypeCol = "edge_type" srcCol = "src" dstCol = "dst" - -input_data = UNDIRECTED_DATASETS + [karate_disjoint] -datasets = [pytest.param(d) for d in input_data] - -fixture_params = gen_fixture_params_product( - (datasets, "graph_file"), - ([True, False], "distributed"), - ([True, False], "use_weights"), - ([True, False], "use_edge_ids"), - ([True, False], "use_edge_type_ids"), -) - - -@pytest.fixture(scope="module", params=fixture_params) -def input_combo(request): - """ - Simply return the current combination of params as a dictionary for use in - tests or other parameterized fixtures. 
- """ - return dict( - zip( - ( - "graph_file", - "use_weights", - "use_edge_ids", - "use_edge_type_ids", - "distributed", - ), - request.param, - ) - ) +DATASETS = [karate, dolphins, karate_disjoint] +IS_DISTRIBUTED = [True, False] +USE_WEIGHTS = [True, False] +USE_EDGE_IDS = [True, False] +USE_EDGE_TYPE_IDS = [True, False] # ============================================================================= # Tests # ============================================================================= -# @pytest.mark.skipif( -# is_single_gpu(), reason="skipping MG testing on Single GPU system" -# ) -@pytest.mark.mg -def test_mg_replicate_edgelist(dask_client, input_combo): - df = input_combo["graph_file"].get_edgelist() - distributed = input_combo["distributed"] - use_weights = input_combo["use_weights"] - use_edge_ids = input_combo["use_edge_ids"] - use_edge_type_ids = input_combo["use_edge_type_ids"] + +@pytest.mark.mg +@pytest.mark.parametrize("dataset", DATASETS) +@pytest.mark.parametrize("distributed", IS_DISTRIBUTED) +@pytest.mark.parametrize("use_weights", USE_WEIGHTS) +@pytest.mark.parametrize("use_edge_ids", USE_EDGE_IDS) +@pytest.mark.parametrize("use_edge_type_ids", USE_EDGE_TYPE_IDS) +def test_mg_replicate_edgelist( + dask_client, dataset, distributed, use_weights, use_edge_ids, use_edge_type_ids +): + dataset.unload() + df = dataset.get_edgelist() columns = [srcCol, dstCol] weight = None diff --git a/python/cugraph/cugraph/tests/internals/test_symmetrize_mg.py b/python/cugraph/cugraph/tests/internals/test_symmetrize_mg.py index 05cc06e6282..913443fe400 100644 --- a/python/cugraph/cugraph/tests/internals/test_symmetrize_mg.py +++ b/python/cugraph/cugraph/tests/internals/test_symmetrize_mg.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022-2023, NVIDIA CORPORATION. +# Copyright (c) 2022-2024, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
# You may obtain a copy of the License at @@ -25,6 +25,8 @@ # ============================================================================= # Pytest Setup / Teardown - called for each test function # ============================================================================= + + def setup_function(): gc.collect() diff --git a/python/cugraph/cugraph/tests/sampling/test_uniform_neighbor_sample.py b/python/cugraph/cugraph/tests/sampling/test_uniform_neighbor_sample.py index 560b80993d9..304ead6fea9 100644 --- a/python/cugraph/cugraph/tests/sampling/test_uniform_neighbor_sample.py +++ b/python/cugraph/cugraph/tests/sampling/test_uniform_neighbor_sample.py @@ -963,6 +963,46 @@ def test_uniform_neighbor_sample_csr_csc_local(hops, seed): assert 1 == len(el[(el.src == majors.iloc[i]) & (el.dst == minors.iloc[i])]) +def test_uniform_neighbor_sample_retain_seeds(): + src = cupy.array([0, 1, 2, 3, 4, 5], dtype="int64") + dst = cupy.array([2, 3, 1, 7, 5, 6], dtype="int64") + + seeds = cupy.array([6, 0, 1, 7], dtype="int64") + batch = cupy.array([0, 0, 1, 1], dtype="int32") + batch_offsets = cupy.array([0, 2, 4], dtype="int64") + + fanout = [2, 2] + + df = cudf.DataFrame({"src": src, "dst": dst}) + + G = cugraph.MultiGraph(directed=True) + G.from_cudf_edgelist(df, source="src", destination="dst") + + batch_df = cudf.DataFrame({"seeds": seeds, "batch": batch}) + batch_offsets_s = cudf.Series(batch_offsets, name="batch_offsets") + results, offsets, renumber_map = cugraph.uniform_neighbor_sample( + G, + batch_df, + fanout, + with_replacement=False, + with_edge_properties=True, + with_batch_ids=True, + random_state=62, + return_offsets=True, + label_offsets=batch_offsets_s, + return_hops=True, + prior_sources_behavior="exclude", + deduplicate_sources=True, + renumber=True, + retain_seeds=True, + compress_per_hop=False, + ) + + assert offsets.renumber_map_offsets.dropna().values_host.tolist() == [0, 4, 7] + assert renumber_map.renumber_map.values_host[[0, 1]].tolist() == [0, 6] + 
assert renumber_map.renumber_map.values_host[[4, 5]].tolist() == [1, 7] + + @pytest.mark.sg @pytest.mark.skip(reason="needs to be written!") def test_uniform_neighbor_sample_dcsr_dcsc_global(): diff --git a/python/pylibcugraph/pylibcugraph/_cugraph_c/algorithms.pxd b/python/pylibcugraph/pylibcugraph/_cugraph_c/algorithms.pxd index b0e7ffaf82d..4da7c4328fd 100644 --- a/python/pylibcugraph/pylibcugraph/_cugraph_c/algorithms.pxd +++ b/python/pylibcugraph/pylibcugraph/_cugraph_c/algorithms.pxd @@ -292,6 +292,12 @@ cdef extern from "cugraph_c/algorithms.h": bool_t value, ) + cdef void \ + cugraph_sampling_set_retain_seeds( + cugraph_sampling_options_t* options, + bool_t value, + ) + cdef void \ cugraph_sampling_set_with_replacement( cugraph_sampling_options_t* options, diff --git a/python/pylibcugraph/pylibcugraph/_cugraph_c/sampling_algorithms.pxd b/python/pylibcugraph/pylibcugraph/_cugraph_c/sampling_algorithms.pxd index c32b57f8621..dbd3ef4b7e1 100644 --- a/python/pylibcugraph/pylibcugraph/_cugraph_c/sampling_algorithms.pxd +++ b/python/pylibcugraph/pylibcugraph/_cugraph_c/sampling_algorithms.pxd @@ -1,4 +1,4 @@ -# Copyright (c) 2022-2023, NVIDIA CORPORATION. +# Copyright (c) 2022-2024, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
# You may obtain a copy of the License at @@ -50,6 +50,7 @@ cdef extern from "cugraph_c/sampling_algorithms.h": const cugraph_type_erased_device_array_view_t* start_vertex_labels, const cugraph_type_erased_device_array_view_t* label_list, const cugraph_type_erased_device_array_view_t* label_to_comm_rank, + const cugraph_type_erased_device_array_view_t* label_offsets, const cugraph_type_erased_host_array_view_t* fan_out, cugraph_rng_state_t* rng_state, const cugraph_sampling_options_t* options, diff --git a/python/pylibcugraph/pylibcugraph/uniform_neighbor_sample.pyx b/python/pylibcugraph/pylibcugraph/uniform_neighbor_sample.pyx index b4145a80095..f002622f497 100644 --- a/python/pylibcugraph/pylibcugraph/uniform_neighbor_sample.pyx +++ b/python/pylibcugraph/pylibcugraph/uniform_neighbor_sample.pyx @@ -49,6 +49,7 @@ from pylibcugraph._cugraph_c.algorithms cimport ( cugraph_sampling_set_renumber_results, cugraph_sampling_set_compress_per_hop, cugraph_sampling_set_compression_type, + cugraph_sampling_set_retain_seeds, ) from pylibcugraph._cugraph_c.sampling_algorithms cimport ( cugraph_uniform_neighbor_sample, @@ -89,10 +90,12 @@ def uniform_neighbor_sample(ResourceHandle resource_handle, batch_id_list=None, label_list=None, label_to_output_comm_rank=None, + label_offsets=None, prior_sources_behavior=None, deduplicate_sources=False, return_hops=False, renumber=False, + retain_seeds=False, compression='COO', compress_per_hop=False, random_state=None, @@ -143,6 +146,9 @@ def uniform_neighbor_sample(ResourceHandle resource_handle, worker that should hold results for that batch id. Defaults to NULL (does nothing) + label_offsets: list[int] (Optional) + Offsets of each label within the start vertex list. + prior_sources_behavior: str (Optional) Options are "carryover", and "exclude". Default will leave the source list as-is. 
@@ -160,6 +166,11 @@ def uniform_neighbor_sample(ResourceHandle resource_handle, per-batch basis and return the renumber map and batch offsets in addition to the standard returns. + retain_seeds: bool (Optional) + If True, will retain the original seeds (original source vertices) + in the output even if they do not have outgoing neighbors. + Defaults to False. + compression: str (Optional) Options: COO (default), CSR, CSC, DCSR, DCSC Sets the compression format for the returned samples. @@ -210,6 +221,7 @@ def uniform_neighbor_sample(ResourceHandle resource_handle, assert_CAI_type(batch_id_list, "batch_id_list", True) assert_CAI_type(label_list, "label_list", True) assert_CAI_type(label_to_output_comm_rank, "label_to_output_comm_rank", True) + assert_CAI_type(label_offsets, "label_offsets", True) assert_AI_type(h_fan_out, "h_fan_out") cdef cugraph_sample_result_t* result_ptr @@ -234,6 +246,11 @@ def uniform_neighbor_sample(ResourceHandle resource_handle, cai_label_to_output_comm_rank_ptr = \ label_to_output_comm_rank.__cuda_array_interface__['data'][0] + cdef uintptr_t cai_label_offsets_ptr + if label_offsets is not None: + cai_label_offsets_ptr = \ + label_offsets.__cuda_array_interface__['data'][0] + cdef uintptr_t ai_fan_out_ptr = \ h_fan_out.__array_interface__["data"][0] @@ -270,6 +287,17 @@ def uniform_neighbor_sample(ResourceHandle resource_handle, get_c_type_from_numpy_type(label_to_output_comm_rank.dtype) ) + cdef cugraph_type_erased_device_array_view_t* label_offsets_ptr = NULL + if retain_seeds: + if label_offsets is None: + raise ValueError("Must provide label offsets if retain_seeds is True") + label_offsets_ptr = \ + cugraph_type_erased_device_array_view_create( + cai_label_offsets_ptr, + len(label_offsets), + get_c_type_from_numpy_type(label_offsets.dtype) + ) + cdef cugraph_type_erased_host_array_view_t* fan_out_ptr = \ cugraph_type_erased_host_array_view_create( ai_fan_out_ptr, @@ -323,6 +351,7 @@ def uniform_neighbor_sample(ResourceHandle 
resource_handle, cugraph_sampling_set_renumber_results(sampling_options, c_renumber) cugraph_sampling_set_compression_type(sampling_options, compression_behavior_e) cugraph_sampling_set_compress_per_hop(sampling_options, c_compress_per_hop) + cugraph_sampling_set_retain_seeds(sampling_options, retain_seeds) error_code = cugraph_uniform_neighbor_sample( c_resource_handle_ptr, @@ -331,6 +360,7 @@ def uniform_neighbor_sample(ResourceHandle resource_handle, batch_id_ptr, label_list_ptr, label_to_output_comm_rank_ptr, + label_offsets_ptr, fan_out_ptr, rng_state_ptr, sampling_options, @@ -347,6 +377,8 @@ def uniform_neighbor_sample(ResourceHandle resource_handle, cugraph_type_erased_host_array_view_free(fan_out_ptr) if batch_id_list is not None: cugraph_type_erased_device_array_view_free(batch_id_ptr) + if label_offsets is not None: + cugraph_type_erased_device_array_view_free(label_offsets_ptr) # Have the SamplingResult instance assume ownership of the result data. result = SamplingResult()