diff --git a/python/cugraph-pyg/cugraph_pyg/data/cugraph_store.py b/python/cugraph-pyg/cugraph_pyg/data/cugraph_store.py index 8d5d2fd4894..e0d318adbe0 100644 --- a/python/cugraph-pyg/cugraph_pyg/data/cugraph_store.py +++ b/python/cugraph-pyg/cugraph_pyg/data/cugraph_store.py @@ -25,6 +25,7 @@ import pandas import cudf import cugraph +import warnings from cugraph.utilities.utils import import_optional, MissingModule @@ -211,7 +212,9 @@ def __init__( F: cugraph.gnn.FeatureStore, G: Union[Dict[str, Tuple[TensorType]], Dict[str, int]], num_nodes_dict: Dict[str, int], + *, multi_gpu: bool = False, + order: str = "CSC", ): """ Constructs a new CuGraphStore from the provided @@ -256,11 +259,20 @@ def __init__( multi_gpu: bool (Optional, default = False) Whether the store should be backed by a multi-GPU graph. Requires dask to have been set up. + + order: str (Optional ["CSR", "CSC"], default = CSC) + The order to use for sampling. Should nearly always be CSC + unless there is a specific expectation of "reverse" sampling. + It is also not uncommon to use CSR order for correctness + testing, which some cuGraph-PyG tests do. """ if None in G: raise ValueError("Unspecified edge types not allowed in PyG") + if order != "CSR" and order != "CSC": + raise ValueError("invalid valid for order") + self.__vertex_dtype = torch.int64 self._tensor_attr_cls = CuGraphTensorAttr @@ -289,6 +301,7 @@ def __init__( self.__features = F self.__graph = None self.__is_graph_owner = False + self.__order = order if construct_graph: if multi_gpu: @@ -297,7 +310,9 @@ def __init__( ) if self.__graph is None: - self.__graph = self.__construct_graph(G, multi_gpu=multi_gpu) + self.__graph = self.__construct_graph( + G, multi_gpu=multi_gpu, order=order + ) self.__is_graph_owner = True self.__subgraphs = {} @@ -347,6 +362,7 @@ def __construct_graph( self, edge_info: Dict[Tuple[str, str, str], List[TensorType]], multi_gpu: bool = False, + order: str = "CSC", ) -> cugraph.MultiGraph: """ This function takes edge information and uses it to construct @@ -363,6 +379,14 @@ def __construct_graph( multi_gpu: bool (Optional, default=False) Whether to construct a single-GPU or multi-GPU cugraph Graph. Defaults to a single-GPU graph. + + order: str (CSC or CSR) + Essentially whether to reverse edges so that the cuGraph + sampling algorithm operates on the CSC matrix instead of + the CSR matrix. Should nearly always be CSC unless there + is a specific expectation of reverse sampling, or correctness + testing is being performed. + Returns ------- A newly-constructed directed cugraph.MultiGraph object. @@ -371,6 +395,9 @@ def __construct_graph( # Ensure the original dict is not modified. edge_info_cg = {} + if order != "CSR" and order != "CSC": + raise ValueError("Order must be either CSC (default) or CSR!") + # Iterate over the keys in sorted order so that the created # numerical types correspond to the lexicographic order # of the keys, which is critical to converting the numeric @@ -430,20 +457,43 @@ def __construct_graph( df = pandas.DataFrame( { - "src": pandas.Series(na_src), - "dst": pandas.Series(na_dst), + "src": pandas.Series(na_dst) + if order == "CSC" + else pandas.Series(na_src), + "dst": pandas.Series(na_src) + if order == "CSC" + else pandas.Series(na_dst), "etp": pandas.Series(na_etp), } ) + vertex_dtype = df.src.dtype if multi_gpu: nworkers = len(distributed.get_client().scheduler_info()["workers"]) - df = dd.from_pandas(df, npartitions=nworkers).persist() - df = df.map_partitions(cudf.DataFrame.from_pandas) - else: - df = cudf.from_pandas(df) + df = dd.from_pandas(df, npartitions=nworkers if len(df) > 32 else 1) + + # Ensure the dataframe is constructed on each partition + # instead of adding additional synchronization head from potential + # host to device copies. + def get_empty_df(): + return cudf.DataFrame( + { + "src": cudf.Series([], dtype=vertex_dtype), + "dst": cudf.Series([], dtype=vertex_dtype), + "etp": cudf.Series([], dtype="int32"), + } + ) - df = df.reset_index(drop=True) + # Have to check for empty partitions and handle them appropriately + df = df.persist() + df = df.map_partitions( + lambda f: cudf.DataFrame.from_pandas(f) + if len(f) > 0 + else get_empty_df(), + meta=get_empty_df(), + ).reset_index(drop=True) + else: + df = cudf.from_pandas(df).reset_index(drop=True) graph = cugraph.MultiGraph(directed=True) if multi_gpu: @@ -468,6 +518,10 @@ def __construct_graph( def _edge_types_to_attrs(self) -> dict: return dict(self.__edge_types_to_attrs) + @property + def order(self) -> str: + return self.__order + @property def node_types(self) -> List[NodeType]: return list(self.__vertex_type_offsets["type"]) @@ -557,6 +611,7 @@ def _get_edge_index(self, attr: CuGraphEdgeAttr) -> Tuple[TensorType, TensorType raise ValueError("Graph is not in memory, cannot access edge index!") if attr.layout != EdgeLayout.COO: + # TODO support returning CSR/CSC (Issue #3802) raise TypeError("Only COO direct access is supported!") # Currently, graph creation enforces that input vertex ids are always of @@ -566,12 +621,14 @@ def _get_edge_index(self, attr: CuGraphEdgeAttr) -> Tuple[TensorType, TensorType # This may change in the future if/when renumbering or the graph # creation process is refactored. # See Issue #3201 for more details. + # Also note src/dst are flipped so that cuGraph sampling is done in + # CSC format rather than CSR format. if self._is_delayed: - src_col_name = self.__graph.renumber_map.renumbered_src_col_name - dst_col_name = self.__graph.renumber_map.renumbered_dst_col_name + dst_col_name = self.__graph.renumber_map.renumbered_src_col_name + src_col_name = self.__graph.renumber_map.renumbered_dst_col_name else: - src_col_name = self.__graph.srcCol - dst_col_name = self.__graph.dstCol + dst_col_name = self.__graph.srcCol + src_col_name = self.__graph.dstCol # If there is only one edge type (homogeneous graph) then # bypass the edge filters for a significant speed improvement. @@ -785,29 +842,73 @@ def _get_renumbered_edge_groups_from_sample( """ row_dict = {} col_dict = {} - if len(self.__edge_types_to_attrs) == 1: + # If there is only 1 edge type (includes heterogeneous graphs) + if len(self.edge_types) == 1: t_pyg_type = list(self.__edge_types_to_attrs.values())[0].edge_type src_type, _, dst_type = t_pyg_type - dst_id_table = noi_index[dst_type] - dst_id_map = ( - cudf.Series(cupy.asarray(dst_id_table), name="dst") - .reset_index() - .rename(columns={"index": "new_id"}) - .set_index("dst") - ) - dst = dst_id_map["new_id"].loc[sampling_results.destinations] - col_dict[t_pyg_type] = torch.as_tensor(dst.values, device="cuda") - - src_id_table = noi_index[src_type] - src_id_map = ( - cudf.Series(cupy.asarray(src_id_table), name="src") - .reset_index() - .rename(columns={"index": "new_id"}) - .set_index("src") - ) - src = src_id_map["new_id"].loc[sampling_results.sources] - row_dict[t_pyg_type] = torch.as_tensor(src.values, device="cuda") + # If there is only 1 node type (homogeneous) + # This should only occur if the cuGraph loader was + # not used. This logic is deprecated. + if len(self.node_types) == 1: + warnings.warn( + "Renumbering after sampling for homogeneous graphs is deprecated.", + FutureWarning, + ) + + # Create a dataframe mapping old ids to new ids. + vtype = src_type + id_table = noi_index[vtype] + id_map = cudf.Series( + cupy.arange(id_table.shape[0], dtype="int32"), + name="new_id", + index=cupy.asarray(id_table), + ).sort_index() + + # Renumber the sources using binary search + # Step 1: get the index of the new id + ix_r = torch.searchsorted( + torch.as_tensor(id_map.index.values, device="cuda"), + torch.as_tensor(sampling_results.sources.values, device="cuda"), + ) + # Step 2: Go from id indices to actual ids + row_dict[t_pyg_type] = torch.as_tensor(id_map.values, device="cuda")[ + ix_r + ] + + # Renumber the destinations using binary search + # Step 1: get the index of the new id + ix_c = torch.searchsorted( + torch.as_tensor(id_map.index.values, device="cuda"), + torch.as_tensor( + sampling_results.destinations.values, device="cuda" + ), + ) + # Step 2: Go from id indices to actual ids + col_dict[t_pyg_type] = torch.as_tensor(id_map.values, device="cuda")[ + ix_c + ] + else: + # Handle the heterogeneous case where there is only 1 edge type + dst_id_table = noi_index[dst_type] + dst_id_map = cudf.DataFrame( + { + "dst": cupy.asarray(dst_id_table), + "new_id": cupy.arange(dst_id_table.shape[0]), + } + ).set_index("dst") + dst = dst_id_map["new_id"].loc[sampling_results.destinations] + col_dict[t_pyg_type] = torch.as_tensor(dst.values, device="cuda") + + src_id_table = noi_index[src_type] + src_id_map = cudf.DataFrame( + { + "src": cupy.asarray(src_id_table), + "new_id": cupy.arange(src_id_table.shape[0]), + } + ).set_index("src") + src = src_id_map["new_id"].loc[sampling_results.sources] + row_dict[t_pyg_type] = torch.as_tensor(src.values, device="cuda") else: # This will retrieve the single string representation. @@ -822,36 +923,18 @@ def _get_renumbered_edge_groups_from_sample( for pyg_can_edge_type_str, ix in eoi_types.items(): pyg_can_edge_type = tuple(pyg_can_edge_type_str.split("__")) - src_type, _, dst_type = pyg_can_edge_type - - # Get the de-offsetted sources - sources = torch.as_tensor( - sampling_results.sources.iloc[ix].values, device="cuda" - ) - sources_ix = torch.searchsorted( - self.__vertex_type_offsets["stop"], sources - ) - sources -= self.__vertex_type_offsets["start"][sources_ix] - # Create the row entry for this type - src_id_table = noi_index[src_type] - src_id_map = ( - cudf.Series(cupy.asarray(src_id_table), name="src") - .reset_index() - .rename(columns={"index": "new_id"}) - .set_index("src") - ) - src = src_id_map["new_id"].loc[cupy.asarray(sources)] - row_dict[pyg_can_edge_type] = torch.as_tensor(src.values, device="cuda") + if self.__order == "CSR": + src_type, _, dst_type = pyg_can_edge_type + else: # CSC + dst_type, _, src_type = pyg_can_edge_type # Get the de-offsetted destinations + dst_num_type = self._numeric_vertex_type_from_name(dst_type) destinations = torch.as_tensor( sampling_results.destinations.iloc[ix].values, device="cuda" ) - destinations_ix = torch.searchsorted( - self.__vertex_type_offsets["stop"], destinations - ) - destinations -= self.__vertex_type_offsets["start"][destinations_ix] + destinations -= self.__vertex_type_offsets["start"][dst_num_type] # Create the col entry for this type dst_id_table = noi_index[dst_type] @@ -864,6 +947,24 @@ def _get_renumbered_edge_groups_from_sample( dst = dst_id_map["new_id"].loc[cupy.asarray(destinations)] col_dict[pyg_can_edge_type] = torch.as_tensor(dst.values, device="cuda") + # Get the de-offsetted sources + src_num_type = self._numeric_vertex_type_from_name(src_type) + sources = torch.as_tensor( + sampling_results.sources.iloc[ix].values, device="cuda" + ) + sources -= self.__vertex_type_offsets["start"][src_num_type] + + # Create the row entry for this type + src_id_table = noi_index[src_type] + src_id_map = ( + cudf.Series(cupy.asarray(src_id_table), name="src") + .reset_index() + .rename(columns={"index": "new_id"}) + .set_index("src") + ) + src = src_id_map["new_id"].loc[cupy.asarray(sources)] + row_dict[pyg_can_edge_type] = torch.as_tensor(src.values, device="cuda") + return row_dict, col_dict def put_tensor(self, tensor, attr) -> None: @@ -959,9 +1060,7 @@ def _get_tensor(self, attr: CuGraphTensorAttr) -> TensorType: t = t[-1] if isinstance(t, np.ndarray): - t = torch.as_tensor(t, device="cuda") - else: - t = t.cuda() + t = torch.as_tensor(t, device="cpu") return t @@ -979,7 +1078,6 @@ def _get_tensor(self, attr: CuGraphTensorAttr) -> TensorType: t = torch.concatenate([t, u]) - t = t.cuda() return t def _multi_get_tensor(self, attrs: List[CuGraphTensorAttr]) -> List[TensorType]: diff --git a/python/cugraph-pyg/cugraph_pyg/loader/cugraph_node_loader.py b/python/cugraph-pyg/cugraph_pyg/loader/cugraph_node_loader.py index 8d79685965f..cf7eb330d67 100644 --- a/python/cugraph-pyg/cugraph_pyg/loader/cugraph_node_loader.py +++ b/python/cugraph-pyg/cugraph_pyg/loader/cugraph_node_loader.py @@ -23,12 +23,15 @@ from cugraph.utilities.utils import import_optional, MissingModule from cugraph_pyg.data import CuGraphStore -from cugraph_pyg.loader.filter import _filter_cugraph_store -from cugraph_pyg.sampler.cugraph_sampler import _sampler_output_from_sampling_results +from cugraph_pyg.sampler.cugraph_sampler import ( + _sampler_output_from_sampling_results_heterogeneous, + _sampler_output_from_sampling_results_homogeneous, +) from typing import Union, Tuple, Sequence, List, Dict torch_geometric = import_optional("torch_geometric") +torch = import_optional("torch") InputNodes = ( Sequence if isinstance(torch_geometric, MissingModule) @@ -253,55 +256,97 @@ def __next__(self): raw_sample_data = cudf.read_parquet(parquet_path) if "map" in raw_sample_data.columns: - self.__renumber_map = raw_sample_data["map"] + num_batches = end_inclusive - self.__start_inclusive + 1 + + map_end = raw_sample_data["map"].iloc[num_batches] + + map = torch.as_tensor( + raw_sample_data["map"].iloc[0:map_end], device="cuda" + ) raw_sample_data.drop("map", axis=1, inplace=True) + + self.__renumber_map_offsets = map[0 : num_batches + 1] - map[0] + self.__renumber_map = map[num_batches + 1 :] + else: self.__renumber_map = None self.__data = raw_sample_data[list(columns.keys())].astype(columns) self.__data.dropna(inplace=True) + if ( + len(self.__graph_store.edge_types) == 1 + and len(self.__graph_store.node_types) == 1 + ): + group_cols = ["batch_id", "hop_id"] + self.__data_index = self.__data.groupby(group_cols, as_index=True).agg( + {"sources": "max", "destinations": "max"} + ) + self.__data_index.rename( + columns={"sources": "src_max", "destinations": "dst_max"}, + inplace=True, + ) + self.__data_index = self.__data_index.to_dict(orient="index") + # Pull the next set of sampling results out of the dataframe in memory f = self.__data["batch_id"] == self.__next_batch if self.__renumber_map is not None: i = self.__next_batch - self.__start_inclusive - ix = self.__renumber_map.iloc[[i, i + 1]] - ix_start, ix_end = ix.iloc[0], ix.iloc[1] - current_renumber_map = self.__renumber_map.iloc[ix_start:ix_end] - if len(current_renumber_map) != ix_end - ix_start: - raise ValueError("invalid renumber map") - else: - current_renumber_map = None - sampler_output = _sampler_output_from_sampling_results( - self.__data[f], current_renumber_map, self.__graph_store - ) + # this should avoid d2h copy + current_renumber_map = self.__renumber_map[ + self.__renumber_map_offsets[i] : self.__renumber_map_offsets[i + 1] + ] - # Get ready for next iteration - self.__next_batch += 1 + else: + current_renumber_map = None # Get and return the sampled subgraph - if isinstance(torch_geometric, MissingModule): - noi_index, row_dict, col_dict, edge_dict = sampler_output["out"] - return _filter_cugraph_store( - self.__feature_store, + if ( + len(self.__graph_store.edge_types) == 1 + and len(self.__graph_store.node_types) == 1 + ): + sampler_output = _sampler_output_from_sampling_results_homogeneous( + self.__data[f], + current_renumber_map, self.__graph_store, - noi_index, - row_dict, - col_dict, - edge_dict, + self.__data_index, + self.__next_batch, ) else: - out = torch_geometric.loader.utils.filter_custom_store( - self.__feature_store, - self.__graph_store, - sampler_output.node, - sampler_output.row, - sampler_output.col, - sampler_output.edge, + sampler_output = _sampler_output_from_sampling_results_heterogeneous( + self.__data[f], current_renumber_map, self.__graph_store ) - return out + # Get ready for next iteration + self.__next_batch += 1 + + # Create a PyG HeteroData object, loading the required features + out = torch_geometric.loader.utils.filter_custom_store( + self.__feature_store, + self.__graph_store, + sampler_output.node, + sampler_output.row, + sampler_output.col, + sampler_output.edge, + ) + + # Account for CSR format in cuGraph vs. CSC format in PyG + if self.__graph_store.order == "CSC": + for node_type in out.edge_index_dict: + out[node_type].edge_index[0], out[node_type].edge_index[1] = ( + out[node_type].edge_index[1], + out[node_type].edge_index[0], + ) + + out.set_value_dict("num_sampled_nodes", sampler_output.num_sampled_nodes) + out.set_value_dict("num_sampled_edges", sampler_output.num_sampled_edges) + + return out + + @property + def _starting_batch_id(self): + return self.__starting_batch_id def __iter__(self): return self diff --git a/python/cugraph-pyg/cugraph_pyg/loader/filter.py b/python/cugraph-pyg/cugraph_pyg/loader/filter.py deleted file mode 100644 index f519ba7cfc9..00000000000 --- a/python/cugraph-pyg/cugraph_pyg/loader/filter.py +++ /dev/null @@ -1,57 +0,0 @@ -# Copyright (c) 2023, NVIDIA CORPORATION. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import cupy - -from cugraph_pyg.data import CuGraphStore - -from typing import ( - Dict, - Sequence, -) - - -def _filter_cugraph_store( - feature_store: CuGraphStore, - graph_store: CuGraphStore, - node_dict: Dict[str, Sequence], - row_dict: Dict[str, Sequence], - col_dict: Dict[str, Sequence], - edge_dict: Dict[str, Sequence], -) -> dict: - """ - Primarily for testing without torch and torch_geometric. - Returns a dictionary containing the sampled subgraph. - """ - data = {} - - for attr in graph_store.get_all_edge_attrs(): - key = attr.edge_type - if key in row_dict and key in col_dict: - edge_index = cupy.stack([row_dict[key], col_dict[key]]) - data[attr.edge_type] = {} - data[attr.edge_type]["edge_index"] = edge_index - - # Filter node storage: - required_attrs = [] - for attr in feature_store.get_all_tensor_attrs(): - if attr.group_name in node_dict: - attr.index = node_dict[attr.group_name] - required_attrs.append(attr) - data[attr.group_name] = {} - data["num_nodes"] = attr.index.size - tensors = feature_store.multi_get_tensor(required_attrs) - for i, attr in enumerate(required_attrs): - data[attr.group_name][attr.attr_name] = tensors[i] - - return data diff --git a/python/cugraph-pyg/cugraph_pyg/sampler/cugraph_sampler.py b/python/cugraph-pyg/cugraph_pyg/sampler/cugraph_sampler.py index d4f600006be..6e8c4322418 100644 --- a/python/cugraph-pyg/cugraph_pyg/sampler/cugraph_sampler.py +++ b/python/cugraph-pyg/cugraph_pyg/sampler/cugraph_sampler.py @@ -12,26 +12,21 @@ # limitations under the License. -from typing import Sequence +from typing import Sequence, Dict, Tuple from cugraph_pyg.data import CuGraphStore -from cugraph.utilities.utils import import_optional, MissingModule +from cugraph.utilities.utils import import_optional import cudf dask_cudf = import_optional("dask_cudf") torch_geometric = import_optional("torch_geometric") torch = import_optional("torch") +HeteroSamplerOutput = torch_geometric.sampler.base.HeteroSamplerOutput -HeteroSamplerOutput = ( - None - if isinstance(torch_geometric, MissingModule) - else torch_geometric.sampler.base.HeteroSamplerOutput -) - -def _count_unique_nodes( +def _get_unique_nodes( sampling_results: cudf.DataFrame, graph_store: CuGraphStore, node_type: str, @@ -54,8 +49,8 @@ def _count_unique_nodes( Returns ------- - int - The number of unique nodes of the given node type. + cudf.Series + The unique nodes of the given node type. """ if node_position == "src": edge_index = "sources" @@ -78,12 +73,111 @@ def _count_unique_nodes( sampling_results_node = sampling_results[f] else: - return 0 + return cudf.Series([], dtype="int64") - return sampling_results_node[edge_index].nunique() + return sampling_results_node[edge_index] -def _sampler_output_from_sampling_results( +def _sampler_output_from_sampling_results_homogeneous( + sampling_results: cudf.DataFrame, + renumber_map: torch.Tensor, + graph_store: CuGraphStore, + data_index: Dict[Tuple[int, int], Dict[str, int]], + batch_id: int, + metadata: Sequence = None, +) -> HeteroSamplerOutput: + """ + Parameters + ---------- + sampling_results: cudf.DataFrame + The dataframe containing sampling results. + renumber_map: torch.Tensor + The tensor containing the renumber map, or None if there + is no renumber map. + graph_store: CuGraphStore + The graph store containing the structure of the sampled graph. + data_index: Dict[Tuple[int, int], Dict[str, int]] + Dictionary where keys are the batch id and hop id, + and values are dictionaries containing the max src + and max dst node ids for the batch and hop. + batch_id: int + The current batch id, whose samples are being retrieved + from the sampling results and data index. + metadata: Tensor + The metadata for the sampled batch. + + Returns + ------- + HeteroSamplerOutput + """ + + if len(graph_store.edge_types) > 1 or len(graph_store.node_types) > 1: + raise ValueError("Graph is heterogeneous") + + hops = torch.arange( + sampling_results.hop_id.iloc[len(sampling_results) - 1] + 1, device="cuda" + ) + hops = torch.searchsorted( + torch.as_tensor(sampling_results.hop_id, device="cuda"), hops + ) + + node_type = graph_store.node_types[0] + edge_type = graph_store.edge_types[0] + + num_nodes_per_hop_dict = {node_type: torch.zeros(len(hops) + 1, dtype=torch.int64)} + num_edges_per_hop_dict = {edge_type: torch.zeros(len(hops), dtype=torch.int64)} + + if renumber_map is None: + raise ValueError("Renumbered input is expected for homogeneous graphs") + + noi_index = {node_type: torch.as_tensor(renumber_map, device="cuda")} + + row_dict = { + edge_type: torch.as_tensor(sampling_results.sources, device="cuda"), + } + + col_dict = { + edge_type: torch.as_tensor(sampling_results.destinations, device="cuda"), + } + + num_nodes_per_hop_dict[node_type][0] = data_index[batch_id, 0]["src_max"] + 1 + for hop in range(len(hops)): + hop_ix_start = hops[hop] + hop_ix_end = hops[hop + 1] if hop < len(hops) - 1 else len(sampling_results) + + if num_nodes_per_hop_dict[node_type][hop] > 0: + max_id_hop = data_index[batch_id, hop]["dst_max"] + max_id_prev_hop = ( + data_index[batch_id, hop - 1]["dst_max"] + if hop > 0 + else data_index[batch_id, 0]["src_max"] + ) + + if max_id_hop > max_id_prev_hop: + num_nodes_per_hop_dict[node_type][hop + 1] = ( + max_id_hop - max_id_prev_hop + ) + else: + num_nodes_per_hop_dict[node_type][hop + 1] = 0 + # will default to 0 if the previous hop was 0, since this is a PyG requirement + + num_edges_per_hop_dict[edge_type][hop] = hop_ix_end - hop_ix_start + + if HeteroSamplerOutput is None: + raise ImportError("Error importing from pyg") + + return HeteroSamplerOutput( + node=noi_index, + row=row_dict, + col=col_dict, + edge=None, + num_sampled_nodes=num_nodes_per_hop_dict, + num_sampled_edges=num_edges_per_hop_dict, + metadata=metadata, + ) + + +def _sampler_output_from_sampling_results_heterogeneous( sampling_results: cudf.DataFrame, renumber_map: cudf.Series, graph_store: CuGraphStore, @@ -109,7 +203,7 @@ def _sampler_output_from_sampling_results( hops = torch.arange(sampling_results.hop_id.max() + 1, device="cuda") hops = torch.searchsorted( - torch.as_tensor(sampling_results.hop_id.values, device="cuda"), hops + torch.as_tensor(sampling_results.hop_id, device="cuda"), hops ) num_nodes_per_hop_dict = {} @@ -119,13 +213,11 @@ def _sampler_output_from_sampling_results( sampling_results_hop_0 = sampling_results.iloc[ 0 : (hops[1] if len(hops) > 1 else len(sampling_results)) ] + for node_type in graph_store.node_types: - if len(graph_store.node_types) == 1: - num_unique_nodes = sampling_results_hop_0.sources.nunique() - else: - num_unique_nodes = _count_unique_nodes( - sampling_results_hop_0, graph_store, node_type, "src" - ) + num_unique_nodes = _get_unique_nodes( + sampling_results_hop_0, graph_store, node_type, "src" + ).nunique() if num_unique_nodes > 0: num_nodes_per_hop_dict[node_type] = torch.zeros( @@ -134,112 +226,87 @@ def _sampler_output_from_sampling_results( num_nodes_per_hop_dict[node_type][0] = num_unique_nodes if renumber_map is not None: - if len(graph_store.node_types) > 1 or len(graph_store.edge_types) > 1: - raise ValueError( - "Precomputing the renumber map is currently " - "unsupported for heterogeneous graphs." - ) + raise ValueError( + "Precomputing the renumber map is currently " + "unsupported for heterogeneous graphs." + ) - node_type = graph_store.node_types[0] - if not isinstance(node_type, str): - raise ValueError("Node types must be strings") - noi_index = {node_type: torch.as_tensor(renumber_map.values, device="cuda")} - - edge_type = graph_store.edge_types[0] - if ( - not isinstance(edge_type, tuple) - or not isinstance(edge_type[0], str) - or len(edge_type) != 3 - ): - raise ValueError("Edge types must be 3-tuples of strings") - if edge_type[0] != node_type or edge_type[2] != node_type: - raise ValueError("Edge src/dst type must match for homogeneous graphs") - row_dict = { - edge_type: torch.as_tensor(sampling_results.sources.values, device="cuda"), - } - col_dict = { - edge_type: torch.as_tensor( - sampling_results.destinations.values, device="cuda" + # Calculate nodes of interest based on unique nodes in order of appearance + # Use hop 0 sources since those are the only ones not included in destinations + # Use torch.concat based on benchmark performance (vs. cudf.concat) + + if sampling_results_hop_0 is None: + sampling_results_hop_0 = sampling_results.iloc[ + 0 : (hops[1] if len(hops) > 1 else len(sampling_results)) + ] + + nodes_of_interest = ( + cudf.Series( + torch.concat( + [ + torch.as_tensor(sampling_results_hop_0.sources, device="cuda"), + torch.as_tensor(sampling_results.destinations, device="cuda"), + ] ), - } - else: - # Calculate nodes of interest based on unique nodes in order of appearance - # Use hop 0 sources since those are the only ones not included in destinations - # Use torch.concat based on benchmark performance (vs. cudf.concat) - nodes_of_interest = ( - cudf.Series( - torch.concat( - [ - torch.as_tensor( - sampling_results_hop_0.sources.values, device="cuda" - ), - torch.as_tensor( - sampling_results.destinations.values, device="cuda" - ), - ] - ), - name="nodes_of_interest", - ) - .drop_duplicates() - .sort_index() + name="nodes_of_interest", ) - del sampling_results_hop_0 + .drop_duplicates() + .sort_index() + ) - # Get the grouped node index (for creating the renumbered grouped edge index) - noi_index = graph_store._get_vertex_groups_from_sample( - torch.as_tensor(nodes_of_interest.values, device="cuda") - ) - del nodes_of_interest + # Get the grouped node index (for creating the renumbered grouped edge index) + noi_index = graph_store._get_vertex_groups_from_sample( + torch.as_tensor(nodes_of_interest, device="cuda") + ) + del nodes_of_interest - # Get the new edge index (by type as expected for HeteroData) - # FIXME handle edge ids/types after the C++ updates - row_dict, col_dict = graph_store._get_renumbered_edge_groups_from_sample( - sampling_results, noi_index - ) + # Get the new edge index (by type as expected for HeteroData) + # FIXME handle edge ids/types after the C++ updates + row_dict, col_dict = graph_store._get_renumbered_edge_groups_from_sample( + sampling_results, noi_index + ) for hop in range(len(hops)): hop_ix_start = hops[hop] hop_ix_end = hops[hop + 1] if hop < len(hops) - 1 else len(sampling_results) - sampling_results_hop = sampling_results.iloc[hop_ix_start:hop_ix_end] + sampling_results_to_hop = sampling_results.iloc[0:hop_ix_end] for node_type in graph_store.node_types: - if len(graph_store.node_types) == 1: - num_unique_nodes = sampling_results_hop.destinations.nunique() - else: - num_unique_nodes = _count_unique_nodes( - sampling_results_hop, graph_store, node_type, "dst" - ) + unique_nodes_hop = _get_unique_nodes( + sampling_results_to_hop, graph_store, node_type, "dst" + ) + + unique_nodes_0 = _get_unique_nodes( + sampling_results_hop_0, graph_store, node_type, "src" + ) + + num_unique_nodes = cudf.concat([unique_nodes_0, unique_nodes_hop]).nunique() if num_unique_nodes > 0: if node_type not in num_nodes_per_hop_dict: num_nodes_per_hop_dict[node_type] = torch.zeros( len(hops) + 1, dtype=torch.int64 ) - num_nodes_per_hop_dict[node_type][hop + 1] = num_unique_nodes + num_nodes_per_hop_dict[node_type][hop + 1] = num_unique_nodes - int( + num_nodes_per_hop_dict[node_type][: hop + 1].sum(0) + ) - if len(graph_store.edge_types) == 1: - edge_type = graph_store.edge_types[0] - if edge_type not in num_edges_per_hop_dict: - num_edges_per_hop_dict[edge_type] = torch.zeros( + numeric_etypes, counts = torch.unique( + torch.as_tensor( + sampling_results.iloc[hop_ix_start:hop_ix_end].edge_type, + device="cuda", + ), + return_counts=True, + ) + numeric_etypes = list(numeric_etypes) + counts = list(counts) + for num_etype, count in zip(numeric_etypes, counts): + can_etype = graph_store.numeric_edge_type_to_canonical(num_etype) + if can_etype not in num_edges_per_hop_dict: + num_edges_per_hop_dict[can_etype] = torch.zeros( len(hops), dtype=torch.int64 ) - num_edges_per_hop_dict[graph_store.edge_types[0]][hop] = len( - sampling_results_hop - ) - else: - numeric_etypes, counts = torch.unique( - torch.as_tensor(sampling_results_hop.edge_type.values, device="cuda"), - return_counts=True, - ) - numeric_etypes = list(numeric_etypes) - counts = list(counts) - for num_etype, count in zip(numeric_etypes, counts): - can_etype = graph_store.numeric_edge_type_to_canonical(num_etype) - if can_etype not in num_edges_per_hop_dict: - num_edges_per_hop_dict[can_etype] = torch.zeros( - len(hops), dtype=torch.int64 - ) - num_edges_per_hop_dict[can_etype][hop] = count + num_edges_per_hop_dict[can_etype][hop] = count if HeteroSamplerOutput is None: raise ImportError("Error importing from pyg") diff --git a/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_loader.py b/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_loader.py index e29f3aea512..55aebf305da 100644 --- a/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_loader.py +++ b/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_loader.py @@ -24,7 +24,7 @@ @pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available") def test_cugraph_loader_basic(dask_client, karate_gnn): F, G, N = karate_gnn - cugraph_store = CuGraphStore(F, G, N, multi_gpu=True) + cugraph_store = CuGraphStore(F, G, N, multi_gpu=True, order="CSR") loader = CuGraphNeighborLoader( (cugraph_store, cugraph_store), torch.arange(N["type0"] + N["type1"], dtype=torch.int64), @@ -52,7 +52,7 @@ def test_cugraph_loader_basic(dask_client, karate_gnn): @pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available") def test_cugraph_loader_hetero(dask_client, karate_gnn): F, G, N = karate_gnn - cugraph_store = CuGraphStore(F, G, N, multi_gpu=True) + cugraph_store = CuGraphStore(F, G, N, multi_gpu=True, order="CSR") loader = CuGraphNeighborLoader( (cugraph_store, cugraph_store), input_nodes=("type1", torch.tensor([0, 1, 2, 5], device="cuda")), diff --git a/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_sampler.py b/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_sampler.py index 550852a3303..a1a72a44d0c 100644 --- a/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_sampler.py +++ b/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_sampler.py @@ -17,7 +17,9 @@ import pytest from cugraph_pyg.data import CuGraphStore -from cugraph_pyg.sampler.cugraph_sampler import _sampler_output_from_sampling_results +from cugraph_pyg.sampler.cugraph_sampler import ( + _sampler_output_from_sampling_results_heterogeneous, +) from cugraph.gnn import FeatureStore @@ -31,7 +33,7 @@ @pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available") def test_neighbor_sample(dask_client, basic_graph_1): F, G, N = basic_graph_1 - cugraph_store = CuGraphStore(F, G, N, multi_gpu=True) + cugraph_store = CuGraphStore(F, G, N, multi_gpu=True, order="CSR") batches = cudf.DataFrame( { @@ -56,7 +58,7 @@ def test_neighbor_sample(dask_client, basic_graph_1): .sort_values(by=["sources", "destinations"]) ) - out = _sampler_output_from_sampling_results( + out = _sampler_output_from_sampling_results_heterogeneous( sampling_results=sampling_results, renumber_map=None, graph_store=cugraph_store, @@ -84,7 +86,7 @@ def test_neighbor_sample(dask_client, basic_graph_1): # check the hop dictionaries assert len(out.num_sampled_nodes) == 1 - assert out.num_sampled_nodes["vt1"].tolist() == [4, 4] + assert out.num_sampled_nodes["vt1"].tolist() == [4, 1] assert len(out.num_sampled_edges) == 1 assert out.num_sampled_edges[("vt1", "pig", "vt1")].tolist() == [6] @@ -95,7 +97,7 @@ def test_neighbor_sample(dask_client, basic_graph_1): @pytest.mark.skip(reason="broken") def test_neighbor_sample_multi_vertex(dask_client, multi_edge_multi_vertex_graph_1): F, G, N = multi_edge_multi_vertex_graph_1 - cugraph_store = CuGraphStore(F, G, N, multi_gpu=True) + cugraph_store = CuGraphStore(F, G, N, multi_gpu=True, order="CSR") batches = cudf.DataFrame( { @@ -119,7 +121,7 @@ def test_neighbor_sample_multi_vertex(dask_client, multi_edge_multi_vertex_graph .compute() ) - out = _sampler_output_from_sampling_results( + out = _sampler_output_from_sampling_results_heterogeneous( sampling_results=sampling_results, renumber_map=None, graph_store=cugraph_store, @@ -144,8 +146,8 @@ def test_neighbor_sample_multi_vertex(dask_client, multi_edge_multi_vertex_graph # check the hop dictionaries assert len(out.num_sampled_nodes) == 2 - assert out.num_sampled_nodes["black"].tolist() == [2, 2] - assert out.num_sampled_nodes["brown"].tolist() == [3, 2] + assert out.num_sampled_nodes["black"].tolist() == [2, 0] + assert out.num_sampled_nodes["brown"].tolist() == [3, 0] assert len(out.num_sampled_edges) == 5 assert out.num_sampled_edges[("brown", "horse", "brown")].tolist() == [2] @@ -186,7 +188,7 @@ def test_neighbor_sample_mock_sampling_results(dask_client): torch.tensor([3.2, 2.1], dtype=torch.float32), type_name="A", feat_name="prop1" ) - graph_store = CuGraphStore(F, G, N, multi_gpu=True) + graph_store = CuGraphStore(F, G, N, multi_gpu=True, order="CSR") # let 0, 1 be the start vertices, fanout = [2, 1, 2, 3] mock_sampling_results = cudf.DataFrame( @@ -198,7 +200,7 @@ def test_neighbor_sample_mock_sampling_results(dask_client): } ) - out = _sampler_output_from_sampling_results( + out = _sampler_output_from_sampling_results_heterogeneous( mock_sampling_results, None, graph_store, None ) @@ -218,9 +220,9 @@ def test_neighbor_sample_mock_sampling_results(dask_client): assert out.col[("B", "ba", "A")].tolist() == [1, 1] assert len(out.num_sampled_nodes) == 3 - assert out.num_sampled_nodes["A"].tolist() == [2, 0, 1, 0, 1] - assert out.num_sampled_nodes["B"].tolist() == [0, 2, 0, 1, 0] - assert out.num_sampled_nodes["C"].tolist() == [0, 0, 2, 0, 2] + assert out.num_sampled_nodes["A"].tolist() == [2, 0, 0, 0, 0] + assert out.num_sampled_nodes["B"].tolist() == [0, 2, 0, 0, 0] + assert out.num_sampled_nodes["C"].tolist() == [0, 0, 2, 0, 1] assert len(out.num_sampled_edges) == 3 assert out.num_sampled_edges[("A", "ab", "B")].tolist() == [3, 0, 1, 0] diff --git a/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_store.py b/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_store.py index a5a59623710..43b1e5da5a0 100644 --- a/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_store.py +++ b/python/cugraph-pyg/cugraph_pyg/tests/mg/test_mg_cugraph_store.py @@ -117,8 +117,8 @@ def test_get_edge_index(graph, edge_index_type, dask_client): G[et][1] = cudf.Series(G[et][1]) elif edge_index_type == "dask-cudf": for et in list(G.keys()): - G[et][0] = dask_cudf.from_cudf(cudf.Series(G[et][0]), npartitions=2) - G[et][1] = dask_cudf.from_cudf(cudf.Series(G[et][1]), npartitions=2) + G[et][0] = dask_cudf.from_cudf(cudf.Series(G[et][0]), npartitions=1) + G[et][1] = dask_cudf.from_cudf(cudf.Series(G[et][1]), npartitions=1) cugraph_store = CuGraphStore(F, G, N, multi_gpu=True) @@ -215,7 +215,7 @@ def test_renumber_vertices_multi_edge_multi_vertex( def test_renumber_edges(abc_graph, dask_client): F, G, N = abc_graph - graph_store = CuGraphStore(F, G, N, multi_gpu=True) + graph_store = CuGraphStore(F, G, N, multi_gpu=True, order="CSR") # let 0, 1 be the start vertices, fanout = [2, 1, 2, 3] mock_sampling_results = cudf.DataFrame( diff --git a/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_loader.py b/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_loader.py index 620f1a5eb85..48a21cb7fd6 100644 --- a/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_loader.py +++ b/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_loader.py @@ -26,12 +26,14 @@ from cugraph.utilities.utils import import_optional, MissingModule torch = import_optional("torch") +torch_geometric = import_optional("torch_geometric") +trim_to_layer = import_optional("torch_geometric.utils.trim_to_layer") @pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available") def test_cugraph_loader_basic(karate_gnn): F, G, N = karate_gnn - cugraph_store = CuGraphStore(F, G, N) + cugraph_store = CuGraphStore(F, G, N, order="CSR") loader = CuGraphNeighborLoader( (cugraph_store, cugraph_store), torch.arange(N["type0"] + N["type1"], dtype=torch.int64), @@ -57,7 +59,7 @@ def test_cugraph_loader_basic(karate_gnn): @pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available") def test_cugraph_loader_hetero(karate_gnn): F, G, N = karate_gnn - cugraph_store = CuGraphStore(F, G, N) + cugraph_store = CuGraphStore(F, G, N, order="CSR") loader = CuGraphNeighborLoader( (cugraph_store, cugraph_store), input_nodes=("type1", torch.tensor([0, 1, 2, 5], device="cuda")), @@ -82,23 +84,29 @@ def test_cugraph_loader_hetero(karate_gnn): @pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available") def test_cugraph_loader_from_disk(): + m = [2, 9, 99, 82, 9, 3, 18, 1, 12] + n = torch.arange(1, 1 + len(m), dtype=torch.int32) + x = torch.zeros(256, dtype=torch.int32) + x[torch.tensor(m, dtype=torch.int32)] = n F = FeatureStore() - F.add_data(torch.tensor([1, 2, 3, 4, 5, 6, 7]), "t0", "x") + F.add_data(x, "t0", "x") - G = {("t0", "knows", "t0"): 7} - N = {"t0": 7} + G = {("t0", "knows", "t0"): 9080} + N = {"t0": 256} - cugraph_store = CuGraphStore(F, G, N) + cugraph_store = CuGraphStore(F, G, N, order="CSR") bogus_samples = cudf.DataFrame( { - "sources": [0, 1, 2, 3, 4, 5, 6], - "destinations": [6, 4, 3, 2, 2, 1, 5], - "edge_type": cudf.Series([0, 0, 0, 0, 0, 0, 0], dtype="int32"), - "edge_id": [5, 10, 15, 20, 25, 30, 35], - "hop_id": cudf.Series([0, 0, 0, 1, 1, 2, 2], dtype="int32"), + "sources": [0, 1, 2, 3, 4, 5, 6, 6], + "destinations": [5, 4, 3, 2, 2, 6, 5, 2], + "edge_type": cudf.Series([0, 0, 0, 0, 0, 0, 0, 0], dtype="int32"), + "edge_id": [5, 10, 15, 20, 25, 30, 35, 40], + "hop_id": cudf.Series([0, 0, 0, 1, 1, 1, 2, 2], dtype="int32"), } ) + map = cudf.Series(m, name="map") + bogus_samples = bogus_samples.join(map, how="outer").sort_index() tempdir = tempfile.TemporaryDirectory() for s in range(256): @@ -115,32 +123,49 @@ def test_cugraph_loader_from_disk(): for sample in loader: num_samples += 1 assert sample["t0"]["num_nodes"] == 7 - # correct vertex order is [0, 1, 2, 6, 4, 3, 5]; x = [1, 2, 3, 7, 5, 4, 6] - assert sample["t0"]["x"].tolist() == [1, 2, 3, 7, 5, 4, 6] - assert list(sample[("t0", "knows", "t0")]["edge_index"].shape) == [2, 7] + # correct vertex order is [0, 1, 2, 5, 4, 3, 6]; x = [1, 2, 3, 6, 5, 4, 7] + assert sample["t0"]["x"].tolist() == [3, 4, 5, 6, 7, 8, 9] + + edge_index = sample[("t0", "knows", "t0")]["edge_index"] + assert list(edge_index.shape) == [2, 8] + + assert ( + edge_index[0].tolist() + == bogus_samples.sources.dropna().values_host.tolist() + ) + assert ( + edge_index[1].tolist() + == bogus_samples.destinations.dropna().values_host.tolist() + ) assert num_samples == 256 @pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available") def test_cugraph_loader_from_disk_subset(): + m = [2, 9, 99, 82, 9, 3, 18, 1, 12] + n = torch.arange(1, 1 + len(m), dtype=torch.int32) + x = torch.zeros(256, dtype=torch.int32) + x[torch.tensor(m, dtype=torch.int32)] = n F = FeatureStore() - F.add_data(torch.tensor([1, 2, 3, 4, 5, 6, 7]), "t0", "x") + F.add_data(x, "t0", "x") - G = {("t0", "knows", "t0"): 7} - N = {"t0": 7} + G = {("t0", "knows", "t0"): 9080} + N = {"t0": 256} - cugraph_store = CuGraphStore(F, G, N) + cugraph_store = CuGraphStore(F, G, N, order="CSR") bogus_samples = cudf.DataFrame( { - "sources": [0, 1, 2, 3, 4, 5, 6], - "destinations": [6, 4, 3, 2, 2, 1, 5], - "edge_type": cudf.Series([0, 0, 0, 0, 0, 0, 0], dtype="int32"), - "edge_id": [5, 10, 15, 20, 25, 30, 35], - "hop_id": cudf.Series([0, 0, 0, 1, 1, 2, 2], dtype="int32"), + "sources": [0, 1, 2, 3, 4, 5, 6, 6], + "destinations": [5, 4, 3, 2, 2, 6, 5, 2], + "edge_type": cudf.Series([0, 0, 0, 0, 0, 0, 0, 0], dtype="int32"), + "edge_id": [5, 10, 15, 20, 25, 30, 35, 40], + "hop_id": cudf.Series([0, 0, 0, 1, 1, 1, 2, 2], dtype="int32"), } ) + map = cudf.Series(m, name="map") + bogus_samples = bogus_samples.join(map, how="outer").sort_index() tempdir = tempfile.TemporaryDirectory() for s in range(256): @@ -159,33 +184,45 @@ def test_cugraph_loader_from_disk_subset(): num_samples += 1 assert sample["t0"]["num_nodes"] == 7 # correct vertex order is [0, 1, 2, 6, 4, 3, 5]; x = [1, 2, 3, 7, 5, 4, 6] - assert sample["t0"]["x"].tolist() == [1, 2, 3, 7, 5, 4, 6] - assert list(sample[("t0", "knows", "t0")]["edge_index"].shape) == [2, 7] + assert sample["t0"]["x"].tolist() == [3, 4, 5, 6, 7, 8, 9] + + edge_index = sample[("t0", "knows", "t0")]["edge_index"] + assert list(edge_index.shape) == [2, 8] + + assert ( + edge_index[0].tolist() + == bogus_samples.sources.dropna().values_host.tolist() + ) + assert ( + edge_index[1].tolist() + == bogus_samples.destinations.dropna().values_host.tolist() + ) assert num_samples == 100 @pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available") -def test_cugraph_loader_from_disk_subset_renumbered(): +def test_cugraph_loader_e2e_coo(): + m = [2, 9, 99, 82, 9, 3, 18, 1, 12] + x = torch.randint(3000, (256, 256)).to(torch.float32) F = FeatureStore() - F.add_data(torch.tensor([1, 2, 3, 4, 5, 6, 7]), "t0", "x") + F.add_data(x, "t0", "x") - G = {("t0", "knows", "t0"): 7} - N = {"t0": 7} + G = {("t0", "knows", "t0"): 9999} + N = {"t0": 256} - cugraph_store = CuGraphStore(F, G, N) + cugraph_store = CuGraphStore(F, G, N, order="CSR") bogus_samples = cudf.DataFrame( { - "sources": [0, 1, 2, 3, 4, 5, 6], - "destinations": [6, 4, 3, 2, 2, 1, 5], - "edge_type": cudf.Series([0, 0, 0, 0, 0, 0, 0], dtype="int32"), - "edge_id": [5, 10, 15, 20, 25, 30, 35], - "hop_id": cudf.Series([0, 0, 0, 1, 1, 2, 2], dtype="int32"), + "sources": [0, 1, 2, 3, 4, 5, 6, 6], + "destinations": [5, 4, 3, 2, 2, 6, 5, 2], + "edge_type": cudf.Series([0, 0, 0, 0, 0, 0, 0, 0], dtype="int32"), + "edge_id": [5, 10, 15, 20, 25, 30, 35, 40], + "hop_id": cudf.Series([0, 0, 0, 1, 1, 1, 2, 2], dtype="int32"), } ) - - map = cudf.Series([2, 9, 0, 2, 1, 3, 4, 6, 5], name="map") + map = cudf.Series(m, name="map") bogus_samples = bogus_samples.join(map, how="outer").sort_index() tempdir = tempfile.TemporaryDirectory() @@ -200,22 +237,35 @@ def test_cugraph_loader_from_disk_subset_renumbered(): input_files=list(os.listdir(tempdir.name))[100:200], ) - num_samples = 0 - for sample in loader: - num_samples += 1 - assert sample["t0"]["num_nodes"] == 7 - # correct vertex order is [0, 2, 1, 3, 4, 6, 5]; x = [1, 3, 2, 4, 5, 7, 6] - assert sample["t0"]["x"].tolist() == [1, 3, 2, 4, 5, 7, 6] + convs = [ + torch_geometric.nn.SAGEConv(256, 64, aggr="mean").cuda(), + torch_geometric.nn.SAGEConv(64, 8, aggr="mean").cuda(), + torch_geometric.nn.SAGEConv(8, 1, aggr="mean").cuda(), + ] - edge_index = sample[("t0", "knows", "t0")]["edge_index"] - assert list(edge_index.shape) == [2, 7] - assert ( - edge_index[0].tolist() - == bogus_samples.sources.dropna().values_host.tolist() - ) - assert ( - edge_index[1].tolist() - == bogus_samples.destinations.dropna().values_host.tolist() - ) + trim = trim_to_layer.TrimToLayer() + relu = torch.nn.functional.relu + dropout = torch.nn.functional.dropout - assert num_samples == 100 + for hetero_data in loader: + ei = hetero_data["t0", "knows", "t0"]["edge_index"] + x = hetero_data["t0"]["x"].cuda() + num_sampled_nodes = hetero_data["t0"]["num_sampled_nodes"] + num_sampled_edges = hetero_data["t0", "knows", "t0"]["num_sampled_edges"] + + print(num_sampled_nodes, num_sampled_edges) + + for i in range(len(convs)): + x, ei, _ = trim(i, num_sampled_nodes, num_sampled_edges, x, ei, None) + + s = x.shape[0] + + x = convs[i](x, ei, size=(s, s)) + x = relu(x) + x = dropout(x, p=0.5) + print(x.shape) + + print(x.shape) + x = x.narrow(dim=0, start=0, length=x.shape[0] - num_sampled_nodes[1]) + + assert list(x.shape) == [3, 1] diff --git a/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_sampler.py b/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_sampler.py index 08a8625b33b..84f62e80c9d 100644 --- a/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_sampler.py +++ b/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_sampler.py @@ -17,7 +17,9 @@ import pytest from cugraph_pyg.data import CuGraphStore -from cugraph_pyg.sampler.cugraph_sampler import _sampler_output_from_sampling_results +from cugraph_pyg.sampler.cugraph_sampler import ( + _sampler_output_from_sampling_results_heterogeneous, +) from cugraph.utilities.utils import import_optional, MissingModule from cugraph import uniform_neighbor_sample @@ -29,7 +31,7 @@ @pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available") def test_neighbor_sample(basic_graph_1): F, G, N = basic_graph_1 - cugraph_store = CuGraphStore(F, G, N) + cugraph_store = CuGraphStore(F, G, N, order="CSR") batches = cudf.DataFrame( { @@ -49,7 +51,7 @@ def test_neighbor_sample(basic_graph_1): return_offsets=False, ).sort_values(by=["sources", "destinations"]) - out = _sampler_output_from_sampling_results( + out = _sampler_output_from_sampling_results_heterogeneous( sampling_results=sampling_results, renumber_map=None, graph_store=cugraph_store, @@ -77,7 +79,7 @@ def test_neighbor_sample(basic_graph_1): # check the hop dictionaries assert len(out.num_sampled_nodes) == 1 - assert out.num_sampled_nodes["vt1"].tolist() == [4, 4] + assert out.num_sampled_nodes["vt1"].tolist() == [4, 1] assert len(out.num_sampled_edges) == 1 assert out.num_sampled_edges[("vt1", "pig", "vt1")].tolist() == [6] @@ -87,7 +89,7 @@ def test_neighbor_sample(basic_graph_1): @pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available") def test_neighbor_sample_multi_vertex(multi_edge_multi_vertex_graph_1): F, G, N = multi_edge_multi_vertex_graph_1 - cugraph_store = CuGraphStore(F, G, N) + cugraph_store = CuGraphStore(F, G, N, order="CSR") batches = cudf.DataFrame( { @@ -107,7 +109,7 @@ def test_neighbor_sample_multi_vertex(multi_edge_multi_vertex_graph_1): with_batch_ids=True, ).sort_values(by=["sources", "destinations"]) - out = _sampler_output_from_sampling_results( + out = _sampler_output_from_sampling_results_heterogeneous( sampling_results=sampling_results, renumber_map=None, graph_store=cugraph_store, @@ -132,8 +134,8 @@ def test_neighbor_sample_multi_vertex(multi_edge_multi_vertex_graph_1): # check the hop dictionaries assert len(out.num_sampled_nodes) == 2 - assert out.num_sampled_nodes["black"].tolist() == [2, 2] - assert out.num_sampled_nodes["brown"].tolist() == [3, 2] + assert out.num_sampled_nodes["black"].tolist() == [2, 0] + assert out.num_sampled_nodes["brown"].tolist() == [3, 0] assert len(out.num_sampled_edges) == 5 assert out.num_sampled_edges[("brown", "horse", "brown")].tolist() == [2] @@ -147,7 +149,7 @@ def test_neighbor_sample_multi_vertex(multi_edge_multi_vertex_graph_1): def test_neighbor_sample_mock_sampling_results(abc_graph): F, G, N = abc_graph - graph_store = CuGraphStore(F, G, N) + graph_store = CuGraphStore(F, G, N, order="CSR") # let 0, 1 be the start vertices, fanout = [2, 1, 2, 3] mock_sampling_results = cudf.DataFrame( @@ -159,7 +161,7 @@ def test_neighbor_sample_mock_sampling_results(abc_graph): } ) - out = _sampler_output_from_sampling_results( + out = _sampler_output_from_sampling_results_heterogeneous( mock_sampling_results, None, graph_store, None ) @@ -179,9 +181,9 @@ def test_neighbor_sample_mock_sampling_results(abc_graph): assert out.col[("B", "ba", "A")].tolist() == [1, 1] assert len(out.num_sampled_nodes) == 3 - assert out.num_sampled_nodes["A"].tolist() == [2, 0, 1, 0, 1] - assert out.num_sampled_nodes["B"].tolist() == [0, 2, 0, 1, 0] - assert out.num_sampled_nodes["C"].tolist() == [0, 0, 2, 0, 2] + assert out.num_sampled_nodes["A"].tolist() == [2, 0, 0, 0, 0] + assert out.num_sampled_nodes["B"].tolist() == [0, 2, 0, 0, 0] + assert out.num_sampled_nodes["C"].tolist() == [0, 0, 2, 0, 1] assert len(out.num_sampled_edges) == 3 assert out.num_sampled_edges[("A", "ab", "B")].tolist() == [3, 0, 1, 0] diff --git a/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_store.py b/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_store.py index 289dd69a829..e815b813050 100644 --- a/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_store.py +++ b/python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_store.py @@ -199,7 +199,7 @@ def test_renumber_vertices_multi_edge_multi_vertex(multi_edge_multi_vertex_graph def test_renumber_edges(abc_graph): F, G, N = abc_graph - graph_store = CuGraphStore(F, G, N) + graph_store = CuGraphStore(F, G, N, order="CSR") # let 0, 1 be the start vertices, fanout = [2, 1, 2, 3] mock_sampling_results = cudf.DataFrame(