From 9c96b2613f029eb9616c19aefb38a689c1267bae Mon Sep 17 00:00:00 2001
From: Vibhu Jawa
Date: Mon, 25 Sep 2023 07:49:19 -0700
Subject: [PATCH] Update dgl benchmarks (#3775)

This PR adds upstream DGL benchmarks; I will expand it to add
`cugraph-dgl` benchmarks soon.

CC: @tingyu66, @BradReesWork

Authors:
  - Vibhu Jawa (https://github.com/VibhuJawa)

Approvers:
  - Brad Rees (https://github.com/BradReesWork)
  - Alex Barghi (https://github.com/alexbarghi-nv)

URL: https://github.com/rapidsai/cugraph/pull/3775
---
 .../cugraph-dgl/scale-benchmarks/__init__.py  |   0
 .../scale-benchmarks/dgl_benchmark.py         | 152 ++++++++++++++++++
 .../scale-benchmarks/load_graph_feats.py      | 123 ++++++++++++++
 .../cugraph-dgl/scale-benchmarks/model.py     | 110 +++++++++++++
 4 files changed, 385 insertions(+)
 create mode 100644 benchmarks/cugraph-dgl/scale-benchmarks/__init__.py
 create mode 100644 benchmarks/cugraph-dgl/scale-benchmarks/dgl_benchmark.py
 create mode 100644 benchmarks/cugraph-dgl/scale-benchmarks/load_graph_feats.py
 create mode 100644 benchmarks/cugraph-dgl/scale-benchmarks/model.py

diff --git a/benchmarks/cugraph-dgl/scale-benchmarks/__init__.py b/benchmarks/cugraph-dgl/scale-benchmarks/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/benchmarks/cugraph-dgl/scale-benchmarks/dgl_benchmark.py b/benchmarks/cugraph-dgl/scale-benchmarks/dgl_benchmark.py
new file mode 100644
index 00000000000..3762226d570
--- /dev/null
+++ b/benchmarks/cugraph-dgl/scale-benchmarks/dgl_benchmark.py
@@ -0,0 +1,152 @@
+# Copyright (c) 2018-2023, NVIDIA CORPORATION.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import os
+import time
+from argparse import ArgumentParser
+
+import dgl
+import pandas as pd
+from dgl.dataloading import DataLoader, MultiLayerNeighborSampler
+from load_graph_feats import load_edges_from_disk, load_node_features, load_node_labels
+from model import run_1_epoch
+
+
+class DataLoaderArgs:
+    def __init__(self, args):
+        self.dataset_path = args.dataset_path
+        self.replication_factors = [int(x) for x in args.replication_factors.split(",")]
+        self.fanouts = [[int(y) for y in x.split("_")] for x in args.fanouts.split(",")]
+        self.batch_sizes = [int(x) for x in args.batch_sizes.split(",")]
+        self.use_uva = not args.do_not_use_uva
+
+
+def create_dataloader(g, train_idx, batch_size, fanouts, use_uva):
+    print("Creating dataloader", flush=True)
+    st = time.time()
+    if use_uva:
+        train_idx = {k: v.to("cuda") for k, v in train_idx.items()}
+    sampler = MultiLayerNeighborSampler(fanouts=fanouts)
+    dataloader = DataLoader(
+        g,
+        train_idx,
+        sampler,
+        num_workers=0,
+        batch_size=batch_size,
+        use_uva=use_uva,
+        shuffle=False,
+        drop_last=False,
+    )
+    et = time.time()
+    print(f"Time to create dataloader = {et - st:.2f} seconds", flush=True)
+    return dataloader
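+
+# A usage sketch for the function above (assumes ``g`` and ``train_idx``
+# as produced by ``create_dgl_graph_from_disk`` below; the batch size and
+# fanout are illustrative):
+#
+#     loader = create_dataloader(
+#         g, train_idx, batch_size=1024, fanouts=[10, 10, 10], use_uva=True
+#     )
+#     for input_nodes, output_nodes, blocks in loader:
+#         ...  # one sampled mini-batch (MFGs) per iteration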
+
+
+def create_dgl_graph_from_disk(dataset_path, replication_factor=1):
+    """
+    Create a DGL graph from a dataset on disk.
+
+    Args:
+        dataset_path: Path to the dataset on disk.
+        replication_factor: Number of times to replicate the edges.
+
+    Returns:
+        tuple: (DGLGraph, label data dict, feature tensor or None).
+    """
+    with open(os.path.join(dataset_path, "meta.json"), "r") as f:
+        input_meta = json.load(f)
+
+    parquet_path = os.path.join(dataset_path, "parquet")
+    graph_data = load_edges_from_disk(parquet_path, replication_factor, input_meta)
+    label_data = load_node_labels(dataset_path, replication_factor, input_meta)
+    # Features are only loaded for smaller replication factors; at 8x and
+    # above the replicated feature matrix becomes too large to load.
+    if replication_factor < 8:
+        feat_data = load_node_features(dataset_path, replication_factor, node_type='paper')
+    else:
+        feat_data = None
+    print("Labels and features loaded", flush=True)
+
+    g = dgl.heterograph(graph_data)
+
+    return g, label_data, feat_data
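+
+# A call sketch (the path is illustrative; the on-disk layout is a
+# ``meta.json`` plus ``parquet``/``npy`` subdirectories, as read above):
+#
+#     g, label_data, feat_data = create_dgl_graph_from_disk(
+#         "/path/to/ogbn_papers100M", replication_factor=1
+#     )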
+ """ + time_ls = [] + for fanout in fanouts: + for batch_size in batch_sizes: + dataloader = create_dataloader(g, train_idx, batch_size, fanout, use_uva) + time_d = run_1_epoch(dataloader, feat, y, fanout, batch_size, model_backend='dgl') + time_ls.append(time_d) + print("="*30) + return time_ls + + + +def parse_arguments(): + parser = ArgumentParser() + parser.add_argument("--dataset_path", type=str, default="/raid/vjawa/ogbn_papers100M") + parser.add_argument("--replication_factors", type=str, default="2") + parser.add_argument("--fanouts", type=str, default="10_10_10") + parser.add_argument("--batch_sizes", type=str, default="512,1024,8192,16384") + parser.add_argument("--do_not_use_uva", action="store_true") + return parser.parse_args() + +if __name__ == "__main__": + arguments = parse_arguments() + main(DataLoaderArgs(arguments)) diff --git a/benchmarks/cugraph-dgl/scale-benchmarks/load_graph_feats.py b/benchmarks/cugraph-dgl/scale-benchmarks/load_graph_feats.py new file mode 100644 index 00000000000..4f0f81c70e1 --- /dev/null +++ b/benchmarks/cugraph-dgl/scale-benchmarks/load_graph_feats.py @@ -0,0 +1,123 @@ +# Copyright (c) 2018-2023, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import numpy as np +import pandas as pd +import torch +import os + + +def load_edges_from_disk(parquet_path, replication_factor, input_meta): + """ + Load the edges from disk into a graph data dictionary. + Args: + parquet_path: Path to the parquet directory. + replication_factor: Number of times to replicate the edges. + input_meta: Input meta data. 
+
+
+if __name__ == "__main__":
+    arguments = parse_arguments()
+    main(DataLoaderArgs(arguments))
diff --git a/benchmarks/cugraph-dgl/scale-benchmarks/load_graph_feats.py b/benchmarks/cugraph-dgl/scale-benchmarks/load_graph_feats.py
new file mode 100644
index 00000000000..4f0f81c70e1
--- /dev/null
+++ b/benchmarks/cugraph-dgl/scale-benchmarks/load_graph_feats.py
@@ -0,0 +1,123 @@
+# Copyright (c) 2018-2023, NVIDIA CORPORATION.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+
+import numpy as np
+import pandas as pd
+import torch
+
+
+def load_edges_from_disk(parquet_path, replication_factor, input_meta):
+    """
+    Load the edges from disk into a graph data dictionary.
+
+    Args:
+        parquet_path: Path to the parquet directory.
+        replication_factor: Number of times to replicate the edges.
+        input_meta: Input metadata.
+
+    Returns:
+        dict: Dictionary mapping each canonical edge type to a (src, dst) tuple.
+    """
+    graph_data = {}
+
+    for edge_type in input_meta["num_edges"].keys():
+        print(
+            f"Loading edge index for edge type {edge_type} "
+            f"for replication factor = {replication_factor}"
+        )
+
+        canonical_edge_type = tuple(edge_type.split("__"))
+        edge_index = pd.read_parquet(os.path.join(parquet_path, edge_type, "edge_index.parquet"))
+        edge_index = {
+            "src": torch.from_numpy(edge_index.src.values),
+            "dst": torch.from_numpy(edge_index.dst.values),
+        }
+
+        if replication_factor > 1:
+            src_list, dst_list = replicate_edges(edge_index, canonical_edge_type, replication_factor, input_meta)
+            edge_index["src"] = torch.cat(src_list).contiguous()
+            edge_index["dst"] = torch.cat(dst_list).contiguous()
+
+        graph_data[canonical_edge_type] = edge_index["src"], edge_index["dst"]
+
+    print("Read edge data")
+    return graph_data
+
+
+def replicate_edges(edge_index, canonical_edge_type, replication_factor, input_meta):
+    # Replica r shifts the original node IDs by r * num_nodes of the
+    # source and destination node types, so each copy lands on a
+    # disjoint block of node IDs.
+    src_list = [edge_index["src"]]
+    dst_list = [edge_index["dst"]]
+
+    for r in range(1, replication_factor):
+        new_src = edge_index["src"] + (r * input_meta["num_nodes"][canonical_edge_type[0]])
+        new_dst = edge_index["dst"] + (r * input_meta["num_nodes"][canonical_edge_type[2]])
+        src_list.append(new_src)
+        dst_list.append(new_dst)
+
+    return src_list, dst_list
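+
+# A small worked example of the offset arithmetic above (values are
+# illustrative): with num_nodes = 4 for both endpoint types, edges
+# [(0, 1), (2, 3)], and replication_factor = 2, replica r=1 contributes
+# [(4, 5), (6, 7)], i.e. the same topology shifted into the next block
+# of node IDs.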
+
+
+def load_node_labels(dataset_path, replication_factor, input_meta):
+    num_nodes_dict = {
+        node_type: t * replication_factor for node_type, t in input_meta["num_nodes"].items()
+    }
+    node_data = {}
+
+    for node_type in input_meta["num_nodes"].keys():
+        node_data[node_type] = {}
+        label_path = os.path.join(dataset_path, "parquet", node_type, "node_label.parquet")
+
+        if os.path.exists(label_path):
+            node_data[node_type] = process_node_label(
+                label_path, node_type, replication_factor, num_nodes_dict, input_meta
+            )
+        else:
+            node_data[node_type]["num_nodes"] = num_nodes_dict[node_type]
+
+    print("Loaded node labels", flush=True)
+    return node_data
+
+
+def process_node_label(label_path, node_type, replication_factor, num_nodes_dict, input_meta):
+    node_label = pd.read_parquet(label_path)
+
+    if replication_factor > 1:
+        node_label = replicate_node_label(node_label, node_type, replication_factor, input_meta)
+
+    # Unlabeled nodes keep the sentinel -1; labeled nodes form the train set.
+    node_label_tensor = torch.full((num_nodes_dict[node_type],), -1, dtype=torch.float32)
+    node_label_tensor[torch.as_tensor(node_label.node.values)] = torch.as_tensor(node_label.label.values)
+
+    del node_label
+
+    return {
+        "train_idx": (node_label_tensor > -1).contiguous().nonzero().view(-1),
+        "y": node_label_tensor.contiguous().long(),
+    }
+
+
+def replicate_node_label(node_label, node_type, replication_factor, input_meta):
+    base_num_nodes = input_meta["num_nodes"][node_type]
+
+    replicated_df = pd.DataFrame(
+        {
+            "node": pd.concat([node_label.node + (r * base_num_nodes) for r in range(1, replication_factor)]),
+            "label": pd.concat([node_label.label for _ in range(1, replication_factor)]),
+        }
+    )
+
+    return pd.concat([node_label, replicated_df]).reset_index(drop=True)
+
+
+def load_node_features(dataset_path, replication_factor, node_type):
+    print("Loading node features", flush=True)
+    node_type_path = os.path.join(dataset_path, "npy", node_type)
+    if replication_factor == 1:
+        fname = os.path.join(node_type_path, "node_feat.npy")
+    else:
+        fname = os.path.join(node_type_path, f"node_feat_{replication_factor}x.npy")
+
+    feat = torch.from_numpy(np.load(fname))
+    print("Loaded node features", flush=True)
+    return feat
diff --git a/benchmarks/cugraph-dgl/scale-benchmarks/model.py b/benchmarks/cugraph-dgl/scale-benchmarks/model.py
new file mode 100644
index 00000000000..506e3bd5227
--- /dev/null
+++ b/benchmarks/cugraph-dgl/scale-benchmarks/model.py
@@ -0,0 +1,110 @@
+# Copyright (c) 2018-2023, NVIDIA CORPORATION.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+
+import torch
+import torch.nn.functional as F
+
+
+class GNN(torch.nn.Module):
+    def __init__(self, in_channels, hidden_channels, out_channels, num_layers, model_backend='dgl'):
+        # Select the SAGEConv implementation for the requested backend.
+        if model_backend == 'dgl':
+            from dgl.nn import SAGEConv
+        else:
+            from cugraph_dgl.nn import SAGEConv
+
+        super().__init__()
+        self.convs = torch.nn.ModuleList()
+        for _ in range(num_layers - 1):
+            self.convs.append(SAGEConv(in_channels, hidden_channels, aggregator_type='mean'))
+            in_channels = hidden_channels
+        self.convs.append(SAGEConv(hidden_channels, out_channels, aggregator_type='mean'))
+
+    def forward(self, blocks, x):
+        for i, conv in enumerate(self.convs):
+            x = conv(blocks[i], x)
+            if i != len(self.convs) - 1:
+                x = F.relu(x)
+        return x
+
+
+def create_model(feat_size, num_classes, num_layers, model_backend='dgl'):
+    model = GNN(feat_size, 64, num_classes, num_layers, model_backend=model_backend)
+    model = model.to('cuda')
+    model.train()
+    return model
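+
+# A construction sketch (the feature width and class count match the
+# ogbn-papers100M defaults used in ``run_1_epoch`` below; any values
+# would do):
+#
+#     model = create_model(feat_size=128, num_classes=172, num_layers=3)
+#     opt = torch.optim.Adam(model.parameters(), lr=0.01)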
+
+
+def train_model(model, dataloader, opt, feat, y):
+    times = {key: 0 for key in ['mfg_creation', 'feature', 'm_fwd', 'm_bkwd']}
+    epoch_st = time.time()
+    mfg_st = time.time()
+    for input_nodes, output_nodes, blocks in dataloader:
+        times['mfg_creation'] += time.time() - mfg_st
+        # When features were not loaded (large replication factors), only
+        # MFG creation is benchmarked; there is no model to train.
+        if feat is not None:
+            fst = time.time()
+            input_nodes = input_nodes.to('cpu')
+            input_feat = feat[input_nodes]
+            input_feat = input_feat.to('cuda')
+            if isinstance(output_nodes, dict):
+                output_nodes = output_nodes['paper']
+            output_nodes = output_nodes.to(y.device)
+            y_batch = y[output_nodes].to('cuda')
+            times['feature'] += time.time() - fst
+
+            m_fwd_st = time.time()
+            y_hat = model(blocks, input_feat)
+            times['m_fwd'] += time.time() - m_fwd_st
+
+            m_bkwd_st = time.time()
+            loss = F.cross_entropy(y_hat, y_batch)
+            opt.zero_grad()
+            loss.backward()
+            opt.step()
+            times['m_bkwd'] += time.time() - m_bkwd_st
+        mfg_st = time.time()
+
+    print(f"Epoch time = {time.time() - epoch_st:.2f} seconds")
+
+    return times
+
+
+def analyze_time(dataloader, times, epoch_time, fanout, batch_size):
+    num_batches = len(dataloader)
+    time_d = {
+        "fanout": fanout,
+        "batch_size": batch_size,
+        "epoch_time": epoch_time,
+        "epoch_time_per_batch": epoch_time / num_batches,
+        "num_batches": num_batches,
+    }
+    for key, value in times.items():
+        time_d[f"{key}_time_per_epoch"] = value
+        time_d[f"{key}_time_per_batch"] = value / num_batches
+
+    print(f"Time analysis for fanout = {fanout}, batch_size = {batch_size}")
+    for k, v in time_d.items():
+        if 'time_per_epoch' in k:
+            print(f"{k} = {v:.2f} seconds")
+    return time_d
+
+
+def run_1_epoch(dataloader, feat, y, fanout, batch_size, model_backend):
+    if feat is not None:
+        # ogbn-papers100M has 172 classes.
+        model = create_model(feat.shape[1], 172, len(fanout), model_backend=model_backend)
+        opt = torch.optim.Adam(model.parameters(), lr=0.01)
+    else:
+        model = None
+        opt = None
+    epoch_st = time.time()
+    times = train_model(model, dataloader, opt, feat, y)
+    epoch_time = time.time() - epoch_st
+    time_d = analyze_time(dataloader, times, epoch_time, fanout, batch_size)
+    return time_d
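+
+# A call sketch (``loader``, ``feat``, and ``y`` come from
+# dgl_benchmark.py and load_graph_feats.py; the fanout and batch size
+# are illustrative):
+#
+#     stats = run_1_epoch(loader, feat, y, fanout=[10, 10, 10],
+#                         batch_size=1024, model_backend='dgl')
+#     # ``stats`` is a flat dict of per-epoch and per-batch timings.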