diff --git a/python/cugraph/cugraph/datasets/dataset.py b/python/cugraph/cugraph/datasets/dataset.py index dd7aa0df00a..9817d15dacb 100644 --- a/python/cugraph/cugraph/datasets/dataset.py +++ b/python/cugraph/cugraph/datasets/dataset.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022-2023, NVIDIA CORPORATION. +# Copyright (c) 2022-2024, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -12,10 +12,13 @@ # limitations under the License. import cudf +import dask_cudf import yaml import os import pandas as pd +import cugraph.dask as dcg from pathlib import Path +import urllib.request from cugraph.structure.graph_classes import Graph @@ -138,9 +141,8 @@ def __download_csv(self, url): filename = self.metadata["name"] + self.metadata["file_type"] if self._dl_path.path.is_dir(): - df = cudf.read_csv(url) self._path = self._dl_path.path / filename - df.to_csv(self._path, index=False) + urllib.request.urlretrieve(url, str(self._path)) else: raise RuntimeError( @@ -149,7 +151,6 @@ def __download_csv(self, url): return self._path def unload(self): - """ Remove all saved internal objects, forcing them to be re-created when accessed. @@ -162,7 +163,7 @@ def unload(self): def get_edgelist(self, download=False, reader="cudf"): """ - Return an Edgelist + Return an Edgelist. Parameters ---------- @@ -212,6 +213,47 @@ def get_edgelist(self, download=False, reader="cudf"): return self._edgelist.copy() + def get_dask_edgelist(self, download=False): + """ + Return a distributed Edgelist. + + Parameters + ---------- + download : Boolean (default=False) + Automatically download the dataset from the 'url' location within + the YAML file. + """ + if self._edgelist is None: + full_path = self.get_path() + if not full_path.is_file(): + if download: + full_path = self.__download_csv(self.metadata["url"]) + else: + raise RuntimeError( + f"The datafile {full_path} does not" + " exist. Try setting download=True" + " to download the datafile" + ) + + header = None + if isinstance(self.metadata["header"], int): + header = self.metadata["header"] + + blocksize = dcg.get_chunksize(full_path) + self._edgelist = dask_cudf.read_csv( + path=full_path, + blocksize=blocksize, + delimiter=self.metadata["delim"], + names=self.metadata["col_names"], + dtype={ + self.metadata["col_names"][i]: self.metadata["col_types"][i] + for i in range(len(self.metadata["col_types"])) + }, + header=header, + ) + + return self._edgelist.copy() + def get_graph( self, download=False, @@ -249,10 +291,10 @@ def get_graph( if create_using is None: G = Graph() elif isinstance(create_using, Graph): - # what about BFS if trnaposed is True + # what about BFS if transposed is True attrs = {"directed": create_using.is_directed()} G = type(create_using)(**attrs) - elif type(create_using) is type: + elif issubclass(create_using, Graph): G = create_using() else: raise TypeError( @@ -277,9 +319,74 @@ def get_graph( ) return G + def get_dask_graph( + self, + download=False, + create_using=Graph, + ignore_weights=False, + store_transposed=False, + ): + """ + Return a distributed Graph object. + + Parameters + ---------- + download : Boolean (default=False) + Downloads the dataset from the web. + + create_using: cugraph.Graph (instance or class), optional + (default=Graph) + Specify the type of Graph to create. Can pass in an instance to + create a Graph instance with specified 'directed' attribute. + + ignore_weights : Boolean (default=False) + Ignores weights in the dataset if True, resulting in an + unweighted Graph. If False (the default), weights from the + dataset -if present- will be applied to the Graph. If the + dataset does not contain weights, the Graph returned will + be unweighted regardless of ignore_weights. + + store_transposed : bool, optional (default=False) + If True, stores the transpose of the adjacency matrix. Required + for certain algorithms. + """ + if self._edgelist is None: + self.get_dask_edgelist(download) + + if create_using is None: + G = Graph() + elif isinstance(create_using, Graph): + attrs = {"directed": create_using.is_directed()} + G = type(create_using)(**attrs) + elif issubclass(create_using, Graph): + G = create_using() + else: + raise TypeError( + "create_using must be a cugraph.Graph " + "(or subclass) type or instance, got: " + f"{type(create_using)}" + ) + + if len(self.metadata["col_names"]) > 2 and not (ignore_weights): + G.from_dask_cudf_edgelist( + self._edgelist, + source=self.metadata["col_names"][0], + destination=self.metadata["col_names"][1], + edge_attr=self.metadata["col_names"][2], + store_transposed=store_transposed, + ) + else: + G.from_dask_cudf_edgelist( + self._edgelist, + source=self.metadata["col_names"][0], + destination=self.metadata["col_names"][1], + store_transposed=store_transposed, + ) + return G + def get_path(self): """ - Returns the location of the stored dataset file + Returns the location of the stored dataset file. """ if self._path is None: self._path = self._dl_path.path / ( @@ -347,8 +454,7 @@ def download_all(force=False): filename = meta["name"] + meta["file_type"] save_to = default_download_dir.path / filename if not save_to.is_file() or force: - df = cudf.read_csv(meta["url"]) - df.to_csv(save_to, index=False) + urllib.request.urlretrieve(meta["url"], str(save_to)) def set_download_dir(path): diff --git a/python/cugraph/cugraph/tests/utils/test_dataset.py b/python/cugraph/cugraph/tests/utils/test_dataset.py index 60bc6dbb45a..39f7ed8850b 100644 --- a/python/cugraph/cugraph/tests/utils/test_dataset.py +++ b/python/cugraph/cugraph/tests/utils/test_dataset.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022-2023, NVIDIA CORPORATION. +# Copyright (c) 2022-2024, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -20,6 +20,7 @@ import pytest import cudf +import dask_cudf from cugraph.structure import Graph from cugraph.testing import ( RAPIDS_DATASET_ROOT_DIR_PATH, @@ -29,6 +30,7 @@ BENCHMARKING_DATASETS, ) from cugraph import datasets +from cugraph.dask.common.mg_utils import is_single_gpu # Add the sg marker to all tests in this module. pytestmark = pytest.mark.sg @@ -37,6 +39,7 @@ ############################################################################### # Fixtures + # module fixture - called once for this module @pytest.fixture(scope="module") def tmpdir(): @@ -77,6 +80,7 @@ def setup(tmpdir): ############################################################################### # Helpers + # check if there is a row where src == dst def has_selfloop(dataset): if not dataset.metadata["is_directed"]: @@ -115,6 +119,7 @@ def is_symmetric(dataset): ############################################################################### # Tests + # setting download_dir to None effectively re-initialized the default def test_env_var(): os.environ["RAPIDS_DATASET_ROOT_DIR"] = "custom_storage_location" @@ -150,9 +155,19 @@ def test_download(dataset): assert dataset.get_path().is_file() +@pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system") +@pytest.mark.skip(reason="MG not supported on CI") +@pytest.mark.parametrize("dataset", ALL_DATASETS) +def test_download_dask(dask_client, dataset): + E = dataset.get_dask_edgelist(download=True) + + assert E is not None + assert dataset.get_path().is_file() + + @pytest.mark.parametrize("dataset", SMALL_DATASETS) def test_reader(dataset): - # defaults to using cudf.read_csv + # defaults to using cudf E = dataset.get_edgelist(download=True) assert E is not None @@ -171,18 +186,46 @@ def test_reader(dataset): dataset.get_edgelist(reader=None) +@pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system") +@pytest.mark.skip(reason="MG not supported on CI") +@pytest.mark.parametrize("dataset", SMALL_DATASETS) +def test_reader_dask(dask_client, dataset): + # using dask_cudf + E = dataset.get_dask_edgelist(download=True) + + assert E is not None + assert isinstance(E, dask_cudf.core.DataFrame) + dataset.unload() + + @pytest.mark.parametrize("dataset", ALL_DATASETS) def test_get_edgelist(dataset): E = dataset.get_edgelist(download=True) assert E is not None +@pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system") +@pytest.mark.skip(reason="MG not supported on CI") +@pytest.mark.parametrize("dataset", ALL_DATASETS) +def test_get_dask_edgelist(dask_client, dataset): + E = dataset.get_dask_edgelist(download=True) + assert E is not None + + @pytest.mark.parametrize("dataset", ALL_DATASETS) def test_get_graph(dataset): G = dataset.get_graph(download=True) assert G is not None +@pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system") +@pytest.mark.skip(reason="MG not supported on CI") +@pytest.mark.parametrize("dataset", ALL_DATASETS) +def test_get_dask_graph(dask_client, dataset): + G = dataset.get_dask_graph(download=True) + assert G is not None + + @pytest.mark.parametrize("dataset", ALL_DATASETS) def test_metadata(dataset): M = dataset.metadata @@ -207,6 +250,16 @@ def test_weights(dataset): assert not G.is_weighted() +@pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system") +@pytest.mark.skip(reason="MG not supported on CI") +@pytest.mark.parametrize("dataset", WEIGHTED_DATASETS) +def test_weights_dask(dask_client, dataset): + G = dataset.get_dask_graph(download=True) + assert G.is_weighted() + G = dataset.get_dask_graph(download=True, ignore_weights=True) + assert not G.is_weighted() + + @pytest.mark.parametrize("dataset", SMALL_DATASETS) def test_create_using(dataset): G = dataset.get_graph(download=True) @@ -216,6 +269,26 @@ def test_create_using(dataset): G = dataset.get_graph(download=True, create_using=Graph(directed=True)) assert G.is_directed() + # using a non-Graph type should raise an error + with pytest.raises(TypeError): + dataset.get_graph(download=True, create_using=set) + + +@pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system") +@pytest.mark.skip(reason="MG not supported on CI") +@pytest.mark.parametrize("dataset", SMALL_DATASETS) +def test_create_using_dask(dask_client, dataset): + G = dataset.get_dask_graph(download=True) + assert not G.is_directed() + G = dataset.get_dask_graph(download=True, create_using=Graph) + assert not G.is_directed() + G = dataset.get_dask_graph(download=True, create_using=Graph(directed=True)) + assert G.is_directed() + + # using a non-Graph type should raise an error + with pytest.raises(TypeError): + dataset.get_dask_graph(download=True, create_using=set) + def test_ctor_with_datafile(): from cugraph.datasets import karate