Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the bug when the number of val/test samples is small #968

Merged
merged 5 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 38 additions & 6 deletions python/graphstorm/dataloading/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -579,10 +579,26 @@ def _get_node_set(self, ntypes, mask):
idx = [] if idx is None else idx
num_data += len(idx)
# If there are validation/test data globally, we should add them to the dict.
if dist_sum(len(idx)) > 0:
idxs[ntype] = idx

logging.debug('part %d | ntype %s, mask %s | num nodes: %d',
total_num_idx = dist_sum(len(idx))
if total_num_idx > 0:
if total_num_idx >= get_world_size():
# The size of the validation or test set is larger
# than the world size. Each validation/test dataloader
# will not be empty
idxs[ntype] = idx
else:
# There is not enough validation or test data.
# One or more validation/test dataloader will be
# empty, which will cause an evaluation error.
#
# To avoid the error, force each trainer or
# inferencer to use the entire validation
# or test set.
idx = th.nonzero(g.nodes[ntype].data[msk][ \
classicsong marked this conversation as resolved.
Show resolved Hide resolved
th.arange(g.num_nodes(ntype))]).reshape(-1,) # 1D tensor
idxs[ntype] = idx

logging.debug('part %d | ntype %s, mask %s | val/test: %d',
get_rank(), ntype, msk, len(idx))
return idxs, num_data

Expand Down Expand Up @@ -814,8 +830,24 @@ def _get_edge_set(self, etypes, mask, reverse_edge_types_map):
idx = [] if idx is None else idx
num_data += len(idx)
# If there are validation data globally, we should add them to the dict.
if dist_sum(len(idx)) > 0:
idxs[canonical_etype] = idx
total_num_idx = dist_sum(len(idx))
if total_num_idx > 0:
if total_num_idx >= get_world_size():
# The size of the validation or test set is larger
# than the world size. Each validation/test dataloader
# will not be empty
idxs[canonical_etype] = idx
else:
# There is not enough validation or test data.
# One or more validation/test dataloader will be
# empty, which will cause an evaluation error.
#
# To avoid the error, force each trainer or
# inferencer to use the entire validation
# or test set.
classicsong marked this conversation as resolved.
Show resolved Hide resolved
idx = th.nonzero(g.edges[canonical_etype].data[msk][\
th.arange(g.num_edges(canonical_etype))]).reshape(-1,) # 1D tensor
idxs[canonical_etype] = idx

logging.debug('part %d | etype %s, mask %s | val/test: %d',
get_rank(), canonical_etype, msk, len(idx))
Expand Down
8 changes: 4 additions & 4 deletions python/graphstorm/model/multitask_gnn.py
Original file line number Diff line number Diff line change
Expand Up @@ -553,14 +553,14 @@ def multi_task_mini_batch_predict(
device,
return_proba,
return_label)
assert labels is None or len(labels) == 1, \
assert not labels or len(labels) == 1, \
jalencato marked this conversation as resolved.
Show resolved Hide resolved
"In multi-task learning, for each training task, " \
"we only support prediction on one node type." \
"For multiple node types, please treat them as " \
"different training tasks."
ntype = list(preds.keys())[0]
res[task_info.task_id] = (preds[ntype], labels[ntype] \
if labels is not None else None)
if labels else None)
elif task_info.task_type in \
[BUILTIN_TASK_EDGE_CLASSIFICATION, BUILTIN_TASK_EDGE_REGRESSION]:
if dataloader is None:
Expand All @@ -576,14 +576,14 @@ def multi_task_mini_batch_predict(
device,
return_proba,
return_label)
assert labels is None or len(labels) == 1, \
assert not labels or len(labels) == 1, \
"In multi-task learning, for each training task, " \
"we only support prediction on one edge type." \
"For multiple edge types, please treat them as " \
"different training tasks."
etype = list(preds.keys())[0]
res[task_info.task_id] = (preds[etype], labels[etype] \
if labels is not None else None)
if labels else None)
elif task_info.task_type in [BUILTIN_TASK_LINK_PREDICTION]:
if dataloader is None:
# In cases when there is no validation or test set.
Expand Down
7 changes: 7 additions & 0 deletions tests/end2end-tests/create_data.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ python3 -m graphstorm.gconstruct.construct_graph \
--graph-name movie-lens-100k \
--add-reverse-edges

