diff --git a/python/nx-cugraph/_nx_cugraph/__init__.py b/python/nx-cugraph/_nx_cugraph/__init__.py
index 41c18c27ecf..b334d3b7590 100644
--- a/python/nx-cugraph/_nx_cugraph/__init__.py
+++ b/python/nx-cugraph/_nx_cugraph/__init__.py
@@ -297,8 +297,8 @@ def get_info():


 def _check_networkx_version():
-    import warnings
     import re
+    import warnings

     import networkx as nx
diff --git a/python/nx-cugraph/nx_cugraph/classes/graph.py b/python/nx-cugraph/nx_cugraph/classes/graph.py
index 7425eacb2b4..9188c0ab168 100644
--- a/python/nx-cugraph/nx_cugraph/classes/graph.py
+++ b/python/nx-cugraph/nx_cugraph/classes/graph.py
@@ -695,9 +695,14 @@ def _get_plc_graph(
                 is_multigraph=self.is_multigraph() and symmetrize is None,
                 is_symmetric=not self.is_directed() or symmetrize is not None,
             ),
-            src_or_offset_array=src_indices,
-            dst_or_index_array=dst_indices,
-            weight_array=edge_array,
+            # Use `cp.asarray` to ensure the data is contiguous in device memory;
+            # it does not copy if the arrays are already contiguous. We could,
+            # perhaps, update e.g. `self.src_indices` with the new contiguous array.
+            src_or_offset_array=cp.asarray(src_indices, order="C"),
+            dst_or_index_array=cp.asarray(dst_indices, order="C"),
+            weight_array=(
+                None if edge_array is None else cp.asarray(edge_array, order="C")
+            ),
             store_transposed=store_transposed,
             renumber=False,
             do_expensive_check=False,
diff --git a/python/nx-cugraph/nx_cugraph/convert_matrix.py b/python/nx-cugraph/nx_cugraph/convert_matrix.py
index 38139b913cf..3f1c7dd56cc 100644
--- a/python/nx-cugraph/nx_cugraph/convert_matrix.py
+++ b/python/nx-cugraph/nx_cugraph/convert_matrix.py
@@ -15,6 +15,7 @@
 import numpy as np

 from .generators._utils import _create_using_class
+from .relabel import _deduplicate
 from .utils import _cp_iscopied_asarray, index_dtype, networkx_algorithm

 __all__ = [
@@ -155,6 +156,19 @@ def from_pandas_edgelist(
         ]
         kwargs["edge_keys"] = edge_keys

+    if not graph_class.is_multigraph():
+        src_indices, dst_indices, kwargs["edge_values"], kwargs["edge_masks"] = (
+            # PERF: we may not need the generic merge done in `_deduplicate`; here we
+            # only need the last value, since every edge has every edge attribute.
+            # Generic merging handles cases where edges have different attributes.
+            _deduplicate(
+                src_indices,
+                dst_indices,
+                kwargs.get("edge_values"),
+                kwargs.get("edge_masks"),
+            )
+        )
+
     G = graph_class.from_coo(N, src_indices, dst_indices, **kwargs)
     if inplace:
         return create_using._become(G)
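[Review note on the `order="C"` change in `classes/graph.py` above. This snippet is illustrative only and not part of the patch; it sketches the CuPy behavior the new comment relies on: a strided slice is a non-contiguous view, `cp.asarray(..., order="C")` copies it into contiguous device memory, and an already-contiguous array passes through without a copy.]

    import cupy as cp

    arr = cp.arange(10, dtype=cp.int32)
    view = arr[::2]  # strided slice: a non-contiguous view
    assert not view.flags.c_contiguous

    fixed = cp.asarray(view, order="C")  # copied into contiguous device memory
    assert fixed.flags.c_contiguous

    same = cp.asarray(arr, order="C")  # already contiguous, so no copy is made
    assert same.data.ptr == arr.data.ptr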
diff --git a/python/nx-cugraph/nx_cugraph/relabel.py b/python/nx-cugraph/nx_cugraph/relabel.py
index 20d1337a99c..12127db3f2c 100644
--- a/python/nx-cugraph/nx_cugraph/relabel.py
+++ b/python/nx-cugraph/nx_cugraph/relabel.py
@@ -10,8 +10,11 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+from __future__ import annotations
+
 import itertools
 from collections import defaultdict
+from typing import TYPE_CHECKING

 import cupy as cp
 import networkx as nx
@@ -21,6 +24,10 @@
 from .utils import _get_int_dtype, _groupby, index_dtype, networkx_algorithm

+if TYPE_CHECKING:  # pragma: no cover
+    from nx_cugraph.typing import AttrKey, Dtype, EdgeValue
+
+
 __all__ = [
     "convert_node_labels_to_integers",
     "relabel_nodes",
@@ -149,66 +156,13 @@ def relabel_nodes(G, mapping, copy=True):
             new_edge_indices.append(edge_index)
         edge_indices = cp.array(new_edge_indices, index_dtype)
     else:
-        stacked_dup = cp.vstack((src_indices_dup, dst_indices_dup))
-        if not edge_values:
-            # Drop duplicates
-            stacked = cp.unique(stacked_dup, axis=1)
-        else:
-            # Drop duplicates. This relies heavily on `_groupby`.
-            # It has not been compared to alternative implementations.
-            # I wonder if there are ways to use assignment using duplicate indices.
-            (stacked, ind, inv) = cp.unique(
-                stacked_dup, axis=1, return_index=True, return_inverse=True
-            )
-            if ind.dtype != int_dtype:
-                ind = ind.astype(int_dtype)
-            if inv.dtype != int_dtype:
-                inv = inv.astype(int_dtype)
-
-            # We need to merge edge data
-            mask = cp.ones(src_indices.size, dtype=bool)
-            mask[ind] = False
-            edge_data = [val[mask] for val in edge_values.values()]
-            edge_data.extend(val[mask] for val in edge_masks.values())
-            groups = _groupby(inv[mask], edge_data)
-
-            edge_values = {key: val[ind] for key, val in edge_values.items()}
-            edge_masks = {key: val[ind] for key, val in edge_masks.items()}
-
-            value_keys = list(edge_values.keys())
-            mask_keys = list(edge_masks.keys())
-
-            values_to_update = defaultdict(list)
-            masks_to_update = defaultdict(list)
-            for k, v in groups.items():
-                it = iter(v)
-                vals = dict(zip(value_keys, it))  # zip(strict=False)
-                masks = dict(zip(mask_keys, it))  # zip(strict=True)
-                for key, val in vals.items():
-                    if key in masks:
-                        val = val[masks[key]]
-                        if val.size > 0:
-                            values_to_update[key].append((k, val[-1]))
-                            masks_to_update[key].append((k, True))
-                    else:
-                        values_to_update[key].append((k, val[-1]))
-                        if key in edge_masks:
-                            masks_to_update[key].append((k, True))
-
-            int_dtype = _get_int_dtype(src_indices.size - 1)
-            for k, v in values_to_update.items():
-                ii, jj = zip(*v)
-                edge_val = edge_values[k]
-                edge_val[cp.array(ii, dtype=int_dtype)] = cp.array(
-                    jj, dtype=edge_val.dtype
-                )
-            for k, v in masks_to_update.items():
-                ii, jj = zip(*v)
-                edge_masks[k][cp.array(ii, dtype=int_dtype)] = cp.array(
-                    jj, dtype=bool
-                )
-        src_indices = stacked[0]
-        dst_indices = stacked[1]
+        src_indices, dst_indices, edge_values, edge_masks = _deduplicate(
+            src_indices_dup,
+            dst_indices_dup,
+            edge_values,
+            edge_masks,
+            int_dtype=int_dtype,
+        )

     if G.is_multigraph():
         # `edge_keys` and `edge_indices` are preserved for free if no nodes were merged
@@ -234,6 +188,88 @@ def relabel_nodes(G, mapping, copy=True):
     return rv


+def _deduplicate(
+    src_indices: cp.ndarray,
+    dst_indices: cp.ndarray,
+    edge_values: dict[AttrKey, cp.ndarray[EdgeValue]] | None,
+    edge_masks: dict[AttrKey, cp.ndarray[bool]] | None,
+    *,
+    int_dtype: Dtype | None = None,
+) -> tuple[
+    cp.ndarray,
+    cp.ndarray,
+    dict[AttrKey, cp.ndarray[EdgeValue]] | None,
+    dict[AttrKey, cp.ndarray[bool]] | None,
+]:
+    """Drop duplicate edges given indices and values.
+
+    This is useful when ingesting user data to create Graphs or DiGraphs.
+    It merges data from duplicate edges.
+ """ + stacked_dup = cp.vstack((src_indices, dst_indices)) + if not edge_values: + # Drop duplicates + stacked = cp.unique(stacked_dup, axis=1) + else: + # Drop duplicates. This relies heavily on `_groupby`. + # It has not been compared to alternative implementations. + # I wonder if there are ways to use assignment using duplicate indices. + (stacked, ind, inv) = cp.unique( + stacked_dup, axis=1, return_index=True, return_inverse=True + ) + if int_dtype is not None and ind.dtype != int_dtype: + ind = ind.astype(int_dtype) + if int_dtype is not None and inv.dtype != int_dtype: + inv = inv.astype(int_dtype) + + # We need to merge edge data + mask = cp.ones(src_indices.size, dtype=bool) + mask[ind] = False + edge_data = [val[mask] for val in edge_values.values()] + if edge_masks: + edge_data.extend(val[mask] for val in edge_masks.values()) + groups = _groupby(inv[mask], edge_data) + + edge_values = {key: val[ind] for key, val in edge_values.items()} + value_keys = list(edge_values.keys()) + values_to_update = defaultdict(list) + if edge_masks: + edge_masks = {key: val[ind] for key, val in edge_masks.items()} + mask_keys = list(edge_masks.keys()) + masks_to_update = defaultdict(list) + for k, v in groups.items(): + it = iter(v) + vals = dict(zip(value_keys, it)) # zip(strict=False) + masks = dict(zip(mask_keys, it)) # zip(strict=True) + for key, val in vals.items(): + if key in masks: + val = val[masks[key]] + if val.size > 0: + values_to_update[key].append((k, val[-1])) + masks_to_update[key].append((k, True)) + else: + values_to_update[key].append((k, val[-1])) + if key in edge_masks: + masks_to_update[key].append((k, True)) + else: + for k, v in groups.items(): + for key, val in zip(value_keys, v): + values_to_update[key].append((k, val[-1])) + + int_dtype = _get_int_dtype(src_indices.size - 1) + for k, v in values_to_update.items(): + ii, jj = zip(*v) + edge_val = edge_values[k] + edge_val[cp.array(ii, dtype=int_dtype)] = cp.array(jj, dtype=edge_val.dtype) + if edge_masks: + for k, v in masks_to_update.items(): + ii, jj = zip(*v) + edge_masks[k][cp.array(ii, dtype=int_dtype)] = cp.array(jj, dtype=bool) + src_indices = stacked[0] + dst_indices = stacked[1] + return src_indices, dst_indices, edge_values, edge_masks + + @networkx_algorithm(version_added="24.08") def convert_node_labels_to_integers( G, first_label=0, ordering="default", label_attribute=None diff --git a/python/nx-cugraph/nx_cugraph/tests/test_convert_matrix.py b/python/nx-cugraph/nx_cugraph/tests/test_convert_matrix.py index 0a9cc087ce0..47c5772f6df 100644 --- a/python/nx-cugraph/nx_cugraph/tests/test_convert_matrix.py +++ b/python/nx-cugraph/nx_cugraph/tests/test_convert_matrix.py @@ -17,6 +17,8 @@ import nx_cugraph as nxcg from nx_cugraph.utils import _cp_iscopied_asarray +from .testing_utils import assert_graphs_equal + try: import cudf except ModuleNotFoundError: @@ -84,3 +86,18 @@ def test_from_pandas_edgelist(data, create_using): source.to_numpy(), orig_object=source ) assert is_copied is True + + +@pytest.mark.parametrize("edge_attr", [None, True]) +@pytest.mark.parametrize("create_using", CREATE_USING) +def test_multiple_edges(edge_attr, create_using): + df = pd.DataFrame( + { + "source": [0, 0, 0, 1, 1, 2, 0, 0], + "target": [1, 1, 1, 0, 2, 0, 0, 0], + "x": [0, 1, 2, 3, 4, 5, 6, 7], + } + ) + Gnx = nx.from_pandas_edgelist(df, edge_attr=edge_attr, create_using=create_using) + Gcg = nxcg.from_pandas_edgelist(df, edge_attr=edge_attr, create_using=create_using) + assert_graphs_equal(Gnx, Gcg) diff --git 
diff --git a/python/nx-cugraph/pyproject.toml b/python/nx-cugraph/pyproject.toml
index e7b4ea44dd8..69a906300c6 100644
--- a/python/nx-cugraph/pyproject.toml
+++ b/python/nx-cugraph/pyproject.toml
@@ -170,6 +170,7 @@ external = [
 ]
 ignore = [
     # Would be nice to fix these
+    "B905",  # `zip()` without an explicit `strict=` parameter (Note: possible now that py39 support was dropped; we should do this!)
     "D100",  # Missing docstring in public module
     "D101",  # Missing docstring in public class
     "D102",  # Missing docstring in public method
@@ -215,6 +216,7 @@ ignore = [
     "SIM105",  # Use contextlib.suppress(...) instead of try-except-pass (Note: try-except-pass is much faster)
     "SIM108",  # Use ternary operator ... instead of if-else-block (Note: if-else better for coverage and sometimes clearer)
     "TRY003",  # Avoid specifying long messages outside the exception class (Note: why?)
+    "UP038",  # Use `X | Y` in `isinstance` call instead of `(X, Y)` (Note: tuple is faster for now)
     # Ignored categories
     "C90",  # mccabe (Too strict, but maybe we should make things less complex)
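[Review note on the new B905 ignore above, illustrative only and not part of the patch. The `# zip(strict=...)` breadcrumbs in `_deduplicate` mark where the keyword should be added once B905 is enforced; `strict=True` (available since Python 3.10, hence the note about dropping py39) turns silent truncation into an error.]

    # By default, zip silently truncates to the shortest iterable.
    assert list(zip([1, 2], "abc")) == [(1, "a"), (2, "b")]

    # With strict=True, a length mismatch raises instead of being hidden.
    try:
        list(zip([1, 2], "abc", strict=True))
    except ValueError as exc:
        print(exc)  # zip() argument 2 is longer than argument 1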