Skip to content

Commit

Permalink
[REVIEW] Add Pure DGL Dataloading benchmark (#3660)
Browse files Browse the repository at this point in the history
This PR adds the DGL data loading benchmark:

Arguments supported: 
- dataset_path: path to the dataset
- replication_factors: replication factors for number of edges 
- fanouts: fanouts
- batch_sizes: batch sizes

```bash 
python3 dgl_dataloading.py --dataset_path "/datasets/abarghi/ogbn_papers100M" \
--replication_factors "1,2,4" \
--fanouts "25_25,10_10_10,5_10_20" \
--batch_sizes "512,1024"
``` 


This produces the following results on a V100:

| Fanout | Batch Size | Data Loading Time per Epoch | Data Loading Time per Batch | Number of Edges | Number of Batches | Replication Factor |
|--------|------------|-----------------------------|-----------------------------|-----------------|-------------------|--------------------|
| [25, 25] | 512 | 9.48 | 0.0031 | 1615685872 | 3022 | 1 |
| [25, 25] | 1024 | 6.39 | 0.0042 | 1615685872 | 1511 | 1 |
| [10, 10, 10] | 512 | 15.91 | 0.0053 | 1615685872 | 3022 | 1 |
| [10, 10, 10] | 1024 | 11.64 | 0.0077 | 1615685872 | 1511 | 1 |
| [5, 10, 20] | 512 | 17.73 | 0.0059 | 1615685872 | 3022 | 1 |
| [5, 10, 20] | 1024 | 13.52 | 0.0089 | 1615685872 | 1511 | 1 |
| [25, 25] | 512 | 19.44 | 0.0032 | 3231371744 | 6043 | 2 |
| [25, 25] | 1024 | 12.98 | 0.0043 | 3231371744 | 3022 | 2 |
| [10, 10, 10] | 512 | 32.88 | 0.0054 | 3231371744 | 6043 | 2 |
| [10, 10, 10] | 1024 | 24.35 | 0.0081 | 3231371744 | 3022 | 2 |
| [5, 10, 20] | 512 | 38.35 | 0.0063 | 3231371744 | 6043 | 2 |
| [5, 10, 20] | 1024 | 28.93 | 0.0096 | 3231371744 | 3022 | 2 |
| [25, 25] | 512 | 37.31 | 0.0031 | 6462743488 | 12085 | 4 |
| [25, 25] | 1024 | 25.15 | 0.0042 | 6462743488 | 6043 | 4 |
| [10, 10, 10] | 512 | 64.29 | 0.0053 | 6462743488 | 12085 | 4 |
| [10, 10, 10] | 1024 | 47.13 | 0.0078 | 6462743488 | 6043 | 4 |
| [5, 10, 20] | 512 | 72.90 | 0.0060 | 6462743488 | 12085 | 4 |
| [5, 10, 20] | 1024 | 56.70 | 0.0094 | 6462743488 | 6043 | 4 |
| [25, 25] | 512 | 80.99 | 0.0034 | 12925486976 | 24169 | 8 |
| [25, 25] | 1024 | 50.89 | 0.0042 | 12925486976 | 12085 | 8 |
| [10, 10, 10] | 512 | 129.49 | 0.0054 | 12925486976 | 24169 | 8 |
| [10, 10, 10] | 1024 | 93.66 | 0.0078 | 12925486976 | 12085 | 8 |
| [5, 10, 20] | 512 | 143.45 | 0.0059 | 12925486976 | 24169 | 8 |
| [5, 10, 20] | 1024 | 110.22 | 0.0091 | 12925486976 | 12085 | 8 |

Authors:
  - Vibhu Jawa (https://github.com/VibhuJawa)
  - Alex Barghi (https://github.com/alexbarghi-nv)
  - Brad Rees (https://github.com/BradReesWork)

Approvers:
  - Alex Barghi (https://github.com/alexbarghi-nv)
  - Tingyu Wang (https://github.com/tingyu66)

URL: #3660
  • Loading branch information
VibhuJawa authored Sep 26, 2023
1 parent c11eff2 commit b199bf0
Showing 1 changed file with 291 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,291 @@
# Copyright (c) 2023, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import dgl
import torch
import pandas as pd
import os
import time
import json
import random
import numpy as np
from argparse import ArgumentParser


def load_edges_from_disk(parquet_path, replication_factor, input_meta):
"""
Load the edges from disk into a graph data dictionary.
Args:
parquet_path: Path to the parquet directory.
replication_factor: Number of times to replicate the edges.
input_meta: Input meta data.
Returns:
dict: Dictionary of edge types to a tuple of (src, dst)
"""
graph_data = {}
for edge_type in input_meta["num_edges"].keys():
print(
f"Loading edge index for edge type {edge_type}"
f"for replication factor = {replication_factor}"
)
can_edge_type = tuple(edge_type.split("__"))
# TODO: Rename `edge_index` to a better name
ei = pd.read_parquet(
os.path.join(parquet_path, edge_type, "edge_index.parquet")
)
ei = {
"src": torch.from_numpy(ei.src.values),
"dst": torch.from_numpy(ei.dst.values),
}
if replication_factor > 1:
src_ls = [ei["src"]]
dst_ls = [ei["dst"]]
for r in range(1, replication_factor):
new_src = ei["src"] + (
r * input_meta["num_nodes"][can_edge_type[0]]
)
src_ls.append(new_src)
new_dst = ei["dst"] + (
r * input_meta["num_nodes"][can_edge_type[2]]
)
dst_ls.append(new_dst)

ei["src"] = torch.cat(src_ls).contiguous()
ei["dst"] = torch.cat(dst_ls).contiguous()
graph_data[can_edge_type] = ei["src"], ei["dst"]
print("Graph Data compiled")
return graph_data


def load_node_labels(dataset_path, replication_factor, input_meta):
num_nodes_dict = {
node_type: t * replication_factor
for node_type, t in input_meta["num_nodes"].items()
}
node_data = {}
for node_type in input_meta["num_nodes"].keys():
node_data[node_type] = {}
label_path = os.path.join(
dataset_path, "parquet", node_type, "node_label.parquet"
)
if os.path.exists(label_path):
node_label = pd.read_parquet(label_path)
if replication_factor > 1:
base_num_nodes = input_meta["num_nodes"][node_type]
dfr = pd.DataFrame(
{
"node": pd.concat(
[
node_label.node + (r * base_num_nodes)
for r in range(1, replication_factor)
]
),
"label": pd.concat(
[
node_label.label
for r in range(1, replication_factor)
]
),
}
)
node_label = pd.concat([node_label, dfr]).reset_index(
drop=True
)

node_label_tensor = torch.full(
(num_nodes_dict[node_type],), -1, dtype=torch.float32
)
node_label_tensor[
torch.as_tensor(node_label.node.values)
] = torch.as_tensor(node_label.label.values)

del node_label
node_data[node_type]["train_idx"] = (
(node_label_tensor > -1).contiguous().nonzero().view(-1)
)
node_data[node_type]["y"] = node_label_tensor.contiguous()
else:
node_data[node_type]["num_nodes"] = num_nodes_dict[node_type]
return node_data


def create_dgl_graph_from_disk(dataset_path, replication_factor=1):
"""
Create a DGL graph from a dataset on disk.
Args:
dataset_path: Path to the dataset on disk.
replication_factor: Number of times to replicate the edges.
Returns:
DGLGraph: DGLGraph with the loaded dataset.
"""
with open(os.path.join(dataset_path, "meta.json"), "r") as f:
input_meta = json.load(f)

parquet_path = os.path.join(dataset_path, "parquet")
graph_data = load_edges_from_disk(
parquet_path, replication_factor, input_meta
)
node_data = load_node_labels(dataset_path, replication_factor, input_meta)
g = dgl.heterograph(graph_data)

return g, node_data


def create_dataloader(g, train_idx, batch_size, fanouts, use_uva):
"""
Create a DGL dataloader from a DGL graph.
Args:
g: DGLGraph to create the dataloader from.
train_idx: Tensor containing the training indices.
batch_size: Batch size to use for the dataloader.
fanouts: List of fanouts to use for the dataloader.
use_uva: Whether to use unified virtual address space.
Returns:
DGLGraph: DGLGraph with the loaded dataset.
"""

print("Creating dataloader", flush=True)
st = time.time()
if use_uva:
train_idx = {k: v.to("cuda") for k, v in train_idx.items()}
sampler = dgl.dataloading.MultiLayerNeighborSampler(fanouts=fanouts)
dataloader = dgl.dataloading.DataLoader(
g,
train_idx,
sampler,
num_workers=0,
batch_size=batch_size,
use_uva=use_uva,
shuffle=False,
drop_last=False,
)
et = time.time()
print(f"Time to create dataloader = {et - st:.2f} seconds")
return dataloader


def dataloading_benchmark(g, train_idx, fanouts, batch_sizes, use_uva):
"""
Run the dataloading benchmark.
Args:
g: DGLGraph
train_idx: Tensor containing the training indices.
fanouts: List of fanouts to use for the dataloader.
batch_sizes: List of batch sizes to use for the dataloader.
use_uva: Whether to use unified virtual address space.
"""
time_ls = []
for fanout in fanouts:
for batch_size in batch_sizes:
dataloader = create_dataloader(
g,
train_idx,
batch_size=batch_size,
fanouts=fanout,
use_uva=use_uva,
)
dataloading_st = time.time()
for input_nodes, output_nodes, blocks in dataloader:
pass
dataloading_et = time.time()
dataloading_time = dataloading_et - dataloading_st
time_d = {
"fanout": fanout,
"batch_size": batch_size,
"dataloading_time_per_epoch": dataloading_time,
"dataloading_time_per_batch": dataloading_time / len(dataloader),
"num_edges": g.num_edges(),
"num_batches": len(dataloader),
}
time_ls.append(time_d)

print("Dataloading completed")
print(f"Fanout = {fanout}, batch_size = {batch_size}")
print(
f"Time taken {dataloading_time:.2f} ",
f"seconds for num batches {len(dataloader)}",
flush=True,
)
print("==============================================")
return time_ls

def set_seed(seed):
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)

if __name__ == "__main__":
parser = ArgumentParser()
parser.add_argument(
"--dataset_path", type=str, default="/datasets/abarghi/ogbn_papers100M"
)
parser.add_argument("--replication_factors", type=str, default="1,2,4,8")
parser.add_argument(
"--fanouts", type=str, default="25_25,10_10_10,5_10_20"
)
parser.add_argument("--batch_sizes", type=str, default="512,1024")
parser.add_argument("--do_not_use_uva", action="store_true")
parser.add_argument("--seed", type=int, default=42)
args = parser.parse_args()

if args.do_not_use_uva:
use_uva = False
else:
use_uva = True
set_seed(args.seed)
replication_factors = [int(x) for x in args.replication_factors.split(",")]
fanouts = [[int(y) for y in x.split("_")] for x in args.fanouts.split(",")]
batch_sizes = [int(x) for x in args.batch_sizes.split(",")]

print("Running dgl dataloading benchmark with the following parameters:")
print(f"Dataset path = {args.dataset_path}")
print(f"Replication factors = {replication_factors}")
print(f"Fanouts = {fanouts}")
print(f"Batch sizes = {batch_sizes}")
print(f"Use UVA = {use_uva}")
print("==============================================")

time_ls = []
for replication_factor in replication_factors:
st = time.time()
g, node_data = create_dgl_graph_from_disk(
dataset_path=args.dataset_path,
replication_factor=replication_factor,
)
et = time.time()
print(f"Replication factor = {replication_factor}")
print(
f"G has {g.num_edges()} edges and took",
f" {et - st:.2f} seconds to load"
)
train_idx = {"paper": node_data["paper"]["train_idx"]}
r_time_ls = dataloading_benchmark(
g, train_idx, fanouts, batch_sizes, use_uva=use_uva
)
print(
"Benchmark completed for replication factor = ", replication_factor
)
print("==============================================")
# Add replication factor to the time list
[
x.update({"replication_factor": replication_factor})
for x in r_time_ls
]
time_ls.extend(r_time_ls)

df = pd.DataFrame(time_ls)
df.to_csv("dgl_dataloading_benchmark.csv", index=False)
print("Benchmark completed for all replication factors")
print("==============================================")

0 comments on commit b199bf0

Please sign in to comment.