Skip to content

Commit

Permalink
Merge branch 'branch-23.10' into nx_plugin_info_entrypoint
Browse files Browse the repository at this point in the history
  • Loading branch information
eriknw committed Sep 27, 2023
2 parents f045d6e + 5d2f548 commit 67e3cc6
Show file tree
Hide file tree
Showing 147 changed files with 10,114 additions and 4,648 deletions.
22 changes: 21 additions & 1 deletion .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ jobs:
arch: "amd64"
branch: ${{ inputs.branch }}
build_type: ${{ inputs.build_type || 'branch' }}
container_image: "rapidsai/ci:cuda11.8.0-ubuntu22.04-py3.10"
container_image: "rapidsai/ci-conda:cuda11.8.0-ubuntu22.04-py3.10"
date: ${{ inputs.date }}
node_type: "gpu-v100-latest-1"
run_script: "ci/build_docs.sh"
Expand Down Expand Up @@ -112,3 +112,23 @@ jobs:
sha: ${{ inputs.sha }}
date: ${{ inputs.date }}
package-name: cugraph
wheel-build-nx-cugraph:
needs: wheel-publish-pylibcugraph
secrets: inherit
uses: rapidsai/shared-action-workflows/.github/workflows/[email protected]
with:
build_type: ${{ inputs.build_type || 'branch' }}
branch: ${{ inputs.branch }}
sha: ${{ inputs.sha }}
date: ${{ inputs.date }}
script: ci/build_wheel_nx-cugraph.sh
wheel-publish-nx-cugraph:
needs: wheel-build-nx-cugraph
secrets: inherit
uses: rapidsai/shared-action-workflows/.github/workflows/[email protected]
with:
build_type: ${{ inputs.build_type || 'branch' }}
branch: ${{ inputs.branch }}
sha: ${{ inputs.sha }}
date: ${{ inputs.date }}
package-name: nx-cugraph
20 changes: 18 additions & 2 deletions .github/workflows/pr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ jobs:
- wheel-tests-pylibcugraph
- wheel-build-cugraph
- wheel-tests-cugraph
- wheel-build-nx-cugraph
- wheel-tests-nx-cugraph
secrets: inherit
uses: rapidsai/shared-action-workflows/.github/workflows/[email protected]
checks:
Expand Down Expand Up @@ -63,7 +65,7 @@ jobs:
build_type: pull-request
node_type: "gpu-v100-latest-1"
arch: "amd64"
container_image: "rapidsai/ci:cuda11.8.0-ubuntu22.04-py3.10"
container_image: "rapidsai/ci-conda:cuda11.8.0-ubuntu22.04-py3.10"
run_script: "ci/test_notebooks.sh"
docs-build:
needs: conda-python-build
Expand All @@ -73,7 +75,7 @@ jobs:
build_type: pull-request
node_type: "gpu-v100-latest-1"
arch: "amd64"
container_image: "rapidsai/ci:cuda11.8.0-ubuntu22.04-py3.10"
container_image: "rapidsai/ci-conda:cuda11.8.0-ubuntu22.04-py3.10"
run_script: "ci/build_docs.sh"
wheel-build-pylibcugraph:
needs: checks
Expand Down Expand Up @@ -109,3 +111,17 @@ jobs:
with:
build_type: pull-request
script: ci/test_wheel_cugraph.sh
wheel-build-nx-cugraph:
needs: wheel-tests-pylibcugraph
secrets: inherit
uses: rapidsai/shared-action-workflows/.github/workflows/[email protected]
with:
build_type: pull-request
script: ci/build_wheel_nx-cugraph.sh
wheel-tests-nx-cugraph:
needs: wheel-build-nx-cugraph
secrets: inherit
uses: rapidsai/shared-action-workflows/.github/workflows/[email protected]
with:
build_type: pull-request
script: ci/test_wheel_nx-cugraph.sh
9 changes: 9 additions & 0 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,12 @@ jobs:
date: ${{ inputs.date }}
sha: ${{ inputs.sha }}
script: ci/test_wheel_cugraph.sh
wheel-tests-nx-cugraph:
secrets: inherit
uses: rapidsai/shared-action-workflows/.github/workflows/[email protected]
with:
build_type: nightly
branch: ${{ inputs.branch }}
date: ${{ inputs.date }}
sha: ${{ inputs.sha }}
script: ci/test_wheel_nx-cugraph.sh
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ repos:
additional_dependencies:
- flake8==6.0.0
- repo: https://github.com/pre-commit/mirrors-clang-format
rev: v16.0.1
rev: v16.0.6
hooks:
- id: clang-format
exclude: |
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,291 @@
# Copyright (c) 2023, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import dgl
import torch
import pandas as pd
import os
import time
import json
import random
import numpy as np
from argparse import ArgumentParser


