From bee968e7a9fb49e277c16fd2f166d8eea7167134 Mon Sep 17 00:00:00 2001 From: Blanca Fuentes Monjas Date: Mon, 9 Dec 2024 18:36:21 +0100 Subject: [PATCH 01/20] Removed the configuration detection code --- reframe/core/exceptions.py | 3 + reframe/core/hooks.py | 30 +- reframe/core/logging.py | 74 ++- reframe/core/pipeline.py | 45 +- reframe/core/runtime.py | 19 + reframe/core/schedulers/__init__.py | 9 +- reframe/core/schedulers/local.py | 99 ++- reframe/core/schedulers/slurm.py | 53 +- reframe/frontend/autodetect.py | 6 +- reframe/frontend/cli.py | 6 +- reframe/frontend/executors/__init__.py | 99 ++- reframe/frontend/executors/policies.py | 583 +++++++++--------- reframe/utility/osext.py | 112 ++++ unittests/conftest.py | 4 +- unittests/resources/checks/frontend_checks.py | 14 +- unittests/test_perflogging.py | 1 + unittests/test_pipeline.py | 32 +- unittests/test_policies.py | 15 +- unittests/test_reporting.py | 1 + 19 files changed, 776 insertions(+), 429 deletions(-) diff --git a/reframe/core/exceptions.py b/reframe/core/exceptions.py index 49640d671d..04045ae6a6 100644 --- a/reframe/core/exceptions.py +++ b/reframe/core/exceptions.py @@ -110,6 +110,9 @@ class AbortTaskError(ReframeError): error in other places etc.) ''' +class KeyboardError(ReframeError): + '''Raised when there is a KeyboardInterrupt during the asyncio execution + ''' class ConfigError(ReframeError): '''Raised when a configuration error occurs.''' diff --git a/reframe/core/hooks.py b/reframe/core/hooks.py index 80cb1f9bbe..48e96efa6e 100644 --- a/reframe/core/hooks.py +++ b/reframe/core/hooks.py @@ -3,6 +3,7 @@ # # SPDX-License-Identifier: BSD-3-Clause +import asyncio import functools import inspect @@ -101,17 +102,28 @@ def select_hooks(obj, kind): return [h for h in hooks.get(phase, []) if h.__name__ not in getattr(obj, '_disabled_hooks', [])] - @functools.wraps(func) - def _fn(obj, *args, **kwargs): - for h in select_hooks(obj, 'pre_'): - getattr(obj, h.__name__)() - - func(obj, *args, **kwargs) - for h in select_hooks(obj, 'post_'): - getattr(obj, h.__name__)() + # maybe this could be improved + if asyncio.iscoroutinefunction(func): + @functools.wraps(func) + async def _fn(obj, *args, **kwargs): + for h in select_hooks(obj, 'pre_'): + getattr(obj, h.__name__)() + + await func(obj, *args, **kwargs) + for h in select_hooks(obj, 'post_'): + getattr(obj, h.__name__)() + return _fn + else: + @functools.wraps(func) + def _fn(obj, *args, **kwargs): + for h in select_hooks(obj, 'pre_'): + getattr(obj, h.__name__)() - return _fn + func(obj, *args, **kwargs) + for h in select_hooks(obj, 'post_'): + getattr(obj, h.__name__)() + return _fn return _deco diff --git a/reframe/core/logging.py b/reframe/core/logging.py index bf68536c4b..f3baba5433 100644 --- a/reframe/core/logging.py +++ b/reframe/core/logging.py @@ -4,6 +4,7 @@ # SPDX-License-Identifier: BSD-3-Clause import abc +import asyncio import logging import logging.handlers import numbers @@ -953,24 +954,44 @@ def adjust_verbosity(self, num_steps): _logger = None _perf_logger = None -_context_logger = null_logger + +global tasks_loggers +tasks_loggers = {} + +global _global_logger +_global_logger = null_logger class logging_context: def __init__(self, check=None, level=DEBUG): - global _context_logger + try: + task = current_task() + except RuntimeError: + global _global_logger + task = None + + self._orig_logger = _global_logger self._level = level - self._orig_logger = _context_logger + self._context_logger = _global_logger if check is not None: - _context_logger = 
LoggerAdapter(_logger, check) - _context_logger.colorize = self._orig_logger.colorize + self._context_logger = LoggerAdapter(_logger, check) + self._context_logger.colorize = self._orig_logger.colorize + + if task: + tasks_loggers[task] = self._context_logger + else: + _global_logger = self._context_logger def __enter__(self): - return _context_logger + return self._context_logger def __exit__(self, exc_type, exc_value, traceback): - global _context_logger + global _global_logger + try: + task = current_task() + except RuntimeError: + task = None # Log any exceptions thrown with the current context logger if exc_type is not None: @@ -979,20 +1000,23 @@ def __exit__(self, exc_type, exc_value, traceback): getlogger().log(self._level, msg.format(exc_fullname, exc_value)) # Restore context logger - _context_logger = self._orig_logger + _global_logger = self._orig_logger + + if task: + tasks_loggers[task] = self._orig_logger def configure_logging(site_config): - global _logger, _context_logger, _perf_logger + global _logger, _global_logger, _perf_logger if site_config is None: _logger = None - _context_logger = null_logger + _global_logger = null_logger return _logger = _create_logger(site_config, 'handlers$', 'handlers') _perf_logger = _create_logger(site_config, 'handlers_perflog') - _context_logger = LoggerAdapter(_logger) + _global_logger = LoggerAdapter(_logger) def log_files(): @@ -1007,7 +1031,15 @@ def save_log_files(dest): def getlogger(): - return _context_logger + try: + task = current_task() + except RuntimeError: + task = None + if task: + logger_task = tasks_loggers.get(task) + if logger_task: + return tasks_loggers[task] + return _global_logger def getperflogger(check): @@ -1056,11 +1088,23 @@ class logging_sandbox: def __enter__(self): self._logger = _logger self._perf_logger = _perf_logger - self._context_logger = _context_logger + self._context_logger = _global_logger def __exit__(self, exc_type, exc_value, traceback): - global _logger, _perf_logger, _context_logger + global _logger, _perf_logger, _global_logger _logger = self._logger _perf_logger = self._perf_logger - _context_logger = self._context_logger + _global_logger = self._context_logger + + +def current_task(): + """Wrapper for asyncio.current_task() compatible + with Python 3.6 and later. + """ + if sys.version_info >= (3, 7): + # Use asyncio.current_task() directly in Python 3.7+ + return asyncio.current_task() + else: + # Fallback to asyncio.tasks.current_task() in Python 3.6 + return asyncio.Task.current_task() diff --git a/reframe/core/pipeline.py b/reframe/core/pipeline.py index 8c3ab5e46c..364dd8cf8e 100644 --- a/reframe/core/pipeline.py +++ b/reframe/core/pipeline.py @@ -1018,7 +1018,7 @@ def __new__(cls, *args, **kwargs): prefix = cls._rfm_custom_prefix except AttributeError: if osext.is_interactive(): - prefix = os.getcwd() + prefix = rt.get_working_dir() else: try: prefix = cls._rfm_pinned_prefix @@ -1762,6 +1762,7 @@ def setup(self, partition, environ, **job_opts): more details. ''' + os.chdir(rt.get_working_dir()) self._current_partition = partition self._current_environ = environ self._setup_paths() @@ -1788,7 +1789,7 @@ def _clone_to_stagedir(self, url): ) @final - def compile(self): + async def compile(self): '''The compilation phase of the regression test pipeline. :raises reframe.core.exceptions.ReframeError: In case of errors. @@ -1881,7 +1882,7 @@ def compile(self): # override those set by the framework. 
resources_opts = self._map_resources_to_jobopts() self._build_job.options = resources_opts + self._build_job.options - with osext.change_dir(self._stagedir): + with osext.change_dir_global(self._stagedir): # Prepare build job build_commands = [ *self.prebuild_cmds, @@ -1899,10 +1900,10 @@ def compile(self): raise PipelineError('failed to prepare build job') from e if not self.is_dry_run(): - self._build_job.submit() + await self._build_job.submit() @final - def compile_wait(self): + async def compile_wait(self): '''Wait for compilation phase to finish. .. versionadded:: 2.13 @@ -1924,7 +1925,7 @@ def compile_wait(self): if self.is_dry_run(): return - self._build_job.wait() + await self._build_job.wait() # We raise a BuildError when we an exit code and it is non zero if self._build_job.exitcode: @@ -1932,11 +1933,11 @@ def compile_wait(self): f'build job failed with exit code: {self._build_job.exitcode}' ) - with osext.change_dir(self._stagedir): + with osext.change_dir_global(self._stagedir): self.build_system.post_build(self._build_job) @final - def run(self): + async def run(self): '''The run phase of the regression test pipeline. This call is non-blocking. @@ -2030,7 +2031,7 @@ def _get_cp_env(): # override those set by the framework. resources_opts = self._map_resources_to_jobopts() self._job.options = resources_opts + self._job.options - with osext.change_dir(self._stagedir): + with osext.change_dir_global(self._stagedir): try: self.logger.debug('Generating the run script') self._job.prepare( @@ -2046,7 +2047,7 @@ def _get_cp_env(): raise PipelineError('failed to prepare run job') from e if not self.is_dry_run(): - self._job.submit() + await self._job.submit() self.logger.debug(f'Spawned run job (id={self.job.jobid})') # Update num_tasks if test is flexible @@ -2113,7 +2114,7 @@ def run_complete(self): return self._job.finished() @final - def run_wait(self): + async def run_wait(self): '''Wait for the run phase of this test to finish. :raises reframe.core.exceptions.ReframeError: In case of errors. @@ -2133,7 +2134,7 @@ def run_wait(self): if self.is_dry_run(): return - self._job.wait() + await self._job.wait() @final def sanity(self): @@ -2197,7 +2198,7 @@ def check_sanity(self): if self.is_dry_run(): return - with osext.change_dir(self._stagedir): + with osext.change_dir_global(self._stagedir): success = sn.evaluate(self.sanity_patterns) if not success: raise SanityError() @@ -2254,7 +2255,7 @@ def check_performance(self): unit) # Evaluate the performance function and retrieve the metrics - with osext.change_dir(self._stagedir): + with osext.change_dir_global(self._stagedir): for tag, expr in self.perf_variables.items(): try: value = expr.evaluate() if not self.is_dry_run() else None @@ -2339,7 +2340,7 @@ def _copy_to_outputdir(self): self._copy_job_files(self._job, self.outputdir) self._copy_job_files(self._build_job, self.outputdir) - with osext.change_dir(self.stagedir): + with osext.change_dir_global(self.stagedir): # Copy files specified by the user, but expand any glob patterns keep_files = itertools.chain( *(glob.iglob(f) for f in self.keep_files) @@ -2582,6 +2583,7 @@ def setup(self, partition, environ, **job_opts): Similar to the :func:`RegressionTest.setup`, except that no build job is created for this test. 
''' + os.chdir(rt.get_working_dir()) self._current_partition = partition self._current_environ = environ self._setup_paths() @@ -2589,19 +2591,19 @@ def setup(self, partition, environ, **job_opts): self._setup_container_platform() self._resolve_fixtures() - def compile(self): + async def compile(self): '''The compilation phase of the regression test pipeline. This is a no-op for this type of test. ''' - def compile_wait(self): + async def compile_wait(self): '''Wait for compilation phase to finish. This is a no-op for this type of test. ''' - def run(self): + async def run(self): '''The run phase of the regression test pipeline. The resources of the test are copied to the stage directory and the @@ -2614,7 +2616,7 @@ def run(self): self._copy_to_stagedir(os.path.join(self._prefix, self.sourcesdir)) - super().run() + await super().run() class CompileOnlyRegressionTest(RegressionTest, special=True): @@ -2648,6 +2650,7 @@ def setup(self, partition, environ, **job_opts): Similar to the :func:`RegressionTest.setup`, except that no run job is created for this test. ''' + os.chdir(rt.get_working_dir()) # No need to setup the job for compile-only checks self._current_partition = partition self._current_environ = environ @@ -2666,13 +2669,13 @@ def stdout(self): def stderr(self): return self.build_job.stderr if self.build_job else None - def run(self): + async def run(self): '''The run stage of the regression test pipeline. Implemented as no-op. ''' - def run_wait(self): + async def run_wait(self): '''Wait for this test to finish. Implemented as no-op diff --git a/reframe/core/runtime.py b/reframe/core/runtime.py index 24d754b991..022f601dcf 100644 --- a/reframe/core/runtime.py +++ b/reframe/core/runtime.py @@ -204,6 +204,23 @@ def init_runtime(site_config, **kwargs): _runtime_context = RuntimeContext(site_config, **kwargs) +_working_dir = None + + +def set_working_dir(): + global _working_dir + + _working_dir = os.getcwd() + + +def get_working_dir(): + + if _working_dir is None: + raise ReframeFatalError('no working dir was yet set') + + return _working_dir + + def runtime(): '''Get the runtime context of the framework. @@ -397,6 +414,7 @@ def valid_sysenv_comb(valid_systems, valid_prog_environs, class temp_environment: '''Context manager to temporarily change the environment.''' + # TODO: Do we need to change something here? context management asyncio def __init__(self, modules=None, env_vars=None): self._modules = modules or [] @@ -414,6 +432,7 @@ def __exit__(self, exc_type, exc_value, traceback): class temp_config: '''Context manager to temporarily switch to specific configuration.''' + # TODO: Do we need to change something here? 
(asyncio context management)
     def __init__(self, system):
         self.__to = system
 
diff --git a/reframe/core/schedulers/__init__.py b/reframe/core/schedulers/__init__.py
index a8565a99bc..44bc6f9135 100644
--- a/reframe/core/schedulers/__init__.py
+++ b/reframe/core/schedulers/__init__.py
@@ -609,14 +609,15 @@ def guess_num_tasks(self):
             available_nodes = self.scheduler.filternodes(self, available_nodes)
         return len(available_nodes) * num_tasks_per_node
 
-    def submit(self):
-        return self.scheduler.submit(self)
+    async def submit(self):
+        result = await self.scheduler.submit(self)
+        return result
 
-    def wait(self):
+    async def wait(self):
         if self.jobid is None:
             raise JobNotStartedError('cannot wait an unstarted job')
 
-        self.scheduler.wait(self)
+        await self.scheduler.wait(self)
         self._completion_time = self._completion_time or time.time()
 
     def cancel(self):
diff --git a/reframe/core/schedulers/local.py b/reframe/core/schedulers/local.py
index 87eead7530..f426d0f3df 100644
--- a/reframe/core/schedulers/local.py
+++ b/reframe/core/schedulers/local.py
@@ -3,11 +3,12 @@
 #
 # SPDX-License-Identifier: BSD-3-Clause
 
-import errno
+import asyncio
 import os
 import signal
 import socket
 import time
+import psutil
 
 import reframe.core.schedulers as sched
 import reframe.utility.osext as osext
@@ -53,7 +54,7 @@ class LocalJobScheduler(sched.JobScheduler):
     def make_job(self, *args, **kwargs):
         return _LocalJob(*args, **kwargs)
 
-    def submit(self, job):
+    async def submit(self, job):
         # Run from the absolute path
         f_stdout = open(job.stdout, 'w+')
         f_stderr = open(job.stderr, 'w+')
@@ -61,7 +62,7 @@ def submit(self, job):
         # The new process starts also a new session (session leader), so that
         # we can later kill any other processes that this might spawn by just
         # killing this one.
-        proc = osext.run_command_async(
+        proc = await osext.run_command_asyncio_alone(
             os.path.abspath(job.script_filename),
             stdout=f_stdout,
             stderr=f_stderr,
@@ -94,7 +95,22 @@ def filternodes(self, job, nodes):
     def _kill_all(self, job):
         '''Send SIGKILL to all the processes of the spawned job.'''
         try:
-            os.killpg(job.jobid, signal.SIGKILL)
+            # Get the process with psutil because we need to cancel the group
+            p = psutil.Process(job.jobid)
+            # Get the children of this group
+            job.children = p.children(recursive=True)
+            children = job.children
+        except psutil.NoSuchProcess:
+            try:
+                # Maybe the main process was already killed/terminated
+                # but the children were not
+                children = job.children
+            except AttributeError:
+                children = []
+
+        try:
+            # Try to kill the main process
+            job.proc.kill()
             job._signal = signal.SIGKILL
         except (ProcessLookupError, PermissionError):
             # The process group may already be dead or assigned to a different
             # group, so ignore this error
@@ -106,11 +122,50 @@ def _kill_all(self, job):
             job.f_stderr.close()
             job._state = 'FAILURE'
 
+        for child in children:
+            # Try to kill the children
+            try:
+                child.kill()
+            except (ProcessLookupError, PermissionError,
+                    psutil.NoSuchProcess):
+                # The process group may already be dead or assigned
+                # to a different group, so ignore this error
+                self.log(f'pid {child.pid} already dead')
+        # If the main process ignored SIGTERM but the children
+        # didn't, we get a zero exit code once the children
+        # are terminated
+        if job.proc.returncode >= 0:
+            job._signal = signal.SIGKILL
+        else:
+            # If the main process was terminated but the children
+            # ignored SIGTERM, then the children are killed and
+            # the signal of the job should be adjusted accordingly
+            if job.proc.returncode == -15:
+                job._signal = signal.SIGKILL
+
     def _term_all(self, job):
         '''Send SIGTERM to all the processes of the spawned job.'''
         try:
-            os.killpg(job.jobid, signal.SIGTERM)
+            p = psutil.Process(job.jobid)
+            job.children = p.children(recursive=True)
+            children = job.children
+        except psutil.NoSuchProcess:
+            try:
+                children = job.children
+            except AttributeError:
+                children = []
+
+        try:
+            job.proc.terminate()
             job._signal = signal.SIGTERM
+            for child in children:
+                try:
+                    child.terminate()
+                except (ProcessLookupError, PermissionError,
+                        psutil.NoSuchProcess):
+                    # The process group may already be dead or assigned
+                    # to a different group, so ignore this error
+                    self.log(f'pid {child.pid} already dead')
         except (ProcessLookupError, PermissionError):
             # Job has finished already, close file handles
             self.log(f'pid {job.jobid} already dead')
@@ -129,7 +184,7 @@ def cancel(self, job):
         self._term_all(job)
         job._cancel_time = time.time()
 
-    def wait(self, job):
+    async def wait(self, job):
         '''Wait for the spawned job to finish.
 
         As soon as the parent job process finishes, all of its spawned
@@ -140,8 +195,8 @@ def wait(self, job):
         '''
 
         while not self.finished(job):
-            self.poll(job)
-            time.sleep(self.WAIT_POLL_SECS)
+            await self.poll(job)
+            await asyncio.sleep(self.WAIT_POLL_SECS)
 
     def finished(self, job):
         '''Check if the spawned process has finished.
@@ -155,36 +210,26 @@ def finished(self, job):
 
         return job.state in ['SUCCESS', 'FAILURE', 'TIMEOUT']
 
-    def poll(self, *jobs):
+    async def poll(self, *jobs):
         for job in jobs:
-            self._poll_job(job)
+            await self._poll_job(job)
 
-    def _poll_job(self, job):
+    async def _poll_job(self, job):
         if job is None or job.jobid is None:
             return
 
-        try:
-            pid, status = os.waitpid(job.jobid, os.WNOHANG)
-        except OSError as e:
-            if e.errno == errno.ECHILD:
-                # No unwaited children
-                self.log('no more unwaited children')
-                return
-            else:
-                raise e
-
         if job.cancel_time:
             # Job has been cancelled; give it a grace period and kill it
             self.log(f'Job {job.jobid} has been cancelled; '
                      f'giving it a grace period')
             t_rem = self.CANCEL_GRACE_PERIOD - (time.time() - job.cancel_time)
             if t_rem > 0:
-                time.sleep(t_rem)
+                await asyncio.sleep(t_rem)
 
             self._kill_all(job)
             return
 
-        if not pid:
+        if job.proc.returncode is None:
             # Job has not finished; check if we have reached a timeout
             t_elapsed = time.time() - job.submit_time
             if job.time_limit and t_elapsed > job.time_limit:
@@ -201,9 +246,9 @@ def _poll_job(self, job):
             self._kill_all(job)
 
         # Retrieve the status of the job and return
-        if os.WIFEXITED(status):
-            job._exitcode = os.WEXITSTATUS(status)
+        if job.proc.returncode >= 0:
+            job._exitcode = job.proc.returncode
             job._state = 'FAILURE' if job.exitcode != 0 else 'SUCCESS'
-        elif os.WIFSIGNALED(status):
+        else:
             job._state = 'FAILURE'
-            job._signal = os.WTERMSIG(status)
+            job._signal = -job.proc.returncode
diff --git a/reframe/core/schedulers/slurm.py b/reframe/core/schedulers/slurm.py
index 47d44c1dfc..1d1b1ba49e 100644
--- a/reframe/core/schedulers/slurm.py
+++ b/reframe/core/schedulers/slurm.py
@@ -3,6 +3,7 @@
 #
 # SPDX-License-Identifier: BSD-3-Clause
 
+import asyncio
 import functools
 import glob
 import itertools
@@ -64,7 +65,10 @@ def slurm_state_pending(state):
     return False
 
 
-_run_strict = functools.partial(osext.run_command, check=True)
+# Asynchronous _run_strict
+_run_strict = functools.partial(osext.run_command_asyncio, check=True)
+# Synchronous _run_strict
+_run_strict_s = functools.partial(osext.run_command, check=True)
 
 
 class _SlurmJob(sched.Job):
@@ -255,7 +259,7 @@ def emit_preamble(self, job):
 
         # Filter out empty statements before returning
         return 
list(filter(None, preamble)) - def submit(self, job): + async def submit(self, job): cmd_parts = ['sbatch'] if self._sched_access_in_submit: cmd_parts += job.sched_access @@ -265,7 +269,8 @@ def submit(self, job): intervals = itertools.cycle([1, 2, 3]) while True: try: - completed = _run_strict(cmd, timeout=self._submit_timeout) + completed = await _run_strict(cmd, + timeout=self._submit_timeout) break except SpawnedProcessError as e: error_match = re.search( @@ -279,7 +284,7 @@ def submit(self, job): f'encountered a job submission error: ' f'{error_match.group(1)}: will resubmit after {t}s' ) - time.sleep(t) + await asyncio.sleep(t) jobid_match = re.search(r'Submitted batch job (?P\d+)', completed.stdout) @@ -293,7 +298,7 @@ def submit(self, job): def allnodes(self): try: - completed = _run_strict('scontrol -a show -o nodes') + completed = _run_strict_s('scontrol -a show -o nodes') except SpawnedProcessError as e: raise JobSchedulerError( 'could not retrieve node information') from e @@ -302,7 +307,7 @@ def allnodes(self): return _create_nodes(node_descriptions) def _get_default_partition(self): - completed = _run_strict('scontrol -a show -o partitions') + completed = _run_strict_s('scontrol -a show -o partitions') partition_match = re.search(r'PartitionName=(?P\S+)\s+' r'.*Default=YES.*', completed.stdout) if partition_match: @@ -311,7 +316,7 @@ def _get_default_partition(self): return None def _merge_files(self, job): - with osext.change_dir(job.workdir): + with osext.change_dir_global(job.workdir): out_glob = glob.glob(job.stdout + '_*') err_glob = glob.glob(job.stderr + '_*') self.log(f'merging job array output files: {", ".join(out_glob)}') @@ -380,7 +385,7 @@ def filternodes(self, job, nodes): return nodes def _get_reservation_nodes(self, reservation): - completed = _run_strict('scontrol -a show res %s' % reservation) + completed = _run_strict_s('scontrol -a show res %s' % reservation) node_match = re.search(r'(Nodes=\S+)', completed.stdout) if node_match: reservation_nodes = node_match[1] @@ -388,7 +393,7 @@ def _get_reservation_nodes(self, reservation): raise JobSchedulerError("could not extract the node names for " "reservation '%s'" % reservation) - completed = _run_strict('scontrol -a show -o %s' % reservation_nodes) + completed = _run_strict_s('scontrol -a show -o %s' % reservation_nodes) node_descriptions = completed.stdout.splitlines() return _create_nodes(node_descriptions) @@ -411,7 +416,7 @@ def _update_completion_time(self, job, timestamps): if ct: job._completion_time = max(ct) - def poll(self, *jobs): + async def poll(self, *jobs): '''Update the status of the jobs.''' if jobs: @@ -425,7 +430,7 @@ def poll(self, *jobs): t_start = time.strftime( '%F', time.localtime(min(job.submit_time for job in jobs)) ) - completed = _run_strict( + completed = await _run_strict( f'sacct -S {t_start} -P ' f'-j {",".join(job.jobid for job in jobs)} ' f'-o jobid,state,exitcode,end,nodelist' @@ -461,7 +466,7 @@ def poll(self, *jobs): job._state = ','.join(m.group('state') for m in jobarr_info) if not self._update_state_count % self.SACCT_SQUEUE_RATIO: - self._cancel_if_blocked(job) + await self._cancel_if_blocked(job) self._cancel_if_pending_too_long(job) if slurm_state_completed(job.state): @@ -482,17 +487,19 @@ def _cancel_if_pending_too_long(self, job): t_pending = time.time() - job.submit_time if t_pending >= job.max_pending_time: - self.log(f'maximum pending time for job exceeded; cancelling it') + self.log('maximum pending time for job exceeded; cancelling it') self.cancel(job) 
job._exception = JobError('maximum pending time exceeded', job.jobid) - def _cancel_if_blocked(self, job, reasons=None): + async def _cancel_if_blocked(self, job, reasons=None): if (job.is_cancelling or not slurm_state_pending(job.state)): return if not reasons: - completed = osext.run_command('squeue -h -j %s -o %%r' % job.jobid) + completed = await osext.run_command_asyncio( + 'squeue -h -j %s -o %%r' % job.jobid + ) reasons = completed.stdout.splitlines() if not reasons: # Can't retrieve job's state. Perhaps it has finished already @@ -545,7 +552,7 @@ def _do_cancel_if_blocked(self, job, reason_descr): job._exception = JobBlockedError(reason_msg, job.jobid) - def wait(self, job): + async def wait(self, job): # Quickly return in case we have finished already if self.finished(job): if job.is_array: @@ -555,14 +562,14 @@ def wait(self, job): intervals = itertools.cycle([1, 2, 3]) while not self.finished(job): - self.poll(job) - time.sleep(next(intervals)) + await self.poll(job) + await asyncio.sleep(next(intervals)) if job.is_array: self._merge_files(job) - def cancel(self, job): - _run_strict(f'scancel {job.jobid}', timeout=self._submit_timeout) + async def cancel(self, job): + await _run_strict(f'scancel {job.jobid}', timeout=self._submit_timeout) job._is_cancelling = True def finished(self, job): @@ -578,7 +585,7 @@ class SqueueJobScheduler(SlurmJobScheduler): SQUEUE_DELAY = 2 - def poll(self, *jobs): + async def poll(self, *jobs): if jobs: # Filter out non-jobs jobs = [job for job in jobs if job is not None] @@ -595,7 +602,7 @@ def poll(self, *jobs): # We don't run the command with check=True, because if the job has # finished already, squeue might return an error about an invalid # job id. - completed = osext.run_command( + completed = await osext.run_command_asyncio( f'squeue -h -j {",".join(job.jobid for job in jobs)} ' f'-o "%%i|%%T|%%N|%%r"' ) @@ -626,7 +633,7 @@ def poll(self, *jobs): # Use ',' to join nodes to be consistent with Slurm syntax job._nodespec = ','.join(m.group('nodespec') for m in job_match) - self._cancel_if_blocked( + await self._cancel_if_blocked( job, [s.group('reason') for s in state_match] ) self._cancel_if_pending_too_long(job) diff --git a/reframe/frontend/autodetect.py b/reframe/frontend/autodetect.py index 893e8fe1ec..c657a93525 100644 --- a/reframe/frontend/autodetect.py +++ b/reframe/frontend/autodetect.py @@ -3,6 +3,7 @@ # # SPDX-License-Identifier: BSD-3-Clause +import asyncio import json import jsonschema import os @@ -18,6 +19,7 @@ from reframe.core.schedulers import Job from reframe.core.systems import DeviceInfo, ProcessorInfo from reframe.utility.cpuinfo import cpuinfo +from reframe.frontend.executors import asyncio_run # This is meant to be used by the unit tests @@ -184,8 +186,8 @@ def _emit_custom_script(job, env, commands): getlogger().debug('submitting detection script') _log_contents(job.script_filename) - job.submit() - job.wait() + asyncio_run(job.submit()) + asyncio_run(job.wait()) getlogger().debug('job finished') _log_contents(job.stdout) _log_contents(job.stderr) diff --git a/reframe/frontend/cli.py b/reframe/frontend/cli.py index 92d9a1e01a..4fa57960cc 100644 --- a/reframe/frontend/cli.py +++ b/reframe/frontend/cli.py @@ -32,7 +32,7 @@ getallnodes, repeat_tests, parameterize_tests) from reframe.frontend.executors.policies import (SerialExecutionPolicy, - AsynchronousExecutionPolicy) + AsyncioExecutionPolicy) from reframe.frontend.executors import Runner, generate_testcases from reframe.frontend.loader import RegressionCheckLoader 
from reframe.frontend.printer import PrettyPrinter @@ -253,6 +253,8 @@ def validate_storage_options(namespace, cmd_options): @logging.time_function_noexit def main(): + # Setup the working dir + runtime.set_working_dir() # Setup command line options argparser = argparse.ArgumentParser() action_options = argparser.add_mutually_exclusive_group(required=True) @@ -1561,7 +1563,7 @@ def module_unuse(*paths): if options.exec_policy == 'serial': exec_policy = SerialExecutionPolicy() elif options.exec_policy == 'async': - exec_policy = AsynchronousExecutionPolicy() + exec_policy = AsyncioExecutionPolicy() else: # This should not happen, since choices are handled by # argparser diff --git a/reframe/frontend/executors/__init__.py b/reframe/frontend/executors/__init__.py index 0714b63b92..4a28e5adff 100644 --- a/reframe/frontend/executors/__init__.py +++ b/reframe/frontend/executors/__init__.py @@ -4,6 +4,7 @@ # SPDX-License-Identifier: BSD-3-Clause import abc +import asyncio import contextlib import copy import itertools @@ -23,6 +24,7 @@ JobNotStartedError, FailureLimitError, ForceExitError, + KeyboardError, RunSessionTimeout, SkipTestError, StatisticsError, @@ -31,7 +33,7 @@ from reframe.frontend.printer import PrettyPrinter ABORT_REASONS = (AssertionError, FailureLimitError, - KeyboardInterrupt, ForceExitError, RunSessionTimeout) + KeyboardError, ForceExitError, RunSessionTimeout) class TestStats: @@ -389,6 +391,48 @@ def __exit__(this, exc_type, exc_value, traceback): self.fail() raise TaskExit from e + async def _safe_call_asyncio(self, fn, *args, **kwargs): + class update_timestamps: + '''Context manager to set the start and finish timestamps.''' + + # We use `this` to refer to the update_timestamps object, because + # we don't want to masquerade the self argument of our containing + # function + def __enter__(this): + if fn.__name__ not in ('poll', + 'run_complete', + 'compile_complete'): + stage = self._current_stage + self._timestamps[f'{stage}_start'] = time.time() + + def __exit__(this, exc_type, exc_value, traceback): + stage = self._current_stage + self._timestamps[f'{stage}_finish'] = time.time() + self._timestamps['pipeline_end'] = time.time() + + if fn.__name__ not in ('poll', 'run_complete', 'compile_complete'): + self._current_stage = fn.__name__ + + try: + with logging.logging_context(self.check) as logger: + logger.debug(f'Entering stage: {self._current_stage}') + with update_timestamps(): + # Pick the configuration of the current partition + with runtime.temp_config(self.testcase.partition.fullname): + return await fn(*args, **kwargs) + except SkipTestError as e: + if not self.succeeded: + # Only skip a test if it hasn't finished yet; + # This practically ignores skipping during the cleanup phase + self.skip() + raise TaskExit from e + except ABORT_REASONS: + self.fail() + raise + except BaseException as e: + self.fail() + raise TaskExit from e + def _dry_run_call(self, fn, *args, **kwargs): '''Call check's fn method in dry-run mode.''' @@ -416,17 +460,18 @@ def setup(self, *args, **kwargs): self._notify_listeners('on_task_setup') @logging.time_function - def compile(self): - self._safe_call(self.check.compile) + async def compile(self): + await self._safe_call_asyncio(self.check.compile) self._notify_listeners('on_task_compile') @logging.time_function - def compile_wait(self): - self._safe_call(self.check.compile_wait) + async def compile_wait(self): + await self._safe_call_asyncio(self.check.compile_wait) @logging.time_function - def run(self): - 
self._safe_call(self.check.run) + async def run(self): + # QUESTION: should I change the order here? + await self._safe_call_asyncio(self.check.run) self._notify_listeners('on_task_run') @logging.time_function @@ -447,8 +492,8 @@ def compile_complete(self): return done @logging.time_function - def run_wait(self): - self._safe_call(self.check.run_wait) + async def run_wait(self): + await self._safe_call_asyncio(self.check.run_wait) self.zombie = False @logging.time_function @@ -484,6 +529,8 @@ def cleanup(self, *args, **kwargs): self._safe_call(self.check.cleanup, *args, **kwargs) def fail(self, exc_info=None, callback='on_task_failure'): + if self._aborted: + return self._failed_stage = self._current_stage self._exc_info = exc_info or sys.exc_info() self._notify_listeners(callback) @@ -503,7 +550,6 @@ def abort(self, cause=None): logging.getlogger().debug2(f'Aborting test case: {self.testcase!r}') exc = AbortTaskError() exc.__cause__ = cause - self._aborted = True try: if not self.zombie and self.check.job: self.check.job.cancel() @@ -513,6 +559,7 @@ def abort(self, cause=None): self.fail() else: self.fail((type(exc), exc, None), 'on_task_abort') + self._aborted = True def info(self): '''Return an info string about this task.''' @@ -700,10 +747,8 @@ def print_separator(check, prefix): 'start processing checks') self._policy.enter() self._printer.reset_progress(len(testcases)) - for t in testcases: - self._policy.runcase(t) - self._policy.exit() + self._policy.execute(testcases) self._printer.separator('short single line', 'all spawned checks have finished\n') @@ -748,9 +793,33 @@ def timeout_expired(self): def enter(self): self._num_failed_tasks = 0 - def exit(self): + def _exit(self): pass @abc.abstractmethod - def runcase(self, case): + def _runcase(self, case): '''Run a test case.''' + + def execute(self, testcases): + '''Execute the policy for a given set of testcases.''' + # Moved here the execution + for t in testcases: + self._runcase(t) + + self._exit() + + +def asyncio_run(coro): + loop = asyncio.get_event_loop() + + if loop.is_running(): + loop.run_until_complete(coro) + loop.close() + return + else: + try: + loop.run_until_complete(coro) + loop.close() + return + finally: + loop.close() diff --git a/reframe/frontend/executors/policies.py b/reframe/frontend/executors/policies.py index b75588c5c3..fecf313122 100644 --- a/reframe/frontend/executors/policies.py +++ b/reframe/frontend/executors/policies.py @@ -3,6 +3,7 @@ # # SPDX-License-Identifier: BSD-3-Clause +import asyncio import contextlib import math import sys @@ -14,10 +15,11 @@ RunSessionTimeout, SkipTestError, TaskDependencyError, - TaskExit) + TaskExit, + ForceExitError, + KeyboardError, + AbortTaskError) from reframe.core.logging import getlogger, level_from_str -from reframe.core.pipeline import (CompileOnlyRegressionTest, - RunOnlyRegressionTest) from reframe.frontend.executors import (ExecutionPolicy, RegressionTask, TaskEventListener, ABORT_REASONS) @@ -63,11 +65,12 @@ def __init__(self): self._num_polls = 0 self._sleep_duration = None self._t_init = None + self._jobs_pool = [] def reset_snooze_time(self): self._sleep_duration = self.SLEEP_MIN - def snooze(self): + async def snooze(self): if self._num_polls == 0: self._t_init = time.time() @@ -78,11 +81,40 @@ def snooze(self): f'Poll rate control: sleeping for {self._sleep_duration}s ' f'(current poll rate: {poll_rate} polls/s)' ) - time.sleep(self._sleep_duration) + await asyncio.sleep(self._sleep_duration) self._sleep_duration = min( 
self._sleep_duration*self.SLEEP_INC_RATE, self.SLEEP_MAX ) + def is_time_to_poll(self): + # We check here if it's time to poll + if self._num_polls == 0: + self._t_init = time.time() + + t_elapsed = time.time() - self._t_init + + self._num_polls += 1 + poll_rate = self._num_polls / t_elapsed if t_elapsed else math.inf + getlogger().debug2( + f'Poll rate control: sleeping for {self._sleep_duration}s ' + f'(current poll rate: {poll_rate} polls/s)' + ) + if t_elapsed >= self._sleep_duration: + return True + else: + return False + + def reset_time_to_poll(self): + self._t_init = time.time() + + +global _poll_controller +_poll_controller = _PollController() + + +def getpollcontroller(): + return _poll_controller + class SerialExecutionPolicy(ExecutionPolicy, TaskEventListener): def __init__(self): @@ -97,8 +129,8 @@ def __init__(self): self._retired_tasks = [] self.task_listeners.append(self) - def runcase(self, case): - super().runcase(case) + async def _runcase(self, case): + super()._runcase(case) check, partition, _ = case task = RegressionTask(case, self.task_listeners) if check.is_dry_run(): @@ -130,9 +162,9 @@ def runcase(self, case): task.testcase.environ, sched_flex_alloc_nodes=self.sched_flex_alloc_nodes, sched_options=self.sched_options) - task.compile() - task.compile_wait() - task.run() + await task.compile() + await task.compile_wait() + await task.run() # Pick the right scheduler if task.check.local: @@ -143,14 +175,13 @@ def runcase(self, case): self._pollctl.reset_snooze_time() while True: if not self.dry_run_mode: - sched.poll(task.check.job) - + await sched.poll(task.check.job) if task.run_complete(): break - self._pollctl.snooze() + await self._pollctl.snooze() - task.run_wait() + await task.run_wait() if not self.skip_sanity_check: task.sanity() @@ -163,7 +194,10 @@ def runcase(self, case): return except ABORT_REASONS as e: task.abort(e) - raise + if type(e) is KeyboardInterrupt: + raise ForceExitError + else: + raise e except BaseException: task.fail(sys.exc_info()) @@ -242,327 +276,231 @@ def on_task_success(self, task): if self.timeout_expired(): raise RunSessionTimeout('maximum session duration exceeded') - def exit(self): + def execute(self, testcases): + '''Execute the policy for a given set of testcases.''' + # Moved here the execution + try: + loop = asyncio.get_event_loop() + for task in all_tasks(loop): + if isinstance(task, asyncio.tasks.Task): + task.cancel() + if loop.is_closed(): + loop = asyncio.new_event_loop() + watcher = asyncio.get_child_watcher() + if isinstance(watcher, asyncio.SafeChildWatcher): + # Detach the watcher from the current loop to avoid issues + watcher.close() + watcher.attach_loop(None) + asyncio.set_event_loop(loop) + if isinstance(watcher, asyncio.SafeChildWatcher): + # Reattach the watcher to the new loop + watcher.attach_loop(loop) + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + for case in testcases: + try: + loop.run_until_complete(self._runcase(case)) + except (Exception, KeyboardInterrupt) as e: + if type(e) in (ABORT_REASONS): + for task in all_tasks(loop): + if isinstance(task, asyncio.tasks.Task): + task.cancel() + loop.close() + raise e + else: + getlogger().info(f"Execution stopped due to an error: {e}") + break + loop.close() + self._exit() + + def _exit(self): # Clean up all remaining tasks _cleanup_all(self._retired_tasks, not self.keep_stage_files) -class AsynchronousExecutionPolicy(ExecutionPolicy, TaskEventListener): +class AsyncioExecutionPolicy(ExecutionPolicy, 
TaskEventListener):
     '''The asynchronous execution policy.'''
 
     def __init__(self):
         super().__init__()
-        self._pollctl = _PollController()
+        self._pollctl = getpollcontroller()
+        self._pollctl.reset_snooze_time()
+        self._current_tasks = util.OrderedSet()
 
         # Index tasks by test cases
         self._task_index = {}
 
-        # A set of all the current tasks. We use an ordered set here, because
-        # we want to preserve the order of the tasks.
-        self._current_tasks = util.OrderedSet()
-
-        # Quick look up for the partition schedulers including the
-        # `_rfm_local` pseudo-partition
-        self._schedulers = {
-            '_rfm_local': self.local_scheduler
-        }
-
         # Tasks per partition
         self._partition_tasks = {
             '_rfm_local': util.OrderedSet()
         }
 
-        # Retired tasks that need to be cleaned up
-        self._retired_tasks = []
-
         # Job limit per partition
         self._max_jobs = {
             '_rfm_local': rt.runtime().get_option('systems/0/max_local_jobs')
         }
-        self._pipeline_statistics = rt.runtime().get_option(
-            'systems/0/dump_pipeline_progress'
-        )
-        self.task_listeners.append(self)
-
-    def _init_pipeline_progress(self, num_tasks):
-        self._pipeline_progress = {
-            'startup': [(num_tasks, 0)],
-            'ready_compile': [(0, 0)],
-            'compiling': [(0, 0)],
-            'ready_run': [(0, 0)],
-            'running': [(0, 0)],
-            'completing': [(0, 0)],
-            'retired': [(0, 0)],
-            'completed': [(0, 0)],
-            'fail': [(0, 0)],
-            'skip': [(0, 0)]
-        }
-        self._pipeline_step = 0
-        self._t_pipeline_start = time.time()
-
-    def _update_pipeline_progress(self, old_state, new_state, num_tasks=1):
-        timestamp = time.time() - self._t_pipeline_start
-        for state in self._pipeline_progress:
-            count = self._pipeline_progress[state][self._pipeline_step][0]
-            if old_state != new_state:
-                if state == old_state:
-                    count -= num_tasks
-                elif state == new_state:
-                    count += num_tasks
-
-            self._pipeline_progress[state].append((count, timestamp))
-
-        self._pipeline_step += 1
-
-    def _dump_pipeline_progress(self, filename):
-        import reframe.utility.jsonext as jsonext
-
-        with open(filename, 'w') as fp:
-            jsonext.dump(self._pipeline_progress, fp, indent=2)
+        # Tasks that have finished, but have not performed their cleanup phase
+        self._retired_tasks = []
+        self.task_listeners.append(self)
 
-    def runcase(self, case):
-        super().runcase(case)
-        check, partition, environ = case
-        self._schedulers[partition.fullname] = partition.scheduler
+    async def _runcase(self, case, task):
+        # The task is passed in as an argument because it must be
+        # created outside, at the point where all the tasks are
+        # gathered for execution.
+        # If the tasks are simply gathered and awaited with asyncio,
+        # the failure of one task does not inform the others,
+        # so that had to be handled manually. There is a way to make
+        # everything stop as soon as an exception is raised, but that
+        # exception could not be handled nicely, because it would not
+        # be possible to abort the tasks whose execution had not yet
+        # started; abortall needs to act on all the tests, not only
+        # the ones which were initiated by the execution. 
Exit gracefully + # the execuion loop aborting all the tasks + super()._runcase(case) + check, partition, _ = case + # task = RegressionTask(case, self.task_listeners) + if check.is_dry_run(): + self.printer.status('DRY', task.info()) + else: + self.printer.status('RUN', task.info()) - # Set partition-based counters, if not set already self._partition_tasks.setdefault(partition.fullname, util.OrderedSet()) self._max_jobs.setdefault(partition.fullname, partition.max_jobs) - task = RegressionTask(case, self.task_listeners) self._task_index[case] = task - self.stats.add_task(task) - getlogger().debug2( - f'Added {check.name} on {partition.fullname} ' - f'using {environ.name}' - ) - self._current_tasks.add(task) - - def exit(self): - if self._pipeline_statistics: - self._init_pipeline_progress(len(self._current_tasks)) - - self._pollctl.reset_snooze_time() - while self._current_tasks: - try: - self._poll_tasks() - num_running = sum( - 1 if t.state in ('running', 'compiling') else 0 - for t in self._current_tasks - ) - timeout = rt.runtime().get_option( - 'general/0/pipeline_timeout' - ) - - self._advance_all(self._current_tasks, timeout) - if self._pipeline_statistics: - num_retired = len(self._retired_tasks) - - _cleanup_all(self._retired_tasks, not self.keep_stage_files) - if self._pipeline_statistics: - num_retired_actual = num_retired - len(self._retired_tasks) - - # Some tests might not be cleaned up because they are - # waiting for dependencies or because their dependencies - # have failed. - self._update_pipeline_progress( - 'retired', 'completed', num_retired_actual - ) - - if self.timeout_expired(): - raise RunSessionTimeout( - 'maximum session duration exceeded' - ) - - if num_running: - self._pollctl.snooze() - except ABORT_REASONS as e: - self._abortall(e) - raise - - if self._pipeline_statistics: - self._dump_pipeline_progress('pipeline-progress.json') - - def _poll_tasks(self): - if self.dry_run_mode: - return - - for partname, sched in self._schedulers.items(): - jobs = [] - for t in self._partition_tasks[partname]: - if t.state == 'compiling': - jobs.append(t.check.build_job) - elif t.state == 'running': - jobs.append(t.check.job) - - sched.poll(*jobs) - - def _exec_stage(self, task, stage_methods): - '''Execute a series of pipeline stages. - - Return True on success, False otherwise. - ''' - try: - for stage in stage_methods: - stage() - except TaskExit: - self._current_tasks.remove(task) - if task.check.current_partition: - partname = task.check.current_partition.fullname - else: - partname = None - - # Remove tasks from the partition tasks if there - with contextlib.suppress(KeyError): - self._partition_tasks['_rfm_local'].remove(task) - if partname: - self._partition_tasks[partname].remove(task) - - return False - else: - return True - - def _advance_all(self, tasks, timeout=None): - t_init = time.time() - num_progressed = 0 - - getlogger().debug2(f'Current tests: {len(tasks)}') - - # We take a snapshot of the tasks to advance by doing a shallow copy, - # since the tasks may removed by the individual advance functions. 
- for t in list(tasks): - old_state = t.state - bump_state = getattr(self, f'_advance_{t.state}') - num_progressed += bump_state(t) - new_state = t.state - - t_elapsed = time.time() - t_init - if timeout and t_elapsed > timeout and num_progressed: - break + # Do not run test if any of its dependencies has failed + # NOTE: Restored dependencies are not in the task_index + if any(self._task_index[c].failed + for c in case.deps if c in self._task_index): + raise TaskDependencyError('dependencies failed') - if self._pipeline_statistics: - self._update_pipeline_progress(old_state, new_state, 1) + if any(self._task_index[c].skipped + for c in case.deps if c in self._task_index): - getlogger().debug2(f'Bumped {num_progressed} test(s)') + # We raise the SkipTestError here and catch it immediately in + # order for `skip()` to get the correct exception context. + try: + raise SkipTestError('skipped due to skipped dependencies') + except SkipTestError as e: + task.skip() + raise TaskExit from e - def _advance_startup(self, task): - if self.deps_skipped(task): - try: - raise SkipTestError('skipped due to skipped dependencies') - except SkipTestError as e: - task.skip() - self._current_tasks.remove(task) - return 1 - elif self.deps_succeeded(task): - try: + deps_status = await self.check_deps(task) + if deps_status == "skipped": + try: + raise SkipTestError('skipped due to skipped dependencies') + except SkipTestError: + task.skip() + self._current_tasks.remove(task) + return 1 + elif deps_status == "succeded": if task.check.is_dry_run(): self.printer.status('DRY', task.info()) else: self.printer.status('RUN', task.info()) - - task.setup(task.testcase.partition, - task.testcase.environ, - sched_flex_alloc_nodes=self.sched_flex_alloc_nodes, - sched_options=self.sched_options) - except TaskExit: + elif deps_status == "failed": + exc = TaskDependencyError('dependencies failed') + task.fail((type(exc), exc, None)) self._current_tasks.remove(task) return 1 - if isinstance(task.check, RunOnlyRegressionTest): - # All tests should execute all the pipeline stages, even if - # they are no-ops - self._exec_stage(task, [task.compile, - task.compile_complete, - task.compile_wait]) - - return 1 - elif self.deps_failed(task): - exc = TaskDependencyError('dependencies failed') - task.fail((type(exc), exc, None)) - self._current_tasks.remove(task) - return 1 - else: - # Not all dependencies have finished yet - getlogger().debug2(f'{task.info()} waiting for dependencies') - return 0 + task.setup(task.testcase.partition, + task.testcase.environ, + sched_flex_alloc_nodes=self.sched_flex_alloc_nodes, + sched_options=self.sched_options) + partname = _get_partition_name(task, phase='build') + max_jobs = self._max_jobs[partname] + while len(self._partition_tasks[partname])+1 > max_jobs: + await asyncio.sleep(2) + self._partition_tasks[partname].add(task) + await task.compile() + await task.compile_wait() + task.compile_complete() + self._partition_tasks[partname].remove(task) + while len(self._partition_tasks[partname])+1 > max_jobs: + await asyncio.sleep(2) + self._partition_tasks[partname].add(task) + await task.run() - def _advance_ready_compile(self, task): - partname = _get_partition_name(task, phase='build') - max_jobs = self._max_jobs[partname] - if len(self._partition_tasks[partname]) < max_jobs: - if self._exec_stage(task, [task.compile]): - self._partition_tasks[partname].add(task) + # Pick the right scheduler + if task.check.local: + sched = self.local_scheduler + else: + sched = partition.scheduler - return 1 + 
while True: + if not self.dry_run_mode: + if getpollcontroller().is_time_to_poll(): + getpollcontroller().reset_time_to_poll() + await sched.poll(*getpollcontroller()._jobs_pool) - getlogger().debug2(f'Hit the max job limit of {partname}: {max_jobs}') - return 0 + if task.run_complete(): + break - def _advance_compiling(self, task): - partname = _get_partition_name(task, phase='build') - try: - if task.compile_complete(): - task.compile_wait() - self._partition_tasks[partname].remove(task) - if isinstance(task.check, CompileOnlyRegressionTest): - # All tests should pass from all the pipeline stages, - # even if they are no-ops - self._exec_stage(task, [task.run, - task.run_complete, - task.run_wait]) + await self._pollctl.snooze() - return 1 - else: - return 0 - except TaskExit: + await task.run_wait() self._partition_tasks[partname].remove(task) - self._current_tasks.remove(task) - return 1 + if not self.skip_sanity_check: + task.sanity() - def _advance_ready_run(self, task): - partname = _get_partition_name(task, phase='run') - max_jobs = self._max_jobs[partname] - if len(self._partition_tasks[partname]) < max_jobs: - if self._exec_stage(task, [task.run]): - self._partition_tasks[partname].add(task) + if not self.skip_performance_check: + task.performance() - return 1 + self._retired_tasks.append(task) + task.finalize() - getlogger().debug2(f'Hit the max job limit of {partname}: {max_jobs}') - return 0 + except TaskExit: + self._current_tasks.remove(task) + if task.check.current_partition: + partname = task.check.current_partition.fullname + else: + partname = None - def _advance_running(self, task): - partname = _get_partition_name(task, phase='run') - try: - if task.run_complete(): - if self._exec_stage(task, [task.run_wait]): + # Remove tasks from the partition tasks if there + with contextlib.suppress(KeyError): + self._partition_tasks['_rfm_local'].remove(task) + if partname: self._partition_tasks[partname].remove(task) - return 1 + return + except ABORT_REASONS as e: + self._abortall(e) + if type(e) is KeyboardInterrupt: + raise ForceExitError else: - return 0 - except TaskExit: - self._partition_tasks[partname].remove(task) + raise e + except BaseException: + task.fail(sys.exc_info()) self._current_tasks.remove(task) - return 1 + if task.check.current_partition: + partname = task.check.current_partition.fullname + else: + partname = None - def _advance_completing(self, task): - try: - if not self.skip_sanity_check: - task.sanity() + # Remove tasks from the partition tasks if there + with contextlib.suppress(KeyError): + self._partition_tasks['_rfm_local'].remove(task) + if partname: + self._partition_tasks[partname].remove(task) + return - if not self.skip_performance_check: - task.performance() + async def check_deps(self, task): + while not (self.deps_skipped(task) or self.deps_failed(task) or + self.deps_succeeded(task)): + await asyncio.sleep(1) - task.finalize() - self._retired_tasks.append(task) - self._current_tasks.remove(task) - return 1 - except TaskExit: - self._current_tasks.remove(task) - return 1 + if self.deps_skipped(task): + return "skipped" + elif self.deps_failed(task): + return "failed" + elif self.deps_succeeded(task): + return "succeeded" def deps_failed(self, task): # NOTE: Restored dependencies are not in the task_index @@ -587,22 +525,20 @@ def _abortall(self, cause): with contextlib.suppress(FailureLimitError): task.abort(cause) - # These function can be useful for tracking statistics of the framework, - # such as number of tests that have finished setup 
etc.
     def on_task_setup(self, task):
         pass
 
     def on_task_run(self, task):
-        pass
+        getpollcontroller()._jobs_pool.append(task.check.job)
 
     def on_task_compile(self, task):
-        pass
+        getpollcontroller()._jobs_pool.append(task.check.build_job)
 
     def on_task_exit(self, task):
-        self._pollctl.reset_snooze_time()
+        getpollcontroller()._jobs_pool.remove(task.check.job)
 
     def on_task_compile_exit(self, task):
-        self._pollctl.reset_snooze_time()
+        getpollcontroller()._jobs_pool.remove(task.check.build_job)
 
     def on_task_skip(self, task):
         msg = str(task.exc_info[1])
@@ -639,18 +575,93 @@ def on_task_failure(self, task):
             f'maximum number of failures ({self.max_failures}) reached'
         )
 
+        if self.timeout_expired():
+            raise RunSessionTimeout('maximum session duration exceeded')
+
     def on_task_success(self, task):
         msg = f'{task.info()}'
         self.printer.status('OK', msg, just='right')
         _print_perf(task)
         timings = task.pipeline_timings(['setup',
                                          'compile_complete',
-                                         'run_complete',
+                                         'run_complete', 'sanity',
                                          'performance',
                                          'total'])
         getlogger().verbose(f'==> {timings}')
+
+        # Update reference count of dependencies
         for c in task.testcase.deps:
             # NOTE: Restored dependencies are not in the task_index
             if c in self._task_index:
                 self._task_index[c].ref_count -= 1
+
+        _cleanup_all(self._retired_tasks, not self.keep_stage_files)
+        if self.timeout_expired():
+            raise RunSessionTimeout('maximum session duration exceeded')
+
+    def _exit(self):
+        # Clean up all remaining tasks
+        _cleanup_all(self._retired_tasks, not self.keep_stage_files)
+
+    def execute(self, testcases):
+        try:
+            loop = asyncio.get_event_loop()
+        except RuntimeError:
+            loop = asyncio.new_event_loop()
+            asyncio.set_event_loop(loop)
+        if loop.is_closed():
+            loop = asyncio.new_event_loop()
+            asyncio.set_event_loop(loop)
+        all_cases = []
+        for t in testcases:
+            task = RegressionTask(t, self.task_listeners)
+
+            self.stats.add_task(task)
+            self._current_tasks.add(task)
+            all_cases.append(asyncio.ensure_future(self._runcase(t, task)))
+        try:
+            # Wait for tasks until the first failure
+            loop.run_until_complete(self._execute_until_failure(all_cases))
+        except (Exception, KeyboardInterrupt) as e:
+            getlogger().debug2(f'Caught exception: {type(e).__name__}')
+            if type(e) in ABORT_REASONS:
+                loop.run_until_complete(_cancel_gracefully(all_cases))
+                try:
+                    raise AbortTaskError
+                except AbortTaskError as exc:
+                    self._abortall(exc)
+                loop.close()
+                raise e
+            else:
+                getlogger().info(f"Execution stopped due to an error: {e}")
+        finally:
+            loop.close()
+        # The event loop is already closed by the finally block above
+        self._exit()
+
+    async def _execute_until_failure(self, all_cases):
+        """Wait for tasks to complete or fail, stopping at the first failure."""
+        while all_cases:
+            done, all_cases = await asyncio.wait(
+                all_cases, return_when=asyncio.FIRST_COMPLETED
+            )
+            for task in done:
+                if task.exception():
+                    raise task.exception()  # Propagate the first failure
+
+
+async def _cancel_gracefully(all_cases):
+    for case in all_cases:
+        case.cancel()
+    await asyncio.gather(*all_cases, return_exceptions=True)
+
+
+def all_tasks(loop):
+    """Wrapper for asyncio.all_tasks() compatible with Python 3.6 and later."""
+    if sys.version_info >= (3, 7):
+        # Use asyncio.all_tasks() directly in Python 3.7+
+        return asyncio.all_tasks(loop)
+    else:
+        # Fall back to asyncio.Task.all_tasks() in Python 3.6
+        return asyncio.Task.all_tasks(loop)
diff --git a/reframe/utility/osext.py b/reframe/utility/osext.py
index 6c5ff547a5..5d6550cd57 100644
--- a/reframe/utility/osext.py
+++ b/reframe/utility/osext.py
@@ -7,6 +7,7 @@
 # OS and shell utility functions
 #
 
+import asyncio
 import collections.abc
 import errno
 import 
getpass
@@ -338,6 +339,87 @@ def run_command_async(cmd,
                               **popen_args)
 
 
+async def run_command_asyncio_alone(cmd,
+                                    stdout=subprocess.PIPE,
+                                    stderr=subprocess.PIPE,
+                                    shell=True,
+                                    timeout=None,
+                                    log=True,
+                                    serial=False,
+                                    **kwargs):
+    '''Run a command asynchronously and return the spawned process handle.
+    '''
+    if log:
+        from reframe.core.logging import getlogger
+        getlogger().debug(f'[CMD] {cmd!r}')
+
+    if isinstance(cmd, str) and not shell:
+        cmd = shlex.split(cmd)
+
+    if shell:
+        # Call create_subprocess_shell
+        return await asyncio.create_subprocess_shell(
+            cmd, stdout=stdout,
+            stderr=stderr
+        )
+    else:
+        # Call create_subprocess_exec
+        return await asyncio.create_subprocess_exec(
+            *cmd, stdout=stdout,
+            stderr=stderr
+        )
+
+
+async def run_command_asyncio(cmd,
+                              check=False,
+                              timeout=None,
+                              shell=True,
+                              log=True,
+                              **kwargs):
+    '''Run a command asynchronously and return a ``CompletedProcess``.
+    '''
+    if log:
+        from reframe.core.logging import getlogger
+        getlogger().debug(f'[CMD] {cmd!r}')
+
+    if isinstance(cmd, str) and not shell:
+        cmd = shlex.split(cmd)
+
+    try:
+        if shell:
+            # Call create_subprocess_shell
+            proc = await asyncio.create_subprocess_shell(
+                cmd, stdout=subprocess.PIPE,
+                stderr=subprocess.PIPE
+            )
+        else:
+            # Call create_subprocess_exec
+            proc = await asyncio.create_subprocess_exec(
+                *cmd, stdout=subprocess.PIPE,
+                stderr=subprocess.PIPE
+            )
+        proc_stdout, proc_stderr = await asyncio.wait_for(
+            proc.communicate(), timeout=timeout
+        )
+    except asyncio.TimeoutError:
+        os.killpg(proc.pid, signal.SIGKILL)
+        raise SpawnedProcessTimeout(cmd,
+                                    await proc.stdout.read(),
+                                    await proc.stderr.read(), timeout) from None
+
+    completed = subprocess.CompletedProcess(cmd,
+                                            returncode=proc.returncode,
+                                            stdout=proc_stdout.decode(),
+                                            stderr=proc_stderr.decode())
+
+    if check and proc.returncode != 0:
+        raise SpawnedProcessError(completed.args,
+                                  completed.stdout, completed.stderr,
+                                  completed.returncode)
+
+    return completed
+
+
 def run_command_async2(*args, check=False, **kwargs):
     '''Return a :class:`_ProcFuture` that encapsulates a command to be
        executed.
@@ -650,6 +732,36 @@ def __enter__(self):
 
     def __exit__(self, exc_type, exc_val, exc_tb):
         os.chdir(self._wd_save)
 
+    async def __aenter__(self):
+        os.chdir(self._dir_name)
+
+    async def __aexit__(self, exc_type, exc_val, exc_tb):
+        os.chdir(self._wd_save)
+
+
+class change_dir_global:
+    '''Context manager to temporarily change the current working directory.
+
+    :arg dir_name: The directory to temporarily change to. 
+ ''' + + def __init__(self, dir_name): + from reframe.core.runtime import get_working_dir + self._wd_save = get_working_dir() + self._dir_name = dir_name + + def __enter__(self): + os.chdir(self._dir_name) + + def __exit__(self, exc_type, exc_val, exc_tb): + os.chdir(self._wd_save) + + async def __aenter__(self): + os.chdir(self._dir_name) + + async def __aexit__(self, exc_type, exc_val, exc_tb): + os.chdir(self._wd_save) + def is_url(s): '''Check if string is a URL.''' diff --git a/unittests/conftest.py b/unittests/conftest.py index 711716d174..1f638d52a3 100644 --- a/unittests/conftest.py +++ b/unittests/conftest.py @@ -106,7 +106,7 @@ def _make_loader(check_search_path, *args, **kwargs): @pytest.fixture(params=[policies.SerialExecutionPolicy, - policies.AsynchronousExecutionPolicy]) + policies.AsyncioExecutionPolicy]) def make_runner(request): '''Test runner with all the execution policies''' @@ -122,7 +122,7 @@ def _make_runner(*args, **kwargs): @pytest.fixture def make_async_runner(): def _make_runner(*args, **kwargs): - policy = policies.AsynchronousExecutionPolicy() + policy = policies.AsyncioExecutionPolicy() policy._pollctl.SLEEP_MIN = 0.001 return executors.Runner(policy, *args, **kwargs) diff --git a/unittests/resources/checks/frontend_checks.py b/unittests/resources/checks/frontend_checks.py index 4e0304ce0c..23de326184 100644 --- a/unittests/resources/checks/frontend_checks.py +++ b/unittests/resources/checks/frontend_checks.py @@ -7,10 +7,10 @@ # Special checks for testing the front-end # +import asyncio import os import signal import sys -import time import reframe as rfm import reframe.utility.sanity as sn @@ -107,18 +107,18 @@ def raise_before_setup(self): if self.phase == 'setup': raise KeyboardInterrupt - def run_wait(self): + async def run_wait(self): # We do our nasty stuff in wait() to make things more complicated if self.phase == 'wait': raise KeyboardInterrupt else: - return super().run_wait() + return await super().run_wait() class SystemExitCheck(BaseFrontendCheck, special=True): '''Simulate system exit from within a check.''' - def run_wait(self): + async def run_wait(self): # We do our nasty stuff in wait() to make things more complicated sys.exit(1) @@ -190,9 +190,9 @@ class SelfKillCheck(rfm.RunOnlyRegressionTest, special=True): executable = 'echo' sanity_patterns = sn.assert_true(1) - def run(self): - super().run() - time.sleep(0.5) + async def run(self): + await super().run() + await asyncio.sleep(0.5) os.kill(os.getpid(), signal.SIGTERM) diff --git a/unittests/test_perflogging.py b/unittests/test_perflogging.py index afc5e26f5e..570f5862ae 100644 --- a/unittests/test_perflogging.py +++ b/unittests/test_perflogging.py @@ -174,6 +174,7 @@ def test_perf_logging(make_runner, make_exec_ctx, perf_test, ) ) ) + rt.set_working_dir() logging.configure_logging(rt.runtime().site_config) runner = make_runner() testcases = executors.generate_testcases([perf_test]) diff --git a/unittests/test_pipeline.py b/unittests/test_pipeline.py index e5010e255c..dc4ae64499 100644 --- a/unittests/test_pipeline.py +++ b/unittests/test_pipeline.py @@ -22,18 +22,32 @@ from reframe.core.meta import make_test from reframe.core.warnings import ReframeDeprecationWarning +rt.set_working_dir() + def _run(test, partition, prgenv): + test_util.asyncio_run(_runasync, test, partition, prgenv) + + +async def _runasync(test, partition, prgenv): test.setup(partition, prgenv) - test.compile() - test.compile_wait() - test.run() - test.run_wait() + await compile_wait(test) + await run_wait(test) 
test.check_sanity() test.check_performance() test.cleanup(remove_files=True) +async def compile_wait(test): + await test.compile() + await test.compile_wait() + + +async def run_wait(test): + await test.run() + await test.run_wait() + + @pytest.fixture def HelloTest(): from unittests.resources.checks.hellocheck import HelloTest @@ -306,9 +320,8 @@ class MyTest(rfm.CompileOnlyRegressionTest): test = MyTest() test.setup(*local_exec_ctx) - test.compile() with pytest.raises(BuildError): - test.compile_wait() + test_util.asyncio_run(test.compile_wait) def test_compile_only_warning(local_exec_ctx): @@ -793,7 +806,7 @@ class MyTest(rfm.CompileOnlyRegressionTest): test.setup(*local_exec_ctx) test.sourcepath = '/usr/src' with pytest.raises(PipelineError): - test.compile() + test_util.asyncio_run(test.compile) def test_sourcepath_upref(local_exec_ctx): @@ -806,7 +819,7 @@ class MyTest(rfm.CompileOnlyRegressionTest): test.setup(*local_exec_ctx) test.sourcepath = '../hellosrc' with pytest.raises(PipelineError): - test.compile() + test_util.asyncio_run(test.compile) def test_sourcepath_non_existent(local_exec_ctx): @@ -818,9 +831,8 @@ class MyTest(rfm.CompileOnlyRegressionTest): test = MyTest() test.setup(*local_exec_ctx) test.sourcepath = 'non_existent.c' - test.compile() with pytest.raises(BuildError): - test.compile_wait() + test_util.asyncio_run(test.compile_wait) def test_extra_resources(HelloTest, testsys_exec_ctx): diff --git a/unittests/test_policies.py b/unittests/test_policies.py index 19ac559606..d868f56a03 100644 --- a/unittests/test_policies.py +++ b/unittests/test_policies.py @@ -18,6 +18,7 @@ from reframe.core.exceptions import (AbortTaskError, FailureLimitError, ForceExitError, + KeyboardError, RunSessionTimeout, TaskDependencyError) from unittests.resources.checks.hellocheck import HelloTest @@ -32,6 +33,8 @@ SystemExitCheck ) +rt.set_working_dir() + def make_kbd_check(phase='wait'): return test_util.make_check(KeyboardInterruptCheck, phase=phase) @@ -249,7 +252,7 @@ def test_force_local_execution(make_runner, make_cases, testsys_exec_ctx): def test_kbd_interrupt_within_test(make_runner, make_cases, common_exec_ctx): runner = make_runner() - with pytest.raises(KeyboardInterrupt): + with pytest.raises(KeyboardError): runner.runall(make_cases([make_kbd_check()])) stats = runner.stats @@ -449,7 +452,7 @@ def make_async_runner(): def _make_runner(): evt_monitor = _TaskEventMonitor() - ret = executors.Runner(policies.AsynchronousExecutionPolicy()) + ret = executors.Runner(policies.AsyncioExecutionPolicy()) ret.policy.keep_stage_files = True ret.policy.task_listeners.append(evt_monitor) return ret, evt_monitor @@ -593,7 +596,7 @@ def test_kbd_interrupt_in_wait_with_concurrency( ): make_exec_ctx(options=max_jobs_opts(4)) runner, _ = make_async_runner() - with pytest.raises(KeyboardInterrupt): + with pytest.raises(KeyboardError): runner.runall(make_cases([ make_kbd_check(), make_sleep_check(10), make_sleep_check(10), make_sleep_check(10) @@ -612,7 +615,7 @@ def test_kbd_interrupt_in_wait_with_limited_concurrency( # three. 
make_exec_ctx(options=max_jobs_opts(2)) runner, _ = make_async_runner() - with pytest.raises(KeyboardInterrupt): + with pytest.raises(KeyboardError): runner.runall(make_cases([ make_kbd_check(), make_sleep_check(10), make_sleep_check(10), make_sleep_check(10) @@ -626,7 +629,7 @@ def test_kbd_interrupt_in_setup_with_concurrency( ): make_exec_ctx(options=max_jobs_opts(4)) runner, _ = make_async_runner() - with pytest.raises(KeyboardInterrupt): + with pytest.raises(KeyboardError): runner.runall(make_cases([ make_sleep_check(1), make_sleep_check(1), make_sleep_check(1), make_kbd_check(phase='setup') @@ -640,7 +643,7 @@ def test_kbd_interrupt_in_setup_with_limited_concurrency( ): make_exec_ctx(options=max_jobs_opts(2)) runner, _ = make_async_runner() - with pytest.raises(KeyboardInterrupt): + with pytest.raises(KeyboardError): runner.runall(make_cases([ make_sleep_check(1), make_sleep_check(1), make_sleep_check(1), make_kbd_check(phase='setup') diff --git a/unittests/test_reporting.py b/unittests/test_reporting.py index 073356aea4..b7f80ba4a9 100644 --- a/unittests/test_reporting.py +++ b/unittests/test_reporting.py @@ -27,6 +27,7 @@ _DEFAULT_BASE_COLS = DEFAULT_GROUP_BY + DEFAULT_EXTRA_COLS +rt.set_working_dir() # NOTE: We could move this to utility class _timer: From a284952e0e423fc75b6294a4a907019da86e01e9 Mon Sep 17 00:00:00 2001 From: Blanca Fuentes Monjas Date: Mon, 9 Dec 2024 20:11:57 +0100 Subject: [PATCH 02/20] Fix tests --- reframe/frontend/executors/__init__.py | 2 +- reframe/frontend/executors/policies.py | 14 ++++++------ unittests/test_pipeline.py | 4 ++-- unittests/test_schedulers.py | 30 ++++++++++++++------------ unittests/utility.py | 9 ++++++++ 5 files changed, 36 insertions(+), 23 deletions(-) diff --git a/reframe/frontend/executors/__init__.py b/reframe/frontend/executors/__init__.py index 4a28e5adff..80ff26908f 100644 --- a/reframe/frontend/executors/__init__.py +++ b/reframe/frontend/executors/__init__.py @@ -33,7 +33,7 @@ from reframe.frontend.printer import PrettyPrinter ABORT_REASONS = (AssertionError, FailureLimitError, - KeyboardError, ForceExitError, RunSessionTimeout) + KeyboardInterrupt, ForceExitError, RunSessionTimeout) class TestStats: diff --git a/reframe/frontend/executors/policies.py b/reframe/frontend/executors/policies.py index fecf313122..2b89b926a3 100644 --- a/reframe/frontend/executors/policies.py +++ b/reframe/frontend/executors/policies.py @@ -195,7 +195,7 @@ async def _runcase(self, case): except ABORT_REASONS as e: task.abort(e) if type(e) is KeyboardInterrupt: - raise ForceExitError + raise KeyboardError else: raise e except BaseException: @@ -283,7 +283,10 @@ def execute(self, testcases): loop = asyncio.get_event_loop() for task in all_tasks(loop): if isinstance(task, asyncio.tasks.Task): - task.cancel() + try: + task.cancel() + except RuntimeError: + pass if loop.is_closed(): loop = asyncio.new_event_loop() watcher = asyncio.get_child_watcher() @@ -303,7 +306,7 @@ def execute(self, testcases): try: loop.run_until_complete(self._runcase(case)) except (Exception, KeyboardInterrupt) as e: - if type(e) in (ABORT_REASONS): + if type(e) in ABORT_REASONS or isinstance(e, KeyboardError): for task in all_tasks(loop): if isinstance(task, asyncio.tasks.Task): task.cancel() @@ -472,7 +475,7 @@ async def _runcase(self, case, task): except ABORT_REASONS as e: self._abortall(e) if type(e) is KeyboardInterrupt: - raise ForceExitError + raise KeyboardError else: raise e except BaseException: @@ -624,8 +627,7 @@ def execute(self, testcases): # Wait for tasks 
until the first failure loop.run_until_complete(self._execute_until_failure(all_cases)) except (Exception, KeyboardInterrupt) as e: - print(type(e)) - if type(e) in (ABORT_REASONS): + if type(e) in ABORT_REASONS or isinstance(e, KeyboardError): loop.run_until_complete(_cancel_gracefully(all_cases)) try: raise AbortTaskError diff --git a/unittests/test_pipeline.py b/unittests/test_pipeline.py index dc4ae64499..6fe236d94d 100644 --- a/unittests/test_pipeline.py +++ b/unittests/test_pipeline.py @@ -321,7 +321,7 @@ class MyTest(rfm.CompileOnlyRegressionTest): test = MyTest() test.setup(*local_exec_ctx) with pytest.raises(BuildError): - test_util.asyncio_run(test.compile_wait) + test_util.asyncio_run(compile_wait, test) def test_compile_only_warning(local_exec_ctx): @@ -832,7 +832,7 @@ class MyTest(rfm.CompileOnlyRegressionTest): test.setup(*local_exec_ctx) test.sourcepath = 'non_existent.c' with pytest.raises(BuildError): - test_util.asyncio_run(test.compile_wait) + test_util.asyncio_run(compile_wait, test) def test_extra_resources(HelloTest, testsys_exec_ctx): diff --git a/unittests/test_schedulers.py b/unittests/test_schedulers.py index a11223121f..1863cfa33d 100644 --- a/unittests/test_schedulers.py +++ b/unittests/test_schedulers.py @@ -144,7 +144,7 @@ def prepare_job(job, command='hostname', def submit_job(job): with rt.module_use(test_util.TEST_MODULES): - job.submit() + test_util.asyncio_run(job.submit) def assert_job_script_sanity(job): @@ -477,7 +477,7 @@ def test_submit(make_job, exec_ctx): assert minimal_job.nodelist == [] submit_job(minimal_job) assert minimal_job.jobid != [] - minimal_job.wait() + test_util.asyncio_run(minimal_job.wait) # Additional scheduler-specific checks sched_name = minimal_job.scheduler.registered_name @@ -505,7 +505,7 @@ def test_submit_timelimit(minimal_job, local_only): submit_job(minimal_job) assert minimal_job.jobid is not None with pytest.raises(JobError): - minimal_job.wait() + test_util.asyncio_run(minimal_job.wait) t_job = time.time() - t_job assert t_job >= 2 @@ -532,8 +532,8 @@ def test_submit_unqualified_hostnames(make_exec_ctx, make_job, local_only): hostname = socket.gethostname().split('.')[0] minimal_job = make_job(sched_opts={'part_name': 'login'}) minimal_job.prepare('true') - minimal_job.submit() - minimal_job.wait() + test_util.asyncio_run(minimal_job.submit) + test_util.asyncio_run(minimal_job.wait) assert minimal_job.nodelist == [hostname] @@ -542,7 +542,7 @@ def test_submit_job_array(make_job, slurm_only, exec_ctx): job.options = ['--array=0-1'] prepare_job(job, command='echo "Task id: ${SLURM_ARRAY_TASK_ID}"') submit_job(job) - job.wait() + test_util.asyncio_run(job.wait) if job.scheduler.registered_name == 'slurm': assert job.exitcode == 0 with open(job.stdout) as fp: @@ -566,7 +566,7 @@ def test_cancel(make_job, exec_ctx): # want to test here. 
time.sleep(0.01) - minimal_job.wait() + test_util.asyncio_run(minimal_job.wait) t_job = time.time() - t_job assert minimal_job.finished() assert t_job < 30 @@ -589,7 +589,7 @@ def test_cancel_before_submit(minimal_job): def test_wait_before_submit(minimal_job): prepare_job(minimal_job, 'sleep 3') with pytest.raises(JobNotStartedError): - minimal_job.wait() + test_util.asyncio_run(minimal_job.wait) def test_finished(make_job, exec_ctx): @@ -597,7 +597,7 @@ def test_finished(make_job, exec_ctx): prepare_job(minimal_job, 'sleep 2') submit_job(minimal_job) assert not minimal_job.finished() - minimal_job.wait() + test_util.asyncio_run(minimal_job.wait) def test_finished_before_submit(minimal_job): @@ -610,7 +610,7 @@ def test_finished_raises_error(make_job, exec_ctx): minimal_job = make_job(sched_access=exec_ctx.access) prepare_job(minimal_job, 'echo hello') submit_job(minimal_job) - minimal_job.wait() + test_util.asyncio_run(minimal_job.wait) # Emulate an error during polling and verify that it is raised correctly # when finished() is called @@ -690,7 +690,7 @@ def test_guess_num_tasks(minimal_job, scheduler): minimal_job._sched_flex_alloc_nodes = 'idle' prepare_job(minimal_job) submit_job(minimal_job) - minimal_job.wait() + test_util.asyncio_run(minimal_job.wait) assert minimal_job.num_tasks == 1 elif scheduler.registered_name in ('slurm', 'squeue'): minimal_job.num_tasks = 0 @@ -737,7 +737,7 @@ def state(self): submit_job(minimal_job) with pytest.raises(JobError, match='maximum pending time exceeded'): - minimal_job.wait() + test_util.asyncio_run(minimal_job.wait) def assert_process_died(pid): @@ -759,6 +759,7 @@ def _read_pid(job, attempts=3): for _ in range(attempts): try: with open(job.stdout) as fp: + # print(fp.read()) return int(fp.read()) except ValueError: time.sleep(1) @@ -783,6 +784,7 @@ def test_cancel_with_grace(minimal_job, scheduler, local_only): minimal_job.time_limit = '1m' minimal_job.scheduler.CANCEL_GRACE_PERIOD = 2 prepare_job(minimal_job, + # command='(trap '' TERM; sleep 5) &', command='sleep 5 &', pre_run=['trap -- "" TERM'], post_run=['echo $!', 'wait'], @@ -797,7 +799,7 @@ def test_cancel_with_grace(minimal_job, scheduler, local_only): t_grace = time.time() minimal_job.cancel() time.sleep(0.1) - minimal_job.wait() + test_util.asyncio_run(minimal_job.wait) t_grace = time.time() - t_grace assert t_grace >= 2 and t_grace < 5 @@ -844,7 +846,7 @@ def test_cancel_term_ignore(minimal_job, scheduler, local_only): t_grace = time.time() minimal_job.cancel() time.sleep(0.1) - minimal_job.wait() + test_util.asyncio_run(minimal_job.wait) t_grace = time.time() - t_grace assert t_grace >= 2 and t_grace < 5 diff --git a/unittests/utility.py b/unittests/utility.py index 5d6414ef10..dffccb62ce 100644 --- a/unittests/utility.py +++ b/unittests/utility.py @@ -7,6 +7,7 @@ # unittests/utility.py -- Utilities used in unit tests # +import asyncio import functools import inspect import os @@ -37,6 +38,14 @@ USER_SYSTEM = None +def asyncio_run(task, *args): + loop = asyncio.get_event_loop() + if loop.is_closed(): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + return loop.run_until_complete(task(*args)) + + def init_runtime(): site_config = config.load_config('unittests/resources/config/settings.py') site_config.select_subconfig('generic') From 0afdad2226a9dc303ff8e56e81de07edaa94f503 Mon Sep 17 00:00:00 2001 From: Blanca Fuentes Monjas Date: Mon, 9 Dec 2024 20:23:18 +0100 Subject: [PATCH 03/20] Added missing pkgs --- requirements.txt | 4 ++++ setup.cfg | 3 +++ 2 files 
changed, 7 insertions(+) diff --git a/requirements.txt b/requirements.txt index b730f4ef21..7bc44f2b64 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,13 +1,17 @@ archspec==0.2.5 argcomplete==3.1.2; python_version < '3.8' argcomplete==3.5.1; python_version >= '3.8' +autopep8==2.0.4 filelock==3.4.1; python_version == '3.6' filelock==3.12.2; python_version == '3.7' filelock==3.16.1; python_version >= '3.8' importlib_metadata==4.0.1; python_version < '3.8' jsonschema==3.2.0 +jinja2==3.0; python_version < '3.7' +jinja2==3.1.2; python_version >= '3.7' lxml==5.2.0; python_version < '3.8' and platform_machine == 'aarch64' lxml==5.3.0; python_version >= '3.8' or platform_machine != 'aarch64' +psutil pytest==7.0.1; python_version < '3.8' pytest==8.3.3; python_version >= '3.8' pytest-forked==1.4.0; python_version == '3.6' diff --git a/setup.cfg b/setup.cfg index 642a277bc0..2111fff78b 100644 --- a/setup.cfg +++ b/setup.cfg @@ -29,13 +29,16 @@ install_requires = archspec >= 0.2.4 argcomplete argcomplete <= 3.1.2; python_version < '3.8' + autopep8 filelock filelock<=3.12.2; python_version == '3.7' filelock<=3.4.1; python_version == '3.6' + jinja2 jsonschema lxml==5.2.0; python_version < '3.8' and platform_machine == 'aarch64' lxml==5.3.0; python_version >= '3.8' or platform_machine != 'aarch64' PyYAML + psutil requests requests <= 2.27.1; python_version == '3.6' semver From d9f5a72127e46c4fa0f71585c664a2fc9e0bc57a Mon Sep 17 00:00:00 2001 From: Blanca Fuentes Monjas Date: Mon, 9 Dec 2024 22:02:40 +0100 Subject: [PATCH 04/20] Fix tests --- reframe/core/schedulers/local.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/reframe/core/schedulers/local.py b/reframe/core/schedulers/local.py index f426d0f3df..c22f3d6dbf 100644 --- a/reframe/core/schedulers/local.py +++ b/reframe/core/schedulers/local.py @@ -130,7 +130,7 @@ def _kill_all(self, job): psutil.NoSuchProcess): # The process group may already be dead or assigned # to a different group, so ignore this error - self.log(f'pid {child.pid} already dead') + self.log(f'child pid {child.pid} already dead') # If the main process ignored the term but the children # didn't then, we get 0 exitcode when the chilren # are terminated @@ -165,7 +165,7 @@ def _term_all(self, job): psutil.NoSuchProcess): # The process group may already be dead or assigned # to a different group, so ignore this error - self.log(f'pid {child.pid} already dead') + self.log(f'child pid {child.pid} already dead') except (ProcessLookupError, PermissionError): # Job has finished already, close file handles self.log(f'pid {job.jobid} already dead') From d556111fc9d5886cfb99cd843fb820e3e414f34c Mon Sep 17 00:00:00 2001 From: Blanca Fuentes Monjas Date: Mon, 9 Dec 2024 22:24:17 +0100 Subject: [PATCH 05/20] Fix polling --- reframe/frontend/executors/policies.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/reframe/frontend/executors/policies.py b/reframe/frontend/executors/policies.py index 2b89b926a3..d0bd6dcf10 100644 --- a/reframe/frontend/executors/policies.py +++ b/reframe/frontend/executors/policies.py @@ -95,10 +95,6 @@ def is_time_to_poll(self): self._num_polls += 1 poll_rate = self._num_polls / t_elapsed if t_elapsed else math.inf - getlogger().debug2( - f'Poll rate control: sleeping for {self._sleep_duration}s ' - f'(current poll rate: {poll_rate} polls/s)' - ) if t_elapsed >= self._sleep_duration: return True else: @@ -423,8 +419,10 @@ async def _runcase(self, case, task): self._partition_tasks[partname].add(task) 
await task.compile() await task.compile_wait() - task.compile_complete() self._partition_tasks[partname].remove(task) + task.compile_complete() + partname = _get_partition_name(task, phase='run') + max_jobs = self._max_jobs[partname] while len(self._partition_tasks[partname])+1 > max_jobs: await asyncio.sleep(2) self._partition_tasks[partname].add(task) @@ -535,13 +533,17 @@ def on_task_run(self, task): getpollcontroller()._jobs_pool.append(task.check.job) def on_task_compile(self, task): - getpollcontroller()._jobs_pool.append(task.check.job) + # getpollcontroller()._jobs_pool.append(task.check.job) + # print("Add compile", task.check.job.name) + pass def on_task_exit(self, task): getpollcontroller()._jobs_pool.remove(task.check.job) def on_task_compile_exit(self, task): - getpollcontroller()._jobs_pool.remove(task.check.job) + # getpollcontroller()._jobs_pool.remove(task.check.job) + # print("Remove compile", task.check.job.name) + pass def on_task_skip(self, task): msg = str(task.exc_info[1]) From cedd3cacad923a73669c6d143c14267fb63f1358 Mon Sep 17 00:00:00 2001 From: Blanca Fuentes Monjas Date: Tue, 10 Dec 2024 01:14:09 +0100 Subject: [PATCH 06/20] Fix tests --- reframe/frontend/executors/policies.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/reframe/frontend/executors/policies.py b/reframe/frontend/executors/policies.py index d0bd6dcf10..9fa23c586b 100644 --- a/reframe/frontend/executors/policies.py +++ b/reframe/frontend/executors/policies.py @@ -466,6 +466,7 @@ async def _runcase(self, case, task): # Remove tasks from the partition tasks if there with contextlib.suppress(KeyError): self._partition_tasks['_rfm_local'].remove(task) + with contextlib.suppress(KeyError): if partname: self._partition_tasks[partname].remove(task) @@ -487,6 +488,7 @@ async def _runcase(self, case, task): # Remove tasks from the partition tasks if there with contextlib.suppress(KeyError): self._partition_tasks['_rfm_local'].remove(task) + with contextlib.suppress(KeyError): if partname: self._partition_tasks[partname].remove(task) return From ce5fbb2d5f0d4f467cc037a1e30eeddc8117b94e Mon Sep 17 00:00:00 2001 From: Blanca Fuentes Monjas Date: Tue, 10 Dec 2024 08:23:36 +0100 Subject: [PATCH 07/20] Fix test_cancel --- unittests/test_schedulers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/unittests/test_schedulers.py b/unittests/test_schedulers.py index 1863cfa33d..a7fd66a9ca 100644 --- a/unittests/test_schedulers.py +++ b/unittests/test_schedulers.py @@ -577,7 +577,7 @@ def test_cancel(make_job, exec_ctx): assert minimal_job.state == 'CANCELLED' elif sched_name == 'local': assert minimal_job.state == 'FAILURE' - assert minimal_job.signal == signal.SIGTERM + # assert minimal_job.signal == signal.SIGTERM def test_cancel_before_submit(minimal_job): From 28a972420002b7e20ced0643bae43aa3204b10ae Mon Sep 17 00:00:00 2001 From: Blanca Fuentes Monjas Date: Tue, 10 Dec 2024 08:51:59 +0100 Subject: [PATCH 08/20] Fix tests signal --- reframe/core/schedulers/local.py | 2 +- reframe/frontend/executors/policies.py | 1 - unittests/test_schedulers.py | 2 +- 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/reframe/core/schedulers/local.py b/reframe/core/schedulers/local.py index c22f3d6dbf..9711f79a8a 100644 --- a/reframe/core/schedulers/local.py +++ b/reframe/core/schedulers/local.py @@ -131,12 +131,12 @@ def _kill_all(self, job): # The process group may already be dead or assigned # to a different group, so ignore this error self.log(f'child pid {child.pid} already dead') + else: 
# If the main process ignored the term but the children # didn't then, we get 0 exitcode when the chilren # are terminated if job.proc.returncode >= 0: job._signal = signal.SIGKILL - else: # If the main process was terminated but the children # ignored the term signal, then the child are killed # the signal of the job should be adjusted accordingly diff --git a/reframe/frontend/executors/policies.py b/reframe/frontend/executors/policies.py index 9fa23c586b..4c267e4e56 100644 --- a/reframe/frontend/executors/policies.py +++ b/reframe/frontend/executors/policies.py @@ -16,7 +16,6 @@ SkipTestError, TaskDependencyError, TaskExit, - ForceExitError, KeyboardError, AbortTaskError) from reframe.core.logging import getlogger, level_from_str diff --git a/unittests/test_schedulers.py b/unittests/test_schedulers.py index a7fd66a9ca..1863cfa33d 100644 --- a/unittests/test_schedulers.py +++ b/unittests/test_schedulers.py @@ -577,7 +577,7 @@ def test_cancel(make_job, exec_ctx): assert minimal_job.state == 'CANCELLED' elif sched_name == 'local': assert minimal_job.state == 'FAILURE' - # assert minimal_job.signal == signal.SIGTERM + assert minimal_job.signal == signal.SIGTERM def test_cancel_before_submit(minimal_job): From 08bf02b1cf3e75ad4063b495f52ef582bd83634f Mon Sep 17 00:00:00 2001 From: Blanca Fuentes Monjas Date: Tue, 10 Dec 2024 09:27:09 +0100 Subject: [PATCH 09/20] Fix tests kill/term --- reframe/core/schedulers/local.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/reframe/core/schedulers/local.py b/reframe/core/schedulers/local.py index 9711f79a8a..ec5a5a2d6d 100644 --- a/reframe/core/schedulers/local.py +++ b/reframe/core/schedulers/local.py @@ -116,6 +116,9 @@ def _kill_all(self, job): # The process group may already be dead or assigned to a different # group, so ignore this error self.log(f'pid {job.jobid} already dead') + if job.proc.returncode: + if job.proc.returncode >= 0: + job._signal = signal.SIGKILL finally: # Close file handles job.f_stdout.close() @@ -132,16 +135,11 @@ def _kill_all(self, job): # to a different group, so ignore this error self.log(f'child pid {child.pid} already dead') else: - # If the main process ignored the term but the children - # didn't then, we get 0 exitcode when the chilren - # are terminated - if job.proc.returncode >= 0: - job._signal = signal.SIGKILL # If the main process was terminated but the children # ignored the term signal, then the child are killed - # the signal of the job should be adjusted accordingly - if job.proc.returncode == -15: - job._signal = signal.SIGKILL + if job.proc.returncode: + if job.proc.returncode == -15: + job._signal = signal.SIGKILL def _term_all(self, job): '''Send SIGTERM to all the processes of the spawned job.''' @@ -161,6 +159,7 @@ def _term_all(self, job): for child in children: try: child.terminate() + child.signal = signal.SIGTERM except (ProcessLookupError, PermissionError, psutil.NoSuchProcess): # The process group may already be dead or assigned From c6140755dc62fa988d606bc5d928bb107ef2f734 Mon Sep 17 00:00:00 2001 From: Blanca Fuentes Date: Tue, 10 Dec 2024 10:40:41 +0100 Subject: [PATCH 10/20] Remove unused imports --- reframe/frontend/autodetect.py | 1 - reframe/frontend/executors/__init__.py | 1 - reframe/frontend/executors/policies.py | 1 + 3 files changed, 1 insertion(+), 2 deletions(-) diff --git a/reframe/frontend/autodetect.py b/reframe/frontend/autodetect.py index c657a93525..bc7b7aed1b 100644 --- a/reframe/frontend/autodetect.py +++ b/reframe/frontend/autodetect.py 
@@ -3,7 +3,6 @@ # # SPDX-License-Identifier: BSD-3-Clause -import asyncio import json import jsonschema import os diff --git a/reframe/frontend/executors/__init__.py b/reframe/frontend/executors/__init__.py index 80ff26908f..f7147727e0 100644 --- a/reframe/frontend/executors/__init__.py +++ b/reframe/frontend/executors/__init__.py @@ -24,7 +24,6 @@ JobNotStartedError, FailureLimitError, ForceExitError, - KeyboardError, RunSessionTimeout, SkipTestError, StatisticsError, diff --git a/reframe/frontend/executors/policies.py b/reframe/frontend/executors/policies.py index 4c267e4e56..96440b6658 100644 --- a/reframe/frontend/executors/policies.py +++ b/reframe/frontend/executors/policies.py @@ -437,6 +437,7 @@ async def _runcase(self, case, task): if not self.dry_run_mode: if getpollcontroller().is_time_to_poll(): getpollcontroller().reset_time_to_poll() + getpollcontroller().reset_snooze_time() await sched.poll(*getpollcontroller()._jobs_pool) if task.run_complete(): From 30b7d08fb2bc9257250a56933352d44f1180a931 Mon Sep 17 00:00:00 2001 From: Blanca Fuentes Date: Tue, 10 Dec 2024 11:40:35 +0100 Subject: [PATCH 11/20] Fix bug in synchronous local cancel --- reframe/core/schedulers/local.py | 2 +- unittests/test_schedulers.py | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/reframe/core/schedulers/local.py b/reframe/core/schedulers/local.py index ec5a5a2d6d..6d5f4d7d85 100644 --- a/reframe/core/schedulers/local.py +++ b/reframe/core/schedulers/local.py @@ -172,7 +172,7 @@ def _term_all(self, job): job.f_stderr.close() job._state = 'FAILURE' - def cancel(self, job): + async def cancel(self, job): '''Cancel job. The SIGTERM signal will be sent first to all the processes of this job diff --git a/unittests/test_schedulers.py b/unittests/test_schedulers.py index 1863cfa33d..e7bb1d3a89 100644 --- a/unittests/test_schedulers.py +++ b/unittests/test_schedulers.py @@ -557,7 +557,7 @@ def test_cancel(make_job, exec_ctx): t_job = time.time() submit_job(minimal_job) - minimal_job.cancel() + test_util.asyncio_run(minimal_job.cancel) # We give some time to the local scheduler for the TERM signal to be # delivered; if we poll immediately, the process may have not been killed @@ -583,7 +583,7 @@ def test_cancel(make_job, exec_ctx): def test_cancel_before_submit(minimal_job): prepare_job(minimal_job, 'sleep 3') with pytest.raises(JobNotStartedError): - minimal_job.cancel() + test_util.asyncio_run(minimal_job.cancel) def test_wait_before_submit(minimal_job): @@ -797,7 +797,7 @@ def test_cancel_with_grace(minimal_job, scheduler, local_only): sleep_pid = _read_pid(minimal_job) t_grace = time.time() - minimal_job.cancel() + test_util.asyncio_run(minimal_job.cancel) time.sleep(0.1) test_util.asyncio_run(minimal_job.wait) t_grace = time.time() - t_grace @@ -844,7 +844,7 @@ def test_cancel_term_ignore(minimal_job, scheduler, local_only): sleep_pid = _read_pid(minimal_job) t_grace = time.time() - minimal_job.cancel() + test_util.asyncio_run(minimal_job.cancel) time.sleep(0.1) test_util.asyncio_run(minimal_job.wait) t_grace = time.time() - t_grace From 2d053bfff08351926745ea86bdda617dc588f9f0 Mon Sep 17 00:00:00 2001 From: Blanca Fuentes Date: Tue, 10 Dec 2024 12:06:48 +0100 Subject: [PATCH 12/20] Fixed local termination --- reframe/core/schedulers/local.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/reframe/core/schedulers/local.py b/reframe/core/schedulers/local.py index 6d5f4d7d85..b244a5ba24 100644 --- a/reframe/core/schedulers/local.py +++ b/reframe/core/schedulers/local.py @@ 
-154,6 +154,15 @@ def _term_all(self, job): children = [] try: + for child in children: + try: + child.terminate() + child.signal = signal.SIGTERM + except (ProcessLookupError, PermissionError, + psutil.NoSuchProcess): + # The process group may already be dead or assigned + # to a different group, so ignore this error + self.log(f'child pid {child.pid} already dead') job.proc.terminate() job._signal = signal.SIGTERM for child in children: From 61bf962937cc4954e42cdf3c3a676c988ca8db4c Mon Sep 17 00:00:00 2001 From: Blanca Fuentes Date: Tue, 10 Dec 2024 12:49:01 +0100 Subject: [PATCH 13/20] Increase time for test_cancel --- unittests/test_schedulers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/unittests/test_schedulers.py b/unittests/test_schedulers.py index e7bb1d3a89..06bb29769e 100644 --- a/unittests/test_schedulers.py +++ b/unittests/test_schedulers.py @@ -564,7 +564,7 @@ def test_cancel(make_job, exec_ctx): # yet, and the scheduler will assume that it's ignoring its signal, then # wait for a grace period and send a KILL signal, which is not what we # want to test here. - time.sleep(0.01) + time.sleep(1) test_util.asyncio_run(minimal_job.wait) t_job = time.time() - t_job From b9db822cba1e3f2c20efbab93b5236dd5e98aa89 Mon Sep 17 00:00:00 2001 From: Blanca Fuentes Date: Tue, 10 Dec 2024 13:00:16 +0100 Subject: [PATCH 14/20] Fix cancel tests --- reframe/core/schedulers/local.py | 9 --------- reframe/frontend/executors/__init__.py | 2 +- unittests/test_schedulers.py | 2 +- 3 files changed, 2 insertions(+), 11 deletions(-) diff --git a/reframe/core/schedulers/local.py b/reframe/core/schedulers/local.py index b244a5ba24..f1ea48f7ac 100644 --- a/reframe/core/schedulers/local.py +++ b/reframe/core/schedulers/local.py @@ -165,15 +165,6 @@ def _term_all(self, job): self.log(f'child pid {child.pid} already dead') job.proc.terminate() job._signal = signal.SIGTERM - for child in children: - try: - child.terminate() - child.signal = signal.SIGTERM - except (ProcessLookupError, PermissionError, - psutil.NoSuchProcess): - # The process group may already be dead or assigned - # to a different group, so ignore this error - self.log(f'child pid {child.pid} already dead') except (ProcessLookupError, PermissionError): # Job has finished already, close file handles self.log(f'pid {job.jobid} already dead') diff --git a/reframe/frontend/executors/__init__.py b/reframe/frontend/executors/__init__.py index f7147727e0..5216344cb4 100644 --- a/reframe/frontend/executors/__init__.py +++ b/reframe/frontend/executors/__init__.py @@ -551,7 +551,7 @@ def abort(self, cause=None): exc.__cause__ = cause try: if not self.zombie and self.check.job: - self.check.job.cancel() + asyncio_run(self.check.job.cancel()) except JobNotStartedError: self.fail((type(exc), exc, None), 'on_task_abort') except BaseException: diff --git a/unittests/test_schedulers.py b/unittests/test_schedulers.py index 06bb29769e..1dcfb285cf 100644 --- a/unittests/test_schedulers.py +++ b/unittests/test_schedulers.py @@ -564,7 +564,7 @@ def test_cancel(make_job, exec_ctx): # yet, and the scheduler will assume that it's ignoring its signal, then # wait for a grace period and send a KILL signal, which is not what we # want to test here. 
- time.sleep(1) + time.sleep(5) test_util.asyncio_run(minimal_job.wait) t_job = time.time() - t_job From 3b9be61b1486e05b6fc3dd11eecdb1174eed8513 Mon Sep 17 00:00:00 2001 From: Blanca Fuentes Date: Tue, 10 Dec 2024 13:30:27 +0100 Subject: [PATCH 15/20] Remove asyncio cancel --- reframe/core/schedulers/local.py | 2 +- reframe/core/schedulers/slurm.py | 4 ++-- reframe/frontend/executors/__init__.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/reframe/core/schedulers/local.py b/reframe/core/schedulers/local.py index f1ea48f7ac..5ff6b83b4f 100644 --- a/reframe/core/schedulers/local.py +++ b/reframe/core/schedulers/local.py @@ -172,7 +172,7 @@ def _term_all(self, job): job.f_stderr.close() job._state = 'FAILURE' - async def cancel(self, job): + def cancel(self, job): '''Cancel job. The SIGTERM signal will be sent first to all the processes of this job diff --git a/reframe/core/schedulers/slurm.py b/reframe/core/schedulers/slurm.py index 1d1b1ba49e..aa05ccd2cd 100644 --- a/reframe/core/schedulers/slurm.py +++ b/reframe/core/schedulers/slurm.py @@ -568,8 +568,8 @@ async def wait(self, job): if job.is_array: self._merge_files(job) - async def cancel(self, job): - await _run_strict(f'scancel {job.jobid}', timeout=self._submit_timeout) + def cancel(self, job): + _run_strict_s(f'scancel {job.jobid}', timeout=self._submit_timeout) job._is_cancelling = True def finished(self, job): diff --git a/reframe/frontend/executors/__init__.py b/reframe/frontend/executors/__init__.py index 5216344cb4..f7147727e0 100644 --- a/reframe/frontend/executors/__init__.py +++ b/reframe/frontend/executors/__init__.py @@ -551,7 +551,7 @@ def abort(self, cause=None): exc.__cause__ = cause try: if not self.zombie and self.check.job: - asyncio_run(self.check.job.cancel()) + self.check.job.cancel() except JobNotStartedError: self.fail((type(exc), exc, None), 'on_task_abort') except BaseException: From 4098ab40efceea0f80e5d8ed51e334c601436f2e Mon Sep 17 00:00:00 2001 From: Blanca Fuentes Date: Tue, 10 Dec 2024 13:39:37 +0100 Subject: [PATCH 16/20] Remove asyncio cancel --- unittests/test_schedulers.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/unittests/test_schedulers.py b/unittests/test_schedulers.py index 1dcfb285cf..3449802a20 100644 --- a/unittests/test_schedulers.py +++ b/unittests/test_schedulers.py @@ -557,7 +557,7 @@ def test_cancel(make_job, exec_ctx): t_job = time.time() submit_job(minimal_job) - test_util.asyncio_run(minimal_job.cancel) + minimal_job.cancel() # We give some time to the local scheduler for the TERM signal to be # delivered; if we poll immediately, the process may have not been killed @@ -583,7 +583,7 @@ def test_cancel(make_job, exec_ctx): def test_cancel_before_submit(minimal_job): prepare_job(minimal_job, 'sleep 3') with pytest.raises(JobNotStartedError): - test_util.asyncio_run(minimal_job.cancel) + minimal_job.cancel() def test_wait_before_submit(minimal_job): @@ -797,7 +797,7 @@ def test_cancel_with_grace(minimal_job, scheduler, local_only): sleep_pid = _read_pid(minimal_job) t_grace = time.time() - test_util.asyncio_run(minimal_job.cancel) + minimal_job.cancel() time.sleep(0.1) test_util.asyncio_run(minimal_job.wait) t_grace = time.time() - t_grace @@ -844,7 +844,7 @@ def test_cancel_term_ignore(minimal_job, scheduler, local_only): sleep_pid = _read_pid(minimal_job) t_grace = time.time() - test_util.asyncio_run(minimal_job.cancel) + minimal_job.cancel() time.sleep(0.1) test_util.asyncio_run(minimal_job.wait) t_grace = time.time() - t_grace 
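
Patches 11 through 16 move Job.cancel() back and forth between a coroutine
and a plain method before settling on the synchronous form. The likely
constraint is that code already running inside the event loop, such as
RegressionTask.abort() reached from the executors' exception handlers,
cannot re-enter that loop to drive a coroutine to completion. A minimal
sketch of that failure mode follows; the names cancel_job and abort_task
are illustrative and not part of ReFrame's API:

    import asyncio

    async def cancel_job():
        # Stand-in for an asynchronous Job.cancel()
        await asyncio.sleep(0)

    def abort_task():
        # A synchronous abort path cannot drive a coroutine to completion,
        # because the loop that (indirectly) called it is still running.
        loop = asyncio.get_event_loop()
        loop.run_until_complete(cancel_job())

    async def main():
        try:
            abort_task()
        except RuntimeError as err:
            print(err)  # 'This event loop is already running'

    asyncio.get_event_loop().run_until_complete(main())

Keeping cancel() synchronous sidesteps the problem, at the cost of blocking
the event loop for the duration of the underlying scancel or SIGTERM call.
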
From d3502759061af20c2f5d2614c0787a342a1ac59d Mon Sep 17 00:00:00 2001 From: Blanca Fuentes Date: Tue, 10 Dec 2024 14:28:40 +0100 Subject: [PATCH 17/20] Fix cancel tests --- reframe/core/schedulers/local.py | 4 ++-- unittests/test_schedulers.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/reframe/core/schedulers/local.py b/reframe/core/schedulers/local.py index 5ff6b83b4f..a0f811e2de 100644 --- a/reframe/core/schedulers/local.py +++ b/reframe/core/schedulers/local.py @@ -110,7 +110,7 @@ def _kill_all(self, job): try: # Try to kill the main process - job.proc.kill() + os.kill(job.jobid, signal.SIGKILL) job._signal = signal.SIGKILL except (ProcessLookupError, PermissionError): # The process group may already be dead or assigned to a different @@ -163,7 +163,7 @@ def _term_all(self, job): # The process group may already be dead or assigned # to a different group, so ignore this error self.log(f'child pid {child.pid} already dead') - job.proc.terminate() + os.kill(job.jobid, signal.SIGTERM) job._signal = signal.SIGTERM except (ProcessLookupError, PermissionError): # Job has finished already, close file handles diff --git a/unittests/test_schedulers.py b/unittests/test_schedulers.py index 3449802a20..1863cfa33d 100644 --- a/unittests/test_schedulers.py +++ b/unittests/test_schedulers.py @@ -564,7 +564,7 @@ def test_cancel(make_job, exec_ctx): # yet, and the scheduler will assume that it's ignoring its signal, then # wait for a grace period and send a KILL signal, which is not what we # want to test here. - time.sleep(5) + time.sleep(0.01) test_util.asyncio_run(minimal_job.wait) t_job = time.time() - t_job From b7188bab406e12ac3adc0cf540b6d290d998289d Mon Sep 17 00:00:00 2001 From: Blanca Fuentes Date: Tue, 10 Dec 2024 15:27:16 +0100 Subject: [PATCH 18/20] Check the ci tests --- reframe/core/schedulers/local.py | 48 ++++++++++++++++---------------- 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/reframe/core/schedulers/local.py b/reframe/core/schedulers/local.py index a0f811e2de..69c1286206 100644 --- a/reframe/core/schedulers/local.py +++ b/reframe/core/schedulers/local.py @@ -125,21 +125,21 @@ def _kill_all(self, job): job.f_stderr.close() job._state = 'FAILURE' - for child in children: - # try to kill the children - try: - child.kill() - except (ProcessLookupError, PermissionError, - psutil.NoSuchProcess): - # The process group may already be dead or assigned - # to a different group, so ignore this error - self.log(f'child pid {child.pid} already dead') - else: - # If the main process was terminated but the children - # ignored the term signal, then the child are killed - if job.proc.returncode: - if job.proc.returncode == -15: - job._signal = signal.SIGKILL + # for child in children: + # # try to kill the children + # try: + # child.kill() + # except (ProcessLookupError, PermissionError, + # psutil.NoSuchProcess): + # # The process group may already be dead or assigned + # # to a different group, so ignore this error + # self.log(f'child pid {child.pid} already dead') + # else: + # # If the main process was terminated but the children + # # ignored the term signal, then the child are killed + # if job.proc.returncode: + # if job.proc.returncode == -15: + # job._signal = signal.SIGKILL def _term_all(self, job): '''Send SIGTERM to all the processes of the spawned job.''' @@ -154,15 +154,15 @@ def _term_all(self, job): children = [] try: - for child in children: - try: - child.terminate() - child.signal = signal.SIGTERM - except (ProcessLookupError, 
PermissionError, - psutil.NoSuchProcess): - # The process group may already be dead or assigned - # to a different group, so ignore this error - self.log(f'child pid {child.pid} already dead') + # for child in children: + # try: + # child.terminate() + # child.signal = signal.SIGTERM + # except (ProcessLookupError, PermissionError, + # psutil.NoSuchProcess): + # # The process group may already be dead or assigned + # # to a different group, so ignore this error + # self.log(f'child pid {child.pid} already dead') os.kill(job.jobid, signal.SIGTERM) job._signal = signal.SIGTERM except (ProcessLookupError, PermissionError): From a73343ace374ccbfd02ad7e8742dc53475f10a32 Mon Sep 17 00:00:00 2001 From: Blanca Fuentes Date: Wed, 11 Dec 2024 09:24:41 +0100 Subject: [PATCH 19/20] Another attempt to fix tests --- reframe/core/schedulers/local.py | 63 ++++++++++++-------------------- reframe/utility/osext.py | 13 ++++--- 2 files changed, 31 insertions(+), 45 deletions(-) diff --git a/reframe/core/schedulers/local.py b/reframe/core/schedulers/local.py index 69c1286206..c76546058a 100644 --- a/reframe/core/schedulers/local.py +++ b/reframe/core/schedulers/local.py @@ -109,62 +109,45 @@ def _kill_all(self, job): children = [] try: - # Try to kill the main process - os.kill(job.jobid, signal.SIGKILL) + for child in children: + if child.is_running(): + child.send_signal(signal.SIGKILL) + job._signal = signal.SIGKILL + else: + self.log(f'child pid {child.pid} already dead') + job.proc.send_signal(signal.SIGKILL) job._signal = signal.SIGKILL except (ProcessLookupError, PermissionError): # The process group may already be dead or assigned to a different # group, so ignore this error self.log(f'pid {job.jobid} already dead') - if job.proc.returncode: - if job.proc.returncode >= 0: - job._signal = signal.SIGKILL finally: # Close file handles job.f_stdout.close() job.f_stderr.close() job._state = 'FAILURE' - # for child in children: - # # try to kill the children - # try: - # child.kill() - # except (ProcessLookupError, PermissionError, - # psutil.NoSuchProcess): - # # The process group may already be dead or assigned - # # to a different group, so ignore this error - # self.log(f'child pid {child.pid} already dead') - # else: - # # If the main process was terminated but the children - # # ignored the term signal, then the child are killed - # if job.proc.returncode: - # if job.proc.returncode == -15: - # job._signal = signal.SIGKILL - def _term_all(self, job): '''Send SIGTERM to all the processes of the spawned job.''' - try: - p = psutil.Process(job.jobid) - job.children = p.children(recursive=True) - children = job.children - except psutil.NoSuchProcess: - try: - children = job.children - except AttributeError: - children = [] + + p = psutil.Process(job.jobid) + # Get the chilldren of the process + job.children = p.children(recursive=True) try: - # for child in children: - # try: - # child.terminate() - # child.signal = signal.SIGTERM - # except (ProcessLookupError, PermissionError, - # psutil.NoSuchProcess): - # # The process group may already be dead or assigned - # # to a different group, so ignore this error - # self.log(f'child pid {child.pid} already dead') - os.kill(job.jobid, signal.SIGTERM) + job.proc.send_signal(signal.SIGTERM) job._signal = signal.SIGTERM + # Here, we don't know if it was ignored or not + for child in job.children: + # try to kill the children + try: + child.send_signal(signal.SIGTERM) + except (ProcessLookupError, PermissionError, + psutil.NoSuchProcess): + # The process group may 
already be dead or assigned
+                # to a different group, so ignore this error
+                self.log(f'child pid {child.pid} already dead')
+
         except (ProcessLookupError, PermissionError):
             # Job has finished already, close file handles
             self.log(f'pid {job.jobid} already dead')
diff --git a/reframe/utility/osext.py b/reframe/utility/osext.py
index 5d6550cd57..76dd2cee98 100644
--- a/reframe/utility/osext.py
+++ b/reframe/utility/osext.py
@@ -360,16 +360,17 @@ async def run_command_asyncio_alone(cmd,
         # Call create_subprocess_shell
         return await asyncio.create_subprocess_shell(
             cmd, stdout=stdout,
-            stderr=stderr
+            stderr=stderr,
+            **kwargs
         )
     else:
         # Call create_subprocess_exec
         return await asyncio.create_subprocess_exec(
             *cmd, stdout=stdout,
-            stderr=stderr
+            stderr=stderr,
+            **kwargs
         )
 
-
 async def run_command_asyncio(cmd,
                               check=False,
                               timeout=None,
@@ -390,13 +391,15 @@ async def run_command_asyncio(
             # Call create_subprocess_shell
             proc = await asyncio.create_subprocess_shell(
                 cmd, stdout=subprocess.PIPE,
-                stderr=subprocess.PIPE
+                stderr=subprocess.PIPE,
+                **kwargs
             )
         else:
             # Call create_subprocess_exec
             proc = await asyncio.create_subprocess_exec(
                 *cmd, stdout=subprocess.PIPE,
-                stderr=subprocess.PIPE
+                stderr=subprocess.PIPE,
+                **kwargs
             )
         proc_stdout, proc_stderr = await asyncio.wait_for(
             proc.communicate(), timeout=timeout

From 8a90ec05f407001b8831ebe187c989be1a8d14e5 Mon Sep 17 00:00:00 2001
From: Blanca Fuentes
Date: Wed, 11 Dec 2024 17:21:29 +0100
Subject: [PATCH 20/20] Remove unused code

---
 reframe/frontend/executors/policies.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/reframe/frontend/executors/policies.py b/reframe/frontend/executors/policies.py
index 96440b6658..86a2670bcf 100644
--- a/reframe/frontend/executors/policies.py
+++ b/reframe/frontend/executors/policies.py
@@ -93,7 +93,6 @@ def is_time_to_poll(self):
         t_elapsed = time.time() - self._t_init
         self._num_polls += 1
-        poll_rate = self._num_polls / t_elapsed if t_elapsed else math.inf
 
         if t_elapsed >= self._sleep_duration:
             return True
         else:
@@ -437,7 +436,6 @@ async def _runcase(self, case, task):
         if not self.dry_run_mode:
             if getpollcontroller().is_time_to_poll():
                 getpollcontroller().reset_time_to_poll()
-                getpollcontroller().reset_snooze_time()
 
                 await sched.poll(*getpollcontroller()._jobs_pool)
 
             if task.run_complete():
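
The local-scheduler rework in patches 17 through 19 converges on signalling
the job's process tree through psutil: snapshot the children while the
parent is still alive, send SIGTERM to the parent and every child, and
escalate to SIGKILL for whatever survives the grace period (the unit tests
pin CANCEL_GRACE_PERIOD to 2 seconds). Below is a self-contained sketch of
that scheme; the shell command and the timings are illustrative rather than
ReFrame defaults:

    import signal
    import subprocess
    import time

    import psutil

    # Spawn a shell with two children, standing in for a local job
    proc = subprocess.Popen('sleep 30 & sleep 30 & wait', shell=True)
    time.sleep(0.5)                  # give the children time to appear
    parent = psutil.Process(proc.pid)
    tree = [parent, *parent.children(recursive=True)]  # snapshot the tree

    # Polite phase: SIGTERM everything
    for p in tree:
        try:
            p.send_signal(signal.SIGTERM)
        except psutil.NoSuchProcess:
            pass                     # already gone, nothing to do

    # Grace period, then forceful phase: SIGKILL the survivors
    _, alive = psutil.wait_procs(tree, timeout=2)
    for p in alive:
        try:
            p.send_signal(signal.SIGKILL)
        except psutil.NoSuchProcess:
            pass

Taking the snapshot before the first signal matters: once the parent exits,
its children are reparented and can no longer be discovered through it,
which is why the patches cache them in job.children.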