Skip to content

Commit

Permalink
reformat
Browse files Browse the repository at this point in the history
  • Loading branch information
alexbarghi-nv committed Dec 2, 2024
1 parent 8da5c95 commit 4587bd9
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 58 deletions.
43 changes: 32 additions & 11 deletions python/cugraph-pyg/cugraph_pyg/data/graph_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ def _graph(self) -> Union[pylibcugraph.SGGraph, pylibcugraph.MGGraph]:
else None,
)
else:
print(self._vertex_offsets)
print(edgelist_dict)
self.__graph = pylibcugraph.SGGraph(
self._resource_handle,
Expand Down Expand Up @@ -242,6 +243,27 @@ def _vertex_offsets(self) -> Dict[str, int]:

return dict(self.__vertex_offsets)

@property
def _vertex_offset_array(self) -> "torch.Tensor":
    """
    Return a 1-D int64 CUDA tensor of vertex-type start offsets, ordered by
    sorted vertex-type name, with one extra trailing entry equal to the sum
    of the per-type vertex counts (a sentinel marking the end of the last
    type's ID range).
    """
    offsets = self._vertex_offsets

    # Start offset of each vertex type, in sorted-name order so the array
    # aligns with the numeric type IDs used elsewhere.
    starts = torch.tensor(
        [offsets[name] for name in sorted(offsets)],
        dtype=torch.int64,
        device="cuda",
    )

    # Total number of vertices across all types, shaped (1,) so it can be
    # concatenated as the final sentinel entry.
    total = (
        torch.tensor(
            list(self._num_vertices().values()),
            dtype=torch.int64,
            device="cuda",
        )
        .sum()
        .reshape((1,))
    )

    return torch.concat([starts, total])

@property
def is_homogeneous(self) -> bool:
    """True when the graph store holds exactly one vertex type."""
    num_vertex_types = len(self._vertex_offsets)
    return num_vertex_types == 1
Expand Down Expand Up @@ -286,23 +308,22 @@ def _numeric_edge_types(self) -> Tuple[List, "torch.Tensor", "torch.Tensor"]:
)

vtype_table = {
k: i
for i, k in enumerate(sorted(self._vertex_offsets.keys()))
k: i for i, k in enumerate(sorted(self._vertex_offsets.keys()))
}

srcs = []
dsts = []

for can_etype in sorted_keys:
srcs.append(
vtype_table[can_etype[0]]
)
dsts.append(
vtype_table[can_etype[2]]
)

self.__numeric_edge_types = (sorted_keys, torch.tensor(srcs,device='cuda',dtype=torch.int32), torch.tensor(dsts,device='cuda',dtype=torch.int32))
srcs.append(vtype_table[can_etype[0]])
dsts.append(vtype_table[can_etype[2]])

self.__numeric_edge_types = (
sorted_keys,
torch.tensor(srcs, device="cuda", dtype=torch.int32),
torch.tensor(dsts, device="cuda", dtype=torch.int32),
)

return self.__numeric_edge_types

