Commit 367c79c

fixing issues impacting accuracy

alexbarghi-nv committed Dec 29, 2023
1 parent d47c3ba commit 367c79c
Showing 6 changed files with 76 additions and 28 deletions.
@@ -187,9 +187,13 @@ def main(args):
         dataset_dir=args.dataset_dir,
         train_split=args.train_split,
         val_split=args.val_split,
-        load_edge_index=(args.framework == "Native"),
+        load_edge_index=(args.framework == "PyG"),
     )

+    if global_rank == 0:
+        dataset.download()
+    dist.barrier()
+
     fanout = [int(f) for f in args.fanout.split("_")]

     if args.framework == "PyG":
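
Note: this hunk makes dataset download an explicit step and serializes it: only the rank-0 process touches the filesystem, and every rank then waits at a barrier so no worker starts reading files that are still being written. A minimal sketch of the pattern, assuming an initialized torch.distributed process group (`dataset` and `global_rank` stand in for the script's objects):

    import torch.distributed as dist

    def prepare_dataset(dataset, global_rank: int):
        # Only one process downloads; the barrier is reached by every rank
        # (including rank 0), so all workers resume together once the files exist.
        if global_rank == 0:
            dataset.download()
        dist.barrier()
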
@@ -186,7 +186,7 @@ def sample_graph(
     batches_per_partition=100,
     fanout=[5, 5, 5],
     num_epochs=1,
-    train_perc=0.7,
+    train_perc=0.8,
     val_perc=0.5,
     sampling_kwargs={},
 ):
@@ -586,7 +586,7 @@ def benchmark_cugraph_bulk_sampling(
         "include_hop_column": True,
     }

-    batches_per_partition = 400_000 // batch_size
+    batches_per_partition = 600_000 // batch_size
     execution_time, allocation_counts = sample_graph(
         G=G,
         label_df=dask_label_df,
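
Note: partition size is derived from batch size by integer division, so raising the constant from 400_000 to 600_000 packs roughly 50% more batches into each parquet partition. Illustrative arithmetic (the batch sizes below are examples, not values from the commit):

    for batch_size in (512, 1024):
        print(batch_size, "->", 600_000 // batch_size, "batches per partition")
    # 512  -> 1171 batches per partition
    # 1024 -> 585 batches per partition
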
@@ -22,6 +22,7 @@

 import gc
 import os
+import json

 class OGBNPapers100MDataset(Dataset):
     def __init__(
@@ -41,10 +42,8 @@ def __init__(
         self.__train_split = train_split
         self.__val_split = val_split
         self.__load_edge_index = load_edge_index

-        self.__download_data()
-
-    def __download_data(self):
+    def download(self):
         import logging
         logger = logging.getLogger('OGBNPapers100MDataset')
         logger.info("Processing dataset...")
@@ -54,6 +53,18 @@ def __download_data(self):
             'ogbn_papers100M'
         )

+        meta_json_path = os.path.join(
+            dataset_path,
+            'meta.json'
+        )
+        if not os.path.exists(meta_json_path):
+            j = {
+                'num_nodes': {'paper': 111059956},
+                'num_edges': {"paper__cites__paper": 1615685872},
+            }
+            with open(meta_json_path, 'w') as file:
+                json.dump(j, file)
+
         dataset = None
         if not os.path.exists(dataset_path):
             from ogb.nodeproppred import NodePropPredDataset
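
Note: the new meta.json records the known node and edge counts for ogbn-papers100M, presumably so later stages can read graph sizes without re-instantiating the 100M-node OGB dataset. A sketch of reading it back (path construction mirrors the diff; `dataset_dir` is a placeholder):

    import json
    import os

    with open(os.path.join(dataset_dir, 'ogbn_papers100M', 'meta.json')) as f:
        meta = json.load(f)
    num_nodes = meta['num_nodes']['paper']                 # 111059956
    num_edges = meta['num_edges']['paper__cites__paper']   # 1615685872
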
@@ -71,6 +82,7 @@ def __download_data(self):
             )
             if not os.path.exists(replication_path):
                 if dataset is None:
+                    from ogb.nodeproppred import NodePropPredDataset
                     dataset = NodePropPredDataset(name='ogbn-papers100M', root=self.__dataset_dir)

                 node_feat = dataset[0][0]['node_feat']
@@ -88,6 +100,7 @@ def __download_data(self):
         edge_index_parquet_file_path = os.path.join(edge_index_parquet_path, 'edge_index.parquet')
         if not os.path.exists(edge_index_parquet_file_path):
             if dataset is None:
+                from ogb.nodeproppred import NodePropPredDataset
                 dataset = NodePropPredDataset(name='ogbn-papers100M', root=self.__dataset_dir)

             edge_index = dataset[0][0]['edge_index']
@@ -98,11 +111,12 @@ def __download_data(self):
                 eidf.to_parquet(edge_index_parquet_file_path)

         edge_index_npy_path = os.path.join(dataset_path, 'npy', 'paper__cites__paper')
-        os.makedirs(edge_index_npy_path)
+        os.makedirs(edge_index_npy_path, exist_ok=True)

         edge_index_npy_file_path = os.path.join(edge_index_npy_path, 'edge_index.npy')
         if not os.path.exists(edge_index_npy_file_path):
             if dataset is None:
+                from ogb.nodeproppred import NodePropPredDataset
                 dataset = NodePropPredDataset(name='ogbn-papers100M', root=self.__dataset_dir)

             edge_index = dataset[0][0]['edge_index']
@@ -115,6 +129,7 @@ def __download_data(self):
         node_label_file_path = os.path.join(node_label_path, 'node_label.parquet')
         if not os.path.exists(node_label_file_path):
             if dataset is None:
+                from ogb.nodeproppred import NodePropPredDataset
                 dataset = NodePropPredDataset(name='ogbn-papers100M', root=self.__dataset_dir)

             ldf = pandas.Series(dataset[0][1].T[0])
@@ -134,7 +149,7 @@ def edge_index_dict(
         if self.__load_edge_index:
             npy_path = os.path.join(
                 self.__dataset_dir,
-                "ogbn_papers100M_copy",
+                "ogbn_papers100M",
                 "npy",
                 "paper__cites__paper",
                 "edge_index.npy",
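
Note: the repeated `from ogb.nodeproppred import NodePropPredDataset` inside each `if dataset is None:` block matters because the import was previously only reachable in the first branch; when the raw dataset directory already existed, the later branches referenced `NodePropPredDataset` without importing it and raised a NameError. The guard itself is a memoization pattern: the expensive dataset object is materialized at most once, and only if some artifact is missing. The `os.makedirs(..., exist_ok=True)` change likewise makes download() safe to re-run over a partially populated directory. A condensed sketch of the guard (`produce_artifact` is a hypothetical callback):

    import os

    dataset = None

    def ensure_artifact(path, produce_artifact):
        global dataset
        if not os.path.exists(path):
            if dataset is None:
                # Import and load lazily: ogbn-papers100M is expensive and only
                # needed when an artifact actually has to be regenerated.
                from ogb.nodeproppred import NodePropPredDataset
                dataset = NodePropPredDataset(name='ogbn-papers100M', root='.')
            produce_artifact(dataset, path)
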
@@ -49,6 +49,7 @@ def __init__(
         self.__sample_dir = sample_dir
         self.__loader_kwargs = kwargs
         self.__model = self.get_model(model)
+        self.__optimizer = None

     @property
     def rank(self):
@@ -64,9 +65,10 @@ def dataset(self):

     @property
     def optimizer(self):
-        return ZeroRedundancyOptimizer(
-            self.model.parameters(), torch.optim.Adam, lr=0.01
-        )
+        if self.__optimizer is None:
+            self.__optimizer = torch.optim.Adam(self.model.parameters(), lr=0.01,
+                                                weight_decay=0.0005)
+        return self.__optimizer

     @property
     def num_epochs(self) -> int:
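
Note: this is likely the central accuracy fix. The old `optimizer` property constructed a brand-new ZeroRedundancyOptimizer on every access, so Adam's moment estimates were discarded between accesses; caching the optimizer on the instance preserves its state across iterations. A minimal sketch of the fixed behavior (class and attribute names are illustrative):

    import torch

    class Trainer:
        def __init__(self, model):
            self._model = model
            self._optimizer = None

        @property
        def optimizer(self):
            # Build once, then reuse, so Adam's running moments survive
            # from one training step to the next.
            if self._optimizer is None:
                self._optimizer = torch.optim.Adam(
                    self._model.parameters(), lr=0.01, weight_decay=0.0005
                )
            return self._optimizer
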
@@ -89,7 +91,7 @@ def get_loader(self, epoch: int = 0, stage="train") -> int:
             self.data,
             None,  # FIXME get input nodes properly
             directory=path,
-            input_files=self.get_input_files(path),
+            input_files=self.get_input_files(path, epoch=epoch, stage=stage),
             **self.__loader_kwargs,
         )

@@ -169,7 +171,14 @@ def get_model(self, name="GraphSAGE"):
         return model

-    def get_input_files(self, path):
+    def get_input_files(self, path, epoch=0, stage='train'):
         file_list = np.array(os.listdir(path))
         file_list.sort()

-        return np.array_split(file_list, self.__world_size)[self.__rank]
+        if stage == 'train':
+            splits = np.array_split(file_list, self.__world_size)
+            np.random.seed(epoch)
+            np.random.shuffle(splits)
+            return splits[self.rank]
+        else:
+            return file_list
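
Note: get_input_files now reshuffles which chunk of sampled-batch files each rank consumes, reseeding with the epoch number; validation and test stages return the full file list instead. Because every rank seeds NumPy identically, all ranks compute the same permutation and pick disjoint slices without any communication. A standalone sketch of the idea (parameter names are placeholders):

    import numpy as np

    def assign_files(files, world_size, rank, epoch):
        files = np.sort(np.asarray(files))
        splits = np.array_split(files, world_size)
        np.random.seed(epoch)      # identical seed on every rank...
        np.random.shuffle(splits)  # ...hence an identical permutation everywhere
        return splits[rank]
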
46 changes: 32 additions & 14 deletions benchmarks/cugraph/standalone/bulk_sampling/trainers_pyg.py
@@ -21,7 +21,6 @@
 import numpy as np

 import torch.distributed as td
-from torch.distributed.optim import ZeroRedundancyOptimizer
 from torch.nn.parallel import DistributedDataParallel as ddp
 import torch.nn.functional as F
@@ -63,7 +62,8 @@ def train(self):
         end_time_backward = start_time

         for epoch in range(self.num_epochs):
-            with td.algorithms.join.Join([self.model, self.optimizer]):
+            with td.algorithms.join.Join([self.model], divide_by_initial_world_size=False):
+                self.model.train()
                 for iter_i, data in enumerate(
                     self.get_loader(epoch=epoch, stage="train")
                 ):
@@ -112,6 +112,7 @@ def train(self):
                     start_time_forward = time.perf_counter()
                     edge_index = data.edge_index if "edge_index" in data else data.adj_t

+                    self.optimizer.zero_grad()
                     y_pred = self.model(
                         x,
                         edge_index,
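
Note: three training-loop fixes land together here. `Join` now wraps only the DDP model (the cached plain Adam optimizer is not a joinable, unlike the removed ZeroRedundancyOptimizer), the model is explicitly put in train mode, and `optimizer.zero_grad()` stops gradients from accumulating across iterations, a classic accuracy bug. A minimal sketch of the resulting loop shape, assuming `model` is a DistributedDataParallel instance and `loader`/`optimizer` are placeholders; per-rank loaders may yield different batch counts, which Join tolerates:

    import torch.nn.functional as F
    from torch.distributed.algorithms.join import Join

    with Join([model], divide_by_initial_world_size=False):
        model.train()
        for data in loader:
            optimizer.zero_grad()  # clear gradients from the previous step
            y_pred = model(data.x, data.edge_index)
            loss = F.cross_entropy(y_pred, data.y)
            loss.backward()        # ranks that run out of batches "join" early
            optimizer.step()       # instead of deadlocking the allreduce
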
@@ -160,7 +161,8 @@ def train(self):
                 task="multiclass", num_classes=self.dataset.num_labels
             ).cuda()

-            with td.algorithms.join.Join([self.model, self.optimizer]):
+            with td.algorithms.join.Join([self.model], divide_by_initial_world_size=False):
+                self.model.eval()
                 if self.rank == 0:
                     acc_sum = 0.0
                     with torch.no_grad():
@@ -169,13 +171,13 @@ def train(self):
                         ):
                             num_sampled_nodes = sum(
                                 [
-                                    torch.tensor(n)
+                                    torch.as_tensor(n)
                                     for n in batch.num_sampled_nodes_dict.values()
                                 ]
                             )
                             num_sampled_edges = sum(
                                 [
-                                    torch.tensor(e)
+                                    torch.as_tensor(e)
                                     for e in batch.num_sampled_edges_dict.values()
                                 ]
                             )
@@ -199,7 +201,8 @@ def train(self):

         td.barrier()

-        with td.algorithms.join.Join([self.model, self.optimizer]):
+        with td.algorithms.join.Join([self.model], divide_by_initial_world_size=False):
+            self.model.eval()
             if self.rank == 0:
                 acc_sum = 0.0
                 with torch.no_grad():
@@ -208,13 +211,13 @@ def train(self):
                     ):
                         num_sampled_nodes = sum(
                             [
-                                torch.tensor(n)
+                                torch.as_tensor(n)
                                 for n in batch.num_sampled_nodes_dict.values()
                             ]
                         )
                         num_sampled_edges = sum(
                             [
-                                torch.tensor(e)
+                                torch.as_tensor(e)
                                 for e in batch.num_sampled_edges_dict.values()
                             ]
                         )
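
Note: the `torch.tensor` → `torch.as_tensor` swaps in the evaluation paths avoid an unnecessary copy (and PyTorch's user warning) when the dict values are already tensors; `as_tensor` reuses existing storage where dtype and device allow. A tiny illustration:

    import torch

    n = torch.arange(3)
    a = torch.as_tensor(n)  # no copy: returns a view of the same storage
    b = torch.tensor(n)     # copies, and warns that clone().detach() is preferred
    assert a.data_ptr() == n.data_ptr()
    assert b.data_ptr() != n.data_ptr()
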
@@ -266,6 +269,11 @@ def __init__(
         self.__world_size = world_size
         self.__loader_kwargs = kwargs
         self.__model = self.get_model(model)
+        self.__optimizer = None
+
+    @property
+    def rank(self):
+        return self.__rank

     @property
     def model(self):
@@ -331,25 +339,35 @@ def data(self):

     @property
     def optimizer(self):
-        return ZeroRedundancyOptimizer(
-            self.model.parameters(), torch.optim.Adam, lr=0.01
-        )
+        if self.__optimizer is None:
+            self.__optimizer = torch.optim.Adam(self.model.parameters(), lr=0.01,
+                                                weight_decay=0.0005)
+        return self.__optimizer

     @property
     def num_epochs(self) -> int:
         return self.__num_epochs

-    def get_loader(self, epoch: int):
+    def get_loader(self, epoch: int = 0, stage="train"):
         import logging

         logger = logging.getLogger("PyGNativeTrainer")
         logger.info(f"Getting loader for epoch {epoch}")

+        if stage == 'train':
+            mask_dict = self.__dataset.train_dict
+        elif stage == 'test':
+            mask_dict = self.__dataset.test_dict
+        elif stage == 'val':
+            mask_dict = self.__dataset.val_dict
+        else:
+            raise ValueError(f"Invalid stage {stage}")
+
         input_nodes_dict = {
             node_type: np.array_split(
-                np.arange(len(train_mask))[train_mask], self.__world_size
+                np.arange(len(mask))[mask], self.__world_size
             )[self.__rank]
-            for node_type, train_mask in self.__dataset.train_dict.items()
+            for node_type, mask in mask_dict.items()
         }

         input_nodes = list(input_nodes_dict.items())
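
Note: the stage-aware get_loader picks the boolean mask dictionary for the requested split and converts each mask to node indices with `np.arange(len(mask))[mask]` before splitting them across ranks; previously only the training mask was ever used, even during validation and test. How the mask-to-index expression behaves on toy data:

    import numpy as np

    mask = np.array([True, False, True, True])
    print(np.arange(len(mask))[mask])  # [0 2 3]
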
2 changes: 2 additions & 0 deletions cugraph_sampling_stats.csv
@@ -0,0 +1,2 @@
+,dataset,num_input_edges,directed,renumber,input_memory_per_worker,peak_allocation_across_workers,input_to_peak_ratio,output_to_peak_ratio
+0,ogbn_papers100M,1615685872,,,3.0GB,9.6GB,3.19473532507314,7.7350550067817485
