Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Additional Checks to get_edgelist and get_dask_edgelist #4256

Merged
merged 14 commits into from
May 2, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions python/cugraph/cugraph/datasets/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ def get_edgelist(self, download=False, reader="cudf"):
reader : 'cudf' or 'pandas' (default='cudf')
The library used to read a CSV and return an edgelist DataFrame.
"""
if self._edgelist is None:
if self._edgelist is None or not isinstance(self._edgelist, cudf.DataFrame):
full_path = self.get_path()
if not full_path.is_file():
if download:
Expand Down Expand Up @@ -213,7 +213,7 @@ def get_edgelist(self, download=False, reader="cudf"):

return self._edgelist.copy()

def get_dask_edgelist(self, download=False):
def get_dask_edgelist(self, download=False, overwrite=False):
nv-rliu marked this conversation as resolved.
Show resolved Hide resolved
"""
Return a distributed Edgelist.

Expand All @@ -223,7 +223,9 @@ def get_dask_edgelist(self, download=False):
Automatically download the dataset from the 'url' location within
the YAML file.
"""
if self._edgelist is None:
if self._edgelist is None or not isinstance(
self._edgelist, dask_cudf.DataFrame
):
full_path = self.get_path()
if not full_path.is_file():
if download:
Expand Down Expand Up @@ -286,7 +288,7 @@ def get_graph(
for certain algorithms, such as pagerank.
"""
if self._edgelist is None:
self.get_edgelist(download)
self.get_edgelist(download=download)

if create_using is None:
G = Graph()
Expand Down Expand Up @@ -351,7 +353,7 @@ def get_dask_graph(
for certain algorithms.
"""
if self._edgelist is None:
self.get_dask_edgelist(download)
self.get_dask_edgelist(download=download)

if create_using is None:
G = Graph()
Expand All @@ -367,7 +369,7 @@ def get_dask_graph(
f"{type(create_using)}"
)

if len(self.metadata["col_names"]) > 2 and not (ignore_weights):
if len(self.metadata["col_names"]) > 2 and not ignore_weights:
G.from_dask_cudf_edgelist(
self._edgelist,
source=self.metadata["col_names"][0],
Expand Down
50 changes: 24 additions & 26 deletions python/cugraph/cugraph/tests/utils/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@
import pytest

import cudf
import cugraph
import dask_cudf
from cugraph import datasets
from cugraph.dask.common.mg_utils import is_single_gpu
from cugraph.datasets import karate
from cugraph.structure import Graph
from cugraph.testing import (
RAPIDS_DATASET_ROOT_DIR_PATH,
Expand All @@ -29,15 +33,15 @@
SMALL_DATASETS,
BENCHMARKING_DATASETS,
)
from cugraph import datasets
from cugraph.dask.common.mg_utils import is_single_gpu


# Add the sg marker to all tests in this module.
pytestmark = pytest.mark.sg


###############################################################################
# Fixtures
###############################################################################


# module fixture - called once for this module
Expand Down Expand Up @@ -201,21 +205,26 @@ def test_reader_dask(dask_client, dataset):
@pytest.mark.parametrize("dataset", ALL_DATASETS)
def test_get_edgelist(dataset):
E = dataset.get_edgelist(download=True)

assert E is not None
assert isinstance(E, cudf.DataFrame)


@pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system")
@pytest.mark.skip(reason="MG not supported on CI")
@pytest.mark.parametrize("dataset", ALL_DATASETS)
def test_get_dask_edgelist(dask_client, dataset):
E = dataset.get_dask_edgelist(download=True)

assert E is not None
assert isinstance(E, dask_cudf.DataFrame)


@pytest.mark.parametrize("dataset", ALL_DATASETS)
def test_get_graph(dataset):
G = dataset.get_graph(download=True)
assert G is not None
assert isinstance(G, cugraph.Graph)


@pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system")
Expand All @@ -224,12 +233,14 @@ def test_get_graph(dataset):
def test_get_dask_graph(dask_client, dataset):
G = dataset.get_dask_graph(download=True)
assert G is not None
# TODO Check G is a DistributedGraph


@pytest.mark.parametrize("dataset", ALL_DATASETS)
def test_metadata(dataset):
M = dataset.metadata
assert M is not None
assert isinstance(M, dict)


@pytest.mark.parametrize("dataset", ALL_DATASETS)
Expand Down Expand Up @@ -346,32 +357,19 @@ def test_ctor_with_datafile():
assert ds.get_path() == karate_csv


def test_unload():
email_csv = RAPIDS_DATASET_ROOT_DIR_PATH / "email-Eu-core.csv"
@pytest.mark.parametrize("dataset", [karate])
def test_unload(dataset):
assert dataset._edgelist is None

ds = datasets.Dataset(
csv_file=email_csv.as_posix(),
csv_col_names=["src", "dst", "wgt"],
csv_col_dtypes=["int32", "int32", "float32"],
)
dataset.get_edgelist()
assert dataset._edgelist is not None
dataset.unload()
assert dataset._edgelist is None

# FIXME: another (better?) test would be to check free memory and assert
# the memory use increases after get_*(), then returns to the pre-get_*()
# level after unload(). However, that type of test may fail for several
# reasons (the device being monitored is accidentally also being used by
# another process, and the use of memory pools to name two). Instead, just
# test that the internal members get cleared on unload().
assert ds._edgelist is None

ds.get_edgelist()
assert ds._edgelist is not None
ds.unload()
assert ds._edgelist is None

ds.get_graph()
assert ds._edgelist is not None
ds.unload()
assert ds._edgelist is None
dataset.get_graph()
assert dataset._edgelist is not None
dataset.unload()
assert dataset._edgelist is None


@pytest.mark.parametrize("dataset", ALL_DATASETS)
Expand Down
Loading