diff --git a/benchmarks/cugraph/standalone/bulk_sampling/README.md b/benchmarks/cugraph/standalone/bulk_sampling/README.md index 2d09466fb2f..56e9f4f5f64 100644 --- a/benchmarks/cugraph/standalone/bulk_sampling/README.md +++ b/benchmarks/cugraph/standalone/bulk_sampling/README.md @@ -152,7 +152,7 @@ Next are standard GNN training arguments such as `FANOUT`, `BATCH_SIZE`, etc. Y the number of training epochs here. These are followed by the `REPLICATION_FACTOR` argument, which can be used to create replications of the dataset for scale testing purposes. -The final two arguments are `FRAMEWORK` which can be either "cuGraphPyG" or "PyG", and `GPUS_PER_NODE` +The final two arguments are `FRAMEWORK` which can be "cugraph_dgl_csr", "cugraph_pyg" or "pyg", and `GPUS_PER_NODE` which must be set to the correct value, even if this is provided by a SLURM argument. If `GPUS_PER_NODE` is not set to the correct number of GPUs, the script will hang indefinitely until it times out. Mismatched GPUs per node is currently unsupported by this script but should be possible in practice. diff --git a/benchmarks/cugraph/standalone/bulk_sampling/bench_cugraph_training.py b/benchmarks/cugraph/standalone/bulk_sampling/bench_cugraph_training.py index c9e347b261d..2604642b748 100644 --- a/benchmarks/cugraph/standalone/bulk_sampling/bench_cugraph_training.py +++ b/benchmarks/cugraph/standalone/bulk_sampling/bench_cugraph_training.py @@ -43,8 +43,9 @@ def init_pytorch_worker(rank: int, use_rmm_torch_allocator: bool = False) -> Non rmm.reinitialize( devices=[rank], - pool_allocator=True, - initial_pool_size=pool_size, + pool_allocator=False, + # pool_allocator=True, + # initial_pool_size=pool_size, ) if use_rmm_torch_allocator: @@ -119,10 +120,17 @@ def parse_args(): parser.add_argument( "--framework", type=str, - help="The framework to test (PyG, cuGraphPyG)", + help="The framework to test (PyG, cugraph_pyg, cugraph_dgl_csr)", required=True, ) + parser.add_argument( + "--use_wholegraph", + action="store_true", + help="Whether to use WholeGraph feature storage", + required=False, + ) + parser.add_argument( "--model", type=str, @@ -162,6 +170,13 @@ def parse_args(): required=False, ) + parser.add_argument( + "--skip_download", + action="store_true", + help="Whether to skip downloading", + required=False, + ) + return parser.parse_args() @@ -186,21 +201,43 @@ def main(args): world_size = int(os.environ["SLURM_JOB_NUM_NODES"]) * args.gpus_per_node + if args.use_wholegraph: + # TODO support WG without cuGraph + if args.framework.lower() not in ["cugraph_pyg", "cugraph_dgl_csr"]: + raise ValueError("WG feature store only supported with cuGraph backends") + from pylibwholegraph.torch.initialize import ( + get_global_communicator, + get_local_node_communicator, + init, + ) + + logger.info("initializing WG comms...") + init(global_rank, world_size, local_rank, args.gpus_per_node) + wm_comm = get_global_communicator() + get_local_node_communicator() + + wm_comm = wm_comm.wmb_comm + logger.info(f"rank {global_rank} successfully initialized WG comms") + wm_comm.barrier() + dataset = OGBNPapers100MDataset( replication_factor=args.replication_factor, dataset_dir=args.dataset_dir, train_split=args.train_split, val_split=args.val_split, - load_edge_index=(args.framework == "PyG"), + load_edge_index=(args.framework.lower() == "pyg"), + backend="wholegraph" if args.use_wholegraph else "torch", ) - if global_rank == 0: + # Note: this does not generate WG files + if global_rank == 0 and not args.skip_download: dataset.download() + dist.barrier() fanout = [int(f) for f in args.fanout.split("_")] - if args.framework == "PyG": + if args.framework.lower() == "pyg": from trainers.pyg import PyGNativeTrainer trainer = PyGNativeTrainer( @@ -215,7 +252,7 @@ def main(args): num_neighbors=fanout, batch_size=args.batch_size, ) - elif args.framework == "cuGraphPyG": + elif args.framework.lower() == "cugraph_pyg": sample_dir = os.path.join( args.sample_dir, f"ogbn_papers100M[{args.replication_factor}]_b{args.batch_size}_f{fanout}", @@ -229,11 +266,35 @@ def main(args): device=local_rank, rank=global_rank, world_size=world_size, + gpus_per_node=args.gpus_per_node, num_epochs=args.num_epochs, shuffle=True, replace=False, num_neighbors=fanout, batch_size=args.batch_size, + backend="wholegraph" if args.use_wholegraph else "torch", + ) + elif args.framework.lower() == "cugraph_dgl_csr": + sample_dir = os.path.join( + args.sample_dir, + f"ogbn_papers100M[{args.replication_factor}]_b{args.batch_size}_f{fanout}", + ) + from trainers.dgl import DGLCuGraphTrainer + + trainer = DGLCuGraphTrainer( + model=args.model, + dataset=dataset, + sample_dir=sample_dir, + device=local_rank, + rank=global_rank, + world_size=world_size, + gpus_per_node=args.gpus_per_node, + num_epochs=args.num_epochs, + shuffle=True, + replace=False, + num_neighbors=[int(f) for f in args.fanout.split("_")], + batch_size=args.batch_size, + backend="wholegraph" if args.use_wholegraph else "torch", ) else: raise ValueError("unsupported framework") diff --git a/benchmarks/cugraph/standalone/bulk_sampling/cugraph_bulk_sampling.py b/benchmarks/cugraph/standalone/bulk_sampling/cugraph_bulk_sampling.py index e3a5bba3162..95e1afcb28b 100644 --- a/benchmarks/cugraph/standalone/bulk_sampling/cugraph_bulk_sampling.py +++ b/benchmarks/cugraph/standalone/bulk_sampling/cugraph_bulk_sampling.py @@ -190,6 +190,10 @@ def sample_graph( val_perc=0.5, sampling_kwargs={}, ): + logger = logging.getLogger("__main__") + logger.info("Starting sampling phase...") + + logger.info("Calculating random splits...") cupy.random.seed(seed) train_df, test_df = label_df.random_split( [train_perc, 1 - train_perc], random_state=seed, shuffle=True @@ -197,24 +201,35 @@ def sample_graph( val_df, test_df = label_df.random_split( [val_perc, 1 - val_perc], random_state=seed, shuffle=True ) + logger.info("Calculated random splits") total_time = 0.0 for epoch in range(num_epochs): - steps = [("train", train_df), ("test", test_df)] + steps = [("train", train_df)] if epoch == num_epochs - 1: steps.append(("val", val_df)) + steps.append(("test", test_df)) for step, batch_df in steps: - batch_df = batch_df.sample(frac=1.0, random_state=seed) + logger.info("Shuffling batch dataframe...") + batch_df = batch_df.sample(frac=1.0, random_state=seed).persist() + logger.info("Shuffled and persisted batch dataframe...") - if step == "val": - output_sample_path = os.path.join(output_path, "val", "samples") - else: + if step == "train": output_sample_path = os.path.join( output_path, f"epoch={epoch}", f"{step}", "samples" ) - os.makedirs(output_sample_path) + else: + output_sample_path = os.path.join(output_path, step, "samples") + + client = default_client() + + def func(): + os.makedirs(output_sample_path, exist_ok=True) + + client.run(func) + logger.info("Creating bulk sampler...") sampler = BulkSampler( batch_size=batch_size, output_path=output_sample_path, @@ -227,6 +242,7 @@ def sample_graph( log_level=logging.INFO, **sampling_kwargs, ) + logger.info("Bulk sampler created and ready for input") n_workers = len(default_client().scheduler_info()["workers"]) @@ -244,13 +260,13 @@ def sample_graph( # should always persist the batch dataframe or performance may be suboptimal batch_df = batch_df.persist() - print("created batches") + logger.info("created and persisted batches") start_time = perf_counter() sampler.add_batches(batch_df, start_col_name="node", batch_col_name="batch") sampler.flush() end_time = perf_counter() - print("flushed all batches") + logger.info("flushed all batches") total_time += end_time - start_time return total_time @@ -356,23 +372,29 @@ def load_disk_dataset( path = Path(dataset_dir) / dataset parquet_path = path / "parquet" + logger = logging.getLogger("__main__") + + logger.info("getting n workers...") n_workers = get_n_workers() + logger.info(f"there are {n_workers} workers") with open(os.path.join(path, "meta.json")) as meta_file: meta = json.load(meta_file) + logger.info("assigning offsets...") node_offsets, node_offsets_replicated, total_num_nodes = assign_offsets_pyg( meta["num_nodes"], replication_factor=replication_factor ) + logger.info("offsets assigned") edge_index_dict = {} for edge_type in meta["num_edges"].keys(): - print(f"Loading edge index for edge type {edge_type}") + logger.info(f"Loading edge index for edge type {edge_type}") can_edge_type = tuple(edge_type.split("__")) edge_index_dict[can_edge_type] = dask_cudf.read_parquet( Path(parquet_path) / edge_type / "edge_index.parquet" - ).repartition(n_workers * 2) + ).repartition(npartitions=n_workers * 2) edge_index_dict[can_edge_type]["src"] += node_offsets_replicated[ can_edge_type[0] @@ -384,6 +406,7 @@ def load_disk_dataset( edge_index_dict[can_edge_type] = edge_index_dict[can_edge_type] if replication_factor > 1: + logger.info("processing replications") edge_index_dict[can_edge_type] = edge_index_dict[ can_edge_type ].map_partitions( @@ -400,6 +423,7 @@ def load_disk_dataset( } ), ) + logger.info("replications processed") gc.collect() @@ -407,48 +431,63 @@ def load_disk_dataset( edge_index_dict[can_edge_type] = edge_index_dict[can_edge_type].rename( columns={"src": "dst", "dst": "src"} ) + logger.info("edge index loaded") # Assign numeric edge type ids based on lexicographic order edge_offsets = {} edge_count = 0 - for num_edge_type, can_edge_type in enumerate(sorted(edge_index_dict.keys())): - if add_edge_types: - edge_index_dict[can_edge_type]["etp"] = cupy.int32(num_edge_type) - edge_offsets[can_edge_type] = edge_count - edge_count += len(edge_index_dict[can_edge_type]) + # for num_edge_type, can_edge_type in enumerate(sorted(edge_index_dict.keys())): + # if add_edge_types: + # edge_index_dict[can_edge_type]["etp"] = cupy.int32(num_edge_type) + # edge_offsets[can_edge_type] = edge_count + # edge_count += len(edge_index_dict[can_edge_type]) + + if len(edge_index_dict) != 1: + raise ValueError("should only be 1 edge index") + + logger.info("setting edge type") + + all_edges_df = list(edge_index_dict.values())[0] + if add_edge_types: + all_edges_df["etp"] = cupy.int32(0) - all_edges_df = dask_cudf.concat(list(edge_index_dict.values())) + # all_edges_df = dask_cudf.concat(list(edge_index_dict.values())) del edge_index_dict gc.collect() node_labels = {} for node_type, offset in node_offsets_replicated.items(): - print(f"Loading node labels for node type {node_type} (offset={offset})") + logger.info(f"Loading node labels for node type {node_type} (offset={offset})") node_label_path = os.path.join( os.path.join(parquet_path, node_type), "node_label.parquet" ) if os.path.exists(node_label_path): node_labels[node_type] = ( dask_cudf.read_parquet(node_label_path) - .repartition(n_workers) + .repartition(npartitions=n_workers) .drop("label", axis=1) .persist() ) + logger.info(f"Loaded and persisted initial labels") node_labels[node_type]["node"] += offset node_labels[node_type] = node_labels[node_type].persist() + logger.info(f"Set and persisted node offsets") if replication_factor > 1: + logger.info(f"Replicating labels...") node_labels[node_type] = node_labels[node_type].map_partitions( _replicate_df, replication_factor, {"node": meta["num_nodes"][node_type]}, meta=cudf.DataFrame({"node": cudf.Series(dtype="int64")}), ) + logger.info(f"Replicated labels (will likely evaluate later)") gc.collect() node_labels_df = dask_cudf.concat(list(node_labels.values())).reset_index(drop=True) + logger.info("Dataset successfully loaded") del node_labels gc.collect() @@ -459,6 +498,7 @@ def load_disk_dataset( node_offsets_replicated, edge_offsets, total_num_nodes, + sum(meta["num_edges"].values()) * replication_factor, ) @@ -540,6 +580,7 @@ def benchmark_cugraph_bulk_sampling( node_offsets, edge_offsets, total_num_nodes, + num_input_edges, ) = load_disk_dataset( dataset, dataset_dir=dataset_dir, @@ -548,7 +589,6 @@ def benchmark_cugraph_bulk_sampling( add_edge_types=add_edge_types, ) - num_input_edges = len(dask_edgelist_df) logger.info(f"Number of input edges = {num_input_edges:,}") G = construct_graph(dask_edgelist_df) @@ -562,7 +602,13 @@ def benchmark_cugraph_bulk_sampling( output_path, f"{dataset}[{replication_factor}]_b{batch_size}_f{fanout}", ) - os.makedirs(output_subdir) + + client = default_client() + + def func(): + os.makedirs(output_subdir, exist_ok=True) + + client.run(func) if sampling_target_framework == "cugraph_dgl_csr": sampling_kwargs = { @@ -574,8 +620,8 @@ def benchmark_cugraph_bulk_sampling( "use_legacy_names": False, "include_hop_column": False, } - else: - # FIXME: Update these arguments when CSC mode is fixed in cuGraph-PyG (release 24.02) + elif sampling_target_framework == "cugraph_pyg": + # FIXME: Update these arguments when CSC mode is fixed in cuGraph-PyG (release 24.04) sampling_kwargs = { "deduplicate_sources": True, "prior_sources_behavior": "exclude", @@ -585,8 +631,10 @@ def benchmark_cugraph_bulk_sampling( "use_legacy_names": False, "include_hop_column": True, } + else: + raise ValueError("Only cugraph_dgl_csr or cugraph_pyg are valid frameworks") - batches_per_partition = 600_000 // batch_size + batches_per_partition = 256 execution_time, allocation_counts = sample_graph( G=G, label_df=dask_label_df, @@ -761,9 +809,9 @@ def get_args(): logger.setLevel(logging.INFO) args = get_args() - if args.sampling_target_framework not in ["cugraph_dgl_csr", None]: + if args.sampling_target_framework not in ["cugraph_dgl_csr", "cugraph_pyg"]: raise ValueError( - "sampling_target_framework must be one of cugraph_dgl_csr or None", + "sampling_target_framework must be one of cugraph_dgl_csr or cugraph_pyg", "Other frameworks are not supported at this time.", ) @@ -775,12 +823,30 @@ def get_args(): seeds_per_call_opts = [int(s) for s in args.seeds_per_call_opts.split(",")] dask_worker_devices = [int(d) for d in args.dask_worker_devices.split(",")] - logger.info("starting dask client") - client, cluster = start_dask_client() + import time + + time_dask_start = time.localtime() + + logger.info(f"{time.asctime(time_dask_start)}: starting dask client") + from dask_cuda.initialize import initialize + from dask.distributed import Client + from cugraph.dask.comms import comms as Comms + import os, time + + client = Client(scheduler_file=os.environ["SCHEDULER_FILE"], timeout=360) + time.sleep(30) + cluster = Comms.initialize(p2p=True) + # client, cluster = start_dask_client() + time_dask_end = time.localtime() + logger.info(f"{time.asctime(time_dask_end)}: dask client started") + + logger.info("enabling spilling") enable_spilling() - stats_ls = [] client.run(enable_spilling) - logger.info("dask client started") + logger.info("enabled spilling") + + stats_ls = [] + for dataset in datasets: m = re.match(r"(\w+)\[([0-9]+)\]", dataset) if m: diff --git a/benchmarks/cugraph/standalone/bulk_sampling/datasets/ogbn_papers100M.py b/benchmarks/cugraph/standalone/bulk_sampling/datasets/ogbn_papers100M.py index a50e40f6d55..e3151e37a25 100644 --- a/benchmarks/cugraph/standalone/bulk_sampling/datasets/ogbn_papers100M.py +++ b/benchmarks/cugraph/standalone/bulk_sampling/datasets/ogbn_papers100M.py @@ -34,6 +34,7 @@ def __init__( train_split=0.8, val_split=0.5, load_edge_index=True, + backend="torch", ): self.__replication_factor = replication_factor self.__disk_x = None @@ -43,6 +44,7 @@ def __init__( self.__train_split = train_split self.__val_split = val_split self.__load_edge_index = load_edge_index + self.__backend = backend def download(self): import logging @@ -152,6 +154,27 @@ def download(self): ) ldf.to_parquet(node_label_file_path) + # WholeGraph + wg_bin_file_path = os.path.join(dataset_path, "wgb", "paper") + if self.__replication_factor == 1: + wg_bin_rep_path = os.path.join(wg_bin_file_path, "node_feat.d") + else: + wg_bin_rep_path = os.path.join( + wg_bin_file_path, f"node_feat_{self.__replication_factor}x.d" + ) + + if not os.path.exists(wg_bin_rep_path): + os.makedirs(wg_bin_rep_path) + if dataset is None: + from ogb.nodeproppred import NodePropPredDataset + + dataset = NodePropPredDataset( + name="ogbn-papers100M", root=self.__dataset_dir + ) + node_feat = dataset[0][0]["node_feat"] + for k in range(self.__replication_factor): + node_feat.tofile(os.path.join(wg_bin_rep_path, f"{k:04d}.bin")) + @property def edge_index_dict( self, @@ -224,45 +247,87 @@ def edge_index_dict( @property def x_dict(self) -> Dict[str, torch.Tensor]: + if self.__disk_x is None: + if self.__backend == "wholegraph": + self.__load_x_wg() + else: + self.__load_x_torch() + + return self.__disk_x + + def __load_x_torch(self) -> None: node_type_path = os.path.join( self.__dataset_dir, "ogbn_papers100M", "npy", "paper" ) + if self.__replication_factor == 1: + full_path = os.path.join(node_type_path, "node_feat.npy") + else: + full_path = os.path.join( + node_type_path, f"node_feat_{self.__replication_factor}x.npy" + ) - if self.__disk_x is None: - if self.__replication_factor == 1: - full_path = os.path.join(node_type_path, "node_feat.npy") - else: - full_path = os.path.join( - node_type_path, f"node_feat_{self.__replication_factor}x.npy" - ) + self.__disk_x = {"paper": torch.as_tensor(np.load(full_path, mmap_mode="r"))} - self.__disk_x = {"paper": np.load(full_path, mmap_mode="r")} + def __load_x_wg(self) -> None: + import logging - return self.__disk_x + logger = logging.getLogger("OGBNPapers100MDataset") + logger.info("Loading x into WG embedding...") + + import pylibwholegraph.torch as wgth + + node_type_path = os.path.join( + self.__dataset_dir, "ogbn_papers100M", "wgb", "paper" + ) + if self.__replication_factor == 1: + full_path = os.path.join(node_type_path, "node_feat.d") + else: + full_path = os.path.join( + node_type_path, f"node_feat_{self.__replication_factor}x.d" + ) + + file_list = [os.path.join(full_path, f) for f in os.listdir(full_path)] + + x = wgth.create_embedding_from_filelist( + wgth.get_global_communicator(), + "distributed", # TODO support other options + "cpu", # TODO support GPU + file_list, + torch.float32, + 128, + ) + from pylibwholegraph.torch.initialize import get_global_communicator + + wm_comm = get_global_communicator() + wm_comm.barrier() + + logger.info("created x wg embedding") + + self.__disk_x = {"paper": x} @property def y_dict(self) -> Dict[str, torch.Tensor]: if self.__y is None: - self.__get_labels() + self.__get_y() return self.__y @property def train_dict(self) -> Dict[str, torch.Tensor]: if self.__train is None: - self.__get_labels() + self.__get_split() return self.__train @property def test_dict(self) -> Dict[str, torch.Tensor]: if self.__test is None: - self.__get_labels() + self.__get_split() return self.__test @property def val_dict(self) -> Dict[str, torch.Tensor]: if self.__val is None: - self.__get_labels() + self.__get_split() return self.__val @property @@ -271,7 +336,7 @@ def num_input_features(self) -> int: @property def num_labels(self) -> int: - return int(self.y_dict["paper"].max()) + 1 + return 172 def num_nodes(self, node_type: str) -> int: if node_type != "paper": @@ -285,46 +350,49 @@ def num_edges(self, edge_type: Tuple[str, str, str]) -> int: return 1_615_685_872 * self.__replication_factor - def __get_labels(self): + def __get_y(self): label_path = os.path.join( self.__dataset_dir, "ogbn_papers100M", - "parquet", + "wgb", "paper", - "node_label.parquet", + "node_label.d", + "0.bin", ) - node_label = pandas.read_parquet(label_path) - - if self.__replication_factor > 1: - orig_num_nodes = self.num_nodes("paper") // self.__replication_factor - dfr = pandas.DataFrame( - { - "node": pandas.concat( - [ - node_label.node + (r * orig_num_nodes) - for r in range(1, self.__replication_factor) - ] - ), - "label": pandas.concat( - [node_label.label for r in range(1, self.__replication_factor)] - ), - } + if self.__backend == "wholegraph": + import pylibwholegraph.torch as wgth + + node_label = wgth.create_embedding_from_filelist( + wgth.get_global_communicator(), + "distributed", # TODO support other options + "cpu", # TODO support GPU + [label_path] * self.__replication_factor, + torch.int16, + 1, + ) + + else: + node_label_1x = torch.as_tensor( + np.fromfile(label_path, dtype="int16"), device="cpu" ) - node_label = pandas.concat([node_label, dfr]).reset_index(drop=True) + if self.__replication_factor > 1: + node_label = torch.concatenate( + [node_label_1x] * self.__replication_factor + ) + else: + node_label = node_label_1x + + self.__y = {"paper": node_label} + + def __get_split(self): num_nodes = self.num_nodes("paper") - node_label_tensor = torch.full( - (num_nodes,), -1, dtype=torch.float32, device="cpu" - ) - node_label_tensor[ - torch.as_tensor(node_label.node.values, device="cpu") - ] = torch.as_tensor(node_label.label.values, device="cpu") - self.__y = {"paper": node_label_tensor.contiguous()} + node = self.y_dict["paper"][self.y_dict["paper"] > 0] train_ix, test_val_ix = train_test_split( - torch.as_tensor(node_label.node.values), + node, train_size=self.__train_split, random_state=num_nodes, ) diff --git a/benchmarks/cugraph/standalone/bulk_sampling/models/dgl/__init__.py b/benchmarks/cugraph/standalone/bulk_sampling/models/dgl/__init__.py new file mode 100644 index 00000000000..610a7648801 --- /dev/null +++ b/benchmarks/cugraph/standalone/bulk_sampling/models/dgl/__init__.py @@ -0,0 +1,15 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from .models_dgl import GraphSAGE diff --git a/benchmarks/cugraph/standalone/bulk_sampling/models/dgl/models_dgl.py b/benchmarks/cugraph/standalone/bulk_sampling/models/dgl/models_dgl.py new file mode 100644 index 00000000000..2cfdda2d2e7 --- /dev/null +++ b/benchmarks/cugraph/standalone/bulk_sampling/models/dgl/models_dgl.py @@ -0,0 +1,69 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import torch +import torch.nn.functional as F + + +class GraphSAGE(torch.nn.Module): + """ + GraphSAGE model implementation for DGL + supporting both native DGL and cuGraph-ops + backends. + """ + + def __init__( + self, + in_channels, + hidden_channels, + out_channels, + num_layers, + model_backend="dgl", + ): + if model_backend == "dgl": + from dgl.nn import SAGEConv + else: + from cugraph_dgl.nn import SAGEConv + + super(GraphSAGE, self).__init__() + self.convs = torch.nn.ModuleList() + for _ in range(num_layers - 1): + self.convs.append( + SAGEConv(in_channels, hidden_channels, aggregator_type="mean") + ) + in_channels = hidden_channels + self.convs.append( + SAGEConv(hidden_channels, out_channels, aggregator_type="mean") + ) + + def forward(self, blocks, x): + """ + Runs the model forward pass given a list of blocks + and feature tensor. + """ + + for i, conv in enumerate(self.convs): + x = conv(blocks[i], x) + if i != len(self.convs) - 1: + x = F.relu(x) + x = F.dropout(x, p=0.5) + return x + + +def create_model(feat_size, num_classes, num_layers, model_backend="dgl"): + model = GraphSAGE( + feat_size, 64, num_classes, num_layers, model_backend=model_backend + ) + model = model.to("cuda") + model.train() + return model diff --git a/benchmarks/cugraph/standalone/bulk_sampling/models/pyg/models_cugraph_pyg.py b/benchmarks/cugraph/standalone/bulk_sampling/models/pyg/models_cugraph_pyg.py index 1de791bf588..7ee400b004f 100644 --- a/benchmarks/cugraph/standalone/bulk_sampling/models/pyg/models_cugraph_pyg.py +++ b/benchmarks/cugraph/standalone/bulk_sampling/models/pyg/models_cugraph_pyg.py @@ -57,7 +57,7 @@ def forward(self, x, edge, num_sampled_nodes, num_sampled_edges): for i, conv in enumerate(self.convs): if i > 0: - new_num_edges = edge[1][-2] + new_num_edges = int(edge[1][-2]) edge[0] = edge[0].narrow( dim=0, start=0, diff --git a/benchmarks/cugraph/standalone/bulk_sampling/run_train_job.sh b/benchmarks/cugraph/standalone/bulk_sampling/run_train_job.sh index 27ae0dc7788..8136018c877 100755 --- a/benchmarks/cugraph/standalone/bulk_sampling/run_train_job.sh +++ b/benchmarks/cugraph/standalone/bulk_sampling/run_train_job.sh @@ -12,12 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -#SBATCH -A datascience_rapids_cugraphgnn -#SBATCH -p luna -#SBATCH -J datascience_rapids_cugraphgnn-papers:bulkSamplingPyG -#SBATCH -N 1 -#SBATCH -t 00:25:00 - CONTAINER_IMAGE=${CONTAINER_IMAGE:="please_specify_container"} SCRIPTS_DIR=$(pwd) LOGS_DIR=${LOGS_DIR:=$(pwd)"/logs"} @@ -31,10 +25,11 @@ mkdir -p $DATASETS_DIR BATCH_SIZE=512 FANOUT="10_10_10" NUM_EPOCHS=1 -REPLICATION_FACTOR=1 +REPLICATION_FACTOR=2 +JOB_ID=$RANDOM -# options: PyG or cuGraphPyG -FRAMEWORK="cuGraphPyG" +# options: PyG, cuGraphPyG, or cuGraphDGL +FRAMEWORK="cuGraphDGL" GPUS_PER_NODE=8 nodes=( $( scontrol show hostnames $SLURM_JOB_NODELIST ) ) @@ -52,6 +47,7 @@ echo Num GPUs Per Node: $gpus_per_node set -e + # First run without cuGraph to get data if [[ "$FRAMEWORK" == "cuGraphPyG" ]]; then @@ -59,25 +55,10 @@ if [[ "$FRAMEWORK" == "cuGraphPyG" ]]; then srun \ --container-image $CONTAINER_IMAGE \ --container-mounts=${LOGS_DIR}":/logs",${SAMPLES_DIR}":/samples",${SCRIPTS_DIR}":/scripts",${DATASETS_DIR}":/datasets" \ - bash /scripts/run_sampling.sh $BATCH_SIZE $FANOUT $REPLICATION_FACTOR "/scripts" $NUM_EPOCHS + bash /scripts/train.sh $BATCH_SIZE $FANOUT $REPLICATION_FACTOR "/scripts" $NUM_EPOCHS "cugraph_pyg" $nnodes $head_node_ip $JOB_ID +elif [[ "$FRAMEWORK" == "cuGraphDGL" ]]; then + srun \ + --container-image $CONTAINER_IMAGE \ + --container-mounts=${LOGS_DIR}":/logs",${SAMPLES_DIR}":/samples",${SCRIPTS_DIR}":/scripts",${DATASETS_DIR}":/datasets" \ + bash /scripts/train.sh $BATCH_SIZE $FANOUT $REPLICATION_FACTOR "/scripts" $NUM_EPOCHS "cugraph_dgl_csr" $nnodes $head_node_ip $JOB_ID fi - -# Train -srun \ - --container-image $CONTAINER_IMAGE \ - --container-mounts=${LOGS_DIR}":/logs",${SAMPLES_DIR}":/samples",${SCRIPTS_DIR}":/scripts",${DATASETS_DIR}":/datasets" \ - torchrun \ - --nnodes $nnodes \ - --nproc-per-node $gpus_per_node \ - --rdzv-id $RANDOM \ - --rdzv-backend c10d \ - --rdzv-endpoint $head_node_ip:29500 \ - /scripts/bench_cugraph_training.py \ - --output_file "/logs/output.txt" \ - --framework $FRAMEWORK \ - --dataset_dir "/datasets" \ - --sample_dir "/samples" \ - --batch_size $BATCH_SIZE \ - --fanout $FANOUT \ - --replication_factor $REPLICATION_FACTOR \ - --num_epochs $NUM_EPOCHS diff --git a/benchmarks/cugraph/standalone/bulk_sampling/run_sampling.sh b/benchmarks/cugraph/standalone/bulk_sampling/train.sh similarity index 66% rename from benchmarks/cugraph/standalone/bulk_sampling/run_sampling.sh rename to benchmarks/cugraph/standalone/bulk_sampling/train.sh index 1b3085dcc9a..a3b85e281f1 100644 --- a/benchmarks/cugraph/standalone/bulk_sampling/run_sampling.sh +++ b/benchmarks/cugraph/standalone/bulk_sampling/train.sh @@ -21,6 +21,10 @@ FANOUT=$2 REPLICATION_FACTOR=$3 SCRIPTS_DIR=$4 NUM_EPOCHS=$5 +SAMPLING_FRAMEWORK=$6 +N_NODES=$7 +HEAD_NODE_IP=$8 +JOB_ID=$9 SAMPLES_DIR=/samples DATASET_DIR=/datasets @@ -29,12 +33,19 @@ LOGS_DIR=/logs MG_UTILS_DIR=${SCRIPTS_DIR}/mg_utils SCHEDULER_FILE=${MG_UTILS_DIR}/dask_scheduler.json -export WORKER_RMM_POOL_SIZE=28G -export UCX_MAX_RNDV_RAILS=1 +echo $SAMPLES_DIR +ls $SAMPLES_DIR + +export WORKER_RMM_POOL_SIZE=75G +#export UCX_MAX_RNDV_RAILS=1 export RAPIDS_NO_INITIALIZE=1 export CUDF_SPILL=1 -export LIBCUDF_CUFILE_POLICY="OFF" +export LIBCUDF_CUFILE_POLICY="KVIKIO" +export KVIKIO_NTHREADS=64 export GPUS_PER_NODE=8 +#export NCCL_CUMEM_ENABLE=0 +#export NCCL_DEBUG="TRACE" +export NCCL_DEBUG_FILE=/logs/nccl_debug.%h.%p export SCHEDULER_FILE=$SCHEDULER_FILE export LOGS_DIR=$LOGS_DIR @@ -59,8 +70,9 @@ else fi echo "properly waiting for workers to connect" -NUM_GPUS=$(python -c "import os; print(int(os.environ['SLURM_JOB_NUM_NODES'])*int(os.environ['GPUS_PER_NODE']))") -handleTimeout 120 python ${MG_UTILS_DIR}/wait_for_workers.py \ +export NUM_GPUS=$(python -c "import os; print(int(os.environ['SLURM_JOB_NUM_NODES'])*int(os.environ['GPUS_PER_NODE']))") +SEEDS_PER_CALL=$(python -c "import os; print(int(os.environ['NUM_GPUS'])*65536)") +handleTimeout 630 python ${MG_UTILS_DIR}/wait_for_workers.py \ --num-expected-workers ${NUM_GPUS} \ --scheduler-file-path ${SCHEDULER_FILE} @@ -76,14 +88,15 @@ if [[ $SLURM_NODEID == 0 ]]; then --datasets "ogbn_papers100M["$REPLICATION_FACTOR"]" \ --fanouts $FANOUT \ --batch_sizes $BATCH_SIZE \ - --seeds_per_call_opts "524288" \ + --seeds_per_call_opts $SEEDS_PER_CALL \ --num_epochs $NUM_EPOCHS \ - --random_seed 42 + --random_seed 42 \ + --sampling_target_framework $SAMPLING_FRAMEWORK - echo "DONE" > ${SAMPLES_DIR}/status.txt + echo "DONE" > ${LOGS_DIR}/status.txt fi -while [ ! -f "${SAMPLES_DIR}"/status.txt ] +while [ ! -f "${LOGS_DIR}"/status.txt ] do sleep 1 done @@ -106,6 +119,25 @@ if [[ ${#python_processes[@]} -gt 1 || $dask_processes ]]; then fi sleep 2 +torchrun \ + --nnodes $N_NODES \ + --nproc-per-node $GPUS_PER_NODE \ + --rdzv-id $JOB_ID \ + --rdzv-backend c10d \ + --rdzv-endpoint $HEAD_NODE_IP:29500 \ + /scripts/bench_cugraph_training.py \ + --output_file "/logs/output.txt" \ + --framework $SAMPLING_FRAMEWORK \ + --dataset_dir "/datasets" \ + --sample_dir "/samples" \ + --batch_size $BATCH_SIZE \ + --fanout $FANOUT \ + --replication_factor $REPLICATION_FACTOR \ + --num_epochs $NUM_EPOCHS \ + --use_wholegraph \ + --skip_download + + if [[ $SLURM_NODEID == 0 ]]; then - rm ${SAMPLES_DIR}/status.txt + rm ${LOGS_DIR}/status.txt fi diff --git a/benchmarks/cugraph/standalone/bulk_sampling/trainers/dgl/__init__.py b/benchmarks/cugraph/standalone/bulk_sampling/trainers/dgl/__init__.py new file mode 100644 index 00000000000..03d2a51e538 --- /dev/null +++ b/benchmarks/cugraph/standalone/bulk_sampling/trainers/dgl/__init__.py @@ -0,0 +1,16 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from .trainers_dgl import DGLTrainer +from .trainers_cugraph_dgl import DGLCuGraphTrainer diff --git a/benchmarks/cugraph/standalone/bulk_sampling/trainers/dgl/trainers_cugraph_dgl.py b/benchmarks/cugraph/standalone/bulk_sampling/trainers/dgl/trainers_cugraph_dgl.py new file mode 100644 index 00000000000..37745e645fd --- /dev/null +++ b/benchmarks/cugraph/standalone/bulk_sampling/trainers/dgl/trainers_cugraph_dgl.py @@ -0,0 +1,315 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import os +import time +import re + +from .trainers_dgl import DGLTrainer +from models.dgl import GraphSAGE +from datasets import Dataset + +import torch +import numpy as np +import warnings + +from torch.nn.parallel import DistributedDataParallel as ddp +from cugraph_dgl.dataloading import HomogenousBulkSamplerDataset +from cugraph.gnn import FeatureStore + +from typing import List + + +def get_dataloader( + input_file_paths: List[str], + total_num_nodes: int, + sparse_format: str, + return_type: str, +) -> torch.utils.data.DataLoader: + """ + Returns a dataloader that reads bulk samples from the given input paths. + + Parameters + ---------- + input_file_paths: List[str] + List of input parquet files containing samples. + total_num_nodes: int + Total number of nodes in the graph. + sparse_format: str + The sparse format to read (i.e. coo) + return_type: str + The type of object to be returned by the dataloader (i.e. dgl.Block) + + Returns + ------- + torch.utils.data.DataLoader + """ + + print("Creating dataloader", flush=True) + st = time.time() + if len(input_file_paths) > 0: + dataset = HomogenousBulkSamplerDataset( + total_num_nodes, + edge_dir="in", + sparse_format=sparse_format, + return_type=return_type, + ) + dataset.set_input_files(input_file_paths=input_file_paths) + dataloader = torch.utils.data.DataLoader( + dataset, + collate_fn=lambda x: x, + shuffle=False, + num_workers=0, + batch_size=None, + ) + et = time.time() + print(f"Time to create dataloader = {et - st:.2f} seconds", flush=True) + return dataloader + else: + return [] + + +class DGLCuGraphTrainer(DGLTrainer): + """ + Trainer implementation for cuGraph-DGL that supports + WholeGraph as a feature store. + """ + + def __init__( + self, + dataset: Dataset, + model: str = "GraphSAGE", + device: int = 0, + rank: int = 0, + world_size: int = 1, + gpus_per_node: int = 1, + num_epochs: int = 1, + sample_dir: str = ".", + backend: str = "torch", + **kwargs, + ): + """ + Parameters + ---------- + dataset: Dataset + The dataset to train on. + model: str + The model to use for training. + Currently only "GraphSAGE" is supported. + device: int, default=0 + The CUDA device to use. + rank: int, default=0 + The global rank of the worker this trainer is assigned to. + world_size: int, default=1 + The number of workers in the world. + num_epochs: int, default=1 + The number of training epochs to run. + sample_dir: str, default="." + The directory where samples generated by the bulk sampler + are stored. + backend: str, default="torch" + The feature store backend to be used by the cuGraph Feature Store. + Defaults to "torch". Options are "torch" and "wholegraph" + kwargs + Keyword arguments to pass to the loader + """ + self.__data = None + self.__device = device + self.__rank = rank + self.__world_size = world_size + self.__gpus_per_node = gpus_per_node + self.__num_epochs = num_epochs + self.__dataset = dataset + self.__sample_dir = sample_dir + self.__loader_kwargs = kwargs + self.__model = self.get_model(model) + self.__optimizer = None + self.__backend = backend + + @property + def rank(self): + return self.__rank + + @property + def model(self): + return self.__model + + @property + def dataset(self): + return self.__dataset + + @property + def optimizer(self): + if self.__optimizer is None: + self.__optimizer = torch.optim.Adam( + self.model.parameters(), lr=0.01, weight_decay=0.0005 + ) + return self.__optimizer + + @property + def num_epochs(self) -> int: + return self.__num_epochs + + def get_loader(self, epoch: int = 0, stage="train") -> int: + # TODO support online sampling + if stage == "train": + path = os.path.join(self.__sample_dir, f"epoch={epoch}", stage, "samples") + elif stage in ["test", "val"]: + path = os.path.join(self.__sample_dir, stage, "samples") + else: + raise ValueError(f"Invalid stage {stage}") + + input_file_paths, num_batches = self.get_input_files( + path, epoch=epoch, stage=stage + ) + + dataloader = get_dataloader( + input_file_paths=input_file_paths.tolist(), + total_num_nodes=None, + sparse_format="csc", + return_type="cugraph_dgl.nn.SparseGraph", + ) + return dataloader, num_batches + + @property + def data(self): + import logging + + logger = logging.getLogger("DGLCuGraphTrainer") + logger.info("getting data") + + if self.__data is None: + logger.info("using wholegraph backend") + if self.__backend == "wholegraph": + fs = FeatureStore( + backend="wholegraph", + wg_type="chunked", + wg_location="cpu", + ) + else: + fs = FeatureStore(backend=self.__backend) + num_nodes_dict = {} + + if self.__backend == "wholegraph": + from pylibwholegraph.torch.initialize import get_global_communicator + + wm_comm = get_global_communicator() + wm_comm.barrier() + + for node_type, x in self.__dataset.x_dict.items(): + logger.debug(f"getting x for {node_type}") + fs.add_data(x, node_type, "x") + num_nodes_dict[node_type] = self.__dataset.num_nodes(node_type) + if self.__backend == "wholegraph": + wm_comm.barrier() + + for node_type, y in self.__dataset.y_dict.items(): + logger.debug(f"getting y for {node_type}") + if self.__backend == "wholegraph": + logger.info("using wholegraph backend") + fs.add_data(y, node_type, "y") + wm_comm.barrier() + else: + y = y.cuda() + y = y.reshape((y.shape[0], 1)) + fs.add_data(y, node_type, "y") + + """ + for node_type, train in self.__dataset.train_dict.items(): + logger.debug(f"getting train for {node_type}") + train = train.reshape((train.shape[0], 1)) + if self.__backend != "wholegraph": + train = train.cuda() + fs.add_data(train, node_type, "train") + + for node_type, test in self.__dataset.test_dict.items(): + logger.debug(f"getting test for {node_type}") + test = test.reshape((test.shape[0], 1)) + if self.__backend != "wholegraph": + test = test.cuda() + fs.add_data(test, node_type, "test") + + for node_type, val in self.__dataset.val_dict.items(): + logger.debug(f"getting val for {node_type}") + val = val.reshape((val.shape[0], 1)) + if self.__backend != "wholegraph": + val = val.cuda() + fs.add_data(val, node_type, "val") + """ + + # # TODO support online sampling if the edge index is provided + # num_edges_dict = self.__dataset.edge_index_dict + # if not isinstance(list(num_edges_dict.values())[0], int): + # num_edges_dict = {k: len(v) for k, v in num_edges_dict} + + if self.__backend == "wholegraph": + wm_comm.barrier() + + self.__data = fs + return self.__data + + def get_model(self, name="GraphSAGE"): + if name != "GraphSAGE": + raise ValueError("only GraphSAGE is currently supported") + + num_input_features = self.__dataset.num_input_features + num_output_features = self.__dataset.num_labels + num_layers = len(self.__loader_kwargs["num_neighbors"]) + + with torch.cuda.device(self.__device): + model = ( + GraphSAGE( + in_channels=num_input_features, + hidden_channels=64, + out_channels=num_output_features, + num_layers=num_layers, + model_backend="cugraph_dgl", + ) + .to(torch.float32) + .to(self.__device) + ) + # TODO: Fix for distributed models + if torch.distributed.is_initialized(): + model = ddp(model, device_ids=[self.__device]) + else: + warnings.warn("Distributed training is not available") + print("done creating model") + + return model + + def get_input_files(self, path, epoch=0, stage="train"): + file_list = np.array([f.path for f in os.scandir(path)]) + file_list.sort() + np.random.seed(epoch) + np.random.shuffle(file_list) + + splits = np.array_split(file_list, self.__gpus_per_node) + + ex = re.compile(r"batch=([0-9]+)\-([0-9]+).parquet") + num_batches = min( + [ + sum( + [ + int(ex.match(fname.split("/")[-1])[2]) + - int(ex.match(fname.split("/")[-1])[1]) + for fname in s + ] + ) + for s in splits + ] + ) + if num_batches == 0: + raise ValueError( + f"Too few batches for training with world size {self.__world_size}" + ) + + return splits[self.__device], num_batches diff --git a/benchmarks/cugraph/standalone/bulk_sampling/trainers/dgl/trainers_dgl.py b/benchmarks/cugraph/standalone/bulk_sampling/trainers/dgl/trainers_dgl.py new file mode 100644 index 00000000000..fad986257b2 --- /dev/null +++ b/benchmarks/cugraph/standalone/bulk_sampling/trainers/dgl/trainers_dgl.py @@ -0,0 +1,361 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import logging +import torch +import torch.distributed as td +import torch.nn.functional as F +from torchmetrics import Accuracy +from trainers import Trainer +import time + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from cugraph.gnn import FeatureStore + + +def get_features(input_nodes, output_nodes, feature_store, key="paper"): + if isinstance(input_nodes, dict): + input_nodes = input_nodes[key] + if isinstance(output_nodes, dict): + output_nodes = output_nodes[key] + + # TODO: Fix below + # Adding based on assumption that cpu features + # and gpu index is not supported yet + + if feature_store.backend == "torch": + input_nodes = input_nodes.to("cpu") + output_nodes = output_nodes.to("cpu") + + x = feature_store.get_data(indices=input_nodes, type_name=key, feat_name="x") + y = feature_store.get_data(indices=output_nodes, type_name=key, feat_name="y") + y = y.reshape((y.shape[0],)) + return x, y + + +def log_batch( + logger: logging.Logger, + iter_i: int, + num_batches: int, + time_forward: int, + time_backward: int, + time_start: int, + loader_time_iter: int, + epoch: int, + rank: int, +): + """ + Logs the current performance of the trainer. + + Parameters + ---------- + logger: logging.Logger + The logger to use for logging the performance details. + iter_i: int + The current training iteration. + num_batches: int + The number of batches processed so far + time_forward: int + The total amount of time for the model forward pass so far + time_backward: int + The total amount of the for the model backwards pass so far + time_start: int + The time at which training was started + loader_time_iter: int + The time taken by the loader in the current iteraiton + epoch: int + The current training epoch + rank: int + The global rank of this worker + + Returns + ------- + None + """ + + time_forward_iter = time_forward / num_batches + time_backward_iter = time_backward / num_batches + total_time_iter = (time.perf_counter() - time_start) / num_batches + logger.info(f"epoch {epoch}, iteration {iter_i}, rank {rank}") + logger.info(f"time forward: {time_forward_iter}") + logger.info(f"time backward: {time_backward_iter}") + logger.info(f"loader time: {loader_time_iter}") + logger.info(f"total time: {total_time_iter}") + + +def train_epoch( + model, + optimizer, + loader, + feature_store, + epoch, + num_classes, + time_d, + logger, + rank, + max_num_batches, +): + """ + Train the model for one epoch. + model: The model to train. + optimizer: The optimizer to use. + loader: The loader to use. + data: cuGraph.gnn.FeatueStore + epoch: The epoch number. + num_classes: The number of classes. + time_d: A dictionary of times. + logger: The logger to use. + rank: Global rank + max_num_batches: Number of batches after which to quit (to avoid hang due to asymmetry) + """ + model = model.train() + time_feature_indexing = time_d["time_feature_indexing"] + time_feature_transfer = time_d["time_feature_transfer"] + time_forward = time_d["time_forward"] + time_backward = time_d["time_backward"] + time_loader = time_d["time_loader"] + + time_start = time.perf_counter() + end_time_backward = time.perf_counter() + + num_batches = 0 + + for iter_i, (input_nodes, output_nodes, blocks) in enumerate(loader): + loader_time_iter = time.perf_counter() - end_time_backward + time_loader += loader_time_iter + feature_indexing_time_start = time.perf_counter() + x, y_true = get_features(input_nodes, output_nodes, feature_store=feature_store) + additional_feature_time_end = time.perf_counter() + time_feature_indexing += ( + additional_feature_time_end - feature_indexing_time_start + ) + feature_trasfer_time_start = time.perf_counter() + x = x.to("cuda") + y_true = y_true.to("cuda") + time_feature_transfer += time.perf_counter() - feature_trasfer_time_start + num_batches += 1 + + start_time_forward = time.perf_counter() + y_pred = model( + blocks, + x, + ) + end_time_forward = time.perf_counter() + time_forward += end_time_forward - start_time_forward + + if y_pred.shape[0] > len(y_true): + raise ValueError(f"illegal shape: {y_pred.shape}; {y_true.shape}") + + y_true = y_true[: y_pred.shape[0]] + y_true = F.one_hot( + y_true.to(torch.int64), + num_classes=num_classes, + ).to(torch.float32) + + if y_true.shape != y_pred.shape: + raise ValueError( + f"y_true shape was {y_true.shape} " + f"but y_pred shape was {y_pred.shape} " + f"in iteration {iter_i} " + f"on rank {y_pred.device.index}" + ) + + start_time_backward = time.perf_counter() + loss = F.cross_entropy(y_pred, y_true) + optimizer.zero_grad() + loss.backward() + optimizer.step() + end_time_backward = time.perf_counter() + time_backward += end_time_backward - start_time_backward + + if iter_i % 50 == 0: + log_batch( + logger=logger, + iter_i=iter_i, + num_batches=num_batches, + time_forward=time_forward, + time_backward=time_backward, + time_start=time_start, + loader_time_iter=loader_time_iter, + epoch=epoch, + rank=rank, + ) + + if max_num_batches is not None and iter_i >= max_num_batches: + break + + time_d["time_loader"] += time_loader + time_d["time_feature_indexing"] += time_feature_indexing + time_d["time_feature_transfer"] += time_feature_transfer + time_d["time_forward"] += time_forward + time_d["time_backward"] += time_backward + + return num_batches + + +def get_accuracy( + model: torch.nn.Module, + loader: torch.utils.DataLoader, + feature_store: FeatureStore, + num_classes: int, + max_num_batches: int, +) -> float: + """ + Computes the accuracy given a loader that ouputs evaluation data, the model being evaluated, + the feature store where node features are stored, and the number of output classes. + + Parameters + ---------- + model: torch.nn.Module + The model being evaluated + loader: torch.utils.DataLoader + The loader over evaluation samples + feature_store: cugraph.gnn.FeatureStore + The feature store containing node features + num_classes: int + The number of output classes of the model + max_num_batches: int + The number of batches to iterate for, will quit after reaching this number. + Used to avoid hang due to asymmetric input. + + Returns + ------- + float + The calcuated accuracy, as a percentage. + + """ + + print("Computing accuracy...", flush=True) + acc = Accuracy(task="multiclass", num_classes=num_classes).cuda() + acc_sum = 0.0 + num_batches = 0 + with torch.no_grad(): + for iter_i, (input_nodes, output_nodes, blocks) in enumerate(loader): + x, y_true = get_features( + input_nodes, output_nodes, feature_store=feature_store + ) + x = x.to("cuda") + y_true = y_true.to("cuda") + + out = model(blocks, x) + batch_size = out.shape[0] + acc_sum += acc(out[:batch_size].softmax(dim=-1), y_true[:batch_size]) + num_batches += 1 + + if max_num_batches is not None and iter_i >= max_num_batches: + break + + num_batches = num_batches + + acc_sum = torch.tensor(float(acc_sum), dtype=torch.float32, device="cuda") + td.all_reduce(acc_sum, op=td.ReduceOp.SUM) + nb = torch.tensor(float(num_batches), dtype=torch.float32, device=acc_sum.device) + td.all_reduce(nb, op=td.ReduceOp.SUM) + + acc = acc_sum / nb + + print( + f"Accuracy: {acc * 100.0:.4f}%", + ) + return acc * 100.0 + + +class DGLTrainer(Trainer): + """ + Trainer implementation for node classification in DGL. + """ + + def train(self): + logger = logging.getLogger("DGLTrainer") + time_d = { + "time_loader": 0.0, + "time_feature_indexing": 0.0, + "time_feature_transfer": 0.0, + "time_forward": 0.0, + "time_backward": 0.0, + } + total_batches = 0 + for epoch in range(self.num_epochs): + start_time = time.perf_counter() + self.model.train() + with td.algorithms.join.Join( + [self.model], divide_by_initial_world_size=False + ): + loader, max_num_batches = self.get_loader(epoch=epoch, stage="train") + num_batches = train_epoch( + model=self.model, + optimizer=self.optimizer, + loader=loader, + feature_store=self.data, + num_classes=self.dataset.num_labels, + epoch=epoch, + time_d=time_d, + logger=logger, + rank=self.rank, + max_num_batches=max_num_batches, + ) + total_batches = total_batches + num_batches + end_time = time.perf_counter() + epoch_time_taken = end_time - start_time + print( + f"RANK: {self.rank} Total time taken for training epoch {epoch} = {epoch_time_taken}", + flush=True, + ) + print("---" * 30) + td.barrier() + self.model.eval() + with td.algorithms.join.Join( + [self.model], divide_by_initial_world_size=False + ): + # test + loader, max_num_batches = self.get_loader(epoch=epoch, stage="test") + test_acc = get_accuracy( + model=self.model.module, + loader=loader, + feature_store=self.data, + num_classes=self.dataset.num_labels, + max_num_batches=max_num_batches, + ) + print(f"Accuracy: {test_acc:.4f}%") + + # val: + self.model.eval() + with td.algorithms.join.Join([self.model], divide_by_initial_world_size=False): + loader, max_num_batches = self.get_loader(epoch=epoch, stage="val") + val_acc = get_accuracy( + model=self.model.module, + loader=loader, + feature_store=self.data, + num_classes=self.dataset.num_labels, + max_num_batches=max_num_batches, + ) + print(f"Validation Accuracy: {val_acc:.4f}%") + + val_acc = float(val_acc) + stats = { + "Accuracy": val_acc, + "# Batches": total_batches, + "Loader Time": time_d["time_loader"], + "Feature Time": time_d["time_feature_indexing"] + + time_d["time_feature_transfer"], + "Forward Time": time_d["time_forward"], + "Backward Time": time_d["time_backward"], + } + return stats + + +# For native DGL training, see benchmarks/cugraph-dgl/scale-benchmarks diff --git a/benchmarks/cugraph/standalone/bulk_sampling/trainers/pyg/trainers_cugraph_pyg.py b/benchmarks/cugraph/standalone/bulk_sampling/trainers/pyg/trainers_cugraph_pyg.py index 71151e9ba59..833322deffe 100644 --- a/benchmarks/cugraph/standalone/bulk_sampling/trainers/pyg/trainers_cugraph_pyg.py +++ b/benchmarks/cugraph/standalone/bulk_sampling/trainers/pyg/trainers_cugraph_pyg.py @@ -13,41 +13,84 @@ from .trainers_pyg import PyGTrainer from models.pyg import CuGraphSAGE +from datasets import Dataset import torch import numpy as np from torch.nn.parallel import DistributedDataParallel as ddp +from torch.distributed.optim import ZeroRedundancyOptimizer from cugraph.gnn import FeatureStore from cugraph_pyg.data import CuGraphStore from cugraph_pyg.loader import BulkSampleLoader import os +import re class PyGCuGraphTrainer(PyGTrainer): + """ + Trainer implementation for cuGraph-PyG that supports + WholeGraph as a feature store. + """ + def __init__( self, - dataset, - model="GraphSAGE", - device=0, - rank=0, - world_size=1, - num_epochs=1, - sample_dir=".", + dataset: Dataset, + model: str = "GraphSAGE", + device: int = 0, + rank: int = 0, + world_size: int = 1, + gpus_per_node: int = 1, + num_epochs: int = 1, + sample_dir: str = ".", + backend: str = "torch", **kwargs, ): + """ + Parameters + ---------- + dataset: Dataset + The dataset to train on. + model: str + The model to use for training. + Currently only "GraphSAGE" is supported. + device: int, default=0 + The CUDA device to use. + rank: int, default=0 + The global rank of the worker this trainer is assigned to. + world_size: int, default=1 + The number of workers in the world. + num_epochs: int, default=1 + The number of training epochs to run. + sample_dir: str, default="." + The directory where samples generated by the bulk sampler + are stored. + backend: str, default="torch" + The feature store backend to be used by the cuGraph Feature Store. + Defaults to "torch". Options are "torch" and "wholegraph" + kwargs + Keyword arguments to pass to the loader. + """ + + import logging + + logger = logging.getLogger("PyGCuGraphTrainer") + logger.info("creating trainer") self.__data = None self.__device = device self.__rank = rank self.__world_size = world_size + self.__gpus_per_node = gpus_per_node self.__num_epochs = num_epochs self.__dataset = dataset self.__sample_dir = sample_dir self.__loader_kwargs = kwargs self.__model = self.get_model(model) + self.__backend = backend self.__optimizer = None + logger.info("created trainer") @property def rank(self): @@ -64,8 +107,11 @@ def dataset(self): @property def optimizer(self): if self.__optimizer is None: - self.__optimizer = torch.optim.Adam( - self.model.parameters(), lr=0.01, weight_decay=0.0005 + self.__optimizer = ZeroRedundancyOptimizer( + self.model.parameters(), + lr=0.01, + weight_decay=0.0005, + optimizer_class=torch.optim.Adam, ) return self.__optimizer @@ -73,7 +119,7 @@ def optimizer(self): def num_epochs(self) -> int: return self.__num_epochs - def get_loader(self, epoch: int = 0, stage="train") -> int: + def get_loader(self, epoch: int = 0, stage="train"): import logging logger = logging.getLogger("PyGCuGraphTrainer") @@ -81,22 +127,25 @@ def get_loader(self, epoch: int = 0, stage="train") -> int: logger.info(f"getting loader for epoch {epoch}, {stage} stage") # TODO support online sampling - if stage == "val": - path = os.path.join(self.__sample_dir, "val", "samples") - else: + if stage == "train": path = os.path.join(self.__sample_dir, f"epoch={epoch}", stage, "samples") + elif stage in ["test", "val"]: + path = os.path.join(self.__sample_dir, stage, "samples") + else: + raise ValueError(f"invalid stage {stage}") + input_files, num_batches = self.get_input_files(path, epoch=epoch, stage=stage) loader = BulkSampleLoader( self.data, self.data, None, # FIXME get input nodes properly directory=path, - input_files=self.get_input_files(path, epoch=epoch, stage=stage), + input_files=input_files, **self.__loader_kwargs, ) logger.info(f"got loader successfully on rank {self.rank}") - return loader + return loader, num_batches @property def data(self): @@ -106,36 +155,73 @@ def data(self): logger.info("getting data") if self.__data is None: - # FIXME wholegraph - fs = FeatureStore(backend="torch") + if self.__backend == "wholegraph": + logger.info("using wholegraph backend") + fs = FeatureStore( + backend="wholegraph", + wg_type="chunked", + wg_location="cpu", + ) + else: + fs = FeatureStore(backend=self.__backend) num_nodes_dict = {} + if self.__backend == "wholegraph": + from pylibwholegraph.torch.initialize import get_global_communicator + + wm_comm = get_global_communicator() + wm_comm.barrier() + for node_type, x in self.__dataset.x_dict.items(): logger.debug(f"getting x for {node_type}") fs.add_data(x, node_type, "x") num_nodes_dict[node_type] = self.__dataset.num_nodes(node_type) + if self.__backend == "wholegraph": + wm_comm.barrier() for node_type, y in self.__dataset.y_dict.items(): logger.debug(f"getting y for {node_type}") - fs.add_data(y, node_type, "y") + if self.__backend == "wholegraph": + logger.info("using wholegraph backend") + fs.add_data(y, node_type, "y") + wm_comm.barrier() + else: + y = y.cuda() + y = y.reshape((y.shape[0], 1)) + fs.add_data(y, node_type, "y") + + """ for node_type, train in self.__dataset.train_dict.items(): logger.debug(f"getting train for {node_type}") + train = train.reshape((train.shape[0], 1)) + if self.__backend != "wholegraph": + train = train.cuda() fs.add_data(train, node_type, "train") for node_type, test in self.__dataset.test_dict.items(): logger.debug(f"getting test for {node_type}") + test = test.reshape((test.shape[0], 1)) + if self.__backend != "wholegraph": + test = test.cuda() fs.add_data(test, node_type, "test") for node_type, val in self.__dataset.val_dict.items(): logger.debug(f"getting val for {node_type}") + val = val.reshape((val.shape[0], 1)) + if self.__backend != "wholegraph": + val = val.cuda() fs.add_data(val, node_type, "val") + """ # TODO support online sampling if the edge index is provided num_edges_dict = self.__dataset.edge_index_dict if not isinstance(list(num_edges_dict.values())[0], int): num_edges_dict = {k: len(v) for k, v in num_edges_dict} + if self.__backend == "wholegraph": + wm_comm.barrier() + self.__data = CuGraphStore( fs, num_edges_dict, @@ -147,14 +233,28 @@ def data(self): return self.__data def get_model(self, name="GraphSAGE"): + import logging + + logger = logging.getLogger("PyGCuGraphTrainer") + + logger.info("Creating model...") + if name != "GraphSAGE": raise ValueError("only GraphSAGE is currently supported") + logger.info("getting input features...") num_input_features = self.__dataset.num_input_features + + logger.info("getting output features...") num_output_features = self.__dataset.num_labels + + logger.info("getting num neighbors...") num_layers = len(self.__loader_kwargs["num_neighbors"]) + logger.info("Got input features, output features, num neighbors") + with torch.cuda.device(self.__device): + logger.info("Constructing CuGraphSAGE model...") model = ( CuGraphSAGE( in_channels=num_input_features, @@ -166,8 +266,10 @@ def get_model(self, name="GraphSAGE"): .to(self.__device) ) + logger.info("Parallelizing model with ddp...") model = ddp(model, device_ids=[self.__device]) - print("done creating model") + + logger.info("done creating model") return model @@ -175,10 +277,28 @@ def get_input_files(self, path, epoch=0, stage="train"): file_list = np.array(os.listdir(path)) file_list.sort() - if stage == "train": - splits = np.array_split(file_list, self.__world_size) - np.random.seed(epoch) - np.random.shuffle(splits) - return splits[self.rank] - else: - return file_list + np.random.seed(epoch) + np.random.shuffle(file_list) + + splits = np.array_split(file_list, self.__gpus_per_node) + + import logging + + logger = logging.getLogger("PyGCuGraphTrainer") + + split = splits[self.__device] + logger.info(f"rank {self.__rank} input files: {str(split)}") + + ex = re.compile(r"batch=([0-9]+)\-([0-9]+).parquet") + num_batches = min( + [ + sum([int(ex.match(fname)[2]) - int(ex.match(fname)[1]) for fname in s]) + for s in splits + ] + ) + if num_batches == 0: + raise ValueError( + f"Too few batches for training with world size {self.__world_size}" + ) + + return split, num_batches diff --git a/benchmarks/cugraph/standalone/bulk_sampling/trainers/pyg/trainers_pyg.py b/benchmarks/cugraph/standalone/bulk_sampling/trainers/pyg/trainers_pyg.py index bddd6ae2644..d6205901b68 100644 --- a/benchmarks/cugraph/standalone/bulk_sampling/trainers/pyg/trainers_pyg.py +++ b/benchmarks/cugraph/standalone/bulk_sampling/trainers/pyg/trainers_pyg.py @@ -33,7 +33,12 @@ import time -def pyg_num_workers(world_size): +def pyg_num_workers(world_size: int) -> int: + """ + Calculates the number of workers for the + loader in PyG by calling sched_getaffinity. + """ + num_workers = None if hasattr(os, "sched_getaffinity"): try: @@ -45,14 +50,80 @@ def pyg_num_workers(world_size): return int(num_workers) +def calc_accuracy( + loader: NeighborLoader, + max_num_batches: int, + model: torch.nn.Module, + num_classes: int, +) -> float: + """ + Evaluates the accuracy of a model given a loader over evaluation samples. + + Parameters + ---------- + loader: NeighborLoader + The loader over evaluation samples. + model: torch.nn.Module + The model being evaluated. + num_classes: int + The number of output classes of the model. + + Returns + ------- + The calculated accuracy as a fraction. + """ + + from torchmetrics import Accuracy + + acc = Accuracy(task="multiclass", num_classes=num_classes).cuda() + + acc_sum = 0.0 + num_batches = 0 + with torch.no_grad(): + for i, batch in enumerate(loader): + num_sampled_nodes = sum( + [torch.as_tensor(n) for n in batch.num_sampled_nodes_dict.values()] + ) + num_sampled_edges = sum( + [torch.as_tensor(e) for e in batch.num_sampled_edges_dict.values()] + ) + batch_size = num_sampled_nodes[0] + + batch = batch.to_homogeneous().cuda() + + batch.y = batch.y.to(torch.long).reshape((batch.y.shape[0],)) + + out = model( + batch.x, + batch.edge_index, + num_sampled_nodes, + num_sampled_edges, + ) + acc_sum += acc(out[:batch_size].softmax(dim=-1), batch.y[:batch_size]) + num_batches += 1 + + if max_num_batches is not None and i >= max_num_batches: + break + + acc_sum = torch.tensor(float(acc_sum), dtype=torch.float32, device="cuda") + td.all_reduce(acc_sum, op=td.ReduceOp.SUM) + nb = torch.tensor(float(num_batches), dtype=torch.float32, device=acc_sum.device) + td.all_reduce(nb, op=td.ReduceOp.SUM) + + return acc_sum / nb + + class PyGTrainer(Trainer): + """ + Trainer implementation for node classification in PyG. + """ + def train(self): import logging logger = logging.getLogger("PyGTrainer") logger.info("Entered train loop") - total_loss = 0.0 num_batches = 0 time_forward = 0.0 @@ -62,19 +133,32 @@ def train(self): start_time = time.perf_counter() end_time_backward = start_time + num_layers = len(self.model.module.convs) + for epoch in range(self.num_epochs): with td.algorithms.join.Join( - [self.model], divide_by_initial_world_size=False + [self.model, self.optimizer], divide_by_initial_world_size=False ): self.model.train() - for iter_i, data in enumerate( - self.get_loader(epoch=epoch, stage="train") - ): + loader, max_num_batches = self.get_loader(epoch=epoch, stage="train") + + max_num_batches = torch.tensor([max_num_batches], device="cuda") + torch.distributed.all_reduce( + max_num_batches, op=torch.distributed.ReduceOp.MIN + ) + max_num_batches = int(max_num_batches[0]) + + for iter_i, data in enumerate(loader): loader_time_iter = time.perf_counter() - end_time_backward time_loader += loader_time_iter time_feature_transfer_start = time.perf_counter() + if len(data.edge_index_dict[("paper", "cites", "paper")][0]) < 3: + logger.error(f"Invalid edge index in iteration {iter_i}") + data = old_data + + old_data = data num_sampled_nodes = sum( [ torch.as_tensor(n) @@ -89,7 +173,6 @@ def train(self): ) # FIXME find a way to get around this and not have to call extend_tensor - num_layers = len(self.model.module.convs) num_sampled_nodes = extend_tensor(num_sampled_nodes, num_layers + 1) num_sampled_edges = extend_tensor(num_sampled_edges, num_layers) @@ -118,7 +201,12 @@ def train(self): ) logger.info(f"total time: {total_time_iter}") + # from pynvml.smi import nvidia_smi + # mem_info = nvidia_smi.getInstance().DeviceQuery('memory.free, memory.total')['gpu'][self.rank % 8]['fb_memory_usage'] + # logger.info(f"rank {self.rank} memory: {mem_info}") + y_true = data.y + y_true = y_true.reshape((y_true.shape[0],)) x = data.x.to(torch.float32) start_time_forward = time.perf_counter() @@ -160,101 +248,48 @@ def train(self): self.optimizer.zero_grad() loss.backward() self.optimizer.step() - total_loss += loss.item() end_time_backward = time.perf_counter() time_backward += end_time_backward - start_time_backward - end_time = time.perf_counter() - - # test - from torchmetrics import Accuracy + if max_num_batches is not None and iter_i >= max_num_batches: + break - acc = Accuracy( - task="multiclass", num_classes=self.dataset.num_labels - ).cuda() + end_time = time.perf_counter() + """ + logger.info("Entering test stage...") with td.algorithms.join.Join( [self.model], divide_by_initial_world_size=False ): self.model.eval() - if self.rank == 0: - acc_sum = 0.0 - with torch.no_grad(): - for i, batch in enumerate( - self.get_loader(epoch=epoch, stage="test") - ): - num_sampled_nodes = sum( - [ - torch.as_tensor(n) - for n in batch.num_sampled_nodes_dict.values() - ] - ) - num_sampled_edges = sum( - [ - torch.as_tensor(e) - for e in batch.num_sampled_edges_dict.values() - ] - ) - batch_size = num_sampled_nodes[0] - - batch = batch.to_homogeneous().cuda() - - batch.y = batch.y.to(torch.long) - out = self.model.module( - batch.x, - batch.edge_index, - num_sampled_nodes, - num_sampled_edges, - ) - acc_sum += acc( - out[:batch_size].softmax(dim=-1), batch.y[:batch_size] - ) - print( - f"Accuracy: {acc_sum/(i) * 100.0:.4f}%", - ) + loader, max_num_batches = self.get_loader(epoch=epoch, stage="test") + num_classes = self.dataset.num_labels - td.barrier() + acc = calc_accuracy( + loader, max_num_batches, self.model.module, num_classes + ) - with td.algorithms.join.Join([self.model], divide_by_initial_world_size=False): - self.model.eval() if self.rank == 0: - acc_sum = 0.0 - with torch.no_grad(): - for i, batch in enumerate( - self.get_loader(epoch=epoch, stage="val") - ): - num_sampled_nodes = sum( - [ - torch.as_tensor(n) - for n in batch.num_sampled_nodes_dict.values() - ] - ) - num_sampled_edges = sum( - [ - torch.as_tensor(e) - for e in batch.num_sampled_edges_dict.values() - ] - ) - batch_size = num_sampled_nodes[0] - - batch = batch.to_homogeneous().cuda() - - batch.y = batch.y.to(torch.long) - out = self.model.module( - batch.x, - batch.edge_index, - num_sampled_nodes, - num_sampled_edges, - ) - acc_sum += acc( - out[:batch_size].softmax(dim=-1), batch.y[:batch_size] - ) print( - f"Validation Accuracy: {acc_sum/(i) * 100.0:.4f}%", + f"Accuracy: {acc * 100.0:.4f}%", ) + """ + + """ + logger.info("Entering validation stage") + with td.algorithms.join.Join([self.model], divide_by_initial_world_size=False): + self.model.eval() + loader, max_num_batches = self.get_loader(epoch=epoch, stage="val") + num_classes = self.dataset.num_labels + acc = calc_accuracy(loader, max_num_batches, self.model.module, num_classes) + + if self.rank == 0: + print( + f"Validation Accuracy: {acc * 100.0:.4f}%", + ) + """ stats = { - "Accuracy": float(acc_sum / (i) * 100.0) if self.rank == 0 else 0.0, "# Batches": num_batches, "Loader Time": time_loader, "Feature Transfer Time": time_feature_transfer, @@ -265,6 +300,12 @@ def train(self): class PyGNativeTrainer(PyGTrainer): + """ + Trainer implementation for native PyG + training using HeteroData as the graph and feature + store and NeighborLoader as the loader. + """ + def __init__( self, dataset, @@ -403,7 +444,7 @@ def get_loader(self, epoch: int = 0, stage="train"): ) logger.info("done creating loader") - return loader + return loader, None def get_model(self, name="GraphSAGE"): if name != "GraphSAGE": diff --git a/python/cugraph-dgl/cugraph_dgl/dataloading/dataset.py b/python/cugraph-dgl/cugraph_dgl/dataloading/dataset.py index 815fd30d8eb..f6fe38fe9f8 100644 --- a/python/cugraph-dgl/cugraph_dgl/dataloading/dataset.py +++ b/python/cugraph-dgl/cugraph_dgl/dataloading/dataset.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022-2023, NVIDIA CORPORATION. +# Copyright (c) 2022-2024, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -63,6 +63,10 @@ def __getitem__(self, idx: int): fn, batch_offset = self._batch_to_fn_d[idx] if fn != self._current_batch_fn: + # Remove current batches to free up memory + # before loading new batches + if hasattr(self, "_current_batches"): + del self._current_batches if self.sparse_format == "csc": df = _load_sampled_file(dataset_obj=self, fn=fn, skip_rename=True) self._current_batches = ( diff --git a/python/cugraph-pyg/cugraph_pyg/data/cugraph_store.py b/python/cugraph-pyg/cugraph_pyg/data/cugraph_store.py index 05d540b7c45..df16fc9fd6c 100644 --- a/python/cugraph-pyg/cugraph_pyg/data/cugraph_store.py +++ b/python/cugraph-pyg/cugraph_pyg/data/cugraph_store.py @@ -1,4 +1,4 @@ -# Copyright (c) 2019-2023, NVIDIA CORPORATION. +# Copyright (c) 2019-2024, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -1083,13 +1083,12 @@ def _get_tensor(self, attr: CuGraphTensorAttr) -> TensorType: idx = attr.index if idx is not None: - if feature_backend == "torch": + if feature_backend in ["torch", "wholegraph"]: if not isinstance(idx, torch.Tensor): raise TypeError( f"Type {type(idx)} invalid" f" for feature store backend {feature_backend}" ) - idx = idx.cpu() elif feature_backend == "numpy": # allow feature indexing through cupy arrays if isinstance(idx, cupy.ndarray): @@ -1244,5 +1243,77 @@ def _infer_unspecified_attr(self, attr: CuGraphTensorAttr) -> CuGraphTensorAttr: return attr + def filter( + self, + format: str, + node_dict: Dict[str, torch.Tensor], + row_dict: Dict[str, torch.Tensor], + col_dict: Dict[str, torch.Tensor], + edge_dict: Dict[str, Tuple[torch.Tensor]], + ) -> torch_geometric.data.HeteroData: + """ + Parameters + ---------- + format: str + COO or CSC + node_dict: Dict[str, torch.Tensor] + IDs of nodes in original store being outputted + row_dict: Dict[str, torch.Tensor] + Renumbered output edge index row + col_dict: Dict[str, torch.Tensor] + Renumbered output edge index column + edge_dict: Dict[str, Tuple[torch.Tensor]] + Currently unused original edge mapping + """ + data = torch_geometric.data.HeteroData() + + # TODO use torch_geometric.EdgeIndex in release 24.04 (Issue #4051) + for attr in self.get_all_edge_attrs(): + key = attr.edge_type + if key in row_dict and key in col_dict: + if format == "CSC": + data.put_edge_index( + (row_dict[key], col_dict[key]), + edge_type=key, + layout="csc", + is_sorted=True, + ) + else: + data[key].edge_index = torch.stack( + [ + row_dict[key], + col_dict[key], + ], + dim=0, + ) + + required_attrs = [] + # To prevent copying multiple times, we use a cache; + # the original node_dict serves as the gpu cache if needed + node_dict_cpu = {} + for attr in self.get_all_tensor_attrs(): + if attr.group_name in node_dict: + device = self.__features.get_storage(attr.group_name, attr.attr_name) + attr.index = node_dict[attr.group_name] + if not isinstance(attr.index, torch.Tensor): + raise ValueError("Node index must be a tensor!") + if attr.index.is_cuda and device == "cpu": + if attr.group_name not in node_dict_cpu: + node_dict_cpu[attr.group_name] = attr.index.cpu() + attr.index = node_dict_cpu[attr.group_name] + elif attr.index.is_cpu and device == "cuda": + node_dict_cpu[attr.group_name] = attr.index + node_dict[attr.group_name] = attr.index.cuda() + attr.index = node_dict[attr.group_name] + + required_attrs.append(attr) + data[attr.group_name].num_nodes = attr.index.size(0) + + tensors = self.multi_get_tensor(required_attrs) + for i, attr in enumerate(required_attrs): + data[attr.group_name][attr.attr_name] = tensors[i] + + return data + def __len__(self): return len(self.get_all_tensor_attrs()) diff --git a/python/cugraph-pyg/cugraph_pyg/loader/cugraph_node_loader.py b/python/cugraph-pyg/cugraph_pyg/loader/cugraph_node_loader.py index bcfaf579820..55c9e9b3329 100644 --- a/python/cugraph-pyg/cugraph_pyg/loader/cugraph_node_loader.py +++ b/python/cugraph-pyg/cugraph_pyg/loader/cugraph_node_loader.py @@ -28,7 +28,6 @@ _sampler_output_from_sampling_results_heterogeneous, _sampler_output_from_sampling_results_homogeneous_csr, _sampler_output_from_sampling_results_homogeneous_coo, - filter_cugraph_store_csc, ) from typing import Union, Tuple, Sequence, List, Dict @@ -454,31 +453,20 @@ def __next__(self): start_time_feature = perf_counter() # Create a PyG HeteroData object, loading the required features - if self.__coo: - pyg_filter_fn = ( - torch_geometric.loader.utils.filter_custom_hetero_store - if hasattr(torch_geometric.loader.utils, "filter_custom_hetero_store") - else torch_geometric.loader.utils.filter_custom_store - ) - out = pyg_filter_fn( - self.__feature_store, - self.__graph_store, - sampler_output.node, - sampler_output.row, - sampler_output.col, - sampler_output.edge, - ) - else: - out = filter_cugraph_store_csc( - self.__feature_store, - self.__graph_store, - sampler_output.node, - sampler_output.row, - sampler_output.col, - sampler_output.edge, - ) + if self.__graph_store != self.__feature_store: + # TODO Possibly support this if there is an actual use case + raise ValueError("Separate graph and feature stores currently unsupported") + + out = self.__graph_store.filter( + "COO" if self.__coo else "CSC", + sampler_output.node, + sampler_output.row, + sampler_output.col, + sampler_output.edge, + ) # Account for CSR format in cuGraph vs. CSC format in PyG + # TODO deprecate and remove this functionality if self.__coo and self.__graph_store.order == "CSC": for edge_type in out.edge_index_dict: out[edge_type].edge_index = out[edge_type].edge_index.flip(dims=[0]) diff --git a/python/cugraph-pyg/cugraph_pyg/sampler/cugraph_sampler.py b/python/cugraph-pyg/cugraph_pyg/sampler/cugraph_sampler.py index 65cb63d25e0..ffab54efe08 100644 --- a/python/cugraph-pyg/cugraph_pyg/sampler/cugraph_sampler.py +++ b/python/cugraph-pyg/cugraph_pyg/sampler/cugraph_sampler.py @@ -411,6 +411,10 @@ def filter_cugraph_store_csc( col_dict: Dict[str, torch.Tensor], edge_dict: Dict[str, Tuple[torch.Tensor]], ) -> torch_geometric.data.HeteroData: + """ + Deprecated + """ + data = torch_geometric.data.HeteroData() for attr in graph_store.get_all_edge_attrs(): diff --git a/python/cugraph/cugraph/gnn/feature_storage/feat_storage.py b/python/cugraph/cugraph/gnn/feature_storage/feat_storage.py index 77a53882fc4..f0186220114 100644 --- a/python/cugraph/cugraph/gnn/feature_storage/feat_storage.py +++ b/python/cugraph/cugraph/gnn/feature_storage/feat_storage.py @@ -1,4 +1,4 @@ -# Copyright (c) 2023, NVIDIA CORPORATION. +# Copyright (c) 2023-2024, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -168,19 +168,54 @@ def get_data( feat, wgth.WholeMemoryEmbedding ): indices_tensor = ( - indices + indices.cuda() if isinstance(indices, torch.Tensor) else torch.as_tensor(indices, device="cuda") ) return feat.gather(indices_tensor) - else: - return feat[indices] + elif not isinstance(torch, MissingModule) and isinstance(feat, torch.Tensor): + if indices is not None: + if not isinstance(indices, torch.Tensor): + indices = torch.as_tensor(indices) + + if feat.is_cpu and indices.is_cuda: + # TODO maybe add a warning here + indices = indices.cpu() + return feat[indices] def get_feature_list(self) -> list[str]: return {feat_name: feats.keys() for feat_name, feats in self.fd.items()} + def get_storage(self, type_name: str, feat_name: str) -> str: + """ + Returns where the data is stored (cuda, cpu). + Note: will return "cuda" for data managed by CUDA, even if + it is in host memory. + + Parameters + ---------- + type_name : str + The node-type/edge-type to store data + feat_name: + The feature name to retrieve data for + + Returns + ------- + "cuda" for data managed by CUDA, otherwise "CPU". + """ + feat = self.fd[feat_name][type_name] + if not isinstance(wgth, MissingModule) and isinstance( + feat, wgth.WholeMemoryEmbedding + ): + return "cuda" + elif isinstance(feat, torch.Tensor): + return "cpu" if feat.is_cpu else "cuda" + else: + return "cpu" + @staticmethod def _cast_feat_obj_to_backend(feat_obj, backend: str, **kwargs): + # TODO (Issue #4078) support casting WG tensors to numpy and torch if backend == "numpy": if isinstance(feat_obj, (cudf.DataFrame, pd.DataFrame)): return _cast_to_numpy_ar(feat_obj.values, **kwargs) @@ -192,6 +227,8 @@ def _cast_feat_obj_to_backend(feat_obj, backend: str, **kwargs): else: return _cast_to_torch_tensor(feat_obj, **kwargs) elif backend == "wholegraph": + if isinstance(feat_obj, wgth.WholeMemoryEmbedding): + return feat_obj return _get_wg_embedding(feat_obj, **kwargs)