From 6b795336797817a096bd02cec99f473b314fee05 Mon Sep 17 00:00:00 2001 From: jnke2016 Date: Mon, 25 Sep 2023 21:47:16 -0700 Subject: [PATCH] add support for weighted mg jaccard , update tests and remove experimental calls --- .../cugraph/dask/link_prediction/jaccard.py | 205 +----------------- .../cugraph/dask/link_prediction/overlap.py | 10 +- .../cugraph/dask/link_prediction/sorensen.py | 10 +- .../tests/link_prediction/test_jaccard_mg.py | 54 +---- .../tests/link_prediction/test_overlap_mg.py | 54 +---- .../tests/link_prediction/test_sorensen_mg.py | 54 +---- 6 files changed, 43 insertions(+), 344 deletions(-) diff --git a/python/cugraph/cugraph/dask/link_prediction/jaccard.py b/python/cugraph/cugraph/dask/link_prediction/jaccard.py index 218e6206fc3..32f679f57e0 100644 --- a/python/cugraph/cugraph/dask/link_prediction/jaccard.py +++ b/python/cugraph/cugraph/dask/link_prediction/jaccard.py @@ -1,204 +1 @@ -# Copyright (c) 2022-2023, NVIDIA CORPORATION. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -from dask.distributed import wait, default_client -import cugraph.dask.comms.comms as Comms -import dask_cudf -import cudf -from cugraph.dask.common.input_utils import get_distributed_data -from cugraph.utilities import renumber_vertex_pair - -from pylibcugraph import ( - jaccard_coefficients as pylibcugraph_jaccard_coefficients, -) -from pylibcugraph import ResourceHandle - - -def convert_to_cudf(cp_arrays): - """ - Creates a cudf DataFrame from cupy arrays from pylibcugraph wrapper - """ - - cupy_first, cupy_second, cupy_similarity = cp_arrays - - df = cudf.DataFrame() - df["first"] = cupy_first - df["second"] = cupy_second - df["jaccard_coeff"] = cupy_similarity - - return df - - -def _call_plc_jaccard( - sID, mg_graph_x, vertex_pair, use_weight, do_expensive_check, vertex_pair_col_name -): - - first = vertex_pair[vertex_pair_col_name[0]] - second = vertex_pair[vertex_pair_col_name[1]] - - return pylibcugraph_jaccard_coefficients( - resource_handle=ResourceHandle(Comms.get_handle(sID).getHandle()), - graph=mg_graph_x, - first=first, - second=second, - use_weight=use_weight, - do_expensive_check=do_expensive_check, - ) - - -def jaccard(input_graph, vertex_pair=None, use_weight=False): - """ - Compute the Jaccard similarity between each pair of vertices connected by - an edge, or between arbitrary pairs of vertices specified by the user. - Jaccard similarity is defined between two sets as the ratio of the volume - of their intersection divided by the volume of their union. In the context - of graphs, the neighborhood of a vertex is seen as a set. The Jaccard - similarity weight of each edge represents the strength of connection - between vertices based on the relative similarity of their neighbors. If - first is specified but second is not, or vice versa, an exception will be - thrown. - - NOTE: If the vertex_pair parameter is not specified then the behavior - of cugraph.jaccard is different from the behavior of - networkx.jaccard_coefficient. - - cugraph.dask.jaccard, in the absence of a specified vertex pair list, will - compute the two_hop_neighbors of the entire graph to construct a vertex pair - list and will return the jaccard coefficient for those vertex pairs. This is - not advisable as the vertex_pairs can grow exponentially with respect to the - size of the datasets - - networkx.jaccard_coefficient, in the absence of a specified vertex - pair list, will return an upper triangular dense matrix, excluding - the diagonal as well as vertex pairs that are directly connected - by an edge in the graph, of jaccard coefficients. Technically, networkx - returns a lazy iterator across this upper triangular matrix where - the actual jaccard coefficient is computed when the iterator is - dereferenced. Computing a dense matrix of results is not feasible - if the number of vertices in the graph is large (100,000 vertices - would result in 4.9 billion values in that iterator). - - If your graph is small enough (or you have enough memory and patience) - you can get the interesting (non-zero) values that are part of the networkx - solution by doing the following: - - But please remember that cugraph will fill the dataframe with the entire - solution you request, so you'll need enough memory to store the 2-hop - neighborhood dataframe. - - - Parameters - ---------- - input_graph : cugraph.Graph - cuGraph Graph instance, should contain the connectivity information - as an edge list (edge weights are not supported yet for this algorithm). The - graph should be undirected where an undirected edge is represented by a - directed edge in both direction. The adjacency list will be computed if - not already present. - - This implementation only supports undirected, unweighted Graph. - - vertex_pair : cudf.DataFrame, optional (default=None) - A GPU dataframe consisting of two columns representing pairs of - vertices. If provided, the jaccard coefficient is computed for the - given vertex pairs. If the vertex_pair is not provided then the - current implementation computes the jaccard coefficient for all - adjacent vertices in the graph. - - use_weight : bool, optional (default=False) - Currently not supported - - Returns - ------- - result : dask_cudf.DataFrame - GPU distributed data frame containing 2 dask_cudf.Series - - ddf['first']: dask_cudf.Series - The first vertex ID of each pair (will be identical to first if specified). - ddf['second']: dask_cudf.Series - The second vertex ID of each pair (will be identical to second if - specified). - ddf['jaccard_coeff']: dask_cudf.Series - The computed jaccard coefficient between the first and the second - vertex ID. - """ - - if input_graph.is_directed(): - raise ValueError("input graph must be undirected") - - if vertex_pair is None: - # Call two_hop neighbor of the entire graph - vertex_pair = input_graph.get_two_hop_neighbors() - - vertex_pair_col_name = vertex_pair.columns - - if use_weight: - raise ValueError("'use_weight' is currently not supported.") - - if input_graph.is_weighted(): - raise ValueError("Weighted graphs are currently not supported.") - - if isinstance(vertex_pair, (dask_cudf.DataFrame, cudf.DataFrame)): - vertex_pair = renumber_vertex_pair(input_graph, vertex_pair) - - elif vertex_pair is not None: - raise ValueError("vertex_pair must be a dask_cudf or cudf dataframe") - - if not isinstance(vertex_pair, (dask_cudf.DataFrame)): - vertex_pair = dask_cudf.from_cudf( - vertex_pair, npartitions=len(Comms.get_workers()) - ) - vertex_pair = get_distributed_data(vertex_pair) - wait(vertex_pair) - vertex_pair = vertex_pair.worker_to_parts - - # Initialize dask client - client = default_client() - - do_expensive_check = False - - if vertex_pair is not None: - result = [ - client.submit( - _call_plc_jaccard, - Comms.get_session_id(), - input_graph._plc_graph[w], - vertex_pair[w][0], - use_weight, - do_expensive_check, - vertex_pair_col_name, - workers=[w], - allow_other_workers=False, - ) - for w in Comms.get_workers() - ] - - wait(result) - - cudf_result = [client.submit(convert_to_cudf, cp_arrays) for cp_arrays in result] - - wait(cudf_result) - - ddf = dask_cudf.from_delayed(cudf_result).persist() - wait(ddf) - - # Wait until the inactive futures are released - wait([(r.release(), c_r.release()) for r, c_r in zip(result, cudf_result)]) - - if input_graph.renumbered: - ddf = input_graph.unrenumber(ddf, "first") - ddf = input_graph.unrenumber(ddf, "second") - - return ddf +sore \ No newline at end of file diff --git a/python/cugraph/cugraph/dask/link_prediction/overlap.py b/python/cugraph/cugraph/dask/link_prediction/overlap.py index 5540be28fd1..4bda05e3c95 100644 --- a/python/cugraph/cugraph/dask/link_prediction/overlap.py +++ b/python/cugraph/cugraph/dask/link_prediction/overlap.py @@ -96,7 +96,9 @@ def overlap(input_graph, vertex_pair=None, use_weight=False): adjacent vertices in the graph. use_weight : bool, optional (default=False) - Currently not supported + Flag to indicate whether to compute weighted overlap (if use_weight==True) + or un-weighted overlap (if use_weight==False). + 'input_graph' must be weighted if 'use_weight=True'. Returns ------- @@ -122,12 +124,6 @@ def overlap(input_graph, vertex_pair=None, use_weight=False): vertex_pair_col_name = vertex_pair.columns - if use_weight: - raise ValueError("'use_weight' is currently not supported.") - - if input_graph.is_weighted(): - raise ValueError("Weighted graphs are currently not supported.") - if isinstance(vertex_pair, (dask_cudf.DataFrame, cudf.DataFrame)): vertex_pair = renumber_vertex_pair(input_graph, vertex_pair) diff --git a/python/cugraph/cugraph/dask/link_prediction/sorensen.py b/python/cugraph/cugraph/dask/link_prediction/sorensen.py index 24295ac330c..163b0d0dc16 100644 --- a/python/cugraph/cugraph/dask/link_prediction/sorensen.py +++ b/python/cugraph/cugraph/dask/link_prediction/sorensen.py @@ -92,7 +92,9 @@ def sorensen(input_graph, vertex_pair=None, use_weight=False): adjacent vertices in the graph. use_weight : bool, optional (default=False) - Currently not supported + Flag to indicate whether to compute weighted sorensen (if use_weight==True) + or un-weighted sorensen (if use_weight==False). + 'input_graph' must be weighted if 'use_weight=True'. Returns ------- @@ -118,12 +120,6 @@ def sorensen(input_graph, vertex_pair=None, use_weight=False): vertex_pair_col_name = vertex_pair.columns - if use_weight: - raise ValueError("'use_weight' is currently not supported.") - - if input_graph.is_weighted(): - raise ValueError("Weighted graphs are currently not supported.") - if isinstance(vertex_pair, (dask_cudf.DataFrame, cudf.DataFrame)): vertex_pair = renumber_vertex_pair(input_graph, vertex_pair) diff --git a/python/cugraph/cugraph/tests/link_prediction/test_jaccard_mg.py b/python/cugraph/cugraph/tests/link_prediction/test_jaccard_mg.py index b56a6baae2b..3202bf0a065 100644 --- a/python/cugraph/cugraph/tests/link_prediction/test_jaccard_mg.py +++ b/python/cugraph/cugraph/tests/link_prediction/test_jaccard_mg.py @@ -34,6 +34,7 @@ def setup_function(): IS_DIRECTED = [False] HAS_VERTEX_PAIR = [True, False] +IS_WEIGHTED = [True, False] # ============================================================================= @@ -48,6 +49,7 @@ def setup_function(): (datasets, "graph_file"), (IS_DIRECTED, "directed"), (HAS_VERTEX_PAIR, "has_vertex_pair"), + (IS_WEIGHTED, "is_weighted"), ) @@ -57,7 +59,8 @@ def input_combo(request): Simply return the current combination of params as a dictionary for use in tests or other parameterized fixtures. """ - parameters = dict(zip(("graph_file", "directed", "has_vertex_pair"), request.param)) + parameters = dict(zip( + ("graph_file", "directed", "has_vertex_pair", "is_weighted"), request.param)) return parameters @@ -72,7 +75,9 @@ def input_expected_output(input_combo): input_data_path = input_combo["graph_file"] directed = input_combo["directed"] has_vertex_pair = input_combo["has_vertex_pair"] - G = utils.generate_cugraph_graph_from_file(input_data_path, directed=directed) + is_weighted = input_combo["is_weighted"] + G = utils.generate_cugraph_graph_from_file( + input_data_path, directed=directed, edgevals=is_weighted) if has_vertex_pair: # Sample random vertices from the graph and compute the two_hop_neighbors # with those seeds @@ -84,7 +89,7 @@ def input_expected_output(input_combo): vertex_pair = None input_combo["vertex_pair"] = vertex_pair - sg_cugraph_jaccard = cugraph.experimental.jaccard(G, input_combo["vertex_pair"]) + sg_cugraph_jaccard = cugraph.jaccard(G, input_combo["vertex_pair"], use_weight=is_weighted) # Save the results back to the input_combo dictionary to prevent redundant # cuGraph runs. Other tests using the input_combo fixture will look for # them, and if not present they will have to re-run the same cuGraph call. @@ -104,6 +109,7 @@ def input_expected_output(input_combo): ddf, source="src", destination="dst", + edge_attr="value" if is_weighted else None, renumber=True, store_transposed=True, ) @@ -122,8 +128,10 @@ def input_expected_output(input_combo): def test_dask_mg_jaccard(dask_client, benchmark, input_expected_output): dg = input_expected_output["MGGraph"] + use_weight = input_expected_output["is_weighted"] - result_jaccard = benchmark(dcg.jaccard, dg, input_expected_output["vertex_pair"]) + result_jaccard = benchmark( + dcg.jaccard, dg, input_expected_output["vertex_pair"], use_weight=use_weight) result_jaccard = ( result_jaccard.compute() @@ -151,41 +159,3 @@ def test_dask_mg_jaccard(dask_client, benchmark, input_expected_output): assert len(jaccard_coeff_diffs1) == 0 assert len(jaccard_coeff_diffs2) == 0 - - -@pytest.mark.mg -def test_dask_mg_weighted_jaccard(dask_client): - input_data_path = datasets[0] - chunksize = dcg.get_chunksize(input_data_path) - ddf = dask_cudf.read_csv( - input_data_path, - chunksize=chunksize, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - - dg = cugraph.Graph(directed=False) - dg.from_dask_cudf_edgelist( - ddf, - source="src", - destination="dst", - edge_attr="value", - renumber=True, - store_transposed=True, - ) - with pytest.raises(ValueError): - dcg.jaccard(dg) - - dg = cugraph.Graph(directed=False) - dg.from_dask_cudf_edgelist( - ddf, - source="src", - destination="dst", - edge_attr="value", - store_transposed=True, - ) - - use_weight = True - with pytest.raises(ValueError): - dcg.jaccard(dg, use_weight=use_weight) diff --git a/python/cugraph/cugraph/tests/link_prediction/test_overlap_mg.py b/python/cugraph/cugraph/tests/link_prediction/test_overlap_mg.py index ce4bf619f47..5763b0f0e4f 100644 --- a/python/cugraph/cugraph/tests/link_prediction/test_overlap_mg.py +++ b/python/cugraph/cugraph/tests/link_prediction/test_overlap_mg.py @@ -34,6 +34,7 @@ def setup_function(): IS_DIRECTED = [False] HAS_VERTEX_PAIR = [True, False] +IS_WEIGHTED = [True, False] # ============================================================================= @@ -48,6 +49,7 @@ def setup_function(): (datasets, "graph_file"), (IS_DIRECTED, "directed"), (HAS_VERTEX_PAIR, "has_vertex_pair"), + (IS_WEIGHTED, "is_weighted"), ) @@ -57,7 +59,7 @@ def input_combo(request): Simply return the current combination of params as a dictionary for use in tests or other parameterized fixtures. """ - parameters = dict(zip(("graph_file", "directed", "has_vertex_pair"), request.param)) + parameters = dict(zip(("graph_file", "directed", "has_vertex_pair", "is_weighted"), request.param)) return parameters @@ -72,7 +74,9 @@ def input_expected_output(input_combo): input_data_path = input_combo["graph_file"] directed = input_combo["directed"] has_vertex_pair = input_combo["has_vertex_pair"] - G = utils.generate_cugraph_graph_from_file(input_data_path, directed=directed) + is_weighted = input_combo["is_weighted"] + G = utils.generate_cugraph_graph_from_file( + input_data_path, directed=directed, edgevals=is_weighted) if has_vertex_pair: # Sample random vertices from the graph and compute the two_hop_neighbors # with those seeds @@ -84,7 +88,8 @@ def input_expected_output(input_combo): vertex_pair = None input_combo["vertex_pair"] = vertex_pair - sg_cugraph_overlap = cugraph.experimental.overlap(G, input_combo["vertex_pair"]) + sg_cugraph_overlap = cugraph.overlap( + G, input_combo["vertex_pair"], use_weight=is_weighted) # Save the results back to the input_combo dictionary to prevent redundant # cuGraph runs. Other tests using the input_combo fixture will look for # them, and if not present they will have to re-run the same cuGraph call. @@ -104,6 +109,7 @@ def input_expected_output(input_combo): ddf, source="src", destination="dst", + edge_attr="value" if is_weighted else None, renumber=True, store_transposed=True, ) @@ -125,8 +131,10 @@ def input_expected_output(input_combo): def test_dask_mg_overlap(dask_client, benchmark, input_expected_output): dg = input_expected_output["MGGraph"] + use_weight = input_expected_output["is_weighted"] - result_overlap = benchmark(dcg.overlap, dg, input_expected_output["vertex_pair"]) + result_overlap = benchmark( + dcg.overlap, dg, input_expected_output["vertex_pair"], use_weight=use_weight) result_overlap = ( result_overlap.compute() @@ -154,41 +162,3 @@ def test_dask_mg_overlap(dask_client, benchmark, input_expected_output): assert len(overlap_coeff_diffs1) == 0 assert len(overlap_coeff_diffs2) == 0 - - -@pytest.mark.mg -def test_dask_mg_weighted_overlap(): - input_data_path = datasets[0] - chunksize = dcg.get_chunksize(input_data_path) - ddf = dask_cudf.read_csv( - input_data_path, - chunksize=chunksize, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - - dg = cugraph.Graph(directed=False) - dg.from_dask_cudf_edgelist( - ddf, - source="src", - destination="dst", - edge_attr="value", - renumber=True, - store_transposed=True, - ) - with pytest.raises(ValueError): - dcg.overlap(dg) - - dg = cugraph.Graph(directed=False) - dg.from_dask_cudf_edgelist( - ddf, - source="src", - destination="dst", - edge_attr="value", - store_transposed=True, - ) - - use_weight = True - with pytest.raises(ValueError): - dcg.overlap(dg, use_weight=use_weight) diff --git a/python/cugraph/cugraph/tests/link_prediction/test_sorensen_mg.py b/python/cugraph/cugraph/tests/link_prediction/test_sorensen_mg.py index af6b60771a0..bc05929d50f 100644 --- a/python/cugraph/cugraph/tests/link_prediction/test_sorensen_mg.py +++ b/python/cugraph/cugraph/tests/link_prediction/test_sorensen_mg.py @@ -35,6 +35,7 @@ def setup_function(): IS_DIRECTED = [False] HAS_VERTEX_PAIR = [True, False] +IS_WEIGHTED = [True, False] # ============================================================================= @@ -49,6 +50,7 @@ def setup_function(): (datasets, "graph_file"), (IS_DIRECTED, "directed"), (HAS_VERTEX_PAIR, "has_vertex_pair"), + (IS_WEIGHTED, "is_weighted"), ) @@ -58,7 +60,7 @@ def input_combo(request): Simply return the current combination of params as a dictionary for use in tests or other parameterized fixtures. """ - parameters = dict(zip(("graph_file", "directed", "has_vertex_pair"), request.param)) + parameters = dict(zip(("graph_file", "directed", "has_vertex_pair", "is_weighted"), request.param)) return parameters @@ -73,7 +75,9 @@ def input_expected_output(input_combo): input_data_path = input_combo["graph_file"] directed = input_combo["directed"] has_vertex_pair = input_combo["has_vertex_pair"] - G = utils.generate_cugraph_graph_from_file(input_data_path, directed=directed) + is_weighted = input_combo["is_weighted"] + G = utils.generate_cugraph_graph_from_file( + input_data_path, directed=directed,edgevals=is_weighted) if has_vertex_pair: # Sample random vertices from the graph and compute the two_hop_neighbors # with those seeds @@ -85,7 +89,8 @@ def input_expected_output(input_combo): vertex_pair = None input_combo["vertex_pair"] = vertex_pair - sg_cugraph_sorensen = cugraph.experimental.sorensen(G, input_combo["vertex_pair"]) + sg_cugraph_sorensen = cugraph.sorensen( + G, input_combo["vertex_pair"], use_weight=is_weighted) # Save the results back to the input_combo dictionary to prevent redundant # cuGraph runs. Other tests using the input_combo fixture will look for # them, and if not present they will have to re-run the same cuGraph call. @@ -105,6 +110,7 @@ def input_expected_output(input_combo): ddf, source="src", destination="dst", + edge_attr="value" if is_weighted else None, renumber=True, store_transposed=True, ) @@ -124,8 +130,10 @@ def input_expected_output(input_combo): def test_dask_mg_sorensen(dask_client, benchmark, input_expected_output): dg = input_expected_output["MGGraph"] + use_weight = input_expected_output["is_weighted"] - result_sorensen = benchmark(dcg.sorensen, dg, input_expected_output["vertex_pair"]) + result_sorensen = benchmark( + dcg.sorensen, dg, input_expected_output["vertex_pair"], use_weight=use_weight) result_sorensen = ( result_sorensen.compute() @@ -153,41 +161,3 @@ def test_dask_mg_sorensen(dask_client, benchmark, input_expected_output): assert len(sorensen_coeff_diffs1) == 0 assert len(sorensen_coeff_diffs2) == 0 - - -@pytest.mark.mg -def test_dask_mg_weighted_sorensen(dask_client): - input_data_path = datasets[0] - chunksize = dcg.get_chunksize(input_data_path) - ddf = dask_cudf.read_csv( - input_data_path, - chunksize=chunksize, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - - dg = cugraph.Graph(directed=False) - dg.from_dask_cudf_edgelist( - ddf, - source="src", - destination="dst", - edge_attr="value", - renumber=True, - store_transposed=True, - ) - with pytest.raises(ValueError): - dcg.sorensen(dg) - - dg = cugraph.Graph(directed=False) - dg.from_dask_cudf_edgelist( - ddf, - source="src", - destination="dst", - edge_attr="value", - store_transposed=True, - ) - - use_weight = True - with pytest.raises(ValueError): - dcg.sorensen(dg, use_weight=use_weight)