diff --git a/reframe/frontend/argparse.py b/reframe/frontend/argparse.py index ceec757b1..288735df5 100644 --- a/reframe/frontend/argparse.py +++ b/reframe/frontend/argparse.py @@ -196,7 +196,7 @@ def add_argument(self, *flags, **kwargs): if flags and opt_name is None: # A positional argument - opt_name = flags[-1] + opt_name, flags = flags[-1], flags[:-1] if opt_name is None: raise ValueError('could not infer a dest name: no flags defined') @@ -230,7 +230,8 @@ def add_argument(self, *flags, **kwargs): except KeyError: self._defaults.__dict__[opt_name] = None - if not flags: + positional = kwargs.pop('positional', False) + if not flags and not positional: return None return self._holder.add_argument(*flags, **kwargs) diff --git a/reframe/frontend/cli.py b/reframe/frontend/cli.py index cda760463..90e791e3a 100644 --- a/reframe/frontend/cli.py +++ b/reframe/frontend/cli.py @@ -13,6 +13,7 @@ import sys import time import traceback +import yaml import reframe.core.config as config import reframe.core.exceptions as errors @@ -464,6 +465,10 @@ def main(): action_options.add_argument( '-V', '--version', action='version', version=osext.reframe_version() ) + action_options.add_argument( + '--import-results', action='store', metavar='SPECFILE', + help='Import results to the database' + ) # Run options run_options.add_argument( @@ -809,6 +814,7 @@ def main(): action='store_true', help='Use a login shell for job scripts' ) + argparser.add_argument('args', metavar='ARGS', nargs='*', positional=True) def restrict_logging(): '''Restrict logging to errors only. @@ -1034,6 +1040,21 @@ def restrict_logging(): ) sys.exit(0) + if options.import_results: + with exit_gracefully_on_error('failed to import results', printer): + with open(options.import_results) as fp: + spec = yaml.load(fp, yaml.Loader) + + if spec['import']['from'] == 'perflog': + kwargs = spec['import'] + del kwargs['from'] + report = reporting.RunReport.create_from_perflog(*options.args, + **kwargs) + # report.save('foo.json', link_to_last=False) + uuid = report.store() + printer.info(f'Results imported successfully as session {uuid}') + sys.exit(0) + # Show configuration after everything is set up if options.show_config: # Restore logging level diff --git a/reframe/frontend/reporting/__init__.py b/reframe/frontend/reporting/__init__.py index 08d3deb21..6ce11005c 100644 --- a/reframe/frontend/reporting/__init__.py +++ b/reframe/frontend/reporting/__init__.py @@ -13,10 +13,12 @@ import os import re import socket +import sys import time import uuid from collections import UserDict from collections.abc import Hashable +from datetime import datetime from filelock import FileLock import reframe as rfm @@ -26,7 +28,7 @@ from reframe.core.logging import getlogger, _format_time_rfc3339, time_function from reframe.core.runtime import runtime from reframe.core.warnings import suppress_deprecations -from reframe.utility import nodelist_abbrev, OrderedSet +from reframe.utility import nodelist_abbrev, nodelist_expand, OrderedSet from .storage import StorageBackend from .utility import Aggregator, parse_cmp_spec, parse_query_spec @@ -270,6 +272,132 @@ def __getitem__(self, key): def __rfm_json_encode__(self): return self.__report + @classmethod + def create_from_perflog(cls, *logfiles, format=None, + merge_records=None, datefmt=None, + ignore_lines=None, ignore_records=None): + def _filter_record(rec): + if ignore_records is None: + return False + else: + return eval(ignore_records, {}, rec) + + def _do_merge(dst, src): + system = src.get('system') + part = src.get('partition') + pvar = src.get('pvar') + pval = src.get('pval') + pref = src.get('pref') + plower = src.get('plower') + pupper = src.get('pupper') + punit = src.get('punit') + if pvar is None: + return dst + + if system is not None and part is not None: + pvar = f'{system}:{part}:{pvar}' + + # Convert to numbers before inserting + def _convert(x): + if x is None: + return x + + if x == 'null': + return None + + return float(x) + + pval = _convert(pval) + pref = _convert(pref) + pupper = _convert(pupper) + plower = _convert(plower) + dst['perfvalues'][pvar] = (pval, pref, plower, pupper, punit) + dst.pop('pvar', None) + dst.pop('pval', None) + dst.pop('pref', None) + dst.pop('plower', None) + dst.pop('pupper', None) + dst.pop('punit', None) + return dst + + patt = re.compile(format) + report = RunReport() + session_uuid = report['session_info']['uuid'] + run_index = 0 + test_index = 0 + t_report_start = sys.maxsize + t_report_end = 0 + num_failures = 0 + testcases = [] + for filename in logfiles: + records = {} + with open(filename) as fp: + for lineno, line in enumerate(fp, start=1): + if lineno in ignore_lines: + continue + + m = patt.match(line) + if not m: + continue + + rec = m.groupdict() + if _filter_record(rec): + continue + + # Add parameters as separate fields + if 'name' in rec: + params = rec['name'].split()[1:] + for spec in params: + p, v = spec.split('=', maxsplit=1) + rec[p[1:]] = v + + # Groom the record + if 'job_completion_time' in rec: + key = 'job_completion_time' + date = datetime.strptime(rec[key], datefmt) + rec[key] = date.strftime(_DATETIME_FMT) + ts = date.timestamp() + rec[f'{key}_unix'] = ts + t_report_start = min(t_report_start, ts) + t_report_end = max(t_report_end, ts) + + if 'job_nodelist' in rec: + key = 'job_nodelist' + rec[key] = nodelist_expand(rec[key]) + + rec['uuid'] = f'{session_uuid}:{run_index}:{test_index}' + rec.setdefault('result', 'pass') + if rec['result'] != 'pass': + num_failures += 1 + + if not merge_records: + key = lineno + elif len(merge_records) == 1: + key = rec[merge_records[0]] + else: + key = tuple(rec[k] for k in merge_records) + + if key in records: + records[key] = _do_merge(records[key], rec) + else: + rec['perfvalues'] = {} + records[key] = _do_merge(rec, rec) + test_index += 1 + + testcases += list(records.values()) + + report.update_timestamps(t_report_start, t_report_end) + report._add_run({ + 'num_cases': len(testcases), + 'num_failures': num_failures, + 'run_index': run_index, + 'testcases': testcases + }) + return report + + def _add_run(self, run): + self.__report['runs'].append(run) + def update_session_info(self, session_info): # Remove timestamps for key, val in session_info.items():