diff --git a/ci/test_python.sh b/ci/test_python.sh index 5183dc37a49..df0f34377a3 100755 --- a/ci/test_python.sh +++ b/ci/test_python.sh @@ -203,8 +203,11 @@ if [[ "${RAPIDS_CUDA_VERSION}" == "11.8.0" ]]; then --channel pytorch \ --channel nvidia \ 'pyg=2.3' \ - 'pytorch>=2.0' \ - 'pytorch-cuda>=11.8' + 'pytorch=2.0.0' \ + 'pytorch-cuda=11.8' + + # Install pyg dependencies (which requires pip) + pip install pyg_lib torch_scatter torch_sparse torch_cluster torch_spline_conv -f https://data.pyg.org/whl/torch-2.0.0+cu118.html rapids-mamba-retry install \ --channel "${CPP_CHANNEL}" \ diff --git a/python/cugraph-pyg/cugraph_pyg/data/cugraph_store.py b/python/cugraph-pyg/cugraph_pyg/data/cugraph_store.py index e0d318adbe0..fd2172e6ade 100644 --- a/python/cugraph-pyg/cugraph_pyg/data/cugraph_store.py +++ b/python/cugraph-pyg/cugraph_pyg/data/cugraph_store.py @@ -819,8 +819,8 @@ def _get_renumbered_edge_groups_from_sample( before this one to get the noi_index. Example Input: Series({ - 'sources': [0, 5, 11, 3], - 'destinations': [8, 2, 3, 5]}, + 'majors': [0, 5, 11, 3], + 'minors': [8, 2, 3, 5]}, 'edge_type': [1, 3, 5, 14] }), { @@ -865,24 +865,22 @@ def _get_renumbered_edge_groups_from_sample( index=cupy.asarray(id_table), ).sort_index() - # Renumber the sources using binary search + # Renumber the majors using binary search # Step 1: get the index of the new id ix_r = torch.searchsorted( torch.as_tensor(id_map.index.values, device="cuda"), - torch.as_tensor(sampling_results.sources.values, device="cuda"), + torch.as_tensor(sampling_results.majors.values, device="cuda"), ) # Step 2: Go from id indices to actual ids row_dict[t_pyg_type] = torch.as_tensor(id_map.values, device="cuda")[ ix_r ] - # Renumber the destinations using binary search + # Renumber the minors using binary search # Step 1: get the index of the new id ix_c = torch.searchsorted( torch.as_tensor(id_map.index.values, device="cuda"), - torch.as_tensor( - sampling_results.destinations.values, device="cuda" - ), + torch.as_tensor(sampling_results.minors.values, device="cuda"), ) # Step 2: Go from id indices to actual ids col_dict[t_pyg_type] = torch.as_tensor(id_map.values, device="cuda")[ @@ -897,7 +895,7 @@ def _get_renumbered_edge_groups_from_sample( "new_id": cupy.arange(dst_id_table.shape[0]), } ).set_index("dst") - dst = dst_id_map["new_id"].loc[sampling_results.destinations] + dst = dst_id_map["new_id"].loc[sampling_results.minors] col_dict[t_pyg_type] = torch.as_tensor(dst.values, device="cuda") src_id_table = noi_index[src_type] @@ -907,7 +905,7 @@ def _get_renumbered_edge_groups_from_sample( "new_id": cupy.arange(src_id_table.shape[0]), } ).set_index("src") - src = src_id_map["new_id"].loc[sampling_results.sources] + src = src_id_map["new_id"].loc[sampling_results.majors] row_dict[t_pyg_type] = torch.as_tensor(src.values, device="cuda") else: @@ -929,12 +927,12 @@ def _get_renumbered_edge_groups_from_sample( else: # CSC dst_type, _, src_type = pyg_can_edge_type - # Get the de-offsetted destinations + # Get the de-offsetted minors dst_num_type = self._numeric_vertex_type_from_name(dst_type) - destinations = torch.as_tensor( - sampling_results.destinations.iloc[ix].values, device="cuda" + minors = torch.as_tensor( + sampling_results.minors.iloc[ix].values, device="cuda" ) - destinations -= self.__vertex_type_offsets["start"][dst_num_type] + minors -= self.__vertex_type_offsets["start"][dst_num_type] # Create the col entry for this type dst_id_table = noi_index[dst_type] @@ -944,15 +942,15 @@ def _get_renumbered_edge_groups_from_sample( .rename(columns={"index": "new_id"}) .set_index("dst") ) - dst = dst_id_map["new_id"].loc[cupy.asarray(destinations)] + dst = dst_id_map["new_id"].loc[cupy.asarray(minors)] col_dict[pyg_can_edge_type] = torch.as_tensor(dst.values, device="cuda") - # Get the de-offsetted sources + # Get the de-offsetted majors src_num_type = self._numeric_vertex_type_from_name(src_type) - sources = torch.as_tensor( - sampling_results.sources.iloc[ix].values, device="cuda" + majors = torch.as_tensor( + sampling_results.majors.iloc[ix].values, device="cuda" ) - sources -= self.__vertex_type_offsets["start"][src_num_type] + majors -= self.__vertex_type_offsets["start"][src_num_type] # Create the row entry for this type src_id_table = noi_index[src_type] @@ -962,7 +960,7 @@ def _get_renumbered_edge_groups_from_sample( .rename(columns={"index": "new_id"}) .set_index("src") ) - src = src_id_map["new_id"].loc[cupy.asarray(sources)] + src = src_id_map["new_id"].loc[cupy.asarray(majors)] row_dict[pyg_can_edge_type] = torch.as_tensor(src.values, device="cuda") return row_dict, col_dict diff --git a/python/cugraph-pyg/cugraph_pyg/loader/cugraph_node_loader.py b/python/cugraph-pyg/cugraph_pyg/loader/cugraph_node_loader.py index cf7eb330d67..8552e7412e0 100644 --- a/python/cugraph-pyg/cugraph_pyg/loader/cugraph_node_loader.py +++ b/python/cugraph-pyg/cugraph_pyg/loader/cugraph_node_loader.py @@ -25,7 +25,9 @@ from cugraph_pyg.data import CuGraphStore from cugraph_pyg.sampler.cugraph_sampler import ( _sampler_output_from_sampling_results_heterogeneous, - _sampler_output_from_sampling_results_homogeneous, + _sampler_output_from_sampling_results_homogeneous_csr, + _sampler_output_from_sampling_results_homogeneous_coo, + filter_cugraph_store_csc, ) from typing import Union, Tuple, Sequence, List, Dict @@ -58,6 +60,7 @@ def __init__( # Sampler args num_neighbors: Union[List[int], Dict[Tuple[str, str, str], List[int]]] = None, replace: bool = True, + compression: str = "COO", # Other kwargs for the BulkSampler **kwargs, ): @@ -128,6 +131,10 @@ def __init__( self.__batches_per_partition = batches_per_partition self.__starting_batch_id = starting_batch_id + self._total_read_time = 0.0 + self._total_convert_time = 0.0 + self._total_feature_time = 0.0 + if input_nodes is None: # Will be loading from disk self.__num_batches = input_nodes @@ -174,6 +181,10 @@ def __init__( with_replacement=replace, batches_per_partition=self.__batches_per_partition, renumber=renumber, + use_legacy_names=False, + deduplicate_sources=True, + prior_sources_behavior="exclude", + include_hop_column=(compression == "COO"), **kwargs, ) @@ -211,6 +222,10 @@ def __init__( self.__input_files = iter(os.listdir(self.__directory.name)) def __next__(self): + from time import perf_counter + + start_time_read_data = perf_counter() + # Load the next set of sampling results if necessary if self.__next_batch >= self.__end_exclusive: if self.__directory is None: @@ -245,51 +260,98 @@ def __next__(self): fname, ) - columns = { - "sources": "int64", - "destinations": "int64", - # 'edge_id':'int64', - "edge_type": "int32", - "batch_id": "int32", - "hop_id": "int32", - } - raw_sample_data = cudf.read_parquet(parquet_path) + if "map" in raw_sample_data.columns: - num_batches = end_inclusive - self.__start_inclusive + 1 + if "renumber_map_offsets" not in raw_sample_data.columns: + num_batches = end_inclusive - self.__start_inclusive + 1 - map_end = raw_sample_data["map"].iloc[num_batches] + map_end = raw_sample_data["map"].iloc[num_batches] - map = torch.as_tensor( - raw_sample_data["map"].iloc[0:map_end], device="cuda" - ) - raw_sample_data.drop("map", axis=1, inplace=True) + map = torch.as_tensor( + raw_sample_data["map"].iloc[0:map_end], device="cuda" + ) + raw_sample_data.drop("map", axis=1, inplace=True) - self.__renumber_map_offsets = map[0 : num_batches + 1] - map[0] - self.__renumber_map = map[num_batches + 1 :] + self.__renumber_map_offsets = map[0 : num_batches + 1] - map[0] + self.__renumber_map = map[num_batches + 1 :] + else: + self.__renumber_map = raw_sample_data["map"] + self.__renumber_map_offsets = raw_sample_data[ + "renumber_map_offsets" + ] + raw_sample_data.drop( + columns=["map", "renumber_map_offsets"], inplace=True + ) + + self.__renumber_map.dropna(inplace=True) + self.__renumber_map = torch.as_tensor( + self.__renumber_map, device="cuda" + ) + + self.__renumber_map_offsets.dropna(inplace=True) + self.__renumber_map_offsets = torch.as_tensor( + self.__renumber_map_offsets, device="cuda" + ) else: self.__renumber_map = None - self.__data = raw_sample_data[list(columns.keys())].astype(columns) - self.__data.dropna(inplace=True) + self.__data = raw_sample_data + self.__coo = "majors" in self.__data.columns + if self.__coo: + self.__data.dropna(inplace=True) if ( len(self.__graph_store.edge_types) == 1 and len(self.__graph_store.node_types) == 1 ): - group_cols = ["batch_id", "hop_id"] - self.__data_index = self.__data.groupby(group_cols, as_index=True).agg( - {"sources": "max", "destinations": "max"} - ) - self.__data_index.rename( - columns={"sources": "src_max", "destinations": "dst_max"}, - inplace=True, - ) - self.__data_index = self.__data_index.to_dict(orient="index") + if self.__coo: + group_cols = ["batch_id", "hop_id"] + self.__data_index = self.__data.groupby( + group_cols, as_index=True + ).agg({"majors": "max", "minors": "max"}) + self.__data_index.rename( + columns={"majors": "src_max", "minors": "dst_max"}, + inplace=True, + ) + self.__data_index = self.__data_index.to_dict(orient="index") + else: + self.__data_index = None + + self.__label_hop_offsets = self.__data["label_hop_offsets"] + self.__data.drop(columns=["label_hop_offsets"], inplace=True) + self.__label_hop_offsets.dropna(inplace=True) + self.__label_hop_offsets = torch.as_tensor( + self.__label_hop_offsets, device="cuda" + ) + self.__label_hop_offsets -= self.__label_hop_offsets[0].clone() + + self.__major_offsets = self.__data["major_offsets"] + self.__data.drop(columns="major_offsets", inplace=True) + self.__major_offsets.dropna(inplace=True) + self.__major_offsets = torch.as_tensor( + self.__major_offsets, device="cuda" + ) + self.__major_offsets -= self.__major_offsets[0].clone() + + self.__minors = self.__data["minors"] + self.__data.drop(columns="minors", inplace=True) + self.__minors.dropna(inplace=True) + self.__minors = torch.as_tensor(self.__minors, device="cuda") + + num_batches = self.__end_exclusive - self.__start_inclusive + offsets_len = len(self.__label_hop_offsets) - 1 + if offsets_len % num_batches != 0: + raise ValueError("invalid label-hop offsets") + self.__fanout_length = int(offsets_len / num_batches) + + end_time_read_data = perf_counter() + self._total_read_time += end_time_read_data - start_time_read_data # Pull the next set of sampling results out of the dataframe in memory - f = self.__data["batch_id"] == self.__next_batch + if self.__coo: + f = self.__data["batch_id"] == self.__next_batch if self.__renumber_map is not None: i = self.__next_batch - self.__start_inclusive @@ -301,18 +363,43 @@ def __next__(self): else: current_renumber_map = None + start_time_convert = perf_counter() # Get and return the sampled subgraph if ( len(self.__graph_store.edge_types) == 1 and len(self.__graph_store.node_types) == 1 ): - sampler_output = _sampler_output_from_sampling_results_homogeneous( - self.__data[f], - current_renumber_map, - self.__graph_store, - self.__data_index, - self.__next_batch, - ) + if self.__coo: + sampler_output = _sampler_output_from_sampling_results_homogeneous_coo( + self.__data[f], + current_renumber_map, + self.__graph_store, + self.__data_index, + self.__next_batch, + ) + else: + i = (self.__next_batch - self.__start_inclusive) * self.__fanout_length + current_label_hop_offsets = self.__label_hop_offsets[ + i : i + self.__fanout_length + 1 + ] + + current_major_offsets = self.__major_offsets[ + current_label_hop_offsets[0] : (current_label_hop_offsets[-1] + 1) + ] + + current_minors = self.__minors[ + current_major_offsets[0] : current_major_offsets[-1] + ] + + sampler_output = _sampler_output_from_sampling_results_homogeneous_csr( + current_major_offsets, + current_minors, + current_renumber_map, + self.__graph_store, + current_label_hop_offsets, + self.__data_index, + self.__next_batch, + ) else: sampler_output = _sampler_output_from_sampling_results_heterogeneous( self.__data[f], current_renumber_map, self.__graph_store @@ -321,18 +408,35 @@ def __next__(self): # Get ready for next iteration self.__next_batch += 1 + end_time_convert = perf_counter() + self._total_convert_time += end_time_convert - start_time_convert + + start_time_feature = perf_counter() # Create a PyG HeteroData object, loading the required features - out = torch_geometric.loader.utils.filter_custom_store( - self.__feature_store, - self.__graph_store, - sampler_output.node, - sampler_output.row, - sampler_output.col, - sampler_output.edge, - ) + if self.__coo: + out = torch_geometric.loader.utils.filter_custom_store( + self.__feature_store, + self.__graph_store, + sampler_output.node, + sampler_output.row, + sampler_output.col, + sampler_output.edge, + ) + else: + if self.__graph_store.order == "CSR": + raise ValueError("CSR format incompatible with CSC output") + + out = filter_cugraph_store_csc( + self.__feature_store, + self.__graph_store, + sampler_output.node, + sampler_output.row, + sampler_output.col, + sampler_output.edge, + ) # Account for CSR format in cuGraph vs. CSC format in PyG - if self.__graph_store.order == "CSC": + if self.__coo and self.__graph_store.order == "CSC": for node_type in out.edge_index_dict: out[node_type].edge_index[0], out[node_type].edge_index[1] = ( out[node_type].edge_index[1], @@ -342,6 +446,9 @@ def __next__(self): out.set_value_dict("num_sampled_nodes", sampler_output.num_sampled_nodes) out.set_value_dict("num_sampled_edges", sampler_output.num_sampled_edges) + end_time_feature = perf_counter() + self._total_feature_time = end_time_feature - start_time_feature + return out @property diff --git a/python/cugraph-pyg/cugraph_pyg/sampler/cugraph_sampler.py b/python/cugraph-pyg/cugraph_pyg/sampler/cugraph_sampler.py index 6e8c4322418..300ca9beb5a 100644 --- a/python/cugraph-pyg/cugraph_pyg/sampler/cugraph_sampler.py +++ b/python/cugraph-pyg/cugraph_pyg/sampler/cugraph_sampler.py @@ -53,10 +53,10 @@ def _get_unique_nodes( The unique nodes of the given node type. """ if node_position == "src": - edge_index = "sources" + edge_index = "majors" edge_sel = 0 elif node_position == "dst": - edge_index = "destinations" + edge_index = "minors" edge_sel = -1 else: raise ValueError(f"Illegal value {node_position} for node_position") @@ -78,7 +78,7 @@ def _get_unique_nodes( return sampling_results_node[edge_index] -def _sampler_output_from_sampling_results_homogeneous( +def _sampler_output_from_sampling_results_homogeneous_coo( sampling_results: cudf.DataFrame, renumber_map: torch.Tensor, graph_store: CuGraphStore, @@ -133,11 +133,11 @@ def _sampler_output_from_sampling_results_homogeneous( noi_index = {node_type: torch.as_tensor(renumber_map, device="cuda")} row_dict = { - edge_type: torch.as_tensor(sampling_results.sources, device="cuda"), + edge_type: torch.as_tensor(sampling_results.majors, device="cuda"), } col_dict = { - edge_type: torch.as_tensor(sampling_results.destinations, device="cuda"), + edge_type: torch.as_tensor(sampling_results.minors, device="cuda"), } num_nodes_per_hop_dict[node_type][0] = data_index[batch_id, 0]["src_max"] + 1 @@ -177,6 +177,88 @@ def _sampler_output_from_sampling_results_homogeneous( ) +def _sampler_output_from_sampling_results_homogeneous_csr( + major_offsets: torch.Tensor, + minors: torch.Tensor, + renumber_map: torch.Tensor, + graph_store: CuGraphStore, + label_hop_offsets: torch.Tensor, + batch_id: int, + metadata: Sequence = None, +) -> HeteroSamplerOutput: + """ + Parameters + ---------- + major_offsets: torch.Tensor + The major offsets for the CSC/CSR matrix ("row pointer") + minors: torch.Tensor + The minors for the CSC/CSR matrix ("col index") + renumber_map: torch.Tensor + The tensor containing the renumber map. + Required. + graph_store: CuGraphStore + The graph store containing the structure of the sampled graph. + label_hop_offsets: torch.Tensor + The tensor containing the label-hop offsets. + batch_id: int + The current batch id, whose samples are being retrieved + from the sampling results and data index. + metadata: Tensor + The metadata for the sampled batch. + + Returns + ------- + HeteroSamplerOutput + """ + + if len(graph_store.edge_types) > 1 or len(graph_store.node_types) > 1: + raise ValueError("Graph is heterogeneous") + + if renumber_map is None: + raise ValueError("Renumbered input is expected for homogeneous graphs") + + node_type = graph_store.node_types[0] + edge_type = graph_store.edge_types[0] + + major_offsets = major_offsets.clone() - major_offsets[0] + label_hop_offsets = label_hop_offsets.clone() - label_hop_offsets[0] + + num_edges_per_hop_dict = {edge_type: major_offsets[label_hop_offsets].diff().cpu()} + + label_hop_offsets = label_hop_offsets.cpu() + num_nodes_per_hop_dict = { + node_type: torch.concat( + [ + label_hop_offsets.diff(), + (renumber_map.shape[0] - label_hop_offsets[-1]).reshape((1,)), + ] + ).cpu() + } + + noi_index = {node_type: torch.as_tensor(renumber_map, device="cuda")} + + col_dict = { + edge_type: major_offsets, + } + + row_dict = { + edge_type: minors, + } + + if HeteroSamplerOutput is None: + raise ImportError("Error importing from pyg") + + return HeteroSamplerOutput( + node=noi_index, + row=row_dict, + col=col_dict, + edge=None, + num_sampled_nodes=num_nodes_per_hop_dict, + num_sampled_edges=num_edges_per_hop_dict, + metadata=metadata, + ) + + def _sampler_output_from_sampling_results_heterogeneous( sampling_results: cudf.DataFrame, renumber_map: cudf.Series, @@ -244,8 +326,8 @@ def _sampler_output_from_sampling_results_heterogeneous( cudf.Series( torch.concat( [ - torch.as_tensor(sampling_results_hop_0.sources, device="cuda"), - torch.as_tensor(sampling_results.destinations, device="cuda"), + torch.as_tensor(sampling_results_hop_0.majors, device="cuda"), + torch.as_tensor(sampling_results.minors, device="cuda"), ] ), name="nodes_of_interest", @@ -320,3 +402,37 @@ def _sampler_output_from_sampling_results_heterogeneous( num_sampled_edges=num_edges_per_hop_dict, metadata=metadata, ) + + +def filter_cugraph_store_csc( + feature_store: torch_geometric.data.FeatureStore, + graph_store: torch_geometric.data.GraphStore, + node_dict: Dict[str, torch.Tensor], + row_dict: Dict[str, torch.Tensor], + col_dict: Dict[str, torch.Tensor], + edge_dict: Dict[str, Tuple[torch.Tensor]], +) -> torch_geometric.data.HeteroData: + data = torch_geometric.data.HeteroData() + + for attr in graph_store.get_all_edge_attrs(): + key = attr.edge_type + if key in row_dict and key in col_dict: + data.put_edge_index( + (row_dict[key], col_dict[key]), + edge_type=key, + layout="csc", + is_sorted=True, + ) + + required_attrs = [] + for attr in feature_store.get_all_tensor_attrs(): + if attr.group_name in node_dict: + attr.index = node_dict[attr.group_name] + required_attrs.append(attr) + data[attr.group_name].num_nodes = attr.index.size(0) + + tensors = feature_store.multi_get_tensor(required_attrs) + for i, attr in enumerate(required_attrs): + data[attr.group_name][attr.attr_name] = tensors[i] + + return data diff --git a/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_sampler.py b/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_sampler.py index a1a72a44d0c..80a2d0a6c79 100644 --- a/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_sampler.py +++ b/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_sampler.py @@ -53,9 +53,10 @@ def test_neighbor_sample(dask_client, basic_graph_1): random_state=62, return_offsets=False, return_hops=True, + use_legacy_names=False, ) .compute() - .sort_values(by=["sources", "destinations"]) + .sort_values(by=["majors", "minors"]) ) out = _sampler_output_from_sampling_results_heterogeneous( @@ -116,8 +117,9 @@ def test_neighbor_sample_multi_vertex(dask_client, multi_edge_multi_vertex_graph random_state=62, return_offsets=False, with_batch_ids=True, + use_legacy_names=False, ) - .sort_values(by=["sources", "destinations"]) + .sort_values(by=["majors", "minors"]) .compute() ) @@ -193,8 +195,8 @@ def test_neighbor_sample_mock_sampling_results(dask_client): # let 0, 1 be the start vertices, fanout = [2, 1, 2, 3] mock_sampling_results = cudf.DataFrame( { - "sources": cudf.Series([0, 0, 1, 2, 3, 3, 1, 3, 3, 3], dtype="int64"), - "destinations": cudf.Series([2, 3, 3, 8, 1, 7, 3, 1, 5, 7], dtype="int64"), + "majors": cudf.Series([0, 0, 1, 2, 3, 3, 1, 3, 3, 3], dtype="int64"), + "minors": cudf.Series([2, 3, 3, 8, 1, 7, 3, 1, 5, 7], dtype="int64"), "hop_id": cudf.Series([0, 0, 0, 1, 1, 1, 2, 3, 3, 3], dtype="int32"), "edge_type": cudf.Series([0, 0, 0, 2, 1, 2, 0, 1, 2, 2], dtype="int32"), } diff --git a/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_store.py b/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_store.py index 43b1e5da5a0..ed7f70034e2 100644 --- a/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_store.py +++ b/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_store.py @@ -220,8 +220,8 @@ def test_renumber_edges(abc_graph, dask_client): # let 0, 1 be the start vertices, fanout = [2, 1, 2, 3] mock_sampling_results = cudf.DataFrame( { - "sources": cudf.Series([0, 0, 1, 2, 3, 3, 1, 3, 3, 3], dtype="int64"), - "destinations": cudf.Series([2, 3, 3, 8, 1, 7, 3, 1, 5, 7], dtype="int64"), + "majors": cudf.Series([0, 0, 1, 2, 3, 3, 1, 3, 3, 3], dtype="int64"), + "minors": cudf.Series([2, 3, 3, 8, 1, 7, 3, 1, 5, 7], dtype="int64"), "hop_id": cudf.Series([0, 0, 0, 1, 1, 1, 2, 3, 3, 3], dtype="int32"), "edge_type": cudf.Series([0, 0, 0, 2, 1, 2, 0, 1, 2, 2], dtype="int32"), } diff --git a/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_loader.py b/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_loader.py index 48a21cb7fd6..03274948158 100644 --- a/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_loader.py +++ b/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_loader.py @@ -22,6 +22,8 @@ from cugraph_pyg.loader import CuGraphNeighborLoader from cugraph_pyg.loader import BulkSampleLoader from cugraph_pyg.data import CuGraphStore +from cugraph_pyg.nn import SAGEConv as CuGraphSAGEConv + from cugraph.gnn import FeatureStore from cugraph.utilities.utils import import_optional, MissingModule @@ -98,8 +100,8 @@ def test_cugraph_loader_from_disk(): bogus_samples = cudf.DataFrame( { - "sources": [0, 1, 2, 3, 4, 5, 6, 6], - "destinations": [5, 4, 3, 2, 2, 6, 5, 2], + "majors": [0, 1, 2, 3, 4, 5, 6, 6], + "minors": [5, 4, 3, 2, 2, 6, 5, 2], "edge_type": cudf.Series([0, 0, 0, 0, 0, 0, 0, 0], dtype="int32"), "edge_id": [5, 10, 15, 20, 25, 30, 35, 40], "hop_id": cudf.Series([0, 0, 0, 1, 1, 1, 2, 2], dtype="int32"), @@ -130,12 +132,10 @@ def test_cugraph_loader_from_disk(): assert list(edge_index.shape) == [2, 8] assert ( - edge_index[0].tolist() - == bogus_samples.sources.dropna().values_host.tolist() + edge_index[0].tolist() == bogus_samples.majors.dropna().values_host.tolist() ) assert ( - edge_index[1].tolist() - == bogus_samples.destinations.dropna().values_host.tolist() + edge_index[1].tolist() == bogus_samples.minors.dropna().values_host.tolist() ) assert num_samples == 256 @@ -157,8 +157,8 @@ def test_cugraph_loader_from_disk_subset(): bogus_samples = cudf.DataFrame( { - "sources": [0, 1, 2, 3, 4, 5, 6, 6], - "destinations": [5, 4, 3, 2, 2, 6, 5, 2], + "majors": [0, 1, 2, 3, 4, 5, 6, 6], + "minors": [5, 4, 3, 2, 2, 6, 5, 2], "edge_type": cudf.Series([0, 0, 0, 0, 0, 0, 0, 0], dtype="int32"), "edge_id": [5, 10, 15, 20, 25, 30, 35, 40], "hop_id": cudf.Series([0, 0, 0, 1, 1, 1, 2, 2], dtype="int32"), @@ -190,13 +190,77 @@ def test_cugraph_loader_from_disk_subset(): assert list(edge_index.shape) == [2, 8] assert ( - edge_index[0].tolist() - == bogus_samples.sources.dropna().values_host.tolist() + edge_index[0].tolist() == bogus_samples.majors.dropna().values_host.tolist() + ) + assert ( + edge_index[1].tolist() == bogus_samples.minors.dropna().values_host.tolist() ) + + assert num_samples == 100 + + +@pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available") +def test_cugraph_loader_from_disk_subset_csr(): + m = [2, 9, 99, 82, 11, 13] + n = torch.arange(1, 1 + len(m), dtype=torch.int32) + x = torch.zeros(256, dtype=torch.int32) + x[torch.tensor(m, dtype=torch.int32)] = n + F = FeatureStore() + F.add_data(x, "t0", "x") + + G = {("t0", "knows", "t0"): 9080} + N = {"t0": 256} + + cugraph_store = CuGraphStore(F, G, N) + + bogus_samples = cudf.DataFrame( + { + "major_offsets": [0, 3, 5, 7, 8, None, None, None], + "minors": [1, 2, 3, 0, 3, 4, 5, 1], + "edge_type": cudf.Series([0, 0, 0, 0, 0, 0, 0, 0], dtype="int32"), + "edge_id": [5, 10, 15, 20, 25, 30, 35, 40], + "label_hop_offsets": cudf.Series( + [0, 1, 4, None, None, None, None, None], dtype="int32" + ), + "renumber_map_offsets": cudf.Series([0, 6], dtype="int32"), + } + ) + map = cudf.Series(m, name="map") + bogus_samples["map"] = map + + tempdir = tempfile.TemporaryDirectory() + for s in range(256): + # offset the offsets + bogus_samples["batch_id"] = cupy.int32(s) + bogus_samples.to_parquet(os.path.join(tempdir.name, f"batch={s}-{s}.parquet")) + + loader = BulkSampleLoader( + feature_store=cugraph_store, + graph_store=cugraph_store, + directory=tempdir, + input_files=list(os.listdir(tempdir.name))[100:200], + ) + + num_samples = 0 + for sample in loader: + num_samples += 1 + assert sample["t0"]["num_nodes"] == 6 + + assert sample["t0"]["x"].tolist() == [1, 2, 3, 4, 5, 6] + + edge_index = sample[("t0", "knows", "t0")]["adj_t"] + assert edge_index.size(0) == 4 + assert edge_index.size(1) == 6 + + colptr, row, _ = edge_index.csr() + assert ( - edge_index[1].tolist() - == bogus_samples.destinations.dropna().values_host.tolist() + colptr.tolist() == bogus_samples.major_offsets.dropna().values_host.tolist() ) + assert row.tolist() == bogus_samples.minors.dropna().values_host.tolist() + + assert sample["t0"]["num_sampled_nodes"].tolist() == [1, 3, 2] + assert sample["t0", "knows", "t0"]["num_sampled_edges"].tolist() == [3, 5] assert num_samples == 100 @@ -215,8 +279,8 @@ def test_cugraph_loader_e2e_coo(): bogus_samples = cudf.DataFrame( { - "sources": [0, 1, 2, 3, 4, 5, 6, 6], - "destinations": [5, 4, 3, 2, 2, 6, 5, 2], + "majors": [0, 1, 2, 3, 4, 5, 6, 6], + "minors": [5, 4, 3, 2, 2, 6, 5, 2], "edge_type": cudf.Series([0, 0, 0, 0, 0, 0, 0, 0], dtype="int32"), "edge_id": [5, 10, 15, 20, 25, 30, 35, 40], "hop_id": cudf.Series([0, 0, 0, 1, 1, 1, 2, 2], dtype="int32"), @@ -253,8 +317,6 @@ def test_cugraph_loader_e2e_coo(): num_sampled_nodes = hetero_data["t0"]["num_sampled_nodes"] num_sampled_edges = hetero_data["t0", "knows", "t0"]["num_sampled_edges"] - print(num_sampled_nodes, num_sampled_edges) - for i in range(len(convs)): x, ei, _ = trim(i, num_sampled_nodes, num_sampled_edges, x, ei, None) @@ -263,9 +325,111 @@ def test_cugraph_loader_e2e_coo(): x = convs[i](x, ei, size=(s, s)) x = relu(x) x = dropout(x, p=0.5) - print(x.shape) - print(x.shape) x = x.narrow(dim=0, start=0, length=x.shape[0] - num_sampled_nodes[1]) assert list(x.shape) == [3, 1] + + +@pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available") +@pytest.mark.parametrize("framework", ["pyg", "cugraph-ops"]) +def test_cugraph_loader_e2e_csc(framework): + m = [2, 9, 99, 82, 9, 3, 18, 1, 12] + x = torch.randint(3000, (256, 256)).to(torch.float32) + F = FeatureStore() + F.add_data(x, "t0", "x") + + G = {("t0", "knows", "t0"): 9999} + N = {"t0": 256} + + cugraph_store = CuGraphStore(F, G, N) + + bogus_samples = cudf.DataFrame( + { + "major_offsets": [0, 3, 5, 7, 8, None, None, None], + "minors": [1, 2, 3, 0, 3, 4, 5, 1], + "edge_type": cudf.Series([0, 0, 0, 0, 0, 0, 0, 0], dtype="int32"), + "edge_id": [5, 10, 15, 20, 25, 30, 35, 40], + "label_hop_offsets": cudf.Series( + [0, 1, 4, None, None, None, None, None], dtype="int32" + ), + "renumber_map_offsets": cudf.Series([0, 6], dtype="int32"), + } + ) + map = cudf.Series(m, name="map") + bogus_samples = bogus_samples.join(map, how="outer").sort_index() + + tempdir = tempfile.TemporaryDirectory() + for s in range(256): + bogus_samples["batch_id"] = cupy.int32(s) + bogus_samples.to_parquet(os.path.join(tempdir.name, f"batch={s}-{s}.parquet")) + + loader = BulkSampleLoader( + feature_store=cugraph_store, + graph_store=cugraph_store, + directory=tempdir, + input_files=list(os.listdir(tempdir.name))[100:200], + ) + + if framework == "pyg": + convs = [ + torch_geometric.nn.SAGEConv(256, 64, aggr="mean").cuda(), + torch_geometric.nn.SAGEConv(64, 1, aggr="mean").cuda(), + ] + else: + convs = [ + CuGraphSAGEConv(256, 64, aggr="mean").cuda(), + CuGraphSAGEConv(64, 1, aggr="mean").cuda(), + ] + + trim = trim_to_layer.TrimToLayer() + relu = torch.nn.functional.relu + dropout = torch.nn.functional.dropout + + for hetero_data in loader: + x = hetero_data["t0"]["x"].cuda() + + if framework == "pyg": + ei = hetero_data["t0", "knows", "t0"]["adj_t"].coo() + ei = torch.stack((ei[0], ei[1])) + else: + ei = hetero_data["t0", "knows", "t0"]["adj_t"].csr() + ei = [ei[1], ei[0], x.shape[0]] + + num_sampled_nodes = hetero_data["t0"]["num_sampled_nodes"] + num_sampled_edges = hetero_data["t0", "knows", "t0"]["num_sampled_edges"] + + s = x.shape[0] + for i in range(len(convs)): + if framework == "pyg": + x, ei, _ = trim(i, num_sampled_nodes, num_sampled_edges, x, ei, None) + else: + if i > 0: + x = x.narrow( + dim=0, + start=0, + length=s - num_sampled_nodes[-i], + ) + + ei[0] = ei[0].narrow( + dim=0, + start=0, + length=ei[0].size(0) - num_sampled_edges[-i], + ) + ei[1] = ei[1].narrow( + dim=0, start=0, length=ei[1].size(0) - num_sampled_nodes[-i] + ) + ei[2] = x.size(0) + + s = x.shape[0] + + if framework == "pyg": + x = convs[i](x, ei, size=(s, s)) + else: + x = convs[i](x, ei) + x = relu(x) + x = dropout(x, p=0.5) + + x = x.narrow(dim=0, start=0, length=s - num_sampled_nodes[1]) + + assert list(x.shape) == [1, 1] diff --git a/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_sampler.py b/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_sampler.py index 84f62e80c9d..e703d477b70 100644 --- a/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_sampler.py +++ b/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_sampler.py @@ -49,7 +49,8 @@ def test_neighbor_sample(basic_graph_1): with_batch_ids=True, random_state=62, return_offsets=False, - ).sort_values(by=["sources", "destinations"]) + use_legacy_names=False, + ).sort_values(by=["majors", "minors"]) out = _sampler_output_from_sampling_results_heterogeneous( sampling_results=sampling_results, @@ -107,7 +108,8 @@ def test_neighbor_sample_multi_vertex(multi_edge_multi_vertex_graph_1): random_state=62, return_offsets=False, with_batch_ids=True, - ).sort_values(by=["sources", "destinations"]) + use_legacy_names=False, + ).sort_values(by=["majors", "minors"]) out = _sampler_output_from_sampling_results_heterogeneous( sampling_results=sampling_results, @@ -154,8 +156,8 @@ def test_neighbor_sample_mock_sampling_results(abc_graph): # let 0, 1 be the start vertices, fanout = [2, 1, 2, 3] mock_sampling_results = cudf.DataFrame( { - "sources": cudf.Series([0, 0, 1, 2, 3, 3, 1, 3, 3, 3], dtype="int64"), - "destinations": cudf.Series([2, 3, 3, 8, 1, 7, 3, 1, 5, 7], dtype="int64"), + "majors": cudf.Series([0, 0, 1, 2, 3, 3, 1, 3, 3, 3], dtype="int64"), + "minors": cudf.Series([2, 3, 3, 8, 1, 7, 3, 1, 5, 7], dtype="int64"), "hop_id": cudf.Series([0, 0, 0, 1, 1, 1, 2, 3, 3, 3], dtype="int32"), "edge_type": cudf.Series([0, 0, 0, 2, 1, 2, 0, 1, 2, 2], dtype="int32"), } diff --git a/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_store.py b/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_store.py index e815b813050..da3043760d4 100644 --- a/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_store.py +++ b/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_store.py @@ -204,8 +204,8 @@ def test_renumber_edges(abc_graph): # let 0, 1 be the start vertices, fanout = [2, 1, 2, 3] mock_sampling_results = cudf.DataFrame( { - "sources": cudf.Series([0, 0, 1, 2, 3, 3, 1, 3, 3, 3], dtype="int64"), - "destinations": cudf.Series([2, 3, 3, 8, 1, 7, 3, 1, 5, 7], dtype="int64"), + "majors": cudf.Series([0, 0, 1, 2, 3, 3, 1, 3, 3, 3], dtype="int64"), + "minors": cudf.Series([2, 3, 3, 8, 1, 7, 3, 1, 5, 7], dtype="int64"), "hop_id": cudf.Series([0, 0, 0, 1, 1, 1, 2, 3, 3, 3], dtype="int32"), "edge_type": cudf.Series([0, 0, 0, 2, 1, 2, 0, 1, 2, 2], dtype="int32"), } diff --git a/python/cugraph/cugraph/dask/common/part_utils.py b/python/cugraph/cugraph/dask/common/part_utils.py index 7c0aad6c3ee..25311902b29 100644 --- a/python/cugraph/cugraph/dask/common/part_utils.py +++ b/python/cugraph/cugraph/dask/common/part_utils.py @@ -99,19 +99,65 @@ def _chunk_lst(ls, num_parts): return [ls[i::num_parts] for i in range(num_parts)] -def persist_dask_df_equal_parts_per_worker(dask_df, client): +def persist_dask_df_equal_parts_per_worker( + dask_df, client, return_type="dask_cudf.DataFrame" +): + """ + Persist dask_df with equal parts per worker + Args: + dask_df: dask_cudf.DataFrame + client: dask.distributed.Client + return_type: str, "dask_cudf.DataFrame" or "dict" + Returns: + persisted_keys: dict of {worker: [persisted_keys]} + """ + if return_type not in ["dask_cudf.DataFrame", "dict"]: + raise ValueError("return_type must be either 'dask_cudf.DataFrame' or 'dict'") + ddf_keys = dask_df.to_delayed() workers = client.scheduler_info()["workers"].keys() ddf_keys_ls = _chunk_lst(ddf_keys, len(workers)) - persisted_keys = [] + persisted_keys_d = {} for w, ddf_k in zip(workers, ddf_keys_ls): - persisted_keys.extend( - client.persist(ddf_k, workers=w, allow_other_workers=False) + persisted_keys_d[w] = client.compute( + ddf_k, workers=w, allow_other_workers=False, pure=False ) - dask_df = dask_cudf.from_delayed(persisted_keys, meta=dask_df._meta).persist() - wait(dask_df) - client.rebalance(dask_df) - return dask_df + + persisted_keys_ls = [ + item for sublist in persisted_keys_d.values() for item in sublist + ] + wait(persisted_keys_ls) + if return_type == "dask_cudf.DataFrame": + dask_df = dask_cudf.from_delayed( + persisted_keys_ls, meta=dask_df._meta + ).persist() + wait(dask_df) + return dask_df + + return persisted_keys_d + + +def get_length_of_parts(persisted_keys_d, client): + """ + Get the length of each partition + Args: + persisted_keys_d: dict of {worker: [persisted_keys]} + client: dask.distributed.Client + Returns: + length_of_parts: dict of {worker: [length_of_parts]} + """ + length_of_parts = {} + for w, p_keys in persisted_keys_d.items(): + length_of_parts[w] = [ + client.submit( + len, p_key, pure=False, workers=[w], allow_other_workers=False + ) + for p_key in p_keys + ] + + for w, len_futures in length_of_parts.items(): + length_of_parts[w] = client.gather(len_futures) + return length_of_parts async def _extract_partitions( diff --git a/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py b/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py index fa94fa67625..935d0c597d4 100644 --- a/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py +++ b/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py @@ -36,6 +36,7 @@ from cugraph.structure.symmetrize import symmetrize from cugraph.dask.common.part_utils import ( get_persisted_df_worker_map, + get_length_of_parts, persist_dask_df_equal_parts_per_worker, ) from cugraph.dask import get_n_workers @@ -318,9 +319,14 @@ def __from_edgelist( is_symmetric=not self.properties.directed, ) ddf = ddf.repartition(npartitions=len(workers) * 2) - ddf = persist_dask_df_equal_parts_per_worker(ddf, _client) - num_edges = len(ddf) - ddf = get_persisted_df_worker_map(ddf, _client) + persisted_keys_d = persist_dask_df_equal_parts_per_worker( + ddf, _client, return_type="dict" + ) + del ddf + length_of_parts = get_length_of_parts(persisted_keys_d, _client) + num_edges = sum( + [item for sublist in length_of_parts.values() for item in sublist] + ) delayed_tasks_d = { w: delayed(simpleDistributedGraphImpl._make_plc_graph)( Comms.get_session_id(), @@ -331,14 +337,16 @@ def __from_edgelist( store_transposed, num_edges, ) - for w, edata in ddf.items() + for w, edata in persisted_keys_d.items() } - # FIXME: For now, don't delete the copied dataframe to avoid crash self._plc_graph = { - w: _client.compute(delayed_task, workers=w, allow_other_workers=False) + w: _client.compute( + delayed_task, workers=w, allow_other_workers=False, pure=False + ) for w, delayed_task in delayed_tasks_d.items() } wait(list(self._plc_graph.values())) + del persisted_keys_d del delayed_tasks_d _client.run(gc.collect) @@ -1192,5 +1200,7 @@ def _get_column_from_ls_dfs(lst_df, col_name): if len_df == 0: return lst_df[0][col_name] output_col = cudf.concat([df[col_name] for df in lst_df], ignore_index=True) - # FIXME: For now, don't delete the copied dataframe to avoid cras + for df in lst_df: + df.drop(columns=[col_name], inplace=True) + gc.collect() return output_col diff --git a/python/nx-cugraph/_nx_cugraph/__init__.py b/python/nx-cugraph/_nx_cugraph/__init__.py index ba7fe5e91d7..227caea29d5 100644 --- a/python/nx-cugraph/_nx_cugraph/__init__.py +++ b/python/nx-cugraph/_nx_cugraph/__init__.py @@ -31,20 +31,23 @@ # BEGIN: functions "betweenness_centrality", "edge_betweenness_centrality", + "is_isolate", + "isolates", "louvain_communities", + "number_of_isolates", # END: functions }, "extra_docstrings": { # BEGIN: extra_docstrings "betweenness_centrality": "`weight` parameter is not yet supported.", "edge_betweenness_centrality": "`weight` parameter is not yet supported.", - "louvain_communities": "`threshold` and `seed` parameters are currently ignored.", + "louvain_communities": "`seed` parameter is currently ignored.", # END: extra_docstrings }, "extra_parameters": { # BEGIN: extra_parameters "louvain_communities": { - "max_level : int, optional": "Upper limit of the number of macro-iterations.", + "max_level : int, optional": "Upper limit of the number of macro-iterations (max: 500).", }, # END: extra_parameters }, diff --git a/python/nx-cugraph/lint.yaml b/python/nx-cugraph/lint.yaml index 6a462a6af79..338ca7b230e 100644 --- a/python/nx-cugraph/lint.yaml +++ b/python/nx-cugraph/lint.yaml @@ -63,7 +63,7 @@ repos: # These versions need updated manually - flake8==6.1.0 - flake8-bugbear==23.9.16 - - flake8-simplify==0.20.0 + - flake8-simplify==0.21.0 - repo: https://github.com/asottile/yesqa rev: v1.5.0 hooks: diff --git a/python/nx-cugraph/nx_cugraph/algorithms/__init__.py b/python/nx-cugraph/nx_cugraph/algorithms/__init__.py index 3a585452d6d..dfd9adfc61a 100644 --- a/python/nx-cugraph/nx_cugraph/algorithms/__init__.py +++ b/python/nx-cugraph/nx_cugraph/algorithms/__init__.py @@ -12,3 +12,4 @@ # limitations under the License. from . import centrality, community from .centrality import * +from .isolate import * diff --git a/python/nx-cugraph/nx_cugraph/algorithms/community/louvain.py b/python/nx-cugraph/nx_cugraph/algorithms/community/louvain.py index a183b59fe1d..dc209870c89 100644 --- a/python/nx-cugraph/nx_cugraph/algorithms/community/louvain.py +++ b/python/nx-cugraph/nx_cugraph/algorithms/community/louvain.py @@ -10,7 +10,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import sys +import warnings import pylibcugraph as plc @@ -22,19 +22,23 @@ not_implemented_for, ) +from ..isolate import _isolates + __all__ = ["louvain_communities"] @not_implemented_for("directed") @networkx_algorithm( extra_params={ - "max_level : int, optional": "Upper limit of the number of macro-iterations." + "max_level : int, optional": ( + "Upper limit of the number of macro-iterations (max: 500)." + ) } ) def louvain_communities( G, weight="weight", resolution=1, threshold=0.0000001, seed=None, *, max_level=None ): - """`threshold` and `seed` parameters are currently ignored.""" + """`seed` parameter is currently ignored.""" # NetworkX allows both directed and undirected, but cugraph only allows undirected. seed = _seed_to_int(seed) # Unused, but ensure it's valid for future compatibility G = _to_undirected_graph(G, weight) @@ -42,7 +46,14 @@ def louvain_communities( # TODO: PLC doesn't handle empty graphs gracefully! return [{key} for key in G._nodeiter_to_iter(range(len(G)))] if max_level is None: - max_level = sys.maxsize + max_level = 500 + elif max_level > 500: + warnings.warn( + f"max_level is set too high (={max_level}), setting it to 500.", + UserWarning, + stacklevel=2, + ) + max_level = 500 vertices, clusters, modularity = plc.louvain( resource_handle=plc.ResourceHandle(), graph=G._get_plc_graph(), @@ -52,7 +63,14 @@ def louvain_communities( do_expensive_check=False, ) groups = _groupby(clusters, vertices) - return [set(G._nodearray_to_list(node_ids)) for node_ids in groups.values()] + rv = [set(G._nodearray_to_list(node_ids)) for node_ids in groups.values()] + # TODO: PLC doesn't handle isolated vertices yet, so this is a temporary fix + isolates = _isolates(G) + if isolates.size > 0: + isolates = isolates[isolates > vertices.max()] + if isolates.size > 0: + rv.extend({node} for node in G._nodearray_to_list(isolates)) + return rv @louvain_communities._can_run diff --git a/python/nx-cugraph/nx_cugraph/algorithms/isolate.py b/python/nx-cugraph/nx_cugraph/algorithms/isolate.py new file mode 100644 index 00000000000..774627e84f6 --- /dev/null +++ b/python/nx-cugraph/nx_cugraph/algorithms/isolate.py @@ -0,0 +1,63 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import annotations + +from typing import TYPE_CHECKING + +import cupy as cp + +from nx_cugraph.convert import _to_graph +from nx_cugraph.utils import networkx_algorithm + +if TYPE_CHECKING: # pragma: no cover + from nx_cugraph.typing import IndexValue + +__all__ = ["is_isolate", "isolates", "number_of_isolates"] + + +@networkx_algorithm +def is_isolate(G, n): + G = _to_graph(G) + index = n if G.key_to_id is None else G.key_to_id[n] + return not ( + (G.row_indices == index).any().tolist() + or G.is_directed() + and (G.col_indices == index).any().tolist() + ) + + +def _mark_isolates(G) -> cp.ndarray[bool]: + """Return a boolean mask array indicating indices of isolated nodes.""" + mark_isolates = cp.ones(len(G), bool) + mark_isolates[G.row_indices] = False + if G.is_directed(): + mark_isolates[G.col_indices] = False + return mark_isolates + + +def _isolates(G) -> cp.ndarray[IndexValue]: + """Like isolates, but return an array of indices instead of an iterator of nodes.""" + G = _to_graph(G) + return cp.nonzero(_mark_isolates(G))[0] + + +@networkx_algorithm +def isolates(G): + G = _to_graph(G) + return G._nodeiter_to_iter(iter(_isolates(G).tolist())) + + +@networkx_algorithm +def number_of_isolates(G): + G = _to_graph(G) + return _mark_isolates(G).sum().tolist() diff --git a/python/nx-cugraph/nx_cugraph/classes/digraph.py b/python/nx-cugraph/nx_cugraph/classes/digraph.py index 0aaf88fd793..72a1bff21a9 100644 --- a/python/nx-cugraph/nx_cugraph/classes/digraph.py +++ b/python/nx-cugraph/nx_cugraph/classes/digraph.py @@ -20,7 +20,7 @@ from .graph import Graph -if TYPE_CHECKING: +if TYPE_CHECKING: # pragma: no cover from nx_cugraph.typing import NodeKey __all__ = ["DiGraph"] diff --git a/python/nx-cugraph/nx_cugraph/classes/graph.py b/python/nx-cugraph/nx_cugraph/classes/graph.py index 1432f68c752..ded4cc3943f 100644 --- a/python/nx-cugraph/nx_cugraph/classes/graph.py +++ b/python/nx-cugraph/nx_cugraph/classes/graph.py @@ -23,7 +23,7 @@ import nx_cugraph as nxcg -if TYPE_CHECKING: +if TYPE_CHECKING: # pragma: no cover from collections.abc import Iterable, Iterator from nx_cugraph.typing import ( @@ -245,9 +245,9 @@ def from_dcsc( def __new__(cls, incoming_graph_data=None, **attr) -> Graph: if incoming_graph_data is None: new_graph = cls.from_coo(0, cp.empty(0, np.int32), cp.empty(0, np.int32)) - elif incoming_graph_data.__class__ is new_graph.__class__: + elif incoming_graph_data.__class__ is cls: new_graph = incoming_graph_data.copy() - elif incoming_graph_data.__class__ is new_graph.to_networkx_class(): + elif incoming_graph_data.__class__ is cls.to_networkx_class(): new_graph = nxcg.from_networkx(incoming_graph_data, preserve_all_attrs=True) else: raise NotImplementedError diff --git a/python/nx-cugraph/nx_cugraph/convert.py b/python/nx-cugraph/nx_cugraph/convert.py index 9be8cac7877..1240ea71db7 100644 --- a/python/nx-cugraph/nx_cugraph/convert.py +++ b/python/nx-cugraph/nx_cugraph/convert.py @@ -24,7 +24,7 @@ import nx_cugraph as nxcg -if TYPE_CHECKING: +if TYPE_CHECKING: # pragma: no cover from nx_cugraph.typing import AttrKey, Dtype, EdgeValue, NodeValue __all__ = [ diff --git a/python/nx-cugraph/nx_cugraph/tests/test_community.py b/python/nx-cugraph/nx_cugraph/tests/test_community.py new file mode 100644 index 00000000000..126f45c14ae --- /dev/null +++ b/python/nx-cugraph/nx_cugraph/tests/test_community.py @@ -0,0 +1,53 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import networkx as nx +import pytest + +import nx_cugraph as nxcg + + +def test_louvain_isolated_nodes(): + is_nx_30_or_31 = hasattr(nx.classes, "backends") + + def check(left, right): + assert len(left) == len(right) + assert set(map(frozenset, left)) == set(map(frozenset, right)) + + # Empty graph (no nodes) + G = nx.Graph() + if is_nx_30_or_31: + with pytest.raises(ZeroDivisionError): + nx.community.louvain_communities(G) + else: + nx_result = nx.community.louvain_communities(G) + cg_result = nxcg.community.louvain_communities(G) + check(nx_result, cg_result) + # Graph with no edges + G.add_nodes_from(range(5)) + if is_nx_30_or_31: + with pytest.raises(ZeroDivisionError): + nx.community.louvain_communities(G) + else: + nx_result = nx.community.louvain_communities(G) + cg_result = nxcg.community.louvain_communities(G) + check(nx_result, cg_result) + # Graph with isolated nodes + G.add_edge(1, 2) + nx_result = nx.community.louvain_communities(G) + cg_result = nxcg.community.louvain_communities(G) + check(nx_result, cg_result) + # Another one + G.add_edge(4, 4) + nx_result = nx.community.louvain_communities(G) + cg_result = nxcg.community.louvain_communities(G) + check(nx_result, cg_result)