From d15a4d491614efc4f9b29709361a697923268535 Mon Sep 17 00:00:00 2001 From: Alex Barghi Date: Thu, 14 Dec 2023 10:55:27 -0800 Subject: [PATCH] update --- benchmarks/cugraph-pyg/bench_cugraph_pyg.py | 230 ------- benchmarks/cugraph-pyg/datasets/__init__.py | 15 - benchmarks/cugraph-pyg/datasets/dataset.py | 55 -- .../cugraph-pyg/datasets/ogbn_papers100M.py | 236 ------- benchmarks/cugraph-pyg/models_cugraph.py | 74 -- benchmarks/cugraph-pyg/models_native.py | 54 -- benchmarks/cugraph-pyg/trainer.py | 258 ------- benchmarks/cugraph-pyg/trainers_cugraph.py | 164 ----- benchmarks/cugraph-pyg/trainers_native.py | 192 ------ .../standalone/bulk_sampling/.gitignore | 1 + .../bulk_sampling/bench_cugraph_pyg.py | 222 ------ .../bulk_sampling/bench_cugraph_training.py | 14 +- .../bulk_sampling/datasets/__init__.py | 15 + .../cugraph/standalone/bulk_sampling/mg_utils | 1 - .../bulk_sampling/models_cugraph.py | 84 --- .../standalone/bulk_sampling/models_native.py | 62 -- .../standalone/bulk_sampling/run_sampling.sh | 4 +- .../standalone/bulk_sampling/run_train_job.sh | 16 +- .../bulk_sampling/slurm-1581196.out | 635 ++++++++++++++++++ .../bulk_sampling/trainers_cugraph.py | 142 ---- .../bulk_sampling/trainers_native.py | 167 ----- 21 files changed, 674 insertions(+), 1967 deletions(-) delete mode 100644 benchmarks/cugraph-pyg/bench_cugraph_pyg.py delete mode 100644 benchmarks/cugraph-pyg/datasets/__init__.py delete mode 100644 benchmarks/cugraph-pyg/datasets/dataset.py delete mode 100644 benchmarks/cugraph-pyg/datasets/ogbn_papers100M.py delete mode 100644 benchmarks/cugraph-pyg/models_cugraph.py delete mode 100644 benchmarks/cugraph-pyg/models_native.py delete mode 100644 benchmarks/cugraph-pyg/trainer.py delete mode 100644 benchmarks/cugraph-pyg/trainers_cugraph.py delete mode 100644 benchmarks/cugraph-pyg/trainers_native.py create mode 100644 benchmarks/cugraph/standalone/bulk_sampling/.gitignore delete mode 100644 benchmarks/cugraph/standalone/bulk_sampling/bench_cugraph_pyg.py delete mode 120000 benchmarks/cugraph/standalone/bulk_sampling/mg_utils delete mode 100644 benchmarks/cugraph/standalone/bulk_sampling/models_cugraph.py delete mode 100644 benchmarks/cugraph/standalone/bulk_sampling/models_native.py create mode 100644 benchmarks/cugraph/standalone/bulk_sampling/slurm-1581196.out delete mode 100644 benchmarks/cugraph/standalone/bulk_sampling/trainers_cugraph.py delete mode 100644 benchmarks/cugraph/standalone/bulk_sampling/trainers_native.py diff --git a/benchmarks/cugraph-pyg/bench_cugraph_pyg.py b/benchmarks/cugraph-pyg/bench_cugraph_pyg.py deleted file mode 100644 index 7df1a545f16..00000000000 --- a/benchmarks/cugraph-pyg/bench_cugraph_pyg.py +++ /dev/null @@ -1,230 +0,0 @@ -# Copyright (c) 2023, NVIDIA CORPORATION. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -import json -import argparse -import os -import json -import warnings - -import torch -import numpy as np -import pandas - -import torch.distributed as dist - -from datasets import OGBNPapers100MDataset - -from cugraph.testing.mg_utils import enable_spilling - - -def init_pytorch_worker(rank: int, use_rmm_torch_allocator: bool = False) -> None: - import cupy - import rmm - from pynvml.smi import nvidia_smi - - smi = nvidia_smi.getInstance() - pool_size = 16e9 # FIXME calculate this - - rmm.reinitialize( - devices=[rank], - pool_allocator=True, - initial_pool_size=pool_size, - ) - - if use_rmm_torch_allocator: - warnings.warn( - "Using the rmm pytorch allocator is currently unsupported." - " The default allocator will be used instead." - ) - # FIXME somehow get the pytorch allocator to work - # from rmm.allocators.torch import rmm_torch_allocator - # torch.cuda.memory.change_current_allocator(rmm_torch_allocator) - - from rmm.allocators.cupy import rmm_cupy_allocator - - cupy.cuda.set_allocator(rmm_cupy_allocator) - - cupy.cuda.Device(rank).use() - torch.cuda.set_device(rank) - - # Pytorch training worker initialization - torch.distributed.init_process_group(backend="nccl") - - -def parse_args(): - parser = argparse.ArgumentParser() - - parser.add_argument( - "--num_epochs", - type=int, - default=1, - help="Number of training epochs", - required=False, - ) - - parser.add_argument( - "--batch_size", - type=int, - default=512, - help="Batch size (required for Native run)", - required=False, - ) - - parser.add_argument( - "--fanout", - type=str, - default="10_10_10", - help="Fanout (required for Native run)", - required=False, - ) - - parser.add_argument( - "--sample_dir", - type=str, - help="Directory with stored bulk samples (required for cuGraph run)", - required=False, - ) - - parser.add_argument( - "--output_file", - type=str, - help="File to store results", - required=True, - ) - - parser.add_argument( - "--framework", - type=str, - help="The framework to test (PyG, cuGraphPyG)", - required=True, - ) - - parser.add_argument( - "--model", - type=str, - default="GraphSAGE", - help="The model to use (currently only GraphSAGE supported)", - required=False, - ) - - parser.add_argument( - "--replication_factor", - type=int, - default=1, - help="The replication factor for the dataset", - required=False, - ) - - parser.add_argument( - "--dataset_dir", - type=str, - help="The directory where datasets are stored", - required=True, - ) - - parser.add_argument( - "--train_split", - type=float, - help="The percentage of the labeled data to use for training. The remainder is used for testing/validation.", - default=0.8, - required=False, - ) - - parser.add_argument( - "--val_split", - type=float, - help="The percentage of the testing/validation data to allocate for validation.", - default=0.5, - required=False, - ) - - return parser.parse_args() - - -def main(args): - import logging - - logging.basicConfig( - level=logging.INFO, - ) - logger = logging.getLogger("bench_cugraph_pyg") - logger.setLevel(logging.INFO) - - local_rank = int(os.environ["LOCAL_RANK"]) - global_rank = int(os.environ["RANK"]) - - init_pytorch_worker( - local_rank, use_rmm_torch_allocator=(args.framework == "cuGraph") - ) - enable_spilling() - print(f"worker initialized") - dist.barrier() - - # Have to import here to avoid creating CUDA context - from trainers_cugraph import PyGCuGraphTrainer - from trainers_native import PyGNativeTrainer - - world_size = int(os.environ["SLURM_JOB_NUM_NODES"]) * int( - os.environ["SLURM_GPUS_PER_NODE"] - ) - - dataset = OGBNPapers100MDataset( - replication_factor=args.replication_factor, - dataset_dir=args.dataset_dir, - train_split=args.train_split, - val_split=args.val_split, - load_edge_index=(args.framework == "Native"), - ) - - if args.framework == "PyG": - trainer = PyGNativeTrainer( - model=args.model, - dataset=dataset, - device=local_rank, - rank=global_rank, - world_size=world_size, - num_epochs=args.num_epochs, - shuffle=True, - replace=False, - num_neighbors=[int(f) for f in args.fanout.split("_")], - batch_size=args.batch_size, - ) - elif args.framework == "cuGraphPyG": - trainer = PyGCuGraphTrainer( - model=args.model, - dataset=dataset, - sample_dir=args.sample_dir, - device=local_rank, - rank=global_rank, - world_size=world_size, - num_epochs=args.num_epochs, - shuffle=True, - replace=False, - num_neighbors=[int(f) for f in args.fanout.split("_")], - batch_size=args.batch_size, - ) - else: - raise ValueError("unsuported framework") - - stats = trainer.train() - logger.info(stats) - - with open(f"{args.output_file}[{global_rank}]", "w") as f: - json.dump(stats, f) - - -if __name__ == "__main__": - args = parse_args() - main(args) diff --git a/benchmarks/cugraph-pyg/datasets/__init__.py b/benchmarks/cugraph-pyg/datasets/__init__.py deleted file mode 100644 index 526b16f703a..00000000000 --- a/benchmarks/cugraph-pyg/datasets/__init__.py +++ /dev/null @@ -1,15 +0,0 @@ -# Copyright (c) 2023, NVIDIA CORPORATION. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from .dataset import Dataset -from .ogbn_papers100M import OGBNPapers100MDataset diff --git a/benchmarks/cugraph-pyg/datasets/dataset.py b/benchmarks/cugraph-pyg/datasets/dataset.py deleted file mode 100644 index 16f5ae2d6af..00000000000 --- a/benchmarks/cugraph-pyg/datasets/dataset.py +++ /dev/null @@ -1,55 +0,0 @@ -# Copyright (c) 2023, NVIDIA CORPORATION. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import torch -from typing import Dict, Tuple - - -class Dataset: - @property - def edge_index_dict(self) -> Dict[Tuple[str, str, str], Dict[str, torch.Tensor]]: - raise NotImplementedError() - - @property - def x_dict(self) -> Dict[str, torch.Tensor]: - raise NotImplementedError() - - @property - def y_dict(self) -> Dict[str, torch.Tensor]: - raise NotImplementedError() - - @property - def train_dict(self) -> Dict[str, torch.Tensor]: - raise NotImplementedError() - - @property - def test_dict(self) -> Dict[str, torch.Tensor]: - raise NotImplementedError() - - @property - def val_dict(self) -> Dict[str, torch.Tensor]: - raise NotImplementedError() - - @property - def num_input_features(self) -> int: - raise NotImplementedError() - - @property - def num_labels(self) -> int: - raise NotImplementedError() - - def num_nodes(self, node_type: str) -> int: - raise NotImplementedError() - - def num_edges(self, edge_type: Tuple[str, str, str]) -> int: - raise NotImplementedError() diff --git a/benchmarks/cugraph-pyg/datasets/ogbn_papers100M.py b/benchmarks/cugraph-pyg/datasets/ogbn_papers100M.py deleted file mode 100644 index 66b0b323360..00000000000 --- a/benchmarks/cugraph-pyg/datasets/ogbn_papers100M.py +++ /dev/null @@ -1,236 +0,0 @@ -# Copyright (c) 2023, NVIDIA CORPORATION. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from .dataset import Dataset -from typing import Dict, Tuple, Union - -import pandas -import torch -import numpy as np - -from sklearn.model_selection import train_test_split - -import gc -import os - -# TODO automatically generate this dataset and splits -class OGBNPapers100MDataset(Dataset): - def __init__( - self, - *, - replication_factor=1, - dataset_dir=".", - train_split=0.8, - val_split=0.5, - load_edge_index=True, - ): - self.__replication_factor = replication_factor - self.__disk_x = None - self.__y = None - self.__edge_index = None - self.__dataset_dir = dataset_dir - self.__train_split = train_split - self.__val_split = val_split - self.__load_edge_index = load_edge_index - - @property - def edge_index_dict( - self, - ) -> Dict[Tuple[str, str, str], Union[Dict[str, torch.Tensor], int]]: - import logging - - logger = logging.getLogger("OGBNPapers100MDataset") - - if self.__edge_index is None: - if self.__load_edge_index: - npy_path = os.path.join( - self.__dataset_dir, - "ogbn_papers100M_copy", - "npy", - "paper__cites__paper", - "edge_index.npy", - ) - - logger.info(f"loading edge index from {npy_path}") - ei = np.load(npy_path, mmap_mode="r") - ei = torch.as_tensor(ei) - ei = { - "src": ei[1], - "dst": ei[0], - } - - logger.info("sorting edge index...") - ei["dst"], ix = torch.sort(ei["dst"]) - ei["src"] = ei["src"][ix] - del ix - gc.collect() - - logger.info("processing replications...") - orig_num_nodes = self.num_nodes("paper") // self.__replication_factor - if self.__replication_factor > 1: - orig_src = ei["src"].clone().detach() - orig_dst = ei["dst"].clone().detach() - for r in range(1, self.__replication_factor): - ei["src"] = torch.concat( - [ - ei["src"], - orig_src + int(r * orig_num_nodes), - ] - ) - - ei["dst"] = torch.concat( - [ - ei["dst"], - orig_dst + int(r * orig_num_nodes), - ] - ) - - del orig_src - del orig_dst - - ei["src"] = ei["src"].contiguous() - ei["dst"] = ei["dst"].contiguous() - gc.collect() - - logger.info(f"# edges: {len(ei['src'])}") - self.__edge_index = {("paper", "cites", "paper"): ei} - else: - self.__edge_index = { - ("paper", "cites", "paper"): self.num_edges( - ("paper", "cites", "paper") - ) - } - - return self.__edge_index - - @property - def x_dict(self) -> Dict[str, torch.Tensor]: - node_type_path = os.path.join( - self.__dataset_dir, "ogbn_papers100M", "npy", "paper" - ) - - if self.__disk_x is None: - if self.__replication_factor == 1: - full_path = os.path.join(node_type_path, "node_feat.npy") - else: - full_path = os.path.join( - node_type_path, f"node_feat_{self.__replication_factor}x.npy" - ) - - self.__disk_x = {"paper": np.load(full_path, mmap_mode="r")} - - return self.__disk_x - - @property - def y_dict(self) -> Dict[str, torch.Tensor]: - if self.__y is None: - self.__get_labels() - - return self.__y - - @property - def train_dict(self) -> Dict[str, torch.Tensor]: - if self.__train is None: - self.__get_labels() - return self.__train - - @property - def test_dict(self) -> Dict[str, torch.Tensor]: - if self.__test is None: - self.__get_labels() - return self.__test - - @property - def val_dict(self) -> Dict[str, torch.Tensor]: - if self.__val is None: - self.__get_labels() - return self.__val - - @property - def num_input_features(self) -> int: - return int(self.x_dict["paper"].shape[1]) - - @property - def num_labels(self) -> int: - return int(self.y_dict["paper"].max()) + 1 - - def num_nodes(self, node_type: str) -> int: - if node_type != "paper": - raise ValueError(f"Invalid node type {node_type}") - - return 111_059_956 * self.__replication_factor - - def num_edges(self, edge_type: Tuple[str, str, str]) -> int: - if edge_type != ("paper", "cites", "paper"): - raise ValueError(f"Invalid edge type {edge_type}") - - return 1_615_685_872 * self.__replication_factor - - def __get_labels(self): - label_path = os.path.join( - self.__dataset_dir, - "ogbn_papers100M", - "parquet", - "paper", - "node_label.parquet", - ) - - node_label = pandas.read_parquet(label_path) - - if self.__replication_factor > 1: - orig_num_nodes = self.num_nodes("paper") // self.__replication_factor - dfr = pandas.DataFrame( - { - "node": pandas.concat( - [ - node_label.node + (r * orig_num_nodes) - for r in range(1, self.__replication_factor) - ] - ), - "label": pandas.concat( - [node_label.label for r in range(1, self.__replication_factor)] - ), - } - ) - node_label = pandas.concat([node_label, dfr]).reset_index(drop=True) - - num_nodes = self.num_nodes("paper") - node_label_tensor = torch.full( - (num_nodes,), -1, dtype=torch.float32, device="cpu" - ) - node_label_tensor[ - torch.as_tensor(node_label.node.values, device="cpu") - ] = torch.as_tensor(node_label.label.values, device="cpu") - - self.__y = {"paper": node_label_tensor.contiguous()} - - train_ix, test_val_ix = train_test_split( - torch.as_tensor(node_label.node.values), - train_size=self.__train_split, - random_state=num_nodes, - ) - test_ix, val_ix = train_test_split( - test_val_ix, test_size=self.__val_split, random_state=num_nodes - ) - - train_tensor = torch.full((num_nodes,), 0, dtype=torch.bool, device="cpu") - train_tensor[train_ix] = 1 - self.__train = {"paper": train_tensor} - - test_tensor = torch.full((num_nodes,), 0, dtype=torch.bool, device="cpu") - test_tensor[test_ix] = 1 - self.__test = {"paper": test_tensor} - - val_tensor = torch.full((num_nodes,), 0, dtype=torch.bool, device="cpu") - val_tensor[val_ix] = 1 - self.__val = {"paper": val_tensor} diff --git a/benchmarks/cugraph-pyg/models_cugraph.py b/benchmarks/cugraph-pyg/models_cugraph.py deleted file mode 100644 index 22c8aead57f..00000000000 --- a/benchmarks/cugraph-pyg/models_cugraph.py +++ /dev/null @@ -1,74 +0,0 @@ -# Copyright (c) 2023, NVIDIA CORPORATION. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import torch - -from cugraph_pyg.nn.conv import SAGEConv as CuGraphSAGEConv -from torch_geometric.utils.trim_to_layer import TrimToLayer - -import torch.nn as nn -import torch.nn.functional as F - - -def extend_tensor(t: torch.Tensor, l: int): - return torch.concat([t, torch.zeros(l - len(t), dtype=t.dtype, device=t.device)]) - - -class CuGraphSAGE(nn.Module): - def __init__(self, in_channels, hidden_channels, out_channels, num_layers): - super().__init__() - - self.convs = torch.nn.ModuleList() - self.convs.append(CuGraphSAGEConv(in_channels, hidden_channels, aggr="mean")) - for _ in range(num_layers - 2): - conv = CuGraphSAGEConv(hidden_channels, hidden_channels, aggr="mean") - self.convs.append(conv) - - self.convs.append(CuGraphSAGEConv(hidden_channels, out_channels, aggr="mean")) - - self._trim = TrimToLayer() - - def forward(self, x, edge, num_sampled_nodes, num_sampled_edges): - if isinstance(edge, torch.Tensor): - edge = list( - CuGraphSAGEConv.to_csc( - edge.cuda(), (x.shape[0], num_sampled_nodes.sum()) - ) - ) - else: - edge = edge.csr() - edge = [edge[1], edge[0], x.shape[0]] - - x = x.cuda().to(torch.float32) - - for i, conv in enumerate(self.convs): - if i > 0: - new_num_edges = edge[1][-2] - edge[0] = edge[0].narrow( - dim=0, - start=0, - length=new_num_edges, - ) - edge[1] = edge[1].narrow( - dim=0, start=0, length=edge[1].size(0) - num_sampled_nodes[-i - 1] - ) - edge[2] = x.shape[0] - - x = conv(x, edge) - - x = F.relu(x) - x = F.dropout(x, p=0.5) - - x = x.narrow(dim=0, start=0, length=num_sampled_nodes[0]) - - return x diff --git a/benchmarks/cugraph-pyg/models_native.py b/benchmarks/cugraph-pyg/models_native.py deleted file mode 100644 index 78795f92464..00000000000 --- a/benchmarks/cugraph-pyg/models_native.py +++ /dev/null @@ -1,54 +0,0 @@ -# Copyright (c) 2023, NVIDIA CORPORATION. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import torch - -from torch_geometric.nn import SAGEConv -from torch_geometric.utils.trim_to_layer import TrimToLayer - -import torch.nn as nn -import torch.nn.functional as F - - -class GraphSAGE(nn.Module): - def __init__(self, in_channels, hidden_channels, out_channels, num_layers): - super().__init__() - - self.convs = torch.nn.ModuleList() - self.convs.append(SAGEConv(in_channels, hidden_channels, aggr="mean")) - for _ in range(num_layers - 2): - conv = SAGEConv(hidden_channels, hidden_channels, aggr="mean") - self.convs.append(conv) - - self.convs.append(SAGEConv(hidden_channels, out_channels, aggr="mean")) - - self._trim = TrimToLayer() - - def forward(self, x, edge, num_sampled_nodes, num_sampled_edges): - edge = edge.cuda() - x = x.cuda().to(torch.float32) - - for i, conv in enumerate(self.convs): - x, edge, _ = self._trim( - i, num_sampled_nodes, num_sampled_edges, x, edge, None - ) - - s = x.shape[0] - x = conv(x, edge, size=(s, s)) - x = F.relu(x) - x = F.dropout(x, p=0.5) - - x = x.narrow(dim=0, start=0, length=x.shape[0] - num_sampled_nodes[1]) - - # assert x.shape[0] == num_sampled_nodes[0] - return x diff --git a/benchmarks/cugraph-pyg/trainer.py b/benchmarks/cugraph-pyg/trainer.py deleted file mode 100644 index dc07d872f79..00000000000 --- a/benchmarks/cugraph-pyg/trainer.py +++ /dev/null @@ -1,258 +0,0 @@ -# Copyright (c) 2023, NVIDIA CORPORATION. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import torch -import torch.distributed as td -import torch.nn.functional as F - -import time - -from typing import Union, List - - -def extend_tensor(t: Union[List[int], torch.Tensor], l: int): - t = torch.as_tensor(t) - - return torch.concat([t, torch.zeros(l - len(t), dtype=t.dtype, device=t.device)]) - - -class Trainer: - @property - def rank(self): - raise NotImplementedError() - - @property - def model(self): - raise NotImplementedError() - - @property - def dataset(self): - raise NotImplementedError() - - @property - def data(self): - raise NotImplementedError() - - @property - def optimizer(self): - raise NotImplementedError() - - @property - def num_epochs(self) -> int: - raise NotImplementedError() - - def get_loader(self, epoch: int = 0, stage="train"): - raise NotImplementedError() - - def train(self): - raise NotImplementedError() - - -class PyGTrainer(Trainer): - def train(self): - import logging - - logger = logging.getLogger("PyGTrainer") - - total_loss = 0.0 - num_batches = 0 - - time_forward = 0.0 - time_backward = 0.0 - time_loader = 0.0 - time_feature_additional = 0.0 - start_time = time.perf_counter() - end_time_backward = start_time - - for epoch in range(self.num_epochs): - with td.algorithms.join.Join([self.model, self.optimizer]): - for iter_i, data in enumerate( - self.get_loader(epoch=epoch, stage="train") - ): - loader_time_iter = time.perf_counter() - end_time_backward - time_loader += loader_time_iter - - additional_feature_time_start = time.perf_counter() - - num_sampled_nodes = sum( - [torch.tensor(n) for n in data.num_sampled_nodes_dict.values()] - ) - num_sampled_edges = sum( - [torch.tensor(e) for e in data.num_sampled_edges_dict.values()] - ) - - # FIXME find a way to get around this and not have to call extend_tensor - num_layers = len(self.model.module.convs) - num_sampled_nodes = extend_tensor(num_sampled_nodes, num_layers + 1) - num_sampled_edges = extend_tensor(num_sampled_edges, num_layers) - - data = data.to_homogeneous().cuda() - additional_feature_time_end = time.perf_counter() - time_feature_additional += ( - additional_feature_time_end - additional_feature_time_start - ) - - num_batches += 1 - if iter_i % 20 == 1: - time_forward_iter = time_forward / num_batches - time_backward_iter = time_backward / num_batches - - total_time_iter = ( - time.perf_counter() - start_time - ) / num_batches - logger.info(f"epoch {epoch}, iteration {iter_i}") - logger.info(f"num sampled nodes: {num_sampled_nodes}") - logger.info(f"num sampled edges: {num_sampled_edges}") - logger.info(f"time forward: {time_forward_iter}") - logger.info(f"time backward: {time_backward_iter}") - logger.info(f"loader time: {loader_time_iter}") - logger.info(f"total time: {total_time_iter}") - - y_true = data.y - x = data.x.to(torch.float32) - - start_time_forward = time.perf_counter() - edge_index = data.edge_index if "edge_index" in data else data.adj_t - - y_pred = self.model( - x, - edge_index, - num_sampled_nodes, - num_sampled_edges, - ) - - end_time_forward = time.perf_counter() - time_forward += end_time_forward - start_time_forward - - if y_pred.shape[0] > len(y_true): - raise ValueError( - f"illegal shape: {y_pred.shape}; {y_true.shape}" - ) - - y_true = y_true[: y_pred.shape[0]] - - y_true = F.one_hot( - y_true.to(torch.int64), num_classes=self.dataset.num_labels - ).to(torch.float32) - - if y_true.shape != y_pred.shape: - raise ValueError( - f"y_true shape was {y_true.shape} " - f"but y_pred shape was {y_pred.shape} " - f"in iteration {iter_i} " - f"on rank {y_pred.device.index}" - ) - - start_time_backward = time.perf_counter() - loss = F.cross_entropy(y_pred, y_true) - - self.optimizer.zero_grad() - loss.backward() - self.optimizer.step() - total_loss += loss.item() - end_time_backward = time.perf_counter() - time_backward += end_time_backward - start_time_backward - - end_time = time.perf_counter() - - # test - from torchmetrics import Accuracy - - acc = Accuracy( - task="multiclass", num_classes=self.dataset.num_labels - ).cuda() - - with td.algorithms.join.Join([self.model, self.optimizer]): - if self.rank == 0: - acc_sum = 0.0 - with torch.no_grad(): - for i, batch in enumerate( - self.get_loader(epoch=epoch, stage="test") - ): - num_sampled_nodes = sum( - [ - torch.tensor(n) - for n in batch.num_sampled_nodes_dict.values() - ] - ) - num_sampled_edges = sum( - [ - torch.tensor(e) - for e in batch.num_sampled_edges_dict.values() - ] - ) - batch_size = num_sampled_nodes[0] - - batch = batch.to_homogeneous().cuda() - - batch.y = batch.y.to(torch.long) - out = self.model.module( - batch.x, - batch.edge_index, - num_sampled_nodes, - num_sampled_edges, - ) - acc_sum += acc( - out[:batch_size].softmax(dim=-1), batch.y[:batch_size] - ) - print( - f"Accuracy: {acc_sum/(i) * 100.0:.4f}%", - ) - - td.barrier() - - with td.algorithms.join.Join([self.model, self.optimizer]): - if self.rank == 0: - acc_sum = 0.0 - with torch.no_grad(): - for i, batch in enumerate( - self.get_loader(epoch=epoch, stage="val") - ): - num_sampled_nodes = sum( - [ - torch.tensor(n) - for n in batch.num_sampled_nodes_dict.values() - ] - ) - num_sampled_edges = sum( - [ - torch.tensor(e) - for e in batch.num_sampled_edges_dict.values() - ] - ) - batch_size = num_sampled_nodes[0] - - batch = batch.to_homogeneous().cuda() - - batch.y = batch.y.to(torch.long) - out = self.model.module( - batch.x, - batch.edge_index, - num_sampled_nodes, - num_sampled_edges, - ) - acc_sum += acc( - out[:batch_size].softmax(dim=-1), batch.y[:batch_size] - ) - print( - f"Validation Accuracy: {acc_sum/(i) * 100.0:.4f}%", - ) - - stats = { - "Accuracy": (acc_sum / (i) * 100.0) if self.rank == 0 else 0.0, - "# Batches": num_batches, - "Loader Time": time_loader + time_feature_additional, - "Forward Time": time_forward, - "Backward Time": time_backward, - } - return stats diff --git a/benchmarks/cugraph-pyg/trainers_cugraph.py b/benchmarks/cugraph-pyg/trainers_cugraph.py deleted file mode 100644 index d63761a7817..00000000000 --- a/benchmarks/cugraph-pyg/trainers_cugraph.py +++ /dev/null @@ -1,164 +0,0 @@ -# Copyright (c) 2023, NVIDIA CORPORATION. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from trainer import PyGTrainer -from models_cugraph import CuGraphSAGE - -import torch -import numpy as np - -from torch.distributed.optim import ZeroRedundancyOptimizer -from torch.nn.parallel import DistributedDataParallel as ddp - -from cugraph.gnn import FeatureStore -from cugraph_pyg.data import CuGraphStore -from cugraph_pyg.loader import BulkSampleLoader - -import gc -import os - - -class PyGCuGraphTrainer(PyGTrainer): - def __init__( - self, - dataset, - model="GraphSAGE", - device=0, - rank=0, - world_size=1, - num_epochs=1, - sample_dir=".", - **kwargs, - ): - self.__data = None - self.__device = device - self.__rank = rank - self.__world_size = world_size - self.__num_epochs = num_epochs - self.__dataset = dataset - self.__sample_dir = sample_dir - self.__loader_kwargs = kwargs - self.__model = self.get_model(model) - - @property - def rank(self): - return self.__rank - - @property - def model(self): - return self.__model - - @property - def dataset(self): - return self.__dataset - - @property - def optimizer(self): - return ZeroRedundancyOptimizer( - self.model.parameters(), torch.optim.Adam, lr=0.01 - ) - - @property - def num_epochs(self) -> int: - return self.__num_epochs - - def get_loader(self, epoch: int = 0, stage="train") -> int: - # TODO support online sampling - if stage == "val": - path = os.path.join(self.__sample_dir, "val", "samples") - else: - path = os.path.join(self.__sample_dir, f"epoch={epoch}", stage, "samples") - return BulkSampleLoader( - self.data, - self.data, - None, # FIXME get input nodes properly - directory=path, - input_files=self.get_input_files(path), - **self.__loader_kwargs, - ) - - @property - def data(self): - import logging - - logger = logging.getLogger("PyGCuGraphTrainer") - logger.info("getting data") - - if self.__data is None: - # FIXME wholegraph - fs = FeatureStore(backend="torch") - num_nodes_dict = {} - - for node_type, x in self.__dataset.x_dict.items(): - logger.debug(f"getting x for {node_type}") - fs.add_data(x, node_type, "x") - num_nodes_dict[node_type] = self.__dataset.num_nodes(node_type) - - for node_type, y in self.__dataset.y_dict.items(): - logger.debug(f"getting y for {node_type}") - fs.add_data(y, node_type, "y") - - for node_type, train in self.__dataset.train_dict.items(): - logger.debug(f"getting train for {node_type}") - fs.add_data(train, node_type, "train") - - for node_type, test in self.__dataset.test_dict.items(): - logger.debug(f"getting test for {node_type}") - fs.add_data(test, node_type, "test") - - for node_type, val in self.__dataset.val_dict.items(): - logger.debug(f"getting val for {node_type}") - fs.add_data(val, node_type, "val") - - # TODO support online sampling if the edge index is provided - num_edges_dict = self.__dataset.edge_index_dict - if not isinstance(list(num_edges_dict.values())[0], int): - num_edges_dict = {k: len(v) for k, v in num_edges_dict} - - self.__data = CuGraphStore( - fs, - num_edges_dict, - num_nodes_dict, - ) - - return self.__data - - def get_model(self, name="GraphSAGE"): - if name != "GraphSAGE": - raise ValueError("only GraphSAGE is currently supported") - - num_input_features = self.__dataset.num_input_features - num_output_features = self.__dataset.num_labels - num_layers = len(self.__loader_kwargs["num_neighbors"]) - - with torch.cuda.device(self.__device): - model = ( - CuGraphSAGE( - in_channels=num_input_features, - hidden_channels=64, - out_channels=num_output_features, - num_layers=num_layers, - ) - .to(torch.float32) - .to(self.__device) - ) - - model = ddp(model, device_ids=[self.__device]) - print("done creating model") - - return model - - def get_input_files(self, path): - file_list = np.array(os.listdir(path)) - - return np.array_split(file_list, self.__world_size)[self.__rank] diff --git a/benchmarks/cugraph-pyg/trainers_native.py b/benchmarks/cugraph-pyg/trainers_native.py deleted file mode 100644 index a92b6825fa0..00000000000 --- a/benchmarks/cugraph-pyg/trainers_native.py +++ /dev/null @@ -1,192 +0,0 @@ -# Copyright (c) 2023, NVIDIA CORPORATION. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from trainer import PyGTrainer -from datasets import OGBNPapers100MDataset -from models_native import GraphSAGE - -import torch -import numpy as np - -from torch.distributed.optim import ZeroRedundancyOptimizer -from torch.nn.parallel import DistributedDataParallel as ddp - -from torch_geometric.utils.sparse import index2ptr -from torch_geometric.data import HeteroData -from torch_geometric.loader import NeighborLoader - -import gc -import os - - -def pyg_num_workers(world_size): - num_workers = None - if hasattr(os, "sched_getaffinity"): - try: - num_workers = len(os.sched_getaffinity(0)) / (2 * world_size) - except Exception: - pass - if num_workers is None: - num_workers = os.cpu_count() / (2 * world_size) - return int(num_workers) - - -class PyGNativeTrainer(PyGTrainer): - def __init__( - self, - dataset, - model="GraphSAGE", - device=0, - rank=0, - world_size=1, - num_epochs=1, - **kwargs, - ): - self.__dataset = dataset - self.__device = device - self.__data = None - self.__rank = rank - self.__num_epochs = num_epochs - self.__world_size = world_size - self.__loader_kwargs = kwargs - self.__model = self.get_model(model) - - @property - def model(self): - return self.__model - - @property - def dataset(self): - return self.__dataset - - @property - def data(self): - import logging - - logger = logging.getLogger("PyGNativeTrainer") - logger.info("getting data") - - if self.__data is None: - self.__data = HeteroData() - - for node_type, x in self.__dataset.x_dict.items(): - logger.debug(f"getting x for {node_type}") - self.__data[node_type].x = x - self.__data[node_type]["num_nodes"] = self.__dataset.num_nodes( - node_type - ) - - for node_type, y in self.__dataset.y_dict.items(): - logger.debug(f"getting y for {node_type}") - self.__data[node_type]["y"] = y - - for node_type, train in self.__dataset.train_dict.items(): - logger.debug(f"getting train for {node_type}") - self.__data[node_type]["train"] = train - - for node_type, test in self.__dataset.test_dict.items(): - logger.debug(f"getting test for {node_type}") - self.__data[node_type]["test"] = test - - for node_type, val in self.__dataset.val_dict.items(): - logger.debug(f"getting val for {node_type}") - self.__data[node_type]["val"] = val - - for can_edge_type, ei in self.__dataset.edge_index_dict.items(): - logger.info("converting to csc...") - ei["dst"] = index2ptr( - ei["dst"], self.__dataset.num_nodes(can_edge_type[2]) - ) - - logger.info("updating data structure...") - self.__data.put_edge_index( - layout="csc", - edge_index=list(ei.values()), - edge_type=can_edge_type, - size=( - self.__dataset.num_nodes(can_edge_type[0]), - self.__dataset.num_nodes(can_edge_type[2]), - ), - is_sorted=True, - ) - gc.collect() - - return self.__data - - @property - def optimizer(self): - return ZeroRedundancyOptimizer( - self.model.parameters(), torch.optim.Adam, lr=0.01 - ) - - @property - def num_epochs(self) -> int: - return self.__num_epochs - - def get_loader(self, epoch: int): - import logging - - logger = logging.getLogger("PyGNativeTrainer") - logger.info(f"Getting loader for epoch {epoch}") - - input_nodes_dict = { - node_type: np.array_split( - np.arange(len(train_mask))[train_mask], self.__world_size - )[self.__rank] - for node_type, train_mask in self.__dataset.train_dict.items() - } - - input_nodes = list(input_nodes_dict.items()) - if len(input_nodes) > 1: - raise ValueError("Multiple input node types currently unsupported") - else: - input_nodes = tuple(input_nodes[0]) - - # get loader - loader = NeighborLoader( - self.data, - input_nodes=input_nodes, - is_sorted=True, - disjoint=False, - num_workers=pyg_num_workers(self.__world_size), # FIXME change this - persistent_workers=True, - **self.__loader_kwargs, # batch size, num neighbors, replace, shuffle, etc. - ) - - logger.info("done creating loader") - return loader - - def get_model(self, name="GraphSAGE"): - if name != "GraphSAGE": - raise ValueError("only GraphSAGE is currently supported") - - num_input_features = self.__dataset.num_input_features - num_output_features = self.__dataset.num_labels - num_layers = len(self.__loader_kwargs["num_neighbors"]) - - with torch.cuda.device(self.__device): - model = ( - GraphSAGE( - in_channels=num_input_features, - hidden_channels=64, - out_channels=num_output_features, - num_layers=num_layers, - ) - .to(torch.float32) - .to(self.__device) - ) - model = ddp(model, device_ids=[self.__device]) - print("done creating model") - - return model diff --git a/benchmarks/cugraph/standalone/bulk_sampling/.gitignore b/benchmarks/cugraph/standalone/bulk_sampling/.gitignore new file mode 100644 index 00000000000..19cbd00ebe0 --- /dev/null +++ b/benchmarks/cugraph/standalone/bulk_sampling/.gitignore @@ -0,0 +1 @@ +mg_utils/ diff --git a/benchmarks/cugraph/standalone/bulk_sampling/bench_cugraph_pyg.py b/benchmarks/cugraph/standalone/bulk_sampling/bench_cugraph_pyg.py deleted file mode 100644 index 2d800bac58e..00000000000 --- a/benchmarks/cugraph/standalone/bulk_sampling/bench_cugraph_pyg.py +++ /dev/null @@ -1,222 +0,0 @@ -# Copyright (c) 2023, NVIDIA CORPORATION. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -import json -import argparse -import os -import json -import warnings - -import torch -import numpy as np -import pandas - -import torch.distributed as dist - -from datasets import OGBNPapers100MDataset - -from cugraph.testing.mg_utils import enable_spilling - -def init_pytorch_worker(rank: int, use_rmm_torch_allocator: bool=False) -> None: - import cupy - import rmm - from pynvml.smi import nvidia_smi - - smi = nvidia_smi.getInstance() - pool_size=16e9 # FIXME calculate this - - rmm.reinitialize( - devices=[rank], - pool_allocator=True, - initial_pool_size=pool_size, - ) - - if use_rmm_torch_allocator: - warnings.warn( - "Using the rmm pytorch allocator is currently unsupported." - " The default allocator will be used instead." - ) - # FIXME somehow get the pytorch allocator to work - #from rmm.allocators.torch import rmm_torch_allocator - #torch.cuda.memory.change_current_allocator(rmm_torch_allocator) - - from rmm.allocators.cupy import rmm_cupy_allocator - cupy.cuda.set_allocator(rmm_cupy_allocator) - - cupy.cuda.Device(rank).use() - torch.cuda.set_device(rank) - - # Pytorch training worker initialization - torch.distributed.init_process_group(backend="nccl") - -def parse_args(): - parser = argparse.ArgumentParser() - - parser.add_argument( - "--num_epochs", - type=int, - default=1, - help="Number of training epochs", - required=False, - ) - - parser.add_argument( - "--batch_size", - type=int, - default=512, - help="Batch size (required for Native run)", - required=False, - ) - - parser.add_argument( - "--fanout", - type=str, - default="10_10_10", - help="Fanout (required for Native run)", - required=False, - ) - - parser.add_argument( - "--sample_dir", - type=str, - help="Directory with stored bulk samples (required for cuGraph run)", - required=False, - ) - - parser.add_argument( - "--output_file", - type=str, - help="File to store results", - required=True, - ) - - parser.add_argument( - "--framework", - type=str, - help="The framework to test (cuGraph or Native)", - required=True, - ) - - parser.add_argument( - "--model", - type=str, - default="GraphSAGE", - help="The model to use (currently only GraphSAGE supported)", - required=False, - ) - - parser.add_argument( - '--replication_factor', - type=int, - default=1, - help="The replication factor for the dataset", - required=False, - ) - - parser.add_argument( - '--dataset_dir', - type=str, - help="The directory where datasets are stored", - required=True, - ) - - parser.add_argument( - '--train_split', - type=float, - help="The percentage of the labeled data to use for training. The remainder is used for testing/validation.", - default=0.8, - required=False, - ) - - parser.add_argument( - '--val_split', - type=float, - help="The percentage of the testing/validation data to allocate for validation.", - default=0.5, - required=False, - ) - - return parser.parse_args() - - -def main(args): - import logging - logging.basicConfig( - level=logging.INFO, - ) - logger = logging.getLogger('bench_cugraph_pyg') - logger.setLevel(logging.INFO) - - local_rank = int(os.environ['LOCAL_RANK']) - global_rank = int(os.environ["RANK"]) - - init_pytorch_worker(local_rank, use_rmm_torch_allocator=(args.framework=="cuGraph")) - enable_spilling() - print(f'worker initialized') - dist.barrier() - - # Have to import here to avoid creating CUDA context - from trainers_cugraph import PyGCuGraphTrainer - from trainers_native import PyGNativeTrainer - - world_size = int(os.environ['SLURM_JOB_NUM_NODES']) * int(os.environ['SLURM_GPUS_PER_NODE']) - - dataset = OGBNPapers100MDataset( - replication_factor=args.replication_factor, - dataset_dir=args.dataset_dir, - train_split=args.train_split, - val_split=args.val_split, - load_edge_index=(args.framework=="Native"), - ) - - if args.framework == "Native": - trainer = PyGNativeTrainer( - model=args.model, - dataset=dataset, - device=local_rank, - rank=global_rank, - world_size=world_size, - num_epochs=args.num_epochs, - shuffle=True, - replace=False, - num_neighbors=[int(f) for f in args.fanout.split('_')], - batch_size=args.batch_size, - ) - elif args.framework == "cuGraph": - trainer = PyGCuGraphTrainer( - model=args.model, - dataset=dataset, - sample_dir=args.sample_dir, - device=local_rank, - rank=global_rank, - world_size=world_size, - num_epochs=args.num_epochs, - shuffle=True, - replace=False, - num_neighbors=[int(f) for f in args.fanout.split('_')], - batch_size=args.batch_size, - ) - else: - raise ValueError("unsuported framework") - - stats = trainer.train() - logger.info(stats) - - with open(f'{args.output_file}[{global_rank}]', 'w') as f: - json.dump(stats, f) - -if __name__ == "__main__": - args = parse_args() - main(args) - diff --git a/benchmarks/cugraph/standalone/bulk_sampling/bench_cugraph_training.py b/benchmarks/cugraph/standalone/bulk_sampling/bench_cugraph_training.py index fa69d096a10..0411f695c07 100644 --- a/benchmarks/cugraph/standalone/bulk_sampling/bench_cugraph_training.py +++ b/benchmarks/cugraph/standalone/bulk_sampling/bench_cugraph_training.py @@ -85,7 +85,7 @@ def parse_args(): parser.add_argument( "--fanout", type=str, - default="10, 10, 10", + default="10_10_10", help="Fanout", required=False, ) @@ -184,6 +184,8 @@ def main(args): load_edge_index=(args.framework == "Native"), ) + fanout = [int(f) for f in args.fanout.split("_")] + if args.framework == "PyG": from trainers_pyg import PyGNativeTrainer trainer = PyGNativeTrainer( @@ -195,10 +197,14 @@ def main(args): num_epochs=args.num_epochs, shuffle=True, replace=False, - num_neighbors=[int(f) for f in args.fanout.split(", ")], + num_neighbors=fanout, batch_size=args.batch_size, ) elif args.framework == "cuGraphPyG": + sample_dir = os.path.join( + args.sample_dir, + f'ogbn_papers100M[{args.replication_factor}]_b{args.batch_size}_f[{fanout}]' + ) from trainers_cugraph_pyg import PyGCuGraphTrainer trainer = PyGCuGraphTrainer( model=args.model, @@ -210,11 +216,11 @@ def main(args): num_epochs=args.num_epochs, shuffle=True, replace=False, - num_neighbors=[int(f) for f in args.fanout.split(", ")], + num_neighbors=fanout, batch_size=args.batch_size, ) else: - raise ValueError("unsuported framework") + raise ValueError("unsupported framework") stats = trainer.train() logger.info(stats) diff --git a/benchmarks/cugraph/standalone/bulk_sampling/datasets/__init__.py b/benchmarks/cugraph/standalone/bulk_sampling/datasets/__init__.py index e69de29bb2d..ceedd02bdf5 100644 --- a/benchmarks/cugraph/standalone/bulk_sampling/datasets/__init__.py +++ b/benchmarks/cugraph/standalone/bulk_sampling/datasets/__init__.py @@ -0,0 +1,15 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .dataset import Dataset +from .ogbn_papers100M import OGBNPapers100MDataset \ No newline at end of file diff --git a/benchmarks/cugraph/standalone/bulk_sampling/mg_utils b/benchmarks/cugraph/standalone/bulk_sampling/mg_utils deleted file mode 120000 index e41487f4a45..00000000000 --- a/benchmarks/cugraph/standalone/bulk_sampling/mg_utils +++ /dev/null @@ -1 +0,0 @@ -../../../../mg_utils \ No newline at end of file diff --git a/benchmarks/cugraph/standalone/bulk_sampling/models_cugraph.py b/benchmarks/cugraph/standalone/bulk_sampling/models_cugraph.py deleted file mode 100644 index 1bbe5d55903..00000000000 --- a/benchmarks/cugraph/standalone/bulk_sampling/models_cugraph.py +++ /dev/null @@ -1,84 +0,0 @@ -# Copyright (c) 2023, NVIDIA CORPORATION. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import torch - -from cugraph_pyg.nn.conv import SAGEConv as CuGraphSAGEConv -from torch_geometric.utils.trim_to_layer import TrimToLayer - -import torch.nn as nn -import torch.nn.functional as F - -def extend_tensor(t: torch.Tensor, l:int): - return torch.concat([ - t, - torch.zeros( - l - len(t), - dtype=t.dtype, - device=t.device - ) - ]) - -class CuGraphSAGE(nn.Module): - def __init__(self, in_channels, hidden_channels, out_channels, num_layers): - super().__init__() - - self.convs = torch.nn.ModuleList() - self.convs.append(CuGraphSAGEConv(in_channels, hidden_channels, aggr='mean')) - for _ in range(num_layers - 2): - conv = CuGraphSAGEConv(hidden_channels, hidden_channels, aggr='mean') - self.convs.append(conv) - - self.convs.append(CuGraphSAGEConv(hidden_channels, out_channels, aggr='mean')) - - self._trim = TrimToLayer() - - def forward(self, x, edge, num_sampled_nodes, num_sampled_edges): - if isinstance(edge, torch.Tensor): - edge = list( - CuGraphSAGEConv.to_csc(edge.cuda(), (x.shape[0], num_sampled_nodes.sum())) - ) - else: - edge = edge.csr() - edge = [edge[1], edge[0], x.shape[0]] - - x = x.cuda().to(torch.float32) - - for i, conv in enumerate(self.convs): - if i > 0: - new_num_edges = edge[1][-2] - edge[0] = edge[0].narrow( - dim=0, - start=0, - length=new_num_edges, - ) - edge[1] = edge[1].narrow( - dim=0, - start=0, - length=edge[1].size(0) - num_sampled_nodes[-i-1] - ) - edge[2] = x.shape[0] - - x = conv(x, edge) - - x = F.relu(x) - x = F.dropout(x, p=0.5) - - x = x.narrow( - dim=0, - start=0, - length=num_sampled_nodes[0] - ) - - return x - diff --git a/benchmarks/cugraph/standalone/bulk_sampling/models_native.py b/benchmarks/cugraph/standalone/bulk_sampling/models_native.py deleted file mode 100644 index 2674cbdc1b4..00000000000 --- a/benchmarks/cugraph/standalone/bulk_sampling/models_native.py +++ /dev/null @@ -1,62 +0,0 @@ -# Copyright (c) 2023, NVIDIA CORPORATION. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import torch - -from torch_geometric.nn import SAGEConv -from torch_geometric.utils.trim_to_layer import TrimToLayer - -import torch.nn as nn -import torch.nn.functional as F - -class GraphSAGE(nn.Module): - def __init__(self, in_channels, hidden_channels, out_channels, num_layers): - super().__init__() - - self.convs = torch.nn.ModuleList() - self.convs.append(SAGEConv(in_channels, hidden_channels, aggr='mean')) - for _ in range(num_layers - 2): - conv = SAGEConv(hidden_channels, hidden_channels, aggr='mean') - self.convs.append(conv) - - self.convs.append(SAGEConv(hidden_channels, out_channels, aggr='mean')) - - self._trim = TrimToLayer() - - def forward(self, x, edge, num_sampled_nodes, num_sampled_edges): - edge = edge.cuda() - x = x.cuda().to(torch.float32) - - for i, conv in enumerate(self.convs): - x, edge, _ = self._trim( - i, - num_sampled_nodes, - num_sampled_edges, - x, - edge, - None - ) - - s = x.shape[0] - x = conv(x, edge, size=(s, s)) - x = F.relu(x) - x = F.dropout(x, p=0.5) - - x = x.narrow( - dim=0, - start=0, - length=x.shape[0] - num_sampled_nodes[1] - ) - - # assert x.shape[0] == num_sampled_nodes[0] - return x diff --git a/benchmarks/cugraph/standalone/bulk_sampling/run_sampling.sh b/benchmarks/cugraph/standalone/bulk_sampling/run_sampling.sh index f694c6d097b..d51316244b5 100644 --- a/benchmarks/cugraph/standalone/bulk_sampling/run_sampling.sh +++ b/benchmarks/cugraph/standalone/bulk_sampling/run_sampling.sh @@ -22,6 +22,8 @@ export RAPIDS_NO_INITIALIZE=1 export CUDF_SPILL=1 export LIBCUDF_CUFILE_POLICY="OFF" +PATCH_CUGRAPH=1 + export SCHEDULER_FILE=$SCHEDULER_FILE export LOGS_DIR=$LOGS_DIR @@ -45,7 +47,7 @@ else fi echo "properly waiting for workers to connect" -NUM_GPUS=${echo $(SLURM_JOB_NUM_NODES)"*"$(SLURM_GPUS_PER_NODE) | bc} +NUM_GPUS=$(python -c "import os; print(int(os.environ['SLURM_JOB_NUM_NODES'])*int(os.environ['SLURM_GPUS_PER_NODE']))") handleTimeout 120 python ${MG_UTILS_DIR}/wait_for_workers.py \ --num-expected-workers ${NUM_GPUS} \ --scheduler-file-path ${SCHEDULER_FILE} diff --git a/benchmarks/cugraph/standalone/bulk_sampling/run_train_job.sh b/benchmarks/cugraph/standalone/bulk_sampling/run_train_job.sh index c3d62493d06..9f397e2ac46 100755 --- a/benchmarks/cugraph/standalone/bulk_sampling/run_train_job.sh +++ b/benchmarks/cugraph/standalone/bulk_sampling/run_train_job.sh @@ -10,13 +10,13 @@ #SBATCH --exclusive export CONTAINER_IMAGE="/gpfs/fs1/projects/sw_rapids/users/abarghi/dlfw_12_5_23.squash" -export SCRIPTS_DIR=${pwd} +export SCRIPTS_DIR=$(pwd) export LOGS_DIR="/gpfs/fs1/projects/sw_rapids/users/abarghi/logs" export SAMPLES_DIR="/gpfs/fs1/projects/sw_rapids/users/abarghi/samples" export DATASETS_DIR="/gpfs/fs1/projects/sw_rapids/users/abarghi/datasets" export BATCH_SIZE=512 -export FANOUT="10, 10, 10" +export FANOUT="10_10_10" export REPLICATION_FACTOR=1 export RAPIDS_NO_INITIALIZE=1 @@ -37,11 +37,13 @@ echo Num Nodes: $nnodes gpus_per_node=$SLURM_GPUS_PER_NODE echo Num GPUs Per Node: $gpus_per_node +set -e + # Generate samples srun \ --container-image $CONTAINER_IMAGE \ --container-mounts=${LOGS_DIR}":/logs",${SAMPLES_DIR}":/samples",${SCRIPTS_DIR}":/scripts",${DATASETS_DIR}":/datasets" \ - bash /project/run_sampling.sh $BATCH_SIZE $FANOUT $REPLICATION_FACTOR "/scripts" + bash /scripts/run_sampling.sh $BATCH_SIZE $FANOUT $REPLICATION_FACTOR "/scripts" # Train srun \ @@ -53,10 +55,12 @@ srun \ --rdzv-id $RANDOM \ --rdzv-backend c10d \ --rdzv-endpoint $head_node_ip:29500 \ - /project/bench_cugraph_pyg.py \ + /scripts/bench_cugraph_training.py \ --output_file "/logs/output.txt" \ - --framework "cuGraph" \ + --framework "cuGraphPyG" \ --dataset_dir "/datasets" \ - --sample_dir "/samples/ogbn_papers100M["$REPLICATION_FACTOR"]_b"$BATCH_SIZE"_f["$FANOUT"]" \ + --sample_dir "/samples" \ + --batch_size $BATCH_SIZE \ + --fanout $FANOUT \ --replication_factor $REPLICATION_FACTOR diff --git a/benchmarks/cugraph/standalone/bulk_sampling/slurm-1581196.out b/benchmarks/cugraph/standalone/bulk_sampling/slurm-1581196.out new file mode 100644 index 00000000000..808db9bfba9 --- /dev/null +++ b/benchmarks/cugraph/standalone/bulk_sampling/slurm-1581196.out @@ -0,0 +1,635 @@ +Node IP: 10.180.6.152 +Num Nodes: 2 +Num GPUs Per Node: 8 +no change /opt/conda/condabin/conda +no change /opt/conda/bin/conda +no change /opt/conda/bin/conda-env +no change /opt/conda/bin/activate +no change /opt/conda/bin/deactivate +no change /opt/conda/etc/profile.d/conda.sh +no change /opt/conda/etc/fish/conf.d/conda.fish +no change /opt/conda/shell/condabin/Conda.psm1 +no change /opt/conda/shell/condabin/conda-hook.ps1 +no change /opt/conda/lib/python3.10/site-packages/xontrib/conda.xsh +no change /opt/conda/etc/profile.d/conda.csh +no change /root/.bashrc +No action taken. + +EnvironmentNameNotFound: Could not find conda environment: rapids +You can list all discoverable environments with `conda info --envs`. + + +properly waiting for workers to connect +>>>> Using cluster configurtion for TCP +>>>> Logs written to: /logs +no change /opt/conda/condabin/conda +no change /opt/conda/bin/conda +no change /opt/conda/bin/conda-env +no change /opt/conda/bin/activate +no change /opt/conda/bin/deactivate +no change /opt/conda/etc/profile.d/conda.sh +no change /opt/conda/etc/fish/conf.d/conda.fish +no change /opt/conda/shell/condabin/Conda.psm1 +no change /opt/conda/shell/condabin/conda-hook.ps1 +no change /opt/conda/lib/python3.10/site-packages/xontrib/conda.xsh +no change /opt/conda/etc/profile.d/conda.csh +no change /root/.bashrc +No action taken. +run-dask-process.sh: /scripts/mg_utils/dask_scheduler.json not present - waiting to start workers... +wait_for_workers.py - initializing client...done. + +EnvironmentNameNotFound: Could not find conda environment: rapids +You can list all discoverable environments with `conda info --envs`. + + +properly waiting for workers to connect +>>>> Using cluster configurtion for TCP +>>>> Logs written to: /logs +wait_for_workers.py - initializing client...done. +worker(s) started. +waiting for worker pid 693150 to finish before exiting script... +wait_for_workers.py expected 16 but got 0, waiting... +wait_for_workers.py expected 16 but got 0, waiting... +scheduler started. +worker(s) started. +waiting for worker pid 1727804 to finish before exiting script... +wait_for_workers.py expected 16 but got 0, waiting... +wait_for_workers.py expected 16 but got 0, waiting... +wait_for_workers.py expected 16 but got 8, waiting... +wait_for_workers.py expected 16 but got 8, waiting... +wait_for_workers.py got 16 workers, done. +wait_for_workers.py got 16 workers, done. +0 +Launching Python Script +1 +INFO:__main__:starting dask client +INFO:__main__:dask client started +INFO:__main__:dataset: ogbn_papers100M +INFO:__main__:batch size: 512 +INFO:__main__:fanout: [10, 10, 10] +INFO:__main__:seeds_per_call: 524288 +INFO:__main__:num epochs: 1 +INFO:__main__:ogbn_papers100M +INFO:__main__:Number of input edges = 1,615,685,872 +INFO:__main__:constructed graph +/opt/conda/lib/python3.10/site-packages/cudf/core/index.py:3284: FutureWarning: cudf.StringIndex is deprecated and will be removed from cudf in a future version. Use cudf.Index with the appropriate dtype instead. + warnings.warn( +/opt/conda/lib/python3.10/site-packages/cudf/core/index.py:3284: FutureWarning: cudf.StringIndex is deprecated and will be removed from cudf in a future version. Use cudf.Index with the appropriate dtype instead. + warnings.warn( +INFO:__main__:input memory: 38776460928 +INFO:cugraph.gnn.data_loading.bulk_sampler:Number of input seeds (1081726) is >= seeds per call (524288). Calling flush() to compute and write minibatches. +INFO:cugraph.gnn.data_loading.bulk_sampler:Calculated batches to sample; min = dd.Scalar and max = dd.Scalar; took 0.0109 s +/opt/conda/lib/python3.10/site-packages/cugraph/dask/sampling/uniform_neighbor_sample.py:533: FutureWarning: The with_edge_properties flag is deprecated and will be removed in the next release. + warnings.warn(warning_msg, FutureWarning) +/opt/conda/lib/python3.10/site-packages/cugraph/dask/sampling/uniform_neighbor_sample.py:559: FutureWarning: The include_hop_column flag is deprecated and will be removed in the next release in favor of always excluding the hop column when return_offsets is True + warnings.warn(warning_msg, FutureWarning) +INFO:cugraph.gnn.data_loading.bulk_sampler:Called uniform neighbor sample, took 1.0482 s +INFO:cugraph.gnn.data_loading.bulk_sampler:Wrote samples to parquet, took 5.496245765127242 seconds +INFO:cugraph.gnn.data_loading.bulk_sampler:There are still 281727 samples remaining, calling flush() again... +INFO:cugraph.gnn.data_loading.bulk_sampler:Calculated batches to sample; min = dd.Scalar and max = dd.Scalar; took 0.0102 s +/opt/conda/lib/python3.10/site-packages/cugraph/dask/sampling/uniform_neighbor_sample.py:533: FutureWarning: The with_edge_properties flag is deprecated and will be removed in the next release. + warnings.warn(warning_msg, FutureWarning) +/opt/conda/lib/python3.10/site-packages/cugraph/dask/sampling/uniform_neighbor_sample.py:559: FutureWarning: The include_hop_column flag is deprecated and will be removed in the next release in favor of always excluding the hop column when return_offsets is True + warnings.warn(warning_msg, FutureWarning) +INFO:cugraph.gnn.data_loading.bulk_sampler:Called uniform neighbor sample, took 0.7594 s +INFO:cugraph.gnn.data_loading.bulk_sampler:Wrote samples to parquet, took 0.3181804046034813 seconds +INFO:cugraph.gnn.data_loading.bulk_sampler:Number of input seeds (773907) is >= seeds per call (524288). Calling flush() to compute and write minibatches. +INFO:cugraph.gnn.data_loading.bulk_sampler:Calculated batches to sample; min = dd.Scalar and max = dd.Scalar; took 0.0107 s +/opt/conda/lib/python3.10/site-packages/cugraph/dask/sampling/uniform_neighbor_sample.py:533: FutureWarning: The with_edge_properties flag is deprecated and will be removed in the next release. + warnings.warn(warning_msg, FutureWarning) +/opt/conda/lib/python3.10/site-packages/cugraph/dask/sampling/uniform_neighbor_sample.py:559: FutureWarning: The include_hop_column flag is deprecated and will be removed in the next release in favor of always excluding the hop column when return_offsets is True + warnings.warn(warning_msg, FutureWarning) +INFO:cugraph.gnn.data_loading.bulk_sampler:Called uniform neighbor sample, took 0.8618 s +INFO:cugraph.gnn.data_loading.bulk_sampler:Wrote samples to parquet, took 0.5542019009590149 seconds +INFO:cugraph.gnn.data_loading.bulk_sampler:Number of input seeds (772875) is >= seeds per call (524288). Calling flush() to compute and write minibatches. +INFO:cugraph.gnn.data_loading.bulk_sampler:Calculated batches to sample; min = dd.Scalar and max = dd.Scalar; took 0.0102 s +/opt/conda/lib/python3.10/site-packages/cugraph/dask/sampling/uniform_neighbor_sample.py:533: FutureWarning: The with_edge_properties flag is deprecated and will be removed in the next release. + warnings.warn(warning_msg, FutureWarning) +/opt/conda/lib/python3.10/site-packages/cugraph/dask/sampling/uniform_neighbor_sample.py:559: FutureWarning: The include_hop_column flag is deprecated and will be removed in the next release in favor of always excluding the hop column when return_offsets is True + warnings.warn(warning_msg, FutureWarning) +INFO:cugraph.gnn.data_loading.bulk_sampler:Called uniform neighbor sample, took 0.8702 s +INFO:cugraph.gnn.data_loading.bulk_sampler:Wrote samples to parquet, took 4.945730976760387 seconds +INFO:__main__:allocation counts b: +INFO:__main__:dict_values([{'current_bytes': 1797218576, 'current_count': 44, 'peak_bytes': 3648738768, 'peak_count': 316, 'total_bytes': 78721724832, 'total_count': 3638}, {'current_bytes': 1324268432, 'current_count': 55, 'peak_bytes': 3125547408, 'peak_count': 286, 'total_bytes': 66945408776, 'total_count': 3464}, {'current_bytes': 950521984, 'current_count': 34, 'peak_bytes': 3411416496, 'peak_count': 283, 'total_bytes': 77176651552, 'total_count': 3590}, {'current_bytes': 1094964320, 'current_count': 40, 'peak_bytes': 3998979008, 'peak_count': 331, 'total_bytes': 77127985760, 'total_count': 3554}, {'current_bytes': 1128736880, 'current_count': 30, 'peak_bytes': 3098585696, 'peak_count': 277, 'total_bytes': 69220139080, 'total_count': 3051}, {'current_bytes': 641731216, 'current_count': 42, 'peak_bytes': 3354091888, 'peak_count': 317, 'total_bytes': 76584517088, 'total_count': 4034}, {'current_bytes': 1448900272, 'current_count': 36, 'peak_bytes': 3703882320, 'peak_count': 330, 'total_bytes': 79492285520, 'total_count': 3653}, {'current_bytes': 1781772720, 'current_count': 58, 'peak_bytes': 3735403472, 'peak_count': 278, 'total_bytes': 72256173968, 'total_count': 3467}, {'current_bytes': 1554635632, 'current_count': 37, 'peak_bytes': 4348037136, 'peak_count': 367, 'total_bytes': 78630312904, 'total_count': 3208}, {'current_bytes': 970590512, 'current_count': 39, 'peak_bytes': 3483604672, 'peak_count': 334, 'total_bytes': 77792715128, 'total_count': 3649}, {'current_bytes': 1810640480, 'current_count': 42, 'peak_bytes': 4206602592, 'peak_count': 338, 'total_bytes': 75900082296, 'total_count': 3186}, {'current_bytes': 1732609664, 'current_count': 50, 'peak_bytes': 3749166560, 'peak_count': 341, 'total_bytes': 75509918264, 'total_count': 3205}, {'current_bytes': 1854185856, 'current_count': 40, 'peak_bytes': 4309537168, 'peak_count': 348, 'total_bytes': 81190575096, 'total_count': 3264}, {'current_bytes': 307565104, 'current_count': 33, 'peak_bytes': 3630779520, 'peak_count': 353, 'total_bytes': 70144358680, 'total_count': 3548}, {'current_bytes': 1583199152, 'current_count': 41, 'peak_bytes': 3724237808, 'peak_count': 321, 'total_bytes': 75063243192, 'total_count': 3186}, {'current_bytes': 1613558192, 'current_count': 40, 'peak_bytes': 3802691456, 'peak_count': 316, 'total_bytes': 71658793880, 'total_count': 3122}]) +INFO:__main__:Number of edges in final graph = 1,615,685,872 +INFO:__main__:-------------------------------------------------------------------------------- + +Dask client created using /scripts/mg_utils/dask_scheduler.json +Loading edge index for edge type paper__cites__paper +Loading node labels for node type paper (offset=0) +created batches +flushed all batches +created batches +flushed all batches +created batches +flushed all batches +function: sample_graph +function args: () kwargs: {'G': , 'label_df': , 'output_path': '/samples/ogbn_papers100M[1]_b512_f[10, 10, 10]', 'num_epochs': 1, 'seed': 42, 'batch_size': 512, 'seeds_per_call': 524288, 'batches_per_partition': 781, 'fanout': [10, 10, 10], 'sampling_kwargs': {'deduplicate_sources': True, 'prior_sources_behavior': 'exclude', 'renumber': True, 'compression': 'COO', 'compress_per_hop': False, 'use_legacy_names': False, 'include_hop_column': True}} +execution_time: 15.987128257751465 +allocation_counts: +{ 'tcp://10.180.6.152:35915': { 'current_bytes': '1.7GB', + 'peak_bytes': '3.4GB', + 'total_bytes': '73.3GB'}, + 'tcp://10.180.6.152:40635': { 'current_bytes': '1.2GB', + 'peak_bytes': '2.9GB', + 'total_bytes': '62.3GB'}, + 'tcp://10.180.6.152:41253': { 'current_bytes': '906.5MB', + 'peak_bytes': '3.2GB', + 'total_bytes': '71.9GB'}, + 'tcp://10.180.6.152:43815': { 'current_bytes': '1.0GB', + 'peak_bytes': '3.7GB', + 'total_bytes': '71.8GB'}, + 'tcp://10.180.6.152:44619': { 'current_bytes': '1.1GB', + 'peak_bytes': '2.9GB', + 'total_bytes': '64.5GB'}, + 'tcp://10.180.6.152:44975': { 'current_bytes': '612.0MB', + 'peak_bytes': '3.1GB', + 'total_bytes': '71.3GB'}, + 'tcp://10.180.6.152:45029': { 'current_bytes': '1.3GB', + 'peak_bytes': '3.4GB', + 'total_bytes': '74.0GB'}, + 'tcp://10.180.6.152:46667': { 'current_bytes': '1.7GB', + 'peak_bytes': '3.5GB', + 'total_bytes': '67.3GB'}, + 'tcp://10.180.6.153:34327': { 'current_bytes': '1.4GB', + 'peak_bytes': '4.0GB', + 'total_bytes': '73.2GB'}, + 'tcp://10.180.6.153:35661': { 'current_bytes': '925.6MB', + 'peak_bytes': '3.2GB', + 'total_bytes': '72.5GB'}, + 'tcp://10.180.6.153:36727': { 'current_bytes': '1.7GB', + 'peak_bytes': '3.9GB', + 'total_bytes': '70.7GB'}, + 'tcp://10.180.6.153:37691': { 'current_bytes': '1.6GB', + 'peak_bytes': '3.5GB', + 'total_bytes': '70.3GB'}, + 'tcp://10.180.6.153:40027': { 'current_bytes': '1.7GB', + 'peak_bytes': '4.0GB', + 'total_bytes': '75.6GB'}, + 'tcp://10.180.6.153:41203': { 'current_bytes': '293.3MB', + 'peak_bytes': '3.4GB', + 'total_bytes': '65.3GB'}, + 'tcp://10.180.6.153:42921': { 'current_bytes': '1.5GB', + 'peak_bytes': '3.5GB', + 'total_bytes': '69.9GB'}, + 'tcp://10.180.6.153:43119': { 'current_bytes': '1.5GB', + 'peak_bytes': '3.5GB', + 'total_bytes': '66.7GB'}} +Edge List Memory = 2.3GB +Peak Memory across workers = 4.0GB +Max Peak to output graph ratio across workers = 11.80 +Max Peak to avg input graph ratio across workers = 1.79 +----------------------------------------dataset = ogbn_papers100M completed---------------------------------------- + +Dask client closed. +[1702579578.440170] [rno1-m02-d03-dgx1-066:1727970:0] parser.c:2036 UCX WARN unused environment variable: UCX_MEMTYPE_CACHE (maybe: UCX_MEMTYPE_CACHE?) +[1702579578.440170] [rno1-m02-d03-dgx1-066:1727970:0] parser.c:2036 UCX WARN (set UCX_WARN_UNUSED_ENV_VARS=n to suppress this warning) +1727660 /bin/bash /scripts/mg_utils/run-dask-process.sh scheduler workers +1727714 /opt/conda/bin/python3.10 /opt/conda/bin/dask-scheduler --protocol=tcp --scheduler-file /scripts/mg_utils/dask_scheduler.json +1727804 /opt/conda/bin/python /opt/conda/bin/dask-cuda-worker --rmm-pool-size=28G --rmm-async --local-directory=/tmp/abarghi --scheduler-file=/scripts/mg_utils/dask_scheduler.json --memory-limit=auto --device-memory-limit=auto +1726587 /usr/bin/python2 /usr/local/dcgm-nvdataflow/DcgmNVDataflowPoster.py +1727808 /opt/conda/bin/python -c from multiprocessing.resource_tracker import main;main(45) +1727811 /opt/conda/bin/python -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=46, pipe_handle=54) --multiprocessing-fork +1727815 /opt/conda/bin/python -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=46, pipe_handle=59) --multiprocessing-fork +1727819 /opt/conda/bin/python -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=46, pipe_handle=68) --multiprocessing-fork +1727824 /opt/conda/bin/python -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=46, pipe_handle=77) --multiprocessing-fork +1727828 /opt/conda/bin/python -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=46, pipe_handle=84) --multiprocessing-fork +1727832 /opt/conda/bin/python -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=46, pipe_handle=91) --multiprocessing-fork +1727836 /opt/conda/bin/python -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=46, pipe_handle=98) --multiprocessing-fork +1727839 /opt/conda/bin/python -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=46, pipe_handle=101) --multiprocessing-fork +pkill: killing pid 1726587 failed: Operation not permitted +1726587 /usr/bin/python2 /usr/local/dcgm-nvdataflow/DcgmNVDataflowPoster.py +1727808 /opt/conda/bin/python -c from multiprocessing.resource_tracker import main;main(45) +1727811 python +1727815 python +1727819 python +1727824 python +1727828 python +1727832 python +1727836 python +1727839 python +1727714 /opt/conda/bin/python3.10 /opt/conda/bin/dask-scheduler --protocol=tcp --scheduler-file /scripts/mg_utils/dask_scheduler.json +1727804 /opt/conda/bin/python /opt/conda/bin/dask-cuda-worker --rmm-pool-size=28G --rmm-async --local-directory=/tmp/abarghi --scheduler-file=/scripts/mg_utils/dask_scheduler.json --memory-limit=auto --device-memory-limit=auto +693105 /bin/bash /scripts/mg_utils/run-dask-process.sh workers +693150 /opt/conda/bin/python /opt/conda/bin/dask-cuda-worker --rmm-pool-size=28G --rmm-async --local-directory=/tmp/abarghi --scheduler-file=/scripts/mg_utils/dask_scheduler.json --memory-limit=auto --device-memory-limit=auto +692009 /usr/bin/python2 /usr/local/dcgm-nvdataflow/DcgmNVDataflowPoster.py +693167 /opt/conda/bin/python -c from multiprocessing.resource_tracker import main;main(45) +693170 /opt/conda/bin/python -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=46, pipe_handle=52) --multiprocessing-fork +693175 /opt/conda/bin/python -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=46, pipe_handle=63) --multiprocessing-fork +693178 /opt/conda/bin/python -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=46, pipe_handle=68) --multiprocessing-fork +693182 /opt/conda/bin/python -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=46, pipe_handle=71) --multiprocessing-fork +693187 /opt/conda/bin/python -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=46, pipe_handle=84) --multiprocessing-fork +693191 /opt/conda/bin/python -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=46, pipe_handle=91) --multiprocessing-fork +693195 /opt/conda/bin/python -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=46, pipe_handle=98) --multiprocessing-fork +693198 /opt/conda/bin/python -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=46, pipe_handle=101) --multiprocessing-fork +pkill: killing pid 692009 failed: Operation not permitted +692009 /usr/bin/python2 /usr/local/dcgm-nvdataflow/DcgmNVDataflowPoster.py +693167 /opt/conda/bin/python -c from multiprocessing.resource_tracker import main;main(45) +693170 python +693175 python +693178 python +693182 python +693187 python +693191 python +693195 python +693198 python +693150 /opt/conda/bin/python /opt/conda/bin/dask-cuda-worker --rmm-pool-size=28G --rmm-async --local-directory=/tmp/abarghi --scheduler-file=/scripts/mg_utils/dask_scheduler.json --memory-limit=auto --device-memory-limit=auto +srun: Job 1581196 step creation temporarily disabled, retrying (Requested nodes are busy) +srun: Step created for job 1581196 +[2023-12-14 10:49:27,128] torch.distributed.run: [WARNING] master_addr is only used for static rdzv_backend and when rdzv_endpoint is not specified. +[2023-12-14 10:49:27,128] torch.distributed.run: [WARNING] +[2023-12-14 10:49:27,128] torch.distributed.run: [WARNING] ***************************************** +[2023-12-14 10:49:27,128] torch.distributed.run: [WARNING] Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed. +[2023-12-14 10:49:27,128] torch.distributed.run: [WARNING] ***************************************** +[2023-12-14 10:49:27,197] torch.distributed.run: [WARNING] master_addr is only used for static rdzv_backend and when rdzv_endpoint is not specified. +[W socket.cpp:436] [c10d] The server socket cannot be initialized on [::]:29500 (errno: 97 - Address family not supported by protocol). +[2023-12-14 10:49:27,198] torch.distributed.run: [WARNING] +[2023-12-14 10:49:27,198] torch.distributed.run: [WARNING] ***************************************** +[2023-12-14 10:49:27,198] torch.distributed.run: [WARNING] Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed. +[2023-12-14 10:49:27,198] torch.distributed.run: [WARNING] ***************************************** +[W socket.cpp:663] [c10d] The client socket cannot be initialized to connect to [rno1-m02-d03-dgx1-066.nsv.rno1.nvmetal.net]:29500 (errno: 97 - Address family not supported by protocol). +[W socket.cpp:663] [c10d] The client socket cannot be initialized to connect to [rno1-m02-d03-dgx1-066.nsv.rno1.nvmetal.net]:29500 (errno: 97 - Address family not supported by protocol). +[W socket.cpp:663] [c10d] The client socket cannot be initialized to connect to [rno1-m02-d03-dgx1-066.nsv.rno1.nvmetal.net]:56505 (errno: 97 - Address family not supported by protocol). +[W socket.cpp:663] [c10d] The client socket cannot be initialized to connect to [rno1-m02-d03-dgx1-066.nsv.rno1.nvmetal.net]:56505 (errno: 97 - Address family not supported by protocol). +[W socket.cpp:663] [c10d] The client socket cannot be initialized to connect to [rno1-m02-d03-dgx1-066.nsv.rno1.nvmetal.net]:56505 (errno: 97 - Address family not supported by protocol). +[W socket.cpp:663] [c10d] The client socket cannot be initialized to connect to [rno1-m02-d03-dgx1-066.nsv.rno1.nvmetal.net]:56505 (errno: 97 - Address family not supported by protocol). +[W socket.cpp:436] [c10d] The server socket cannot be initialized on [::]:56505 (errno: 97 - Address family not supported by protocol). +[W socket.cpp:663] [c10d] The client socket cannot be initialized to connect to [rno1-m02-d03-dgx1-066.nsv.rno1.nvmetal.net]:56505 (errno: 97 - Address family not supported by protocol). +worker initialized +worker initialized +worker initialized +[W socket.cpp:663] [c10d] The client socket cannot be initialized to connect to [rno1-m02-d03-dgx1-066.nsv.rno1.nvmetal.net]:56505 (errno: 97 - Address family not supported by protocol). +[W socket.cpp:663] [c10d] The client socket cannot be initialized to connect to [rno1-m02-d03-dgx1-066.nsv.rno1.nvmetal.net]:56505 (errno: 97 - Address family not supported by protocol). +worker initialized +worker initialized +[W socket.cpp:663] [c10d] The client socket cannot be initialized to connect to [rno1-m02-d03-dgx1-066.nsv.rno1.nvmetal.net]:56505 (errno: 97 - Address family not supported by protocol). +[W socket.cpp:663] [c10d] The client socket cannot be initialized to connect to [rno1-m02-d03-dgx1-066.nsv.rno1.nvmetal.net]:56505 (errno: 97 - Address family not supported by protocol). +worker initialized +worker initialized +[W socket.cpp:663] [c10d] The client socket cannot be initialized to connect to [rno1-m02-d03-dgx1-066.nsv.rno1.nvmetal.net]:56505 (errno: 97 - Address family not supported by protocol). +worker initialized +[W socket.cpp:663] [c10d] The client socket cannot be initialized to connect to [rno1-m02-d03-dgx1-066.nsv.rno1.nvmetal.net]:56505 (errno: 97 - Address family not supported by protocol). +worker initialized +worker initialized +[W socket.cpp:663] [c10d] The client socket cannot be initialized to connect to [rno1-m02-d03-dgx1-066.nsv.rno1.nvmetal.net]:56505 (errno: 97 - Address family not supported by protocol). +worker initialized +[W socket.cpp:663] [c10d] The client socket cannot be initialized to connect to [rno1-m02-d03-dgx1-066.nsv.rno1.nvmetal.net]:56505 (errno: 97 - Address family not supported by protocol). +worker initialized +[W socket.cpp:663] [c10d] The client socket cannot be initialized to connect to [rno1-m02-d03-dgx1-066.nsv.rno1.nvmetal.net]:56505 (errno: 97 - Address family not supported by protocol). +worker initialized +[W socket.cpp:663] [c10d] The client socket cannot be initialized to connect to [rno1-m02-d03-dgx1-066.nsv.rno1.nvmetal.net]:56505 (errno: 97 - Address family not supported by protocol). +worker initialized +[W socket.cpp:663] [c10d] The client socket cannot be initialized to connect to [rno1-m02-d03-dgx1-066.nsv.rno1.nvmetal.net]:56505 (errno: 97 - Address family not supported by protocol). +worker initialized +worker initialized +Traceback (most recent call last): + File "/scripts/bench_cugraph_training.py", line 234, in + main(args) + File "/scripts/bench_cugraph_training.py", line 208, in main + from trainers_cugraph_pyg import PyGCuGraphTrainer + File "/scripts/trainers_cugraph_pyg.py", line 14, in + from trainers_pyg import PyGTrainer + File "/scripts/trainers_pyg.py", line 17, in + from models_pyg import GraphSAGE + File "/scripts/models_pyg.py", line 16, in + from torch_geometric.nn import SAGEConv +ModuleNotFoundError: No module named 'torch_geometric' +Traceback (most recent call last): + File "/scripts/bench_cugraph_training.py", line 234, in + main(args) + File "/scripts/bench_cugraph_training.py", line 208, in main + from trainers_cugraph_pyg import PyGCuGraphTrainer + File "/scripts/trainers_cugraph_pyg.py", line 14, in + from trainers_pyg import PyGTrainer + File "/scripts/trainers_pyg.py", line 17, in + from models_pyg import GraphSAGE + File "/scripts/models_pyg.py", line 16, in + from torch_geometric.nn import SAGEConv +ModuleNotFoundError: No module named 'torch_geometric' +Traceback (most recent call last): + File "/scripts/bench_cugraph_training.py", line 234, in + main(args) + File "/scripts/bench_cugraph_training.py", line 208, in main + from trainers_cugraph_pyg import PyGCuGraphTrainer + File "/scripts/trainers_cugraph_pyg.py", line 14, in + from trainers_pyg import PyGTrainer + File "/scripts/trainers_pyg.py", line 17, in + from models_pyg import GraphSAGE + File "/scripts/models_pyg.py", line 16, in + from torch_geometric.nn import SAGEConv +ModuleNotFoundError: No module named 'torch_geometric' +Traceback (most recent call last): + File "/scripts/bench_cugraph_training.py", line 234, in + main(args) + File "/scripts/bench_cugraph_training.py", line 208, in main + from trainers_cugraph_pyg import PyGCuGraphTrainer + File "/scripts/trainers_cugraph_pyg.py", line 14, in + from trainers_pyg import PyGTrainer + File "/scripts/trainers_pyg.py", line 17, in + from models_pyg import GraphSAGE + File "/scripts/models_pyg.py", line 16, in + from torch_geometric.nn import SAGEConv +ModuleNotFoundError: No module named 'torch_geometric' +Traceback (most recent call last): + File "/scripts/bench_cugraph_training.py", line 234, in + main(args) + File "/scripts/bench_cugraph_training.py", line 208, in main + from trainers_cugraph_pyg import PyGCuGraphTrainer + File "/scripts/trainers_cugraph_pyg.py", line 14, in + from trainers_pyg import PyGTrainer + File "/scripts/trainers_pyg.py", line 17, in + from models_pyg import GraphSAGE + File "/scripts/models_pyg.py", line 16, in + from torch_geometric.nn import SAGEConv +ModuleNotFoundError: No module named 'torch_geometric' +Traceback (most recent call last): + File "/scripts/bench_cugraph_training.py", line 234, in + main(args) + File "/scripts/bench_cugraph_training.py", line 208, in main + from trainers_cugraph_pyg import PyGCuGraphTrainer + File "/scripts/trainers_cugraph_pyg.py", line 14, in + from trainers_pyg import PyGTrainer + File "/scripts/trainers_pyg.py", line 17, in + from models_pyg import GraphSAGE + File "/scripts/models_pyg.py", line 16, in + from torch_geometric.nn import SAGEConv +ModuleNotFoundError: No module named 'torch_geometric' +Traceback (most recent call last): + File "/scripts/bench_cugraph_training.py", line 234, in + main(args) + File "/scripts/bench_cugraph_training.py", line 208, in main + from trainers_cugraph_pyg import PyGCuGraphTrainer + File "/scripts/trainers_cugraph_pyg.py", line 14, in + from trainers_pyg import PyGTrainer + File "/scripts/trainers_pyg.py", line 17, in + from models_pyg import GraphSAGE + File "/scripts/models_pyg.py", line 16, in + from torch_geometric.nn import SAGEConv +ModuleNotFoundError: No module named 'torch_geometric' +Traceback (most recent call last): + File "/scripts/bench_cugraph_training.py", line 234, in + main(args) + File "/scripts/bench_cugraph_training.py", line 208, in main + from trainers_cugraph_pyg import PyGCuGraphTrainer + File "/scripts/trainers_cugraph_pyg.py", line 14, in + from trainers_pyg import PyGTrainer + File "/scripts/trainers_pyg.py", line 17, in + from models_pyg import GraphSAGE + File "/scripts/models_pyg.py", line 16, in + from torch_geometric.nn import SAGEConv +ModuleNotFoundError: No module named 'torch_geometric' +Traceback (most recent call last): + File "/scripts/bench_cugraph_training.py", line 234, in + main(args) + File "/scripts/bench_cugraph_training.py", line 208, in main + from trainers_cugraph_pyg import PyGCuGraphTrainer + File "/scripts/trainers_cugraph_pyg.py", line 14, in + from trainers_pyg import PyGTrainer + File "/scripts/trainers_pyg.py", line 17, in + from models_pyg import GraphSAGE + File "/scripts/models_pyg.py", line 16, in + from torch_geometric.nn import SAGEConv +ModuleNotFoundError: No module named 'torch_geometric' +Traceback (most recent call last): + File "/scripts/bench_cugraph_training.py", line 234, in + main(args) + File "/scripts/bench_cugraph_training.py", line 208, in main + from trainers_cugraph_pyg import PyGCuGraphTrainer + File "/scripts/trainers_cugraph_pyg.py", line 14, in + from trainers_pyg import PyGTrainer + File "/scripts/trainers_pyg.py", line 17, in + from models_pyg import GraphSAGE + File "/scripts/models_pyg.py", line 16, in + from torch_geometric.nn import SAGEConv +ModuleNotFoundError: No module named 'torch_geometric' +Traceback (most recent call last): + File "/scripts/bench_cugraph_training.py", line 234, in + main(args) + File "/scripts/bench_cugraph_training.py", line 208, in main + from trainers_cugraph_pyg import PyGCuGraphTrainer + File "/scripts/trainers_cugraph_pyg.py", line 14, in + from trainers_pyg import PyGTrainer + File "/scripts/trainers_pyg.py", line 17, in + from models_pyg import GraphSAGE + File "/scripts/models_pyg.py", line 16, in + from torch_geometric.nn import SAGEConv +ModuleNotFoundError: No module named 'torch_geometric' +Traceback (most recent call last): + File "/scripts/bench_cugraph_training.py", line 234, in + main(args) + File "/scripts/bench_cugraph_training.py", line 208, in main + from trainers_cugraph_pyg import PyGCuGraphTrainer + File "/scripts/trainers_cugraph_pyg.py", line 14, in + from trainers_pyg import PyGTrainer + File "/scripts/trainers_pyg.py", line 17, in + from models_pyg import GraphSAGE + File "/scripts/models_pyg.py", line 16, in + from torch_geometric.nn import SAGEConv +ModuleNotFoundError: No module named 'torch_geometric' +Traceback (most recent call last): + File "/scripts/bench_cugraph_training.py", line 234, in + main(args) + File "/scripts/bench_cugraph_training.py", line 208, in main + from trainers_cugraph_pyg import PyGCuGraphTrainer + File "/scripts/trainers_cugraph_pyg.py", line 14, in + from trainers_pyg import PyGTrainer + File "/scripts/trainers_pyg.py", line 17, in + from models_pyg import GraphSAGE + File "/scripts/models_pyg.py", line 16, in + from torch_geometric.nn import SAGEConv +ModuleNotFoundError: No module named 'torch_geometric' +Traceback (most recent call last): + File "/scripts/bench_cugraph_training.py", line 234, in + main(args) + File "/scripts/bench_cugraph_training.py", line 208, in main + from trainers_cugraph_pyg import PyGCuGraphTrainer + File "/scripts/trainers_cugraph_pyg.py", line 14, in + from trainers_pyg import PyGTrainer + File "/scripts/trainers_pyg.py", line 17, in + from models_pyg import GraphSAGE + File "/scripts/models_pyg.py", line 16, in + from torch_geometric.nn import SAGEConv +ModuleNotFoundError: No module named 'torch_geometric' +Traceback (most recent call last): + File "/scripts/bench_cugraph_training.py", line 234, in + main(args) + File "/scripts/bench_cugraph_training.py", line 208, in main + from trainers_cugraph_pyg import PyGCuGraphTrainer + File "/scripts/trainers_cugraph_pyg.py", line 14, in + from trainers_pyg import PyGTrainer + File "/scripts/trainers_pyg.py", line 17, in + from models_pyg import GraphSAGE + File "/scripts/models_pyg.py", line 16, in + from torch_geometric.nn import SAGEConv +ModuleNotFoundError: No module named 'torch_geometric' +Traceback (most recent call last): + File "/scripts/bench_cugraph_training.py", line 234, in + main(args) + File "/scripts/bench_cugraph_training.py", line 208, in main + from trainers_cugraph_pyg import PyGCuGraphTrainer + File "/scripts/trainers_cugraph_pyg.py", line 14, in + from trainers_pyg import PyGTrainer + File "/scripts/trainers_pyg.py", line 17, in + from models_pyg import GraphSAGE + File "/scripts/models_pyg.py", line 16, in + from torch_geometric.nn import SAGEConv +ModuleNotFoundError: No module named 'torch_geometric' +[2023-12-14 10:49:53,293] torch.distributed.elastic.multiprocessing.api: [ERROR] failed (exitcode: 1) local_rank: 0 (pid: 1729546) of binary: /opt/conda/bin/python +[2023-12-14 10:49:53,293] torch.distributed.elastic.multiprocessing.api: [ERROR] failed (exitcode: 1) local_rank: 0 (pid: 695126) of binary: /opt/conda/bin/python +Traceback (most recent call last): + File "/opt/conda/bin/torchrun", line 33, in + sys.exit(load_entry_point('torch==2.1.1', 'console_scripts', 'torchrun')()) + File "/opt/conda/lib/python3.10/site-packages/torch/distributed/elastic/multiprocessing/errors/__init__.py", line 346, in wrapper + return f(*args, **kwargs) + File "/opt/conda/lib/python3.10/site-packages/torch/distributed/run.py", line 806, in main + run(args) + File "/opt/conda/lib/python3.10/site-packages/torch/distributed/run.py", line 797, in run + elastic_launch( + File "/opt/conda/lib/python3.10/site-packages/torch/distributed/launcher/api.py", line 134, in __call__ + return launch_agent(self._config, self._entrypoint, list(args)) + File "/opt/conda/lib/python3.10/site-packages/torch/distributed/launcher/api.py", line 264, in launch_agent + raise ChildFailedError( +torch.distributed.elastic.multiprocessing.errors.ChildFailedError: +============================================================ +/scripts/bench_cugraph_training.py FAILED +------------------------------------------------------------ +Failures: +[1]: + time : 2023-12-14_10:49:53 + host : rno1-m02-d03-dgx1-066.nsv.rno1.nvmetal.net + rank : 1 (local_rank: 1) + exitcode : 1 (pid: 1729547) + error_file: + traceback : To enable traceback see: https://pytorch.org/docs/stable/elastic/errors.html +[2]: + time : 2023-12-14_10:49:53 + host : rno1-m02-d03-dgx1-066.nsv.rno1.nvmetal.net + rank : 2 (local_rank: 2) + exitcode : 1 (pid: 1729548) + error_file: + traceback : To enable traceback see: https://pytorch.org/docs/stable/elastic/errors.html +[3]: + time : 2023-12-14_10:49:53 + host : rno1-m02-d03-dgx1-066.nsv.rno1.nvmetal.net + rank : 3 (local_rank: 3) + exitcode : 1 (pid: 1729549) + error_file: + traceback : To enable traceback see: https://pytorch.org/docs/stable/elastic/errors.html +[4]: + time : 2023-12-14_10:49:53 + host : rno1-m02-d03-dgx1-066.nsv.rno1.nvmetal.net + rank : 4 (local_rank: 4) + exitcode : 1 (pid: 1729550) + error_file: + traceback : To enable traceback see: https://pytorch.org/docs/stable/elastic/errors.html +[5]: + time : 2023-12-14_10:49:53 + host : rno1-m02-d03-dgx1-066.nsv.rno1.nvmetal.net + rank : 5 (local_rank: 5) + exitcode : 1 (pid: 1729551) + error_file: + traceback : To enable traceback see: https://pytorch.org/docs/stable/elastic/errors.html +[6]: + time : 2023-12-14_10:49:53 + host : rno1-m02-d03-dgx1-066.nsv.rno1.nvmetal.net + rank : 6 (local_rank: 6) + exitcode : 1 (pid: 1729552) + error_file: + traceback : To enable traceback see: https://pytorch.org/docs/stable/elastic/errors.html +[7]: + time : 2023-12-14_10:49:53 + host : rno1-m02-d03-dgx1-066.nsv.rno1.nvmetal.net + rank : 7 (local_rank: 7) + exitcode : 1 (pid: 1729553) + error_file: + traceback : To enable traceback see: https://pytorch.org/docs/stable/elastic/errors.html +------------------------------------------------------------ +Root Cause (first observed failure): +[0]: + time : 2023-12-14_10:49:53 + host : rno1-m02-d03-dgx1-066.nsv.rno1.nvmetal.net + rank : 0 (local_rank: 0) + exitcode : 1 (pid: 1729546) + error_file: + traceback : To enable traceback see: https://pytorch.org/docs/stable/elastic/errors.html +============================================================ +Traceback (most recent call last): + File "/opt/conda/bin/torchrun", line 33, in + sys.exit(load_entry_point('torch==2.1.1', 'console_scripts', 'torchrun')()) + File "/opt/conda/lib/python3.10/site-packages/torch/distributed/elastic/multiprocessing/errors/__init__.py", line 346, in wrapper + return f(*args, **kwargs) + File "/opt/conda/lib/python3.10/site-packages/torch/distributed/run.py", line 806, in main + run(args) + File "/opt/conda/lib/python3.10/site-packages/torch/distributed/run.py", line 797, in run + elastic_launch( + File "/opt/conda/lib/python3.10/site-packages/torch/distributed/launcher/api.py", line 134, in __call__ + return launch_agent(self._config, self._entrypoint, list(args)) + File "/opt/conda/lib/python3.10/site-packages/torch/distributed/launcher/api.py", line 264, in launch_agent + raise ChildFailedError( +torch.distributed.elastic.multiprocessing.errors.ChildFailedError: +============================================================ +/scripts/bench_cugraph_training.py FAILED +------------------------------------------------------------ +Failures: +[1]: + time : 2023-12-14_10:49:53 + host : rno1-m02-d04-dgx1-067.nsv.rno1.nvmetal.net + rank : 9 (local_rank: 1) + exitcode : 1 (pid: 695127) + error_file: + traceback : To enable traceback see: https://pytorch.org/docs/stable/elastic/errors.html +[2]: + time : 2023-12-14_10:49:53 + host : rno1-m02-d04-dgx1-067.nsv.rno1.nvmetal.net + rank : 10 (local_rank: 2) + exitcode : 1 (pid: 695128) + error_file: + traceback : To enable traceback see: https://pytorch.org/docs/stable/elastic/errors.html +[3]: + time : 2023-12-14_10:49:53 + host : rno1-m02-d04-dgx1-067.nsv.rno1.nvmetal.net + rank : 11 (local_rank: 3) + exitcode : 1 (pid: 695129) + error_file: + traceback : To enable traceback see: https://pytorch.org/docs/stable/elastic/errors.html +[4]: + time : 2023-12-14_10:49:53 + host : rno1-m02-d04-dgx1-067.nsv.rno1.nvmetal.net + rank : 12 (local_rank: 4) + exitcode : 1 (pid: 695131) + error_file: + traceback : To enable traceback see: https://pytorch.org/docs/stable/elastic/errors.html +[5]: + time : 2023-12-14_10:49:53 + host : rno1-m02-d04-dgx1-067.nsv.rno1.nvmetal.net + rank : 13 (local_rank: 5) + exitcode : 1 (pid: 695132) + error_file: + traceback : To enable traceback see: https://pytorch.org/docs/stable/elastic/errors.html +[6]: + time : 2023-12-14_10:49:53 + host : rno1-m02-d04-dgx1-067.nsv.rno1.nvmetal.net + rank : 14 (local_rank: 6) + exitcode : 1 (pid: 695133) + error_file: + traceback : To enable traceback see: https://pytorch.org/docs/stable/elastic/errors.html +[7]: + time : 2023-12-14_10:49:53 + host : rno1-m02-d04-dgx1-067.nsv.rno1.nvmetal.net + rank : 15 (local_rank: 7) + exitcode : 1 (pid: 695134) + error_file: + traceback : To enable traceback see: https://pytorch.org/docs/stable/elastic/errors.html +------------------------------------------------------------ +Root Cause (first observed failure): +[0]: + time : 2023-12-14_10:49:53 + host : rno1-m02-d04-dgx1-067.nsv.rno1.nvmetal.net + rank : 8 (local_rank: 0) + exitcode : 1 (pid: 695126) + error_file: + traceback : To enable traceback see: https://pytorch.org/docs/stable/elastic/errors.html +============================================================ +srun: error: rno1-m02-d04-dgx1-067: task 1: Exited with exit code 1 +srun: Terminating job step 1581196.2 +srun: error: rno1-m02-d03-dgx1-066: task 0: Exited with exit code 1 diff --git a/benchmarks/cugraph/standalone/bulk_sampling/trainers_cugraph.py b/benchmarks/cugraph/standalone/bulk_sampling/trainers_cugraph.py deleted file mode 100644 index 863df4ee6b4..00000000000 --- a/benchmarks/cugraph/standalone/bulk_sampling/trainers_cugraph.py +++ /dev/null @@ -1,142 +0,0 @@ -# Copyright (c) 2023, NVIDIA CORPORATION. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from trainer import PyGTrainer -from models_cugraph import CuGraphSAGE - -import torch -import numpy as np - -from torch.distributed.optim import ZeroRedundancyOptimizer -from torch.nn.parallel import DistributedDataParallel as ddp - -from cugraph.gnn import FeatureStore -from cugraph_pyg.data import CuGraphStore -from cugraph_pyg.loader import BulkSampleLoader - -import gc -import os - -class PyGCuGraphTrainer(PyGTrainer): - def __init__(self, dataset, model='GraphSAGE', device=0, rank=0, world_size=1, num_epochs=1, sample_dir='.', **kwargs): - self.__data = None - self.__device = device - self.__rank = rank - self.__world_size = world_size - self.__num_epochs = num_epochs - self.__dataset = dataset - self.__sample_dir = sample_dir - self.__loader_kwargs = kwargs - self.__model = self.get_model(model) - - @property - def model(self): - return self.__model - - @property - def dataset(self): - return self.__dataset - - @property - def optimizer(self): - return ZeroRedundancyOptimizer(self.model.parameters(), torch.optim.Adam, lr=0.01) - - @property - def num_epochs(self) -> int: - return self.__num_epochs - - def get_loader(self, epoch:int) -> int: - # FIXME suppor test, val - # TODO support online sampling - return BulkSampleLoader( - self.data, - self.data, - None, # FIXME get input nodes properly - directory=os.path.join(self.__sample_dir, f'epoch={epoch}', 'samples'), - input_files=self.get_input_files(epoch), - **self.__loader_kwargs, - ) - - @property - def data(self): - import logging - logger = logging.getLogger('PyGCuGraphTrainer') - logger.info('getting data') - - if self.__data is None: - # FIXME wholegraph - fs = FeatureStore(backend='torch') - num_nodes_dict = {} - - for node_type, x in self.__dataset.x_dict.items(): - logger.debug(f'getting x for {node_type}') - fs.add_data(x, node_type, 'x') - num_nodes_dict[node_type] = self.__dataset.num_nodes(node_type) - - for node_type, y in self.__dataset.y_dict.items(): - logger.debug(f'getting y for {node_type}') - fs.add_data(y, node_type, 'y') - - for node_type, train in self.__dataset.train_dict.items(): - logger.debug(f'getting train for {node_type}') - fs.add_data(train, node_type, 'train') - - for node_type, test in self.__dataset.test_dict.items(): - logger.debug(f'getting test for {node_type}') - fs.add_data(test, node_type, 'test') - - for node_type, val in self.__dataset.val_dict.items(): - logger.debug(f'getting val for {node_type}') - fs.add_data(val, node_type, 'val') - - # TODO support online sampling if the edge index is provided - num_edges_dict = self.__dataset.edge_index_dict - if not isinstance(list(num_edges_dict.values())[0], int): - num_edges_dict = {k: len(v) for k, v in num_edges_dict} - - self.__data = CuGraphStore( - fs, - num_edges_dict, - num_nodes_dict, - ) - - return self.__data - - def get_model(self, name='GraphSAGE'): - if name != 'GraphSAGE': - raise ValueError('only GraphSAGE is currently supported') - - num_input_features = self.__dataset.num_input_features - num_output_features = self.__dataset.num_labels - num_layers = len(self.__loader_kwargs['num_neighbors']) - - with torch.cuda.device(self.__device): - model = CuGraphSAGE( - in_channels=num_input_features, - hidden_channels=64, - out_channels=num_output_features, - num_layers=num_layers - ).to(torch.float32).to(self.__device) - - model = ddp(model, device_ids=[self.__device]) - print('done creating model') - - return model - - def get_input_files(self, epoch=0): - path = os.path.join(self.__sample_dir, f'epoch={epoch}', 'samples') - file_list = np.array( - os.listdir(path) - ) - - return np.array_split(file_list, self.__world_size)[self.__rank] diff --git a/benchmarks/cugraph/standalone/bulk_sampling/trainers_native.py b/benchmarks/cugraph/standalone/bulk_sampling/trainers_native.py deleted file mode 100644 index 37a0b3b2bf8..00000000000 --- a/benchmarks/cugraph/standalone/bulk_sampling/trainers_native.py +++ /dev/null @@ -1,167 +0,0 @@ -# Copyright (c) 2023, NVIDIA CORPORATION. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from trainer import PyGTrainer -from datasets import OGBNPapers100MDataset -from models_native import GraphSAGE - -import torch -import numpy as np - -from torch.distributed.optim import ZeroRedundancyOptimizer -from torch.nn.parallel import DistributedDataParallel as ddp - -from torch_geometric.utils.sparse import index2ptr -from torch_geometric.data import HeteroData -from torch_geometric.loader import NeighborLoader - -import gc -import os - -def pyg_num_workers(world_size): - num_workers = None - if hasattr(os, "sched_getaffinity"): - try: - num_workers = len(os.sched_getaffinity(0)) / (2 * world_size) - except Exception: - pass - if num_workers is None: - num_workers = os.cpu_count() / (2 * world_size) - return int(num_workers) - -class PyGNativeTrainer(PyGTrainer): - def __init__(self, dataset, model='GraphSAGE', device=0, rank=0, world_size=1, num_epochs=1, **kwargs): - self.__dataset = dataset - self.__device = device - self.__data = None - self.__rank = rank - self.__num_epochs = num_epochs - self.__world_size = world_size - self.__loader_kwargs = kwargs - self.__model = self.get_model(model) - - @property - def model(self): - return self.__model - - @property - def dataset(self): - return self.__dataset - - @property - def data(self): - import logging - logger = logging.getLogger('PyGNativeTrainer') - logger.info('getting data') - - if self.__data is None: - self.__data = HeteroData() - - for node_type, x in self.__dataset.x_dict.items(): - logger.debug(f'getting x for {node_type}') - self.__data[node_type].x = x - self.__data[node_type]['num_nodes'] = self.__dataset.num_nodes(node_type) - - for node_type, y in self.__dataset.y_dict.items(): - logger.debug(f'getting y for {node_type}') - self.__data[node_type]['y'] = y - - for node_type, train in self.__dataset.train_dict.items(): - logger.debug(f'getting train for {node_type}') - self.__data[node_type]['train'] = train - - for node_type, test in self.__dataset.test_dict.items(): - logger.debug(f'getting test for {node_type}') - self.__data[node_type]['test'] = test - - for node_type, val in self.__dataset.val_dict.items(): - logger.debug(f'getting val for {node_type}') - self.__data[node_type]['val'] = val - - for can_edge_type, ei in self.__dataset.edge_index_dict.items(): - logger.info('converting to csc...') - ei['dst'] = index2ptr(ei['dst'], self.__dataset.num_nodes(can_edge_type[2])) - - logger.info('updating data structure...') - self.__data.put_edge_index( - layout='csc', - edge_index=list(ei.values()), - edge_type=can_edge_type, - size=(self.__dataset.num_nodes(can_edge_type[0]), self.__dataset.num_nodes(can_edge_type[2])), - is_sorted=True - ) - gc.collect() - - return self.__data - - @property - def optimizer(self): - return ZeroRedundancyOptimizer(self.model.parameters(), torch.optim.Adam, lr=0.01) - - @property - def num_epochs(self) -> int: - return self.__num_epochs - - def get_loader(self, epoch: int): - import logging - logger = logging.getLogger('PyGNativeTrainer') - logger.info(f'Getting loader for epoch {epoch}') - - input_nodes_dict = { - node_type: np.array_split( - np.arange(len(train_mask))[train_mask], - self.__world_size - )[self.__rank] - for node_type, train_mask in self.__dataset.train_dict.items() - } - - input_nodes = list(input_nodes_dict.items()) - if len(input_nodes) > 1: - raise ValueError("Multiple input node types currently unsupported") - else: - input_nodes = tuple(input_nodes[0]) - - # get loader - loader = NeighborLoader( - self.data, - input_nodes=input_nodes, - is_sorted=True, - disjoint=False, - num_workers=pyg_num_workers(self.__world_size), # FIXME change this - persistent_workers=True, - **self.__loader_kwargs # batch size, num neighbors, replace, shuffle, etc. - ) - - logger.info('done creating loader') - return loader - - def get_model(self, name='GraphSAGE'): - if name != 'GraphSAGE': - raise ValueError("only GraphSAGE is currently supported") - - num_input_features = self.__dataset.num_input_features - num_output_features = self.__dataset.num_labels - num_layers = len(self.__loader_kwargs['num_neighbors']) - - with torch.cuda.device(self.__device): - model = GraphSAGE( - in_channels=num_input_features, - hidden_channels=64, - out_channels=num_output_features, - num_layers=num_layers, - ).to(torch.float32).to(self.__device) - model = ddp(model, device_ids=[self.__device]) - print('done creating model') - - return model \ No newline at end of file