diff --git a/python/cugraph-pyg/cugraph_pyg/examples/cugraph_dist_sampling.py b/python/cugraph-pyg/cugraph_pyg/examples/cugraph_dist_sampling.py
index 29366c404df..c66d649658f 100644
--- a/python/cugraph-pyg/cugraph_pyg/examples/cugraph_dist_sampling.py
+++ b/python/cugraph-pyg/cugraph_pyg/examples/cugraph_dist_sampling.py
@@ -33,7 +33,7 @@
     cugraph_comms_create_unique_id,
     cugraph_comms_get_raft_handle,
     DistSampleWriter,
-    UniformNeighborSampler
+    UniformNeighborSampler,
 )
 
 from pylibcugraph import MGGraph, ResourceHandle, GraphProperties
@@ -70,14 +70,11 @@ def sample(rank: int, world_size: int, uid, edgelist, directory):
     )
     print("graph constructed")
 
-    sample_writer = DistSampleWriter(
-        directory=directory,
-        batches_per_partition=2
-    )
+    sample_writer = DistSampleWriter(directory=directory, batches_per_partition=2)
     sampler = UniformNeighborSampler(
         G,
         sample_writer,
-        fanout=[5,5],
+        fanout=[5, 5],
     )
 
     sampler.sample_from_nodes(seeds, batch_size=16, random_state=62)
@@ -97,17 +94,17 @@ def main():
     with tempfile.TemporaryDirectory() as directory:
         tmp.spawn(
             sample,
-            args=(world_size, uid, el, '.'),
+            args=(world_size, uid, el, "."),
             nprocs=world_size,
         )
 
         print("Printing samples...")
         for file in os.listdir(directory):
-            m=re.match(r'batch=([0-9]+)\.([0-9]+)\-([0-9]+)\.([0-9]+)\.parquet', file)
+            m = re.match(r"batch=([0-9]+)\.([0-9]+)\-([0-9]+)\.([0-9]+)\.parquet", file)
             rank, start, _, end = int(m[1]), int(m[2]), int(m[3]), int(m[4])
-            print(f'File: {file} (batches {start} to {end} for rank {rank})')
+            print(f"File: {file} (batches {start} to {end} for rank {rank})")
             print(cudf.read_parquet(os.path.join(directory, file)))
-            print('\n')
+            print("\n")
 
 
 if __name__ == "__main__":
diff --git a/python/cugraph/cugraph/gnn/__init__.py b/python/cugraph/cugraph/gnn/__init__.py
index 2f0bc2c0bd5..1f4d98f0230 100644
--- a/python/cugraph/cugraph/gnn/__init__.py
+++ b/python/cugraph/cugraph/gnn/__init__.py
@@ -16,7 +16,7 @@
 from .data_loading.dist_sampler import (
     DistSampler,
     DistSampleWriter,
-    UniformNeighborSampler
+    UniformNeighborSampler,
 )
 from .comms.cugraph_nccl_comms import (
     cugraph_comms_init,
diff --git a/python/cugraph/cugraph/gnn/comms/__init__.py b/python/cugraph/cugraph/gnn/comms/__init__.py
index 4b40de85abb..b842dd0927d 100644
--- a/python/cugraph/cugraph/gnn/comms/__init__.py
+++ b/python/cugraph/cugraph/gnn/comms/__init__.py
@@ -16,4 +16,4 @@
     cugraph_comms_shutdown,
     cugraph_comms_create_unique_id,
     cugraph_comms_get_raft_handle,
-)
\ No newline at end of file
+)
diff --git a/python/cugraph/cugraph/gnn/data_loading/__init__.py b/python/cugraph/cugraph/gnn/data_loading/__init__.py
index 4eb4bf3de9e..a50f6085e9a 100644
--- a/python/cugraph/cugraph/gnn/data_loading/__init__.py
+++ b/python/cugraph/cugraph/gnn/data_loading/__init__.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2023, NVIDIA CORPORATION.
+# Copyright (c) 2023-2024, NVIDIA CORPORATION.
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 # You may obtain a copy of the License at
@@ -15,5 +15,5 @@
 from cugraph.gnn.data_loading.dist_sampler import (
     DistSampler,
     DistSampleWriter,
-    UniformNeighborSampler
-)
\ No newline at end of file
+    UniformNeighborSampler,
+)
diff --git a/python/cugraph/cugraph/gnn/data_loading/bulk_sampler_io.py b/python/cugraph/cugraph/gnn/data_loading/bulk_sampler_io.py
index a109f8fac93..6abbd82647b 100644
--- a/python/cugraph/cugraph/gnn/data_loading/bulk_sampler_io.py
+++ b/python/cugraph/cugraph/gnn/data_loading/bulk_sampler_io.py
@@ -35,7 +35,7 @@ def create_df_from_disjoint_series(series_list: List[cudf.Series]):
 def create_df_from_disjoint_arrays(array_dict: Dict[str, cupy.array]):
     for k in list(array_dict.keys()):
         array_dict[k] = cudf.Series(array_dict[k], name=k)
