Skip to content

Commit

Permalink
Merge branch 'branch-23.12' into fix-loader-bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
alexbarghi-nv authored Nov 8, 2023
2 parents 65f50a3 + c3b3cee commit 9b7901c
Show file tree
Hide file tree
Showing 15 changed files with 430 additions and 86 deletions.
14 changes: 13 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,18 @@
</div>

-----
## News

## Table of content
___NEW!___ _[nx-cugraph](./python/nx-cugraph/README.md)_, a NetworkX backend that provides GPU acceleration to NetworkX with zero code change.
```
> pip install nx-cugraph-cu11 --extra-index-url https://pypi.nvidia.com
> export NETWORKX_AUTOMATIC_BACKENDS=cugraph
```
That's it. NetworkX now leverages cuGraph for accelerated graph algorithms.

-----

## Table of contents
- Installation
- [Getting cuGraph Packages](./docs/cugraph/source/installation/getting_cugraph.md)
- [Building from Source](./docs/cugraph/source/installation/source_build.md)
Expand All @@ -52,6 +62,7 @@
- [External Data Types](./readme_pages/data_types.md)
- [pylibcugraph](./readme_pages/pylibcugraph.md)
- [libcugraph (C/C++/CUDA)](./readme_pages/libcugraph.md)
- [nx-cugraph](./python/nx-cugraph/README.md)
- [cugraph-service](./readme_pages/cugraph_service.md)
- [cugraph-dgl](./readme_pages/cugraph_dgl.md)
- [cugraph-ops](./readme_pages/cugraph_ops.md)
Expand Down Expand Up @@ -116,6 +127,7 @@ df_page.sort_values('pagerank', ascending=False).head(10)
* ArangoDB - a free and open-source native multi-model database system - https://www.arangodb.com/
* CuPy - "NumPy/SciPy-compatible Array Library for GPU-accelerated Computing with Python" - https://cupy.dev/
* Memgraph - In-memory Graph database - https://memgraph.com/
* NetworkX (via [nx-cugraph](./python/nx-cugraph/README.md) backend) - an extremely popular, free and open-source package for the creation, manipulation, and study of the structure, dynamics, and functions of complex networks - https://networkx.org/
* PyGraphistry - free and open-source GPU graph ETL, AI, and visualization, including native RAPIDS & cuGraph support - http://github.com/graphistry/pygraphistry
* ScanPy - a scalable toolkit for analyzing single-cell gene expression data - https://scanpy.readthedocs.io/en/stable/

Expand Down
152 changes: 152 additions & 0 deletions benchmarks/cugraph-dgl/scale-benchmarks/cugraph_dgl_benchmark.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
# Copyright (c) 2018-2023, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os

os.environ["LIBCUDF_CUFILE_POLICY"] = "KVIKIO"
os.environ["KVIKIO_NTHREADS"] = "64"
os.environ["RAPIDS_NO_INITIALIZE"] = "1"
import json
import pandas as pd
import os
import time
from rmm.allocators.torch import rmm_torch_allocator
import rmm
import torch
from cugraph_dgl.dataloading import HomogenousBulkSamplerDataset
from model import run_1_epoch
from argparse import ArgumentParser
from load_graph_feats import load_node_labels, load_node_features


def create_dataloader(sampled_dir, total_num_nodes, sparse_format, return_type):
    """Wrap pre-sampled bulk-sampler output in a torch ``DataLoader``.

    Args:
        sampled_dir: location passed to ``dataset.set_input_files``.
        total_num_nodes: first positional argument of
            ``HomogenousBulkSamplerDataset``.
        sparse_format: forwarded to the dataset as ``sparse_format``.
        return_type: forwarded to the dataset as ``return_type``.

    Returns:
        A ``torch.utils.data.DataLoader`` over the dataset. Items are passed
        through unchanged (identity ``collate_fn``), in order, and are loaded
        in the calling process (``num_workers=0``).
    """
    print("Creating dataloader", flush=True)
    started_at = time.time()

    dataset = HomogenousBulkSamplerDataset(
        total_num_nodes,
        edge_dir="in",
        sparse_format=sparse_format,
        return_type=return_type,
    )
    dataset.set_input_files(sampled_dir)

    # batch_size=None: the dataset already yields complete minibatches, so
    # the loader must not try to batch them again.
    loader_options = {
        "collate_fn": lambda x: x,
        "shuffle": False,
        "num_workers": 0,
        "batch_size": None,
    }
    dataloader = torch.utils.data.DataLoader(dataset, **loader_options)

    elapsed = time.time() - started_at
    print(f"Time to create dataloader = {elapsed:.2f} seconds", flush=True)
    return dataloader


