Skip to content

Commit

Permalink
[REVIEW]Optimize cugraph-DGL csc codepath (#3977)
Browse files Browse the repository at this point in the history
This PR optimizes `cugraph-DGL` csc codepath  and adds an end to end benchmark using cugraph-dgl. 


```python3
sampled_dir = "/raid/vjawa/nov_1_bulksampling_benchmarks/ogbn_papers100M[2]_b512_f[10, 10, 10]"
dataset = HomogenousBulkSamplerDataset(meta_json_d["total_num_nodes"], edge_dir=edge_dir, sparse_format="csc", return_type="cugraph_dgl.nn.SparseGraph")
dataset.set_input_files(input_directory=sampled_dir+"/samples")
dataloader = torch.utils.data.DataLoader(dataset, collate_fn=lambda x:x, shuffle=False, num_workers=0, batch_size=None)

def run(dataloader):
    for input_nodes, output_nodes, blocks in dataloader: 
       pass
``` 
```python3
%%timeit
run(dataloader)
```
With PR :
```python3
2.48 s ± 14 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
```

MAIN: 
```python3
%%timeit
9.52 s ± 151 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
```

### E2E Benchmarks:

```
 python3 cugraph_dgl_benchmark.py
```

PR:
```python3
...
Epoch time = 70.41 seconds
Time to create MFG = 3.37 seconds
Time analysis for fanout = [10, 10, 10], batch_size = 512
mfg_creation_time_per_epoch = 3.37 seconds
feature_time_per_epoch = 44.09 seconds
m_fwd_time_per_epoch = 7.31 seconds
m_bkwd_time_per_epoch = 15.60 seconds
```
MAIN:
```python3
....
Epoch time = 84.72 seconds
Time to create MFG = 10.79 seconds
Time analysis for fanout = [10, 10, 10], batch_size = 512
mfg_creation_time_per_epoch = 10.79 seconds
feature_time_per_epoch = 47.09 seconds
m_fwd_time_per_epoch = 8.24 seconds
m_bkwd_time_per_epoch = 18.58 seconds
```

Authors:
  - Vibhu Jawa (https://github.com/VibhuJawa)

Approvers:
  - Alex Barghi (https://github.com/alexbarghi-nv)
  - Tingyu Wang (https://github.com/tingyu66)

URL: #3977
  • Loading branch information
VibhuJawa authored Nov 8, 2023
1 parent ac5b981 commit 663d95f
Show file tree
Hide file tree
Showing 4 changed files with 244 additions and 51 deletions.
152 changes: 152 additions & 0 deletions benchmarks/cugraph-dgl/scale-benchmarks/cugraph_dgl_benchmark.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
# Copyright (c) 2018-2023, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os

os.environ["LIBCUDF_CUFILE_POLICY"] = "KVIKIO"
os.environ["KVIKIO_NTHREADS"] = "64"
os.environ["RAPIDS_NO_INITIALIZE"] = "1"
import json
import pandas as pd
import os
import time
from rmm.allocators.torch import rmm_torch_allocator
import rmm
import torch
from cugraph_dgl.dataloading import HomogenousBulkSamplerDataset
from model import run_1_epoch
from argparse import ArgumentParser
from load_graph_feats import load_node_labels, load_node_features


def create_dataloader(sampled_dir, total_num_nodes, sparse_format, return_type):
print("Creating dataloader", flush=True)
st = time.time()
dataset = HomogenousBulkSamplerDataset(
total_num_nodes,
edge_dir="in",
sparse_format=sparse_format,
return_type=return_type,
)

dataset.set_input_files(sampled_dir)
dataloader = torch.utils.data.DataLoader(
dataset, collate_fn=lambda x: x, shuffle=False, num_workers=0, batch_size=None
)
et = time.time()
print(f"Time to create dataloader = {et - st:.2f} seconds", flush=True)
return dataloader


def setup_common_pool():
rmm.reinitialize(initial_pool_size=5e9, pool_allocator=True)
torch.cuda.memory.change_current_allocator(rmm_torch_allocator)


def main(args):
print(
f"Running cugraph-dgl dataloading benchmark with the following parameters:\n"
f"Dataset path = {args.dataset_path}\n"
f"Sampling path = {args.sampling_path}\n"
)
with open(os.path.join(args.dataset_path, "meta.json"), "r") as f:
input_meta = json.load(f)

sampled_dirs = [
os.path.join(args.sampling_path, f) for f in os.listdir(args.sampling_path)
]

time_ls = []
for sampled_dir in sampled_dirs:
with open(os.path.join(sampled_dir, "output_meta.json"), "r") as f:
sampled_meta_d = json.load(f)

replication_factor = sampled_meta_d["replication_factor"]
feat_load_st = time.time()
label_data = load_node_labels(
args.dataset_path, replication_factor, input_meta
)["paper"]["y"]
feat_data = feat_data = load_node_features(
args.dataset_path, replication_factor, node_type="paper"
)
print(
f"Feature and label data loading took = {time.time()-feat_load_st}",
flush=True,
)

r_time_ls = e2e_benchmark(sampled_dir, feat_data, label_data, sampled_meta_d)
[x.update({"replication_factor": replication_factor}) for x in r_time_ls]
[x.update({"num_edges": sampled_meta_d["total_num_edges"]}) for x in r_time_ls]
time_ls.extend(r_time_ls)

print(
f"Benchmark completed for replication factor = {replication_factor}\n{'=' * 30}",
flush=True,
)

df = pd.DataFrame(time_ls)
df.to_csv("cugraph_dgl_e2e_benchmark.csv", index=False)
print(f"Benchmark completed for all replication factors\n{'=' * 30}", flush=True)


def e2e_benchmark(
sampled_dir: str, feat: torch.Tensor, y: torch.Tensor, sampled_meta_d: dict
):
"""
Run the e2e_benchmark
Args:
sampled_dir: directory containing the sampled graph
feat: node features
y: node labels
sampled_meta_d: dictionary containing the sampled graph metadata
"""
time_ls = []

# TODO: Make this a parameter in bulk sampling script
sampled_meta_d["sparse_format"] = "csc"
sampled_dir = os.path.join(sampled_dir, "samples")
dataloader = create_dataloader(
sampled_dir,
sampled_meta_d["total_num_nodes"],
sampled_meta_d["sparse_format"],
return_type="cugraph_dgl.nn.SparseGraph",
)
time_d = run_1_epoch(
dataloader,
feat,
y,
fanout=sampled_meta_d["fanout"],
batch_size=sampled_meta_d["batch_size"],
model_backend="cugraph_dgl",
)
time_ls.append(time_d)
print("=" * 30)
return time_ls


def parse_arguments():
parser = ArgumentParser()
parser.add_argument(
"--dataset_path", type=str, default="/raid/vjawa/ogbn_papers100M/"
)
parser.add_argument(
"--sampling_path",
type=str,
default="/raid/vjawa/nov_1_bulksampling_benchmarks/",
)
return parser.parse_args()


if __name__ == "__main__":
setup_common_pool()
arguments = parse_arguments()
main(arguments)
17 changes: 11 additions & 6 deletions benchmarks/cugraph-dgl/scale-benchmarks/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ def create_model(feat_size, num_classes, num_layers, model_backend="dgl"):


def train_model(model, dataloader, opt, feat, y):
times = {key: 0 for key in ["mfg_creation", "feature", "m_fwd", "m_bkwd"]}
times_d = {key: 0 for key in ["mfg_creation", "feature", "m_fwd", "m_bkwd"]}
epoch_st = time.time()
mfg_st = time.time()
for input_nodes, output_nodes, blocks in dataloader:
times["mfg_creation"] += time.time() - mfg_st
times_d["mfg_creation"] += time.time() - mfg_st
if feat is not None:
fst = time.time()
input_nodes = input_nodes.to("cpu")
Expand All @@ -71,23 +71,24 @@ def train_model(model, dataloader, opt, feat, y):
output_nodes = output_nodes["paper"]
output_nodes = output_nodes.to(y.device)
y_batch = y[output_nodes].to("cuda")
times["feature"] += time.time() - fst
times_d["feature"] += time.time() - fst

m_fwd_st = time.time()
y_hat = model(blocks, input_feat)
times["m_fwd"] += time.time() - m_fwd_st
times_d["m_fwd"] += time.time() - m_fwd_st

m_bkwd_st = time.time()
loss = F.cross_entropy(y_hat, y_batch)
opt.zero_grad()
loss.backward()
opt.step()
times["m_bkwd"] += time.time() - m_bkwd_st
times_d["m_bkwd"] += time.time() - m_bkwd_st
mfg_st = time.time()

print(f"Epoch time = {time.time() - epoch_st:.2f} seconds")
print(f"Time to create MFG = {times_d['mfg_creation']:.2f} seconds")

return times
return times_d


def analyze_time(dataloader, times, epoch_time, fanout, batch_size):
Expand Down Expand Up @@ -119,6 +120,10 @@ def run_1_epoch(dataloader, feat, y, fanout, batch_size, model_backend):
else:
model = None
opt = None

# Warmup RUN
times = train_model(model, dataloader, opt, feat, y)

epoch_st = time.time()
times = train_model(model, dataloader, opt, feat, y)
epoch_time = time.time() - epoch_st
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
get_allocation_counts_dask_lazy,
sizeof_fmt,
get_peak_output_ratio_across_workers,
restart_client,
start_dask_client,
stop_dask_client,
enable_spilling,
Expand Down Expand Up @@ -187,10 +186,10 @@ def sample_graph(
output_path,
seed=42,
batch_size=500,
seeds_per_call=200000,
seeds_per_call=400000,
batches_per_partition=100,
fanout=[5, 5, 5],
persist=False,
sampling_kwargs={},
):
cupy.random.seed(seed)

Expand All @@ -204,6 +203,7 @@ def sample_graph(
seeds_per_call=seeds_per_call,
batches_per_partition=batches_per_partition,
log_level=logging.INFO,
**sampling_kwargs,
)

n_workers = len(default_client().scheduler_info()["workers"])
Expand Down Expand Up @@ -469,6 +469,7 @@ def benchmark_cugraph_bulk_sampling(
batch_size,
seeds_per_call,
fanout,
sampling_target_framework,
reverse_edges=True,
dataset_dir=".",
replication_factor=1,
Expand Down Expand Up @@ -564,17 +565,39 @@ def benchmark_cugraph_bulk_sampling(
output_sample_path = os.path.join(output_subdir, "samples")
os.makedirs(output_sample_path)

batches_per_partition = 200_000 // batch_size
if sampling_target_framework == "cugraph_dgl_csr":
sampling_kwargs = {
"deduplicate_sources": True,
"prior_sources_behavior": "carryover",
"renumber": True,
"compression": "CSR",
"compress_per_hop": True,
"use_legacy_names": False,
"include_hop_column": False,
}
else:
# FIXME: Update these arguments when CSC mode is fixed in cuGraph-PyG (release 24.02)
sampling_kwargs = {
"deduplicate_sources": True,
"prior_sources_behavior": "exclude",
"renumber": True,
"compression": "COO",
"compress_per_hop": False,
"use_legacy_names": False,
"include_hop_column": True,
}

batches_per_partition = 400_000 // batch_size
execution_time, allocation_counts = sample_graph(
G,
dask_label_df,
output_sample_path,
G=G,
label_df=dask_label_df,
output_path=output_sample_path,
seed=seed,
batch_size=batch_size,
seeds_per_call=seeds_per_call,
batches_per_partition=batches_per_partition,
fanout=fanout,
persist=persist,
sampling_kwargs=sampling_kwargs,
)

output_meta = {
Expand Down Expand Up @@ -701,7 +724,13 @@ def get_args():
required=False,
default=False,
)

parser.add_argument(
"--sampling_target_framework",
type=str,
help="The target framework for sampling (i.e. cugraph_dgl_csr, cugraph_pyg_csc, ...)",
required=False,
default=None,
)
parser.add_argument(
"--dask_worker_devices",
type=str,
Expand Down Expand Up @@ -738,6 +767,12 @@ def get_args():
logging.basicConfig()

args = get_args()
if args.sampling_target_framework not in ["cugraph_dgl_csr", None]:
raise ValueError(
"sampling_target_framework must be one of cugraph_dgl_csr or None",
"Other frameworks are not supported at this time.",
)

fanouts = [
[int(f) for f in fanout.split("_")] for fanout in args.fanouts.split(",")
]
Expand Down Expand Up @@ -785,6 +820,7 @@ def get_args():
batch_size=batch_size,
seeds_per_call=seeds_per_call,
fanout=fanout,
sampling_target_framework=args.sampling_target_framework,
dataset_dir=args.dataset_root,
reverse_edges=args.reverse_edges,
replication_factor=replication_factor,
Expand All @@ -809,7 +845,6 @@ def get_args():
warnings.warn("An Exception Occurred!")
print(e)
traceback.print_exc()
restart_client(client)
sleep(10)

stats_df = pd.DataFrame(
Expand Down
Loading

0 comments on commit 663d95f

Please sign in to comment.