Commit

fix 2nd round comments
Ubuntu committed Apr 26, 2024
1 parent ac3a9d1 commit 39d7d80
Showing 4 changed files with 79 additions and 33 deletions.
24 changes: 24 additions & 0 deletions python/graphstorm/eval/eval_func.py
@@ -141,6 +141,14 @@ def __init__(self):
self.metric_comparator = {}
self.metric_comparator["mrr"] = operator.le

# This is the function used to compute each metric during training
self.metric_function = {}
self.metric_function["mrr"] = compute_mrr

# This is the function used to compute each metric during validation/testing
self.metric_eval_function = {}
self.metric_eval_function["mrr"] = compute_mrr

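Both tables currently map "mrr" to compute_mrr; keeping them separate lets the training-time function and the validation/testing-time function diverge later. A minimal, hypothetical sketch of that dispatch (the standalone names and the richer eval output below are illustrative, not part of this commit):

```python
import torch as th

def train_mrr(ranking):
    # Training expects a single scalar per metric.
    return (1.0 / ranking).mean()

def eval_mrr(ranking):
    # Evaluation may return richer output, e.g. extra statistics.
    return {"value": (1.0 / ranking).mean().item(), "num_edges": len(ranking)}

metric_function = {"mrr": train_mrr}        # consulted when train=True
metric_eval_function = {"mrr": eval_mrr}    # consulted when train=False

ranking = th.tensor([1.0, 2.0, 4.0])
for train in (True, False):
    funcs = metric_function if train else metric_eval_function
    print({name: fn(ranking) for name, fn in funcs.items()})
```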
def assert_supported_metric(self, metric):
""" check if the given metric is supported.
"""
@@ -589,3 +597,19 @@ def compute_mae(pred, labels):

diff = th.abs(pred.cpu() - labels.cpu())
return th.mean(diff).cpu().item()

def compute_mrr(ranking):
    """ Compute the link prediction MRR (mean reciprocal rank) metric.

    Parameters
    ----------
    ranking: tensor
        Ranking of each positive edge.

    Returns
    -------
    Link prediction MRR metric: tensor
    """
    reciprocal_ranks = th.div(1.0, ranking)
    return th.div(th.sum(reciprocal_ranks), len(reciprocal_ranks))
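A quick numeric check of compute_mrr, assuming 1-based ranks (a rank of 1 means the positive edge scored highest):

```python
import torch as th

# Three positive edges ranked 1st, 3rd and 10th against their negatives.
ranking = th.tensor([1.0, 3.0, 10.0])
reciprocal = 1.0 / ranking                     # [1.0, 0.3333, 0.1]
mrr = th.sum(reciprocal) / len(reciprocal)     # (1.0 + 0.3333 + 0.1) / 3
print(mrr)                                     # tensor(0.4778)
```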
76 changes: 49 additions & 27 deletions python/graphstorm/eval/evaluator.py
@@ -205,7 +205,7 @@ def evaluate(self, val_rankings, test_rankings, total_iters):
"""

@abc.abstractmethod
def compute_score(self, rankings):
def compute_score(self, rankings, train=True):
""" Compute evaluation score for Prediciton tasks
Ranking-based link prediction evaluators should provide ranking values as input.
@@ -214,6 +214,8 @@ def compute_score(self, rankings):
----------
rankings: dict of tensors
Rankings of positive scores in format of {etype: ranking}
train: boolean
Whether the score is being computed during model training.
Returns
-------
@@ -460,15 +462,12 @@ class GSgnnClassificationEvaluator(GSgnnBaseEvaluator, GSgnnPredictionEvalInterf
1) consecutive_increase and 2) average_increase.
"""
def __init__(self, eval_frequency,
eval_metric_list=None,
eval_metric_list=["accuracy"],
multilabel=False,
use_early_stop=False,
early_stop_burnin_rounds=0,
early_stop_rounds=3,
early_stop_strategy=EARLY_STOP_AVERAGE_INCREASE_STRATEGY): # pylint: disable=unused-argument
# set up default metric to be accuracy
if eval_metric_list is None:
eval_metric_list = ["accuracy"]
super(GSgnnClassificationEvaluator, self).__init__(eval_frequency,
eval_metric_list,
use_early_stop,
@@ -596,14 +595,11 @@ class GSgnnRegressionEvaluator(GSgnnBaseEvaluator, GSgnnPredictionEvalInterface)
1) consecutive_increase and 2) average_increase.
"""
def __init__(self, eval_frequency,
eval_metric_list=None,
eval_metric_list=["rmse"],
use_early_stop=False,
early_stop_burnin_rounds=0,
early_stop_rounds=3,
early_stop_strategy=EARLY_STOP_AVERAGE_INCREASE_STRATEGY):
# set up default metric to be "rmse"
if eval_metric_list is None:
eval_metric_list = ["rmse"]
super(GSgnnRegressionEvaluator, self).__init__(eval_frequency,
eval_metric_list, use_early_stop, early_stop_burnin_rounds,
early_stop_rounds, early_stop_strategy)
@@ -708,7 +704,7 @@ def compute_score(self, pred, labels, train=True):
class GSgnnMrrLPEvaluator(GSgnnBaseEvaluator, GSgnnLPRankingEvalInterface):
""" Link Prediction Evaluator using "mrr" as metric.
GS built-in evaluator for Link Prediction task. It uses "mrr" as the default eval metric,
GS built-in evaluator for Link Prediction tasks. It uses "mrr" as the default eval metric,
which implements the `GSgnnLPRankingEvalInterface`.
To create a customized LP evaluator that uses an evaluation metric other than "mrr", users might
@@ -733,13 +729,11 @@ class GSgnnMrrLPEvaluator(GSgnnBaseEvaluator, GSgnnLPRankingEvalInterface):
1) consecutive_increase and 2) average_increase.
"""
def __init__(self, eval_frequency,
eval_metric_list=None,
eval_metric_list=["mrr"],
use_early_stop=False,
early_stop_burnin_rounds=0,
early_stop_rounds=3,
early_stop_strategy=EARLY_STOP_AVERAGE_INCREASE_STRATEGY):
if eval_metric_list is None:
eval_metric_list = ["mrr"]
super(GSgnnMrrLPEvaluator, self).__init__(eval_frequency,
eval_metric_list, use_early_stop, early_stop_burnin_rounds,
early_stop_rounds, early_stop_strategy)
@@ -797,36 +791,50 @@ def evaluate(self, val_rankings, test_rankings, total_iters):

return val_score, test_score

def compute_score(self, rankings):
def compute_score(self, rankings, train=True):
""" Compute evaluation score
Parameters
----------
rankings: dict of tensors
Rankings of positive scores in format of {etype: ranking}
train: boolean
Whether the score is being computed during model training.
Returns
-------
Evaluation metric values: dict
"""
# We calculate the global MRR; etype is ignored.
# Users can develop their own per-etype MRR evaluator.
ranking = []
for _, rank in rankings.items():
ranking.append(rank)
ranking = th.cat(ranking, dim=0)

metrics = gen_mrr_score(ranking)
# compute each metric value from the concatenated ranking
metrics = {}
for metric in self.metric_list:
    if train:
        # training always expects a single scalar per metric and may
        # use a (potentially) different evaluation function
        metrics[metric] = self.metrics_obj.metric_function[metric](ranking)
    else:
        # validation/testing may use a different evaluation function,
        # which may return richer output, e.g., a dict of values per metric
        metrics[metric] = self.metrics_obj.metric_eval_function[metric](ranking)

# When world size == 1, we do not need the barrier
if get_world_size() > 1:
barrier()
for _, metric_val in metrics.items():
th.distributed.all_reduce(metric_val)

return_metrics = {}
for metric, metric_val in metrics.items():
return_metric = metric_val / get_world_size()
return_metrics[metric] = return_metric.item()

return return_metrics
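The aggregation at the end of compute_score averages each metric across workers: every rank contributes its local value through all_reduce (a sum), and dividing by the world size gives the global mean. A minimal single-process sketch of that pattern (the address, port, and single-rank setup are illustrative only; GraphStorm creates the process group elsewhere):

```python
import torch as th
import torch.distributed as dist

# Hypothetical single-process setup, just to make the sketch runnable.
dist.init_process_group(backend="gloo",
                        init_method="tcp://127.0.0.1:29500",
                        rank=0, world_size=1)

local_metrics = {"mrr": th.tensor(0.42)}   # this rank's metric values
for metric_val in local_metrics.values():
    dist.all_reduce(metric_val)            # sums the value across all ranks

global_metrics = {name: (val / dist.get_world_size()).item()
                  for name, val in local_metrics.items()}
print(global_metrics)                      # {'mrr': 0.42} when world_size == 1

dist.destroy_process_group()
```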


@@ -853,14 +861,12 @@ class GSgnnPerEtypeMrrLPEvaluator(GSgnnBaseEvaluator, GSgnnLPRankingEvalInterfac
1) consecutive_increase and 2) average_increase.
"""
def __init__(self, eval_frequency,
eval_metric_list=None,
eval_metric_list=["mrr"],
major_etype = LINK_PREDICTION_MAJOR_EVAL_ETYPE_ALL,
use_early_stop=False,
early_stop_burnin_rounds=0,
early_stop_rounds=3,
early_stop_strategy=EARLY_STOP_AVERAGE_INCREASE_STRATEGY):
if eval_metric_list is None:
eval_metric_list = ["mrr"]
super(GSgnnPerEtypeMrrLPEvaluator, self).__init__(eval_frequency,
eval_metric_list,
use_early_stop,
@@ -878,33 +884,47 @@ def __init__(self, eval_frequency,
self._best_test_score[metric] = self.metrics_obj.init_best_metric(metric=metric)
self._best_iter[metric] = 0

def compute_score(self, rankings):
def compute_score(self, rankings, train=True):
""" Compute evaluation score
Parameters
----------
rankings: dict of tensors
Rankings of positive scores in format of {etype: ranking}
train: boolean
Whether the score is being computed during model training.
Returns
-------
Evaluation metric values: dict
"""
# Compute evaluation metrics separately for each edge type.
# Users can develop their own per-etype evaluators.
metrics = {}
for etype, rank in rankings.items():
metrics[etype] = gen_mrr_score(rank)
per_etype_metrics = {}
for etype, ranking in rankings.items():
# compute each metric value from this edge type's ranking
metrics = {}
for metric in self.metric_list:
    if train:
        # training always expects a single scalar per metric and may
        # use a (potentially) different evaluation function
        metrics[metric] = self.metrics_obj.metric_function[metric](ranking)
    else:
        # validation/testing may use a different evaluation function,
        # which may return richer output, e.g., a dict of values per metric
        metrics[metric] = self.metrics_obj.metric_eval_function[metric](ranking)
per_etype_metrics[etype] = metrics

# When world size == 1, we do not need the barrier
if get_world_size() > 1:
barrier()
for _, metric in metrics.items():
for _, metric in per_etype_metrics.items():
for _, metric_val in metric.items():
th.distributed.all_reduce(metric_val)

return_metrics = {}
for etype, metric in metrics.items():
for etype, metric in per_etype_metrics.items():
for metric_key, metric_val in metric.items():
return_metric = metric_val / get_world_size()
if metric_key not in return_metrics:
Expand Down Expand Up @@ -946,7 +966,8 @@ def evaluate(self, val_rankings, test_rankings, total_iters):
if test_rankings is not None:
test_score = self.compute_score(test_rankings)
else:
test_score = {"mrr": "N/A"} # Dummy
test_score = {metric: "N/A" for metric in self.metric_list} # Dummy

if val_rankings is not None:
val_score = self.compute_score(val_rankings)
@@ -962,7 +983,8 @@ def evaluate(self, val_rankings, test_rankings, total_iters):
self._best_test_score[metric] = major_test_score
self._best_iter[metric] = total_iters
else:
val_score = {"mrr": "N/A"} # Dummy
val_score = {metric: "N/A" for metric in self.metric_list} # Dummy

return val_score, test_score
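For readers following the per-etype path: the nested loops over per_etype_metrics appear to regroup the all-reduced values into a {metric: {etype: value}} dictionary before returning. A small sketch of that assumed regrouping, with made-up edge types and scores:

```python
# Hypothetical per-etype metric values after all_reduce and averaging.
per_etype_metrics = {
    ("user", "rates", "movie"): {"mrr": 0.61},
    ("user", "follows", "user"): {"mrr": 0.47},
}

return_metrics = {}
for etype, metric in per_etype_metrics.items():
    for metric_key, metric_val in metric.items():
        # group by metric name first, then by edge type
        return_metrics.setdefault(metric_key, {})[etype] = metric_val

print(return_metrics)
# {'mrr': {('user', 'rates', 'movie'): 0.61, ('user', 'follows', 'user'): 0.47}}
```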

8 changes: 4 additions & 4 deletions python/graphstorm/trainer/lp_trainer.py
@@ -323,16 +323,16 @@ def eval(self, model, data, val_loader, test_loader,
edge_mask=edge_mask_for_gnn_embeddings,
task_tracker=self.task_tracker)
sys_tracker.check('compute embeddings')
val_rankings = lp_mini_batch_predict(model, emb, val_loader, self.device) \
val_scores = lp_mini_batch_predict(model, emb, val_loader, self.device) \
if val_loader is not None else None
sys_tracker.check('after_val_score')
if test_loader is not None:
test_rankings = lp_mini_batch_predict(model, emb, test_loader, self.device)
test_scores = lp_mini_batch_predict(model, emb, test_loader, self.device)
else:
test_rankings = None
test_scores = None
sys_tracker.check('after_test_score')
val_score, test_score = self.evaluator.evaluate(
val_rankings, test_rankings, total_steps)
val_scores, test_scores, total_steps)
sys_tracker.check('evaluate validation/test')
model.train()

4 changes: 2 additions & 2 deletions tests/unit-tests/test_evaluator.py
@@ -172,7 +172,7 @@ def test_mrr_per_etype_lp_evaluation():

th.distributed.destroy_process_group()

def test_lp_evaluator():
def test_mrr_lp_evaluator():
# system heavily depends on th distributed
dist_init_method = 'tcp://{master_ip}:{master_port}'.format(
master_ip='127.0.0.1', master_port='12346')
@@ -852,7 +852,7 @@ def test_get_val_score_rank():
if __name__ == '__main__':
# test evaluators
test_mrr_per_etype_lp_evaluation()
test_lp_evaluator()
test_mrr_lp_evaluator()
test_regression_evaluator()
test_early_stop_avg_increase_judge()
test_early_stop_cons_increase_judge()
