diff --git a/python/cugraph-pyg/cugraph_pyg/data/graph_store.py b/python/cugraph-pyg/cugraph_pyg/data/graph_store.py index ae4caa3..002028d 100644 --- a/python/cugraph-pyg/cugraph_pyg/data/graph_store.py +++ b/python/cugraph-pyg/cugraph_pyg/data/graph_store.py @@ -174,6 +174,7 @@ def _graph(self) -> Union[pylibcugraph.SGGraph, pylibcugraph.MGGraph]: else None, ) else: + print(self._vertex_offsets) print(edgelist_dict) self.__graph = pylibcugraph.SGGraph( self._resource_handle, @@ -242,6 +243,27 @@ def _vertex_offsets(self) -> Dict[str, int]: return dict(self.__vertex_offsets) + @property + def _vertex_offset_array(self) -> "torch.Tensor": + off = torch.tensor( + [self._vertex_offsets[k] for k in sorted(self._vertex_offsets.keys())], + dtype=torch.int64, + device="cuda", + ) + + return torch.concat( + [ + off, + torch.tensor( + list(self._num_vertices().values()), + device="cuda", + dtype=torch.int64, + ) + .sum() + .reshape((1,)), + ] + ) + @property def is_homogeneous(self) -> bool: return len(self._vertex_offsets) == 1 @@ -286,23 +308,22 @@ def _numeric_edge_types(self) -> Tuple[List, "torch.Tensor", "torch.Tensor"]: ) vtype_table = { - k: i - for i, k in enumerate(sorted(self._vertex_offsets.keys())) + k: i for i, k in enumerate(sorted(self._vertex_offsets.keys())) } srcs = [] dsts = [] for can_etype in sorted_keys: - srcs.append( - vtype_table[can_etype[0]] - ) - dsts.append( - vtype_table[can_etype[2]] - ) - - self.__numeric_edge_types = (sorted_keys, torch.tensor(srcs,device='cuda',dtype=torch.int32), torch.tensor(dsts,device='cuda',dtype=torch.int32)) - + srcs.append(vtype_table[can_etype[0]]) + dsts.append(vtype_table[can_etype[2]]) + + self.__numeric_edge_types = ( + sorted_keys, + torch.tensor(srcs, device="cuda", dtype=torch.int32), + torch.tensor(dsts, device="cuda", dtype=torch.int32), + ) + return self.__numeric_edge_types def __get_edgelist(self): diff --git a/python/cugraph-pyg/cugraph_pyg/loader/link_neighbor_loader.py 
b/python/cugraph-pyg/cugraph_pyg/loader/link_neighbor_loader.py
index eacb75c..a511ebb 100644
--- a/python/cugraph-pyg/cugraph_pyg/loader/link_neighbor_loader.py
+++ b/python/cugraph-pyg/cugraph_pyg/loader/link_neighbor_loader.py
@@ -188,19 +188,23 @@ def __init__(
             # Will eventually automatically convert these objects to cuGraph objects.
             raise NotImplementedError("Currently can't accept non-cugraph graphs")
 
         feature_store, graph_store = data
 
         if compression is None:
-            compression = "CSR" if graph_store.is_homogeneous else 'COO'
+            compression = "CSR" if graph_store.is_homogeneous else "COO"
         elif compression not in ["CSR", "COO"]:
             raise ValueError("Invalid value for compression (expected 'CSR' or 'COO')")
-
-        if (not graph_store.is_homogeneous):
-            if compression != 'COO':
-                raise ValueError("Only COO format is supported for heterogeneous graphs!")
+
+        if not graph_store.is_homogeneous:
+            if compression != "COO":
+                raise ValueError(
+                    "Only COO format is supported for heterogeneous graphs!"
+                )
         if directory is not None:
-            raise ValueError("Writing to disk is not supported for heterogeneous graphs!")
+            raise ValueError(
+                "Writing to disk is not supported for heterogeneous graphs!"
+ ) writer = ( None @@ -229,6 +232,7 @@ def __init__( local_seeds_per_call=local_seeds_per_call, biased=(weight_attr is not None), heterogeneous=(not graph_store.is_homogeneous), + vertex_type_offsets=graph_store._vertex_offset_array, num_edge_types=len(graph_store.get_all_edge_attrs()), ), (feature_store, graph_store), diff --git a/python/cugraph-pyg/cugraph_pyg/loader/neighbor_loader.py b/python/cugraph-pyg/cugraph_pyg/loader/neighbor_loader.py index 125708a..98b6432 100644 --- a/python/cugraph-pyg/cugraph_pyg/loader/neighbor_loader.py +++ b/python/cugraph-pyg/cugraph_pyg/loader/neighbor_loader.py @@ -184,15 +184,19 @@ def __init__( feature_store, graph_store = data if compression is None: - compression = "CSR" if graph_store.is_homogeneous else 'COO' + compression = "CSR" if graph_store.is_homogeneous else "COO" elif compression not in ["CSR", "COO"]: raise ValueError("Invalid value for compression (expected 'CSR' or 'COO')") - if (not graph_store.is_homogeneous): - if compression != 'COO': - raise ValueError("Only COO format is supported for heterogeneous graphs!") + if not graph_store.is_homogeneous: + if compression != "COO": + raise ValueError( + "Only COO format is supported for heterogeneous graphs!" + ) if directory is not None: - raise ValueError("Writing to disk is not supported for heterogeneous graphs!") + raise ValueError( + "Writing to disk is not supported for heterogeneous graphs!" 
+            )
 
         writer = (
             None
@@ -221,6 +225,7 @@
                 local_seeds_per_call=local_seeds_per_call,
                 biased=(weight_attr is not None),
                 heterogeneous=(not graph_store.is_homogeneous),
+                vertex_type_offsets=graph_store._vertex_offset_array,
                 num_edge_types=len(graph_store.get_all_edge_attrs()),
             ),
             (feature_store, graph_store),
diff --git a/python/cugraph-pyg/cugraph_pyg/sampler/sampler.py b/python/cugraph-pyg/cugraph_pyg/sampler/sampler.py
index 5227c2b..1603b88 100644
--- a/python/cugraph-pyg/cugraph_pyg/sampler/sampler.py
+++ b/python/cugraph-pyg/cugraph_pyg/sampler/sampler.py
@@ -122,7 +122,7 @@ def __next__(self):
         elif isinstance(next_sample, torch_geometric.sampler.HeteroSamplerOutput):
             col = {}
-            for edge_type, col_idx in next_sample.col:
+            for edge_type, col_idx in next_sample.col.items():
                 sz = next_sample.edge[edge_type].numel()
                 if sz == col_idx.numel():
                     col[edge_type] = col_idx
@@ -190,14 +190,18 @@ def __next__(self):
                 self.__base_reader
             )
             print(self.__raw_sample_data)
-            lho_name = "label_type_hop_offsets" if "label_type_hop_offsets" in self.__raw_sample_data else "label_type_hop_offsets"
+            lho_name = (
+                "label_type_hop_offsets"
+                if "label_type_hop_offsets" in self.__raw_sample_data
+                else "label_hop_offsets"
+            )
             self.__raw_sample_data["input_offsets"] -= self.__raw_sample_data[
                 "input_offsets"
             ][0].clone()
-            self.__raw_sample_data[lho_name] -= self.__raw_sample_data[
-                lho_name
-            ][0].clone()
+            self.__raw_sample_data[lho_name] -= self.__raw_sample_data[lho_name][
+                0
+            ].clone()
             self.__raw_sample_data["renumber_map_offsets"] -= self.__raw_sample_data[
                 "renumber_map_offsets"
             ][0].clone()
@@ -225,7 +229,12 @@ class HeterogeneousSampleReader(SampleReader):
     """
 
     def __init__(
-        self, base_reader: Iterator[Tuple[Dict[str, "torch.Tensor"], int, int]], src_types: "torch.Tensor", dst_types: "torch.Tensor", edge_types: List[Tuple[str, str, str]]
+        self,
+        base_reader: Iterator[Tuple[Dict[str, "torch.Tensor"], int, int]],
+        src_types: "torch.Tensor",
+        dst_types:
"torch.Tensor", + vertex_offsets: "torch.Tensor", + edge_types: List[Tuple[str, str, str]], ): """ Constructs a new HeterogeneousSampleReader @@ -239,6 +248,10 @@ def __init__( Integer source type for each integer edge type. dst_types: torch.Tensor Integer destination type for each integer edge type. + vertex_offsets: torch.Tensor + Vertex offsets for each vertex type. Used to de-offset vertices + outputted by the cuGraph sampler and return PyG-compliant vertex + IDs. edge_types: List[Tuple[str, str, str]] List of edge types in the graph in order, so they can be mapped to numeric edge types. @@ -247,14 +260,16 @@ def __init__( self.__src_types = src_types self.__dst_types = dst_types self.__edge_types = edge_types - self.__num_vertex_types = max(self.__src_types.max(), self.__dst_types.max()) + 1 + self.__num_vertex_types = ( + max(self.__src_types.max(), self.__dst_types.max()) + 1 + ) + self.__vertex_offsets = vertex_offsets super().__init__(base_reader) - def __decode_coo(self, raw_sample_data: Dict[str, "torch.Tensor"], index: int): - fanout_length = raw_sample_data['fanout'].numel() num_edge_types = self.__src_types.numel() + fanout_length = raw_sample_data["fanout"].numel() // num_edge_types num_sampled_edges = {} node = {} @@ -264,40 +279,47 @@ def __decode_coo(self, raw_sample_data: Dict[str, "torch.Tensor"], index: int): for etype in range(num_edge_types): pyg_can_etype = self.__edge_types[etype] + print(raw_sample_data["map"]) + print(raw_sample_data["renumber_map_offsets"]) + jx = self.__src_types[etype] + index * self.__num_vertex_types - map_ptr_src_beg, map_ptr_src_end = raw_sample_data["renumber_map_offsets"][ - [jx, jx + 1] - ] - map_src = raw_sample_data["renumber_map"][map_ptr_src_beg:map_ptr_src_end] - node[pyg_can_etype[0]] = map_src.cpu() + map_ptr_src_beg = raw_sample_data["renumber_map_offsets"][jx] + map_ptr_src_end = raw_sample_data["renumber_map_offsets"][jx + 1] + + map_src = raw_sample_data["map"][map_ptr_src_beg:map_ptr_src_end] + 
node[pyg_can_etype[0]] = ( + map_src - self.__vertex_offsets[self.__src_types[etype]] + ).cpu() kx = self.__dst_types[etype] + index * self.__num_vertex_types - map_ptr_dst_beg, map_ptr_dst_end = raw_sample_data["renumber_map_offsets"][ - [kx, kx + 1] - ] - map_dst = raw_sample_data["renumber_map"][map_ptr_dst_beg:map_ptr_dst_end] - node[pyg_can_etype[2]] = map_dst.cpu() + map_ptr_dst_beg = raw_sample_data["renumber_map_offsets"][kx] + map_ptr_dst_end = raw_sample_data["renumber_map_offsets"][kx + 1] - edge_ptr_beg = index * num_edge_types * fanout_length + etype * fanout_length - edge_ptr_end = index * num_edge_types * fanout_length + (etype+1) * fanout_length - lho = raw_sample_data['label_type_hop_offsets'][ - edge_ptr_beg:edge_ptr_end - ] + map_dst = raw_sample_data["map"][map_ptr_dst_beg:map_ptr_dst_end] + node[pyg_can_etype[2]] = ( + map_dst - self.__vertex_offsets[self.__dst_types[etype]] + ).cpu() + + edge_ptr_beg = ( + index * num_edge_types * fanout_length + etype * fanout_length + ) + edge_ptr_end = ( + index * num_edge_types * fanout_length + (etype + 1) * fanout_length + ) + lho = raw_sample_data["label_type_hop_offsets"][edge_ptr_beg:edge_ptr_end] - num_sampled_edges[pyg_can_etype] = ( - lho - ).diff().cpu() + num_sampled_edges[pyg_can_etype] = (lho).diff().cpu() eid_i = raw_sample_data["edge_id"][edge_ptr_beg:edge_ptr_end] eirx = (index * num_edge_types) + etype - edge_id_ptr_beg, edge_id_ptr_end = raw_sample_data["edge_renumber_map_offsets"][ - [eirx, eirx + 1] - ] + edge_id_ptr_beg = raw_sample_data["edge_renumber_map_offsets"][eirx] + edge_id_ptr_end = raw_sample_data["edge_renumber_map_offsets"][eirx + 1] + emap = raw_sample_data["edge_renumber_map"][edge_id_ptr_beg:edge_id_ptr_end] edge[pyg_can_etype] = emap[eid_i] - col[pyg_can_etype] = raw_sample_data['majors'][edge_ptr_beg:edge_ptr_end] - row[pyg_can_etype] = raw_sample_data['minors'][edge_ptr_beg:edge_ptr_end] + col[pyg_can_etype] = raw_sample_data["majors"][edge_ptr_beg:edge_ptr_end] + 
row[pyg_can_etype] = raw_sample_data["minors"][edge_ptr_beg:edge_ptr_end] num_sampled_nodes = {} @@ -345,7 +367,9 @@ def __decode_coo(self, raw_sample_data: Dict[str, "torch.Tensor"], index: int): def _decode(self, raw_sample_data: Dict[str, "torch.Tensor"], index: int): if "major_offsets" in raw_sample_data: - raise ValueError("CSR format not currently supported for heterogeneous graphs") + raise ValueError( + "CSR format not currently supported for heterogeneous graphs" + ) else: return self.__decode_coo(raw_sample_data, index) @@ -599,13 +623,14 @@ def sample_from_nodes( ): return HomogeneousSampleReader(reader) else: - edge_types,src_types,dst_types = self.__graph_store._numeric_edge_types + edge_types, src_types, dst_types = self.__graph_store._numeric_edge_types return HeterogeneousSampleReader( reader, src_types=src_types, dst_types=dst_types, edge_types=edge_types, + vertex_offsets=self.__graph_store._vertex_offset_array, ) def sample_from_edges( @@ -670,10 +695,11 @@ def sample_from_edges( ): return HomogeneousSampleReader(reader) else: - edge_types,src_types,dst_types = self.__graph_store._numeric_edge_types + edge_types, src_types, dst_types = self.__graph_store._numeric_edge_types return HeterogeneousSampleReader( reader, src_types=src_types, dst_types=dst_types, edge_types=edge_types, + vertex_offsets=self.__graph_store._vertex_offset_array, )