diff --git a/.github/workflow_scripts/e2e_check.sh b/.github/workflow_scripts/e2e_check.sh index 9851a35529..8c122c9f9d 100644 --- a/.github/workflow_scripts/e2e_check.sh +++ b/.github/workflow_scripts/e2e_check.sh @@ -8,6 +8,7 @@ sh ./tests/end2end-tests/create_data.sh sh ./tests/end2end-tests/tools/test_mem_est.sh sh ./tests/end2end-tests/data_process/test.sh sh ./tests/end2end-tests/data_process/movielens_test.sh +sh ./tests/end2end-tests/data_process/homogeneous_test.sh sh ./tests/end2end-tests/custom-gnn/run_test.sh bash ./tests/end2end-tests/graphstorm-nc/test.sh bash ./tests/end2end-tests/graphstorm-lp/test.sh diff --git a/python/graphstorm/gconstruct/construct_graph.py b/python/graphstorm/gconstruct/construct_graph.py index 0065da5403..259399328b 100644 --- a/python/graphstorm/gconstruct/construct_graph.py +++ b/python/graphstorm/gconstruct/construct_graph.py @@ -28,6 +28,7 @@ import numpy as np import torch as th import dgl +from dgl.distributed.constants import DEFAULT_NTYPE, DEFAULT_ETYPE from ..utils import sys_tracker, get_log_level from .file_io import parse_node_file_format, parse_edge_file_format @@ -582,8 +583,23 @@ def process_edge_data(process_confs, node_id_map, arr_merger, return (edges, edge_data, label_stats) +def is_homogeneous(confs): + """ Verify if it is a homogeneous graph + Parameter + --------- + confs: dict + A dict containing all user input config + """ + ntypes = {conf['node_type'] for conf in confs["nodes"]} + etypes = set(tuple(conf['relation']) for conf in confs["edges"]) + return len(ntypes) == 1 and len(etypes) == 1 + def verify_confs(confs): """ Verify the configuration of the input data. + Parameter + --------- + confs: dict + A dict containing all user input config """ if "version" not in confs: # TODO: Make a requirement with v1.0 launch @@ -599,6 +615,14 @@ def verify_confs(confs): f"source node type {src_type} does not exist. Please check your input data." assert dst_type in ntypes, \ f"dest node type {dst_type} does not exist. Please check your input data." + # Adjust input to DGL homogeneous graph format if it is a homogeneous graph + if is_homogeneous(confs): + logging.warning("Generated Graph is a homogeneous graph, so the node type will be " + "changed to _N and edge type will be changed to [_N, _E, _N]") + for node in confs['nodes']: + node['node_type'] = DEFAULT_NTYPE + for edge in confs['edges']: + edge['relation'] = list(DEFAULT_ETYPE) def print_graph_info(g, node_data, edge_data, node_label_stats, edge_label_stats): """ Print graph information. @@ -698,12 +722,35 @@ def process_graph(args): if args.add_reverse_edges: edges1 = {} - for etype in edges: - e = edges[etype] + if is_homogeneous(process_confs): + logging.warning("For homogeneous graph, the generated reverse edge will " + "be the same edge type as the original graph. Instead for " + "heterogeneous graph, the generated reverse edge type will " + "add -rev as a suffix") + e = edges[DEFAULT_ETYPE] assert isinstance(e, tuple) and len(e) == 2 - assert isinstance(etype, tuple) and len(etype) == 3 - edges1[etype] = e - edges1[etype[2], etype[1] + "-rev", etype[0]] = (e[1], e[0]) + edges1[DEFAULT_ETYPE] = (np.concatenate([e[0], e[1]]), + np.concatenate([e[1], e[0]])) + # Double edge feature as it is necessary to match tensor size in generated graph + # Only generate mask on original graph + if edge_data: + data = edge_data[DEFAULT_ETYPE] + logging.warning("Reverse edge for homogeneous graph will have same feature as " + "what we have in the original edges") + for key, value in data.items(): + if key not in ["train_mask", "test_mask", "val_mask"]: + data[key] = np.concatenate([value, value]) + else: + data[key] = np.concatenate([value, np.zeros(value.shape, + dtype=value.dtype)]) + + else: + for etype in edges: + e = edges[etype] + assert isinstance(e, tuple) and len(e) == 2 + assert isinstance(etype, tuple) and len(etype) == 3 + edges1[etype] = e + edges1[etype[2], etype[1] + "-rev", etype[0]] = (e[1], e[0]) edges = edges1 sys_tracker.check('Add reverse edges') g = dgl.heterograph(edges, num_nodes_dict=num_nodes) diff --git a/tests/end2end-tests/data_gen/movielens_homogeneous.json b/tests/end2end-tests/data_gen/movielens_homogeneous.json new file mode 100644 index 0000000000..018776e82e --- /dev/null +++ b/tests/end2end-tests/data_gen/movielens_homogeneous.json @@ -0,0 +1,63 @@ +{ + "version": "gconstruct-v0.1", + "nodes": [ + { + "node_id_col": "id", + "node_type": "movie", + "format": {"name": "parquet"}, + "files": "/data/ml-100k/movie.parquet", + "features": [ + { + "feature_col": "title", + "transform": { + "name": "bert_hf", + "bert_model": "bert-base-uncased", + "max_seq_length": 16 + } + } + ], + "labels": [ + { + "label_col": "label", + "task_type": "classification", + "split_pct": [0.8, 0.1, 0.1] + } + ] + }, + { + "node_type": "movie", + "format": {"name": "parquet"}, + "files": "/data/ml-100k/movie.parquet", + "features": [ + { + "feature_col": "id" + } + ] + } + ], + "edges": [ + { + "source_id_col": "src_id", + "dest_id_col": "dst_id", + "relation": ["movie", "rating", "movie"], + "format": {"name": "parquet"}, + "files": "/data/ml-100k/edges_homogeneous.parquet", + "features": [ + { + "feature_col": "rate" + }], + "labels": [ + { + "label_col": "rate", + "task_type": "classification", + "split_pct": [0.1, 0.1, 0.1] + } + ] + }, + { + "relation": ["movie", "rating", "movie"], + "format": {"name": "parquet"}, + "files": "/data/ml-100k/edges_homogeneous.parquet" + } + ] +} \ No newline at end of file diff --git a/tests/end2end-tests/data_gen/process_movielens.py b/tests/end2end-tests/data_gen/process_movielens.py index 90fdcd1702..a9ca90873e 100644 --- a/tests/end2end-tests/data_gen/process_movielens.py +++ b/tests/end2end-tests/data_gen/process_movielens.py @@ -90,6 +90,11 @@ def write_data_parquet(data, data_file): edge_data = {'src_id': edges[0], 'dst_id': edges[1], 'rate': edges[2]} write_data_parquet(edge_data, '/data/ml-100k/edges.parquet') +# generate data for homogeneous optimization test +edges = pandas.read_csv('/data/ml-100k/u.data', delimiter='\t', header=None) +edge_data = {'src_id': edges[1], 'dst_id': edges[1], 'rate': edges[2]} +write_data_parquet(edge_data, '/data/ml-100k/edges_homogeneous.parquet') + # generate synthetic user data with label user_labels = np.random.randint(11, size=feat.shape[0]) user_data = {'id': user['id'].values, 'feat': feat, 'occupation': user['occupation'], 'label': user_labels} diff --git a/tests/end2end-tests/data_process/check_homogeneous.py b/tests/end2end-tests/data_process/check_homogeneous.py new file mode 100644 index 0000000000..daeb6f0ada --- /dev/null +++ b/tests/end2end-tests/data_process/check_homogeneous.py @@ -0,0 +1,60 @@ +""" + Copyright 2023 Contributors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +""" +import os +import argparse +import dgl +from dgl.distributed.constants import DEFAULT_NTYPE, DEFAULT_ETYPE +from numpy.testing import assert_almost_equal + + +def check_reverse_edge(args): + + g_orig = dgl.load_graphs(os.path.join(args.orig_graph_path, "graph.dgl"))[0][0] + g_rev = dgl.load_graphs(os.path.join(args.rev_graph_path, "graph.dgl"))[0][0] + assert g_orig.ntypes == g_rev.ntypes + assert g_orig.etypes == g_rev.etypes + assert g_orig.number_of_nodes(DEFAULT_NTYPE) == g_rev.number_of_nodes(DEFAULT_NTYPE) + assert 2 * g_orig.number_of_edges(DEFAULT_ETYPE) == g_rev.number_of_edges(DEFAULT_ETYPE) + for ntype in g_orig.ntypes: + assert g_orig.number_of_nodes(ntype) == g_rev.number_of_nodes(ntype) + for name in g_orig.nodes[ntype].data: + # We should skip '*_mask' because data split is split randomly. + if 'mask' not in name: + assert_almost_equal(g_orig.nodes[ntype].data[name].numpy(), + g_rev.nodes[ntype].data[name].numpy()) + + # Check edge feature + g_orig_feat = dgl.data.load_tensors(os.path.join(args.orig_graph_path, "edge_feat.dgl")) + g_rev_feat = dgl.data.load_tensors(os.path.join(args.rev_graph_path, "edge_feat.dgl")) + for feat_type in g_orig_feat.keys(): + if "mask" not in feat_type: + assert_almost_equal(g_orig_feat[feat_type].numpy(), + g_rev_feat[feat_type].numpy()[:g_orig.number_of_edges(DEFAULT_ETYPE)]) + else: + assert_almost_equal(g_rev_feat[feat_type].numpy()[g_orig.number_of_edges(DEFAULT_ETYPE):], + [0] * g_orig.number_of_edges(DEFAULT_ETYPE)) + +if __name__ == '__main__': + argparser = argparse.ArgumentParser("Check edge prediction remapping") + argparser.add_argument("--orig-graph-path", type=str, default="/tmp/movielen_100k_train_val_1p_4t_homogeneous/part0/", + help="Path to save the generated data") + argparser.add_argument("--rev-graph-path", type=str, default="/tmp/movielen_100k_train_val_1p_4t_homogeneous_rev/part0/", + help="Path to save the generated data") + + args = argparser.parse_args() + + check_reverse_edge(args) \ No newline at end of file diff --git a/tests/end2end-tests/data_process/homogeneous_test.sh b/tests/end2end-tests/data_process/homogeneous_test.sh new file mode 100644 index 0000000000..dd3f55907d --- /dev/null +++ b/tests/end2end-tests/data_process/homogeneous_test.sh @@ -0,0 +1,50 @@ +#!/bin/bash + +service ssh restart + +GS_HOME=$(pwd) +NUM_TRAINERS=4 +export PYTHONPATH=$GS_HOME/python/ +cd $GS_HOME/training_scripts/gsgnn_np +echo "127.0.0.1" > ip_list.txt +cd $GS_HOME/training_scripts/gsgnn_ep +echo "127.0.0.1" > ip_list.txt + +error_and_exit () { + # check exec status of launch.py + status=$1 + echo $status + + if test $status -ne 0 + then + exit -1 + fi +} + + +echo "********* Test Homogeneous Graph Optimization ********" +python3 -m graphstorm.gconstruct.construct_graph --conf-file $GS_HOME/tests/end2end-tests/data_gen/movielens_homogeneous.json --num-processes 1 --output-dir /tmp/movielen_100k_train_val_1p_4t_homogeneous --graph-name movie-lens-100k +error_and_exit $? + +echo "********* Test Node Classification on GConstruct Homogeneous Graph ********" +python3 -m graphstorm.run.gs_node_classification --workspace $GS_HOME/training_scripts/gsgnn_np/ --num-trainers $NUM_TRAINERS --num-servers 1 --num-samplers 0 --part-config /tmp/movielen_100k_train_val_1p_4t_homogeneous/movie-lens-100k.json --ip-config ip_list.txt --ssh-port 2222 --cf ml_nc.yaml --target-ntype _N +error_and_exit $? + +echo "********* Test Edge Classification on GConstruct Homogeneous Graph ********" +python3 -m graphstorm.run.gs_edge_classification --workspace $GS_HOME/training_scripts/gsgnn_ep/ --num-trainers $NUM_TRAINERS --num-servers 1 --num-samplers 0 --part-config /tmp/movielen_100k_train_val_1p_4t_homogeneous/movie-lens-100k.json --ip-config ip_list.txt --ssh-port 2222 --cf ml_ec.yaml --target-etype _N,_E,_N +error_and_exit $? + +echo "********* Test Homogeneous Graph Optimization on reverse edge********" +python3 -m graphstorm.gconstruct.construct_graph --conf-file $GS_HOME/tests/end2end-tests/data_gen/movielens_homogeneous.json --num-processes 1 --output-dir /tmp/movielen_100k_train_val_1p_4t_homogeneous_rev --graph-name movie-lens-100k --add-reverse-edges +error_and_exit $? + +python3 $GS_HOME/tests/end2end-tests/data_process/check_homogeneous.py +error_and_exit $? + +echo "********* Test Node Classification on GConstruct Homogeneous Graph with reverse edge********" +python3 -m graphstorm.run.gs_node_classification --workspace $GS_HOME/training_scripts/gsgnn_np/ --num-trainers $NUM_TRAINERS --num-servers 1 --num-samplers 0 --part-config /tmp/movielen_100k_train_val_1p_4t_homogeneous_rev/movie-lens-100k.json --ip-config ip_list.txt --ssh-port 2222 --cf ml_nc.yaml --target-ntype _N +error_and_exit $? + +echo "********* Test Edge Classification on GConstruct Homogeneous Graph with reverse edge ********" +python3 -m graphstorm.run.gs_edge_classification --workspace $GS_HOME/training_scripts/gsgnn_ep/ --num-trainers $NUM_TRAINERS --num-servers 1 --num-samplers 0 --part-config /tmp/movielen_100k_train_val_1p_4t_homogeneous_rev/movie-lens-100k.json --ip-config ip_list.txt --ssh-port 2222 --cf ml_ec.yaml --target-etype _N,_E,_N +error_and_exit $? \ No newline at end of file diff --git a/tests/unit-tests/gconstruct/test_construct_graph.py b/tests/unit-tests/gconstruct/test_construct_graph.py index a03a7cbec7..d7c9ae6650 100644 --- a/tests/unit-tests/gconstruct/test_construct_graph.py +++ b/tests/unit-tests/gconstruct/test_construct_graph.py @@ -22,11 +22,12 @@ import numpy as np import dgl import torch as th +import copy from functools import partial from numpy.testing import assert_equal, assert_almost_equal -from graphstorm.gconstruct.construct_graph import parse_edge_data +from graphstorm.gconstruct.construct_graph import parse_edge_data, verify_confs, is_homogeneous from graphstorm.gconstruct.file_io import write_data_parquet, read_data_parquet from graphstorm.gconstruct.file_io import write_data_json, read_data_json from graphstorm.gconstruct.file_io import write_data_csv, read_data_csv @@ -1705,6 +1706,59 @@ def test_gc(): assert not os.path.isdir("/tmp_featurewrapper2"), \ "Directory /tmp_featurewrapper2 should not exist after gc" + +def test_homogeneous(): + # single node type and edge type input + conf = { + "version": "gconstruct-v0.1", "nodes": [ + {"node_id_col": "id", "node_type": "movie", "format": {"name": "parquet"}, + "files": "/data/ml-100k/movie.parquet", "features": [ + {"feature_col": "title", "transform": { + "name": "bert_hf", "bert_model": "bert-base-uncased", "max_seq_length": 16}}], + "labels": [{"label_col": "label", "task_type": "classification", "split_pct": [0.8, 0.1, 0.1]}]}], + "edges": [ + {"source_id_col": "src_id", "dest_id_col": "dst_id", "relation": ["movie", "rating", "movie"], + "format": {"name": "parquet"}, "files": "/data/ml-100k/edges_homo.parquet", "labels": [ + {"label_col": "rate", "task_type": "classification", "split_pct": [0.1, 0.1, 0.1]}]}] + } + assert is_homogeneous(conf) + verify_confs(conf) + assert conf['nodes'][0]["node_type"] == "_N" + assert conf['edges'][0]['relation'] == ["_N", "_E", "_N"] + conf["edges"][0]["relation"] = ["movie_fake", "rating", "movie"] + conf["nodes"].append(copy.deepcopy(conf["nodes"][0])) + conf["nodes"][0]["node_type"] = "movie" + conf["nodes"][1]["node_type"] = "movie_fake" + assert not is_homogeneous(conf) + + + # multiple node types and edge types input + conf = { + "version": "gconstruct-v0.1", "nodes": [ + {"node_id_col": "id", "node_type": "movie", "format": {"name": "parquet"}, + "files": "/data/ml-100k/movie.parquet", "features": [ + {"feature_col": "title", "transform": { + "name": "bert_hf", "bert_model": "bert-base-uncased", "max_seq_length": 16}}], + "labels": [{"label_col": "label", "task_type": "classification", "split_pct": [0.8, 0.1, 0.1]}]}, + {"node_type": "movie", "format": {"name": "parquet"}, "files": "/data/ml-100k/movie.parquet", + "features": [{"feature_col": "id"}]}], + "edges": [ + {"source_id_col": "src_id", "dest_id_col": "dst_id", "relation": ["movie", "rating", "movie"], + "format": {"name": "parquet"}, "files": "/data/ml-100k/edges_homo.parquet", "labels": [ + {"label_col": "rate", "task_type": "classification", "split_pct": [0.1, 0.1, 0.1]}]}, + {"relation": ["movie", "rating", "movie"], "format": {"name": "parquet"}, + "files": "/data/ml-100k/edges_homo.parquet"}] + } + assert is_homogeneous(conf) + verify_confs(conf) + assert conf['nodes'][0]["node_type"] == "_N" + assert conf['edges'][0]['relation'] == ["_N", "_E", "_N"] + conf["edges"][0]["relation"] = ["movie_fake", "rating", "movie"] + conf["nodes"].append(copy.deepcopy(conf["nodes"][0])) + conf["nodes"][0]["node_type"] = "movie" + conf["nodes"][1]["node_type"] = "movie_fake" + assert not is_homogeneous(conf) + if __name__ == '__main__': test_parse_edge_data() test_multiprocessing_checks() @@ -1723,4 +1777,5 @@ def test_gc(): test_label() test_multicolumn(None) test_multicolumn("/") - test_feature_wrapper() \ No newline at end of file + test_feature_wrapper() + test_homogeneous()