diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index ccfdb826812..0f490283795 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -22,7 +22,7 @@ on: default: nightly concurrency: - group: ${{ github.workflow }}-${{ github.ref }} + group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }} cancel-in-progress: true jobs: diff --git a/ci/test_python.sh b/ci/test_python.sh index 1690ce2f15b..273d3c93482 100755 --- a/ci/test_python.sh +++ b/ci/test_python.sh @@ -197,27 +197,26 @@ if [[ "${RAPIDS_CUDA_VERSION}" == "11.8.0" ]]; then conda activate test_cugraph_pyg set -u - # Install pytorch + # Will automatically install built dependencies of cuGraph-PyG rapids-mamba-retry install \ - --force-reinstall \ - --channel pyg \ + --channel "${CPP_CHANNEL}" \ + --channel "${PYTHON_CHANNEL}" \ --channel pytorch \ --channel nvidia \ - 'pyg=2.3' \ - 'pytorch=2.0.0' \ - 'pytorch-cuda=11.8' + --channel pyg \ + --channel rapidsai-nightly \ + "cugraph-pyg" \ + "pytorch>=2.0,<2.1" \ + "pytorch-cuda=11.8" # Install pyg dependencies (which requires pip) - pip install pyg_lib torch_scatter torch_sparse torch_cluster torch_spline_conv -f https://data.pyg.org/whl/torch-2.0.0+cu118.html - - rapids-mamba-retry install \ - --channel "${CPP_CHANNEL}" \ - --channel "${PYTHON_CHANNEL}" \ - libcugraph \ - pylibcugraph \ - pylibcugraphops \ - cugraph \ - cugraph-pyg + pip install \ + pyg_lib \ + torch_scatter \ + torch_sparse \ + torch_cluster \ + torch_spline_conv \ + -f https://data.pyg.org/whl/torch-2.0.0+cu118.html rapids-print-env diff --git a/conda/recipes/cugraph-pyg/meta.yaml b/conda/recipes/cugraph-pyg/meta.yaml index 07caf07daab..a2a02a1d9f6 100644 --- a/conda/recipes/cugraph-pyg/meta.yaml +++ b/conda/recipes/cugraph-pyg/meta.yaml @@ -34,7 +34,7 @@ requirements: - cupy >=12.0.0 - cugraph ={{ version }} - pylibcugraphops ={{ minor_version }} - - pyg >=2.3,<2.4 + - pyg >=2.3,<2.5 tests: imports: diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 360165e688d..3e867643041 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -153,6 +153,11 @@ rapids_cpm_init() # lags behind. ### +# Need to make sure rmm is found before cuco so that rmm patches the libcudacxx +# directory to be found by cuco. +include(${rapids-cmake-dir}/cpm/rmm.cmake) +rapids_cpm_rmm(BUILD_EXPORT_SET cugraph-exports + INSTALL_EXPORT_SET cugraph-exports) # Putting this before raft to override RAFT from pulling them in. include(cmake/thirdparty/get_libcudacxx.cmake) include(${rapids-cmake-dir}/cpm/cuco.cmake) diff --git a/dependencies.yaml b/dependencies.yaml index 13f100610cf..a89acd9288b 100644 --- a/dependencies.yaml +++ b/dependencies.yaml @@ -497,9 +497,9 @@ dependencies: - output_types: [conda] packages: - cugraph==23.12.* - - pytorch==2.0 + - pytorch>=2.0 - pytorch-cuda==11.8 - - pyg=2.3.1=*torch_2.0.0*cu118* + - pyg>=2.4.0 depends_on_rmm: common: diff --git a/python/cugraph-pyg/conda/cugraph_pyg_dev_cuda-118.yaml b/python/cugraph-pyg/conda/cugraph_pyg_dev_cuda-118.yaml index f98eab430ba..71d1c7e389c 100644 --- a/python/cugraph-pyg/conda/cugraph_pyg_dev_cuda-118.yaml +++ b/python/cugraph-pyg/conda/cugraph_pyg_dev_cuda-118.yaml @@ -13,13 +13,13 @@ dependencies: - cugraph==23.12.* - pandas - pre-commit -- pyg=2.3.1=*torch_2.0.0*cu118* +- pyg>=2.4.0 - pylibcugraphops==23.12.* - pytest - pytest-benchmark - pytest-cov - pytest-xdist - pytorch-cuda==11.8 -- pytorch==2.0 +- pytorch>=2.0 - scipy name: cugraph_pyg_dev_cuda-118 diff --git a/python/cugraph-pyg/cugraph_pyg/data/cugraph_store.py b/python/cugraph-pyg/cugraph_pyg/data/cugraph_store.py index d1b24543956..edeeface4c4 100644 --- a/python/cugraph-pyg/cugraph_pyg/data/cugraph_store.py +++ b/python/cugraph-pyg/cugraph_pyg/data/cugraph_store.py @@ -210,7 +210,10 @@ class EXPERIMENTAL__CuGraphStore: def __init__( self, F: cugraph.gnn.FeatureStore, - G: Union[Dict[str, Tuple[TensorType]], Dict[str, int]], + G: Union[ + Dict[Tuple[str, str, str], Tuple[TensorType]], + Dict[Tuple[str, str, str], int], + ], num_nodes_dict: Dict[str, int], *, multi_gpu: bool = False, @@ -744,7 +747,7 @@ def _subgraph(self, edge_types: List[tuple] = None) -> cugraph.MultiGraph: def _get_vertex_groups_from_sample( self, nodes_of_interest: TensorType, is_sorted: bool = False - ) -> dict: + ) -> Dict[str, torch.Tensor]: """ Given a tensor of nodes of interest, this method a single dictionary, noi_index. @@ -808,7 +811,10 @@ def _get_sample_from_vertex_groups( def _get_renumbered_edge_groups_from_sample( self, sampling_results: cudf.DataFrame, noi_index: dict - ) -> Tuple[dict, dict]: + ) -> Tuple[ + Dict[Tuple[str, str, str], torch.Tensor], + Tuple[Dict[Tuple[str, str, str], torch.Tensor]], + ]: """ Given a cudf (NOT dask_cudf) DataFrame of sampling results and a dictionary of non-renumbered vertex ids grouped by vertex type, this method diff --git a/python/cugraph-pyg/cugraph_pyg/loader/cugraph_node_loader.py b/python/cugraph-pyg/cugraph_pyg/loader/cugraph_node_loader.py index 8552e7412e0..ad8d22e255e 100644 --- a/python/cugraph-pyg/cugraph_pyg/loader/cugraph_node_loader.py +++ b/python/cugraph-pyg/cugraph_pyg/loader/cugraph_node_loader.py @@ -15,6 +15,7 @@ import os import re +import warnings import cupy import cudf @@ -159,23 +160,34 @@ def __init__( if batch_size is None or batch_size < 1: raise ValueError("Batch size must be >= 1") - self.__directory = tempfile.TemporaryDirectory(dir=directory) + self.__directory = ( + tempfile.TemporaryDirectory() if directory is None else directory + ) if isinstance(num_neighbors, dict): raise ValueError("num_neighbors dict is currently unsupported!") - renumber = ( - True - if ( - (len(self.__graph_store.node_types) == 1) - and (len(self.__graph_store.edge_types) == 1) + if "renumber" in kwargs: + warnings.warn( + "Setting renumbering manually could result in invalid output," + " please ensure you intended to do this." + ) + renumber = kwargs.pop("renumber") + else: + renumber = ( + True + if ( + (len(self.__graph_store.node_types) == 1) + and (len(self.__graph_store.edge_types) == 1) + ) + else False ) - else False - ) bulk_sampler = BulkSampler( batch_size, - self.__directory.name, + self.__directory + if isinstance(self.__directory, str) + else self.__directory.name, self.__graph_store._subgraph(edge_types), fanout_vals=num_neighbors, with_replacement=replace, @@ -219,7 +231,13 @@ def __init__( ) bulk_sampler.flush() - self.__input_files = iter(os.listdir(self.__directory.name)) + self.__input_files = iter( + os.listdir( + self.__directory + if isinstance(self.__directory, str) + else self.__directory.name + ) + ) def __next__(self): from time import perf_counter @@ -423,9 +441,6 @@ def __next__(self): sampler_output.edge, ) else: - if self.__graph_store.order == "CSR": - raise ValueError("CSR format incompatible with CSC output") - out = filter_cugraph_store_csc( self.__feature_store, self.__graph_store, @@ -437,11 +452,8 @@ def __next__(self): # Account for CSR format in cuGraph vs. CSC format in PyG if self.__coo and self.__graph_store.order == "CSC": - for node_type in out.edge_index_dict: - out[node_type].edge_index[0], out[node_type].edge_index[1] = ( - out[node_type].edge_index[1], - out[node_type].edge_index[0], - ) + for edge_type in out.edge_index_dict: + out[edge_type].edge_index = out[edge_type].edge_index.flip(dims=[0]) out.set_value_dict("num_sampled_nodes", sampler_output.num_sampled_nodes) out.set_value_dict("num_sampled_edges", sampler_output.num_sampled_edges) diff --git a/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_store.py b/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_store.py index ed7f70034e2..13c9c90c7c2 100644 --- a/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_store.py +++ b/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_store.py @@ -120,7 +120,7 @@ def test_get_edge_index(graph, edge_index_type, dask_client): G[et][0] = dask_cudf.from_cudf(cudf.Series(G[et][0]), npartitions=1) G[et][1] = dask_cudf.from_cudf(cudf.Series(G[et][1]), npartitions=1) - cugraph_store = CuGraphStore(F, G, N, multi_gpu=True) + cugraph_store = CuGraphStore(F, G, N, order="CSC", multi_gpu=True) for pyg_can_edge_type in G: src, dst = cugraph_store.get_edge_index( diff --git a/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_loader.py b/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_loader.py index 853836dc2a6..27b73bf7d35 100644 --- a/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_loader.py +++ b/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_loader.py @@ -18,6 +18,7 @@ import cudf import cupy +import numpy as np from cugraph_pyg.loader import CuGraphNeighborLoader from cugraph_pyg.loader import BulkSampleLoader @@ -27,6 +28,8 @@ from cugraph.gnn import FeatureStore from cugraph.utilities.utils import import_optional, MissingModule +from typing import Dict, Tuple + torch = import_optional("torch") torch_geometric = import_optional("torch_geometric") trim_to_layer = import_optional("torch_geometric.utils.trim_to_layer") @@ -40,7 +43,11 @@ @pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available") -def test_cugraph_loader_basic(karate_gnn): +def test_cugraph_loader_basic( + karate_gnn: Tuple[ + FeatureStore, Dict[Tuple[str, str, str], np.ndarray], Dict[str, int] + ] +): F, G, N = karate_gnn cugraph_store = CuGraphStore(F, G, N, order="CSR") loader = CuGraphNeighborLoader( @@ -66,7 +73,11 @@ def test_cugraph_loader_basic(karate_gnn): @pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available") -def test_cugraph_loader_hetero(karate_gnn): +def test_cugraph_loader_hetero( + karate_gnn: Tuple[ + FeatureStore, Dict[Tuple[str, str, str], np.ndarray], Dict[str, int] + ] +): F, G, N = karate_gnn cugraph_store = CuGraphStore(F, G, N, order="CSR") loader = CuGraphNeighborLoader( @@ -342,7 +353,7 @@ def test_cugraph_loader_e2e_coo(): @pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available") @pytest.mark.skipif(not HAS_TORCH_SPARSE, reason="torch-sparse not available") @pytest.mark.parametrize("framework", ["pyg", "cugraph-ops"]) -def test_cugraph_loader_e2e_csc(framework): +def test_cugraph_loader_e2e_csc(framework: str): m = [2, 9, 99, 82, 9, 3, 18, 1, 12] x = torch.randint(3000, (256, 256)).to(torch.float32) F = FeatureStore() @@ -442,3 +453,40 @@ def test_cugraph_loader_e2e_csc(framework): x = x.narrow(dim=0, start=0, length=s - num_sampled_nodes[1]) assert list(x.shape) == [1, 1] + + +@pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available") +@pytest.mark.parametrize("directory", ["local", "temp"]) +def test_load_directory( + karate_gnn: Tuple[ + FeatureStore, Dict[Tuple[str, str, str], np.ndarray], Dict[str, int] + ], + directory: str, +): + if directory == "local": + local_dir = tempfile.TemporaryDirectory(dir=".") + + cugraph_store = CuGraphStore(*karate_gnn) + cugraph_loader = CuGraphNeighborLoader( + (cugraph_store, cugraph_store), + torch.arange(8, dtype=torch.int64), + 2, + num_neighbors=[8, 4, 2], + random_state=62, + replace=False, + directory=None if directory == "temp" else local_dir.name, + batches_per_partition=1, + ) + + it = iter(cugraph_loader) + next_batch = next(it) + assert next_batch is not None + + if directory == "local": + assert len(os.listdir(local_dir.name)) == 4 + + count = 1 + while next(it, None) is not None: + count += 1 + + assert count == 4 diff --git a/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_store.py b/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_store.py index da3043760d4..b39ebad8254 100644 --- a/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_store.py +++ b/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_store.py @@ -113,7 +113,7 @@ def test_get_edge_index(graph, edge_index_type): G[et][0] = cudf.Series(G[et][0]) G[et][1] = cudf.Series(G[et][1]) - cugraph_store = CuGraphStore(F, G, N) + cugraph_store = CuGraphStore(F, G, N, order="CSC") for pyg_can_edge_type in G: src, dst = cugraph_store.get_edge_index( diff --git a/python/cugraph/cugraph/datasets/dataset.py b/python/cugraph/cugraph/datasets/dataset.py index 71a9affa274..dd7aa0df00a 100644 --- a/python/cugraph/cugraph/datasets/dataset.py +++ b/python/cugraph/cugraph/datasets/dataset.py @@ -14,6 +14,7 @@ import cudf import yaml import os +import pandas as pd from pathlib import Path from cugraph.structure.graph_classes import Graph @@ -159,9 +160,9 @@ def unload(self): """ self._edgelist = None - def get_edgelist(self, download=False, reader=cudf.read_csv): + def get_edgelist(self, download=False, reader="cudf"): """ - Return a DataFrame that represents a graph edgelist. + Return an Edgelist Parameters ---------- @@ -169,10 +170,8 @@ def get_edgelist(self, download=False, reader=cudf.read_csv): Automatically download the dataset from the 'url' location within the YAML file. - reader : callable (default=cudf.read_csv) - A callable to use to read the dataset. The callable must be - compatible with the pandas/cudf read_csv() function and return a - compatible DataFrame. + reader : 'cudf' or 'pandas' (default='cudf') + The library used to read a CSV and return an edgelist DataFrame. """ if self._edgelist is None: full_path = self.get_path() @@ -185,14 +184,29 @@ def get_edgelist(self, download=False, reader=cudf.read_csv): " exist. Try setting download=True" " to download the datafile" ) + header = None if isinstance(self.metadata["header"], int): header = self.metadata["header"] - self._edgelist = reader( - full_path, + + if reader == "cudf": + self.__reader = cudf.read_csv + elif reader == "pandas": + self.__reader = pd.read_csv + else: + raise ValueError( + "reader must be a module with a read_csv function compatible with \ + cudf.read_csv" + ) + + self._edgelist = self.__reader( + filepath_or_buffer=full_path, delimiter=self.metadata["delim"], names=self.metadata["col_names"], - dtype=self.metadata["col_types"], + dtype={ + self.metadata["col_names"][i]: self.metadata["col_types"][i] + for i in range(len(self.metadata["col_types"])) + }, header=header, ) diff --git a/python/cugraph/cugraph/tests/utils/test_dataset.py b/python/cugraph/cugraph/tests/utils/test_dataset.py index fa8ccfbad20..60bc6dbb45a 100644 --- a/python/cugraph/cugraph/tests/utils/test_dataset.py +++ b/python/cugraph/cugraph/tests/utils/test_dataset.py @@ -16,6 +16,7 @@ from pathlib import Path from tempfile import TemporaryDirectory +import pandas import pytest import cudf @@ -149,6 +150,27 @@ def test_download(dataset): assert dataset.get_path().is_file() +@pytest.mark.parametrize("dataset", SMALL_DATASETS) +def test_reader(dataset): + # defaults to using cudf.read_csv + E = dataset.get_edgelist(download=True) + + assert E is not None + assert isinstance(E, cudf.core.dataframe.DataFrame) + dataset.unload() + + # using pandas + E_pd = dataset.get_edgelist(download=True, reader="pandas") + + assert E_pd is not None + assert isinstance(E_pd, pandas.core.frame.DataFrame) + dataset.unload() + + with pytest.raises(ValueError): + dataset.get_edgelist(reader="fail") + dataset.get_edgelist(reader=None) + + @pytest.mark.parametrize("dataset", ALL_DATASETS) def test_get_edgelist(dataset): E = dataset.get_edgelist(download=True)