Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
nx-cugraph: dispatch graph method to gpu or cpu
Browse files Browse the repository at this point in the history
This is to avoid unnecessary conversions to networkx data structures.

Also, update to pass tests with networkx 3.4.2
eriknw committed Nov 8, 2024
1 parent 0e57e65 commit 80e1aeb
Showing 17 changed files with 269 additions and 29 deletions.
16 changes: 11 additions & 5 deletions python/nx-cugraph/_nx_cugraph/__init__.py
Original file line number Diff line number Diff line change
@@ -343,14 +343,16 @@ def update_env_var(varname):
return d


def _check_networkx_version() -> tuple[int, int]:
def _check_networkx_version() -> tuple[int, int] | tuple[int, int, int]:
"""Check the version of networkx and return ``(major, minor)`` version tuple."""
import re
import warnings

import networkx as nx

version_major, version_minor = nx.__version__.split(".")[:2]
version_major, version_minor, *version_bug = nx.__version__.split(".")[:3]
if has_bug := bool(version_bug):
version_bug = version_bug[0]
if version_major != "3":
warnings.warn(
f"nx-cugraph version {__version__} is only known to work with networkx "
@@ -363,15 +365,19 @@ def _check_networkx_version() -> tuple[int, int]:
# Allow single-digit minor versions, e.g. 3.4 and release candidates, e.g. 3.4rc0
pattern = r"^\d(rc\d+)?$"

if not re.match(pattern, version_minor):
if not re.match(pattern, version_bug if has_bug else version_minor):
raise RuntimeWarning(
f"nx-cugraph version {__version__} does not work with networkx version "
f"{nx.__version__}. Please upgrade (or fix) your Python environment."
)

nxver_major = int(version_major)
nxver_minor = int(re.match(r"^\d+", version_minor).group())
return (nxver_major, nxver_minor)
if not has_bug:
nxver_minor = int(re.match(r"^\d+", version_minor).group())
return (nxver_major, nxver_minor)
nxver_minor = int(version_minor)
nxver_bug = int(re.match(r"^\d+", version_bug).group())
return (nxver_major, nxver_minor, nxver_bug)


if __name__ == "__main__":
2 changes: 1 addition & 1 deletion python/nx-cugraph/nx_cugraph/__init__.py
Original file line number Diff line number Diff line change
@@ -15,7 +15,7 @@
from _nx_cugraph._version import __git_commit__, __version__
from _nx_cugraph import _check_networkx_version

_nxver: tuple[int, int] = _check_networkx_version()
_nxver: tuple[int, int] | tuple[int, int, int] = _check_networkx_version()

from . import utils

Original file line number Diff line number Diff line change
@@ -49,7 +49,7 @@ def complete_bipartite_graph(n1, n2, create_using=None):
nodes.extend(range(n2) if nodes2 is None else nodes2)
if len(set(nodes)) != len(nodes):
raise nx.NetworkXError("Inputs n1 and n2 must contain distinct nodes")
if _nxver <= (3, 3):
if _nxver < (3, 4):
name = f"complete_bipartite_graph({orig_n1}, {orig_n2})"
else:
name = f"complete_bipartite_graph({n1}, {n2})"
Original file line number Diff line number Diff line change
@@ -27,7 +27,7 @@
__all__ = ["louvain_communities"]

# max_level argument was added to NetworkX 3.3
if _nxver <= (3, 2):
if _nxver < (3, 3):
_max_level_param = {
"max_level : int, optional": (
"Upper limit of the number of macro-iterations (max: 500)."
2 changes: 1 addition & 1 deletion python/nx-cugraph/nx_cugraph/algorithms/core.py
Original file line number Diff line number Diff line change
@@ -64,7 +64,7 @@ def k_truss(G, k):
else:
is_compat_graph = False
if nxcg.number_of_selfloops(G) > 0:
if _nxver <= (3, 2):
if _nxver < (3, 3):
exc_class = nx.NetworkXError
else:
exc_class = nx.NetworkXNotImplemented
Original file line number Diff line number Diff line change
@@ -54,7 +54,7 @@ def hits(
if nstart is not None:
nstart = G._dict_to_nodearray(nstart, 0, dtype)
if max_iter <= 0:
if _nxver <= (3, 2):
if _nxver < (3, 3):
raise ValueError("`maxiter` must be a positive integer.")
raise nx.PowerIterationFailedConvergence(max_iter)
try:
Original file line number Diff line number Diff line change
@@ -58,7 +58,7 @@ def shortest_path(
paths = nxcg.all_pairs_dijkstra_path(G, weight=weight, dtype=dtype)
else: # method == 'bellman-ford':
paths = nxcg.all_pairs_bellman_ford_path(G, weight=weight, dtype=dtype)
if _nxver <= (3, 4):
if _nxver < (3, 5):
paths = dict(paths)
# To target
elif method == "unweighted":
@@ -130,7 +130,7 @@ def shortest_path_length(
# To target
elif method == "unweighted":
lengths = nxcg.single_target_shortest_path_length(G, target)
if _nxver <= (3, 4):
if _nxver < (3, 5):
lengths = dict(lengths)
elif method == "dijkstra":
lengths = nxcg.single_source_dijkstra_path_length(
Original file line number Diff line number Diff line change
@@ -44,7 +44,7 @@ def single_source_shortest_path_length(G, source, cutoff=None):
def single_target_shortest_path_length(G, target, cutoff=None):
G = _to_graph(G)
rv = _bfs(G, target, cutoff, "Target", return_type="length")
if _nxver <= (3, 4):
if _nxver < (3, 5):
return iter(rv.items())
return rv

@@ -62,7 +62,7 @@ def bidirectional_shortest_path(G, source, target):
# TODO PERF: do bidirectional traversal in core
G = _to_graph(G)
if source not in G or target not in G:
if _nxver <= (3, 3):
if _nxver < (3, 4):
raise nx.NodeNotFound(
f"Either source {source} or target {target} is not in G"
)
Original file line number Diff line number Diff line change
@@ -58,7 +58,7 @@ def _bfs(G, source, *, depth_limit=None, reverse=False):
return distances[mask], predecessors[mask], node_ids[mask]


if _nxver <= (3, 3):
if _nxver < (3, 4):

@networkx_algorithm(is_incomplete=True, version_added="24.02", _plc="bfs")
def generic_bfs_edges(
22 changes: 22 additions & 0 deletions python/nx-cugraph/nx_cugraph/classes/digraph.py
Original file line number Diff line number Diff line change
@@ -105,6 +105,27 @@ def to_cudagraph_class(cls) -> type[CudaDiGraph]:
def to_networkx_class(cls) -> type[nx.DiGraph]:
return nx.DiGraph

##########################
# Networkx graph methods #
##########################

# Dispatch to nx.DiGraph or CudaDiGraph
__contains__ = Graph.__dict__["__contains__"]
__len__ = Graph.__dict__["__len__"]
__iter__ = Graph.__dict__["__iter__"]
get_edge_data = Graph.__dict__["get_edge_data"]
has_edge = Graph.__dict__["has_edge"]
neighbors = Graph.__dict__["neighbors"]
has_node = Graph.__dict__["has_node"]
nbunch_iter = Graph.__dict__["nbunch_iter"]
number_of_nodes = Graph.__dict__["number_of_nodes"]
order = Graph.__dict__["order"]
successors = Graph.__dict__["neighbors"] # Alias

clear = Graph.clear
clear_edges = Graph.clear_edges
number_of_edges = Graph.number_of_edges


class CudaDiGraph(CudaGraph):
#################
@@ -244,6 +265,7 @@ def to_undirected(self, reciprocal=False, as_view=False):
rv.graph.update(deepcopy(self.graph))
return rv

successors = CudaGraph.neighbors # Alias
# Many more methods to implement...

###################
130 changes: 127 additions & 3 deletions python/nx-cugraph/nx_cugraph/classes/graph.py
Original file line number Diff line number Diff line change
@@ -63,6 +63,8 @@
True, # Include all node values
# `.graph` attributes are always included now
)
_EDGE_KEY_INDEX = 0
_NODE_KEY_INDEX = 1

# Use to indicate when a full conversion to GPU failed so we don't try again.
_CANT_CONVERT_TO_GPU = "_CANT_CONVERT_TO_GPU"
@@ -84,6 +86,56 @@ def clear(self) -> None:
super().clear()


class _graph_property:
"""Dispatch property to NetworkX or CudaGraph based on cache.
For example, this will use any cached CudaGraph for ``len(G)``, which
prevents creating NetworkX data structures.
"""

def __init__(self, attr, *, edge_data=False, node_data=False):
self._attr = attr
self._edge_data = edge_data
self._node_data = node_data

def __get__(self, instance, owner=None):
nx_class = owner.to_networkx_class()
if instance is None:
# Let's handle e.g. `nxcg.Graph.__len__` to look and behave correctly.
#
# If you want the instance of `_graph_property`, get it from the class dict:
# >>> nxcg.Graph.__dict__["__len__"]
#
# Alternatives:
# - `return op.methodcaller(self._attr)`
# - This dispatches, but does not have e.g. __name__
# - `return getattr(nx_class, self._attr)`
# - This does not dispatch--it always uses networkx--but does have attrs
prop = owner.__dict__[self._attr]

def inner(self, *args, **kwargs):
return prop.__get__(self, owner)(*args, **kwargs)

# Standard function-wrapping
nx_func = getattr(nx_class, self._attr)
inner.__name__ = nx_func.__name__
inner.__doc__ = nx_func.__doc__
inner.__qualname__ = nx_func.__qualname__
inner.__defaults__ = nx_func.__defaults__
inner.__kwdefaults__ = nx_func.__kwdefaults__
inner.__dict__.update(nx_func.__dict__)
inner.__module__ = owner.__module__
inner.__wrapped__ = nx_func
return inner

cuda_graph = instance._get_cudagraph(
edge_data=self._edge_data, node_data=self._node_data
)
if cuda_graph is not None:
return getattr(cuda_graph, self._attr)
return getattr(nx_class, self._attr).__get__(instance, owner)


class Graph(nx.Graph):
# Tell networkx to dispatch calls with this object to nx-cugraph
__networkx_backend__: ClassVar[str] = "cugraph" # nx >=3.2
@@ -211,8 +263,7 @@ def _cudagraph(self):
cache[_CACHE_KEY] = Gcg
return Gcg

@_cudagraph.setter
def _cudagraph(self, val, *, clear_cpu=True):
def _set_cudagraph(self, val, *, clear_cpu=True):
"""Set the full ``CudaGraph`` for this graph, or remove from device if None."""
if (cache := getattr(self, "__networkx_cache__", None)) is None:
# Should we warn?
@@ -229,6 +280,32 @@ def _cudagraph(self, val, *, clear_cpu=True):
for key in self._nx_attrs:
self.__dict__[key] = None

def _get_cudagraph(self, *, edge_data=False, node_data=False):
"""Get a valid cached ``CudaGraph``, optionally with edge or node data.
Returns None if no valid graph is found.
Parameters
----------
edge_data : bool, default False
Whether to return a CudaGraph with edge data.
node_data : bool, default False
Whether to return a CudaGraph with node data.
"""
nx_cache = getattr(self, "__networkx_cache__", None)
if nx_cache is None or _CANT_CONVERT_TO_GPU in nx_cache:
return None
cache = nx_cache.get("backends", {}).get("cugraph", {})
if _CACHE_KEY in cache:
# Always return the canonical CudaGraph if it exists
return cache[_CACHE_KEY]
for key, val in cache.items():
if (key[_EDGE_KEY_INDEX] is True or edge_data is False) and (
key[_NODE_KEY_INDEX] is True or node_data is False
):
return val
return None

@nx.Graph.name.setter
def name(self, s):
# Don't clear the cache when setting the name, since `.graph` is shared.
@@ -510,6 +587,53 @@ def from_dcsc(
**attr,
)

##########################
# Networkx graph methods #
##########################

# Dispatch to nx.Graph or CudaGraph
__contains__ = _graph_property("__contains__")
__len__ = _graph_property("__len__")
__iter__ = _graph_property("__iter__")

@networkx_api
def clear(self) -> None:
cudagraph = self._cudagraph if self._is_on_gpu else None
if self._is_on_cpu:
super().clear()
if cudagraph is not None:
cudagraph.clear()
self._set_cudagraph(cudagraph, clear_cpu=False)

@networkx_api
def clear_edges(self) -> None:
cudagraph = self._cudagraph if self._is_on_gpu else None
if self._is_on_cpu:
super().clear_edges()
if cudagraph is not None:
cudagraph.clear_edges()
self._set_cudagraph(cudagraph, clear_cpu=False)

get_edge_data = _graph_property("get_edge_data", edge_data=True)
has_edge = _graph_property("has_edge")
neighbors = _graph_property("neighbors")
has_node = _graph_property("has_node")
nbunch_iter = _graph_property("nbunch_iter")

@networkx_api
def number_of_edges(
self, u: NodeKey | None = None, v: NodeKey | None = None
) -> int:
if u is not None or v is not None:
# NotImplemented by CudaGraph
nx_class = self.to_networkx_class()
return nx_class.number_of_edges(self, u, v)
return _graph_property("number_of_edges").__get__(self, self.__class__)()

number_of_nodes = _graph_property("number_of_nodes")
order = _graph_property("order")
# Future work: implement more graph methods, and handle e.g. `copy`


class CudaGraph:
# Tell networkx to dispatch calls with this object to nx-cugraph
@@ -805,7 +929,7 @@ def to_undirected(self, as_view: bool = False) -> CudaGraph:

def _to_compat_graph(self) -> Graph:
rv = self._to_compat_graph_class()()
rv._cudagraph = self
rv._set_cudagraph(self)
return rv

# Not implemented...
21 changes: 21 additions & 0 deletions python/nx-cugraph/nx_cugraph/classes/multidigraph.py
Original file line number Diff line number Diff line change
@@ -50,6 +50,27 @@ def to_cudagraph_class(cls) -> type[CudaMultiDiGraph]:
def to_networkx_class(cls) -> type[nx.MultiDiGraph]:
return nx.MultiDiGraph

##########################
# Networkx graph methods #
##########################

# Dispatch to nx.MultiDiGraph or CudaMultiDiGraph
__contains__ = Graph.__dict__["__contains__"]
__len__ = Graph.__dict__["__len__"]
__iter__ = Graph.__dict__["__iter__"]
get_edge_data = Graph.__dict__["get_edge_data"]
has_edge = Graph.__dict__["has_edge"]
neighbors = Graph.__dict__["neighbors"]
has_node = Graph.__dict__["has_node"]
nbunch_iter = Graph.__dict__["nbunch_iter"]
number_of_nodes = Graph.__dict__["number_of_nodes"]
order = Graph.__dict__["order"]
successors = Graph.__dict__["neighbors"] # Alias

clear = Graph.clear
clear_edges = Graph.clear_edges
number_of_edges = Graph.number_of_edges


class CudaMultiDiGraph(CudaMultiGraph, CudaDiGraph):
is_directed = classmethod(MultiDiGraph.is_directed.__func__)
31 changes: 25 additions & 6 deletions python/nx-cugraph/nx_cugraph/classes/multigraph.py
Original file line number Diff line number Diff line change
@@ -277,6 +277,26 @@ def from_dcsc(
**attr,
)

##########################
# Networkx graph methods #
##########################

# Dispatch to nx.MultiGraph or CudaMultiGraph
__contains__ = Graph.__dict__["__contains__"]
__len__ = Graph.__dict__["__len__"]
__iter__ = Graph.__dict__["__iter__"]
get_edge_data = Graph.__dict__["get_edge_data"]
has_edge = Graph.__dict__["has_edge"]
neighbors = Graph.__dict__["neighbors"]
has_node = Graph.__dict__["has_node"]
nbunch_iter = Graph.__dict__["nbunch_iter"]
number_of_nodes = Graph.__dict__["number_of_nodes"]
order = Graph.__dict__["order"]

clear = Graph.clear
clear_edges = Graph.clear_edges
number_of_edges = Graph.number_of_edges


class CudaMultiGraph(CudaGraph):
# networkx properties
@@ -390,14 +410,13 @@ def get_edge_data(
mask = (self.src_indices == u) & (self.dst_indices == v)
if not mask.any():
return default
if self.edge_keys is None:
if self.edge_keys is None and key is not None:
if self.edge_indices is None:
self._calculate_edge_indices()
if key is not None:
try:
mask = mask & (self.edge_indices == key)
except TypeError:
return default
try:
mask = mask & (self.edge_indices == key)
except TypeError:
return default
indices = cp.nonzero(mask)[0]
if indices.size == 0:
return default
2 changes: 1 addition & 1 deletion python/nx-cugraph/nx_cugraph/convert_matrix.py
Original file line number Diff line number Diff line change
@@ -140,7 +140,7 @@ def from_pandas_edgelist(
and (
# In nx <= 3.3, `edge_key` was ignored if `edge_attr` is None
edge_attr is not None
or _nxver > (3, 3)
or _nxver >= (3, 4)
)
):
try:
7 changes: 5 additions & 2 deletions python/nx-cugraph/nx_cugraph/interface.py
Original file line number Diff line number Diff line change
@@ -228,7 +228,7 @@ def key(testpath):
}
)

if _nxver <= (3, 2):
if _nxver < (3, 3):
xfail.update(
{
# NetworkX versions prior to 3.2.1 have tests written to
@@ -265,7 +265,7 @@ def key(testpath):
}
)

if _nxver <= (3, 1):
if _nxver < (3, 2):
# MAINT: networkx 3.0, 3.1
# NetworkX 3.2 added the ability to "fallback to nx" if backend algorithms
# raise NotImplementedError or `can_run` returns False. The tests below
@@ -423,6 +423,9 @@ def key(testpath):
}
)

if _nxver == (3, 4, 2):
xfail[key("test_pylab.py:test_return_types")] = "Ephemeral NetworkX bug"

too_slow = "Too slow to run"
skip = {
key("test_tree_isomorphism.py:test_positive"): too_slow,
45 changes: 45 additions & 0 deletions python/nx-cugraph/nx_cugraph/tests/test_graph_methods.py
Original file line number Diff line number Diff line change
@@ -17,6 +17,8 @@

from .testing_utils import assert_graphs_equal

CREATE_USING = [nxcg.Graph, nxcg.DiGraph, nxcg.MultiGraph, nxcg.MultiDiGraph]


def _create_Gs():
rv = []
@@ -65,3 +67,46 @@ def test_multidigraph_to_undirected():
Gcg = nxcg.CudaMultiDiGraph(Gnx)
with pytest.raises(NotImplementedError):
Gcg.to_undirected()


@pytest.mark.parametrize("create_using", CREATE_USING)
@pytest.mark.parametrize(
"method",
[
("__iter__", ()),
("__len__", ()),
("clear", ()),
("clear_edges", ()),
("nbunch_iter", ()),
("number_of_edges", ()),
("number_of_nodes", ()),
("order", ()),
("__contains__", (0,)),
("neighbors", (0,)),
("has_node", (0,)),
("successors", (0,)),
("get_edge_data", (0, 1)),
("has_edge", (0, 1)),
("nbunch_iter", ([0, 1],)),
],
)
def test_method_does_not_create_host_data(create_using, method):
attr, args = method
if attr == "successors" and not create_using.is_directed():
return
G = nxcg.complete_graph(3, create_using=create_using)
assert G._is_on_gpu
assert not G._is_on_cpu
getattr(G, attr)(*args)
assert G._is_on_gpu
assert not G._is_on_cpu
# Also usable from the class and dispatches correctly
func = getattr(create_using, attr)
func(G, *args)
assert G._is_on_gpu
assert not G._is_on_cpu
# Basic "looks like networkx" checks
nx_class = create_using.to_networkx_class()
nx_func = getattr(nx_class, attr)
assert func.__name__ == nx_func.__name__
assert func.__module__.startswith("nx_cugraph")
4 changes: 2 additions & 2 deletions python/nx-cugraph/nx_cugraph/utils/decorators.py
Original file line number Diff line number Diff line change
@@ -80,7 +80,7 @@ def __new__(
_plc=_plc,
)
instance = object.__new__(cls)
if nodes_or_number is not None and _nxver > (3, 2):
if nodes_or_number is not None and _nxver >= (3, 3):
func = nx.utils.decorators.nodes_or_number(nodes_or_number)(func)
# update_wrapper sets __wrapped__, which will be used for the signature
update_wrapper(instance, func)
@@ -121,7 +121,7 @@ def __new__(
# Set methods so they are in __dict__
instance._can_run = instance._can_run
instance._should_run = instance._should_run
if nodes_or_number is not None and _nxver <= (3, 2):
if nodes_or_number is not None and _nxver < (3, 3):
instance = nx.utils.decorators.nodes_or_number(nodes_or_number)(instance)
return instance

0 comments on commit 80e1aeb

Please sign in to comment.