diff --git a/.github/workflows/pr.yaml b/.github/workflows/pr.yaml index 7f0b95e3573..7c8c9973462 100644 --- a/.github/workflows/pr.yaml +++ b/.github/workflows/pr.yaml @@ -15,6 +15,7 @@ jobs: - checks - conda-cpp-build - conda-cpp-tests + - conda-cpp-checks - conda-notebook-tests - conda-python-build - conda-python-tests @@ -52,6 +53,14 @@ jobs: uses: rapidsai/shared-workflows/.github/workflows/conda-cpp-tests.yaml@branch-24.04 with: build_type: pull-request + conda-cpp-checks: + needs: conda-cpp-build + secrets: inherit + uses: rapidsai/shared-workflows/.github/workflows/conda-cpp-post-build-checks.yaml@branch-24.04 + with: + build_type: pull-request + enable_check_symbols: true + symbol_exclusions: (cugraph::ops|hornet|void writeEdgeCountsKernel|void markUniqueOffsetsKernel) conda-python-build: needs: conda-cpp-build secrets: inherit diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 32fb2d62b29..0bd095bfa94 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -14,6 +14,16 @@ on: type: string jobs: + conda-cpp-checks: + secrets: inherit + uses: rapidsai/shared-workflows/.github/workflows/conda-cpp-post-build-checks.yaml@branch-24.04 + with: + build_type: nightly + branch: ${{ inputs.branch }} + date: ${{ inputs.date }} + sha: ${{ inputs.sha }} + enable_check_symbols: true + symbol_exclusions: (cugraph::ops|hornet|void writeEdgeCountsKernel|void markUniqueOffsetsKernel) conda-cpp-tests: secrets: inherit uses: rapidsai/shared-workflows/.github/workflows/conda-cpp-tests.yaml@branch-24.04 diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 3d893e0e562..ddb84d8a0f0 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -46,16 +46,20 @@ repos: ) types_or: [c, c++, cuda] args: ["-fallback-style=none", "-style=file", "-i"] - - repo: local - hooks: - - id: copyright-check - name: copyright-check - entry: python ./ci/checks/copyright.py --git-modified-only --update-current-year - language: python - pass_filenames: false - additional_dependencies: [gitpython] - repo: https://github.com/rapidsai/dependency-file-generator rev: v1.8.0 hooks: - id: rapids-dependency-file-generator args: ["--clean"] + - repo: https://github.com/rapidsai/pre-commit-hooks + rev: v0.0.1 + hooks: + - id: verify-copyright + files: | + (?x) + [.](cmake|cpp|cu|cuh|h|hpp|sh|pxd|py|pyx)$| + CMakeLists[.]txt$| + CMakeLists_standalone[.]txt$| + [.]flake8[.]cython$| + meta[.]yaml$| + setup[.]cfg$ diff --git a/benchmarks/cugraph/standalone/bulk_sampling/README.md b/benchmarks/cugraph/standalone/bulk_sampling/README.md index 2d09466fb2f..56e9f4f5f64 100644 --- a/benchmarks/cugraph/standalone/bulk_sampling/README.md +++ b/benchmarks/cugraph/standalone/bulk_sampling/README.md @@ -152,7 +152,7 @@ Next are standard GNN training arguments such as `FANOUT`, `BATCH_SIZE`, etc. Y the number of training epochs here. These are followed by the `REPLICATION_FACTOR` argument, which can be used to create replications of the dataset for scale testing purposes. -The final two arguments are `FRAMEWORK` which can be either "cuGraphPyG" or "PyG", and `GPUS_PER_NODE` +The final two arguments are `FRAMEWORK` which can be "cugraph_dgl_csr", "cugraph_pyg" or "pyg", and `GPUS_PER_NODE` which must be set to the correct value, even if this is provided by a SLURM argument. If `GPUS_PER_NODE` is not set to the correct number of GPUs, the script will hang indefinitely until it times out. Mismatched GPUs per node is currently unsupported by this script but should be possible in practice. diff --git a/benchmarks/cugraph/standalone/bulk_sampling/bench_cugraph_training.py b/benchmarks/cugraph/standalone/bulk_sampling/bench_cugraph_training.py index c9e347b261d..2604642b748 100644 --- a/benchmarks/cugraph/standalone/bulk_sampling/bench_cugraph_training.py +++ b/benchmarks/cugraph/standalone/bulk_sampling/bench_cugraph_training.py @@ -43,8 +43,9 @@ def init_pytorch_worker(rank: int, use_rmm_torch_allocator: bool = False) -> Non rmm.reinitialize( devices=[rank], - pool_allocator=True, - initial_pool_size=pool_size, + pool_allocator=False, + # pool_allocator=True, + # initial_pool_size=pool_size, ) if use_rmm_torch_allocator: @@ -119,10 +120,17 @@ def parse_args(): parser.add_argument( "--framework", type=str, - help="The framework to test (PyG, cuGraphPyG)", + help="The framework to test (PyG, cugraph_pyg, cugraph_dgl_csr)", required=True, ) + parser.add_argument( + "--use_wholegraph", + action="store_true", + help="Whether to use WholeGraph feature storage", + required=False, + ) + parser.add_argument( "--model", type=str, @@ -162,6 +170,13 @@ def parse_args(): required=False, ) + parser.add_argument( + "--skip_download", + action="store_true", + help="Whether to skip downloading", + required=False, + ) + return parser.parse_args() @@ -186,21 +201,43 @@ def main(args): world_size = int(os.environ["SLURM_JOB_NUM_NODES"]) * args.gpus_per_node + if args.use_wholegraph: + # TODO support WG without cuGraph + if args.framework.lower() not in ["cugraph_pyg", "cugraph_dgl_csr"]: + raise ValueError("WG feature store only supported with cuGraph backends") + from pylibwholegraph.torch.initialize import ( + get_global_communicator, + get_local_node_communicator, + init, + ) + + logger.info("initializing WG comms...") + init(global_rank, world_size, local_rank, args.gpus_per_node) + wm_comm = get_global_communicator() + get_local_node_communicator() + + wm_comm = wm_comm.wmb_comm + logger.info(f"rank {global_rank} successfully initialized WG comms") + wm_comm.barrier() + dataset = OGBNPapers100MDataset( replication_factor=args.replication_factor, dataset_dir=args.dataset_dir, train_split=args.train_split, val_split=args.val_split, - load_edge_index=(args.framework == "PyG"), + load_edge_index=(args.framework.lower() == "pyg"), + backend="wholegraph" if args.use_wholegraph else "torch", ) - if global_rank == 0: + # Note: this does not generate WG files + if global_rank == 0 and not args.skip_download: dataset.download() + dist.barrier() fanout = [int(f) for f in args.fanout.split("_")] - if args.framework == "PyG": + if args.framework.lower() == "pyg": from trainers.pyg import PyGNativeTrainer trainer = PyGNativeTrainer( @@ -215,7 +252,7 @@ def main(args): num_neighbors=fanout, batch_size=args.batch_size, ) - elif args.framework == "cuGraphPyG": + elif args.framework.lower() == "cugraph_pyg": sample_dir = os.path.join( args.sample_dir, f"ogbn_papers100M[{args.replication_factor}]_b{args.batch_size}_f{fanout}", @@ -229,11 +266,35 @@ def main(args): device=local_rank, rank=global_rank, world_size=world_size, + gpus_per_node=args.gpus_per_node, num_epochs=args.num_epochs, shuffle=True, replace=False, num_neighbors=fanout, batch_size=args.batch_size, + backend="wholegraph" if args.use_wholegraph else "torch", + ) + elif args.framework.lower() == "cugraph_dgl_csr": + sample_dir = os.path.join( + args.sample_dir, + f"ogbn_papers100M[{args.replication_factor}]_b{args.batch_size}_f{fanout}", + ) + from trainers.dgl import DGLCuGraphTrainer + + trainer = DGLCuGraphTrainer( + model=args.model, + dataset=dataset, + sample_dir=sample_dir, + device=local_rank, + rank=global_rank, + world_size=world_size, + gpus_per_node=args.gpus_per_node, + num_epochs=args.num_epochs, + shuffle=True, + replace=False, + num_neighbors=[int(f) for f in args.fanout.split("_")], + batch_size=args.batch_size, + backend="wholegraph" if args.use_wholegraph else "torch", ) else: raise ValueError("unsupported framework") diff --git a/benchmarks/cugraph/standalone/bulk_sampling/cugraph_bulk_sampling.py b/benchmarks/cugraph/standalone/bulk_sampling/cugraph_bulk_sampling.py index e3a5bba3162..95e1afcb28b 100644 --- a/benchmarks/cugraph/standalone/bulk_sampling/cugraph_bulk_sampling.py +++ b/benchmarks/cugraph/standalone/bulk_sampling/cugraph_bulk_sampling.py @@ -190,6 +190,10 @@ def sample_graph( val_perc=0.5, sampling_kwargs={}, ): + logger = logging.getLogger("__main__") + logger.info("Starting sampling phase...") + + logger.info("Calculating random splits...") cupy.random.seed(seed) train_df, test_df = label_df.random_split( [train_perc, 1 - train_perc], random_state=seed, shuffle=True @@ -197,24 +201,35 @@ def sample_graph( val_df, test_df = label_df.random_split( [val_perc, 1 - val_perc], random_state=seed, shuffle=True ) + logger.info("Calculated random splits") total_time = 0.0 for epoch in range(num_epochs): - steps = [("train", train_df), ("test", test_df)] + steps = [("train", train_df)] if epoch == num_epochs - 1: steps.append(("val", val_df)) + steps.append(("test", test_df)) for step, batch_df in steps: - batch_df = batch_df.sample(frac=1.0, random_state=seed) + logger.info("Shuffling batch dataframe...") + batch_df = batch_df.sample(frac=1.0, random_state=seed).persist() + logger.info("Shuffled and persisted batch dataframe...") - if step == "val": - output_sample_path = os.path.join(output_path, "val", "samples") - else: + if step == "train": output_sample_path = os.path.join( output_path, f"epoch={epoch}", f"{step}", "samples" ) - os.makedirs(output_sample_path) + else: + output_sample_path = os.path.join(output_path, step, "samples") + + client = default_client() + + def func(): + os.makedirs(output_sample_path, exist_ok=True) + + client.run(func) + logger.info("Creating bulk sampler...") sampler = BulkSampler( batch_size=batch_size, output_path=output_sample_path, @@ -227,6 +242,7 @@ def sample_graph( log_level=logging.INFO, **sampling_kwargs, ) + logger.info("Bulk sampler created and ready for input") n_workers = len(default_client().scheduler_info()["workers"]) @@ -244,13 +260,13 @@ def sample_graph( # should always persist the batch dataframe or performance may be suboptimal batch_df = batch_df.persist() - print("created batches") + logger.info("created and persisted batches") start_time = perf_counter() sampler.add_batches(batch_df, start_col_name="node", batch_col_name="batch") sampler.flush() end_time = perf_counter() - print("flushed all batches") + logger.info("flushed all batches") total_time += end_time - start_time return total_time @@ -356,23 +372,29 @@ def load_disk_dataset( path = Path(dataset_dir) / dataset parquet_path = path / "parquet" + logger = logging.getLogger("__main__") + + logger.info("getting n workers...") n_workers = get_n_workers() + logger.info(f"there are {n_workers} workers") with open(os.path.join(path, "meta.json")) as meta_file: meta = json.load(meta_file) + logger.info("assigning offsets...") node_offsets, node_offsets_replicated, total_num_nodes = assign_offsets_pyg( meta["num_nodes"], replication_factor=replication_factor ) + logger.info("offsets assigned") edge_index_dict = {} for edge_type in meta["num_edges"].keys(): - print(f"Loading edge index for edge type {edge_type}") + logger.info(f"Loading edge index for edge type {edge_type}") can_edge_type = tuple(edge_type.split("__")) edge_index_dict[can_edge_type] = dask_cudf.read_parquet( Path(parquet_path) / edge_type / "edge_index.parquet" - ).repartition(n_workers * 2) + ).repartition(npartitions=n_workers * 2) edge_index_dict[can_edge_type]["src"] += node_offsets_replicated[ can_edge_type[0] @@ -384,6 +406,7 @@ def load_disk_dataset( edge_index_dict[can_edge_type] = edge_index_dict[can_edge_type] if replication_factor > 1: + logger.info("processing replications") edge_index_dict[can_edge_type] = edge_index_dict[ can_edge_type ].map_partitions( @@ -400,6 +423,7 @@ def load_disk_dataset( } ), ) + logger.info("replications processed") gc.collect() @@ -407,48 +431,63 @@ def load_disk_dataset( edge_index_dict[can_edge_type] = edge_index_dict[can_edge_type].rename( columns={"src": "dst", "dst": "src"} ) + logger.info("edge index loaded") # Assign numeric edge type ids based on lexicographic order edge_offsets = {} edge_count = 0 - for num_edge_type, can_edge_type in enumerate(sorted(edge_index_dict.keys())): - if add_edge_types: - edge_index_dict[can_edge_type]["etp"] = cupy.int32(num_edge_type) - edge_offsets[can_edge_type] = edge_count - edge_count += len(edge_index_dict[can_edge_type]) + # for num_edge_type, can_edge_type in enumerate(sorted(edge_index_dict.keys())): + # if add_edge_types: + # edge_index_dict[can_edge_type]["etp"] = cupy.int32(num_edge_type) + # edge_offsets[can_edge_type] = edge_count + # edge_count += len(edge_index_dict[can_edge_type]) + + if len(edge_index_dict) != 1: + raise ValueError("should only be 1 edge index") + + logger.info("setting edge type") + + all_edges_df = list(edge_index_dict.values())[0] + if add_edge_types: + all_edges_df["etp"] = cupy.int32(0) - all_edges_df = dask_cudf.concat(list(edge_index_dict.values())) + # all_edges_df = dask_cudf.concat(list(edge_index_dict.values())) del edge_index_dict gc.collect() node_labels = {} for node_type, offset in node_offsets_replicated.items(): - print(f"Loading node labels for node type {node_type} (offset={offset})") + logger.info(f"Loading node labels for node type {node_type} (offset={offset})") node_label_path = os.path.join( os.path.join(parquet_path, node_type), "node_label.parquet" ) if os.path.exists(node_label_path): node_labels[node_type] = ( dask_cudf.read_parquet(node_label_path) - .repartition(n_workers) + .repartition(npartitions=n_workers) .drop("label", axis=1) .persist() ) + logger.info(f"Loaded and persisted initial labels") node_labels[node_type]["node"] += offset node_labels[node_type] = node_labels[node_type].persist() + logger.info(f"Set and persisted node offsets") if replication_factor > 1: + logger.info(f"Replicating labels...") node_labels[node_type] = node_labels[node_type].map_partitions( _replicate_df, replication_factor, {"node": meta["num_nodes"][node_type]}, meta=cudf.DataFrame({"node": cudf.Series(dtype="int64")}), ) + logger.info(f"Replicated labels (will likely evaluate later)") gc.collect() node_labels_df = dask_cudf.concat(list(node_labels.values())).reset_index(drop=True) + logger.info("Dataset successfully loaded") del node_labels gc.collect() @@ -459,6 +498,7 @@ def load_disk_dataset( node_offsets_replicated, edge_offsets, total_num_nodes, + sum(meta["num_edges"].values()) * replication_factor, ) @@ -540,6 +580,7 @@ def benchmark_cugraph_bulk_sampling( node_offsets, edge_offsets, total_num_nodes, + num_input_edges, ) = load_disk_dataset( dataset, dataset_dir=dataset_dir, @@ -548,7 +589,6 @@ def benchmark_cugraph_bulk_sampling( add_edge_types=add_edge_types, ) - num_input_edges = len(dask_edgelist_df) logger.info(f"Number of input edges = {num_input_edges:,}") G = construct_graph(dask_edgelist_df) @@ -562,7 +602,13 @@ def benchmark_cugraph_bulk_sampling( output_path, f"{dataset}[{replication_factor}]_b{batch_size}_f{fanout}", ) - os.makedirs(output_subdir) + + client = default_client() + + def func(): + os.makedirs(output_subdir, exist_ok=True) + + client.run(func) if sampling_target_framework == "cugraph_dgl_csr": sampling_kwargs = { @@ -574,8 +620,8 @@ def benchmark_cugraph_bulk_sampling( "use_legacy_names": False, "include_hop_column": False, } - else: - # FIXME: Update these arguments when CSC mode is fixed in cuGraph-PyG (release 24.02) + elif sampling_target_framework == "cugraph_pyg": + # FIXME: Update these arguments when CSC mode is fixed in cuGraph-PyG (release 24.04) sampling_kwargs = { "deduplicate_sources": True, "prior_sources_behavior": "exclude", @@ -585,8 +631,10 @@ def benchmark_cugraph_bulk_sampling( "use_legacy_names": False, "include_hop_column": True, } + else: + raise ValueError("Only cugraph_dgl_csr or cugraph_pyg are valid frameworks") - batches_per_partition = 600_000 // batch_size + batches_per_partition = 256 execution_time, allocation_counts = sample_graph( G=G, label_df=dask_label_df, @@ -761,9 +809,9 @@ def get_args(): logger.setLevel(logging.INFO) args = get_args() - if args.sampling_target_framework not in ["cugraph_dgl_csr", None]: + if args.sampling_target_framework not in ["cugraph_dgl_csr", "cugraph_pyg"]: raise ValueError( - "sampling_target_framework must be one of cugraph_dgl_csr or None", + "sampling_target_framework must be one of cugraph_dgl_csr or cugraph_pyg", "Other frameworks are not supported at this time.", ) @@ -775,12 +823,30 @@ def get_args(): seeds_per_call_opts = [int(s) for s in args.seeds_per_call_opts.split(",")] dask_worker_devices = [int(d) for d in args.dask_worker_devices.split(",")] - logger.info("starting dask client") - client, cluster = start_dask_client() + import time + + time_dask_start = time.localtime() + + logger.info(f"{time.asctime(time_dask_start)}: starting dask client") + from dask_cuda.initialize import initialize + from dask.distributed import Client + from cugraph.dask.comms import comms as Comms + import os, time + + client = Client(scheduler_file=os.environ["SCHEDULER_FILE"], timeout=360) + time.sleep(30) + cluster = Comms.initialize(p2p=True) + # client, cluster = start_dask_client() + time_dask_end = time.localtime() + logger.info(f"{time.asctime(time_dask_end)}: dask client started") + + logger.info("enabling spilling") enable_spilling() - stats_ls = [] client.run(enable_spilling) - logger.info("dask client started") + logger.info("enabled spilling") + + stats_ls = [] + for dataset in datasets: m = re.match(r"(\w+)\[([0-9]+)\]", dataset) if m: diff --git a/benchmarks/cugraph/standalone/bulk_sampling/datasets/ogbn_papers100M.py b/benchmarks/cugraph/standalone/bulk_sampling/datasets/ogbn_papers100M.py index a50e40f6d55..e3151e37a25 100644 --- a/benchmarks/cugraph/standalone/bulk_sampling/datasets/ogbn_papers100M.py +++ b/benchmarks/cugraph/standalone/bulk_sampling/datasets/ogbn_papers100M.py @@ -34,6 +34,7 @@ def __init__( train_split=0.8, val_split=0.5, load_edge_index=True, + backend="torch", ): self.__replication_factor = replication_factor self.__disk_x = None @@ -43,6 +44,7 @@ def __init__( self.__train_split = train_split self.__val_split = val_split self.__load_edge_index = load_edge_index + self.__backend = backend def download(self): import logging @@ -152,6 +154,27 @@ def download(self): ) ldf.to_parquet(node_label_file_path) + # WholeGraph + wg_bin_file_path = os.path.join(dataset_path, "wgb", "paper") + if self.__replication_factor == 1: + wg_bin_rep_path = os.path.join(wg_bin_file_path, "node_feat.d") + else: + wg_bin_rep_path = os.path.join( + wg_bin_file_path, f"node_feat_{self.__replication_factor}x.d" + ) + + if not os.path.exists(wg_bin_rep_path): + os.makedirs(wg_bin_rep_path) + if dataset is None: + from ogb.nodeproppred import NodePropPredDataset + + dataset = NodePropPredDataset( + name="ogbn-papers100M", root=self.__dataset_dir + ) + node_feat = dataset[0][0]["node_feat"] + for k in range(self.__replication_factor): + node_feat.tofile(os.path.join(wg_bin_rep_path, f"{k:04d}.bin")) + @property def edge_index_dict( self, @@ -224,45 +247,87 @@ def edge_index_dict( @property def x_dict(self) -> Dict[str, torch.Tensor]: + if self.__disk_x is None: + if self.__backend == "wholegraph": + self.__load_x_wg() + else: + self.__load_x_torch() + + return self.__disk_x + + def __load_x_torch(self) -> None: node_type_path = os.path.join( self.__dataset_dir, "ogbn_papers100M", "npy", "paper" ) + if self.__replication_factor == 1: + full_path = os.path.join(node_type_path, "node_feat.npy") + else: + full_path = os.path.join( + node_type_path, f"node_feat_{self.__replication_factor}x.npy" + ) - if self.__disk_x is None: - if self.__replication_factor == 1: - full_path = os.path.join(node_type_path, "node_feat.npy") - else: - full_path = os.path.join( - node_type_path, f"node_feat_{self.__replication_factor}x.npy" - ) + self.__disk_x = {"paper": torch.as_tensor(np.load(full_path, mmap_mode="r"))} - self.__disk_x = {"paper": np.load(full_path, mmap_mode="r")} + def __load_x_wg(self) -> None: + import logging - return self.__disk_x + logger = logging.getLogger("OGBNPapers100MDataset") + logger.info("Loading x into WG embedding...") + + import pylibwholegraph.torch as wgth + + node_type_path = os.path.join( + self.__dataset_dir, "ogbn_papers100M", "wgb", "paper" + ) + if self.__replication_factor == 1: + full_path = os.path.join(node_type_path, "node_feat.d") + else: + full_path = os.path.join( + node_type_path, f"node_feat_{self.__replication_factor}x.d" + ) + + file_list = [os.path.join(full_path, f) for f in os.listdir(full_path)] + + x = wgth.create_embedding_from_filelist( + wgth.get_global_communicator(), + "distributed", # TODO support other options + "cpu", # TODO support GPU + file_list, + torch.float32, + 128, + ) + from pylibwholegraph.torch.initialize import get_global_communicator + + wm_comm = get_global_communicator() + wm_comm.barrier() + + logger.info("created x wg embedding") + + self.__disk_x = {"paper": x} @property def y_dict(self) -> Dict[str, torch.Tensor]: if self.__y is None: - self.__get_labels() + self.__get_y() return self.__y @property def train_dict(self) -> Dict[str, torch.Tensor]: if self.__train is None: - self.__get_labels() + self.__get_split() return self.__train @property def test_dict(self) -> Dict[str, torch.Tensor]: if self.__test is None: - self.__get_labels() + self.__get_split() return self.__test @property def val_dict(self) -> Dict[str, torch.Tensor]: if self.__val is None: - self.__get_labels() + self.__get_split() return self.__val @property @@ -271,7 +336,7 @@ def num_input_features(self) -> int: @property def num_labels(self) -> int: - return int(self.y_dict["paper"].max()) + 1 + return 172 def num_nodes(self, node_type: str) -> int: if node_type != "paper": @@ -285,46 +350,49 @@ def num_edges(self, edge_type: Tuple[str, str, str]) -> int: return 1_615_685_872 * self.__replication_factor - def __get_labels(self): + def __get_y(self): label_path = os.path.join( self.__dataset_dir, "ogbn_papers100M", - "parquet", + "wgb", "paper", - "node_label.parquet", + "node_label.d", + "0.bin", ) - node_label = pandas.read_parquet(label_path) - - if self.__replication_factor > 1: - orig_num_nodes = self.num_nodes("paper") // self.__replication_factor - dfr = pandas.DataFrame( - { - "node": pandas.concat( - [ - node_label.node + (r * orig_num_nodes) - for r in range(1, self.__replication_factor) - ] - ), - "label": pandas.concat( - [node_label.label for r in range(1, self.__replication_factor)] - ), - } + if self.__backend == "wholegraph": + import pylibwholegraph.torch as wgth + + node_label = wgth.create_embedding_from_filelist( + wgth.get_global_communicator(), + "distributed", # TODO support other options + "cpu", # TODO support GPU + [label_path] * self.__replication_factor, + torch.int16, + 1, + ) + + else: + node_label_1x = torch.as_tensor( + np.fromfile(label_path, dtype="int16"), device="cpu" ) - node_label = pandas.concat([node_label, dfr]).reset_index(drop=True) + if self.__replication_factor > 1: + node_label = torch.concatenate( + [node_label_1x] * self.__replication_factor + ) + else: + node_label = node_label_1x + + self.__y = {"paper": node_label} + + def __get_split(self): num_nodes = self.num_nodes("paper") - node_label_tensor = torch.full( - (num_nodes,), -1, dtype=torch.float32, device="cpu" - ) - node_label_tensor[ - torch.as_tensor(node_label.node.values, device="cpu") - ] = torch.as_tensor(node_label.label.values, device="cpu") - self.__y = {"paper": node_label_tensor.contiguous()} + node = self.y_dict["paper"][self.y_dict["paper"] > 0] train_ix, test_val_ix = train_test_split( - torch.as_tensor(node_label.node.values), + node, train_size=self.__train_split, random_state=num_nodes, ) diff --git a/benchmarks/cugraph/standalone/bulk_sampling/models/dgl/__init__.py b/benchmarks/cugraph/standalone/bulk_sampling/models/dgl/__init__.py new file mode 100644 index 00000000000..610a7648801 --- /dev/null +++ b/benchmarks/cugraph/standalone/bulk_sampling/models/dgl/__init__.py @@ -0,0 +1,15 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from .models_dgl import GraphSAGE diff --git a/benchmarks/cugraph/standalone/bulk_sampling/models/dgl/models_dgl.py b/benchmarks/cugraph/standalone/bulk_sampling/models/dgl/models_dgl.py new file mode 100644 index 00000000000..2cfdda2d2e7 --- /dev/null +++ b/benchmarks/cugraph/standalone/bulk_sampling/models/dgl/models_dgl.py @@ -0,0 +1,69 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import torch +import torch.nn.functional as F + + +class GraphSAGE(torch.nn.Module): + """ + GraphSAGE model implementation for DGL + supporting both native DGL and cuGraph-ops + backends. + """ + + def __init__( + self, + in_channels, + hidden_channels, + out_channels, + num_layers, + model_backend="dgl", + ): + if model_backend == "dgl": + from dgl.nn import SAGEConv + else: + from cugraph_dgl.nn import SAGEConv + + super(GraphSAGE, self).__init__() + self.convs = torch.nn.ModuleList() + for _ in range(num_layers - 1): + self.convs.append( + SAGEConv(in_channels, hidden_channels, aggregator_type="mean") + ) + in_channels = hidden_channels + self.convs.append( + SAGEConv(hidden_channels, out_channels, aggregator_type="mean") + ) + + def forward(self, blocks, x): + """ + Runs the model forward pass given a list of blocks + and feature tensor. + """ + + for i, conv in enumerate(self.convs): + x = conv(blocks[i], x) + if i != len(self.convs) - 1: + x = F.relu(x) + x = F.dropout(x, p=0.5) + return x + + +def create_model(feat_size, num_classes, num_layers, model_backend="dgl"): + model = GraphSAGE( + feat_size, 64, num_classes, num_layers, model_backend=model_backend + ) + model = model.to("cuda") + model.train() + return model diff --git a/benchmarks/cugraph/standalone/bulk_sampling/models/pyg/models_cugraph_pyg.py b/benchmarks/cugraph/standalone/bulk_sampling/models/pyg/models_cugraph_pyg.py index 1de791bf588..7ee400b004f 100644 --- a/benchmarks/cugraph/standalone/bulk_sampling/models/pyg/models_cugraph_pyg.py +++ b/benchmarks/cugraph/standalone/bulk_sampling/models/pyg/models_cugraph_pyg.py @@ -57,7 +57,7 @@ def forward(self, x, edge, num_sampled_nodes, num_sampled_edges): for i, conv in enumerate(self.convs): if i > 0: - new_num_edges = edge[1][-2] + new_num_edges = int(edge[1][-2]) edge[0] = edge[0].narrow( dim=0, start=0, diff --git a/benchmarks/cugraph/standalone/bulk_sampling/run_train_job.sh b/benchmarks/cugraph/standalone/bulk_sampling/run_train_job.sh index 27ae0dc7788..8136018c877 100755 --- a/benchmarks/cugraph/standalone/bulk_sampling/run_train_job.sh +++ b/benchmarks/cugraph/standalone/bulk_sampling/run_train_job.sh @@ -12,12 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -#SBATCH -A datascience_rapids_cugraphgnn -#SBATCH -p luna -#SBATCH -J datascience_rapids_cugraphgnn-papers:bulkSamplingPyG -#SBATCH -N 1 -#SBATCH -t 00:25:00 - CONTAINER_IMAGE=${CONTAINER_IMAGE:="please_specify_container"} SCRIPTS_DIR=$(pwd) LOGS_DIR=${LOGS_DIR:=$(pwd)"/logs"} @@ -31,10 +25,11 @@ mkdir -p $DATASETS_DIR BATCH_SIZE=512 FANOUT="10_10_10" NUM_EPOCHS=1 -REPLICATION_FACTOR=1 +REPLICATION_FACTOR=2 +JOB_ID=$RANDOM -# options: PyG or cuGraphPyG -FRAMEWORK="cuGraphPyG" +# options: PyG, cuGraphPyG, or cuGraphDGL +FRAMEWORK="cuGraphDGL" GPUS_PER_NODE=8 nodes=( $( scontrol show hostnames $SLURM_JOB_NODELIST ) ) @@ -52,6 +47,7 @@ echo Num GPUs Per Node: $gpus_per_node set -e + # First run without cuGraph to get data if [[ "$FRAMEWORK" == "cuGraphPyG" ]]; then @@ -59,25 +55,10 @@ if [[ "$FRAMEWORK" == "cuGraphPyG" ]]; then srun \ --container-image $CONTAINER_IMAGE \ --container-mounts=${LOGS_DIR}":/logs",${SAMPLES_DIR}":/samples",${SCRIPTS_DIR}":/scripts",${DATASETS_DIR}":/datasets" \ - bash /scripts/run_sampling.sh $BATCH_SIZE $FANOUT $REPLICATION_FACTOR "/scripts" $NUM_EPOCHS + bash /scripts/train.sh $BATCH_SIZE $FANOUT $REPLICATION_FACTOR "/scripts" $NUM_EPOCHS "cugraph_pyg" $nnodes $head_node_ip $JOB_ID +elif [[ "$FRAMEWORK" == "cuGraphDGL" ]]; then + srun \ + --container-image $CONTAINER_IMAGE \ + --container-mounts=${LOGS_DIR}":/logs",${SAMPLES_DIR}":/samples",${SCRIPTS_DIR}":/scripts",${DATASETS_DIR}":/datasets" \ + bash /scripts/train.sh $BATCH_SIZE $FANOUT $REPLICATION_FACTOR "/scripts" $NUM_EPOCHS "cugraph_dgl_csr" $nnodes $head_node_ip $JOB_ID fi - -# Train -srun \ - --container-image $CONTAINER_IMAGE \ - --container-mounts=${LOGS_DIR}":/logs",${SAMPLES_DIR}":/samples",${SCRIPTS_DIR}":/scripts",${DATASETS_DIR}":/datasets" \ - torchrun \ - --nnodes $nnodes \ - --nproc-per-node $gpus_per_node \ - --rdzv-id $RANDOM \ - --rdzv-backend c10d \ - --rdzv-endpoint $head_node_ip:29500 \ - /scripts/bench_cugraph_training.py \ - --output_file "/logs/output.txt" \ - --framework $FRAMEWORK \ - --dataset_dir "/datasets" \ - --sample_dir "/samples" \ - --batch_size $BATCH_SIZE \ - --fanout $FANOUT \ - --replication_factor $REPLICATION_FACTOR \ - --num_epochs $NUM_EPOCHS diff --git a/benchmarks/cugraph/standalone/bulk_sampling/run_sampling.sh b/benchmarks/cugraph/standalone/bulk_sampling/train.sh similarity index 66% rename from benchmarks/cugraph/standalone/bulk_sampling/run_sampling.sh rename to benchmarks/cugraph/standalone/bulk_sampling/train.sh index 1b3085dcc9a..a3b85e281f1 100644 --- a/benchmarks/cugraph/standalone/bulk_sampling/run_sampling.sh +++ b/benchmarks/cugraph/standalone/bulk_sampling/train.sh @@ -21,6 +21,10 @@ FANOUT=$2 REPLICATION_FACTOR=$3 SCRIPTS_DIR=$4 NUM_EPOCHS=$5 +SAMPLING_FRAMEWORK=$6 +N_NODES=$7 +HEAD_NODE_IP=$8 +JOB_ID=$9 SAMPLES_DIR=/samples DATASET_DIR=/datasets @@ -29,12 +33,19 @@ LOGS_DIR=/logs MG_UTILS_DIR=${SCRIPTS_DIR}/mg_utils SCHEDULER_FILE=${MG_UTILS_DIR}/dask_scheduler.json -export WORKER_RMM_POOL_SIZE=28G -export UCX_MAX_RNDV_RAILS=1 +echo $SAMPLES_DIR +ls $SAMPLES_DIR + +export WORKER_RMM_POOL_SIZE=75G +#export UCX_MAX_RNDV_RAILS=1 export RAPIDS_NO_INITIALIZE=1 export CUDF_SPILL=1 -export LIBCUDF_CUFILE_POLICY="OFF" +export LIBCUDF_CUFILE_POLICY="KVIKIO" +export KVIKIO_NTHREADS=64 export GPUS_PER_NODE=8 +#export NCCL_CUMEM_ENABLE=0 +#export NCCL_DEBUG="TRACE" +export NCCL_DEBUG_FILE=/logs/nccl_debug.%h.%p export SCHEDULER_FILE=$SCHEDULER_FILE export LOGS_DIR=$LOGS_DIR @@ -59,8 +70,9 @@ else fi echo "properly waiting for workers to connect" -NUM_GPUS=$(python -c "import os; print(int(os.environ['SLURM_JOB_NUM_NODES'])*int(os.environ['GPUS_PER_NODE']))") -handleTimeout 120 python ${MG_UTILS_DIR}/wait_for_workers.py \ +export NUM_GPUS=$(python -c "import os; print(int(os.environ['SLURM_JOB_NUM_NODES'])*int(os.environ['GPUS_PER_NODE']))") +SEEDS_PER_CALL=$(python -c "import os; print(int(os.environ['NUM_GPUS'])*65536)") +handleTimeout 630 python ${MG_UTILS_DIR}/wait_for_workers.py \ --num-expected-workers ${NUM_GPUS} \ --scheduler-file-path ${SCHEDULER_FILE} @@ -76,14 +88,15 @@ if [[ $SLURM_NODEID == 0 ]]; then --datasets "ogbn_papers100M["$REPLICATION_FACTOR"]" \ --fanouts $FANOUT \ --batch_sizes $BATCH_SIZE \ - --seeds_per_call_opts "524288" \ + --seeds_per_call_opts $SEEDS_PER_CALL \ --num_epochs $NUM_EPOCHS \ - --random_seed 42 + --random_seed 42 \ + --sampling_target_framework $SAMPLING_FRAMEWORK - echo "DONE" > ${SAMPLES_DIR}/status.txt + echo "DONE" > ${LOGS_DIR}/status.txt fi -while [ ! -f "${SAMPLES_DIR}"/status.txt ] +while [ ! -f "${LOGS_DIR}"/status.txt ] do sleep 1 done @@ -106,6 +119,25 @@ if [[ ${#python_processes[@]} -gt 1 || $dask_processes ]]; then fi sleep 2 +torchrun \ + --nnodes $N_NODES \ + --nproc-per-node $GPUS_PER_NODE \ + --rdzv-id $JOB_ID \ + --rdzv-backend c10d \ + --rdzv-endpoint $HEAD_NODE_IP:29500 \ + /scripts/bench_cugraph_training.py \ + --output_file "/logs/output.txt" \ + --framework $SAMPLING_FRAMEWORK \ + --dataset_dir "/datasets" \ + --sample_dir "/samples" \ + --batch_size $BATCH_SIZE \ + --fanout $FANOUT \ + --replication_factor $REPLICATION_FACTOR \ + --num_epochs $NUM_EPOCHS \ + --use_wholegraph \ + --skip_download + + if [[ $SLURM_NODEID == 0 ]]; then - rm ${SAMPLES_DIR}/status.txt + rm ${LOGS_DIR}/status.txt fi diff --git a/benchmarks/cugraph/standalone/bulk_sampling/trainers/dgl/__init__.py b/benchmarks/cugraph/standalone/bulk_sampling/trainers/dgl/__init__.py new file mode 100644 index 00000000000..03d2a51e538 --- /dev/null +++ b/benchmarks/cugraph/standalone/bulk_sampling/trainers/dgl/__init__.py @@ -0,0 +1,16 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from .trainers_dgl import DGLTrainer +from .trainers_cugraph_dgl import DGLCuGraphTrainer diff --git a/benchmarks/cugraph/standalone/bulk_sampling/trainers/dgl/trainers_cugraph_dgl.py b/benchmarks/cugraph/standalone/bulk_sampling/trainers/dgl/trainers_cugraph_dgl.py new file mode 100644 index 00000000000..37745e645fd --- /dev/null +++ b/benchmarks/cugraph/standalone/bulk_sampling/trainers/dgl/trainers_cugraph_dgl.py @@ -0,0 +1,315 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import os +import time +import re + +from .trainers_dgl import DGLTrainer +from models.dgl import GraphSAGE +from datasets import Dataset + +import torch +import numpy as np +import warnings + +from torch.nn.parallel import DistributedDataParallel as ddp +from cugraph_dgl.dataloading import HomogenousBulkSamplerDataset +from cugraph.gnn import FeatureStore + +from typing import List + + +def get_dataloader( + input_file_paths: List[str], + total_num_nodes: int, + sparse_format: str, + return_type: str, +) -> torch.utils.data.DataLoader: + """ + Returns a dataloader that reads bulk samples from the given input paths. + + Parameters + ---------- + input_file_paths: List[str] + List of input parquet files containing samples. + total_num_nodes: int + Total number of nodes in the graph. + sparse_format: str + The sparse format to read (i.e. coo) + return_type: str + The type of object to be returned by the dataloader (i.e. dgl.Block) + + Returns + ------- + torch.utils.data.DataLoader + """ + + print("Creating dataloader", flush=True) + st = time.time() + if len(input_file_paths) > 0: + dataset = HomogenousBulkSamplerDataset( + total_num_nodes, + edge_dir="in", + sparse_format=sparse_format, + return_type=return_type, + ) + dataset.set_input_files(input_file_paths=input_file_paths) + dataloader = torch.utils.data.DataLoader( + dataset, + collate_fn=lambda x: x, + shuffle=False, + num_workers=0, + batch_size=None, + ) + et = time.time() + print(f"Time to create dataloader = {et - st:.2f} seconds", flush=True) + return dataloader + else: + return [] + + +class DGLCuGraphTrainer(DGLTrainer): + """ + Trainer implementation for cuGraph-DGL that supports + WholeGraph as a feature store. + """ + + def __init__( + self, + dataset: Dataset, + model: str = "GraphSAGE", + device: int = 0, + rank: int = 0, + world_size: int = 1, + gpus_per_node: int = 1, + num_epochs: int = 1, + sample_dir: str = ".", + backend: str = "torch", + **kwargs, + ): + """ + Parameters + ---------- + dataset: Dataset + The dataset to train on. + model: str + The model to use for training. + Currently only "GraphSAGE" is supported. + device: int, default=0 + The CUDA device to use. + rank: int, default=0 + The global rank of the worker this trainer is assigned to. + world_size: int, default=1 + The number of workers in the world. + num_epochs: int, default=1 + The number of training epochs to run. + sample_dir: str, default="." + The directory where samples generated by the bulk sampler + are stored. + backend: str, default="torch" + The feature store backend to be used by the cuGraph Feature Store. + Defaults to "torch". Options are "torch" and "wholegraph" + kwargs + Keyword arguments to pass to the loader + """ + self.__data = None + self.__device = device + self.__rank = rank + self.__world_size = world_size + self.__gpus_per_node = gpus_per_node + self.__num_epochs = num_epochs + self.__dataset = dataset + self.__sample_dir = sample_dir + self.__loader_kwargs = kwargs + self.__model = self.get_model(model) + self.__optimizer = None + self.__backend = backend + + @property + def rank(self): + return self.__rank + + @property + def model(self): + return self.__model + + @property + def dataset(self): + return self.__dataset + + @property + def optimizer(self): + if self.__optimizer is None: + self.__optimizer = torch.optim.Adam( + self.model.parameters(), lr=0.01, weight_decay=0.0005 + ) + return self.__optimizer + + @property + def num_epochs(self) -> int: + return self.__num_epochs + + def get_loader(self, epoch: int = 0, stage="train") -> int: + # TODO support online sampling + if stage == "train": + path = os.path.join(self.__sample_dir, f"epoch={epoch}", stage, "samples") + elif stage in ["test", "val"]: + path = os.path.join(self.__sample_dir, stage, "samples") + else: + raise ValueError(f"Invalid stage {stage}") + + input_file_paths, num_batches = self.get_input_files( + path, epoch=epoch, stage=stage + ) + + dataloader = get_dataloader( + input_file_paths=input_file_paths.tolist(), + total_num_nodes=None, + sparse_format="csc", + return_type="cugraph_dgl.nn.SparseGraph", + ) + return dataloader, num_batches + + @property + def data(self): + import logging + + logger = logging.getLogger("DGLCuGraphTrainer") + logger.info("getting data") + + if self.__data is None: + logger.info("using wholegraph backend") + if self.__backend == "wholegraph": + fs = FeatureStore( + backend="wholegraph", + wg_type="chunked", + wg_location="cpu", + ) + else: + fs = FeatureStore(backend=self.__backend) + num_nodes_dict = {} + + if self.__backend == "wholegraph": + from pylibwholegraph.torch.initialize import get_global_communicator + + wm_comm = get_global_communicator() + wm_comm.barrier() + + for node_type, x in self.__dataset.x_dict.items(): + logger.debug(f"getting x for {node_type}") + fs.add_data(x, node_type, "x") + num_nodes_dict[node_type] = self.__dataset.num_nodes(node_type) + if self.__backend == "wholegraph": + wm_comm.barrier() + + for node_type, y in self.__dataset.y_dict.items(): + logger.debug(f"getting y for {node_type}") + if self.__backend == "wholegraph": + logger.info("using wholegraph backend") + fs.add_data(y, node_type, "y") + wm_comm.barrier() + else: + y = y.cuda() + y = y.reshape((y.shape[0], 1)) + fs.add_data(y, node_type, "y") + + """ + for node_type, train in self.__dataset.train_dict.items(): + logger.debug(f"getting train for {node_type}") + train = train.reshape((train.shape[0], 1)) + if self.__backend != "wholegraph": + train = train.cuda() + fs.add_data(train, node_type, "train") + + for node_type, test in self.__dataset.test_dict.items(): + logger.debug(f"getting test for {node_type}") + test = test.reshape((test.shape[0], 1)) + if self.__backend != "wholegraph": + test = test.cuda() + fs.add_data(test, node_type, "test") + + for node_type, val in self.__dataset.val_dict.items(): + logger.debug(f"getting val for {node_type}") + val = val.reshape((val.shape[0], 1)) + if self.__backend != "wholegraph": + val = val.cuda() + fs.add_data(val, node_type, "val") + """ + + # # TODO support online sampling if the edge index is provided + # num_edges_dict = self.__dataset.edge_index_dict + # if not isinstance(list(num_edges_dict.values())[0], int): + # num_edges_dict = {k: len(v) for k, v in num_edges_dict} + + if self.__backend == "wholegraph": + wm_comm.barrier() + + self.__data = fs + return self.__data + + def get_model(self, name="GraphSAGE"): + if name != "GraphSAGE": + raise ValueError("only GraphSAGE is currently supported") + + num_input_features = self.__dataset.num_input_features + num_output_features = self.__dataset.num_labels + num_layers = len(self.__loader_kwargs["num_neighbors"]) + + with torch.cuda.device(self.__device): + model = ( + GraphSAGE( + in_channels=num_input_features, + hidden_channels=64, + out_channels=num_output_features, + num_layers=num_layers, + model_backend="cugraph_dgl", + ) + .to(torch.float32) + .to(self.__device) + ) + # TODO: Fix for distributed models + if torch.distributed.is_initialized(): + model = ddp(model, device_ids=[self.__device]) + else: + warnings.warn("Distributed training is not available") + print("done creating model") + + return model + + def get_input_files(self, path, epoch=0, stage="train"): + file_list = np.array([f.path for f in os.scandir(path)]) + file_list.sort() + np.random.seed(epoch) + np.random.shuffle(file_list) + + splits = np.array_split(file_list, self.__gpus_per_node) + + ex = re.compile(r"batch=([0-9]+)\-([0-9]+).parquet") + num_batches = min( + [ + sum( + [ + int(ex.match(fname.split("/")[-1])[2]) + - int(ex.match(fname.split("/")[-1])[1]) + for fname in s + ] + ) + for s in splits + ] + ) + if num_batches == 0: + raise ValueError( + f"Too few batches for training with world size {self.__world_size}" + ) + + return splits[self.__device], num_batches diff --git a/benchmarks/cugraph/standalone/bulk_sampling/trainers/dgl/trainers_dgl.py b/benchmarks/cugraph/standalone/bulk_sampling/trainers/dgl/trainers_dgl.py new file mode 100644 index 00000000000..fad986257b2 --- /dev/null +++ b/benchmarks/cugraph/standalone/bulk_sampling/trainers/dgl/trainers_dgl.py @@ -0,0 +1,361 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import logging +import torch +import torch.distributed as td +import torch.nn.functional as F +from torchmetrics import Accuracy +from trainers import Trainer +import time + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from cugraph.gnn import FeatureStore + + +def get_features(input_nodes, output_nodes, feature_store, key="paper"): + if isinstance(input_nodes, dict): + input_nodes = input_nodes[key] + if isinstance(output_nodes, dict): + output_nodes = output_nodes[key] + + # TODO: Fix below + # Adding based on assumption that cpu features + # and gpu index is not supported yet + + if feature_store.backend == "torch": + input_nodes = input_nodes.to("cpu") + output_nodes = output_nodes.to("cpu") + + x = feature_store.get_data(indices=input_nodes, type_name=key, feat_name="x") + y = feature_store.get_data(indices=output_nodes, type_name=key, feat_name="y") + y = y.reshape((y.shape[0],)) + return x, y + + +def log_batch( + logger: logging.Logger, + iter_i: int, + num_batches: int, + time_forward: int, + time_backward: int, + time_start: int, + loader_time_iter: int, + epoch: int, + rank: int, +): + """ + Logs the current performance of the trainer. + + Parameters + ---------- + logger: logging.Logger + The logger to use for logging the performance details. + iter_i: int + The current training iteration. + num_batches: int + The number of batches processed so far + time_forward: int + The total amount of time for the model forward pass so far + time_backward: int + The total amount of the for the model backwards pass so far + time_start: int + The time at which training was started + loader_time_iter: int + The time taken by the loader in the current iteraiton + epoch: int + The current training epoch + rank: int + The global rank of this worker + + Returns + ------- + None + """ + + time_forward_iter = time_forward / num_batches + time_backward_iter = time_backward / num_batches + total_time_iter = (time.perf_counter() - time_start) / num_batches + logger.info(f"epoch {epoch}, iteration {iter_i}, rank {rank}") + logger.info(f"time forward: {time_forward_iter}") + logger.info(f"time backward: {time_backward_iter}") + logger.info(f"loader time: {loader_time_iter}") + logger.info(f"total time: {total_time_iter}") + + +def train_epoch( + model, + optimizer, + loader, + feature_store, + epoch, + num_classes, + time_d, + logger, + rank, + max_num_batches, +): + """ + Train the model for one epoch. + model: The model to train. + optimizer: The optimizer to use. + loader: The loader to use. + data: cuGraph.gnn.FeatueStore + epoch: The epoch number. + num_classes: The number of classes. + time_d: A dictionary of times. + logger: The logger to use. + rank: Global rank + max_num_batches: Number of batches after which to quit (to avoid hang due to asymmetry) + """ + model = model.train() + time_feature_indexing = time_d["time_feature_indexing"] + time_feature_transfer = time_d["time_feature_transfer"] + time_forward = time_d["time_forward"] + time_backward = time_d["time_backward"] + time_loader = time_d["time_loader"] + + time_start = time.perf_counter() + end_time_backward = time.perf_counter() + + num_batches = 0 + + for iter_i, (input_nodes, output_nodes, blocks) in enumerate(loader): + loader_time_iter = time.perf_counter() - end_time_backward + time_loader += loader_time_iter + feature_indexing_time_start = time.perf_counter() + x, y_true = get_features(input_nodes, output_nodes, feature_store=feature_store) + additional_feature_time_end = time.perf_counter() + time_feature_indexing += ( + additional_feature_time_end - feature_indexing_time_start + ) + feature_trasfer_time_start = time.perf_counter() + x = x.to("cuda") + y_true = y_true.to("cuda") + time_feature_transfer += time.perf_counter() - feature_trasfer_time_start + num_batches += 1 + + start_time_forward = time.perf_counter() + y_pred = model( + blocks, + x, + ) + end_time_forward = time.perf_counter() + time_forward += end_time_forward - start_time_forward + + if y_pred.shape[0] > len(y_true): + raise ValueError(f"illegal shape: {y_pred.shape}; {y_true.shape}") + + y_true = y_true[: y_pred.shape[0]] + y_true = F.one_hot( + y_true.to(torch.int64), + num_classes=num_classes, + ).to(torch.float32) + + if y_true.shape != y_pred.shape: + raise ValueError( + f"y_true shape was {y_true.shape} " + f"but y_pred shape was {y_pred.shape} " + f"in iteration {iter_i} " + f"on rank {y_pred.device.index}" + ) + + start_time_backward = time.perf_counter() + loss = F.cross_entropy(y_pred, y_true) + optimizer.zero_grad() + loss.backward() + optimizer.step() + end_time_backward = time.perf_counter() + time_backward += end_time_backward - start_time_backward + + if iter_i % 50 == 0: + log_batch( + logger=logger, + iter_i=iter_i, + num_batches=num_batches, + time_forward=time_forward, + time_backward=time_backward, + time_start=time_start, + loader_time_iter=loader_time_iter, + epoch=epoch, + rank=rank, + ) + + if max_num_batches is not None and iter_i >= max_num_batches: + break + + time_d["time_loader"] += time_loader + time_d["time_feature_indexing"] += time_feature_indexing + time_d["time_feature_transfer"] += time_feature_transfer + time_d["time_forward"] += time_forward + time_d["time_backward"] += time_backward + + return num_batches + + +def get_accuracy( + model: torch.nn.Module, + loader: torch.utils.DataLoader, + feature_store: FeatureStore, + num_classes: int, + max_num_batches: int, +) -> float: + """ + Computes the accuracy given a loader that ouputs evaluation data, the model being evaluated, + the feature store where node features are stored, and the number of output classes. + + Parameters + ---------- + model: torch.nn.Module + The model being evaluated + loader: torch.utils.DataLoader + The loader over evaluation samples + feature_store: cugraph.gnn.FeatureStore + The feature store containing node features + num_classes: int + The number of output classes of the model + max_num_batches: int + The number of batches to iterate for, will quit after reaching this number. + Used to avoid hang due to asymmetric input. + + Returns + ------- + float + The calcuated accuracy, as a percentage. + + """ + + print("Computing accuracy...", flush=True) + acc = Accuracy(task="multiclass", num_classes=num_classes).cuda() + acc_sum = 0.0 + num_batches = 0 + with torch.no_grad(): + for iter_i, (input_nodes, output_nodes, blocks) in enumerate(loader): + x, y_true = get_features( + input_nodes, output_nodes, feature_store=feature_store + ) + x = x.to("cuda") + y_true = y_true.to("cuda") + + out = model(blocks, x) + batch_size = out.shape[0] + acc_sum += acc(out[:batch_size].softmax(dim=-1), y_true[:batch_size]) + num_batches += 1 + + if max_num_batches is not None and iter_i >= max_num_batches: + break + + num_batches = num_batches + + acc_sum = torch.tensor(float(acc_sum), dtype=torch.float32, device="cuda") + td.all_reduce(acc_sum, op=td.ReduceOp.SUM) + nb = torch.tensor(float(num_batches), dtype=torch.float32, device=acc_sum.device) + td.all_reduce(nb, op=td.ReduceOp.SUM) + + acc = acc_sum / nb + + print( + f"Accuracy: {acc * 100.0:.4f}%", + ) + return acc * 100.0 + + +class DGLTrainer(Trainer): + """ + Trainer implementation for node classification in DGL. + """ + + def train(self): + logger = logging.getLogger("DGLTrainer") + time_d = { + "time_loader": 0.0, + "time_feature_indexing": 0.0, + "time_feature_transfer": 0.0, + "time_forward": 0.0, + "time_backward": 0.0, + } + total_batches = 0 + for epoch in range(self.num_epochs): + start_time = time.perf_counter() + self.model.train() + with td.algorithms.join.Join( + [self.model], divide_by_initial_world_size=False + ): + loader, max_num_batches = self.get_loader(epoch=epoch, stage="train") + num_batches = train_epoch( + model=self.model, + optimizer=self.optimizer, + loader=loader, + feature_store=self.data, + num_classes=self.dataset.num_labels, + epoch=epoch, + time_d=time_d, + logger=logger, + rank=self.rank, + max_num_batches=max_num_batches, + ) + total_batches = total_batches + num_batches + end_time = time.perf_counter() + epoch_time_taken = end_time - start_time + print( + f"RANK: {self.rank} Total time taken for training epoch {epoch} = {epoch_time_taken}", + flush=True, + ) + print("---" * 30) + td.barrier() + self.model.eval() + with td.algorithms.join.Join( + [self.model], divide_by_initial_world_size=False + ): + # test + loader, max_num_batches = self.get_loader(epoch=epoch, stage="test") + test_acc = get_accuracy( + model=self.model.module, + loader=loader, + feature_store=self.data, + num_classes=self.dataset.num_labels, + max_num_batches=max_num_batches, + ) + print(f"Accuracy: {test_acc:.4f}%") + + # val: + self.model.eval() + with td.algorithms.join.Join([self.model], divide_by_initial_world_size=False): + loader, max_num_batches = self.get_loader(epoch=epoch, stage="val") + val_acc = get_accuracy( + model=self.model.module, + loader=loader, + feature_store=self.data, + num_classes=self.dataset.num_labels, + max_num_batches=max_num_batches, + ) + print(f"Validation Accuracy: {val_acc:.4f}%") + + val_acc = float(val_acc) + stats = { + "Accuracy": val_acc, + "# Batches": total_batches, + "Loader Time": time_d["time_loader"], + "Feature Time": time_d["time_feature_indexing"] + + time_d["time_feature_transfer"], + "Forward Time": time_d["time_forward"], + "Backward Time": time_d["time_backward"], + } + return stats + + +# For native DGL training, see benchmarks/cugraph-dgl/scale-benchmarks diff --git a/benchmarks/cugraph/standalone/bulk_sampling/trainers/pyg/trainers_cugraph_pyg.py b/benchmarks/cugraph/standalone/bulk_sampling/trainers/pyg/trainers_cugraph_pyg.py index 71151e9ba59..833322deffe 100644 --- a/benchmarks/cugraph/standalone/bulk_sampling/trainers/pyg/trainers_cugraph_pyg.py +++ b/benchmarks/cugraph/standalone/bulk_sampling/trainers/pyg/trainers_cugraph_pyg.py @@ -13,41 +13,84 @@ from .trainers_pyg import PyGTrainer from models.pyg import CuGraphSAGE +from datasets import Dataset import torch import numpy as np from torch.nn.parallel import DistributedDataParallel as ddp +from torch.distributed.optim import ZeroRedundancyOptimizer from cugraph.gnn import FeatureStore from cugraph_pyg.data import CuGraphStore from cugraph_pyg.loader import BulkSampleLoader import os +import re class PyGCuGraphTrainer(PyGTrainer): + """ + Trainer implementation for cuGraph-PyG that supports + WholeGraph as a feature store. + """ + def __init__( self, - dataset, - model="GraphSAGE", - device=0, - rank=0, - world_size=1, - num_epochs=1, - sample_dir=".", + dataset: Dataset, + model: str = "GraphSAGE", + device: int = 0, + rank: int = 0, + world_size: int = 1, + gpus_per_node: int = 1, + num_epochs: int = 1, + sample_dir: str = ".", + backend: str = "torch", **kwargs, ): + """ + Parameters + ---------- + dataset: Dataset + The dataset to train on. + model: str + The model to use for training. + Currently only "GraphSAGE" is supported. + device: int, default=0 + The CUDA device to use. + rank: int, default=0 + The global rank of the worker this trainer is assigned to. + world_size: int, default=1 + The number of workers in the world. + num_epochs: int, default=1 + The number of training epochs to run. + sample_dir: str, default="." + The directory where samples generated by the bulk sampler + are stored. + backend: str, default="torch" + The feature store backend to be used by the cuGraph Feature Store. + Defaults to "torch". Options are "torch" and "wholegraph" + kwargs + Keyword arguments to pass to the loader. + """ + + import logging + + logger = logging.getLogger("PyGCuGraphTrainer") + logger.info("creating trainer") self.__data = None self.__device = device self.__rank = rank self.__world_size = world_size + self.__gpus_per_node = gpus_per_node self.__num_epochs = num_epochs self.__dataset = dataset self.__sample_dir = sample_dir self.__loader_kwargs = kwargs self.__model = self.get_model(model) + self.__backend = backend self.__optimizer = None + logger.info("created trainer") @property def rank(self): @@ -64,8 +107,11 @@ def dataset(self): @property def optimizer(self): if self.__optimizer is None: - self.__optimizer = torch.optim.Adam( - self.model.parameters(), lr=0.01, weight_decay=0.0005 + self.__optimizer = ZeroRedundancyOptimizer( + self.model.parameters(), + lr=0.01, + weight_decay=0.0005, + optimizer_class=torch.optim.Adam, ) return self.__optimizer @@ -73,7 +119,7 @@ def optimizer(self): def num_epochs(self) -> int: return self.__num_epochs - def get_loader(self, epoch: int = 0, stage="train") -> int: + def get_loader(self, epoch: int = 0, stage="train"): import logging logger = logging.getLogger("PyGCuGraphTrainer") @@ -81,22 +127,25 @@ def get_loader(self, epoch: int = 0, stage="train") -> int: logger.info(f"getting loader for epoch {epoch}, {stage} stage") # TODO support online sampling - if stage == "val": - path = os.path.join(self.__sample_dir, "val", "samples") - else: + if stage == "train": path = os.path.join(self.__sample_dir, f"epoch={epoch}", stage, "samples") + elif stage in ["test", "val"]: + path = os.path.join(self.__sample_dir, stage, "samples") + else: + raise ValueError(f"invalid stage {stage}") + input_files, num_batches = self.get_input_files(path, epoch=epoch, stage=stage) loader = BulkSampleLoader( self.data, self.data, None, # FIXME get input nodes properly directory=path, - input_files=self.get_input_files(path, epoch=epoch, stage=stage), + input_files=input_files, **self.__loader_kwargs, ) logger.info(f"got loader successfully on rank {self.rank}") - return loader + return loader, num_batches @property def data(self): @@ -106,36 +155,73 @@ def data(self): logger.info("getting data") if self.__data is None: - # FIXME wholegraph - fs = FeatureStore(backend="torch") + if self.__backend == "wholegraph": + logger.info("using wholegraph backend") + fs = FeatureStore( + backend="wholegraph", + wg_type="chunked", + wg_location="cpu", + ) + else: + fs = FeatureStore(backend=self.__backend) num_nodes_dict = {} + if self.__backend == "wholegraph": + from pylibwholegraph.torch.initialize import get_global_communicator + + wm_comm = get_global_communicator() + wm_comm.barrier() + for node_type, x in self.__dataset.x_dict.items(): logger.debug(f"getting x for {node_type}") fs.add_data(x, node_type, "x") num_nodes_dict[node_type] = self.__dataset.num_nodes(node_type) + if self.__backend == "wholegraph": + wm_comm.barrier() for node_type, y in self.__dataset.y_dict.items(): logger.debug(f"getting y for {node_type}") - fs.add_data(y, node_type, "y") + if self.__backend == "wholegraph": + logger.info("using wholegraph backend") + fs.add_data(y, node_type, "y") + wm_comm.barrier() + else: + y = y.cuda() + y = y.reshape((y.shape[0], 1)) + fs.add_data(y, node_type, "y") + + """ for node_type, train in self.__dataset.train_dict.items(): logger.debug(f"getting train for {node_type}") + train = train.reshape((train.shape[0], 1)) + if self.__backend != "wholegraph": + train = train.cuda() fs.add_data(train, node_type, "train") for node_type, test in self.__dataset.test_dict.items(): logger.debug(f"getting test for {node_type}") + test = test.reshape((test.shape[0], 1)) + if self.__backend != "wholegraph": + test = test.cuda() fs.add_data(test, node_type, "test") for node_type, val in self.__dataset.val_dict.items(): logger.debug(f"getting val for {node_type}") + val = val.reshape((val.shape[0], 1)) + if self.__backend != "wholegraph": + val = val.cuda() fs.add_data(val, node_type, "val") + """ # TODO support online sampling if the edge index is provided num_edges_dict = self.__dataset.edge_index_dict if not isinstance(list(num_edges_dict.values())[0], int): num_edges_dict = {k: len(v) for k, v in num_edges_dict} + if self.__backend == "wholegraph": + wm_comm.barrier() + self.__data = CuGraphStore( fs, num_edges_dict, @@ -147,14 +233,28 @@ def data(self): return self.__data def get_model(self, name="GraphSAGE"): + import logging + + logger = logging.getLogger("PyGCuGraphTrainer") + + logger.info("Creating model...") + if name != "GraphSAGE": raise ValueError("only GraphSAGE is currently supported") + logger.info("getting input features...") num_input_features = self.__dataset.num_input_features + + logger.info("getting output features...") num_output_features = self.__dataset.num_labels + + logger.info("getting num neighbors...") num_layers = len(self.__loader_kwargs["num_neighbors"]) + logger.info("Got input features, output features, num neighbors") + with torch.cuda.device(self.__device): + logger.info("Constructing CuGraphSAGE model...") model = ( CuGraphSAGE( in_channels=num_input_features, @@ -166,8 +266,10 @@ def get_model(self, name="GraphSAGE"): .to(self.__device) ) + logger.info("Parallelizing model with ddp...") model = ddp(model, device_ids=[self.__device]) - print("done creating model") + + logger.info("done creating model") return model @@ -175,10 +277,28 @@ def get_input_files(self, path, epoch=0, stage="train"): file_list = np.array(os.listdir(path)) file_list.sort() - if stage == "train": - splits = np.array_split(file_list, self.__world_size) - np.random.seed(epoch) - np.random.shuffle(splits) - return splits[self.rank] - else: - return file_list + np.random.seed(epoch) + np.random.shuffle(file_list) + + splits = np.array_split(file_list, self.__gpus_per_node) + + import logging + + logger = logging.getLogger("PyGCuGraphTrainer") + + split = splits[self.__device] + logger.info(f"rank {self.__rank} input files: {str(split)}") + + ex = re.compile(r"batch=([0-9]+)\-([0-9]+).parquet") + num_batches = min( + [ + sum([int(ex.match(fname)[2]) - int(ex.match(fname)[1]) for fname in s]) + for s in splits + ] + ) + if num_batches == 0: + raise ValueError( + f"Too few batches for training with world size {self.__world_size}" + ) + + return split, num_batches diff --git a/benchmarks/cugraph/standalone/bulk_sampling/trainers/pyg/trainers_pyg.py b/benchmarks/cugraph/standalone/bulk_sampling/trainers/pyg/trainers_pyg.py index bddd6ae2644..d6205901b68 100644 --- a/benchmarks/cugraph/standalone/bulk_sampling/trainers/pyg/trainers_pyg.py +++ b/benchmarks/cugraph/standalone/bulk_sampling/trainers/pyg/trainers_pyg.py @@ -33,7 +33,12 @@ import time -def pyg_num_workers(world_size): +def pyg_num_workers(world_size: int) -> int: + """ + Calculates the number of workers for the + loader in PyG by calling sched_getaffinity. + """ + num_workers = None if hasattr(os, "sched_getaffinity"): try: @@ -45,14 +50,80 @@ def pyg_num_workers(world_size): return int(num_workers) +def calc_accuracy( + loader: NeighborLoader, + max_num_batches: int, + model: torch.nn.Module, + num_classes: int, +) -> float: + """ + Evaluates the accuracy of a model given a loader over evaluation samples. + + Parameters + ---------- + loader: NeighborLoader + The loader over evaluation samples. + model: torch.nn.Module + The model being evaluated. + num_classes: int + The number of output classes of the model. + + Returns + ------- + The calculated accuracy as a fraction. + """ + + from torchmetrics import Accuracy + + acc = Accuracy(task="multiclass", num_classes=num_classes).cuda() + + acc_sum = 0.0 + num_batches = 0 + with torch.no_grad(): + for i, batch in enumerate(loader): + num_sampled_nodes = sum( + [torch.as_tensor(n) for n in batch.num_sampled_nodes_dict.values()] + ) + num_sampled_edges = sum( + [torch.as_tensor(e) for e in batch.num_sampled_edges_dict.values()] + ) + batch_size = num_sampled_nodes[0] + + batch = batch.to_homogeneous().cuda() + + batch.y = batch.y.to(torch.long).reshape((batch.y.shape[0],)) + + out = model( + batch.x, + batch.edge_index, + num_sampled_nodes, + num_sampled_edges, + ) + acc_sum += acc(out[:batch_size].softmax(dim=-1), batch.y[:batch_size]) + num_batches += 1 + + if max_num_batches is not None and i >= max_num_batches: + break + + acc_sum = torch.tensor(float(acc_sum), dtype=torch.float32, device="cuda") + td.all_reduce(acc_sum, op=td.ReduceOp.SUM) + nb = torch.tensor(float(num_batches), dtype=torch.float32, device=acc_sum.device) + td.all_reduce(nb, op=td.ReduceOp.SUM) + + return acc_sum / nb + + class PyGTrainer(Trainer): + """ + Trainer implementation for node classification in PyG. + """ + def train(self): import logging logger = logging.getLogger("PyGTrainer") logger.info("Entered train loop") - total_loss = 0.0 num_batches = 0 time_forward = 0.0 @@ -62,19 +133,32 @@ def train(self): start_time = time.perf_counter() end_time_backward = start_time + num_layers = len(self.model.module.convs) + for epoch in range(self.num_epochs): with td.algorithms.join.Join( - [self.model], divide_by_initial_world_size=False + [self.model, self.optimizer], divide_by_initial_world_size=False ): self.model.train() - for iter_i, data in enumerate( - self.get_loader(epoch=epoch, stage="train") - ): + loader, max_num_batches = self.get_loader(epoch=epoch, stage="train") + + max_num_batches = torch.tensor([max_num_batches], device="cuda") + torch.distributed.all_reduce( + max_num_batches, op=torch.distributed.ReduceOp.MIN + ) + max_num_batches = int(max_num_batches[0]) + + for iter_i, data in enumerate(loader): loader_time_iter = time.perf_counter() - end_time_backward time_loader += loader_time_iter time_feature_transfer_start = time.perf_counter() + if len(data.edge_index_dict[("paper", "cites", "paper")][0]) < 3: + logger.error(f"Invalid edge index in iteration {iter_i}") + data = old_data + + old_data = data num_sampled_nodes = sum( [ torch.as_tensor(n) @@ -89,7 +173,6 @@ def train(self): ) # FIXME find a way to get around this and not have to call extend_tensor - num_layers = len(self.model.module.convs) num_sampled_nodes = extend_tensor(num_sampled_nodes, num_layers + 1) num_sampled_edges = extend_tensor(num_sampled_edges, num_layers) @@ -118,7 +201,12 @@ def train(self): ) logger.info(f"total time: {total_time_iter}") + # from pynvml.smi import nvidia_smi + # mem_info = nvidia_smi.getInstance().DeviceQuery('memory.free, memory.total')['gpu'][self.rank % 8]['fb_memory_usage'] + # logger.info(f"rank {self.rank} memory: {mem_info}") + y_true = data.y + y_true = y_true.reshape((y_true.shape[0],)) x = data.x.to(torch.float32) start_time_forward = time.perf_counter() @@ -160,101 +248,48 @@ def train(self): self.optimizer.zero_grad() loss.backward() self.optimizer.step() - total_loss += loss.item() end_time_backward = time.perf_counter() time_backward += end_time_backward - start_time_backward - end_time = time.perf_counter() - - # test - from torchmetrics import Accuracy + if max_num_batches is not None and iter_i >= max_num_batches: + break - acc = Accuracy( - task="multiclass", num_classes=self.dataset.num_labels - ).cuda() + end_time = time.perf_counter() + """ + logger.info("Entering test stage...") with td.algorithms.join.Join( [self.model], divide_by_initial_world_size=False ): self.model.eval() - if self.rank == 0: - acc_sum = 0.0 - with torch.no_grad(): - for i, batch in enumerate( - self.get_loader(epoch=epoch, stage="test") - ): - num_sampled_nodes = sum( - [ - torch.as_tensor(n) - for n in batch.num_sampled_nodes_dict.values() - ] - ) - num_sampled_edges = sum( - [ - torch.as_tensor(e) - for e in batch.num_sampled_edges_dict.values() - ] - ) - batch_size = num_sampled_nodes[0] - - batch = batch.to_homogeneous().cuda() - - batch.y = batch.y.to(torch.long) - out = self.model.module( - batch.x, - batch.edge_index, - num_sampled_nodes, - num_sampled_edges, - ) - acc_sum += acc( - out[:batch_size].softmax(dim=-1), batch.y[:batch_size] - ) - print( - f"Accuracy: {acc_sum/(i) * 100.0:.4f}%", - ) + loader, max_num_batches = self.get_loader(epoch=epoch, stage="test") + num_classes = self.dataset.num_labels - td.barrier() + acc = calc_accuracy( + loader, max_num_batches, self.model.module, num_classes + ) - with td.algorithms.join.Join([self.model], divide_by_initial_world_size=False): - self.model.eval() if self.rank == 0: - acc_sum = 0.0 - with torch.no_grad(): - for i, batch in enumerate( - self.get_loader(epoch=epoch, stage="val") - ): - num_sampled_nodes = sum( - [ - torch.as_tensor(n) - for n in batch.num_sampled_nodes_dict.values() - ] - ) - num_sampled_edges = sum( - [ - torch.as_tensor(e) - for e in batch.num_sampled_edges_dict.values() - ] - ) - batch_size = num_sampled_nodes[0] - - batch = batch.to_homogeneous().cuda() - - batch.y = batch.y.to(torch.long) - out = self.model.module( - batch.x, - batch.edge_index, - num_sampled_nodes, - num_sampled_edges, - ) - acc_sum += acc( - out[:batch_size].softmax(dim=-1), batch.y[:batch_size] - ) print( - f"Validation Accuracy: {acc_sum/(i) * 100.0:.4f}%", + f"Accuracy: {acc * 100.0:.4f}%", ) + """ + + """ + logger.info("Entering validation stage") + with td.algorithms.join.Join([self.model], divide_by_initial_world_size=False): + self.model.eval() + loader, max_num_batches = self.get_loader(epoch=epoch, stage="val") + num_classes = self.dataset.num_labels + acc = calc_accuracy(loader, max_num_batches, self.model.module, num_classes) + + if self.rank == 0: + print( + f"Validation Accuracy: {acc * 100.0:.4f}%", + ) + """ stats = { - "Accuracy": float(acc_sum / (i) * 100.0) if self.rank == 0 else 0.0, "# Batches": num_batches, "Loader Time": time_loader, "Feature Transfer Time": time_feature_transfer, @@ -265,6 +300,12 @@ def train(self): class PyGNativeTrainer(PyGTrainer): + """ + Trainer implementation for native PyG + training using HeteroData as the graph and feature + store and NeighborLoader as the loader. + """ + def __init__( self, dataset, @@ -403,7 +444,7 @@ def get_loader(self, epoch: int = 0, stage="train"): ) logger.info("done creating loader") - return loader + return loader, None def get_model(self, name="GraphSAGE"): if name != "GraphSAGE": diff --git a/ci/checks/copyright.py b/ci/checks/copyright.py deleted file mode 100644 index ba8b73898e2..00000000000 --- a/ci/checks/copyright.py +++ /dev/null @@ -1,271 +0,0 @@ -# Copyright (c) 2019-2023, NVIDIA CORPORATION. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import argparse -import datetime -import os -import re -import sys - -import git - -FilesToCheck = [ - re.compile(r"[.](cmake|cpp|cu|cuh|h|hpp|sh|pxd|py|pyx)$"), - re.compile(r"CMakeLists[.]txt$"), - re.compile(r"setup[.]cfg$"), - re.compile(r"[.]flake8[.]cython$"), - re.compile(r"meta[.]yaml$"), -] - -# this will break starting at year 10000, which is probably OK :) -CheckSimple = re.compile( - r"Copyright *(?:\(c\))? *(\d{4}),? *NVIDIA C(?:ORPORATION|orporation)" -) -CheckDouble = re.compile( - r"Copyright *(?:\(c\))? *(\d{4})-(\d{4}),? *NVIDIA C(?:ORPORATION|orporation)" # noqa: E501 -) - - -def checkThisFile(f): - if isinstance(f, git.Diff): - if f.deleted_file or f.b_blob.size == 0: - return False - f = f.b_path - elif not os.path.exists(f) or os.stat(f).st_size == 0: - # This check covers things like symlinks which point to files that DNE - return False - for checker in FilesToCheck: - if checker.search(f): - return True - return False - - -def modifiedFiles(): - """Get a set of all modified files, as Diff objects. - - The files returned have been modified in git since the merge base of HEAD - and the upstream of the target branch. We return the Diff objects so that - we can read only the staged changes. - """ - repo = git.Repo() - # Use the environment variable TARGET_BRANCH or RAPIDS_BASE_BRANCH (defined in CI) if possible - target_branch = os.environ.get("TARGET_BRANCH", os.environ.get("RAPIDS_BASE_BRANCH")) - if target_branch is None: - # Fall back to the closest branch if not on CI - target_branch = repo.git.describe( - all=True, tags=True, match="branch-*", abbrev=0 - ).lstrip("heads/") - - upstream_target_branch = None - if target_branch in repo.heads: - # Use the tracking branch of the local reference if it exists. This - # returns None if no tracking branch is set. - upstream_target_branch = repo.heads[target_branch].tracking_branch() - if upstream_target_branch is None: - # Fall back to the remote with the newest target_branch. This code - # path is used on CI because the only local branch reference is - # current-pr-branch, and thus target_branch is not in repo.heads. - # This also happens if no tracking branch is defined for the local - # target_branch. We use the remote with the latest commit if - # multiple remotes are defined. - candidate_branches = [ - remote.refs[target_branch] for remote in repo.remotes - if target_branch in remote.refs - ] - if len(candidate_branches) > 0: - upstream_target_branch = sorted( - candidate_branches, - key=lambda branch: branch.commit.committed_datetime, - )[-1] - else: - # If no remotes are defined, try to use the local version of the - # target_branch. If this fails, the repo configuration must be very - # strange and we can fix this script on a case-by-case basis. - upstream_target_branch = repo.heads[target_branch] - merge_base = repo.merge_base("HEAD", upstream_target_branch.commit)[0] - diff = merge_base.diff() - changed_files = {f for f in diff if f.b_path is not None} - return changed_files - - -def getCopyrightYears(line): - res = CheckSimple.search(line) - if res: - return int(res.group(1)), int(res.group(1)) - res = CheckDouble.search(line) - if res: - return int(res.group(1)), int(res.group(2)) - return None, None - - -def replaceCurrentYear(line, start, end): - # first turn a simple regex into double (if applicable). then update years - res = CheckSimple.sub(r"Copyright (c) \1-\1, NVIDIA CORPORATION", line) - res = CheckDouble.sub( - rf"Copyright (c) {start:04d}-{end:04d}, NVIDIA CORPORATION", - res, - ) - return res - - -def checkCopyright(f, update_current_year): - """Checks for copyright headers and their years.""" - errs = [] - thisYear = datetime.datetime.now().year - lineNum = 0 - crFound = False - yearMatched = False - - if isinstance(f, git.Diff): - path = f.b_path - lines = f.b_blob.data_stream.read().decode().splitlines(keepends=True) - else: - path = f - with open(f, encoding="utf-8") as fp: - lines = fp.readlines() - - for line in lines: - lineNum += 1 - start, end = getCopyrightYears(line) - if start is None: - continue - crFound = True - if start > end: - e = [ - path, - lineNum, - "First year after second year in the copyright " - "header (manual fix required)", - None, - ] - errs.append(e) - elif thisYear < start or thisYear > end: - e = [ - path, - lineNum, - "Current year not included in the copyright header", - None, - ] - if thisYear < start: - e[-1] = replaceCurrentYear(line, thisYear, end) - if thisYear > end: - e[-1] = replaceCurrentYear(line, start, thisYear) - errs.append(e) - else: - yearMatched = True - # copyright header itself not found - if not crFound: - e = [ - path, - 0, - "Copyright header missing or formatted incorrectly " - "(manual fix required)", - None, - ] - errs.append(e) - # even if the year matches a copyright header, make the check pass - if yearMatched: - errs = [] - - if update_current_year: - errs_update = [x for x in errs if x[-1] is not None] - if len(errs_update) > 0: - lines_changed = ", ".join(str(x[1]) for x in errs_update) - print(f"File: {path}. Changing line(s) {lines_changed}") - for _, lineNum, __, replacement in errs_update: - lines[lineNum - 1] = replacement - with open(path, "w", encoding="utf-8") as out_file: - out_file.writelines(lines) - - return errs - - -def getAllFilesUnderDir(root, pathFilter=None): - retList = [] - for dirpath, dirnames, filenames in os.walk(root): - for fn in filenames: - filePath = os.path.join(dirpath, fn) - if pathFilter(filePath): - retList.append(filePath) - return retList - - -def checkCopyright_main(): - """ - Checks for copyright headers in all the modified files. In case of local - repo, this script will just look for uncommitted files and in case of CI - it compares between branches "$PR_TARGET_BRANCH" and "current-pr-branch" - """ - retVal = 0 - - argparser = argparse.ArgumentParser( - "Checks for a consistent copyright header in git's modified files" - ) - argparser.add_argument( - "--update-current-year", - dest="update_current_year", - action="store_true", - required=False, - help="If set, " - "update the current year if a header is already " - "present and well formatted.", - ) - argparser.add_argument( - "--git-modified-only", - dest="git_modified_only", - action="store_true", - required=False, - help="If set, " - "only files seen as modified by git will be " - "processed.", - ) - - args, dirs = argparser.parse_known_args() - - if args.git_modified_only: - files = [f for f in modifiedFiles() if checkThisFile(f)] - else: - files = [] - for d in [os.path.abspath(d) for d in dirs]: - if not os.path.isdir(d): - raise ValueError(f"{d} is not a directory.") - files += getAllFilesUnderDir(d, pathFilter=checkThisFile) - - errors = [] - for f in files: - errors += checkCopyright(f, args.update_current_year) - - if len(errors) > 0: - if any(e[-1] is None for e in errors): - print("Copyright headers incomplete in some of the files!") - for e in errors: - print(" %s:%d Issue: %s" % (e[0], e[1], e[2])) - print("") - n_fixable = sum(1 for e in errors if e[-1] is not None) - path_parts = os.path.abspath(__file__).split(os.sep) - file_from_repo = os.sep.join(path_parts[path_parts.index("ci") :]) - if n_fixable > 0 and not args.update_current_year: - print( - f"You can run `python {file_from_repo} --git-modified-only " - "--update-current-year` and stage the results in git to " - f"fix {n_fixable} of these errors.\n" - ) - retVal = 1 - - return retVal - - -if __name__ == "__main__": - sys.exit(checkCopyright_main()) diff --git a/cpp/include/cugraph/detail/decompress_edge_partition.cuh b/cpp/include/cugraph/detail/decompress_edge_partition.cuh index dad5ce77e45..6b974a326dd 100644 --- a/cpp/include/cugraph/detail/decompress_edge_partition.cuh +++ b/cpp/include/cugraph/detail/decompress_edge_partition.cuh @@ -44,7 +44,7 @@ namespace detail { int32_t constexpr decompress_edge_partition_block_size = 1024; template -__global__ void decompress_to_edgelist_mid_degree( +__global__ static void decompress_to_edgelist_mid_degree( edge_partition_device_view_t edge_partition, vertex_t major_range_first, vertex_t major_range_last, @@ -74,7 +74,7 @@ __global__ void decompress_to_edgelist_mid_degree( } template -__global__ void decompress_to_edgelist_high_degree( +__global__ static void decompress_to_edgelist_high_degree( edge_partition_device_view_t edge_partition, vertex_t major_range_first, vertex_t major_range_last, diff --git a/cpp/libcugraph_etl/include/hash/helper_functions.cuh b/cpp/libcugraph_etl/include/hash/helper_functions.cuh index db377f938d2..8a11867f7e2 100644 --- a/cpp/libcugraph_etl/include/hash/helper_functions.cuh +++ b/cpp/libcugraph_etl/include/hash/helper_functions.cuh @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2022, NVIDIA CORPORATION. + * Copyright (c) 2017-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -131,7 +131,7 @@ __forceinline__ __device__ void store_pair_vectorized(pair_type* __restrict__ co } template -__global__ void init_hashtbl(value_type* __restrict__ const hashtbl_values, +__global__ static void init_hashtbl(value_type* __restrict__ const hashtbl_values, const size_type n, const key_type key_val, const elem_type elem_val) diff --git a/cpp/libcugraph_etl/src/renumbering.cu b/cpp/libcugraph_etl/src/renumbering.cu index 08759702ab4..1cbeeeeea05 100644 --- a/cpp/libcugraph_etl/src/renumbering.cu +++ b/cpp/libcugraph_etl/src/renumbering.cu @@ -270,7 +270,7 @@ __device__ __inline__ int32_t validate_ht_col_insert(volatile int32_t* ptr_col) return col; } -__global__ void concat_and_create_histogram(int8_t* col_1, +__global__ static void concat_and_create_histogram(int8_t* col_1, int32_t* offset_1, int8_t* col_2, int32_t* offset_2, @@ -349,7 +349,7 @@ __global__ void concat_and_create_histogram(int8_t* col_1, } } -__global__ void concat_and_create_histogram_2(int8_t* col_1, +__global__ static void concat_and_create_histogram_2(int8_t* col_1, int32_t* offset_1, int8_t* col_2, int32_t* offset_2, @@ -452,7 +452,7 @@ __global__ void concat_and_create_histogram_2(int8_t* col_1, } template -__global__ void set_src_vertex_idx(int8_t* col_1, +__global__ static void set_src_vertex_idx(int8_t* col_1, int32_t* offset_1, int8_t* col_2, int32_t* offset_2, @@ -509,7 +509,7 @@ __global__ void set_src_vertex_idx(int8_t* col_1, } template -__global__ void set_dst_vertex_idx(int8_t* col_1, +__global__ static void set_dst_vertex_idx(int8_t* col_1, int32_t* offset_1, int8_t* col_2, int32_t* offset_2, @@ -585,7 +585,7 @@ __global__ void set_dst_vertex_idx(int8_t* col_1, } } -__global__ void create_mapping_histogram(uint32_t* hash_value, +__global__ static void create_mapping_histogram(uint32_t* hash_value, str_hash_value* payload, cudf_map_type hash_map, accum_type count) @@ -595,7 +595,7 @@ __global__ void create_mapping_histogram(uint32_t* hash_value, if (idx < count) { auto it = hash_map.insert(thrust::make_pair(hash_value[idx], payload[idx])); } } -__global__ void assign_histogram_idx(cudf_map_type cuda_map_obj, +__global__ static void assign_histogram_idx(cudf_map_type cuda_map_obj, size_t slot_count, str_hash_value* key, uint32_t* value, @@ -621,7 +621,7 @@ __global__ void assign_histogram_idx(cudf_map_type cuda_map_obj, } } -__global__ void set_vertex_indices(str_hash_value* ht_value_payload, accum_type count) +__global__ static void set_vertex_indices(str_hash_value* ht_value_payload, accum_type count) { accum_type tid = threadIdx.x + blockIdx.x * blockDim.x; // change count_ to renumber_idx @@ -630,7 +630,7 @@ __global__ void set_vertex_indices(str_hash_value* ht_value_payload, accum_type } } -__global__ void set_output_col_offsets(str_hash_value* row_col_pair, +__global__ static void set_output_col_offsets(str_hash_value* row_col_pair, int32_t* out_col1_offset, int32_t* out_col2_offset, int dst_pair_match, @@ -653,7 +653,7 @@ __global__ void set_output_col_offsets(str_hash_value* row_col_pair, } } -__global__ void offset_buffer_size_comp(int32_t* out_col1_length, +__global__ static void offset_buffer_size_comp(int32_t* out_col1_length, int32_t* out_col2_length, int32_t* out_col1_offsets, int32_t* out_col2_offsets, @@ -673,7 +673,7 @@ __global__ void offset_buffer_size_comp(int32_t* out_col1_length, } } -__global__ void select_unrenumber_string(str_hash_value* idx_to_col_row, +__global__ static void select_unrenumber_string(str_hash_value* idx_to_col_row, int32_t total_elements, int8_t* src_col1, int8_t* src_col2, diff --git a/cpp/src/community/legacy/ecg.cu b/cpp/src/community/legacy/ecg.cu index d93a4446faa..b2ad79204ed 100644 --- a/cpp/src/community/legacy/ecg.cu +++ b/cpp/src/community/legacy/ecg.cu @@ -52,7 +52,7 @@ binsearch_maxle(const IndexType* vec, const IndexType val, IndexType low, IndexT // FIXME: This shouldn't need to be a custom kernel, this // seems like it should just be a thrust::transform template -__global__ void match_check_kernel( +__global__ static void match_check_kernel( IdxT size, IdxT num_verts, IdxT* offsets, IdxT* indices, IdxT* parts, ValT* weights) { IdxT tid = blockIdx.x * blockDim.x + threadIdx.x; diff --git a/cpp/src/components/legacy/weak_cc.cuh b/cpp/src/components/legacy/weak_cc.cuh index ad9aa773590..f4254e2d55d 100644 --- a/cpp/src/components/legacy/weak_cc.cuh +++ b/cpp/src/components/legacy/weak_cc.cuh @@ -59,15 +59,15 @@ class WeakCCState { }; template -__global__ void weak_cc_label_device(vertex_t* labels, - edge_t const* offsets, - vertex_t const* indices, - edge_t nnz, - bool* fa, - bool* xa, - bool* m, - vertex_t startVertexId, - vertex_t batchSize) +__global__ static void weak_cc_label_device(vertex_t* labels, + edge_t const* offsets, + vertex_t const* indices, + edge_t nnz, + bool* fa, + bool* xa, + bool* m, + vertex_t startVertexId, + vertex_t batchSize) { vertex_t tid = threadIdx.x + blockIdx.x * TPB_X; if (tid < batchSize) { @@ -118,11 +118,11 @@ __global__ void weak_cc_label_device(vertex_t* labels, } template -__global__ void weak_cc_init_label_kernel(vertex_t* labels, - vertex_t startVertexId, - vertex_t batchSize, - vertex_t MAX_LABEL, - Lambda filter_op) +__global__ static void weak_cc_init_label_kernel(vertex_t* labels, + vertex_t startVertexId, + vertex_t batchSize, + vertex_t MAX_LABEL, + Lambda filter_op) { /** F1 and F2 in the paper correspond to fa and xa */ /** Cd in paper corresponds to db_cluster */ @@ -134,7 +134,7 @@ __global__ void weak_cc_init_label_kernel(vertex_t* labels, } template -__global__ void weak_cc_init_all_kernel( +__global__ static void weak_cc_init_all_kernel( vertex_t* labels, bool* fa, bool* xa, vertex_t N, vertex_t MAX_LABEL) { vertex_t tid = threadIdx.x + blockIdx.x * TPB_X; diff --git a/cpp/src/layout/legacy/bh_kernels.cuh b/cpp/src/layout/legacy/bh_kernels.cuh index 5b101363314..f6e163ab306 100644 --- a/cpp/src/layout/legacy/bh_kernels.cuh +++ b/cpp/src/layout/legacy/bh_kernels.cuh @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2022, NVIDIA CORPORATION. + * Copyright (c) 2020-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -42,9 +42,9 @@ namespace detail { /** * Intializes the states of objects. This speeds the overall kernel up. */ -__global__ void InitializationKernel(unsigned* restrict limiter, - int* restrict maxdepthd, - float* restrict radiusd) +__global__ static void InitializationKernel(unsigned* restrict limiter, + int* restrict maxdepthd, + float* restrict radiusd) { maxdepthd[0] = 1; limiter[0] = 0; @@ -54,10 +54,10 @@ __global__ void InitializationKernel(unsigned* restrict limiter, /** * Reset root. */ -__global__ void ResetKernel(float* restrict radiusd_squared, - int* restrict bottomd, - const int NNODES, - const float* restrict radiusd) +__global__ static void ResetKernel(float* restrict radiusd_squared, + int* restrict bottomd, + const int NNODES, + const float* restrict radiusd) { radiusd_squared[0] = radiusd[0] * radiusd[0]; // create root node @@ -67,20 +67,21 @@ __global__ void ResetKernel(float* restrict radiusd_squared, /** * Figures the bounding boxes for every point in the embedding. */ -__global__ __launch_bounds__(THREADS1, FACTOR1) void BoundingBoxKernel(int* restrict startd, - int* restrict childd, - int* restrict massd, - float* restrict posxd, - float* restrict posyd, - float* restrict maxxd, - float* restrict maxyd, - float* restrict minxd, - float* restrict minyd, - const int FOUR_NNODES, - const int NNODES, - const int N, - unsigned* restrict limiter, - float* restrict radiusd) +__global__ static __launch_bounds__(THREADS1, + FACTOR1) void BoundingBoxKernel(int* restrict startd, + int* restrict childd, + int* restrict massd, + float* restrict posxd, + float* restrict posyd, + float* restrict maxxd, + float* restrict maxyd, + float* restrict minxd, + float* restrict minyd, + const int FOUR_NNODES, + const int NNODES, + const int N, + unsigned* restrict limiter, + float* restrict radiusd) { float val, minx, maxx, miny, maxy; __shared__ float sminx[THREADS1], smaxx[THREADS1], sminy[THREADS1], smaxy[THREADS1]; @@ -158,9 +159,9 @@ __global__ __launch_bounds__(THREADS1, FACTOR1) void BoundingBoxKernel(int* rest /** * Clear some of the state vectors up. */ -__global__ __launch_bounds__(1024, 1) void ClearKernel1(int* restrict childd, - const int FOUR_NNODES, - const int FOUR_N) +__global__ static __launch_bounds__(1024, 1) void ClearKernel1(int* restrict childd, + const int FOUR_NNODES, + const int FOUR_N) { const int inc = blockDim.x * gridDim.x; int k = (FOUR_N & -32) + threadIdx.x + blockIdx.x * blockDim.x; @@ -175,15 +176,15 @@ __global__ __launch_bounds__(1024, 1) void ClearKernel1(int* restrict childd, /** * Build the actual KD Tree. */ -__global__ __launch_bounds__(THREADS2, - FACTOR2) void TreeBuildingKernel(int* restrict childd, - const float* restrict posxd, - const float* restrict posyd, - const int NNODES, - const int N, - int* restrict maxdepthd, - int* restrict bottomd, - const float* restrict radiusd) +__global__ static __launch_bounds__(THREADS2, + FACTOR2) void TreeBuildingKernel(int* restrict childd, + const float* restrict posxd, + const float* restrict posyd, + const int NNODES, + const int N, + int* restrict maxdepthd, + int* restrict bottomd, + const float* restrict radiusd) { int j, depth; float x, y, r; @@ -296,10 +297,10 @@ __global__ __launch_bounds__(THREADS2, /** * Clean more state vectors. */ -__global__ __launch_bounds__(1024, 1) void ClearKernel2(int* restrict startd, - int* restrict massd, - const int NNODES, - const int* restrict bottomd) +__global__ static __launch_bounds__(1024, 1) void ClearKernel2(int* restrict startd, + int* restrict massd, + const int NNODES, + const int* restrict bottomd) { const int bottom = bottomd[0]; const int inc = blockDim.x * gridDim.x; @@ -317,15 +318,15 @@ __global__ __launch_bounds__(1024, 1) void ClearKernel2(int* restrict startd, /** * Summarize the KD Tree via cell gathering */ -__global__ __launch_bounds__(THREADS3, - FACTOR3) void SummarizationKernel(int* restrict countd, - const int* restrict childd, - volatile int* restrict massd, - float* restrict posxd, - float* restrict posyd, - const int NNODES, - const int N, - const int* restrict bottomd) +__global__ static __launch_bounds__(THREADS3, + FACTOR3) void SummarizationKernel(int* restrict countd, + const int* restrict childd, + volatile int* restrict massd, + float* restrict posxd, + float* restrict posyd, + const int NNODES, + const int N, + const int* restrict bottomd) { bool flag = 0; float cm, px, py; @@ -453,13 +454,14 @@ __global__ __launch_bounds__(THREADS3, /** * Sort the cells */ -__global__ __launch_bounds__(THREADS4, FACTOR4) void SortKernel(int* restrict sortd, - const int* restrict countd, - volatile int* restrict startd, - int* restrict childd, - const int NNODES, - const int N, - const int* restrict bottomd) +__global__ static __launch_bounds__(THREADS4, + FACTOR4) void SortKernel(int* restrict sortd, + const int* restrict countd, + volatile int* restrict startd, + int* restrict childd, + const int NNODES, + const int N, + const int* restrict bottomd) { const int bottom = bottomd[0]; const int dec = blockDim.x * gridDim.x; @@ -502,7 +504,7 @@ __global__ __launch_bounds__(THREADS4, FACTOR4) void SortKernel(int* restrict so /** * Calculate the repulsive forces using the KD Tree */ -__global__ __launch_bounds__( +__global__ static __launch_bounds__( THREADS5, FACTOR5) void RepulsionKernel(/* int *restrict errd, */ const float scaling_ratio, const float theta, @@ -612,18 +614,18 @@ __global__ __launch_bounds__( } } -__global__ __launch_bounds__(THREADS6, - FACTOR6) void apply_forces_bh(float* restrict Y_x, - float* restrict Y_y, - const float* restrict attract_x, - const float* restrict attract_y, - const float* restrict repel_x, - const float* restrict repel_y, - float* restrict old_dx, - float* restrict old_dy, - const float* restrict swinging, - const float speed, - const int n) +__global__ static __launch_bounds__(THREADS6, + FACTOR6) void apply_forces_bh(float* restrict Y_x, + float* restrict Y_y, + const float* restrict attract_x, + const float* restrict attract_y, + const float* restrict repel_x, + const float* restrict repel_y, + float* restrict old_dx, + float* restrict old_dy, + const float* restrict swinging, + const float speed, + const int n) { // For evrery vertex for (int i = threadIdx.x + blockIdx.x * blockDim.x; i < n; i += gridDim.x * blockDim.x) { diff --git a/cpp/src/layout/legacy/exact_repulsion.cuh b/cpp/src/layout/legacy/exact_repulsion.cuh index fe895bae6a0..8530202afd5 100644 --- a/cpp/src/layout/legacy/exact_repulsion.cuh +++ b/cpp/src/layout/legacy/exact_repulsion.cuh @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2022, NVIDIA CORPORATION. + * Copyright (c) 2020-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,13 +22,13 @@ namespace cugraph { namespace detail { template -__global__ void repulsion_kernel(const float* restrict x_pos, - const float* restrict y_pos, - float* restrict repel_x, - float* restrict repel_y, - const int* restrict mass, - const float scaling_ratio, - const vertex_t n) +__global__ static void repulsion_kernel(const float* restrict x_pos, + const float* restrict y_pos, + float* restrict repel_x, + float* restrict repel_y, + const int* restrict mass, + const float scaling_ratio, + const vertex_t n) { int j = (blockIdx.x * blockDim.x) + threadIdx.x; // for every item in row int i = (blockIdx.y * blockDim.y) + threadIdx.y; // for every row diff --git a/cpp/src/layout/legacy/fa2_kernels.cuh b/cpp/src/layout/legacy/fa2_kernels.cuh index 4f1ce520387..33e7841a380 100644 --- a/cpp/src/layout/legacy/fa2_kernels.cuh +++ b/cpp/src/layout/legacy/fa2_kernels.cuh @@ -23,19 +23,19 @@ namespace cugraph { namespace detail { template -__global__ void attraction_kernel(const vertex_t* restrict row, - const vertex_t* restrict col, - const weight_t* restrict v, - const edge_t e, - const float* restrict x_pos, - const float* restrict y_pos, - float* restrict attract_x, - float* restrict attract_y, - const int* restrict mass, - bool outbound_attraction_distribution, - bool lin_log_mode, - const float edge_weight_influence, - const float coef) +__global__ static void attraction_kernel(const vertex_t* restrict row, + const vertex_t* restrict col, + const weight_t* restrict v, + const edge_t e, + const float* restrict x_pos, + const float* restrict y_pos, + float* restrict attract_x, + float* restrict attract_y, + const int* restrict mass, + bool outbound_attraction_distribution, + bool lin_log_mode, + const float edge_weight_influence, + const float coef) { vertex_t i, src, dst; weight_t weight = 1; @@ -116,13 +116,13 @@ void apply_attraction(const vertex_t* restrict row, } template -__global__ void linear_gravity_kernel(const float* restrict x_pos, - const float* restrict y_pos, - float* restrict attract_x, - float* restrict attract_y, - const int* restrict mass, - const float gravity, - const vertex_t n) +__global__ static void linear_gravity_kernel(const float* restrict x_pos, + const float* restrict y_pos, + float* restrict attract_x, + float* restrict attract_y, + const int* restrict mass, + const float gravity, + const vertex_t n) { // For every node. for (int i = threadIdx.x + blockIdx.x * blockDim.x; i < n; i += gridDim.x * blockDim.x) { @@ -136,14 +136,14 @@ __global__ void linear_gravity_kernel(const float* restrict x_pos, } template -__global__ void strong_gravity_kernel(const float* restrict x_pos, - const float* restrict y_pos, - float* restrict attract_x, - float* restrict attract_y, - const int* restrict mass, - const float gravity, - const float scaling_ratio, - const vertex_t n) +__global__ static void strong_gravity_kernel(const float* restrict x_pos, + const float* restrict y_pos, + float* restrict attract_x, + float* restrict attract_y, + const int* restrict mass, + const float gravity, + const float scaling_ratio, + const vertex_t n) { // For every node. for (int i = threadIdx.x + blockIdx.x * blockDim.x; i < n; i += gridDim.x * blockDim.x) { @@ -187,16 +187,16 @@ void apply_gravity(const float* restrict x_pos, } template -__global__ void local_speed_kernel(const float* restrict repel_x, - const float* restrict repel_y, - const float* restrict attract_x, - const float* restrict attract_y, - const float* restrict old_dx, - const float* restrict old_dy, - const int* restrict mass, - float* restrict swinging, - float* restrict traction, - const vertex_t n) +__global__ static void local_speed_kernel(const float* restrict repel_x, + const float* restrict repel_y, + const float* restrict attract_x, + const float* restrict attract_y, + const float* restrict old_dx, + const float* restrict old_dy, + const int* restrict mass, + float* restrict swinging, + float* restrict traction, + const vertex_t n) { // For every node. for (int i = threadIdx.x + blockIdx.x * blockDim.x; i < n; i += gridDim.x * blockDim.x) { @@ -272,17 +272,17 @@ void adapt_speed(const float jitter_tolerance, } template -__global__ void update_positions_kernel(float* restrict x_pos, - float* restrict y_pos, - const float* restrict repel_x, - const float* restrict repel_y, - const float* restrict attract_x, - const float* restrict attract_y, - float* restrict old_dx, - float* restrict old_dy, - const float* restrict swinging, - const float speed, - const vertex_t n) +__global__ static void update_positions_kernel(float* restrict x_pos, + float* restrict y_pos, + const float* restrict repel_x, + const float* restrict repel_y, + const float* restrict attract_x, + const float* restrict attract_y, + float* restrict old_dx, + float* restrict old_dy, + const float* restrict swinging, + const float speed, + const vertex_t n) { // For every node. for (int i = threadIdx.x + blockIdx.x * blockDim.x; i < n; i += gridDim.x * blockDim.x) { diff --git a/cpp/src/prims/detail/extract_transform_v_frontier_e.cuh b/cpp/src/prims/detail/extract_transform_v_frontier_e.cuh index fc3da3cac07..0b6447f50d9 100644 --- a/cpp/src/prims/detail/extract_transform_v_frontier_e.cuh +++ b/cpp/src/prims/detail/extract_transform_v_frontier_e.cuh @@ -127,7 +127,7 @@ template -__global__ void extract_transform_v_frontier_e_hypersparse_or_low_degree( +__global__ static void extract_transform_v_frontier_e_hypersparse_or_low_degree( edge_partition_device_view_t edge_partition, @@ -295,7 +295,7 @@ template -__global__ void extract_transform_v_frontier_e_mid_degree( +__global__ static void extract_transform_v_frontier_e_mid_degree( edge_partition_device_view_t edge_partition, @@ -396,7 +396,7 @@ template -__global__ void extract_transform_v_frontier_e_high_degree( +__global__ static void extract_transform_v_frontier_e_high_degree( edge_partition_device_view_t edge_partition, diff --git a/cpp/src/prims/per_v_random_select_transform_outgoing_e.cuh b/cpp/src/prims/per_v_random_select_transform_outgoing_e.cuh index 9cb3365116e..5240c49cb80 100644 --- a/cpp/src/prims/per_v_random_select_transform_outgoing_e.cuh +++ b/cpp/src/prims/per_v_random_select_transform_outgoing_e.cuh @@ -328,7 +328,7 @@ struct return_value_compute_offset_t { }; template -__global__ void compute_valid_local_nbr_inclusive_sums_mid_local_degree( +__global__ static void compute_valid_local_nbr_inclusive_sums_mid_local_degree( edge_partition_device_view_t edge_partition, edge_partition_edge_property_device_view_t edge_partition_e_mask, raft::device_span edge_partition_frontier_majors, @@ -382,7 +382,7 @@ __global__ void compute_valid_local_nbr_inclusive_sums_mid_local_degree( } template -__global__ void compute_valid_local_nbr_inclusive_sums_high_local_degree( +__global__ static void compute_valid_local_nbr_inclusive_sums_high_local_degree( edge_partition_device_view_t edge_partition, edge_partition_edge_property_device_view_t edge_partition_e_mask, raft::device_span edge_partition_frontier_majors, diff --git a/cpp/src/prims/per_v_transform_reduce_incoming_outgoing_e.cuh b/cpp/src/prims/per_v_transform_reduce_incoming_outgoing_e.cuh index 083487fa5b4..509ab56d3fe 100644 --- a/cpp/src/prims/per_v_transform_reduce_incoming_outgoing_e.cuh +++ b/cpp/src/prims/per_v_transform_reduce_incoming_outgoing_e.cuh @@ -149,7 +149,7 @@ template -__global__ void per_v_transform_reduce_e_hypersparse( +__global__ static void per_v_transform_reduce_e_hypersparse( edge_partition_device_view_t edge_partition, @@ -251,7 +251,7 @@ template -__global__ void per_v_transform_reduce_e_low_degree( +__global__ static void per_v_transform_reduce_e_low_degree( edge_partition_device_view_t edge_partition, @@ -350,7 +350,7 @@ template -__global__ void per_v_transform_reduce_e_mid_degree( +__global__ static void per_v_transform_reduce_e_mid_degree( edge_partition_device_view_t edge_partition, @@ -466,7 +466,7 @@ template -__global__ void per_v_transform_reduce_e_high_degree( +__global__ static void per_v_transform_reduce_e_high_degree( edge_partition_device_view_t edge_partition, diff --git a/cpp/src/prims/transform_e.cuh b/cpp/src/prims/transform_e.cuh index 2cb1a5358b0..9c7670f68d2 100644 --- a/cpp/src/prims/transform_e.cuh +++ b/cpp/src/prims/transform_e.cuh @@ -51,7 +51,7 @@ template -__global__ void transform_e_packed_bool( +__global__ static void transform_e_packed_bool( edge_partition_device_view_t edge_partition, diff --git a/cpp/src/prims/transform_reduce_e.cuh b/cpp/src/prims/transform_reduce_e.cuh index e5855b105ee..43722550c58 100644 --- a/cpp/src/prims/transform_reduce_e.cuh +++ b/cpp/src/prims/transform_reduce_e.cuh @@ -61,7 +61,7 @@ template -__global__ void transform_reduce_e_hypersparse( +__global__ static void transform_reduce_e_hypersparse( edge_partition_device_view_t edge_partition, @@ -153,7 +153,7 @@ template -__global__ void transform_reduce_e_low_degree( +__global__ static void transform_reduce_e_low_degree( edge_partition_device_view_t edge_partition, @@ -242,7 +242,7 @@ template -__global__ void transform_reduce_e_mid_degree( +__global__ static void transform_reduce_e_mid_degree( edge_partition_device_view_t edge_partition, @@ -320,7 +320,7 @@ template -__global__ void transform_reduce_e_high_degree( +__global__ static void transform_reduce_e_high_degree( edge_partition_device_view_t edge_partition, diff --git a/cpp/src/prims/transform_reduce_e_by_src_dst_key.cuh b/cpp/src/prims/transform_reduce_e_by_src_dst_key.cuh index 42203085077..eee0ed03d1c 100644 --- a/cpp/src/prims/transform_reduce_e_by_src_dst_key.cuh +++ b/cpp/src/prims/transform_reduce_e_by_src_dst_key.cuh @@ -97,7 +97,7 @@ template -__global__ void transform_reduce_by_src_dst_key_hypersparse( +__global__ static void transform_reduce_by_src_dst_key_hypersparse( edge_partition_device_view_t edge_partition, @@ -156,7 +156,7 @@ template -__global__ void transform_reduce_by_src_dst_key_low_degree( +__global__ static void transform_reduce_by_src_dst_key_low_degree( edge_partition_device_view_t edge_partition, @@ -214,7 +214,7 @@ template -__global__ void transform_reduce_by_src_dst_key_mid_degree( +__global__ static void transform_reduce_by_src_dst_key_mid_degree( edge_partition_device_view_t edge_partition, @@ -274,7 +274,7 @@ template -__global__ void transform_reduce_by_src_dst_key_high_degree( +__global__ static void transform_reduce_by_src_dst_key_high_degree( edge_partition_device_view_t edge_partition, diff --git a/cpp/src/structure/graph_view_impl.cuh b/cpp/src/structure/graph_view_impl.cuh index 4ee5ad5ca02..29dca6ef409 100644 --- a/cpp/src/structure/graph_view_impl.cuh +++ b/cpp/src/structure/graph_view_impl.cuh @@ -241,7 +241,7 @@ rmm::device_uvector compute_minor_degrees( int32_t constexpr count_edge_partition_multi_edges_block_size = 1024; template -__global__ void for_all_major_for_all_nbr_mid_degree( +__global__ static void for_all_major_for_all_nbr_mid_degree( edge_partition_device_view_t edge_partition, vertex_t major_range_first, vertex_t major_range_last, @@ -275,7 +275,7 @@ __global__ void for_all_major_for_all_nbr_mid_degree( } template -__global__ void for_all_major_for_all_nbr_high_degree( +__global__ static void for_all_major_for_all_nbr_high_degree( edge_partition_device_view_t edge_partition, vertex_t major_range_first, vertex_t major_range_last, diff --git a/cpp/src/traversal/od_shortest_distances_impl.cuh b/cpp/src/traversal/od_shortest_distances_impl.cuh index c2a3f1160ca..612eb0c48f2 100644 --- a/cpp/src/traversal/od_shortest_distances_impl.cuh +++ b/cpp/src/traversal/od_shortest_distances_impl.cuh @@ -215,7 +215,7 @@ template -__global__ void multi_partition_copy( +__global__ static void multi_partition_copy( InputIterator input_first, InputIterator input_last, raft::device_span output_buffer_ptrs, diff --git a/cpp/src/utilities/eidecl_graph_utils.hpp b/cpp/src/utilities/eidecl_graph_utils.hpp index 84240ba2845..abf026cbbfe 100644 --- a/cpp/src/utilities/eidecl_graph_utils.hpp +++ b/cpp/src/utilities/eidecl_graph_utils.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,9 +29,12 @@ extern template void offsets_to_indices(int const*, int, int*); extern template void offsets_to_indices(long const*, int, int*); extern template void offsets_to_indices(long const*, long, long*); -extern template __global__ void offsets_to_indices_kernel(int const*, int, int*); -extern template __global__ void offsets_to_indices_kernel(long const*, int, int*); -extern template __global__ void offsets_to_indices_kernel(long const*, long, long*); +extern template __attribute__((visibility("hidden"))) __global__ void +offsets_to_indices_kernel(int const*, int, int*); +extern template __attribute__((visibility("hidden"))) __global__ void +offsets_to_indices_kernel(long const*, int, int*); +extern template __attribute__((visibility("hidden"))) __global__ void +offsets_to_indices_kernel(long const*, long, long*); } // namespace detail } // namespace cugraph diff --git a/cpp/src/utilities/eidir_graph_utils.hpp b/cpp/src/utilities/eidir_graph_utils.hpp index 033bb197ce8..ba06c6f56ea 100644 --- a/cpp/src/utilities/eidir_graph_utils.hpp +++ b/cpp/src/utilities/eidir_graph_utils.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2023, NVIDIA CORPORATION. + * Copyright (c) 2020-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,15 +29,12 @@ template void offsets_to_indices(int32_t const*, int32_t, int3 template void offsets_to_indices(int64_t const*, int32_t, int32_t*); template void offsets_to_indices(int64_t const*, int64_t, int64_t*); -template __global__ void offsets_to_indices_kernel(int32_t const*, - int32_t, - int32_t*); -template __global__ void offsets_to_indices_kernel(int64_t const*, - int32_t, - int32_t*); -template __global__ void offsets_to_indices_kernel(int64_t const*, - int64_t, - int64_t*); +template __global__ __attribute__((visibility("hidden"))) void +offsets_to_indices_kernel(int32_t const*, int32_t, int32_t*); +template __global__ __attribute__((visibility("hidden"))) void +offsets_to_indices_kernel(int64_t const*, int32_t, int32_t*); +template __global__ __attribute__((visibility("hidden"))) void +offsets_to_indices_kernel(int64_t const*, int64_t, int64_t*); } // namespace detail } // namespace cugraph diff --git a/cpp/src/utilities/graph_utils.cuh b/cpp/src/utilities/graph_utils.cuh index 2d542956531..0b257e7abde 100644 --- a/cpp/src/utilities/graph_utils.cuh +++ b/cpp/src/utilities/graph_utils.cuh @@ -247,34 +247,36 @@ void update_dangling_nodes(size_t n, T* dangling_nodes, T damping_factor) // google matrix kernels template -__global__ void degree_coo(const IndexType n, - const IndexType e, - const IndexType* ind, - ValueType* degree) +__global__ static void degree_coo(const IndexType n, + const IndexType e, + const IndexType* ind, + ValueType* degree) { for (int i = threadIdx.x + blockIdx.x * blockDim.x; i < e; i += gridDim.x * blockDim.x) atomicAdd(°ree[ind[i]], (ValueType)1.0); } template -__global__ void flag_leafs_kernel(const size_t n, const IndexType* degree, ValueType* bookmark) +__global__ static void flag_leafs_kernel(const size_t n, + const IndexType* degree, + ValueType* bookmark) { for (auto i = threadIdx.x + blockIdx.x * blockDim.x; i < n; i += gridDim.x * blockDim.x) if (degree[i] == 0) bookmark[i] = 1.0; } template -__global__ void degree_offsets(const IndexType n, - const IndexType e, - const IndexType* ind, - ValueType* degree) +__global__ static void degree_offsets(const IndexType n, + const IndexType e, + const IndexType* ind, + ValueType* degree) { for (int i = threadIdx.x + blockIdx.x * blockDim.x; i < n; i += gridDim.x * blockDim.x) degree[i] += ind[i + 1] - ind[i]; } template -__global__ void type_convert(FromType* array, int n) +__global__ static void type_convert(FromType* array, int n) { for (int i = threadIdx.x + blockIdx.x * blockDim.x; i < n; i += gridDim.x * blockDim.x) { ToType val = array[i]; @@ -284,12 +286,12 @@ __global__ void type_convert(FromType* array, int n) } template -__global__ void equi_prob3(const IndexType n, - const IndexType e, - const IndexType* csrPtr, - const IndexType* csrInd, - ValueType* val, - IndexType* degree) +__global__ static void equi_prob3(const IndexType n, + const IndexType e, + const IndexType* csrPtr, + const IndexType* csrInd, + ValueType* val, + IndexType* degree) { int j, row, col; for (row = threadIdx.z + blockIdx.z * blockDim.z; row < n; row += gridDim.z * blockDim.z) { @@ -303,12 +305,12 @@ __global__ void equi_prob3(const IndexType n, } template -__global__ void equi_prob2(const IndexType n, - const IndexType e, - const IndexType* csrPtr, - const IndexType* csrInd, - ValueType* val, - IndexType* degree) +__global__ static void equi_prob2(const IndexType n, + const IndexType e, + const IndexType* csrPtr, + const IndexType* csrInd, + ValueType* val, + IndexType* degree) { int row = blockIdx.x * blockDim.x + threadIdx.x; if (row < n) { @@ -372,7 +374,8 @@ void HT_matrix_csc_coo(const IndexType n, } template -__global__ void offsets_to_indices_kernel(const offsets_t* offsets, index_t v, index_t* indices) +__attribute__((visibility("hidden"))) __global__ void offsets_to_indices_kernel( + const offsets_t* offsets, index_t v, index_t* indices) { auto tid{threadIdx.x}; auto ctaStart{blockIdx.x}; diff --git a/cpp/src/utilities/path_retrieval.cu b/cpp/src/utilities/path_retrieval.cu index e37ce3a3ced..eda60941c23 100644 --- a/cpp/src/utilities/path_retrieval.cu +++ b/cpp/src/utilities/path_retrieval.cu @@ -29,13 +29,13 @@ namespace cugraph { namespace detail { template -__global__ void get_traversed_cost_kernel(vertex_t const* vertices, - vertex_t const* preds, - vertex_t const* vtx_map, - weight_t const* info_weights, - weight_t* out, - vertex_t stop_vertex, - vertex_t num_vertices) +__global__ static void get_traversed_cost_kernel(vertex_t const* vertices, + vertex_t const* preds, + vertex_t const* vtx_map, + weight_t const* info_weights, + weight_t* out, + vertex_t stop_vertex, + vertex_t num_vertices) { for (vertex_t i = threadIdx.x + blockIdx.x * blockDim.x; i < num_vertices; i += gridDim.x * blockDim.x) { diff --git a/python/cugraph-dgl/cugraph_dgl/dataloading/dataset.py b/python/cugraph-dgl/cugraph_dgl/dataloading/dataset.py index 815fd30d8eb..f6fe38fe9f8 100644 --- a/python/cugraph-dgl/cugraph_dgl/dataloading/dataset.py +++ b/python/cugraph-dgl/cugraph_dgl/dataloading/dataset.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022-2023, NVIDIA CORPORATION. +# Copyright (c) 2022-2024, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -63,6 +63,10 @@ def __getitem__(self, idx: int): fn, batch_offset = self._batch_to_fn_d[idx] if fn != self._current_batch_fn: + # Remove current batches to free up memory + # before loading new batches + if hasattr(self, "_current_batches"): + del self._current_batches if self.sparse_format == "csc": df = _load_sampled_file(dataset_obj=self, fn=fn, skip_rename=True) self._current_batches = ( diff --git a/python/cugraph-dgl/cugraph_dgl/nn/conv/gatconv.py b/python/cugraph-dgl/cugraph_dgl/nn/conv/gatconv.py index cc4ce474f2d..e8813271fd8 100644 --- a/python/cugraph-dgl/cugraph_dgl/nn/conv/gatconv.py +++ b/python/cugraph-dgl/cugraph_dgl/nn/conv/gatconv.py @@ -186,6 +186,10 @@ def forward( nfeat: Union[torch.Tensor, tuple[torch.Tensor, torch.Tensor]], efeat: Optional[torch.Tensor] = None, max_in_degree: Optional[int] = None, + deterministic_dgrad: bool = False, + deterministic_wgrad: bool = False, + high_precision_dgrad: bool = False, + high_precision_wgrad: bool = False, ) -> torch.Tensor: r"""Forward computation. @@ -204,6 +208,20 @@ def forward( from a neighbor sampler, the value should be set to the corresponding :attr:`fanout`. This option is used to invoke the MFG-variant of cugraph-ops kernel. + deterministic_dgrad : bool, default=False + Optional flag indicating whether the feature gradients + are computed deterministically using a dedicated workspace buffer. + deterministic_wgrad: bool, default=False + Optional flag indicating whether the weight gradients + are computed deterministically using a dedicated workspace buffer. + high_precision_dgrad: bool, default=False + Optional flag indicating whether gradients for inputs in half precision + are kept in single precision as long as possible and only casted to + the corresponding input type at the very end. + high_precision_wgrad: bool, default=False + Optional flag indicating whether gradients for weights in half precision + are kept in single precision as long as possible and only casted to + the corresponding input type at the very end. Returns ------- @@ -232,6 +250,8 @@ def forward( _graph = self.get_cugraph_ops_CSC( g, is_bipartite=bipartite, max_in_degree=max_in_degree ) + if deterministic_dgrad: + _graph.add_reverse_graph() if bipartite: nfeat = (self.feat_drop(nfeat[0]), self.feat_drop(nfeat[1])) @@ -273,6 +293,10 @@ def forward( negative_slope=self.negative_slope, concat_heads=self.concat, edge_feat=efeat, + deterministic_dgrad=deterministic_dgrad, + deterministic_wgrad=deterministic_wgrad, + high_precision_dgrad=high_precision_dgrad, + high_precision_wgrad=high_precision_wgrad, )[: g.num_dst_nodes()] if self.concat: diff --git a/python/cugraph-dgl/cugraph_dgl/nn/conv/gatv2conv.py b/python/cugraph-dgl/cugraph_dgl/nn/conv/gatv2conv.py index 6c78b4df0b8..4f47005f8ee 100644 --- a/python/cugraph-dgl/cugraph_dgl/nn/conv/gatv2conv.py +++ b/python/cugraph-dgl/cugraph_dgl/nn/conv/gatv2conv.py @@ -150,6 +150,8 @@ def forward( nfeat: Union[torch.Tensor, tuple[torch.Tensor, torch.Tensor]], efeat: Optional[torch.Tensor] = None, max_in_degree: Optional[int] = None, + deterministic_dgrad: bool = False, + deterministic_wgrad: bool = False, ) -> torch.Tensor: r"""Forward computation. @@ -166,6 +168,12 @@ def forward( from a neighbor sampler, the value should be set to the corresponding :attr:`fanout`. This option is used to invoke the MFG-variant of cugraph-ops kernel. + deterministic_dgrad : bool, default=False + Optional flag indicating whether the feature gradients + are computed deterministically using a dedicated workspace buffer. + deterministic_wgrad: bool, default=False + Optional flag indicating whether the weight gradients + are computed deterministically using a dedicated workspace buffer. Returns ------- @@ -196,6 +204,8 @@ def forward( _graph = self.get_cugraph_ops_CSC( g, is_bipartite=graph_bipartite, max_in_degree=max_in_degree ) + if deterministic_dgrad: + _graph.add_reverse_graph() if nfeat_bipartite: nfeat = (self.feat_drop(nfeat[0]), self.feat_drop(nfeat[1])) @@ -228,6 +238,8 @@ def forward( negative_slope=self.negative_slope, concat_heads=self.concat, edge_feat=efeat, + deterministic_dgrad=deterministic_dgrad, + deterministic_wgrad=deterministic_wgrad, )[: g.num_dst_nodes()] if self.concat: diff --git a/python/cugraph-pyg/cugraph_pyg/data/cugraph_store.py b/python/cugraph-pyg/cugraph_pyg/data/cugraph_store.py index 05d540b7c45..df16fc9fd6c 100644 --- a/python/cugraph-pyg/cugraph_pyg/data/cugraph_store.py +++ b/python/cugraph-pyg/cugraph_pyg/data/cugraph_store.py @@ -1,4 +1,4 @@ -# Copyright (c) 2019-2023, NVIDIA CORPORATION. +# Copyright (c) 2019-2024, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -1083,13 +1083,12 @@ def _get_tensor(self, attr: CuGraphTensorAttr) -> TensorType: idx = attr.index if idx is not None: - if feature_backend == "torch": + if feature_backend in ["torch", "wholegraph"]: if not isinstance(idx, torch.Tensor): raise TypeError( f"Type {type(idx)} invalid" f" for feature store backend {feature_backend}" ) - idx = idx.cpu() elif feature_backend == "numpy": # allow feature indexing through cupy arrays if isinstance(idx, cupy.ndarray): @@ -1244,5 +1243,77 @@ def _infer_unspecified_attr(self, attr: CuGraphTensorAttr) -> CuGraphTensorAttr: return attr + def filter( + self, + format: str, + node_dict: Dict[str, torch.Tensor], + row_dict: Dict[str, torch.Tensor], + col_dict: Dict[str, torch.Tensor], + edge_dict: Dict[str, Tuple[torch.Tensor]], + ) -> torch_geometric.data.HeteroData: + """ + Parameters + ---------- + format: str + COO or CSC + node_dict: Dict[str, torch.Tensor] + IDs of nodes in original store being outputted + row_dict: Dict[str, torch.Tensor] + Renumbered output edge index row + col_dict: Dict[str, torch.Tensor] + Renumbered output edge index column + edge_dict: Dict[str, Tuple[torch.Tensor]] + Currently unused original edge mapping + """ + data = torch_geometric.data.HeteroData() + + # TODO use torch_geometric.EdgeIndex in release 24.04 (Issue #4051) + for attr in self.get_all_edge_attrs(): + key = attr.edge_type + if key in row_dict and key in col_dict: + if format == "CSC": + data.put_edge_index( + (row_dict[key], col_dict[key]), + edge_type=key, + layout="csc", + is_sorted=True, + ) + else: + data[key].edge_index = torch.stack( + [ + row_dict[key], + col_dict[key], + ], + dim=0, + ) + + required_attrs = [] + # To prevent copying multiple times, we use a cache; + # the original node_dict serves as the gpu cache if needed + node_dict_cpu = {} + for attr in self.get_all_tensor_attrs(): + if attr.group_name in node_dict: + device = self.__features.get_storage(attr.group_name, attr.attr_name) + attr.index = node_dict[attr.group_name] + if not isinstance(attr.index, torch.Tensor): + raise ValueError("Node index must be a tensor!") + if attr.index.is_cuda and device == "cpu": + if attr.group_name not in node_dict_cpu: + node_dict_cpu[attr.group_name] = attr.index.cpu() + attr.index = node_dict_cpu[attr.group_name] + elif attr.index.is_cpu and device == "cuda": + node_dict_cpu[attr.group_name] = attr.index + node_dict[attr.group_name] = attr.index.cuda() + attr.index = node_dict[attr.group_name] + + required_attrs.append(attr) + data[attr.group_name].num_nodes = attr.index.size(0) + + tensors = self.multi_get_tensor(required_attrs) + for i, attr in enumerate(required_attrs): + data[attr.group_name][attr.attr_name] = tensors[i] + + return data + def __len__(self): return len(self.get_all_tensor_attrs()) diff --git a/python/cugraph-pyg/cugraph_pyg/examples/graph_sage_mg.py b/python/cugraph-pyg/cugraph_pyg/examples/graph_sage_mg.py index 9c0adaad879..4ca573504a1 100644 --- a/python/cugraph-pyg/cugraph_pyg/examples/graph_sage_mg.py +++ b/python/cugraph-pyg/cugraph_pyg/examples/graph_sage_mg.py @@ -1,4 +1,4 @@ -# Copyright (c) 2023, NVIDIA CORPORATION. +# Copyright (c) 2023-2024, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -21,7 +21,7 @@ import torch import numpy as np -from torch_geometric.nn import CuGraphSAGEConv +from cugraph_pyg.nn import SAGEConv as CuGraphSAGEConv import torch.nn as nn import torch.nn.functional as F diff --git a/python/cugraph-pyg/cugraph_pyg/examples/graph_sage_sg.py b/python/cugraph-pyg/cugraph_pyg/examples/graph_sage_sg.py index 82f5e7ea67d..9c96a707e4d 100644 --- a/python/cugraph-pyg/cugraph_pyg/examples/graph_sage_sg.py +++ b/python/cugraph-pyg/cugraph_pyg/examples/graph_sage_sg.py @@ -1,4 +1,4 @@ -# Copyright (c) 2023, NVIDIA CORPORATION. +# Copyright (c) 2023-2024, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -18,7 +18,7 @@ import torch -from torch_geometric.nn import CuGraphSAGEConv +from cugraph_pyg.nn import SAGEConv as CuGraphSAGEConv import torch.nn as nn import torch.nn.functional as F diff --git a/python/cugraph-pyg/cugraph_pyg/loader/cugraph_node_loader.py b/python/cugraph-pyg/cugraph_pyg/loader/cugraph_node_loader.py index bcfaf579820..55c9e9b3329 100644 --- a/python/cugraph-pyg/cugraph_pyg/loader/cugraph_node_loader.py +++ b/python/cugraph-pyg/cugraph_pyg/loader/cugraph_node_loader.py @@ -28,7 +28,6 @@ _sampler_output_from_sampling_results_heterogeneous, _sampler_output_from_sampling_results_homogeneous_csr, _sampler_output_from_sampling_results_homogeneous_coo, - filter_cugraph_store_csc, ) from typing import Union, Tuple, Sequence, List, Dict @@ -454,31 +453,20 @@ def __next__(self): start_time_feature = perf_counter() # Create a PyG HeteroData object, loading the required features - if self.__coo: - pyg_filter_fn = ( - torch_geometric.loader.utils.filter_custom_hetero_store - if hasattr(torch_geometric.loader.utils, "filter_custom_hetero_store") - else torch_geometric.loader.utils.filter_custom_store - ) - out = pyg_filter_fn( - self.__feature_store, - self.__graph_store, - sampler_output.node, - sampler_output.row, - sampler_output.col, - sampler_output.edge, - ) - else: - out = filter_cugraph_store_csc( - self.__feature_store, - self.__graph_store, - sampler_output.node, - sampler_output.row, - sampler_output.col, - sampler_output.edge, - ) + if self.__graph_store != self.__feature_store: + # TODO Possibly support this if there is an actual use case + raise ValueError("Separate graph and feature stores currently unsupported") + + out = self.__graph_store.filter( + "COO" if self.__coo else "CSC", + sampler_output.node, + sampler_output.row, + sampler_output.col, + sampler_output.edge, + ) # Account for CSR format in cuGraph vs. CSC format in PyG + # TODO deprecate and remove this functionality if self.__coo and self.__graph_store.order == "CSC": for edge_type in out.edge_index_dict: out[edge_type].edge_index = out[edge_type].edge_index.flip(dims=[0]) diff --git a/python/cugraph-pyg/cugraph_pyg/nn/conv/gat_conv.py b/python/cugraph-pyg/cugraph_pyg/nn/conv/gat_conv.py index 309bee4e228..d1785f2bef8 100644 --- a/python/cugraph-pyg/cugraph_pyg/nn/conv/gat_conv.py +++ b/python/cugraph-pyg/cugraph_pyg/nn/conv/gat_conv.py @@ -1,4 +1,4 @@ -# Copyright (c) 2023, NVIDIA CORPORATION. +# Copyright (c) 2023-2024, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -162,6 +162,10 @@ def forward( csc: Tuple[torch.Tensor, torch.Tensor, int], edge_attr: Optional[torch.Tensor] = None, max_num_neighbors: Optional[int] = None, + deterministic_dgrad: bool = False, + deterministic_wgrad: bool = False, + high_precision_dgrad: bool = False, + high_precision_wgrad: bool = False, ) -> torch.Tensor: r"""Runs the forward pass of the module. @@ -178,11 +182,27 @@ def forward( of a destination node. When enabled, it allows models to use the message-flow-graph primitives in cugraph-ops. (default: :obj:`None`) + deterministic_dgrad : bool, default=False + Optional flag indicating whether the feature gradients + are computed deterministically using a dedicated workspace buffer. + deterministic_wgrad: bool, default=False + Optional flag indicating whether the weight gradients + are computed deterministically using a dedicated workspace buffer. + high_precision_dgrad: bool, default=False + Optional flag indicating whether gradients for inputs in half precision + are kept in single precision as long as possible and only casted to + the corresponding input type at the very end. + high_precision_wgrad: bool, default=False + Optional flag indicating whether gradients for weights in half precision + are kept in single precision as long as possible and only casted to + the corresponding input type at the very end. """ bipartite = not isinstance(x, torch.Tensor) graph = self.get_cugraph( csc, bipartite=bipartite, max_num_neighbors=max_num_neighbors ) + if deterministic_dgrad: + graph.add_reverse_graph() if edge_attr is not None: if self.lin_edge is None: @@ -220,6 +240,10 @@ def forward( negative_slope=self.negative_slope, concat_heads=self.concat, edge_feat=edge_attr, + deterministic_dgrad=deterministic_dgrad, + deterministic_wgrad=deterministic_wgrad, + high_precision_dgrad=high_precision_dgrad, + high_precision_wgrad=high_precision_wgrad, ) if self.bias is not None: diff --git a/python/cugraph-pyg/cugraph_pyg/nn/conv/gatv2_conv.py b/python/cugraph-pyg/cugraph_pyg/nn/conv/gatv2_conv.py index 32956dcb400..33865898816 100644 --- a/python/cugraph-pyg/cugraph_pyg/nn/conv/gatv2_conv.py +++ b/python/cugraph-pyg/cugraph_pyg/nn/conv/gatv2_conv.py @@ -1,4 +1,4 @@ -# Copyright (c) 2023, NVIDIA CORPORATION. +# Copyright (c) 2023-2024, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -174,6 +174,8 @@ def forward( x: Union[torch.Tensor, Tuple[torch.Tensor, torch.Tensor]], csc: Tuple[torch.Tensor, torch.Tensor, int], edge_attr: Optional[torch.Tensor] = None, + deterministic_dgrad: bool = False, + deterministic_wgrad: bool = False, ) -> torch.Tensor: r"""Runs the forward pass of the module. @@ -186,9 +188,17 @@ def forward( :meth:`to_csc` method to convert an :obj:`edge_index` representation to the desired format. edge_attr: (torch.Tensor, optional) The edge features. + deterministic_dgrad : bool, default=False + Optional flag indicating whether the feature gradients + are computed deterministically using a dedicated workspace buffer. + deterministic_wgrad: bool, default=False + Optional flag indicating whether the weight gradients + are computed deterministically using a dedicated workspace buffer. """ bipartite = not isinstance(x, torch.Tensor) or not self.share_weights graph = self.get_cugraph(csc, bipartite=bipartite) + if deterministic_dgrad: + graph.add_reverse_graph() if edge_attr is not None: if self.lin_edge is None: @@ -217,6 +227,8 @@ def forward( negative_slope=self.negative_slope, concat_heads=self.concat, edge_feat=edge_attr, + deterministic_dgrad=deterministic_dgrad, + deterministic_wgrad=deterministic_wgrad, ) if self.bias is not None: diff --git a/python/cugraph-pyg/cugraph_pyg/sampler/cugraph_sampler.py b/python/cugraph-pyg/cugraph_pyg/sampler/cugraph_sampler.py index 65cb63d25e0..ffab54efe08 100644 --- a/python/cugraph-pyg/cugraph_pyg/sampler/cugraph_sampler.py +++ b/python/cugraph-pyg/cugraph_pyg/sampler/cugraph_sampler.py @@ -411,6 +411,10 @@ def filter_cugraph_store_csc( col_dict: Dict[str, torch.Tensor], edge_dict: Dict[str, Tuple[torch.Tensor]], ) -> torch_geometric.data.HeteroData: + """ + Deprecated + """ + data = torch_geometric.data.HeteroData() for attr in graph_store.get_all_edge_attrs(): diff --git a/python/cugraph/cugraph/gnn/feature_storage/feat_storage.py b/python/cugraph/cugraph/gnn/feature_storage/feat_storage.py index 77a53882fc4..f0186220114 100644 --- a/python/cugraph/cugraph/gnn/feature_storage/feat_storage.py +++ b/python/cugraph/cugraph/gnn/feature_storage/feat_storage.py @@ -1,4 +1,4 @@ -# Copyright (c) 2023, NVIDIA CORPORATION. +# Copyright (c) 2023-2024, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -168,19 +168,54 @@ def get_data( feat, wgth.WholeMemoryEmbedding ): indices_tensor = ( - indices + indices.cuda() if isinstance(indices, torch.Tensor) else torch.as_tensor(indices, device="cuda") ) return feat.gather(indices_tensor) - else: - return feat[indices] + elif not isinstance(torch, MissingModule) and isinstance(feat, torch.Tensor): + if indices is not None: + if not isinstance(indices, torch.Tensor): + indices = torch.as_tensor(indices) + + if feat.is_cpu and indices.is_cuda: + # TODO maybe add a warning here + indices = indices.cpu() + return feat[indices] def get_feature_list(self) -> list[str]: return {feat_name: feats.keys() for feat_name, feats in self.fd.items()} + def get_storage(self, type_name: str, feat_name: str) -> str: + """ + Returns where the data is stored (cuda, cpu). + Note: will return "cuda" for data managed by CUDA, even if + it is in host memory. + + Parameters + ---------- + type_name : str + The node-type/edge-type to store data + feat_name: + The feature name to retrieve data for + + Returns + ------- + "cuda" for data managed by CUDA, otherwise "CPU". + """ + feat = self.fd[feat_name][type_name] + if not isinstance(wgth, MissingModule) and isinstance( + feat, wgth.WholeMemoryEmbedding + ): + return "cuda" + elif isinstance(feat, torch.Tensor): + return "cpu" if feat.is_cpu else "cuda" + else: + return "cpu" + @staticmethod def _cast_feat_obj_to_backend(feat_obj, backend: str, **kwargs): + # TODO (Issue #4078) support casting WG tensors to numpy and torch if backend == "numpy": if isinstance(feat_obj, (cudf.DataFrame, pd.DataFrame)): return _cast_to_numpy_ar(feat_obj.values, **kwargs) @@ -192,6 +227,8 @@ def _cast_feat_obj_to_backend(feat_obj, backend: str, **kwargs): else: return _cast_to_torch_tensor(feat_obj, **kwargs) elif backend == "wholegraph": + if isinstance(feat_obj, wgth.WholeMemoryEmbedding): + return feat_obj return _get_wg_embedding(feat_obj, **kwargs)