Skip to content

Commit

Permalink
Change asynchronous policy to use asyncio
Browse files Browse the repository at this point in the history
  • Loading branch information
Blanca-Fuentes committed Jan 15, 2025
1 parent 3505af3 commit 001644b
Show file tree
Hide file tree
Showing 20 changed files with 304 additions and 240 deletions.
8 changes: 0 additions & 8 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
<<<<<<< HEAD
python-version: ['3.7', '3.8', '3.9', '3.10', '3.11', '3.12', '3.13']
=======
python-version: ['3.8', '3.9', '3.10', '3.11', '3.12', '3.13']
>>>>>>> upstream/develop
steps:
- uses: actions/checkout@v4
- name: Setup up Python ${{ matrix.python-version }}
Expand All @@ -165,11 +161,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
<<<<<<< HEAD
python-version: ['3.7', '3.8', '3.9', '3.10', '3.11', '3.12', '3.13']
=======
python-version: ['3.8', '3.9', '3.10', '3.11', '3.12', '3.13']
>>>>>>> upstream/develop
steps:
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
Expand Down
1 change: 1 addition & 0 deletions docs/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
archspec==0.2.5
docutils==0.18.1
jsonschema==3.2.0
psutil
semver==2.13.0; python_version == '3.6'
semver==3.0.2; python_version >= '3.7'
Sphinx==5.3.0; python_version < '3.8'
Expand Down
4 changes: 2 additions & 2 deletions reframe/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ def _py_meth(m):

def _sh_meth(m):
def _fn():
completed = osext.run_command(m, check=True)
completed = osext.run_command_s(m, check=True)
return completed.stdout.strip()

return _fn
Expand Down Expand Up @@ -429,7 +429,7 @@ def _detect_system(self):
'the `--system` option')

getlogger().debug(f'Retrieved hostname: {hostname!r}')
getlogger().debug(f'Looking for a matching configuration entry')
getlogger().debug('Looking for a matching configuration entry')
for system in self._site_config['systems']:
for patt in system['hostnames']:
if re.match(patt, hostname):
Expand Down
2 changes: 1 addition & 1 deletion reframe/core/launchers/mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def __init__(self):
self.options = []
self.use_cpus_per_task = True
try:
out = osext.run_command('srun --version')
out = osext.run_command_s('srun --version')
match = re.search(r'slurm(-wlm)? (\d+)\.(\d+)\.(\d+)', out.stdout)
if match:
# We cannot pass to semver strings like 22.05.1 directly
Expand Down
6 changes: 3 additions & 3 deletions reframe/core/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -959,7 +959,6 @@ def adjust_verbosity(self, num_steps):
global tasks_loggers
tasks_loggers = {}

global _global_logger
_global_logger = null_logger


Expand All @@ -974,10 +973,11 @@ def __init__(self, check=None, level=DEBUG):
self._orig_logger = _global_logger

self._level = level
self._context_logger = _global_logger
if check is not None:
self._context_logger = LoggerAdapter(_logger, check)
self._context_logger.colorize = self._orig_logger.colorize
else:
self._context_logger = _global_logger

if task:
tasks_loggers[task] = self._context_logger
Expand All @@ -988,10 +988,10 @@ def __enter__(self):
return self._context_logger

def __exit__(self, exc_type, exc_value, traceback):
global _global_logger
try:
task = current_task()
except RuntimeError:
global _global_logger
task = None

