
Merge branch 'branch-23.10' into dgl_cugraph_comparision
VibhuJawa authored Sep 25, 2023
2 parents a2c1ca7 + 9c96b26 commit f60d97b
Showing 14 changed files with 933 additions and 341 deletions.
152 changes: 152 additions & 0 deletions benchmarks/cugraph-dgl/scale-benchmarks/dgl_benchmark.py
@@ -0,0 +1,152 @@
# Copyright (c) 2018-2023, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import os
import time
from argparse import ArgumentParser

import dgl
import pandas as pd
from dgl.dataloading import DataLoader, MultiLayerNeighborSampler

from load_graph_feats import load_edges_from_disk, load_node_features, load_node_labels
from model import run_1_epoch

class DataLoaderArgs:
    def __init__(self, args):
        self.dataset_path = args.dataset_path
        self.replication_factors = [int(x) for x in args.replication_factors.split(",")]
        self.fanouts = [[int(y) for y in x.split("_")] for x in args.fanouts.split(",")]
        self.batch_sizes = [int(x) for x in args.batch_sizes.split(",")]
        self.use_uva = not args.do_not_use_uva
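
# For illustration: --fanouts "10_10_10,25_25" parses to
# [[10, 10, 10], [25, 25]], i.e. one three-layer and one two-layer
# sampling configuration; replication_factors and batch_sizes are
# comma-separated integers.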



def create_dataloader(g, train_idx, batch_size, fanouts, use_uva):
    print("Creating dataloader", flush=True)
    st = time.time()
    if use_uva:
        train_idx = {k: v.to("cuda") for k, v in train_idx.items()}
    sampler = MultiLayerNeighborSampler(fanouts=fanouts)
    dataloader = DataLoader(
        g,
        train_idx,
        sampler,
        num_workers=0,
        batch_size=batch_size,
        use_uva=use_uva,
        shuffle=False,
        drop_last=False,
    )
    et = time.time()
    print(f"Time to create dataloader = {et - st:.2f} seconds", flush=True)
    return dataloader



def create_dgl_graph_from_disk(dataset_path, replication_factor=1):
    """
    Create a DGL graph from a dataset on disk.
    Args:
        dataset_path: Path to the dataset on disk.
        replication_factor: Number of times to replicate the edges.
    Returns:
        DGLGraph: DGLGraph with the loaded dataset.
    """
    with open(os.path.join(dataset_path, "meta.json"), "r") as f:
        input_meta = json.load(f)
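
    # Assumed meta.json layout (illustrative, not exhaustive):
    # {
    #   "num_nodes": {"paper": <int>, ...},
    #   "num_edges": {"paper__cites__paper": <int>, ...}
    # }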

    parquet_path = os.path.join(dataset_path, "parquet")
    graph_data = load_edges_from_disk(
        parquet_path, replication_factor, input_meta
    )
    label_data = load_node_labels(dataset_path, replication_factor, input_meta)
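    # Assumption (not stated in the code): at 8x replication and above, the
    # replicated feature matrix is too large to hold in memory, so features
    # are skipped and only graph loading and sampling are benchmarked.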
    if replication_factor < 8:
        feat_data = load_node_features(dataset_path, replication_factor, node_type='paper')
    else:
        feat_data = None
    print("Labels and features loaded", flush=True)

    g = dgl.heterograph(graph_data)

    return g, label_data, feat_data


def main(args):
    print(f"Running DGL dataloading benchmark with the following parameters:\n"
          f"Dataset path = {args.dataset_path}\n"
          f"Replication factors = {args.replication_factors}\n"
          f"Fanouts = {args.fanouts}\n"
          f"Batch sizes = {args.batch_sizes}\n"
          f"Use UVA = {args.use_uva}\n"
          f"{'=' * 30}")

    time_ls = []
    for replication_factor in args.replication_factors:
        start_time = time.time()
        g, label_data, feat_data = create_dgl_graph_from_disk(args.dataset_path, replication_factor)
        elapsed_time = time.time() - start_time

        print(f"Replication factor = {replication_factor}\n"
              f"G has {g.num_edges():,} edges and took {elapsed_time:.2f} seconds to load", flush=True)

        train_idx = {"paper": label_data["paper"]["train_idx"]}
        y = label_data["paper"]["y"]
        r_time_ls = e2e_benchmark(g, feat_data, y, train_idx, args.fanouts, args.batch_sizes, use_uva=args.use_uva)
        for time_d in r_time_ls:
            time_d.update({"replication_factor": replication_factor, "num_edges": g.num_edges()})
        time_ls.extend(r_time_ls)

        print(f"Benchmark completed for replication factor = {replication_factor}\n{'=' * 30}", flush=True)

    df = pd.DataFrame(time_ls)
    df.to_csv("dgl_e2e_benchmark.csv", index=False)
    print(f"Benchmark completed for all replication factors\n{'=' * 30}", flush=True)


def e2e_benchmark(g, feat, y, train_idx, fanouts, batch_sizes, use_uva):
    """
    Run the end-to-end benchmark over every (fanout, batch_size) combination.
    Args:
        g: DGLGraph
        feat: Tensor containing the features.
        y: Tensor containing the labels.
        train_idx: Tensor containing the training indices.
        fanouts: List of fanouts to use for the dataloader.
        batch_sizes: List of batch sizes to use for the dataloader.
        use_uva: Whether to use unified virtual addressing (UVA).
    """
    time_ls = []
    for fanout in fanouts:
        for batch_size in batch_sizes:
            dataloader = create_dataloader(g, train_idx, batch_size, fanout, use_uva)
            time_d = run_1_epoch(dataloader, feat, y, fanout, batch_size, model_backend='dgl')
            time_ls.append(time_d)
            print("=" * 30)
    return time_ls



def parse_arguments():
    parser = ArgumentParser()
    parser.add_argument("--dataset_path", type=str, default="/raid/vjawa/ogbn_papers100M")
    parser.add_argument("--replication_factors", type=str, default="2",
                        help="Comma-separated replication factors, e.g. '1,2,4'")
    parser.add_argument("--fanouts", type=str, default="10_10_10",
                        help="Comma-separated fanouts with '_' between layers, e.g. '10_10_10,25_25'")
    parser.add_argument("--batch_sizes", type=str, default="512,1024,8192,16384",
                        help="Comma-separated batch sizes")
    parser.add_argument("--do_not_use_uva", action="store_true",
                        help="Disable unified virtual addressing (UVA)")
    return parser.parse_args()
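
# Example invocation (paths and values are illustrative):
#   python dgl_benchmark.py --dataset_path /data/ogbn_papers100M \
#       --replication_factors 1,2 --fanouts 10_10_10 --batch_sizes 512,1024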

if __name__ == "__main__":
    arguments = parse_arguments()
    main(DataLoaderArgs(arguments))
123 changes: 123 additions & 0 deletions benchmarks/cugraph-dgl/scale-benchmarks/load_graph_feats.py
@@ -0,0 +1,123 @@
# Copyright (c) 2018-2023, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os

import numpy as np
import pandas as pd
import torch


def load_edges_from_disk(parquet_path, replication_factor, input_meta):
    """
    Load the edges from disk into a graph data dictionary.
    Args:
        parquet_path: Path to the parquet directory.
        replication_factor: Number of times to replicate the edges.
        input_meta: Input metadata.
    Returns:
        dict: Dictionary mapping each canonical edge type to a (src, dst) tuple.
    """
    graph_data = {}

    for edge_type in input_meta["num_edges"].keys():
        print(f"Loading edge index for edge type {edge_type} for replication factor = {replication_factor}")

        canonical_edge_type = tuple(edge_type.split("__"))
        edge_index = pd.read_parquet(os.path.join(parquet_path, edge_type, "edge_index.parquet"))
        edge_index = {
            "src": torch.from_numpy(edge_index.src.values),
            "dst": torch.from_numpy(edge_index.dst.values),
        }

        if replication_factor > 1:
            src_list, dst_list = replicate_edges(edge_index, canonical_edge_type, replication_factor, input_meta)
            edge_index["src"] = torch.cat(src_list).contiguous()
            edge_index["dst"] = torch.cat(dst_list).contiguous()

        graph_data[canonical_edge_type] = edge_index["src"], edge_index["dst"]

    print("Read edge data")
    return graph_data


def replicate_edges(edge_index, canonical_edge_type, replication_factor, input_meta):
    src_list = [edge_index["src"]]
    dst_list = [edge_index["dst"]]
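
    # Replica r shifts node IDs by r * num_nodes for the source and destination
    # node types, so each replica occupies a disjoint ID range. For example
    # (hypothetical sizes): with 4 source nodes and replication_factor == 2,
    # src [0, 2] becomes [0, 2, 4, 6] after concatenation.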

    for r in range(1, replication_factor):
        new_src = edge_index["src"] + (r * input_meta["num_nodes"][canonical_edge_type[0]])
        new_dst = edge_index["dst"] + (r * input_meta["num_nodes"][canonical_edge_type[2]])
        src_list.append(new_src)
        dst_list.append(new_dst)

    return src_list, dst_list




def load_node_labels(dataset_path, replication_factor, input_meta):
    num_nodes_dict = {node_type: t * replication_factor for node_type, t in input_meta["num_nodes"].items()}
    node_data = {}

    for node_type in input_meta["num_nodes"].keys():
        node_data[node_type] = {}
        label_path = os.path.join(dataset_path, "parquet", node_type, "node_label.parquet")

        if os.path.exists(label_path):
            node_data[node_type] = process_node_label(label_path, node_type, replication_factor, num_nodes_dict, input_meta)
        else:
            node_data[node_type]["num_nodes"] = num_nodes_dict[node_type]

    print("Loaded node labels", flush=True)
    return node_data


def process_node_label(label_path, node_type, replication_factor, num_nodes_dict, input_meta):
    node_label = pd.read_parquet(label_path)

    if replication_factor > 1:
        node_label = replicate_node_label(node_label, node_type, replication_factor, input_meta)

    node_label_tensor = torch.full((num_nodes_dict[node_type],), -1, dtype=torch.float32)
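    # -1 is a sentinel for unlabeled nodes; after the indexed assignment below,
    # train_idx selects exactly the labeled nodes and y holds the full
    # per-node label vector.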
    node_label_tensor[torch.as_tensor(node_label.node.values)] = torch.as_tensor(node_label.label.values)

    del node_label

    return {
        "train_idx": (node_label_tensor > -1).contiguous().nonzero().view(-1),
        "y": node_label_tensor.contiguous().long(),
    }


def replicate_node_label(node_label, node_type, replication_factor, input_meta):
    base_num_nodes = input_meta["num_nodes"][node_type]

    replicated_df = pd.DataFrame({
        "node": pd.concat([node_label.node + (r * base_num_nodes) for r in range(1, replication_factor)]),
        "label": pd.concat([node_label.label for _ in range(1, replication_factor)]),
    })

    return pd.concat([node_label, replicated_df]).reset_index(drop=True)


def load_node_features(dataset_path, replication_factor, node_type):
    print("Loading node features", flush=True)
    node_type_path = os.path.join(dataset_path, "npy", node_type)
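    # Assumption: replicated feature files (node_feat_<r>x.npy) were generated
    # ahead of time on disk; this loader does not create them.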
    if replication_factor == 1:
        fname = os.path.join(node_type_path, "node_feat.npy")
    else:
        fname = os.path.join(node_type_path, f"node_feat_{replication_factor}x.npy")

    feat = torch.from_numpy(np.load(fname))
    print("Loaded node features", flush=True)
    return feat
110 changes: 110 additions & 0 deletions benchmarks/cugraph-dgl/scale-benchmarks/model.py
@@ -0,0 +1,110 @@
# Copyright (c) 2018-2023, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import time

import torch
import torch.nn.functional as F


class GNN(torch.nn.Module):
    def __init__(self, in_channels, hidden_channels, out_channels, num_layers, model_backend='dgl'):
        if model_backend == 'dgl':
            from dgl.nn import SAGEConv
        else:
            from cugraph_dgl.nn import SAGEConv

        super(GNN, self).__init__()
        self.convs = torch.nn.ModuleList()
        for _ in range(num_layers - 1):
            self.convs.append(SAGEConv(in_channels, hidden_channels, aggregator_type='mean'))
            in_channels = hidden_channels
        self.convs.append(SAGEConv(hidden_channels, out_channels, aggregator_type='mean'))

    def forward(self, blocks, x):
        for i, conv in enumerate(self.convs):
            x = conv(blocks[i], x)
            if i != len(self.convs) - 1:
                x = F.relu(x)
        return x


def create_model(feat_size, num_classes, num_layers, model_backend='dgl'):
    model = GNN(feat_size, 64, num_classes, num_layers, model_backend=model_backend)
    model = model.to('cuda')
    model.train()
    return model
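
# Example (hypothetical sizes): create_model(feat_size=128, num_classes=172,
# num_layers=3) builds a 3-layer GraphSAGE with 64-wide hidden layers on the GPU.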

def train_model(model, dataloader, opt, feat, y):
    times = {key: 0 for key in ['mfg_creation', 'feature', 'm_fwd', 'm_bkwd']}
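    # Timed phases: MFG (message flow graph) creation by the sampler, feature
    # gather and host-to-device copy, model forward pass, and backward pass
    # plus optimizer step.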
    epoch_st = time.time()
    mfg_st = time.time()
    for input_nodes, output_nodes, blocks in dataloader:
        times['mfg_creation'] += time.time() - mfg_st
        # When feat is None (very large replication factors), only MFG creation
        # is benchmarked; the forward/backward passes are skipped.
        if feat is not None:
            fst = time.time()
            input_nodes = input_nodes.to('cpu')
            input_feat = feat[input_nodes]
            input_feat = input_feat.to('cuda')
            if isinstance(output_nodes, dict):
                output_nodes = output_nodes['paper']
            output_nodes = output_nodes.to(y.device)
            y_batch = y[output_nodes].to('cuda')
            times['feature'] += time.time() - fst

            m_fwd_st = time.time()
            y_hat = model(blocks, input_feat)
            times['m_fwd'] += time.time() - m_fwd_st

            m_bkwd_st = time.time()
            loss = F.cross_entropy(y_hat, y_batch)
            opt.zero_grad()
            loss.backward()
            opt.step()
            times['m_bkwd'] += time.time() - m_bkwd_st
        mfg_st = time.time()

    print(f"Epoch time = {time.time() - epoch_st:.2f} seconds")

    return times

def analyze_time(dataloader, times, epoch_time, fanout, batch_size):
    num_batches = len(dataloader)
    time_d = {
        "fanout": fanout,
        "batch_size": batch_size,
        "epoch_time": epoch_time,
        "epoch_time_per_batch": epoch_time / num_batches,
        "num_batches": num_batches,
    }
    for key, value in times.items():
        time_d[f"{key}_time_per_epoch"] = value
        time_d[f"{key}_time_per_batch"] = value / num_batches

    print(f"Time analysis for fanout = {fanout}, batch_size = {batch_size}")
    for key, value in time_d.items():
        if 'time_per_epoch' in key:
            print(f"{key} = {value:.2f} seconds")
    return time_d

def run_1_epoch(dataloader, feat, y, fanout, batch_size, model_backend):
    if feat is not None:
        # 172 output classes (the ogbn-papers100M label count)
        model = create_model(feat.shape[1], 172, len(fanout), model_backend=model_backend)
        opt = torch.optim.Adam(model.parameters(), lr=0.01)
    else:
        model = None
        opt = None
    epoch_st = time.time()
    times = train_model(model, dataloader, opt, feat, y)
    epoch_time = time.time() - epoch_st
    time_d = analyze_time(dataloader, times, epoch_time, fanout, batch_size)
    return time_d