def setup_common_pool():
    """Share one RMM pool between RAPIDS and PyTorch.

    Re-initializes RMM with a pool allocator whose initial size is ``5e9``
    (bytes, per the RMM documentation) and then switches PyTorch's CUDA
    allocator to the RMM-backed one.
    """
    initial_pool_size = 5e9
    rmm.reinitialize(pool_allocator=True, initial_pool_size=initial_pool_size)
    torch.cuda.memory.change_current_allocator(rmm_torch_allocator)


def main(args):
    """Benchmark every sampling run found under ``args.sampling_path``.

    For each sub-directory of the sampling path this reads its
    ``output_meta.json``, loads the "paper" node labels and features for the
    recorded replication factor, runs ``e2e_benchmark`` and tags every
    returned timing record with ``replication_factor`` and ``num_edges``.
    All records are finally written to ``cugraph_dgl_e2e_benchmark.csv`` in
    the current working directory.

    Args:
        args: namespace providing ``dataset_path`` (directory containing
            ``meta.json``) and ``sampling_path`` (directory whose
            sub-directories each hold one sampling run).
    """
    print(
        f"Running cugraph-dgl dataloading benchmark with the following parameters:\n"
        f"Dataset path = {args.dataset_path}\n"
        f"Sampling path = {args.sampling_path}\n"
    )
    with open(os.path.join(args.dataset_path, "meta.json"), "r") as f:
        input_meta = json.load(f)

    # Only directories can be sampling runs; skip stray files (logs, CSVs,
    # ...) that would otherwise crash the lookup of output_meta.json below.
    candidate_paths = (
        os.path.join(args.sampling_path, entry)
        for entry in os.listdir(args.sampling_path)
    )
    sampled_dirs = [path for path in candidate_paths if os.path.isdir(path)]

    time_ls = []
    for sampled_dir in sampled_dirs:
        with open(os.path.join(sampled_dir, "output_meta.json"), "r") as f:
            sampled_meta_d = json.load(f)

        replication_factor = sampled_meta_d["replication_factor"]
        feat_load_st = time.time()
        label_data = load_node_labels(
            args.dataset_path, replication_factor, input_meta
        )["paper"]["y"]
        feat_data = load_node_features(
            args.dataset_path, replication_factor, node_type="paper"
        )
        print(
            f"Feature and label data loading took = {time.time()-feat_load_st}",
            flush=True,
        )

        r_time_ls = e2e_benchmark(sampled_dir, feat_data, label_data, sampled_meta_d)
        # Annotate each timing record so rows from different runs can be
        # told apart in the combined CSV.
        for timing in r_time_ls:
            timing.update(
                {
                    "replication_factor": replication_factor,
                    "num_edges": sampled_meta_d["total_num_edges"],
                }
            )
        time_ls.extend(r_time_ls)

        print(
            f"Benchmark completed for replication factor = {replication_factor}\n{'=' * 30}",
            flush=True,
        )

    df = pd.DataFrame(time_ls)
    df.to_csv("cugraph_dgl_e2e_benchmark.csv", index=False)
    print(f"Benchmark completed for all replication factors\n{'=' * 30}", flush=True)


def e2e_benchmark(
    sampled_dir: str, feat: torch.Tensor, y: torch.Tensor, sampled_meta_d: dict
):
    """
    Time one epoch over the samples stored under ``sampled_dir``.

    Note: ``sampled_meta_d`` is modified in place; its ``"sparse_format"``
    entry is set to ``"csc"``.

    Args:
        sampled_dir: directory of one sampling run; the samples themselves
            are read from its ``samples`` sub-directory
        feat: node features
        y: node labels
        sampled_meta_d: metadata of the sampling run; ``total_num_nodes``,
            ``fanout`` and ``batch_size`` are read from it
    Returns:
        A one-element list holding the value returned by ``run_1_epoch``.
    """
    # TODO: Make this a parameter in bulk sampling script
    sampled_meta_d["sparse_format"] = "csc"

    samples_path = os.path.join(sampled_dir, "samples")
    dataloader = create_dataloader(
        samples_path,
        sampled_meta_d["total_num_nodes"],
        sampled_meta_d["sparse_format"],
        return_type="cugraph_dgl.nn.SparseGraph",
    )

    epoch_times = run_1_epoch(
        dataloader,
        feat,
        y,
        fanout=sampled_meta_d["fanout"],
        batch_size=sampled_meta_d["batch_size"],
        model_backend="cugraph_dgl",
    )

    print("=" * 30)
    return [epoch_times]


