Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Homogeneous Graph Optimization] Homogeneous Graph Optimization #690

Merged
merged 3 commits into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflow_scripts/e2e_check.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ sh ./tests/end2end-tests/create_data.sh
sh ./tests/end2end-tests/tools/test_mem_est.sh
sh ./tests/end2end-tests/data_process/test.sh
sh ./tests/end2end-tests/data_process/movielens_test.sh
sh ./tests/end2end-tests/data_process/homogeneous_test.sh
sh ./tests/end2end-tests/custom-gnn/run_test.sh
bash ./tests/end2end-tests/graphstorm-nc/test.sh
bash ./tests/end2end-tests/graphstorm-lp/test.sh
Expand Down
8 changes: 4 additions & 4 deletions docs/source/configuration/configuration-run.rst
Original file line number Diff line number Diff line change
Expand Up @@ -381,20 +381,20 @@ Classification and Regression Task

Node Classification/Regression Specific
.........................................
- **target_ntype**: (**Required**) The node type for prediction.
- **target_ntype**: The node type for prediction.

- Yaml: ``target_ntype: movie``
- Argument: ``--target-ntype movie``
- Default value: This parameter must be provided by user.
- Default value: For a heterogeneous input graph, this parameter must be provided by the user. If not provided, GraphStorm will assume the input graph is a homogeneous graph and set ``target_ntype`` to "_N".

Edge Classification/Regression Specific
..........................................
- **target_etype**: (**Required**) The list of canonical edge types that will be added as a training target in edge classification/regression tasks, for example ``--train-etype query,clicks,asin`` or ``--train-etype query,clicks,asin query,search,asin``. A canonical edge type should be formatted as `src_node_type,relation_type,dst_node_type`. Currently, GraphStorm only supports single task edge classification/regression, i.e., it only accepts one canonical edge type.
- **target_etype**: The list of canonical edge types that will be added as training targets in edge classification/regression tasks, for example ``--train-etype query,clicks,asin`` or ``--train-etype query,clicks,asin query,search,asin``. A canonical edge type should be formatted as `src_node_type,relation_type,dst_node_type`. Currently, GraphStorm only supports single task edge classification/regression, i.e., it only accepts one canonical edge type.

- Yaml: ``target_etype:``
| ``- query,clicks,asin``
- Argument: ``--target-etype query,clicks,asin``
- Default value: This parameter must be provided by user.
- Default value: For a heterogeneous input graph, this parameter must be provided by the user. If not provided, GraphStorm will assume the input graph is a homogeneous graph and set ``target_etype`` to ("_N", "_E", "_N").
- **remove_target_edge_type**: When set to true, GraphStorm removes target_etype in message passing, i.e., any edge with target_etype will not be sampled during training and inference.

- Yaml: ``remove_target_edge_type: false``
Expand Down
16 changes: 11 additions & 5 deletions python/graphstorm/config/argument.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import yaml
import torch as th
import torch.nn.functional as F
from dgl.distributed.constants import DEFAULT_NTYPE, DEFAULT_ETYPE

from .config import BUILTIN_GNN_ENCODER
from .config import BUILTIN_ENCODER
Expand Down Expand Up @@ -1573,9 +1574,12 @@ def target_ntype(self):
""" The node type for prediction
"""
# pylint: disable=no-member
assert hasattr(self, "_target_ntype"), \
"Must provide the target ntype through target_ntype"
return self._target_ntype
if hasattr(self, "_target_ntype"):
return self._target_ntype
else:
logging.warning("There is not target ntype provided, "
"will treat the input graph as a homogeneous graph")
return DEFAULT_NTYPE