def load_edges_from_disk(parquet_path, replication_factor, input_meta):
"""
Load the edges from disk into a graph data dictionary.
Args:
parquet_path: Path to the parquet directory.
replication_factor: Number of times to replicate the edges.
input_meta: Input meta data.
Returns:
dict: Dictionary of edge types to a tuple of (src, dst)
"""
graph_data = {}
for edge_type in input_meta["num_edges"].keys():
print(
f"Loading edge index for edge type {edge_type}"
f"for replication factor = {replication_factor}"
)
can_edge_type = tuple(edge_type.split("__"))
# TODO: Rename `edge_index` to a better name
ei = pd.read_parquet(
os.path.join(parquet_path, edge_type, "edge_index.parquet")
)
ei = {
"src": torch.from_numpy(ei.src.values),
"dst": torch.from_numpy(ei.dst.values),
}
if replication_factor > 1:
src_ls = [ei["src"]]
dst_ls = [ei["dst"]]
for r in range(1, replication_factor):
new_src = ei["src"] + (
r * input_meta["num_nodes"][can_edge_type[0]]
)
src_ls.append(new_src)
new_dst = ei["dst"] + (
r * input_meta["num_nodes"][can_edge_type[2]]
)
dst_ls.append(new_dst)

ei["src"] = torch.cat(src_ls).contiguous()
ei["dst"] = torch.cat(dst_ls).contiguous()
graph_data[can_edge_type] = ei["src"], ei["dst"]
print("Graph Data compiled")
return graph_data


def load_node_labels(dataset_path, replication_factor, input_meta):
num_nodes_dict = {
node_type: t * replication_factor
for node_type, t in input_meta["num_nodes"].items()
}
node_data = {}
for node_type in input_meta["num_nodes"].keys():
node_data[node_type] = {}
label_path = os.path.join(
dataset_path, "parquet", node_type, "node_label.parquet"
)
if os.path.exists(label_path):
node_label = pd.read_parquet(label_path)
if replication_factor > 1:
base_num_nodes = input_meta["num_nodes"][node_type]
dfr = pd.DataFrame(
{
"node": pd.concat(
[
node_label.node + (r * base_num_nodes)
for r in range(1, replication_factor)
]
),
"label": pd.concat(
[
node_label.label
for r in range(1, replication_factor)
]
),
}
)
node_label = pd.concat([node_label, dfr]).reset_index(
drop=True
)

node_label_tensor = torch.full(
(num_nodes_dict[node_type],), -1, dtype=torch.float32
)
node_label_tensor[
torch.as_tensor(node_label.node.values)
] = torch.as_tensor(node_label.label.values)

del node_label
node_data[node_type]["train_idx"] = (
(node_label_tensor > -1).contiguous().nonzero().view(-1)
)
node_data[node_type]["y"] = node_label_tensor.contiguous()
else:
node_data[node_type]["num_nodes"] = num_nodes_dict[node_type]
return node_data


def create_dgl_graph_from_disk(dataset_path, replication_factor=1):
"""
Create a DGL graph from a dataset on disk.
Args:
dataset_path: Path to the dataset on disk.
replication_factor: Number of times to replicate the edges.
Returns:
DGLGraph: DGLGraph with the loaded dataset.
"""
with open(os.path.join(dataset_path, "meta.json"), "r") as f:
input_meta = json.load(f)

parquet_path = os.path.join(dataset_path, "parquet")
graph_data = load_edges_from_disk(
parquet_path, replication_factor, input_meta
)
node_data = load_node_labels(dataset_path, replication_factor, input_meta)
g = dgl.heterograph(graph_data)

return g, node_data


def create_dataloader(g, train_idx, batch_size, fanouts, use_uva):
"""
Create a DGL dataloader from a DGL graph.
Args:
g: DGLGraph to create the dataloader from.
train_idx: Tensor containing the training indices.
batch_size: Batch size to use for the dataloader.
fanouts: List of fanouts to use for the dataloader.
use_uva: Whether to use unified virtual address space.
Returns:
DGLGraph: DGLGraph with the loaded dataset.
"""

