Skip to content

Commit

Permalink
WIP: Import results from perflogs
Browse files Browse the repository at this point in the history
  • Loading branch information
vkarak committed Oct 24, 2024
1 parent 2a41433 commit 92aa97c
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 3 deletions.
5 changes: 3 additions & 2 deletions reframe/frontend/argparse.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ def add_argument(self, *flags, **kwargs):

if flags and opt_name is None:
# A positional argument
opt_name = flags[-1]
opt_name, flags = flags[-1], flags[:-1]

if opt_name is None:
raise ValueError('could not infer a dest name: no flags defined')
Expand Down Expand Up @@ -230,7 +230,8 @@ def add_argument(self, *flags, **kwargs):
except KeyError:
self._defaults.__dict__[opt_name] = None

if not flags:
positional = kwargs.pop('positional', False)
if not flags and not positional:
return None

return self._holder.add_argument(*flags, **kwargs)
Expand Down
21 changes: 21 additions & 0 deletions reframe/frontend/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import sys
import time
import traceback
import yaml

import reframe.core.config as config
import reframe.core.exceptions as errors
Expand Down Expand Up @@ -464,6 +465,10 @@ def main():
action_options.add_argument(
'-V', '--version', action='version', version=osext.reframe_version()
)
action_options.add_argument(
'--import-results', action='store', metavar='SPECFILE',
help='Import results to the database'
)

# Run options
run_options.add_argument(
Expand Down Expand Up @@ -809,6 +814,7 @@ def main():
action='store_true',
help='Use a login shell for job scripts'
)
argparser.add_argument('args', metavar='ARGS', nargs='*', positional=True)

def restrict_logging():
'''Restrict logging to errors only.
Expand Down Expand Up @@ -1034,6 +1040,21 @@ def restrict_logging():
)
sys.exit(0)

if options.import_results:
with exit_gracefully_on_error('failed to import results', printer):
with open(options.import_results) as fp:
spec = yaml.load(fp, yaml.Loader)

if spec['import']['from'] == 'perflog':
kwargs = spec['import']
del kwargs['from']
report = reporting.RunReport.create_from_perflog(*options.args,
**kwargs)
# report.save('foo.json', link_to_last=False)
uuid = report.store()
printer.info(f'Results imported successfully as session {uuid}')
sys.exit(0)

# Show configuration after everything is set up
if options.show_config:
# Restore logging level
Expand Down
130 changes: 129 additions & 1 deletion reframe/frontend/reporting/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
import os
import re
import socket
import sys
import time
import uuid
from collections import UserDict
from collections.abc import Hashable
from datetime import datetime
from filelock import FileLock

import reframe as rfm
Expand All @@ -26,7 +28,7 @@
from reframe.core.logging import getlogger, _format_time_rfc3339, time_function
from reframe.core.runtime import runtime
from reframe.core.warnings import suppress_deprecations
from reframe.utility import nodelist_abbrev, OrderedSet
from reframe.utility import nodelist_abbrev, nodelist_expand, OrderedSet
from .storage import StorageBackend
from .utility import Aggregator, parse_cmp_spec, parse_query_spec

Expand Down Expand Up @@ -270,6 +272,132 @@ def __getitem__(self, key):
def __rfm_json_encode__(self):
return self.__report

@classmethod
def create_from_perflog(cls, *logfiles, format=None,
merge_records=None, datefmt=None,
ignore_lines=None, ignore_records=None):
def _filter_record(rec):
if ignore_records is None:
return False
else:
return eval(ignore_records, {}, rec)

def _do_merge(dst, src):
system = src.get('system')
part = src.get('partition')
pvar = src.get('pvar')
pval = src.get('pval')
pref = src.get('pref')
plower = src.get('plower')
pupper = src.get('pupper')
punit = src.get('punit')
if pvar is None:
return dst

if system is not None and part is not None:
pvar = f'{system}:{part}:{pvar}'

# Convert to numbers before inserting
def _convert(x):
if x is None:
return x

if x == 'null':
return None

return float(x)

pval = _convert(pval)
pref = _convert(pref)
pupper = _convert(pupper)
plower = _convert(plower)
dst['perfvalues'][pvar] = (pval, pref, plower, pupper, punit)
dst.pop('pvar', None)
dst.pop('pval', None)
dst.pop('pref', None)
dst.pop('plower', None)
dst.pop('pupper', None)
dst.pop('punit', None)
return dst

patt = re.compile(format)
report = RunReport()
session_uuid = report['session_info']['uuid']
run_index = 0
test_index = 0
t_report_start = sys.maxsize
t_report_end = 0
num_failures = 0
testcases = []
for filename in logfiles:
records = {}
with open(filename) as fp:
for lineno, line in enumerate(fp, start=1):
if lineno in ignore_lines:
continue

m = patt.match(line)
if not m:
continue

rec = m.groupdict()
if _filter_record(rec):
continue

# Add parameters as separate fields
if 'name' in rec:
params = rec['name'].split()[1:]
for spec in params:
p, v = spec.split('=', maxsplit=1)
rec[p[1:]] = v

# Groom the record
if 'job_completion_time' in rec:
key = 'job_completion_time'
date = datetime.strptime(rec[key], datefmt)
rec[key] = date.strftime(_DATETIME_FMT)
ts = date.timestamp()
rec[f'{key}_unix'] = ts
t_report_start = min(t_report_start, ts)
t_report_end = max(t_report_end, ts)

if 'job_nodelist' in rec:
key = 'job_nodelist'
rec[key] = nodelist_expand(rec[key])

rec['uuid'] = f'{session_uuid}:{run_index}:{test_index}'
rec.setdefault('result', 'pass')
if rec['result'] != 'pass':
num_failures += 1

if not merge_records:
key = lineno
elif len(merge_records) == 1:
key = rec[merge_records[0]]
else:
key = tuple(rec[k] for k in merge_records)

if key in records:
records[key] = _do_merge(records[key], rec)
else:
rec['perfvalues'] = {}
records[key] = _do_merge(rec, rec)
test_index += 1

testcases += list(records.values())

report.update_timestamps(t_report_start, t_report_end)
report._add_run({
'num_cases': len(testcases),
'num_failures': num_failures,
'run_index': run_index,
'testcases': testcases
})
return report

def _add_run(self, run):
self.__report['runs'].append(run)

def update_session_info(self, session_info):
# Remove timestamps
for key, val in session_info.items():
Expand Down

0 comments on commit 92aa97c

Please sign in to comment.