From 757f38519b17e2424d1499d03354acc8780507a5 Mon Sep 17 00:00:00 2001
From: Tingyu Wang
Date: Tue, 26 Sep 2023 12:09:02 -0700
Subject: [PATCH] process raw csc df output

---
 .../cugraph_dgl/dataloading/dataloader.py        |  9 +++
 .../dataloading/utils/sampling_helpers.py        | 68 +++++++++++++++++++
 2 files changed, 77 insertions(+)

diff --git a/python/cugraph-dgl/cugraph_dgl/dataloading/dataloader.py b/python/cugraph-dgl/cugraph_dgl/dataloading/dataloader.py
index 793139f580e..58a11c46ae5 100644
--- a/python/cugraph-dgl/cugraph_dgl/dataloading/dataloader.py
+++ b/python/cugraph-dgl/cugraph_dgl/dataloading/dataloader.py
@@ -54,6 +54,7 @@ def __init__(
         batch_size: int = 1024,
         drop_last: bool = False,
         shuffle: bool = False,
+        sparse_format: str = "csc",
         **kwargs,
     ):
         """
@@ -91,6 +92,8 @@ def __init__(
             Only effective when :attr:`use_ddp` is True.
         batch_size: int
             Batch size.
+        sparse_format: str, default = "csc"
+            Sparse format of the sampled graph. Choose from "csc", "csr" and "coo".
         kwargs : dict
             Key-word arguments to be passed to the parent PyTorch
             :py:class:`torch.utils.data.DataLoader` class. Common arguments are:
@@ -124,6 +127,12 @@ def __init__(
         ...     for input_nodes, output_nodes, blocks in dataloader:
         ...
         """
+        if sparse_format not in ["coo", "csc", "csr"]:
+            raise ValueError(
+                f"sparse_format must be one of 'coo', 'csc', 'csr', "
+                f"but got {sparse_format}."
+            )
+        self.sparse_format = sparse_format
 
         self.ddp_seed = ddp_seed
         self.use_ddp = use_ddp
diff --git a/python/cugraph-dgl/cugraph_dgl/dataloading/utils/sampling_helpers.py b/python/cugraph-dgl/cugraph_dgl/dataloading/utils/sampling_helpers.py
index bdac3b1a323..1bbd19b8563 100644
--- a/python/cugraph-dgl/cugraph_dgl/dataloading/utils/sampling_helpers.py
+++ b/python/cugraph-dgl/cugraph_dgl/dataloading/utils/sampling_helpers.py
@@ -14,6 +14,7 @@
 from typing import Tuple, Dict, Optional
 from collections import defaultdict
 import cudf
+import cupy
 from cugraph.utilities.utils import import_optional
 
 dgl = import_optional("dgl")
@@ -401,3 +402,70 @@ def create_heterogenous_dgl_block_from_tensors_dict(
     block = dgl.to_block(sampled_graph, dst_nodes=seed_nodes, src_nodes=src_d)
     block.edata[dgl.EID] = sampled_graph.edata[dgl.EID]
     return block
+
+
+def _process_sampled_df_csc(
+    df: cudf.DataFrame, n_hops: int, n_batches: int
+) -> Tuple[
+    Dict[int, Dict[int, Dict[str, torch.Tensor]]],
+    cupy.ndarray,
+    cupy.ndarray,
+    cupy.ndarray,
+]:
+    """
+    Convert a dataframe generated by BulkSampler to a dictionary of tensors, to
+    facilitate MFG creation. The sampled graphs in the dataframe use the CSC
+    format.
+
+    Note: The CSR
+
+    df: cudf.DataFrame
+        The dataframe output by BulkSampler containing one or multiple batches.
+    n_hops: int
+        Length of fanout values.
+    n_batches: int
+        Number of batches in each parquet file.
+
+    Returns:
+        output_dict: dict
+            Nested dictionary keyed by batch id, then hop id.
+            output_dict[batch_id][hop_id] has two keys:
+              - minors: the hop's slice of the "minors" column (CSC indices)
+              - major_offsets: the hop's slice of "major_offsets", rebased to
+                start at zero (CSC offsets)
+        mfg_sizes: cupy.ndarray
+            Array of shape (n_batches, n_hops + 1). Entry [b, h] is the number
+            of majors in hop h of batch b; the last column is the number of
+            renumbered nodes in batch b.
+        renumber_map: cupy.ndarray
+            Renumbering maps of all batches, concatenated.
+        renumber_map_offsets: cupy.ndarray
+            Offsets delimiting each batch's segment of renumber_map.
+    """
+    # dropna
+    major_offsets = df.major_offsets.dropna().values
+    label_hop_offsets = df.label_hop_offsets.dropna().values
+    renumber_map_offsets = df.renumber_map_offsets.dropna().values
+    renumber_map = df.map.dropna().values
+
+    # make global offsets local
+    major_offsets -= major_offsets[0]
+    label_hop_offsets -= label_hop_offsets[0]
+    renumber_map_offsets -= renumber_map_offsets[0]
+
+    # get the sizes of each adjacency matrix (for MFGs)
+    mfg_sizes = (label_hop_offsets[1:] - label_hop_offsets[:-1]).reshape(
+        (n_batches, n_hops)
+    )
+    n_nodes = renumber_map_offsets[1:] - renumber_map_offsets[:-1]  # nodes per batch
+    mfg_sizes = cupy.hstack((mfg_sizes, n_nodes.reshape(n_batches, -1)))
+
+    output_dict = {}
+    for batch_id in range(n_batches):
+        batch_dict = {}
+        for hop_id in range(n_hops):
+            hop_dict = {}
+            idx = batch_id * n_hops + hop_id  # idx in label_hop_offsets
+            major_offsets_start = label_hop_offsets[idx].item()
+            major_offsets_end = label_hop_offsets[idx + 1].item()
+            minor_start = major_offsets[major_offsets_start].item()
+            minor_end = major_offsets[major_offsets_end].item()
+            hop_dict["minors"] = df.minors.iloc[minor_start:minor_end].values
+            hop_dict["major_offsets"] = (
+                major_offsets[major_offsets_start : major_offsets_end + 1]
+                - major_offsets[major_offsets_start]
+            )
+
+            batch_dict[hop_id] = hop_dict
+        output_dict[batch_id] = batch_dict
+
+    return output_dict, mfg_sizes, renumber_map, renumber_map_offsets
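
The per-hop slicing in _process_sampled_df_csc is easiest to follow on a tiny example. Below is a minimal CPU-only sketch that replays the same offset arithmetic with NumPy and made-up numbers (one batch, two hops); the actual helper operates on CuPy arrays backed by cuDF columns, and none of the values here come from real sampler output.

    import numpy as np

    n_batches, n_hops = 1, 2

    # label_hop_offsets marks where each (batch, hop) block starts/ends in major_offsets.
    label_hop_offsets = np.array([0, 2, 5])       # hop 0 has 2 majors, hop 1 has 3
    # major_offsets is the concatenated CSC offsets array over all hops.
    major_offsets = np.array([0, 2, 3, 5, 6, 8])  # 5 majors, 8 minors in total
    # minors is the concatenated CSC index array.
    minors = np.array([4, 3, 4, 0, 1, 2, 0, 1])

    for batch_id in range(n_batches):
        for hop_id in range(n_hops):
            idx = batch_id * n_hops + hop_id
            major_start = label_hop_offsets[idx].item()
            major_end = label_hop_offsets[idx + 1].item()
            minor_start = major_offsets[major_start].item()
            minor_end = major_offsets[major_end].item()
            hop_minors = minors[minor_start:minor_end]
            # Rebase the offsets so each hop's CSC block starts at zero.
            hop_major_offsets = (
                major_offsets[major_start : major_end + 1] - major_offsets[major_start]
            )
            print(batch_id, hop_id, hop_minors.tolist(), hop_major_offsets.tolist())
    # Expected output:
    # 0 0 [4, 3, 4] [0, 2, 3]
    # 0 1 [0, 1, 2, 0, 1] [0, 2, 3, 5]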
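
Likewise, the returned renumber_map and renumber_map_offsets concatenate the per-batch renumbering maps, and a batch's map lies between consecutive offsets. Another made-up, CPU-only sketch follows; reading the map entries as local-to-global node ids is an assumption based on the surrounding renumbering helpers, not something this patch states.

    import numpy as np

    # Made-up values: two batches whose renumber maps are concatenated.
    renumber_map = np.array([10, 42, 7, 3, 99])
    renumber_map_offsets = np.array([0, 3, 5])   # batch 0 -> 3 nodes, batch 1 -> 2 nodes

    for b in range(len(renumber_map_offsets) - 1):
        start = renumber_map_offsets[b].item()
        end = renumber_map_offsets[b + 1].item()
        batch_map = renumber_map[start:end]
        # Assumed reading: local node id i in batch b maps to global id batch_map[i].
        print(b, batch_map.tolist())
    # Expected output:
    # 0 [10, 42, 7]
    # 1 [3, 99]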