-
+
     return create_df_from_disjoint_series(list(array_dict.values()))
 
 
diff --git a/python/cugraph/cugraph/gnn/data_loading/dist_sampler.py b/python/cugraph/cugraph/gnn/data_loading/dist_sampler.py
index c3948e73da8..42e7d95c44d 100644
--- a/python/cugraph/cugraph/gnn/data_loading/dist_sampler.py
+++ b/python/cugraph/cugraph/gnn/data_loading/dist_sampler.py
@@ -33,7 +33,13 @@
 
 
 class DistSampleWriter:
-    def __init__(self, directory: str, *, batches_per_partition: int=256, format: str="parquet"):
+    def __init__(
+        self,
+        directory: str,
+        *,
+        batches_per_partition: int = 256,
+        format: str = "parquet",
+    ):
         if format != "parquet":
             raise ValueError("Invalid format (currently supported: 'parquet')")
 
@@ -52,92 +58,114 @@ def _directory(self):
     @property
     def _batches_per_partition(self):
         return self.__batches_per_partition
-
+
     def __write_minibatches_coo(self, minibatch_dict):
-        has_edge_ids = minibatch_dict['edge_id'] is not None
-        has_edge_types = minibatch_dict['edge_type'] is not None
-        has_weights = minibatch_dict['weight'] is not None
+        has_edge_ids = minibatch_dict["edge_id"] is not None
+        has_edge_types = minibatch_dict["edge_type"] is not None
+        has_weights = minibatch_dict["weight"] is not None
 
-        if minibatch_dict['renumber_map'] is None:
-            raise ValueError("Distributed sampling without renumbering is not supported")
+        if minibatch_dict["renumber_map"] is None:
+            raise ValueError(
+                "Distributed sampling without renumbering is not supported"
+            )
 
-        fanout_length = (len(minibatch_dict['label_hop_offsets']) - 1) // len(minibatch_dict['batch_id'])
-        rank_batch_offset = minibatch_dict['batch_id'][0]
+        fanout_length = (len(minibatch_dict["label_hop_offsets"]) - 1) // len(
+            minibatch_dict["batch_id"]
+        )
+        rank_batch_offset = minibatch_dict["batch_id"][0]
 
-        for p in range(0, int(ceil(len(minibatch_dict['batch_id']) / self.__batches_per_partition))):
+        for p in range(
+            0, int(ceil(len(minibatch_dict["batch_id"]) / self.__batches_per_partition))
+        ):
             partition_start = p * (self.__batches_per_partition)
             partition_end = (p + 1) * (self.__batches_per_partition)
 
