Skip to content

Commit

Permalink
Fix from_pandas_edgelist with duplicate edges
Browse files Browse the repository at this point in the history
When creating a `Graph` or `DiGraph`, duplicate edges should be dropped.
Also, ensure arrays are contiguous when creating the PLC graph,
which can lead to incorrect results.
  • Loading branch information
eriknw committed Sep 17, 2024
1 parent 4d2dd27 commit 360b8fd
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 64 deletions.
2 changes: 1 addition & 1 deletion python/nx-cugraph/_nx_cugraph/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,8 @@ def get_info():


def _check_networkx_version():
import warnings
import re
import warnings

import networkx as nx

Expand Down
11 changes: 8 additions & 3 deletions python/nx-cugraph/nx_cugraph/classes/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -695,9 +695,14 @@ def _get_plc_graph(
is_multigraph=self.is_multigraph() and symmetrize is None,
is_symmetric=not self.is_directed() or symmetrize is not None,
),
src_or_offset_array=src_indices,
dst_or_index_array=dst_indices,
weight_array=edge_array,
# Use `cp.asarray` to ensure that data is contiguous in device memory.
# We could, perhaps, update e.g. `self.src_indices` with the new contiguous
# array. This does not copy if arrays are already contiguous.
src_or_offset_array=cp.asarray(src_indices, order="C"),
dst_or_index_array=cp.asarray(dst_indices, order="C"),
weight_array=(
None if edge_array is None else cp.asarray(edge_array, order="C")
),
store_transposed=store_transposed,
renumber=False,
do_expensive_check=False,
Expand Down
14 changes: 14 additions & 0 deletions python/nx-cugraph/nx_cugraph/convert_matrix.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import numpy as np

from .generators._utils import _create_using_class
from .relabel import _deduplicate
from .utils import _cp_iscopied_asarray, index_dtype, networkx_algorithm