python3 -m graphstorm.gconstruct.construct_graph \
--conf-file $GS_HOME/tests/end2end-tests/data_gen/movielens_small_val.json \
--num-processes 1 \
--output-dir movielen_100k_train_small_val_1p_4t \
--graph-name movie-lens-100k \
--add-reverse-edges

# movielens node classification removing test mask
rm -Rf /data/movielen_100k_train_notest_1p_4t
cp -R /data/movielen_100k_train_val_1p_4t /data/movielen_100k_train_notest_1p_4t
Expand Down
55 changes: 55 additions & 0 deletions tests/end2end-tests/data_gen/movielens_small_val.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
{
"version": "gconstruct-v0.1",
"nodes": [
{
"node_id_col": "id",
"node_type": "user",
"format": {"name": "hdf5"},
"files": "/data/ml-100k/user.hdf5",
"features": [
{
"feature_col": "feat"
}
]
},
{
"node_id_col": "id",
"node_type": "movie",
"format": {"name": "parquet"},
"files": "/data/ml-100k/movie.parquet",
"features": [
{
"feature_col": "title",
"transform": {
"name": "bert_hf",
"bert_model": "bert-base-uncased",
"max_seq_length": 16
}
}
],
"labels": [
{
"label_col": "label",
"task_type": "classification",
"split_pct": [0.8, 0.001, 0.001]
}
]
}
],
"edges": [
{
"source_id_col": "src_id",
"dest_id_col": "dst_id",
"relation": ["user", "rating", "movie"],
"format": {"name": "parquet"},
"files": "/data/ml-100k/edges.parquet",
"labels": [
{
"label_col": "rate",
"task_type": "classification",
"split_pct": [0.1, 0.00001, 0.00001]
jalencato marked this conversation as resolved.
Show resolved Hide resolved
}
]
}
]
}
5 changes: 5 additions & 0 deletions tests/end2end-tests/graphstorm-ec/mgpu_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -336,4 +336,9 @@ fi

rm /tmp/train_log.txt

echo "**************dataset: Generated multilabel MovieLens EC, RGCN layer: 1, node feat: generated feature, inference: full graph, exclude-training-targets: True, tiny valset"
python3 -m graphstorm.run.gs_edge_classification --workspace $GS_HOME/training_scripts/gsgnn_ep/ --num-trainers $NUM_TRAINERS --num-servers 1 --num-samplers 0 --part-config /data/movielen_100k_train_small_val_1p_4t/movie-lens-100k.json --ip-config ip_list.txt --ssh-port 2222 --cf ml_ec.yaml --exclude-training-targets True --node-feat-name movie:title user:feat --use-mini-batch-infer false --logging-file /tmp/train_log.txt --logging-level debug --preserve-input True --backend nccl

error_and_exit $?

