diff --git a/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py b/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py index 8fed467bf6d..cdf1e937e67 100644 --- a/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py +++ b/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py @@ -39,6 +39,7 @@ ) from cugraph.dask.common.mg_utils import run_gc_on_dask_cluster import cugraph.dask.comms.comms as Comms +from cugraph.structure.symmetrize import _memory_efficient_drop_duplicates class simpleDistributedGraphImpl: @@ -95,6 +96,7 @@ def _make_plc_graph( weight_type, edge_id_type, edge_type_id, + drop_multi_edges, ): weights = None edge_ids = None @@ -149,6 +151,7 @@ def _make_plc_graph( num_arrays=num_arrays, store_transposed=store_transposed, do_expensive_check=False, + drop_multi_edges=drop_multi_edges, ) del edata_x gc.collect() @@ -267,7 +270,7 @@ def __from_edgelist( input_ddf, source, destination, - multi=self.properties.multi_edge, + multi=True, # Deprecated parameter symmetrize=not self.properties.directed, ) value_col = None @@ -277,7 +280,7 @@ def __from_edgelist( source, destination, value_col_names, - multi=self.properties.multi_edge, + multi=True, # Deprecated parameter symmetrize=not self.properties.directed, ) @@ -364,6 +367,7 @@ def __from_edgelist( self.weight_type, self.edge_id_type, self.edge_type_id_type, + not self.properties.multi_edge, ) for w, edata in persisted_keys_d.items() } @@ -455,6 +459,15 @@ def view_edge_list(self): else: is_multi_column = True + if not self.properties.multi_edge: + # Drop parallel edges for non MultiGraph + # FIXME: Drop multi edges with the CAPI instead. + _client = default_client() + workers = _client.scheduler_info()["workers"] + edgelist_df = _memory_efficient_drop_duplicates( + edgelist_df, [srcCol, dstCol], len(workers) + ) + edgelist_df[srcCol], edgelist_df[dstCol] = edgelist_df[ [srcCol, dstCol] ].min(axis=1), edgelist_df[[srcCol, dstCol]].max(axis=1) diff --git a/python/cugraph/cugraph/structure/graph_implementation/simpleGraph.py b/python/cugraph/cugraph/structure/graph_implementation/simpleGraph.py index 22d82eb1796..121a4c6245a 100644 --- a/python/cugraph/cugraph/structure/graph_implementation/simpleGraph.py +++ b/python/cugraph/cugraph/structure/graph_implementation/simpleGraph.py @@ -1,4 +1,4 @@ -# Copyright (c) 2021-2023, NVIDIA CORPORATION. +# Copyright (c) 2021-2024, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -264,7 +264,7 @@ def __from_edgelist( source, destination, edge_attr, - multi=self.properties.multi_edge, + multi=self.properties.multi_edge, # Deprecated parameter symmetrize=not self.properties.directed, ) @@ -279,7 +279,7 @@ def __from_edgelist( elist, source, destination, - multi=self.properties.multi_edge, + multi=self.properties.multi_edge, # Deprecated parameter symmetrize=not self.properties.directed, ) @@ -298,7 +298,10 @@ def __from_edgelist( self._replicate_edgelist() self._make_plc_graph( - value_col=value_col, store_transposed=store_transposed, renumber=renumber + value_col=value_col, + store_transposed=store_transposed, + renumber=renumber, + drop_multi_edges=not self.properties.multi_edge, ) def to_pandas_edgelist( @@ -477,6 +480,7 @@ def view_edge_list(self): edgelist_df[simpleGraphImpl.srcCol] <= edgelist_df[simpleGraphImpl.dstCol] ] + elif not use_initial_input_df and self.properties.renumbered: # Do not unrenumber the vertices if the initial input df was used if not self.properties.directed: @@ -484,6 +488,7 @@ def view_edge_list(self): edgelist_df[simpleGraphImpl.srcCol] <= edgelist_df[simpleGraphImpl.dstCol] ] + edgelist_df = self.renumber_map.unrenumber( edgelist_df, simpleGraphImpl.srcCol ) @@ -1084,6 +1089,7 @@ def _make_plc_graph( value_col: Dict[str, cudf.DataFrame] = None, store_transposed: bool = False, renumber: bool = True, + drop_multi_edges: bool = False, ): """ Parameters @@ -1100,6 +1106,8 @@ def _make_plc_graph( Whether to renumber the vertices of the graph. Required if inputted vertex ids are not of int32 or int64 type. + drop_multi_edges: bool (default=False) + Whether to drop multi edges """ if value_col is None: @@ -1163,6 +1171,7 @@ def _make_plc_graph( renumber=renumber, do_expensive_check=True, input_array_format=input_array_format, + drop_multi_edges=drop_multi_edges, ) def to_directed(self, DiG, store_transposed=False): diff --git a/python/cugraph/cugraph/structure/symmetrize.py b/python/cugraph/cugraph/structure/symmetrize.py index b324ff65834..30c6394ade9 100644 --- a/python/cugraph/cugraph/structure/symmetrize.py +++ b/python/cugraph/cugraph/structure/symmetrize.py @@ -1,4 +1,4 @@ -# Copyright (c) 2019-2023, NVIDIA CORPORATION. +# Copyright (c) 2019-2024, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -15,6 +15,7 @@ import cudf import dask_cudf from dask.distributed import default_client +import warnings def symmetrize_df( @@ -54,6 +55,11 @@ def symmetrize_df( Name of the column in the data frame containing the weight ids multi : bool, optional (default=False) + [Deprecated, Multi will be removed in future version, and the removal + of multi edges will no longer be supported from 'symmetrize'. + Multi edges will be removed upon creation of graph instance directly + based on if the graph is `curgaph.MultiGraph` or `cugraph.Graph`.] + Set to True if graph is a Multi(Di)Graph. This allows multiple edges instead of dropping them. @@ -84,6 +90,12 @@ def symmetrize_df( if multi: return result else: + warnings.warn( + "Multi is deprecated and the removal of multi edges will no longer be " + "supported from 'symmetrize'. Multi edges will be removed upon creation " + "of graph instance.", + FutureWarning, + ) vertex_col_name = src_name + dst_name result = result.groupby(by=[*vertex_col_name], as_index=False).min() return result @@ -128,6 +140,11 @@ def symmetrize_ddf( Name of the column in the data frame containing the weight ids multi : bool, optional (default=False) + [Deprecated, Multi will be removed in future version, and the removal + of multi edges will no longer be supported from 'symmetrize'. + Multi edges will be removed upon creation of graph instance directly + based on if the graph is `curgaph.MultiGraph` or `cugraph.Graph`.] + Set to True if graph is a Multi(Di)Graph. This allows multiple edges instead of dropping them. @@ -165,8 +182,15 @@ def symmetrize_ddf( else: result = ddf if multi: + result = result.reset_index(drop=True).repartition(npartitions=len(workers) * 2) return result else: + warnings.warn( + "Multi is deprecated and the removal of multi edges will no longer be " + "supported from 'symmetrize'. Multi edges will be removed upon creation " + "of graph instance.", + FutureWarning, + ) vertex_col_name = src_name + dst_name result = _memory_efficient_drop_duplicates( result, vertex_col_name, len(workers) @@ -181,6 +205,7 @@ def symmetrize( value_col_name=None, multi=False, symmetrize=True, + do_expensive_check=False, ): """ Take a dataframe of source destination pairs along with associated @@ -208,6 +233,11 @@ def symmetrize( weights column name. multi : bool, optional (default=False) + [Deprecated, Multi will be removed in future version, and the removal + of multi edges will no longer be supported from 'symmetrize'. + Multi edges will be removed upon creation of graph instance directly + based on if the graph is `curgaph.MultiGraph` or `cugraph.Graph`.] + Set to True if graph is a Multi(Di)Graph. This allows multiple edges instead of dropping them. @@ -234,8 +264,9 @@ def symmetrize( if "edge_id" in input_df.columns and symmetrize: raise ValueError("Edge IDs are not supported on undirected graphs") - csg.null_check(input_df[source_col_name]) - csg.null_check(input_df[dest_col_name]) + if do_expensive_check: # FIXME: Optimize this check as it is currently expensive + csg.null_check(input_df[source_col_name]) + csg.null_check(input_df[dest_col_name]) if isinstance(input_df, dask_cudf.DataFrame): output_df = symmetrize_ddf( diff --git a/python/cugraph/cugraph/tests/sampling/test_uniform_neighbor_sample_mg.py b/python/cugraph/cugraph/tests/sampling/test_uniform_neighbor_sample_mg.py index 460a25cbd14..371410b8bd5 100644 --- a/python/cugraph/cugraph/tests/sampling/test_uniform_neighbor_sample_mg.py +++ b/python/cugraph/cugraph/tests/sampling/test_uniform_neighbor_sample_mg.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022-2023, NVIDIA CORPORATION. +# Copyright (c) 2022-2024, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -26,6 +26,7 @@ from cugraph.testing import UNDIRECTED_DATASETS from cugraph.dask import uniform_neighbor_sample from cugraph.dask.common.mg_utils import is_single_gpu +from cugraph.structure.symmetrize import _memory_efficient_drop_duplicates from cugraph.datasets import email_Eu_core, small_tree from pylibcugraph.testing.utils import gen_fixture_params_product @@ -135,6 +136,14 @@ def test_mg_uniform_neighbor_sample_simple(dask_client, input_combo): dg = input_combo["MGGraph"] input_df = dg.input_df + # Drop parallel edges for non MultiGraph + # FIXME: Drop multi edges with the CAPI instead. + vertex_col_name = ["src", "dst"] + workers = dask_client.scheduler_info()["workers"] + input_df = _memory_efficient_drop_duplicates( + input_df, vertex_col_name, len(workers) + ) + result_nbr = uniform_neighbor_sample( dg, input_combo["start_list"], diff --git a/python/pylibcugraph/pylibcugraph/graphs.pyx b/python/pylibcugraph/pylibcugraph/graphs.pyx index 76ad7690840..def47390ce5 100644 --- a/python/pylibcugraph/pylibcugraph/graphs.pyx +++ b/python/pylibcugraph/pylibcugraph/graphs.pyx @@ -463,9 +463,9 @@ cdef class MGGraph(_GPUGraph): edge_type_view_ptr_ptr, store_transposed, num_arrays, - do_expensive_check, drop_self_loops, drop_multi_edges, + do_expensive_check, &(self.c_graph_ptr), &error_ptr)