From c00c3f1b9b64dbda3173b8f343de37f62d70ad94 Mon Sep 17 00:00:00 2001 From: Alex Barghi <105237337+alexbarghi-nv@users.noreply.github.com> Date: Sat, 30 Sep 2023 19:35:44 -0400 Subject: [PATCH 1/6] WholeGraph Feature Store for cuGraph-PyG and cuGraph-DGL (#3874) Created based on code from @dongxuy04 Adds support for `WholeGraph` `WholeMemory` in the cuGraph `FeatureStore` class. This enables both DGL and PyG to take advantage of distributed feature store functionality. Adds `pylibwholegraph` as a testing dependency so the feature store can be tested. Adds appropriate SG and MG tests. Authors: - Alex Barghi (https://github.com/alexbarghi-nv) Approvers: - Ray Douglass (https://github.com/raydouglass) - Brad Rees (https://github.com/BradReesWork) - Vibhu Jawa (https://github.com/VibhuJawa) URL: https://github.com/rapidsai/cugraph/pull/3874 --- ci/release/update-version.sh | 4 + .../all_cuda-118_arch-x86_64.yaml | 1 + .../all_cuda-120_arch-x86_64.yaml | 1 + dependencies.yaml | 3 + .../gnn/feature_storage/feat_storage.py | 147 ++++++++++++++++-- .../tests/data_store/test_gnn_feat_storage.py | 1 - .../test_gnn_feat_storage_wholegraph.py | 85 ++++++++++ 7 files changed, 226 insertions(+), 16 deletions(-) create mode 100644 python/cugraph/cugraph/tests/data_store/test_gnn_feat_storage_wholegraph.py diff --git a/ci/release/update-version.sh b/ci/release/update-version.sh index adf3273e311..aaeaa715434 100755 --- a/ci/release/update-version.sh +++ b/ci/release/update-version.sh @@ -82,6 +82,9 @@ NEXT_SHORT_TAG_PEP440=$(python -c "from setuptools.extern import packaging; prin DEPENDENCIES=( cudf cugraph + cugraph-dgl + cugraph-pyg + cugraph-service-server cugraph-service-client cuxfilter dask-cuda @@ -93,6 +96,7 @@ DEPENDENCIES=( librmm pylibcugraph pylibcugraphops + pylibwholegraph pylibraft pyraft raft-dask diff --git a/conda/environments/all_cuda-118_arch-x86_64.yaml b/conda/environments/all_cuda-118_arch-x86_64.yaml index 952ec9317e2..87179ef892e 100644 --- 
a/conda/environments/all_cuda-118_arch-x86_64.yaml +++ b/conda/environments/all_cuda-118_arch-x86_64.yaml @@ -53,6 +53,7 @@ dependencies: - pydata-sphinx-theme - pylibcugraphops==23.10.* - pylibraft==23.10.* +- pylibwholegraph==23.10.* - pytest - pytest-benchmark - pytest-cov diff --git a/conda/environments/all_cuda-120_arch-x86_64.yaml b/conda/environments/all_cuda-120_arch-x86_64.yaml index 38936c78c38..d54dc0abf51 100644 --- a/conda/environments/all_cuda-120_arch-x86_64.yaml +++ b/conda/environments/all_cuda-120_arch-x86_64.yaml @@ -52,6 +52,7 @@ dependencies: - pydata-sphinx-theme - pylibcugraphops==23.10.* - pylibraft==23.10.* +- pylibwholegraph==23.10.* - pytest - pytest-benchmark - pytest-cov diff --git a/dependencies.yaml b/dependencies.yaml index f74ed13115b..292fcf0baed 100644 --- a/dependencies.yaml +++ b/dependencies.yaml @@ -481,6 +481,9 @@ dependencies: - *numpy - python-louvain - scikit-learn>=0.23.1 + - output_types: [conda] + packages: + - pylibwholegraph==23.10.* test_python_pylibcugraph: common: - output_types: [conda, pyproject] diff --git a/python/cugraph/cugraph/gnn/feature_storage/feat_storage.py b/python/cugraph/cugraph/gnn/feature_storage/feat_storage.py index e3fdeb7f150..77a53882fc4 100644 --- a/python/cugraph/cugraph/gnn/feature_storage/feat_storage.py +++ b/python/cugraph/cugraph/gnn/feature_storage/feat_storage.py @@ -17,23 +17,77 @@ import cupy as cp import numpy as np import pandas as pd -from cugraph.utilities.utils import import_optional +from cugraph.utilities.utils import import_optional, MissingModule torch = import_optional("torch") +wgth = import_optional("pylibwholegraph.torch") class FeatureStore: - """The feature-store class used to store feature data for GNNS""" + """The feature-store class used to store feature data for GNNs""" + + def __init__( + self, + backend: str = "numpy", + wg_comm: object = None, + wg_type: str = None, + wg_location: str = None, + ): + """ + Constructs a new FeatureStore object + + Parameters: + 
---------- + backend: str ('numpy', 'torch', 'wholegraph') + Optional (default='numpy') + The name of the backend to use. + + wg_comm: WholeMemoryCommunicator + Optional (default=automatic) + Only used with the 'wholegraph' backend. + The communicator to use to store features in WholeGraph. + + wg_type: str ('distributed', 'continuous', 'chunked') + Optional (default='distributed') + Only used with the 'wholegraph' backend. + The memory format (distributed, continuous, or chunked) of + this FeatureStore. For more information see the WholeGraph + documentation. + + wg_location: str ('cpu', 'cuda') + Optional (default='cuda') + Only used with the 'wholegraph' backend. + Where the data is stored (cpu or cuda). + Defaults to storing on the GPU (cuda). + """ - def __init__(self, backend="numpy"): self.fd = defaultdict(dict) - if backend not in ["numpy", "torch"]: + if backend not in ["numpy", "torch", "wholegraph"]: raise ValueError( - f"backend {backend} not supported. Supported backends are numpy, torch" + f"backend {backend} not supported. 
" + "Supported backends are numpy, torch, wholegraph" ) self.backend = backend - def add_data(self, feat_obj: Sequence, type_name: str, feat_name: str) -> None: + self.__wg_comm = None + self.__wg_type = None + self.__wg_location = None + + if backend == "wholegraph": + self.__wg_comm = ( + wg_comm if wg_comm is not None else wgth.get_local_node_communicator() + ) + self.__wg_type = wg_type if wg_type is not None else "distributed" + self.__wg_location = wg_location if wg_location is not None else "cuda" + + if self.__wg_type not in ["distributed", "chunked", "continuous"]: + raise ValueError(f"invalid memory format {self.__wg_type}") + if (self.__wg_location != "cuda") and (self.__wg_location != "cpu"): + raise ValueError(f"invalid location {self.__wg_location}") + + def add_data( + self, feat_obj: Sequence, type_name: str, feat_name: str, **kwargs + ) -> None: """ Add the feature data to the feature_storage class Parameters: @@ -49,9 +103,31 @@ def add_data(self, feat_obj: Sequence, type_name: str, feat_name: str) -> None: None """ self.fd[feat_name][type_name] = self._cast_feat_obj_to_backend( - feat_obj, self.backend + feat_obj, + self.backend, + wg_comm=self.__wg_comm, + wg_type=self.__wg_type, + wg_location=self.__wg_location, + **kwargs, ) + def add_data_no_cast(self, feat_obj, type_name: str, feat_name: str) -> None: + """ + Direct add the feature data to the feature_storage class with no cast + Parameters: + ---------- + feat_obj : array_like object + The feature object to save in feature store + type_name : str + The node-type/edge-type of the feature + feat_name: str + The name of the feature being stored + Returns: + ------- + None + """ + self.fd[feat_name][type_name] = feat_obj + def get_data( self, indices: Union[np.ndarray, torch.Tensor], @@ -87,26 +163,67 @@ def get_data( f" feature: {list(self.fd[feat_name].keys())}" ) - return self.fd[feat_name][type_name][indices] + feat = self.fd[feat_name][type_name] + if not isinstance(wgth, MissingModule) 
and isinstance( + feat, wgth.WholeMemoryEmbedding + ): + indices_tensor = ( + indices + if isinstance(indices, torch.Tensor) + else torch.as_tensor(indices, device="cuda") + ) + return feat.gather(indices_tensor) + else: + return feat[indices] def get_feature_list(self) -> list[str]: return {feat_name: feats.keys() for feat_name, feats in self.fd.items()} @staticmethod - def _cast_feat_obj_to_backend(feat_obj, backend: str): + def _cast_feat_obj_to_backend(feat_obj, backend: str, **kwargs): if backend == "numpy": if isinstance(feat_obj, (cudf.DataFrame, pd.DataFrame)): - return _cast_to_numpy_ar(feat_obj.values) + return _cast_to_numpy_ar(feat_obj.values, **kwargs) else: - return _cast_to_numpy_ar(feat_obj) + return _cast_to_numpy_ar(feat_obj, **kwargs) elif backend == "torch": if isinstance(feat_obj, (cudf.DataFrame, pd.DataFrame)): - return _cast_to_torch_tensor(feat_obj.values) + return _cast_to_torch_tensor(feat_obj.values, **kwargs) else: - return _cast_to_torch_tensor(feat_obj) + return _cast_to_torch_tensor(feat_obj, **kwargs) + elif backend == "wholegraph": + return _get_wg_embedding(feat_obj, **kwargs) + +def _get_wg_embedding(feat_obj, wg_comm=None, wg_type=None, wg_location=None, **kwargs): + wg_comm_obj = wg_comm or wgth.get_local_node_communicator() + wg_type_str = wg_type or "distributed" + wg_location_str = wg_location or "cuda" -def _cast_to_torch_tensor(ar): + if isinstance(feat_obj, (cudf.DataFrame, pd.DataFrame)): + th_tensor = _cast_to_torch_tensor(feat_obj.values) + else: + th_tensor = _cast_to_torch_tensor(feat_obj) + wg_embedding = wgth.create_embedding( + wg_comm_obj, + wg_type_str, + wg_location_str, + th_tensor.dtype, + th_tensor.shape, + ) + ( + local_wg_tensor, + local_ld_offset, + ) = wg_embedding.get_embedding_tensor().get_local_tensor() + local_th_tensor = th_tensor[ + local_ld_offset : local_ld_offset + local_wg_tensor.shape[0] + ] + local_wg_tensor.copy_(local_th_tensor) + wg_comm_obj.barrier() + return wg_embedding + + +def 
_cast_to_torch_tensor(ar, **kwargs): if isinstance(ar, cp.ndarray): ar = torch.as_tensor(ar, device="cuda") elif isinstance(ar, np.ndarray): @@ -116,7 +233,7 @@ def _cast_to_torch_tensor(ar): return ar -def _cast_to_numpy_ar(ar): +def _cast_to_numpy_ar(ar, **kwargs): if isinstance(ar, cp.ndarray): ar = ar.get() elif type(ar).__name__ == "Tensor": diff --git a/python/cugraph/cugraph/tests/data_store/test_gnn_feat_storage.py b/python/cugraph/cugraph/tests/data_store/test_gnn_feat_storage.py index 2d1537d11e3..2b0ec4b11d0 100644 --- a/python/cugraph/cugraph/tests/data_store/test_gnn_feat_storage.py +++ b/python/cugraph/cugraph/tests/data_store/test_gnn_feat_storage.py @@ -10,7 +10,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -# Import FeatureStore class import pytest import numpy as np diff --git a/python/cugraph/cugraph/tests/data_store/test_gnn_feat_storage_wholegraph.py b/python/cugraph/cugraph/tests/data_store/test_gnn_feat_storage_wholegraph.py new file mode 100644 index 00000000000..1892e8a85a6 --- /dev/null +++ b/python/cugraph/cugraph/tests/data_store/test_gnn_feat_storage_wholegraph.py @@ -0,0 +1,85 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import pytest +import numpy as np + +from cugraph.gnn import FeatureStore + +from cugraph.utilities.utils import import_optional, MissingModule + +pylibwholegraph = import_optional("pylibwholegraph") +wmb = import_optional("pylibwholegraph.binding.wholememory_binding") +torch = import_optional("torch") + + +def runtest(world_rank: int, world_size: int): + from pylibwholegraph.torch.initialize import init_torch_env_and_create_wm_comm + + wm_comm, _ = init_torch_env_and_create_wm_comm( + world_rank, + world_size, + world_rank, + world_size, + ) + wm_comm = wm_comm.wmb_comm + + generator = np.random.default_rng(62) + arr = ( + generator.integers(low=0, high=100, size=100_000) + .reshape(10_000, -1) + .astype("float64") + ) + + fs = FeatureStore(backend="wholegraph") + fs.add_data(arr, "type2", "feat1") + wm_comm.barrier() + + indices_to_fetch = np.random.randint(low=0, high=len(arr), size=1024) + output_fs = fs.get_data(indices_to_fetch, type_name="type2", feat_name="feat1") + assert isinstance(output_fs, torch.Tensor) + assert output_fs.is_cuda + expected = arr[indices_to_fetch] + np.testing.assert_array_equal(output_fs.cpu().numpy(), expected) + + wmb.finalize() + + +@pytest.mark.sg +@pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available") +@pytest.mark.skipif( + isinstance(pylibwholegraph, MissingModule), reason="wholegraph not available" +) +def test_feature_storage_wholegraph_backend(): + from pylibwholegraph.utils.multiprocess import multiprocess_run + + gpu_count = wmb.fork_get_gpu_count() + print("gpu count:", gpu_count) + assert gpu_count > 0 + + multiprocess_run(1, runtest) + + +@pytest.mark.mg +@pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available") +@pytest.mark.skipif( + isinstance(pylibwholegraph, MissingModule), reason="wholegraph not available" +) +def test_feature_storage_wholegraph_backend_mg(): + from pylibwholegraph.utils.multiprocess import multiprocess_run + + gpu_count = 
wmb.fork_get_gpu_count() + print("gpu count:", gpu_count) + assert gpu_count > 0 + + multiprocess_run(gpu_count, runtest) From a8638355d4e74351e58706c4f747dcc63d23bd81 Mon Sep 17 00:00:00 2001 From: Tingyu Wang Date: Tue, 3 Oct 2023 13:45:14 -0400 Subject: [PATCH 2/6] Integrate renumbering and compression to `cugraph-dgl` to accelerate MFG creation (#3887) Allow cugraph-dgl dataloader to consume sampled outputs from BulkSampler in CSC format. Authors: - Tingyu Wang (https://github.com/tingyu66) - Seunghwa Kang (https://github.com/seunghwak) - Alex Barghi (https://github.com/alexbarghi-nv) Approvers: - Seunghwa Kang (https://github.com/seunghwak) - Alex Barghi (https://github.com/alexbarghi-nv) - Vibhu Jawa (https://github.com/VibhuJawa) URL: https://github.com/rapidsai/cugraph/pull/3887 --- .../cugraph_dgl/dataloading/__init__.py | 2 +- .../cugraph_dgl/dataloading/dataloader.py | 49 ++++-- .../cugraph_dgl/dataloading/dataset.py | 37 +++-- .../dataloading/utils/sampling_helpers.py | 155 +++++++++++++++++- .../cugraph-dgl/cugraph_dgl/nn/conv/base.py | 7 + python/cugraph-dgl/tests/test_utils.py | 28 ++++ 6 files changed, 250 insertions(+), 28 deletions(-) diff --git a/python/cugraph-dgl/cugraph_dgl/dataloading/__init__.py b/python/cugraph-dgl/cugraph_dgl/dataloading/__init__.py index 6cabea198f6..2fd7d29bd49 100644 --- a/python/cugraph-dgl/cugraph_dgl/dataloading/__init__.py +++ b/python/cugraph-dgl/cugraph_dgl/dataloading/__init__.py @@ -13,7 +13,7 @@ from cugraph_dgl.dataloading.dataset import ( HomogenousBulkSamplerDataset, - HetrogenousBulkSamplerDataset, + HeterogenousBulkSamplerDataset, ) from cugraph_dgl.dataloading.neighbor_sampler import NeighborSampler from cugraph_dgl.dataloading.dataloader import DataLoader diff --git a/python/cugraph-dgl/cugraph_dgl/dataloading/dataloader.py b/python/cugraph-dgl/cugraph_dgl/dataloading/dataloader.py index 0480f61807a..f154b096256 100644 --- a/python/cugraph-dgl/cugraph_dgl/dataloading/dataloader.py +++ 
b/python/cugraph-dgl/cugraph_dgl/dataloading/dataloader.py @@ -21,7 +21,7 @@ from dask.distributed import default_client, Event from cugraph_dgl.dataloading import ( HomogenousBulkSamplerDataset, - HetrogenousBulkSamplerDataset, + HeterogenousBulkSamplerDataset, ) from cugraph_dgl.dataloading.utils.extract_graph_helpers import ( create_cugraph_graph_from_edges_dict, @@ -47,19 +47,20 @@ def __init__( graph_sampler: cugraph_dgl.dataloading.NeighborSampler, sampling_output_dir: str, batches_per_partition: int = 50, - seeds_per_call: int = 400_000, + seeds_per_call: int = 200_000, device: torch.device = None, use_ddp: bool = False, ddp_seed: int = 0, batch_size: int = 1024, drop_last: bool = False, shuffle: bool = False, + sparse_format: str = "coo", **kwargs, ): """ Constructor for CuGraphStorage: ------------------------------- - graph : CuGraphStorage + graph : CuGraphStorage The graph. indices : Tensor or dict[ntype, Tensor] The set of indices. It can either be a tensor of @@ -89,7 +90,12 @@ def __init__( The seed for shuffling the dataset in :class:`torch.utils.data.distributed.DistributedSampler`. Only effective when :attr:`use_ddp` is True. - batch_size: int, + batch_size: int + Batch size. + sparse_format: str, default = "coo" + The sparse format of the emitted sampled graphs. Choose between "csc" + and "coo". When using "csc", the graphs are of type + cugraph_dgl.nn.SparseGraph. kwargs : dict Key-word arguments to be passed to the parent PyTorch :py:class:`torch.utils.data.DataLoader` class. Common arguments are: @@ -123,6 +129,12 @@ def __init__( ... for input_nodes, output_nodes, blocks in dataloader: ... """ + if sparse_format not in ["coo", "csc"]: + raise ValueError( + f"sparse_format must be one of 'coo', 'csc', " + f"but got {sparse_format}." 
+ ) + self.sparse_format = sparse_format self.ddp_seed = ddp_seed self.use_ddp = use_ddp @@ -156,11 +168,12 @@ def __init__( self.cugraph_dgl_dataset = HomogenousBulkSamplerDataset( total_number_of_nodes=graph.total_number_of_nodes, edge_dir=self.graph_sampler.edge_dir, + sparse_format=sparse_format, ) else: etype_id_to_etype_str_dict = {v: k for k, v in graph._etype_id_dict.items()} - self.cugraph_dgl_dataset = HetrogenousBulkSamplerDataset( + self.cugraph_dgl_dataset = HeterogenousBulkSamplerDataset( num_nodes_dict=graph.num_nodes_dict, etype_id_dict=etype_id_to_etype_str_dict, etype_offset_dict=graph._etype_offset_d, @@ -210,14 +223,23 @@ def __iter__(self): output_dir = os.path.join( self._sampling_output_dir, "epoch_" + str(self.epoch_number) ) + kwargs = {} if isinstance(self.cugraph_dgl_dataset, HomogenousBulkSamplerDataset): - deduplicate_sources = True - prior_sources_behavior = "carryover" - renumber = True + kwargs["deduplicate_sources"] = True + kwargs["prior_sources_behavior"] = "carryover" + kwargs["renumber"] = True + + if self.sparse_format == "csc": + kwargs["compression"] = "CSR" + kwargs["compress_per_hop"] = True + # The following kwargs will be deprecated in uniform sampler. 
+ kwargs["use_legacy_names"] = False + kwargs["include_hop_column"] = False + else: - deduplicate_sources = False - prior_sources_behavior = None - renumber = False + kwargs["deduplicate_sources"] = False + kwargs["prior_sources_behavior"] = None + kwargs["renumber"] = False bs = BulkSampler( output_path=output_dir, @@ -227,10 +249,9 @@ def __iter__(self): seeds_per_call=self._seeds_per_call, fanout_vals=self.graph_sampler._reversed_fanout_vals, with_replacement=self.graph_sampler.replace, - deduplicate_sources=deduplicate_sources, - prior_sources_behavior=prior_sources_behavior, - renumber=renumber, + **kwargs, ) + if self.shuffle: self.tensorized_indices_ds.shuffle() diff --git a/python/cugraph-dgl/cugraph_dgl/dataloading/dataset.py b/python/cugraph-dgl/cugraph_dgl/dataloading/dataset.py index e0d51bcf4cf..815fd30d8eb 100644 --- a/python/cugraph-dgl/cugraph_dgl/dataloading/dataset.py +++ b/python/cugraph-dgl/cugraph_dgl/dataloading/dataset.py @@ -19,6 +19,7 @@ from cugraph_dgl.dataloading.utils.sampling_helpers import ( create_homogeneous_sampled_graphs_from_dataframe, create_heterogeneous_sampled_graphs_from_dataframe, + create_homogeneous_sampled_graphs_from_dataframe_csc, ) @@ -33,17 +34,19 @@ def __init__( total_number_of_nodes: int, edge_dir: str, return_type: str = "dgl.Block", + sparse_format: str = "coo", ): if return_type not in ["dgl.Block", "cugraph_dgl.nn.SparseGraph"]: raise ValueError( - "return_type must be either 'dgl.Block' or \ - 'cugraph_dgl.nn.SparseGraph' " + "return_type must be either 'dgl.Block' or " + "'cugraph_dgl.nn.SparseGraph'." 
) # TODO: Deprecate `total_number_of_nodes` # as it is no longer needed # in the next release self.total_number_of_nodes = total_number_of_nodes self.edge_dir = edge_dir + self.sparse_format = sparse_format self._current_batch_fn = None self._input_files = None self._return_type = return_type @@ -60,10 +63,20 @@ def __getitem__(self, idx: int): fn, batch_offset = self._batch_to_fn_d[idx] if fn != self._current_batch_fn: - df = _load_sampled_file(dataset_obj=self, fn=fn) - self._current_batches = create_homogeneous_sampled_graphs_from_dataframe( - sampled_df=df, edge_dir=self.edge_dir, return_type=self._return_type - ) + if self.sparse_format == "csc": + df = _load_sampled_file(dataset_obj=self, fn=fn, skip_rename=True) + self._current_batches = ( + create_homogeneous_sampled_graphs_from_dataframe_csc(df) + ) + else: + df = _load_sampled_file(dataset_obj=self, fn=fn) + self._current_batches = ( + create_homogeneous_sampled_graphs_from_dataframe( + sampled_df=df, + edge_dir=self.edge_dir, + return_type=self._return_type, + ) + ) current_offset = idx - batch_offset return self._current_batches[current_offset] @@ -87,7 +100,7 @@ def set_input_files( ) -class HetrogenousBulkSamplerDataset(torch.utils.data.Dataset): +class HeterogenousBulkSamplerDataset(torch.utils.data.Dataset): def __init__( self, num_nodes_dict: Dict[str, int], @@ -141,18 +154,18 @@ def set_input_files( ---------- input_directory: str input_directory which contains all the files that will be - loaded by HetrogenousBulkSamplerDataset + loaded by HeterogenousBulkSamplerDataset input_file_paths: List[str] - File names that will be loaded by the HetrogenousBulkSamplerDataset + File names that will be loaded by the HeterogenousBulkSamplerDataset """ _set_input_files( self, input_directory=input_directory, input_file_paths=input_file_paths ) -def _load_sampled_file(dataset_obj, fn): +def _load_sampled_file(dataset_obj, fn, skip_rename=False): df = cudf.read_parquet(os.path.join(fn)) - if 
dataset_obj.edge_dir == "in": + if dataset_obj.edge_dir == "in" and not skip_rename: df.rename( columns={"sources": "destinations", "destinations": "sources"}, inplace=True, @@ -181,7 +194,7 @@ def get_batch_to_fn_d(files): def _set_input_files( - dataset_obj: Union[HomogenousBulkSamplerDataset, HetrogenousBulkSamplerDataset], + dataset_obj: Union[HomogenousBulkSamplerDataset, HeterogenousBulkSamplerDataset], input_directory: Optional[str] = None, input_file_paths: Optional[List[str]] = None, ) -> None: diff --git a/python/cugraph-dgl/cugraph_dgl/dataloading/utils/sampling_helpers.py b/python/cugraph-dgl/cugraph_dgl/dataloading/utils/sampling_helpers.py index bdac3b1a323..a4f64668348 100644 --- a/python/cugraph-dgl/cugraph_dgl/dataloading/utils/sampling_helpers.py +++ b/python/cugraph-dgl/cugraph_dgl/dataloading/utils/sampling_helpers.py @@ -11,10 +11,12 @@ # See the License for the specific language governing permissions and # limitations under the License. from __future__ import annotations -from typing import Tuple, Dict, Optional +from typing import List, Tuple, Dict, Optional from collections import defaultdict import cudf +import cupy from cugraph.utilities.utils import import_optional +from cugraph_dgl.nn import SparseGraph dgl = import_optional("dgl") torch = import_optional("torch") @@ -401,3 +403,154 @@ def create_heterogenous_dgl_block_from_tensors_dict( block = dgl.to_block(sampled_graph, dst_nodes=seed_nodes, src_nodes=src_d) block.edata[dgl.EID] = sampled_graph.edata[dgl.EID] return block + + +def _process_sampled_df_csc( + df: cudf.DataFrame, + reverse_hop_id: bool = True, +) -> Tuple[ + Dict[int, Dict[int, Dict[str, torch.Tensor]]], + List[torch.Tensor], + List[List[int, int]], +]: + """ + Convert a dataframe generated by BulkSampler to a dictionary of tensors, to + facilitate MFG creation. The sampled graphs in the dataframe use CSC-format. + + Parameters + ---------- + df: cudf.DataFrame + The output from BulkSampler compressed in CSC format. 
The dataframe + should be generated with `compression="CSR"` in BulkSampler, + since the sampling routine treats seed nodes as sources. + + reverse_hop_id: bool (default=True) + Reverse hop id. + + Returns + ------- + tensors_dict: dict + A nested dictionary keyed by batch id and hop id. + `tensor_dict[batch_id][hop_id]` holds "minors" and "major_offsets" + values for CSC MFGs. + + renumber_map_list: list + List of renumbering maps for looking up global indices of nodes. One + map for each batch. + + mfg_sizes: list + List of the number of nodes in each message passing layer. For the + k-th hop, mfg_sizes[k] and mfg_sizes[k+1] is the number of sources and + destinations, respectively. + """ + # dropna + major_offsets = df.major_offsets.dropna().values + label_hop_offsets = df.label_hop_offsets.dropna().values + renumber_map_offsets = df.renumber_map_offsets.dropna().values + renumber_map = df.map.dropna().values + minors = df.minors.dropna().values + + n_batches = renumber_map_offsets.size - 1 + n_hops = int((label_hop_offsets.size - 1) / n_batches) + + # make global offsets local + major_offsets -= major_offsets[0] + label_hop_offsets -= label_hop_offsets[0] + renumber_map_offsets -= renumber_map_offsets[0] + + # get the sizes of each adjacency matrix (for MFGs) + mfg_sizes = (label_hop_offsets[1:] - label_hop_offsets[:-1]).reshape( + (n_batches, n_hops) + ) + n_nodes = renumber_map_offsets[1:] - renumber_map_offsets[:-1] + mfg_sizes = cupy.hstack((mfg_sizes, n_nodes.reshape(n_batches, -1))) + if reverse_hop_id: + mfg_sizes = mfg_sizes[:, ::-1] + + tensors_dict = {} + renumber_map_list = [] + for batch_id in range(n_batches): + batch_dict = {} + + for hop_id in range(n_hops): + hop_dict = {} + idx = batch_id * n_hops + hop_id # idx in label_hop_offsets + major_offsets_start = label_hop_offsets[idx].item() + major_offsets_end = label_hop_offsets[idx + 1].item() + minors_start = major_offsets[major_offsets_start].item() + minors_end = 
major_offsets[major_offsets_end].item() + # Note: minors and major_offsets from BulkSampler are of type int32 + # and int64 respectively. Since pylibcugraphops binding code doesn't + # support distinct node and edge index type, we simply casting both + # to int32 for now. + hop_dict["minors"] = torch.as_tensor( + minors[minors_start:minors_end], device="cuda" + ).int() + hop_dict["major_offsets"] = torch.as_tensor( + major_offsets[major_offsets_start : major_offsets_end + 1] + - major_offsets[major_offsets_start], + device="cuda", + ).int() + if reverse_hop_id: + batch_dict[n_hops - 1 - hop_id] = hop_dict + else: + batch_dict[hop_id] = hop_dict + + tensors_dict[batch_id] = batch_dict + + renumber_map_list.append( + torch.as_tensor( + renumber_map[ + renumber_map_offsets[batch_id] : renumber_map_offsets[batch_id + 1] + ], + device="cuda", + ) + ) + + return tensors_dict, renumber_map_list, mfg_sizes.tolist() + + +def _create_homogeneous_sparse_graphs_from_csc( + tensors_dict: Dict[int, Dict[int, Dict[str, torch.Tensor]]], + renumber_map_list: List[torch.Tensor], + mfg_sizes: List[int, int], +) -> List[List[torch.Tensor, torch.Tensor, List[SparseGraph]]]: + """Create mini-batches of MFGs. The input arguments are the outputs of + the function `_process_sampled_df_csc`. + + Returns + ------- + output: list + A list of mini-batches. Each mini-batch is a list that consists of + `input_nodes` tensor, `output_nodes` tensor and a list of MFGs. 
+ """ + n_batches, n_hops = len(mfg_sizes), len(mfg_sizes[0]) - 1 + output = [] + for b_id in range(n_batches): + output_batch = [] + output_batch.append(renumber_map_list[b_id]) + output_batch.append(renumber_map_list[b_id][: mfg_sizes[b_id][-1]]) + mfgs = [ + SparseGraph( + size=(mfg_sizes[b_id][h_id], mfg_sizes[b_id][h_id + 1]), + src_ids=tensors_dict[b_id][h_id]["minors"], + cdst_ids=tensors_dict[b_id][h_id]["major_offsets"], + formats=["csc"], + reduce_memory=True, + ) + for h_id in range(n_hops) + ] + + output_batch.append(mfgs) + + output.append(output_batch) + + return output + + +def create_homogeneous_sampled_graphs_from_dataframe_csc(sampled_df: cudf.DataFrame): + """Public API to create mini-batches of MFGs using a dataframe output by + BulkSampler, where the sampled graph is compressed in CSC format.""" + return _create_homogeneous_sparse_graphs_from_csc( + *(_process_sampled_df_csc(sampled_df)) + ) diff --git a/python/cugraph-dgl/cugraph_dgl/nn/conv/base.py b/python/cugraph-dgl/cugraph_dgl/nn/conv/base.py index 307eb33078e..ddd95a76366 100644 --- a/python/cugraph-dgl/cugraph_dgl/nn/conv/base.py +++ b/python/cugraph-dgl/cugraph_dgl/nn/conv/base.py @@ -248,6 +248,13 @@ def csr(self) -> Tuple[torch.Tensor, torch.Tensor, Optional[torch.Tensor]]: value = value[self._perm_csc2csr] return csrc_ids, dst_ids, value + def __repr__(self) -> str: + return ( + f"{self.__class__.__name__}(num_src_nodes={self._num_src_nodes}, " + f"num_dst_nodes={self._num_dst_nodes}, " + f"num_edges={self._src_ids.size(0)}, formats={self._formats})" + ) + class BaseConv(torch.nn.Module): r"""An abstract base class for cugraph-ops nn module.""" diff --git a/python/cugraph-dgl/tests/test_utils.py b/python/cugraph-dgl/tests/test_utils.py index 740db59ce7f..4be66758b43 100644 --- a/python/cugraph-dgl/tests/test_utils.py +++ b/python/cugraph-dgl/tests/test_utils.py @@ -22,6 +22,7 @@ create_homogeneous_sampled_graphs_from_dataframe, _get_source_destination_range, 
_create_homogeneous_cugraph_dgl_nn_sparse_graph, + create_homogeneous_sampled_graphs_from_dataframe_csc, ) from cugraph.utilities.utils import import_optional @@ -50,6 +51,23 @@ def get_dummy_sampled_df(): return df +def get_dummy_sampled_df_csc(): + df_dict = dict( + minors=np.array( + [1, 1, 2, 1, 0, 3, 1, 3, 2, 3, 2, 4, 0, 1, 1, 0, 3, 2], dtype=np.int32 + ), + major_offsets=np.arange(19, dtype=np.int64), + map=np.array( + [26, 29, 33, 22, 23, 32, 18, 29, 33, 33, 8, 30, 32], dtype=np.int32 + ), + renumber_map_offsets=np.array([0, 4, 9, 13], dtype=np.int64), + label_hop_offsets=np.array([0, 1, 3, 6, 7, 9, 13, 14, 16, 18], dtype=np.int64), + ) + + # convert values to Series so that NaNs are padded automatically + return cudf.DataFrame({k: cudf.Series(v) for k, v in df_dict.items()}) + + def test_get_renumber_map(): sampled_df = get_dummy_sampled_df() @@ -176,3 +194,13 @@ def test__create_homogeneous_cugraph_dgl_nn_sparse_graph(): assert sparse_graph.num_src_nodes() == 2 assert sparse_graph.num_dst_nodes() == seednodes_range + 1 assert isinstance(sparse_graph, cugraph_dgl.nn.SparseGraph) + + +def test_create_homogeneous_sampled_graphs_from_dataframe_csc(): + df = get_dummy_sampled_df_csc() + batches = create_homogeneous_sampled_graphs_from_dataframe_csc(df) + + assert len(batches) == 3 + assert torch.equal(batches[0][0], torch.IntTensor([26, 29, 33, 22]).cuda()) + assert torch.equal(batches[1][0], torch.IntTensor([23, 32, 18, 29, 33]).cuda()) + assert torch.equal(batches[2][0], torch.IntTensor([33, 8, 30, 32]).cuda()) From 5ce3ee1b11db62647337514a200845bbb392d351 Mon Sep 17 00:00:00 2001 From: Erik Welch Date: Tue, 3 Oct 2023 16:50:10 -0500 Subject: [PATCH 3/6] nx-cugraph: handle louvain with isolated nodes (#3897) This handles isolated nodes in `louvain_communities` similar to what is done in #3886. This is expected to be a temporary fix until pylibcugraph can handle isolated nodes. 
As a bonus, I added `isolates` algorithm :tada: CC @naimnv @rlratzel Authors: - Erik Welch (https://github.com/eriknw) Approvers: - Rick Ratzel (https://github.com/rlratzel) URL: https://github.com/rapidsai/cugraph/pull/3897 --- python/nx-cugraph/_nx_cugraph/__init__.py | 7 ++- python/nx-cugraph/lint.yaml | 2 +- .../nx_cugraph/algorithms/__init__.py | 1 + .../algorithms/community/louvain.py | 28 +++++++-- .../nx_cugraph/algorithms/isolate.py | 63 +++++++++++++++++++ .../nx-cugraph/nx_cugraph/classes/digraph.py | 2 +- python/nx-cugraph/nx_cugraph/classes/graph.py | 6 +- python/nx-cugraph/nx_cugraph/convert.py | 2 +- .../nx_cugraph/tests/test_community.py | 53 ++++++++++++++++ 9 files changed, 151 insertions(+), 13 deletions(-) create mode 100644 python/nx-cugraph/nx_cugraph/algorithms/isolate.py create mode 100644 python/nx-cugraph/nx_cugraph/tests/test_community.py diff --git a/python/nx-cugraph/_nx_cugraph/__init__.py b/python/nx-cugraph/_nx_cugraph/__init__.py index 9b3332106ec..ebd13daded0 100644 --- a/python/nx-cugraph/_nx_cugraph/__init__.py +++ b/python/nx-cugraph/_nx_cugraph/__init__.py @@ -31,20 +31,23 @@ # BEGIN: functions "betweenness_centrality", "edge_betweenness_centrality", + "is_isolate", + "isolates", "louvain_communities", + "number_of_isolates", # END: functions }, "extra_docstrings": { # BEGIN: extra_docstrings "betweenness_centrality": "`weight` parameter is not yet supported.", "edge_betweenness_centrality": "`weight` parameter is not yet supported.", - "louvain_communities": "`threshold` and `seed` parameters are currently ignored.", + "louvain_communities": "`seed` parameter is currently ignored.", # END: extra_docstrings }, "extra_parameters": { # BEGIN: extra_parameters "louvain_communities": { - "max_level : int, optional": "Upper limit of the number of macro-iterations.", + "max_level : int, optional": "Upper limit of the number of macro-iterations (max: 500).", }, # END: extra_parameters }, diff --git a/python/nx-cugraph/lint.yaml 
b/python/nx-cugraph/lint.yaml index 6a462a6af79..338ca7b230e 100644 --- a/python/nx-cugraph/lint.yaml +++ b/python/nx-cugraph/lint.yaml @@ -63,7 +63,7 @@ repos: # These versions need updated manually - flake8==6.1.0 - flake8-bugbear==23.9.16 - - flake8-simplify==0.20.0 + - flake8-simplify==0.21.0 - repo: https://github.com/asottile/yesqa rev: v1.5.0 hooks: diff --git a/python/nx-cugraph/nx_cugraph/algorithms/__init__.py b/python/nx-cugraph/nx_cugraph/algorithms/__init__.py index 3a585452d6d..dfd9adfc61a 100644 --- a/python/nx-cugraph/nx_cugraph/algorithms/__init__.py +++ b/python/nx-cugraph/nx_cugraph/algorithms/__init__.py @@ -12,3 +12,4 @@ # limitations under the License. from . import centrality, community from .centrality import * +from .isolate import * diff --git a/python/nx-cugraph/nx_cugraph/algorithms/community/louvain.py b/python/nx-cugraph/nx_cugraph/algorithms/community/louvain.py index a183b59fe1d..dc209870c89 100644 --- a/python/nx-cugraph/nx_cugraph/algorithms/community/louvain.py +++ b/python/nx-cugraph/nx_cugraph/algorithms/community/louvain.py @@ -10,7 +10,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import sys +import warnings import pylibcugraph as plc @@ -22,19 +22,23 @@ not_implemented_for, ) +from ..isolate import _isolates + __all__ = ["louvain_communities"] @not_implemented_for("directed") @networkx_algorithm( extra_params={ - "max_level : int, optional": "Upper limit of the number of macro-iterations." + "max_level : int, optional": ( + "Upper limit of the number of macro-iterations (max: 500)." 
+ ) } ) def louvain_communities( G, weight="weight", resolution=1, threshold=0.0000001, seed=None, *, max_level=None ): - """`threshold` and `seed` parameters are currently ignored.""" + """`seed` parameter is currently ignored.""" # NetworkX allows both directed and undirected, but cugraph only allows undirected. seed = _seed_to_int(seed) # Unused, but ensure it's valid for future compatibility G = _to_undirected_graph(G, weight) @@ -42,7 +46,14 @@ def louvain_communities( # TODO: PLC doesn't handle empty graphs gracefully! return [{key} for key in G._nodeiter_to_iter(range(len(G)))] if max_level is None: - max_level = sys.maxsize + max_level = 500 + elif max_level > 500: + warnings.warn( + f"max_level is set too high (={max_level}), setting it to 500.", + UserWarning, + stacklevel=2, + ) + max_level = 500 vertices, clusters, modularity = plc.louvain( resource_handle=plc.ResourceHandle(), graph=G._get_plc_graph(), @@ -52,7 +63,14 @@ def louvain_communities( do_expensive_check=False, ) groups = _groupby(clusters, vertices) - return [set(G._nodearray_to_list(node_ids)) for node_ids in groups.values()] + rv = [set(G._nodearray_to_list(node_ids)) for node_ids in groups.values()] + # TODO: PLC doesn't handle isolated vertices yet, so this is a temporary fix + isolates = _isolates(G) + if isolates.size > 0: + isolates = isolates[isolates > vertices.max()] + if isolates.size > 0: + rv.extend({node} for node in G._nodearray_to_list(isolates)) + return rv @louvain_communities._can_run diff --git a/python/nx-cugraph/nx_cugraph/algorithms/isolate.py b/python/nx-cugraph/nx_cugraph/algorithms/isolate.py new file mode 100644 index 00000000000..774627e84f6 --- /dev/null +++ b/python/nx-cugraph/nx_cugraph/algorithms/isolate.py @@ -0,0 +1,63 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import annotations + +from typing import TYPE_CHECKING + +import cupy as cp + +from nx_cugraph.convert import _to_graph +from nx_cugraph.utils import networkx_algorithm + +if TYPE_CHECKING: # pragma: no cover + from nx_cugraph.typing import IndexValue + +__all__ = ["is_isolate", "isolates", "number_of_isolates"] + + +@networkx_algorithm +def is_isolate(G, n): + G = _to_graph(G) + index = n if G.key_to_id is None else G.key_to_id[n] + return not ( + (G.row_indices == index).any().tolist() + or G.is_directed() + and (G.col_indices == index).any().tolist() + ) + + +def _mark_isolates(G) -> cp.ndarray[bool]: + """Return a boolean mask array indicating indices of isolated nodes.""" + mark_isolates = cp.ones(len(G), bool) + mark_isolates[G.row_indices] = False + if G.is_directed(): + mark_isolates[G.col_indices] = False + return mark_isolates + + +def _isolates(G) -> cp.ndarray[IndexValue]: + """Like isolates, but return an array of indices instead of an iterator of nodes.""" + G = _to_graph(G) + return cp.nonzero(_mark_isolates(G))[0] + + +@networkx_algorithm +def isolates(G): + G = _to_graph(G) + return G._nodeiter_to_iter(iter(_isolates(G).tolist())) + + +@networkx_algorithm +def number_of_isolates(G): + G = _to_graph(G) + return _mark_isolates(G).sum().tolist() diff --git a/python/nx-cugraph/nx_cugraph/classes/digraph.py b/python/nx-cugraph/nx_cugraph/classes/digraph.py index 0aaf88fd793..72a1bff21a9 100644 --- a/python/nx-cugraph/nx_cugraph/classes/digraph.py +++ b/python/nx-cugraph/nx_cugraph/classes/digraph.py @@ -20,7 +20,7 @@ from .graph 
import Graph -if TYPE_CHECKING: +if TYPE_CHECKING: # pragma: no cover from nx_cugraph.typing import NodeKey __all__ = ["DiGraph"] diff --git a/python/nx-cugraph/nx_cugraph/classes/graph.py b/python/nx-cugraph/nx_cugraph/classes/graph.py index 1432f68c752..ded4cc3943f 100644 --- a/python/nx-cugraph/nx_cugraph/classes/graph.py +++ b/python/nx-cugraph/nx_cugraph/classes/graph.py @@ -23,7 +23,7 @@ import nx_cugraph as nxcg -if TYPE_CHECKING: +if TYPE_CHECKING: # pragma: no cover from collections.abc import Iterable, Iterator from nx_cugraph.typing import ( @@ -245,9 +245,9 @@ def from_dcsc( def __new__(cls, incoming_graph_data=None, **attr) -> Graph: if incoming_graph_data is None: new_graph = cls.from_coo(0, cp.empty(0, np.int32), cp.empty(0, np.int32)) - elif incoming_graph_data.__class__ is new_graph.__class__: + elif incoming_graph_data.__class__ is cls: new_graph = incoming_graph_data.copy() - elif incoming_graph_data.__class__ is new_graph.to_networkx_class(): + elif incoming_graph_data.__class__ is cls.to_networkx_class(): new_graph = nxcg.from_networkx(incoming_graph_data, preserve_all_attrs=True) else: raise NotImplementedError diff --git a/python/nx-cugraph/nx_cugraph/convert.py b/python/nx-cugraph/nx_cugraph/convert.py index 9be8cac7877..1240ea71db7 100644 --- a/python/nx-cugraph/nx_cugraph/convert.py +++ b/python/nx-cugraph/nx_cugraph/convert.py @@ -24,7 +24,7 @@ import nx_cugraph as nxcg -if TYPE_CHECKING: +if TYPE_CHECKING: # pragma: no cover from nx_cugraph.typing import AttrKey, Dtype, EdgeValue, NodeValue __all__ = [ diff --git a/python/nx-cugraph/nx_cugraph/tests/test_community.py b/python/nx-cugraph/nx_cugraph/tests/test_community.py new file mode 100644 index 00000000000..126f45c14ae --- /dev/null +++ b/python/nx-cugraph/nx_cugraph/tests/test_community.py @@ -0,0 +1,53 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import networkx as nx +import pytest + +import nx_cugraph as nxcg + + +def test_louvain_isolated_nodes(): + is_nx_30_or_31 = hasattr(nx.classes, "backends") + + def check(left, right): + assert len(left) == len(right) + assert set(map(frozenset, left)) == set(map(frozenset, right)) + + # Empty graph (no nodes) + G = nx.Graph() + if is_nx_30_or_31: + with pytest.raises(ZeroDivisionError): + nx.community.louvain_communities(G) + else: + nx_result = nx.community.louvain_communities(G) + cg_result = nxcg.community.louvain_communities(G) + check(nx_result, cg_result) + # Graph with no edges + G.add_nodes_from(range(5)) + if is_nx_30_or_31: + with pytest.raises(ZeroDivisionError): + nx.community.louvain_communities(G) + else: + nx_result = nx.community.louvain_communities(G) + cg_result = nxcg.community.louvain_communities(G) + check(nx_result, cg_result) + # Graph with isolated nodes + G.add_edge(1, 2) + nx_result = nx.community.louvain_communities(G) + cg_result = nxcg.community.louvain_communities(G) + check(nx_result, cg_result) + # Another one + G.add_edge(4, 4) + nx_result = nx.community.louvain_communities(G) + cg_result = nxcg.community.louvain_communities(G) + check(nx_result, cg_result) From 26af14ebad6a6b1f115779d90d3c0a68f0d380ee Mon Sep 17 00:00:00 2001 From: Alex Barghi <105237337+alexbarghi-nv@users.noreply.github.com> Date: Wed, 4 Oct 2023 11:44:01 -0400 Subject: [PATCH 4/6] cuGraph-PyG MFG Creation and Conversion (#3873) Integrates the new CSR bulk sampler output, allowing reading of batches without having to call CSC conversion or count the 
numbers of vertices and edges in each batch. Should result in major performance improvements, especially for small batches. Authors: - Alex Barghi (https://github.com/alexbarghi-nv) - Seunghwa Kang (https://github.com/seunghwak) - Brad Rees (https://github.com/BradReesWork) Approvers: - Brad Rees (https://github.com/BradReesWork) - Ray Douglass (https://github.com/raydouglass) - Tingyu Wang (https://github.com/tingyu66) URL: https://github.com/rapidsai/cugraph/pull/3873 --- ci/test_python.sh | 7 +- .../cugraph_pyg/data/cugraph_store.py | 38 ++-- .../cugraph_pyg/loader/cugraph_node_loader.py | 199 +++++++++++++---- .../cugraph_pyg/sampler/cugraph_sampler.py | 130 +++++++++++- .../tests/mg/test_mg_cugraph_sampler.py | 10 +- .../tests/mg/test_mg_cugraph_store.py | 4 +- .../cugraph_pyg/tests/test_cugraph_loader.py | 200 ++++++++++++++++-- .../cugraph_pyg/tests/test_cugraph_sampler.py | 10 +- .../cugraph_pyg/tests/test_cugraph_store.py | 4 +- 9 files changed, 497 insertions(+), 105 deletions(-) diff --git a/ci/test_python.sh b/ci/test_python.sh index 7b0077991ae..825d5b242d5 100755 --- a/ci/test_python.sh +++ b/ci/test_python.sh @@ -200,8 +200,11 @@ if [[ "${RAPIDS_CUDA_VERSION}" == "11.8.0" ]]; then --channel pytorch \ --channel nvidia \ 'pyg=2.3' \ - 'pytorch>=2.0' \ - 'pytorch-cuda>=11.8' + 'pytorch=2.0.0' \ + 'pytorch-cuda=11.8' + + # Install pyg dependencies (which requires pip) + pip install pyg_lib torch_scatter torch_sparse torch_cluster torch_spline_conv -f https://data.pyg.org/whl/torch-2.0.0+cu118.html rapids-mamba-retry install \ --channel "${CPP_CHANNEL}" \ diff --git a/python/cugraph-pyg/cugraph_pyg/data/cugraph_store.py b/python/cugraph-pyg/cugraph_pyg/data/cugraph_store.py index e0d318adbe0..fd2172e6ade 100644 --- a/python/cugraph-pyg/cugraph_pyg/data/cugraph_store.py +++ b/python/cugraph-pyg/cugraph_pyg/data/cugraph_store.py @@ -819,8 +819,8 @@ def _get_renumbered_edge_groups_from_sample( before this one to get the noi_index. 
Example Input: Series({ - 'sources': [0, 5, 11, 3], - 'destinations': [8, 2, 3, 5]}, + 'majors': [0, 5, 11, 3], + 'minors': [8, 2, 3, 5]}, 'edge_type': [1, 3, 5, 14] }), { @@ -865,24 +865,22 @@ def _get_renumbered_edge_groups_from_sample( index=cupy.asarray(id_table), ).sort_index() - # Renumber the sources using binary search + # Renumber the majors using binary search # Step 1: get the index of the new id ix_r = torch.searchsorted( torch.as_tensor(id_map.index.values, device="cuda"), - torch.as_tensor(sampling_results.sources.values, device="cuda"), + torch.as_tensor(sampling_results.majors.values, device="cuda"), ) # Step 2: Go from id indices to actual ids row_dict[t_pyg_type] = torch.as_tensor(id_map.values, device="cuda")[ ix_r ] - # Renumber the destinations using binary search + # Renumber the minors using binary search # Step 1: get the index of the new id ix_c = torch.searchsorted( torch.as_tensor(id_map.index.values, device="cuda"), - torch.as_tensor( - sampling_results.destinations.values, device="cuda" - ), + torch.as_tensor(sampling_results.minors.values, device="cuda"), ) # Step 2: Go from id indices to actual ids col_dict[t_pyg_type] = torch.as_tensor(id_map.values, device="cuda")[ @@ -897,7 +895,7 @@ def _get_renumbered_edge_groups_from_sample( "new_id": cupy.arange(dst_id_table.shape[0]), } ).set_index("dst") - dst = dst_id_map["new_id"].loc[sampling_results.destinations] + dst = dst_id_map["new_id"].loc[sampling_results.minors] col_dict[t_pyg_type] = torch.as_tensor(dst.values, device="cuda") src_id_table = noi_index[src_type] @@ -907,7 +905,7 @@ def _get_renumbered_edge_groups_from_sample( "new_id": cupy.arange(src_id_table.shape[0]), } ).set_index("src") - src = src_id_map["new_id"].loc[sampling_results.sources] + src = src_id_map["new_id"].loc[sampling_results.majors] row_dict[t_pyg_type] = torch.as_tensor(src.values, device="cuda") else: @@ -929,12 +927,12 @@ def _get_renumbered_edge_groups_from_sample( else: # CSC dst_type, _, src_type = 
pyg_can_edge_type - # Get the de-offsetted destinations + # Get the de-offsetted minors dst_num_type = self._numeric_vertex_type_from_name(dst_type) - destinations = torch.as_tensor( - sampling_results.destinations.iloc[ix].values, device="cuda" + minors = torch.as_tensor( + sampling_results.minors.iloc[ix].values, device="cuda" ) - destinations -= self.__vertex_type_offsets["start"][dst_num_type] + minors -= self.__vertex_type_offsets["start"][dst_num_type] # Create the col entry for this type dst_id_table = noi_index[dst_type] @@ -944,15 +942,15 @@ def _get_renumbered_edge_groups_from_sample( .rename(columns={"index": "new_id"}) .set_index("dst") ) - dst = dst_id_map["new_id"].loc[cupy.asarray(destinations)] + dst = dst_id_map["new_id"].loc[cupy.asarray(minors)] col_dict[pyg_can_edge_type] = torch.as_tensor(dst.values, device="cuda") - # Get the de-offsetted sources + # Get the de-offsetted majors src_num_type = self._numeric_vertex_type_from_name(src_type) - sources = torch.as_tensor( - sampling_results.sources.iloc[ix].values, device="cuda" + majors = torch.as_tensor( + sampling_results.majors.iloc[ix].values, device="cuda" ) - sources -= self.__vertex_type_offsets["start"][src_num_type] + majors -= self.__vertex_type_offsets["start"][src_num_type] # Create the row entry for this type src_id_table = noi_index[src_type] @@ -962,7 +960,7 @@ def _get_renumbered_edge_groups_from_sample( .rename(columns={"index": "new_id"}) .set_index("src") ) - src = src_id_map["new_id"].loc[cupy.asarray(sources)] + src = src_id_map["new_id"].loc[cupy.asarray(majors)] row_dict[pyg_can_edge_type] = torch.as_tensor(src.values, device="cuda") return row_dict, col_dict diff --git a/python/cugraph-pyg/cugraph_pyg/loader/cugraph_node_loader.py b/python/cugraph-pyg/cugraph_pyg/loader/cugraph_node_loader.py index cf7eb330d67..8552e7412e0 100644 --- a/python/cugraph-pyg/cugraph_pyg/loader/cugraph_node_loader.py +++ b/python/cugraph-pyg/cugraph_pyg/loader/cugraph_node_loader.py @@ -25,7 
+25,9 @@ from cugraph_pyg.data import CuGraphStore from cugraph_pyg.sampler.cugraph_sampler import ( _sampler_output_from_sampling_results_heterogeneous, - _sampler_output_from_sampling_results_homogeneous, + _sampler_output_from_sampling_results_homogeneous_csr, + _sampler_output_from_sampling_results_homogeneous_coo, + filter_cugraph_store_csc, ) from typing import Union, Tuple, Sequence, List, Dict @@ -58,6 +60,7 @@ def __init__( # Sampler args num_neighbors: Union[List[int], Dict[Tuple[str, str, str], List[int]]] = None, replace: bool = True, + compression: str = "COO", # Other kwargs for the BulkSampler **kwargs, ): @@ -128,6 +131,10 @@ def __init__( self.__batches_per_partition = batches_per_partition self.__starting_batch_id = starting_batch_id + self._total_read_time = 0.0 + self._total_convert_time = 0.0 + self._total_feature_time = 0.0 + if input_nodes is None: # Will be loading from disk self.__num_batches = input_nodes @@ -174,6 +181,10 @@ def __init__( with_replacement=replace, batches_per_partition=self.__batches_per_partition, renumber=renumber, + use_legacy_names=False, + deduplicate_sources=True, + prior_sources_behavior="exclude", + include_hop_column=(compression == "COO"), **kwargs, ) @@ -211,6 +222,10 @@ def __init__( self.__input_files = iter(os.listdir(self.__directory.name)) def __next__(self): + from time import perf_counter + + start_time_read_data = perf_counter() + # Load the next set of sampling results if necessary if self.__next_batch >= self.__end_exclusive: if self.__directory is None: @@ -245,51 +260,98 @@ def __next__(self): fname, ) - columns = { - "sources": "int64", - "destinations": "int64", - # 'edge_id':'int64', - "edge_type": "int32", - "batch_id": "int32", - "hop_id": "int32", - } - raw_sample_data = cudf.read_parquet(parquet_path) + if "map" in raw_sample_data.columns: - num_batches = end_inclusive - self.__start_inclusive + 1 + if "renumber_map_offsets" not in raw_sample_data.columns: + num_batches = end_inclusive - 
self.__start_inclusive + 1 - map_end = raw_sample_data["map"].iloc[num_batches] + map_end = raw_sample_data["map"].iloc[num_batches] - map = torch.as_tensor( - raw_sample_data["map"].iloc[0:map_end], device="cuda" - ) - raw_sample_data.drop("map", axis=1, inplace=True) + map = torch.as_tensor( + raw_sample_data["map"].iloc[0:map_end], device="cuda" + ) + raw_sample_data.drop("map", axis=1, inplace=True) - self.__renumber_map_offsets = map[0 : num_batches + 1] - map[0] - self.__renumber_map = map[num_batches + 1 :] + self.__renumber_map_offsets = map[0 : num_batches + 1] - map[0] + self.__renumber_map = map[num_batches + 1 :] + else: + self.__renumber_map = raw_sample_data["map"] + self.__renumber_map_offsets = raw_sample_data[ + "renumber_map_offsets" + ] + raw_sample_data.drop( + columns=["map", "renumber_map_offsets"], inplace=True + ) + + self.__renumber_map.dropna(inplace=True) + self.__renumber_map = torch.as_tensor( + self.__renumber_map, device="cuda" + ) + + self.__renumber_map_offsets.dropna(inplace=True) + self.__renumber_map_offsets = torch.as_tensor( + self.__renumber_map_offsets, device="cuda" + ) else: self.__renumber_map = None - self.__data = raw_sample_data[list(columns.keys())].astype(columns) - self.__data.dropna(inplace=True) + self.__data = raw_sample_data + self.__coo = "majors" in self.__data.columns + if self.__coo: + self.__data.dropna(inplace=True) if ( len(self.__graph_store.edge_types) == 1 and len(self.__graph_store.node_types) == 1 ): - group_cols = ["batch_id", "hop_id"] - self.__data_index = self.__data.groupby(group_cols, as_index=True).agg( - {"sources": "max", "destinations": "max"} - ) - self.__data_index.rename( - columns={"sources": "src_max", "destinations": "dst_max"}, - inplace=True, - ) - self.__data_index = self.__data_index.to_dict(orient="index") + if self.__coo: + group_cols = ["batch_id", "hop_id"] + self.__data_index = self.__data.groupby( + group_cols, as_index=True + ).agg({"majors": "max", "minors": "max"}) + 
self.__data_index.rename( + columns={"majors": "src_max", "minors": "dst_max"}, + inplace=True, + ) + self.__data_index = self.__data_index.to_dict(orient="index") + else: + self.__data_index = None + + self.__label_hop_offsets = self.__data["label_hop_offsets"] + self.__data.drop(columns=["label_hop_offsets"], inplace=True) + self.__label_hop_offsets.dropna(inplace=True) + self.__label_hop_offsets = torch.as_tensor( + self.__label_hop_offsets, device="cuda" + ) + self.__label_hop_offsets -= self.__label_hop_offsets[0].clone() + + self.__major_offsets = self.__data["major_offsets"] + self.__data.drop(columns="major_offsets", inplace=True) + self.__major_offsets.dropna(inplace=True) + self.__major_offsets = torch.as_tensor( + self.__major_offsets, device="cuda" + ) + self.__major_offsets -= self.__major_offsets[0].clone() + + self.__minors = self.__data["minors"] + self.__data.drop(columns="minors", inplace=True) + self.__minors.dropna(inplace=True) + self.__minors = torch.as_tensor(self.__minors, device="cuda") + + num_batches = self.__end_exclusive - self.__start_inclusive + offsets_len = len(self.__label_hop_offsets) - 1 + if offsets_len % num_batches != 0: + raise ValueError("invalid label-hop offsets") + self.__fanout_length = int(offsets_len / num_batches) + + end_time_read_data = perf_counter() + self._total_read_time += end_time_read_data - start_time_read_data # Pull the next set of sampling results out of the dataframe in memory - f = self.__data["batch_id"] == self.__next_batch + if self.__coo: + f = self.__data["batch_id"] == self.__next_batch if self.__renumber_map is not None: i = self.__next_batch - self.__start_inclusive @@ -301,18 +363,43 @@ def __next__(self): else: current_renumber_map = None + start_time_convert = perf_counter() # Get and return the sampled subgraph if ( len(self.__graph_store.edge_types) == 1 and len(self.__graph_store.node_types) == 1 ): - sampler_output = _sampler_output_from_sampling_results_homogeneous( - self.__data[f], - 
current_renumber_map, - self.__graph_store, - self.__data_index, - self.__next_batch, - ) + if self.__coo: + sampler_output = _sampler_output_from_sampling_results_homogeneous_coo( + self.__data[f], + current_renumber_map, + self.__graph_store, + self.__data_index, + self.__next_batch, + ) + else: + i = (self.__next_batch - self.__start_inclusive) * self.__fanout_length + current_label_hop_offsets = self.__label_hop_offsets[ + i : i + self.__fanout_length + 1 + ] + + current_major_offsets = self.__major_offsets[ + current_label_hop_offsets[0] : (current_label_hop_offsets[-1] + 1) + ] + + current_minors = self.__minors[ + current_major_offsets[0] : current_major_offsets[-1] + ] + + sampler_output = _sampler_output_from_sampling_results_homogeneous_csr( + current_major_offsets, + current_minors, + current_renumber_map, + self.__graph_store, + current_label_hop_offsets, + self.__data_index, + self.__next_batch, + ) else: sampler_output = _sampler_output_from_sampling_results_heterogeneous( self.__data[f], current_renumber_map, self.__graph_store @@ -321,18 +408,35 @@ def __next__(self): # Get ready for next iteration self.__next_batch += 1 + end_time_convert = perf_counter() + self._total_convert_time += end_time_convert - start_time_convert + + start_time_feature = perf_counter() # Create a PyG HeteroData object, loading the required features - out = torch_geometric.loader.utils.filter_custom_store( - self.__feature_store, - self.__graph_store, - sampler_output.node, - sampler_output.row, - sampler_output.col, - sampler_output.edge, - ) + if self.__coo: + out = torch_geometric.loader.utils.filter_custom_store( + self.__feature_store, + self.__graph_store, + sampler_output.node, + sampler_output.row, + sampler_output.col, + sampler_output.edge, + ) + else: + if self.__graph_store.order == "CSR": + raise ValueError("CSR format incompatible with CSC output") + + out = filter_cugraph_store_csc( + self.__feature_store, + self.__graph_store, + sampler_output.node, + 
sampler_output.row, + sampler_output.col, + sampler_output.edge, + ) # Account for CSR format in cuGraph vs. CSC format in PyG - if self.__graph_store.order == "CSC": + if self.__coo and self.__graph_store.order == "CSC": for node_type in out.edge_index_dict: out[node_type].edge_index[0], out[node_type].edge_index[1] = ( out[node_type].edge_index[1], @@ -342,6 +446,9 @@ def __next__(self): out.set_value_dict("num_sampled_nodes", sampler_output.num_sampled_nodes) out.set_value_dict("num_sampled_edges", sampler_output.num_sampled_edges) + end_time_feature = perf_counter() + self._total_feature_time = end_time_feature - start_time_feature + return out @property diff --git a/python/cugraph-pyg/cugraph_pyg/sampler/cugraph_sampler.py b/python/cugraph-pyg/cugraph_pyg/sampler/cugraph_sampler.py index 6e8c4322418..300ca9beb5a 100644 --- a/python/cugraph-pyg/cugraph_pyg/sampler/cugraph_sampler.py +++ b/python/cugraph-pyg/cugraph_pyg/sampler/cugraph_sampler.py @@ -53,10 +53,10 @@ def _get_unique_nodes( The unique nodes of the given node type. 
""" if node_position == "src": - edge_index = "sources" + edge_index = "majors" edge_sel = 0 elif node_position == "dst": - edge_index = "destinations" + edge_index = "minors" edge_sel = -1 else: raise ValueError(f"Illegal value {node_position} for node_position") @@ -78,7 +78,7 @@ def _get_unique_nodes( return sampling_results_node[edge_index] -def _sampler_output_from_sampling_results_homogeneous( +def _sampler_output_from_sampling_results_homogeneous_coo( sampling_results: cudf.DataFrame, renumber_map: torch.Tensor, graph_store: CuGraphStore, @@ -133,11 +133,11 @@ def _sampler_output_from_sampling_results_homogeneous( noi_index = {node_type: torch.as_tensor(renumber_map, device="cuda")} row_dict = { - edge_type: torch.as_tensor(sampling_results.sources, device="cuda"), + edge_type: torch.as_tensor(sampling_results.majors, device="cuda"), } col_dict = { - edge_type: torch.as_tensor(sampling_results.destinations, device="cuda"), + edge_type: torch.as_tensor(sampling_results.minors, device="cuda"), } num_nodes_per_hop_dict[node_type][0] = data_index[batch_id, 0]["src_max"] + 1 @@ -177,6 +177,88 @@ def _sampler_output_from_sampling_results_homogeneous( ) +def _sampler_output_from_sampling_results_homogeneous_csr( + major_offsets: torch.Tensor, + minors: torch.Tensor, + renumber_map: torch.Tensor, + graph_store: CuGraphStore, + label_hop_offsets: torch.Tensor, + batch_id: int, + metadata: Sequence = None, +) -> HeteroSamplerOutput: + """ + Parameters + ---------- + major_offsets: torch.Tensor + The major offsets for the CSC/CSR matrix ("row pointer") + minors: torch.Tensor + The minors for the CSC/CSR matrix ("col index") + renumber_map: torch.Tensor + The tensor containing the renumber map. + Required. + graph_store: CuGraphStore + The graph store containing the structure of the sampled graph. + label_hop_offsets: torch.Tensor + The tensor containing the label-hop offsets. 
+ batch_id: int + The current batch id, whose samples are being retrieved + from the sampling results and data index. + metadata: Tensor + The metadata for the sampled batch. + + Returns + ------- + HeteroSamplerOutput + """ + + if len(graph_store.edge_types) > 1 or len(graph_store.node_types) > 1: + raise ValueError("Graph is heterogeneous") + + if renumber_map is None: + raise ValueError("Renumbered input is expected for homogeneous graphs") + + node_type = graph_store.node_types[0] + edge_type = graph_store.edge_types[0] + + major_offsets = major_offsets.clone() - major_offsets[0] + label_hop_offsets = label_hop_offsets.clone() - label_hop_offsets[0] + + num_edges_per_hop_dict = {edge_type: major_offsets[label_hop_offsets].diff().cpu()} + + label_hop_offsets = label_hop_offsets.cpu() + num_nodes_per_hop_dict = { + node_type: torch.concat( + [ + label_hop_offsets.diff(), + (renumber_map.shape[0] - label_hop_offsets[-1]).reshape((1,)), + ] + ).cpu() + } + + noi_index = {node_type: torch.as_tensor(renumber_map, device="cuda")} + + col_dict = { + edge_type: major_offsets, + } + + row_dict = { + edge_type: minors, + } + + if HeteroSamplerOutput is None: + raise ImportError("Error importing from pyg") + + return HeteroSamplerOutput( + node=noi_index, + row=row_dict, + col=col_dict, + edge=None, + num_sampled_nodes=num_nodes_per_hop_dict, + num_sampled_edges=num_edges_per_hop_dict, + metadata=metadata, + ) + + def _sampler_output_from_sampling_results_heterogeneous( sampling_results: cudf.DataFrame, renumber_map: cudf.Series, @@ -244,8 +326,8 @@ def _sampler_output_from_sampling_results_heterogeneous( cudf.Series( torch.concat( [ - torch.as_tensor(sampling_results_hop_0.sources, device="cuda"), - torch.as_tensor(sampling_results.destinations, device="cuda"), + torch.as_tensor(sampling_results_hop_0.majors, device="cuda"), + torch.as_tensor(sampling_results.minors, device="cuda"), ] ), name="nodes_of_interest", @@ -320,3 +402,37 @@ def 
_sampler_output_from_sampling_results_heterogeneous( num_sampled_edges=num_edges_per_hop_dict, metadata=metadata, ) + + +def filter_cugraph_store_csc( + feature_store: torch_geometric.data.FeatureStore, + graph_store: torch_geometric.data.GraphStore, + node_dict: Dict[str, torch.Tensor], + row_dict: Dict[str, torch.Tensor], + col_dict: Dict[str, torch.Tensor], + edge_dict: Dict[str, Tuple[torch.Tensor]], +) -> torch_geometric.data.HeteroData: + data = torch_geometric.data.HeteroData() + + for attr in graph_store.get_all_edge_attrs(): + key = attr.edge_type + if key in row_dict and key in col_dict: + data.put_edge_index( + (row_dict[key], col_dict[key]), + edge_type=key, + layout="csc", + is_sorted=True, + ) + + required_attrs = [] + for attr in feature_store.get_all_tensor_attrs(): + if attr.group_name in node_dict: + attr.index = node_dict[attr.group_name] + required_attrs.append(attr) + data[attr.group_name].num_nodes = attr.index.size(0) + + tensors = feature_store.multi_get_tensor(required_attrs) + for i, attr in enumerate(required_attrs): + data[attr.group_name][attr.attr_name] = tensors[i] + + return data diff --git a/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_sampler.py b/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_sampler.py index a1a72a44d0c..80a2d0a6c79 100644 --- a/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_sampler.py +++ b/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_sampler.py @@ -53,9 +53,10 @@ def test_neighbor_sample(dask_client, basic_graph_1): random_state=62, return_offsets=False, return_hops=True, + use_legacy_names=False, ) .compute() - .sort_values(by=["sources", "destinations"]) + .sort_values(by=["majors", "minors"]) ) out = _sampler_output_from_sampling_results_heterogeneous( @@ -116,8 +117,9 @@ def test_neighbor_sample_multi_vertex(dask_client, multi_edge_multi_vertex_graph random_state=62, return_offsets=False, with_batch_ids=True, + use_legacy_names=False, ) - .sort_values(by=["sources", 
"destinations"]) + .sort_values(by=["majors", "minors"]) .compute() ) @@ -193,8 +195,8 @@ def test_neighbor_sample_mock_sampling_results(dask_client): # let 0, 1 be the start vertices, fanout = [2, 1, 2, 3] mock_sampling_results = cudf.DataFrame( { - "sources": cudf.Series([0, 0, 1, 2, 3, 3, 1, 3, 3, 3], dtype="int64"), - "destinations": cudf.Series([2, 3, 3, 8, 1, 7, 3, 1, 5, 7], dtype="int64"), + "majors": cudf.Series([0, 0, 1, 2, 3, 3, 1, 3, 3, 3], dtype="int64"), + "minors": cudf.Series([2, 3, 3, 8, 1, 7, 3, 1, 5, 7], dtype="int64"), "hop_id": cudf.Series([0, 0, 0, 1, 1, 1, 2, 3, 3, 3], dtype="int32"), "edge_type": cudf.Series([0, 0, 0, 2, 1, 2, 0, 1, 2, 2], dtype="int32"), } diff --git a/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_store.py b/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_store.py index 43b1e5da5a0..ed7f70034e2 100644 --- a/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_store.py +++ b/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_store.py @@ -220,8 +220,8 @@ def test_renumber_edges(abc_graph, dask_client): # let 0, 1 be the start vertices, fanout = [2, 1, 2, 3] mock_sampling_results = cudf.DataFrame( { - "sources": cudf.Series([0, 0, 1, 2, 3, 3, 1, 3, 3, 3], dtype="int64"), - "destinations": cudf.Series([2, 3, 3, 8, 1, 7, 3, 1, 5, 7], dtype="int64"), + "majors": cudf.Series([0, 0, 1, 2, 3, 3, 1, 3, 3, 3], dtype="int64"), + "minors": cudf.Series([2, 3, 3, 8, 1, 7, 3, 1, 5, 7], dtype="int64"), "hop_id": cudf.Series([0, 0, 0, 1, 1, 1, 2, 3, 3, 3], dtype="int32"), "edge_type": cudf.Series([0, 0, 0, 2, 1, 2, 0, 1, 2, 2], dtype="int32"), } diff --git a/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_loader.py b/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_loader.py index 48a21cb7fd6..03274948158 100644 --- a/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_loader.py +++ b/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_loader.py @@ -22,6 +22,8 @@ from cugraph_pyg.loader import CuGraphNeighborLoader 
from cugraph_pyg.loader import BulkSampleLoader from cugraph_pyg.data import CuGraphStore +from cugraph_pyg.nn import SAGEConv as CuGraphSAGEConv + from cugraph.gnn import FeatureStore from cugraph.utilities.utils import import_optional, MissingModule @@ -98,8 +100,8 @@ def test_cugraph_loader_from_disk(): bogus_samples = cudf.DataFrame( { - "sources": [0, 1, 2, 3, 4, 5, 6, 6], - "destinations": [5, 4, 3, 2, 2, 6, 5, 2], + "majors": [0, 1, 2, 3, 4, 5, 6, 6], + "minors": [5, 4, 3, 2, 2, 6, 5, 2], "edge_type": cudf.Series([0, 0, 0, 0, 0, 0, 0, 0], dtype="int32"), "edge_id": [5, 10, 15, 20, 25, 30, 35, 40], "hop_id": cudf.Series([0, 0, 0, 1, 1, 1, 2, 2], dtype="int32"), @@ -130,12 +132,10 @@ def test_cugraph_loader_from_disk(): assert list(edge_index.shape) == [2, 8] assert ( - edge_index[0].tolist() - == bogus_samples.sources.dropna().values_host.tolist() + edge_index[0].tolist() == bogus_samples.majors.dropna().values_host.tolist() ) assert ( - edge_index[1].tolist() - == bogus_samples.destinations.dropna().values_host.tolist() + edge_index[1].tolist() == bogus_samples.minors.dropna().values_host.tolist() ) assert num_samples == 256 @@ -157,8 +157,8 @@ def test_cugraph_loader_from_disk_subset(): bogus_samples = cudf.DataFrame( { - "sources": [0, 1, 2, 3, 4, 5, 6, 6], - "destinations": [5, 4, 3, 2, 2, 6, 5, 2], + "majors": [0, 1, 2, 3, 4, 5, 6, 6], + "minors": [5, 4, 3, 2, 2, 6, 5, 2], "edge_type": cudf.Series([0, 0, 0, 0, 0, 0, 0, 0], dtype="int32"), "edge_id": [5, 10, 15, 20, 25, 30, 35, 40], "hop_id": cudf.Series([0, 0, 0, 1, 1, 1, 2, 2], dtype="int32"), @@ -190,13 +190,77 @@ def test_cugraph_loader_from_disk_subset(): assert list(edge_index.shape) == [2, 8] assert ( - edge_index[0].tolist() - == bogus_samples.sources.dropna().values_host.tolist() + edge_index[0].tolist() == bogus_samples.majors.dropna().values_host.tolist() + ) + assert ( + edge_index[1].tolist() == bogus_samples.minors.dropna().values_host.tolist() ) + + assert num_samples == 100 + + 
+@pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available") +def test_cugraph_loader_from_disk_subset_csr(): + m = [2, 9, 99, 82, 11, 13] + n = torch.arange(1, 1 + len(m), dtype=torch.int32) + x = torch.zeros(256, dtype=torch.int32) + x[torch.tensor(m, dtype=torch.int32)] = n + F = FeatureStore() + F.add_data(x, "t0", "x") + + G = {("t0", "knows", "t0"): 9080} + N = {"t0": 256} + + cugraph_store = CuGraphStore(F, G, N) + + bogus_samples = cudf.DataFrame( + { + "major_offsets": [0, 3, 5, 7, 8, None, None, None], + "minors": [1, 2, 3, 0, 3, 4, 5, 1], + "edge_type": cudf.Series([0, 0, 0, 0, 0, 0, 0, 0], dtype="int32"), + "edge_id": [5, 10, 15, 20, 25, 30, 35, 40], + "label_hop_offsets": cudf.Series( + [0, 1, 4, None, None, None, None, None], dtype="int32" + ), + "renumber_map_offsets": cudf.Series([0, 6], dtype="int32"), + } + ) + map = cudf.Series(m, name="map") + bogus_samples["map"] = map + + tempdir = tempfile.TemporaryDirectory() + for s in range(256): + # offset the offsets + bogus_samples["batch_id"] = cupy.int32(s) + bogus_samples.to_parquet(os.path.join(tempdir.name, f"batch={s}-{s}.parquet")) + + loader = BulkSampleLoader( + feature_store=cugraph_store, + graph_store=cugraph_store, + directory=tempdir, + input_files=list(os.listdir(tempdir.name))[100:200], + ) + + num_samples = 0 + for sample in loader: + num_samples += 1 + assert sample["t0"]["num_nodes"] == 6 + + assert sample["t0"]["x"].tolist() == [1, 2, 3, 4, 5, 6] + + edge_index = sample[("t0", "knows", "t0")]["adj_t"] + assert edge_index.size(0) == 4 + assert edge_index.size(1) == 6 + + colptr, row, _ = edge_index.csr() + assert ( - edge_index[1].tolist() - == bogus_samples.destinations.dropna().values_host.tolist() + colptr.tolist() == bogus_samples.major_offsets.dropna().values_host.tolist() ) + assert row.tolist() == bogus_samples.minors.dropna().values_host.tolist() + + assert sample["t0"]["num_sampled_nodes"].tolist() == [1, 3, 2] + assert sample["t0", "knows", 
"t0"]["num_sampled_edges"].tolist() == [3, 5] assert num_samples == 100 @@ -215,8 +279,8 @@ def test_cugraph_loader_e2e_coo(): bogus_samples = cudf.DataFrame( { - "sources": [0, 1, 2, 3, 4, 5, 6, 6], - "destinations": [5, 4, 3, 2, 2, 6, 5, 2], + "majors": [0, 1, 2, 3, 4, 5, 6, 6], + "minors": [5, 4, 3, 2, 2, 6, 5, 2], "edge_type": cudf.Series([0, 0, 0, 0, 0, 0, 0, 0], dtype="int32"), "edge_id": [5, 10, 15, 20, 25, 30, 35, 40], "hop_id": cudf.Series([0, 0, 0, 1, 1, 1, 2, 2], dtype="int32"), @@ -253,8 +317,6 @@ def test_cugraph_loader_e2e_coo(): num_sampled_nodes = hetero_data["t0"]["num_sampled_nodes"] num_sampled_edges = hetero_data["t0", "knows", "t0"]["num_sampled_edges"] - print(num_sampled_nodes, num_sampled_edges) - for i in range(len(convs)): x, ei, _ = trim(i, num_sampled_nodes, num_sampled_edges, x, ei, None) @@ -263,9 +325,111 @@ def test_cugraph_loader_e2e_coo(): x = convs[i](x, ei, size=(s, s)) x = relu(x) x = dropout(x, p=0.5) - print(x.shape) - print(x.shape) x = x.narrow(dim=0, start=0, length=x.shape[0] - num_sampled_nodes[1]) assert list(x.shape) == [3, 1] + + +@pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available") +@pytest.mark.parametrize("framework", ["pyg", "cugraph-ops"]) +def test_cugraph_loader_e2e_csc(framework): + m = [2, 9, 99, 82, 9, 3, 18, 1, 12] + x = torch.randint(3000, (256, 256)).to(torch.float32) + F = FeatureStore() + F.add_data(x, "t0", "x") + + G = {("t0", "knows", "t0"): 9999} + N = {"t0": 256} + + cugraph_store = CuGraphStore(F, G, N) + + bogus_samples = cudf.DataFrame( + { + "major_offsets": [0, 3, 5, 7, 8, None, None, None], + "minors": [1, 2, 3, 0, 3, 4, 5, 1], + "edge_type": cudf.Series([0, 0, 0, 0, 0, 0, 0, 0], dtype="int32"), + "edge_id": [5, 10, 15, 20, 25, 30, 35, 40], + "label_hop_offsets": cudf.Series( + [0, 1, 4, None, None, None, None, None], dtype="int32" + ), + "renumber_map_offsets": cudf.Series([0, 6], dtype="int32"), + } + ) + map = cudf.Series(m, name="map") + bogus_samples = 
bogus_samples.join(map, how="outer").sort_index() + + tempdir = tempfile.TemporaryDirectory() + for s in range(256): + bogus_samples["batch_id"] = cupy.int32(s) + bogus_samples.to_parquet(os.path.join(tempdir.name, f"batch={s}-{s}.parquet")) + + loader = BulkSampleLoader( + feature_store=cugraph_store, + graph_store=cugraph_store, + directory=tempdir, + input_files=list(os.listdir(tempdir.name))[100:200], + ) + + if framework == "pyg": + convs = [ + torch_geometric.nn.SAGEConv(256, 64, aggr="mean").cuda(), + torch_geometric.nn.SAGEConv(64, 1, aggr="mean").cuda(), + ] + else: + convs = [ + CuGraphSAGEConv(256, 64, aggr="mean").cuda(), + CuGraphSAGEConv(64, 1, aggr="mean").cuda(), + ] + + trim = trim_to_layer.TrimToLayer() + relu = torch.nn.functional.relu + dropout = torch.nn.functional.dropout + + for hetero_data in loader: + x = hetero_data["t0"]["x"].cuda() + + if framework == "pyg": + ei = hetero_data["t0", "knows", "t0"]["adj_t"].coo() + ei = torch.stack((ei[0], ei[1])) + else: + ei = hetero_data["t0", "knows", "t0"]["adj_t"].csr() + ei = [ei[1], ei[0], x.shape[0]] + + num_sampled_nodes = hetero_data["t0"]["num_sampled_nodes"] + num_sampled_edges = hetero_data["t0", "knows", "t0"]["num_sampled_edges"] + + s = x.shape[0] + for i in range(len(convs)): + if framework == "pyg": + x, ei, _ = trim(i, num_sampled_nodes, num_sampled_edges, x, ei, None) + else: + if i > 0: + x = x.narrow( + dim=0, + start=0, + length=s - num_sampled_nodes[-i], + ) + + ei[0] = ei[0].narrow( + dim=0, + start=0, + length=ei[0].size(0) - num_sampled_edges[-i], + ) + ei[1] = ei[1].narrow( + dim=0, start=0, length=ei[1].size(0) - num_sampled_nodes[-i] + ) + ei[2] = x.size(0) + + s = x.shape[0] + + if framework == "pyg": + x = convs[i](x, ei, size=(s, s)) + else: + x = convs[i](x, ei) + x = relu(x) + x = dropout(x, p=0.5) + + x = x.narrow(dim=0, start=0, length=s - num_sampled_nodes[1]) + + assert list(x.shape) == [1, 1] diff --git a/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_sampler.py 
b/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_sampler.py index 84f62e80c9d..e703d477b70 100644 --- a/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_sampler.py +++ b/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_sampler.py @@ -49,7 +49,8 @@ def test_neighbor_sample(basic_graph_1): with_batch_ids=True, random_state=62, return_offsets=False, - ).sort_values(by=["sources", "destinations"]) + use_legacy_names=False, + ).sort_values(by=["majors", "minors"]) out = _sampler_output_from_sampling_results_heterogeneous( sampling_results=sampling_results, @@ -107,7 +108,8 @@ def test_neighbor_sample_multi_vertex(multi_edge_multi_vertex_graph_1): random_state=62, return_offsets=False, with_batch_ids=True, - ).sort_values(by=["sources", "destinations"]) + use_legacy_names=False, + ).sort_values(by=["majors", "minors"]) out = _sampler_output_from_sampling_results_heterogeneous( sampling_results=sampling_results, @@ -154,8 +156,8 @@ def test_neighbor_sample_mock_sampling_results(abc_graph): # let 0, 1 be the start vertices, fanout = [2, 1, 2, 3] mock_sampling_results = cudf.DataFrame( { - "sources": cudf.Series([0, 0, 1, 2, 3, 3, 1, 3, 3, 3], dtype="int64"), - "destinations": cudf.Series([2, 3, 3, 8, 1, 7, 3, 1, 5, 7], dtype="int64"), + "majors": cudf.Series([0, 0, 1, 2, 3, 3, 1, 3, 3, 3], dtype="int64"), + "minors": cudf.Series([2, 3, 3, 8, 1, 7, 3, 1, 5, 7], dtype="int64"), "hop_id": cudf.Series([0, 0, 0, 1, 1, 1, 2, 3, 3, 3], dtype="int32"), "edge_type": cudf.Series([0, 0, 0, 2, 1, 2, 0, 1, 2, 2], dtype="int32"), } diff --git a/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_store.py b/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_store.py index e815b813050..da3043760d4 100644 --- a/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_store.py +++ b/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_store.py @@ -204,8 +204,8 @@ def test_renumber_edges(abc_graph): # let 0, 1 be the start vertices, fanout = [2, 1, 2, 3] mock_sampling_results = cudf.DataFrame( { - 
"sources": cudf.Series([0, 0, 1, 2, 3, 3, 1, 3, 3, 3], dtype="int64"), - "destinations": cudf.Series([2, 3, 3, 8, 1, 7, 3, 1, 5, 7], dtype="int64"), + "majors": cudf.Series([0, 0, 1, 2, 3, 3, 1, 3, 3, 3], dtype="int64"), + "minors": cudf.Series([2, 3, 3, 8, 1, 7, 3, 1, 5, 7], dtype="int64"), "hop_id": cudf.Series([0, 0, 0, 1, 1, 1, 2, 3, 3, 3], dtype="int32"), "edge_type": cudf.Series([0, 0, 0, 2, 1, 2, 0, 1, 2, 2], dtype="int32"), } From 061ada449b7b11fd39789aca59a65a663f631a3e Mon Sep 17 00:00:00 2001 From: Joseph Nke <76006812+jnke2016@users.noreply.github.com> Date: Wed, 4 Oct 2023 14:41:24 -0500 Subject: [PATCH 5/6] Increase dask-related timeouts for CI testing (#3907) This PR increases the minimum timeout when waiting for the workers to complete their tasks. Authors: - Joseph Nke (https://github.com/jnke2016) Approvers: - Brad Rees (https://github.com/BradReesWork) - Vibhu Jawa (https://github.com/VibhuJawa) - Rick Ratzel (https://github.com/rlratzel) - Jake Awe (https://github.com/AyodeAwe) URL: https://github.com/rapidsai/cugraph/pull/3907 --- ci/test_python.sh | 5 ++++- ci/test_wheel.sh | 6 +++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/ci/test_python.sh b/ci/test_python.sh index 825d5b242d5..df0f34377a3 100755 --- a/ci/test_python.sh +++ b/ci/test_python.sh @@ -65,7 +65,10 @@ popd rapids-logger "pytest cugraph" pushd python/cugraph/cugraph -export DASK_WORKER_DEVICES="0" +DASK_WORKER_DEVICES="0" \ +DASK_DISTRIBUTED__SCHEDULER__WORKER_TTL="1000s" \ +DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT="1000s" \ +DASK_CUDA_WAIT_WORKERS_MIN_TIMEOUT=20 \ pytest \ -v \ --benchmark-disable \ diff --git a/ci/test_wheel.sh b/ci/test_wheel.sh index 3ac3549f143..d6ec67cd9e9 100755 --- a/ci/test_wheel.sh +++ b/ci/test_wheel.sh @@ -21,5 +21,9 @@ arch=$(uname -m) if [[ "${arch}" == "aarch64" && ${RAPIDS_BUILD_TYPE} == "pull-request" ]]; then python ./ci/wheel_smoke_test_${package_name}.py else - RAPIDS_DATASET_ROOT_DIR=`pwd`/datasets python -m pytest 
./python/${package_name}/${python_package_name}/tests + RAPIDS_DATASET_ROOT_DIR=`pwd`/datasets \ + DASK_DISTRIBUTED__SCHEDULER__WORKER_TTL="1000s" \ + DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT="1000s" \ + DASK_CUDA_WAIT_WORKERS_MIN_TIMEOUT=20 \ + python -m pytest ./python/${package_name}/${python_package_name}/tests fi From d03cb0fd33e072756f6d87fbe009b561ff48bafb Mon Sep 17 00:00:00 2001 From: Vibhu Jawa Date: Thu, 5 Oct 2023 09:31:01 -0700 Subject: [PATCH 6/6] Remove `dask_cudf` dataframe for the `_make_plc_graph` while creating `cugraph.Graph` (#3895) This PR attempts to fix https://github.com/rapidsai/cugraph/issues/3790 Please note that I have not been able to reproduce the failure locally, so it is really hard for me to know whether it actually fixes anything. MRE being used to test locally: https://gist.github.com/VibhuJawa/4b1ec24022b6e2dd7879cd2e8d3fab67 CC: @jnke2016, @rlratzel, @rjzamora. Please let me know what I can do better here. Authors: - Vibhu Jawa (https://github.com/VibhuJawa) - Brad Rees (https://github.com/BradReesWork) Approvers: - Rick Ratzel (https://github.com/rlratzel) - Joseph Nke (https://github.com/jnke2016) URL: https://github.com/rapidsai/cugraph/pull/3895 --- .../cugraph/cugraph/dask/common/part_utils.py | 62 ++++++++++++++++--- .../simpleDistributedGraph.py | 24 ++++--- 2 files changed, 71 insertions(+), 15 deletions(-) diff --git a/python/cugraph/cugraph/dask/common/part_utils.py b/python/cugraph/cugraph/dask/common/part_utils.py index 7c0aad6c3ee..25311902b29 100644 --- a/python/cugraph/cugraph/dask/common/part_utils.py +++ b/python/cugraph/cugraph/dask/common/part_utils.py @@ -99,19 +99,65 @@ def _chunk_lst(ls, num_parts): return [ls[i::num_parts] for i in range(num_parts)] -def persist_dask_df_equal_parts_per_worker(dask_df, client): +def persist_dask_df_equal_parts_per_worker( + dask_df, client, return_type="dask_cudf.DataFrame" +): + """ + Persist dask_df with equal parts per worker + Args: + dask_df: dask_cudf.DataFrame +
client: dask.distributed.Client + return_type: str, "dask_cudf.DataFrame" or "dict" + Returns: + persisted_keys: dict of {worker: [persisted_keys]} + """ + if return_type not in ["dask_cudf.DataFrame", "dict"]: + raise ValueError("return_type must be either 'dask_cudf.DataFrame' or 'dict'") + ddf_keys = dask_df.to_delayed() workers = client.scheduler_info()["workers"].keys() ddf_keys_ls = _chunk_lst(ddf_keys, len(workers)) - persisted_keys = [] + persisted_keys_d = {} for w, ddf_k in zip(workers, ddf_keys_ls): - persisted_keys.extend( - client.persist(ddf_k, workers=w, allow_other_workers=False) + persisted_keys_d[w] = client.compute( + ddf_k, workers=w, allow_other_workers=False, pure=False ) - dask_df = dask_cudf.from_delayed(persisted_keys, meta=dask_df._meta).persist() - wait(dask_df) - client.rebalance(dask_df) - return dask_df + + persisted_keys_ls = [ + item for sublist in persisted_keys_d.values() for item in sublist + ] + wait(persisted_keys_ls) + if return_type == "dask_cudf.DataFrame": + dask_df = dask_cudf.from_delayed( + persisted_keys_ls, meta=dask_df._meta + ).persist() + wait(dask_df) + return dask_df + + return persisted_keys_d + + +def get_length_of_parts(persisted_keys_d, client): + """ + Get the length of each partition + Args: + persisted_keys_d: dict of {worker: [persisted_keys]} + client: dask.distributed.Client + Returns: + length_of_parts: dict of {worker: [length_of_parts]} + """ + length_of_parts = {} + for w, p_keys in persisted_keys_d.items(): + length_of_parts[w] = [ + client.submit( + len, p_key, pure=False, workers=[w], allow_other_workers=False + ) + for p_key in p_keys + ] + + for w, len_futures in length_of_parts.items(): + length_of_parts[w] = client.gather(len_futures) + return length_of_parts async def _extract_partitions( diff --git a/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py b/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py index fa94fa67625..935d0c597d4 
100644 --- a/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py +++ b/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py @@ -36,6 +36,7 @@ from cugraph.structure.symmetrize import symmetrize from cugraph.dask.common.part_utils import ( get_persisted_df_worker_map, + get_length_of_parts, persist_dask_df_equal_parts_per_worker, ) from cugraph.dask import get_n_workers @@ -318,9 +319,14 @@ def __from_edgelist( is_symmetric=not self.properties.directed, ) ddf = ddf.repartition(npartitions=len(workers) * 2) - ddf = persist_dask_df_equal_parts_per_worker(ddf, _client) - num_edges = len(ddf) - ddf = get_persisted_df_worker_map(ddf, _client) + persisted_keys_d = persist_dask_df_equal_parts_per_worker( + ddf, _client, return_type="dict" + ) + del ddf + length_of_parts = get_length_of_parts(persisted_keys_d, _client) + num_edges = sum( + [item for sublist in length_of_parts.values() for item in sublist] + ) delayed_tasks_d = { w: delayed(simpleDistributedGraphImpl._make_plc_graph)( Comms.get_session_id(), @@ -331,14 +337,16 @@ def __from_edgelist( store_transposed, num_edges, ) - for w, edata in ddf.items() + for w, edata in persisted_keys_d.items() } - # FIXME: For now, don't delete the copied dataframe to avoid crash self._plc_graph = { - w: _client.compute(delayed_task, workers=w, allow_other_workers=False) + w: _client.compute( + delayed_task, workers=w, allow_other_workers=False, pure=False + ) for w, delayed_task in delayed_tasks_d.items() } wait(list(self._plc_graph.values())) + del persisted_keys_d del delayed_tasks_d _client.run(gc.collect) @@ -1192,5 +1200,7 @@ def _get_column_from_ls_dfs(lst_df, col_name): if len_df == 0: return lst_df[0][col_name] output_col = cudf.concat([df[col_name] for df in lst_df], ignore_index=True) - # FIXME: For now, don't delete the copied dataframe to avoid cras + for df in lst_df: + df.drop(columns=[col_name], inplace=True) + gc.collect() return output_col