Skip to content

Commit

Permalink
get the native workflow working
Browse files Browse the repository at this point in the history
  • Loading branch information
alexbarghi-nv committed Dec 6, 2023
1 parent da389e0 commit e29b4e8
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 93 deletions.
45 changes: 29 additions & 16 deletions benchmarks/cugraph-pyg/bench_cugraph_pyg.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,23 @@
import gc
import os
import socket
import json

import torch
import numpy as np
import pandas

import torch.nn.functional as F
import torch.distributed as td
import torch.distributed as dist
import torch.multiprocessing as tmp
from torch.nn.parallel import DistributedDataParallel as ddp
from torch.distributed.optim import ZeroRedundancyOptimizer

from typing import Union, List

from models_cugraph import CuGraphSAGE
from trainers_cugraph import CuGraphTrainer
from trainers_native import NativeTrainer
from trainers_cugraph import PyGCuGraphTrainer
from trainers_native import PyGNativeTrainer

from datasets import OGBNPapers100MDataset

Expand Down Expand Up @@ -135,7 +136,7 @@ def train(bulk_samples_dir: str, output_dir:str, native_times:List[float], devic
if num_classes > num_output_features:
num_output_features = num_classes
print('done loading data')
td.barrier()
dist.barrier()

print(f"num input features: {num_input_features}; num output features: {num_output_features}; fanout: {output_meta['fanout']}")

Expand All @@ -151,15 +152,15 @@ def train(bulk_samples_dir: str, output_dir:str, native_times:List[float], devic
model = ddp(model, device_ids=[device])

print('done creating model')
td.barrier()
dist.barrier()

cugraph_store = CuGraphStore(fs, G, N)
print('done creating store')
td.barrier()
dist.barrier()

#optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
optimizer = ZeroRedundancyOptimizer(model.parameters(), torch.optim.Adam, lr=0.01)
td.barrier()
dist.barrier()

for epoch in range(num_epochs):
start_time_train = time.perf_counter_ns()
Expand All @@ -180,7 +181,7 @@ def train(bulk_samples_dir: str, output_dir:str, native_times:List[float], devic
directory=samples_dir,
)
print('done creating loader')
td.barrier()
dist.barrier()

total_loss, num_batches, mean_total_time, mean_time_fw, mean_time_bw, mean_time_loader, mean_additional_feature_time = train_epoch(model, cugraph_loader, optimizer)

Expand Down Expand Up @@ -381,12 +382,20 @@ def parse_args():


def main(args):
rank = int(os.environ['LOCAL_RANK'])
import logging
logging.basicConfig(
level=logging.INFO,
)
logger = logging.getLogger('bench_cugraph_pyg')
logger.setLevel(logging.INFO)

init_pytorch_worker(rank, use_rmm_torch_allocator=(args.framework == "cuGraph"))
local_rank = int(os.environ['LOCAL_RANK'])
global_rank = int(os.environ["RANK"])

init_pytorch_worker(local_rank, use_rmm_torch_allocator=(args.framework == "cuGraph"))
enable_spilling()
print(f'worker initialized')
td.barrier()
dist.barrier()

world_size = int(os.environ['SLURM_JOB_NUM_NODES']) * int(os.environ['SLURM_GPUS_PER_NODE'])

Expand All @@ -398,22 +407,26 @@ def main(args):
)

if args.framework == "Native":
trainer = NativeTrainer(
trainer = PyGNativeTrainer(
model=args.model,
dataset=dataset,
device=rank,
rank=rank,
device=local_rank,
rank=global_rank,
world_size=world_size,
num_epochs=args.num_epochs,
shuffle=True,
replace=False,
fanout=[int(f) for f in args.fanout.split('_')],
num_neighbors=[int(f) for f in args.fanout.split('_')],
batch_size=args.batch_size,
)
else:
raise ValueError("unsuported framework")

trainer.train()
stats = trainer.train()
logger.info(stats)

with open(f'{args.output_file}[{global_rank}]', 'w') as f:
json.dump(stats, f)

if __name__ == "__main__":
args = parse_args()
Expand Down
54 changes: 30 additions & 24 deletions benchmarks/cugraph-pyg/datasets/ogbn_papers100M.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@

import pandas
import torch
import numpy as np

from sklearn.model_selection import train_test_split

import gc
import os

# TODO automatically generate this dataset and splits
class OGBNPapers100MDataset(Dataset):
Expand All @@ -28,40 +30,44 @@ def __init__(self, *, replication_factor=1, dataset_dir='.', train_split=0.8, va
self.__disk_x = None
self.__y = None
self.__edge_index = None
self.__dataset_dir = '.'
self.__dataset_dir = dataset_dir
self.__train_split = train_split
self.__val_split = val_split

@property
def edge_index_dict(self) -> Dict[Tuple[str, str, str], Dict[str, torch.Tensor]]:
import logging
logger = logging.getLogger('OGBNPapers100MDataset')

if self.__edge_index is None:
parquet_path = os.path.join(
npy_path = os.path.join(
self.__dataset_dir,
'ogbn_papers100M',
'parquet'
)

ei = pandas.read_parquet(
os.path.join(parquet_path, 'paper__cites__paper', 'edge_index.parquet')
'ogbn_papers100M_copy',
'npy',
'paper__cites__paper',
'edge_index.npy'
)

logger.info(f'loading edge index from {npy_path}')
ei = np.load(npy_path, mmap_mode='r')
ei = torch.as_tensor(ei)
ei = {
'src': torch.as_tensor(ei.src.values, device='cpu'),
'dst': torch.as_tensor(ei.dst.values, device='cpu'),
'src': ei[1],
'dst': ei[0],
}

print('sorting edge index...')
logger.info('sorting edge index...')
ei['dst'], ix = torch.sort(ei['dst'])
ei['src'] = ei['src'][ix]
del ix
gc.collect()

print('processing replications...')
logger.info('processing replications...')
orig_num_nodes = self.num_nodes('paper') // self.__replication_factor
if self.__replication_factor > 1:
orig_src = ei['src'].clone().detach()
orig_dst = ei['dst'].clone().detach()
for r in range(1, replication_factor):
for r in range(1, self.__replication_factor):
ei['src'] = torch.concat([
ei['src'],
orig_src + int(r * orig_num_nodes),
Expand All @@ -79,7 +85,7 @@ def edge_index_dict(self) -> Dict[Tuple[str, str, str], Dict[str, torch.Tensor]]
ei['dst'] = ei['dst'].contiguous()
gc.collect()

print(f"# edges: {len(ei['src'])}")
logger.info(f"# edges: {len(ei['src'])}")
self.__edge_index = {('paper','cites','paper'): ei}

return self.__edge_index
Expand All @@ -94,10 +100,10 @@ def x_dict(self) -> Dict[str, torch.Tensor]:
)

if self.__disk_x is None:
if replication_factor == 1:
if self.__replication_factor == 1:
full_path = os.path.join(node_type_path, 'node_feat.npy')
else:
full_path = os.path.join(node_type_path, f'node_feat_{replication_factor}x.npy')
full_path = os.path.join(node_type_path, f'node_feat_{self.__replication_factor}x.npy')

self.__disk_x = {'paper': np.load(
full_path,
Expand Down Expand Up @@ -133,11 +139,11 @@ def val_dict(self) -> Dict[str, torch.Tensor]:

@property
def num_input_features(self) -> int:
    """Return the per-node input feature width for 'paper' nodes.

    Reads the second dimension of the (possibly memory-mapped) 'paper'
    feature array exposed by ``x_dict``.

    Returns:
        int: number of input features per 'paper' node.
    """
    # NOTE: the stale pre-fix line returning the raw shape element was
    # removed; cast explicitly so callers get a plain Python int rather
    # than a numpy integer leaking out of the mmap'd array's shape.
    return int(self.x_dict['paper'].shape[1])

@property
def num_labels(self) -> int:
    """Return the number of distinct class labels.

    Computed as ``max(label) + 1``; assumes labels are contiguous
    integers starting at 0 — TODO confirm against the label parquet.

    Returns:
        int: label count as a plain Python int.
    """
    # Cast before the +1 so the result is a Python int, not a tensor
    # scalar (the stale pre-fix line returning a tensor was removed).
    return int(self.y_dict['paper'].max()) + 1

def num_nodes(self, node_type: str) -> int:
if node_type != 'paper':
Expand All @@ -162,11 +168,11 @@ def __get_labels(self):

node_label = pandas.read_parquet(label_path)

if replication_factor > 1:
orig_num_nodes = self.num_nodes('paper') // replication_factor
if self.__replication_factor > 1:
orig_num_nodes = self.num_nodes('paper') // self.__replication_factor
dfr = pandas.DataFrame({
'node': pandas.concat([node_label.node + (r * orig_num_nodes) for r in range(1, replication_factor)]),
'label': pandas.concat([node_label.label for r in range(1, replication_factor)]),
'node': pandas.concat([node_label.node + (r * orig_num_nodes) for r in range(1, self.__replication_factor)]),
'label': pandas.concat([node_label.label for r in range(1, self.__replication_factor)]),
})
node_label = pandas.concat([node_label, dfr]).reset_index(drop=True)

Expand All @@ -177,8 +183,8 @@ def __get_labels(self):

self.__y = {'paper': node_label_tensor.contiguous()}

train_ix, test_val_ix = train_test_split(torch.as_tensor(node_label.node.values), train_split=self.__train_split, random_state=num_nodes)
test_ix, val_ix = train_test_split(test_val_ix, test_split=self.__val_split, random_state=num_nodes)
train_ix, test_val_ix = train_test_split(torch.as_tensor(node_label.node.values), train_size=self.__train_split, random_state=num_nodes)
test_ix, val_ix = train_test_split(test_val_ix, test_size=self.__val_split, random_state=num_nodes)

train_tensor = torch.full((num_nodes,), 0, dtype=torch.bool, device='cpu')
train_tensor[train_ix] = 1
Expand Down
Loading

0 comments on commit e29b4e8

Please sign in to comment.