@property
def eval_target_ntype(self):
Expand Down Expand Up @@ -1648,8 +1652,10 @@ def target_etype(self):
classification/regression. Support multiple tasks when needed.
"""
# pylint: disable=no-member
assert hasattr(self, "_target_etype"), \
"Edge classification task needs a target etype"
if not hasattr(self, "_target_etype"):
logging.warning("There is not target etype provided, "
"will treat the input graph as a homogeneous graph")
return [DEFAULT_ETYPE]
assert isinstance(self._target_etype, list), \
"target_etype must be a list in format: " \
"[\"query,clicks,asin\", \"query,search,asin\"]."
Expand Down
36 changes: 35 additions & 1 deletion python/graphstorm/dataloading/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import torch as th
import dgl
from dgl.distributed.constants import DEFAULT_NTYPE, DEFAULT_ETYPE
from torch.utils.data import Dataset
import pandas as pd

Expand Down Expand Up @@ -554,6 +555,15 @@ def __init__(self, graph_name, part_config, train_etypes, eval_etypes=None,
lm_feat_ntypes=lm_feat_ntypes,
lm_feat_etypes=lm_feat_etypes)

if self._train_etypes == [DEFAULT_ETYPE]:
# DGL Graph edge type is not canonical. It is just list[str].
assert self._g.ntypes == [DEFAULT_NTYPE] and \
self._g.etypes == [DEFAULT_ETYPE[1]], \
f"It is required to be a homogeneous graph when target_etype is not provided " \
f"or is set to {DEFAULT_ETYPE} on edge tasks, expect node type " \
f"to be {[DEFAULT_NTYPE]} and edge type to be {[DEFAULT_ETYPE[1]]}, " \
f"but get {self._g.ntypes} and {self._g.etypes}"

def prepare_data(self, g):
"""
Prepare the training, validation and testing edge set.
Expand Down Expand Up @@ -731,6 +741,14 @@ def __init__(self, graph_name, part_config, eval_etypes,
decoder_edge_feat,
lm_feat_ntypes=lm_feat_ntypes,
lm_feat_etypes=lm_feat_etypes)
if self._eval_etypes == [DEFAULT_ETYPE]:
# DGL Graph edge type is not canonical. It is just list[str].
assert self._g.ntypes == [DEFAULT_NTYPE] and \
self._g.etypes == [DEFAULT_ETYPE[1]], \
f"It is required to be a homogeneous graph when target_etype is not provided " \
f"or is set to {DEFAULT_ETYPE} on edge tasks, expect node type " \
f"to be {[DEFAULT_NTYPE]} and edge type to be {[DEFAULT_ETYPE[1]]}, " \
f"but get {self._g.ntypes} and {self._g.etypes}"

def prepare_data(self, g):
""" Prepare the testing edge set if any
Expand Down Expand Up @@ -916,7 +934,6 @@ def __init__(self, graph_name, part_config, train_ntypes, eval_ntypes=None,
assert isinstance(train_ntypes, list), \
"prediction ntypes for training has to be a string or a list of strings."
self._train_ntypes = train_ntypes

if eval_ntypes is not None:
if isinstance(eval_ntypes, str):
eval_ntypes = [eval_ntypes]
Expand All @@ -932,6 +949,14 @@ def __init__(self, graph_name, part_config, train_ntypes, eval_ntypes=None,
edge_feat_field=edge_feat_field,
lm_feat_ntypes=lm_feat_ntypes,
lm_feat_etypes=lm_feat_etypes)
if self._train_ntypes == [DEFAULT_NTYPE]:
# DGL Graph edge type is not canonical. It is just list[str].
assert self._g.ntypes == [DEFAULT_NTYPE] and \
self._g.etypes == [DEFAULT_ETYPE[1]], \
f"It is required to be a homogeneous graph when target_ntype is not provided " \
f"or is set to {DEFAULT_NTYPE} on node tasks, expect node type " \
f"to be {[DEFAULT_NTYPE]} and edge type to be {[DEFAULT_ETYPE[1]]}, " \
f"but get {self._g.ntypes} and {self._g.etypes}"

def prepare_data(self, g):
pb = g.get_partition_book()
Expand Down Expand Up @@ -1072,6 +1097,15 @@ def __init__(self, graph_name, part_config, eval_ntypes,
lm_feat_ntypes=lm_feat_ntypes,
lm_feat_etypes=lm_feat_etypes)

if self._eval_ntypes == [DEFAULT_NTYPE]:
# DGL Graph edge type is not canonical. It is just list[str].
assert self._g.ntypes == [DEFAULT_NTYPE] and \
self._g.etypes == [DEFAULT_ETYPE[1]], \
f"It is required to be a homogeneous graph when target_ntype is not provided " \
f"or is set to {DEFAULT_NTYPE} on node tasks, expect node type " \
f"to be {[DEFAULT_NTYPE]} and edge type to be {[DEFAULT_ETYPE[1]]}, " \
f"but get {self._g.ntypes} and {self._g.etypes}"

def prepare_data(self, g):
"""
Prepare the testing node set if any
Expand Down
57 changes: 52 additions & 5 deletions python/graphstorm/gconstruct/construct_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import numpy as np
import torch as th
import dgl
from dgl.distributed.constants import DEFAULT_NTYPE, DEFAULT_ETYPE

