From c7b720d87deb59f934dcc922644e8bd9ced0cdd9 Mon Sep 17 00:00:00 2001 From: Huiyu Xie Date: Tue, 9 Jan 2024 08:56:10 -0800 Subject: [PATCH] [FEA]: Add DASK edgelist and graph support to the Dataset API (#4035) Hi! I choose to go further with some simple work other than docs. This PR is going to close #3218. Here is what I have done in this PR: 1. Added `get_dask_edgelist()` and `get_dask_graph()` (and another internal helper function `__download_dask_csv()`) to Dataset API. 2. Executed all necessary tests for these new functions. 3. Improved existing functions in the Dataset API and conducted tests to verify improvements. Here are some additional details regarding this PR: 1. The building and testing were conducted using version 23.12 instead of the default 24.02. Since Cugraph-ops library is no longer open, I failed to build from source using version 24.02. I built and tested the code in version 23.12 and then transferred the updated file to 24.02 before creating this PR. (I would appreciate any guidance on how to build from version 24.02 for external contributors). 2. All tests from the test file have passed, but some warnings remain, as shown below ```bash ============================================================ warnings summary ============================================================ cugraph/tests/utils/test_dataset.py::test_get_dask_graph[dataset0] cugraph/tests/utils/test_dataset.py::test_get_dask_graph[dataset0] cugraph/tests/utils/test_dataset.py::test_get_dask_graph[dataset0] cugraph/tests/utils/test_dataset.py::test_weights_dask[dataset0] cugraph/tests/utils/test_dataset.py::test_weights_dask[dataset0] cugraph/tests/utils/test_dataset.py::test_weights_dask[dataset0] cugraph/tests/utils/test_dataset.py::test_weights_dask[dataset0] cugraph/tests/utils/test_dataset.py::test_weights_dask[dataset0] cugraph/tests/utils/test_dataset.py::test_weights_dask[dataset0] /home/ubuntu/miniconda3/envs/cugraph_dev/lib/python3.10/site-packages/cudf/core/index.py:3284: FutureWarning: cudf.StringIndex is deprecated and will be removed from cudf in a future version. Use cudf.Index with the appropriate dtype instead. warnings.warn( -- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html ``` I think above warnings came from the function call `from_dask_cudf_edgelist` but currently I have no idea how to remove them. I will do my best to address it if anyone has any ideas about it. 3. The `get_edgelist()` function returns a deep copy of the object, but this is not supported for `get_dask_edgelist()` since only shallow copy is allowed for Dask cuDF dataframe (see [docs](https://docs.rapids.ai/api/dask-cudf/legacy/api/#dask_cudf.DataFrame.copy)). This will lead to a problem where if a user modifies the dataframe, the changes will be reflected in the internal `self._edgelist` object. So when `get_dask_graph()` is called later, the resulting graph will differ from the one directly constructed from the data file. 4. I am uncertain about the requirements for (1) Identifying datasets and (2) Adding them to Dataset. If there is a need to add another function for determining whether a dataset requires MG handling based on its size, or to tag the dataset metadata (.yaml file) to indicate the necessity for MG processing, please let me know. Also, I welcome any suggestions for further features. 5. When I ran pytest on other test files, the most common warnings were ```bash /home/ubuntu/miniconda3/envs/cugraph_dev/lib/python3.10/site-packages/dask_cudf/io/csv.py:79: FutureWarning: `chunksize` is deprecated and will be removed in the future. Please use `blocksize` instead. ``` The keyword `chunksize` is no longer in use (check [docs](https://docs.rapids.ai/api/dask-cudf/legacy/api/#dask_cudf.read_csv) here). I have checked all related functions in the repository and found that they currently use `chunksize`. If there is a need to change them to `blocksize`, I will create another PR to address this issue. Any comments and suggestions are welcome! Authors: - Huiyu Xie (https://github.com/huiyuxie) - Rick Ratzel (https://github.com/rlratzel) Approvers: - Rick Ratzel (https://github.com/rlratzel) URL: https://github.com/rapidsai/cugraph/pull/4035 --- python/cugraph/cugraph/datasets/dataset.py | 126 ++++++++++++++++-- .../cugraph/tests/utils/test_dataset.py | 77 ++++++++++- 2 files changed, 191 insertions(+), 12 deletions(-) diff --git a/python/cugraph/cugraph/datasets/dataset.py b/python/cugraph/cugraph/datasets/dataset.py index dd7aa0df00a..9817d15dacb 100644 --- a/python/cugraph/cugraph/datasets/dataset.py +++ b/python/cugraph/cugraph/datasets/dataset.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022-2023, NVIDIA CORPORATION. +# Copyright (c) 2022-2024, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -12,10 +12,13 @@ # limitations under the License. import cudf +import dask_cudf import yaml import os import pandas as pd +import cugraph.dask as dcg from pathlib import Path +import urllib.request from cugraph.structure.graph_classes import Graph @@ -138,9 +141,8 @@ def __download_csv(self, url): filename = self.metadata["name"] + self.metadata["file_type"] if self._dl_path.path.is_dir(): - df = cudf.read_csv(url) self._path = self._dl_path.path / filename - df.to_csv(self._path, index=False) + urllib.request.urlretrieve(url, str(self._path)) else: raise RuntimeError( @@ -149,7 +151,6 @@ def __download_csv(self, url): return self._path def unload(self): - """ Remove all saved internal objects, forcing them to be re-created when accessed. @@ -162,7 +163,7 @@ def unload(self): def get_edgelist(self, download=False, reader="cudf"): """ - Return an Edgelist + Return an Edgelist. Parameters ---------- @@ -212,6 +213,47 @@ def get_edgelist(self, download=False, reader="cudf"): return self._edgelist.copy() + def get_dask_edgelist(self, download=False): + """ + Return a distributed Edgelist. + + Parameters + ---------- + download : Boolean (default=False) + Automatically download the dataset from the 'url' location within + the YAML file. + """ + if self._edgelist is None: + full_path = self.get_path() + if not full_path.is_file(): + if download: + full_path = self.__download_csv(self.metadata["url"]) + else: + raise RuntimeError( + f"The datafile {full_path} does not" + " exist. Try setting download=True" + " to download the datafile" + ) + + header = None + if isinstance(self.metadata["header"], int): + header = self.metadata["header"] + + blocksize = dcg.get_chunksize(full_path) + self._edgelist = dask_cudf.read_csv( + path=full_path, + blocksize=blocksize, + delimiter=self.metadata["delim"], + names=self.metadata["col_names"], + dtype={ + self.metadata["col_names"][i]: self.metadata["col_types"][i] + for i in range(len(self.metadata["col_types"])) + }, + header=header, + ) + + return self._edgelist.copy() + def get_graph( self, download=False, @@ -249,10 +291,10 @@ def get_graph( if create_using is None: G = Graph() elif isinstance(create_using, Graph): - # what about BFS if trnaposed is True + # what about BFS if transposed is True attrs = {"directed": create_using.is_directed()} G = type(create_using)(**attrs) - elif type(create_using) is type: + elif issubclass(create_using, Graph): G = create_using() else: raise TypeError( @@ -277,9 +319,74 @@ def get_graph( ) return G + def get_dask_graph( + self, + download=False, + create_using=Graph, + ignore_weights=False, + store_transposed=False, + ): + """ + Return a distributed Graph object. + + Parameters + ---------- + download : Boolean (default=False) + Downloads the dataset from the web. + + create_using: cugraph.Graph (instance or class), optional + (default=Graph) + Specify the type of Graph to create. Can pass in an instance to + create a Graph instance with specified 'directed' attribute. + + ignore_weights : Boolean (default=False) + Ignores weights in the dataset if True, resulting in an + unweighted Graph. If False (the default), weights from the + dataset -if present- will be applied to the Graph. If the + dataset does not contain weights, the Graph returned will + be unweighted regardless of ignore_weights. + + store_transposed : bool, optional (default=False) + If True, stores the transpose of the adjacency matrix. Required + for certain algorithms. + """ + if self._edgelist is None: + self.get_dask_edgelist(download) + + if create_using is None: + G = Graph() + elif isinstance(create_using, Graph): + attrs = {"directed": create_using.is_directed()} + G = type(create_using)(**attrs) + elif issubclass(create_using, Graph): + G = create_using() + else: + raise TypeError( + "create_using must be a cugraph.Graph " + "(or subclass) type or instance, got: " + f"{type(create_using)}" + ) + + if len(self.metadata["col_names"]) > 2 and not (ignore_weights): + G.from_dask_cudf_edgelist( + self._edgelist, + source=self.metadata["col_names"][0], + destination=self.metadata["col_names"][1], + edge_attr=self.metadata["col_names"][2], + store_transposed=store_transposed, + ) + else: + G.from_dask_cudf_edgelist( + self._edgelist, + source=self.metadata["col_names"][0], + destination=self.metadata["col_names"][1], + store_transposed=store_transposed, + ) + return G + def get_path(self): """ - Returns the location of the stored dataset file + Returns the location of the stored dataset file. """ if self._path is None: self._path = self._dl_path.path / ( @@ -347,8 +454,7 @@ def download_all(force=False): filename = meta["name"] + meta["file_type"] save_to = default_download_dir.path / filename if not save_to.is_file() or force: - df = cudf.read_csv(meta["url"]) - df.to_csv(save_to, index=False) + urllib.request.urlretrieve(meta["url"], str(save_to)) def set_download_dir(path): diff --git a/python/cugraph/cugraph/tests/utils/test_dataset.py b/python/cugraph/cugraph/tests/utils/test_dataset.py index 60bc6dbb45a..39f7ed8850b 100644 --- a/python/cugraph/cugraph/tests/utils/test_dataset.py +++ b/python/cugraph/cugraph/tests/utils/test_dataset.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022-2023, NVIDIA CORPORATION. +# Copyright (c) 2022-2024, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -20,6 +20,7 @@ import pytest import cudf +import dask_cudf from cugraph.structure import Graph from cugraph.testing import ( RAPIDS_DATASET_ROOT_DIR_PATH, @@ -29,6 +30,7 @@ BENCHMARKING_DATASETS, ) from cugraph import datasets +from cugraph.dask.common.mg_utils import is_single_gpu # Add the sg marker to all tests in this module. pytestmark = pytest.mark.sg @@ -37,6 +39,7 @@ ############################################################################### # Fixtures + # module fixture - called once for this module @pytest.fixture(scope="module") def tmpdir(): @@ -77,6 +80,7 @@ def setup(tmpdir): ############################################################################### # Helpers + # check if there is a row where src == dst def has_selfloop(dataset): if not dataset.metadata["is_directed"]: @@ -115,6 +119,7 @@ def is_symmetric(dataset): ############################################################################### # Tests + # setting download_dir to None effectively re-initialized the default def test_env_var(): os.environ["RAPIDS_DATASET_ROOT_DIR"] = "custom_storage_location" @@ -150,9 +155,19 @@ def test_download(dataset): assert dataset.get_path().is_file() +@pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system") +@pytest.mark.skip(reason="MG not supported on CI") +@pytest.mark.parametrize("dataset", ALL_DATASETS) +def test_download_dask(dask_client, dataset): + E = dataset.get_dask_edgelist(download=True) + + assert E is not None + assert dataset.get_path().is_file() + + @pytest.mark.parametrize("dataset", SMALL_DATASETS) def test_reader(dataset): - # defaults to using cudf.read_csv + # defaults to using cudf E = dataset.get_edgelist(download=True) assert E is not None @@ -171,18 +186,46 @@ def test_reader(dataset): dataset.get_edgelist(reader=None) +@pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system") +@pytest.mark.skip(reason="MG not supported on CI") +@pytest.mark.parametrize("dataset", SMALL_DATASETS) +def test_reader_dask(dask_client, dataset): + # using dask_cudf + E = dataset.get_dask_edgelist(download=True) + + assert E is not None + assert isinstance(E, dask_cudf.core.DataFrame) + dataset.unload() + + @pytest.mark.parametrize("dataset", ALL_DATASETS) def test_get_edgelist(dataset): E = dataset.get_edgelist(download=True) assert E is not None +@pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system") +@pytest.mark.skip(reason="MG not supported on CI") +@pytest.mark.parametrize("dataset", ALL_DATASETS) +def test_get_dask_edgelist(dask_client, dataset): + E = dataset.get_dask_edgelist(download=True) + assert E is not None + + @pytest.mark.parametrize("dataset", ALL_DATASETS) def test_get_graph(dataset): G = dataset.get_graph(download=True) assert G is not None +@pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system") +@pytest.mark.skip(reason="MG not supported on CI") +@pytest.mark.parametrize("dataset", ALL_DATASETS) +def test_get_dask_graph(dask_client, dataset): + G = dataset.get_dask_graph(download=True) + assert G is not None + + @pytest.mark.parametrize("dataset", ALL_DATASETS) def test_metadata(dataset): M = dataset.metadata @@ -207,6 +250,16 @@ def test_weights(dataset): assert not G.is_weighted() +@pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system") +@pytest.mark.skip(reason="MG not supported on CI") +@pytest.mark.parametrize("dataset", WEIGHTED_DATASETS) +def test_weights_dask(dask_client, dataset): + G = dataset.get_dask_graph(download=True) + assert G.is_weighted() + G = dataset.get_dask_graph(download=True, ignore_weights=True) + assert not G.is_weighted() + + @pytest.mark.parametrize("dataset", SMALL_DATASETS) def test_create_using(dataset): G = dataset.get_graph(download=True) @@ -216,6 +269,26 @@ def test_create_using(dataset): G = dataset.get_graph(download=True, create_using=Graph(directed=True)) assert G.is_directed() + # using a non-Graph type should raise an error + with pytest.raises(TypeError): + dataset.get_graph(download=True, create_using=set) + + +@pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system") +@pytest.mark.skip(reason="MG not supported on CI") +@pytest.mark.parametrize("dataset", SMALL_DATASETS) +def test_create_using_dask(dask_client, dataset): + G = dataset.get_dask_graph(download=True) + assert not G.is_directed() + G = dataset.get_dask_graph(download=True, create_using=Graph) + assert not G.is_directed() + G = dataset.get_dask_graph(download=True, create_using=Graph(directed=True)) + assert G.is_directed() + + # using a non-Graph type should raise an error + with pytest.raises(TypeError): + dataset.get_dask_graph(download=True, create_using=set) + def test_ctor_with_datafile(): from cugraph.datasets import karate