__all__ = [
Expand Down Expand Up @@ -155,6 +156,19 @@ def from_pandas_edgelist(
]
kwargs["edge_keys"] = edge_keys

if not graph_class.is_multigraph():
src_indices, dst_indices, kwargs["edge_values"], kwargs["edge_masks"] = (
# PERF: we may not need to do a generic merge as done in `_deduplicate`;
# we just need the last value since all edges have all edge attributes.
# Generic merging handles cases when edges have different attributes.
_deduplicate(
src_indices,
dst_indices,
kwargs.get("edge_values"),
kwargs.get("edge_masks"),
)
)

G = graph_class.from_coo(N, src_indices, dst_indices, **kwargs)
if inplace:
return create_using._become(G)
Expand Down
156 changes: 96 additions & 60 deletions python/nx-cugraph/nx_cugraph/relabel.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations

import itertools
from collections import defaultdict
from typing import TYPE_CHECKING

import cupy as cp
import networkx as nx
Expand All @@ -21,6 +24,10 @@

from .utils import _get_int_dtype, _groupby, index_dtype, networkx_algorithm

if TYPE_CHECKING: # pragma: no cover
from nx_cugraph.typing import AttrKey, Dtype, EdgeValue


__all__ = [
"convert_node_labels_to_integers",
"relabel_nodes",
Expand Down Expand Up @@ -149,66 +156,13 @@ def relabel_nodes(G, mapping, copy=True):
new_edge_indices.append(edge_index)
edge_indices = cp.array(new_edge_indices, index_dtype)
else:
stacked_dup = cp.vstack((src_indices_dup, dst_indices_dup))
if not edge_values:
# Drop duplicates
stacked = cp.unique(stacked_dup, axis=1)
else:
# Drop duplicates. This relies heavily on `_groupby`.
# It has not been compared to alternative implementations.
# I wonder if there are ways to use assignment using duplicate indices.
(stacked, ind, inv) = cp.unique(
stacked_dup, axis=1, return_index=True, return_inverse=True
)
if ind.dtype != int_dtype:
ind = ind.astype(int_dtype)
if inv.dtype != int_dtype:
inv = inv.astype(int_dtype)

# We need to merge edge data
mask = cp.ones(src_indices.size, dtype=bool)
mask[ind] = False
edge_data = [val[mask] for val in edge_values.values()]
edge_data.extend(val[mask] for val in edge_masks.values())
groups = _groupby(inv[mask], edge_data)

edge_values = {key: val[ind] for key, val in edge_values.items()}
edge_masks = {key: val[ind] for key, val in edge_masks.items()}

value_keys = list(edge_values.keys())
mask_keys = list(edge_masks.keys())

values_to_update = defaultdict(list)
masks_to_update = defaultdict(list)
for k, v in groups.items():
it = iter(v)
vals = dict(zip(value_keys, it)) # zip(strict=False)
masks = dict(zip(mask_keys, it)) # zip(strict=True)
for key, val in vals.items():
if key in masks:
val = val[masks[key]]
if val.size > 0:
values_to_update[key].append((k, val[-1]))
masks_to_update[key].append((k, True))
else:
values_to_update[key].append((k, val[-1]))
if key in edge_masks:
masks_to_update[key].append((k, True))

int_dtype = _get_int_dtype(src_indices.size - 1)
for k, v in values_to_update.items():
ii, jj = zip(*v)
edge_val = edge_values[k]
edge_val[cp.array(ii, dtype=int_dtype)] = cp.array(
jj, dtype=edge_val.dtype
)
for k, v in masks_to_update.items():
ii, jj = zip(*v)
edge_masks[k][cp.array(ii, dtype=int_dtype)] = cp.array(
jj, dtype=bool
)
src_indices = stacked[0]
dst_indices = stacked[1]
src_indices, dst_indices, edge_values, edge_masks = _deduplicate(
src_indices_dup,
dst_indices_dup,
edge_values,
edge_masks,
int_dtype=int_dtype,
)

if G.is_multigraph():
# `edge_keys` and `edge_indices` are preserved for free if no nodes were merged
Expand All @@ -234,6 +188,88 @@ def relabel_nodes(G, mapping, copy=True):
return rv


def _deduplicate(
src_indices: cp.ndarray,
dst_indices: cp.ndarray,
edge_values: dict[AttrKey, cp.ndarray[EdgeValue]] | None,
edge_masks: dict[AttrKey, cp.ndarray[bool]] | None,
*,
int_dtype: Dtype | None = None,
) -> tuple[
cp.ndarray,
cp.ndarray,
dict[AttrKey, cp.ndarray[EdgeValue]] | None,
dict[AttrKey, cp.ndarray[bool]] | None,
]:
"""Drop duplicate edges given indices and values.
This is useful when ingesting user data to create Graphs or DiGraphs.
It merges data from duplicate edges.
"""
stacked_dup = cp.vstack((src_indices, dst_indices))
if not edge_values:
# Drop duplicates
stacked = cp.unique(stacked_dup, axis=1)
else:
# Drop duplicates. This relies heavily on `_groupby`.
# It has not been compared to alternative implementations.
# I wonder if there are ways to use assignment using duplicate indices.
(stacked, ind, inv) = cp.unique(
stacked_dup, axis=1, return_index=True, return_inverse=True
)
if int_dtype is not None and ind.dtype != int_dtype:
ind = ind.astype(int_dtype)
if int_dtype is not None and inv.dtype != int_dtype:
inv = inv.astype(int_dtype)

# We need to merge edge data
mask = cp.ones(src_indices.size, dtype=bool)
mask[ind] = False
edge_data = [val[mask] for val in edge_values.values()]
if edge_masks:
edge_data.extend(val[mask] for val in edge_masks.values())
groups = _groupby(inv[mask], edge_data)

edge_values = {key: val[ind] for key, val in edge_values.items()}
value_keys = list(edge_values.keys())
values_to_update = defaultdict(list)
if edge_masks:
edge_masks = {key: val[ind] for key, val in edge_masks.items()}
mask_keys = list(edge_masks.keys())
masks_to_update = defaultdict(list)
for k, v in groups.items():
it = iter(v)
vals = dict(zip(value_keys, it)) # zip(strict=False)
masks = dict(zip(mask_keys, it)) # zip(strict=True)
for key, val in vals.items():
if key in masks:
val = val[masks[key]]
if val.size > 0:
values_to_update[key].append((k, val[-1]))
masks_to_update[key].append((k, True))
else:
values_to_update[key].append((k, val[-1]))
if key in edge_masks:
masks_to_update[key].append((k, True))
else:
for k, v in groups.items():
for key, val in zip(value_keys, v):
values_to_update[key].append((k, val[-1]))

int_dtype = _get_int_dtype(src_indices.size - 1)
for k, v in values_to_update.items():
ii, jj = zip(*v)
edge_val = edge_values[k]
edge_val[cp.array(ii, dtype=int_dtype)] = cp.array(jj, dtype=edge_val.dtype)
if edge_masks:
for k, v in masks_to_update.items():
ii, jj = zip(*v)
edge_masks[k][cp.array(ii, dtype=int_dtype)] = cp.array(jj, dtype=bool)
src_indices = stacked[0]
dst_indices = stacked[1]
return src_indices, dst_indices, edge_values, edge_masks


@networkx_algorithm(version_added="24.08")
def convert_node_labels_to_integers(
G, first_label=0, ordering="default", label_attribute=None
Expand Down
17 changes: 17 additions & 0 deletions python/nx-cugraph/nx_cugraph/tests/test_convert_matrix.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import nx_cugraph as nxcg
from nx_cugraph.utils import _cp_iscopied_asarray

from .testing_utils import assert_graphs_equal

try:
import cudf
except ModuleNotFoundError:
Expand Down Expand Up @@ -84,3 +86,18 @@ def test_from_pandas_edgelist(data, create_using):
source.to_numpy(), orig_object=source
)
assert is_copied is True


@pytest.mark.parametrize("edge_attr", [None, True])
@pytest.mark.parametrize("create_using", CREATE_USING)
def test_multiple_edges(edge_attr, create_using):
df = pd.DataFrame(
{
"source": [0, 0, 0, 1, 1, 2, 0, 0],
"target": [1, 1, 1, 0, 2, 0, 0, 0],
"x": [0, 1, 2, 3, 4, 5, 6, 7],
}
)
Gnx = nx.from_pandas_edgelist(df, edge_attr=edge_attr, create_using=create_using)
Gcg = nxcg.from_pandas_edgelist(df, edge_attr=edge_attr, create_using=create_using)
assert_graphs_equal(Gnx, Gcg)
2 changes: 2 additions & 0 deletions python/nx-cugraph/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ external = [
]
ignore = [
# Would be nice to fix these
"B905", # `zip()` without an explicit `strict=` parameter (Note: possible since py39 was dropped; we should do this!)
"D100", # Missing docstring in public module
"D101", # Missing docstring in public class
"D102", # Missing docstring in public method
Expand Down Expand Up @@ -215,6 +216,7 @@ ignore = [
"SIM105", # Use contextlib.suppress(...) instead of try-except-pass (Note: try-except-pass is much faster)
"SIM108", # Use ternary operator ... instead of if-else-block (Note: if-else better for coverage and sometimes clearer)
"TRY003", # Avoid specifying long messages outside the exception class (Note: why?)
"UP038", # Use `X | Y` in `isinstance` call instead of `(X, Y)` (Note: tuple is faster for now)

# Ignored categories
"C90", # mccabe (Too strict, but maybe we should make things less complex)
Expand Down

0 comments on commit 360b8fd

Please sign in to comment.