-            label_hop_offsets_array_p = minibatch_dict['label_hop_offsets'][
+            label_hop_offsets_array_p = minibatch_dict["label_hop_offsets"][
                 partition_start * fanout_length : partition_end * fanout_length + 1
             ]
 
-            batch_id_array_p = minibatch_dict['batch_id'][partition_start:partition_end]
+            batch_id_array_p = minibatch_dict["batch_id"][partition_start:partition_end]
             start_batch_id = batch_id_array_p[0] - rank_batch_offset
 
             start_ix, end_ix = label_hop_offsets_array_p[[0, -1]]
 
             majors_array_p = minibatch_dict["majors"][start_ix:end_ix]
             minors_array_p = minibatch_dict["minors"][start_ix:end_ix]
             edge_id_array_p = (
-                minibatch_dict["edge_id"][start_ix:end_ix] if has_edge_ids
-                else cupy.array([], dtype='int64')
+                minibatch_dict["edge_id"][start_ix:end_ix]
+                if has_edge_ids
+                else cupy.array([], dtype="int64")
             )
             edge_type_array_p = (
-                minibatch_dict["edge_type"][start_ix:end_ix] if has_edge_types
-                else cupy.array([], dtype='int32')
+                minibatch_dict["edge_type"][start_ix:end_ix]
+                if has_edge_types
+                else cupy.array([], dtype="int32")
             )
             weight_array_p = (
-                minibatch_dict["weight"][start_ix:end_ix] if has_weights
-                else cupy.array([], dtype='float32')
+                minibatch_dict["weight"][start_ix:end_ix]
+                if has_weights
+                else cupy.array([], dtype="float32")
             )
-
+
             # create the renumber map offsets
-            renumber_map_offsets_array_p = minibatch_dict['renumber_map_offsets'][
+            renumber_map_offsets_array_p = minibatch_dict["renumber_map_offsets"][
                 partition_start : partition_end + 1
             ]
 
-            renumber_map_start_ix, renumber_map_end_ix = renumber_map_offsets_array_p[[0,-1]]
+            renumber_map_start_ix, renumber_map_end_ix = renumber_map_offsets_array_p[
+                [0, -1]
+            ]
 
-            renumber_map_array_p = minibatch_dict['renumber_map'][
+            renumber_map_array_p = minibatch_dict["renumber_map"][
                 renumber_map_start_ix:renumber_map_end_ix
             ]
 
             results_dataframe_p = create_df_from_disjoint_arrays(
                 {
-                    'majors': majors_array_p,
-                    'minors': minors_array_p,
-                    'map': renumber_map_array_p,
-                    'label_hop_offsets': label_hop_offsets_array_p,
-                    'weight': weight_array_p,
-                    'edge_id': edge_id_array_p,
-                    'edge_type': edge_type_array_p,
-                    'renumber_map_offsets': renumber_map_offsets_array_p,
+                    "majors": majors_array_p,
+                    "minors": minors_array_p,
+                    "map": renumber_map_array_p,
+                    "label_hop_offsets": label_hop_offsets_array_p,
+                    "weight": weight_array_p,
+                    "edge_id": edge_id_array_p,
+                    "edge_type": edge_type_array_p,
+                    "renumber_map_offsets": renumber_map_offsets_array_p,
                 }
             )
 
             end_batch_id = start_batch_id + len(batch_id_array_p) - 1
-            if 'rank' in minibatch_dict:
-                rank = minibatch_dict['rank']
+            if "rank" in minibatch_dict:
+                rank = minibatch_dict["rank"]
                 full_output_path = os.path.join(
-                    self.__directory, f"batch={rank:05d}.{start_batch_id:08d}-{rank:05d}.{end_batch_id:08d}.parquet"
+                    self.__directory,
+                    f"batch={rank:05d}.{start_batch_id:08d}-"
+                    f"{rank:05d}.{end_batch_id:08d}.parquet",
                 )
             else:
                 full_output_path = os.path.join(
-                    self.__directory, f"batch={start_batch_id:010d}-{end_batch_id:010d}.parquet"
+                    self.__directory,
+                    f"batch={start_batch_id:010d}-{end_batch_id:010d}.parquet",
                 )
-
             results_dataframe_p.to_parquet(
-                full_output_path, compression=None, index=False, force_nullable_schema=True
+                full_output_path,
+                compression=None,
+                index=False,
+                force_nullable_schema=True,
             )
-
+
     def __write_minibatches_csr(self, minibatch_dict):
-        raise NotImplementedError("CSR format currently not supported for distributed sampling")
+        raise NotImplementedError(
+            "CSR format currently not supported for distributed sampling"
+        )
 
     def write_minibatches(self, minibatch_dict):
-        if (minibatch_dict['majors'] is not None) and (minibatch_dict['minors'] is not None):
+        if (minibatch_dict["majors"] is not None) and (
+            minibatch_dict["minors"] is not None
+        ):
             self.__write_minibatches_coo(minibatch_dict)
-        elif (minibatch_dict["major_offsets"] is not None) and (minibatch_dict['minors'] is not None):
+        elif (minibatch_dict["major_offsets"] is not None) and (
+            minibatch_dict["minors"] is not None
+        ):
             self.__write_minibatches_csr(minibatch_dict)
         else:
             raise ValueError("invalid columns")
@@ -159,20 +187,26 @@ def sample_batches(
     ):
         raise NotImplementedError("Must be implemented by subclass")
 
-    def sample_from_nodes(self, nodes: TensorType, *, batch_size: int=16, random_state: int=62):
+    def sample_from_nodes(
+        self, nodes: TensorType, *, batch_size: int = 16, random_state: int = 62
+    ):
         batches_per_call = self._local_seeds_per_call // batch_size
         actual_seeds_per_call = batches_per_call * batch_size
         num_seeds = len(nodes)
 
-        nodes = torch.split(torch.as_tensor(nodes, device="cuda"), actual_seeds_per_call)
+        nodes = torch.split(
+            torch.as_tensor(nodes, device="cuda"), actual_seeds_per_call
+        )
 
         if self.is_multi_gpu:
             rank = torch.distributed.get_rank()
             world_size = torch.distributed.get_world_size()
 
-            t = torch.empty((world_size,), dtype=torch.int64, device='cuda')
-            local_size = torch.tensor([int(ceil(num_seeds / batch_size))], dtype=torch.int64, device='cuda')
-
+            t = torch.empty((world_size,), dtype=torch.int64, device="cuda")
+            local_size = torch.tensor(
+                [int(ceil(num_seeds / batch_size))], dtype=torch.int64, device="cuda"
+            )
+
             torch.distributed.all_gather_into_tensor(t, local_size)
             if (t != local_size).any() and rank == 0:
                 warnings.warn(
@@ -210,7 +244,7 @@ def is_multi_gpu(self):
     @property
     def _local_seeds_per_call(self):
         return self.__local_seeds_per_call
-
+
     @property
     def _graph(self):
         return self.__graph
@@ -222,9 +256,9 @@ def __init__(
         graph: Union[pylibcugraph.SGGraph, pylibcugraph.MGGraph],
         writer: DistSampleWriter,
         *,
-        local_seeds_per_call:int = 32768,
+        local_seeds_per_call: int = 32768,
         fanout: List[int] = [-1],
-        prior_sources_behavior: str = 'exclude',
+        prior_sources_behavior: str = "exclude",
         deduplicate_sources: bool = True,
         compression: str = "COO",
         compress_per_hop: bool = False,
@@ -245,24 +279,33 @@ def sample_batches(
         # flag that assumes all ranks have the same number of batches.
         if self.is_multi_gpu:
             rank = torch.distributed.get_rank()
-            world_size = torch.distributed.get_world_size()
             handle = pylibcugraph.ResourceHandle(
                 cugraph_comms_get_raft_handle().getHandle()
             )
-
+
             local_label_list = torch.unique(batch_ids)
             local_label_to_output_comm_rank = torch.full(
                 (len(local_label_list),), rank, dtype=torch.int32, device="cuda"
             )
 
-            num_batches = torch.tensor([len(local_label_list)], device='cuda', dtype=torch.int64)
+            num_batches = torch.tensor(
+                [len(local_label_list)], device="cuda", dtype=torch.int64
+            )
             torch.distributed.all_reduce(num_batches, op=torch.distributed.ReduceOp.SUM)
 
-            label_list = torch.empty((num_batches,), device='cuda', dtype=torch.int32)
-            w1 = torch.distributed.all_gather_into_tensor(label_list, local_label_list, async_op=True)
+            label_list = torch.empty((num_batches,), device="cuda", dtype=torch.int32)
+            w1 = torch.distributed.all_gather_into_tensor(
+                label_list, local_label_list, async_op=True
+            )
 
-            label_to_output_comm_rank = torch.empty((num_batches,), device='cuda', dtype=torch.int32)
-            w2 = torch.distributed.all_gather_into_tensor(label_to_output_comm_rank, local_label_to_output_comm_rank, async_op=True)
+            label_to_output_comm_rank = torch.empty(
+                (num_batches,), device="cuda", dtype=torch.int32
+            )
+            w2 = torch.distributed.all_gather_into_tensor(
+                label_to_output_comm_rank,
+                local_label_to_output_comm_rank,
+                async_op=True,
+            )
 
             w1.wait()
             w2.wait()
@@ -287,7 +330,7 @@ def sample_batches(
                 compress_per_hop=self.__compress_per_hop,
                 return_dict=True,
             )
-            sampling_results_dict['rank'] = rank
+            sampling_results_dict["rank"] = rank
         else:
             sampling_results_dict = pylibcugraph.uniform_neighbor_sample(
                 pylibcugraph.ResourceHandle(),