Skip to content

Commit

Permalink
Fix the bug when the number of val/test samples is small (#968)
Browse files Browse the repository at this point in the history
*Issue #, if available:*
#959 

*Description of changes:*


By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice.

---------

Co-authored-by: Xiang Song <[email protected]>
  • Loading branch information
2 people authored and jalencato committed Aug 16, 2024
1 parent b4ffede commit 64774bb
Show file tree
Hide file tree
Showing 8 changed files with 235 additions and 10 deletions.
56 changes: 50 additions & 6 deletions python/graphstorm/dataloading/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -579,10 +579,32 @@ def _get_node_set(self, ntypes, mask):
idx = [] if idx is None else idx
num_data += len(idx)
# If there are validation/test data globally, we should add them to the dict.
if dist_sum(len(idx)) > 0:
idxs[ntype] = idx

logging.debug('part %d | ntype %s, mask %s | num nodes: %d',
total_num_idx = dist_sum(len(idx))
if total_num_idx > 0:
if total_num_idx >= get_world_size():
# The size of the validation or test set is larger
# than the world size. Each validation/test dataloader
# will not be empty
idxs[ntype] = idx
else:
# There is not enough validation or test data.
# One or more validation/test dataloader will be
# empty, which will cause an evaluation error.
#
# To avoid the error, force each trainer or
# inferencer to use the entire validation
# or test set.
idx = th.nonzero(g.nodes[ntype].data[msk][ \
th.arange(g.num_nodes(ntype))]).reshape(-1,) # 1D tensor
idxs[ntype] = idx
logging.warning("Since the total number of validation/test data"
"of %s, which is %d, is less than the number of "
"workers %d, we will force each worker to do "
"validation or testing on the entire "
"validation/test set.",
ntype, total_num_idx, get_world_size())

logging.debug('part %d | ntype %s, mask %s | val/test: %d',
get_rank(), ntype, msk, len(idx))
return idxs, num_data

Expand Down Expand Up @@ -814,8 +836,30 @@ def _get_edge_set(self, etypes, mask, reverse_edge_types_map):
idx = [] if idx is None else idx
num_data += len(idx)
# If there are validation data globally, we should add them to the dict.
if dist_sum(len(idx)) > 0:
idxs[canonical_etype] = idx
total_num_idx = dist_sum(len(idx))
if total_num_idx > 0:
if total_num_idx >= get_world_size():
# The size of the validation or test set is larger
# than the world size. Each validation/test dataloader
# will not be empty
idxs[canonical_etype] = idx
else:
# There is not enough validation or test data.
# One or more validation/test dataloader will be
# empty, which will cause an evaluation error.
#
# To avoid the error, force each trainer or
# inferencer to use the entire validation
# or test set.
idx = th.nonzero(g.edges[canonical_etype].data[msk][\
th.arange(g.num_edges(canonical_etype))]).reshape(-1,) # 1D tensor
idxs[canonical_etype] = idx
logging.warning("Since the total number of validation/test data"
"of %s, which is %d, is less than the number of "
"workers %d, we will force each worker to do "
"validation or testing on the entire "
"validation/test set.",
canonical_etype, total_num_idx, get_world_size())

logging.debug('part %d | etype %s, mask %s | val/test: %d',
get_rank(), canonical_etype, msk, len(idx))
Expand Down
8 changes: 4 additions & 4 deletions python/graphstorm/model/multitask_gnn.py
Original file line number Diff line number Diff line change
Expand Up @@ -553,14 +553,14 @@ def multi_task_mini_batch_predict(
device,
return_proba,
return_label)
assert labels is None or len(labels) == 1, \
assert not labels or len(labels) == 1, \
"In multi-task learning, for each training task, " \
"we only support prediction on one node type." \
"For multiple node types, please treat them as " \
"different training tasks."
ntype = list(preds.keys())[0]
res[task_info.task_id] = (preds[ntype], labels[ntype] \
if labels is not None else None)
if labels else None)
elif task_info.task_type in \
[BUILTIN_TASK_EDGE_CLASSIFICATION, BUILTIN_TASK_EDGE_REGRESSION]:
if dataloader is None:
Expand All @@ -576,14 +576,14 @@ def multi_task_mini_batch_predict(
device,
return_proba,
return_label)
assert labels is None or len(labels) == 1, \
assert not labels or len(labels) == 1, \
"In multi-task learning, for each training task, " \
"we only support prediction on one edge type." \
"For multiple edge types, please treat them as " \
"different training tasks."
etype = list(preds.keys())[0]
res[task_info.task_id] = (preds[etype], labels[etype] \
if labels is not None else None)
if labels else None)
elif task_info.task_type in [BUILTIN_TASK_LINK_PREDICTION]:
if dataloader is None:
# In cases when there is no validation or test set.
Expand Down
7 changes: 7 additions & 0 deletions tests/end2end-tests/create_data.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ python3 -m graphstorm.gconstruct.construct_graph \
--graph-name movie-lens-100k \
--add-reverse-edges

