Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat] Asyncio execution policy #3347

Draft
wants to merge 20 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions reframe/core/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ class AbortTaskError(ReframeError):
error in other places etc.)
'''

class KeyboardError(ReframeError):
'''Raised when there is a KeyboardInterrupt during the asyncio execution
'''

class ConfigError(ReframeError):
'''Raised when a configuration error occurs.'''
Expand Down
30 changes: 21 additions & 9 deletions reframe/core/hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#
# SPDX-License-Identifier: BSD-3-Clause

import asyncio
import functools
import inspect

Expand Down Expand Up @@ -101,17 +102,28 @@ def select_hooks(obj, kind):
return [h for h in hooks.get(phase, [])
if h.__name__ not in getattr(obj, '_disabled_hooks', [])]

@functools.wraps(func)
def _fn(obj, *args, **kwargs):
for h in select_hooks(obj, 'pre_'):
getattr(obj, h.__name__)()

func(obj, *args, **kwargs)
for h in select_hooks(obj, 'post_'):
getattr(obj, h.__name__)()
# maybe this could be improved
if asyncio.iscoroutinefunction(func):
@functools.wraps(func)
async def _fn(obj, *args, **kwargs):
for h in select_hooks(obj, 'pre_'):
getattr(obj, h.__name__)()

await func(obj, *args, **kwargs)
for h in select_hooks(obj, 'post_'):
getattr(obj, h.__name__)()
return _fn
else:
@functools.wraps(func)
def _fn(obj, *args, **kwargs):
for h in select_hooks(obj, 'pre_'):
getattr(obj, h.__name__)()

return _fn
func(obj, *args, **kwargs)
for h in select_hooks(obj, 'post_'):
getattr(obj, h.__name__)()

return _fn
return _deco


Expand Down
74 changes: 59 additions & 15 deletions reframe/core/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# SPDX-License-Identifier: BSD-3-Clause

import abc
import asyncio
import logging
import logging.handlers
import numbers
Expand Down Expand Up @@ -953,24 +954,44 @@ def adjust_verbosity(self, num_steps):

_logger = None
_perf_logger = None
_context_logger = null_logger

global tasks_loggers
tasks_loggers = {}

global _global_logger
_global_logger = null_logger


class logging_context:
def __init__(self, check=None, level=DEBUG):
global _context_logger
try:
task = current_task()
except RuntimeError:
global _global_logger
task = None

self._orig_logger = _global_logger

self._level = level
self._orig_logger = _context_logger
self._context_logger = _global_logger
if check is not None:
_context_logger = LoggerAdapter(_logger, check)
_context_logger.colorize = self._orig_logger.colorize
self._context_logger = LoggerAdapter(_logger, check)
self._context_logger.colorize = self._orig_logger.colorize

if task:
tasks_loggers[task] = self._context_logger
else:
_global_logger = self._context_logger

def __enter__(self):
return _context_logger
return self._context_logger

def __exit__(self, exc_type, exc_value, traceback):
global _context_logger
global _global_logger
try:
task = current_task()
except RuntimeError:
task = None

# Log any exceptions thrown with the current context logger
if exc_type is not None:
Expand All @@ -979,20 +1000,23 @@ def __exit__(self, exc_type, exc_value, traceback):
getlogger().log(self._level, msg.format(exc_fullname, exc_value))

# Restore context logger
_context_logger = self._orig_logger
_global_logger = self._orig_logger

if task:
tasks_loggers[task] = self._orig_logger


def configure_logging(site_config):
global _logger, _context_logger, _perf_logger
global _logger, _global_logger, _perf_logger

if site_config is None:
_logger = None
_context_logger = null_logger
_global_logger = null_logger
return

_logger = _create_logger(site_config, 'handlers$', 'handlers')
_perf_logger = _create_logger(site_config, 'handlers_perflog')
_context_logger = LoggerAdapter(_logger)
_global_logger = LoggerAdapter(_logger)


def log_files():
Expand All @@ -1007,7 +1031,15 @@ def save_log_files(dest):


def getlogger():
return _context_logger
try:
task = current_task()
except RuntimeError:
task = None
if task:
logger_task = tasks_loggers.get(task)
if logger_task:
return tasks_loggers[task]
return _global_logger


def getperflogger(check):
Expand Down Expand Up @@ -1056,11 +1088,23 @@ class logging_sandbox:
def __enter__(self):
self._logger = _logger
self._perf_logger = _perf_logger
self._context_logger = _context_logger
self._context_logger = _global_logger

def __exit__(self, exc_type, exc_value, traceback):
global _logger, _perf_logger, _context_logger
global _logger, _perf_logger, _global_logger

_logger = self._logger
_perf_logger = self._perf_logger
_context_logger = self._context_logger
_global_logger = self._context_logger


def current_task():
"""Wrapper for asyncio.current_task() compatible
with Python 3.6 and later.
"""
if sys.version_info >= (3, 7):
# Use asyncio.current_task() directly in Python 3.7+
return asyncio.current_task()
else:
# Fallback to asyncio.tasks.current_task() in Python 3.6
return asyncio.Task.current_task()
45 changes: 24 additions & 21 deletions reframe/core/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -1018,7 +1018,7 @@ def __new__(cls, *args, **kwargs):
prefix = cls._rfm_custom_prefix
except AttributeError:
if osext.is_interactive():
prefix = os.getcwd()
prefix = rt.get_working_dir()
else:
try:
prefix = cls._rfm_pinned_prefix
Expand Down Expand Up @@ -1762,6 +1762,7 @@ def setup(self, partition, environ, **job_opts):
more details.

'''
os.chdir(rt.get_working_dir())
self._current_partition = partition
self._current_environ = environ
self._setup_paths()
Expand All @@ -1788,7 +1789,7 @@ def _clone_to_stagedir(self, url):
)

