From 001644b19cc4157bc1637d6cd5f3e16b95743029 Mon Sep 17 00:00:00 2001 From: Blanca Fuentes Date: Tue, 14 Jan 2025 10:18:47 +0100 Subject: [PATCH] Change asynchronous policy to use asyncio --- .github/workflows/main.yml | 8 - docs/requirements.txt | 1 + reframe/core/config.py | 4 +- reframe/core/launchers/mpi.py | 2 +- reframe/core/logging.py | 6 +- reframe/core/modules.py | 24 +-- reframe/core/schedulers/local.py | 4 +- reframe/core/schedulers/lsf.py | 2 +- reframe/core/schedulers/oar.py | 4 +- reframe/core/schedulers/pbs.py | 8 +- reframe/core/schedulers/sge.py | 6 +- reframe/core/schedulers/slurm.py | 16 +- reframe/core/schedulers/ssh.py | 32 ++-- reframe/frontend/executors/__init__.py | 10 +- reframe/frontend/executors/policies.py | 99 ++++++++--- reframe/utility/cpuinfo.py | 4 +- reframe/utility/osext.py | 224 +++++++++++++------------ unittests/test_pipeline.py | 2 +- unittests/test_shell.py | 8 +- unittests/test_utility.py | 80 ++++----- 20 files changed, 304 insertions(+), 240 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index bbc88c83cb..543586ed51 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -138,11 +138,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: -<<<<<<< HEAD - python-version: ['3.7', '3.8', '3.9', '3.10', '3.11', '3.12', '3.13'] -======= python-version: ['3.8', '3.9', '3.10', '3.11', '3.12', '3.13'] ->>>>>>> upstream/develop steps: - uses: actions/checkout@v4 - name: Setup up Python ${{ matrix.python-version }} @@ -165,11 +161,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: -<<<<<<< HEAD - python-version: ['3.7', '3.8', '3.9', '3.10', '3.11', '3.12', '3.13'] -======= python-version: ['3.8', '3.9', '3.10', '3.11', '3.12', '3.13'] ->>>>>>> upstream/develop steps: - uses: actions/checkout@v4 - name: Set up Python ${{ matrix.python-version }} diff --git a/docs/requirements.txt b/docs/requirements.txt index c2cb9e6696..98c306391d 100644 --- a/docs/requirements.txt +++ b/docs/requirements.txt @@ -1,6 +1,7 @@ archspec==0.2.5 docutils==0.18.1 jsonschema==3.2.0 +psutil semver==2.13.0; python_version == '3.6' semver==3.0.2; python_version >= '3.7' Sphinx==5.3.0; python_version < '3.8' diff --git a/reframe/core/config.py b/reframe/core/config.py index b1ede82bb4..ddf3c111c9 100644 --- a/reframe/core/config.py +++ b/reframe/core/config.py @@ -391,7 +391,7 @@ def _py_meth(m): def _sh_meth(m): def _fn(): - completed = osext.run_command(m, check=True) + completed = osext.run_command_s(m, check=True) return completed.stdout.strip() return _fn @@ -429,7 +429,7 @@ def _detect_system(self): 'the `--system` option') getlogger().debug(f'Retrieved hostname: {hostname!r}') - getlogger().debug(f'Looking for a matching configuration entry') + getlogger().debug('Looking for a matching configuration entry') for system in self._site_config['systems']: for patt in system['hostnames']: if re.match(patt, hostname): diff --git a/reframe/core/launchers/mpi.py b/reframe/core/launchers/mpi.py index 6e6acc56b8..e982ff6dca 100644 --- a/reframe/core/launchers/mpi.py +++ b/reframe/core/launchers/mpi.py @@ -19,7 +19,7 @@ def __init__(self): self.options = [] self.use_cpus_per_task = True try: - out = osext.run_command('srun --version') + out = osext.run_command_s('srun --version') match = re.search(r'slurm(-wlm)? (\d+)\.(\d+)\.(\d+)', out.stdout) if match: # We cannot pass to semver strings like 22.05.1 directly diff --git a/reframe/core/logging.py b/reframe/core/logging.py index 46e4852df4..717fb790a9 100644 --- a/reframe/core/logging.py +++ b/reframe/core/logging.py @@ -959,7 +959,6 @@ def adjust_verbosity(self, num_steps): global tasks_loggers tasks_loggers = {} -global _global_logger _global_logger = null_logger @@ -974,10 +973,11 @@ def __init__(self, check=None, level=DEBUG): self._orig_logger = _global_logger self._level = level - self._context_logger = _global_logger if check is not None: self._context_logger = LoggerAdapter(_logger, check) self._context_logger.colorize = self._orig_logger.colorize + else: + self._context_logger = _global_logger if task: tasks_loggers[task] = self._context_logger @@ -988,10 +988,10 @@ def __enter__(self): return self._context_logger def __exit__(self, exc_type, exc_value, traceback): - global _global_logger try: task = current_task() except RuntimeError: + global _global_logger task = None # Log any exceptions thrown with the current context logger diff --git a/reframe/core/modules.py b/reframe/core/modules.py index e8c9c504b1..4f9fe73c77 100644 --- a/reframe/core/modules.py +++ b/reframe/core/modules.py @@ -597,7 +597,7 @@ def __init__(self): def _do_validate(self): # Try to figure out if we are indeed using the TCL version try: - completed = osext.run_command('modulecmd -V') + completed = osext.run_command_s('modulecmd -V') except OSError as e: raise ConfigError( 'could not find a sane TMod installation') from e @@ -626,7 +626,7 @@ def _do_validate(self): self._version = version try: # Try the Python bindings now - completed = osext.run_command(self.modulecmd()) + completed = osext.run_command_s(self.modulecmd()) except OSError as e: raise ConfigError( f'could not get the Python bindings for TMod: {e}' @@ -653,7 +653,7 @@ def _execute(self, cmd, *args): self._do_validate() modulecmd = self.modulecmd(cmd, *args) - completed = osext.run_command(modulecmd) + completed = osext.run_command_s(modulecmd) if re.search(r'\bERROR\b', completed.stderr) is not None: raise SpawnedProcessError(modulecmd, completed.stdout, @@ -746,7 +746,7 @@ def _do_validate(self): try: modulecmd = os.getenv('MODULESHOME') modulecmd = os.path.join(modulecmd, 'modulecmd.tcl') - completed = osext.run_command(modulecmd) + completed = osext.run_command_s(modulecmd) except OSError as e: raise ConfigError( f'could not find a sane TMod31 installation: {e}' @@ -776,7 +776,7 @@ def _do_validate(self): self._command = f'{modulecmd} python' try: # Try the Python bindings now - completed = osext.run_command(self._command) + completed = osext.run_command_s(self._command) except OSError as e: raise ConfigError( f'could not get the Python bindings for TMod31: {e}' @@ -800,7 +800,7 @@ def _execute(self, cmd, *args): self._do_validate() modulecmd = self.modulecmd(cmd, *args) - completed = osext.run_command(modulecmd) + completed = osext.run_command_s(modulecmd) if re.search(r'\bERROR\b', completed.stderr) is not None: raise SpawnedProcessError(modulecmd, completed.stdout, @@ -833,7 +833,7 @@ def __init__(self): def _do_validate(self): try: - completed = osext.run_command(self.modulecmd('-V'), check=True) + completed = osext.run_command_s(self.modulecmd('-V'), check=True) except OSError as e: raise ConfigError( 'could not find a sane TMod4 installation' @@ -877,7 +877,7 @@ def _execute(self, cmd, *args): self._do_validate() modulecmd = self.modulecmd(cmd, *args) - completed = osext.run_command(modulecmd, check=False) + completed = osext.run_command_s(modulecmd, check=False) namespace = {} exec(self.process(completed.stdout), {}, namespace) @@ -977,7 +977,7 @@ def _do_validate(self): 'environment variable LMOD_CMD is not defined') try: - completed = osext.run_command(f'{self._lmod_cmd} --version') + completed = osext.run_command_s(f'{self._lmod_cmd} --version') except OSError as e: raise ConfigError(f'could not find a sane Lmod installation: {e}') @@ -989,7 +989,7 @@ def _do_validate(self): self._version = version_match.group(1) try: # Try the Python bindings now - completed = osext.run_command(self.modulecmd()) + completed = osext.run_command_s(self.modulecmd()) except OSError as e: raise ConfigError( f'could not get the Python bindings for Lmod: {e}' @@ -1159,7 +1159,7 @@ def __init__(self): def _do_validate(self): # Try to figure out if we are indeed using the TCL version try: - completed = osext.run_command('spack -V') + completed = osext.run_command_s('spack -V') except OSError as e: raise ConfigError( 'could not find a sane Spack installation' @@ -1182,7 +1182,7 @@ def _execute(self, cmd, *args): self._do_validate() modulecmd = self.modulecmd(cmd, *args) - completed = osext.run_command(modulecmd, check=True) + completed = osext.run_command_s(modulecmd, check=True) return completed.stdout def available_modules(self, substr): diff --git a/reframe/core/schedulers/local.py b/reframe/core/schedulers/local.py index 09d2d3b6f5..2934da78d0 100644 --- a/reframe/core/schedulers/local.py +++ b/reframe/core/schedulers/local.py @@ -62,7 +62,7 @@ async def submit(self, job): # The new process starts also a new session (session leader), so that # we can later kill any other processes that this might spawn by just # killing this one. - proc = await osext.run_command_asyncio_alone( + proc = await osext.run_command_asyncio( os.path.abspath(job.script_filename), stdout=f_stdout, stderr=f_stderr, @@ -200,7 +200,7 @@ async def poll(self, *jobs): await self._poll_job(job) async def _poll_job(self, job): - if job is None or job.jobid is None: + if (job is None or job.jobid is None or job.finished()): return if job.cancel_time: diff --git a/reframe/core/schedulers/lsf.py b/reframe/core/schedulers/lsf.py index b40c2ecea6..6bdf8620cc 100644 --- a/reframe/core/schedulers/lsf.py +++ b/reframe/core/schedulers/lsf.py @@ -20,7 +20,7 @@ from reframe.core.schedulers.pbs import PbsJobScheduler # Asynchronous _run_strict -_run_strict = functools.partial(osext.run_command_asyncio, check=True) +_run_strict = functools.partial(osext.run_command, check=True) @register_scheduler('lsf') diff --git a/reframe/core/schedulers/oar.py b/reframe/core/schedulers/oar.py index 5ab55a7478..1cfdc545a4 100644 --- a/reframe/core/schedulers/oar.py +++ b/reframe/core/schedulers/oar.py @@ -53,9 +53,9 @@ def oar_state_pending(state): # Asynchronous _run_strict -_run_strict = functools.partial(osext.run_command_asyncio, check=True) +_run_strict = functools.partial(osext.run_command, check=True) # Synchronous _run_strict -_run_strict_s = functools.partial(osext.run_command, check=True) +_run_strict_s = functools.partial(osext.run_command_s, check=True) @register_scheduler('oar') diff --git a/reframe/core/schedulers/pbs.py b/reframe/core/schedulers/pbs.py index 826cc8c158..b567fb661f 100644 --- a/reframe/core/schedulers/pbs.py +++ b/reframe/core/schedulers/pbs.py @@ -37,9 +37,9 @@ # Asynchronous _run_strict -_run_strict = functools.partial(osext.run_command_asyncio, check=True) +_run_strict = functools.partial(osext.run_command, check=True) # Synchronous _run_strict -_run_strict_s = functools.partial(osext.run_command, check=True) +_run_strict_s = functools.partial(osext.run_command_s, check=True) JOB_STATES = { @@ -199,7 +199,7 @@ def _query_exit_code(self, job): '''Try to retrieve the exit code of a past job.''' # With PBS Pro we can obtain the exit status of a past job - extended_info = osext.run_command(f'qstat -xf {job.jobid}') + extended_info = osext.run_command_s(f'qstat -xf {job.jobid}') exit_status_match = re.search( r'^ *Exit_status *= *(?P\d+)', extended_info.stdout, flags=re.MULTILINE, @@ -224,7 +224,7 @@ def output_ready(job): if not jobs: return - completed = await osext.run_command_asyncio( + completed = await osext.run_command( f'qstat -f {" ".join(job.jobid for job in jobs)}' ) diff --git a/reframe/core/schedulers/sge.py b/reframe/core/schedulers/sge.py index afc5d192b0..7193ee855b 100644 --- a/reframe/core/schedulers/sge.py +++ b/reframe/core/schedulers/sge.py @@ -21,9 +21,9 @@ from reframe.utility import seconds_to_hms # Asynchronous _run_strict -_run_strict = functools.partial(osext.run_command_asyncio, check=True) +_run_strict = functools.partial(osext.run_command, check=True) # Synchronous _run_strict -_run_strict_s = functools.partial(osext.run_command, check=True) +_run_strict_s = functools.partial(osext.run_command_s, check=True) @register_scheduler('sge') @@ -78,7 +78,7 @@ async def poll(self, *jobs): return user = osext.osuser() - completed = await osext.run_command_asyncio(f'qstat -xml -u {user}') + completed = await osext.run_command(f'qstat -xml -u {user}') if completed.returncode != 0: raise JobSchedulerError( f'qstat failed with exit code {completed.returncode} ' diff --git a/reframe/core/schedulers/slurm.py b/reframe/core/schedulers/slurm.py index f719b39716..69d60cf92a 100644 --- a/reframe/core/schedulers/slurm.py +++ b/reframe/core/schedulers/slurm.py @@ -66,9 +66,9 @@ def slurm_state_pending(state): # Asynchronous _run_strict -_run_strict = functools.partial(osext.run_command_asyncio, check=True) +_run_strict = functools.partial(osext.run_command, check=True) # Synchronous _run_strict -_run_strict_s = functools.partial(osext.run_command, check=True) +_run_strict_s = functools.partial(osext.run_command_s, check=True) class _SlurmJob(sched.Job): @@ -85,7 +85,7 @@ def __init__(self, *args, **kwargs): def nodelist(self): # Generate the nodelist only after the job is finished if slurm_state_completed(self.state): - completed = osext.run_command( + completed = osext.run_command_s( f'scontrol show hostname {self._nodespec}', log=False ) self._nodelist = completed.stdout.splitlines() @@ -327,7 +327,7 @@ def _get_actual_partition(self, options): if partition_match: return partition_match.group('partition') - except SpawnedProcessError as e: + except SpawnedProcessError: self.log('could not retrieve actual partition') return None @@ -424,8 +424,8 @@ def _get_reservation_nodes(self, reservation): return _create_nodes(node_descriptions) def _get_nodes_by_name(self, nodespec): - completed = osext.run_command('scontrol -a show -o node %s' % - nodespec) + completed = osext.run_command_s('scontrol -a show -o node %s' % + nodespec) node_descriptions = completed.stdout.splitlines() return _create_nodes(node_descriptions) @@ -523,7 +523,7 @@ async def _cancel_if_blocked(self, job, reasons=None): return if not reasons: - completed = await osext.run_command_asyncio( + completed = await osext.run_command( 'squeue -h -j %s -o %%r' % job.jobid ) reasons = completed.stdout.splitlines() @@ -628,7 +628,7 @@ async def poll(self, *jobs): # We don't run the command with check=True, because if the job has # finished already, squeue might return an error about an invalid # job id. - completed = await osext.run_command_asyncio( + completed = await osext.run_command( f'squeue -h -j {",".join(job.jobid for job in jobs)} ' f'-o "%%i|%%T|%%N|%%r"' ) diff --git a/reframe/core/schedulers/ssh.py b/reframe/core/schedulers/ssh.py index cae9089585..37945c6a46 100644 --- a/reframe/core/schedulers/ssh.py +++ b/reframe/core/schedulers/ssh.py @@ -51,7 +51,7 @@ def __init__(self, *, hosts=None): # Determine if rsync is available try: - osext.run_command('rsync --version', check=True) + osext.run_command_s('rsync --version', check=True) except (FileNotFoundError, SpawnedProcessError): self._has_rsync = False else: @@ -80,7 +80,7 @@ def _push_artefacts(self, job): # Create a temporary directory on the remote host and push the job # artifacts - completed = osext.run_command( + completed = osext.run_command_s( f'ssh -o BatchMode=yes {options} {job.host} ' f'mktemp -td rfm.XXXXXXXX', check=True ) @@ -124,7 +124,7 @@ def _do_submit(self, job): f'"cd {job.remotedir} && bash -l {job.script_filename}"' ) - def submit(self, job): + async def submit(self, job): assert isinstance(job, _SSHJob) # Check if `#host` pseudo-option is specified and use this as a host, @@ -160,13 +160,13 @@ def success(proc): job.steps['pull'], when=success ) - job.steps['push'].start() + await job.steps['push'].start() job._jobid = job.steps['push'].pid - def wait(self, job): + async def wait(self, job): for step in job.steps.values(): if step.started(): - step.wait() + await step.wait() def cancel(self, job): for step in job.steps.values(): @@ -179,15 +179,18 @@ def finished(self, job): return job.state is not None - def poll(self, *jobs): + async def poll(self, *jobs): for job in jobs: - self._poll_job(job) + await self._poll_job(job) + + async def _poll_job(self, job): + if job.finished(): + return True - def _poll_job(self, job): last_done = None last_failed = None for proc_kind, proc in job.steps.items(): - if proc.started() and proc.done(): + if proc.started() and await proc.done(): last_done = proc_kind if proc.exitcode != 0: last_failed = proc_kind @@ -200,7 +203,7 @@ def _poll_job(self, job): # Update the job info last_proc = job.steps[last_done] job._exitcode = last_proc.exitcode - job._exception = last_proc.exception() + job._exception = await last_proc.exception() job._signal = last_proc.signal if job._exitcode == 0: job._state = 'SUCCESS' @@ -209,12 +212,13 @@ def _poll_job(self, job): exec_proc = job.steps['exec'] if exec_proc.started(): - with osext.change_dir(job.localdir): + with osext.change_dir_global(job.localdir): + stdout_, stderr_ = await exec_proc.communicate() with open(job.stdout, 'w+') as fout: - fout.write(exec_proc.stdout().read()) + fout.write(stdout_.decode()) with open(job.stderr, 'w+') as ferr: - ferr.write(exec_proc.stderr().read()) + ferr.write(stderr_.decode()) return True diff --git a/reframe/frontend/executors/__init__.py b/reframe/frontend/executors/__init__.py index 62df76e5b5..68466ce989 100644 --- a/reframe/frontend/executors/__init__.py +++ b/reframe/frontend/executors/__init__.py @@ -800,16 +800,8 @@ def _exit(self): pass @abc.abstractmethod - def _runcase(self, case): - '''Run a test case.''' - def execute(self, testcases): - '''Execute the policy for a given set of testcases.''' - # Moved here the execution - for t in testcases: - self._runcase(t) - - self._exit() + '''Execute the policy for a given set of testcases and exit.''' def asyncio_run(coro): diff --git a/reframe/frontend/executors/policies.py b/reframe/frontend/executors/policies.py index 86a2670bcf..6fda6b31a4 100644 --- a/reframe/frontend/executors/policies.py +++ b/reframe/frontend/executors/policies.py @@ -64,7 +64,8 @@ def __init__(self): self._num_polls = 0 self._sleep_duration = None self._t_init = None - self._jobs_pool = [] + self._t_snoozed = None + self._jobs_pool = {} def reset_snooze_time(self): self._sleep_duration = self.SLEEP_MIN @@ -72,7 +73,9 @@ def reset_snooze_time(self): async def snooze(self): if self._num_polls == 0: self._t_init = time.time() + self._t_snoozed = time.time() + t_increase_sleep = time.time() - self._t_snoozed t_elapsed = time.time() - self._t_init self._num_polls += 1 poll_rate = self._num_polls / t_elapsed if t_elapsed else math.inf @@ -81,25 +84,32 @@ async def snooze(self): f'(current poll rate: {poll_rate} polls/s)' ) await asyncio.sleep(self._sleep_duration) - self._sleep_duration = min( - self._sleep_duration*self.SLEEP_INC_RATE, self.SLEEP_MAX - ) + if t_increase_sleep > self._sleep_duration: + self._t_snoozed = time.time() + self._sleep_duration = min( + self._sleep_duration*self.SLEEP_INC_RATE, self.SLEEP_MAX + ) def is_time_to_poll(self): # We check here if it's time to poll if self._num_polls == 0: - self._t_init = time.time() + if self._t_init is None: + self._t_init = time.time() + self._t_snoozed = time.time() + self._num_polls += 1 + return True t_elapsed = time.time() - self._t_init - self._num_polls += 1 if t_elapsed >= self._sleep_duration: + self._num_polls += 1 return True else: return False def reset_time_to_poll(self): self._t_init = time.time() + self._t_snoozed = time.time() global _poll_controller @@ -124,7 +134,6 @@ def __init__(self): self.task_listeners.append(self) async def _runcase(self, case): - super()._runcase(case) check, partition, _ = case task = RegressionTask(case, self.task_listeners) if check.is_dry_run(): @@ -357,7 +366,6 @@ async def _runcase(self, case, task): # I needed to do abortall on all the tests, not only the ones # which were initiated by the execution. Exit gracefully # the execuion loop aborting all the tasks - super()._runcase(case) check, partition, _ = case # task = RegressionTask(case, self.task_listeners) if check.is_dry_run(): @@ -416,9 +424,28 @@ async def _runcase(self, case, task): await asyncio.sleep(2) self._partition_tasks[partname].add(task) await task.compile() + if task.check.build_job: + # Pick the right scheduler + if task.check.build_locally: + sched = self.local_scheduler + else: + sched = partition.scheduler + + while True: + if not self.dry_run_mode: + if getpollcontroller().is_time_to_poll(): + getpollcontroller().reset_time_to_poll() + await sched.poll(*getpollcontroller()._jobs_pool[ + sched.registered_name + ]) + + if task.compile_complete(): + break + await self._pollctl.snooze() + if task.compile_complete(): + break await task.compile_wait() self._partition_tasks[partname].remove(task) - task.compile_complete() partname = _get_partition_name(task, phase='run') max_jobs = self._max_jobs[partname] while len(self._partition_tasks[partname])+1 > max_jobs: @@ -436,12 +463,15 @@ async def _runcase(self, case, task): if not self.dry_run_mode: if getpollcontroller().is_time_to_poll(): getpollcontroller().reset_time_to_poll() - await sched.poll(*getpollcontroller()._jobs_pool) + await sched.poll(*getpollcontroller()._jobs_pool[ + sched.registered_name + ]) if task.run_complete(): break - await self._pollctl.snooze() + if task.run_complete(): + break await task.run_wait() self._partition_tasks[partname].remove(task) @@ -530,20 +560,45 @@ def on_task_setup(self, task): pass def on_task_run(self, task): - getpollcontroller()._jobs_pool.append(task.check.job) + if task.check.job: + if getpollcontroller()._jobs_pool.get( + task.check.job.scheduler.registered_name + ): + getpollcontroller()._jobs_pool[ + task.check.job.scheduler.registered_name + ].append(task.check.job) + else: + getpollcontroller()._jobs_pool[ + task.check.job.scheduler.registered_name + ] = [task.check.job] def on_task_compile(self, task): - # getpollcontroller()._jobs_pool.append(task.check.job) - # print("Add compile", task.check.job.name) - pass + if task.check.build_job: + if getpollcontroller()._jobs_pool.get( + task.check.build_job.scheduler.registered_name + ): + getpollcontroller()._jobs_pool[ + task.check.build_job.scheduler.registered_name + ].append(task.check.build_job) + else: + getpollcontroller()._jobs_pool[ + task.check.build_job.scheduler.registered_name + ] = [task.check.build_job] def on_task_exit(self, task): - getpollcontroller()._jobs_pool.remove(task.check.job) + if task.check.job: + getpollcontroller()._jobs_pool[ + task.check.job.scheduler.registered_name + ].remove(task.check.job) + getpollcontroller().reset_snooze_time() def on_task_compile_exit(self, task): - # getpollcontroller()._jobs_pool.remove(task.check.job) - # print("Remove compile", task.check.job.name) - pass + if task.check.build_job: + getpollcontroller().reset_snooze_time() + getpollcontroller()._jobs_pool[ + task.check.build_job.scheduler.registered_name + ].remove( + task.check.build_job) def on_task_skip(self, task): msg = str(task.exc_info[1]) @@ -645,7 +700,8 @@ def execute(self, testcases): self._exit() async def _execute_until_failure(self, all_cases): - """Wait for tasks to complete or fail, stopping at the first failure.""" + """Wait for tasks to complete or fail, stopping at the first failure. + """ while all_cases: done, all_cases = await asyncio.wait( all_cases, return_when=asyncio.FIRST_COMPLETED @@ -662,7 +718,8 @@ async def _cancel_gracefully(all_cases): def all_tasks(loop): - """Wrapper for asyncio.current_task() compatible with Python 3.6 and later.""" + """Wrapper for asyncio.current_task() compatible with Python 3.6 and later. + """ if sys.version_info >= (3, 7): # Use asyncio.current_task() directly in Python 3.7+ return asyncio.all_tasks(loop) diff --git a/reframe/utility/cpuinfo.py b/reframe/utility/cpuinfo.py index 1e7ea45b4a..c49787b326 100644 --- a/reframe/utility/cpuinfo.py +++ b/reframe/utility/cpuinfo.py @@ -196,8 +196,8 @@ def _sysfs_topo(): def _sysctl_topo(): try: - exec_output = osext.run_command('sysctl hw machdep.cpu', - check=True) + exec_output = osext.run_command_s('sysctl hw machdep.cpu', + check=True) except (FileNotFoundError, SpawnedProcessError): return {} diff --git a/reframe/utility/osext.py b/reframe/utility/osext.py index 76dd2cee98..a3fa65c16a 100644 --- a/reframe/utility/osext.py +++ b/reframe/utility/osext.py @@ -13,6 +13,7 @@ import getpass import grp import os +import psutil import re import semver import shlex @@ -61,16 +62,19 @@ def _check_started(self): if not self.started(): raise UnstartedProcError - def start(self): + async def start(self): '''Start the future, i.e. spawn the encapsulated command.''' args, kwargs = self._cmd_args - self._proc = run_command_async(*args, **kwargs) - - if os.getsid(self._proc.pid) == self._proc.pid: - self._session = True - else: - self._session = False + self._proc = await run_command_asyncio(*args, **kwargs) + self._session = False + if self._proc.returncode is None: + try: + p = psutil.Process(self._proc.pid) + if p.ppid() == self._proc.pid: + self._session = True + except psutil.NoSuchProcess: + pass return self @@ -106,9 +110,24 @@ def kill(self, signum): ''' self._check_started() - kill_fn = os.killpg if self.is_session() else os.kill - kill_fn(self.pid, signum) - self._signal = signum + children = [] + if self.is_session(): + p = psutil.Process(self.pid) + # Get the chilldren of the process + children = p.children(recursive=True) + try: + self._proc.send_signal(signum) + self._signal = signum + except (ProcessLookupError, PermissionError): + self.log(f'pid {self._proc.pid} already dead') + + for child in children: + try: + child.send_signal(signum) + except (ProcessLookupError, PermissionError, + psutil.NoSuchProcess): + self.log(f'child pid {child.pid} already dead') + self._completed = True def terminate(self): '''Terminate the spawned process by sending ``SIGTERM``.''' @@ -165,28 +184,22 @@ def started(self): '''Check if this future has started.''' return self._proc is not None - def _wait(self, *, nohang=False): + async def _wait(self, *, nohang=False): self._check_started() if self._completed: return True - options = os.WNOHANG if nohang else 0 - try: - pid, status = os.waitpid(self.pid, options) - except OSError as e: - if e.errno == errno.ECHILD: - self._completed = True - return self._completed - else: - raise e - - if nohang and not pid: - return False + if self._proc.returncode is None: + if nohang: + return False + elif not nohang: + await self._proc.wait() - if os.WIFEXITED(status): - self._exitcode = os.WEXITSTATUS(status) - elif os.WIFSIGNALED(status): - self._signal = os.WTERMSIG(status) + # Retrieve the status of the job and return + if self._proc.returncode >= 0: + self._exitcode = self._proc.returncode + else: + self._signal = self._proc.returncode self._completed = True @@ -197,23 +210,23 @@ def _wait(self, *, nohang=False): # Start the next futures in the chain for fut, cond in self._next: if cond(self): - fut.start() + await fut.start() return self._completed - def done(self): + async def done(self): '''Check if the future has finished. This is a non-blocking call. ''' self._check_started() - return self._wait(nohang=True) + return await self._wait(nohang=True) - def wait(self): + async def wait(self): '''Wait for this future to finish.''' - self._wait() + await self._wait() - def exception(self): + async def exception(self): '''Retrieve the exception raised by this future. This is a blocking call and will wait until this future finishes. @@ -223,47 +236,40 @@ def exception(self): :func:`run_command_async2`. ''' - self._wait() + await self._wait() if not self._check: return if self._proc.returncode == 0: return + stdout, stderr = await self._proc.communicate() return SpawnedProcessError(self._proc.args, - self._proc.stdout.read(), - self._proc.stderr.read(), + stdout.decode(), + stderr.decode(), self._proc.returncode) - def stdout(self): - '''Retrieve the standard output of the spawned process. - - This is a blocking call and will wait until the future finishes. - ''' - self._wait() - return self._proc.stdout - - def stderr(self): - '''Retrieve the standard error of the spawned process. + async def communicate(self): + '''Retrieve the standard output/error of the spawned process. This is a blocking call and will wait until the future finishes. ''' - self._wait() - return self._proc.stderr + await self._wait() + return await self._proc.communicate() -def run_command(cmd, check=False, timeout=None, **kwargs): +def run_command_s(cmd, check=False, timeout=None, **kwargs): '''Run command synchronously. This function will block until the command executes or the timeout is - reached. It essentially calls :func:`run_command_async` and waits for the + reached. It essentially calls :func:`run_command_process` and waits for the command's completion. :arg cmd: The command to execute as a string or a sequence. See - :func:`run_command_async` for more details. + :func:`run_command_process` for more details. :arg check: Raise an error if the command exits with a non-zero exit code. :arg timeout: Timeout in seconds. - :arg kwargs: Keyword arguments to be passed :func:`run_command_async`. + :arg kwargs: Keyword arguments to be passed :func:`run_command_process`. :returns: A :py:class:`subprocess.CompletedProcess` object with information about the command's outcome. :raises reframe.core.exceptions.SpawnedProcessError: If ``check`` @@ -274,7 +280,7 @@ def run_command(cmd, check=False, timeout=None, **kwargs): ''' try: - proc = run_command_async(cmd, start_new_session=True, **kwargs) + proc = run_command_process(cmd, start_new_session=True, **kwargs) proc_stdout, proc_stderr = proc.communicate(timeout=timeout) except subprocess.TimeoutExpired as e: os.killpg(proc.pid, signal.SIGKILL) @@ -295,13 +301,13 @@ def run_command(cmd, check=False, timeout=None, **kwargs): return completed -def run_command_async(cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - shell=False, - log=True, - **popen_args): - '''Run command asynchronously. +def run_command_process(cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + shell=False, + log=True, + **popen_args): + '''Start the subprocess to run a command. A wrapper to :py:class:`subprocess.Popen` with the following tweaks: @@ -339,15 +345,26 @@ def run_command_async(cmd, **popen_args) -async def run_command_asyncio_alone(cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - shell=True, - timeout=None, - log=True, - serial=False, - **kwargs): - '''TODO: please write proper docstring +async def run_command_asyncio(cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + shell=True, + log=True, + **kwargs): + '''Create an asyncio subprocess to run the command. + + :arg cmd: The command to run either as a string or a sequence of arguments. + :arg stdout: Same as the corresponding argument of :py:class:`Popen`. + Default is :py:obj:`subprocess.PIPE`. + :arg stderr: Same as the corresponding argument of :py:class:`Popen`. + Default is :py:obj:`subprocess.PIPE`. + :arg shell: Same as the corresponding argument of :py:class:`Popen`. + :arg log: Log the execution of the command through ReFrame's logging + facility. + :arg kargs: Any additional arguments to be passed to + :py:class:`asyncio.subprocess.Popen`. + :returns: A new :py:class:`Popen` object. + ''' if log: from reframe.core.logging import getlogger @@ -358,49 +375,46 @@ async def run_command_asyncio_alone(cmd, if shell: # Call create_subprocess_shell - return await asyncio.create_subprocess_shell( + process = await asyncio.create_subprocess_shell( cmd, stdout=stdout, stderr=stderr, **kwargs ) + process.args = cmd else: # Call create_subprocess_exec - return await asyncio.create_subprocess_exec( + process = await asyncio.create_subprocess_exec( cmd, stdout=stdout, stderr=stderr, **kwargs ) + process.args = cmd -async def run_command_asyncio(cmd, - check=False, - timeout=None, - shell=True, - log=True, - **kwargs): - '''TODO: please write proper docstring - ''' - if log: - from reframe.core.logging import getlogger - getlogger().debug(f'[CMD] {cmd!r}') + return process - if isinstance(cmd, str) and not shell: - cmd = shlex.split(cmd) +async def run_command(cmd, check=False, timeout=None, **kwargs): + '''Run command synchronously. + + This function will block until the command executes or the timeout is + reached. It essentially calls :func:`run_command_asyncio` and waits for the + command's completion. + + :arg cmd: The command to execute as a string or a sequence. See + :func:`run_command_asyncio` for more details. + :arg check: Raise an error if the command exits with a non-zero exit code. + :arg timeout: Timeout in seconds. + :arg kwargs: Keyword arguments to be passed :func:`run_command_asyncio`. + :returns: A :py:class:`subprocess.CompletedProcess` object with + information about the command's outcome. + :raises reframe.core.exceptions.SpawnedProcessError: If ``check`` + is :class:`True` and the command fails. + :raises reframe.core.exceptions.SpawnedProcessTimeout: If the command + times out. + + ''' try: - if shell: - # Call create_subprocess_shell - proc = await asyncio.create_subprocess_shell( - cmd, stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - **kwargs - ) - else: - # Call create_subprocess_exec - proc = await asyncio.create_subprocess_exec( - cmd, stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - **kwargs - ) + proc = await run_command_asyncio(cmd, start_new_session=True, **kwargs) proc_stdout, proc_stderr = await asyncio.wait_for( proc.communicate(), timeout=timeout ) @@ -431,11 +445,11 @@ def run_command_async2(*args, check=False, **kwargs): started. :arg args: Any of the arguments that can be passed to - :func:`run_command_async` + :func:`run_command_process` :arg check: If true, flag the future with a :func:`SpawnedProcessError` exception, upon failure. :arg kwargs: Any of the keyword arguments that can be passed to - :func:`run_command_async`. + :func:`run_command_process`. .. versionadded:: 4.4 @@ -801,8 +815,8 @@ def git_repo_exists(url, timeout=5): ''' try: os.environ['GIT_TERMINAL_PROMPT'] = '0' - run_command('git ls-remote -h %s' % url, check=True, - timeout=timeout) + run_command_s('git ls-remote -h %s' % url, check=True, + timeout=timeout) except (SpawnedProcessTimeout, SpawnedProcessError): return False else: @@ -827,8 +841,8 @@ def git_repo_hash(commit='HEAD', short=True, wd='.'): try: # Do not log this command, since we need to call this function # from the logger - completed = run_command(f'git -C {wd} rev-parse {commit}', - check=True, log=False) + completed = run_command_s(f'git -C {wd} rev-parse {commit}', + check=True, log=False) except (SpawnedProcessError, FileNotFoundError): return None @@ -872,7 +886,7 @@ def expandvars(s): cmd = cmd_subst_m.groups()[0] or cmd_subst_m.groups()[1] # We need shell=True to support nested expansion - completed = run_command(cmd, check=True, shell=True) + completed = run_command_s(cmd, check=True, shell=True) # Prepare stdout for inline use stdout = completed.stdout.replace('\n', ' ').strip() diff --git a/unittests/test_pipeline.py b/unittests/test_pipeline.py index cd19af6af0..4cf7477e87 100644 --- a/unittests/test_pipeline.py +++ b/unittests/test_pipeline.py @@ -1867,7 +1867,7 @@ def assert_os_release(self): def _cray_cle_version(): - completed = osext.run_command('cat /etc/opt/cray/release/cle-release') + completed = osext.run_command_s('cat /etc/opt/cray/release/cle-release') matched = re.match(r'^RELEASE=(\S+)', completed.stdout) if matched is None: return None diff --git a/unittests/test_shell.py b/unittests/test_shell.py index 1b5b5c8c47..193f69abb4 100644 --- a/unittests/test_shell.py +++ b/unittests/test_shell.py @@ -94,7 +94,7 @@ def test_trap_error(script_file): gen.write('echo hello') with pytest.raises(SpawnedProcessError) as cm: - osext.run_command(str(script_file), check=True) + osext.run_command_s(str(script_file), check=True) exc = cm.value assert 'hello' not in exc.stdout @@ -106,7 +106,7 @@ def test_trap_exit(script_file): with shell.generate_script(script_file, trap_exit=True) as gen: gen.write('echo hello') - completed = osext.run_command(str(script_file), check=True) + completed = osext.run_command_s(str(script_file), check=True) assert 'hello' in completed.stdout assert 0 == completed.returncode assert '-reframe: script exiting with exit code: 0' in completed.stdout @@ -118,8 +118,8 @@ def test_trap_signal(script_file): gen.write('echo hello') f_stdout = tempfile.NamedTemporaryFile(mode='w+', delete=False) - proc = osext.run_command_async(str(script_file), stdout=f_stdout, - start_new_session=True) + proc = osext.run_command_process(str(script_file), stdout=f_stdout, + start_new_session=True) # Yield for some time to allow the script to start time.sleep(1) diff --git a/unittests/test_utility.py b/unittests/test_utility.py index 2cb5ac462e..488b1dd917 100644 --- a/unittests/test_utility.py +++ b/unittests/test_utility.py @@ -25,13 +25,13 @@ def test_command_success(): - completed = osext.run_command('echo foobar') + completed = osext.run_command_s('echo foobar') assert completed.returncode == 0 assert completed.stdout == 'foobar\n' def test_command_success_cmd_seq(): - completed = osext.run_command(['echo', 'foobar']) + completed = osext.run_command_s(['echo', 'foobar']) assert completed.returncode == 0 assert completed.stdout == 'foobar\n' @@ -39,13 +39,13 @@ def test_command_success_cmd_seq(): def test_command_error(): with pytest.raises(SpawnedProcessError, match=r"command 'false' failed with exit code 1"): - osext.run_command('false', check=True) + osext.run_command_s('false', check=True) def test_command_error_cmd_seq(): with pytest.raises(SpawnedProcessError, match=r"command 'false' failed with exit code 1"): - osext.run_command(['false'], check=True) + osext.run_command_s(['false'], check=True) def test_command_timeout(): @@ -53,7 +53,7 @@ def test_command_timeout(): SpawnedProcessTimeout, match=r"command 'sleep 3' timed out " r'after 2s') as exc_info: - osext.run_command('sleep 3', timeout=2) + osext.run_command_s('sleep 3', timeout=2) assert exc_info.value.timeout == 2 @@ -66,7 +66,7 @@ def test_command_stdin(tmp_path): fp.write('hello') with open(tmp_path / 'in.txt') as fp: - completed = osext.run_command('cat', stdin=fp) + completed = osext.run_command_s('cat', stdin=fp) assert completed.stdout == 'hello' @@ -74,7 +74,7 @@ def test_command_stdin(tmp_path): def test_command_async(): t_launch = time.time() t_sleep = t_launch - proc = osext.run_command_async('sleep 1') + proc = osext.run_command_process('sleep 1') t_launch = time.time() - t_launch proc.wait() @@ -90,7 +90,7 @@ def test_command_futures(): # Check that some operations cannot be performed on an unstarted future with pytest.raises(osext.UnstartedProcError): - proc.done() + test_util.asyncio_run(proc.done) with pytest.raises(osext.UnstartedProcError): proc.cancel() @@ -99,10 +99,10 @@ def test_command_futures(): proc.terminate() with pytest.raises(osext.UnstartedProcError): - proc.wait() + test_util.asyncio_run(proc.wait) assert not proc.started() - proc.start() + test_util.asyncio_run(proc.start) assert proc.started() assert proc.pid is not None @@ -110,17 +110,18 @@ def test_command_futures(): assert not proc.is_session() # stdout must block - assert proc.stdout().read() == 'hello\n' + stdout, stderr = test_util.asyncio_run(proc.communicate) + assert stdout.decode() == 'hello\n' assert proc.exitcode == 0 assert proc.signal is None # Additional wait() should have no effect - proc.wait() - proc.wait() + test_util.asyncio_run(proc.wait) + test_util.asyncio_run(proc.wait) - assert proc.done() + assert test_util.asyncio_run(proc.done) assert not proc.cancelled() - assert proc.exception() is None + assert test_util.asyncio_run(proc.exception) is None def test_command_futures_callbacks(): @@ -135,13 +136,13 @@ def _callback(_): with pytest.raises(ValueError): proc.add_done_callback(lambda: 1) - proc.start() - while not proc.done(): + test_util.asyncio_run(proc.start) + while not test_util.asyncio_run(proc.done): pass # Call explicitly more times - proc.done() - proc.done() + test_util.asyncio_run(proc.done) + test_util.asyncio_run(proc.done) assert num_called == 1 @@ -152,13 +153,14 @@ def _checked_cmd(request): def test_command_futures_error(_checked_cmd): proc = osext.run_command_async2("false", shell=True, check=_checked_cmd) - proc.start() + test_util.asyncio_run(proc.start) # exception() blocks until the process is finished if _checked_cmd: - assert isinstance(proc.exception(), SpawnedProcessError) + assert isinstance(test_util.asyncio_run(proc.exception), + SpawnedProcessError) else: - assert proc.exception() is None + assert test_util.asyncio_run(proc.exception) is None assert proc.exitcode == 1 assert proc.signal is None @@ -178,7 +180,7 @@ def _signal(request): def test_command_futures_signal(_checked_cmd, _signal): proc = osext.run_command_async2('sleep 3', shell=True, check=_checked_cmd) - proc.start() + test_util.asyncio_run(proc.start) if _signal == signal.SIGTERM: proc.terminate() elif _signal == signal.SIGKILL: @@ -186,8 +188,8 @@ def test_command_futures_signal(_checked_cmd, _signal): else: proc.kill(_signal) - proc.wait() - assert proc.done() + test_util.asyncio_run(proc.wait) + assert test_util.asyncio_run(proc.done) if _signal == signal.SIGKILL: assert proc.cancelled() else: @@ -196,9 +198,10 @@ def test_command_futures_signal(_checked_cmd, _signal): assert proc.signal == _signal assert proc.exitcode is None if _checked_cmd: - assert isinstance(proc.exception(), SpawnedProcessError) + assert isinstance(test_util.asyncio_run(proc.exception), + SpawnedProcessError) else: - assert proc.exception() is None + assert test_util.asyncio_run(proc.exception) is None def test_command_futures_chain(tmp_path): @@ -211,13 +214,14 @@ def test_command_futures_chain(tmp_path): proc0.then(proc2).then(proc3) all_procs = [proc0, proc1, proc2, proc3] t_start = time.time() - proc0.start() - while not all(p.done() for p in all_procs if p.started()): + test_util.asyncio_run(proc0.start) + while not all(test_util.asyncio_run(p.done) for p in all_procs + if p.started()): pass t_elapsed = time.time() - t_start assert t_elapsed < 2 - assert all(p.done() for p in all_procs) + assert all(test_util.asyncio_run(p.done) for p in all_procs) with open(tmp_path / 'stdout.txt') as fp: assert fp.read() == 'hello\nworld\n' @@ -244,13 +248,13 @@ def cond(proc): with pytest.raises(ValueError): proc0.then(proc1, when=lambda: False) - proc0.start() - proc0.wait() - proc1.wait() + test_util.asyncio_run(proc0.start) + test_util.asyncio_run(proc0.wait) + test_util.asyncio_run(proc1.wait) if _chain_policy == 'fail_on_error': assert not proc2.started() else: - proc2.wait() + test_util.asyncio_run(proc2.wait) with open(tmp_path / 'stdout.txt') as fp: if _chain_policy == 'fail_on_error': @@ -264,8 +268,8 @@ def test_command_futures_chain_cancel(): proc1 = osext.run_command_async2('sleep 1', shell=True) proc2 = osext.run_command_async2('echo world', shell=True) proc0.then(proc1).then(proc2) - proc0.start() - while not proc0.done(): + test_util.asyncio_run(proc0.start) + while not test_util.asyncio_run(proc0.done): pass assert proc1.started() @@ -437,12 +441,12 @@ def test_is_url(): @pytest.fixture def git_only(): try: - osext.run_command('git --version', check=True, log=False) + osext.run_command_s('git --version', check=True, log=False) except (SpawnedProcessError, FileNotFoundError): pytest.skip('no git installation found on system') try: - osext.run_command('git status', check=True, log=False) + osext.run_command_s('git status', check=True, log=False) except (SpawnedProcessError, FileNotFoundError): pytest.skip('not inside a git repository')