Skip to content

Commit

Permalink
[FEA]: Add DASK edgelist and graph support to the Dataset API (#4035)
Browse files Browse the repository at this point in the history
Hi! I choose to go further with some simple work other than docs. This PR is going to close #3218.

Here is what I have done in this PR:
1. Added `get_dask_edgelist()` and `get_dask_graph()` (and another internal helper function `__download_dask_csv()`) to  Dataset API.
2. Executed all necessary tests for these new functions.
3. Improved existing functions in the Dataset API and conducted tests to verify improvements.

Here are some additional details regarding this PR:
1. The building and testing were conducted using version 23.12 instead of the default 24.02. Since Cugraph-ops library is no longer open, I failed to build from source using version 24.02. I built and tested the code in version 23.12 and then transferred the updated file to 24.02 before creating this PR. (I would appreciate any guidance on how to build from version 24.02 for external contributors).
2.  All tests from the test file have passed, but some warnings remain, as shown below
```bash
============================================================ warnings summary ============================================================
cugraph/tests/utils/test_dataset.py::test_get_dask_graph[dataset0]
cugraph/tests/utils/test_dataset.py::test_get_dask_graph[dataset0]
cugraph/tests/utils/test_dataset.py::test_get_dask_graph[dataset0]
cugraph/tests/utils/test_dataset.py::test_weights_dask[dataset0]
cugraph/tests/utils/test_dataset.py::test_weights_dask[dataset0]
cugraph/tests/utils/test_dataset.py::test_weights_dask[dataset0]
cugraph/tests/utils/test_dataset.py::test_weights_dask[dataset0]
cugraph/tests/utils/test_dataset.py::test_weights_dask[dataset0]
cugraph/tests/utils/test_dataset.py::test_weights_dask[dataset0]
  /home/ubuntu/miniconda3/envs/cugraph_dev/lib/python3.10/site-packages/cudf/core/index.py:3284: FutureWarning: cudf.StringIndex is deprecated and will be removed from cudf in a future version. Use cudf.Index with the appropriate dtype instead.
    warnings.warn(

-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
```
I think above warnings came from the function call `from_dask_cudf_edgelist` but currently I have no idea how to remove them. I will do my best to address it if anyone has any ideas about it.

 3. The `get_edgelist()` function returns a deep copy of the object, but this is not supported for `get_dask_edgelist()` since only shallow copy is allowed for Dask cuDF dataframe (see [docs](https://docs.rapids.ai/api/dask-cudf/legacy/api/#dask_cudf.DataFrame.copy)). This will lead to a problem where if a user modifies the dataframe, the changes will be reflected in the internal `self._edgelist` object. So when `get_dask_graph()` is called later, the resulting graph will differ from the one directly constructed from the data file.
 4. I am uncertain about the requirements for (1) Identifying datasets and (2) Adding them to Dataset. If there is a need to add another function for determining whether a dataset requires MG handling based on its size, or to tag the dataset metadata (.yaml file) to indicate the necessity for MG processing, please let me know. Also, I welcome any suggestions for further features.
 5. When I ran pytest on other test files, the most common warnings were
 ```bash
/home/ubuntu/miniconda3/envs/cugraph_dev/lib/python3.10/site-packages/dask_cudf/io/csv.py:79: FutureWarning: `chunksize` is deprecated and will be removed in the future. Please use `blocksize` instead.
 ```
 The keyword `chunksize` is no longer in use (check [docs](https://docs.rapids.ai/api/dask-cudf/legacy/api/#dask_cudf.read_csv) here). I have checked all related functions in the repository and found that they currently use `chunksize`. If there is a need to change them to `blocksize`, I will create another PR to address this issue.

Any comments and suggestions are welcome!

Authors:
  - Huiyu Xie (https://github.com/huiyuxie)
  - Rick Ratzel (https://github.com/rlratzel)

Approvers:
  - Rick Ratzel (https://github.com/rlratzel)

URL: #4035
  • Loading branch information
huiyuxie authored Jan 9, 2024
1 parent a59bd76 commit c7b720d
Show file tree
Hide file tree
Showing 2 changed files with 191 additions and 12 deletions.
126 changes: 116 additions & 10 deletions python/cugraph/cugraph/datasets/dataset.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2022-2023, NVIDIA CORPORATION.
# Copyright (c) 2022-2024, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand All @@ -12,10 +12,13 @@
# limitations under the License.

import cudf
import dask_cudf
import yaml
import os
import pandas as pd
import cugraph.dask as dcg
from pathlib import Path
import urllib.request
from cugraph.structure.graph_classes import Graph


Expand Down Expand Up @@ -138,9 +141,8 @@ def __download_csv(self, url):

filename = self.metadata["name"] + self.metadata["file_type"]
if self._dl_path.path.is_dir():
df = cudf.read_csv(url)
self._path = self._dl_path.path / filename
df.to_csv(self._path, index=False)
urllib.request.urlretrieve(url, str(self._path))

else:
raise RuntimeError(
Expand All @@ -149,7 +151,6 @@ def __download_csv(self, url):
return self._path

def unload(self):

"""
Remove all saved internal objects, forcing them to be re-created when
accessed.
Expand All @@ -162,7 +163,7 @@ def unload(self):

def get_edgelist(self, download=False, reader="cudf"):
"""
Return an Edgelist
Return an Edgelist.
Parameters
----------
Expand Down Expand Up @@ -212,6 +213,47 @@ def get_edgelist(self, download=False, reader="cudf"):

return self._edgelist.copy()

def get_dask_edgelist(self, download=False):
"""
Return a distributed Edgelist.
Parameters
----------
download : Boolean (default=False)
Automatically download the dataset from the 'url' location within
the YAML file.
"""
if self._edgelist is None:
full_path = self.get_path()
if not full_path.is_file():
if download:
full_path = self.__download_csv(self.metadata["url"])
else:
raise RuntimeError(
f"The datafile {full_path} does not"
" exist. Try setting download=True"
" to download the datafile"
)

header = None
if isinstance(self.metadata["header"], int):
header = self.metadata["header"]

blocksize = dcg.get_chunksize(full_path)
self._edgelist = dask_cudf.read_csv(
path=full_path,
blocksize=blocksize,
delimiter=self.metadata["delim"],
names=self.metadata["col_names"],
dtype={
self.metadata["col_names"][i]: self.metadata["col_types"][i]
for i in range(len(self.metadata["col_types"]))
},
header=header,
)

return self._edgelist.copy()

def get_graph(
self,
download=False,
Expand Down Expand Up @@ -249,10 +291,10 @@ def get_graph(
if create_using is None:
G = Graph()
elif isinstance(create_using, Graph):
# what about BFS if trnaposed is True
# what about BFS if transposed is True
attrs = {"directed": create_using.is_directed()}
G = type(create_using)(**attrs)
elif type(create_using) is type:
elif issubclass(create_using, Graph):
G = create_using()
else:
raise TypeError(
Expand All @@ -277,9 +319,74 @@ def get_graph(
)
return G

def get_dask_graph(
self,
download=False,
create_using=Graph,
ignore_weights=False,
store_transposed=False,
):
"""
Return a distributed Graph object.
Parameters
----------
download : Boolean (default=False)
Downloads the dataset from the web.
create_using: cugraph.Graph (instance or class), optional
(default=Graph)
Specify the type of Graph to create. Can pass in an instance to
create a Graph instance with specified 'directed' attribute.
ignore_weights : Boolean (default=False)
Ignores weights in the dataset if True, resulting in an
unweighted Graph. If False (the default), weights from the
dataset -if present- will be applied to the Graph. If the
dataset does not contain weights, the Graph returned will
be unweighted regardless of ignore_weights.
store_transposed : bool, optional (default=False)
If True, stores the transpose of the adjacency matrix. Required
for certain algorithms.
"""
if self._edgelist is None:
self.get_dask_edgelist(download)

if create_using is None:
G = Graph()
elif isinstance(create_using, Graph):
attrs = {"directed": create_using.is_directed()}
G = type(create_using)(**attrs)
elif issubclass(create_using, Graph):
G = create_using()
else:
raise TypeError(
"create_using must be a cugraph.Graph "
"(or subclass) type or instance, got: "
f"{type(create_using)}"
)

if len(self.metadata["col_names"]) > 2 and not (ignore_weights):
G.from_dask_cudf_edgelist(
self._edgelist,
source=self.metadata["col_names"][0],
destination=self.metadata["col_names"][1],
edge_attr=self.metadata["col_names"][2],
store_transposed=store_transposed,
)
else:
G.from_dask_cudf_edgelist(
self._edgelist,
source=self.metadata["col_names"][0],
destination=self.metadata["col_names"][1],
store_transposed=store_transposed,
)
return G

def get_path(self):
"""
Returns the location of the stored dataset file
Returns the location of the stored dataset file.
"""
if self._path is None:
self._path = self._dl_path.path / (
Expand Down Expand Up @@ -347,8 +454,7 @@ def download_all(force=False):
filename = meta["name"] + meta["file_type"]
save_to = default_download_dir.path / filename
if not save_to.is_file() or force:
df = cudf.read_csv(meta["url"])
df.to_csv(save_to, index=False)
urllib.request.urlretrieve(meta["url"], str(save_to))


def set_download_dir(path):
Expand Down
77 changes: 75 additions & 2 deletions python/cugraph/cugraph/tests/utils/test_dataset.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2022-2023, NVIDIA CORPORATION.
# Copyright (c) 2022-2024, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand All @@ -20,6 +20,7 @@
import pytest

import cudf
import dask_cudf
from cugraph.structure import Graph
from cugraph.testing import (
RAPIDS_DATASET_ROOT_DIR_PATH,
Expand All @@ -29,6 +30,7 @@
BENCHMARKING_DATASETS,
)
from cugraph import datasets
from cugraph.dask.common.mg_utils import is_single_gpu

# Add the sg marker to all tests in this module.
pytestmark = pytest.mark.sg
Expand All @@ -37,6 +39,7 @@
###############################################################################
# Fixtures


# module fixture - called once for this module
@pytest.fixture(scope="module")
def tmpdir():
Expand Down Expand Up @@ -77,6 +80,7 @@ def setup(tmpdir):
###############################################################################
# Helpers


# check if there is a row where src == dst
def has_selfloop(dataset):
if not dataset.metadata["is_directed"]:
Expand Down Expand Up @@ -115,6 +119,7 @@ def is_symmetric(dataset):
###############################################################################
# Tests


# setting download_dir to None effectively re-initialized the default
def test_env_var():
os.environ["RAPIDS_DATASET_ROOT_DIR"] = "custom_storage_location"
Expand Down Expand Up @@ -150,9 +155,19 @@ def test_download(dataset):
assert dataset.get_path().is_file()


@pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system")
@pytest.mark.skip(reason="MG not supported on CI")
@pytest.mark.parametrize("dataset", ALL_DATASETS)
def test_download_dask(dask_client, dataset):
E = dataset.get_dask_edgelist(download=True)

assert E is not None
assert dataset.get_path().is_file()


@pytest.mark.parametrize("dataset", SMALL_DATASETS)
def test_reader(dataset):
# defaults to using cudf.read_csv
# defaults to using cudf
E = dataset.get_edgelist(download=True)

assert E is not None
Expand All @@ -171,18 +186,46 @@ def test_reader(dataset):
dataset.get_edgelist(reader=None)


@pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system")
@pytest.mark.skip(reason="MG not supported on CI")
@pytest.mark.parametrize("dataset", SMALL_DATASETS)
def test_reader_dask(dask_client, dataset):
# using dask_cudf
E = dataset.get_dask_edgelist(download=True)

assert E is not None
assert isinstance(E, dask_cudf.core.DataFrame)
dataset.unload()


@pytest.mark.parametrize("dataset", ALL_DATASETS)
def test_get_edgelist(dataset):
E = dataset.get_edgelist(download=True)
assert E is not None


@pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system")
@pytest.mark.skip(reason="MG not supported on CI")
@pytest.mark.parametrize("dataset", ALL_DATASETS)
def test_get_dask_edgelist(dask_client, dataset):
E = dataset.get_dask_edgelist(download=True)
assert E is not None


@pytest.mark.parametrize("dataset", ALL_DATASETS)
def test_get_graph(dataset):
G = dataset.get_graph(download=True)
assert G is not None


@pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system")
@pytest.mark.skip(reason="MG not supported on CI")
@pytest.mark.parametrize("dataset", ALL_DATASETS)
def test_get_dask_graph(dask_client, dataset):
G = dataset.get_dask_graph(download=True)
assert G is not None


@pytest.mark.parametrize("dataset", ALL_DATASETS)
def test_metadata(dataset):
M = dataset.metadata
Expand All @@ -207,6 +250,16 @@ def test_weights(dataset):
assert not G.is_weighted()


@pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system")
@pytest.mark.skip(reason="MG not supported on CI")
@pytest.mark.parametrize("dataset", WEIGHTED_DATASETS)
def test_weights_dask(dask_client, dataset):
G = dataset.get_dask_graph(download=True)
assert G.is_weighted()
G = dataset.get_dask_graph(download=True, ignore_weights=True)
assert not G.is_weighted()


@pytest.mark.parametrize("dataset", SMALL_DATASETS)
def test_create_using(dataset):
G = dataset.get_graph(download=True)
Expand All @@ -216,6 +269,26 @@ def test_create_using(dataset):
G = dataset.get_graph(download=True, create_using=Graph(directed=True))
assert G.is_directed()

# using a non-Graph type should raise an error
with pytest.raises(TypeError):
dataset.get_graph(download=True, create_using=set)


@pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system")
@pytest.mark.skip(reason="MG not supported on CI")
@pytest.mark.parametrize("dataset", SMALL_DATASETS)
def test_create_using_dask(dask_client, dataset):
G = dataset.get_dask_graph(download=True)
assert not G.is_directed()
G = dataset.get_dask_graph(download=True, create_using=Graph)
assert not G.is_directed()
G = dataset.get_dask_graph(download=True, create_using=Graph(directed=True))
assert G.is_directed()

# using a non-Graph type should raise an error
with pytest.raises(TypeError):
dataset.get_dask_graph(download=True, create_using=set)


def test_ctor_with_datafile():
from cugraph.datasets import karate
Expand Down

0 comments on commit c7b720d

Please sign in to comment.