diff --git a/python/graphstorm/dataloading/dataset.py b/python/graphstorm/dataloading/dataset.py index bb94e5bcbf..d1b26bdea8 100644 --- a/python/graphstorm/dataloading/dataset.py +++ b/python/graphstorm/dataloading/dataset.py @@ -579,10 +579,32 @@ def _get_node_set(self, ntypes, mask): idx = [] if idx is None else idx num_data += len(idx) # If there are validation/test data globally, we should add them to the dict. - if dist_sum(len(idx)) > 0: - idxs[ntype] = idx - - logging.debug('part %d | ntype %s, mask %s | num nodes: %d', + total_num_idx = dist_sum(len(idx)) + if total_num_idx > 0: + if total_num_idx >= get_world_size(): + # The size of the validation or test set is larger + # than the world size. Each validation/test dataloader + # will not be empty + idxs[ntype] = idx + else: + # There is not enough validation or test data. + # One or more validation/test dataloader will be + # empty, which will cause an evaluation error. + # + # To avoid the error, force each trainer or + # inferencer to use the entire validation + # or test set. + idx = th.nonzero(g.nodes[ntype].data[msk][ \ + th.arange(g.num_nodes(ntype))]).reshape(-1,) # 1D tensor + idxs[ntype] = idx + logging.warning("Since the total number of validation/test data" + "of %s, which is %d, is less than the number of " + "workers %d, we will force each worker to do " + "validation or testing on the entire " + "validation/test set.", + ntype, total_num_idx, get_world_size()) + + logging.debug('part %d | ntype %s, mask %s | val/test: %d', get_rank(), ntype, msk, len(idx)) return idxs, num_data @@ -814,8 +836,30 @@ def _get_edge_set(self, etypes, mask, reverse_edge_types_map): idx = [] if idx is None else idx num_data += len(idx) # If there are validation data globally, we should add them to the dict. - if dist_sum(len(idx)) > 0: - idxs[canonical_etype] = idx + total_num_idx = dist_sum(len(idx)) + if total_num_idx > 0: + if total_num_idx >= get_world_size(): + # The size of the validation or test set is larger + # than the world size. Each validation/test dataloader + # will not be empty + idxs[canonical_etype] = idx + else: + # There is not enough validation or test data. + # One or more validation/test dataloader will be + # empty, which will cause an evaluation error. + # + # To avoid the error, force each trainer or + # inferencer to use the entire validation + # or test set. + idx = th.nonzero(g.edges[canonical_etype].data[msk][\ + th.arange(g.num_edges(canonical_etype))]).reshape(-1,) # 1D tensor + idxs[canonical_etype] = idx + logging.warning("Since the total number of validation/test data" + "of %s, which is %d, is less than the number of " + "workers %d, we will force each worker to do " + "validation or testing on the entire " + "validation/test set.", + canonical_etype, total_num_idx, get_world_size()) logging.debug('part %d | etype %s, mask %s | val/test: %d', get_rank(), canonical_etype, msk, len(idx)) diff --git a/python/graphstorm/model/multitask_gnn.py b/python/graphstorm/model/multitask_gnn.py index f5b964e33e..2749408fe2 100644 --- a/python/graphstorm/model/multitask_gnn.py +++ b/python/graphstorm/model/multitask_gnn.py @@ -553,14 +553,14 @@ def multi_task_mini_batch_predict( device, return_proba, return_label) - assert labels is None or len(labels) == 1, \ + assert not labels or len(labels) == 1, \ "In multi-task learning, for each training task, " \ "we only support prediction on one node type." \ "For multiple node types, please treat them as " \ "different training tasks." ntype = list(preds.keys())[0] res[task_info.task_id] = (preds[ntype], labels[ntype] \ - if labels is not None else None) + if labels else None) elif task_info.task_type in \ [BUILTIN_TASK_EDGE_CLASSIFICATION, BUILTIN_TASK_EDGE_REGRESSION]: if dataloader is None: @@ -576,14 +576,14 @@ def multi_task_mini_batch_predict( device, return_proba, return_label) - assert labels is None or len(labels) == 1, \ + assert not labels or len(labels) == 1, \ "In multi-task learning, for each training task, " \ "we only support prediction on one edge type." \ "For multiple edge types, please treat them as " \ "different training tasks." etype = list(preds.keys())[0] res[task_info.task_id] = (preds[etype], labels[etype] \ - if labels is not None else None) + if labels else None) elif task_info.task_type in [BUILTIN_TASK_LINK_PREDICTION]: if dataloader is None: # In cases when there is no validation or test set. diff --git a/tests/end2end-tests/create_data.sh b/tests/end2end-tests/create_data.sh index d7ce43b278..1a79ffa133 100644 --- a/tests/end2end-tests/create_data.sh +++ b/tests/end2end-tests/create_data.sh @@ -16,6 +16,13 @@ python3 -m graphstorm.gconstruct.construct_graph \ --graph-name movie-lens-100k \ --add-reverse-edges +python3 -m graphstorm.gconstruct.construct_graph \ + --conf-file $GS_HOME/tests/end2end-tests/data_gen/movielens_small_val.json \ + --num-processes 1 \ + --output-dir movielen_100k_train_small_val_1p_4t \ + --graph-name movie-lens-100k \ + --add-reverse-edges + # movielens node classification removing test mask rm -Rf /data/movielen_100k_train_notest_1p_4t cp -R /data/movielen_100k_train_val_1p_4t /data/movielen_100k_train_notest_1p_4t diff --git a/tests/end2end-tests/data_gen/movielens_small_val.json b/tests/end2end-tests/data_gen/movielens_small_val.json new file mode 100644 index 0000000000..7586056496 --- /dev/null +++ b/tests/end2end-tests/data_gen/movielens_small_val.json @@ -0,0 +1,55 @@ +{ + "version": "gconstruct-v0.1", + "nodes": [ + { + "node_id_col": "id", + "node_type": "user", + "format": {"name": "hdf5"}, + "files": "/data/ml-100k/user.hdf5", + "features": [ + { + "feature_col": "feat" + } + ] + }, + { + "node_id_col": "id", + "node_type": "movie", + "format": {"name": "parquet"}, + "files": "/data/ml-100k/movie.parquet", + "features": [ + { + "feature_col": "title", + "transform": { + "name": "bert_hf", + "bert_model": "bert-base-uncased", + "max_seq_length": 16 + } + } + ], + "labels": [ + { + "label_col": "label", + "task_type": "classification", + "split_pct": [0.8, 0.001, 0.001] + } + ] + } + ], + "edges": [ + { + "source_id_col": "src_id", + "dest_id_col": "dst_id", + "relation": ["user", "rating", "movie"], + "format": {"name": "parquet"}, + "files": "/data/ml-100k/edges.parquet", + "labels": [ + { + "label_col": "rate", + "task_type": "classification", + "split_pct": [0.1, 0.00001, 0.00001] + } + ] + } + ] +} \ No newline at end of file diff --git a/tests/end2end-tests/graphstorm-ec/mgpu_test.sh b/tests/end2end-tests/graphstorm-ec/mgpu_test.sh index c8e0bb72f4..c3dc013dc8 100644 --- a/tests/end2end-tests/graphstorm-ec/mgpu_test.sh +++ b/tests/end2end-tests/graphstorm-ec/mgpu_test.sh @@ -336,4 +336,9 @@ fi rm /tmp/train_log.txt +echo "**************dataset: Generated multilabel MovieLens EC, RGCN layer: 1, node feat: generated feature, inference: full graph, exclude-training-targets: True, tiny valset" +python3 -m graphstorm.run.gs_edge_classification --workspace $GS_HOME/training_scripts/gsgnn_ep/ --num-trainers $NUM_TRAINERS --num-servers 1 --num-samplers 0 --part-config /data/movielen_100k_train_small_val_1p_4t/movie-lens-100k.json --ip-config ip_list.txt --ssh-port 2222 --cf ml_ec.yaml --exclude-training-targets True --node-feat-name movie:title user:feat --use-mini-batch-infer false --logging-file /tmp/train_log.txt --logging-level debug --preserve-input True --backend nccl + +error_and_exit $? + rm -fr /tmp/* diff --git a/tests/end2end-tests/graphstorm-nc/mgpu_test.sh b/tests/end2end-tests/graphstorm-nc/mgpu_test.sh index bc14f9f967..3077f469ad 100644 --- a/tests/end2end-tests/graphstorm-nc/mgpu_test.sh +++ b/tests/end2end-tests/graphstorm-nc/mgpu_test.sh @@ -618,4 +618,9 @@ fi rm /tmp/train_log.txt +echo "**************dataset: MovieLens classification, RGCN layer: 1, node feat: fixed HF BERT, BERT nodes: movie, inference: mini-batch save model save emb node with tiny val set" +python3 -m graphstorm.run.gs_node_classification --workspace $GS_HOME/training_scripts/gsgnn_np/ --num-trainers $NUM_TRAINERS --num-servers 1 --num-samplers 0 --part-config /data/movielen_100k_train_small_val_1p_4t/movie-lens-100k.json --ip-config ip_list.txt --ssh-port 2222 --cf ml_nc.yaml --num-epochs 3 --logging-file /tmp/train_log.txt --logging-level debug --preserve-input True --backend nccl --node-feat-name movie:title user:feat + +error_and_exit $? + rm -fr /tmp/* diff --git a/tests/unit-tests/data_utils.py b/tests/unit-tests/data_utils.py index 362eba0a3e..45cffa23a8 100644 --- a/tests/unit-tests/data_utils.py +++ b/tests/unit-tests/data_utils.py @@ -103,10 +103,14 @@ def generate_dummy_hetero_graph(size='tiny', gen_mask=True, add_reverse=False): node_train_mask = generate_mask([0,1], data_size) node_val_mask = generate_mask([2,3], data_size) node_test_mask = generate_mask([4,5], data_size) + node_val_mask2 = generate_mask([2], data_size) + node_test_mask2 = generate_mask([4], data_size) edge_train_mask = generate_mask([0,1], 2 * data_size) edge_val_mask = generate_mask([2,3], 2 * data_size) edge_test_mask = generate_mask([4,5], 2 * data_size) + edge_val_mask_2 = generate_mask([2], 2 * data_size) + edge_test_mask_2 = generate_mask([4], 2 * data_size) edge_train_mask2 = generate_mask([i for i in range(data_size//2)], data_size) edge_val_mask2 = generate_mask([2,3], data_size) @@ -115,10 +119,14 @@ def generate_dummy_hetero_graph(size='tiny', gen_mask=True, add_reverse=False): hetero_graph.nodes[target_ntype[0]].data['train_mask'] = node_train_mask hetero_graph.nodes[target_ntype[0]].data['val_mask'] = node_val_mask hetero_graph.nodes[target_ntype[0]].data['test_mask'] = node_test_mask + hetero_graph.nodes[target_ntype[0]].data['val_mask2'] = node_val_mask2 + hetero_graph.nodes[target_ntype[0]].data['test_mask2'] = node_test_mask2 hetero_graph.edges[target_etype[0]].data['train_mask'] = edge_train_mask hetero_graph.edges[target_etype[0]].data['val_mask'] = edge_val_mask hetero_graph.edges[target_etype[0]].data['test_mask'] = edge_test_mask + hetero_graph.edges[target_etype[0]].data['val_mask2'] = edge_val_mask_2 + hetero_graph.edges[target_etype[0]].data['test_mask2'] = edge_test_mask_2 hetero_graph.edges[target_etype[1]].data['train_mask'] = edge_train_mask2 hetero_graph.edges[target_etype[1]].data['val_mask'] = edge_val_mask2 diff --git a/tests/unit-tests/test_dataloading.py b/tests/unit-tests/test_dataloading.py index cbcf0f5d61..18fb8df08b 100644 --- a/tests/unit-tests/test_dataloading.py +++ b/tests/unit-tests/test_dataloading.py @@ -2399,7 +2399,108 @@ def test_GSgnnMultiTaskDataLoader(): assert np.any(edge0_seeds_cnt.numpy() <= 1) assert np.any(edge0_seeds_cnt.numpy() >= 0) +def run_dist_small_val_test(part_config, worker_rank, world_size): + dist_init_method = 'tcp://{master_ip}:{master_port}'.format( + master_ip='127.0.0.1', master_port='12345') + th.distributed.init_process_group(backend="gloo", + init_method=dist_init_method, + world_size=world_size, + rank=worker_rank) + dgl.distributed.initialize('') + gs_data = GSgnnData(part_config=part_config) + + @patch("dgl.distributed.edge_split") + @patch("dgl.distributed.node_split") + def check_val_test(mock_node_split, mock_edge_split): + ####### + # Node mask + total_idx = th.tensor([1,2,3,4]) + split_ret = total_idx[:2] if worker_rank == 0 else total_idx[2:] + mock_node_split.side_effect = [split_ret, split_ret] + # Mocked val has 4 labeled data + # each worker will get 2 + ntype = 'n1' + idx = gs_data.get_node_val_set(ntype, mask="val_mask") + assert len(idx[ntype]) == 2 + # Mocked test has 4 labeled data + # each worker will get 2 + idx = gs_data.get_node_test_set(ntype, mask="test_mask") + assert len(idx[ntype]) == 2 + + total_idx = th.tensor([1]) + split_ret = total_idx[:1] if worker_rank == 0 else total_idx[1:1] + mock_node_split.side_effect = [split_ret, split_ret] + # Mocked val has only 1 labeled data + # Thus, both worker 0 and worker 1 will + # take the same validation set. + idx = gs_data.get_node_val_set(ntype, mask="val_mask2") + assert len(idx[ntype]) == 1 + assert th.nonzero(gs_data._g.nodes[ntype].data['val_mask2'][th.arange(gs_data._g.num_nodes(ntype))]).reshape(-1,)[0].item() == idx[ntype][0].item() + + # Mocked test has only 1 labeled data + # Thus, both worker 0 and worker 1 will + # take the same test set. + idx = gs_data.get_node_test_set(ntype, mask="test_mask2") + assert len(idx[ntype]) == 1 + assert th.nonzero(gs_data._g.nodes[ntype].data['test_mask2'][th.arange(gs_data._g.num_nodes(ntype))]).reshape(-1,)[0] == idx[ntype][0] + + ####### + # Edge mask + etype = ("n0", "r1", "n1") + total_idx = th.tensor([1,2,3,4]) + split_ret = total_idx[:2] if worker_rank == 0 else total_idx[2:] + mock_edge_split.side_effect = [split_ret, split_ret] + + # Mocked val has two labeled data + # each worker will get 1 + idx = gs_data.get_edge_val_set(etype, mask="val_mask") + assert len(idx[etype]) == 2 + # Mocked test has two labeled data + # each worker will get 1 + idx = gs_data.get_edge_test_set(etype, mask="test_mask") + assert len(idx[etype]) == 2 + + total_idx = th.tensor([1]) + split_ret = total_idx[:1] if worker_rank == 0 else total_idx[1:1] + mock_edge_split.side_effect = [split_ret, split_ret] + # Mocked val has only 1 labeled data + # Thus, both worker 0 and worker 1 will + # take the same validation set. + idx = gs_data.get_edge_val_set(etype, mask="val_mask2") + assert len(idx[etype]) == 1 + assert th.nonzero(gs_data._g.edges[etype].data['val_mask2'][th.arange(gs_data._g.num_edges(etype))]).reshape(-1,)[0].item() == idx[etype][0].item() + + # mocked test has only 1 labeled data + # Thus, both worker 0 and worker 1 will + # take the same test set. + idx = gs_data.get_edge_test_set(etype, mask="test_mask2") + assert len(idx[etype]) == 1 + assert th.nonzero(gs_data._g.edges[etype].data['test_mask2'][th.arange(gs_data._g.num_edges(etype))]).reshape(-1,)[0] == idx[etype][0] + + check_val_test() + + if worker_rank == 0: + th.distributed.destroy_process_group() + +def test_GSgnnTranData_small_val_test(): + with tempfile.TemporaryDirectory() as tmpdirname: + _, part_config = generate_dummy_dist_graph(tmpdirname) + + ctx = mp.get_context('spawn') + p0 = ctx.Process(target=run_dist_small_val_test, + args=(part_config, 0, 2)) + p1 = ctx.Process(target=run_dist_small_val_test, + args=(part_config, 1, 2)) + + p0.start() + p1.start() + p0.join() + p1.join() + assert p0.exitcode == 0 + assert p1.exitcode == 0 + if __name__ == '__main__': + test_GSgnnTranData_small_val_test() test_GSgnnLinkPredictionTestDataLoader(1, 1) test_GSgnnLinkPredictionTestDataLoader(10, 20) test_GSgnnMultiTaskDataLoader()