@final
def compile(self):
async def compile(self):
'''The compilation phase of the regression test pipeline.

:raises reframe.core.exceptions.ReframeError: In case of errors.
Expand Down Expand Up @@ -1881,7 +1882,7 @@ def compile(self):
# override those set by the framework.
resources_opts = self._map_resources_to_jobopts()
self._build_job.options = resources_opts + self._build_job.options
with osext.change_dir(self._stagedir):
with osext.change_dir_global(self._stagedir):
# Prepare build job
build_commands = [
*self.prebuild_cmds,
Expand All @@ -1899,10 +1900,10 @@ def compile(self):
raise PipelineError('failed to prepare build job') from e

if not self.is_dry_run():
self._build_job.submit()
await self._build_job.submit()

@final
def compile_wait(self):
async def compile_wait(self):
'''Wait for compilation phase to finish.

.. versionadded:: 2.13
Expand All @@ -1924,19 +1925,19 @@ def compile_wait(self):
if self.is_dry_run():
return

self._build_job.wait()
await self._build_job.wait()

# We raise a BuildError when we an exit code and it is non zero
if self._build_job.exitcode:
raise BuildError(
f'build job failed with exit code: {self._build_job.exitcode}'
)

with osext.change_dir(self._stagedir):
with osext.change_dir_global(self._stagedir):
self.build_system.post_build(self._build_job)

@final
def run(self):
async def run(self):
'''The run phase of the regression test pipeline.

This call is non-blocking.
Expand Down Expand Up @@ -2030,7 +2031,7 @@ def _get_cp_env():
# override those set by the framework.
resources_opts = self._map_resources_to_jobopts()
self._job.options = resources_opts + self._job.options
with osext.change_dir(self._stagedir):
with osext.change_dir_global(self._stagedir):
try:
self.logger.debug('Generating the run script')
self._job.prepare(
Expand All @@ -2046,7 +2047,7 @@ def _get_cp_env():
raise PipelineError('failed to prepare run job') from e

if not self.is_dry_run():
self._job.submit()
await self._job.submit()
self.logger.debug(f'Spawned run job (id={self.job.jobid})')

# Update num_tasks if test is flexible
Expand Down Expand Up @@ -2113,7 +2114,7 @@ def run_complete(self):
return self._job.finished()

@final
def run_wait(self):
async def run_wait(self):
'''Wait for the run phase of this test to finish.

:raises reframe.core.exceptions.ReframeError: In case of errors.
Expand All @@ -2133,7 +2134,7 @@ def run_wait(self):
if self.is_dry_run():
return

self._job.wait()
await self._job.wait()

@final
def sanity(self):
Expand Down Expand Up @@ -2197,7 +2198,7 @@ def check_sanity(self):
if self.is_dry_run():
return

with osext.change_dir(self._stagedir):
with osext.change_dir_global(self._stagedir):
success = sn.evaluate(self.sanity_patterns)
if not success:
raise SanityError()
Expand Down Expand Up @@ -2254,7 +2255,7 @@ def check_performance(self):
unit)

# Evaluate the performance function and retrieve the metrics
with osext.change_dir(self._stagedir):
with osext.change_dir_global(self._stagedir):
for tag, expr in self.perf_variables.items():
try:
value = expr.evaluate() if not self.is_dry_run() else None
Expand Down Expand Up @@ -2339,7 +2340,7 @@ def _copy_to_outputdir(self):
self._copy_job_files(self._job, self.outputdir)
self._copy_job_files(self._build_job, self.outputdir)

with osext.change_dir(self.stagedir):
with osext.change_dir_global(self.stagedir):
# Copy files specified by the user, but expand any glob patterns
keep_files = itertools.chain(
*(glob.iglob(f) for f in self.keep_files)
Expand Down Expand Up @@ -2582,26 +2583,27 @@ def setup(self, partition, environ, **job_opts):
Similar to the :func:`RegressionTest.setup`, except that no build job
is created for this test.
'''
os.chdir(rt.get_working_dir())
self._current_partition = partition
self._current_environ = environ
self._setup_paths()
self._setup_run_job(**job_opts)
self._setup_container_platform()
self._resolve_fixtures()

def compile(self):
async def compile(self):
'''The compilation phase of the regression test pipeline.

This is a no-op for this type of test.
'''

def compile_wait(self):
async def compile_wait(self):
'''Wait for compilation phase to finish.

This is a no-op for this type of test.
'''

def run(self):
async def run(self):
'''The run phase of the regression test pipeline.

The resources of the test are copied to the stage directory and the
Expand All @@ -2614,7 +2616,7 @@ def run(self):
self._copy_to_stagedir(os.path.join(self._prefix,
self.sourcesdir))

super().run()
await super().run()


class CompileOnlyRegressionTest(RegressionTest, special=True):
Expand Down Expand Up @@ -2648,6 +2650,7 @@ def setup(self, partition, environ, **job_opts):
Similar to the :func:`RegressionTest.setup`, except that no run job
is created for this test.
'''
os.chdir(rt.get_working_dir())
# No need to setup the job for compile-only checks
self._current_partition = partition
self._current_environ = environ
Expand All @@ -2666,13 +2669,13 @@ def stdout(self):
def stderr(self):
return self.build_job.stderr if self.build_job else None

def run(self):
async def run(self):
'''The run stage of the regression test pipeline.

Implemented as no-op.
'''

def run_wait(self):
async def run_wait(self):
'''Wait for this test to finish.

Implemented as no-op
Expand Down
Loading
Loading