python3 -m graphstorm.gconstruct.construct_graph \
--conf-file $GS_HOME/tests/end2end-tests/data_gen/movielens_small_val.json \
--num-processes 1 \
--output-dir movielen_100k_train_small_val_1p_4t \
--graph-name movie-lens-100k \
--add-reverse-edges

# movielens node classification removing test mask
rm -Rf /data/movielen_100k_train_notest_1p_4t
cp -R /data/movielen_100k_train_val_1p_4t /data/movielen_100k_train_notest_1p_4t
Expand Down
55 changes: 55 additions & 0 deletions tests/end2end-tests/data_gen/movielens_small_val.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
{
"version": "gconstruct-v0.1",
"nodes": [
{
"node_id_col": "id",
"node_type": "user",
"format": {"name": "hdf5"},
"files": "/data/ml-100k/user.hdf5",
"features": [
{
"feature_col": "feat"
}
]
},
{
"node_id_col": "id",
"node_type": "movie",
"format": {"name": "parquet"},
"files": "/data/ml-100k/movie.parquet",
"features": [
{
"feature_col": "title",
"transform": {
"name": "bert_hf",
"bert_model": "bert-base-uncased",
"max_seq_length": 16
}
}
],
"labels": [
{
"label_col": "label",
"task_type": "classification",
"split_pct": [0.8, 0.001, 0.001]
}
]
}
],
"edges": [
{
"source_id_col": "src_id",
"dest_id_col": "dst_id",
"relation": ["user", "rating", "movie"],
"format": {"name": "parquet"},
"files": "/data/ml-100k/edges.parquet",
"labels": [
{
"label_col": "rate",
"task_type": "classification",
"split_pct": [0.1, 0.00001, 0.00001]
}
]
}
]
}
5 changes: 5 additions & 0 deletions tests/end2end-tests/graphstorm-ec/mgpu_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -336,4 +336,9 @@ fi

rm /tmp/train_log.txt

echo "**************dataset: Generated multilabel MovieLens EC, RGCN layer: 1, node feat: generated feature, inference: full graph, exclude-training-targets: True, tiny valset"
python3 -m graphstorm.run.gs_edge_classification --workspace $GS_HOME/training_scripts/gsgnn_ep/ --num-trainers $NUM_TRAINERS --num-servers 1 --num-samplers 0 --part-config /data/movielen_100k_train_small_val_1p_4t/movie-lens-100k.json --ip-config ip_list.txt --ssh-port 2222 --cf ml_ec.yaml --exclude-training-targets True --node-feat-name movie:title user:feat --use-mini-batch-infer false --logging-file /tmp/train_log.txt --logging-level debug --preserve-input True --backend nccl

error_and_exit $?

