Skip to content

Commit

Permalink
Further optimize from_pandas_edgelist with cudf (#4528)
Browse files Browse the repository at this point in the history
This continues #4525 (and [this comment](#4525 (comment))) to avoid copies and to be more optimal whether using pandas, cudf, or cudf.pandas. Notably, using `s.to_numpy` with cudf will return a _numpy_ array, but `cudf.pandas` may return a _cupy_ array (proxy).

Also, `s.to_numpy(copy=False)` ([from comment](#4525 (comment))) is not used, b/c cudf's `to_numpy` raises if `copy=False`. We get the behavior we want by not specifying `copy=`.

I don't know if this is the best way to determine whether a copy occurred or not, but this seems like a useful pattern to establish, because we want to make ingest more efficient.

CC @rlratzel

Authors:
  - Erik Welch (https://github.com/eriknw)
  - Ralph Liu (https://github.com/nv-rliu)

Approvers:
  - Rick Ratzel (https://github.com/rlratzel)

URL: #4528
  • Loading branch information
eriknw authored Jul 30, 2024
1 parent 0e69733 commit 4ff7acb
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 9 deletions.
35 changes: 28 additions & 7 deletions python/nx-cugraph/nx_cugraph/convert_matrix.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import numpy as np

from .generators._utils import _create_using_class
from .utils import index_dtype, networkx_algorithm
from .utils import _cp_iscopied_asarray, index_dtype, networkx_algorithm

__all__ = [
"from_pandas_edgelist",
Expand All @@ -34,16 +34,30 @@ def from_pandas_edgelist(
edge_key=None,
):
"""cudf.DataFrame inputs also supported; value columns with str is unsuppported."""
# This function never shares ownership of the underlying arrays of the DataFrame
# columns. We will perform a copy if necessary even if given e.g. a cudf.DataFrame.
graph_class, inplace = _create_using_class(create_using)
# Try to be optimal whether using pandas, cudf, or cudf.pandas
src_array = df[source].to_numpy()
dst_array = df[target].to_numpy()
src_series = df[source]
dst_series = df[target]
try:
# Optimistically try to use cupy, but fall back to numpy if necessary
src_array = cp.asarray(src_array)
dst_array = cp.asarray(dst_array)
src_array = src_series.to_cupy()
dst_array = dst_series.to_cupy()
except (AttributeError, TypeError, ValueError, NotImplementedError):
src_array = src_series.to_numpy()
dst_array = dst_series.to_numpy()
try:
# Minimize unnecessary data copies by tracking whether we copy or not
is_src_copied, src_array = _cp_iscopied_asarray(
src_array, orig_object=src_series
)
is_dst_copied, dst_array = _cp_iscopied_asarray(
dst_array, orig_object=dst_series
)
np_or_cp = cp
except ValueError:
is_src_copied = is_dst_copied = False
src_array = np.asarray(src_array)
dst_array = np.asarray(dst_array)
np_or_cp = np
Expand All @@ -65,8 +79,15 @@ def from_pandas_edgelist(
src_indices = cp.asarray(np_or_cp.searchsorted(nodes, src_array), index_dtype)
dst_indices = cp.asarray(np_or_cp.searchsorted(nodes, dst_array), index_dtype)
else:
src_indices = cp.array(src_array)
dst_indices = cp.array(dst_array)
# Copy if necessary so we don't share ownership of input arrays.
if is_src_copied:
src_indices = src_array
else:
src_indices = cp.array(src_array)
if is_dst_copied:
dst_indices = dst_array
else:
dst_indices = cp.array(dst_array)

if not graph_class.is_directed():
# Symmetrize the edges
Expand Down
86 changes: 86 additions & 0 deletions python/nx-cugraph/nx_cugraph/tests/test_convert_matrix.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# Copyright (c) 2024, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import networkx as nx
import pandas as pd
import pytest

import nx_cugraph as nxcg
from nx_cugraph.utils import _cp_iscopied_asarray

try:
import cudf
except ModuleNotFoundError:
cudf = None


DATA = [
{"source": [0, 1], "target": [1, 2]}, # nodes are 0, 1, 2
{"source": [0, 1], "target": [1, 3]}, # nodes are 0, 1, 3 (need renumbered!)
{"source": ["a", "b"], "target": ["b", "c"]}, # nodes are 'a', 'b', 'c'
]
CREATE_USING = [nx.Graph, nx.DiGraph, nx.MultiGraph, nx.MultiDiGraph]


@pytest.mark.skipif("not cudf")
@pytest.mark.parametrize("data", DATA)
@pytest.mark.parametrize("create_using", CREATE_USING)
def test_from_cudf_edgelist(data, create_using):
df = cudf.DataFrame(data)
nxcg.from_pandas_edgelist(df, create_using=create_using) # Basic smoke test
source = df["source"]
if source.dtype == int:
is_copied, src_array = _cp_iscopied_asarray(source)
assert is_copied is False
is_copied, src_array = _cp_iscopied_asarray(source.to_cupy())
assert is_copied is False
is_copied, src_array = _cp_iscopied_asarray(source, orig_object=source)
assert is_copied is False
is_copied, src_array = _cp_iscopied_asarray(
source.to_cupy(), orig_object=source
)
assert is_copied is False
# to numpy
is_copied, src_array = _cp_iscopied_asarray(source.to_numpy())
assert is_copied is True
is_copied, src_array = _cp_iscopied_asarray(
source.to_numpy(), orig_object=source
)
assert is_copied is True
else:
with pytest.raises(TypeError):
_cp_iscopied_asarray(source)
with pytest.raises(TypeError):
_cp_iscopied_asarray(source.to_cupy())
with pytest.raises(ValueError, match="Unsupported dtype"):
_cp_iscopied_asarray(source.to_numpy())
with pytest.raises(ValueError, match="Unsupported dtype"):
_cp_iscopied_asarray(source.to_numpy(), orig_object=source)


@pytest.mark.parametrize("data", DATA)
@pytest.mark.parametrize("create_using", CREATE_USING)
def test_from_pandas_edgelist(data, create_using):
df = pd.DataFrame(data)
nxcg.from_pandas_edgelist(df, create_using=create_using) # Basic smoke test
source = df["source"]
if source.dtype == int:
is_copied, src_array = _cp_iscopied_asarray(source)
assert is_copied is True
is_copied, src_array = _cp_iscopied_asarray(source, orig_object=source)
assert is_copied is True
is_copied, src_array = _cp_iscopied_asarray(source.to_numpy())
assert is_copied is True
is_copied, src_array = _cp_iscopied_asarray(
source.to_numpy(), orig_object=source
)
assert is_copied is True
22 changes: 20 additions & 2 deletions python/nx-cugraph/nx_cugraph/tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2023, NVIDIA CORPORATION.
# Copyright (c) 2023-2024, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand All @@ -10,10 +10,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import cupy as cp
import numpy as np
import pytest

from nx_cugraph.utils import _get_int_dtype
from nx_cugraph.utils import _cp_iscopied_asarray, _get_int_dtype


def test_get_int_dtype():
Expand Down Expand Up @@ -85,3 +86,20 @@ def test_get_int_dtype():
_get_int_dtype(7, signed=True, unsigned=True)
assert _get_int_dtype(7, signed=True, unsigned=False) == np.int8
assert _get_int_dtype(7, signed=False, unsigned=True) == np.uint8


def test_cp_iscopied_asarray():
# We don't yet run doctest, so do simple copy/paste test here.
#
# >>> is_copied, a = _cp_iscopied_asarray([1, 2, 3])
# >>> is_copied
# True
# >>> a
# array([1, 2, 3])
# >>> _cp_iscopied_asarray(a)
# (False, array([1, 2, 3]))
is_copied, a = _cp_iscopied_asarray([1, 2, 3])
assert is_copied is True
assert isinstance(a, cp.ndarray)
assert repr(a) == "array([1, 2, 3])"
assert _cp_iscopied_asarray(a)[0] is False
32 changes: 32 additions & 0 deletions python/nx-cugraph/nx_cugraph/utils/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def pairwise(it):
"_get_int_dtype",
"_get_float_dtype",
"_dtype_param",
"_cp_iscopied_asarray",
]

# This may switch to np.uint32 at some point
Expand Down Expand Up @@ -206,3 +207,34 @@ def _get_float_dtype(
f"Dtype {dtype} cannot be safely promoted to float32 or float64"
)
return rv


def _cp_iscopied_asarray(a, *args, orig_object=None, **kwargs):
"""Like ``cp.asarray``, but also returns whether the input was copied.
Use this to avoid unnecessary copies. If given, ``orig_object`` will
also be inspected to determine if it was copied.
>>> is_copied, a = _cp_iscopied_asarray([1, 2, 3])
>>> is_copied
True
>>> a
array([1, 2, 3])
>>> _cp_iscopied_asarray(a)
(False, array([1, 2, 3]))
"""
arr = cp.asarray(a, *args, **kwargs)
ptr = arr.__cuda_array_interface__["data"][0]
if (
hasattr(a, "__cuda_array_interface__")
and a.__cuda_array_interface__["data"][0] == ptr
and (
orig_object is None
or hasattr(orig_object, "__cuda_array_interface__")
and orig_object.__cuda_array_interface__["data"][0] == ptr
)
# Should we also check device_id?
# and getattr(getattr(a, "data", None), "device_id", None) == arr.data.device_id
):
return False, arr
return True, arr

0 comments on commit 4ff7acb

Please sign in to comment.