-
Notifications
You must be signed in to change notification settings - Fork 309
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'branch-23.10' into cugraph-sample-convert
- Loading branch information
Showing
14 changed files
with
933 additions
and
341 deletions.
There are no files selected for viewing
Empty file.
152 changes: 152 additions & 0 deletions
152
benchmarks/cugraph-dgl/scale-benchmarks/dgl_benchmark.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,152 @@ | ||
# Copyright (c) 2018-2023, NVIDIA CORPORATION. | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
import json | ||
import os | ||
import time | ||
import dgl | ||
from dgl.dataloading import MultiLayerNeighborSampler, DataLoader | ||
import pandas as pd | ||
import torch | ||
from model import run_1_epoch | ||
from argparse import ArgumentParser | ||
from load_graph_feats import load_edges_from_disk, load_node_labels, load_node_features | ||
|
||
class DataLoaderArgs: | ||
def __init__(self, args): | ||
self.dataset_path = args.dataset_path | ||
self.replication_factors = [int(x) for x in args.replication_factors.split(",")] | ||
self.fanouts = [[int(y) for y in x.split("_")] for x in args.fanouts.split(",")] | ||
self.batch_sizes = [int(x) for x in args.batch_sizes.split(",")] | ||
self.use_uva = not args.do_not_use_uva | ||
|
||
|
||
|
||
def create_dataloader(g, train_idx, batch_size, fanouts, use_uva): | ||
print("Creating dataloader", flush=True) | ||
st = time.time() | ||
if use_uva: | ||
train_idx = {k: v.to("cuda") for k, v in train_idx.items()} | ||
sampler = MultiLayerNeighborSampler(fanouts=fanouts) | ||
dataloader = DataLoader( | ||
g, | ||
train_idx, | ||
sampler, | ||
num_workers=0, | ||
batch_size=batch_size, | ||
use_uva=use_uva, | ||
shuffle=False, | ||
drop_last=False, | ||
) | ||
et = time.time() | ||
print(f"Time to create dataloader = {et - st:.2f} seconds", flush=True) | ||
return dataloader | ||
|
||
|
||
|
||
def create_dgl_graph_from_disk(dataset_path, replication_factor=1): | ||
""" | ||
Create a DGL graph from a dataset on disk. | ||
Args: | ||
dataset_path: Path to the dataset on disk. | ||
replication_factor: Number of times to replicate the edges. | ||
Returns: | ||
DGLGraph: DGLGraph with the loaded dataset. | ||
""" | ||
with open(os.path.join(dataset_path, "meta.json"), "r") as f: | ||
input_meta = json.load(f) | ||
|
||
parquet_path = os.path.join(dataset_path, "parquet") | ||
graph_data = load_edges_from_disk( | ||
parquet_path, replication_factor, input_meta | ||
) | ||
label_data = load_node_labels(dataset_path, replication_factor, input_meta) | ||
if replication_factor <8 : | ||
feat_data = load_node_features(dataset_path, replication_factor, node_type='paper') | ||
else: | ||
feat_data = None | ||
print("labels and features loaded ", flush=True) | ||
|
||
g = dgl.heterograph(graph_data) | ||
|
||
return g, label_data, feat_data | ||
|
||
|
||
def main(args): | ||
print(f"Running dgl dataloading benchmark with the following parameters:\n" | ||
f"Dataset path = {args.dataset_path}\n" | ||
f"Replication factors = {args.replication_factors}\n" | ||
f"Fanouts = {args.fanouts}\n" | ||
f"Batch sizes = {args.batch_sizes}\n" | ||
f"Use UVA = {args.use_uva}\n" | ||
f"{'=' * 30}") | ||
|
||
time_ls = [] | ||
for replication_factor in args.replication_factors: | ||
start_time = time.time() | ||
g, label_data, feat_data = create_dgl_graph_from_disk(args.dataset_path, replication_factor) | ||
elapsed_time = time.time() - start_time | ||
|
||
print(f"Replication factor = {replication_factor}\n" | ||
f"G has {g.num_edges():,} edges and took {elapsed_time:.2f} seconds to load", flush=True) | ||
|
||
train_idx = {"paper": label_data["paper"]["train_idx"]} | ||
y = label_data["paper"]["y"] | ||
r_time_ls = e2e_benchmark(g, feat_data, y, train_idx, args.fanouts, args.batch_sizes, use_uva=args.use_uva) | ||
[x.update({"replication_factor": replication_factor}) for x in r_time_ls] | ||
[x.update({"num_edges": g.num_edges()}) for x in r_time_ls] | ||
time_ls.extend(r_time_ls) | ||
|
||
print(f"Benchmark completed for replication factor = {replication_factor}\n{'=' * 30}", flush=True) | ||
|
||
df = pd.DataFrame(time_ls) | ||
df.to_csv("dgl_e2e_benchmark.csv", index=False) | ||
print(f"Benchmark completed for all replication factors\n{'=' * 30}", flush=True) | ||
|
||
|
||
def e2e_benchmark(g, feat, y, train_idx, fanouts, batch_sizes, use_uva): | ||
""" | ||
Run the e2e_benchmark | ||
Args: | ||
g: DGLGraph | ||
feat: Tensor containing the features. | ||
y: Tensor containing the labels. | ||
train_idx: Tensor containing the training indices. | ||
fanouts: List of fanouts to use for the dataloader. | ||
batch_sizes: List of batch sizes to use for the dataloader. | ||
use_uva: Whether to use unified virtual address space. | ||
model_backend: Backend of model to use. | ||
""" | ||
time_ls = [] | ||
for fanout in fanouts: | ||
for batch_size in batch_sizes: | ||
dataloader = create_dataloader(g, train_idx, batch_size, fanout, use_uva) | ||
time_d = run_1_epoch(dataloader, feat, y, fanout, batch_size, model_backend='dgl') | ||
time_ls.append(time_d) | ||
print("="*30) | ||
return time_ls | ||
|
||
|
||
|
||
def parse_arguments(): | ||
parser = ArgumentParser() | ||
parser.add_argument("--dataset_path", type=str, default="/raid/vjawa/ogbn_papers100M") | ||
parser.add_argument("--replication_factors", type=str, default="2") | ||
parser.add_argument("--fanouts", type=str, default="10_10_10") | ||
parser.add_argument("--batch_sizes", type=str, default="512,1024,8192,16384") | ||
parser.add_argument("--do_not_use_uva", action="store_true") | ||
return parser.parse_args() | ||
|
||
if __name__ == "__main__": | ||
arguments = parse_arguments() | ||
main(DataLoaderArgs(arguments)) |
123 changes: 123 additions & 0 deletions
123
benchmarks/cugraph-dgl/scale-benchmarks/load_graph_feats.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,123 @@ | ||
# Copyright (c) 2018-2023, NVIDIA CORPORATION. | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
import numpy as np | ||
import pandas as pd | ||
import torch | ||
import os | ||
|
||
|
||
def load_edges_from_disk(parquet_path, replication_factor, input_meta): | ||
""" | ||
Load the edges from disk into a graph data dictionary. | ||
Args: | ||
parquet_path: Path to the parquet directory. | ||
replication_factor: Number of times to replicate the edges. | ||
input_meta: Input meta data. | ||
Returns: | ||
dict: Dictionary of edge types to a tuple of (src, dst) | ||
""" | ||
graph_data = {} | ||
|
||
for edge_type in input_meta["num_edges"].keys(): | ||
print(f"Loading edge index for edge type {edge_type} for replication factor = {replication_factor}") | ||
|
||
canonical_edge_type = tuple(edge_type.split("__")) | ||
edge_index = pd.read_parquet(os.path.join(parquet_path, edge_type, "edge_index.parquet")) | ||
edge_index = { | ||
"src": torch.from_numpy(edge_index.src.values), | ||
"dst": torch.from_numpy(edge_index.dst.values), | ||
} | ||
|
||
if replication_factor > 1: | ||
src_list, dst_list = replicate_edges(edge_index, canonical_edge_type, replication_factor, input_meta) | ||
edge_index["src"] = torch.cat(src_list).contiguous() | ||
edge_index["dst"] = torch.cat(dst_list).contiguous() | ||
|
||
graph_data[canonical_edge_type] = edge_index["src"], edge_index["dst"] | ||
|
||
print("Read Edge Data") | ||
return graph_data | ||
|
||
|
||
def replicate_edges(edge_index, canonical_edge_type, replication_factor, input_meta): | ||
src_list = [edge_index["src"]] | ||
dst_list = [edge_index["dst"]] | ||
|
||
for r in range(1, replication_factor): | ||
new_src = edge_index["src"] + (r * input_meta["num_nodes"][canonical_edge_type[0]]) | ||
new_dst = edge_index["dst"] + (r * input_meta["num_nodes"][canonical_edge_type[2]]) | ||
src_list.append(new_src) | ||
dst_list.append(new_dst) | ||
|
||
return src_list, dst_list | ||
|
||
|
||
|
||
|
||
def load_node_labels(dataset_path, replication_factor, input_meta): | ||
num_nodes_dict = {node_type: t * replication_factor for node_type, t in input_meta["num_nodes"].items()} | ||
node_data = {} | ||
|
||
for node_type in input_meta["num_nodes"].keys(): | ||
node_data[node_type] = {} | ||
label_path = os.path.join(dataset_path, "parquet", node_type, "node_label.parquet") | ||
|
||
if os.path.exists(label_path): | ||
node_data[node_type] = process_node_label(label_path, node_type, replication_factor, num_nodes_dict, input_meta) | ||
|
||
else: | ||
node_data[node_type]["num_nodes"] = num_nodes_dict[node_type] | ||
|
||
print("Loaded node labels", flush=True) | ||
return node_data | ||
|
||
def process_node_label(label_path, node_type, replication_factor, num_nodes_dict, input_meta): | ||
node_label = pd.read_parquet(label_path) | ||
|
||
if replication_factor > 1: | ||
node_label = replicate_node_label(node_label, node_type, replication_factor, input_meta) | ||
|
||
node_label_tensor = torch.full((num_nodes_dict[node_type],), -1, dtype=torch.float32) | ||
node_label_tensor[torch.as_tensor(node_label.node.values)] = torch.as_tensor(node_label.label.values) | ||
|
||
del node_label | ||
|
||
return { | ||
"train_idx": (node_label_tensor > -1).contiguous().nonzero().view(-1), | ||
"y": node_label_tensor.contiguous().long() | ||
} | ||
|
||
|
||
def replicate_node_label(node_label, node_type, replication_factor, input_meta): | ||
base_num_nodes = input_meta["num_nodes"][node_type] | ||
|
||
replicated_df = pd.DataFrame({ | ||
"node": pd.concat([node_label.node + (r * base_num_nodes) for r in range(1, replication_factor)]), | ||
"label": pd.concat([node_label.label for _ in range(1, replication_factor)]) | ||
}) | ||
|
||
return pd.concat([node_label, replicated_df]).reset_index(drop=True) | ||
|
||
|
||
def load_node_features(dataset_path, replication_factor, node_type): | ||
print("Loading node features", flush=True) | ||
node_type_path = os.path.join(dataset_path, "npy", node_type) | ||
if replication_factor == 1: | ||
fname = os.path.join(node_type_path, "node_feat.npy") | ||
else: | ||
fname = os.path.join(node_type_path, f"node_feat_{replication_factor}x.npy") | ||
|
||
feat = torch.from_numpy(np.load(fname)) | ||
print("Loaded node features", flush=True) | ||
return feat |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
# Copyright (c) 2018-2023, NVIDIA CORPORATION. | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
import torch | ||
import torch.nn.functional as F | ||
import time | ||
|
||
|
||
class GNN(torch.nn.Module): | ||
def __init__(self, in_channels, hidden_channels, out_channels, num_layers, model_backend='dgl'): | ||
if model_backend == 'dgl': | ||
from dgl.nn import SAGEConv | ||
else: | ||
from cugraph_dgl.nn import SAGEConv | ||
|
||
super(GNN, self).__init__() | ||
self.convs = torch.nn.ModuleList() | ||
for _ in range(num_layers - 1): | ||
self.convs.append(SAGEConv(in_channels, hidden_channels, aggregator_type='mean')) | ||
in_channels = hidden_channels | ||
self.convs.append(SAGEConv(hidden_channels, out_channels, aggregator_type='mean')) | ||
|
||
def forward(self, blocks, x): | ||
for i, conv in enumerate(self.convs): | ||
x = conv(blocks[i], x) | ||
if i != len(self.convs) - 1: | ||
x = F.relu(x) | ||
return x | ||
|
||
|
||
def create_model(feat_size, num_classes, num_layers, model_backend='dgl'): | ||
model = GNN(feat_size, 64, num_classes, num_layers, model_backend=model_backend) | ||
model = model.to('cuda') | ||
model.train() | ||
return model | ||
|
||
def train_model(model, dataloader, opt, feat, y): | ||
times = {key: 0 for key in ['mfg_creation', 'feature', 'm_fwd', 'm_bkwd']} | ||
epoch_st = time.time() | ||
mfg_st = time.time() | ||
for input_nodes, output_nodes, blocks in dataloader: | ||
times['mfg_creation'] += time.time() - mfg_st | ||
if feat is not None: | ||
fst = time.time() | ||
input_nodes = input_nodes.to('cpu') | ||
input_feat = feat[input_nodes] | ||
input_feat = input_feat.to('cuda') | ||
if isinstance(output_nodes, dict): | ||
output_nodes = output_nodes['paper'] | ||
output_nodes = output_nodes.to(y.device) | ||
y_batch = y[output_nodes].to('cuda') | ||
times['feature'] += time.time() - fst | ||
|
||
m_fwd_st = time.time() | ||
y_hat = model(blocks, input_feat) | ||
times['m_fwd'] += time.time() - m_fwd_st | ||
|
||
m_bkwd_st = time.time() | ||
loss = F.cross_entropy(y_hat, y_batch) | ||
opt.zero_grad() | ||
loss.backward() | ||
opt.step() | ||
times['m_bkwd'] += time.time() - m_bkwd_st | ||
mfg_st = time.time() | ||
|
||
print(f"Epoch time = {time.time() - epoch_st:.2f} seconds") | ||
|
||
return times | ||
|
||
def analyze_time(dataloader, times, epoch_time, fanout, batch_size): | ||
num_batches = len(dataloader) | ||
time_d = { | ||
"fanout": fanout, | ||
"batch_size": batch_size, | ||
"epoch_time": epoch_time, | ||
"epoch_time_per_batch": epoch_time / num_batches, | ||
"num_batches": num_batches, | ||
} | ||
for key, value in times.items(): | ||
time_d[f"{key}_time_per_epoch"] = value | ||
time_d[f"{key}_time_per_batch"] = value / num_batches | ||
|
||
print(f"Time analysis for fanout = {fanout}, batch_size = {batch_size}") | ||
for k in time_d.keys(): | ||
if 'time_per_epoch' in str(k): | ||
print(f"{k} = {time_d[k]:.2f} seconds") | ||
return time_d | ||
|
||
def run_1_epoch(dataloader, feat, y, fanout, batch_size, model_backend): | ||
if feat is not None: | ||
model = create_model(feat.shape[1], 172, len(fanout), model_backend=model_backend) | ||
opt = torch.optim.Adam(model.parameters(), lr=0.01) | ||
else: | ||
model = None | ||
opt = None | ||
epoch_st = time.time() | ||
times = train_model(model, dataloader, opt, feat, y) | ||
epoch_time = time.time() - epoch_st | ||
time_d = analyze_time(dataloader, times, epoch_time, fanout, batch_size) | ||
return time_d |
Oops, something went wrong.