From 22217dc8464fa8d7016ac8d23d931f8a64705ae0 Mon Sep 17 00:00:00 2001 From: Tingyu Wang Date: Tue, 26 Sep 2023 16:13:33 -0700 Subject: [PATCH] cast to tensors, create list for minibatches --- .../dataloading/utils/sampling_helpers.py | 80 ++++++++++++++++--- .../cugraph-dgl/cugraph_dgl/nn/conv/base.py | 6 ++ 2 files changed, 74 insertions(+), 12 deletions(-) diff --git a/python/cugraph-dgl/cugraph_dgl/dataloading/utils/sampling_helpers.py b/python/cugraph-dgl/cugraph_dgl/dataloading/utils/sampling_helpers.py index 1bbd19b8563..06c56728a3f 100644 --- a/python/cugraph-dgl/cugraph_dgl/dataloading/utils/sampling_helpers.py +++ b/python/cugraph-dgl/cugraph_dgl/dataloading/utils/sampling_helpers.py @@ -11,11 +11,12 @@ # See the License for the specific language governing permissions and # limitations under the License. from __future__ import annotations -from typing import Tuple, Dict, Optional +from typing import List, Tuple, Dict, Optional, Any from collections import defaultdict import cudf import cupy from cugraph.utilities.utils import import_optional +from cugraph_dgl.nn import SparseGraph dgl = import_optional("dgl") torch = import_optional("torch") @@ -405,12 +406,14 @@ def create_heterogenous_dgl_block_from_tensors_dict( def _process_sampled_df_csc( - df: cudf.DataFrame, n_hops: int, n_batches: int + df: cudf.DataFrame, + n_hops: int, + n_batches: int, + reverse_hop_id: bool = True, ) -> Tuple[ Dict[int, Dict[int, Dict[str, torch.Tensor]]], - cupy.ndarray, - cupy.ndarray, - cupy.ndarray, + List[torch.Tensor], + List[List[int, int]], ]: """ Convert a dataframe generated by BulkSampler to a dictionary of tensors, to @@ -424,6 +427,8 @@ def _process_sampled_df_csc( Length of fanout values. n_batches: int Number of batches in each parquet file. + reverse_hop_id: bool, default=True + Reverse hop id. Returns: tensor_dict[batch_id][hop_id] has three keys: @@ -448,10 +453,14 @@ def _process_sampled_df_csc( ) n_nodes = renumber_map_offsets[1:] - renumber_map_offsets[:-1] mfg_sizes = cupy.hstack((mfg_sizes, n_nodes.reshape(n_batches, -1))) + if reverse_hop_id: + mfg_sizes = mfg_sizes[:, ::-1] - output_dict = {} + tensors_dict = {} + renumber_map_list = [] for batch_id in range(n_batches): batch_dict = {} + for hop_id in range(n_hops): hop_dict = {} idx = batch_id * n_hops + hop_id # idx in label_hop_offsets @@ -459,13 +468,60 @@ def _process_sampled_df_csc( major_offsets_end = label_hop_offsets[idx + 1].item() minor_start = major_offsets[major_offsets_start].item() minor_end = major_offsets[major_offsets_end].item() - hop_dict["minors"] = df.minors.iloc[minor_start:minor_end].values - hop_dict["major_offsets"] = ( + # Note: major_offsets from BulkSampler are int64. + hop_dict["minors"] = torch.as_tensor( + df.minors.iloc[minor_start:minor_end].values, device="cuda" + ).int() + hop_dict["major_offsets"] = torch.as_tensor( major_offsets[major_offsets_start : major_offsets_end + 1] - - major_offsets[major_offsets_start] + - major_offsets[major_offsets_start], + device="cuda", + ).int() + if reverse_hop_id: + batch_dict[n_hops - 1 - hop_id] = hop_dict + else: + batch_dict[hop_id] = hop_dict + + tensors_dict[batch_id] = batch_dict + + renumber_map_list.append( + torch.as_tensor( + renumber_map[ + renumber_map_offsets[batch_id] : renumber_map_offsets[batch_id + 1] + ], + device="cuda", ) + ) + + return tensors_dict, renumber_map_list, mfg_sizes.tolist() + + +def create_homogenous_sparse_graphs( + tensors_dict: Dict[int, Dict[int, Dict[str, torch.Tensor]]], + renumber_map_list: List[torch.Tensor], + mfg_sizes: List[int, int], +) -> Any: + """Create minibatches of MFGs. The input argument are the outputs of + the function `_process_sampled_df_csc`.""" + n_batches, n_hops = len(mfg_sizes), len(mfg_sizes[0]) - 1 + output = [] + for b_id in range(n_batches): + output_batch = [] + output_batch.append(renumber_map_list[b_id]) + output_batch.append(renumber_map_list[b_id][: mfg_sizes[b_id][-1]]) + mfgs = [ + SparseGraph( + size=(mfg_sizes[b_id][h_id], mfg_sizes[b_id][h_id + 1]), + src_ids=tensors_dict[b_id][h_id]["minors"], + cdst_ids=tensors_dict[b_id][h_id]["major_offsets"], + formats=["csc"], + reduce_memory=True, + ) + for h_id in range(n_hops) + ] + + output_batch.append(mfgs) - batch_dict[hop_id] = hop_dict - output_dict[batch_id] = batch_dict + output.append(output_batch) - return output_dict, mfg_sizes, renumber_map, renumber_map_offsets + return output diff --git a/python/cugraph-dgl/cugraph_dgl/nn/conv/base.py b/python/cugraph-dgl/cugraph_dgl/nn/conv/base.py index 307eb33078e..b3ab0e848f4 100644 --- a/python/cugraph-dgl/cugraph_dgl/nn/conv/base.py +++ b/python/cugraph-dgl/cugraph_dgl/nn/conv/base.py @@ -248,6 +248,12 @@ def csr(self) -> Tuple[torch.Tensor, torch.Tensor, Optional[torch.Tensor]]: value = value[self._perm_csc2csr] return csrc_ids, dst_ids, value + def __repr__(self) -> str: + return ( + f"{self.__class__.__name__}({self._num_src_nodes}, " + f"{self._num_dst_nodes}, formats={self._formats})" + ) + class BaseConv(torch.nn.Module): r"""An abstract base class for cugraph-ops nn module."""