process raw csc df output
tingyu66 committed Sep 26, 2023
1 parent 77a5ba3 commit 757f385
Showing 2 changed files with 77 additions and 0 deletions.
9 changes: 9 additions & 0 deletions python/cugraph-dgl/cugraph_dgl/dataloading/dataloader.py
@@ -54,6 +54,7 @@ def __init__(
batch_size: int = 1024,
drop_last: bool = False,
shuffle: bool = False,
sparse_format: str = "csc",
**kwargs,
):
"""
@@ -91,6 +92,8 @@ def __init__(
Only effective when :attr:`use_ddp` is True.
batch_size: int
Batch size.
sparse_format: str, default = "csc"
Sparse format of the sampled graph. Choose from "csc", "csr" and "coo".
kwargs : dict
Key-word arguments to be passed to the parent PyTorch
:py:class:`torch.utils.data.DataLoader` class. Common arguments are:
@@ -124,6 +127,12 @@ def __init__(
... for input_nodes, output_nodes, blocks in dataloader:
...
"""
if sparse_format not in ["coo", "csc", "csr"]:
raise ValueError(
f"sparse_format must be one of 'coo', 'csc', 'csr', "
f"but got {sparse_format}."
)
self.sparse_format = sparse_format

self.ddp_seed = ddp_seed
self.use_ddp = use_ddp
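For context, a minimal usage sketch of the new sparse_format argument (not part of this diff). The names g, train_nids, and sampler are placeholders for a cugraph_dgl graph, a tensor of seed node ids, and a neighbor sampler, and the cugraph_dgl.dataloading.DataLoader entry point is assumed from the docstring example above; the remaining arguments follow the parameter list shown in this file.

import cugraph_dgl

# g, train_nids and sampler are assumed to already exist (see lead-in above).
dataloader = cugraph_dgl.dataloading.DataLoader(
    g,
    train_nids,
    sampler,
    batch_size=1024,
    shuffle=True,
    drop_last=False,
    sparse_format="csc",  # new in this commit; must be one of "coo", "csc", "csr"
)
for input_nodes, output_nodes, blocks in dataloader:
    ...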
@@ -14,6 +14,7 @@
from typing import Tuple, Dict, Optional
from collections import defaultdict
import cudf
import cupy
from cugraph.utilities.utils import import_optional

dgl = import_optional("dgl")
@@ -401,3 +402,70 @@ def create_heterogenous_dgl_block_from_tensors_dict(
block = dgl.to_block(sampled_graph, dst_nodes=seed_nodes, src_nodes=src_d)
block.edata[dgl.EID] = sampled_graph.edata[dgl.EID]
return block


def _process_sampled_df_csc(
df: cudf.DataFrame, n_hops: int, n_batches: int
) -> Tuple[
Dict[int, Dict[int, Dict[str, torch.Tensor]]],
cupy.ndarray,
cupy.ndarray,
cupy.ndarray,
]:
"""
Convert a dataframe generated by BulkSampler to a dictionary of tensors, to
facilitate MFG creation. The sampled graphs in the dataframe use the CSC format.

Parameters
----------
df: cudf.DataFrame
    The dataframe output by BulkSampler, containing one or multiple batches.
n_hops: int
    Length of the fanout values.
n_batches: int
    Number of batches in each parquet file.

Returns
-------
tensor_dict: dict
    Nested dictionary keyed by batch id, then hop id.
    tensor_dict[batch_id][hop_id] has two keys: "minors" (CSC row indices of
    the hop's sampled block) and "major_offsets" (its column pointers,
    shifted to start at zero).
mfg_sizes: cupy.ndarray
    Array of shape (n_batches, n_hops + 1) with the number of major vertices
    of each hop, plus the total number of renumbered nodes in each batch as
    the last column.
renumber_map: cupy.ndarray
    Renumbering map used to look up the global ids of sampled nodes.
renumber_map_offsets: cupy.ndarray
    Offsets delimiting the renumbering map of each batch.
"""
# drop the null padding in the offsets and renumber-map columns
major_offsets = df.major_offsets.dropna().values
label_hop_offsets = df.label_hop_offsets.dropna().values
renumber_map_offsets = df.renumber_map_offsets.dropna().values
renumber_map = df.map.dropna().values

# make global offsets local
major_offsets -= major_offsets[0]
label_hop_offsets -= label_hop_offsets[0]
renumber_map_offsets -= renumber_map_offsets[0]

# get the sizes of each adjacency matrix (for MFGs)
mfg_sizes = (label_hop_offsets[1:] - label_hop_offsets[:-1]).reshape(
(n_batches, n_hops)
)
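# append the total number of renumbered nodes in each batch as the last column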
n_nodes = renumber_map_offsets[1:] - renumber_map_offsets[:-1]
mfg_sizes = cupy.hstack((mfg_sizes, n_nodes.reshape(n_batches, -1)))

output_dict = {}
for batch_id in range(n_batches):
batch_dict = {}
for hop_id in range(n_hops):
hop_dict = {}
idx = batch_id * n_hops + hop_id # idx in label_hop_offsets
major_offsets_start = label_hop_offsets[idx].item()
major_offsets_end = label_hop_offsets[idx + 1].item()
minor_start = major_offsets[major_offsets_start].item()
minor_end = major_offsets[major_offsets_end].item()
hop_dict["minors"] = df.minors.iloc[minor_start:minor_end].values
hop_dict["major_offsets"] = (
major_offsets[major_offsets_start : major_offsets_end + 1]
- major_offsets[major_offsets_start]
)

batch_dict[hop_id] = hop_dict
output_dict[batch_id] = batch_dict

return output_dict, mfg_sizes, renumber_map, renumber_map_offsets
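To make the offset arithmetic above concrete, here is a small self-contained sketch that mirrors the slicing in the loop, using NumPy and synthetic arrays in place of the cuDF/CuPy inputs: label_hop_offsets selects the span of major_offsets belonging to one (batch, hop) pair, and that span in turn selects the minors of that hop's CSC block.

import numpy as np

# Synthetic single-batch, two-hop sample: hop 0 has 2 major vertices / 3 edges,
# hop 1 has 3 major vertices / 3 edges.
n_batches, n_hops = 1, 2
major_offsets = np.array([0, 2, 3, 4, 5, 6])  # CSC column pointers, all hops concatenated
label_hop_offsets = np.array([0, 2, 5])       # spans of major_offsets per (batch, hop)
minors = np.array([5, 7, 9, 1, 4, 6])         # CSC row indices, all hops concatenated

for batch_id in range(n_batches):
    for hop_id in range(n_hops):
        idx = batch_id * n_hops + hop_id
        mo_start = label_hop_offsets[idx].item()
        mo_end = label_hop_offsets[idx + 1].item()
        edge_start = major_offsets[mo_start].item()
        edge_end = major_offsets[mo_end].item()
        block_minors = minors[edge_start:edge_end]
        block_major_offsets = major_offsets[mo_start : mo_end + 1] - major_offsets[mo_start]
        print(hop_id, block_minors, block_major_offsets)
        # hop 0 -> minors [5 7 9], offsets [0 2 3]
        # hop 1 -> minors [1 4 6], offsets [0 1 2 3]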
