Commit

style

alexbarghi-nv committed Apr 2, 2024
1 parent 9bc944b commit 5e9135b
Showing 6 changed files with 112 additions and 72 deletions.
17 changes: 7 additions & 10 deletions python/cugraph-pyg/cugraph_pyg/examples/cugraph_dist_sampling.py
@@ -33,7 +33,7 @@
cugraph_comms_create_unique_id,
cugraph_comms_get_raft_handle,
DistSampleWriter,
UniformNeighborSampler
UniformNeighborSampler,
)

from pylibcugraph import MGGraph, ResourceHandle, GraphProperties
@@ -70,14 +70,11 @@ def sample(rank: int, world_size: int, uid, edgelist, directory):
)
print("graph constructed")

sample_writer = DistSampleWriter(
directory=directory,
batches_per_partition=2
)
sample_writer = DistSampleWriter(directory=directory, batches_per_partition=2)
sampler = UniformNeighborSampler(
G,
sample_writer,
fanout=[5,5],
fanout=[5, 5],
)

sampler.sample_from_nodes(seeds, batch_size=16, random_state=62)
@@ -97,17 +94,17 @@ def main():
with tempfile.TemporaryDirectory() as directory:
tmp.spawn(
sample,
args=(world_size, uid, el, '.'),
args=(world_size, uid, el, "."),
nprocs=world_size,
)

print("Printing samples...")
for file in os.listdir(directory):
m=re.match(r'batch=([0-9]+)\.([0-9]+)\-([0-9]+)\.([0-9]+)\.parquet', file)
m = re.match(r"batch=([0-9]+)\.([0-9]+)\-([0-9]+)\.([0-9]+)\.parquet", file)
rank, start, _, end = int(m[1]), int(m[2]), int(m[3]), int(m[4])
print(f'File: {file} (batches {start} to {end} for rank {rank})')
print(f"File: {file} (batches {start} to {end} for rank {rank})")
print(cudf.read_parquet(os.path.join(directory, file)))
print('\n')
print("\n")


if __name__ == "__main__":
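For context, the sampling flow this example exercises (after the reformat above) boils down to the sketch below. It assumes a CUDA environment with cugraph installed; the MGGraph construction and NCCL comms setup from the example's hidden lines are elided, so G, directory, and the seed values are placeholders, not part of this commit.

import torch

from cugraph.gnn import DistSampleWriter, UniformNeighborSampler


def run_sampling(G, directory):
    # Write sampled minibatches to parquet, two batches per output file.
    sample_writer = DistSampleWriter(directory=directory, batches_per_partition=2)

    # Uniform neighbor sampling over two hops with a fanout of 5 per hop.
    sampler = UniformNeighborSampler(G, sample_writer, fanout=[5, 5])

    # Placeholder seed vertices for this rank (the example draws them
    # from its edgelist instead).
    seeds = torch.arange(0, 64, dtype=torch.int64, device="cuda")

    sampler.sample_from_nodes(seeds, batch_size=16, random_state=62)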
2 changes: 1 addition & 1 deletion python/cugraph/cugraph/gnn/__init__.py
@@ -16,7 +16,7 @@
from .data_loading.dist_sampler import (
DistSampler,
DistSampleWriter,
UniformNeighborSampler
UniformNeighborSampler,
)
from .comms.cugraph_nccl_comms import (
cugraph_comms_init,
2 changes: 1 addition & 1 deletion python/cugraph/cugraph/gnn/comms/__init__.py
@@ -16,4 +16,4 @@
cugraph_comms_shutdown,
cugraph_comms_create_unique_id,
cugraph_comms_get_raft_handle,
)
)
6 changes: 3 additions & 3 deletions python/cugraph/cugraph/gnn/data_loading/__init__.py
@@ -1,4 +1,4 @@
# Copyright (c) 2023, NVIDIA CORPORATION.
# Copyright (c) 2023-2024, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
@@ -15,5 +15,5 @@
from cugraph.gnn.data_loading.dist_sampler import (
DistSampler,
DistSampleWriter,
UniformNeighborSampler
)
UniformNeighborSampler,
)
2 changes: 1 addition & 1 deletion python/cugraph/cugraph/gnn/data_loading/bulk_sampler_io.py
@@ -35,7 +35,7 @@ def create_df_from_disjoint_series(series_list: List[cudf.Series]):
def create_df_from_disjoint_arrays(array_dict: Dict[str, cupy.array]):
for k in list(array_dict.keys()):
array_dict[k] = cudf.Series(array_dict[k], name=k)

return create_df_from_disjoint_series(list(array_dict.values()))


155 changes: 99 additions & 56 deletions python/cugraph/cugraph/gnn/data_loading/dist_sampler.py
@@ -33,7 +33,13 @@


class DistSampleWriter:
def __init__(self, directory: str, *, batches_per_partition: int=256, format: str="parquet"):
def __init__(
self,
directory: str,
*,
batches_per_partition: int = 256,
format: str = "parquet",
):
if format != "parquet":
raise ValueError("Invalid format (currently supported: 'parquet')")

@@ -52,92 +58,114 @@ def _directory(self):
@property
def _batches_per_partition(self):
return self.__batches_per_partition

def __write_minibatches_coo(self, minibatch_dict):
has_edge_ids = minibatch_dict['edge_id'] is not None
has_edge_types = minibatch_dict['edge_type'] is not None
has_weights = minibatch_dict['weight'] is not None
has_edge_ids = minibatch_dict["edge_id"] is not None
has_edge_types = minibatch_dict["edge_type"] is not None
has_weights = minibatch_dict["weight"] is not None

if minibatch_dict['renumber_map'] is None:
raise ValueError("Distributed sampling without renumbering is not supported")
if minibatch_dict["renumber_map"] is None:
raise ValueError(
"Distributed sampling without renumbering is not supported"
)

fanout_length = (len(minibatch_dict['label_hop_offsets']) - 1) // len(minibatch_dict['batch_id'])
rank_batch_offset = minibatch_dict['batch_id'][0]
fanout_length = (len(minibatch_dict["label_hop_offsets"]) - 1) // len(
minibatch_dict["batch_id"]
)
rank_batch_offset = minibatch_dict["batch_id"][0]

for p in range(0, int(ceil(len(minibatch_dict['batch_id']) / self.__batches_per_partition))):
for p in range(
0, int(ceil(len(minibatch_dict["batch_id"]) / self.__batches_per_partition))
):
partition_start = p * (self.__batches_per_partition)
partition_end = (p + 1) * (self.__batches_per_partition)

label_hop_offsets_array_p = minibatch_dict['label_hop_offsets'][
label_hop_offsets_array_p = minibatch_dict["label_hop_offsets"][
partition_start * fanout_length : partition_end * fanout_length + 1
]

batch_id_array_p = minibatch_dict['batch_id'][partition_start:partition_end]
batch_id_array_p = minibatch_dict["batch_id"][partition_start:partition_end]
start_batch_id = batch_id_array_p[0] - rank_batch_offset

start_ix, end_ix = label_hop_offsets_array_p[[0, -1]]
majors_array_p = minibatch_dict["majors"][start_ix:end_ix]
minors_array_p = minibatch_dict["minors"][start_ix:end_ix]
edge_id_array_p = (
minibatch_dict["edge_id"][start_ix:end_ix] if has_edge_ids
else cupy.array([], dtype='int64')
minibatch_dict["edge_id"][start_ix:end_ix]
if has_edge_ids
else cupy.array([], dtype="int64")
)
edge_type_array_p = (
minibatch_dict["edge_type"][start_ix:end_ix] if has_edge_types
else cupy.array([], dtype='int32')
minibatch_dict["edge_type"][start_ix:end_ix]
if has_edge_types
else cupy.array([], dtype="int32")
)
weight_array_p = (
minibatch_dict["weight"][start_ix:end_ix] if has_weights
else cupy.array([], dtype='float32')
minibatch_dict["weight"][start_ix:end_ix]
if has_weights
else cupy.array([], dtype="float32")
)

# create the renumber map offsets
renumber_map_offsets_array_p = minibatch_dict['renumber_map_offsets'][
renumber_map_offsets_array_p = minibatch_dict["renumber_map_offsets"][
partition_start : partition_end + 1
]

renumber_map_start_ix, renumber_map_end_ix = renumber_map_offsets_array_p[[0,-1]]
renumber_map_start_ix, renumber_map_end_ix = renumber_map_offsets_array_p[
[0, -1]
]

renumber_map_array_p = minibatch_dict['renumber_map'][
renumber_map_array_p = minibatch_dict["renumber_map"][
renumber_map_start_ix:renumber_map_end_ix
]

results_dataframe_p = create_df_from_disjoint_arrays(
{
'majors': majors_array_p,
'minors': minors_array_p,
'map': renumber_map_array_p,
'label_hop_offsets': label_hop_offsets_array_p,
'weight': weight_array_p,
'edge_id': edge_id_array_p,
'edge_type': edge_type_array_p,
'renumber_map_offsets': renumber_map_offsets_array_p,
"majors": majors_array_p,
"minors": minors_array_p,
"map": renumber_map_array_p,
"label_hop_offsets": label_hop_offsets_array_p,
"weight": weight_array_p,
"edge_id": edge_id_array_p,
"edge_type": edge_type_array_p,
"renumber_map_offsets": renumber_map_offsets_array_p,
}
)

end_batch_id = start_batch_id + len(batch_id_array_p) - 1
if 'rank' in minibatch_dict:
rank = minibatch_dict['rank']
if "rank" in minibatch_dict:
rank = minibatch_dict["rank"]
full_output_path = os.path.join(
self.__directory, f"batch={rank:05d}.{start_batch_id:08d}-{rank:05d}.{end_batch_id:08d}.parquet"
self.__directory,
f"batch={rank:05d}.{start_batch_id:08d}-"
f"{rank:05d}.{end_batch_id:08d}.parquet",
)
else:
full_output_path = os.path.join(
self.__directory, f"batch={start_batch_id:010d}-{end_batch_id:010d}.parquet"
self.__directory,
f"batch={start_batch_id:010d}-{end_batch_id:010d}.parquet",
)


results_dataframe_p.to_parquet(
full_output_path, compression=None, index=False, force_nullable_schema=True
full_output_path,
compression=None,
index=False,
force_nullable_schema=True,
)

def __write_minibatches_csr(self, minibatch_dict):
raise NotImplementedError("CSR format currently not supported for distributed sampling")
raise NotImplementedError(
"CSR format currently not supported for distributed sampling"
)

def write_minibatches(self, minibatch_dict):
if (minibatch_dict['majors'] is not None) and (minibatch_dict['minors'] is not None):
if (minibatch_dict["majors"] is not None) and (
minibatch_dict["minors"] is not None
):
self.__write_minibatches_coo(minibatch_dict)
elif (minibatch_dict["major_offsets"] is not None) and (minibatch_dict['minors'] is not None):
elif (minibatch_dict["major_offsets"] is not None) and (
minibatch_dict["minors"] is not None
):
self.__write_minibatches_csr(minibatch_dict)
else:
raise ValueError("invalid columns")
@@ -159,20 +187,26 @@ def sample_batches(
):
raise NotImplementedError("Must be implemented by subclass")

def sample_from_nodes(self, nodes: TensorType, *, batch_size: int=16, random_state: int=62):
def sample_from_nodes(
self, nodes: TensorType, *, batch_size: int = 16, random_state: int = 62
):
batches_per_call = self._local_seeds_per_call // batch_size
actual_seeds_per_call = batches_per_call * batch_size

num_seeds = len(nodes)
nodes = torch.split(torch.as_tensor(nodes, device="cuda"), actual_seeds_per_call)
nodes = torch.split(
torch.as_tensor(nodes, device="cuda"), actual_seeds_per_call
)

if self.is_multi_gpu:
rank = torch.distributed.get_rank()
world_size = torch.distributed.get_world_size()

t = torch.empty((world_size,), dtype=torch.int64, device='cuda')
local_size = torch.tensor([int(ceil(num_seeds / batch_size))], dtype=torch.int64, device='cuda')

t = torch.empty((world_size,), dtype=torch.int64, device="cuda")
local_size = torch.tensor(
[int(ceil(num_seeds / batch_size))], dtype=torch.int64, device="cuda"
)

torch.distributed.all_gather_into_tensor(t, local_size)
if (t != local_size).any() and rank == 0:
warnings.warn(
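As a quick illustration of the seed splitting in sample_from_nodes above, here is a sketch with small made-up numbers and CPU tensors (the real code runs on CUDA and local_seeds_per_call defaults to 32768):

import torch
from math import ceil

# Illustrative numbers only: one rank holding 100 seeds, batch_size=16,
# and local_seeds_per_call scaled down to 64 so the splitting is visible.
local_seeds_per_call = 64
batch_size = 16
nodes = torch.arange(100)

batches_per_call = local_seeds_per_call // batch_size   # 4
actual_seeds_per_call = batches_per_call * batch_size   # 64

chunks = torch.split(nodes, actual_seeds_per_call)
print([len(c) for c in chunks])            # [64, 36] -> two sampling calls
print(int(ceil(len(nodes) / batch_size)))  # 7 batches reported to the other ranks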
@@ -210,7 +244,7 @@ def is_multi_gpu(self):
@property
def _local_seeds_per_call(self):
return self.__local_seeds_per_call

@property
def _graph(self):
return self.__graph
@@ -222,9 +256,9 @@ def __init__(
graph: Union[pylibcugraph.SGGraph, pylibcugraph.MGGraph],
writer: DistSampleWriter,
*,
local_seeds_per_call:int = 32768,
local_seeds_per_call: int = 32768,
fanout: List[int] = [-1],
prior_sources_behavior: str = 'exclude',
prior_sources_behavior: str = "exclude",
deduplicate_sources: bool = True,
compression: str = "COO",
compress_per_hop: bool = False,
@@ -245,24 +279,33 @@ def sample_batches(
# flag that assumes all ranks have the same number of batches.
if self.is_multi_gpu:
rank = torch.distributed.get_rank()
world_size = torch.distributed.get_world_size()
handle = pylibcugraph.ResourceHandle(
cugraph_comms_get_raft_handle().getHandle()
)

local_label_list = torch.unique(batch_ids)
local_label_to_output_comm_rank = torch.full(
(len(local_label_list),), rank, dtype=torch.int32, device="cuda"
)

num_batches = torch.tensor([len(local_label_list)], device='cuda', dtype=torch.int64)
num_batches = torch.tensor(
[len(local_label_list)], device="cuda", dtype=torch.int64
)
torch.distributed.all_reduce(num_batches, op=torch.distributed.ReduceOp.SUM)

label_list = torch.empty((num_batches,), device='cuda', dtype=torch.int32)
w1 = torch.distributed.all_gather_into_tensor(label_list, local_label_list, async_op=True)
label_list = torch.empty((num_batches,), device="cuda", dtype=torch.int32)
w1 = torch.distributed.all_gather_into_tensor(
label_list, local_label_list, async_op=True
)

label_to_output_comm_rank = torch.empty((num_batches,), device='cuda', dtype=torch.int32)
w2 = torch.distributed.all_gather_into_tensor(label_to_output_comm_rank, local_label_to_output_comm_rank, async_op=True)
label_to_output_comm_rank = torch.empty(
(num_batches,), device="cuda", dtype=torch.int32
)
w2 = torch.distributed.all_gather_into_tensor(
label_to_output_comm_rank,
local_label_to_output_comm_rank,
async_op=True,
)

w1.wait()
w2.wait()
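To see what the two all-gathers above produce, here is a CPU-only simulation that skips the process group entirely and simply concatenates the per-rank pieces the way all_gather_into_tensor lays them out; the two ranks and their batch ids are made up for illustration:

import torch

# Assume two ranks, each owning two batch ids.
per_rank_batch_ids = {0: torch.tensor([0, 1]), 1: torch.tensor([2, 3])}

local_label_lists = {
    rank: torch.unique(ids) for rank, ids in per_rank_batch_ids.items()
}
local_label_to_rank = {
    rank: torch.full((len(labels),), rank, dtype=torch.int32)
    for rank, labels in local_label_lists.items()
}

# What every rank holds once w1 and w2 complete.
label_list = torch.cat([local_label_lists[r] for r in sorted(local_label_lists)])
label_to_output_comm_rank = torch.cat(
    [local_label_to_rank[r] for r in sorted(local_label_to_rank)]
)
print(label_list)                 # tensor([0, 1, 2, 3])
print(label_to_output_comm_rank)  # tensor([0, 0, 1, 1], dtype=torch.int32)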
Expand All @@ -287,7 +330,7 @@ def sample_batches(
compress_per_hop=self.__compress_per_hop,
return_dict=True,
)
sampling_results_dict['rank'] = rank
sampling_results_dict["rank"] = rank
else:
sampling_results_dict = pylibcugraph.uniform_neighbor_sample(
pylibcugraph.ResourceHandle(),
