From 45371cbde5411a910130787337f9eccb7fd683c7 Mon Sep 17 00:00:00 2001 From: Alex Barghi <105237337+alexbarghi-nv@users.noreply.github.com> Date: Tue, 14 May 2024 10:18:44 -0400 Subject: [PATCH 1/3] [FEA] Support Seed Retention for Sampling with Renumbering (#4355) Exposes the ability to retain seeds even if they have no outgoing edges (and therefore are not sampled). Required to fix the current bug in cuGraph-PyG involving batch size and dropping seeds. Currently, this functionality can't be exposed through the MG Python API (#4358) but exposing it through the pylibcugraph API is sufficient to resolve this issue. This PR does expose it through the SG Python API. Authors: - Alex Barghi (https://github.com/alexbarghi-nv) - Seunghwa Kang (https://github.com/seunghwak) Approvers: - Seunghwa Kang (https://github.com/seunghwak) - Chuck Hastings (https://github.com/ChuckHastings) - Naim (https://github.com/naimnv) - Rick Ratzel (https://github.com/rlratzel) URL: https://github.com/rapidsai/cugraph/pull/4355 --- cpp/include/cugraph_c/sampling_algorithms.h | 12 ++++ cpp/src/c_api/uniform_neighbor_sampling.cpp | 60 ++++++++++++++----- cpp/tests/c_api/create_graph_test.c | 1 + .../c_api/mg_uniform_neighbor_sample_test.c | 4 ++ .../c_api/uniform_neighbor_sample_test.c | 2 + .../sampling/uniform_neighbor_sample.py | 15 ++++- .../sampling/test_uniform_neighbor_sample.py | 40 +++++++++++++ .../pylibcugraph/_cugraph_c/algorithms.pxd | 6 ++ .../_cugraph_c/sampling_algorithms.pxd | 3 +- .../pylibcugraph/uniform_neighbor_sample.pyx | 32 ++++++++++ 10 files changed, 158 insertions(+), 17 deletions(-) diff --git a/cpp/include/cugraph_c/sampling_algorithms.h b/cpp/include/cugraph_c/sampling_algorithms.h index 5760d2098aa..859eaca7f3b 100644 --- a/cpp/include/cugraph_c/sampling_algorithms.h +++ b/cpp/include/cugraph_c/sampling_algorithms.h @@ -236,6 +236,15 @@ typedef enum cugraph_compression_type_t { cugraph_error_code_t cugraph_sampling_options_create(cugraph_sampling_options_t** options, cugraph_error_t** error); +/** + * @ingroup samplingC + * @brief Set flag to retain seeds (original sources) + * + * @param options - opaque pointer to the sampling options + * @param value - Boolean value to assign to the option + */ +void cugraph_sampling_set_retain_seeds(cugraph_sampling_options_t* options, bool_t value); + /** * @ingroup samplingC * @brief Set flag to renumber results @@ -335,6 +344,8 @@ void cugraph_sampling_options_free(cugraph_sampling_options_t* options); * output. If specified then the all data from @p label_list[i] will be shuffled to rank @p. This * cannot be specified unless @p start_vertex_labels is also specified * label_to_comm_rank[i]. If not specified then the output data will not be shuffled between ranks. + * @param [in] label_offsets Device array of the offsets for each label in the seed list. This + * parameter is only used with the retain_seeds option. * @param [in] fanout Host array defining the fan out at each step in the sampling algorithm. * We only support fanout values of type INT32 * @param [in/out] rng_state State of the random number generator, updated with each call @@ -354,6 +365,7 @@ cugraph_error_code_t cugraph_uniform_neighbor_sample( const cugraph_type_erased_device_array_view_t* start_vertex_labels, const cugraph_type_erased_device_array_view_t* label_list, const cugraph_type_erased_device_array_view_t* label_to_comm_rank, + const cugraph_type_erased_device_array_view_t* label_offsets, const cugraph_type_erased_host_array_view_t* fan_out, cugraph_rng_state_t* rng_state, const cugraph_sampling_options_t* options, diff --git a/cpp/src/c_api/uniform_neighbor_sampling.cpp b/cpp/src/c_api/uniform_neighbor_sampling.cpp index 100e81a5bd2..45609fc0e01 100644 --- a/cpp/src/c_api/uniform_neighbor_sampling.cpp +++ b/cpp/src/c_api/uniform_neighbor_sampling.cpp @@ -40,6 +40,7 @@ struct cugraph_sampling_options_t { bool_t renumber_results_{FALSE}; cugraph_compression_type_t compression_type_{cugraph_compression_type_t::COO}; bool_t compress_per_hop_{FALSE}; + bool_t retain_seeds_{FALSE}; }; struct cugraph_sample_result_t { @@ -68,6 +69,7 @@ struct uniform_neighbor_sampling_functor : public cugraph::c_api::abstract_funct cugraph::c_api::cugraph_type_erased_device_array_view_t const* start_vertex_labels_{nullptr}; cugraph::c_api::cugraph_type_erased_device_array_view_t const* label_list_{nullptr}; cugraph::c_api::cugraph_type_erased_device_array_view_t const* label_to_comm_rank_{nullptr}; + cugraph::c_api::cugraph_type_erased_device_array_view_t const* label_offsets_{nullptr}; cugraph::c_api::cugraph_type_erased_host_array_view_t const* fan_out_{nullptr}; cugraph::c_api::cugraph_rng_state_t* rng_state_{nullptr}; cugraph::c_api::cugraph_sampling_options_t options_{}; @@ -81,6 +83,7 @@ struct uniform_neighbor_sampling_functor : public cugraph::c_api::abstract_funct cugraph_type_erased_device_array_view_t const* start_vertex_labels, cugraph_type_erased_device_array_view_t const* label_list, cugraph_type_erased_device_array_view_t const* label_to_comm_rank, + cugraph_type_erased_device_array_view_t const* label_offsets, cugraph_type_erased_host_array_view_t const* fan_out, cugraph_rng_state_t* rng_state, cugraph::c_api::cugraph_sampling_options_t options, @@ -99,6 +102,9 @@ struct uniform_neighbor_sampling_functor : public cugraph::c_api::abstract_funct label_to_comm_rank_( reinterpret_cast( label_to_comm_rank)), + label_offsets_( + reinterpret_cast( + label_offsets)), fan_out_( reinterpret_cast(fan_out)), rng_state_(reinterpret_cast(rng_state)), @@ -267,8 +273,13 @@ struct uniform_neighbor_sampling_functor : public cugraph::c_api::abstract_funct std::move(edge_id), std::move(edge_type), std::move(hop), - std::nullopt, - std::nullopt, + options_.retain_seeds_ + ? std::make_optional(raft::device_span{ + start_vertices_->as_type(), start_vertices_->size_}) + : std::nullopt, + options_.retain_seeds_ ? std::make_optional(raft::device_span{ + label_offsets_->as_type(), label_offsets_->size_}) + : std::nullopt, offsets ? std::make_optional( raft::device_span{offsets->data(), offsets->size()}) : std::nullopt, @@ -304,8 +315,13 @@ struct uniform_neighbor_sampling_functor : public cugraph::c_api::abstract_funct std::move(edge_id), std::move(edge_type), std::move(hop), - std::nullopt, - std::nullopt, + options_.retain_seeds_ + ? std::make_optional(raft::device_span{ + start_vertices_->as_type(), start_vertices_->size_}) + : std::nullopt, + options_.retain_seeds_ ? std::make_optional(raft::device_span{ + label_offsets_->as_type(), label_offsets_->size_}) + : std::nullopt, offsets ? std::make_optional( raft::device_span{offsets->data(), offsets->size()}) : std::nullopt, @@ -402,6 +418,12 @@ extern "C" cugraph_error_code_t cugraph_sampling_options_create( return CUGRAPH_SUCCESS; } +extern "C" void cugraph_sampling_set_retain_seeds(cugraph_sampling_options_t* options, bool_t value) +{ + auto internal_pointer = reinterpret_cast(options); + internal_pointer->retain_seeds_ = value; +} + extern "C" void cugraph_sampling_set_renumber_results(cugraph_sampling_options_t* options, bool_t value) { @@ -871,6 +893,7 @@ cugraph_error_code_t cugraph_uniform_neighbor_sample( const cugraph_type_erased_device_array_view_t* start_vertex_labels, const cugraph_type_erased_device_array_view_t* label_list, const cugraph_type_erased_device_array_view_t* label_to_comm_rank, + const cugraph_type_erased_device_array_view_t* label_offsets, const cugraph_type_erased_host_array_view_t* fan_out, cugraph_rng_state_t* rng_state, const cugraph_sampling_options_t* options, @@ -878,6 +901,13 @@ cugraph_error_code_t cugraph_uniform_neighbor_sample( cugraph_sample_result_t** result, cugraph_error_t** error) { + auto options_cpp = *reinterpret_cast(options); + + CAPI_EXPECTS((!options_cpp.retain_seeds_) || (label_offsets != nullptr), + CUGRAPH_INVALID_INPUT, + "must specify label_offsets if retain_seeds is true", + *error); + CAPI_EXPECTS((start_vertex_labels == nullptr) || (reinterpret_cast( start_vertex_labels) @@ -911,16 +941,16 @@ cugraph_error_code_t cugraph_uniform_neighbor_sample( "fan_out should be of type int", *error); - uniform_neighbor_sampling_functor functor{ - handle, - graph, - start_vertices, - start_vertex_labels, - label_list, - label_to_comm_rank, - fan_out, - rng_state, - *reinterpret_cast(options), - do_expensive_check}; + uniform_neighbor_sampling_functor functor{handle, + graph, + start_vertices, + start_vertex_labels, + label_list, + label_to_comm_rank, + label_offsets, + fan_out, + rng_state, + std::move(options_cpp), + do_expensive_check}; return cugraph::c_api::run_algorithm(graph, functor, result, error); } diff --git a/cpp/tests/c_api/create_graph_test.c b/cpp/tests/c_api/create_graph_test.c index 758624a89e9..41b8691e79c 100644 --- a/cpp/tests/c_api/create_graph_test.c +++ b/cpp/tests/c_api/create_graph_test.c @@ -268,6 +268,7 @@ int test_create_sg_graph_csr() NULL, NULL, NULL, + NULL, h_fan_out_view, rng_state, sampling_options, diff --git a/cpp/tests/c_api/mg_uniform_neighbor_sample_test.c b/cpp/tests/c_api/mg_uniform_neighbor_sample_test.c index a32056a9f15..3d8fb02ed46 100644 --- a/cpp/tests/c_api/mg_uniform_neighbor_sample_test.c +++ b/cpp/tests/c_api/mg_uniform_neighbor_sample_test.c @@ -133,6 +133,7 @@ int generic_uniform_neighbor_sample_test(const cugraph_resource_handle_t* handle d_start_labels_view, NULL, NULL, + NULL, h_fan_out_view, rng_state, sampling_options, @@ -565,6 +566,7 @@ int test_uniform_neighbor_from_alex(const cugraph_resource_handle_t* handle) d_label_view, NULL, NULL, + NULL, h_fan_out_view, rng_state, sampling_options, @@ -841,6 +843,7 @@ int test_uniform_neighbor_sample_alex_bug(const cugraph_resource_handle_t* handl d_start_labels_view, d_label_list_view, d_label_to_output_comm_rank_view, + NULL, h_fan_out_view, rng_state, sampling_options, @@ -1099,6 +1102,7 @@ int test_uniform_neighbor_sample_sort_by_hop(const cugraph_resource_handle_t* ha d_start_labels_view, d_label_list_view, d_label_to_output_comm_rank_view, + NULL, h_fan_out_view, rng_state, sampling_options, diff --git a/cpp/tests/c_api/uniform_neighbor_sample_test.c b/cpp/tests/c_api/uniform_neighbor_sample_test.c index 94f0f788354..451dbca51a7 100644 --- a/cpp/tests/c_api/uniform_neighbor_sample_test.c +++ b/cpp/tests/c_api/uniform_neighbor_sample_test.c @@ -140,6 +140,7 @@ int generic_uniform_neighbor_sample_test(const cugraph_resource_handle_t* handle d_start_labels_view, NULL, NULL, + NULL, h_fan_out_view, rng_state, sampling_options, @@ -661,6 +662,7 @@ int test_uniform_neighbor_sample_with_labels(const cugraph_resource_handle_t* ha d_start_labels_view, NULL, NULL, + NULL, h_fan_out_view, rng_state, sampling_options, diff --git a/python/cugraph/cugraph/sampling/uniform_neighbor_sample.py b/python/cugraph/cugraph/sampling/uniform_neighbor_sample.py index 86b33594ed7..eafadfa4ff0 100644 --- a/python/cugraph/cugraph/sampling/uniform_neighbor_sample.py +++ b/python/cugraph/cugraph/sampling/uniform_neighbor_sample.py @@ -71,6 +71,8 @@ def uniform_neighbor_sample( prior_sources_behavior: str = None, deduplicate_sources: bool = False, renumber: bool = False, + retain_seeds: bool = False, + label_offsets: Sequence = None, use_legacy_names: bool = True, # deprecated compress_per_hop: bool = False, compression: str = "COO", @@ -142,6 +144,15 @@ def uniform_neighbor_sample( will return the renumber map and renumber map offsets as an additional dataframe. + retain_seeds: bool, optional (default=False) + If True, will retain the original seeds (original source vertices) + in the output even if they do not have outgoing neighbors. + + label_offsets: integer sequence, optional (default=None) + Offsets of each label within the start vertex list. + Only used if retain_seeds is True. Required if retain_seeds + is True. + use_legacy_names: bool, optional (default=True) Whether to use the legacy column names (sources, destinations). If True, will use "sources" and "destinations" as the column names. @@ -342,13 +353,15 @@ def uniform_neighbor_sample( else None, h_fan_out=fanout_vals, with_replacement=with_replacement, - do_expensive_check=False, + do_expensive_check=True, with_edge_properties=with_edge_properties, random_state=random_state, prior_sources_behavior=prior_sources_behavior, deduplicate_sources=deduplicate_sources, return_hops=return_hops, renumber=renumber, + retain_seeds=retain_seeds, + label_offsets=label_offsets, compression=compression, compress_per_hop=compress_per_hop, return_dict=True, diff --git a/python/cugraph/cugraph/tests/sampling/test_uniform_neighbor_sample.py b/python/cugraph/cugraph/tests/sampling/test_uniform_neighbor_sample.py index 560b80993d9..304ead6fea9 100644 --- a/python/cugraph/cugraph/tests/sampling/test_uniform_neighbor_sample.py +++ b/python/cugraph/cugraph/tests/sampling/test_uniform_neighbor_sample.py @@ -963,6 +963,46 @@ def test_uniform_neighbor_sample_csr_csc_local(hops, seed): assert 1 == len(el[(el.src == majors.iloc[i]) & (el.dst == minors.iloc[i])]) +def test_uniform_neighbor_sample_retain_seeds(): + src = cupy.array([0, 1, 2, 3, 4, 5], dtype="int64") + dst = cupy.array([2, 3, 1, 7, 5, 6], dtype="int64") + + seeds = cupy.array([6, 0, 1, 7], dtype="int64") + batch = cupy.array([0, 0, 1, 1], dtype="int32") + batch_offsets = cupy.array([0, 2, 4], dtype="int64") + + fanout = [2, 2] + + df = cudf.DataFrame({"src": src, "dst": dst}) + + G = cugraph.MultiGraph(directed=True) + G.from_cudf_edgelist(df, source="src", destination="dst") + + batch_df = cudf.DataFrame({"seeds": seeds, "batch": batch}) + batch_offsets_s = cudf.Series(batch_offsets, name="batch_offsets") + results, offsets, renumber_map = cugraph.uniform_neighbor_sample( + G, + batch_df, + fanout, + with_replacement=False, + with_edge_properties=True, + with_batch_ids=True, + random_state=62, + return_offsets=True, + label_offsets=batch_offsets_s, + return_hops=True, + prior_sources_behavior="exclude", + deduplicate_sources=True, + renumber=True, + retain_seeds=True, + compress_per_hop=False, + ) + + assert offsets.renumber_map_offsets.dropna().values_host.tolist() == [0, 4, 7] + assert renumber_map.renumber_map.values_host[[0, 1]].tolist() == [0, 6] + assert renumber_map.renumber_map.values_host[[4, 5]].tolist() == [1, 7] + + @pytest.mark.sg @pytest.mark.skip(reason="needs to be written!") def test_uniform_neighbor_sample_dcsr_dcsc_global(): diff --git a/python/pylibcugraph/pylibcugraph/_cugraph_c/algorithms.pxd b/python/pylibcugraph/pylibcugraph/_cugraph_c/algorithms.pxd index b0e7ffaf82d..4da7c4328fd 100644 --- a/python/pylibcugraph/pylibcugraph/_cugraph_c/algorithms.pxd +++ b/python/pylibcugraph/pylibcugraph/_cugraph_c/algorithms.pxd @@ -292,6 +292,12 @@ cdef extern from "cugraph_c/algorithms.h": bool_t value, ) + cdef void \ + cugraph_sampling_set_retain_seeds( + cugraph_sampling_options_t* options, + bool_t value, + ) + cdef void \ cugraph_sampling_set_with_replacement( cugraph_sampling_options_t* options, diff --git a/python/pylibcugraph/pylibcugraph/_cugraph_c/sampling_algorithms.pxd b/python/pylibcugraph/pylibcugraph/_cugraph_c/sampling_algorithms.pxd index c32b57f8621..dbd3ef4b7e1 100644 --- a/python/pylibcugraph/pylibcugraph/_cugraph_c/sampling_algorithms.pxd +++ b/python/pylibcugraph/pylibcugraph/_cugraph_c/sampling_algorithms.pxd @@ -1,4 +1,4 @@ -# Copyright (c) 2022-2023, NVIDIA CORPORATION. +# Copyright (c) 2022-2024, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -50,6 +50,7 @@ cdef extern from "cugraph_c/sampling_algorithms.h": const cugraph_type_erased_device_array_view_t* start_vertex_labels, const cugraph_type_erased_device_array_view_t* label_list, const cugraph_type_erased_device_array_view_t* label_to_comm_rank, + const cugraph_type_erased_device_array_view_t* label_offsets, const cugraph_type_erased_host_array_view_t* fan_out, cugraph_rng_state_t* rng_state, const cugraph_sampling_options_t* options, diff --git a/python/pylibcugraph/pylibcugraph/uniform_neighbor_sample.pyx b/python/pylibcugraph/pylibcugraph/uniform_neighbor_sample.pyx index b4145a80095..f002622f497 100644 --- a/python/pylibcugraph/pylibcugraph/uniform_neighbor_sample.pyx +++ b/python/pylibcugraph/pylibcugraph/uniform_neighbor_sample.pyx @@ -49,6 +49,7 @@ from pylibcugraph._cugraph_c.algorithms cimport ( cugraph_sampling_set_renumber_results, cugraph_sampling_set_compress_per_hop, cugraph_sampling_set_compression_type, + cugraph_sampling_set_retain_seeds, ) from pylibcugraph._cugraph_c.sampling_algorithms cimport ( cugraph_uniform_neighbor_sample, @@ -89,10 +90,12 @@ def uniform_neighbor_sample(ResourceHandle resource_handle, batch_id_list=None, label_list=None, label_to_output_comm_rank=None, + label_offsets=None, prior_sources_behavior=None, deduplicate_sources=False, return_hops=False, renumber=False, + retain_seeds=False, compression='COO', compress_per_hop=False, random_state=None, @@ -143,6 +146,9 @@ def uniform_neighbor_sample(ResourceHandle resource_handle, worker that should hold results for that batch id. Defaults to NULL (does nothing) + label_offsets: list[int] (Optional) + Offsets of each label within the start vertex list. + prior_sources_behavior: str (Optional) Options are "carryover", and "exclude". Default will leave the source list as-is. @@ -160,6 +166,11 @@ def uniform_neighbor_sample(ResourceHandle resource_handle, per-batch basis and return the renumber map and batch offsets in additional to the standard returns. + retain_seeds: bool (Optional) + If True, will retain the original seeds (original source vertices) + in the output even if they do not have outgoing neighbors. + Defaults to False. + compression: str (Optional) Options: COO (default), CSR, CSC, DCSR, DCSR Sets the compression format for the returned samples. @@ -210,6 +221,7 @@ def uniform_neighbor_sample(ResourceHandle resource_handle, assert_CAI_type(batch_id_list, "batch_id_list", True) assert_CAI_type(label_list, "label_list", True) assert_CAI_type(label_to_output_comm_rank, "label_to_output_comm_rank", True) + assert_CAI_type(label_offsets, "label_offsets", True) assert_AI_type(h_fan_out, "h_fan_out") cdef cugraph_sample_result_t* result_ptr @@ -234,6 +246,11 @@ def uniform_neighbor_sample(ResourceHandle resource_handle, cai_label_to_output_comm_rank_ptr = \ label_to_output_comm_rank.__cuda_array_interface__['data'][0] + cdef uintptr_t cai_label_offsets_ptr + if label_offsets is not None: + cai_label_offsets_ptr = \ + label_offsets.__cuda_array_interface__['data'][0] + cdef uintptr_t ai_fan_out_ptr = \ h_fan_out.__array_interface__["data"][0] @@ -270,6 +287,17 @@ def uniform_neighbor_sample(ResourceHandle resource_handle, get_c_type_from_numpy_type(label_to_output_comm_rank.dtype) ) + cdef cugraph_type_erased_device_array_view_t* label_offsets_ptr = NULL + if retain_seeds: + if label_offsets is None: + raise ValueError("Must provide label offsets if retain_seeds is True") + label_offsets_ptr = \ + cugraph_type_erased_device_array_view_create( + cai_label_offsets_ptr, + len(label_offsets), + get_c_type_from_numpy_type(label_offsets.dtype) + ) + cdef cugraph_type_erased_host_array_view_t* fan_out_ptr = \ cugraph_type_erased_host_array_view_create( ai_fan_out_ptr, @@ -323,6 +351,7 @@ def uniform_neighbor_sample(ResourceHandle resource_handle, cugraph_sampling_set_renumber_results(sampling_options, c_renumber) cugraph_sampling_set_compression_type(sampling_options, compression_behavior_e) cugraph_sampling_set_compress_per_hop(sampling_options, c_compress_per_hop) + cugraph_sampling_set_retain_seeds(sampling_options, retain_seeds) error_code = cugraph_uniform_neighbor_sample( c_resource_handle_ptr, @@ -331,6 +360,7 @@ def uniform_neighbor_sample(ResourceHandle resource_handle, batch_id_ptr, label_list_ptr, label_to_output_comm_rank_ptr, + label_offsets_ptr, fan_out_ptr, rng_state_ptr, sampling_options, @@ -347,6 +377,8 @@ def uniform_neighbor_sample(ResourceHandle resource_handle, cugraph_type_erased_host_array_view_free(fan_out_ptr) if batch_id_list is not None: cugraph_type_erased_device_array_view_free(batch_id_ptr) + if label_offsets is not None: + cugraph_type_erased_device_array_view_free(label_offsets_ptr) # Have the SamplingResult instance assume ownership of the result data. result = SamplingResult() From b9c0fc6d1c8c8a0c7bec01dc8377a7afc24cc425 Mon Sep 17 00:00:00 2001 From: Ralph Liu <137829296+nv-rliu@users.noreply.github.com> Date: Tue, 14 May 2024 16:12:46 -0400 Subject: [PATCH 2/3] Refactor Several MG Tests (#4244) Addresses #4187 This PR makes improvements to old MG testing conventions used in these testing directories: - centrality - comms - community - components - core - internals The goals of this PR is to improve readability and use of the MG tests. Changes are similar to #4197 > Instead of using nested fixtures. eg. `input_expected_output -> input_combo -> fixture_params` which can be confusing, the `@pytest.mark.parametrize` marker is used to iterate through combinations of parameters used for testing. The fixtures are also replaced by functions used to create SG and MG graphs. By using the Datasets API (which now supports MG Graphs thanks to @huiyuxie), the number of imports and `testing.utils` functions can be significantly reduced. Authors: - Ralph Liu (https://github.com/nv-rliu) Approvers: - Rick Ratzel (https://github.com/rlratzel) URL: https://github.com/rapidsai/cugraph/pull/4244 --- .../test_batch_betweenness_centrality_mg.py | 3 - ...st_batch_edge_betweenness_centrality_mg.py | 2 - .../centrality/test_betweenness_centrality.py | 21 ++- .../test_betweenness_centrality_mg.py | 6 - .../centrality/test_degree_centrality_mg.py | 5 - .../test_edge_betweenness_centrality_mg.py | 5 - .../test_eigenvector_centrality_mg.py | 8 - .../centrality/test_katz_centrality_mg.py | 12 -- .../cugraph/tests/comms/test_comms_mg.py | 92 ++++------- .../community/test_induced_subgraph_mg.py | 15 +- .../cugraph/tests/community/test_leiden_mg.py | 119 +++----------- .../tests/community/test_louvain_mg.py | 113 ++----------- .../tests/community/test_triangle_count_mg.py | 117 +++++-------- .../tests/components/test_connectivity_mg.py | 63 +++---- .../cugraph/tests/core/test_core_number_mg.py | 119 ++++---------- .../cugraph/tests/core/test_k_core_mg.py | 155 ++++++------------ .../tests/internals/test_renumber_mg.py | 96 +++++------ .../internals/test_replicate_edgelist_mg.py | 71 +++----- .../tests/internals/test_symmetrize_mg.py | 4 +- 19 files changed, 322 insertions(+), 704 deletions(-) diff --git a/python/cugraph/cugraph/tests/centrality/test_batch_betweenness_centrality_mg.py b/python/cugraph/cugraph/tests/centrality/test_batch_betweenness_centrality_mg.py index 1c73ebb0216..9f0980d4199 100644 --- a/python/cugraph/cugraph/tests/centrality/test_batch_betweenness_centrality_mg.py +++ b/python/cugraph/cugraph/tests/centrality/test_batch_betweenness_centrality_mg.py @@ -93,6 +93,3 @@ def test_mg_betweenness_centrality( second_key="ref_bc", epsilon=DEFAULT_EPSILON, ) - - # Clean-up stored dataset edge-lists - dataset.unload() diff --git a/python/cugraph/cugraph/tests/centrality/test_batch_edge_betweenness_centrality_mg.py b/python/cugraph/cugraph/tests/centrality/test_batch_edge_betweenness_centrality_mg.py index 4530dd3da86..4764c01f0fc 100644 --- a/python/cugraph/cugraph/tests/centrality/test_batch_edge_betweenness_centrality_mg.py +++ b/python/cugraph/cugraph/tests/centrality/test_batch_edge_betweenness_centrality_mg.py @@ -84,5 +84,3 @@ def test_mg_edge_betweenness_centrality( second_key="ref_bc", epsilon=DEFAULT_EPSILON, ) - # Clean-up stored dataset edge-lists - dataset.unload() diff --git a/python/cugraph/cugraph/tests/centrality/test_betweenness_centrality.py b/python/cugraph/cugraph/tests/centrality/test_betweenness_centrality.py index db34c68a054..ff8859a01b1 100644 --- a/python/cugraph/cugraph/tests/centrality/test_betweenness_centrality.py +++ b/python/cugraph/cugraph/tests/centrality/test_betweenness_centrality.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2023, NVIDIA CORPORATION.: +# Copyright (c) 2020-2024, NVIDIA CORPORATION.: # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -111,11 +111,18 @@ def calc_betweenness_centrality( else: edge_attr = None - G = graph_file.get_graph( - download=True, - create_using=cugraph.Graph(directed=directed), - ignore_weights=not edgevals, - ) + G = None + if multi_gpu_batch: + G = graph_file.get_dask_graph( + create_using=cugraph.Graph(directed=directed), ignore_weights=not edgevals + ) + G.enable_batch() + else: + G = graph_file.get_graph( + download=True, + create_using=cugraph.Graph(directed=directed), + ignore_weights=not edgevals, + ) M = G.to_pandas_edgelist().rename( columns={"src": "0", "dst": "1", "wgt": edge_attr} @@ -130,8 +137,6 @@ def calc_betweenness_centrality( ) assert G is not None and Gnx is not None - if multi_gpu_batch: - G.enable_batch() calc_func = None if k is not None and seed is not None: diff --git a/python/cugraph/cugraph/tests/centrality/test_betweenness_centrality_mg.py b/python/cugraph/cugraph/tests/centrality/test_betweenness_centrality_mg.py index c94c2dcaff6..35e199093ce 100644 --- a/python/cugraph/cugraph/tests/centrality/test_betweenness_centrality_mg.py +++ b/python/cugraph/cugraph/tests/centrality/test_betweenness_centrality_mg.py @@ -49,14 +49,12 @@ def setup_function(): def get_sg_graph(dataset, directed): - dataset.unload() G = dataset.get_graph(create_using=cugraph.Graph(directed=directed)) return G def get_mg_graph(dataset, directed): - dataset.unload() ddf = dataset.get_dask_edgelist() dg = cugraph.Graph(directed=directed) dg.from_dask_cudf_edgelist( @@ -96,7 +94,6 @@ def test_dask_mg_betweenness_centrality( benchmark, ): g = get_sg_graph(dataset, directed) - dataset.unload() dg = get_mg_graph(dataset, directed) random_state = subset_seed @@ -143,6 +140,3 @@ def test_dask_mg_betweenness_centrality( diff = cupy.isclose(mg_bc_results, sg_bc_results) assert diff.all() - - # Clean-up stored dataset edge-lists - dataset.unload() diff --git a/python/cugraph/cugraph/tests/centrality/test_degree_centrality_mg.py b/python/cugraph/cugraph/tests/centrality/test_degree_centrality_mg.py index 68daff9238c..8606649c745 100644 --- a/python/cugraph/cugraph/tests/centrality/test_degree_centrality_mg.py +++ b/python/cugraph/cugraph/tests/centrality/test_degree_centrality_mg.py @@ -45,14 +45,12 @@ def setup_function(): def get_sg_graph(dataset, directed): - dataset.unload() G = dataset.get_graph(create_using=cugraph.Graph(directed=directed)) return G def get_mg_graph(dataset, directed): - dataset.unload() ddf = dataset.get_dask_edgelist() dg = cugraph.Graph(directed=directed) dg.from_dask_cudf_edgelist( @@ -118,6 +116,3 @@ def test_dask_mg_degree(dask_client, dataset, directed): check_names=False, check_dtype=False, ) - - # Clean-up stored dataset edge-lists - dataset.unload() diff --git a/python/cugraph/cugraph/tests/centrality/test_edge_betweenness_centrality_mg.py b/python/cugraph/cugraph/tests/centrality/test_edge_betweenness_centrality_mg.py index 80acfe1c4ad..5b83a05e2a2 100644 --- a/python/cugraph/cugraph/tests/centrality/test_edge_betweenness_centrality_mg.py +++ b/python/cugraph/cugraph/tests/centrality/test_edge_betweenness_centrality_mg.py @@ -47,7 +47,6 @@ def setup_function(): def get_sg_graph(dataset, directed, edge_ids): - dataset.unload() df = dataset.get_edgelist() if edge_ids: if not directed: @@ -71,7 +70,6 @@ def get_sg_graph(dataset, directed, edge_ids): def get_mg_graph(dataset, directed, edge_ids, weight): - dataset.unload() ddf = dataset.get_dask_edgelist() if weight: @@ -178,6 +176,3 @@ def test_dask_mg_edge_betweenness_centrality( assert len(edge_bc_diffs1) == 0 assert len(edge_bc_diffs2) == 0 - - # Clean-up stored dataset edge-lists - dataset.unload() diff --git a/python/cugraph/cugraph/tests/centrality/test_eigenvector_centrality_mg.py b/python/cugraph/cugraph/tests/centrality/test_eigenvector_centrality_mg.py index 8cd77fb5e24..3a840c82e95 100644 --- a/python/cugraph/cugraph/tests/centrality/test_eigenvector_centrality_mg.py +++ b/python/cugraph/cugraph/tests/centrality/test_eigenvector_centrality_mg.py @@ -52,7 +52,6 @@ def setup_function(): def test_dask_mg_eigenvector_centrality(dask_client, dataset, directed): input_data_path = dataset.get_path() print(f"dataset={input_data_path}") - dataset.unload() ddf = dataset.get_dask_edgelist() dg = cugraph.Graph(directed=True) dg.from_dask_cudf_edgelist(ddf, "src", "dst", store_transposed=True) @@ -89,15 +88,11 @@ def test_dask_mg_eigenvector_centrality(dask_client, dataset, directed): err = err + 1 assert err == 0 - # Clean-up stored dataset edge-lists - dataset.unload() - @pytest.mark.mg def test_dask_mg_eigenvector_centrality_transposed_false(dask_client): dataset = DATASETS[0] - dataset.unload() ddf = dataset.get_dask_edgelist() dg = cugraph.Graph(directed=True) dg.from_dask_cudf_edgelist(ddf, "src", "dst", store_transposed=False) @@ -110,6 +105,3 @@ def test_dask_mg_eigenvector_centrality_transposed_false(dask_client): with pytest.warns(UserWarning, match=warning_msg): dcg.eigenvector_centrality(dg) - - # Clean-up stored dataset edge-lists - dataset.unload() diff --git a/python/cugraph/cugraph/tests/centrality/test_katz_centrality_mg.py b/python/cugraph/cugraph/tests/centrality/test_katz_centrality_mg.py index ebbe5974814..5dcbd8173df 100644 --- a/python/cugraph/cugraph/tests/centrality/test_katz_centrality_mg.py +++ b/python/cugraph/cugraph/tests/centrality/test_katz_centrality_mg.py @@ -53,7 +53,6 @@ def test_dask_mg_katz_centrality(dask_client, dataset, directed): input_data_path = dataset.get_path() print(f"dataset={input_data_path}") - dataset.unload() ddf = dataset.get_dask_edgelist() dg = cugraph.Graph(directed=True) dg.from_dask_cudf_edgelist(ddf, "src", "dst", store_transposed=True) @@ -95,16 +94,12 @@ def test_dask_mg_katz_centrality(dask_client, dataset, directed): err = err + 1 assert err == 0 - # Clean-up stored dataset edge-lists - dataset.unload() - @pytest.mark.mg @pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system") @pytest.mark.parametrize("dataset", DATASETS) @pytest.mark.parametrize("directed", IS_DIRECTED) def test_dask_mg_katz_centrality_nstart(dask_client, dataset, directed): - dataset.unload() ddf = dataset.get_dask_edgelist() dg = cugraph.Graph(directed=True) dg.from_dask_cudf_edgelist(ddf, "src", "dst", store_transposed=True) @@ -136,14 +131,10 @@ def test_dask_mg_katz_centrality_nstart(dask_client, dataset, directed): err = err + 1 assert err == 0 - # Clean-up stored dataset edge-lists - dataset.unload() - @pytest.mark.mg @pytest.mark.parametrize("dataset", DATASETS) def test_dask_mg_katz_centrality_transposed_false(dask_client, dataset): - dataset.unload() ddf = dataset.get_dask_edgelist() dg = cugraph.Graph(directed=True) dg.from_dask_cudf_edgelist(ddf, "src", "dst", store_transposed=False) @@ -156,6 +147,3 @@ def test_dask_mg_katz_centrality_transposed_false(dask_client, dataset): with pytest.warns(UserWarning, match=warning_msg): dcg.katz_centrality(dg) - - # Clean-up stored dataset edge-lists - dataset.unload() diff --git a/python/cugraph/cugraph/tests/comms/test_comms_mg.py b/python/cugraph/cugraph/tests/comms/test_comms_mg.py index 75462924c9d..d096eb7e5c2 100644 --- a/python/cugraph/cugraph/tests/comms/test_comms_mg.py +++ b/python/cugraph/cugraph/tests/comms/test_comms_mg.py @@ -16,10 +16,9 @@ import pytest import cugraph.dask as dcg -import cudf -import dask_cudf import cugraph -from cugraph.testing.utils import RAPIDS_DATASET_ROOT_DIR_PATH +from cugraph.datasets import karate, dolphins + # ============================================================================= # Pytest Setup / Teardown - called for each test function @@ -30,12 +29,36 @@ def setup_function(): gc.collect() +# ============================================================================= +# Parameters +# ============================================================================= + + +DATASETS = [karate, dolphins] IS_DIRECTED = [True, False] -# @pytest.mark.skipif( -# is_single_gpu(), reason="skipping MG testing on Single GPU system" -# ) +# ============================================================================= +# Helper Functions +# ============================================================================= + + +def get_pagerank_result(dataset, is_mg): + """Return the cugraph.pagerank result for an MG or SG graph""" + + if is_mg: + dg = dataset.get_dask_graph(store_transposed=True) + return dcg.pagerank(dg).compute() + else: + g = dataset.get_graph(store_transposed=True) + return cugraph.pagerank(g) + + +# ============================================================================= +# Tests +# ============================================================================= + + @pytest.mark.mg @pytest.mark.parametrize("directed", IS_DIRECTED) def test_dask_mg_pagerank(dask_client, directed): @@ -43,62 +66,17 @@ def test_dask_mg_pagerank(dask_client, directed): # Initialize and run pagerank on two distributed graphs # with same communicator - input_data_path1 = (RAPIDS_DATASET_ROOT_DIR_PATH / "karate.csv").as_posix() + input_data_path1 = karate.get_path() print(f"dataset1={input_data_path1}") - chunksize1 = dcg.get_chunksize(input_data_path1) + result_pr1 = get_pagerank_result(karate, is_mg=True) - input_data_path2 = (RAPIDS_DATASET_ROOT_DIR_PATH / "dolphins.csv").as_posix() + input_data_path2 = dolphins.get_path() print(f"dataset2={input_data_path2}") - chunksize2 = dcg.get_chunksize(input_data_path2) - - ddf1 = dask_cudf.read_csv( - input_data_path1, - blocksize=chunksize1, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - - dg1 = cugraph.Graph(directed=directed) - dg1.from_dask_cudf_edgelist(ddf1, "src", "dst") - - result_pr1 = dcg.pagerank(dg1).compute() - - ddf2 = dask_cudf.read_csv( - input_data_path2, - blocksize=chunksize2, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - - dg2 = cugraph.Graph(directed=directed) - dg2.from_dask_cudf_edgelist(ddf2, "src", "dst") - - result_pr2 = dcg.pagerank(dg2).compute() + result_pr2 = get_pagerank_result(dolphins, is_mg=True) # Calculate single GPU pagerank for verification of results - df1 = cudf.read_csv( - input_data_path1, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - - g1 = cugraph.Graph(directed=directed) - g1.from_cudf_edgelist(df1, "src", "dst") - expected_pr1 = cugraph.pagerank(g1) - - df2 = cudf.read_csv( - input_data_path2, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - - g2 = cugraph.Graph(directed=directed) - g2.from_cudf_edgelist(df2, "src", "dst") - expected_pr2 = cugraph.pagerank(g2) + expected_pr1 = get_pagerank_result(karate, is_mg=False) + expected_pr2 = get_pagerank_result(dolphins, is_mg=False) # Compare and verify pagerank results diff --git a/python/cugraph/cugraph/tests/community/test_induced_subgraph_mg.py b/python/cugraph/cugraph/tests/community/test_induced_subgraph_mg.py index 45ec8eca0e8..311fd7a24bc 100644 --- a/python/cugraph/cugraph/tests/community/test_induced_subgraph_mg.py +++ b/python/cugraph/cugraph/tests/community/test_induced_subgraph_mg.py @@ -17,7 +17,6 @@ import cugraph import cugraph.dask as dcg -import dask_cudf from cudf.testing.testing import assert_frame_equal from cugraph.dask.common.mg_utils import is_single_gpu from cugraph.datasets import karate, dolphins, email_Eu_core @@ -36,11 +35,13 @@ def setup_function(): # Parameters # ============================================================================= + DATASETS = [karate, dolphins, email_Eu_core] IS_DIRECTED = [True, False] NUM_VERTICES = [2, 5, 10, 20] OFFSETS = [None] + # ============================================================================= # Helper functions # ============================================================================= @@ -53,15 +54,7 @@ def get_sg_graph(dataset, directed): def get_mg_graph(dataset, directed): - input_data_path = dataset.get_path() - blocksize = dcg.get_chunksize(input_data_path) - ddf = dask_cudf.read_csv( - input_data_path, - blocksize=blocksize, - delimiter=dataset.metadata["delim"], - names=dataset.metadata["col_names"], - dtype=dataset.metadata["col_types"], - ) + ddf = dataset.get_dask_edgelist() dg = cugraph.Graph(directed=directed) dg.from_dask_cudf_edgelist( ddf, @@ -108,7 +101,7 @@ def test_mg_induced_subgraph( # FIXME: This parameter is not yet tested # mg_offsets = mg_offsets.compute().reset_index(drop=True) - mg_df, mg_offsets = result_induced_subgraph + mg_df, _ = result_induced_subgraph if mg_df is not None and sg_induced_subgraph is not None: # FIXME: 'edges()' or 'view_edgelist()' takes half the edges out if diff --git a/python/cugraph/cugraph/tests/community/test_leiden_mg.py b/python/cugraph/cugraph/tests/community/test_leiden_mg.py index b1908ae10a2..2904ecd12a2 100644 --- a/python/cugraph/cugraph/tests/community/test_leiden_mg.py +++ b/python/cugraph/cugraph/tests/community/test_leiden_mg.py @@ -13,123 +13,56 @@ import pytest - -import dask_cudf import cugraph import cugraph.dask as dcg -from cugraph.testing import utils - +from cugraph.datasets import karate_asymmetric, karate, dolphins -try: - from rapids_pytest_benchmark import setFixtureParamNames -except ImportError: - print( - "\n\nWARNING: rapids_pytest_benchmark is not installed, " - "falling back to pytest_benchmark fixtures.\n" - ) - # if rapids_pytest_benchmark is not available, just perfrom time-only - # benchmarking and replace the util functions with nops - import pytest_benchmark +# ============================================================================= +# Parameters +# ============================================================================= - gpubenchmark = pytest_benchmark.plugin.benchmark - def setFixtureParamNames(*args, **kwargs): - pass +DATASETS = [karate, dolphins] +DATASETS_ASYMMETRIC = [karate_asymmetric] # ============================================================================= -# Parameters +# Helper Functions # ============================================================================= -DATASETS_ASYMMETRIC = [utils.RAPIDS_DATASET_ROOT_DIR_PATH / "karate-asymmetric.csv"] - - -############################################################################### -# Fixtures -# @pytest.mark.skipif( -# is_single_gpu(), reason="skipping MG testing on Single GPU system" -# ) -@pytest.fixture( - scope="module", - params=DATASETS_ASYMMETRIC, - ids=[f"dataset={d.as_posix()}" for d in DATASETS_ASYMMETRIC], -) -def daskGraphFromDataset(request, dask_client): - """ - Returns a new dask dataframe created from the dataset file param. - This creates a directed Graph. - """ - # Since parameterized fixtures do not assign param names to param values, - # manually call the helper to do so. - setFixtureParamNames(request, ["dataset"]) - dataset = request.param - - chunksize = dcg.get_chunksize(dataset) - ddf = dask_cudf.read_csv( - dataset, - blocksize=chunksize, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - - dg = cugraph.Graph(directed=True) - dg.from_dask_cudf_edgelist(ddf, "src", "dst", "value") - return dg -@pytest.fixture( - scope="module", - params=utils.DATASETS_UNDIRECTED, - ids=[f"dataset={d.as_posix()}" for d in utils.DATASETS_UNDIRECTED], -) -def uddaskGraphFromDataset(request, dask_client): - """ - Returns a new dask dataframe created from the dataset file param. - This creates an undirected Graph. - """ - # Since parameterized fixtures do not assign param names to param - # values, manually call the helper to do so. - setFixtureParamNames(request, ["dataset"]) - dataset = request.param - - chunksize = dcg.get_chunksize(dataset) - ddf = dask_cudf.read_csv( - dataset, - blocksize=chunksize, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - - dg = cugraph.Graph(directed=False) - dg.from_dask_cudf_edgelist(ddf, "src", "dst", "value") +def get_mg_graph(dataset, directed): + """Returns an MG graph""" + ddf = dataset.get_dask_edgelist() + + dg = cugraph.Graph(directed=directed) + dg.from_dask_cudf_edgelist(ddf, "src", "dst", "wgt") + return dg -############################################################################### +# ============================================================================= # Tests -# @pytest.mark.skipif( -# is_single_gpu(), reason="skipping MG testing on Single GPU system" -# ) +# ============================================================================= # FIXME: Implement more robust tests + + @pytest.mark.mg -def test_mg_leiden_with_edgevals_directed_graph(daskGraphFromDataset): +@pytest.mark.parametrize("dataset", DATASETS_ASYMMETRIC) +def test_mg_leiden_with_edgevals_directed_graph(dask_client, dataset): + dg = get_mg_graph(dataset, directed=True) # Directed graphs are not supported by Leiden and a ValueError should be # raised with pytest.raises(ValueError): - parts, mod = dcg.leiden(daskGraphFromDataset) + parts, mod = dcg.leiden(dg) -############################################################################### -# Tests -# @pytest.mark.skipif( -# is_single_gpu(), reason="skipping MG testing on Single GPU system" -# ) -# FIXME: Implement more robust tests @pytest.mark.mg -def test_mg_leiden_with_edgevals_undirected_graph(uddaskGraphFromDataset): - parts, mod = dcg.leiden(uddaskGraphFromDataset) +@pytest.mark.parametrize("dataset", DATASETS) +def test_mg_leiden_with_edgevals_undirected_graph(dask_client, dataset): + dg = get_mg_graph(dataset, directed=False) + parts, mod = dcg.leiden(dg) # FIXME: either call Nx with the same dataset and compare results, or # hardcode golden results to compare to. diff --git a/python/cugraph/cugraph/tests/community/test_louvain_mg.py b/python/cugraph/cugraph/tests/community/test_louvain_mg.py index 19fffe96b5c..0dff7f1c8b0 100644 --- a/python/cugraph/cugraph/tests/community/test_louvain_mg.py +++ b/python/cugraph/cugraph/tests/community/test_louvain_mg.py @@ -14,122 +14,41 @@ import pytest import cugraph.dask as dcg +from cugraph.datasets import karate_asymmetric, karate, dolphins -import cugraph -import dask_cudf -from cugraph.testing import utils - - -try: - from rapids_pytest_benchmark import setFixtureParamNames -except ImportError: - print( - "\n\nWARNING: rapids_pytest_benchmark is not installed, " - "falling back to pytest_benchmark fixtures.\n" - ) - - # if rapids_pytest_benchmark is not available, just perfrom time-only - # benchmarking and replace the util functions with nops - import pytest_benchmark - - gpubenchmark = pytest_benchmark.plugin.benchmark - - def setFixtureParamNames(*args, **kwargs): - pass +from test_leiden_mg import get_mg_graph # ============================================================================= # Parameters # ============================================================================= -DATASETS_ASYMMETRIC = [utils.RAPIDS_DATASET_ROOT_DIR_PATH / "karate-asymmetric.csv"] - - -############################################################################### -# Fixtures -# @pytest.mark.skipif( -# is_single_gpu(), reason="skipping MG testing on Single GPU system" -# ) -@pytest.fixture( - scope="module", - params=DATASETS_ASYMMETRIC, - ids=[f"dataset={d.as_posix()}" for d in DATASETS_ASYMMETRIC], -) -def daskGraphFromDataset(request, dask_client): - """ - Returns a new dask dataframe created from the dataset file param. - This creates a directed Graph. - """ - # Since parameterized fixtures do not assign param names to param values, - # manually call the helper to do so. - setFixtureParamNames(request, ["dataset"]) - dataset = request.param - - chunksize = dcg.get_chunksize(dataset) - ddf = dask_cudf.read_csv( - dataset, - blocksize=chunksize, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - - dg = cugraph.Graph(directed=True) - dg.from_dask_cudf_edgelist(ddf, "src", "dst", "value") - return dg -@pytest.fixture( - scope="module", - params=utils.DATASETS_UNDIRECTED, - ids=[f"dataset={d.as_posix()}" for d in utils.DATASETS_UNDIRECTED], -) -def uddaskGraphFromDataset(request, dask_client): - """ - Returns a new dask dataframe created from the dataset file param. - This creates an undirected Graph. - """ - # Since parameterized fixtures do not assign param names to param - # values, manually call the helper to do so. - setFixtureParamNames(request, ["dataset"]) - dataset = request.param +DATASETS_ASYMMETRIC = DATASETS_ASYMMETRIC = [karate_asymmetric] +DATASETS = [karate, dolphins] - chunksize = dcg.get_chunksize(dataset) - ddf = dask_cudf.read_csv( - dataset, - blocksize=chunksize, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - dg = cugraph.Graph(directed=False) - dg.from_dask_cudf_edgelist(ddf, "src", "dst", "value") - return dg - - -############################################################################### +# ============================================================================= # Tests -# @pytest.mark.skipif( -# is_single_gpu(), reason="skipping MG testing on Single GPU system" -# ) +# ============================================================================= # FIXME: Implement more robust tests + + @pytest.mark.mg -def test_mg_louvain_with_edgevals_directed_graph(daskGraphFromDataset): +@pytest.mark.parametrize("dataset", DATASETS_ASYMMETRIC) +def test_mg_louvain_with_edgevals_directed_graph(dask_client, dataset): + dg = get_mg_graph(dataset, directed=True) # Directed graphs are not supported by Louvain and a ValueError should be # raised with pytest.raises(ValueError): - parts, mod = dcg.louvain(daskGraphFromDataset) + parts, mod = dcg.louvain(dg) -############################################################################### -# Tests -# @pytest.mark.skipif( -# is_single_gpu(), reason="skipping MG testing on Single GPU system" -# ) -# FIXME: Implement more robust tests @pytest.mark.mg -def test_mg_louvain_with_edgevals_undirected_graph(uddaskGraphFromDataset): - parts, mod = dcg.louvain(uddaskGraphFromDataset) +@pytest.mark.parametrize("dataset", DATASETS) +def test_mg_louvain_with_edgevals_undirected_graph(dask_client, dataset): + dg = get_mg_graph(dataset, directed=False) + parts, mod = dcg.louvain(dg) # FIXME: either call Nx with the same dataset and compare results, or # hardcode golden results to compare to. diff --git a/python/cugraph/cugraph/tests/community/test_triangle_count_mg.py b/python/cugraph/cugraph/tests/community/test_triangle_count_mg.py index 0a052845cf8..e2c47af8a1b 100644 --- a/python/cugraph/cugraph/tests/community/test_triangle_count_mg.py +++ b/python/cugraph/cugraph/tests/community/test_triangle_count_mg.py @@ -16,115 +16,81 @@ import random import pytest -import cudf -import dask_cudf import cugraph import cugraph.dask as dcg -from cugraph.testing import utils -from pylibcugraph.testing.utils import gen_fixture_params_product +from cugraph.datasets import karate, dolphins # ============================================================================= # Pytest Setup / Teardown - called for each test function # ============================================================================= + + def setup_function(): gc.collect() # ============================================================================= -# Pytest fixtures +# Parameters # ============================================================================= -datasets = utils.DATASETS_UNDIRECTED -fixture_params = gen_fixture_params_product( - (datasets, "graph_file"), - ([True, False], "start_list"), -) - - -@pytest.fixture(scope="module", params=fixture_params) -def input_combo(request): - """ - Simply return the current combination of params as a dictionary for use in - tests or other parameterized fixtures. - """ - parameters = dict(zip(("graph_file", "start_list", "edgevals"), request.param)) - - return parameters - - -@pytest.fixture(scope="module") -def input_expected_output(dask_client, input_combo): - """ - This fixture returns the inputs and expected results from the triangle - count algo. - """ - start_list = input_combo["start_list"] - input_data_path = input_combo["graph_file"] - G = utils.generate_cugraph_graph_from_file( - input_data_path, directed=False, edgevals=True - ) - input_combo["SGGraph"] = G - if start_list: +DATASETS = [karate, dolphins] +START_LIST = [True, False] + + +# ============================================================================= +# Helper Functions +# ============================================================================= + + +def get_sg_graph(dataset, directed, start): + G = dataset.get_graph(create_using=cugraph.Graph(directed=directed)) + if start: # sample k nodes from the cuGraph graph - k = random.randint(1, 10) - srcs = G.view_edge_list()[G.source_columns] - dsts = G.view_edge_list()[G.destination_columns] - nodes = cudf.concat([srcs, dsts]).drop_duplicates() - start_list = nodes.sample(k) + start = G.select_random_vertices(num_vertices=random.randint(1, 10)) else: - start_list = None + start = None - sg_triangle_results = cugraph.triangle_count(G, start_list) - sg_triangle_results = sg_triangle_results.sort_values("vertex").reset_index( - drop=True - ) + return G, start - input_combo["sg_triangle_results"] = sg_triangle_results - input_combo["start_list"] = start_list - - # Creating an edgelist from a dask cudf dataframe - chunksize = dcg.get_chunksize(input_data_path) - ddf = dask_cudf.read_csv( - input_data_path, - blocksize=chunksize, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - dg = cugraph.Graph(directed=False) +def get_mg_graph(dataset, directed): + ddf = dataset.get_dask_edgelist() + dg = cugraph.Graph(directed=directed) dg.from_dask_cudf_edgelist( - ddf, source="src", destination="dst", edge_attr="value", renumber=True + ddf, source="src", destination="dst", edge_attr="wgt", renumber=True ) - input_combo["MGGraph"] = dg - - return input_combo + return dg # ============================================================================= # Tests # ============================================================================= + + @pytest.mark.mg -def test_sg_triangles(dask_client, benchmark, input_expected_output): +@pytest.mark.parametrize("dataset", DATASETS) +@pytest.mark.parametrize("start", START_LIST) +def test_sg_triangles(dask_client, dataset, start, benchmark): # This test is only for benchmark purposes. sg_triangle_results = None - G = input_expected_output["SGGraph"] - start_list = input_expected_output["start_list"] - sg_triangle_results = benchmark(cugraph.triangle_count, G, start_list) + G, start = get_sg_graph(dataset, False, start) + + sg_triangle_results = benchmark(cugraph.triangle_count, G, start) + sg_triangle_results.sort_values("vertex").reset_index(drop=True) assert sg_triangle_results is not None @pytest.mark.mg -def test_triangles(dask_client, benchmark, input_expected_output): - - dg = input_expected_output["MGGraph"] - start_list = input_expected_output["start_list"] - - result_counts = benchmark(dcg.triangle_count, dg, start_list) +@pytest.mark.parametrize("dataset", DATASETS) +@pytest.mark.parametrize("start", START_LIST) +def test_triangles(dask_client, dataset, start, benchmark): + G, start = get_sg_graph(dataset, False, start) + dg = get_mg_graph(dataset, False) + result_counts = benchmark(dcg.triangle_count, dg, start) result_counts = ( result_counts.drop_duplicates() .compute() @@ -132,8 +98,9 @@ def test_triangles(dask_client, benchmark, input_expected_output): .reset_index(drop=True) .rename(columns={"counts": "mg_counts"}) ) - - expected_output = input_expected_output["sg_triangle_results"] + expected_output = ( + cugraph.triangle_count(G, start).sort_values("vertex").reset_index(drop=True) + ) # Update the mg triangle count with sg triangle count results # for easy comparison using cuDF DataFrame methods. diff --git a/python/cugraph/cugraph/tests/components/test_connectivity_mg.py b/python/cugraph/cugraph/tests/components/test_connectivity_mg.py index 26e8ed17bcb..4ab251c0e29 100644 --- a/python/cugraph/cugraph/tests/components/test_connectivity_mg.py +++ b/python/cugraph/cugraph/tests/components/test_connectivity_mg.py @@ -15,11 +15,9 @@ import pytest -import cudf -import dask_cudf import cugraph import cugraph.dask as dcg -from cugraph.testing.utils import RAPIDS_DATASET_ROOT_DIR_PATH +from cugraph.datasets import netscience # ============================================================================= @@ -31,42 +29,47 @@ def setup_function(): gc.collect() +# ============================================================================= +# Parameters +# ============================================================================= + + +DATASETS = [netscience] # Directed graph is not currently supported IS_DIRECTED = [False, True] -# @pytest.mark.skipif( -# is_single_gpu(), reason="skipping MG testing on Single GPU system" -# ) +# ============================================================================= +# Helper +# ============================================================================= + + +def get_mg_graph(dataset, directed): + """Returns an MG graph""" + ddf = dataset.get_dask_edgelist() + + dg = cugraph.Graph(directed=directed) + dg.from_dask_cudf_edgelist(ddf, "src", "dst", "wgt") + + return dg + + +# ============================================================================= +# Tests +# ============================================================================= + + @pytest.mark.mg +@pytest.mark.parametrize("dataset", DATASETS) @pytest.mark.parametrize("directed", IS_DIRECTED) -def test_dask_mg_wcc(dask_client, directed): - - input_data_path = (RAPIDS_DATASET_ROOT_DIR_PATH / "netscience.csv").as_posix() +def test_dask_mg_wcc(dask_client, dataset, directed): + input_data_path = dataset.get_path() print(f"dataset={input_data_path}") - chunksize = dcg.get_chunksize(input_data_path) - - ddf = dask_cudf.read_csv( - input_data_path, - blocksize=chunksize, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - - df = cudf.read_csv( - input_data_path, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - - g = cugraph.Graph(directed=directed) - g.from_cudf_edgelist(df, "src", "dst", renumber=True) - dg = cugraph.Graph(directed=directed) - dg.from_dask_cudf_edgelist(ddf, "src", "dst") + g = dataset.get_graph(create_using=cugraph.Graph(directed=directed)) + dg = get_mg_graph(dataset, directed) + # breakpoint() if not directed: expected_dist = cugraph.weakly_connected_components(g) result_dist = dcg.weakly_connected_components(dg) diff --git a/python/cugraph/cugraph/tests/core/test_core_number_mg.py b/python/cugraph/cugraph/tests/core/test_core_number_mg.py index f771ce513eb..3d9a7bef5be 100644 --- a/python/cugraph/cugraph/tests/core/test_core_number_mg.py +++ b/python/cugraph/cugraph/tests/core/test_core_number_mg.py @@ -15,107 +15,64 @@ import pytest -import dask_cudf import cugraph import cugraph.dask as dcg -from cugraph.testing import utils -from pylibcugraph.testing.utils import gen_fixture_params_product +from cugraph.datasets import karate, dolphins, karate_asymmetric # ============================================================================= # Pytest Setup / Teardown - called for each test function # ============================================================================= + + def setup_function(): gc.collect() # ============================================================================= -# Pytest fixtures +# Parameters # ============================================================================= -datasets = utils.DATASETS_UNDIRECTED -degree_type = ["incoming", "outgoing", "bidirectional"] - -fixture_params = gen_fixture_params_product( - (datasets, "graph_file"), - (degree_type, "degree_type"), -) - - -@pytest.fixture(scope="module", params=fixture_params) -def input_combo(request): - """ - Simply return the current combination of params as a dictionary for use in - tests or other parameterized fixtures. - """ - parameters = dict(zip(("graph_file", "degree_type"), request.param)) - - return parameters - - -@pytest.fixture(scope="module") -def input_expected_output(dask_client, input_combo): - """ - This fixture returns the inputs and expected results from the Core number - algo. - """ - degree_type = input_combo["degree_type"] - input_data_path = input_combo["graph_file"] - G = utils.generate_cugraph_graph_from_file( - input_data_path, directed=False, edgevals=True - ) - input_combo["SGGraph"] = G - sg_core_number_results = cugraph.core_number(G, degree_type) - sg_core_number_results = sg_core_number_results.sort_values("vertex").reset_index( - drop=True - ) +DATASETS = [karate, dolphins] +DEGREE_TYPE = ["incoming", "outgoing", "bidirectional"] - input_combo["sg_core_number_results"] = sg_core_number_results - input_combo["degree_type"] = degree_type - - # Creating an edgelist from a dask cudf dataframe - chunksize = dcg.get_chunksize(input_data_path) - ddf = dask_cudf.read_csv( - input_data_path, - blocksize=chunksize, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - dg = cugraph.Graph(directed=False) - dg.from_dask_cudf_edgelist( - ddf, source="src", destination="dst", edge_attr="value", renumber=True - ) +# ============================================================================= +# Helper Functions +# ============================================================================= - input_combo["MGGraph"] = dg - return input_combo +def get_sg_results(dataset, degree_type): + G = dataset.get_graph(create_using=cugraph.Graph(directed=False)) + res = cugraph.core_number(G, degree_type) + res = res.sort_values("vertex").reset_index(drop=True) + return res # ============================================================================= # Tests # ============================================================================= + + @pytest.mark.mg -def test_sg_core_number(dask_client, benchmark, input_expected_output): +@pytest.mark.parametrize("dataset", DATASETS) +@pytest.mark.parametrize("degree_type", DEGREE_TYPE) +def test_sg_core_number(dask_client, dataset, degree_type, benchmark): # This test is only for benchmark purposes. sg_core_number_results = None - G = input_expected_output["SGGraph"] - degree_type = input_expected_output["degree_type"] - + G = dataset.get_graph(create_using=cugraph.Graph(directed=False)) sg_core_number_results = benchmark(cugraph.core_number, G, degree_type) assert sg_core_number_results is not None @pytest.mark.mg -def test_core_number(dask_client, benchmark, input_expected_output): - - dg = input_expected_output["MGGraph"] - degree_type = input_expected_output["degree_type"] +@pytest.mark.parametrize("dataset", DATASETS) +@pytest.mark.parametrize("degree_type", DEGREE_TYPE) +def test_core_number(dask_client, dataset, degree_type, benchmark): + dg = dataset.get_dask_graph(create_using=cugraph.Graph(directed=False)) result_core_number = benchmark(dcg.core_number, dg, degree_type) - result_core_number = ( result_core_number.drop_duplicates() .compute() @@ -124,7 +81,7 @@ def test_core_number(dask_client, benchmark, input_expected_output): .rename(columns={"core_number": "mg_core_number"}) ) - expected_output = input_expected_output["sg_core_number_results"] + expected_output = get_sg_results(dataset, degree_type) # Update the mg core number with sg core number results # for easy comparison using cuDF DataFrame methods. @@ -135,30 +92,10 @@ def test_core_number(dask_client, benchmark, input_expected_output): @pytest.mark.mg -def test_core_number_invalid_input(input_expected_output): - input_data_path = ( - utils.RAPIDS_DATASET_ROOT_DIR_PATH / "karate-asymmetric.csv" - ).as_posix() - - chunksize = dcg.get_chunksize(input_data_path) - ddf = dask_cudf.read_csv( - input_data_path, - blocksize=chunksize, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - - dg = cugraph.Graph(directed=True) - dg.from_dask_cudf_edgelist( - ddf, - source="src", - destination="dst", - edge_attr="value", - renumber=True, - ) +def test_core_number_invalid_input(): + dg = karate_asymmetric.get_graph(create_using=cugraph.Graph(directed=True)) invalid_degree_type = 3 - dg = input_expected_output["MGGraph"] + with pytest.raises(ValueError): dcg.core_number(dg, invalid_degree_type) diff --git a/python/cugraph/cugraph/tests/core/test_k_core_mg.py b/python/cugraph/cugraph/tests/core/test_k_core_mg.py index b2ac18cf3a9..c7ad6d2d41d 100644 --- a/python/cugraph/cugraph/tests/core/test_k_core_mg.py +++ b/python/cugraph/cugraph/tests/core/test_k_core_mg.py @@ -1,4 +1,5 @@ -# Copyright (c) 2023-2024, NVIDIA CORPORATION. +# Copyright (c) 2024, NVIDIA CORPORATION. + # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -15,58 +16,39 @@ import pytest -import dask_cudf import cugraph import cugraph.dask as dcg -from cugraph.testing import utils +from cugraph.datasets import karate, dolphins from cudf.testing.testing import assert_frame_equal from cugraph.structure.symmetrize import symmetrize_df -from pylibcugraph.testing import gen_fixture_params_product # ============================================================================= # Pytest Setup / Teardown - called for each test function # ============================================================================= + + def setup_function(): gc.collect() # ============================================================================= -# Pytest fixtures +# Parameters # ============================================================================= -datasets = utils.DATASETS_UNDIRECTED - -core_number = [True, False] -degree_type = ["bidirectional", "outgoing", "incoming"] - -fixture_params = gen_fixture_params_product( - (datasets, "graph_file"), (core_number, "core_number"), (degree_type, "degree_type") -) - - -@pytest.fixture(scope="module", params=fixture_params) -def input_combo(request): - """ - Simply return the current combination of params as a dictionary for use in - tests or other parameterized fixtures. - """ - parameters = dict(zip(("graph_file", "core_number", "degree_type"), request.param)) - - return parameters - - -@pytest.fixture(scope="module") -def input_expected_output(dask_client, input_combo): - """ - This fixture returns the inputs and expected results from the Core number - algo. - """ - core_number = input_combo["core_number"] - degree_type = input_combo["degree_type"] - input_data_path = input_combo["graph_file"] - G = utils.generate_cugraph_graph_from_file( - input_data_path, directed=False, edgevals=True - ) + + +DATASETS = [karate, dolphins] +CORE_NUMBER = [True, False] +DEGREE_TYPE = ["bidirectional", "outgoing", "incoming"] + + +# ============================================================================= +# Helper Functions +# ============================================================================= + + +def get_sg_results(dataset, core_number, degree_type): + G = dataset.get_graph(create_using=cugraph.Graph(directed=False)) if core_number: # compute the core_number @@ -74,62 +56,41 @@ def input_expected_output(dask_client, input_combo): else: core_number = None - input_combo["core_number"] = core_number - - input_combo["SGGraph"] = G - sg_k_core_graph = cugraph.k_core( G, core_number=core_number, degree_type=degree_type ) - sg_k_core_results = sg_k_core_graph.view_edge_list() + res = sg_k_core_graph.view_edge_list() # FIXME: The result will come asymetric. Symmetrize the results srcCol = sg_k_core_graph.source_columns dstCol = sg_k_core_graph.destination_columns wgtCol = sg_k_core_graph.weight_column - sg_k_core_results = ( - symmetrize_df(sg_k_core_results, srcCol, dstCol, wgtCol) + res = ( + symmetrize_df(res, srcCol, dstCol, wgtCol) .sort_values([srcCol, dstCol]) .reset_index(drop=True) ) - input_combo["sg_k_core_results"] = sg_k_core_results - - # Creating an edgelist from a dask cudf dataframe - chunksize = dcg.get_chunksize(input_data_path) - ddf = dask_cudf.read_csv( - input_data_path, - blocksize=chunksize, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - - dg = cugraph.Graph(directed=False) - # FIXME: False when renumbering (C++ and python renumbering) - dg.from_dask_cudf_edgelist( - ddf, - source="src", - destination="dst", - edge_attr="value", - renumber=True, - ) - - input_combo["MGGraph"] = dg - - return input_combo + return res, core_number # ============================================================================= # Tests # ============================================================================= + + @pytest.mark.mg -def test_sg_k_core(dask_client, benchmark, input_expected_output): +@pytest.mark.parametrize("dataset", DATASETS) +@pytest.mark.parametrize("core_number", CORE_NUMBER) +@pytest.mark.parametrize("degree_type", DEGREE_TYPE) +def test_sg_k_core(dask_client, dataset, core_number, degree_type, benchmark): # This test is only for benchmark purposes. sg_k_core = None - G = input_expected_output["SGGraph"] - core_number = input_expected_output["core_number"] - degree_type = input_expected_output["degree_type"] - + G = dataset.get_graph(create_using=cugraph.Graph(directed=False)) + if core_number: + # compute the core_number + core_number = cugraph.core_number(G, degree_type=degree_type) + else: + core_number = None sg_k_core = benchmark( cugraph.k_core, G, core_number=core_number, degree_type=degree_type ) @@ -137,15 +98,16 @@ def test_sg_k_core(dask_client, benchmark, input_expected_output): @pytest.mark.mg -def test_dask_mg_k_core(dask_client, benchmark, input_expected_output): - - dg = input_expected_output["MGGraph"] - core_number = input_expected_output["core_number"] +@pytest.mark.parametrize("dataset", DATASETS) +@pytest.mark.parametrize("core_number", CORE_NUMBER) +@pytest.mark.parametrize("degree_type", DEGREE_TYPE) +def test_dask_mg_k_core(dask_client, dataset, core_number, degree_type, benchmark): + expected_k_core_results, core_number = get_sg_results( + dataset, core_number, degree_type + ) + dg = dataset.get_dask_graph(create_using=cugraph.Graph(directed=False)) k_core_results = benchmark(dcg.k_core, dg, core_number=core_number) - - expected_k_core_results = input_expected_output["sg_k_core_results"] - k_core_results = ( k_core_results.compute() .sort_values(["src", "dst"]) @@ -160,36 +122,13 @@ def test_dask_mg_k_core(dask_client, benchmark, input_expected_output): @pytest.mark.mg def test_dask_mg_k_core_invalid_input(dask_client): - input_data_path = datasets[0] - chunksize = dcg.get_chunksize(input_data_path) - ddf = dask_cudf.read_csv( - input_data_path, - blocksize=chunksize, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) + dataset = DATASETS[0] + dg = dataset.get_dask_graph(create_using=cugraph.Graph(directed=True)) - dg = cugraph.Graph(directed=True) - dg.from_dask_cudf_edgelist( - ddf, - source="src", - destination="dst", - edge_attr="value", - renumber=True, - store_transposed=True, - ) with pytest.raises(ValueError): dcg.k_core(dg) - dg = cugraph.Graph(directed=False) - dg.from_dask_cudf_edgelist( - ddf, - source="src", - destination="dst", - edge_attr="value", - store_transposed=True, - ) + dg = dataset.get_dask_graph(create_using=cugraph.Graph(directed=False)) degree_type = "invalid" with pytest.raises(ValueError): diff --git a/python/cugraph/cugraph/tests/internals/test_renumber_mg.py b/python/cugraph/cugraph/tests/internals/test_renumber_mg.py index 45a3c46309d..64917d0c747 100644 --- a/python/cugraph/cugraph/tests/internals/test_renumber_mg.py +++ b/python/cugraph/cugraph/tests/internals/test_renumber_mg.py @@ -24,33 +24,61 @@ import dask_cudf import cugraph.dask as dcg import cugraph +from cugraph.datasets import karate, karate_disjoint from cugraph.testing import utils from cugraph.structure.number_map import NumberMap from cugraph.dask.common.mg_utils import is_single_gpu -from cugraph.testing.utils import RAPIDS_DATASET_ROOT_DIR_PATH from cudf.testing import assert_frame_equal, assert_series_equal # ============================================================================= # Pytest Setup / Teardown - called for each test function # ============================================================================= + + def setup_function(): gc.collect() +# ============================================================================= +# Parameters +# ============================================================================= + + +DATASETS = [karate] +DATASETS_UNRENUMBERED = [karate_disjoint] IS_DIRECTED = [True, False] +# ============================================================================= +# Helper Functions +# ============================================================================= + + +def get_sg_graph(dataset, directed): + dataset.unload() + g = dataset.get_graph(create_using=cugraph.Graph(directed=directed)) + + return g + + +def get_mg_graph(dataset, directed): + dataset.unload() + dg = dataset.get_dask_graph(create_using=cugraph.Graph(directed=directed)) + + return dg + + +# ============================================================================= +# Tests +# ============================================================================= + + @pytest.mark.mg @pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system") -@pytest.mark.parametrize( - "graph_file", - utils.DATASETS_UNRENUMBERED, - ids=[f"dataset={d.as_posix()}" for d in utils.DATASETS_UNRENUMBERED], -) -def test_mg_renumber(graph_file, dask_client): - - M = utils.read_csv_for_nx(graph_file) +@pytest.mark.parametrize("dataset", DATASETS_UNRENUMBERED) +def test_mg_renumber(dataset, dask_client): + M = utils.read_csv_for_nx(dataset.get_path()) sources = cudf.Series(M["0"]) destinations = cudf.Series(M["1"]) @@ -96,13 +124,9 @@ def test_mg_renumber(graph_file, dask_client): @pytest.mark.mg @pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system") -@pytest.mark.parametrize( - "graph_file", - utils.DATASETS_UNRENUMBERED, - ids=[f"dataset={d.as_posix()}" for d in utils.DATASETS_UNRENUMBERED], -) -def test_mg_renumber_add_internal_vertex_id(graph_file, dask_client): - M = utils.read_csv_for_nx(graph_file) +@pytest.mark.parametrize("dataset", DATASETS_UNRENUMBERED) +def test_mg_renumber_add_internal_vertex_id(dataset, dask_client): + M = utils.read_csv_for_nx(dataset.get_path()) sources = cudf.Series(M["0"]) destinations = cudf.Series(M["1"]) @@ -131,33 +155,13 @@ def test_mg_renumber_add_internal_vertex_id(graph_file, dask_client): @pytest.mark.mg @pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system") +@pytest.mark.parametrize("dataset", DATASETS) @pytest.mark.parametrize("directed", IS_DIRECTED) -def test_dask_mg_pagerank(dask_client, directed): +def test_dask_mg_pagerank(dask_client, dataset, directed): pandas.set_option("display.max_rows", 10000) - input_data_path = (RAPIDS_DATASET_ROOT_DIR_PATH / "karate.csv").as_posix() - chunksize = dcg.get_chunksize(input_data_path) - - ddf = dask_cudf.read_csv( - input_data_path, - blocksize=chunksize, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - - df = cudf.read_csv( - input_data_path, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - - g = cugraph.Graph(directed=directed) - g.from_cudf_edgelist(df, "src", "dst") - - dg = cugraph.Graph(directed=directed) - dg.from_dask_cudf_edgelist(ddf, "src", "dst") + g = get_sg_graph(dataset, directed) + dg = get_mg_graph(dataset, directed) expected_pr = cugraph.pagerank(g) result_pr = dcg.pagerank(dg).compute() @@ -178,20 +182,18 @@ def test_dask_mg_pagerank(dask_client, directed): print("Mismatches:", err) assert err == 0 + dataset.unload() + @pytest.mark.mg @pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system") -@pytest.mark.parametrize( - "graph_file", - utils.DATASETS_UNRENUMBERED, - ids=[f"dataset={d.as_posix()}" for d in utils.DATASETS_UNRENUMBERED], -) -def test_mg_renumber_common_col_names(graph_file, dask_client): +@pytest.mark.parametrize("dataset", DATASETS_UNRENUMBERED) +def test_mg_renumber_common_col_names(dataset, dask_client): """ Ensure that commonly-used column names in the input do not conflict with names used internally by NumberMap. """ - M = utils.read_csv_for_nx(graph_file) + M = utils.read_csv_for_nx(dataset.get_path()) sources = cudf.Series(M["0"]) destinations = cudf.Series(M["1"]) diff --git a/python/cugraph/cugraph/tests/internals/test_replicate_edgelist_mg.py b/python/cugraph/cugraph/tests/internals/test_replicate_edgelist_mg.py index 3bdb5c079ef..09936e954e8 100644 --- a/python/cugraph/cugraph/tests/internals/test_replicate_edgelist_mg.py +++ b/python/cugraph/cugraph/tests/internals/test_replicate_edgelist_mg.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022-2023, NVIDIA CORPORATION. +# Copyright (c) 2022-2024, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -17,73 +17,54 @@ import dask_cudf import numpy as np -from cugraph.testing import UNDIRECTED_DATASETS, karate_disjoint - +from cugraph.datasets import karate, dolphins, karate_disjoint from cugraph.structure.replicate_edgelist import replicate_edgelist from cudf.testing.testing import assert_frame_equal -from pylibcugraph.testing.utils import gen_fixture_params_product # ============================================================================= # Pytest Setup / Teardown - called for each test function # ============================================================================= + + def setup_function(): gc.collect() +# ============================================================================= +# Parameters +# ============================================================================= + + edgeWeightCol = "weights" edgeIdCol = "edge_id" edgeTypeCol = "edge_type" srcCol = "src" dstCol = "dst" - -input_data = UNDIRECTED_DATASETS + [karate_disjoint] -datasets = [pytest.param(d) for d in input_data] - -fixture_params = gen_fixture_params_product( - (datasets, "graph_file"), - ([True, False], "distributed"), - ([True, False], "use_weights"), - ([True, False], "use_edge_ids"), - ([True, False], "use_edge_type_ids"), -) - - -@pytest.fixture(scope="module", params=fixture_params) -def input_combo(request): - """ - Simply return the current combination of params as a dictionary for use in - tests or other parameterized fixtures. - """ - return dict( - zip( - ( - "graph_file", - "use_weights", - "use_edge_ids", - "use_edge_type_ids", - "distributed", - ), - request.param, - ) - ) +DATASETS = [karate, dolphins, karate_disjoint] +IS_DISTRIBUTED = [True, False] +USE_WEIGHTS = [True, False] +USE_EDGE_IDS = [True, False] +USE_EDGE_TYPE_IDS = [True, False] # ============================================================================= # Tests # ============================================================================= -# @pytest.mark.skipif( -# is_single_gpu(), reason="skipping MG testing on Single GPU system" -# ) -@pytest.mark.mg -def test_mg_replicate_edgelist(dask_client, input_combo): - df = input_combo["graph_file"].get_edgelist() - distributed = input_combo["distributed"] - use_weights = input_combo["use_weights"] - use_edge_ids = input_combo["use_edge_ids"] - use_edge_type_ids = input_combo["use_edge_type_ids"] + +@pytest.mark.mg +@pytest.mark.parametrize("dataset", DATASETS) +@pytest.mark.parametrize("distributed", IS_DISTRIBUTED) +@pytest.mark.parametrize("use_weights", USE_WEIGHTS) +@pytest.mark.parametrize("use_edge_ids", USE_EDGE_IDS) +@pytest.mark.parametrize("use_edge_type_ids", USE_EDGE_TYPE_IDS) +def test_mg_replicate_edgelist( + dask_client, dataset, distributed, use_weights, use_edge_ids, use_edge_type_ids +): + dataset.unload() + df = dataset.get_edgelist() columns = [srcCol, dstCol] weight = None diff --git a/python/cugraph/cugraph/tests/internals/test_symmetrize_mg.py b/python/cugraph/cugraph/tests/internals/test_symmetrize_mg.py index 05cc06e6282..913443fe400 100644 --- a/python/cugraph/cugraph/tests/internals/test_symmetrize_mg.py +++ b/python/cugraph/cugraph/tests/internals/test_symmetrize_mg.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022-2023, NVIDIA CORPORATION. +# Copyright (c) 2022-2024, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -25,6 +25,8 @@ # ============================================================================= # Pytest Setup / Teardown - called for each test function # ============================================================================= + + def setup_function(): gc.collect() From a77840eac34e529bf4a76e6fa3907fea5fef15c0 Mon Sep 17 00:00:00 2001 From: Chuck Hastings <45364586+ChuckHastings@users.noreply.github.com> Date: Wed, 15 May 2024 00:33:25 -0400 Subject: [PATCH 3/3] BFS fix for exec_policy (#4417) Shouldn't be constructing rmm::exec_policy from another rmm::exec_policy. Recent raft change causes this to not compile, but we shouldn't have been doing this in the first place. Authors: - Chuck Hastings (https://github.com/ChuckHastings) Approvers: - Seunghwa Kang (https://github.com/seunghwak) - Naim (https://github.com/naimnv) URL: https://github.com/rapidsai/cugraph/pull/4417 --- cpp/src/traversal/bfs_impl.cuh | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/cpp/src/traversal/bfs_impl.cuh b/cpp/src/traversal/bfs_impl.cuh index 1f6f29d8683..fb837484a14 100644 --- a/cpp/src/traversal/bfs_impl.cuh +++ b/cpp/src/traversal/bfs_impl.cuh @@ -31,8 +31,6 @@ #include -#include - #include #include #include @@ -149,11 +147,11 @@ void bfs(raft::handle_t const& handle, auto constexpr invalid_distance = std::numeric_limits::max(); auto constexpr invalid_vertex = invalid_vertex_id::value; - thrust::fill(rmm::exec_policy(handle.get_thrust_policy()), + thrust::fill(handle.get_thrust_policy(), distances, distances + push_graph_view.local_vertex_partition_range_size(), invalid_distance); - thrust::fill(rmm::exec_policy(handle.get_thrust_policy()), + thrust::fill(handle.get_thrust_policy(), predecessor_first, predecessor_first + push_graph_view.local_vertex_partition_range_size(), invalid_vertex); @@ -161,7 +159,7 @@ void bfs(raft::handle_t const& handle, push_graph_view.local_vertex_partition_view()); if (n_sources) { thrust::for_each( - rmm::exec_policy(handle.get_thrust_policy()), + handle.get_thrust_policy(), sources, sources + n_sources, [vertex_partition, distances, predecessor_first] __device__(auto v) {