Skip to content

Commit

Permalink
Merge branch 'homo-optmization' into homo_gsf
Browse files Browse the repository at this point in the history
  • Loading branch information
jalencato authored Dec 15, 2023
2 parents 009ff4f + bd9af81 commit 0e8d914
Show file tree
Hide file tree
Showing 7 changed files with 288 additions and 7 deletions.
1 change: 1 addition & 0 deletions .github/workflow_scripts/e2e_check.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ sh ./tests/end2end-tests/create_data.sh
sh ./tests/end2end-tests/tools/test_mem_est.sh
sh ./tests/end2end-tests/data_process/test.sh
sh ./tests/end2end-tests/data_process/movielens_test.sh
sh ./tests/end2end-tests/data_process/homogeneous_test.sh
sh ./tests/end2end-tests/custom-gnn/run_test.sh
bash ./tests/end2end-tests/graphstorm-nc/test.sh
bash ./tests/end2end-tests/graphstorm-lp/test.sh
Expand Down
57 changes: 52 additions & 5 deletions python/graphstorm/gconstruct/construct_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import numpy as np
import torch as th
import dgl
from dgl.distributed.constants import DEFAULT_NTYPE, DEFAULT_ETYPE

from ..utils import sys_tracker, get_log_level
from .file_io import parse_node_file_format, parse_edge_file_format
Expand Down Expand Up @@ -582,8 +583,23 @@ def process_edge_data(process_confs, node_id_map, arr_merger,

return (edges, edge_data, label_stats)

def is_homogeneous(confs):
""" Verify if it is a homogeneous graph
Parameter
---------
confs: dict
A dict containing all user input config
"""
ntypes = {conf['node_type'] for conf in confs["nodes"]}
etypes = set(tuple(conf['relation']) for conf in confs["edges"])
return len(ntypes) == 1 and len(etypes) == 1

def verify_confs(confs):
""" Verify the configuration of the input data.
Parameter
---------
confs: dict
A dict containing all user input config
"""
if "version" not in confs:
# TODO: Make a requirement with v1.0 launch
Expand All @@ -599,6 +615,14 @@ def verify_confs(confs):
f"source node type {src_type} does not exist. Please check your input data."
assert dst_type in ntypes, \
f"dest node type {dst_type} does not exist. Please check your input data."
# Adjust input to DGL homogeneous graph format if it is a homogeneous graph
if is_homogeneous(confs):
logging.warning("Generated Graph is a homogeneous graph, so the node type will be "
"changed to _N and edge type will be changed to [_N, _E, _N]")
for node in confs['nodes']:
node['node_type'] = DEFAULT_NTYPE
for edge in confs['edges']:
edge['relation'] = list(DEFAULT_ETYPE)

def print_graph_info(g, node_data, edge_data, node_label_stats, edge_label_stats):
""" Print graph information.
Expand Down Expand Up @@ -698,12 +722,35 @@ def process_graph(args):

if args.add_reverse_edges:
edges1 = {}
for etype in edges:
e = edges[etype]
if is_homogeneous(process_confs):
logging.warning("For homogeneous graph, the generated reverse edge will "
"be the same edge type as the original graph. Instead for "
"heterogeneous graph, the generated reverse edge type will "
"add -rev as a suffix")
e = edges[DEFAULT_ETYPE]
assert isinstance(e, tuple) and len(e) == 2
assert isinstance(etype, tuple) and len(etype) == 3
edges1[etype] = e
edges1[etype[2], etype[1] + "-rev", etype[0]] = (e[1], e[0])
edges1[DEFAULT_ETYPE] = (np.concatenate([e[0], e[1]]),
np.concatenate([e[1], e[0]]))
# Double edge feature as it is necessary to match tensor size in generated graph
# Only generate mask on original graph
if edge_data:
data = edge_data[DEFAULT_ETYPE]
logging.warning("Reverse edge for homogeneous graph will have same feature as "
"what we have in the original edges")
for key, value in data.items():
if key not in ["train_mask", "test_mask", "val_mask"]:
data[key] = np.concatenate([value, value])
else:
data[key] = np.concatenate([value, np.zeros(value.shape,
dtype=value.dtype)])

else:
for etype in edges:
e = edges[etype]
assert isinstance(e, tuple) and len(e) == 2
assert isinstance(etype, tuple) and len(etype) == 3
edges1[etype] = e
edges1[etype[2], etype[1] + "-rev", etype[0]] = (e[1], e[0])
edges = edges1
sys_tracker.check('Add reverse edges')
g = dgl.heterograph(edges, num_nodes_dict=num_nodes)
Expand Down
63 changes: 63 additions & 0 deletions tests/end2end-tests/data_gen/movielens_homogeneous.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
{
"version": "gconstruct-v0.1",
"nodes": [
{
"node_id_col": "id",
"node_type": "movie",
"format": {"name": "parquet"},
"files": "/data/ml-100k/movie.parquet",
"features": [
{
"feature_col": "title",
"transform": {
"name": "bert_hf",
"bert_model": "bert-base-uncased",
"max_seq_length": 16
}
}
],
"labels": [
{
"label_col": "label",
"task_type": "classification",
"split_pct": [0.8, 0.1, 0.1]
}
]
},
{
"node_type": "movie",
"format": {"name": "parquet"},
"files": "/data/ml-100k/movie.parquet",
"features": [
{
"feature_col": "id"
}
]
}
],
"edges": [
{
"source_id_col": "src_id",
"dest_id_col": "dst_id",
"relation": ["movie", "rating", "movie"],
"format": {"name": "parquet"},
"files": "/data/ml-100k/edges_homogeneous.parquet",
"features": [
{
"feature_col": "rate"
}],
"labels": [
{
"label_col": "rate",
"task_type": "classification",
"split_pct": [0.1, 0.1, 0.1]
}
]
},
{
"relation": ["movie", "rating", "movie"],
"format": {"name": "parquet"},
"files": "/data/ml-100k/edges_homogeneous.parquet"
}
]
}
5 changes: 5 additions & 0 deletions tests/end2end-tests/data_gen/process_movielens.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ def write_data_parquet(data, data_file):
edge_data = {'src_id': edges[0], 'dst_id': edges[1], 'rate': edges[2]}
write_data_parquet(edge_data, '/data/ml-100k/edges.parquet')

# generate data for homogeneous optimization test
edges = pandas.read_csv('/data/ml-100k/u.data', delimiter='\t', header=None)
edge_data = {'src_id': edges[1], 'dst_id': edges[1], 'rate': edges[2]}
write_data_parquet(edge_data, '/data/ml-100k/edges_homogeneous.parquet')

# generate synthetic user data with label
user_labels = np.random.randint(11, size=feat.shape[0])
user_data = {'id': user['id'].values, 'feat': feat, 'occupation': user['occupation'], 'label': user_labels}
Expand Down
60 changes: 60 additions & 0 deletions tests/end2end-tests/data_process/check_homogeneous.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
"""
Copyright 2023 Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import os
import argparse
import dgl
from dgl.distributed.constants import DEFAULT_NTYPE, DEFAULT_ETYPE
from numpy.testing import assert_almost_equal


def check_reverse_edge(args):

g_orig = dgl.load_graphs(os.path.join(args.orig_graph_path, "graph.dgl"))[0][0]
g_rev = dgl.load_graphs(os.path.join(args.rev_graph_path, "graph.dgl"))[0][0]
assert g_orig.ntypes == g_rev.ntypes
assert g_orig.etypes == g_rev.etypes
assert g_orig.number_of_nodes(DEFAULT_NTYPE) == g_rev.number_of_nodes(DEFAULT_NTYPE)
assert 2 * g_orig.number_of_edges(DEFAULT_ETYPE) == g_rev.number_of_edges(DEFAULT_ETYPE)
for ntype in g_orig.ntypes:
assert g_orig.number_of_nodes(ntype) == g_rev.number_of_nodes(ntype)
for name in g_orig.nodes[ntype].data:
# We should skip '*_mask' because data split is split randomly.
if 'mask' not in name:
assert_almost_equal(g_orig.nodes[ntype].data[name].numpy(),
g_rev.nodes[ntype].data[name].numpy())

# Check edge feature
g_orig_feat = dgl.data.load_tensors(os.path.join(args.orig_graph_path, "edge_feat.dgl"))
g_rev_feat = dgl.data.load_tensors(os.path.join(args.rev_graph_path, "edge_feat.dgl"))
for feat_type in g_orig_feat.keys():
if "mask" not in feat_type:
assert_almost_equal(g_orig_feat[feat_type].numpy(),
g_rev_feat[feat_type].numpy()[:g_orig.number_of_edges(DEFAULT_ETYPE)])
else:
assert_almost_equal(g_rev_feat[feat_type].numpy()[g_orig.number_of_edges(DEFAULT_ETYPE):],
[0] * g_orig.number_of_edges(DEFAULT_ETYPE))

if __name__ == '__main__':
argparser = argparse.ArgumentParser("Check edge prediction remapping")
argparser.add_argument("--orig-graph-path", type=str, default="/tmp/movielen_100k_train_val_1p_4t_homogeneous/part0/",
help="Path to save the generated data")
argparser.add_argument("--rev-graph-path", type=str, default="/tmp/movielen_100k_train_val_1p_4t_homogeneous_rev/part0/",
help="Path to save the generated data")

args = argparser.parse_args()

check_reverse_edge(args)
50 changes: 50 additions & 0 deletions tests/end2end-tests/data_process/homogeneous_test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#!/bin/bash

service ssh restart

GS_HOME=$(pwd)
NUM_TRAINERS=4
export PYTHONPATH=$GS_HOME/python/
cd $GS_HOME/training_scripts/gsgnn_np
echo "127.0.0.1" > ip_list.txt
cd $GS_HOME/training_scripts/gsgnn_ep
echo "127.0.0.1" > ip_list.txt

error_and_exit () {
# check exec status of launch.py
status=$1
echo $status

if test $status -ne 0
then
exit -1
fi
}


echo "********* Test Homogeneous Graph Optimization ********"
python3 -m graphstorm.gconstruct.construct_graph --conf-file $GS_HOME/tests/end2end-tests/data_gen/movielens_homogeneous.json --num-processes 1 --output-dir /tmp/movielen_100k_train_val_1p_4t_homogeneous --graph-name movie-lens-100k
error_and_exit $?

echo "********* Test Node Classification on GConstruct Homogeneous Graph ********"
python3 -m graphstorm.run.gs_node_classification --workspace $GS_HOME/training_scripts/gsgnn_np/ --num-trainers $NUM_TRAINERS --num-servers 1 --num-samplers 0 --part-config /tmp/movielen_100k_train_val_1p_4t_homogeneous/movie-lens-100k.json --ip-config ip_list.txt --ssh-port 2222 --cf ml_nc.yaml --target-ntype _N
error_and_exit $?

echo "********* Test Edge Classification on GConstruct Homogeneous Graph ********"
python3 -m graphstorm.run.gs_edge_classification --workspace $GS_HOME/training_scripts/gsgnn_ep/ --num-trainers $NUM_TRAINERS --num-servers 1 --num-samplers 0 --part-config /tmp/movielen_100k_train_val_1p_4t_homogeneous/movie-lens-100k.json --ip-config ip_list.txt --ssh-port 2222 --cf ml_ec.yaml --target-etype _N,_E,_N
error_and_exit $?

echo "********* Test Homogeneous Graph Optimization on reverse edge********"
python3 -m graphstorm.gconstruct.construct_graph --conf-file $GS_HOME/tests/end2end-tests/data_gen/movielens_homogeneous.json --num-processes 1 --output-dir /tmp/movielen_100k_train_val_1p_4t_homogeneous_rev --graph-name movie-lens-100k --add-reverse-edges
error_and_exit $?

python3 $GS_HOME/tests/end2end-tests/data_process/check_homogeneous.py
error_and_exit $?

echo "********* Test Node Classification on GConstruct Homogeneous Graph with reverse edge********"
python3 -m graphstorm.run.gs_node_classification --workspace $GS_HOME/training_scripts/gsgnn_np/ --num-trainers $NUM_TRAINERS --num-servers 1 --num-samplers 0 --part-config /tmp/movielen_100k_train_val_1p_4t_homogeneous_rev/movie-lens-100k.json --ip-config ip_list.txt --ssh-port 2222 --cf ml_nc.yaml --target-ntype _N
error_and_exit $?

echo "********* Test Edge Classification on GConstruct Homogeneous Graph with reverse edge ********"
python3 -m graphstorm.run.gs_edge_classification --workspace $GS_HOME/training_scripts/gsgnn_ep/ --num-trainers $NUM_TRAINERS --num-servers 1 --num-samplers 0 --part-config /tmp/movielen_100k_train_val_1p_4t_homogeneous_rev/movie-lens-100k.json --ip-config ip_list.txt --ssh-port 2222 --cf ml_ec.yaml --target-etype _N,_E,_N
error_and_exit $?
59 changes: 57 additions & 2 deletions tests/unit-tests/gconstruct/test_construct_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@
import numpy as np
import dgl
import torch as th
import copy

from functools import partial
from numpy.testing import assert_equal, assert_almost_equal

from graphstorm.gconstruct.construct_graph import parse_edge_data
from graphstorm.gconstruct.construct_graph import parse_edge_data, verify_confs, is_homogeneous
from graphstorm.gconstruct.file_io import write_data_parquet, read_data_parquet
from graphstorm.gconstruct.file_io import write_data_json, read_data_json
from graphstorm.gconstruct.file_io import write_data_csv, read_data_csv
Expand Down Expand Up @@ -1705,6 +1706,59 @@ def test_gc():
assert not os.path.isdir("/tmp_featurewrapper2"), \
"Directory /tmp_featurewrapper2 should not exist after gc"


def test_homogeneous():
# single node type and edge type input
conf = {
"version": "gconstruct-v0.1", "nodes": [
{"node_id_col": "id", "node_type": "movie", "format": {"name": "parquet"},
"files": "/data/ml-100k/movie.parquet", "features": [
{"feature_col": "title", "transform": {
"name": "bert_hf", "bert_model": "bert-base-uncased", "max_seq_length": 16}}],
"labels": [{"label_col": "label", "task_type": "classification", "split_pct": [0.8, 0.1, 0.1]}]}],
"edges": [
{"source_id_col": "src_id", "dest_id_col": "dst_id", "relation": ["movie", "rating", "movie"],
"format": {"name": "parquet"}, "files": "/data/ml-100k/edges_homo.parquet", "labels": [
{"label_col": "rate", "task_type": "classification", "split_pct": [0.1, 0.1, 0.1]}]}]
}
assert is_homogeneous(conf)
verify_confs(conf)
assert conf['nodes'][0]["node_type"] == "_N"
assert conf['edges'][0]['relation'] == ["_N", "_E", "_N"]
conf["edges"][0]["relation"] = ["movie_fake", "rating", "movie"]
conf["nodes"].append(copy.deepcopy(conf["nodes"][0]))
conf["nodes"][0]["node_type"] = "movie"
conf["nodes"][1]["node_type"] = "movie_fake"
assert not is_homogeneous(conf)


# multiple node types and edge types input
conf = {
"version": "gconstruct-v0.1", "nodes": [
{"node_id_col": "id", "node_type": "movie", "format": {"name": "parquet"},
"files": "/data/ml-100k/movie.parquet", "features": [
{"feature_col": "title", "transform": {
"name": "bert_hf", "bert_model": "bert-base-uncased", "max_seq_length": 16}}],
"labels": [{"label_col": "label", "task_type": "classification", "split_pct": [0.8, 0.1, 0.1]}]},
{"node_type": "movie", "format": {"name": "parquet"}, "files": "/data/ml-100k/movie.parquet",
"features": [{"feature_col": "id"}]}],
"edges": [
{"source_id_col": "src_id", "dest_id_col": "dst_id", "relation": ["movie", "rating", "movie"],
"format": {"name": "parquet"}, "files": "/data/ml-100k/edges_homo.parquet", "labels": [
{"label_col": "rate", "task_type": "classification", "split_pct": [0.1, 0.1, 0.1]}]},
{"relation": ["movie", "rating", "movie"], "format": {"name": "parquet"},
"files": "/data/ml-100k/edges_homo.parquet"}]
}
assert is_homogeneous(conf)
verify_confs(conf)
assert conf['nodes'][0]["node_type"] == "_N"
assert conf['edges'][0]['relation'] == ["_N", "_E", "_N"]
conf["edges"][0]["relation"] = ["movie_fake", "rating", "movie"]
conf["nodes"].append(copy.deepcopy(conf["nodes"][0]))
conf["nodes"][0]["node_type"] = "movie"
conf["nodes"][1]["node_type"] = "movie_fake"
assert not is_homogeneous(conf)

if __name__ == '__main__':
test_parse_edge_data()
test_multiprocessing_checks()
Expand All @@ -1723,4 +1777,5 @@ def test_gc():
test_label()
test_multicolumn(None)
test_multicolumn("/")
test_feature_wrapper()
test_feature_wrapper()
test_homogeneous()

0 comments on commit 0e8d914

Please sign in to comment.