rm -fr /tmp/*
5 changes: 5 additions & 0 deletions tests/end2end-tests/graphstorm-nc/mgpu_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -618,4 +618,9 @@ fi

rm /tmp/train_log.txt

echo "**************dataset: MovieLens classification, RGCN layer: 1, node feat: fixed HF BERT, BERT nodes: movie, inference: mini-batch save model save emb node with tiny val set"
python3 -m graphstorm.run.gs_node_classification --workspace $GS_HOME/training_scripts/gsgnn_np/ --num-trainers $NUM_TRAINERS --num-servers 1 --num-samplers 0 --part-config /data/movielen_100k_train_small_val_1p_4t/movie-lens-100k.json --ip-config ip_list.txt --ssh-port 2222 --cf ml_nc.yaml --num-epochs 3 --logging-file /tmp/train_log.txt --logging-level debug --preserve-input True --backend nccl --node-feat-name movie:title user:feat

error_and_exit $?

rm -fr /tmp/*
8 changes: 8 additions & 0 deletions tests/unit-tests/data_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,14 @@ def generate_dummy_hetero_graph(size='tiny', gen_mask=True, add_reverse=False):
node_train_mask = generate_mask([0,1], data_size)
node_val_mask = generate_mask([2,3], data_size)
node_test_mask = generate_mask([4,5], data_size)
node_val_mask2 = generate_mask([2], data_size)
node_test_mask2 = generate_mask([4], data_size)

edge_train_mask = generate_mask([0,1], 2 * data_size)
edge_val_mask = generate_mask([2,3], 2 * data_size)
edge_test_mask = generate_mask([4,5], 2 * data_size)
edge_val_mask_2 = generate_mask([2], 2 * data_size)
edge_test_mask_2 = generate_mask([4], 2 * data_size)

edge_train_mask2 = generate_mask([i for i in range(data_size//2)], data_size)
edge_val_mask2 = generate_mask([2,3], data_size)
Expand All @@ -115,10 +119,14 @@ def generate_dummy_hetero_graph(size='tiny', gen_mask=True, add_reverse=False):
hetero_graph.nodes[target_ntype[0]].data['train_mask'] = node_train_mask
hetero_graph.nodes[target_ntype[0]].data['val_mask'] = node_val_mask
hetero_graph.nodes[target_ntype[0]].data['test_mask'] = node_test_mask
hetero_graph.nodes[target_ntype[0]].data['val_mask2'] = node_val_mask2
hetero_graph.nodes[target_ntype[0]].data['test_mask2'] = node_test_mask2

hetero_graph.edges[target_etype[0]].data['train_mask'] = edge_train_mask
hetero_graph.edges[target_etype[0]].data['val_mask'] = edge_val_mask
hetero_graph.edges[target_etype[0]].data['test_mask'] = edge_test_mask
hetero_graph.edges[target_etype[0]].data['val_mask2'] = edge_val_mask_2
hetero_graph.edges[target_etype[0]].data['test_mask2'] = edge_test_mask_2

hetero_graph.edges[target_etype[1]].data['train_mask'] = edge_train_mask2
hetero_graph.edges[target_etype[1]].data['val_mask'] = edge_val_mask2
Expand Down
101 changes: 101 additions & 0 deletions tests/unit-tests/test_dataloading.py
Original file line number Diff line number Diff line change
Expand Up @@ -2399,7 +2399,108 @@ def test_GSgnnMultiTaskDataLoader():
assert np.any(edge0_seeds_cnt.numpy() <= 1)
assert np.any(edge0_seeds_cnt.numpy() >= 0)

def run_dist_small_val_test(part_config, worker_rank, world_size):
    """Worker entry point (run in a spawned process) verifying how GSgnnData
    serves validation/test index sets when the number of labeled nodes/edges
    is smaller than the world size.

    Parameters
    ----------
    part_config : str
        Path to the partition-config JSON of the dummy distributed graph.
    worker_rank : int
        Rank of this worker within the process group.
    world_size : int
        Total number of workers (the test driver uses 2).
    """
    # Bring up a gloo process group so GSgnnData sees a multi-worker world.
    dist_init_method = 'tcp://{master_ip}:{master_port}'.format(
        master_ip='127.0.0.1', master_port='12345')
    th.distributed.init_process_group(backend="gloo",
                                      init_method=dist_init_method,
                                      world_size=world_size,
                                      rank=worker_rank)
    dgl.distributed.initialize('')
    gs_data = GSgnnData(part_config=part_config)

    # NOTE: @patch decorators apply bottom-up, so the first mock argument
    # (mock_node_split) corresponds to dgl.distributed.node_split and the
    # second (mock_edge_split) to dgl.distributed.edge_split.
    @patch("dgl.distributed.edge_split")
    @patch("dgl.distributed.node_split")
    def check_val_test(mock_node_split, mock_edge_split):
        #######
        # Node mask
        total_idx = th.tensor([1,2,3,4])
        # Each rank receives half of the indices. side_effect is a queue:
        # the first entry feeds the val-set call, the second the test-set call.
        split_ret = total_idx[:2] if worker_rank == 0 else total_idx[2:]
        mock_node_split.side_effect = [split_ret, split_ret]
        # Mocked val has 4 labeled data in total;
        # each worker will get 2.
        ntype = 'n1'
        idx = gs_data.get_node_val_set(ntype, mask="val_mask")
        assert len(idx[ntype]) == 2
        # Mocked test has 4 labeled data in total;
        # each worker will get 2.
        idx = gs_data.get_node_test_set(ntype, mask="test_mask")
        assert len(idx[ntype]) == 2

        # Simulate a tiny val/test set: 1 labeled node globally, so the mocked
        # split hands rank 0 one index and rank 1 an empty tensor ([1:1]).
        total_idx = th.tensor([1])
        split_ret = total_idx[:1] if worker_rank == 0 else total_idx[1:1]
        mock_node_split.side_effect = [split_ret, split_ret]
        # Mocked val has only 1 labeled data.
        # Thus, both worker 0 and worker 1 will
        # take the same (entire) validation set.
        idx = gs_data.get_node_val_set(ntype, mask="val_mask2")
        assert len(idx[ntype]) == 1
        # The returned index must equal the single nonzero position of the mask.
        assert th.nonzero(gs_data._g.nodes[ntype].data['val_mask2'][th.arange(gs_data._g.num_nodes(ntype))]).reshape(-1,)[0].item() == idx[ntype][0].item()

        # Mocked test has only 1 labeled data.
        # Thus, both worker 0 and worker 1 will
        # take the same (entire) test set.
        idx = gs_data.get_node_test_set(ntype, mask="test_mask2")
        assert len(idx[ntype]) == 1
        assert th.nonzero(gs_data._g.nodes[ntype].data['test_mask2'][th.arange(gs_data._g.num_nodes(ntype))]).reshape(-1,)[0] == idx[ntype][0]

        #######
        # Edge mask
        etype = ("n0", "r1", "n1")
        total_idx = th.tensor([1,2,3,4])
        split_ret = total_idx[:2] if worker_rank == 0 else total_idx[2:]
        mock_edge_split.side_effect = [split_ret, split_ret]

        # Mocked val has 4 labeled data in total;
        # each worker will get 2.
        idx = gs_data.get_edge_val_set(etype, mask="val_mask")
        assert len(idx[etype]) == 2
        # Mocked test has 4 labeled data in total;
        # each worker will get 2.
        idx = gs_data.get_edge_test_set(etype, mask="test_mask")
        assert len(idx[etype]) == 2

        # Tiny edge val/test set: 1 labeled edge globally; rank 1's mocked
        # split is empty, so both ranks must fall back to the full set.
        total_idx = th.tensor([1])
        split_ret = total_idx[:1] if worker_rank == 0 else total_idx[1:1]
        mock_edge_split.side_effect = [split_ret, split_ret]
        # Mocked val has only 1 labeled data.
        # Thus, both worker 0 and worker 1 will
        # take the same (entire) validation set.
        idx = gs_data.get_edge_val_set(etype, mask="val_mask2")
        assert len(idx[etype]) == 1
        assert th.nonzero(gs_data._g.edges[etype].data['val_mask2'][th.arange(gs_data._g.num_edges(etype))]).reshape(-1,)[0].item() == idx[etype][0].item()

        # Mocked test has only 1 labeled data.
        # Thus, both worker 0 and worker 1 will
        # take the same (entire) test set.
        idx = gs_data.get_edge_test_set(etype, mask="test_mask2")
        assert len(idx[etype]) == 1
        assert th.nonzero(gs_data._g.edges[etype].data['test_mask2'][th.arange(gs_data._g.num_edges(etype))]).reshape(-1,)[0] == idx[etype][0]

    check_val_test()

    # NOTE(review): only rank 0 tears down the process group; torch docs
    # recommend every rank call destroy_process_group(). Confirm the asymmetry
    # is intentional (workers exit right after, so it may be benign).
    if worker_rank == 0:
        th.distributed.destroy_process_group()

def test_GSgnnTranData_small_val_test():
    """Spawn a 2-worker distributed run and verify that tiny validation/test
    sets are handled without leaving any worker with an empty set."""
    world_size = 2
    with tempfile.TemporaryDirectory() as workdir:
        _, part_config = generate_dummy_dist_graph(workdir)

        spawn_ctx = mp.get_context('spawn')
        # One process per rank, all sharing the same partition config.
        workers = [
            spawn_ctx.Process(target=run_dist_small_val_test,
                              args=(part_config, rank, world_size))
            for rank in range(world_size)
        ]
        for proc in workers:
            proc.start()
        for proc in workers:
            proc.join()
        # Every worker must have finished its assertions cleanly.
        for proc in workers:
            assert proc.exitcode == 0

if __name__ == '__main__':
test_GSgnnTranData_small_val_test()
test_GSgnnLinkPredictionTestDataLoader(1, 1)
test_GSgnnLinkPredictionTestDataLoader(10, 20)
test_GSgnnMultiTaskDataLoader()
Expand Down
Loading