print("Creating dataloader", flush=True)
st = time.time()
if use_uva:
train_idx = {k: v.to("cuda") for k, v in train_idx.items()}
sampler = dgl.dataloading.MultiLayerNeighborSampler(fanouts=fanouts)
dataloader = dgl.dataloading.DataLoader(
g,
train_idx,
sampler,
num_workers=0,
batch_size=batch_size,
use_uva=use_uva,
shuffle=False,
drop_last=False,
)
et = time.time()
print(f"Time to create dataloader = {et - st:.2f} seconds")
return dataloader


def dataloading_benchmark(g, train_idx, fanouts, batch_sizes, use_uva):
"""
Run the dataloading benchmark.
Args:
g: DGLGraph
train_idx: Tensor containing the training indices.
fanouts: List of fanouts to use for the dataloader.
batch_sizes: List of batch sizes to use for the dataloader.
use_uva: Whether to use unified virtual address space.
"""
time_ls = []
for fanout in fanouts:
for batch_size in batch_sizes:
dataloader = create_dataloader(
g,
train_idx,
batch_size=batch_size,
fanouts=fanout,
use_uva=use_uva,
)
dataloading_st = time.time()
for input_nodes, output_nodes, blocks in dataloader:
pass
dataloading_et = time.time()
dataloading_time = dataloading_et - dataloading_st
time_d = {
"fanout": fanout,
"batch_size": batch_size,
"dataloading_time_per_epoch": dataloading_time,
"dataloading_time_per_batch": dataloading_time / len(dataloader),
"num_edges": g.num_edges(),
"num_batches": len(dataloader),
}
time_ls.append(time_d)

print("Dataloading completed")
print(f"Fanout = {fanout}, batch_size = {batch_size}")
print(
f"Time taken {dataloading_time:.2f} ",
f"seconds for num batches {len(dataloader)}",
flush=True,
)
print("==============================================")
return time_ls

def set_seed(seed):
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)

if __name__ == "__main__":
parser = ArgumentParser()
parser.add_argument(
"--dataset_path", type=str, default="/datasets/abarghi/ogbn_papers100M"
)
parser.add_argument("--replication_factors", type=str, default="1,2,4,8")
parser.add_argument(
"--fanouts", type=str, default="25_25,10_10_10,5_10_20"
)
parser.add_argument("--batch_sizes", type=str, default="512,1024")
parser.add_argument("--do_not_use_uva", action="store_true")
parser.add_argument("--seed", type=int, default=42)
args = parser.parse_args()

if args.do_not_use_uva:
use_uva = False
else:
use_uva = True
set_seed(args.seed)
replication_factors = [int(x) for x in args.replication_factors.split(",")]
fanouts = [[int(y) for y in x.split("_")] for x in args.fanouts.split(",")]
batch_sizes = [int(x) for x in args.batch_sizes.split(",")]

print("Running dgl dataloading benchmark with the following parameters:")
print(f"Dataset path = {args.dataset_path}")
print(f"Replication factors = {replication_factors}")
print(f"Fanouts = {fanouts}")
print(f"Batch sizes = {batch_sizes}")
print(f"Use UVA = {use_uva}")
print("==============================================")

time_ls = []
for replication_factor in replication_factors:
st = time.time()
g, node_data = create_dgl_graph_from_disk(
dataset_path=args.dataset_path,
replication_factor=replication_factor,
)
et = time.time()
print(f"Replication factor = {replication_factor}")
print(
f"G has {g.num_edges()} edges and took",
f" {et - st:.2f} seconds to load"
)
train_idx = {"paper": node_data["paper"]["train_idx"]}
r_time_ls = dataloading_benchmark(
g, train_idx, fanouts, batch_sizes, use_uva=use_uva
)
print(
"Benchmark completed for replication factor = ", replication_factor
)
print("==============================================")
# Add replication factor to the time list
[
x.update({"replication_factor": replication_factor})
for x in r_time_ls
]
time_ls.extend(r_time_ls)

df = pd.DataFrame(time_ls)
df.to_csv("dgl_dataloading_benchmark.csv", index=False)
print("Benchmark completed for all replication factors")
print("==============================================")
Empty file.
Loading

0 comments on commit 67e3cc6

Please sign in to comment.