From b38e010bd81e8d53f953970e5603f5890efbc4b6 Mon Sep 17 00:00:00 2001 From: Alex Barghi <105237337+alexbarghi-nv@users.noreply.github.com> Date: Wed, 31 Jul 2024 12:04:18 -0400 Subject: [PATCH] [FEA] New Graph Interface and Loaders for Distributed Sampling in DGL (#4486) - [x] New Graph class that duck-types `DGLGraph` - [x] New WholeGraph feature store that implements DGL's `FeatureStorage` - [x] New Sampler interface - [x] New Loader interface - [x] New Sampler for distributed uniform neighbor sampling without dask - [x] New Loader for distributed uniform neighbor sampling without dask This PR also adds `pydantic` and `torchdata` as test dependencies of cuGraph-DGL, because they are unlisted runtime dependencies of the latest version of DGL (which we will be testing with in the new `cugraph-gnn` repository). This PR also changes the test file structure to match the rest of cuGraph, by using the __mg_ suffix to denote multi-GPU tests and moving them to the main directory under "tests". New examples will be added in a future PR. This PR is just for basic functionality and tests. Closes #3805 Closes rapidsai/cugraph-gnn#4 Closes #4400 Closes #4247 Closes #3303 Authors: - Alex Barghi (https://github.com/alexbarghi-nv) - Ralph Liu (https://github.com/nv-rliu) Approvers: - Vibhu Jawa (https://github.com/VibhuJawa) - Ray Douglass (https://github.com/raydouglass) - Rick Ratzel (https://github.com/rlratzel) URL: https://github.com/rapidsai/cugraph/pull/4486 --- .../all_cuda-118_arch-x86_64.yaml | 2 + .../all_cuda-125_arch-x86_64.yaml | 2 + conda/recipes/cugraph-dgl/meta.yaml | 4 +- dependencies.yaml | 5 +- .../conda/cugraph_dgl_dev_cuda-118.yaml | 1 + python/cugraph-dgl/cugraph_dgl/__init__.py | 8 +- python/cugraph-dgl/cugraph_dgl/convert.py | 54 +- .../cugraph_dgl/dataloading/__init__.py | 20 +- .../dataloading/dask_dataloader.py | 321 ++++++ .../cugraph_dgl/dataloading/dataloader.py | 385 +++----- .../dataloading/neighbor_sampler.py | 151 ++- .../cugraph_dgl/dataloading/sampler.py | 193 ++++ .../dataloading/utils/sampling_helpers.py | 167 +++- python/cugraph-dgl/cugraph_dgl/features.py | 121 +++ python/cugraph-dgl/cugraph_dgl/graph.py | 910 ++++++++++++++++++ .../cugraph-dgl/cugraph_dgl/nn/conv/base.py | 25 +- .../{ => cugraph_dgl}/tests/__init__.py | 0 .../{ => cugraph_dgl}/tests/conftest.py | 0 .../dataloading/test_dask_dataloader.py} | 4 +- .../dataloading/test_dask_dataloader_mg.py} | 4 +- .../tests/dataloading/test_dataloader.py | 128 +++ .../tests/dataloading/test_dataloader_mg.py | 181 ++++ .../tests/dataloading}/test_dataset.py | 0 .../tests/nn/test_gatconv.py | 0 .../tests/nn/test_gatv2conv.py | 0 .../tests/nn/test_relgraphconv.py | 0 .../tests/nn/test_sageconv.py | 0 .../tests/nn/test_sparsegraph.py | 0 .../tests/nn/test_transformerconv.py | 0 .../tests/test_cugraph_storage.py | 0 .../tests/test_from_dgl_heterograph.py | 41 +- .../cugraph_dgl/tests/test_graph.py | 217 +++++ .../cugraph_dgl/tests/test_graph_mg.py | 310 ++++++ .../{ => cugraph_dgl}/tests/test_utils.py | 0 python/cugraph-dgl/cugraph_dgl/tests/utils.py | 154 +++ python/cugraph-dgl/cugraph_dgl/typing.py | 40 + .../utils/cugraph_conversion_utils.py | 15 +- python/cugraph-dgl/cugraph_dgl/view.py | 310 ++++++ python/cugraph-dgl/tests/utils.py | 67 -- 39 files changed, 3464 insertions(+), 376 deletions(-) create mode 100644 python/cugraph-dgl/cugraph_dgl/dataloading/dask_dataloader.py create mode 100644 python/cugraph-dgl/cugraph_dgl/dataloading/sampler.py create mode 100644 python/cugraph-dgl/cugraph_dgl/features.py create mode 100644 python/cugraph-dgl/cugraph_dgl/graph.py rename python/cugraph-dgl/{ => cugraph_dgl}/tests/__init__.py (100%) rename python/cugraph-dgl/{ => cugraph_dgl}/tests/conftest.py (100%) rename python/cugraph-dgl/{tests/test_dataloader.py => cugraph_dgl/tests/dataloading/test_dask_dataloader.py} (98%) rename python/cugraph-dgl/{tests/mg/test_dataloader.py => cugraph_dgl/tests/dataloading/test_dask_dataloader_mg.py} (97%) create mode 100644 python/cugraph-dgl/cugraph_dgl/tests/dataloading/test_dataloader.py create mode 100644 python/cugraph-dgl/cugraph_dgl/tests/dataloading/test_dataloader_mg.py rename python/cugraph-dgl/{tests => cugraph_dgl/tests/dataloading}/test_dataset.py (100%) rename python/cugraph-dgl/{ => cugraph_dgl}/tests/nn/test_gatconv.py (100%) rename python/cugraph-dgl/{ => cugraph_dgl}/tests/nn/test_gatv2conv.py (100%) rename python/cugraph-dgl/{ => cugraph_dgl}/tests/nn/test_relgraphconv.py (100%) rename python/cugraph-dgl/{ => cugraph_dgl}/tests/nn/test_sageconv.py (100%) rename python/cugraph-dgl/{ => cugraph_dgl}/tests/nn/test_sparsegraph.py (100%) rename python/cugraph-dgl/{ => cugraph_dgl}/tests/nn/test_transformerconv.py (100%) rename python/cugraph-dgl/{ => cugraph_dgl}/tests/test_cugraph_storage.py (100%) rename python/cugraph-dgl/{ => cugraph_dgl}/tests/test_from_dgl_heterograph.py (83%) create mode 100644 python/cugraph-dgl/cugraph_dgl/tests/test_graph.py create mode 100644 python/cugraph-dgl/cugraph_dgl/tests/test_graph_mg.py rename python/cugraph-dgl/{ => cugraph_dgl}/tests/test_utils.py (100%) create mode 100644 python/cugraph-dgl/cugraph_dgl/tests/utils.py create mode 100644 python/cugraph-dgl/cugraph_dgl/typing.py create mode 100644 python/cugraph-dgl/cugraph_dgl/view.py delete mode 100644 python/cugraph-dgl/tests/utils.py diff --git a/conda/environments/all_cuda-118_arch-x86_64.yaml b/conda/environments/all_cuda-118_arch-x86_64.yaml index 5d2c942cd0c..5474c087532 100644 --- a/conda/environments/all_cuda-118_arch-x86_64.yaml +++ b/conda/environments/all_cuda-118_arch-x86_64.yaml @@ -46,6 +46,7 @@ dependencies: - packaging>=21 - pandas - pre-commit +- pydantic - pydata-sphinx-theme - pylibcugraphops==24.8.*,>=0.0.0a0 - pylibraft==24.8.*,>=0.0.0a0 @@ -72,6 +73,7 @@ dependencies: - sphinx<6 - sphinxcontrib-websupport - thriftpy2!=0.5.0,!=0.5.1 +- torchdata - ucx-proc=*=gpu - ucx-py==0.39.*,>=0.0.0a0 - wget diff --git a/conda/environments/all_cuda-125_arch-x86_64.yaml b/conda/environments/all_cuda-125_arch-x86_64.yaml index f8a95169ddd..54049a92061 100644 --- a/conda/environments/all_cuda-125_arch-x86_64.yaml +++ b/conda/environments/all_cuda-125_arch-x86_64.yaml @@ -51,6 +51,7 @@ dependencies: - packaging>=21 - pandas - pre-commit +- pydantic - pydata-sphinx-theme - pylibcugraphops==24.8.*,>=0.0.0a0 - pylibraft==24.8.*,>=0.0.0a0 @@ -77,6 +78,7 @@ dependencies: - sphinx<6 - sphinxcontrib-websupport - thriftpy2!=0.5.0,!=0.5.1 +- torchdata - ucx-proc=*=gpu - ucx-py==0.39.*,>=0.0.0a0 - wget diff --git a/conda/recipes/cugraph-dgl/meta.yaml b/conda/recipes/cugraph-dgl/meta.yaml index ca4fdb7f2fc..0affe456b73 100644 --- a/conda/recipes/cugraph-dgl/meta.yaml +++ b/conda/recipes/cugraph-dgl/meta.yaml @@ -28,8 +28,10 @@ requirements: - numba >=0.57 - numpy >=1.23,<2.0a0 - pylibcugraphops ={{ minor_version }} + - tensordict >=0.1.2 - python - - pytorch + - pytorch >=2.0 + - cupy >=12.0.0 tests: imports: diff --git a/dependencies.yaml b/dependencies.yaml index 82affb08dac..42eb8f2703a 100644 --- a/dependencies.yaml +++ b/dependencies.yaml @@ -659,6 +659,7 @@ dependencies: - *cugraph_unsuffixed - pytorch>=2.0 - pytorch-cuda==11.8 + - &tensordict tensordict>=0.1.2 - dgl>=1.1.0.cu* cugraph_pyg_dev: common: @@ -667,7 +668,7 @@ dependencies: - *cugraph_unsuffixed - pytorch>=2.0 - pytorch-cuda==11.8 - - &tensordict tensordict>=0.1.2 + - *tensordict - pyg>=2.5,<2.6 depends_on_pytorch: @@ -675,6 +676,8 @@ dependencies: - output_types: [conda] packages: - &pytorch_unsuffixed pytorch>=2.0,<2.2.0a0 + - torchdata + - pydantic specific: - output_types: [requirements] diff --git a/python/cugraph-dgl/conda/cugraph_dgl_dev_cuda-118.yaml b/python/cugraph-dgl/conda/cugraph_dgl_dev_cuda-118.yaml index 63771a75064..28c2fc81eeb 100644 --- a/python/cugraph-dgl/conda/cugraph_dgl_dev_cuda-118.yaml +++ b/python/cugraph-dgl/conda/cugraph_dgl_dev_cuda-118.yaml @@ -21,4 +21,5 @@ dependencies: - pytorch-cuda==11.8 - pytorch>=2.0 - scipy +- tensordict>=0.1.2 name: cugraph_dgl_dev_cuda-118 diff --git a/python/cugraph-dgl/cugraph_dgl/__init__.py b/python/cugraph-dgl/cugraph_dgl/__init__.py index 03ff50896a4..58850d47fba 100644 --- a/python/cugraph-dgl/cugraph_dgl/__init__.py +++ b/python/cugraph-dgl/cugraph_dgl/__init__.py @@ -1,4 +1,4 @@ -# Copyright (c) 2019-2023, NVIDIA CORPORATION. +# Copyright (c) 2019-2024, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -15,8 +15,12 @@ # to prevent rapids context being created when importing cugraph_dgl os.environ["RAPIDS_NO_INITIALIZE"] = "1" +from cugraph_dgl.graph import Graph from cugraph_dgl.cugraph_storage import CuGraphStorage -from cugraph_dgl.convert import cugraph_storage_from_heterograph +from cugraph_dgl.convert import ( + cugraph_storage_from_heterograph, + cugraph_dgl_graph_from_heterograph, +) import cugraph_dgl.dataloading import cugraph_dgl.nn diff --git a/python/cugraph-dgl/cugraph_dgl/convert.py b/python/cugraph-dgl/cugraph_dgl/convert.py index 1235f07adf1..ae4b96dd391 100644 --- a/python/cugraph-dgl/cugraph_dgl/convert.py +++ b/python/cugraph-dgl/cugraph_dgl/convert.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022-2023, NVIDIA CORPORATION. +# Copyright (c) 2022-2024, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -12,6 +12,8 @@ # limitations under the License. from __future__ import annotations from cugraph.utilities.utils import import_optional + +import cugraph_dgl from cugraph_dgl import CuGraphStorage from cugraph_dgl.utils.cugraph_conversion_utils import ( get_edges_dict_from_dgl_HeteroGraph, @@ -39,3 +41,53 @@ def cugraph_storage_from_heterograph( add_ndata_from_dgl_HeteroGraph(gs, g) add_edata_from_dgl_HeteroGraph(gs, g) return gs + + +def cugraph_dgl_graph_from_heterograph( + input_graph: dgl.DGLGraph, + single_gpu: bool = True, + ndata_storage: str = "torch", + edata_storage: str = "torch", + **kwargs, +) -> cugraph_dgl.Graph: + """ + Converts a DGL Graph to a cuGraph-DGL Graph. + """ + + output_graph = cugraph_dgl.Graph( + is_multi_gpu=(not single_gpu), + ndata_storage=ndata_storage, + edata_storage=edata_storage, + **kwargs, + ) + + # Calling is_homogeneous does not work here + if len(input_graph.ntypes) <= 1: + output_graph.add_nodes( + input_graph.num_nodes(), data=input_graph.ndata, ntype=input_graph.ntypes[0] + ) + else: + for ntype in input_graph.ntypes: + data = { + k: v_dict[ntype] + for k, v_dict in input_graph.ndata.items() + if ntype in v_dict + } + output_graph.add_nodes(input_graph.num_nodes(ntype), data=data, ntype=ntype) + + if len(input_graph.canonical_etypes) <= 1: + can_etype = input_graph.canonical_etypes[0] + src_t, dst_t = input_graph.edges(form="uv", etype=can_etype) + output_graph.add_edges(src_t, dst_t, input_graph.edata, etype=can_etype) + else: + for can_etype in input_graph.canonical_etypes: + data = { + k: v_dict[can_etype] + for k, v_dict in input_graph.edata.items() + if can_etype in v_dict + } + + src_t, dst_t = input_graph.edges(form="uv", etype=can_etype) + output_graph.add_edges(src_t, dst_t, data=data, etype=can_etype) + + return output_graph diff --git a/python/cugraph-dgl/cugraph_dgl/dataloading/__init__.py b/python/cugraph-dgl/cugraph_dgl/dataloading/__init__.py index 2fd7d29bd49..8a2e9cd954d 100644 --- a/python/cugraph-dgl/cugraph_dgl/dataloading/__init__.py +++ b/python/cugraph-dgl/cugraph_dgl/dataloading/__init__.py @@ -1,4 +1,4 @@ -# Copyright (c) 2019-2023, NVIDIA CORPORATION. +# Copyright (c) 2019-2024, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -11,9 +11,25 @@ # See the License for the specific language governing permissions and # limitations under the License. +import warnings + from cugraph_dgl.dataloading.dataset import ( HomogenousBulkSamplerDataset, HeterogenousBulkSamplerDataset, ) + +from cugraph_dgl.dataloading.sampler import Sampler from cugraph_dgl.dataloading.neighbor_sampler import NeighborSampler -from cugraph_dgl.dataloading.dataloader import DataLoader + +from cugraph_dgl.dataloading.dask_dataloader import DaskDataLoader +from cugraph_dgl.dataloading.dataloader import DataLoader as FutureDataLoader + + +def DataLoader(*args, **kwargs): + warnings.warn( + "DataLoader has been renamed to DaskDataLoader. " + "In Release 24.10, cugraph_dgl.dataloading.FutureDataLoader " + "will take over the DataLoader name.", + FutureWarning, + ) + return DaskDataLoader(*args, **kwargs) diff --git a/python/cugraph-dgl/cugraph_dgl/dataloading/dask_dataloader.py b/python/cugraph-dgl/cugraph_dgl/dataloading/dask_dataloader.py new file mode 100644 index 00000000000..e220b93f738 --- /dev/null +++ b/python/cugraph-dgl/cugraph_dgl/dataloading/dask_dataloader.py @@ -0,0 +1,321 @@ +# Copyright (c) 2023-2024, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import annotations +import os +import shutil +import cugraph_dgl +import cupy as cp +import cudf +from cugraph.utilities.utils import import_optional +from cugraph.gnn import BulkSampler +from dask.distributed import default_client, Event +from cugraph_dgl.dataloading import ( + HomogenousBulkSamplerDataset, + HeterogenousBulkSamplerDataset, +) +from cugraph_dgl.dataloading.utils.extract_graph_helpers import ( + create_cugraph_graph_from_edges_dict, +) + +dgl = import_optional("dgl") +torch = import_optional("torch") + + +class DaskDataLoader(torch.utils.data.DataLoader): + """ + Sampled graph data loader. Wrap a :class:`~cugraph_dgl.CuGraphStorage` and a + :class:`~cugraph_dgl.dataloading.NeighborSampler` into + an iterable over mini-batches of samples. cugraph_dgl's ``DataLoader`` extends + PyTorch's ``DataLoader`` by handling creation and + transmission of graph samples. + """ + + def __init__( + self, + graph: cugraph_dgl.CuGraphStorage, + indices: torch.Tensor, + graph_sampler: cugraph_dgl.dataloading.NeighborSampler, + sampling_output_dir: str, + batches_per_partition: int = 50, + seeds_per_call: int = 200_000, + device: torch.device = None, + use_ddp: bool = False, + ddp_seed: int = 0, + batch_size: int = 1024, + drop_last: bool = False, + shuffle: bool = False, + sparse_format: str = "coo", + **kwargs, + ): + """ + Constructor for DaskDataLoader: + ------------------------------- + graph : CuGraphStorage + The graph. + indices : Tensor or dict[ntype, Tensor] + The set of indices. It can either be a tensor of + integer indices or a dictionary of types and indices. + The actual meaning of the indices is defined by the :meth:`sample` method of + :attr:`graph_sampler`. + graph_sampler : cugraph_dgl.dataloading.NeighborSampler + The subgraph sampler. + sampling_output_dir: str + Output directory to share sampling results in + batches_per_partition: int + The number of batches of sampling results to write/read + seeds_per_call: int + The number of seeds to sample at once + device : device context, optional + The device of the generated MFGs in each iteration, which should be a + PyTorch device object (e.g., ``torch.device``). + By default this returns the tenors on device with the current + cuda context + use_ddp : boolean, optional + If True, tells the DataLoader to split the training set for each + participating process appropriately using + :class:`torch.utils.data.distributed.DistributedSampler`. + Overrides the :attr:`sampler` argument of + :class:`torch.utils.data.DataLoader`. + ddp_seed : int, optional + The seed for shuffling the dataset in + :class:`torch.utils.data.distributed.DistributedSampler`. + Only effective when :attr:`use_ddp` is True. + batch_size: int + Batch size. + sparse_format: str, default = "coo" + The sparse format of the emitted sampled graphs. Choose between "csc" + and "coo". When using "csc", the graphs are of type + cugraph_dgl.nn.SparseGraph. + kwargs : dict + Key-word arguments to be passed to the parent PyTorch + :py:class:`torch.utils.data.DataLoader` class. Common arguments are: + - ``batch_size`` (int): The number of indices in each batch. + - ``drop_last`` (bool): Whether to drop the last incomplete + batch. + - ``shuffle`` (bool): Whether to randomly shuffle the + indices at each epoch + Examples + -------- + To train a 3-layer GNN for node classification on a set of nodes + ``train_nid`` on a homogeneous graph where each node takes messages + from 15 neighbors on the first layer, 10 neighbors on the second, and + 5 neighbors on the third: + >>> sampler = cugraph_dgl.dataloading.NeighborSampler([15, 10, 5]) + >>> dataloader = cugraph_dgl.dataloading.DataLoader( + ... g, train_nid, sampler, + ... batch_size=1024, shuffle=True, drop_last=False, num_workers=0) + >>> for input_nodes, output_nodes, blocks in dataloader: + ... train_on(input_nodes, output_nodes, blocks) + **Using with Distributed Data Parallel** + If you are using PyTorch's distributed training (e.g. when using + :mod:`torch.nn.parallel.DistributedDataParallel`), + you can train the model by turning + on the `use_ddp` option: + >>> sampler = cugraph_dgl.dataloading.NeighborSampler([15, 10, 5]) + >>> dataloader = cugraph_dgl.dataloading.DataLoader( + ... g, train_nid, sampler, use_ddp=True, + ... batch_size=1024, shuffle=True, drop_last=False, num_workers=0) + >>> for epoch in range(start_epoch, n_epochs): + ... for input_nodes, output_nodes, blocks in dataloader: + ... + """ + if sparse_format not in ["coo", "csc"]: + raise ValueError( + f"sparse_format must be one of 'coo', 'csc', " + f"but got {sparse_format}." + ) + self.sparse_format = sparse_format + + self.ddp_seed = ddp_seed + self.use_ddp = use_ddp + self.shuffle = shuffle + self.drop_last = drop_last + self.graph_sampler = graph_sampler + worker_init_fn = dgl.dataloading.WorkerInitWrapper( + kwargs.get("worker_init_fn", None) + ) + self.other_storages = {} + self.epoch_number = 0 + self._batch_size = batch_size + self._sampling_output_dir = sampling_output_dir + self._batches_per_partition = batches_per_partition + self._seeds_per_call = seeds_per_call + self._rank = None + + indices = _dgl_idx_to_cugraph_idx(indices, graph) + + self.tensorized_indices_ds = dgl.dataloading.create_tensorized_dataset( + indices, + batch_size, + drop_last, + use_ddp, + ddp_seed, + shuffle, + kwargs.get("persistent_workers", False), + ) + + if len(graph.ntypes) <= 1: + self.cugraph_dgl_dataset = HomogenousBulkSamplerDataset( + total_number_of_nodes=graph.total_number_of_nodes, + edge_dir=self.graph_sampler.edge_dir, + sparse_format=sparse_format, + ) + else: + etype_id_to_etype_str_dict = {v: k for k, v in graph._etype_id_dict.items()} + + self.cugraph_dgl_dataset = HeterogenousBulkSamplerDataset( + num_nodes_dict=graph.num_nodes_dict, + etype_id_dict=etype_id_to_etype_str_dict, + etype_offset_dict=graph._etype_offset_d, + ntype_offset_dict=graph._ntype_offset_d, + edge_dir=self.graph_sampler.edge_dir, + ) + + if use_ddp: + rank = torch.distributed.get_rank() + client = default_client() + self._graph_creation_event = Event("cugraph_dgl_load_mg_graph_event") + if rank == 0: + G = create_cugraph_graph_from_edges_dict( + edges_dict=graph._edges_dict, + etype_id_dict=graph._etype_id_dict, + edge_dir=graph_sampler.edge_dir, + ) + client.publish_dataset(cugraph_dgl_mg_graph_ds=G) + self._graph_creation_event.set() + else: + if self._graph_creation_event.wait(timeout=1000): + G = client.get_dataset("cugraph_dgl_mg_graph_ds") + else: + raise RuntimeError( + f"Fetch cugraph_dgl_mg_graph_ds to worker_id {rank}", + "from worker_id 0 failed", + ) + else: + rank = 0 + G = create_cugraph_graph_from_edges_dict( + edges_dict=graph._edges_dict, + etype_id_dict=graph._etype_id_dict, + edge_dir=graph_sampler.edge_dir, + ) + + self._rank = rank + self._cugraph_graph = G + super().__init__( + self.cugraph_dgl_dataset, + batch_size=None, + worker_init_fn=worker_init_fn, + collate_fn=lambda x: x, # Hack to prevent collating + **kwargs, + ) + + def __iter__(self): + output_dir = os.path.join( + self._sampling_output_dir, "epoch_" + str(self.epoch_number) + ) + kwargs = {} + if isinstance(self.cugraph_dgl_dataset, HomogenousBulkSamplerDataset): + kwargs["deduplicate_sources"] = True + kwargs["prior_sources_behavior"] = "carryover" + kwargs["renumber"] = True + + if self.sparse_format == "csc": + kwargs["compression"] = "CSR" + kwargs["compress_per_hop"] = True + # The following kwargs will be deprecated in uniform sampler. + kwargs["use_legacy_names"] = False + kwargs["include_hop_column"] = False + + else: + kwargs["deduplicate_sources"] = False + kwargs["prior_sources_behavior"] = None + kwargs["renumber"] = False + + bs = BulkSampler( + output_path=output_dir, + batch_size=self._batch_size, + graph=self._cugraph_graph, + batches_per_partition=self._batches_per_partition, + seeds_per_call=self._seeds_per_call, + fanout_vals=self.graph_sampler._reversed_fanout_vals, + with_replacement=self.graph_sampler.replace, + **kwargs, + ) + + if self.shuffle: + self.tensorized_indices_ds.shuffle() + + batch_df = create_batch_df(self.tensorized_indices_ds) + bs.add_batches(batch_df, start_col_name="start", batch_col_name="batch_id") + bs.flush() + self.cugraph_dgl_dataset.set_input_files(input_directory=output_dir) + self.epoch_number = self.epoch_number + 1 + return super().__iter__() + + def __del__(self): + if self.use_ddp: + torch.distributed.barrier() + if self._rank == 0: + if self.use_ddp: + client = default_client() + client.unpublish_dataset("cugraph_dgl_mg_graph_ds") + self._graph_creation_event.clear() + _clean_directory(self._sampling_output_dir) + + +def get_batch_id_series(n_output_rows: int, batch_size: int) -> cudf.Series: + num_batches = (n_output_rows + batch_size - 1) // batch_size + print(f"Number of batches = {num_batches}".format(num_batches)) + batch_ar = cp.arange(0, num_batches).repeat(batch_size) + batch_ar = batch_ar[0:n_output_rows].astype(cp.int32) + return cudf.Series(batch_ar) + + +def create_batch_df(dataset: torch.Tensor) -> cudf.DataFrame: + batch_id_ls = [] + indices_ls = [] + for batch_id, b_indices in enumerate(dataset): + if isinstance(b_indices, dict): + b_indices = torch.cat(list(b_indices.values())) + batch_id_ar = cp.full(shape=len(b_indices), fill_value=batch_id, dtype=cp.int32) + batch_id_ls.append(batch_id_ar) + indices_ls.append(b_indices) + + batch_id_ar = cp.concatenate(batch_id_ls) + indices_ar = cp.asarray(torch.concat(indices_ls)) + batches_df = cudf.DataFrame( + { + "start": indices_ar, + "batch_id": batch_id_ar, + } + ) + return batches_df + + +def _dgl_idx_to_cugraph_idx(idx, cugraph_gs): + if not isinstance(idx, dict): + if len(cugraph_gs.ntypes) > 1: + raise dgl.DGLError( + "Must specify node type when the graph is not homogeneous." + ) + return idx + else: + return {k: cugraph_gs.dgl_n_id_to_cugraph_id(n, k) for k, n in idx.items()} + + +def _clean_directory(path): + """param could either be relative or absolute.""" + if os.path.isfile(path): + os.remove(path) # remove the file + elif os.path.isdir(path): + shutil.rmtree(path) # remove dir and all contains diff --git a/python/cugraph-dgl/cugraph_dgl/dataloading/dataloader.py b/python/cugraph-dgl/cugraph_dgl/dataloading/dataloader.py index 11139910931..21b70b05f3a 100644 --- a/python/cugraph-dgl/cugraph_dgl/dataloading/dataloader.py +++ b/python/cugraph-dgl/cugraph_dgl/dataloading/dataloader.py @@ -1,4 +1,4 @@ -# Copyright (c) 2023, NVIDIA CORPORATION. +# Copyright (c) 2024, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -10,151 +10,121 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from __future__ import annotations -import os -import shutil -import cugraph_dgl -import cupy as cp -import cudf + +import warnings + +from typing import Union, Optional, Dict + from cugraph.utilities.utils import import_optional -from cugraph.gnn import BulkSampler -from dask.distributed import default_client, Event -from cugraph_dgl.dataloading import ( - HomogenousBulkSamplerDataset, - HeterogenousBulkSamplerDataset, -) -from cugraph_dgl.dataloading.utils.extract_graph_helpers import ( - create_cugraph_graph_from_edges_dict, -) + +import cugraph_dgl +from cugraph_dgl.typing import TensorType +from cugraph_dgl.utils.cugraph_conversion_utils import _cast_to_torch_tensor dgl = import_optional("dgl") torch = import_optional("torch") -class DataLoader(torch.utils.data.DataLoader): +class DataLoader: """ - Sampled graph data loader. Wrap a :class:`~cugraph_dgl.CuGraphStorage` and a - :class:`~cugraph_dgl.dataloading.NeighborSampler` into - an iterable over mini-batches of samples. cugraph_dgl's ``DataLoader`` extends - PyTorch's ``DataLoader`` by handling creation and - transmission of graph samples. + Duck-typed version of dgl.dataloading.DataLoader """ def __init__( self, - graph: cugraph_dgl.CuGraphStorage, - indices: torch.Tensor, - graph_sampler: cugraph_dgl.dataloading.NeighborSampler, - sampling_output_dir: str, - batches_per_partition: int = 50, - seeds_per_call: int = 200_000, - device: torch.device = None, + graph: "cugraph_dgl.Graph", + indices: TensorType, + graph_sampler: "cugraph_dgl.dataloading.Sampler", + device: Union[int, str, "torch.device"] = None, use_ddp: bool = False, ddp_seed: int = 0, - batch_size: int = 1024, + batch_size: int = 1, drop_last: bool = False, shuffle: bool = False, - sparse_format: str = "coo", + use_prefetch_thread: Optional[bool] = None, + use_alternate_streams: Optional[bool] = None, + pin_prefetcher: Optional[bool] = None, + use_uva=False, + gpu_cache: Dict[str, Dict[str, int]] = None, + output_format: str = "dgl.Block", **kwargs, ): """ - Constructor for CuGraphStorage: - ------------------------------- - graph : CuGraphStorage - The graph. - indices : Tensor or dict[ntype, Tensor] - The set of indices. It can either be a tensor of - integer indices or a dictionary of types and indices. - The actual meaning of the indices is defined by the :meth:`sample` method of - :attr:`graph_sampler`. - graph_sampler : cugraph_dgl.dataloading.NeighborSampler - The subgraph sampler. - sampling_output_dir: str - Output directory to share sampling results in - batches_per_partition: int - The number of batches of sampling results to write/read - seeds_per_call: int - The number of seeds to sample at once - device : device context, optional - The device of the generated MFGs in each iteration, which should be a - PyTorch device object (e.g., ``torch.device``). - By default this returns the tenors on device with the current - cuda context - use_ddp : boolean, optional - If True, tells the DataLoader to split the training set for each - participating process appropriately using - :class:`torch.utils.data.distributed.DistributedSampler`. - Overrides the :attr:`sampler` argument of - :class:`torch.utils.data.DataLoader`. - ddp_seed : int, optional - The seed for shuffling the dataset in - :class:`torch.utils.data.distributed.DistributedSampler`. - Only effective when :attr:`use_ddp` is True. - batch_size: int - Batch size. - sparse_format: str, default = "coo" - The sparse format of the emitted sampled graphs. Choose between "csc" - and "coo". When using "csc", the graphs are of type - cugraph_dgl.nn.SparseGraph. - kwargs : dict - Key-word arguments to be passed to the parent PyTorch - :py:class:`torch.utils.data.DataLoader` class. Common arguments are: - - ``batch_size`` (int): The number of indices in each batch. - - ``drop_last`` (bool): Whether to drop the last incomplete - batch. - - ``shuffle`` (bool): Whether to randomly shuffle the - indices at each epoch - Examples - -------- - To train a 3-layer GNN for node classification on a set of nodes - ``train_nid`` on a homogeneous graph where each node takes messages - from 15 neighbors on the first layer, 10 neighbors on the second, and - 5 neighbors on the third: - >>> sampler = cugraph_dgl.dataloading.NeighborSampler([15, 10, 5]) - >>> dataloader = cugraph_dgl.dataloading.DataLoader( - ... g, train_nid, sampler, - ... batch_size=1024, shuffle=True, drop_last=False, num_workers=0) - >>> for input_nodes, output_nodes, blocks in dataloader: - ... train_on(input_nodes, output_nodes, blocks) - **Using with Distributed Data Parallel** - If you are using PyTorch's distributed training (e.g. when using - :mod:`torch.nn.parallel.DistributedDataParallel`), - you can train the model by turning - on the `use_ddp` option: - >>> sampler = cugraph_dgl.dataloading.NeighborSampler([15, 10, 5]) - >>> dataloader = cugraph_dgl.dataloading.DataLoader( - ... g, train_nid, sampler, use_ddp=True, - ... batch_size=1024, shuffle=True, drop_last=False, num_workers=0) - >>> for epoch in range(start_epoch, n_epochs): - ... for input_nodes, output_nodes, blocks in dataloader: - ... + Parameters + ---------- + graph: cugraph_dgl.Graph + The graph being sampled. Can be a single-GPU or multi-GPU graph. + indices: TensorType + The seed nodes for sampling. If use_ddp=True, then all seed + nodes should be provided. If use_ddp=False, then only the seed + nodes assigned to this worker should be provided. + graph_sampler: cugraph_dgl.dataloading.Sampler + The sampler responsible for sampling the graph and producing + output minibatches. + device: Union[int, str, torch.device] + Optional. + The device assigned to this loader ('cpu', 'cuda' or device id). + Defaults to the current device. + use_ddp: bool + Optional (default=False). + If true, this argument will assume the entire list of input seed + nodes is being passed to each worker, and will appropriately + split and shuffle the list. + It false, then it is assumed that the list of input seed nodes + is comprised of the union of the lists provided to each worker. + ddp_seed: int + Optional (default=0). + The seed used for dividing and shuffling data if use_ddp=True. + Has no effect if use_ddp=False. + use_uva: bool + Optional (default=False). + Whether to use pinned memory and unified virtual addressing + to perform sampling. + This argument is ignored by cuGraph-DGL. + use_prefetch_thread: bool + Optional (default=False). + Whether to spawn a new thread for feature fetching. + This argument is ignored by cuGraph-DGL. + use_alternate_streams: bool + Optional (default=False). + Whether to perform feature fetching on a separate stream. + This argument is ignored by cuGraph-DGL. + pin_prefetcher: bool + Optional (default=False). + Whether to pin the feature tensors. + This argument is currently ignored by cuGraph-DGL. + gpu_cache: Dict[str, Dict[str, int]] + List of features to cache using HugeCTR. + This argument is not supported by cuGraph-DGL and + will result in an error. + output_format: str + Optional (default="dgl.Block"). + The output format for blocks. + Can be either "dgl.Block" or "cugraph_dgl.nn.SparseGraph". """ - if sparse_format not in ["coo", "csc"]: + + if use_uva: + warnings.warn("The 'use_uva' argument is ignored by cuGraph-DGL.") + if use_prefetch_thread: + warnings.warn( + "The 'use_prefetch_thread' argument is ignored by cuGraph-DGL." + ) + if use_alternate_streams: + warnings.warn( + "The 'use_alternate_streams' argument is ignored by cuGraph-DGL." + ) + if pin_prefetcher: + warnings.warn("The 'pin_prefetcher' argument is ignored by cuGraph-DGL.") + if gpu_cache: raise ValueError( - f"sparse_format must be one of 'coo', 'csc', " - f"but got {sparse_format}." + "HugeCTR is not supported by cuGraph-DGL. " + "Consider using WholeGraph for feature storage" + " in cugraph_dgl.Graph instead." ) - self.sparse_format = sparse_format - self.ddp_seed = ddp_seed - self.use_ddp = use_ddp - self.shuffle = shuffle - self.drop_last = drop_last - self.graph_sampler = graph_sampler - worker_init_fn = dgl.dataloading.WorkerInitWrapper( - kwargs.get("worker_init_fn", None) - ) - self.other_storages = {} - self.epoch_number = 0 - self._batch_size = batch_size - self._sampling_output_dir = sampling_output_dir - self._batches_per_partition = batches_per_partition - self._seeds_per_call = seeds_per_call - self._rank = None - - indices = _dgl_idx_to_cugraph_idx(indices, graph) + indices = _cast_to_torch_tensor(indices) - self.tensorized_indices_ds = dgl.dataloading.create_tensorized_dataset( + self.__dataset = dgl.dataloading.create_tensorized_dataset( indices, batch_size, drop_last, @@ -164,158 +134,25 @@ def __init__( kwargs.get("persistent_workers", False), ) - if len(graph.ntypes) <= 1: - self.cugraph_dgl_dataset = HomogenousBulkSamplerDataset( - total_number_of_nodes=graph.total_number_of_nodes, - edge_dir=self.graph_sampler.edge_dir, - sparse_format=sparse_format, - ) - else: - etype_id_to_etype_str_dict = {v: k for k, v in graph._etype_id_dict.items()} - - self.cugraph_dgl_dataset = HeterogenousBulkSamplerDataset( - num_nodes_dict=graph.num_nodes_dict, - etype_id_dict=etype_id_to_etype_str_dict, - etype_offset_dict=graph._etype_offset_d, - ntype_offset_dict=graph._ntype_offset_d, - edge_dir=self.graph_sampler.edge_dir, - ) + self.__output_format = output_format + self.__sampler = graph_sampler + self.__batch_size = batch_size + self.__graph = graph + self.__device = device - if use_ddp: - rank = torch.distributed.get_rank() - client = default_client() - self._graph_creation_event = Event("cugraph_dgl_load_mg_graph_event") - if rank == 0: - G = create_cugraph_graph_from_edges_dict( - edges_dict=graph._edges_dict, - etype_id_dict=graph._etype_id_dict, - edge_dir=graph_sampler.edge_dir, - ) - client.publish_dataset(cugraph_dgl_mg_graph_ds=G) - self._graph_creation_event.set() - else: - if self._graph_creation_event.wait(timeout=1000): - G = client.get_dataset("cugraph_dgl_mg_graph_ds") - else: - raise RuntimeError( - f"Fetch cugraph_dgl_mg_graph_ds to worker_id {rank}", - "from worker_id 0 failed", - ) - else: - rank = 0 - G = create_cugraph_graph_from_edges_dict( - edges_dict=graph._edges_dict, - etype_id_dict=graph._etype_id_dict, - edge_dir=graph_sampler.edge_dir, - ) - - self._rank = rank - self._cugraph_graph = G - super().__init__( - self.cugraph_dgl_dataset, - batch_size=None, - worker_init_fn=worker_init_fn, - collate_fn=lambda x: x, # Hack to prevent collating - **kwargs, - ) + @property + def dataset( + self, + ) -> Union[ + "dgl.dataloading.dataloader.TensorizedDataset", + "dgl.dataloading.dataloader.DDPTensorizedDataset", + ]: + return self.__dataset def __iter__(self): - output_dir = os.path.join( - self._sampling_output_dir, "epoch_" + str(self.epoch_number) + # TODO move to the correct device (rapidsai/cugraph-gnn#11) + return self.__sampler.sample( + self.__graph, + self.__dataset, + batch_size=self.__batch_size, ) - kwargs = {} - if isinstance(self.cugraph_dgl_dataset, HomogenousBulkSamplerDataset): - kwargs["deduplicate_sources"] = True - kwargs["prior_sources_behavior"] = "carryover" - kwargs["renumber"] = True - - if self.sparse_format == "csc": - kwargs["compression"] = "CSR" - kwargs["compress_per_hop"] = True - # The following kwargs will be deprecated in uniform sampler. - kwargs["use_legacy_names"] = False - kwargs["include_hop_column"] = False - - else: - kwargs["deduplicate_sources"] = False - kwargs["prior_sources_behavior"] = None - kwargs["renumber"] = False - - bs = BulkSampler( - output_path=output_dir, - batch_size=self._batch_size, - graph=self._cugraph_graph, - batches_per_partition=self._batches_per_partition, - seeds_per_call=self._seeds_per_call, - fanout_vals=self.graph_sampler._reversed_fanout_vals, - with_replacement=self.graph_sampler.replace, - **kwargs, - ) - - if self.shuffle: - self.tensorized_indices_ds.shuffle() - - batch_df = create_batch_df(self.tensorized_indices_ds) - bs.add_batches(batch_df, start_col_name="start", batch_col_name="batch_id") - bs.flush() - self.cugraph_dgl_dataset.set_input_files(input_directory=output_dir) - self.epoch_number = self.epoch_number + 1 - return super().__iter__() - - def __del__(self): - if self.use_ddp: - torch.distributed.barrier() - if self._rank == 0: - if self.use_ddp: - client = default_client() - client.unpublish_dataset("cugraph_dgl_mg_graph_ds") - self._graph_creation_event.clear() - _clean_directory(self._sampling_output_dir) - - -def get_batch_id_series(n_output_rows: int, batch_size: int): - num_batches = (n_output_rows + batch_size - 1) // batch_size - print(f"Number of batches = {num_batches}".format(num_batches)) - batch_ar = cp.arange(0, num_batches).repeat(batch_size) - batch_ar = batch_ar[0:n_output_rows].astype(cp.int32) - return cudf.Series(batch_ar) - - -def create_batch_df(dataset: torch.Tensor): - batch_id_ls = [] - indices_ls = [] - for batch_id, b_indices in enumerate(dataset): - if isinstance(b_indices, dict): - b_indices = torch.cat(list(b_indices.values())) - batch_id_ar = cp.full(shape=len(b_indices), fill_value=batch_id, dtype=cp.int32) - batch_id_ls.append(batch_id_ar) - indices_ls.append(b_indices) - - batch_id_ar = cp.concatenate(batch_id_ls) - indices_ar = cp.asarray(torch.concat(indices_ls)) - batches_df = cudf.DataFrame( - { - "start": indices_ar, - "batch_id": batch_id_ar, - } - ) - return batches_df - - -def _dgl_idx_to_cugraph_idx(idx, cugraph_gs): - if not isinstance(idx, dict): - if len(cugraph_gs.ntypes) > 1: - raise dgl.DGLError( - "Must specify node type when the graph is not homogeneous." - ) - return idx - else: - return {k: cugraph_gs.dgl_n_id_to_cugraph_id(n, k) for k, n in idx.items()} - - -def _clean_directory(path): - """param could either be relative or absolute.""" - if os.path.isfile(path): - os.remove(path) # remove the file - elif os.path.isdir(path): - shutil.rmtree(path) # remove dir and all contains diff --git a/python/cugraph-dgl/cugraph_dgl/dataloading/neighbor_sampler.py b/python/cugraph-dgl/cugraph_dgl/dataloading/neighbor_sampler.py index b61f05f6379..1a35c3ea027 100644 --- a/python/cugraph-dgl/cugraph_dgl/dataloading/neighbor_sampler.py +++ b/python/cugraph-dgl/cugraph_dgl/dataloading/neighbor_sampler.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022-2023, NVIDIA CORPORATION. +# Copyright (c) 2022-2024, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -10,11 +10,25 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + from __future__ import annotations -from typing import Sequence +import warnings +import tempfile + +from typing import Sequence, Optional, Union, List, Tuple, Iterator + +from cugraph.gnn import UniformNeighborSampler, DistSampleWriter +from cugraph.utilities.utils import import_optional + +import cugraph_dgl +from cugraph_dgl.typing import DGLSamplerOutput +from cugraph_dgl.dataloading.sampler import Sampler, HomogeneousSampleReader -class NeighborSampler: +torch = import_optional("torch") + + +class NeighborSampler(Sampler): """Sampler that builds computational dependency of node representations via neighbor sampling for multilayer GNN. This sampler will make every node gather messages from a fixed number of neighbors @@ -50,7 +64,88 @@ def __init__( fanouts_per_layer: Sequence[int], edge_dir: str = "in", replace: bool = False, + prob: Optional[str] = None, + mask: Optional[str] = None, + prefetch_node_feats: Optional[Union[List[str], dict[str, List[str]]]] = None, + prefetch_edge_feats: Optional[ + Union[List[str], dict[Tuple[str, str, str], List[str]]] + ] = None, + prefetch_labels: Optional[Union[List[str], dict[str, List[str]]]] = None, + output_device: Optional[Union["torch.device", int, str]] = None, + fused: Optional[bool] = None, + sparse_format="csc", + output_format="dgl.Block", + **kwargs, ): + """ + Parameters + ---------- + fanouts_per_layer: Sequence[int] + The number of neighbors to sample per layer. + edge_dir: str + Optional (default='in'). + The direction to traverse edges. + replace: bool + Optional (default=False). + Whether to sample with replacement. + prob: str + Optional. + If provided, the probability of each neighbor being + sampled is proportional to the edge feature + with the given name. Mutually exclusive with mask. + Currently unsupported. + mask: str + Optional. + If proivided, only neighbors where the edge mask + with the given name is True can be selected. + Mutually exclusive with prob. + Currently unsupported. + prefetch_node_feats: Union[List[str], dict[str, List[str]]] + Optional. + Currently ignored by cuGraph-DGL. + prefetch_edge_feats: Union[List[str], dict[Tuple[str, str, str], List[str]]] + Optional. + Currently ignored by cuGraph-DGL. + prefetch_labels: Union[List[str], dict[str, List[str]]] + Optional. + Currently ignored by cuGraph-DGL. + output_device: Union[torch.device, int, str] + Optional. + Output device for samples. Defaults to the current device. + fused: bool + Optional. + This argument is ignored by cuGraph-DGL. + sparse_format: str + Optional (default = "coo"). + The sparse format of the emitted sampled graphs. + Currently, only "csc" is supported. + output_format: str + Optional (default = "dgl.Block") + The output format of the emitted sampled graphs. + Can be either "dgl.Block" (default), or "cugraph_dgl.nn.SparseGraph". + **kwargs + Keyword arguments for the underlying cuGraph distributed sampler + and writer (directory, batches_per_partition, format, + local_seeds_per_call). + """ + + if mask: + raise NotImplementedError( + "Edge masking is currently unsupported by cuGraph-DGL" + ) + if prob: + raise NotImplementedError( + "Edge masking is currently unsupported by cuGraph-DGL" + ) + if prefetch_edge_feats: + warnings.warn("'prefetch_edge_feats' is ignored by cuGraph-DGL") + if prefetch_node_feats: + warnings.warn("'prefetch_node_feats' is ignored by cuGraph-DGL") + if prefetch_labels: + warnings.warn("'prefetch_labels' is ignored by cuGraph-DGL") + if fused: + warnings.warn("'fused' is ignored by cuGraph-DGL") + self.fanouts = fanouts_per_layer reverse_fanouts = fanouts_per_layer.copy() reverse_fanouts.reverse() @@ -58,3 +153,53 @@ def __init__( self.edge_dir = edge_dir self.replace = replace + self.__kwargs = kwargs + + super().__init__( + sparse_format=sparse_format, + output_format=output_format, + ) + + def sample( + self, + g: "cugraph_dgl.Graph", + indices: Iterator["torch.Tensor"], + batch_size: int = 1, + ) -> Iterator[DGLSamplerOutput]: + kwargs = dict(**self.__kwargs) + + directory = kwargs.pop("directory", None) + if directory is None: + warnings.warn("Setting a directory to store samples is recommended.") + self._tempdir = tempfile.TemporaryDirectory() + directory = self._tempdir.name + + writer = DistSampleWriter( + directory=directory, + batches_per_partition=kwargs.pop("batches_per_partition", 256), + format=kwargs.pop("format", "parquet"), + ) + + ds = UniformNeighborSampler( + g._graph(self.edge_dir), + writer, + compression="CSR", + fanout=self._reversed_fanout_vals, + prior_sources_behavior="carryover", + deduplicate_sources=True, + compress_per_hop=True, + with_replacement=self.replace, + **kwargs, + ) + + if g.is_homogeneous: + indices = torch.concat(list(indices)) + ds.sample_from_nodes(indices, batch_size=batch_size) + return HomogeneousSampleReader( + ds.get_reader(), self.output_format, self.edge_dir + ) + + raise ValueError( + "Sampling heterogeneous graphs is currently" + " unsupported in the non-dask API" + ) diff --git a/python/cugraph-dgl/cugraph_dgl/dataloading/sampler.py b/python/cugraph-dgl/cugraph_dgl/dataloading/sampler.py new file mode 100644 index 00000000000..731ec1b8d6f --- /dev/null +++ b/python/cugraph-dgl/cugraph_dgl/dataloading/sampler.py @@ -0,0 +1,193 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Iterator, Dict, Tuple, List, Union + +import cugraph_dgl +from cugraph_dgl.nn import SparseGraph +from cugraph_dgl.typing import DGLSamplerOutput +from cugraph_dgl.dataloading.utils.sampling_helpers import ( + create_homogeneous_sampled_graphs_from_tensors_csc, +) + +from cugraph.gnn import DistSampleReader + +from cugraph.utilities.utils import import_optional + +torch = import_optional("torch") +dgl = import_optional("dgl") + + +class SampleReader: + """ + Iterator that processes results from the cuGraph distributed sampler. + """ + + def __init__(self, base_reader: DistSampleReader, output_format: str = "dgl.Block"): + """ + Constructs a new SampleReader. + + Parameters + ---------- + base_reader: DistSampleReader + The reader responsible for loading saved samples produced by + the cuGraph distributed sampler. + """ + self.__output_format = output_format + self.__base_reader = base_reader + self.__num_samples_remaining = 0 + self.__index = 0 + + @property + def output_format(self) -> str: + return self.__output_format + + def __next__(self) -> DGLSamplerOutput: + if self.__num_samples_remaining == 0: + # raw_sample_data is already a dict of tensors + self.__raw_sample_data, start_inclusive, end_inclusive = next( + self.__base_reader + ) + + self.__decoded_samples = self._decode_all(self.__raw_sample_data) + self.__num_samples_remaining = end_inclusive - start_inclusive + 1 + self.__index = 0 + + out = self.__decoded_samples[self.__index] + self.__index += 1 + self.__num_samples_remaining -= 1 + return out + + def _decode_all(self) -> List[DGLSamplerOutput]: + raise NotImplementedError("Must be implemented by subclass") + + def __iter__(self) -> DGLSamplerOutput: + return self + + +class HomogeneousSampleReader(SampleReader): + """ + Subclass of SampleReader that reads DGL homogeneous output samples + produced by the cuGraph distributed sampler. + """ + + def __init__( + self, + base_reader: DistSampleReader, + output_format: str = "dgl.Block", + edge_dir="in", + ): + """ + Constructs a new HomogeneousSampleReader + + Parameters + ---------- + base_reader: DistSampleReader + The reader responsible for loading saved samples produced by + the cuGraph distributed sampler. + output_format: str + The output format for blocks (either "dgl.Block" or + "cugraph_dgl.nn.SparseGraph"). + edge_dir: str + The direction sampling was performed in ("in" or "out"). + """ + + self.__edge_dir = edge_dir + super().__init__(base_reader, output_format=output_format) + + def __decode_csc( + self, raw_sample_data: Dict[str, "torch.Tensor"] + ) -> List[DGLSamplerOutput]: + return create_homogeneous_sampled_graphs_from_tensors_csc( + raw_sample_data, output_format=self.output_format + ) + + def __decode_coo( + self, raw_sample_data: Dict[str, "torch.Tensor"] + ) -> List[DGLSamplerOutput]: + raise NotImplementedError( + "COO format is currently unsupported in the non-dask API" + ) + + def _decode_all( + self, raw_sample_data: Dict[str, "torch.Tensor"] + ) -> List[DGLSamplerOutput]: + if "major_offsets" in raw_sample_data: + return self.__decode_csc(raw_sample_data) + else: + return self.__decode_coo(raw_sample_data) + + +class Sampler: + """ + Base sampler class for all cugraph-DGL samplers. + """ + + def __init__(self, sparse_format: str = "csc", output_format="dgl.Block"): + """ + Parameters + ---------- + sparse_format: str + Optional (default = "coo"). + The sparse format of the emitted sampled graphs. + Currently, only "csc" is supported. + output_format: str + Optional (default = "dgl.Block") + The output format of the emitted sampled graphs. + Can be either "dgl.Block" (default), or "cugraph_dgl.nn.SparseGraph". + """ + + if sparse_format != "csc": + raise ValueError("Only CSC format is supported at this time") + + self.__output_format = output_format + + @property + def output_format(self): + return self.__output_format + + @property + def sparse_format(self): + return self.__sparse_format + + def sample( + self, + g: cugraph_dgl.Graph, + indices: Iterator["torch.Tensor"], + batch_size: int = 1, + ) -> Iterator[ + Tuple["torch.Tensor", "torch.Tensor", List[Union[SparseGraph, "dgl.Block"]]] + ]: + """ + Samples the graph. + + Parameters + ---------- + g: cugraph_dgl.Graph + The graph being sampled. + indices: TensorType + The node ids of seed nodes where sampling will initiate from. + batch_size: int + The number of seed nodes per batch. + + Returns + ------- + Iterator[DGLSamplerOutput] + Iterator over batches. The returned tuples are in standard + DGL format: (input nodes, output nodes, blocks) where input + nodes are the renumbered input nodes, output nodes are + the renumbered output nodes, and blocks are the output graphs + for each hop. + """ + + raise NotImplementedError("Must be implemented by subclass") diff --git a/python/cugraph-dgl/cugraph_dgl/dataloading/utils/sampling_helpers.py b/python/cugraph-dgl/cugraph_dgl/dataloading/utils/sampling_helpers.py index 10d851ebade..3b7e4502134 100644 --- a/python/cugraph-dgl/cugraph_dgl/dataloading/utils/sampling_helpers.py +++ b/python/cugraph-dgl/cugraph_dgl/dataloading/utils/sampling_helpers.py @@ -404,21 +404,21 @@ def create_heterogenous_dgl_block_from_tensors_dict( return block -def _process_sampled_df_csc( - df: cudf.DataFrame, +def _process_sampled_tensors_csc( + tensors: Dict["torch.Tensor"], reverse_hop_id: bool = True, ) -> Tuple[ - Dict[int, Dict[int, Dict[str, torch.Tensor]]], - List[torch.Tensor], + Dict[int, Dict[int, Dict[str, "torch.Tensor"]]], + List["torch.Tensor"], List[List[int, int]], ]: """ - Convert a dataframe generated by BulkSampler to a dictionary of tensors, to + Convert tensors generated by BulkSampler to a dictionary of tensors, to facilitate MFG creation. The sampled graphs in the dataframe use CSC-format. Parameters ---------- - df: cudf.DataFrame + tensors: Dict[torch.Tensor] The output from BulkSampler compressed in CSC format. The dataframe should be generated with `compression="CSR"` in BulkSampler, since the sampling routine treats seed nodes as sources. @@ -442,12 +442,12 @@ def _process_sampled_df_csc( k-th hop, mfg_sizes[k] and mfg_sizes[k+1] is the number of sources and destinations, respectively. """ - # dropna - major_offsets = cast_to_tensor(df.major_offsets.dropna()) - label_hop_offsets = cast_to_tensor(df.label_hop_offsets.dropna()) - renumber_map_offsets = cast_to_tensor(df.renumber_map_offsets.dropna()) - renumber_map = cast_to_tensor(df["map"].dropna()) - minors = cast_to_tensor(df.minors.dropna()) + + major_offsets = tensors["major_offsets"] + minors = tensors["minors"] + label_hop_offsets = tensors["label_hop_offsets"] + renumber_map = tensors["map"] + renumber_map_offsets = tensors["renumber_map_offsets"] n_batches = len(renumber_map_offsets) - 1 n_hops = int((len(label_hop_offsets) - 1) / n_batches) @@ -511,6 +511,115 @@ def _process_sampled_df_csc( return tensors_dict, renumber_map_list, mfg_sizes.tolist() +def _process_sampled_df_csc( + df: cudf.DataFrame, + reverse_hop_id: bool = True, +): + """ + Convert a dataframe generated by BulkSampler to a dictionary of tensors, to + facilitate MFG creation. The sampled graphs in the dataframe use CSC-format. + + Parameters + ---------- + df: cudf.DataFrame + The output from BulkSampler compressed in CSC format. The dataframe + should be generated with `compression="CSR"` in BulkSampler, + since the sampling routine treats seed nodes as sources. + + reverse_hop_id: bool (default=True) + Reverse hop id. + + Returns + ------- + tensors_dict: dict + A nested dictionary keyed by batch id and hop id. + `tensor_dict[batch_id][hop_id]` holds "minors" and "major_offsets" + values for CSC MFGs. + + renumber_map_list: list + List of renumbering maps for looking up global indices of nodes. One + map for each batch. + + mfg_sizes: list + List of the number of nodes in each message passing layer. For the + k-th hop, mfg_sizes[k] and mfg_sizes[k+1] is the number of sources and + destinations, respectively. + """ + + return _process_sampled_tensors_csc( + { + "major_offsets": cast_to_tensor(df.major_offsets.dropna()), + "label_hop_offsets": cast_to_tensor(df.label_hop_offsets.dropna()), + "renumber_map_offsets": cast_to_tensor(df.renumber_map_offsets.dropna()), + "map": cast_to_tensor(df["map"].dropna()), + "minors": cast_to_tensor(df.minors.dropna()), + }, + reverse_hop_id=reverse_hop_id, + ) + + +def _create_homogeneous_blocks_from_csc( + tensors_dict: Dict[int, Dict[int, Dict[str, torch.Tensor]]], + renumber_map_list: List[torch.Tensor], + mfg_sizes: List[int, int], +): + """Create mini-batches of MFGs in the dgl.Block format. + The input arguments are the outputs of + the function `_process_sampled_df_csc`. + + Returns + ------- + output: list + A list of mini-batches. Each mini-batch is a list that consists of + `input_nodes` tensor, `output_nodes` tensor and a list of MFGs. + """ + n_batches, n_hops = len(mfg_sizes), len(mfg_sizes[0]) - 1 + output = [] + for b_id in range(n_batches): + output_batch = [] + output_batch.append(renumber_map_list[b_id]) + output_batch.append(renumber_map_list[b_id][: mfg_sizes[b_id][-1]]) + + mfgs = [ + SparseGraph( + size=(mfg_sizes[b_id][h_id], mfg_sizes[b_id][h_id + 1]), + src_ids=tensors_dict[b_id][h_id]["minors"], + cdst_ids=tensors_dict[b_id][h_id]["major_offsets"], + formats=["csc", "coo"], + reduce_memory=True, + ) + for h_id in range(n_hops) + ] + + blocks = [] + seednodes_range = None + for mfg in reversed(mfgs): + block_mfg = _create_homogeneous_dgl_block_from_tensor_d( + { + "sources": mfg.src_ids(), + "destinations": mfg.dst_ids(), + "sources_range": mfg._num_src_nodes - 1, + "destinations_range": mfg._num_dst_nodes - 1, + }, + renumber_map=renumber_map_list[b_id], + seednodes_range=seednodes_range, + ) + + seednodes_range = max( + mfg._num_src_nodes - 1, + mfg._num_dst_nodes - 1, + ) + blocks.append(block_mfg) + del mfgs + + blocks.reverse() + + output_batch.append(blocks) + + output.append(output_batch) + return output + + def _create_homogeneous_sparse_graphs_from_csc( tensors_dict: Dict[int, Dict[int, Dict[str, torch.Tensor]]], renumber_map_list: List[torch.Tensor], @@ -549,9 +658,35 @@ def _create_homogeneous_sparse_graphs_from_csc( return output -def create_homogeneous_sampled_graphs_from_dataframe_csc(sampled_df: cudf.DataFrame): +def create_homogeneous_sampled_graphs_from_dataframe_csc( + sampled_df: cudf.DataFrame, output_format: str = "cugraph_dgl.nn.SparseGraph" +): + """Public API to create mini-batches of MFGs using a dataframe output by + BulkSampler, where the sampled graph is compressed in CSC format.""" + if output_format == "cugraph_dgl.nn.SparseGraph": + return _create_homogeneous_sparse_graphs_from_csc( + *(_process_sampled_df_csc(sampled_df)), + ) + elif output_format == "dgl.Block": + return _create_homogeneous_blocks_from_csc( + *(_process_sampled_df_csc(sampled_df)), + ) + else: + raise ValueError(f"Invalid output format {output_format}") + + +def create_homogeneous_sampled_graphs_from_tensors_csc( + tensors: Dict["torch.Tensor"], output_format: str = "cugraph_dgl.nn.SparseGraph" +): """Public API to create mini-batches of MFGs using a dataframe output by BulkSampler, where the sampled graph is compressed in CSC format.""" - return _create_homogeneous_sparse_graphs_from_csc( - *(_process_sampled_df_csc(sampled_df)) - ) + if output_format == "cugraph_dgl.nn.SparseGraph": + return _create_homogeneous_sparse_graphs_from_csc( + *(_process_sampled_tensors_csc(tensors)), + ) + elif output_format == "dgl.Block": + return _create_homogeneous_blocks_from_csc( + *(_process_sampled_tensors_csc(tensors)), + ) + else: + raise ValueError(f"Invalid output format {output_format}") diff --git a/python/cugraph-dgl/cugraph_dgl/features.py b/python/cugraph-dgl/cugraph_dgl/features.py new file mode 100644 index 00000000000..9dc009f4127 --- /dev/null +++ b/python/cugraph-dgl/cugraph_dgl/features.py @@ -0,0 +1,121 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import warnings + +from cugraph.utilities.utils import import_optional, MissingModule + +torch = import_optional("torch") +dgl = import_optional("dgl") +wgth = import_optional("pylibwholegraph.torch") + + +class WholeFeatureStore( + object if isinstance(dgl, MissingModule) else dgl.storages.base.FeatureStorage +): + """ + Interface for feature storage. + """ + + def __init__( + self, + tensor: "torch.Tensor", + memory_type: str = "distributed", + location: str = "cpu", + ): + """ + Constructs a new WholeFeatureStore object that wraps a WholeGraph wholememory + distributed tensor. + + Parameters + ---------- + t: torch.Tensor + The local slice of the tensor being distributed. These should be in order + by rank (i.e. rank 0 contains elements 0-9, rank 1 contains elements 10-19, + rank 3 contains elements 20-29, etc.) The sizes do not need to be equal. + memory_type: str (optional, default='distributed') + The memory type of this store. Options are + 'distributed', 'chunked', and 'continuous'. + For more information consult the WholeGraph + documentation. + location: str(optional, default='cpu') + The location ('cpu' or 'cuda') where data is stored. + """ + self.__wg_comm = wgth.get_global_communicator() + + if len(tensor.shape) > 2: + raise ValueError("Only 1-D or 2-D tensors are supported by WholeGraph.") + + rank = torch.distributed.get_rank() + world_size = torch.distributed.get_world_size() + + ld = torch.tensor(tensor.shape[0], device="cuda", dtype=torch.int64) + sizes = torch.empty((world_size,), device="cuda", dtype=torch.int64) + torch.distributed.all_gather_into_tensor(sizes, ld) + + sizes = sizes.cpu() + ld = sizes.sum() + + self.__td = -1 if len(tensor.shape) == 1 else tensor.shape[1] + global_shape = [ + int(ld), + self.__td if self.__td > 0 else 1, + ] + + if self.__td < 0: + tensor = tensor.reshape((tensor.shape[0], 1)) + + wg_tensor = wgth.create_wholememory_tensor( + self.__wg_comm, + memory_type, + location, + global_shape, + tensor.dtype, + [global_shape[1], 1], + ) + + offset = sizes[:rank].sum() if rank > 0 else 0 + + wg_tensor.scatter( + tensor.clone(memory_format=torch.contiguous_format).cuda(), + torch.arange( + offset, offset + tensor.shape[0], dtype=torch.int64, device="cuda" + ).contiguous(), + ) + + self.__wg_comm.barrier() + + self.__wg_tensor = wg_tensor + + def requires_ddp(self) -> bool: + return True + + def fetch( + self, + indices: torch.Tensor, + device: torch.cuda.Device, + pin_memory=False, + **kwargs, + ): + if pin_memory: + warnings.warn("pin_memory has no effect for WholeFeatureStorage.") + + t = self.__wg_tensor.gather( + indices.cuda(), + force_dtype=self.__wg_tensor.dtype, + ) + + if self.__td < 0: + t = t.reshape((t.shape[0],)) + + return t.to(torch.device(device)) diff --git a/python/cugraph-dgl/cugraph_dgl/graph.py b/python/cugraph-dgl/cugraph_dgl/graph.py new file mode 100644 index 00000000000..2eba13c6958 --- /dev/null +++ b/python/cugraph-dgl/cugraph_dgl/graph.py @@ -0,0 +1,910 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import warnings + +from typing import Union, Optional, Dict, Tuple, List + +from cugraph.utilities.utils import import_optional +from cugraph.gnn import cugraph_comms_get_raft_handle + +import cupy +import pylibcugraph + +from cugraph_dgl.typing import TensorType +from cugraph_dgl.utils.cugraph_conversion_utils import _cast_to_torch_tensor +from cugraph_dgl.features import WholeFeatureStore +from cugraph_dgl.view import ( + HeteroNodeView, + HeteroNodeDataView, + HeteroEdgeView, + HeteroEdgeDataView, +) + + +# Have to use import_optional even though these are required +# dependencies in order to build properly. +dgl = import_optional("dgl") +torch = import_optional("torch") +tensordict = import_optional("tensordict") + +HOMOGENEOUS_NODE_TYPE = "n" +HOMOGENEOUS_EDGE_TYPE = (HOMOGENEOUS_NODE_TYPE, "e", HOMOGENEOUS_NODE_TYPE) + + +class Graph: + """ + cuGraph-backed duck-typed version of dgl.DGLGraph that distributes + the graph across workers. This object uses lazy graph creation. + Users can repeatedly call add_edges, and the tensors won't + be converted into a cuGraph graph until one is needed + (i.e. when creating a loader). Supports + single-node/single-GPU, single-node/multi-GPU, and + multi-node/multi-GPU graph storage. + + Each worker should have a slice of the graph locally, and + call put_edge_index with its slice. + """ + + def __init__( + self, + is_multi_gpu: bool = False, + ndata_storage="torch", + edata_storage="torch", + **kwargs, + ): + """ + Parameters + ---------- + is_multi_gpu: bool (optional, default=False) + Specifies whether this graph is distributed across GPUs. + ndata_storage: str (optional, default='torch') + Specifies where node data should be stored + (options are 'torch' and 'wholegraph'). + If using PyTorch tensors for storage ('torch') + then data will be replicated across workers and data + for all nodes should be provided when calling add_nodes. + If using WholeGraph wholememory tensors for storage, + then data will be distributed across workers and only + the local slice of the data should be provided when + calling add_nodes. + edata_storage: str (optional, default='torch') + If using PyTorch tensors for storage ('torch') + then data will be replicated across workers and data + for all nodes should be provided when calling add_edge. + If using WholeGraph wholememory tensors for storage, + then data will be distributed across workers and only + the local slice of the data should be provided when + calling add_edges. + kwargs: + Optional kwargs for WholeGraph feature storage. + """ + + if ndata_storage not in ("torch", "wholegraph"): + raise ValueError( + "Invalid node storage type (valid types are 'torch' and 'wholegraph')" + ) + if edata_storage not in ("torch", "wholegraph"): + raise ValueError( + "Invalid edge storage type (valid types are 'torch' and 'wholegraph')" + ) + + self.__num_nodes_dict = {} + self.__num_edges_dict = {} + self.__edge_indices = tensordict.TensorDict({}, batch_size=(2,)) + + self.__graph = None + self.__vertex_offsets = None + self.__handle = None + self.__is_multi_gpu = is_multi_gpu + + self.__ndata_storage_type = ( + WholeFeatureStore + if ndata_storage == "wholegraph" + else dgl.storages.pytorch_tensor.PyTorchTensorStorage + ) + self.__edata_storage_type = ( + WholeFeatureStore + if edata_storage == "wholegraph" + else dgl.storages.pytorch_tensor.PyTorchTensorStorage + ) + self.__ndata_storage = {} + self.__edata_storage = {} + self.__wg_kwargs = kwargs + + @property + def is_multi_gpu(self): + return self.__is_multi_gpu + + def to_canonical_etype( + self, etype: Union[str, Tuple[str, str, str]] + ) -> Tuple[str, str, str]: + if etype is None: + if len(self.canonical_etypes) > 1: + raise ValueError("Edge type is required for heterogeneous graphs.") + return HOMOGENEOUS_EDGE_TYPE + + if isinstance(etype, tuple) and len(etype) == 3: + return etype + + for src_type, rel_type, dst_type in self.__edge_indices.keys( + leaves_only=True, include_nested=True + ): + if etype == rel_type: + return (src_type, rel_type, dst_type) + + raise ValueError("Unknown relation type " + etype) + + def add_nodes( + self, + global_num_nodes: int, + data: Optional[Dict[str, TensorType]] = None, + ntype: Optional[str] = None, + ): + """ + Adds the given number of nodes to this graph. Can only be called once + per node type. The number of nodes specified here refers to the total + number of nodes across all workers (the entire graph). If the backing + feature store is distributed (i.e. wholegraph), then only local features + should be passed to the data argument. If the backing feature store is + replicated, then features for all nodes in the graph should be passed to + the data argument, including those for nodes not on the local worker. + + Parameters + ---------- + global_num_nodes: int + The total number of nodes of the given type in this graph. + The same number should be passed to every worker. + data: Dict[str, TensorType] (optional, default=None) + Node feature tensors. + ntype: str (optional, default=None) + The node type being modified. Required for heterogeneous graphs. + """ + if ntype is None: + if len(self.__num_nodes_dict.keys()) > 1: + raise ValueError("Node type is required for heterogeneous graphs.") + ntype = HOMOGENEOUS_NODE_TYPE + + if ntype in self.__num_nodes_dict: + raise ValueError( + "Calling add_nodes multiple types for the same " + "node type is not allowed in cuGraph-DGL" + ) + + if self.is_multi_gpu: + # Ensure all nodes got the same number of nodes passed + world_size = torch.distributed.get_world_size() + local_size = torch.tensor( + [global_num_nodes], device="cuda", dtype=torch.int64 + ) + ns = torch.empty((world_size,), device="cuda", dtype=torch.int64) + torch.distributed.all_gather_into_tensor(ns, local_size) + if not (ns == global_num_nodes).all(): + raise ValueError("The global number of nodes must match on all workers") + + # Ensure the sum of the feature shapes equals the global number of nodes. + if data is not None: + for feature_name, feature_tensor in data.items(): + features_size = torch.tensor( + [int(feature_tensor.shape[0])], device="cuda", dtype=torch.int64 + ) + torch.distributed.all_reduce( + features_size, op=torch.distributed.ReduceOp.SUM + ) + if features_size != global_num_nodes: + raise ValueError( + "The total length of the feature vector across workers must" + " match the global number of nodes but it does not " + f"match for {feature_name}." + ) + + self.__num_nodes_dict[ntype] = global_num_nodes + + if data is not None: + for feature_name, feature_tensor in data.items(): + self.__ndata_storage[ntype, feature_name] = self.__ndata_storage_type( + _cast_to_torch_tensor(feature_tensor), **self.__wg_kwargs + ) + + self.__graph = None + self.__vertex_offsets = None + + def __check_node_ids(self, ntype: str, ids: TensorType): + """ + Ensures all node ids in the provided id tensor are valid. + Raises a ValueError if any are invalid. + + Parameters + ---------- + ntype: str + The node type being validated against. + ids: + The tensor of ids being validated. + """ + if ntype in self.__num_nodes_dict: + if ids.max() + 1 > self.num_nodes(ntype): + raise ValueError( + f"input tensor contains invalid node ids for type {ntype}" + ) + else: + raise ValueError( + f"add_nodes() must be called for type {ntype} before calling num_edges." + ) + + def add_edges( + self, + u: TensorType, + v: TensorType, + data: Optional[Dict[str, TensorType]] = None, + etype: Optional[Union[str, Tuple[str, str, str]]] = None, + ) -> None: + """ + Adds edges to this graph. Must be called after add_nodes + is called for the src/dst node type. If the backing feature + store is distributed (i.e. wholegraph), then only local + features should be passed to the data argument. If the + backing feature store is replicated, then features for + all edges should be passed to the data argument, + including those for edges not on the local worker. + + Parameters + ---------- + u: TensorType + 1d tensor of source node ids (local slice of the distributed edgelist). + v: TensorType + 1d tensor of destination node ids (local slice of the distributed edgelist). + data: Dict[str, TensorType] (optional, default=None) + Dictionary containing edge features for the new edges. + etype: Union[str, Tuple[str, str, str]] + The edge type of the edges being inserted. Not required + for homogeneous graphs, which have only one edge type. + """ + + # Validate all inputs before proceeding + # The number of nodes for the src/dst type needs to be known and there cannot + # be any edges of this type in the graph. + dgl_can_edge_type = self.to_canonical_etype(etype) + src_type, _, dst_type = dgl_can_edge_type + if dgl_can_edge_type in self.__edge_indices.keys( + leaves_only=True, include_nested=True + ): + raise ValueError( + "This cuGraph-DGL graph already contains edges of type" + f" {dgl_can_edge_type}. Calling add_edges multiple times" + " for the same edge type is not supported." + ) + self.__check_node_ids(src_type, u) + self.__check_node_ids(dst_type, v) + + self.__edge_indices[dgl_can_edge_type] = torch.stack( + [ + _cast_to_torch_tensor(u), + _cast_to_torch_tensor(v), + ] + ).to(self.idtype) + + if data is not None: + for attr_name, attr_tensor in data.items(): + self.__edata_storage[ + dgl_can_edge_type, attr_name + ] = self.__edata_storage_type( + _cast_to_torch_tensor(attr_tensor), **self.__wg_kwargs + ) + + num_edges = self.__edge_indices[dgl_can_edge_type].shape[1] + if self.is_multi_gpu: + num_edges = torch.tensor([num_edges], device="cuda", dtype=torch.int64) + torch.distributed.all_reduce(num_edges, op=torch.distributed.ReduceOp.SUM) + + self.__num_edges_dict[dgl_can_edge_type] = int(num_edges) + + self.__graph = None + self.__vertex_offsets = None + + def num_nodes(self, ntype: str = None) -> int: + """ + Returns the number of nodes of ntype, or if ntype is not provided, + the total number of nodes in the graph. + """ + if ntype is None: + return sum(self.__num_nodes_dict.values()) + + return self.__num_nodes_dict[ntype] + + def number_of_nodes(self, ntype: str = None) -> int: + """ + Alias for num_nodes. + """ + return self.num_nodes(ntype=ntype) + + def num_edges(self, etype: Union[str, Tuple[str, str, str]] = None) -> int: + """ + Returns the number of edges of etype, or if etype is not provided, + the total number of edges in the graph. + """ + if etype is None: + return sum(self.__num_edges_dict.values()) + + etype = self.to_canonical_etype(etype) + return self.__num_edges_dict[etype] + + def number_of_edges(self, etype: Union[str, Tuple[str, str, str]] = None) -> int: + """ + Alias for num_edges. + """ + return self.num_edges(etype=etype) + + @property + def ntypes(self) -> List[str]: + """ + Returns the node type names in this graph. + """ + return list(self.__num_nodes_dict.keys()) + + @property + def etypes(self) -> List[str]: + """ + Returns the edge type names in this graph + (the second element of the canonical edge + type tuple). + """ + return [et[1] for et in self.__num_edges_dict.keys()] + + @property + def canonical_etypes(self) -> List[str]: + """ + Returns the canonical edge type names in this + graph. + """ + return list(self.__num_edges_dict.keys()) + + @property + def _vertex_offsets(self) -> Dict[str, int]: + if self.__vertex_offsets is None: + ordered_keys = sorted(list(self.ntypes)) + self.__vertex_offsets = {} + offset = 0 + for vtype in ordered_keys: + self.__vertex_offsets[vtype] = offset + offset += self.num_nodes(vtype) + + return dict(self.__vertex_offsets) + + def __get_edgelist(self) -> Dict[str, "torch.Tensor"]: + """ + This function always returns src/dst labels with respect + to the out direction. + + Returns + ------- + Dict[str, torch.Tensor] with the following keys: + src: source vertices (int64) + Note that src is the 1st element of the DGL edge index. + dst: destination vertices (int64) + Note that dst is the 2nd element of the DGL edge index. + eid: edge ids for each edge (int64) + Note that these start from 0 for each edge type. + etp: edge types for each edge (int32) + Note that these are in lexicographic order. + """ + sorted_keys = sorted( + list(self.__edge_indices.keys(leaves_only=True, include_nested=True)) + ) + + # note that this still follows the DGL convention of (src, rel, dst) + # i.e. (author, writes, paper): [[0,1,2],[2,0,1]] is referring to a + # cuGraph graph where (paper 2) -> (author 0), (paper 0) -> (author 1), + # and (paper 1) -> (author 0) + edge_index = torch.concat( + [ + torch.stack( + [ + self.__edge_indices[src_type, rel_type, dst_type][0] + + self._vertex_offsets[src_type], + self.__edge_indices[src_type, rel_type, dst_type][1] + + self._vertex_offsets[dst_type], + ] + ) + for (src_type, rel_type, dst_type) in sorted_keys + ], + axis=1, + ).cuda() + + edge_type_array = torch.arange( + len(sorted_keys), dtype=torch.int32, device="cuda" + ).repeat_interleave( + torch.tensor( + [self.__edge_indices[et].shape[1] for et in sorted_keys], + device="cuda", + dtype=torch.int32, + ) + ) + + if self.is_multi_gpu: + rank = torch.distributed.get_rank() + world_size = torch.distributed.get_world_size() + + num_edges_t = torch.tensor( + [self.__edge_indices[et].shape[1] for et in sorted_keys], device="cuda" + ) + num_edges_all_t = torch.empty( + world_size, num_edges_t.numel(), dtype=torch.int64, device="cuda" + ) + torch.distributed.all_gather_into_tensor(num_edges_all_t, num_edges_t) + + if rank > 0: + start_offsets = num_edges_all_t[:rank].T.sum(axis=1) + edge_id_array = torch.concat( + [ + torch.arange( + start_offsets[i], + start_offsets[i] + num_edges_all_t[rank][i], + dtype=torch.int64, + device="cuda", + ) + for i in range(len(sorted_keys)) + ] + ) + else: + edge_id_array = torch.concat( + [ + torch.arange( + self.__edge_indices[et].shape[1], + dtype=torch.int64, + device="cuda", + ) + for et in sorted_keys + ] + ) + + else: + # single GPU + edge_id_array = torch.concat( + [ + torch.arange( + self.__edge_indices[et].shape[1], + dtype=torch.int64, + device="cuda", + ) + for et in sorted_keys + ] + ) + + return { + "src": edge_index[0], + "dst": edge_index[1], + "etp": edge_type_array, + "eid": edge_id_array, + } + + @property + def is_homogeneous(self): + return len(self.__num_edges_dict) <= 1 and len(self.__num_nodes_dict) <= 1 + + @property + def idtype(self): + return torch.int64 + + @property + def _resource_handle(self): + if self.__handle is None: + if self.is_multi_gpu: + self.__handle = pylibcugraph.ResourceHandle( + cugraph_comms_get_raft_handle().getHandle() + ) + else: + self.__handle = pylibcugraph.ResourceHandle() + return self.__handle + + def _graph( + self, direction: str + ) -> Union[pylibcugraph.SGGraph, pylibcugraph.MGGraph]: + """ + Gets the pylibcugraph Graph object with edges pointing in the given direction + (i.e. 'out' is standard, 'in' is reverse). + """ + + if direction not in ["out", "in"]: + raise ValueError(f"Invalid direction {direction} (expected 'in' or 'out').") + + graph_properties = pylibcugraph.GraphProperties( + is_multigraph=True, is_symmetric=False + ) + + if self.__graph is not None and self.__graph[1] != direction: + self.__graph = None + + if self.__graph is None: + src_col, dst_col = ("src", "dst") if direction == "out" else ("dst", "src") + edgelist_dict = self.__get_edgelist() + + if self.is_multi_gpu: + rank = torch.distributed.get_rank() + world_size = torch.distributed.get_world_size() + + vertices_array = cupy.arange(self.num_nodes(), dtype="int64") + vertices_array = cupy.array_split(vertices_array, world_size)[rank] + + self.__graph = ( + pylibcugraph.MGGraph( + self._resource_handle, + graph_properties, + [cupy.asarray(edgelist_dict[src_col]).astype("int64")], + [cupy.asarray(edgelist_dict[dst_col]).astype("int64")], + vertices_array=[vertices_array], + edge_id_array=[cupy.asarray(edgelist_dict["eid"])], + edge_type_array=[cupy.asarray(edgelist_dict["etp"])], + ), + direction, + ) + else: + self.__graph = ( + pylibcugraph.SGGraph( + self._resource_handle, + graph_properties, + cupy.asarray(edgelist_dict[src_col]).astype("int64"), + cupy.asarray(edgelist_dict[dst_col]).astype("int64"), + vertices_array=cupy.arange(self.num_nodes(), dtype="int64"), + edge_id_array=cupy.asarray(edgelist_dict["eid"]), + edge_type_array=cupy.asarray(edgelist_dict["etp"]), + ), + direction, + ) + + return self.__graph[0] + + def _has_n_emb(self, ntype: str, emb_name: str) -> bool: + return (ntype, emb_name) in self.__ndata_storage + + def _get_n_emb( + self, ntype: str, emb_name: str, u: Union[str, TensorType] + ) -> "torch.Tensor": + """ + Gets the embedding of a single node type. + Unlike DGL, this function takes the string node + type name instead of an integer id. + + Parameters + ---------- + ntype: str + The node type to get the embedding of. + emb_name: str + The embedding name of the embedding to get. + u: Union[str, TensorType] + Nodes to get the representation of, or ALL + to get the representation of all nodes of + the given type. + + Returns + ------- + torch.Tensor + The embedding of the given edge type with the given embedding name. + """ + + if ntype is None: + if len(self.ntypes) == 1: + ntype = HOMOGENEOUS_NODE_TYPE + else: + raise ValueError("Must provide the node type for a heterogeneous graph") + + if dgl.base.is_all(u): + u = torch.arange(self.num_nodes(ntype), dtype=self.idtype, device="cpu") + + try: + return self.__ndata_storage[ntype, emb_name].fetch( + _cast_to_torch_tensor(u), "cuda" + ) + except RuntimeError as ex: + warnings.warn( + "Got error accessing data, trying again with index on device: " + + str(ex) + ) + return self.__ndata_storage[ntype, emb_name].fetch( + _cast_to_torch_tensor(u).cuda(), "cuda" + ) + + def _has_e_emb(self, etype: Tuple[str, str, str], emb_name: str) -> bool: + return (etype, emb_name) in self.__edata_storage + + def _get_e_emb( + self, etype: Tuple[str, str, str], emb_name: str, u: Union[str, TensorType] + ) -> "torch.Tensor": + """ + Gets the embedding of a single edge type. + Unlike DGL, this function takes the canonical edge type + instead of an integer id. + + Parameters + ---------- + etype: str + The edge type to get the embedding of. + emb_name: str + The embedding name of the embedding to get. + u: Union[str, TensorType] + Edges to get the representation of, or ALL to + get the representation of all nodes of the + given type. + + Returns + ------- + torch.Tensor + The embedding of the given edge type with the given embedding name. + """ + + etype = self.to_canonical_etype(etype) + + if dgl.base.is_all(u): + u = torch.arange(self.num_edges(etype), dtype=self.idtype, device="cpu") + + try: + return self.__edata_storage[etype, emb_name].fetch( + _cast_to_torch_tensor(u), "cuda" + ) + except RuntimeError as ex: + warnings.warn( + "Got error accessing data, trying again with index on device: " + + str(ex) + ) + return self.__edata_storage[etype, emb_name].fetch( + _cast_to_torch_tensor(u).cuda(), "cuda" + ) + + def _set_n_emb( + self, ntype: str, u: Union[str, TensorType], kv: Dict[str, TensorType] + ) -> None: + """ + Stores or updates the embedding(s) of a single node type. + Unlike DGL, this function takes the string node type name + instead of an integer id. + + The semantics of this function match those of add_nodes + with respect to whether or not the backing feature store + is distributed. + + Parameters + ---------- + ntype: str + The node type to store an embedding of. + u: Union[str, TensorType] + The indices to update, if updating the embedding. + Currently, updating a slice of an embedding is + unsupported, so this should be ALL. + kv: Dict[str, TensorType] + A mapping of embedding names to embedding tensors. + """ + + if not dgl.base.is_all(u): + raise NotImplementedError( + "Updating a slice of an embedding is " + "currently unimplemented in cuGraph-DGL." + ) + + for k, v in kv: + self.__ndata_storage[ntype, k] = self.__ndata_storage_type( + v, + **self.__wg_kwargs, + ) + + def _set_e_emb( + self, etype: str, u: Union[str, TensorType], kv: Dict[str, TensorType] + ) -> None: + """ + Stores or updates the embedding(s) of a single edge type. + Unlike DGL, this function takes the canonical edge type name + instead of an integer id. + + The semantics of this function match those of add_edges + with respect to whether or not the backing feature store + is distributed. + + Parameters + ---------- + etype: str + The edge type to store an embedding of. + u: Union[str, TensorType] + The indices to update, if updating the embedding. + Currently, updating a slice of an embedding is + unsupported, so this should be ALL. + kv: Dict[str, TensorType] + A mapping of embedding names to embedding tensors. + """ + + if not dgl.base.is_all(u): + raise NotImplementedError( + "Updating a slice of an embedding is " + "currently unimplemented in cuGraph-DGL." + ) + + for k, v in kv: + self.__edata_storage[etype, k] = self.__edata_storage_type( + v, + **self.__wg_kwargs, + ) + + def _pop_n_emb(self, ntype: str, key: str) -> "torch.Tensor": + """ + Removes and returns the embedding of the given node + type with the given name. + + Parameters + ---------- + ntype:str + The node type. + key:str + The embedding name. + + Returns + ------- + The removed embedding. + """ + return self.__ndata_storage[ntype, key].pop(key) + + def _pop_e_emb(self, etype: str, key: str) -> "torch.Tensor": + """ + Removes and returns the embedding of the given edge + type with the given name. + + Parameters + ---------- + etype:str + The node type. + key:str + The embedding name. + + Returns + ------- + torch.Tensor + The removed embedding. + """ + return self.__edata_storage[etype, key].pop(key) + + def _get_n_emb_keys(self, ntype: str) -> List[str]: + """ + Gets a list of the embedding names for a given node + type. + + Parameters + ---------- + ntype: str + The node type to get embedding names for. + + Returns + ------- + List[str] + The list of embedding names for the given node type. + """ + return [k for (t, k) in self.__ndata_storage if ntype == t] + + def _get_e_emb_keys(self, etype: str) -> List[str]: + """ + Gets a list of the embedding names for a given edge + type. + + Parameters + ---------- + etype: str + The edge type to get embedding names for. + + Returns + ------- + List[str] + The list of embedding names for the given edge type. + """ + return [k for (t, k) in self.__edata_storage if etype == t] + + def all_edges( + self, + form="uv", + order="eid", + etype: Union[str, Tuple[str, str, str]] = None, + device: Union[str, int, "torch.device"] = "cpu", + ): + """ + Returns all edges with the specified edge type. + cuGraph-DGL currently only supports 'eid' format and + 'eid' order. + + Parameters + ---------- + form: str (optional, default='uv') + The format to return ('uv', 'eid', 'all'). + + order: str (optional, default='eid') + The order to return edges in ('eid', 'srcdst') + cuGraph-DGL currently only supports 'eid'. + etype: Union[str, Tuple[str, str, str]] (optional, default=None) + The edge type to get. Not required if this is + a homogeneous graph. Can be the relation type if the + relation type is unique, or the canonical edge type. + device: Union[str, int, torch.device] (optional, default='cpu') + The device where returned edges should be stored + ('cpu', 'cuda', or device id). + """ + + if order != "eid": + raise NotImplementedError("cugraph-DGL only supports eid order.") + + if etype is None and len(self.canonical_etypes) > 1: + raise ValueError("Edge type is required for heterogeneous graphs.") + + etype = self.to_canonical_etype(etype) + + if form == "eid": + return torch.arange( + 0, + self.__num_edges_dict[etype], + dtype=self.idtype, + device=device, + ) + else: + if self.is_multi_gpu: + # This can't be done because it requires collective communication. + raise ValueError( + "Calling all_edges in a distributed graph with" + " form 'uv' or 'all' is unsupported." + ) + + else: + eix = self.__edge_indices[etype].to(device) + if form == "uv": + return eix[0], eix[1] + elif form == "all": + return ( + eix[0], + eix[1], + torch.arange( + self.__num_edges_dict[etype], + dtype=self.idtype, + device=device, + ), + ) + else: + raise ValueError(f"Invalid form {form}") + + @property + def ndata(self) -> HeteroNodeDataView: + """ + Returns a view of the node data in this graph which can be used to + access or modify node features. + """ + + if len(self.ntypes) == 1: + ntype = self.ntypes[0] + return HeteroNodeDataView(self, ntype, dgl.base.ALL) + + return HeteroNodeDataView(self, self.ntypes, dgl.base.ALL) + + @property + def edata(self) -> HeteroEdgeDataView: + """ + Returns a view of the edge data in this graph which can be used to + access or modify edge features. + """ + if len(self.canonical_etypes) == 1: + return HeteroEdgeDataView(self, None, dgl.base.ALL) + + return HeteroEdgeDataView(self, self.canonical_etypes, dgl.base.ALL) + + @property + def nodes(self) -> HeteroNodeView: + """ + Returns a view of the nodes in this graph. + """ + return HeteroNodeView(self) + + @property + def edges(self) -> HeteroEdgeView: + """ + Returns a view of the edges in this graph. + """ + return HeteroEdgeView(self) diff --git a/python/cugraph-dgl/cugraph_dgl/nn/conv/base.py b/python/cugraph-dgl/cugraph_dgl/nn/conv/base.py index ddd95a76366..d2460f814c9 100644 --- a/python/cugraph-dgl/cugraph_dgl/nn/conv/base.py +++ b/python/cugraph-dgl/cugraph_dgl/nn/conv/base.py @@ -1,4 +1,4 @@ -# Copyright (c) 2023, NVIDIA CORPORATION. +# Copyright (c) 2023-2024, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -15,6 +15,8 @@ from cugraph.utilities.utils import import_optional +import cugraph_dgl + torch = import_optional("torch") ops_torch = import_optional("pylibcugraphops.pytorch") dgl = import_optional("dgl") @@ -255,6 +257,27 @@ def __repr__(self) -> str: f"num_edges={self._src_ids.size(0)}, formats={self._formats})" ) + def to(self, device: Union[torch.device, str, int]) -> "cugraph_dgl.nn.SparseGraph": + sg = SparseGraph( + src_ids=None if self._src_ids is None else self._src_ids.to(device), + dst_ids=None if self._dst_ids is None else self._dst_ids.to(device), + csrc_ids=None if self._csrc_ids is None else self._csrc_ids.to(device), + cdst_ids=None if self._cdst_ids is None else self._cdst_ids.to(device), + values=None if self._values is None else self._values.to(device), + is_sorted=self._is_sorted, + formats=self._formats, + reduce_memory=self._reduce_memory, + ) + + sg._perm_coo2csc = ( + None if self._perm_coo2csc is None else self._perm_coo2csc.to(device) + ) + sg._perm_csc2csr = ( + None if self._perm_csc2csr is None else self._perm_csc2csr.to(device) + ) + + return sg + class BaseConv(torch.nn.Module): r"""An abstract base class for cugraph-ops nn module.""" diff --git a/python/cugraph-dgl/tests/__init__.py b/python/cugraph-dgl/cugraph_dgl/tests/__init__.py similarity index 100% rename from python/cugraph-dgl/tests/__init__.py rename to python/cugraph-dgl/cugraph_dgl/tests/__init__.py diff --git a/python/cugraph-dgl/tests/conftest.py b/python/cugraph-dgl/cugraph_dgl/tests/conftest.py similarity index 100% rename from python/cugraph-dgl/tests/conftest.py rename to python/cugraph-dgl/cugraph_dgl/tests/conftest.py diff --git a/python/cugraph-dgl/tests/test_dataloader.py b/python/cugraph-dgl/cugraph_dgl/tests/dataloading/test_dask_dataloader.py similarity index 98% rename from python/cugraph-dgl/tests/test_dataloader.py rename to python/cugraph-dgl/cugraph_dgl/tests/dataloading/test_dask_dataloader.py index cc473cd0ad6..e2542657de4 100644 --- a/python/cugraph-dgl/tests/test_dataloader.py +++ b/python/cugraph-dgl/cugraph_dgl/tests/dataloading/test_dask_dataloader.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022-2023, NVIDIA CORPORATION. +# Copyright (c) 2022-2024, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -52,7 +52,7 @@ def sample_cugraph_dgl_graphs(cugraph_gs, train_nid, fanouts): sampler = cugraph_dgl.dataloading.NeighborSampler(fanouts) tempdir_object = tempfile.TemporaryDirectory() sampling_output_dir = tempdir_object - dataloader = cugraph_dgl.dataloading.DataLoader( + dataloader = cugraph_dgl.dataloading.DaskDataLoader( cugraph_gs, train_nid, sampler, diff --git a/python/cugraph-dgl/tests/mg/test_dataloader.py b/python/cugraph-dgl/cugraph_dgl/tests/dataloading/test_dask_dataloader_mg.py similarity index 97% rename from python/cugraph-dgl/tests/mg/test_dataloader.py rename to python/cugraph-dgl/cugraph_dgl/tests/dataloading/test_dask_dataloader_mg.py index 29b7e1c3412..d49e1293e77 100644 --- a/python/cugraph-dgl/tests/mg/test_dataloader.py +++ b/python/cugraph-dgl/cugraph_dgl/tests/dataloading/test_dask_dataloader_mg.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022-2023, NVIDIA CORPORATION. +# Copyright (c) 2022-2024, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -51,7 +51,7 @@ def sample_cugraph_dgl_graphs(cugraph_gs, train_nid, fanouts): sampler = cugraph_dgl.dataloading.NeighborSampler(fanouts) tempdir_object = tempfile.TemporaryDirectory() sampling_output_dir = tempdir_object - dataloader = cugraph_dgl.dataloading.DataLoader( + dataloader = cugraph_dgl.dataloading.DaskDataLoader( cugraph_gs, train_nid, sampler, diff --git a/python/cugraph-dgl/cugraph_dgl/tests/dataloading/test_dataloader.py b/python/cugraph-dgl/cugraph_dgl/tests/dataloading/test_dataloader.py new file mode 100644 index 00000000000..ef47875463d --- /dev/null +++ b/python/cugraph-dgl/cugraph_dgl/tests/dataloading/test_dataloader.py @@ -0,0 +1,128 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import cugraph_dgl.dataloading +import pytest + +import cugraph_dgl + +from cugraph.datasets import karate +from cugraph.utilities.utils import import_optional, MissingModule + +import numpy as np + +torch = import_optional("torch") +dgl = import_optional("dgl") + + +@pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available") +@pytest.mark.skipif(isinstance(dgl, MissingModule), reason="dgl not available") +def test_dataloader_basic_homogeneous(): + graph = cugraph_dgl.Graph(is_multi_gpu=False) + + num_nodes = karate.number_of_nodes() + graph.add_nodes(num_nodes, data={"z": torch.arange(num_nodes)}) + + edf = karate.get_edgelist() + graph.add_edges( + u=edf["src"], v=edf["dst"], data={"q": torch.arange(karate.number_of_edges())} + ) + + sampler = cugraph_dgl.dataloading.NeighborSampler([5, 5, 5]) + loader = cugraph_dgl.dataloading.FutureDataLoader( + graph, torch.arange(num_nodes), sampler, batch_size=2 + ) + + for in_t, out_t, blocks in loader: + assert len(blocks) == 3 + assert len(out_t) <= 2 + + +def sample_dgl_graphs(g, train_nid, fanouts, batch_size=1): + # Single fanout to match cugraph + sampler = dgl.dataloading.NeighborSampler(fanouts) + dataloader = dgl.dataloading.DataLoader( + g, + train_nid, + sampler, + batch_size=batch_size, + shuffle=False, + drop_last=False, + num_workers=0, + ) + + dgl_output = {} + for batch_id, (input_nodes, output_nodes, blocks) in enumerate(dataloader): + dgl_output[batch_id] = { + "input_nodes": input_nodes, + "output_nodes": output_nodes, + "blocks": blocks, + } + return dgl_output + + +def sample_cugraph_dgl_graphs(cugraph_g, train_nid, fanouts, batch_size=1): + sampler = cugraph_dgl.dataloading.NeighborSampler(fanouts) + + dataloader = cugraph_dgl.dataloading.FutureDataLoader( + cugraph_g, + train_nid, + sampler, + batch_size=batch_size, + drop_last=False, + shuffle=False, + ) + + cugraph_dgl_output = {} + for batch_id, (input_nodes, output_nodes, blocks) in enumerate(dataloader): + cugraph_dgl_output[batch_id] = { + "input_nodes": input_nodes, + "output_nodes": output_nodes, + "blocks": blocks, + } + return cugraph_dgl_output + + +@pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available") +@pytest.mark.skipif(isinstance(dgl, MissingModule), reason="dgl not available") +@pytest.mark.parametrize("ix", [[1], [1, 0]]) +@pytest.mark.parametrize("batch_size", [1, 2]) +def test_same_homogeneousgraph_results(ix, batch_size): + src = torch.tensor([1, 2, 3, 4, 5, 6, 7, 8]) + dst = torch.tensor([0, 0, 0, 0, 1, 1, 1, 1]) + + train_nid = torch.tensor(ix) + # Create a heterograph with 3 node types and 3 edges types. + dgl_g = dgl.graph((src, dst)) + + cugraph_g = cugraph_dgl.Graph(is_multi_gpu=False) + cugraph_g.add_nodes(9) + cugraph_g.add_edges(u=src, v=dst) + + dgl_output = sample_dgl_graphs(dgl_g, train_nid, [2], batch_size=batch_size) + cugraph_output = sample_cugraph_dgl_graphs(cugraph_g, train_nid, [2], batch_size) + + cugraph_output_nodes = cugraph_output[0]["output_nodes"].cpu().numpy() + dgl_output_nodes = dgl_output[0]["output_nodes"].cpu().numpy() + + np.testing.assert_array_equal( + np.sort(cugraph_output_nodes), np.sort(dgl_output_nodes) + ) + assert ( + dgl_output[0]["blocks"][0].num_dst_nodes() + == cugraph_output[0]["blocks"][0].num_dst_nodes() + ) + assert ( + dgl_output[0]["blocks"][0].num_edges() + == cugraph_output[0]["blocks"][0].num_edges() + ) diff --git a/python/cugraph-dgl/cugraph_dgl/tests/dataloading/test_dataloader_mg.py b/python/cugraph-dgl/cugraph_dgl/tests/dataloading/test_dataloader_mg.py new file mode 100644 index 00000000000..b32233f16a6 --- /dev/null +++ b/python/cugraph-dgl/cugraph_dgl/tests/dataloading/test_dataloader_mg.py @@ -0,0 +1,181 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +import numpy as np + +import cugraph_dgl + +from cugraph.datasets import karate +from cugraph.utilities.utils import import_optional, MissingModule + +from cugraph.gnn import ( + cugraph_comms_create_unique_id, + cugraph_comms_shutdown, +) + +from cugraph_dgl.tests.utils import init_pytorch_worker + +torch = import_optional("torch") +dgl = import_optional("dgl") + + +def run_test_dataloader_basic_homogeneous(rank, world_size, uid): + init_pytorch_worker(rank, world_size, uid) + + graph = cugraph_dgl.Graph(is_multi_gpu=True) + + num_nodes = karate.number_of_nodes() + graph.add_nodes( + num_nodes, + ) + + edf = karate.get_edgelist() + graph.add_edges( + u=torch.tensor_split(torch.as_tensor(edf["src"], device="cuda"), world_size)[ + rank + ], + v=torch.tensor_split(torch.as_tensor(edf["dst"], device="cuda"), world_size)[ + rank + ], + ) + + sampler = cugraph_dgl.dataloading.NeighborSampler([5, 5, 5]) + loader = cugraph_dgl.dataloading.FutureDataLoader( + graph, + torch.arange(num_nodes), + sampler, + batch_size=2, + use_ddp=True, + ) + + for in_t, out_t, blocks in loader: + assert len(blocks) == 3 + assert len(out_t) <= 2 + + +@pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available") +@pytest.mark.skipif(isinstance(dgl, MissingModule), reason="dgl not available") +def test_dataloader_basic_homogeneous(): + uid = cugraph_comms_create_unique_id() + # Limit the number of GPUs this rest is run with + world_size = min(torch.cuda.device_count(), 4) + + torch.multiprocessing.spawn( + run_test_dataloader_basic_homogeneous, + args=( + world_size, + uid, + ), + nprocs=world_size, + ) + + +def sample_dgl_graphs(g, train_nid, fanouts, batch_size=1): + # Single fanout to match cugraph + sampler = dgl.dataloading.NeighborSampler(fanouts) + dataloader = dgl.dataloading.DataLoader( + g, + train_nid, + sampler, + batch_size=batch_size, + shuffle=False, + drop_last=False, + num_workers=0, + ) + + dgl_output = {} + for batch_id, (input_nodes, output_nodes, blocks) in enumerate(dataloader): + dgl_output[batch_id] = { + "input_nodes": input_nodes, + "output_nodes": output_nodes, + "blocks": blocks, + } + return dgl_output + + +def sample_cugraph_dgl_graphs(cugraph_g, train_nid, fanouts, batch_size=1): + sampler = cugraph_dgl.dataloading.NeighborSampler(fanouts) + + dataloader = cugraph_dgl.dataloading.FutureDataLoader( + cugraph_g, + train_nid, + sampler, + batch_size=batch_size, + drop_last=False, + shuffle=False, + ) + + cugraph_dgl_output = {} + for batch_id, (input_nodes, output_nodes, blocks) in enumerate(dataloader): + cugraph_dgl_output[batch_id] = { + "input_nodes": input_nodes, + "output_nodes": output_nodes, + "blocks": blocks, + } + return cugraph_dgl_output + + +def run_test_same_homogeneousgraph_results(rank, world_size, uid, ix, batch_size): + init_pytorch_worker(rank, world_size, uid) + + src = torch.tensor([1, 2, 3, 4, 5, 6, 7, 8]) + dst = torch.tensor([0, 0, 0, 0, 1, 1, 1, 1]) + + local_src = torch.tensor_split(src, world_size)[rank] + local_dst = torch.tensor_split(dst, world_size)[rank] + + train_nid = torch.tensor(ix) + # Create a heterograph with 3 node types and 3 edges types. + dgl_g = dgl.graph((src, dst)) + + cugraph_g = cugraph_dgl.Graph(is_multi_gpu=True) + cugraph_g.add_nodes(9) + cugraph_g.add_edges(u=local_src, v=local_dst) + + dgl_output = sample_dgl_graphs(dgl_g, train_nid, [2], batch_size=batch_size) + cugraph_output = sample_cugraph_dgl_graphs(cugraph_g, train_nid, [2], batch_size) + + cugraph_output_nodes = cugraph_output[0]["output_nodes"].cpu().numpy() + dgl_output_nodes = dgl_output[0]["output_nodes"].cpu().numpy() + + np.testing.assert_array_equal( + np.sort(cugraph_output_nodes), np.sort(dgl_output_nodes) + ) + assert ( + dgl_output[0]["blocks"][0].num_dst_nodes() + == cugraph_output[0]["blocks"][0].num_dst_nodes() + ) + assert ( + dgl_output[0]["blocks"][0].num_edges() + == cugraph_output[0]["blocks"][0].num_edges() + ) + + cugraph_comms_shutdown() + + +@pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available") +@pytest.mark.skipif(isinstance(dgl, MissingModule), reason="dgl not available") +@pytest.mark.parametrize("ix", [[1], [1, 0]]) +@pytest.mark.parametrize("batch_size", [1, 2]) +def test_same_homogeneousgraph_results_mg(ix, batch_size): + uid = cugraph_comms_create_unique_id() + # Limit the number of GPUs this rest is run with + world_size = min(torch.cuda.device_count(), 4) + + torch.multiprocessing.spawn( + run_test_same_homogeneousgraph_results, + args=(world_size, uid, ix, batch_size), + nprocs=world_size, + ) diff --git a/python/cugraph-dgl/tests/test_dataset.py b/python/cugraph-dgl/cugraph_dgl/tests/dataloading/test_dataset.py similarity index 100% rename from python/cugraph-dgl/tests/test_dataset.py rename to python/cugraph-dgl/cugraph_dgl/tests/dataloading/test_dataset.py diff --git a/python/cugraph-dgl/tests/nn/test_gatconv.py b/python/cugraph-dgl/cugraph_dgl/tests/nn/test_gatconv.py similarity index 100% rename from python/cugraph-dgl/tests/nn/test_gatconv.py rename to python/cugraph-dgl/cugraph_dgl/tests/nn/test_gatconv.py diff --git a/python/cugraph-dgl/tests/nn/test_gatv2conv.py b/python/cugraph-dgl/cugraph_dgl/tests/nn/test_gatv2conv.py similarity index 100% rename from python/cugraph-dgl/tests/nn/test_gatv2conv.py rename to python/cugraph-dgl/cugraph_dgl/tests/nn/test_gatv2conv.py diff --git a/python/cugraph-dgl/tests/nn/test_relgraphconv.py b/python/cugraph-dgl/cugraph_dgl/tests/nn/test_relgraphconv.py similarity index 100% rename from python/cugraph-dgl/tests/nn/test_relgraphconv.py rename to python/cugraph-dgl/cugraph_dgl/tests/nn/test_relgraphconv.py diff --git a/python/cugraph-dgl/tests/nn/test_sageconv.py b/python/cugraph-dgl/cugraph_dgl/tests/nn/test_sageconv.py similarity index 100% rename from python/cugraph-dgl/tests/nn/test_sageconv.py rename to python/cugraph-dgl/cugraph_dgl/tests/nn/test_sageconv.py diff --git a/python/cugraph-dgl/tests/nn/test_sparsegraph.py b/python/cugraph-dgl/cugraph_dgl/tests/nn/test_sparsegraph.py similarity index 100% rename from python/cugraph-dgl/tests/nn/test_sparsegraph.py rename to python/cugraph-dgl/cugraph_dgl/tests/nn/test_sparsegraph.py diff --git a/python/cugraph-dgl/tests/nn/test_transformerconv.py b/python/cugraph-dgl/cugraph_dgl/tests/nn/test_transformerconv.py similarity index 100% rename from python/cugraph-dgl/tests/nn/test_transformerconv.py rename to python/cugraph-dgl/cugraph_dgl/tests/nn/test_transformerconv.py diff --git a/python/cugraph-dgl/tests/test_cugraph_storage.py b/python/cugraph-dgl/cugraph_dgl/tests/test_cugraph_storage.py similarity index 100% rename from python/cugraph-dgl/tests/test_cugraph_storage.py rename to python/cugraph-dgl/cugraph_dgl/tests/test_cugraph_storage.py diff --git a/python/cugraph-dgl/tests/test_from_dgl_heterograph.py b/python/cugraph-dgl/cugraph_dgl/tests/test_from_dgl_heterograph.py similarity index 83% rename from python/cugraph-dgl/tests/test_from_dgl_heterograph.py rename to python/cugraph-dgl/cugraph_dgl/tests/test_from_dgl_heterograph.py index 128d9bfaca5..667a4a2e66d 100644 --- a/python/cugraph-dgl/tests/test_from_dgl_heterograph.py +++ b/python/cugraph-dgl/cugraph_dgl/tests/test_from_dgl_heterograph.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022-2023, NVIDIA CORPORATION. +# Copyright (c) 2022-2024, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -20,7 +20,9 @@ from cugraph.utilities.utils import import_optional from .utils import ( assert_same_edge_feats, + assert_same_edge_feats_daskapi, assert_same_node_feats, + assert_same_node_feats_daskapi, assert_same_num_edges_can_etypes, assert_same_num_edges_etypes, assert_same_num_nodes, @@ -134,7 +136,7 @@ def create_heterograph4(idtype): @pytest.mark.parametrize("idxtype", [th.int32, th.int64]) -def test_heterograph_conversion_nodes(idxtype): +def test_heterograph_conversion_nodes_daskapi(idxtype): graph_fs = [ create_heterograph1, create_heterograph2, @@ -145,6 +147,39 @@ def test_heterograph_conversion_nodes(idxtype): g = graph_f(idxtype) gs = cugraph_dgl.cugraph_storage_from_heterograph(g) + assert_same_num_nodes(gs, g) + assert_same_node_feats_daskapi(gs, g) + + +@pytest.mark.parametrize("idxtype", [th.int32, th.int64]) +def test_heterograph_conversion_edges_daskapi(idxtype): + graph_fs = [ + create_heterograph1, + create_heterograph2, + create_heterograph3, + create_heterograph4, + ] + for graph_f in graph_fs: + g = graph_f(idxtype) + gs = cugraph_dgl.cugraph_storage_from_heterograph(g) + + assert_same_num_edges_can_etypes(gs, g) + assert_same_num_edges_etypes(gs, g) + assert_same_edge_feats_daskapi(gs, g) + + +@pytest.mark.parametrize("idxtype", [th.int32, th.int64]) +def test_heterograph_conversion_nodes(idxtype): + graph_fs = [ + create_heterograph1, + create_heterograph2, + create_heterograph3, + create_heterograph4, + ] + for graph_f in graph_fs: + g = graph_f(idxtype) + gs = cugraph_dgl.cugraph_dgl_graph_from_heterograph(g) + assert_same_num_nodes(gs, g) assert_same_node_feats(gs, g) @@ -159,7 +194,7 @@ def test_heterograph_conversion_edges(idxtype): ] for graph_f in graph_fs: g = graph_f(idxtype) - gs = cugraph_dgl.cugraph_storage_from_heterograph(g) + gs = cugraph_dgl.cugraph_dgl_graph_from_heterograph(g) assert_same_num_edges_can_etypes(gs, g) assert_same_num_edges_etypes(gs, g) diff --git a/python/cugraph-dgl/cugraph_dgl/tests/test_graph.py b/python/cugraph-dgl/cugraph_dgl/tests/test_graph.py new file mode 100644 index 00000000000..a60db97b8d6 --- /dev/null +++ b/python/cugraph-dgl/cugraph_dgl/tests/test_graph.py @@ -0,0 +1,217 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +import cugraph_dgl +import pylibcugraph +import cupy +import numpy as np + +from cugraph.datasets import karate +from cugraph.utilities.utils import import_optional, MissingModule + +torch = import_optional("torch") +dgl = import_optional("dgl") + + +@pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available") +@pytest.mark.skipif(isinstance(dgl, MissingModule), reason="dgl not available") +@pytest.mark.parametrize("direction", ["out", "in"]) +def test_graph_make_homogeneous_graph(direction): + df = karate.get_edgelist() + df.src = df.src.astype("int64") + df.dst = df.dst.astype("int64") + wgt = np.random.random((len(df),)) + + graph = cugraph_dgl.Graph() + num_nodes = max(df.src.max(), df.dst.max()) + 1 + node_x = np.random.random((num_nodes,)) + + graph.add_nodes( + num_nodes, data={"num": torch.arange(num_nodes, dtype=torch.int64), "x": node_x} + ) + graph.add_edges(df.src, df.dst, {"weight": wgt}) + plc_dgl_graph = graph._graph(direction=direction) + + assert graph.num_nodes() == num_nodes + assert graph.num_edges() == len(df) + assert graph.is_homogeneous + assert not graph.is_multi_gpu + + assert ( + graph.nodes() == torch.arange(num_nodes, dtype=torch.int64, device="cuda") + ).all() + + assert graph.nodes[None]["x"] is not None + assert (graph.nodes[None]["x"] == torch.as_tensor(node_x, device="cuda")).all() + assert ( + graph.nodes[None]["num"] + == torch.arange(num_nodes, dtype=torch.int64, device="cuda") + ).all() + + assert ( + graph.edges("eid", device="cuda") + == torch.arange(len(df), dtype=torch.int64, device="cuda") + ).all() + assert (graph.edges[None]["weight"] == torch.as_tensor(wgt, device="cuda")).all() + + plc_expected_graph = pylibcugraph.SGGraph( + pylibcugraph.ResourceHandle(), + pylibcugraph.GraphProperties(is_multigraph=True, is_symmetric=False), + df.src if direction == "out" else df.dst, + df.dst if direction == "out" else df.src, + vertices_array=cupy.arange(num_nodes, dtype="int64"), + ) + + # Do the expensive check to make sure this test fails if an invalid + # graph is constructed. + v_actual, d_in_actual, d_out_actual = pylibcugraph.degrees( + pylibcugraph.ResourceHandle(), + plc_dgl_graph, + source_vertices=cupy.arange(num_nodes, dtype="int64"), + do_expensive_check=True, + ) + + v_exp, d_in_exp, d_out_exp = pylibcugraph.degrees( + pylibcugraph.ResourceHandle(), + plc_expected_graph, + source_vertices=cupy.arange(num_nodes, dtype="int64"), + do_expensive_check=True, + ) + + assert (v_actual == v_exp).all() + assert (d_in_actual == d_in_exp).all() + assert (d_out_actual == d_out_exp).all() + + +@pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available") +@pytest.mark.skipif(isinstance(dgl, MissingModule), reason="dgl not available") +@pytest.mark.parametrize("direction", ["out", "in"]) +def test_graph_make_heterogeneous_graph(direction): + df = karate.get_edgelist() + df.src = df.src.astype("int64") + df.dst = df.dst.astype("int64") + + graph = cugraph_dgl.Graph() + total_num_nodes = max(df.src.max(), df.dst.max()) + 1 + + num_nodes_group_1 = total_num_nodes // 2 + num_nodes_group_2 = total_num_nodes - num_nodes_group_1 + + node_x_1 = np.random.random((num_nodes_group_1,)) + node_x_2 = np.random.random((num_nodes_group_2,)) + + graph.add_nodes(num_nodes_group_1, {"x": node_x_1}, "type1") + graph.add_nodes(num_nodes_group_2, {"x": node_x_2}, "type2") + + edges_11 = df[(df.src < num_nodes_group_1) & (df.dst < num_nodes_group_1)] + edges_12 = df[(df.src < num_nodes_group_1) & (df.dst >= num_nodes_group_1)] + edges_21 = df[(df.src >= num_nodes_group_1) & (df.dst < num_nodes_group_1)] + edges_22 = df[(df.src >= num_nodes_group_1) & (df.dst >= num_nodes_group_1)] + + edges_12.dst -= num_nodes_group_1 + edges_21.src -= num_nodes_group_1 + edges_22.dst -= num_nodes_group_1 + edges_22.src -= num_nodes_group_1 + + graph.add_edges(edges_11.src, edges_11.dst, etype=("type1", "e1", "type1")) + graph.add_edges(edges_12.src, edges_12.dst, etype=("type1", "e2", "type2")) + graph.add_edges(edges_21.src, edges_21.dst, etype=("type2", "e3", "type1")) + graph.add_edges(edges_22.src, edges_22.dst, etype=("type2", "e4", "type2")) + + assert not graph.is_homogeneous + assert not graph.is_multi_gpu + + # Verify graph.nodes() + assert ( + graph.nodes() == torch.arange(total_num_nodes, dtype=torch.int64, device="cuda") + ).all() + assert ( + graph.nodes("type1") + == torch.arange(num_nodes_group_1, dtype=torch.int64, device="cuda") + ).all() + assert ( + graph.nodes("type2") + == torch.arange(num_nodes_group_2, dtype=torch.int64, device="cuda") + ).all() + + # Verify graph.edges() + assert ( + graph.edges("eid", etype=("type1", "e1", "type1")) + == torch.arange(len(edges_11), dtype=torch.int64, device="cuda") + ).all() + assert ( + graph.edges("eid", etype=("type1", "e2", "type2")) + == torch.arange(len(edges_12), dtype=torch.int64, device="cuda") + ).all() + assert ( + graph.edges("eid", etype=("type2", "e3", "type1")) + == torch.arange(len(edges_21), dtype=torch.int64, device="cuda") + ).all() + assert ( + graph.edges("eid", etype=("type2", "e4", "type2")) + == torch.arange(len(edges_22), dtype=torch.int64, device="cuda") + ).all() + + # Use sampling call to check graph creation + # This isn't a test of cuGraph sampling with DGL; the options are + # set to verify the graph only. + plc_graph = graph._graph(direction) + sampling_output = pylibcugraph.uniform_neighbor_sample( + pylibcugraph.ResourceHandle(), + plc_graph, + start_list=cupy.arange(total_num_nodes, dtype="int64"), + h_fan_out=np.array([1, 1], dtype="int32"), + with_replacement=False, + do_expensive_check=True, + with_edge_properties=True, + prior_sources_behavior="exclude", + return_dict=True, + ) + + expected_etypes = { + 0: "e1", + 1: "e2", + 2: "e3", + 3: "e4", + } + expected_offsets = { + 0: (0, 0), + 1: (0, num_nodes_group_1), + 2: (num_nodes_group_1, 0), + 3: (num_nodes_group_1, num_nodes_group_1), + } + if direction == "in": + src_col = "minors" + dst_col = "majors" + else: + src_col = "majors" + dst_col = "minors" + + # Looping over the output verifies that all edges are valid + # (and therefore, the graph is valid) + for i, etype in enumerate(sampling_output["edge_type"].tolist()): + eid = int(sampling_output["edge_id"][i]) + + srcs, dsts, eids = graph.edges( + "all", etype=expected_etypes[etype], device="cpu" + ) + + assert eids[eid] == eid + assert ( + srcs[eid] == int(sampling_output[src_col][i]) - expected_offsets[etype][0] + ) + assert ( + dsts[eid] == int(sampling_output[dst_col][i]) - expected_offsets[etype][1] + ) diff --git a/python/cugraph-dgl/cugraph_dgl/tests/test_graph_mg.py b/python/cugraph-dgl/cugraph_dgl/tests/test_graph_mg.py new file mode 100644 index 00000000000..eedda664c52 --- /dev/null +++ b/python/cugraph-dgl/cugraph_dgl/tests/test_graph_mg.py @@ -0,0 +1,310 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import pytest + +import cugraph_dgl +import pylibcugraph +import cupy +import numpy as np + +import cudf + +from cugraph.datasets import karate +from cugraph.utilities.utils import import_optional, MissingModule + +from cugraph.gnn import ( + cugraph_comms_shutdown, + cugraph_comms_create_unique_id, + cugraph_comms_get_raft_handle, +) + +from .utils import init_pytorch_worker + +pylibwholegraph = import_optional("pylibwholegraph") +torch = import_optional("torch") +dgl = import_optional("dgl") + + +def run_test_graph_make_homogeneous_graph_mg(rank, uid, world_size, direction): + init_pytorch_worker(rank, world_size, uid, init_wholegraph=True) + + df = karate.get_edgelist() + df.src = df.src.astype("int64") + df.dst = df.dst.astype("int64") + wgt = np.random.random((len(df),)) + + graph = cugraph_dgl.Graph( + is_multi_gpu=True, ndata_storage="wholegraph", edata_storage="wholegraph" + ) + + # The number of nodes is set globally but features can have + # any distribution across workers as long as they are in order. + global_num_nodes = max(df.src.max(), df.dst.max()) + 1 + node_x = np.array_split(np.arange(global_num_nodes, dtype="int64"), world_size)[ + rank + ] + + # Each worker gets a shuffled, permuted version of the edgelist + df = df.sample(frac=1.0) + df.src = (df.src + rank) % global_num_nodes + df.dst = (df.dst + rank + 1) % global_num_nodes + + graph.add_nodes(global_num_nodes, data={"x": node_x}) + graph.add_edges(df.src, df.dst, {"weight": wgt}) + plc_dgl_graph = graph._graph(direction=direction) + + assert graph.num_nodes() == global_num_nodes + assert graph.num_edges() == len(df) * world_size + assert graph.is_homogeneous + assert graph.is_multi_gpu + + assert ( + graph.nodes() + == torch.arange(global_num_nodes, dtype=torch.int64, device="cuda") + ).all() + ix = torch.arange(len(node_x) * rank, len(node_x) * (rank + 1), dtype=torch.int64) + assert graph.nodes[ix]["x"] is not None + assert (graph.nodes[ix]["x"] == torch.as_tensor(node_x, device="cuda")).all() + + assert ( + graph.edges("eid", device="cuda") + == torch.arange(world_size * len(df), dtype=torch.int64, device="cuda") + ).all() + ix = torch.arange(len(df) * rank, len(df) * (rank + 1), dtype=torch.int64) + assert (graph.edges[ix]["weight"] == torch.as_tensor(wgt, device="cuda")).all() + + plc_handle = pylibcugraph.ResourceHandle( + cugraph_comms_get_raft_handle().getHandle() + ) + + plc_expected_graph = pylibcugraph.MGGraph( + plc_handle, + pylibcugraph.GraphProperties(is_multigraph=True, is_symmetric=False), + [df.src] if direction == "out" else [df.dst], + [df.dst] if direction == "out" else [df.src], + vertices_array=[ + cupy.array_split(cupy.arange(global_num_nodes, dtype="int64"), world_size)[ + rank + ] + ], + ) + + # Do the expensive check to make sure this test fails if an invalid + # graph is constructed. + v_actual, d_in_actual, d_out_actual = pylibcugraph.degrees( + plc_handle, + plc_dgl_graph, + source_vertices=cupy.arange(global_num_nodes, dtype="int64"), + do_expensive_check=True, + ) + + v_exp, d_in_exp, d_out_exp = pylibcugraph.degrees( + plc_handle, + plc_expected_graph, + source_vertices=cupy.arange(global_num_nodes, dtype="int64"), + do_expensive_check=True, + ) + + assert (v_actual == v_exp).all() + assert (d_in_actual == d_in_exp).all() + assert (d_out_actual == d_out_exp).all() + + cugraph_comms_shutdown() + + +@pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available") +@pytest.mark.skipif( + isinstance(pylibwholegraph, MissingModule), reason="wholegraph not available" +) +@pytest.mark.skipif(isinstance(dgl, MissingModule), reason="dgl not available") +@pytest.mark.parametrize("direction", ["out", "in"]) +def test_graph_make_homogeneous_graph_mg(direction): + uid = cugraph_comms_create_unique_id() + world_size = torch.cuda.device_count() + + torch.multiprocessing.spawn( + run_test_graph_make_homogeneous_graph_mg, + args=( + uid, + world_size, + direction, + ), + nprocs=world_size, + ) + + +def run_test_graph_make_heterogeneous_graph_mg(rank, uid, world_size, direction): + init_pytorch_worker(rank, world_size, uid) + + df = karate.get_edgelist() + df.src = df.src.astype("int64") + df.dst = df.dst.astype("int64") + + graph = cugraph_dgl.Graph(is_multi_gpu=True) + total_num_nodes = max(df.src.max(), df.dst.max()) + 1 + + # Each worker gets a shuffled, permuted version of the edgelist + df = df.sample(frac=1.0) + df.src = (df.src + rank) % total_num_nodes + df.dst = (df.dst + rank + 1) % total_num_nodes + + num_nodes_group_1 = total_num_nodes // 2 + num_nodes_group_2 = total_num_nodes - num_nodes_group_1 + + node_x_1 = np.array_split(np.random.random((num_nodes_group_1,)), world_size)[rank] + node_x_2 = np.array_split(np.random.random((num_nodes_group_2,)), world_size)[rank] + + graph.add_nodes(num_nodes_group_1, {"x": node_x_1}, "type1") + graph.add_nodes(num_nodes_group_2, {"x": node_x_2}, "type2") + + edges_11 = df[(df.src < num_nodes_group_1) & (df.dst < num_nodes_group_1)] + edges_12 = df[(df.src < num_nodes_group_1) & (df.dst >= num_nodes_group_1)] + edges_21 = df[(df.src >= num_nodes_group_1) & (df.dst < num_nodes_group_1)] + edges_22 = df[(df.src >= num_nodes_group_1) & (df.dst >= num_nodes_group_1)] + + edges_12.dst -= num_nodes_group_1 + edges_21.src -= num_nodes_group_1 + edges_22.dst -= num_nodes_group_1 + edges_22.src -= num_nodes_group_1 + + total_edges_11 = torch.tensor(len(edges_11), device="cuda", dtype=torch.int64) + torch.distributed.all_reduce(total_edges_11, torch.distributed.ReduceOp.SUM) + total_edges_12 = torch.tensor(len(edges_12), device="cuda", dtype=torch.int64) + torch.distributed.all_reduce(total_edges_12, torch.distributed.ReduceOp.SUM) + total_edges_21 = torch.tensor(len(edges_21), device="cuda", dtype=torch.int64) + torch.distributed.all_reduce(total_edges_21, torch.distributed.ReduceOp.SUM) + total_edges_22 = torch.tensor(len(edges_22), device="cuda", dtype=torch.int64) + torch.distributed.all_reduce(total_edges_22, torch.distributed.ReduceOp.SUM) + + graph.add_edges(edges_11.src, edges_11.dst, etype=("type1", "e1", "type1")) + graph.add_edges(edges_12.src, edges_12.dst, etype=("type1", "e2", "type2")) + graph.add_edges(edges_21.src, edges_21.dst, etype=("type2", "e3", "type1")) + graph.add_edges(edges_22.src, edges_22.dst, etype=("type2", "e4", "type2")) + + assert not graph.is_homogeneous + assert graph.is_multi_gpu + + # Verify graph.nodes() + assert ( + graph.nodes() == torch.arange(total_num_nodes, dtype=torch.int64, device="cuda") + ).all() + assert ( + graph.nodes("type1") + == torch.arange(num_nodes_group_1, dtype=torch.int64, device="cuda") + ).all() + assert ( + graph.nodes("type2") + == torch.arange(num_nodes_group_2, dtype=torch.int64, device="cuda") + ).all() + + # Verify graph.edges() + assert ( + graph.edges("eid", etype=("type1", "e1", "type1")) + == torch.arange(total_edges_11, dtype=torch.int64, device="cuda") + ).all() + assert ( + graph.edges("eid", etype=("type1", "e2", "type2")) + == torch.arange(total_edges_12, dtype=torch.int64, device="cuda") + ).all() + assert ( + graph.edges("eid", etype=("type2", "e3", "type1")) + == torch.arange(total_edges_21, dtype=torch.int64, device="cuda") + ).all() + assert ( + graph.edges("eid", etype=("type2", "e4", "type2")) + == torch.arange(total_edges_22, dtype=torch.int64, device="cuda") + ).all() + + # Use sampling call to check graph creation + # This isn't a test of cuGraph sampling with DGL; the options are + # set to verify the graph only. + plc_graph = graph._graph(direction) + assert isinstance(plc_graph, pylibcugraph.MGGraph) + sampling_output = pylibcugraph.uniform_neighbor_sample( + graph._resource_handle, + plc_graph, + start_list=cupy.arange(total_num_nodes, dtype="int64"), + batch_id_list=cupy.full(total_num_nodes, rank, dtype="int32"), + label_list=cupy.arange(world_size, dtype="int32"), + label_to_output_comm_rank=cupy.arange(world_size, dtype="int32"), + h_fan_out=np.array([-1], dtype="int32"), + with_replacement=False, + do_expensive_check=True, + with_edge_properties=True, + prior_sources_behavior="exclude", + return_dict=True, + ) + + sdf = cudf.DataFrame( + { + "majors": sampling_output["majors"], + "minors": sampling_output["minors"], + "edge_id": sampling_output["edge_id"], + "edge_type": sampling_output["edge_type"], + } + ) + + expected_offsets = { + 0: (0, 0), + 1: (0, num_nodes_group_1), + 2: (num_nodes_group_1, 0), + 3: (num_nodes_group_1, num_nodes_group_1), + } + if direction == "in": + src_col = "minors" + dst_col = "majors" + else: + src_col = "majors" + dst_col = "minors" + + edges_11["etype"] = 0 + edges_12["etype"] = 1 + edges_21["etype"] = 2 + edges_22["etype"] = 3 + + cdf = cudf.concat([edges_11, edges_12, edges_21, edges_22]) + for i in range(len(cdf)): + row = cdf.iloc[i] + etype = row["etype"] + src = row["src"] + expected_offsets[etype][0] + dst = row["dst"] + expected_offsets[etype][1] + + f = sdf[ + (sdf[src_col] == src) & (sdf[dst_col] == dst) & (sdf["edge_type"] == etype) + ] + assert len(f) > 0 # may be multiple, some could be on other GPU + + cugraph_comms_shutdown() + + +@pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available") +@pytest.mark.skipif( + isinstance(pylibwholegraph, MissingModule), reason="wholegraph not available" +) +@pytest.mark.skipif(isinstance(dgl, MissingModule), reason="dgl not available") +@pytest.mark.parametrize("direction", ["out", "in"]) +def test_graph_make_heterogeneous_graph_mg(direction): + uid = cugraph_comms_create_unique_id() + world_size = torch.cuda.device_count() + + torch.multiprocessing.spawn( + run_test_graph_make_heterogeneous_graph_mg, + args=( + uid, + world_size, + direction, + ), + nprocs=world_size, + ) diff --git a/python/cugraph-dgl/tests/test_utils.py b/python/cugraph-dgl/cugraph_dgl/tests/test_utils.py similarity index 100% rename from python/cugraph-dgl/tests/test_utils.py rename to python/cugraph-dgl/cugraph_dgl/tests/test_utils.py diff --git a/python/cugraph-dgl/cugraph_dgl/tests/utils.py b/python/cugraph-dgl/cugraph_dgl/tests/utils.py new file mode 100644 index 00000000000..fa4eb05f297 --- /dev/null +++ b/python/cugraph-dgl/cugraph_dgl/tests/utils.py @@ -0,0 +1,154 @@ +# Copyright (c) 2022-2024, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +from cugraph.utilities.utils import import_optional +from cugraph.gnn import cugraph_comms_init + +th = import_optional("torch") + + +def assert_same_node_feats_daskapi(gs, g): + assert set(gs.ndata.keys()) == set(g.ndata.keys()) + + for key in g.ndata.keys(): + for ntype in g.ntypes: + indices = th.arange(0, g.num_nodes(ntype), dtype=g.idtype).cuda() + if len(g.ntypes) <= 1 or ntype in g.ndata[key]: + g_output = g.get_node_storage(key=key, ntype=ntype).fetch( + indices, device="cuda" + ) + gs_output = gs.get_node_storage(key=key, ntype=ntype).fetch(indices) + equal_t = (gs_output != g_output).sum().cpu() + assert equal_t == 0 + + +def assert_same_node_feats(gs, g): + assert set(gs.ndata.keys()) == set(g.ndata.keys()) + assert set(gs.ntypes) == set(g.ntypes) + + for key in g.ndata.keys(): + for ntype in g.ntypes: + if len(g.ntypes) <= 1 or ntype in g.ndata[key]: + indices = th.arange(0, g.num_nodes(ntype), dtype=g.idtype) + + g_output = g.ndata[key] + gs_output = gs.ndata[key] + + if len(g.ntypes) > 1: + g_output = g_output[ntype] + gs_output = gs_output[ntype] + + g_output = g_output[indices] + gs_output = gs_output[indices] + + equal_t = (gs_output != g_output).sum() + assert equal_t == 0 + + +def assert_same_num_nodes(gs, g): + for ntype in g.ntypes: + assert g.num_nodes(ntype) == gs.num_nodes(ntype) + + +def assert_same_num_edges_can_etypes(gs, g): + for can_etype in g.canonical_etypes: + assert g.num_edges(can_etype) == gs.num_edges(can_etype) + + +def assert_same_num_edges_etypes(gs, g): + for etype in g.etypes: + assert g.num_edges(etype) == gs.num_edges(etype) + + +def assert_same_edge_feats_daskapi(gs, g): + assert set(gs.edata.keys()) == set(g.edata.keys()) + for key in g.edata.keys(): + for etype in g.canonical_etypes: + indices = th.arange(0, g.num_edges(etype), dtype=g.idtype).cuda() + if len(g.etypes) <= 1 or etype in g.edata[key]: + g_output = g.get_edge_storage(key=key, etype=etype).fetch( + indices, device="cuda" + ) + gs_output = gs.get_edge_storage(key=key, etype=etype).fetch(indices) + equal_t = (gs_output != g_output).sum().cpu() + assert equal_t == 0 + + +def assert_same_edge_feats(gs, g): + assert set(gs.edata.keys()) == set(g.edata.keys()) + assert set(gs.canonical_etypes) == set(g.canonical_etypes) + assert set(gs.etypes) == set(g.etypes) + + for key in g.edata.keys(): + for etype in g.canonical_etypes: + if len(g.etypes) <= 1 or etype in g.edata[key]: + indices = th.arange(0, g.num_edges(etype), dtype=g.idtype).cuda() + g_output = g.edata[key] + gs_output = gs.edata[key] + + if len(g.etypes) > 1: + g_output = g_output[etype] + gs_output = gs_output[etype] + + g_output = g_output[indices] + gs_output = gs_output[indices] + + equal_t = (gs_output != g_output).sum().cpu() + assert equal_t == 0 + + +def assert_same_sampling_len(dgl_g, cugraph_gs, nodes, fanout, edge_dir): + dgl_o = dgl_g.sample_neighbors(nodes, fanout=fanout, edge_dir=edge_dir) + cugraph_o = cugraph_gs.sample_neighbors(nodes, fanout=fanout, edge_dir=edge_dir) + assert cugraph_o.num_edges() == dgl_o.num_edges() + for etype in dgl_o.canonical_etypes: + assert dgl_o.num_edges(etype) == cugraph_o.num_edges(etype) + + +def init_pytorch_worker(rank, world_size, cugraph_id, init_wholegraph=False): + import rmm + + rmm.reinitialize( + devices=rank, + ) + + import cupy + + cupy.cuda.Device(rank).use() + from rmm.allocators.cupy import rmm_cupy_allocator + + cupy.cuda.set_allocator(rmm_cupy_allocator) + + from cugraph.testing.mg_utils import enable_spilling + + enable_spilling() + + th.cuda.set_device(rank) + + os.environ["MASTER_ADDR"] = "localhost" + os.environ["MASTER_PORT"] = "12355" + th.distributed.init_process_group("nccl", rank=rank, world_size=world_size) + + if init_wholegraph: + import pylibwholegraph + + pylibwholegraph.torch.initialize.init( + rank, + world_size, + rank, + world_size, + ) + + cugraph_comms_init(rank=rank, world_size=world_size, uid=cugraph_id, device=rank) diff --git a/python/cugraph-dgl/cugraph_dgl/typing.py b/python/cugraph-dgl/cugraph_dgl/typing.py new file mode 100644 index 00000000000..a68463c3fd9 --- /dev/null +++ b/python/cugraph-dgl/cugraph_dgl/typing.py @@ -0,0 +1,40 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import List, Union, Tuple +from cugraph.utilities.utils import import_optional + +from cugraph_dgl.nn import SparseGraph + +import pandas +import numpy +import cupy +import cudf + +torch = import_optional("torch") +dgl = import_optional("dgl") + +TensorType = Union[ + "torch.Tensor", + "cupy.ndarray", + "numpy.ndarray", + "cudf.Series", + "pandas.Series", + List[int], +] + +DGLSamplerOutput = Tuple[ + "torch.Tensor", + "torch.Tensor", + List[Union["dgl.Block", SparseGraph]], +] diff --git a/python/cugraph-dgl/cugraph_dgl/utils/cugraph_conversion_utils.py b/python/cugraph-dgl/cugraph_dgl/utils/cugraph_conversion_utils.py index 647dbd38a64..2ba04bd916f 100644 --- a/python/cugraph-dgl/cugraph_dgl/utils/cugraph_conversion_utils.py +++ b/python/cugraph-dgl/cugraph_dgl/utils/cugraph_conversion_utils.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022-2023, NVIDIA CORPORATION. +# Copyright (c) 2022-2024, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -15,12 +15,15 @@ from __future__ import annotations from typing import Dict, Tuple, Union +from cugraph_dgl.typing import TensorType + import cudf import pandas as pd import dask.dataframe as dd import dask_cudf from dask.distributed import get_client import cupy as cp +import numpy as np from cugraph.utilities.utils import import_optional from cugraph.gnn.dgl_extensions.dgl_uniform_sampler import src_n, dst_n @@ -115,3 +118,13 @@ def add_edata_from_dgl_HeteroGraph(gs, g): gs.edata_storage.add_data( feat_name=feat_name, type_name=etype, feat_obj=feat_t ) + + +def _cast_to_torch_tensor(t: TensorType) -> "torch.Tensor": + if isinstance(t, torch.Tensor): + return t + elif isinstance(t, (cp.ndarray, cudf.Series)): + return torch.as_tensor(t, device="cuda") + elif isinstance(t, (pd.Series, np.ndarray)): + return torch.as_tensor(t, device="cpu") + return torch.as_tensor(t) diff --git a/python/cugraph-dgl/cugraph_dgl/view.py b/python/cugraph-dgl/cugraph_dgl/view.py new file mode 100644 index 00000000000..dbc53e73b6a --- /dev/null +++ b/python/cugraph-dgl/cugraph_dgl/view.py @@ -0,0 +1,310 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from collections import defaultdict +from collections.abc import MutableMapping +from typing import Union, Dict, List, Tuple + +from cugraph.utilities.utils import import_optional + +import cugraph_dgl +from cugraph_dgl.typing import TensorType + +torch = import_optional("torch") +dgl = import_optional("dgl") + + +class HeteroEdgeDataView(MutableMapping): + """ + Duck-typed version of DGL's HeteroEdgeDataView. + Used for accessing and modifying edge features. + """ + + def __init__( + self, + graph: "cugraph_dgl.Graph", + etype: Union[Tuple[str, str, str], List[Tuple[str, str, str]]], + edges: TensorType, + ): + self.__graph = graph + self.__etype = etype + self.__edges = edges + + @property + def _etype(self) -> Tuple[str, str, str]: + return self.__etype + + @property + def _graph(self) -> "cugraph_dgl.Graph": + return self.__graph + + @property + def _edges(self) -> TensorType: + return self.__edges + + def __getitem__(self, key: str): + if isinstance(self._etype, list): + return { + t: self._graph._get_e_emb(t, key, self._edges) + for t in self._etype + if self._graph._has_e_emb(t, key) + } + + return self._graph._get_e_emb(self._etype, key, self._edges) + + def __setitem__(self, key: str, val: Union[TensorType, Dict[str, TensorType]]): + if isinstance(self._etype, list): + if not isinstance(val, dict): + raise ValueError( + "There are multiple edge types in this view. " + "Expected a dictionary of values." + ) + for t, v in val.items(): + if t not in self._etype: + raise ValueError("Attempted to modify a type out of view.") + self._graph.set_e_emb(t, self._edges, {key: v}) + else: + if isinstance(val, dict): + raise ValueError( + "There is only one edge type in this view. " + "Expected a single tensor." + ) + self._graph.set_e_emb(self._etype, self._edges, {key: v}) + + def __delitem__(self, key: str): + if isinstance(self._etype, list): + for t in self._etype: + self._graph.pop_e_emb(t, key) + else: + self._graph.pop_e_emb(self._etype, key) + + def _transpose(self, fetch_vals=True): + if isinstance(self._etype, list): + tr = defaultdict(dict) + for etype in self._etype: + for key in self._graph._get_e_emb_keys(etype): + tr[key][etype] = ( + self._graph._get_e_emb(etype, key, self._edges) + if fetch_vals + else [] + ) + else: + tr = {} + for key in self._graph._get_e_emb_keys(self._etype): + tr[key] = ( + self._graph._get_e_emb(self._etype, key, self._edges) + if fetch_vals + else [] + ) + + return tr + + def __len__(self): + return len(self._transpose(fetch_vals=False)) + + def __iter__(self): + return iter(self._transpose()) + + def keys(self): + return self._transpose(fetch_vals=False).keys() + + def values(self): + return self._transpose().values() + + def __repr__(self): + return repr(self._transpose(fetch_vals=False)) + + +class HeteroNodeDataView(MutableMapping): + """ + Duck-typed version of DGL's HeteroNodeDataView. + Used for accessing and modifying node features. + """ + + def __init__( + self, + graph: "cugraph_dgl.Graph", + ntype: Union[str, List[str]], + nodes: TensorType, + ): + self.__graph = graph + self.__ntype = ntype + self.__nodes = nodes + + @property + def _ntype(self) -> str: + return self.__ntype + + @property + def _graph(self) -> "cugraph_dgl.Graph": + return self.__graph + + @property + def _nodes(self) -> TensorType: + return self.__nodes + + def __getitem__(self, key: str): + if isinstance(self._ntype, list): + return { + t: self._graph._get_n_emb(t, key, self._nodes) + for t in self._ntype + if self._graph._has_n_emb(t, key) + } + else: + return self._graph._get_n_emb(self._ntype, key, self._nodes) + + def __setitem__(self, key: str, val: Union[TensorType, Dict[str, TensorType]]): + if isinstance(self._ntype, list): + if not isinstance(val, dict): + raise ValueError( + "There are multiple node types in this view. " + "Expected a dictionary of values." + ) + for t, v in val.items(): + if t not in self._ntype: + raise ValueError("Attempted to modify a type out of view.") + self._graph._set_n_emb(t, self._nodes, {key: v}) + else: + if isinstance(val, dict): + raise ValueError( + "There is only one node type in this view. " + "Expected a single value tensor." + ) + self._graph._set_n_emb(self._ntype, self._nodes, {key: val}) + + def __delitem__(self, key: str): + if isinstance(self._ntype, list): + for t in self._ntype: + self._graph._pop_n_emb(t, key) + else: + self._graph.pop_n_emb(self._ntype, key) + + def _transpose(self, fetch_vals=True): + if isinstance(self._ntype, list): + tr = defaultdict(dict) + for ntype in self._ntype: + for key in self._graph._get_n_emb_keys(ntype): + tr[key][ntype] = ( + self._graph._get_n_emb(ntype, key, self._nodes) + if fetch_vals + else [] + ) + else: + tr = {} + for key in self._graph._get_n_emb_keys(self._ntype): + tr[key] = ( + self._graph._get_n_emb(self._ntype, key, self._nodes) + if fetch_vals + else [] + ) + + return tr + + def __len__(self): + return len(self._transpose(fetch_vals=False)) + + def __iter__(self): + return iter(self._transpose()) + + def keys(self): + return self._transpose(fetch_vals=False).keys() + + def values(self): + return self._transpose().values() + + def __repr__(self): + return repr(self._transpose(fetch_vals=False)) + + +class HeteroEdgeView: + """ + Duck-typed version of DGL's HeteroEdgeView. + """ + + def __init__(self, graph): + self.__graph = graph + + @property + def _graph(self) -> "cugraph_dgl.Graph": + return self.__graph + + def __getitem__(self, key): + if isinstance(key, slice): + if not (key.start is None and key.stop is None and key.stop is None): + raise ValueError("Only full slices are supported in DGL.") + edges = dgl.base.ALL + etype = None + elif key is None: + edges = dgl.base.ALL + etype = None + elif isinstance(key, tuple): + if len(key) == 3: + edges = dgl.base.ALL + etype = key + else: + edges = key + etype = None + elif isinstance(key, str): + edges = dgl.base.ALL + etype = key + else: + edges = key + etype = None + + return HeteroEdgeDataView( + graph=self.__graph, + etype=etype, + edges=edges, + ) + + def __call__(self, *args, **kwargs): + if "device" in kwargs: + return self.__graph.all_edges(*args, **kwargs) + + return self.__graph.all_edges(*args, **kwargs, device="cuda") + + +class HeteroNodeView: + """ + Duck-typed version of DGL's HeteroNodeView. + """ + + def __init__(self, graph: "cugraph_dgl.Graph"): + self.__graph = graph + + @property + def _graph(self) -> "cugraph_dgl.Graph": + return self.__graph + + def __getitem__(self, key): + if isinstance(key, slice): + if not (key.start is None and key.stop is None and key.stop is None): + raise ValueError("Only full slices are supported in DGL.") + nodes = dgl.base.ALL + ntype = None + elif isinstance(key, tuple): + nodes, ntype = key + elif key is None or isinstance(key, str): + nodes = dgl.base.ALL + ntype = key + else: + nodes = key + ntype = None + + return HeteroNodeDataView(graph=self.__graph, ntype=ntype, nodes=nodes) + + def __call__(self, ntype=None): + return torch.arange( + 0, self.__graph.num_nodes(ntype), dtype=self.__graph.idtype, device="cuda" + ) diff --git a/python/cugraph-dgl/tests/utils.py b/python/cugraph-dgl/tests/utils.py deleted file mode 100644 index d6a90840b72..00000000000 --- a/python/cugraph-dgl/tests/utils.py +++ /dev/null @@ -1,67 +0,0 @@ -# Copyright (c) 2022-2023, NVIDIA CORPORATION. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -from cugraph.utilities.utils import import_optional - -th = import_optional("torch") - - -def assert_same_node_feats(gs, g): - set(gs.ndata.keys()) == set(g.ndata.keys()) - - for key in g.ndata.keys(): - for ntype in g.ntypes: - indices = th.arange(0, g.num_nodes(ntype), dtype=g.idtype).cuda() - if len(g.ntypes) <= 1 or ntype in g.ndata[key]: - g_output = g.get_node_storage(key=key, ntype=ntype).fetch( - indices, device="cuda" - ) - gs_output = gs.get_node_storage(key=key, ntype=ntype).fetch(indices) - equal_t = (gs_output != g_output).sum().cpu() - assert equal_t == 0 - - -def assert_same_num_nodes(gs, g): - for ntype in g.ntypes: - assert g.num_nodes(ntype) == gs.num_nodes(ntype) - - -def assert_same_num_edges_can_etypes(gs, g): - for can_etype in g.canonical_etypes: - assert g.num_edges(can_etype) == gs.num_edges(can_etype) - - -def assert_same_num_edges_etypes(gs, g): - for etype in g.etypes: - assert g.num_edges(etype) == gs.num_edges(etype) - - -def assert_same_edge_feats(gs, g): - set(gs.edata.keys()) == set(g.edata.keys()) - for key in g.edata.keys(): - for etype in g.canonical_etypes: - indices = th.arange(0, g.num_edges(etype), dtype=g.idtype).cuda() - if len(g.etypes) <= 1 or etype in g.edata[key]: - g_output = g.get_edge_storage(key=key, etype=etype).fetch( - indices, device="cuda" - ) - gs_output = gs.get_edge_storage(key=key, etype=etype).fetch(indices) - equal_t = (gs_output != g_output).sum().cpu() - assert equal_t == 0 - - -def assert_same_sampling_len(dgl_g, cugraph_gs, nodes, fanout, edge_dir): - dgl_o = dgl_g.sample_neighbors(nodes, fanout=fanout, edge_dir=edge_dir) - cugraph_o = cugraph_gs.sample_neighbors(nodes, fanout=fanout, edge_dir=edge_dir) - assert cugraph_o.num_edges() == dgl_o.num_edges() - for etype in dgl_o.canonical_etypes: - assert dgl_o.num_edges(etype) == cugraph_o.num_edges(etype)