def __get_edgelist(self):
Expand Down
18 changes: 11 additions & 7 deletions python/cugraph-pyg/cugraph_pyg/loader/link_neighbor_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,19 +188,22 @@ def __init__(
# Will eventually automatically convert these objects to cuGraph objects.
raise NotImplementedError("Currently can't accept non-cugraph graphs")


feature_store, graph_store = data

if compression is None:
compression = "CSR" if graph_store.is_homogeneous else 'COO'
compression = "CSR" if graph_store.is_homogeneous else "COO"
elif compression not in ["CSR", "COO"]:
raise ValueError("Invalid value for compression (expected 'CSR' or 'COO')")

if (not graph_store.is_homogeneous):
if compression != 'COO':
raise ValueError("Only COO format is supported for heterogeneous graphs!")

if not graph_store.is_homogeneous:
if compression != "COO":
raise ValueError(
"Only COO format is supported for heterogeneous graphs!"
)
if directory is not None:
raise ValueError("Writing to disk is not supported for heterogeneous graphs!")
raise ValueError(
"Writing to disk is not supported for heterogeneous graphs!"
)

writer = (
None
Expand Down Expand Up @@ -229,6 +232,7 @@ def __init__(
local_seeds_per_call=local_seeds_per_call,
biased=(weight_attr is not None),
heterogeneous=(not graph_store.is_homogeneous),
vertex_type_offsets=graph_store._vertex_offset_array,
num_edge_types=len(graph_store.get_all_edge_attrs()),
),
(feature_store, graph_store),
Expand Down
15 changes: 10 additions & 5 deletions python/cugraph-pyg/cugraph_pyg/loader/neighbor_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,15 +184,19 @@ def __init__(
feature_store, graph_store = data

if compression is None:
compression = "CSR" if graph_store.is_homogeneous else 'COO'
compression = "CSR" if graph_store.is_homogeneous else "COO"
elif compression not in ["CSR", "COO"]:
raise ValueError("Invalid value for compression (expected 'CSR' or 'COO')")

if (not graph_store.is_homogeneous):
if compression != 'COO':
raise ValueError("Only COO format is supported for heterogeneous graphs!")
if not graph_store.is_homogeneous:
if compression != "COO":
raise ValueError(
"Only COO format is supported for heterogeneous graphs!"
)
if directory is not None:
raise ValueError("Writing to disk is not supported for heterogeneous graphs!")
raise ValueError(
"Writing to disk is not supported for heterogeneous graphs!"
)

writer = (
None
Expand Down Expand Up @@ -221,6 +225,7 @@ def __init__(
local_seeds_per_call=local_seeds_per_call,
biased=(weight_attr is not None),
heterogeneous=(not graph_store.is_homogeneous),
vertex_type_offsets=graph_store._vertex_offset_array,
num_edge_types=len(graph_store.get_all_edge_attrs()),
),
(feature_store, graph_store),
Expand Down
96 changes: 61 additions & 35 deletions python/cugraph-pyg/cugraph_pyg/sampler/sampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ def __next__(self):

elif isinstance(next_sample, torch_geometric.sampler.HeteroSamplerOutput):
col = {}
for edge_type, col_idx in next_sample.col:
for edge_type, col_idx in next_sample.col.items():
sz = next_sample.edge[edge_type].numel()
if sz == col_idx.numel():
col[edge_type] = col_idx
Expand Down Expand Up @@ -190,14 +190,18 @@ def __next__(self):
self.__base_reader
)
print(self.__raw_sample_data)
lho_name = "label_type_hop_offsets" if "label_type_hop_offsets" in self.__raw_sample_data else "label_type_hop_offsets"
lho_name = (
"label_type_hop_offsets"
if "label_type_hop_offsets" in self.__raw_sample_data
else "label_type_hop_offsets"
)

self.__raw_sample_data["input_offsets"] -= self.__raw_sample_data[
"input_offsets"
][0].clone()
self.__raw_sample_data[lho_name] -= self.__raw_sample_data[
lho_name
][0].clone()
self.__raw_sample_data[lho_name] -= self.__raw_sample_data[lho_name][
0
].clone()
self.__raw_sample_data["renumber_map_offsets"] -= self.__raw_sample_data[
"renumber_map_offsets"
][0].clone()
Expand Down Expand Up @@ -225,7 +229,12 @@ class HeterogeneousSampleReader(SampleReader):
"""

def __init__(
self, base_reader: Iterator[Tuple[Dict[str, "torch.Tensor"], int, int]], src_types: "torch.Tensor", dst_types: "torch.Tensor", edge_types: List[Tuple[str, str, str]]
self,
base_reader: Iterator[Tuple[Dict[str, "torch.Tensor"], int, int]],
src_types: "torch.Tensor",
dst_types: "torch.Tensor",
vertex_offsets: "torch.Tensor",
edge_types: List[Tuple[str, str, str]],
):
"""
Constructs a new HeterogeneousSampleReader
Expand All @@ -239,6 +248,10 @@ def __init__(
Integer source type for each integer edge type.
dst_types: torch.Tensor
Integer destination type for each integer edge type.
vertex_offsets: torch.Tensor
Vertex offsets for each vertex type. Used to de-offset vertices
outputted by the cuGraph sampler and return PyG-compliant vertex
IDs.
edge_types: List[Tuple[str, str, str]]
List of edge types in the graph in order, so they can be
mapped to numeric edge types.
Expand All @@ -247,14 +260,16 @@ def __init__(
self.__src_types = src_types
self.__dst_types = dst_types
self.__edge_types = edge_types
self.__num_vertex_types = max(self.__src_types.max(), self.__dst_types.max()) + 1
self.__num_vertex_types = (
max(self.__src_types.max(), self.__dst_types.max()) + 1
)
self.__vertex_offsets = vertex_offsets

super().__init__(base_reader)


def __decode_coo(self, raw_sample_data: Dict[str, "torch.Tensor"], index: int):
fanout_length = raw_sample_data['fanout'].numel()
num_edge_types = self.__src_types.numel()
fanout_length = raw_sample_data["fanout"].numel() // num_edge_types

num_sampled_edges = {}
node = {}
Expand All @@ -264,40 +279,47 @@ def __decode_coo(self, raw_sample_data: Dict[str, "torch.Tensor"], index: int):
for etype in range(num_edge_types):
pyg_can_etype = self.__edge_types[etype]

print(raw_sample_data["map"])
print(raw_sample_data["renumber_map_offsets"])

jx = self.__src_types[etype] + index * self.__num_vertex_types
map_ptr_src_beg, map_ptr_src_end = raw_sample_data["renumber_map_offsets"][
[jx, jx + 1]
]
map_src = raw_sample_data["renumber_map"][map_ptr_src_beg:map_ptr_src_end]
node[pyg_can_etype[0]] = map_src.cpu()
map_ptr_src_beg = raw_sample_data["renumber_map_offsets"][jx]
map_ptr_src_end = raw_sample_data["renumber_map_offsets"][jx + 1]

map_src = raw_sample_data["map"][map_ptr_src_beg:map_ptr_src_end]
node[pyg_can_etype[0]] = (
map_src - self.__vertex_offsets[self.__src_types[etype]]
).cpu()

kx = self.__dst_types[etype] + index * self.__num_vertex_types
map_ptr_dst_beg, map_ptr_dst_end = raw_sample_data["renumber_map_offsets"][
[kx, kx + 1]
]
map_dst = raw_sample_data["renumber_map"][map_ptr_dst_beg:map_ptr_dst_end]
node[pyg_can_etype[2]] = map_dst.cpu()
map_ptr_dst_beg = raw_sample_data["renumber_map_offsets"][kx]
map_ptr_dst_end = raw_sample_data["renumber_map_offsets"][kx + 1]

edge_ptr_beg = index * num_edge_types * fanout_length + etype * fanout_length
edge_ptr_end = index * num_edge_types * fanout_length + (etype+1) * fanout_length
lho = raw_sample_data['label_type_hop_offsets'][
edge_ptr_beg:edge_ptr_end
]
map_dst = raw_sample_data["map"][map_ptr_dst_beg:map_ptr_dst_end]
node[pyg_can_etype[2]] = (
map_dst - self.__vertex_offsets[self.__dst_types[etype]]
).cpu()

edge_ptr_beg = (
index * num_edge_types * fanout_length + etype * fanout_length
)
edge_ptr_end = (
index * num_edge_types * fanout_length + (etype + 1) * fanout_length
)
lho = raw_sample_data["label_type_hop_offsets"][edge_ptr_beg:edge_ptr_end]

num_sampled_edges[pyg_can_etype] = (
lho
).diff().cpu()
num_sampled_edges[pyg_can_etype] = (lho).diff().cpu()

eid_i = raw_sample_data["edge_id"][edge_ptr_beg:edge_ptr_end]
eirx = (index * num_edge_types) + etype
edge_id_ptr_beg, edge_id_ptr_end = raw_sample_data["edge_renumber_map_offsets"][
[eirx, eirx + 1]
]
edge_id_ptr_beg = raw_sample_data["edge_renumber_map_offsets"][eirx]
edge_id_ptr_end = raw_sample_data["edge_renumber_map_offsets"][eirx + 1]

emap = raw_sample_data["edge_renumber_map"][edge_id_ptr_beg:edge_id_ptr_end]
edge[pyg_can_etype] = emap[eid_i]

col[pyg_can_etype] = raw_sample_data['majors'][edge_ptr_beg:edge_ptr_end]
row[pyg_can_etype] = raw_sample_data['minors'][edge_ptr_beg:edge_ptr_end]
col[pyg_can_etype] = raw_sample_data["majors"][edge_ptr_beg:edge_ptr_end]
row[pyg_can_etype] = raw_sample_data["minors"][edge_ptr_beg:edge_ptr_end]

num_sampled_nodes = {}

Expand Down Expand Up @@ -345,7 +367,9 @@ def __decode_coo(self, raw_sample_data: Dict[str, "torch.Tensor"], index: int):

def _decode(self, raw_sample_data: Dict[str, "torch.Tensor"], index: int):
if "major_offsets" in raw_sample_data:
raise ValueError("CSR format not currently supported for heterogeneous graphs")
raise ValueError(
"CSR format not currently supported for heterogeneous graphs"
)
else:
return self.__decode_coo(raw_sample_data, index)

Expand Down Expand Up @@ -599,13 +623,14 @@ def sample_from_nodes(
):
return HomogeneousSampleReader(reader)
else:
edge_types,src_types,dst_types = self.__graph_store._numeric_edge_types
edge_types, src_types, dst_types = self.__graph_store._numeric_edge_types

return HeterogeneousSampleReader(
reader,
src_types=src_types,
dst_types=dst_types,
edge_types=edge_types,
vertex_offsets=self.__graph_store._vertex_offset_array,
)

def sample_from_edges(
Expand Down Expand Up @@ -670,10 +695,11 @@ def sample_from_edges(
):
return HomogeneousSampleReader(reader)
else:
edge_types,src_types,dst_types = self.__graph_store._numeric_edge_types
edge_types, src_types, dst_types = self.__graph_store._numeric_edge_types
return HeterogeneousSampleReader(
reader,
src_types=src_types,
dst_types=dst_types,
edge_types=edge_types,
vertex_offsets=self.__graph_store._vertex_offset_array,
)

0 comments on commit 4587bd9

Please sign in to comment.