# Log any exceptions thrown with the current context logger
Expand Down
24 changes: 12 additions & 12 deletions reframe/core/modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ def __init__(self):
def _do_validate(self):
# Try to figure out if we are indeed using the TCL version
try:
completed = osext.run_command('modulecmd -V')
completed = osext.run_command_s('modulecmd -V')
except OSError as e:
raise ConfigError(
'could not find a sane TMod installation') from e
Expand Down Expand Up @@ -626,7 +626,7 @@ def _do_validate(self):
self._version = version
try:
# Try the Python bindings now
completed = osext.run_command(self.modulecmd())
completed = osext.run_command_s(self.modulecmd())
except OSError as e:
raise ConfigError(
f'could not get the Python bindings for TMod: {e}'
Expand All @@ -653,7 +653,7 @@ def _execute(self, cmd, *args):
self._do_validate()

modulecmd = self.modulecmd(cmd, *args)
completed = osext.run_command(modulecmd)
completed = osext.run_command_s(modulecmd)
if re.search(r'\bERROR\b', completed.stderr) is not None:
raise SpawnedProcessError(modulecmd,
completed.stdout,
Expand Down Expand Up @@ -746,7 +746,7 @@ def _do_validate(self):
try:
modulecmd = os.getenv('MODULESHOME')
modulecmd = os.path.join(modulecmd, 'modulecmd.tcl')
completed = osext.run_command(modulecmd)
completed = osext.run_command_s(modulecmd)
except OSError as e:
raise ConfigError(
f'could not find a sane TMod31 installation: {e}'
Expand Down Expand Up @@ -776,7 +776,7 @@ def _do_validate(self):
self._command = f'{modulecmd} python'
try:
# Try the Python bindings now
completed = osext.run_command(self._command)
completed = osext.run_command_s(self._command)
except OSError as e:
raise ConfigError(
f'could not get the Python bindings for TMod31: {e}'
Expand All @@ -800,7 +800,7 @@ def _execute(self, cmd, *args):
self._do_validate()

modulecmd = self.modulecmd(cmd, *args)
completed = osext.run_command(modulecmd)
completed = osext.run_command_s(modulecmd)
if re.search(r'\bERROR\b', completed.stderr) is not None:
raise SpawnedProcessError(modulecmd,
completed.stdout,
Expand Down Expand Up @@ -833,7 +833,7 @@ def __init__(self):

def _do_validate(self):
try:
completed = osext.run_command(self.modulecmd('-V'), check=True)
completed = osext.run_command_s(self.modulecmd('-V'), check=True)
except OSError as e:
raise ConfigError(
'could not find a sane TMod4 installation'
Expand Down Expand Up @@ -877,7 +877,7 @@ def _execute(self, cmd, *args):
self._do_validate()

modulecmd = self.modulecmd(cmd, *args)
completed = osext.run_command(modulecmd, check=False)
completed = osext.run_command_s(modulecmd, check=False)
namespace = {}
exec(self.process(completed.stdout), {}, namespace)

Expand Down Expand Up @@ -977,7 +977,7 @@ def _do_validate(self):
'environment variable LMOD_CMD is not defined')

try:
completed = osext.run_command(f'{self._lmod_cmd} --version')
completed = osext.run_command_s(f'{self._lmod_cmd} --version')
except OSError as e:
raise ConfigError(f'could not find a sane Lmod installation: {e}')

Expand All @@ -989,7 +989,7 @@ def _do_validate(self):
self._version = version_match.group(1)
try:
# Try the Python bindings now
completed = osext.run_command(self.modulecmd())
completed = osext.run_command_s(self.modulecmd())
except OSError as e:
raise ConfigError(
f'could not get the Python bindings for Lmod: {e}'
Expand Down Expand Up @@ -1159,7 +1159,7 @@ def __init__(self):
def _do_validate(self):
# Try to figure out if we are indeed using the TCL version
try:
completed = osext.run_command('spack -V')
completed = osext.run_command_s('spack -V')
except OSError as e:
raise ConfigError(
'could not find a sane Spack installation'
Expand All @@ -1182,7 +1182,7 @@ def _execute(self, cmd, *args):
self._do_validate()

modulecmd = self.modulecmd(cmd, *args)
completed = osext.run_command(modulecmd, check=True)
completed = osext.run_command_s(modulecmd, check=True)
return completed.stdout

def available_modules(self, substr):
Expand Down
4 changes: 2 additions & 2 deletions reframe/core/schedulers/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ async def submit(self, job):
# The new process starts also a new session (session leader), so that
# we can later kill any other processes that this might spawn by just
# killing this one.
proc = await osext.run_command_asyncio_alone(
proc = await osext.run_command_asyncio(
os.path.abspath(job.script_filename),
stdout=f_stdout,
stderr=f_stderr,
Expand Down Expand Up @@ -200,7 +200,7 @@ async def poll(self, *jobs):
await self._poll_job(job)

async def _poll_job(self, job):
if job is None or job.jobid is None:
if (job is None or job.jobid is None or job.finished()):
return

if job.cancel_time:
Expand Down
2 changes: 1 addition & 1 deletion reframe/core/schedulers/lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from reframe.core.schedulers.pbs import PbsJobScheduler

# Asynchronous _run_strict
_run_strict = functools.partial(osext.run_command_asyncio, check=True)
_run_strict = functools.partial(osext.run_command, check=True)


@register_scheduler('lsf')
Expand Down
4 changes: 2 additions & 2 deletions reframe/core/schedulers/oar.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ def oar_state_pending(state):


# Asynchronous _run_strict
_run_strict = functools.partial(osext.run_command_asyncio, check=True)
_run_strict = functools.partial(osext.run_command, check=True)
# Synchronous _run_strict
_run_strict_s = functools.partial(osext.run_command, check=True)
_run_strict_s = functools.partial(osext.run_command_s, check=True)


@register_scheduler('oar')
Expand Down
8 changes: 4 additions & 4 deletions reframe/core/schedulers/pbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@


# Asynchronous _run_strict
_run_strict = functools.partial(osext.run_command_asyncio, check=True)
_run_strict = functools.partial(osext.run_command, check=True)
# Synchronous _run_strict
_run_strict_s = functools.partial(osext.run_command, check=True)
_run_strict_s = functools.partial(osext.run_command_s, check=True)


JOB_STATES = {
Expand Down Expand Up @@ -199,7 +199,7 @@ def _query_exit_code(self, job):
'''Try to retrieve the exit code of a past job.'''

# With PBS Pro we can obtain the exit status of a past job
extended_info = osext.run_command(f'qstat -xf {job.jobid}')
extended_info = osext.run_command_s(f'qstat -xf {job.jobid}')
exit_status_match = re.search(
r'^ *Exit_status *= *(?P<exit_status>\d+)', extended_info.stdout,
flags=re.MULTILINE,
Expand All @@ -224,7 +224,7 @@ def output_ready(job):
if not jobs:
return

completed = await osext.run_command_asyncio(
completed = await osext.run_command(
f'qstat -f {" ".join(job.jobid for job in jobs)}'
)

Expand Down
6 changes: 3 additions & 3 deletions reframe/core/schedulers/sge.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
from reframe.utility import seconds_to_hms

# Asynchronous _run_strict
_run_strict = functools.partial(osext.run_command_asyncio, check=True)
_run_strict = functools.partial(osext.run_command, check=True)
# Synchronous _run_strict
_run_strict_s = functools.partial(osext.run_command, check=True)
_run_strict_s = functools.partial(osext.run_command_s, check=True)


@register_scheduler('sge')
Expand Down Expand Up @@ -78,7 +78,7 @@ async def poll(self, *jobs):
return

user = osext.osuser()
completed = await osext.run_command_asyncio(f'qstat -xml -u {user}')
completed = await osext.run_command(f'qstat -xml -u {user}')
if completed.returncode != 0:
raise JobSchedulerError(
f'qstat failed with exit code {completed.returncode} '
Expand Down
16 changes: 8 additions & 8 deletions reframe/core/schedulers/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ def slurm_state_pending(state):


# Asynchronous _run_strict
_run_strict = functools.partial(osext.run_command_asyncio, check=True)
_run_strict = functools.partial(osext.run_command, check=True)
# Synchronous _run_strict
_run_strict_s = functools.partial(osext.run_command, check=True)
_run_strict_s = functools.partial(osext.run_command_s, check=True)


class _SlurmJob(sched.Job):
Expand All @@ -85,7 +85,7 @@ def __init__(self, *args, **kwargs):
def nodelist(self):
# Generate the nodelist only after the job is finished
if slurm_state_completed(self.state):
completed = osext.run_command(
completed = osext.run_command_s(
f'scontrol show hostname {self._nodespec}', log=False
)
self._nodelist = completed.stdout.splitlines()
Expand Down Expand Up @@ -327,7 +327,7 @@ def _get_actual_partition(self, options):
if partition_match:
return partition_match.group('partition')

except SpawnedProcessError as e:
except SpawnedProcessError:
self.log('could not retrieve actual partition')

return None
Expand Down Expand Up @@ -424,8 +424,8 @@ def _get_reservation_nodes(self, reservation):
return _create_nodes(node_descriptions)

def _get_nodes_by_name(self, nodespec):
completed = osext.run_command('scontrol -a show -o node %s' %
nodespec)
completed = osext.run_command_s('scontrol -a show -o node %s' %
nodespec)
node_descriptions = completed.stdout.splitlines()
return _create_nodes(node_descriptions)

Expand Down Expand Up @@ -523,7 +523,7 @@ async def _cancel_if_blocked(self, job, reasons=None):
return

if not reasons:
completed = await osext.run_command_asyncio(
completed = await osext.run_command(
'squeue -h -j %s -o %%r' % job.jobid
)
reasons = completed.stdout.splitlines()
Expand Down Expand Up @@ -628,7 +628,7 @@ async def poll(self, *jobs):
# We don't run the command with check=True, because if the job has
# finished already, squeue might return an error about an invalid
# job id.
completed = await osext.run_command_asyncio(
completed = await osext.run_command(
f'squeue -h -j {",".join(job.jobid for job in jobs)} '
f'-o "%%i|%%T|%%N|%%r"'
)
Expand Down
Loading

0 comments on commit 001644b

Please sign in to comment.