from ..utils import sys_tracker, get_log_level
from .file_io import parse_node_file_format, parse_edge_file_format
Expand Down Expand Up @@ -582,8 +583,23 @@ def process_edge_data(process_confs, node_id_map, arr_merger,

return (edges, edge_data, label_stats)

def is_homogeneous(confs):
    """ Check whether the user configuration describes a homogeneous graph.

    A graph is homogeneous when the configuration declares exactly one
    node type and exactly one canonical edge type.

    Parameter
    ---------
    confs: dict
        A dict containing all user input config

    Returns
    -------
    bool: True if the configuration describes a homogeneous graph.
    """
    node_types = set()
    for node_conf in confs["nodes"]:
        node_types.add(node_conf['node_type'])
    edge_types = set()
    for edge_conf in confs["edges"]:
        # Relations are lists in the JSON config; tuples make them hashable.
        edge_types.add(tuple(edge_conf['relation']))
    return len(node_types) == 1 and len(edge_types) == 1

def verify_confs(confs):
""" Verify the configuration of the input data.
Parameter
---------
confs: dict
A dict containing all user input config
"""
if "version" not in confs:
# TODO: Make a requirement with v1.0 launch
Expand All @@ -599,6 +615,14 @@ def verify_confs(confs):
f"source node type {src_type} does not exist. Please check your input data."
assert dst_type in ntypes, \
f"dest node type {dst_type} does not exist. Please check your input data."
# Adjust input to DGL homogeneous graph format if it is a homogeneous graph
if is_homogeneous(confs):
logging.warning("Generated Graph is a homogeneous graph, so the node type will be "
"changed to _N and edge type will be changed to [_N, _E, _N]")
for node in confs['nodes']:
node['node_type'] = DEFAULT_NTYPE
for edge in confs['edges']:
edge['relation'] = list(DEFAULT_ETYPE)

def print_graph_info(g, node_data, edge_data, node_label_stats, edge_label_stats):
""" Print graph information.
Expand Down Expand Up @@ -698,12 +722,35 @@ def process_graph(args):

if args.add_reverse_edges:
edges1 = {}
for etype in edges:
e = edges[etype]
if is_homogeneous(process_confs):
logging.warning("For homogeneous graph, the generated reverse edge will "
"be the same edge type as the original graph. Instead for "
"heterogeneous graph, the generated reverse edge type will "
"add -rev as a suffix")
e = edges[DEFAULT_ETYPE]
assert isinstance(e, tuple) and len(e) == 2
assert isinstance(etype, tuple) and len(etype) == 3
edges1[etype] = e
edges1[etype[2], etype[1] + "-rev", etype[0]] = (e[1], e[0])
edges1[DEFAULT_ETYPE] = (np.concatenate([e[0], e[1]]),
np.concatenate([e[1], e[0]]))
# Double edge feature as it is necessary to match tensor size in generated graph
# Only generate mask on original graph
if edge_data:
data = edge_data[DEFAULT_ETYPE]
logging.warning("Reverse edge for homogeneous graph will have same feature as "
"what we have in the original edges")
for key, value in data.items():
if key not in ["train_mask", "test_mask", "val_mask"]:
data[key] = np.concatenate([value, value])
else:
data[key] = np.concatenate([value, np.zeros(value.shape,
dtype=value.dtype)])

else:
for etype in edges:
e = edges[etype]
assert isinstance(e, tuple) and len(e) == 2
assert isinstance(etype, tuple) and len(etype) == 3
edges1[etype] = e
edges1[etype[2], etype[1] + "-rev", etype[0]] = (e[1], e[0])
edges = edges1
sys_tracker.check('Add reverse edges')
g = dgl.heterograph(edges, num_nodes_dict=num_nodes)
Expand Down
63 changes: 63 additions & 0 deletions tests/end2end-tests/data_gen/movielens_homogeneous.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
{
"version": "gconstruct-v0.1",
"nodes": [
{
"node_id_col": "id",
"node_type": "movie",
"format": {"name": "parquet"},
"files": "/data/ml-100k/movie.parquet",
"features": [
{
"feature_col": "title",
"transform": {
"name": "bert_hf",
"bert_model": "bert-base-uncased",
"max_seq_length": 16
}
}
],
"labels": [
{
"label_col": "label",
"task_type": "classification",
"split_pct": [0.8, 0.1, 0.1]
}
]
},
{
"node_type": "movie",
"format": {"name": "parquet"},
"files": "/data/ml-100k/movie.parquet",
"features": [
{
"feature_col": "id"
}
]
}
],
"edges": [
{
"source_id_col": "src_id",
"dest_id_col": "dst_id",
"relation": ["movie", "rating", "movie"],
"format": {"name": "parquet"},
"files": "/data/ml-100k/edges_homogeneous.parquet",
"features": [
{
"feature_col": "rate"
}],
"labels": [
{
"label_col": "rate",
"task_type": "classification",
"split_pct": [0.1, 0.1, 0.1]
}
]
},
{
"relation": ["movie", "rating", "movie"],
"format": {"name": "parquet"},
"files": "/data/ml-100k/edges_homogeneous.parquet"
}
]
}
5 changes: 5 additions & 0 deletions tests/end2end-tests/data_gen/process_movielens.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ def write_data_parquet(data, data_file):
edge_data = {'src_id': edges[0], 'dst_id': edges[1], 'rate': edges[2]}
write_data_parquet(edge_data, '/data/ml-100k/edges.parquet')

# generate data for homogeneous optimization test
edges = pandas.read_csv('/data/ml-100k/u.data', delimiter='\t', header=None)
edge_data = {'src_id': edges[1], 'dst_id': edges[1], 'rate': edges[2]}
write_data_parquet(edge_data, '/data/ml-100k/edges_homogeneous.parquet')

# generate synthetic user data with label
user_labels = np.random.randint(11, size=feat.shape[0])
user_data = {'id': user['id'].values, 'feat': feat, 'occupation': user['occupation'], 'label': user_labels}
Expand Down
60 changes: 60 additions & 0 deletions tests/end2end-tests/data_process/check_homogeneous.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
"""
Copyright 2023 Contributors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

"""
import os
import argparse
import dgl
from dgl.distributed.constants import DEFAULT_NTYPE, DEFAULT_ETYPE
from numpy.testing import assert_almost_equal


def check_reverse_edge(args):
    """ Verify a homogeneous graph built with reverse edges against the original.

    Loads two partitioned DGL graphs: one constructed without reverse edges
    (``args.orig_graph_path``) and one constructed with them
    (``args.rev_graph_path``), then checks that:

    * both graphs have identical node and edge types,
    * node counts match per type while the reverse graph has exactly twice
      as many edges,
    * node features are identical (``*mask`` features are skipped because the
      train/val/test split is generated randomly per run),
    * every non-mask edge feature of the original graph equals the first half
      of the corresponding reverse-graph feature (the second half holds the
      duplicated reverse-edge values),
    * the reverse-edge half of every mask feature is all zero, i.e. reverse
      edges are never used as training/validation/test targets.
    """
    g_orig = dgl.load_graphs(os.path.join(args.orig_graph_path, "graph.dgl"))[0][0]
    g_rev = dgl.load_graphs(os.path.join(args.rev_graph_path, "graph.dgl"))[0][0]
    assert g_orig.ntypes == g_rev.ntypes
    assert g_orig.etypes == g_rev.etypes
    # Adding reverse edges doubles the edge count but leaves nodes untouched.
    num_orig_edges = g_orig.number_of_edges(DEFAULT_ETYPE)
    assert 2 * num_orig_edges == g_rev.number_of_edges(DEFAULT_ETYPE)
    for ntype in g_orig.ntypes:
        assert g_orig.number_of_nodes(ntype) == g_rev.number_of_nodes(ntype)
        for name in g_orig.nodes[ntype].data:
            # We should skip '*_mask' because data split is split randomly.
            if 'mask' not in name:
                assert_almost_equal(g_orig.nodes[ntype].data[name].numpy(),
                                    g_rev.nodes[ntype].data[name].numpy())

    # Check edge features stored alongside the partitioned graph.
    g_orig_feat = dgl.data.load_tensors(os.path.join(args.orig_graph_path, "edge_feat.dgl"))
    g_rev_feat = dgl.data.load_tensors(os.path.join(args.rev_graph_path, "edge_feat.dgl"))
    for feat_type in g_orig_feat.keys():
        if "mask" not in feat_type:
            # Original features must be the leading half of the doubled tensor.
            assert_almost_equal(g_orig_feat[feat_type].numpy(),
                                g_rev_feat[feat_type].numpy()[:num_orig_edges])
        else:
            # Reverse edges must never be selected as prediction targets.
            assert_almost_equal(g_rev_feat[feat_type].numpy()[num_orig_edges:],
                                [0] * num_orig_edges)

if __name__ == '__main__':
    # NOTE(review): the original description ("Check edge prediction
    # remapping") was copy-pasted from another script; this tool checks
    # reverse-edge construction for homogeneous graphs.
    argparser = argparse.ArgumentParser("Check reverse edges of a homogeneous graph")
    argparser.add_argument("--orig-graph-path", type=str,
                           default="/tmp/movielen_100k_train_val_1p_4t_homogeneous/part0/",
                           help="Path to the partitioned graph built without reverse edges")
    argparser.add_argument("--rev-graph-path", type=str,
                           default="/tmp/movielen_100k_train_val_1p_4t_homogeneous_rev/part0/",
                           help="Path to the partitioned graph built with reverse edges")

    args = argparser.parse_args()

    check_reverse_edge(args)
Loading
Loading