From 35af4b4ee3506a46efb2664d76681445c26b5ef6 Mon Sep 17 00:00:00 2001 From: Alexandria Barghi Date: Mon, 18 Nov 2024 09:00:32 -0800 Subject: [PATCH 1/7] heterogeneous sampling --- .../loader/link_neighbor_loader.py | 14 ++- .../cugraph_pyg/loader/neighbor_loader.py | 13 +- .../cugraph_pyg/sampler/sampler.py | 118 +++++++++++++++++- 3 files changed, 134 insertions(+), 11 deletions(-) diff --git a/python/cugraph-pyg/cugraph_pyg/loader/link_neighbor_loader.py b/python/cugraph-pyg/cugraph_pyg/loader/link_neighbor_loader.py index 0805653..4c2e5b7 100644 --- a/python/cugraph-pyg/cugraph_pyg/loader/link_neighbor_loader.py +++ b/python/cugraph-pyg/cugraph_pyg/loader/link_neighbor_loader.py @@ -188,10 +188,19 @@ def __init__( # Will eventually automatically convert these objects to cuGraph objects. raise NotImplementedError("Currently can't accept non-cugraph graphs") + + feature_store, graph_store = data + if compression is None: - compression = "CSR" + compression = "CSR" if graph_store.is_homogeneous else 'COO' elif compression not in ["CSR", "COO"]: raise ValueError("Invalid value for compression (expected 'CSR' or 'COO')") + + if (not graph_store.is_homogeneous): + if compression != 'COO': + raise ValueError("Only COO format is supported for heterogeneous graphs!") + if directory is not None: + raise ValueError("Writing to disk is not supported for heterogeneous graphs!") writer = ( None @@ -203,8 +212,6 @@ def __init__( ) ) - feature_store, graph_store = data - if weight_attr is not None: graph_store._set_weight_attr((feature_store, weight_attr)) @@ -221,6 +228,7 @@ def __init__( with_replacement=replace, local_seeds_per_call=local_seeds_per_call, biased=(weight_attr is not None), + heterogeneous=(not graph_store.is_homogeneous) ), (feature_store, graph_store), batch_size=batch_size, diff --git a/python/cugraph-pyg/cugraph_pyg/loader/neighbor_loader.py b/python/cugraph-pyg/cugraph_pyg/loader/neighbor_loader.py index 1da2c6d..a24c5c1 100644 --- a/python/cugraph-pyg/cugraph_pyg/loader/neighbor_loader.py +++ b/python/cugraph-pyg/cugraph_pyg/loader/neighbor_loader.py @@ -181,11 +181,19 @@ def __init__( # Will eventually automatically convert these objects to cuGraph objects. raise NotImplementedError("Currently can't accept non-cugraph graphs") + feature_store, graph_store = data + if compression is None: - compression = "CSR" + compression = "CSR" if graph_store.is_homogeneous else 'COO' elif compression not in ["CSR", "COO"]: raise ValueError("Invalid value for compression (expected 'CSR' or 'COO')") + if (not graph_store.is_homogeneous): + if compression != 'COO': + raise ValueError("Only COO format is supported for heterogeneous graphs!") + if directory is not None: + raise ValueError("Writing to disk is not supported for heterogeneous graphs!") + writer = ( None if directory is None @@ -196,8 +204,6 @@ def __init__( ) ) - feature_store, graph_store = data - if weight_attr is not None: graph_store._set_weight_attr((feature_store, weight_attr)) @@ -214,6 +220,7 @@ def __init__( with_replacement=replace, local_seeds_per_call=local_seeds_per_call, biased=(weight_attr is not None), + heterogeneous=(not graph_store.is_homogeneous) ), (feature_store, graph_store), batch_size=batch_size, diff --git a/python/cugraph-pyg/cugraph_pyg/sampler/sampler.py b/python/cugraph-pyg/cugraph_pyg/sampler/sampler.py index bc3d4fd..28f3176 100644 --- a/python/cugraph-pyg/cugraph_pyg/sampler/sampler.py +++ b/python/cugraph-pyg/cugraph_pyg/sampler/sampler.py @@ -216,6 +216,118 @@ def __iter__(self): return self +class HeterogeneousSampleReader(SampleReader): + """ + Subclass of SampleReader that reads heterogeneous output samples + produced by the cuGraph distributed sampler. + """ + + def __init__( + self, base_reader: Iterator[Tuple[Dict[str, "torch.Tensor"], int, int]] + ): + """ + Constructs a new HeterogeneousSampleReader + + Parameters + ---------- + base_reader: Iterator[Tuple[Dict[str, "torch.Tensor"], int, int]] + The iterator responsible for loading saved samples produced by + the cuGraph distributed sampler. + """ + super().__init__(base_reader) + + + def __decode_coo(self, raw_sample_data: Dict[str, "torch.Tensor"], index: int): + fanout_length = raw_sample_data['fanout'].numel() + num_edge_types = raw_sample_data['edge_type'].max() + 1 + + major_minor_start = raw_sample_data["label_hop_offsets"][index * num_edge_types * fanout_length] + ix_end = (index + 1) * fanout_length + if ix_end == raw_sample_data["label_hop_offsets"].numel(): + major_minor_end = raw_sample_data["majors"].numel() + else: + major_minor_end = raw_sample_data["label_hop_offsets"][ix_end] + + majors = raw_sample_data["majors"][major_minor_start:major_minor_end] + minors = raw_sample_data["minors"][major_minor_start:major_minor_end] + edge_id = raw_sample_data["edge_id"][major_minor_start:major_minor_end] + edge_type = raw_sample_data['edge_type'][major_minor_start:major_minor_end] + + renumber_map_start = raw_sample_data["renumber_map_offsets"][index] + renumber_map_end = raw_sample_data["renumber_map_offsets"][index + 1] + + renumber_map = raw_sample_data["map"][renumber_map_start:renumber_map_end] + + num_sampled_edges = ( + raw_sample_data["label_hop_offsets"][ + index * fanout_length : (index + 1) * fanout_length + 1 + ] + .diff() + .cpu() + ) + + num_seeds = (majors[: num_sampled_edges[0]].max() + 1).reshape((1,)).cpu() + num_sampled_nodes_hops = torch.tensor( + [ + minors[: num_sampled_edges[:i].sum()].max() + 1 + for i in range(1, fanout_length + 1) + ], + device="cpu", + ) + + num_sampled_nodes = torch.concat( + [num_seeds, num_sampled_nodes_hops.diff(prepend=num_seeds)] + ) + + input_index = raw_sample_data["input_index"][ + raw_sample_data["input_offsets"][index] : raw_sample_data["input_offsets"][ + index + 1 + ] + ] + + edge_inverse = ( + ( + raw_sample_data["edge_inverse"][ + (raw_sample_data["input_offsets"][index] * 2) : ( + raw_sample_data["input_offsets"][index + 1] * 2 + ) + ] + ) + if "edge_inverse" in raw_sample_data + else None + ) + + if edge_inverse is None: + metadata = ( + input_index, + None, # TODO this will eventually include time + ) + else: + metadata = ( + input_index, + edge_inverse.view(2, -1), + None, + None, # TODO this will eventually include time + ) + + return torch_geometric.sampler.SamplerOutput( + node=renumber_map.cpu(), + row=minors, + col=majors, + edge=edge_id, + batch=renumber_map[:num_seeds], + num_sampled_nodes=num_sampled_nodes, + num_sampled_edges=num_sampled_edges, + metadata=metadata, + ) + + def _decode(self, raw_sample_data: Dict[str, "torch.Tensor"], index: int): + if "major_offsets" in raw_sample_data: + raise ValueError("CSR format not currently supported for heterogeneous graphs") + else: + return self.__decode_coo(raw_sample_data, index) + + class HomogeneousSampleReader(SampleReader): """ Subclass of SampleReader that reads homogeneous output samples @@ -465,11 +577,7 @@ def sample_from_nodes( ): return HomogeneousSampleReader(reader) else: - # TODO implement heterogeneous sampling - raise NotImplementedError( - "Sampling heterogeneous graphs is currently" - " unsupported in the non-dask API" - ) + return HeterogeneousSampleReader(reader) def sample_from_edges( self, From 8da5c951f73244de2ee56afd0225b0171db72483 Mon Sep 17 00:00:00 2001 From: Alexandria Barghi Date: Fri, 22 Nov 2024 08:24:36 -0800 Subject: [PATCH 2/7] c --- .../cugraph_pyg/data/graph_store.py | 35 +++++ .../cugraph_pyg/loader/link_loader.py | 4 + .../loader/link_neighbor_loader.py | 3 +- .../cugraph_pyg/loader/neighbor_loader.py | 3 +- .../cugraph_pyg/loader/node_loader.py | 2 + .../cugraph_pyg/sampler/sampler.py | 125 +++++++++++------- 6 files changed, 123 insertions(+), 49 deletions(-) diff --git a/python/cugraph-pyg/cugraph_pyg/data/graph_store.py b/python/cugraph-pyg/cugraph_pyg/data/graph_store.py index c47dda5..ae4caa3 100644 --- a/python/cugraph-pyg/cugraph_pyg/data/graph_store.py +++ b/python/cugraph-pyg/cugraph_pyg/data/graph_store.py @@ -70,6 +70,7 @@ def __clear_graph(self): self.__graph = None self.__vertex_offsets = None self.__weight_attr = None + self.__numeric_edge_types = None def _put_edge_index( self, @@ -173,6 +174,7 @@ def _graph(self) -> Union[pylibcugraph.SGGraph, pylibcugraph.MGGraph]: else None, ) else: + print(edgelist_dict) self.__graph = pylibcugraph.SGGraph( self._resource_handle, graph_properties, @@ -270,6 +272,39 @@ def __get_weight_tensor( return torch.concat(weights) + @property + def _numeric_edge_types(self) -> Tuple[List, "torch.Tensor", "torch.Tensor"]: + """ + Returns the canonical edge types in order (the 0th canonical type corresponds + to numeric edge type 0, etc.), along with the numeric source and destination + vertex types for each edge type. + """ + + if self.__numeric_edge_types is None: + sorted_keys = sorted( + list(self.__edge_indices.keys(leaves_only=True, include_nested=True)) + ) + + vtype_table = { + k: i + for i, k in enumerate(sorted(self._vertex_offsets.keys())) + } + + srcs = [] + dsts = [] + + for can_etype in sorted_keys: + srcs.append( + vtype_table[can_etype[0]] + ) + dsts.append( + vtype_table[can_etype[2]] + ) + + self.__numeric_edge_types = (sorted_keys, torch.tensor(srcs,device='cuda',dtype=torch.int32), torch.tensor(dsts,device='cuda',dtype=torch.int32)) + + return self.__numeric_edge_types + def __get_edgelist(self): """ Returns diff --git a/python/cugraph-pyg/cugraph_pyg/loader/link_loader.py b/python/cugraph-pyg/cugraph_pyg/loader/link_loader.py index 77e2ac4..63018fd 100644 --- a/python/cugraph-pyg/cugraph_pyg/loader/link_loader.py +++ b/python/cugraph-pyg/cugraph_pyg/loader/link_loader.py @@ -128,6 +128,10 @@ def __init__( (None, edge_label_index), ) + # Note reverse of standard convention here + edge_label_index[0] += data[1]._vertex_offsets[input_type[0]] + edge_label_index[1] += data[1]._vertex_offsets[input_type[2]] + self.__input_data = torch_geometric.sampler.EdgeSamplerInput( input_id=torch.arange( edge_label_index[0].numel(), dtype=torch.int64, device="cuda" diff --git a/python/cugraph-pyg/cugraph_pyg/loader/link_neighbor_loader.py b/python/cugraph-pyg/cugraph_pyg/loader/link_neighbor_loader.py index 4c2e5b7..eacb75c 100644 --- a/python/cugraph-pyg/cugraph_pyg/loader/link_neighbor_loader.py +++ b/python/cugraph-pyg/cugraph_pyg/loader/link_neighbor_loader.py @@ -228,7 +228,8 @@ def __init__( with_replacement=replace, local_seeds_per_call=local_seeds_per_call, biased=(weight_attr is not None), - heterogeneous=(not graph_store.is_homogeneous) + heterogeneous=(not graph_store.is_homogeneous), + num_edge_types=len(graph_store.get_all_edge_attrs()), ), (feature_store, graph_store), batch_size=batch_size, diff --git a/python/cugraph-pyg/cugraph_pyg/loader/neighbor_loader.py b/python/cugraph-pyg/cugraph_pyg/loader/neighbor_loader.py index a24c5c1..125708a 100644 --- a/python/cugraph-pyg/cugraph_pyg/loader/neighbor_loader.py +++ b/python/cugraph-pyg/cugraph_pyg/loader/neighbor_loader.py @@ -220,7 +220,8 @@ def __init__( with_replacement=replace, local_seeds_per_call=local_seeds_per_call, biased=(weight_attr is not None), - heterogeneous=(not graph_store.is_homogeneous) + heterogeneous=(not graph_store.is_homogeneous), + num_edge_types=len(graph_store.get_all_edge_attrs()), ), (feature_store, graph_store), batch_size=batch_size, diff --git a/python/cugraph-pyg/cugraph_pyg/loader/node_loader.py b/python/cugraph-pyg/cugraph_pyg/loader/node_loader.py index 4b236f7..cac788e 100644 --- a/python/cugraph-pyg/cugraph_pyg/loader/node_loader.py +++ b/python/cugraph-pyg/cugraph_pyg/loader/node_loader.py @@ -110,6 +110,8 @@ def __init__( input_id, ) + input_nodes += data[1]._vertex_offsets[input_type] + self.__input_data = torch_geometric.sampler.NodeSamplerInput( input_id=torch.arange(len(input_nodes), dtype=torch.int64, device="cuda") if input_id is None diff --git a/python/cugraph-pyg/cugraph_pyg/sampler/sampler.py b/python/cugraph-pyg/cugraph_pyg/sampler/sampler.py index 28f3176..5227c2b 100644 --- a/python/cugraph-pyg/cugraph_pyg/sampler/sampler.py +++ b/python/cugraph-pyg/cugraph_pyg/sampler/sampler.py @@ -11,7 +11,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Optional, Iterator, Union, Dict, Tuple +from typing import Optional, Iterator, Union, Dict, Tuple, List from cugraph.utilities.utils import import_optional from cugraph.gnn import DistSampler @@ -189,12 +189,14 @@ def __next__(self): self.__raw_sample_data, start_inclusive, end_inclusive = next( self.__base_reader ) + print(self.__raw_sample_data) + lho_name = "label_type_hop_offsets" if "label_type_hop_offsets" in self.__raw_sample_data else "label_type_hop_offsets" self.__raw_sample_data["input_offsets"] -= self.__raw_sample_data[ "input_offsets" ][0].clone() - self.__raw_sample_data["label_hop_offsets"] -= self.__raw_sample_data[ - "label_hop_offsets" + self.__raw_sample_data[lho_name] -= self.__raw_sample_data[ + lho_name ][0].clone() self.__raw_sample_data["renumber_map_offsets"] -= self.__raw_sample_data[ "renumber_map_offsets" @@ -223,7 +225,7 @@ class HeterogeneousSampleReader(SampleReader): """ def __init__( - self, base_reader: Iterator[Tuple[Dict[str, "torch.Tensor"], int, int]] + self, base_reader: Iterator[Tuple[Dict[str, "torch.Tensor"], int, int]], src_types: "torch.Tensor", dst_types: "torch.Tensor", edge_types: List[Tuple[str, str, str]] ): """ Constructs a new HeterogeneousSampleReader @@ -233,51 +235,71 @@ def __init__( base_reader: Iterator[Tuple[Dict[str, "torch.Tensor"], int, int]] The iterator responsible for loading saved samples produced by the cuGraph distributed sampler. + src_types: torch.Tensor + Integer source type for each integer edge type. + dst_types: torch.Tensor + Integer destination type for each integer edge type. + edge_types: List[Tuple[str, str, str]] + List of edge types in the graph in order, so they can be + mapped to numeric edge types. """ + + self.__src_types = src_types + self.__dst_types = dst_types + self.__edge_types = edge_types + self.__num_vertex_types = max(self.__src_types.max(), self.__dst_types.max()) + 1 + super().__init__(base_reader) def __decode_coo(self, raw_sample_data: Dict[str, "torch.Tensor"], index: int): fanout_length = raw_sample_data['fanout'].numel() - num_edge_types = raw_sample_data['edge_type'].max() + 1 - - major_minor_start = raw_sample_data["label_hop_offsets"][index * num_edge_types * fanout_length] - ix_end = (index + 1) * fanout_length - if ix_end == raw_sample_data["label_hop_offsets"].numel(): - major_minor_end = raw_sample_data["majors"].numel() - else: - major_minor_end = raw_sample_data["label_hop_offsets"][ix_end] + num_edge_types = self.__src_types.numel() + + num_sampled_edges = {} + node = {} + row = {} + col = {} + edge = {} + for etype in range(num_edge_types): + pyg_can_etype = self.__edge_types[etype] + + jx = self.__src_types[etype] + index * self.__num_vertex_types + map_ptr_src_beg, map_ptr_src_end = raw_sample_data["renumber_map_offsets"][ + [jx, jx + 1] + ] + map_src = raw_sample_data["renumber_map"][map_ptr_src_beg:map_ptr_src_end] + node[pyg_can_etype[0]] = map_src.cpu() - majors = raw_sample_data["majors"][major_minor_start:major_minor_end] - minors = raw_sample_data["minors"][major_minor_start:major_minor_end] - edge_id = raw_sample_data["edge_id"][major_minor_start:major_minor_end] - edge_type = raw_sample_data['edge_type'][major_minor_start:major_minor_end] + kx = self.__dst_types[etype] + index * self.__num_vertex_types + map_ptr_dst_beg, map_ptr_dst_end = raw_sample_data["renumber_map_offsets"][ + [kx, kx + 1] + ] + map_dst = raw_sample_data["renumber_map"][map_ptr_dst_beg:map_ptr_dst_end] + node[pyg_can_etype[2]] = map_dst.cpu() - renumber_map_start = raw_sample_data["renumber_map_offsets"][index] - renumber_map_end = raw_sample_data["renumber_map_offsets"][index + 1] + edge_ptr_beg = index * num_edge_types * fanout_length + etype * fanout_length + edge_ptr_end = index * num_edge_types * fanout_length + (etype+1) * fanout_length + lho = raw_sample_data['label_type_hop_offsets'][ + edge_ptr_beg:edge_ptr_end + ] - renumber_map = raw_sample_data["map"][renumber_map_start:renumber_map_end] + num_sampled_edges[pyg_can_etype] = ( + lho + ).diff().cpu() - num_sampled_edges = ( - raw_sample_data["label_hop_offsets"][ - index * fanout_length : (index + 1) * fanout_length + 1 + eid_i = raw_sample_data["edge_id"][edge_ptr_beg:edge_ptr_end] + eirx = (index * num_edge_types) + etype + edge_id_ptr_beg, edge_id_ptr_end = raw_sample_data["edge_renumber_map_offsets"][ + [eirx, eirx + 1] ] - .diff() - .cpu() - ) + emap = raw_sample_data["edge_renumber_map"][edge_id_ptr_beg:edge_id_ptr_end] + edge[pyg_can_etype] = emap[eid_i] - num_seeds = (majors[: num_sampled_edges[0]].max() + 1).reshape((1,)).cpu() - num_sampled_nodes_hops = torch.tensor( - [ - minors[: num_sampled_edges[:i].sum()].max() + 1 - for i in range(1, fanout_length + 1) - ], - device="cpu", - ) + col[pyg_can_etype] = raw_sample_data['majors'][edge_ptr_beg:edge_ptr_end] + row[pyg_can_etype] = raw_sample_data['minors'][edge_ptr_beg:edge_ptr_end] - num_sampled_nodes = torch.concat( - [num_seeds, num_sampled_nodes_hops.diff(prepend=num_seeds)] - ) + num_sampled_nodes = {} input_index = raw_sample_data["input_index"][ raw_sample_data["input_offsets"][index] : raw_sample_data["input_offsets"][ @@ -310,12 +332,12 @@ def __decode_coo(self, raw_sample_data: Dict[str, "torch.Tensor"], index: int): None, # TODO this will eventually include time ) - return torch_geometric.sampler.SamplerOutput( - node=renumber_map.cpu(), - row=minors, - col=majors, - edge=edge_id, - batch=renumber_map[:num_seeds], + return torch_geometric.sampler.HeteroSamplerOutput( + node=node, + row=row, + col=col, + edge=edge, + batch=None, num_sampled_nodes=num_sampled_nodes, num_sampled_edges=num_sampled_edges, metadata=metadata, @@ -577,7 +599,14 @@ def sample_from_nodes( ): return HomogeneousSampleReader(reader) else: - return HeterogeneousSampleReader(reader) + edge_types,src_types,dst_types = self.__graph_store._numeric_edge_types + + return HeterogeneousSampleReader( + reader, + src_types=src_types, + dst_types=dst_types, + edge_types=edge_types, + ) def sample_from_edges( self, @@ -641,8 +670,10 @@ def sample_from_edges( ): return HomogeneousSampleReader(reader) else: - # TODO implement heterogeneous sampling - raise NotImplementedError( - "Sampling heterogeneous graphs is currently" - " unsupported in the non-dask API" + edge_types,src_types,dst_types = self.__graph_store._numeric_edge_types + return HeterogeneousSampleReader( + reader, + src_types=src_types, + dst_types=dst_types, + edge_types=edge_types, ) From 4587bd9395d782d38d56b26a4a0719c908fb0178 Mon Sep 17 00:00:00 2001 From: Alexandria Barghi Date: Mon, 2 Dec 2024 09:07:39 -0800 Subject: [PATCH 3/7] reformat --- .../cugraph_pyg/data/graph_store.py | 43 ++++++--- .../loader/link_neighbor_loader.py | 18 ++-- .../cugraph_pyg/loader/neighbor_loader.py | 15 ++- .../cugraph_pyg/sampler/sampler.py | 96 ++++++++++++------- 4 files changed, 114 insertions(+), 58 deletions(-) diff --git a/python/cugraph-pyg/cugraph_pyg/data/graph_store.py b/python/cugraph-pyg/cugraph_pyg/data/graph_store.py index ae4caa3..002028d 100644 --- a/python/cugraph-pyg/cugraph_pyg/data/graph_store.py +++ b/python/cugraph-pyg/cugraph_pyg/data/graph_store.py @@ -174,6 +174,7 @@ def _graph(self) -> Union[pylibcugraph.SGGraph, pylibcugraph.MGGraph]: else None, ) else: + print(self._vertex_offsets) print(edgelist_dict) self.__graph = pylibcugraph.SGGraph( self._resource_handle, @@ -242,6 +243,27 @@ def _vertex_offsets(self) -> Dict[str, int]: return dict(self.__vertex_offsets) + @property + def _vertex_offset_array(self) -> "torch.Tensor": + off = torch.tensor( + [self._vertex_offsets[k] for k in sorted(self._vertex_offsets.keys())], + dtype=torch.int64, + device="cuda", + ) + + return torch.concat( + [ + off, + torch.tensor( + list(self._num_vertices().values()), + device="cuda", + dtype=torch.int64, + ) + .sum() + .reshape((1,)), + ] + ) + @property def is_homogeneous(self) -> bool: return len(self._vertex_offsets) == 1 @@ -286,23 +308,22 @@ def _numeric_edge_types(self) -> Tuple[List, "torch.Tensor", "torch.Tensor"]: ) vtype_table = { - k: i - for i, k in enumerate(sorted(self._vertex_offsets.keys())) + k: i for i, k in enumerate(sorted(self._vertex_offsets.keys())) } srcs = [] dsts = [] for can_etype in sorted_keys: - srcs.append( - vtype_table[can_etype[0]] - ) - dsts.append( - vtype_table[can_etype[2]] - ) - - self.__numeric_edge_types = (sorted_keys, torch.tensor(srcs,device='cuda',dtype=torch.int32), torch.tensor(dsts,device='cuda',dtype=torch.int32)) - + srcs.append(vtype_table[can_etype[0]]) + dsts.append(vtype_table[can_etype[2]]) + + self.__numeric_edge_types = ( + sorted_keys, + torch.tensor(srcs, device="cuda", dtype=torch.int32), + torch.tensor(dsts, device="cuda", dtype=torch.int32), + ) + return self.__numeric_edge_types def __get_edgelist(self): diff --git a/python/cugraph-pyg/cugraph_pyg/loader/link_neighbor_loader.py b/python/cugraph-pyg/cugraph_pyg/loader/link_neighbor_loader.py index eacb75c..a511ebb 100644 --- a/python/cugraph-pyg/cugraph_pyg/loader/link_neighbor_loader.py +++ b/python/cugraph-pyg/cugraph_pyg/loader/link_neighbor_loader.py @@ -188,19 +188,22 @@ def __init__( # Will eventually automatically convert these objects to cuGraph objects. raise NotImplementedError("Currently can't accept non-cugraph graphs") - feature_store, graph_store = data if compression is None: - compression = "CSR" if graph_store.is_homogeneous else 'COO' + compression = "CSR" if graph_store.is_homogeneous else "COO" elif compression not in ["CSR", "COO"]: raise ValueError("Invalid value for compression (expected 'CSR' or 'COO')") - - if (not graph_store.is_homogeneous): - if compression != 'COO': - raise ValueError("Only COO format is supported for heterogeneous graphs!") + + if not graph_store.is_homogeneous: + if compression != "COO": + raise ValueError( + "Only COO format is supported for heterogeneous graphs!" + ) if directory is not None: - raise ValueError("Writing to disk is not supported for heterogeneous graphs!") + raise ValueError( + "Writing to disk is not supported for heterogeneous graphs!" + ) writer = ( None @@ -229,6 +232,7 @@ def __init__( local_seeds_per_call=local_seeds_per_call, biased=(weight_attr is not None), heterogeneous=(not graph_store.is_homogeneous), + vertex_type_offsets=graph_store._vertex_offset_array, num_edge_types=len(graph_store.get_all_edge_attrs()), ), (feature_store, graph_store), diff --git a/python/cugraph-pyg/cugraph_pyg/loader/neighbor_loader.py b/python/cugraph-pyg/cugraph_pyg/loader/neighbor_loader.py index 125708a..98b6432 100644 --- a/python/cugraph-pyg/cugraph_pyg/loader/neighbor_loader.py +++ b/python/cugraph-pyg/cugraph_pyg/loader/neighbor_loader.py @@ -184,15 +184,19 @@ def __init__( feature_store, graph_store = data if compression is None: - compression = "CSR" if graph_store.is_homogeneous else 'COO' + compression = "CSR" if graph_store.is_homogeneous else "COO" elif compression not in ["CSR", "COO"]: raise ValueError("Invalid value for compression (expected 'CSR' or 'COO')") - if (not graph_store.is_homogeneous): - if compression != 'COO': - raise ValueError("Only COO format is supported for heterogeneous graphs!") + if not graph_store.is_homogeneous: + if compression != "COO": + raise ValueError( + "Only COO format is supported for heterogeneous graphs!" + ) if directory is not None: - raise ValueError("Writing to disk is not supported for heterogeneous graphs!") + raise ValueError( + "Writing to disk is not supported for heterogeneous graphs!" + ) writer = ( None @@ -221,6 +225,7 @@ def __init__( local_seeds_per_call=local_seeds_per_call, biased=(weight_attr is not None), heterogeneous=(not graph_store.is_homogeneous), + vertex_type_offsets=graph_store._vertex_offset_array, num_edge_types=len(graph_store.get_all_edge_attrs()), ), (feature_store, graph_store), diff --git a/python/cugraph-pyg/cugraph_pyg/sampler/sampler.py b/python/cugraph-pyg/cugraph_pyg/sampler/sampler.py index 5227c2b..1603b88 100644 --- a/python/cugraph-pyg/cugraph_pyg/sampler/sampler.py +++ b/python/cugraph-pyg/cugraph_pyg/sampler/sampler.py @@ -122,7 +122,7 @@ def __next__(self): elif isinstance(next_sample, torch_geometric.sampler.HeteroSamplerOutput): col = {} - for edge_type, col_idx in next_sample.col: + for edge_type, col_idx in next_sample.col.items(): sz = next_sample.edge[edge_type].numel() if sz == col_idx.numel(): col[edge_type] = col_idx @@ -190,14 +190,18 @@ def __next__(self): self.__base_reader ) print(self.__raw_sample_data) - lho_name = "label_type_hop_offsets" if "label_type_hop_offsets" in self.__raw_sample_data else "label_type_hop_offsets" + lho_name = ( + "label_type_hop_offsets" + if "label_type_hop_offsets" in self.__raw_sample_data + else "label_type_hop_offsets" + ) self.__raw_sample_data["input_offsets"] -= self.__raw_sample_data[ "input_offsets" ][0].clone() - self.__raw_sample_data[lho_name] -= self.__raw_sample_data[ - lho_name - ][0].clone() + self.__raw_sample_data[lho_name] -= self.__raw_sample_data[lho_name][ + 0 + ].clone() self.__raw_sample_data["renumber_map_offsets"] -= self.__raw_sample_data[ "renumber_map_offsets" ][0].clone() @@ -225,7 +229,12 @@ class HeterogeneousSampleReader(SampleReader): """ def __init__( - self, base_reader: Iterator[Tuple[Dict[str, "torch.Tensor"], int, int]], src_types: "torch.Tensor", dst_types: "torch.Tensor", edge_types: List[Tuple[str, str, str]] + self, + base_reader: Iterator[Tuple[Dict[str, "torch.Tensor"], int, int]], + src_types: "torch.Tensor", + dst_types: "torch.Tensor", + vertex_offsets: "torch.Tensor", + edge_types: List[Tuple[str, str, str]], ): """ Constructs a new HeterogeneousSampleReader @@ -239,6 +248,10 @@ def __init__( Integer source type for each integer edge type. dst_types: torch.Tensor Integer destination type for each integer edge type. + vertex_offsets: torch.Tensor + Vertex offsets for each vertex type. Used to de-offset vertices + outputted by the cuGraph sampler and return PyG-compliant vertex + IDs. edge_types: List[Tuple[str, str, str]] List of edge types in the graph in order, so they can be mapped to numeric edge types. @@ -247,14 +260,16 @@ def __init__( self.__src_types = src_types self.__dst_types = dst_types self.__edge_types = edge_types - self.__num_vertex_types = max(self.__src_types.max(), self.__dst_types.max()) + 1 + self.__num_vertex_types = ( + max(self.__src_types.max(), self.__dst_types.max()) + 1 + ) + self.__vertex_offsets = vertex_offsets super().__init__(base_reader) - def __decode_coo(self, raw_sample_data: Dict[str, "torch.Tensor"], index: int): - fanout_length = raw_sample_data['fanout'].numel() num_edge_types = self.__src_types.numel() + fanout_length = raw_sample_data["fanout"].numel() // num_edge_types num_sampled_edges = {} node = {} @@ -264,40 +279,47 @@ def __decode_coo(self, raw_sample_data: Dict[str, "torch.Tensor"], index: int): for etype in range(num_edge_types): pyg_can_etype = self.__edge_types[etype] + print(raw_sample_data["map"]) + print(raw_sample_data["renumber_map_offsets"]) + jx = self.__src_types[etype] + index * self.__num_vertex_types - map_ptr_src_beg, map_ptr_src_end = raw_sample_data["renumber_map_offsets"][ - [jx, jx + 1] - ] - map_src = raw_sample_data["renumber_map"][map_ptr_src_beg:map_ptr_src_end] - node[pyg_can_etype[0]] = map_src.cpu() + map_ptr_src_beg = raw_sample_data["renumber_map_offsets"][jx] + map_ptr_src_end = raw_sample_data["renumber_map_offsets"][jx + 1] + + map_src = raw_sample_data["map"][map_ptr_src_beg:map_ptr_src_end] + node[pyg_can_etype[0]] = ( + map_src - self.__vertex_offsets[self.__src_types[etype]] + ).cpu() kx = self.__dst_types[etype] + index * self.__num_vertex_types - map_ptr_dst_beg, map_ptr_dst_end = raw_sample_data["renumber_map_offsets"][ - [kx, kx + 1] - ] - map_dst = raw_sample_data["renumber_map"][map_ptr_dst_beg:map_ptr_dst_end] - node[pyg_can_etype[2]] = map_dst.cpu() + map_ptr_dst_beg = raw_sample_data["renumber_map_offsets"][kx] + map_ptr_dst_end = raw_sample_data["renumber_map_offsets"][kx + 1] - edge_ptr_beg = index * num_edge_types * fanout_length + etype * fanout_length - edge_ptr_end = index * num_edge_types * fanout_length + (etype+1) * fanout_length - lho = raw_sample_data['label_type_hop_offsets'][ - edge_ptr_beg:edge_ptr_end - ] + map_dst = raw_sample_data["map"][map_ptr_dst_beg:map_ptr_dst_end] + node[pyg_can_etype[2]] = ( + map_dst - self.__vertex_offsets[self.__dst_types[etype]] + ).cpu() + + edge_ptr_beg = ( + index * num_edge_types * fanout_length + etype * fanout_length + ) + edge_ptr_end = ( + index * num_edge_types * fanout_length + (etype + 1) * fanout_length + ) + lho = raw_sample_data["label_type_hop_offsets"][edge_ptr_beg:edge_ptr_end] - num_sampled_edges[pyg_can_etype] = ( - lho - ).diff().cpu() + num_sampled_edges[pyg_can_etype] = (lho).diff().cpu() eid_i = raw_sample_data["edge_id"][edge_ptr_beg:edge_ptr_end] eirx = (index * num_edge_types) + etype - edge_id_ptr_beg, edge_id_ptr_end = raw_sample_data["edge_renumber_map_offsets"][ - [eirx, eirx + 1] - ] + edge_id_ptr_beg = raw_sample_data["edge_renumber_map_offsets"][eirx] + edge_id_ptr_end = raw_sample_data["edge_renumber_map_offsets"][eirx + 1] + emap = raw_sample_data["edge_renumber_map"][edge_id_ptr_beg:edge_id_ptr_end] edge[pyg_can_etype] = emap[eid_i] - col[pyg_can_etype] = raw_sample_data['majors'][edge_ptr_beg:edge_ptr_end] - row[pyg_can_etype] = raw_sample_data['minors'][edge_ptr_beg:edge_ptr_end] + col[pyg_can_etype] = raw_sample_data["majors"][edge_ptr_beg:edge_ptr_end] + row[pyg_can_etype] = raw_sample_data["minors"][edge_ptr_beg:edge_ptr_end] num_sampled_nodes = {} @@ -345,7 +367,9 @@ def __decode_coo(self, raw_sample_data: Dict[str, "torch.Tensor"], index: int): def _decode(self, raw_sample_data: Dict[str, "torch.Tensor"], index: int): if "major_offsets" in raw_sample_data: - raise ValueError("CSR format not currently supported for heterogeneous graphs") + raise ValueError( + "CSR format not currently supported for heterogeneous graphs" + ) else: return self.__decode_coo(raw_sample_data, index) @@ -599,13 +623,14 @@ def sample_from_nodes( ): return HomogeneousSampleReader(reader) else: - edge_types,src_types,dst_types = self.__graph_store._numeric_edge_types + edge_types, src_types, dst_types = self.__graph_store._numeric_edge_types return HeterogeneousSampleReader( reader, src_types=src_types, dst_types=dst_types, edge_types=edge_types, + vertex_offsets=self.__graph_store._vertex_offset_array, ) def sample_from_edges( @@ -670,10 +695,11 @@ def sample_from_edges( ): return HomogeneousSampleReader(reader) else: - edge_types,src_types,dst_types = self.__graph_store._numeric_edge_types + edge_types, src_types, dst_types = self.__graph_store._numeric_edge_types return HeterogeneousSampleReader( reader, src_types=src_types, dst_types=dst_types, edge_types=edge_types, + vertex_offsets=self.__graph_store._vertex_offset_array, ) From 0786495230752339e1102cd2334eca6512437f33 Mon Sep 17 00:00:00 2001 From: Alexandria Barghi Date: Thu, 5 Dec 2024 13:02:32 -0800 Subject: [PATCH 4/7] fix various bugs --- .../cugraph_pyg/loader/link_loader.py | 5 +- .../cugraph_pyg/loader/node_loader.py | 4 +- .../cugraph_pyg/sampler/sampler.py | 18 +++--- .../tests/loader/test_neighbor_loader.py | 61 +++++++++++++++++++ 4 files changed, 75 insertions(+), 13 deletions(-) diff --git a/python/cugraph-pyg/cugraph_pyg/loader/link_loader.py b/python/cugraph-pyg/cugraph_pyg/loader/link_loader.py index 63018fd..943f361 100644 --- a/python/cugraph-pyg/cugraph_pyg/loader/link_loader.py +++ b/python/cugraph-pyg/cugraph_pyg/loader/link_loader.py @@ -129,8 +129,9 @@ def __init__( ) # Note reverse of standard convention here - edge_label_index[0] += data[1]._vertex_offsets[input_type[0]] - edge_label_index[1] += data[1]._vertex_offsets[input_type[2]] + if input_type is not None: + edge_label_index[0] += data[1]._vertex_offsets[input_type[0]] + edge_label_index[1] += data[1]._vertex_offsets[input_type[2]] self.__input_data = torch_geometric.sampler.EdgeSamplerInput( input_id=torch.arange( diff --git a/python/cugraph-pyg/cugraph_pyg/loader/node_loader.py b/python/cugraph-pyg/cugraph_pyg/loader/node_loader.py index cac788e..52251d5 100644 --- a/python/cugraph-pyg/cugraph_pyg/loader/node_loader.py +++ b/python/cugraph-pyg/cugraph_pyg/loader/node_loader.py @@ -109,8 +109,8 @@ def __init__( input_nodes, input_id, ) - - input_nodes += data[1]._vertex_offsets[input_type] + if input_type is not None: + input_nodes += data[1]._vertex_offsets[input_type] self.__input_data = torch_geometric.sampler.NodeSamplerInput( input_id=torch.arange(len(input_nodes), dtype=torch.int64, device="cuda") diff --git a/python/cugraph-pyg/cugraph_pyg/sampler/sampler.py b/python/cugraph-pyg/cugraph_pyg/sampler/sampler.py index 1603b88..8857d62 100644 --- a/python/cugraph-pyg/cugraph_pyg/sampler/sampler.py +++ b/python/cugraph-pyg/cugraph_pyg/sampler/sampler.py @@ -189,11 +189,11 @@ def __next__(self): self.__raw_sample_data, start_inclusive, end_inclusive = next( self.__base_reader ) - print(self.__raw_sample_data) + lho_name = ( "label_type_hop_offsets" if "label_type_hop_offsets" in self.__raw_sample_data - else "label_type_hop_offsets" + else "label_hop_offsets" ) self.__raw_sample_data["input_offsets"] -= self.__raw_sample_data[ @@ -279,9 +279,6 @@ def __decode_coo(self, raw_sample_data: Dict[str, "torch.Tensor"], index: int): for etype in range(num_edge_types): pyg_can_etype = self.__edge_types[etype] - print(raw_sample_data["map"]) - print(raw_sample_data["renumber_map_offsets"]) - jx = self.__src_types[etype] + index * self.__num_vertex_types map_ptr_src_beg = raw_sample_data["renumber_map_offsets"][jx] map_ptr_src_end = raw_sample_data["renumber_map_offsets"][jx + 1] @@ -306,11 +303,14 @@ def __decode_coo(self, raw_sample_data: Dict[str, "torch.Tensor"], index: int): edge_ptr_end = ( index * num_edge_types * fanout_length + (etype + 1) * fanout_length ) - lho = raw_sample_data["label_type_hop_offsets"][edge_ptr_beg:edge_ptr_end] + lho = raw_sample_data["label_type_hop_offsets"][ + edge_ptr_beg : edge_ptr_end + 1 + ] num_sampled_edges[pyg_can_etype] = (lho).diff().cpu() - eid_i = raw_sample_data["edge_id"][edge_ptr_beg:edge_ptr_end] + eid_i = raw_sample_data["edge_id"][lho[0] : lho[-1]] + eirx = (index * num_edge_types) + etype edge_id_ptr_beg = raw_sample_data["edge_renumber_map_offsets"][eirx] edge_id_ptr_end = raw_sample_data["edge_renumber_map_offsets"][eirx + 1] @@ -318,8 +318,8 @@ def __decode_coo(self, raw_sample_data: Dict[str, "torch.Tensor"], index: int): emap = raw_sample_data["edge_renumber_map"][edge_id_ptr_beg:edge_id_ptr_end] edge[pyg_can_etype] = emap[eid_i] - col[pyg_can_etype] = raw_sample_data["majors"][edge_ptr_beg:edge_ptr_end] - row[pyg_can_etype] = raw_sample_data["minors"][edge_ptr_beg:edge_ptr_end] + col[pyg_can_etype] = raw_sample_data["majors"][lho[0] : lho[-1]] + row[pyg_can_etype] = raw_sample_data["minors"][lho[0] : lho[-1]] num_sampled_nodes = {} diff --git a/python/cugraph-pyg/cugraph_pyg/tests/loader/test_neighbor_loader.py b/python/cugraph-pyg/cugraph_pyg/tests/loader/test_neighbor_loader.py index 8ee18a8..5494d81 100644 --- a/python/cugraph-pyg/cugraph_pyg/tests/loader/test_neighbor_loader.py +++ b/python/cugraph-pyg/cugraph_pyg/tests/loader/test_neighbor_loader.py @@ -194,3 +194,64 @@ def test_link_neighbor_loader_negative_sampling_uneven(batch_size): elx = torch.tensor_split(elx, eix.numel() // batch_size, dim=1) for i, batch in enumerate(loader): assert batch.edge_label[0] == 1.0 + + +@pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available") +@pytest.mark.sg +def test_neighbor_loader_hetero_basic(): + src = torch.tensor([0, 1, 2, 4, 3, 4, 5, 5]) # paper + dst = torch.tensor([4, 5, 4, 3, 2, 1, 0, 1]) # paper + + asrc = torch.tensor([0, 1, 2, 3, 3, 0]) # author + adst = torch.tensor([0, 1, 2, 3, 4, 5]) # paper + + graph_store = GraphStore() + feature_store = TensorDictFeatureStore() + + graph_store[("paper", "cites", "paper"), "coo"] = [src, dst] + graph_store[("author", "writes", "paper"), "coo"] = [asrc, adst] + + from cugraph_pyg.loader import NeighborLoader + + loader = NeighborLoader( + (feature_store, graph_store), + num_neighbors=[1, 1, 1, 1], + input_nodes=("paper", torch.tensor([0, 1])), + batch_size=2, + ) + + out = next(iter(loader)) + + assert sorted(out["paper"].n_id.tolist()) == [0, 1, 4, 5] + assert sorted(out["author"].n_id.tolist()) == [0, 1, 3] + + +@pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available") +@pytest.mark.sg +def test_neighbor_loader_hetero_single_etype(): + src = torch.tensor([0, 1, 2, 4, 3, 4, 5, 5]) # paper + dst = torch.tensor([4, 5, 4, 3, 2, 1, 0, 1]) # paper + + asrc = torch.tensor([0, 1, 2, 3, 3, 0]) # author + adst = torch.tensor([0, 1, 2, 3, 4, 5]) # paper + + graph_store = GraphStore() + feature_store = TensorDictFeatureStore() + + graph_store[("paper", "cites", "paper"), "coo"] = [src, dst] + graph_store[("author", "writes", "paper"), "coo"] = [asrc, adst] + + from cugraph_pyg.loader import NeighborLoader + + loader = NeighborLoader( + (feature_store, graph_store), + num_neighbors=[0, 1, 0, 1], + input_nodes=("paper", torch.tensor([0, 1])), + batch_size=2, + ) + + out = next(iter(loader)) + + assert out["author"].n_id.numel() == 0 + assert out["author", "writes", "paper"].edge_index.numel() == 0 + assert out["author", "writes", "paper"].num_sampled_edges.tolist() == [0, 0] From 9eb3319822d1ac6a9e0d81b016218bd4b7c60f24 Mon Sep 17 00:00:00 2001 From: Alexandria Barghi Date: Thu, 5 Dec 2024 14:21:36 -0800 Subject: [PATCH 5/7] add num sampled nodes --- .../cugraph_pyg/data/graph_store.py | 2 - .../cugraph_pyg/sampler/sampler.py | 50 +++++++++++++++++-- 2 files changed, 45 insertions(+), 7 deletions(-) diff --git a/python/cugraph-pyg/cugraph_pyg/data/graph_store.py b/python/cugraph-pyg/cugraph_pyg/data/graph_store.py index 002028d..59e2740 100644 --- a/python/cugraph-pyg/cugraph_pyg/data/graph_store.py +++ b/python/cugraph-pyg/cugraph_pyg/data/graph_store.py @@ -174,8 +174,6 @@ def _graph(self) -> Union[pylibcugraph.SGGraph, pylibcugraph.MGGraph]: else None, ) else: - print(self._vertex_offsets) - print(edgelist_dict) self.__graph = pylibcugraph.SGGraph( self._resource_handle, graph_properties, diff --git a/python/cugraph-pyg/cugraph_pyg/sampler/sampler.py b/python/cugraph-pyg/cugraph_pyg/sampler/sampler.py index 8857d62..dbddf02 100644 --- a/python/cugraph-pyg/cugraph_pyg/sampler/sampler.py +++ b/python/cugraph-pyg/cugraph_pyg/sampler/sampler.py @@ -235,6 +235,7 @@ def __init__( dst_types: "torch.Tensor", vertex_offsets: "torch.Tensor", edge_types: List[Tuple[str, str, str]], + vertex_types: List[str], ): """ Constructs a new HeterogeneousSampleReader @@ -255,14 +256,17 @@ def __init__( edge_types: List[Tuple[str, str, str]] List of edge types in the graph in order, so they can be mapped to numeric edge types. + vertex_types: List[str] + List of vertex types, in order so they can be mapped to + numeric vertex types. """ self.__src_types = src_types self.__dst_types = dst_types self.__edge_types = edge_types - self.__num_vertex_types = ( - max(self.__src_types.max(), self.__dst_types.max()) + 1 - ) + self.__vertex_types = vertex_types + self.__num_vertex_types = len(vertex_types) + self.__vertex_offsets = vertex_offsets super().__init__(base_reader) @@ -271,11 +275,18 @@ def __decode_coo(self, raw_sample_data: Dict[str, "torch.Tensor"], index: int): num_edge_types = self.__src_types.numel() fanout_length = raw_sample_data["fanout"].numel() // num_edge_types + num_sampled_nodes = [ + torch.zeros((fanout_length + 1,), dtype=torch.int64, device="cuda") + for _ in range(self.__num_vertex_types) + ] + num_sampled_edges = {} + node = {} row = {} col = {} edge = {} + for etype in range(num_edge_types): pyg_can_etype = self.__edge_types[etype] @@ -307,7 +318,7 @@ def __decode_coo(self, raw_sample_data: Dict[str, "torch.Tensor"], index: int): edge_ptr_beg : edge_ptr_end + 1 ] - num_sampled_edges[pyg_can_etype] = (lho).diff().cpu() + num_sampled_edges[pyg_can_etype] = (lho).diff() eid_i = raw_sample_data["edge_id"][lho[0] : lho[-1]] @@ -321,7 +332,35 @@ def __decode_coo(self, raw_sample_data: Dict[str, "torch.Tensor"], index: int): col[pyg_can_etype] = raw_sample_data["majors"][lho[0] : lho[-1]] row[pyg_can_etype] = raw_sample_data["minors"][lho[0] : lho[-1]] - num_sampled_nodes = {} + for hop in range(fanout_length): + vx = raw_sample_data["majors"][: lho[hop + 1]] + if vx.numel() > 0: + num_sampled_nodes[self.__src_types[etype]][hop + 1] = torch.max( + num_sampled_nodes[self.__src_types[etype]][hop + 1], + vx.max() + 1, + ) + + vy = raw_sample_data["minors"][: lho[hop + 1]] + if vy.numel() > 0: + num_sampled_nodes[self.__dst_types[etype]][hop + 1] = torch.max( + num_sampled_nodes[self.__dst_types[etype]][hop + 1], + vy.max() + 1, + ) + + ux = col[pyg_can_etype][: num_sampled_edges[pyg_can_etype][0]] + if ux.numel() > 0: + num_sampled_nodes[self.__src_types[etype]][0] = torch.max( + num_sampled_nodes[self.__src_types[etype]][0], + (ux.max() + 1).reshape((1,)), + ) + + num_sampled_nodes = { + self.__vertex_types[i]: z.diff( + prepend=torch.zeros((1,), dtype=torch.int64, device="cuda") + ).cpu() + for i, z in enumerate(num_sampled_nodes) + } + num_sampled_edges = {k: v.cpu() for k, v in num_sampled_edges.items()} input_index = raw_sample_data["input_index"][ raw_sample_data["input_offsets"][index] : raw_sample_data["input_offsets"][ @@ -630,6 +669,7 @@ def sample_from_nodes( src_types=src_types, dst_types=dst_types, edge_types=edge_types, + vertex_types=sorted(self.__graph_store._num_vertices().keys()), vertex_offsets=self.__graph_store._vertex_offset_array, ) From ef575599b192133506cb2b7c3dec5212566589f2 Mon Sep 17 00:00:00 2001 From: Alexandria Barghi Date: Thu, 5 Dec 2024 14:41:19 -0800 Subject: [PATCH 6/7] get hetero input ids working --- .../cugraph_pyg/sampler/sampler.py | 22 ++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/python/cugraph-pyg/cugraph_pyg/sampler/sampler.py b/python/cugraph-pyg/cugraph_pyg/sampler/sampler.py index dbddf02..40819cd 100644 --- a/python/cugraph-pyg/cugraph_pyg/sampler/sampler.py +++ b/python/cugraph-pyg/cugraph_pyg/sampler/sampler.py @@ -153,6 +153,9 @@ def __next__(self): data.set_value_dict("num_sampled_edges", next_sample.num_sampled_edges) # TODO figure out how to set input_id for heterogeneous output + input_type, input_id = next_sample.metadata[0] + data[input_type].input_id = input_id + data[input_type].batch_size = input_id.size(0) else: raise ValueError("Invalid output type") @@ -287,6 +290,8 @@ def __decode_coo(self, raw_sample_data: Dict[str, "torch.Tensor"], index: int): col = {} edge = {} + input_type = None + for etype in range(num_edge_types): pyg_can_etype = self.__edge_types[etype] @@ -349,11 +354,15 @@ def __decode_coo(self, raw_sample_data: Dict[str, "torch.Tensor"], index: int): ux = col[pyg_can_etype][: num_sampled_edges[pyg_can_etype][0]] if ux.numel() > 0: + input_type = pyg_can_etype[0] # can only ever be 1 num_sampled_nodes[self.__src_types[etype]][0] = torch.max( num_sampled_nodes[self.__src_types[etype]][0], (ux.max() + 1).reshape((1,)), ) + if input_type is None: + raise ValueError("No input type found!") + num_sampled_nodes = { self.__vertex_types[i]: z.diff( prepend=torch.zeros((1,), dtype=torch.int64, device="cuda") @@ -362,11 +371,14 @@ def __decode_coo(self, raw_sample_data: Dict[str, "torch.Tensor"], index: int): } num_sampled_edges = {k: v.cpu() for k, v in num_sampled_edges.items()} - input_index = raw_sample_data["input_index"][ - raw_sample_data["input_offsets"][index] : raw_sample_data["input_offsets"][ - index + 1 - ] - ] + input_index = ( + input_type, + raw_sample_data["input_index"][ + raw_sample_data["input_offsets"][index] : raw_sample_data[ + "input_offsets" + ][index + 1] + ], + ) edge_inverse = ( ( From c7f00001f0516ddfac0aa7eef9d31f312aac57fa Mon Sep 17 00:00:00 2001 From: Alexandria Barghi Date: Mon, 23 Dec 2024 08:53:46 -0800 Subject: [PATCH 7/7] fix src/dst confusion --- .../cugraph_pyg/sampler/sampler.py | 26 ++++++++++++++----- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/python/cugraph-pyg/cugraph_pyg/sampler/sampler.py b/python/cugraph-pyg/cugraph_pyg/sampler/sampler.py index 40819cd..3523753 100644 --- a/python/cugraph-pyg/cugraph_pyg/sampler/sampler.py +++ b/python/cugraph-pyg/cugraph_pyg/sampler/sampler.py @@ -156,6 +156,17 @@ def __next__(self): input_type, input_id = next_sample.metadata[0] data[input_type].input_id = input_id data[input_type].batch_size = input_id.size(0) + + if len(next_sample.metadata) == 2: + data[input_type].seed_time = next_sample.metadata[1] + elif len(next_sample.metadata) == 4: + ( + data[input_type].edge_label_index, + data[input_type].edge_label, + data[input_type].seed_time, + ) = next_sample.metadata[1:] + else: + raise ValueError("Invalid metadata") else: raise ValueError("Invalid output type") @@ -340,23 +351,24 @@ def __decode_coo(self, raw_sample_data: Dict[str, "torch.Tensor"], index: int): for hop in range(fanout_length): vx = raw_sample_data["majors"][: lho[hop + 1]] if vx.numel() > 0: - num_sampled_nodes[self.__src_types[etype]][hop + 1] = torch.max( - num_sampled_nodes[self.__src_types[etype]][hop + 1], + num_sampled_nodes[self.__dst_types[etype]][hop + 1] = torch.max( + num_sampled_nodes[self.__dst_types[etype]][hop + 1], vx.max() + 1, ) vy = raw_sample_data["minors"][: lho[hop + 1]] if vy.numel() > 0: - num_sampled_nodes[self.__dst_types[etype]][hop + 1] = torch.max( - num_sampled_nodes[self.__dst_types[etype]][hop + 1], + num_sampled_nodes[self.__src_types[etype]][hop + 1] = torch.max( + num_sampled_nodes[self.__src_types[etype]][hop + 1], vy.max() + 1, ) ux = col[pyg_can_etype][: num_sampled_edges[pyg_can_etype][0]] if ux.numel() > 0: - input_type = pyg_can_etype[0] # can only ever be 1 - num_sampled_nodes[self.__src_types[etype]][0] = torch.max( - num_sampled_nodes[self.__src_types[etype]][0], + input_type = pyg_can_etype[2] # can only ever be 1 + print("input type:", input_type) + num_sampled_nodes[self.__dst_types[etype]][0] = torch.max( + num_sampled_nodes[self.__dst_types[etype]][0], (ux.max() + 1).reshape((1,)), )