From 663d95fab0a9e6312f93d1190d6dfd590b081cd5 Mon Sep 17 00:00:00 2001 From: Vibhu Jawa Date: Tue, 7 Nov 2023 16:44:18 -0800 Subject: [PATCH] [REVIEW]Optimize cugraph-DGL csc codepath (#3977) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This PR optimizes `cugraph-DGL` csc codepath and adds an end to end benchmark using cugraph-dgl. ```python3 sampled_dir = "/raid/vjawa/nov_1_bulksampling_benchmarks/ogbn_papers100M[2]_b512_f[10, 10, 10]" dataset = HomogenousBulkSamplerDataset(meta_json_d["total_num_nodes"], edge_dir=edge_dir, sparse_format="csc", return_type="cugraph_dgl.nn.SparseGraph") dataset.set_input_files(input_directory=sampled_dir+"/samples") dataloader = torch.utils.data.DataLoader(dataset, collate_fn=lambda x:x, shuffle=False, num_workers=0, batch_size=None) def run(dataloader): for input_nodes, output_nodes, blocks in dataloader: pass ``` ```python3 %%timeit run(dataloader) ``` With PR : ```python3 2.48 s ± 14 ms per loop (mean ± std. dev. of 7 runs, 1 loop each) ``` MAIN: ```python3 %%timeit 9.52 s ± 151 ms per loop (mean ± std. dev. of 7 runs, 1 loop each) ``` ### E2E Benchmarks: ``` python3 cugraph_dgl_benchmark.py ``` PR: ```python3 ... Epoch time = 70.41 seconds Time to create MFG = 3.37 seconds Time analysis for fanout = [10, 10, 10], batch_size = 512 mfg_creation_time_per_epoch = 3.37 seconds feature_time_per_epoch = 44.09 seconds m_fwd_time_per_epoch = 7.31 seconds m_bkwd_time_per_epoch = 15.60 seconds ``` MAIN: ```python3 .... 
Epoch time = 84.72 seconds Time to create MFG = 10.79 seconds Time analysis for fanout = [10, 10, 10], batch_size = 512 mfg_creation_time_per_epoch = 10.79 seconds feature_time_per_epoch = 47.09 seconds m_fwd_time_per_epoch = 8.24 seconds m_bkwd_time_per_epoch = 18.58 seconds ``` Authors: - Vibhu Jawa (https://github.com/VibhuJawa) Approvers: - Alex Barghi (https://github.com/alexbarghi-nv) - Tingyu Wang (https://github.com/tingyu66) URL: https://github.com/rapidsai/cugraph/pull/3977 --- .../scale-benchmarks/cugraph_dgl_benchmark.py | 152 ++++++++++++++++++ .../cugraph-dgl/scale-benchmarks/model.py | 17 +- .../bulk_sampling/cugraph_bulk_sampling.py | 55 +++++-- .../dataloading/utils/sampling_helpers.py | 71 ++++---- 4 files changed, 244 insertions(+), 51 deletions(-) create mode 100644 benchmarks/cugraph-dgl/scale-benchmarks/cugraph_dgl_benchmark.py diff --git a/benchmarks/cugraph-dgl/scale-benchmarks/cugraph_dgl_benchmark.py b/benchmarks/cugraph-dgl/scale-benchmarks/cugraph_dgl_benchmark.py new file mode 100644 index 00000000000..85f43b97b90 --- /dev/null +++ b/benchmarks/cugraph-dgl/scale-benchmarks/cugraph_dgl_benchmark.py @@ -0,0 +1,152 @@ +# Copyright (c) 2018-2023, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+import os + +os.environ["LIBCUDF_CUFILE_POLICY"] = "KVIKIO" +os.environ["KVIKIO_NTHREADS"] = "64" +os.environ["RAPIDS_NO_INITIALIZE"] = "1" +import json +import pandas as pd +import os +import time +from rmm.allocators.torch import rmm_torch_allocator +import rmm +import torch +from cugraph_dgl.dataloading import HomogenousBulkSamplerDataset +from model import run_1_epoch +from argparse import ArgumentParser +from load_graph_feats import load_node_labels, load_node_features + + +def create_dataloader(sampled_dir, total_num_nodes, sparse_format, return_type): + print("Creating dataloader", flush=True) + st = time.time() + dataset = HomogenousBulkSamplerDataset( + total_num_nodes, + edge_dir="in", + sparse_format=sparse_format, + return_type=return_type, + ) + + dataset.set_input_files(sampled_dir) + dataloader = torch.utils.data.DataLoader( + dataset, collate_fn=lambda x: x, shuffle=False, num_workers=0, batch_size=None + ) + et = time.time() + print(f"Time to create dataloader = {et - st:.2f} seconds", flush=True) + return dataloader + + +def setup_common_pool(): + rmm.reinitialize(initial_pool_size=5e9, pool_allocator=True) + torch.cuda.memory.change_current_allocator(rmm_torch_allocator) + + +def main(args): + print( + f"Running cugraph-dgl dataloading benchmark with the following parameters:\n" + f"Dataset path = {args.dataset_path}\n" + f"Sampling path = {args.sampling_path}\n" + ) + with open(os.path.join(args.dataset_path, "meta.json"), "r") as f: + input_meta = json.load(f) + + sampled_dirs = [ + os.path.join(args.sampling_path, f) for f in os.listdir(args.sampling_path) + ] + + time_ls = [] + for sampled_dir in sampled_dirs: + with open(os.path.join(sampled_dir, "output_meta.json"), "r") as f: + sampled_meta_d = json.load(f) + + replication_factor = sampled_meta_d["replication_factor"] + feat_load_st = time.time() + label_data = load_node_labels( + args.dataset_path, replication_factor, input_meta + )["paper"]["y"] + feat_data = feat_data = 
load_node_features( + args.dataset_path, replication_factor, node_type="paper" + ) + print( + f"Feature and label data loading took = {time.time()-feat_load_st}", + flush=True, + ) + + r_time_ls = e2e_benchmark(sampled_dir, feat_data, label_data, sampled_meta_d) + [x.update({"replication_factor": replication_factor}) for x in r_time_ls] + [x.update({"num_edges": sampled_meta_d["total_num_edges"]}) for x in r_time_ls] + time_ls.extend(r_time_ls) + + print( + f"Benchmark completed for replication factor = {replication_factor}\n{'=' * 30}", + flush=True, + ) + + df = pd.DataFrame(time_ls) + df.to_csv("cugraph_dgl_e2e_benchmark.csv", index=False) + print(f"Benchmark completed for all replication factors\n{'=' * 30}", flush=True) + + +def e2e_benchmark( + sampled_dir: str, feat: torch.Tensor, y: torch.Tensor, sampled_meta_d: dict +): + """ + Run the e2e_benchmark + Args: + sampled_dir: directory containing the sampled graph + feat: node features + y: node labels + sampled_meta_d: dictionary containing the sampled graph metadata + """ + time_ls = [] + + # TODO: Make this a parameter in bulk sampling script + sampled_meta_d["sparse_format"] = "csc" + sampled_dir = os.path.join(sampled_dir, "samples") + dataloader = create_dataloader( + sampled_dir, + sampled_meta_d["total_num_nodes"], + sampled_meta_d["sparse_format"], + return_type="cugraph_dgl.nn.SparseGraph", + ) + time_d = run_1_epoch( + dataloader, + feat, + y, + fanout=sampled_meta_d["fanout"], + batch_size=sampled_meta_d["batch_size"], + model_backend="cugraph_dgl", + ) + time_ls.append(time_d) + print("=" * 30) + return time_ls + + +def parse_arguments(): + parser = ArgumentParser() + parser.add_argument( + "--dataset_path", type=str, default="/raid/vjawa/ogbn_papers100M/" + ) + parser.add_argument( + "--sampling_path", + type=str, + default="/raid/vjawa/nov_1_bulksampling_benchmarks/", + ) + return parser.parse_args() + + +if __name__ == "__main__": + setup_common_pool() + arguments = parse_arguments() + 
main(arguments) diff --git a/benchmarks/cugraph-dgl/scale-benchmarks/model.py b/benchmarks/cugraph-dgl/scale-benchmarks/model.py index 08ae0e8b1ee..9a9dfe58f96 100644 --- a/benchmarks/cugraph-dgl/scale-benchmarks/model.py +++ b/benchmarks/cugraph-dgl/scale-benchmarks/model.py @@ -57,11 +57,11 @@ def create_model(feat_size, num_classes, num_layers, model_backend="dgl"): def train_model(model, dataloader, opt, feat, y): - times = {key: 0 for key in ["mfg_creation", "feature", "m_fwd", "m_bkwd"]} + times_d = {key: 0 for key in ["mfg_creation", "feature", "m_fwd", "m_bkwd"]} epoch_st = time.time() mfg_st = time.time() for input_nodes, output_nodes, blocks in dataloader: - times["mfg_creation"] += time.time() - mfg_st + times_d["mfg_creation"] += time.time() - mfg_st if feat is not None: fst = time.time() input_nodes = input_nodes.to("cpu") @@ -71,23 +71,24 @@ def train_model(model, dataloader, opt, feat, y): output_nodes = output_nodes["paper"] output_nodes = output_nodes.to(y.device) y_batch = y[output_nodes].to("cuda") - times["feature"] += time.time() - fst + times_d["feature"] += time.time() - fst m_fwd_st = time.time() y_hat = model(blocks, input_feat) - times["m_fwd"] += time.time() - m_fwd_st + times_d["m_fwd"] += time.time() - m_fwd_st m_bkwd_st = time.time() loss = F.cross_entropy(y_hat, y_batch) opt.zero_grad() loss.backward() opt.step() - times["m_bkwd"] += time.time() - m_bkwd_st + times_d["m_bkwd"] += time.time() - m_bkwd_st mfg_st = time.time() print(f"Epoch time = {time.time() - epoch_st:.2f} seconds") + print(f"Time to create MFG = {times_d['mfg_creation']:.2f} seconds") - return times + return times_d def analyze_time(dataloader, times, epoch_time, fanout, batch_size): @@ -119,6 +120,10 @@ def run_1_epoch(dataloader, feat, y, fanout, batch_size, model_backend): else: model = None opt = None + + # Warmup RUN + times = train_model(model, dataloader, opt, feat, y) + epoch_st = time.time() times = train_model(model, dataloader, opt, feat, y) epoch_time = 
time.time() - epoch_st diff --git a/benchmarks/cugraph/standalone/bulk_sampling/cugraph_bulk_sampling.py b/benchmarks/cugraph/standalone/bulk_sampling/cugraph_bulk_sampling.py index a8c0658767d..1ca5d6db637 100644 --- a/benchmarks/cugraph/standalone/bulk_sampling/cugraph_bulk_sampling.py +++ b/benchmarks/cugraph/standalone/bulk_sampling/cugraph_bulk_sampling.py @@ -22,7 +22,6 @@ get_allocation_counts_dask_lazy, sizeof_fmt, get_peak_output_ratio_across_workers, - restart_client, start_dask_client, stop_dask_client, enable_spilling, @@ -187,10 +186,10 @@ def sample_graph( output_path, seed=42, batch_size=500, - seeds_per_call=200000, + seeds_per_call=400000, batches_per_partition=100, fanout=[5, 5, 5], - persist=False, + sampling_kwargs={}, ): cupy.random.seed(seed) @@ -204,6 +203,7 @@ def sample_graph( seeds_per_call=seeds_per_call, batches_per_partition=batches_per_partition, log_level=logging.INFO, + **sampling_kwargs, ) n_workers = len(default_client().scheduler_info()["workers"]) @@ -469,6 +469,7 @@ def benchmark_cugraph_bulk_sampling( batch_size, seeds_per_call, fanout, + sampling_target_framework, reverse_edges=True, dataset_dir=".", replication_factor=1, @@ -564,17 +565,39 @@ def benchmark_cugraph_bulk_sampling( output_sample_path = os.path.join(output_subdir, "samples") os.makedirs(output_sample_path) - batches_per_partition = 200_000 // batch_size + if sampling_target_framework == "cugraph_dgl_csr": + sampling_kwargs = { + "deduplicate_sources": True, + "prior_sources_behavior": "carryover", + "renumber": True, + "compression": "CSR", + "compress_per_hop": True, + "use_legacy_names": False, + "include_hop_column": False, + } + else: + # FIXME: Update these arguments when CSC mode is fixed in cuGraph-PyG (release 24.02) + sampling_kwargs = { + "deduplicate_sources": True, + "prior_sources_behavior": "exclude", + "renumber": True, + "compression": "COO", + "compress_per_hop": False, + "use_legacy_names": False, + "include_hop_column": True, + } + + 
batches_per_partition = 400_000 // batch_size execution_time, allocation_counts = sample_graph( - G, - dask_label_df, - output_sample_path, + G=G, + label_df=dask_label_df, + output_path=output_sample_path, seed=seed, batch_size=batch_size, seeds_per_call=seeds_per_call, batches_per_partition=batches_per_partition, fanout=fanout, - persist=persist, + sampling_kwargs=sampling_kwargs, ) output_meta = { @@ -701,7 +724,13 @@ def get_args(): required=False, default=False, ) - + parser.add_argument( + "--sampling_target_framework", + type=str, + help="The target framework for sampling (i.e. cugraph_dgl_csr, cugraph_pyg_csc, ...)", + required=False, + default=None, + ) parser.add_argument( "--dask_worker_devices", type=str, @@ -738,6 +767,12 @@ def get_args(): logging.basicConfig() args = get_args() + if args.sampling_target_framework not in ["cugraph_dgl_csr", None]: + raise ValueError( + "sampling_target_framework must be one of cugraph_dgl_csr or None", + "Other frameworks are not supported at this time.", + ) + fanouts = [ [int(f) for f in fanout.split("_")] for fanout in args.fanouts.split(",") ] @@ -785,6 +820,7 @@ def get_args(): batch_size=batch_size, seeds_per_call=seeds_per_call, fanout=fanout, + sampling_target_framework=args.sampling_target_framework, dataset_dir=args.dataset_root, reverse_edges=args.reverse_edges, replication_factor=replication_factor, @@ -809,7 +845,6 @@ def get_args(): warnings.warn("An Exception Occurred!") print(e) traceback.print_exc() - restart_client(client) sleep(10) stats_df = pd.DataFrame( diff --git a/python/cugraph-dgl/cugraph_dgl/dataloading/utils/sampling_helpers.py b/python/cugraph-dgl/cugraph_dgl/dataloading/utils/sampling_helpers.py index a4f64668348..f674bece8be 100644 --- a/python/cugraph-dgl/cugraph_dgl/dataloading/utils/sampling_helpers.py +++ b/python/cugraph-dgl/cugraph_dgl/dataloading/utils/sampling_helpers.py @@ -14,7 +14,6 @@ from typing import List, Tuple, Dict, Optional from collections import defaultdict import 
cudf -import cupy from cugraph.utilities.utils import import_optional from cugraph_dgl.nn import SparseGraph @@ -444,53 +443,58 @@ def _process_sampled_df_csc( destinations, respectively. """ # dropna - major_offsets = df.major_offsets.dropna().values - label_hop_offsets = df.label_hop_offsets.dropna().values - renumber_map_offsets = df.renumber_map_offsets.dropna().values - renumber_map = df.map.dropna().values - minors = df.minors.dropna().values + major_offsets = cast_to_tensor(df.major_offsets.dropna()) + label_hop_offsets = cast_to_tensor(df.label_hop_offsets.dropna()) + renumber_map_offsets = cast_to_tensor(df.renumber_map_offsets.dropna()) + renumber_map = cast_to_tensor(df.map.dropna()) + minors = cast_to_tensor(df.minors.dropna()) - n_batches = renumber_map_offsets.size - 1 - n_hops = int((label_hop_offsets.size - 1) / n_batches) + n_batches = len(renumber_map_offsets) - 1 + n_hops = int((len(label_hop_offsets) - 1) / n_batches) # make global offsets local - major_offsets -= major_offsets[0] - label_hop_offsets -= label_hop_offsets[0] - renumber_map_offsets -= renumber_map_offsets[0] + # Clone the first element: it is a view into the same tensor, and PyTorch + # rejects in-place ops whose operand overlaps the tensor being written + major_offsets -= major_offsets[0].clone() + label_hop_offsets -= label_hop_offsets[0].clone() + renumber_map_offsets -= renumber_map_offsets[0].clone() # get the sizes of each adjacency matrix (for MFGs) mfg_sizes = (label_hop_offsets[1:] - label_hop_offsets[:-1]).reshape( (n_batches, n_hops) ) n_nodes = renumber_map_offsets[1:] - renumber_map_offsets[:-1] - mfg_sizes = cupy.hstack((mfg_sizes, n_nodes.reshape(n_batches, -1))) + mfg_sizes = torch.hstack((mfg_sizes, n_nodes.reshape(n_batches, -1))) if reverse_hop_id: - mfg_sizes = mfg_sizes[:, ::-1] + mfg_sizes = mfg_sizes.flip(1) tensors_dict = {} renumber_map_list = [] + # Note: minors and major_offsets from BulkSampler are of type int32 + # and int64 respectively. 
Since pylibcugraphops binding code doesn't + # support distinct node and edge index type, we simply cast both + # to int32 for now. + minors = minors.int() + major_offsets = major_offsets.int() + # Note: We transfer tensors to CPU here to avoid the overhead of + # transferring them in each iteration of the for loop below. + major_offsets_cpu = major_offsets.to("cpu").numpy() + label_hop_offsets_cpu = label_hop_offsets.to("cpu").numpy() + for batch_id in range(n_batches): batch_dict = {} - for hop_id in range(n_hops): hop_dict = {} idx = batch_id * n_hops + hop_id # idx in label_hop_offsets - major_offsets_start = label_hop_offsets[idx].item() - major_offsets_end = label_hop_offsets[idx + 1].item() - minors_start = major_offsets[major_offsets_start].item() - minors_end = major_offsets[major_offsets_end].item() - # Note: minors and major_offsets from BulkSampler are of type int32 - # and int64 respectively. Since pylibcugraphops binding code doesn't - # support distinct node and edge index type, we simply casting both - # to int32 for now. 
- hop_dict["minors"] = torch.as_tensor( - minors[minors_start:minors_end], device="cuda" - ).int() - hop_dict["major_offsets"] = torch.as_tensor( + major_offsets_start = label_hop_offsets_cpu[idx] + major_offsets_end = label_hop_offsets_cpu[idx + 1] + minors_start = major_offsets_cpu[major_offsets_start] + minors_end = major_offsets_cpu[major_offsets_end] + hop_dict["minors"] = minors[minors_start:minors_end] + hop_dict["major_offsets"] = ( major_offsets[major_offsets_start : major_offsets_end + 1] - - major_offsets[major_offsets_start], - device="cuda", - ).int() + - major_offsets[major_offsets_start] + ) if reverse_hop_id: batch_dict[n_hops - 1 - hop_id] = hop_dict else: @@ -499,12 +503,9 @@ def _process_sampled_df_csc( tensors_dict[batch_id] = batch_dict renumber_map_list.append( - torch.as_tensor( - renumber_map[ - renumber_map_offsets[batch_id] : renumber_map_offsets[batch_id + 1] - ], - device="cuda", - ) + renumber_map[ + renumber_map_offsets[batch_id] : renumber_map_offsets[batch_id + 1] + ], ) return tensors_dict, renumber_map_list, mfg_sizes.tolist()