From c4cf710aee1245c2eb9417b275353ee419cb7727 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Tue, 7 Nov 2023 13:23:50 +0100 Subject: [PATCH 01/15] Add basic firecrest implementation --- .gitignore | 15 ++ reframe/core/schedulers/slurm.py | 255 +++++++++++++++++++++++++++++++ requirements.txt | 1 + 3 files changed, 271 insertions(+) diff --git a/.gitignore b/.gitignore index 4eaed2111c..2af478f870 100644 --- a/.gitignore +++ b/.gitignore @@ -99,3 +99,18 @@ ENV/ # Vim temp files *.swp + +docs/doctrees +.vscode +output +stage +external +xml_files +json_files +.venv_reframe +async_build_stats +plot_bars.py +plot_pipeline* +run_test_reframe.sh +send_reframe.sh +my_checks \ No newline at end of file diff --git a/reframe/core/schedulers/slurm.py b/reframe/core/schedulers/slurm.py index 36a3f59195..0039c09de0 100644 --- a/reframe/core/schedulers/slurm.py +++ b/reframe/core/schedulers/slurm.py @@ -3,9 +3,11 @@ # # SPDX-License-Identifier: BSD-3-Clause +import firecrest as fc import functools import glob import itertools +import os import re import shlex import time @@ -64,6 +66,11 @@ def slurm_state_pending(state): return False +def join_and_normalize(*args): + joined_path = os.path.join(*args) + normalized_path = os.path.normpath(joined_path) + return normalized_path + _run_strict = functools.partial(osext.run_command, check=True) @@ -97,6 +104,35 @@ def is_cancelling(self): return self._is_cancelling +class _SlurmFirecrestJob(sched.Job): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._is_array = False + self._is_cancelling = False + + # FIXME get this dynamically + self._remotedir_prefix = '/scratch/snx3000/eirinik/rfc_remote' + self._remotedir = None + self._localdir = None + + self._local_filetimestamps = {} + self._remote_filetimestamps = {} + self._first_submission = True + + # The compacted nodelist as reported by Slurm. This must be updated in + # every poll as Slurm may be slow in reporting the exact nodelist + self._nodespec = None + self._stage_prefix = rt.runtime().stage_prefix + + @property + def is_array(self): + return self._is_array + + @property + def is_cancelling(self): + return self._is_cancelling + + @register_scheduler('slurm') class SlurmJobScheduler(sched.JobScheduler): # In some systems, scheduler performance is sensitive to the squeue poll @@ -692,3 +728,222 @@ def _extract_attribute(self, attr_name, node_descr, sep=None): def __str__(self): return self._name + +@register_scheduler('slurmfc') +class SlurmFirecrestJobScheduler(SlurmJobScheduler): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + # FIXME set these in a better way + client_id = os.environ.get("FIRECREST_CLIENT_ID") + client_secret = os.environ.get("FIRECREST_CLIENT_SECRET") + token_uri = "https://auth.cscs.ch/auth/realms/firecrest-clients/protocol/openid-connect/token" + + # Setup the client for the specific account + self.client = fc.Firecrest( + firecrest_url="https://firecrest.cscs.ch", + authorization=fc.ClientCredentialsAuth(client_id, client_secret, token_uri) + ) + self._system_name = 'daint' + + def make_job(self, *args, **kwargs): + return _SlurmFirecrestJob(*args, **kwargs) + + def _push_artefacts(self, job): + for dirpath, dirnames, filenames in os.walk('.'): + for d in dirnames: + new_dir = join_and_normalize(job._remotedir, dirpath, d) + self.log(f'Creating remote directory {new_dir}') + self.client.mkdir(self._system_name, new_dir, p=True) + + remote_dir_path = join_and_normalize(job._remotedir, dirpath) + for f in filenames: + local_norm_path = join_and_normalize(job._localdir, dirpath, f) + modification_time = os.path.getmtime(local_norm_path) + if job._local_filetimestamps.get(local_norm_path) != modification_time: + job._local_filetimestamps[local_norm_path] = modification_time + + self.log(f'Uploading file {f} in {join_and_normalize(job._remotedir, dirpath)}') + self.client.simple_upload( + self._system_name, + local_norm_path, + remote_dir_path + ) + + # Update timestamps for remote directory + remote_files = self.client.list_files( + self._system_name, + remote_dir_path, + show_hidden=True + ) + for f in remote_files: + local_norm_path = join_and_normalize(remote_dir_path, f['name']) + job._remote_filetimestamps[local_norm_path] = f['last_modified'] + + + + def _pull_artefacts(self, job): + def firecrest_walk(directory): + contents = self.client.list_files(self._system_name, directory) + + dirs = [] + nondirs = [] + + for item in contents: + if item['type'] == 'd': + dirs.append(item['name']) + else: + nondirs.append((item['name'], item["last_modified"])) + + yield directory, dirs, nondirs + + for item in dirs: + item_path = f"{directory}/{item['name']}" + yield from firecrest_walk(item_path) + + for dirpath, dirnames, files in firecrest_walk(job._remotedir): + local_dirpath = join_and_normalize( + job._localdir, + os.path.relpath( + dirpath, + job._remotedir + ) + ) + for d in dirnames: + new_dir = join_and_normalize(local_dirpath, d) + self.log(f'Creating local directory {new_dir}') + os.makedirs(new_dir) + + for (f, modification_time) in files: + norm_path = join_and_normalize(dirpath, f) + local_file_path = join_and_normalize(local_dirpath, f) + if job._remote_filetimestamps.get(norm_path) != modification_time: + self.log(f'Downloading file {f} in {local_dirpath}') + self.client.simple_download( + self._system_name, + norm_path, + local_file_path + ) + + job._remote_filetimestamps[norm_path] = modification_time + + job._local_filetimestamps[local_file_path] = os.path.getmtime(local_file_path) + + + def submit(self, job): + job._localdir = os.getcwd() + job._remotedir = os.path.join( + job._remotedir_prefix, + os.path.relpath(os.getcwd(), job._stage_prefix) + ) + + if job._first_submission: + # Create clean stage directory in the remote system + try: + self.client.simple_delete(self._system_name, job._remotedir) + except fc.HeaderException: + # The delete request will raise an exception if it doesn't exist + pass + + self.client.mkdir(self._system_name, job._remotedir, p=True) + job._first_submission = False + + self._push_artefacts(job) + + intervals = itertools.cycle([1, 2, 3]) + while True: + try: + # Make request for submission + submission_result = self.client.submit( + self._system_name, + os.path.join(job._remotedir, job.script_filename), + local_file=False + ) + break + except fc.FirecrestException as e: + stderr = e.responses[-1].json().get('error', '') + error_match = re.search( + rf'({"|".join(self._resubmit_on_errors)})', stderr + ) + if not self._resubmit_on_errors or not error_match: + raise + + t = next(intervals) + self.log( + f'encountered a job submission error: ' + f'{error_match.group(1)}: will resubmit after {t}s' + ) + time.sleep(t) + + job._jobid = str(submission_result['jobid']) + job._submit_time = time.time() + + def allnodes(self): + raise NotImplementedError('firecrest slurm backend does not support node listing') + + def filternodes(self, job, nodes): + raise NotImplementedError( + 'firecrest slurm backend does not support node filtering' + ) + + def poll(self, *jobs): + '''Update the status of the jobs.''' + + if jobs: + # Filter out non-jobs + jobs = [job for job in jobs if job is not None] + + if not jobs: + return + + poll_results = self.client.poll( + self._system_name, [job.jobid for job in jobs] + ) + job_info = {} + for r in poll_results: + # Take into account both job arrays and heterogeneous jobs + jobid = re.split(r'_|\+', r['jobid'])[0] + job_info.setdefault(jobid, []).append(r) + + for job in jobs: + try: + jobarr_info = job_info[job.jobid] + except KeyError: + continue + + # Join the states with ',' in case of job arrays|heterogeneous jobs + job._state = ','.join(m['state'] for m in jobarr_info) + + self._cancel_if_pending_too_long(job) + if slurm_state_completed(job.state): + # Since Slurm exitcodes are positive take the maximum one + job._exitcode = max( + int(m['exit_code'].split(":")[0]) for m in jobarr_info + ) + + # Use ',' to join nodes to be consistent with Slurm syntax + job._nodespec = ','.join(m['nodelist'] for m in jobarr_info) + # self._update_completion_time( + # job, (m.group('end') for m in jobarr_info) + # ) + + def wait(self, job): + # Quickly return in case we have finished already + self._pull_artefacts(job) + if self.finished(job): + if job.is_array: + self._merge_files(job) + + return + + intervals = itertools.cycle([1, 2, 3]) + while not self.finished(job): + self.poll(job) + time.sleep(next(intervals)) + + self._pull_artefacts(job) + if job.is_array: + self._merge_files(job) + + def cancel(self, job): + self.client.cancel(job.system_name, job.jobid) + job._is_cancelling = True diff --git a/requirements.txt b/requirements.txt index 34b692acdf..fb6a8bd55a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -19,3 +19,4 @@ setuptools==59.6.0; python_version == '3.6' setuptools==67.4.0; python_version >= '3.7' wcwidth==0.2.6 #+pygelf%pygelf==0.4.0 +pyfirecrest==2.0.0 From 88c5095c2c573400c52ab23016d26e48a4c7a298 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Tue, 7 Nov 2023 13:28:31 +0100 Subject: [PATCH 02/15] Revert gitignore --- .gitignore | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/.gitignore b/.gitignore index 2af478f870..4eaed2111c 100644 --- a/.gitignore +++ b/.gitignore @@ -99,18 +99,3 @@ ENV/ # Vim temp files *.swp - -docs/doctrees -.vscode -output -stage -external -xml_files -json_files -.venv_reframe -async_build_stats -plot_bars.py -plot_pipeline* -run_test_reframe.sh -send_reframe.sh -my_checks \ No newline at end of file From 2cf0ac47fb9de90a2a92984a0ca53c4819bb0823 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Tue, 14 Nov 2023 13:26:44 +0100 Subject: [PATCH 03/15] Fix the sync of files for the firecrest scheduler --- reframe/core/pipeline.py | 15 +++++++------- reframe/core/schedulers/__init__.py | 5 ++++- reframe/core/schedulers/slurm.py | 31 +++++++++++------------------ 3 files changed, 24 insertions(+), 27 deletions(-) diff --git a/reframe/core/pipeline.py b/reframe/core/pipeline.py index ac08e2fc78..a9dd6254d7 100644 --- a/reframe/core/pipeline.py +++ b/reframe/core/pipeline.py @@ -1665,7 +1665,7 @@ def _setup_paths(self): except OSError as e: raise PipelineError('failed to set up paths') from e - def _create_job(self, job_type, force_local=False, **job_opts): + def _create_job(self, job_type, force_local=False, clean_up_stage=False, **job_opts): '''Setup the job related to this check.''' if force_local: @@ -1692,14 +1692,15 @@ def _create_job(self, job_type, force_local=False, **job_opts): script_filename=script_name, workdir=self._stagedir, sched_access=self._current_partition.access, + clean_up_stage=clean_up_stage, **job_opts) - def _setup_build_job(self, **job_opts): + def _setup_build_job(self, clean_up_stage=False, **job_opts): self._build_job = self._create_job( - 'build', self.local or self.build_locally, **job_opts + 'build', self.local or self.build_locally, clean_up_stage, **job_opts ) - def _setup_run_job(self, **job_opts): + def _setup_run_job(self, clean_up_stage=False, **job_opts): self._job = self._create_job(f'run', self.local, **job_opts) def _setup_container_platform(self): @@ -1743,7 +1744,7 @@ def setup(self, partition, environ, **job_opts): self._current_partition = partition self._current_environ = environ self._setup_paths() - self._setup_build_job(**job_opts) + self._setup_build_job(clean_up_stage=True, **job_opts) self._setup_run_job(**job_opts) self._setup_container_platform() self._resolve_fixtures() @@ -2559,7 +2560,7 @@ def setup(self, partition, environ, **job_opts): self._current_partition = partition self._current_environ = environ self._setup_paths() - self._setup_run_job(**job_opts) + self._setup_run_job(clean_up_stage=True, **job_opts) self._setup_container_platform() self._resolve_fixtures() @@ -2616,7 +2617,7 @@ def setup(self, partition, environ, **job_opts): self._current_partition = partition self._current_environ = environ self._setup_paths() - self._setup_build_job(**job_opts) + self._setup_build_job(clean_up_stage=True, **job_opts) self._setup_container_platform() self._resolve_fixtures() diff --git a/reframe/core/schedulers/__init__.py b/reframe/core/schedulers/__init__.py index a5daee11f2..fec19dc723 100644 --- a/reframe/core/schedulers/__init__.py +++ b/reframe/core/schedulers/__init__.py @@ -326,7 +326,8 @@ def __init__(self, stderr=None, sched_flex_alloc_nodes=None, sched_access=[], - sched_options=None): + sched_options=None, + clean_up_stage=False): self._cli_options = list(sched_options) if sched_options else [] self._name = name @@ -354,6 +355,8 @@ def __init__(self, # in finished() self._exception = None + self._clean_up_stage = clean_up_stage + @classmethod def create(cls, scheduler, launcher, *args, **kwargs): ret = scheduler.make_job(*args, **kwargs) diff --git a/reframe/core/schedulers/slurm.py b/reframe/core/schedulers/slurm.py index 0039c09de0..fdb16bd502 100644 --- a/reframe/core/schedulers/slurm.py +++ b/reframe/core/schedulers/slurm.py @@ -115,9 +115,6 @@ def __init__(self, *args, **kwargs): self._remotedir = None self._localdir = None - self._local_filetimestamps = {} - self._remote_filetimestamps = {} - self._first_submission = True # The compacted nodelist as reported by Slurm. This must be updated in # every poll as Slurm may be slow in reporting the exact nodelist @@ -745,6 +742,9 @@ def __init__(self, *args, **kwargs): ) self._system_name = 'daint' + self._local_filetimestamps = {} + self._remote_filetimestamps = {} + def make_job(self, *args, **kwargs): return _SlurmFirecrestJob(*args, **kwargs) @@ -759,9 +759,8 @@ def _push_artefacts(self, job): for f in filenames: local_norm_path = join_and_normalize(job._localdir, dirpath, f) modification_time = os.path.getmtime(local_norm_path) - if job._local_filetimestamps.get(local_norm_path) != modification_time: - job._local_filetimestamps[local_norm_path] = modification_time - + if self._local_filetimestamps.get(local_norm_path) != modification_time: + self._local_filetimestamps[local_norm_path] = modification_time self.log(f'Uploading file {f} in {join_and_normalize(job._remotedir, dirpath)}') self.client.simple_upload( self._system_name, @@ -777,9 +776,7 @@ def _push_artefacts(self, job): ) for f in remote_files: local_norm_path = join_and_normalize(remote_dir_path, f['name']) - job._remote_filetimestamps[local_norm_path] = f['last_modified'] - - + self._remote_filetimestamps[local_norm_path] = f['last_modified'] def _pull_artefacts(self, job): def firecrest_walk(directory): @@ -816,7 +813,7 @@ def firecrest_walk(directory): for (f, modification_time) in files: norm_path = join_and_normalize(dirpath, f) local_file_path = join_and_normalize(local_dirpath, f) - if job._remote_filetimestamps.get(norm_path) != modification_time: + if self._remote_filetimestamps.get(norm_path) != modification_time: self.log(f'Downloading file {f} in {local_dirpath}') self.client.simple_download( self._system_name, @@ -824,10 +821,9 @@ def firecrest_walk(directory): local_file_path ) - job._remote_filetimestamps[norm_path] = modification_time - - job._local_filetimestamps[local_file_path] = os.path.getmtime(local_file_path) + self._remote_filetimestamps[norm_path] = modification_time + self._local_filetimestamps[local_file_path] = os.path.getmtime(local_file_path) def submit(self, job): job._localdir = os.getcwd() @@ -836,7 +832,7 @@ def submit(self, job): os.path.relpath(os.getcwd(), job._stage_prefix) ) - if job._first_submission: + if job._clean_up_stage: # Create clean stage directory in the remote system try: self.client.simple_delete(self._system_name, job._remotedir) @@ -845,9 +841,9 @@ def submit(self, job): pass self.client.mkdir(self._system_name, job._remotedir, p=True) - job._first_submission = False + self.log(f'Creating remote directory {job._remotedir} in {self._system_name}') - self._push_artefacts(job) + self._push_artefacts(job) intervals = itertools.cycle([1, 2, 3]) while True: @@ -922,9 +918,6 @@ def poll(self, *jobs): # Use ',' to join nodes to be consistent with Slurm syntax job._nodespec = ','.join(m['nodelist'] for m in jobarr_info) - # self._update_completion_time( - # job, (m.group('end') for m in jobarr_info) - # ) def wait(self, job): # Quickly return in case we have finished already From 37f6ea0a6ae67d574a51de2cb1701184005421c2 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Thu, 16 Nov 2023 09:37:59 +0100 Subject: [PATCH 04/15] Disable checking for sane module system when resolve_module_conflicts is false --- reframe/core/modules.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/reframe/core/modules.py b/reframe/core/modules.py index 5fc41cac00..2e90c120ff 100644 --- a/reframe/core/modules.py +++ b/reframe/core/modules.py @@ -13,6 +13,7 @@ from collections import OrderedDict import reframe.core.fields as fields +import reframe.core.runtime as rt import reframe.utility.osext as osext import reframe.utility.typecheck as types from reframe.core.exceptions import (ConfigError, EnvironError, @@ -586,6 +587,10 @@ class TModImpl(ModulesSystemImpl): MIN_VERSION = (3, 2) def __init__(self): + # No need to check for a sane module system + if not rt.runtime().get_option('general/0/resolve_module_conflicts'): + return + # Try to figure out if we are indeed using the TCL version try: completed = osext.run_command('modulecmd -V') @@ -718,6 +723,10 @@ class TMod31Impl(TModImpl): MIN_VERSION = (3, 1) def __init__(self): + # No need to check for a sane module system + if not rt.runtime().get_option('general/0/resolve_module_conflicts'): + return + # Try to figure out if we are indeed using the TCL version try: modulecmd = os.getenv('MODULESHOME') @@ -793,6 +802,10 @@ class TMod4Impl(TModImpl): MIN_VERSION = (4, 1) def __init__(self): + # No need to check for a sane module system + if not rt.runtime().get_option('general/0/resolve_module_conflicts'): + return + try: completed = osext.run_command(self.modulecmd('-V'), check=True) except OSError as e: From 8b04d514ad326684b7ba43affc4d9596a57afdcf Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Thu, 16 Nov 2023 09:42:32 +0100 Subject: [PATCH 05/15] Temporarily disable all module sanity checking --- reframe/core/modules.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/reframe/core/modules.py b/reframe/core/modules.py index 2e90c120ff..766218c921 100644 --- a/reframe/core/modules.py +++ b/reframe/core/modules.py @@ -13,7 +13,6 @@ from collections import OrderedDict import reframe.core.fields as fields -import reframe.core.runtime as rt import reframe.utility.osext as osext import reframe.utility.typecheck as types from reframe.core.exceptions import (ConfigError, EnvironError, @@ -587,9 +586,8 @@ class TModImpl(ModulesSystemImpl): MIN_VERSION = (3, 2) def __init__(self): - # No need to check for a sane module system - if not rt.runtime().get_option('general/0/resolve_module_conflicts'): - return + # FIXME + return # Try to figure out if we are indeed using the TCL version try: @@ -723,9 +721,8 @@ class TMod31Impl(TModImpl): MIN_VERSION = (3, 1) def __init__(self): - # No need to check for a sane module system - if not rt.runtime().get_option('general/0/resolve_module_conflicts'): - return + # FIXME + return # Try to figure out if we are indeed using the TCL version try: @@ -802,9 +799,8 @@ class TMod4Impl(TModImpl): MIN_VERSION = (4, 1) def __init__(self): - # No need to check for a sane module system - if not rt.runtime().get_option('general/0/resolve_module_conflicts'): - return + # FIXME + return try: completed = osext.run_command(self.modulecmd('-V'), check=True) From d146d0053fc5328ffe75d99b130a0441edf04723 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Thu, 16 Nov 2023 09:46:09 +0100 Subject: [PATCH 06/15] Setup slurmfc from env vars --- reframe/core/schedulers/slurm.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/reframe/core/schedulers/slurm.py b/reframe/core/schedulers/slurm.py index fdb16bd502..79fde43999 100644 --- a/reframe/core/schedulers/slurm.py +++ b/reframe/core/schedulers/slurm.py @@ -111,7 +111,7 @@ def __init__(self, *args, **kwargs): self._is_cancelling = False # FIXME get this dynamically - self._remotedir_prefix = '/scratch/snx3000/eirinik/rfc_remote' + self._remotedir_prefix = os.environ.get('FIRECREST_BASEDIR') self._remotedir = None self._localdir = None @@ -733,14 +733,15 @@ def __init__(self, *args, **kwargs): # FIXME set these in a better way client_id = os.environ.get("FIRECREST_CLIENT_ID") client_secret = os.environ.get("FIRECREST_CLIENT_SECRET") - token_uri = "https://auth.cscs.ch/auth/realms/firecrest-clients/protocol/openid-connect/token" + token_uri = os.environ.get("AUTH_TOKEN_URL") + firecrest_url = os.environ.get("FIRECREST_URL") + self._system_name = os.environ.get("FIRECREST_SYSTEM") # Setup the client for the specific account self.client = fc.Firecrest( - firecrest_url="https://firecrest.cscs.ch", + firecrest_url=firecrest_url, authorization=fc.ClientCredentialsAuth(client_id, client_secret, token_uri) ) - self._system_name = 'daint' self._local_filetimestamps = {} self._remote_filetimestamps = {} From 1495671685f36eb78d67d9bb7ed28d26ea9f0c61 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Thu, 16 Nov 2023 09:47:59 +0100 Subject: [PATCH 07/15] Make stagedir if it doesn't exist --- reframe/core/schedulers/slurm.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/reframe/core/schedulers/slurm.py b/reframe/core/schedulers/slurm.py index 79fde43999..2b764518a4 100644 --- a/reframe/core/schedulers/slurm.py +++ b/reframe/core/schedulers/slurm.py @@ -841,8 +841,8 @@ def submit(self, job): # The delete request will raise an exception if it doesn't exist pass - self.client.mkdir(self._system_name, job._remotedir, p=True) - self.log(f'Creating remote directory {job._remotedir} in {self._system_name}') + self.client.mkdir(self._system_name, job._remotedir, p=True) + self.log(f'Creating remote directory {job._remotedir} in {self._system_name}') self._push_artefacts(job) From b54ba5cc25349aa0a4a2f5d82f2dfb25c9dbcca3 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Tue, 21 Nov 2023 09:57:14 +0100 Subject: [PATCH 08/15] Skip module init --- reframe/core/modules.py | 61 ++++++++++++++++++++++++++-------------- reframe/core/pipeline.py | 15 +++++++--- reframe/core/systems.py | 7 +++-- 3 files changed, 55 insertions(+), 28 deletions(-) diff --git a/reframe/core/modules.py b/reframe/core/modules.py index 766218c921..056e04320e 100644 --- a/reframe/core/modules.py +++ b/reframe/core/modules.py @@ -106,22 +106,22 @@ class ModulesSystem: module_map = fields.TypedField(types.Dict[str, types.List[str]]) @classmethod - def create(cls, modules_kind=None): + def create(cls, modules_kind=None, module_resolution=True): getlogger().debug(f'Initializing modules system {modules_kind!r}') if modules_kind is None or modules_kind == 'nomod': return ModulesSystem(NoModImpl()) elif modules_kind == 'tmod31': - return ModulesSystem(TMod31Impl()) + return ModulesSystem(TMod31Impl(module_resolution)) elif modules_kind == 'tmod': - return ModulesSystem(TModImpl()) + return ModulesSystem(TModImpl(module_resolution)) elif modules_kind == 'tmod32': - return ModulesSystem(TModImpl()) + return ModulesSystem(TModImpl(module_resolution)) elif modules_kind == 'tmod4': - return ModulesSystem(TMod4Impl()) + return ModulesSystem(TMod4Impl(module_resolution)) elif modules_kind == 'lmod': - return ModulesSystem(LModImpl()) + return ModulesSystem(LModImpl(module_resolution)) elif modules_kind == 'spack': - return ModulesSystem(SpackImpl()) + return ModulesSystem(SpackImpl(module_resolution)) else: raise ConfigError('unknown module system: %s' % modules_kind) @@ -585,9 +585,11 @@ class TModImpl(ModulesSystemImpl): MIN_VERSION = (3, 2) - def __init__(self): - # FIXME - return + def __init__(self, module_resolution=True): + if not module_resolution: + # The module system may not be available locally + self._version = None + return # Try to figure out if we are indeed using the TCL version try: @@ -720,9 +722,13 @@ class TMod31Impl(TModImpl): MIN_VERSION = (3, 1) - def __init__(self): - # FIXME - return + def __init__(self, module_resolution=True): + self._version = None + self._command = None + + if not module_resolution: + # The module system may not be available locally + return # Try to figure out if we are indeed using the TCL version try: @@ -798,9 +804,12 @@ class TMod4Impl(TModImpl): MIN_VERSION = (4, 1) - def __init__(self): - # FIXME - return + def __init__(self, module_resolution=True): + self._version = None + self._extra_module_paths = [] + if not module_resolution: + # The module system may not be available locally + return try: completed = osext.run_command(self.modulecmd('-V'), check=True) @@ -924,7 +933,13 @@ def searchpath_remove(self, *dirs): class LModImpl(TMod4Impl): '''Module system for Lmod (Tcl/Lua).''' - def __init__(self): + def __init__(self, module_resolution=True): + self._extra_module_paths = [] + self._version = None + if not module_resolution: + # The module system may not be available locally + return + # Try to figure out if we are indeed using LMOD self._lmod_cmd = os.getenv('LMOD_CMD') if self._lmod_cmd is None: @@ -954,8 +969,6 @@ def __init__(self): raise ConfigError('Python is not supported by ' 'this Lmod installation') - self._extra_module_paths = [] - def name(self): return 'lmod' @@ -1103,7 +1116,14 @@ class SpackImpl(ModulesSystemImpl): ''' - def __init__(self): + def __init__(self, module_resolution=True): + self._name_format = '{name}/{version}-{hash}' + self._version = None + + if not module_resolution: + # The module system may not be available locally + return + # Try to figure out if we are indeed using the TCL version try: completed = osext.run_command('spack -V') @@ -1112,7 +1132,6 @@ def __init__(self): 'could not find a sane Spack installation') from e self._version = completed.stdout.strip() - self._name_format = '{name}/{version}-{hash}' def name(self): return 'spack' diff --git a/reframe/core/pipeline.py b/reframe/core/pipeline.py index a9dd6254d7..7adfb48b16 100644 --- a/reframe/core/pipeline.py +++ b/reframe/core/pipeline.py @@ -1665,7 +1665,13 @@ def _setup_paths(self): except OSError as e: raise PipelineError('failed to set up paths') from e - def _create_job(self, job_type, force_local=False, clean_up_stage=False, **job_opts): + def _create_job( + self, + job_type, + force_local=False, + clean_up_stage=False, + **job_opts + ): '''Setup the job related to this check.''' if force_local: @@ -1696,9 +1702,10 @@ def _create_job(self, job_type, force_local=False, clean_up_stage=False, **job_o **job_opts) def _setup_build_job(self, clean_up_stage=False, **job_opts): - self._build_job = self._create_job( - 'build', self.local or self.build_locally, clean_up_stage, **job_opts - ) + self._build_job = self._create_job('build', + self.local or self.build_locally, + clean_up_stage, + **job_opts) def _setup_run_job(self, clean_up_stage=False, **job_opts): self._job = self._create_job(f'run', self.local, **job_opts) diff --git a/reframe/core/systems.py b/reframe/core/systems.py index 53ed8ad40b..8fef375a4c 100644 --- a/reframe/core/systems.py +++ b/reframe/core/systems.py @@ -462,12 +462,12 @@ class System(jsonext.JSONSerializable): def __init__(self, name, descr, hostnames, modules_system, preload_env, prefix, outputdir, - resourcesdir, stagedir, partitions): + resourcesdir, stagedir, partitions, module_resolution): getlogger().debug(f'Initializing system {name!r}') self._name = name self._descr = descr self._hostnames = hostnames - self._modules_system = ModulesSystem.create(modules_system) + self._modules_system = ModulesSystem.create(modules_system, module_resolution) self._preload_env = preload_env self._prefix = prefix self._outputdir = outputdir @@ -590,7 +590,8 @@ def create(cls, site_config): outputdir=site_config.get('systems/0/outputdir'), resourcesdir=site_config.get('systems/0/resourcesdir'), stagedir=site_config.get('systems/0/stagedir'), - partitions=partitions + partitions=partitions, + module_resolution=site_config.get('general/resolve_module_conflicts') ) @property From bd8e796b0a9ae1e1f8e86e14223dbca9b29d5c79 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Tue, 21 Nov 2023 10:32:07 +0100 Subject: [PATCH 09/15] Fix formatting --- reframe/core/schedulers/slurm.py | 53 ++++++++++++++++++++------------ 1 file changed, 33 insertions(+), 20 deletions(-) diff --git a/reframe/core/schedulers/slurm.py b/reframe/core/schedulers/slurm.py index bf4bacb34f..aaa514c8a1 100644 --- a/reframe/core/schedulers/slurm.py +++ b/reframe/core/schedulers/slurm.py @@ -115,9 +115,8 @@ def __init__(self, *args, **kwargs): self._remotedir = None self._localdir = None - - # The compacted nodelist as reported by Slurm. This must be updated in - # every poll as Slurm may be slow in reporting the exact nodelist + # The compacted nodelist as reported by Slurm. This must be updated + # in every poll as Slurm may be slow in reporting the exact nodelist self._nodespec = None self._stage_prefix = rt.runtime().stage_prefix @@ -740,7 +739,8 @@ def __init__(self, *args, **kwargs): # Setup the client for the specific account self.client = fc.Firecrest( firecrest_url=firecrest_url, - authorization=fc.ClientCredentialsAuth(client_id, client_secret, token_uri) + authorization=fc.ClientCredentialsAuth(client_id, client_secret, + token_uri) ) self._local_filetimestamps = {} @@ -758,11 +758,17 @@ def _push_artefacts(self, job): remote_dir_path = join_and_normalize(job._remotedir, dirpath) for f in filenames: - local_norm_path = join_and_normalize(job._localdir, dirpath, f) - modification_time = os.path.getmtime(local_norm_path) - if self._local_filetimestamps.get(local_norm_path) != modification_time: - self._local_filetimestamps[local_norm_path] = modification_time - self.log(f'Uploading file {f} in {join_and_normalize(job._remotedir, dirpath)}') + local_norm_path = join_and_normalize( + job._localdir, dirpath, f + ) + modtime = os.path.getmtime(local_norm_path) + last_modtime = self._local_filetimestamps.get(local_norm_path) + if (last_modtime != modtime): + self._local_filetimestamps[local_norm_path] = modtime + self.log( + f'Uploading file {f} in ' + f'{join_and_normalize(job._remotedir, dirpath)}' + ) self.client.simple_upload( self._system_name, local_norm_path, @@ -776,8 +782,11 @@ def _push_artefacts(self, job): show_hidden=True ) for f in remote_files: - local_norm_path = join_and_normalize(remote_dir_path, f['name']) - self._remote_filetimestamps[local_norm_path] = f['last_modified'] + local_norm_path = join_and_normalize(remote_dir_path, + f['name']) + self._remote_filetimestamps[local_norm_path] = ( + f['last_modified'] + ) def _pull_artefacts(self, job): def firecrest_walk(directory): @@ -811,20 +820,20 @@ def firecrest_walk(directory): self.log(f'Creating local directory {new_dir}') os.makedirs(new_dir) - for (f, modification_time) in files: + for (f, modtime) in files: norm_path = join_and_normalize(dirpath, f) local_file_path = join_and_normalize(local_dirpath, f) - if self._remote_filetimestamps.get(norm_path) != modification_time: + if self._remote_filetimestamps.get(norm_path) != modtime: self.log(f'Downloading file {f} in {local_dirpath}') self.client.simple_download( self._system_name, norm_path, local_file_path ) + self._remote_filetimestamps[norm_path] = modtime - self._remote_filetimestamps[norm_path] = modification_time - - self._local_filetimestamps[local_file_path] = os.path.getmtime(local_file_path) + new_modtime = os.path.getmtime(local_file_path) + self._local_filetimestamps[local_file_path] = new_modtime def submit(self, job): job._localdir = os.getcwd() @@ -838,11 +847,13 @@ def submit(self, job): try: self.client.simple_delete(self._system_name, job._remotedir) except fc.HeaderException: - # The delete request will raise an exception if it doesn't exist + # The delete request will raise an exception if it doesn't + # exist, but it can be ignored pass self.client.mkdir(self._system_name, job._remotedir, p=True) - self.log(f'Creating remote directory {job._remotedir} in {self._system_name}') + self.log(f'Creating remote directory {job._remotedir} in ' + f'{self._system_name}') self._push_artefacts(job) @@ -875,7 +886,8 @@ def submit(self, job): job._submit_time = time.time() def allnodes(self): - raise NotImplementedError('firecrest slurm backend does not support node listing') + raise NotImplementedError('firecrest slurm backend does not support ' + 'node listing') def filternodes(self, job, nodes): raise NotImplementedError( @@ -907,7 +919,8 @@ def poll(self, *jobs): except KeyError: continue - # Join the states with ',' in case of job arrays|heterogeneous jobs + # Join the states with ',' in case of job arrays|heterogeneous + # jobs job._state = ','.join(m['state'] for m in jobarr_info) self._cancel_if_pending_too_long(job) From f73ccb62739ac965fdca2de0a87b707e61ef9553 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Tue, 21 Nov 2023 10:55:21 +0100 Subject: [PATCH 10/15] Raise JobSchedulerError in firecrest sched for python 3.6 --- reframe/core/schedulers/slurm.py | 9 ++++++++- requirements.txt | 2 +- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/reframe/core/schedulers/slurm.py b/reframe/core/schedulers/slurm.py index aaa514c8a1..7a7f11c5d4 100644 --- a/reframe/core/schedulers/slurm.py +++ b/reframe/core/schedulers/slurm.py @@ -3,13 +3,13 @@ # # SPDX-License-Identifier: BSD-3-Clause -import firecrest as fc import functools import glob import itertools import os import re import shlex +import sys import time from argparse import ArgumentParser from contextlib import suppress @@ -24,6 +24,9 @@ JobSchedulerError) from reframe.utility import nodelist_abbrev, seconds_to_hms +if sys.version_info >= (3, 7): + import firecrest as fc + def slurm_state_completed(state): completion_states = { @@ -728,6 +731,10 @@ def __str__(self): @register_scheduler('slurmfc') class SlurmFirecrestJobScheduler(SlurmJobScheduler): def __init__(self, *args, **kwargs): + if sys.version_info < (3, 7): + raise JobSchedulerError('the firecrest scheduler needs ' + 'python>=3.7') + super().__init__(*args, **kwargs) # FIXME set these in a better way client_id = os.environ.get("FIRECREST_CLIENT_ID") diff --git a/requirements.txt b/requirements.txt index 962df3defa..6cc766043f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,6 +3,7 @@ argcomplete==3.1.2 importlib_metadata==4.0.1; python_version < '3.8' jsonschema==3.2.0 lxml==4.9.3 +pyfirecrest==2.0.0; python_version >= '3.7' pytest==7.0.1 pytest-forked==1.4.0; python_version == '3.6' pytest-forked==1.6.0; python_version >= '3.7' @@ -19,4 +20,3 @@ setuptools==68.0.0; python_version == '3.7' setuptools==68.2.2; python_version >= '3.8' wcwidth==0.2.9 #+pygelf%pygelf==0.4.0 -pyfirecrest==2.0.0 From aa571078a35c1b3b22fac378c10bf1adbb832748 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Thu, 30 Nov 2023 09:56:18 +0100 Subject: [PATCH 11/15] Small fixes --- reframe/core/schedulers/slurm.py | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/reframe/core/schedulers/slurm.py b/reframe/core/schedulers/slurm.py index 7a7f11c5d4..a10cee87e9 100644 --- a/reframe/core/schedulers/slurm.py +++ b/reframe/core/schedulers/slurm.py @@ -112,9 +112,6 @@ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._is_array = False self._is_cancelling = False - - # FIXME get this dynamically - self._remotedir_prefix = os.environ.get('FIRECREST_BASEDIR') self._remotedir = None self._localdir = None @@ -731,17 +728,25 @@ def __str__(self): @register_scheduler('slurmfc') class SlurmFirecrestJobScheduler(SlurmJobScheduler): def __init__(self, *args, **kwargs): + def set_mandatory_var(var): + res = os.environ.get(var) + if res: + return res + + raise JobSchedulerError(f'the env var {var} is mandatory for the ' + f'firecrest scheduler') + if sys.version_info < (3, 7): raise JobSchedulerError('the firecrest scheduler needs ' 'python>=3.7') super().__init__(*args, **kwargs) - # FIXME set these in a better way - client_id = os.environ.get("FIRECREST_CLIENT_ID") - client_secret = os.environ.get("FIRECREST_CLIENT_SECRET") - token_uri = os.environ.get("AUTH_TOKEN_URL") - firecrest_url = os.environ.get("FIRECREST_URL") - self._system_name = os.environ.get("FIRECREST_SYSTEM") + client_id = set_mandatory_var("FIRECREST_CLIENT_ID") + client_secret = set_mandatory_var("FIRECREST_CLIENT_SECRET") + token_uri = set_mandatory_var("AUTH_TOKEN_URL") + firecrest_url = set_mandatory_var("FIRECREST_URL") + self._system_name = set_mandatory_var("FIRECREST_SYSTEM") + self._remotedir_prefix = set_mandatory_var('FIRECREST_BASEDIR') # Setup the client for the specific account self.client = fc.Firecrest( @@ -845,7 +850,7 @@ def firecrest_walk(directory): def submit(self, job): job._localdir = os.getcwd() job._remotedir = os.path.join( - job._remotedir_prefix, + self._remotedir_prefix, os.path.relpath(os.getcwd(), job._stage_prefix) ) From 2d5a1f81ba86ce0fee4d363905194b8d240d1b11 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Thu, 30 Nov 2023 10:00:05 +0100 Subject: [PATCH 12/15] Fix formatting --- reframe/core/pipeline.py | 12 ++++++------ reframe/core/schedulers/slurm.py | 1 + 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/reframe/core/pipeline.py b/reframe/core/pipeline.py index 7adfb48b16..e1da90bd18 100644 --- a/reframe/core/pipeline.py +++ b/reframe/core/pipeline.py @@ -1666,12 +1666,12 @@ def _setup_paths(self): raise PipelineError('failed to set up paths') from e def _create_job( - self, - job_type, - force_local=False, - clean_up_stage=False, - **job_opts - ): + self, + job_type, + force_local=False, + clean_up_stage=False, + **job_opts + ): '''Setup the job related to this check.''' if force_local: diff --git a/reframe/core/schedulers/slurm.py b/reframe/core/schedulers/slurm.py index a10cee87e9..fb3a135967 100644 --- a/reframe/core/schedulers/slurm.py +++ b/reframe/core/schedulers/slurm.py @@ -725,6 +725,7 @@ def _extract_attribute(self, attr_name, node_descr, sep=None): def __str__(self): return self._name + @register_scheduler('slurmfc') class SlurmFirecrestJobScheduler(SlurmJobScheduler): def __init__(self, *args, **kwargs): From 022455d9237594a6f722268e1092ea1ec0ff0a30 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Thu, 30 Nov 2023 10:30:01 +0100 Subject: [PATCH 13/15] Add pyfirecrest in setup.cfg --- setup.cfg | 1 + 1 file changed, 1 insertion(+) diff --git a/setup.cfg b/setup.cfg index a82110e09d..3e54cf64c5 100644 --- a/setup.cfg +++ b/setup.cfg @@ -33,6 +33,7 @@ install_requires = PyYAML requests semver + pyfirecrest; python_version >= '3.7' [options.packages.find] include = reframe,reframe.*,hpctestlib.* From 5c2278c80a36ea6abb079152e5a863f4815d9368 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Mon, 11 Dec 2023 09:47:21 +0100 Subject: [PATCH 14/15] Handle large files transfers with firecrest --- reframe/core/schedulers/slurm.py | 86 +++++++++++++++++++++++++++++--- 1 file changed, 78 insertions(+), 8 deletions(-) diff --git a/reframe/core/schedulers/slurm.py b/reframe/core/schedulers/slurm.py index fb3a135967..9c1f9f83d7 100644 --- a/reframe/core/schedulers/slurm.py +++ b/reframe/core/schedulers/slurm.py @@ -756,6 +756,12 @@ def set_mandatory_var(var): token_uri) ) + params = self.client.parameters() + for p in params['utilities']: + if p['name'] == 'UTILITIES_MAX_FILE_SIZE': + self._max_file_size_utilities = float(p['value'])*1000000 + break + self._local_filetimestamps = {} self._remote_filetimestamps = {} @@ -763,12 +769,33 @@ def make_job(self, *args, **kwargs): return _SlurmFirecrestJob(*args, **kwargs) def _push_artefacts(self, job): + def _upload(local_path, remote_path): + f_size = os.path.getsize(local_path) + if f_size <= self._max_file_size_utilities: + self.client.simple_upload( + self._system_name, + local_path, + remote_path + ) + else: + self.log( + f'File {f} is {f_size} bytes, so it may take some time...' + ) + up_obj = self.client.external_upload( + self._system_name, + local_path, + remote_path + ) + up_obj.finish_upload() + return up_obj + for dirpath, dirnames, filenames in os.walk('.'): for d in dirnames: new_dir = join_and_normalize(job._remotedir, dirpath, d) self.log(f'Creating remote directory {new_dir}') self.client.mkdir(self._system_name, new_dir, p=True) + async_uploads = [] remote_dir_path = join_and_normalize(job._remotedir, dirpath) for f in filenames: local_norm_path = join_and_normalize( @@ -782,11 +809,34 @@ def _push_artefacts(self, job): f'Uploading file {f} in ' f'{join_and_normalize(job._remotedir, dirpath)}' ) - self.client.simple_upload( - self._system_name, + up = _upload( local_norm_path, remote_dir_path ) + if up: + async_uploads.append(up) + + sleep_time = itertools.cycle([1, 5, 10]) + while async_uploads: + still_uploading = [] + for element in async_uploads: + upload_status = int(element.status) + if upload_status < 114: + still_uploading.append(element) + self.log(f'file is still uploafing, ' + f'status: {upload_status}') + elif upload_status > 114: + raise JobSchedulerError( + 'could not upload file to remote staging ' + 'area' + ) + + async_uploads = still_uploading + t = next(sleep_time) + self.log( + f'Waiting for the uploads, sleeping for {t} sec' + ) + time.sleep(t) # Update timestamps for remote directory remote_files = self.client.list_files( @@ -812,7 +862,9 @@ def firecrest_walk(directory): if item['type'] == 'd': dirs.append(item['name']) else: - nondirs.append((item['name'], item["last_modified"])) + nondirs.append((item['name'], + item["last_modified"], + int(item['size']))) yield directory, dirs, nondirs @@ -820,6 +872,24 @@ def firecrest_walk(directory): item_path = f"{directory}/{item['name']}" yield from firecrest_walk(item_path) + def _download(remote_path, local_path, f_size): + if f_size <= self._max_file_size_utilities: + self.client.simple_download( + self._system_name, + remote_path, + local_path + ) + else: + self.log( + f'File {f} is {f_size} bytes, so it may take some time...' + ) + up_obj = self.client.external_download( + self._system_name, + remote_path + ) + up_obj.finish_download(local_path) + return up_obj + for dirpath, dirnames, files in firecrest_walk(job._remotedir): local_dirpath = join_and_normalize( job._localdir, @@ -833,17 +903,17 @@ def firecrest_walk(directory): self.log(f'Creating local directory {new_dir}') os.makedirs(new_dir) - for (f, modtime) in files: + for (f, modtime, fsize) in files: norm_path = join_and_normalize(dirpath, f) local_file_path = join_and_normalize(local_dirpath, f) if self._remote_filetimestamps.get(norm_path) != modtime: self.log(f'Downloading file {f} in {local_dirpath}') - self.client.simple_download( - self._system_name, + self._remote_filetimestamps[norm_path] = modtime + _download( norm_path, - local_file_path + local_file_path, + fsize ) - self._remote_filetimestamps[norm_path] = modtime new_modtime = os.path.getmtime(local_file_path) self._local_filetimestamps[local_file_path] = new_modtime From b7f44fd62a6258ed1adfa8037efcfa969f215eca Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Wed, 20 Dec 2023 09:51:02 +0100 Subject: [PATCH 15/15] Split firecrest scheduler implementation --- reframe/core/pipeline.py | 26 +- reframe/core/schedulers/__init__.py | 5 +- reframe/core/schedulers/firecrest.py | 364 +++++++++++++++++++++++++++ reframe/core/schedulers/slurm.py | 345 ------------------------- 4 files changed, 374 insertions(+), 366 deletions(-) create mode 100644 reframe/core/schedulers/firecrest.py diff --git a/reframe/core/pipeline.py b/reframe/core/pipeline.py index 1597974ad9..a6f51ac674 100644 --- a/reframe/core/pipeline.py +++ b/reframe/core/pipeline.py @@ -1665,13 +1665,7 @@ def _setup_paths(self): except OSError as e: raise PipelineError('failed to set up paths') from e - def _create_job( - self, - job_type, - force_local=False, - clean_up_stage=False, - **job_opts - ): + def _create_job(self, job_type, force_local=False, **job_opts): '''Setup the job related to this check.''' if force_local: @@ -1698,16 +1692,14 @@ def _create_job( script_filename=script_name, workdir=self._stagedir, sched_access=self._current_partition.access, - clean_up_stage=clean_up_stage, **job_opts) - def _setup_build_job(self, clean_up_stage=False, **job_opts): - self._build_job = self._create_job('build', - self.local or self.build_locally, - clean_up_stage, - **job_opts) + def _setup_build_job(self, **job_opts): + self._build_job = self._create_job( + 'build', self.local or self.build_locally, **job_opts + ) - def _setup_run_job(self, clean_up_stage=False, **job_opts): + def _setup_run_job(self, **job_opts): self._job = self._create_job(f'run', self.local, **job_opts) def _setup_container_platform(self): @@ -1751,7 +1743,7 @@ def setup(self, partition, environ, **job_opts): self._current_partition = partition self._current_environ = environ self._setup_paths() - self._setup_build_job(clean_up_stage=True, **job_opts) + self._setup_build_job(**job_opts) self._setup_run_job(**job_opts) self._setup_container_platform() self._resolve_fixtures() @@ -2567,7 +2559,7 @@ def setup(self, partition, environ, **job_opts): self._current_partition = partition self._current_environ = environ self._setup_paths() - self._setup_run_job(clean_up_stage=True, **job_opts) + self._setup_run_job(**job_opts) self._setup_container_platform() self._resolve_fixtures() @@ -2624,7 +2616,7 @@ def setup(self, partition, environ, **job_opts): self._current_partition = partition self._current_environ = environ self._setup_paths() - self._setup_build_job(clean_up_stage=True, **job_opts) + self._setup_build_job(**job_opts) self._setup_container_platform() self._resolve_fixtures() diff --git a/reframe/core/schedulers/__init__.py b/reframe/core/schedulers/__init__.py index fec19dc723..a5daee11f2 100644 --- a/reframe/core/schedulers/__init__.py +++ b/reframe/core/schedulers/__init__.py @@ -326,8 +326,7 @@ def __init__(self, stderr=None, sched_flex_alloc_nodes=None, sched_access=[], - sched_options=None, - clean_up_stage=False): + sched_options=None): self._cli_options = list(sched_options) if sched_options else [] self._name = name @@ -355,8 +354,6 @@ def __init__(self, # in finished() self._exception = None - self._clean_up_stage = clean_up_stage - @classmethod def create(cls, scheduler, launcher, *args, **kwargs): ret = scheduler.make_job(*args, **kwargs) diff --git a/reframe/core/schedulers/firecrest.py b/reframe/core/schedulers/firecrest.py new file mode 100644 index 0000000000..426549fda5 --- /dev/null +++ b/reframe/core/schedulers/firecrest.py @@ -0,0 +1,364 @@ +# Copyright 2016-2023 Swiss National Supercomputing Centre (CSCS/ETH Zurich) +# ReFrame Project Developers. See the top-level LICENSE file for details. +# +# SPDX-License-Identifier: BSD-3-Clause + +import itertools +import os +import re +import sys +import time + +import reframe.core.runtime as rt +import reframe.core.schedulers as sched +from reframe.core.backends import register_scheduler +from reframe.core.schedulers.slurm import (SlurmJobScheduler, + slurm_state_completed) +from reframe.core.exceptions import JobSchedulerError + +if sys.version_info >= (3, 7): + import firecrest as fc + + +def join_and_normalize(*args): + joined_path = os.path.join(*args) + normalized_path = os.path.normpath(joined_path) + return normalized_path + + +class _SlurmFirecrestJob(sched.Job): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._is_array = False + self._is_cancelling = False + self._remotedir = None + self._localdir = None + + # The compacted nodelist as reported by Slurm. This must be updated + # in every poll as Slurm may be slow in reporting the exact nodelist + self._nodespec = None + self._stage_prefix = rt.runtime().stage_prefix + + @property + def is_array(self): + return self._is_array + + @property + def is_cancelling(self): + return self._is_cancelling + + +@register_scheduler('firecrest-slurm') +class SlurmFirecrestJobScheduler(SlurmJobScheduler): + def __init__(self, *args, **kwargs): + def set_mandatory_var(var): + res = os.environ.get(var) + if res: + return res + + raise JobSchedulerError(f'the env var {var} is mandatory for the ' + f'firecrest scheduler') + + if sys.version_info < (3, 7): + raise JobSchedulerError('the firecrest scheduler needs ' + 'python>=3.7') + + super().__init__(*args, **kwargs) + client_id = set_mandatory_var("FIRECREST_CLIENT_ID") + client_secret = set_mandatory_var("FIRECREST_CLIENT_SECRET") + token_uri = set_mandatory_var("AUTH_TOKEN_URL") + firecrest_url = set_mandatory_var("FIRECREST_URL") + self._system_name = set_mandatory_var("FIRECREST_SYSTEM") + self._remotedir_prefix = set_mandatory_var('FIRECREST_BASEDIR') + + # Setup the client for the specific account + self.client = fc.Firecrest( + firecrest_url=firecrest_url, + authorization=fc.ClientCredentialsAuth(client_id, client_secret, + token_uri) + ) + + params = self.client.parameters() + for p in params['utilities']: + if p['name'] == 'UTILITIES_MAX_FILE_SIZE': + self._max_file_size_utilities = float(p['value'])*1000000 + break + + self._local_filetimestamps = {} + self._remote_filetimestamps = {} + self._cleaned_remotedirs = set() + + def make_job(self, *args, **kwargs): + return _SlurmFirecrestJob(*args, **kwargs) + + def _push_artefacts(self, job): + def _upload(local_path, remote_path): + f_size = os.path.getsize(local_path) + if f_size <= self._max_file_size_utilities: + self.client.simple_upload( + self._system_name, + local_path, + remote_path + ) + else: + self.log( + f'File {f} is {f_size} bytes, so it may take some time...' + ) + up_obj = self.client.external_upload( + self._system_name, + local_path, + remote_path + ) + up_obj.finish_upload() + return up_obj + + for dirpath, dirnames, filenames in os.walk('.'): + for d in dirnames: + new_dir = join_and_normalize(job._remotedir, dirpath, d) + self.log(f'Creating remote directory {new_dir}') + self.client.mkdir(self._system_name, new_dir, p=True) + + async_uploads = [] + remote_dir_path = join_and_normalize(job._remotedir, dirpath) + for f in filenames: + local_norm_path = join_and_normalize( + job._localdir, dirpath, f + ) + modtime = os.path.getmtime(local_norm_path) + last_modtime = self._local_filetimestamps.get(local_norm_path) + if (last_modtime != modtime): + self._local_filetimestamps[local_norm_path] = modtime + self.log( + f'Uploading file {f} in ' + f'{join_and_normalize(job._remotedir, dirpath)}' + ) + up = _upload( + local_norm_path, + remote_dir_path + ) + if up: + async_uploads.append(up) + + sleep_time = itertools.cycle([1, 5, 10]) + while async_uploads: + still_uploading = [] + for element in async_uploads: + upload_status = int(element.status) + if upload_status < 114: + still_uploading.append(element) + self.log(f'file is still uploafing, ' + f'status: {upload_status}') + elif upload_status > 114: + raise JobSchedulerError( + 'could not upload file to remote staging ' + 'area' + ) + + async_uploads = still_uploading + t = next(sleep_time) + self.log( + f'Waiting for the uploads, sleeping for {t} sec' + ) + time.sleep(t) + + # Update timestamps for remote directory + remote_files = self.client.list_files( + self._system_name, + remote_dir_path, + show_hidden=True + ) + for f in remote_files: + local_norm_path = join_and_normalize(remote_dir_path, + f['name']) + self._remote_filetimestamps[local_norm_path] = ( + f['last_modified'] + ) + + def _pull_artefacts(self, job): + def firecrest_walk(directory): + contents = self.client.list_files(self._system_name, directory) + + dirs = [] + nondirs = [] + + for item in contents: + if item['type'] == 'd': + dirs.append(item['name']) + else: + nondirs.append((item['name'], + item["last_modified"], + int(item['size']))) + + yield directory, dirs, nondirs + + for item in dirs: + item_path = f"{directory}/{item['name']}" + yield from firecrest_walk(item_path) + + def _download(remote_path, local_path, f_size): + if f_size <= self._max_file_size_utilities: + self.client.simple_download( + self._system_name, + remote_path, + local_path + ) + else: + self.log( + f'File {f} is {f_size} bytes, so it may take some time...' + ) + up_obj = self.client.external_download( + self._system_name, + remote_path + ) + up_obj.finish_download(local_path) + return up_obj + + for dirpath, dirnames, files in firecrest_walk(job._remotedir): + local_dirpath = join_and_normalize( + job._localdir, + os.path.relpath( + dirpath, + job._remotedir + ) + ) + for d in dirnames: + new_dir = join_and_normalize(local_dirpath, d) + self.log(f'Creating local directory {new_dir}') + os.makedirs(new_dir) + + for (f, modtime, fsize) in files: + norm_path = join_and_normalize(dirpath, f) + local_file_path = join_and_normalize(local_dirpath, f) + if self._remote_filetimestamps.get(norm_path) != modtime: + self.log(f'Downloading file {f} in {local_dirpath}') + self._remote_filetimestamps[norm_path] = modtime + _download( + norm_path, + local_file_path, + fsize + ) + + new_modtime = os.path.getmtime(local_file_path) + self._local_filetimestamps[local_file_path] = new_modtime + + def submit(self, job): + job._localdir = os.getcwd() + job._remotedir = os.path.join( + self._remotedir_prefix, + os.path.relpath(os.getcwd(), job._stage_prefix) + ) + + if job._remotedir not in self._cleaned_remotedirs: + # Create clean stage directory in the remote system + try: + self.client.simple_delete(self._system_name, job._remotedir) + except fc.HeaderException: + # The delete request will raise an exception if it doesn't + # exist, but it can be ignored + pass + + self._cleaned_remotedirs.add(job._remotedir) + + self.client.mkdir(self._system_name, job._remotedir, p=True) + self.log(f'Creating remote directory {job._remotedir} in ' + f'{self._system_name}') + + self._push_artefacts(job) + + intervals = itertools.cycle([1, 2, 3]) + while True: + try: + # Make request for submission + submission_result = self.client.submit( + self._system_name, + os.path.join(job._remotedir, job.script_filename), + local_file=False + ) + break + except fc.FirecrestException as e: + stderr = e.responses[-1].json().get('error', '') + error_match = re.search( + rf'({"|".join(self._resubmit_on_errors)})', stderr + ) + if not self._resubmit_on_errors or not error_match: + raise + + t = next(intervals) + self.log( + f'encountered a job submission error: ' + f'{error_match.group(1)}: will resubmit after {t}s' + ) + time.sleep(t) + + job._jobid = str(submission_result['jobid']) + job._submit_time = time.time() + + def allnodes(self): + raise NotImplementedError('firecrest slurm backend does not support ' + 'node listing') + + def filternodes(self, job, nodes): + raise NotImplementedError( + 'firecrest slurm backend does not support node filtering' + ) + + def poll(self, *jobs): + '''Update the status of the jobs.''' + + if jobs: + # Filter out non-jobs + jobs = [job for job in jobs if job is not None] + + if not jobs: + return + + poll_results = self.client.poll( + self._system_name, [job.jobid for job in jobs] + ) + job_info = {} + for r in poll_results: + # Take into account both job arrays and heterogeneous jobs + jobid = re.split(r'_|\+', r['jobid'])[0] + job_info.setdefault(jobid, []).append(r) + + for job in jobs: + try: + jobarr_info = job_info[job.jobid] + except KeyError: + continue + + # Join the states with ',' in case of job arrays|heterogeneous + # jobs + job._state = ','.join(m['state'] for m in jobarr_info) + + self._cancel_if_pending_too_long(job) + if slurm_state_completed(job.state): + # Since Slurm exitcodes are positive take the maximum one + job._exitcode = max( + int(m['exit_code'].split(":")[0]) for m in jobarr_info + ) + + # Use ',' to join nodes to be consistent with Slurm syntax + job._nodespec = ','.join(m['nodelist'] for m in jobarr_info) + + def wait(self, job): + # Quickly return in case we have finished already + self._pull_artefacts(job) + if self.finished(job): + if job.is_array: + self._merge_files(job) + + return + + intervals = itertools.cycle([1, 2, 3]) + while not self.finished(job): + self.poll(job) + time.sleep(next(intervals)) + + self._pull_artefacts(job) + if job.is_array: + self._merge_files(job) + + def cancel(self, job): + self.client.cancel(job.system_name, job.jobid) + job._is_cancelling = True diff --git a/reframe/core/schedulers/slurm.py b/reframe/core/schedulers/slurm.py index 9c1f9f83d7..4e6e925e09 100644 --- a/reframe/core/schedulers/slurm.py +++ b/reframe/core/schedulers/slurm.py @@ -6,10 +6,8 @@ import functools import glob import itertools -import os import re import shlex -import sys import time from argparse import ArgumentParser from contextlib import suppress @@ -24,9 +22,6 @@ JobSchedulerError) from reframe.utility import nodelist_abbrev, seconds_to_hms -if sys.version_info >= (3, 7): - import firecrest as fc - def slurm_state_completed(state): completion_states = { @@ -69,11 +64,6 @@ def slurm_state_pending(state): return False -def join_and_normalize(*args): - joined_path = os.path.join(*args) - normalized_path = os.path.normpath(joined_path) - return normalized_path - _run_strict = functools.partial(osext.run_command, check=True) @@ -107,28 +97,6 @@ def is_cancelling(self): return self._is_cancelling -class _SlurmFirecrestJob(sched.Job): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self._is_array = False - self._is_cancelling = False - self._remotedir = None - self._localdir = None - - # The compacted nodelist as reported by Slurm. This must be updated - # in every poll as Slurm may be slow in reporting the exact nodelist - self._nodespec = None - self._stage_prefix = rt.runtime().stage_prefix - - @property - def is_array(self): - return self._is_array - - @property - def is_cancelling(self): - return self._is_cancelling - - @register_scheduler('slurm') class SlurmJobScheduler(sched.JobScheduler): # In some systems, scheduler performance is sensitive to the squeue poll @@ -724,316 +692,3 @@ def _extract_attribute(self, attr_name, node_descr, sep=None): def __str__(self): return self._name - - -@register_scheduler('slurmfc') -class SlurmFirecrestJobScheduler(SlurmJobScheduler): - def __init__(self, *args, **kwargs): - def set_mandatory_var(var): - res = os.environ.get(var) - if res: - return res - - raise JobSchedulerError(f'the env var {var} is mandatory for the ' - f'firecrest scheduler') - - if sys.version_info < (3, 7): - raise JobSchedulerError('the firecrest scheduler needs ' - 'python>=3.7') - - super().__init__(*args, **kwargs) - client_id = set_mandatory_var("FIRECREST_CLIENT_ID") - client_secret = set_mandatory_var("FIRECREST_CLIENT_SECRET") - token_uri = set_mandatory_var("AUTH_TOKEN_URL") - firecrest_url = set_mandatory_var("FIRECREST_URL") - self._system_name = set_mandatory_var("FIRECREST_SYSTEM") - self._remotedir_prefix = set_mandatory_var('FIRECREST_BASEDIR') - - # Setup the client for the specific account - self.client = fc.Firecrest( - firecrest_url=firecrest_url, - authorization=fc.ClientCredentialsAuth(client_id, client_secret, - token_uri) - ) - - params = self.client.parameters() - for p in params['utilities']: - if p['name'] == 'UTILITIES_MAX_FILE_SIZE': - self._max_file_size_utilities = float(p['value'])*1000000 - break - - self._local_filetimestamps = {} - self._remote_filetimestamps = {} - - def make_job(self, *args, **kwargs): - return _SlurmFirecrestJob(*args, **kwargs) - - def _push_artefacts(self, job): - def _upload(local_path, remote_path): - f_size = os.path.getsize(local_path) - if f_size <= self._max_file_size_utilities: - self.client.simple_upload( - self._system_name, - local_path, - remote_path - ) - else: - self.log( - f'File {f} is {f_size} bytes, so it may take some time...' - ) - up_obj = self.client.external_upload( - self._system_name, - local_path, - remote_path - ) - up_obj.finish_upload() - return up_obj - - for dirpath, dirnames, filenames in os.walk('.'): - for d in dirnames: - new_dir = join_and_normalize(job._remotedir, dirpath, d) - self.log(f'Creating remote directory {new_dir}') - self.client.mkdir(self._system_name, new_dir, p=True) - - async_uploads = [] - remote_dir_path = join_and_normalize(job._remotedir, dirpath) - for f in filenames: - local_norm_path = join_and_normalize( - job._localdir, dirpath, f - ) - modtime = os.path.getmtime(local_norm_path) - last_modtime = self._local_filetimestamps.get(local_norm_path) - if (last_modtime != modtime): - self._local_filetimestamps[local_norm_path] = modtime - self.log( - f'Uploading file {f} in ' - f'{join_and_normalize(job._remotedir, dirpath)}' - ) - up = _upload( - local_norm_path, - remote_dir_path - ) - if up: - async_uploads.append(up) - - sleep_time = itertools.cycle([1, 5, 10]) - while async_uploads: - still_uploading = [] - for element in async_uploads: - upload_status = int(element.status) - if upload_status < 114: - still_uploading.append(element) - self.log(f'file is still uploafing, ' - f'status: {upload_status}') - elif upload_status > 114: - raise JobSchedulerError( - 'could not upload file to remote staging ' - 'area' - ) - - async_uploads = still_uploading - t = next(sleep_time) - self.log( - f'Waiting for the uploads, sleeping for {t} sec' - ) - time.sleep(t) - - # Update timestamps for remote directory - remote_files = self.client.list_files( - self._system_name, - remote_dir_path, - show_hidden=True - ) - for f in remote_files: - local_norm_path = join_and_normalize(remote_dir_path, - f['name']) - self._remote_filetimestamps[local_norm_path] = ( - f['last_modified'] - ) - - def _pull_artefacts(self, job): - def firecrest_walk(directory): - contents = self.client.list_files(self._system_name, directory) - - dirs = [] - nondirs = [] - - for item in contents: - if item['type'] == 'd': - dirs.append(item['name']) - else: - nondirs.append((item['name'], - item["last_modified"], - int(item['size']))) - - yield directory, dirs, nondirs - - for item in dirs: - item_path = f"{directory}/{item['name']}" - yield from firecrest_walk(item_path) - - def _download(remote_path, local_path, f_size): - if f_size <= self._max_file_size_utilities: - self.client.simple_download( - self._system_name, - remote_path, - local_path - ) - else: - self.log( - f'File {f} is {f_size} bytes, so it may take some time...' - ) - up_obj = self.client.external_download( - self._system_name, - remote_path - ) - up_obj.finish_download(local_path) - return up_obj - - for dirpath, dirnames, files in firecrest_walk(job._remotedir): - local_dirpath = join_and_normalize( - job._localdir, - os.path.relpath( - dirpath, - job._remotedir - ) - ) - for d in dirnames: - new_dir = join_and_normalize(local_dirpath, d) - self.log(f'Creating local directory {new_dir}') - os.makedirs(new_dir) - - for (f, modtime, fsize) in files: - norm_path = join_and_normalize(dirpath, f) - local_file_path = join_and_normalize(local_dirpath, f) - if self._remote_filetimestamps.get(norm_path) != modtime: - self.log(f'Downloading file {f} in {local_dirpath}') - self._remote_filetimestamps[norm_path] = modtime - _download( - norm_path, - local_file_path, - fsize - ) - - new_modtime = os.path.getmtime(local_file_path) - self._local_filetimestamps[local_file_path] = new_modtime - - def submit(self, job): - job._localdir = os.getcwd() - job._remotedir = os.path.join( - self._remotedir_prefix, - os.path.relpath(os.getcwd(), job._stage_prefix) - ) - - if job._clean_up_stage: - # Create clean stage directory in the remote system - try: - self.client.simple_delete(self._system_name, job._remotedir) - except fc.HeaderException: - # The delete request will raise an exception if it doesn't - # exist, but it can be ignored - pass - - self.client.mkdir(self._system_name, job._remotedir, p=True) - self.log(f'Creating remote directory {job._remotedir} in ' - f'{self._system_name}') - - self._push_artefacts(job) - - intervals = itertools.cycle([1, 2, 3]) - while True: - try: - # Make request for submission - submission_result = self.client.submit( - self._system_name, - os.path.join(job._remotedir, job.script_filename), - local_file=False - ) - break - except fc.FirecrestException as e: - stderr = e.responses[-1].json().get('error', '') - error_match = re.search( - rf'({"|".join(self._resubmit_on_errors)})', stderr - ) - if not self._resubmit_on_errors or not error_match: - raise - - t = next(intervals) - self.log( - f'encountered a job submission error: ' - f'{error_match.group(1)}: will resubmit after {t}s' - ) - time.sleep(t) - - job._jobid = str(submission_result['jobid']) - job._submit_time = time.time() - - def allnodes(self): - raise NotImplementedError('firecrest slurm backend does not support ' - 'node listing') - - def filternodes(self, job, nodes): - raise NotImplementedError( - 'firecrest slurm backend does not support node filtering' - ) - - def poll(self, *jobs): - '''Update the status of the jobs.''' - - if jobs: - # Filter out non-jobs - jobs = [job for job in jobs if job is not None] - - if not jobs: - return - - poll_results = self.client.poll( - self._system_name, [job.jobid for job in jobs] - ) - job_info = {} - for r in poll_results: - # Take into account both job arrays and heterogeneous jobs - jobid = re.split(r'_|\+', r['jobid'])[0] - job_info.setdefault(jobid, []).append(r) - - for job in jobs: - try: - jobarr_info = job_info[job.jobid] - except KeyError: - continue - - # Join the states with ',' in case of job arrays|heterogeneous - # jobs - job._state = ','.join(m['state'] for m in jobarr_info) - - self._cancel_if_pending_too_long(job) - if slurm_state_completed(job.state): - # Since Slurm exitcodes are positive take the maximum one - job._exitcode = max( - int(m['exit_code'].split(":")[0]) for m in jobarr_info - ) - - # Use ',' to join nodes to be consistent with Slurm syntax - job._nodespec = ','.join(m['nodelist'] for m in jobarr_info) - - def wait(self, job): - # Quickly return in case we have finished already - self._pull_artefacts(job) - if self.finished(job): - if job.is_array: - self._merge_files(job) - - return - - intervals = itertools.cycle([1, 2, 3]) - while not self.finished(job): - self.poll(job) - time.sleep(next(intervals)) - - self._pull_artefacts(job) - if job.is_array: - self._merge_files(job) - - def cancel(self, job): - self.client.cancel(job.system_name, job.jobid) - job._is_cancelling = True