def parse_arguments():
    """Parse the benchmark's command-line options.

    Recognized options (both strings, both optional):
        --dataset_path: defaults to ``/raid/vjawa/ogbn_papers100M/``
        --sampling_path: defaults to
            ``/raid/vjawa/nov_1_bulksampling_benchmarks/``

    Returns:
        The ``argparse.Namespace`` produced from ``sys.argv``.
    """
    # (flag, default) pairs, registered in this order.
    string_options = (
        ("--dataset_path", "/raid/vjawa/ogbn_papers100M/"),
        ("--sampling_path", "/raid/vjawa/nov_1_bulksampling_benchmarks/"),
    )

    parser = ArgumentParser()
    for flag, default_value in string_options:
        parser.add_argument(flag, type=str, default=default_value)
    return parser.parse_args()


if __name__ == "__main__":
    # Install the shared RMM pool first, then parse the command line and run
    # the benchmark with the parsed options.
    setup_common_pool()
    main(parse_arguments())
17 changes: 11 additions & 6 deletions benchmarks/cugraph-dgl/scale-benchmarks/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ def create_model(feat_size, num_classes, num_layers, model_backend="dgl"):


def train_model(model, dataloader, opt, feat, y):
times = {key: 0 for key in ["mfg_creation", "feature", "m_fwd", "m_bkwd"]}
times_d = {key: 0 for key in ["mfg_creation", "feature", "m_fwd", "m_bkwd"]}
epoch_st = time.time()
mfg_st = time.time()
for input_nodes, output_nodes, blocks in dataloader:
times["mfg_creation"] += time.time() - mfg_st
times_d["mfg_creation"] += time.time() - mfg_st
if feat is not None:
fst = time.time()
input_nodes = input_nodes.to("cpu")
Expand All @@ -71,23 +71,24 @@ def train_model(model, dataloader, opt, feat, y):
output_nodes = output_nodes["paper"]
output_nodes = output_nodes.to(y.device)
y_batch = y[output_nodes].to("cuda")
times["feature"] += time.time() - fst
times_d["feature"] += time.time() - fst

m_fwd_st = time.time()
y_hat = model(blocks, input_feat)
times["m_fwd"] += time.time() - m_fwd_st
times_d["m_fwd"] += time.time() - m_fwd_st

m_bkwd_st = time.time()
loss = F.cross_entropy(y_hat, y_batch)
opt.zero_grad()
loss.backward()
opt.step()
times["m_bkwd"] += time.time() - m_bkwd_st
times_d["m_bkwd"] += time.time() - m_bkwd_st
mfg_st = time.time()

print(f"Epoch time = {time.time() - epoch_st:.2f} seconds")
print(f"Time to create MFG = {times_d['mfg_creation']:.2f} seconds")

return times
return times_d


def analyze_time(dataloader, times, epoch_time, fanout, batch_size):
Expand Down Expand Up @@ -119,6 +120,10 @@ def run_1_epoch(dataloader, feat, y, fanout, batch_size, model_backend):
else:
model = None
opt = None

# Warmup RUN
times = train_model(model, dataloader, opt, feat, y)

epoch_st = time.time()
times = train_model(model, dataloader, opt, feat, y)
epoch_time = time.time() - epoch_st
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
get_allocation_counts_dask_lazy,
sizeof_fmt,
get_peak_output_ratio_across_workers,
restart_client,
start_dask_client,
stop_dask_client,
enable_spilling,
Expand Down Expand Up @@ -187,10 +186,10 @@ def sample_graph(
output_path,
seed=42,
batch_size=500,
seeds_per_call=200000,
seeds_per_call=400000,
batches_per_partition=100,
fanout=[5, 5, 5],
persist=False,
sampling_kwargs={},
):
cupy.random.seed(seed)