rm -fr /tmp/*
5 changes: 5 additions & 0 deletions tests/end2end-tests/graphstorm-nc/mgpu_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -618,4 +618,9 @@ fi

rm /tmp/train_log.txt

echo "**************dataset: MovieLens classification, RGCN layer: 1, node feat: fixed HF BERT, BERT nodes: movie, inference: mini-batch save model save emb node with tiny val set"
python3 -m graphstorm.run.gs_node_classification --workspace $GS_HOME/training_scripts/gsgnn_np/ --num-trainers $NUM_TRAINERS --num-servers 1 --num-samplers 0 --part-config /data/movielen_100k_train_small_val_1p_4t/movie-lens-100k.json --ip-config ip_list.txt --ssh-port 2222 --cf ml_nc.yaml --num-epochs 3 --logging-file /tmp/train_log.txt --logging-level debug --preserve-input True --backend nccl --node-feat-name movie:title user:feat

error_and_exit $?

rm -fr /tmp/*
8 changes: 8 additions & 0 deletions tests/unit-tests/data_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,14 @@ def generate_dummy_hetero_graph(size='tiny', gen_mask=True, add_reverse=False):
node_train_mask = generate_mask([0,1], data_size)
node_val_mask = generate_mask([2,3], data_size)
node_test_mask = generate_mask([4,5], data_size)
node_val_mask2 = generate_mask([2], data_size)
node_test_mask2 = generate_mask([4], data_size)

edge_train_mask = generate_mask([0,1], 2 * data_size)
edge_val_mask = generate_mask([2,3], 2 * data_size)
edge_test_mask = generate_mask([4,5], 2 * data_size)
edge_val_mask_2 = generate_mask([2], 2 * data_size)
edge_test_mask_2 = generate_mask([4], 2 * data_size)

edge_train_mask2 = generate_mask([i for i in range(data_size//2)], data_size)
edge_val_mask2 = generate_mask([2,3], data_size)
Expand All @@ -115,10 +119,14 @@ def generate_dummy_hetero_graph(size='tiny', gen_mask=True, add_reverse=False):
hetero_graph.nodes[target_ntype[0]].data['train_mask'] = node_train_mask
hetero_graph.nodes[target_ntype[0]].data['val_mask'] = node_val_mask
hetero_graph.nodes[target_ntype[0]].data['test_mask'] = node_test_mask
hetero_graph.nodes[target_ntype[0]].data['val_mask2'] = node_val_mask2
hetero_graph.nodes[target_ntype[0]].data['test_mask2'] = node_test_mask2

hetero_graph.edges[target_etype[0]].data['train_mask'] = edge_train_mask
hetero_graph.edges[target_etype[0]].data['val_mask'] = edge_val_mask
hetero_graph.edges[target_etype[0]].data['test_mask'] = edge_test_mask
hetero_graph.edges[target_etype[0]].data['val_mask2'] = edge_val_mask_2
hetero_graph.edges[target_etype[0]].data['test_mask2'] = edge_test_mask_2

hetero_graph.edges[target_etype[1]].data['train_mask'] = edge_train_mask2
hetero_graph.edges[target_etype[1]].data['val_mask'] = edge_val_mask2
Expand Down
101 changes: 101 additions & 0 deletions tests/unit-tests/test_dataloading.py
Original file line number Diff line number Diff line change
Expand Up @@ -2399,7 +2399,108 @@ def test_GSgnnMultiTaskDataLoader():
assert np.any(edge0_seeds_cnt.numpy() <= 1)
assert np.any(edge0_seeds_cnt.numpy() >= 0)

def run_dist_small_val_test(part_config, worker_rank, world_size):
dist_init_method = 'tcp://{master_ip}:{master_port}'.format(
master_ip='127.0.0.1', master_port='12345')
th.distributed.init_process_group(backend="gloo",
init_method=dist_init_method,
world_size=world_size,
rank=worker_rank)
dgl.distributed.initialize('')
gs_data = GSgnnData(part_config=part_config)

@patch("dgl.distributed.edge_split")
@patch("dgl.distributed.node_split")
def check_val_test(mock_node_split, mock_edge_split):
#######
# Node mask
total_idx = th.tensor([1,2,3,4])
split_ret = total_idx[:2] if worker_rank == 0 else total_idx[2:]
mock_node_split.side_effect = [split_ret, split_ret]
# Mocked val has 4 labeled data
# each worker will get 2
ntype = 'n1'
idx = gs_data.get_node_val_set(ntype, mask="val_mask")
assert len(idx[ntype]) == 2
# Mocked test has 4 labeled data
# each worker will get 2
idx = gs_data.get_node_test_set(ntype, mask="test_mask")
assert len(idx[ntype]) == 2

total_idx = th.tensor([1])
split_ret = total_idx[:1] if worker_rank == 0 else total_idx[1:1]
mock_node_split.side_effect = [split_ret, split_ret]
# Mocked val has only 1 labeled data
# Thus, both worker 0 and worker 1 will
# take the same validation set.
idx = gs_data.get_node_val_set(ntype, mask="val_mask2")
assert len(idx[ntype]) == 1
assert th.nonzero(gs_data._g.nodes[ntype].data['val_mask2'][th.arange(gs_data._g.num_nodes(ntype))]).reshape(-1,)[0].item() == idx[ntype][0].item()

# Mocked test has only 1 labeled data
# Thus, both worker 0 and worker 1 will
# take the same test set.
idx = gs_data.get_node_test_set(ntype, mask="test_mask2")
assert len(idx[ntype]) == 1
assert th.nonzero(gs_data._g.nodes[ntype].data['test_mask2'][th.arange(gs_data._g.num_nodes(ntype))]).reshape(-1,)[0] == idx[ntype][0]

#######
# Edge mask
etype = ("n0", "r1", "n1")
total_idx = th.tensor([1,2,3,4])
split_ret = total_idx[:2] if worker_rank == 0 else total_idx[2:]
mock_edge_split.side_effect = [split_ret, split_ret]

# Mocked val has two labeled data
# each worker will get 1
idx = gs_data.get_edge_val_set(etype, mask="val_mask")
assert len(idx[etype]) == 2
# Mocked test has two labeled data
# each worker will get 1
idx = gs_data.get_edge_test_set(etype, mask="test_mask")
assert len(idx[etype]) == 2

total_idx = th.tensor([1])
split_ret = total_idx[:1] if worker_rank == 0 else total_idx[1:1]
mock_edge_split.side_effect = [split_ret, split_ret]
# Mocked val has only 1 labeled data
# Thus, both worker 0 and worker 1 will
# take the same validation set.
idx = gs_data.get_edge_val_set(etype, mask="val_mask2")
assert len(idx[etype]) == 1
assert th.nonzero(gs_data._g.edges[etype].data['val_mask2'][th.arange(gs_data._g.num_edges(etype))]).reshape(-1,)[0].item() == idx[etype][0].item()

# mocked test has only 1 labeled data
# Thus, both worker 0 and worker 1 will
# take the same test set.
idx = gs_data.get_edge_test_set(etype, mask="test_mask2")
assert len(idx[etype]) == 1
assert th.nonzero(gs_data._g.edges[etype].data['test_mask2'][th.arange(gs_data._g.num_edges(etype))]).reshape(-1,)[0] == idx[etype][0]

check_val_test()

if worker_rank == 0:
th.distributed.destroy_process_group()

def test_GSgnnTranData_small_val_test():
with tempfile.TemporaryDirectory() as tmpdirname:
_, part_config = generate_dummy_dist_graph(tmpdirname)

ctx = mp.get_context('spawn')
p0 = ctx.Process(target=run_dist_small_val_test,
args=(part_config, 0, 2))
p1 = ctx.Process(target=run_dist_small_val_test,
args=(part_config, 1, 2))

p0.start()
p1.start()
p0.join()
p1.join()
assert p0.exitcode == 0
assert p1.exitcode == 0

if __name__ == '__main__':
test_GSgnnTranData_small_val_test()
test_GSgnnLinkPredictionTestDataLoader(1, 1)
test_GSgnnLinkPredictionTestDataLoader(10, 20)
test_GSgnnMultiTaskDataLoader()
Expand Down

0 comments on commit 64774bb

Please sign in to comment.