Expand All @@ -204,6 +203,7 @@ def sample_graph(
seeds_per_call=seeds_per_call,
batches_per_partition=batches_per_partition,
log_level=logging.INFO,
**sampling_kwargs,
)

n_workers = len(default_client().scheduler_info()["workers"])
Expand Down Expand Up @@ -469,6 +469,7 @@ def benchmark_cugraph_bulk_sampling(
batch_size,
seeds_per_call,
fanout,
sampling_target_framework,
reverse_edges=True,
dataset_dir=".",
replication_factor=1,
Expand Down Expand Up @@ -564,17 +565,39 @@ def benchmark_cugraph_bulk_sampling(
output_sample_path = os.path.join(output_subdir, "samples")
os.makedirs(output_sample_path)

batches_per_partition = 200_000 // batch_size
if sampling_target_framework == "cugraph_dgl_csr":
sampling_kwargs = {
"deduplicate_sources": True,
"prior_sources_behavior": "carryover",
"renumber": True,
"compression": "CSR",
"compress_per_hop": True,
"use_legacy_names": False,
"include_hop_column": False,
}
else:
# FIXME: Update these arguments when CSC mode is fixed in cuGraph-PyG (release 24.02)
sampling_kwargs = {
"deduplicate_sources": True,
"prior_sources_behavior": "exclude",
"renumber": True,
"compression": "COO",
"compress_per_hop": False,
"use_legacy_names": False,
"include_hop_column": True,
}

batches_per_partition = 400_000 // batch_size
execution_time, allocation_counts = sample_graph(
G,
dask_label_df,
output_sample_path,
G=G,
label_df=dask_label_df,
output_path=output_sample_path,
seed=seed,
batch_size=batch_size,
seeds_per_call=seeds_per_call,
batches_per_partition=batches_per_partition,
fanout=fanout,
persist=persist,
sampling_kwargs=sampling_kwargs,
)

output_meta = {
Expand Down Expand Up @@ -701,7 +724,13 @@ def get_args():
required=False,
default=False,
)

parser.add_argument(
"--sampling_target_framework",
type=str,
help="The target framework for sampling (i.e. cugraph_dgl_csr, cugraph_pyg_csc, ...)",
required=False,
default=None,
)
parser.add_argument(
"--dask_worker_devices",
type=str,
Expand Down Expand Up @@ -738,6 +767,12 @@ def get_args():
logging.basicConfig()

args = get_args()
if args.sampling_target_framework not in ["cugraph_dgl_csr", None]:
raise ValueError(
"sampling_target_framework must be one of cugraph_dgl_csr or None",
"Other frameworks are not supported at this time.",
)

fanouts = [
[int(f) for f in fanout.split("_")] for fanout in args.fanouts.split(",")
]
Expand Down Expand Up @@ -785,6 +820,7 @@ def get_args():
batch_size=batch_size,
seeds_per_call=seeds_per_call,
fanout=fanout,
sampling_target_framework=args.sampling_target_framework,
dataset_dir=args.dataset_root,
reverse_edges=args.reverse_edges,
replication_factor=replication_factor,
Expand All @@ -809,7 +845,6 @@ def get_args():
warnings.warn("An Exception Occurred!")
print(e)
traceback.print_exc()
restart_client(client)
sleep(10)

stats_df = pd.DataFrame(
Expand Down
10 changes: 9 additions & 1 deletion cpp/src/centrality/eigenvector_centrality_impl.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ rmm::device_uvector<weight_t> eigenvector_centrality(
centralities.end(),
old_centralities.data());

update_edge_src_property(handle, pull_graph_view, centralities.begin(), edge_src_centralities);
update_edge_src_property(
handle, pull_graph_view, old_centralities.begin(), edge_src_centralities);

if (edge_weight_view) {
per_v_transform_reduce_incoming_e(
Expand All @@ -122,6 +123,13 @@ rmm::device_uvector<weight_t> eigenvector_centrality(
centralities.begin());
}

thrust::transform(handle.get_thrust_policy(),
centralities.begin(),
centralities.end(),
old_centralities.begin(),
centralities.begin(),
thrust::plus<weight_t>());

// Normalize the centralities
auto hypotenuse = sqrt(transform_reduce_v(
handle,
Expand Down
Loading

0 comments on commit 9b7901c

Please sign in to comment.