From 8bc6a302dd0fa5048bceb8659e9b96de7d52d3d6 Mon Sep 17 00:00:00 2001 From: Calvin Date: Mon, 12 Feb 2024 18:07:05 +0200 Subject: [PATCH 1/8] Updated ARC Settings - Added 'queues' and 'excluded_queues' keys in the settings.py - Added 'server3' templates in submit.py - Added 'server3' in the submit_test.py --- arc/settings/settings.py | 2 + arc/settings/submit.py | 245 +++++++++++++++++++++++++++++++++++- arc/settings/submit_test.py | 2 +- 3 files changed, 246 insertions(+), 3 deletions(-) diff --git a/arc/settings/settings.py b/arc/settings/settings.py index f3d466d901..5d5d86d5fe 100644 --- a/arc/settings/settings.py +++ b/arc/settings/settings.py @@ -58,6 +58,8 @@ 'cluster_soft': 'HTCondor', 'un': '', 'cpus': 48, + 'queues': {'':''}, #{'queue_name':'HH:MM:SS'} + 'excluded_queues': ['queue_name1', 'queue_name2'], }, } diff --git a/arc/settings/submit.py b/arc/settings/submit.py index 5f160fbe66..58bf1577a5 100644 --- a/arc/settings/submit.py +++ b/arc/settings/submit.py @@ -827,7 +827,7 @@ }, 'pbs_sample': { 'gaussian': """#!/bin/bash -l -#PBS -q batch +#PBS -q {queue} #PBS -l nodes=1:ppn={cpus} #PBS -l mem={memory}mb #PBS -l walltime=48:00:00 @@ -855,4 +855,245 @@ """, }, -} + 'server3': { + 'gaussian': """#!/bin/bash -l +#SBATCH -p normal +#SBATCH -J {name} +#SBATCH -N 1 +#SBATCH -n {cpus} +#SBATCH --time={t_max} +#SBATCH --mem-per-cpu={memory} +#SBATCH -o out.txt +#SBATCH -e err.txt + +export g16root=/home/gridsan/groups/GRPAPI/Software +export PATH=$g16root/g16/:$g16root/gv:$PATH +which g16 + +echo "============================================================" +echo "Job ID : $SLURM_JOB_ID" +echo "Job Name : $SLURM_JOB_NAME" +echo "Starting on : $(date)" +echo "Running on node : $SLURMD_NODENAME" +echo "Current directory : $(pwd)" +echo "============================================================" + +touch initial_time + +GAUSS_SCRDIR=/state/partition1/user/{un}/$SLURM_JOB_NAME-$SLURM_JOB_ID +export $GAUSS_SCRDIR +. $g16root/g16/bsd/g16.profile + +mkdir -p $GAUSS_SCRDIR + +g16 < input.gjf > input.log + +rm -rf $GAUSS_SCRDIR + +touch final_time + + """, + 'orca': """#!/bin/bash -l +#SBATCH -p normal +#SBATCH -J {name} +#SBATCH -N 1 +#SBATCH -n {cpus} +#SBATCH --time={t_max} +#SBATCH --mem-per-cpu={memory} +#SBATCH -o out.txt +#SBATCH -e err.txt + +echo "============================================================" +echo "Job ID : $SLURM_JOB_ID" +echo "Job Name : $SLURM_JOB_NAME" +echo "Starting on : $(date)" +echo "Running on node : $SLURMD_NODENAME" +echo "Current directory : $(pwd)" +echo "============================================================" + +touch initial_time + +WorkDir=/state/partition1/user/{un}/$SLURM_JOB_NAME-$SLURM_JOB_ID +SubmitDir=`pwd` + +#openmpi +export PATH=/home/gridsan/alongd/openmpi-3.1.4/bin:$PATH +export LD_LIBRARY_PATH=/home/gridsan/alongd/openmpi-3.1.4/lib:$LD_LIBRARY_PATH + +#Orca +orcadir=/home/gridsan/alongd/orca_4_2_1_linux_x86-64_openmpi314 +export PATH=/home/gridsan/alongd/orca_4_2_1_linux_x86-64_openmpi314:$PATH +export LD_LIBRARY_PATH=/home/gridsan/alongd/orca_4_2_1_linux_x86-64_openmpi314:$LD_LIBRARY_PATH +echo "orcaversion" +which orca +mkdir -p $WorkDir +cd $WorkDir +cp $SubmitDir/input.in . 
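+# Run Orca on the copied input from the node-local scratch directory, copy the log back to the submit directory, and clean up the scratch space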
+ +$orcadir/orca input.in > input.log +cp input.log $SubmitDir/ +rm -rf $WorkDir + +touch final_time + +""", + 'molpro': """#!/bin/bash -l +#SBATCH -p long +#SBATCH -J {name} +#SBATCH -N 1 +#SBATCH -n {cpus} +#SBATCH --time={t_max} +#SBATCH --mem-per-cpu={memory} +#SBATCH -o out.txt +#SBATCH -e err.txt + +export PATH=/opt/molpro/molprop_2015_1_linux_x86_64_i8/bin:$PATH + +echo "============================================================" +echo "Job ID : $SLURM_JOB_ID" +echo "Job Name : $SLURM_JOB_NAME" +echo "Starting on : $(date)" +echo "Running on node : $SLURMD_NODENAME" +echo "Current directory : $(pwd)" +echo "============================================================" + +touch initial_time + +sdir=/scratch/{un}/$SLURM_JOB_NAME-$SLURM_JOB_ID +SubmitDir=`pwd` + +mkdir -p $sdir +cd $sdir + +cp "$SubmitDir/input.in" . + +molpro -n {cpus} -d $sdir input.in + +cp input.* "$SubmitDir/" +cp geometry*.* "$SubmitDir/" + +rm -rf $sdir + +touch final_time + +""", + 'gcn': """#!/bin/bash -l +#SBATCH -p long +#SBATCH -J {name} +#SBATCH -N 1 +#SBATCH -n {cpus} +#SBATCH --time={t_max} +#SBATCH --mem-per-cpu={memory} +#SBATCH -o out.txt +#SBATCH -e err.txt + +echo "============================================================" +echo "Job ID : $SLURM_JOB_ID" +echo "Job Name : $SLURM_JOB_NAME" +echo "Starting on : $(date)" +echo "Running on node : $SLURMD_NODENAME" +echo "Current directory : $(pwd)" +echo "============================================================" + +touch initial_time + +conda activate arc_env + +python $arc_path/arc/job/adapters/ts/scripts/gcn_runner.py --yml_in_path input.yml + +touch final_time + +""", + 'cfour': """#!/bin/bash -l +#SBATCH -p long +#SBATCH -J {name} +#SBATCH -N 1 +#SBATCH -n {cpus} +#SBATCH --time={t_max} +#SBATCH --mem-per-cpu={memory} +#SBATCH -o out.txt +#SBATCH -e err.txt + +module load intel/2020.1.217 openmpi/4.0.3 cfour-mpi/2.1 + +echo "============================================================" +echo "Job ID : $SLURM_JOB_ID" +echo "Job Name : $SLURM_JOB_NAME" +echo "Starting on : $(date)" +echo "Running on node : $SLURMD_NODENAME" +echo "Current directory : $(pwd)" +echo "============================================================" + +touch initial_time + +export CFOUR_NUM_CORES=$SLURM_NTASKS + +xcfour > output.out + +# Clean the symlink: +if [[ -L "GENBAS" ]]; then unlink GENBAS; fi + +touch final_time +""", + 'xtb': """#!/bin/bash -l +#SBATCH -p long +#SBATCH -J {name} +#SBATCH -N 1 +#SBATCH -n {cpus} +#SBATCH --time={t_max} +#SBATCH --mem-per-cpu={memory} +#SBATCH -o out.txt +#SBATCH -e err.txt + +echo "============================================================" +echo "Job ID : $SLURM_JOB_ID" +echo "Job Name : $SLURM_JOB_NAME" +echo "Starting on : $(date)" +echo "Running on node : $SLURMD_NODENAME" +echo "Current directory : $(pwd)" +echo "============================================================" + +touch initial_time + +conda activate xtb_env + +export OMP_NUM_THREADS={cpus},1 +export OMP_MAX_ACTIVE_LEVELS=1 +setenv OMP_SCHEDULE "dynamic" +export MKL_NUM_THREADS={cpus} +export XTBPATH=$PWD # Add here all paths were configuration and/or parameter files are stored. 
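+# Execute the xtb job script prepared by ARC; its output is captured in output.out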
+ +bash input.sh > output.out + +touch final_time + +""", + 'xtb_gsm': """#!/bin/bash -l +#SBATCH -p long +#SBATCH -J {name} +#SBATCH -N 1 +#SBATCH -n {cpus} +#SBATCH --time={t_max} +#SBATCH --mem-per-cpu={memory} +#SBATCH -o out.txt +#SBATCH -e err.txt + +echo "============================================================" +echo "Job ID : $SLURM_JOB_ID" +echo "Job Name : $SLURM_JOB_NAME" +echo "Starting on : $(date)" +echo "Running on node : $SLURMD_NODENAME" +echo "Current directory : $(pwd)" +echo "============================================================" + +touch initial_time + +conda activate xtb_env + +./gsm.orca + +touch final_time + +""", +}, + } diff --git a/arc/settings/submit_test.py b/arc/settings/submit_test.py index 392dd7f7f4..08ca08ed01 100644 --- a/arc/settings/submit_test.py +++ b/arc/settings/submit_test.py @@ -18,7 +18,7 @@ class TestSubmit(unittest.TestCase): def test_servers(self): """Test server keys in submit_scripts""" for server in submit_scripts.keys(): - self.assertTrue(server in ['local', 'atlas', 'txe1', 'pbs_sample', 'server1', 'server2']) + self.assertTrue(server in ['local', 'atlas', 'txe1', 'pbs_sample', 'server1', 'server2', 'server3']) if __name__ == '__main__': From 43ff784a781199bc5ec5393236e1162316845075 Mon Sep 17 00:00:00 2001 From: Calvin Date: Mon, 12 Feb 2024 18:08:20 +0200 Subject: [PATCH 2/8] ARC Common Update Added a conversion function for time from the format of HH:MM:SS to hours --- arc/common.py | 12 ++++++++++++ arc/common_test.py | 10 ++++++++++ 2 files changed, 22 insertions(+) diff --git a/arc/common.py b/arc/common.py index b85b676274..c43d6d9f0a 100644 --- a/arc/common.py +++ b/arc/common.py @@ -1705,3 +1705,15 @@ def is_xyz_mol_match(mol: 'Molecule', if element not in element_dict_xyz or element_dict_xyz[element] != count: return False return True + +def convert_to_hours(time_str:str) -> float: + """Convert walltime string in format HH:MM:SS to hours. + + Args: + time_str (str): A time string in format HH:MM:SS + + Returns: + float: The time in hours + """ + h, m, s = map(int, time_str.split(':')) + return h + m / 60 + s / 3600 diff --git a/arc/common_test.py b/arc/common_test.py index 10f746bb41..71af05fdd2 100644 --- a/arc/common_test.py +++ b/arc/common_test.py @@ -1369,6 +1369,16 @@ def test_is_xyz_mol_match(self): 'symbols': ('C', 'C', 'C', 'C', 'H', 'H', 'H', 'H', 'H', 'H', 'H', 'H', 'H', 'H')} self.assertFalse(common.is_xyz_mol_match(mol1, xyz2)) self.assertTrue(common.is_xyz_mol_match(mol2, xyz2)) + + def test_convert_to_hours(self): + """Test the convert_to_hours() function""" + time_str = '0:00:00' + self.assertEqual(common.convert_to_hours(time_str), 0.0) + time_str = '3600:00:00' + self.assertEqual(common.convert_to_hours(time_str), 3600.0) + time_str = '190:40:10' + self.assertAlmostEqual(common.convert_to_hours(time_str), 190.66944444444442) + @classmethod def tearDownClass(cls): From 5e5c6720875863f603066d382c257708e1151d1f Mon Sep 17 00:00:00 2001 From: Calvin Date: Mon, 12 Feb 2024 18:24:37 +0200 Subject: [PATCH 3/8] Adjusted the submit function to check whether the key 'queue' is in the submit script template. Will assume that the user defined a queue in the template if there is no key ARC Job Adapter Update - Added {queue} checking when creating the submit script. Will also add the initial queue used to attempted_queues - Added a check to see if the job was killed due to wall time in a PBS queue - Added in the condition of checking is self.testing is True or False. 
If True, it will then not use the SSH function - Added `troubleshoot_queue` function, where it will run a job again if the job failed due to queue wall time --- arc/job/adapter.py | 85 +++++++++++++++++++++++++++++++++-------- arc/job/adapter_test.py | 56 +++++++++++++++++++++++++++ 2 files changed, 125 insertions(+), 16 deletions(-) diff --git a/arc/job/adapter.py b/arc/job/adapter.py index b39b925c22..29d83fc97c 100644 --- a/arc/job/adapter.py +++ b/arc/job/adapter.py @@ -23,7 +23,7 @@ import numpy as np import pandas as pd -from arc.common import ARC_PATH, get_logger, read_yaml_file, save_yaml_file, torsions_to_scans +from arc.common import ARC_PATH, get_logger, read_yaml_file, save_yaml_file, torsions_to_scans, convert_to_hours from arc.exceptions import JobError from arc.imports import local_arc_path, pipe_submit, settings, submit_scripts from arc.job.local import (change_mode, @@ -33,7 +33,7 @@ rename_output, submit_job, ) -from arc.job.trsh import trsh_job_on_server +from arc.job.trsh import trsh_job_on_server, trsh_job_queue from arc.job.ssh import SSHClient from arc.job.trsh import determine_ess_status from arc.species.converter import xyz_to_str @@ -493,21 +493,36 @@ def write_submit_script(self) -> None: # If your server has different node architectures, implement something similar here. architecture = '\n#$ -l harpertown' if self.cpu_cores <= 8 else '\n#$ -l magnycours' + # Extract the 'queue' dictionary from servers[self.server], defaulting to an empty dictionary if 'queue' is not a key + default_queue, _ = next(iter(servers[self.server].get('queues', {}).items()), (None, None)) + if default_queue and default_queue not in self.attempted_queues: + self.attempted_queues.append(default_queue) + submit_script = submit_scripts[self.server][self.job_adapter] if self.workers is None \ else pipe_submit[self.server] + + queue = self.queue if self.queue is not None else default_queue + + format_params = { + "name": self.job_server_name, + "un": servers[self.server]['un'], + "queue": queue, + "t_max": self.format_max_job_time(time_format=t_max_format[servers[self.server]['cluster_soft']]), + "memory": int(self.submit_script_memory) if isinstance(self.submit_script_memory, (int, float)) else self.submit_script_memory, + "cpus": self.cpu_cores, + "architecture": architecture, + "max_task_num": self.workers, + "arc_path": ARC_PATH, + "hdf5_path": os.path.join(self.remote_path, 'data.hdf5'), + "pwd": self.remote_path if self.server.lower() != 'local' else self.local_path, + } + + if queue is None: + logger.warning(f'Queue not defined for server {self.server}. 
Assuming the queue name is defined in your submit.py script.') + del format_params['queue'] + try: - submit_script = submit_script.format( - name=self.job_server_name, - un=servers[self.server]['un'], - t_max=self.format_max_job_time(time_format=t_max_format[servers[self.server]['cluster_soft']]), - memory=int(self.submit_script_memory) if (isinstance(self.submit_script_memory, int) or isinstance(self.submit_script_memory, float)) else self.submit_script_memory, - cpus=self.cpu_cores, - architecture=architecture, - max_task_num=self.workers, - arc_path=ARC_PATH, - hdf5_path=os.path.join(self.remote_path, 'data.hdf5'), - pwd=self.remote_path if self.server.lower() != 'local' else self.local_path, - ) + submit_script = submit_script.format(**format_params) except KeyError: if self.workers is None: submit_scripts_for_printing = {server: [software for software in values.keys()] @@ -872,6 +887,7 @@ def determine_job_status(self): """ Determine the Job's status. Updates self.job_status. """ + cluster_soft = servers[self.server]['cluster_soft'].lower() if self.job_status[0] == 'errored': return self.job_status[0] = self._check_job_server_status() if self.execution_type != 'incore' else 'done' @@ -898,6 +914,20 @@ def determine_job_status(self): 'time limit.' self.job_status[1]['line'] = line break + elif 'job killed' in line and 'exceeded limit' in line and cluster_soft == 'pbs': + # =>> PBS: job killed: walltime 10837 exceeded limit 10800 + logger.warning(f'Looks like the job was killed on {self.server} due to time limit. ' + f'Got: {line}') + time_limit = int(line.split('limit')[1].split()[0])/3600 + new_max_job_time = self.max_job_time - time_limit if self.max_job_time > time_limit else 1 + logger.warning(f'Setting max job time to {new_max_job_time} (was {time_limit})') + self.max_job_time = new_max_job_time + self.job_status[1]['status'] = 'errored' + self.job_status[1]['keywords'] = ['ServerTimeLimit'] + self.job_status[1]['error'] = 'Job killed by the server since it reached the maximal ' \ + 'time limit.' + self.job_status[1]['line'] = line + break elif self.job_status[0] == 'running': self.job_status[1]['status'] = 'running' @@ -913,7 +943,7 @@ def _get_additional_job_info(self): local_file_path_1 = os.path.join(self.local_path, 'out.txt') local_file_path_2 = os.path.join(self.local_path, 'err.txt') local_file_path_3 = os.path.join(self.local_path, 'job.log') - if self.server != 'local' and self.remote_path is not None: + if self.server != 'local' and self.remote_path is not None and not self.testing: remote_file_path_1 = os.path.join(self.remote_path, 'out.txt') remote_file_path_2 = os.path.join(self.remote_path, 'err.txt') remote_file_path_3 = os.path.join(self.remote_path, 'job.log') @@ -948,7 +978,7 @@ def _check_job_server_status(self) -> str: """ Possible statuses: ``initializing``, ``running``, ``errored on node xx``, ``done``. """ - if self.server != 'local': + if self.server != 'local' and not self.testing: with SSHClient(self.server) as ssh: return ssh.check_job_status(self.job_id) else: @@ -1317,6 +1347,29 @@ def troubleshoot_server(self): # resubmit job self.execute() + def troubleshoot_queue(self) -> bool: + """Troubleshoot queue errors. + + Returns: + Boolean: Whether to run the job again. + """ + queues, run_job = trsh_job_queue(job_name=self.job_name, + server=self.server, + max_time=24, + attempted_queues = self.attempted_queues, + ) + + if queues is not None: + # We use self.max_job_time to determine which queues to troubleshoot. 
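+            # 'queues' maps queue names to walltime strings (HH:MM:SS); the shortest adequate queue returned by trsh_job_queue() is selected below and recorded in attempted_queues.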
+ #filtered_queues = {queue_name: walltime for queue_name, walltime in queues.items() if convert_to_hours(walltime) >= self.max_job_time} + + sorted_queues = sorted(queues.items(), key=lambda x: convert_to_hours(x[1]), reverse=False) + + self.queue = sorted_queues[0][0] + if self.queue not in self.attempted_queues: + self.attempted_queues.append(self.queue) + return run_job + def save_output_file(self, key: Optional[str] = None, val: Optional[Union[float, dict, np.ndarray]] = None, diff --git a/arc/job/adapter_test.py b/arc/job/adapter_test.py index fd197ad24e..a45ea74f53 100644 --- a/arc/job/adapter_test.py +++ b/arc/job/adapter_test.py @@ -11,6 +11,7 @@ import time import shutil import unittest +from unittest.mock import patch import pandas as pd @@ -193,6 +194,33 @@ def setUpClass(cls): species=[ARCSpecies(label='spc1', xyz=['O 0 0 1'])], testing=True, ) + cls.job_5 = GaussianAdapter(execution_type='queue', + job_name='spc1', + job_type='opt', + job_id='123456', + job_num=101, + job_server_name = 'server3', + level=Level(method='cbs-qb3'), + project='test', + project_directory=os.path.join(ARC_PATH, 'arc', 'testing', 'test_JobAdapter_ServerTimeLimit'), + species=[ARCSpecies(label='spc1', xyz=['O 0 0 1'])], + server='server3', + testing=True, + ) + cls.job_6 = GaussianAdapter(execution_type='queue', + job_name='spc1', + job_type='opt', + job_id='123456', + job_num=101, + job_server_name = 'server1', + level=Level(method='cbs-qb3'), + project='test', + project_directory=os.path.join(ARC_PATH, 'arc', 'testing', 'test_JobAdapter_ServerTimeLimit'), + species=[ARCSpecies(label='spc1', xyz=['O 0 0 1'])], + testing=True, + queue='short_queue', + attempted_queues=['short_queue'] + ) def test_determine_job_array_parameters(self): """Test determining job array parameters""" @@ -404,6 +432,34 @@ def test_get_file_property_dictionary(self): 'source': 'input_files', 'make_x': True}) + def test_determine_job_status(self): + """Test determining the job status""" + self.job_5.determine_job_status() + self.assertEqual(self.job_5.job_status[0], 'done') + self.assertEqual(self.job_5.job_status[1]['status'], 'errored') + self.assertEqual(self.job_5.job_status[1]['keywords'], ['ServerTimeLimit']) + + @patch( + "arc.job.trsh.servers", + { + "local": { + "cluster_soft": "PBS", + "un": "test_user", + "queues": {"short_queue": "24:00:0","middle_queue": "48:00:00", "long_queue": "3600:00:00"}, + } + }, + ) + def test_troubleshoot_queue(self): + """Test troubleshooting a queue job""" + self.job_6.troubleshoot_queue() + self.assertEqual(self.job_6.queue, 'middle_queue') + # Assert that 'middle_queue' and 'short_queue' were attempted + # We do not do assert equal because a user may have different queues from the settings.py originally during cls + self.assertIn('short_queue', self.job_6.attempted_queues) + self.assertIn('middle_queue', self.job_6.attempted_queues) + + + @classmethod def tearDownClass(cls): """ From d81b36ae353c2ff31c58d5a181c95317262dabc4 Mon Sep 17 00:00:00 2001 From: Calvin Date: Mon, 12 Feb 2024 18:34:57 +0200 Subject: [PATCH 4/8] ARC Job Adapters/Factory Update - Added 'queue' as a parameter - Added 'attempted_queues' as parameter --- arc/job/adapters/cfour.py | 4 ++++ arc/job/adapters/common.py | 6 +++++- arc/job/adapters/gaussian.py | 4 ++++ arc/job/adapters/molpro.py | 4 ++++ arc/job/adapters/obabel.py | 4 ++++ arc/job/adapters/orca.py | 4 ++++ arc/job/adapters/psi_4.py | 4 ++++ arc/job/adapters/qchem.py | 4 ++++ arc/job/adapters/terachem.py | 4 ++++ arc/job/adapters/torch_ani.py | 4 ++++ 
arc/job/adapters/ts/autotst_ts.py | 4 ++++ arc/job/adapters/ts/gcn_ts.py | 4 ++++ arc/job/adapters/ts/heuristics.py | 4 ++++ arc/job/adapters/ts/kinbot_ts.py | 4 ++++ arc/job/adapters/ts/xtb_gsm.py | 4 ++++ arc/job/adapters/xtb_adapter.py | 4 ++++ arc/job/factory.py | 4 ++++ 17 files changed, 69 insertions(+), 1 deletion(-) diff --git a/arc/job/adapters/cfour.py b/arc/job/adapters/cfour.py index 4684f798a3..cd277e5254 100644 --- a/arc/job/adapters/cfour.py +++ b/arc/job/adapters/cfour.py @@ -128,6 +128,8 @@ def __init__(self, rotor_index: Optional[int] = None, server: Optional[str] = None, server_nodes: Optional[list] = None, + queue: Optional[str] = None, + attempted_queues: Optional[List[str]] = None, species: Optional[List['ARCSpecies']] = None, testing: bool = False, times_rerun: int = 0, @@ -177,6 +179,8 @@ def __init__(self, rotor_index=rotor_index, server=server, server_nodes=server_nodes, + queue=queue, + attempted_queues=attempted_queues, species=species, testing=testing, times_rerun=times_rerun, diff --git a/arc/job/adapters/common.py b/arc/job/adapters/common.py index 6af3e94ddf..a351d4c01d 100644 --- a/arc/job/adapters/common.py +++ b/arc/job/adapters/common.py @@ -110,6 +110,8 @@ def _initialize_adapter(obj: 'JobAdapter', rotor_index: Optional[int] = None, server: Optional[str] = None, server_nodes: Optional[list] = None, + queue: Optional[str] = None, + attempted_queues: Optional[List[str]] = None, species: Optional[List['ARCSpecies']] = None, testing: bool = False, times_rerun: int = 0, @@ -173,6 +175,8 @@ def _initialize_adapter(obj: 'JobAdapter', obj.rotor_index = rotor_index obj.run_time = None obj.server = server + obj.queue = queue + obj.attempted_queues = attempted_queues or list() obj.server_nodes = server_nodes or list() obj.species = [species] if species is not None and not isinstance(species, list) else species obj.submit_script_memory = None @@ -225,7 +229,7 @@ def _initialize_adapter(obj: 'JobAdapter', obj.species_label = None obj.args = set_job_args(args=obj.args, level=obj.level, job_name=obj.job_name) - if obj.execution_type != 'incore' and obj.job_adapter in obj.ess_settings.keys(): + if obj.execution_type != 'incore' and obj.job_adapter in obj.ess_settings.keys() and obj.server is None: if 'server' in obj.args['trsh']: obj.server = obj.args['trsh']['server'] elif obj.job_adapter in obj.ess_settings.keys(): diff --git a/arc/job/adapters/gaussian.py b/arc/job/adapters/gaussian.py index ee84b6c6bf..8f05d32450 100644 --- a/arc/job/adapters/gaussian.py +++ b/arc/job/adapters/gaussian.py @@ -142,6 +142,8 @@ def __init__(self, rotor_index: Optional[int] = None, server: Optional[str] = None, server_nodes: Optional[list] = None, + queue: Optional[str] = None, + attempted_queues: Optional[List[str]] = None, species: Optional[List['ARCSpecies']] = None, testing: bool = False, times_rerun: int = 0, @@ -191,6 +193,8 @@ def __init__(self, rotor_index=rotor_index, server=server, server_nodes=server_nodes, + queue=queue, + attempted_queues=attempted_queues, species=species, testing=testing, times_rerun=times_rerun, diff --git a/arc/job/adapters/molpro.py b/arc/job/adapters/molpro.py index befb646887..176570aa94 100644 --- a/arc/job/adapters/molpro.py +++ b/arc/job/adapters/molpro.py @@ -141,6 +141,8 @@ def __init__(self, rotor_index: Optional[int] = None, server: Optional[str] = None, server_nodes: Optional[list] = None, + queue: Optional[str] = None, + attempted_queues: Optional[List[str]] = None, species: Optional[List['ARCSpecies']] = None, testing: bool = False, 
times_rerun: int = 0, @@ -190,6 +192,8 @@ def __init__(self, rotor_index=rotor_index, server=server, server_nodes=server_nodes, + queue=queue, + attempted_queues=attempted_queues, species=species, testing=testing, times_rerun=times_rerun, diff --git a/arc/job/adapters/obabel.py b/arc/job/adapters/obabel.py index 14c5e9dc34..a858a8a3cd 100644 --- a/arc/job/adapters/obabel.py +++ b/arc/job/adapters/obabel.py @@ -110,6 +110,8 @@ def __init__(self, rotor_index: Optional[int] = None, server: Optional[str] = None, server_nodes: Optional[list] = None, + queue: Optional[str] = None, + attempted_queues: Optional[List[str]] = None, species: Optional[List['ARCSpecies']] = None, testing: bool = False, times_rerun: int = 0, @@ -162,6 +164,8 @@ def __init__(self, rotor_index=rotor_index, server=server, server_nodes=server_nodes, + queue=queue, + attempted_queues=attempted_queues, species=species, testing=testing, times_rerun=times_rerun, diff --git a/arc/job/adapters/orca.py b/arc/job/adapters/orca.py index 75967b8206..e118db214e 100644 --- a/arc/job/adapters/orca.py +++ b/arc/job/adapters/orca.py @@ -146,6 +146,8 @@ def __init__(self, rotor_index: Optional[int] = None, server: Optional[str] = None, server_nodes: Optional[list] = None, + queue: Optional[str] = None, + attempted_queues: Optional[List[str]] = None, species: Optional[List['ARCSpecies']] = None, testing: bool = False, times_rerun: int = 0, @@ -195,6 +197,8 @@ def __init__(self, rotor_index=rotor_index, server=server, server_nodes=server_nodes, + queue=queue, + attempted_queues=attempted_queues, species=species, testing=testing, times_rerun=times_rerun, diff --git a/arc/job/adapters/psi_4.py b/arc/job/adapters/psi_4.py index cd8c2d3660..bd2e53792d 100644 --- a/arc/job/adapters/psi_4.py +++ b/arc/job/adapters/psi_4.py @@ -186,6 +186,8 @@ def __init__(self, rotor_index: Optional[int] = None, server: Optional[str] = None, server_nodes: Optional[list] = None, + queue: Optional[str] = None, + attempted_queues: Optional[List[str]] = None, species: Optional[List['ARCSpecies']] = None, testing: bool = False, times_rerun: int = 0, @@ -242,6 +244,8 @@ def __init__(self, self.rotor_index = rotor_index self.server = server self.server_nodes = server_nodes or list() + self.queue = queue + self.attempted_queues = attempted_queues or list() self.species = [species] if species is not None and not isinstance(species, list) else species self.testing = testing self.torsions = [torsions] if torsions is not None and not isinstance(torsions[0], list) else torsions diff --git a/arc/job/adapters/qchem.py b/arc/job/adapters/qchem.py index dc1210f44d..1e1cfca19e 100644 --- a/arc/job/adapters/qchem.py +++ b/arc/job/adapters/qchem.py @@ -138,6 +138,8 @@ def __init__(self, rotor_index: Optional[int] = None, server: Optional[str] = None, server_nodes: Optional[list] = None, + queue: Optional[str] = None, + attempted_queues: Optional[List[str]] = None, species: Optional[List['ARCSpecies']] = None, testing: bool = False, times_rerun: int = 0, @@ -187,6 +189,8 @@ def __init__(self, rotor_index=rotor_index, server=server, server_nodes=server_nodes, + queue=queue, + attempted_queues=attempted_queues, species=species, testing=testing, times_rerun=times_rerun, diff --git a/arc/job/adapters/terachem.py b/arc/job/adapters/terachem.py index d64efd1a95..59862e2e8c 100644 --- a/arc/job/adapters/terachem.py +++ b/arc/job/adapters/terachem.py @@ -140,6 +140,8 @@ def __init__(self, rotor_index: Optional[int] = None, server: Optional[str] = None, server_nodes: Optional[list] = None, + 
queue: Optional[str] = None, + attempted_queues: Optional[List[str]] = None, species: Optional[List['ARCSpecies']] = None, testing: bool = False, times_rerun: int = 0, @@ -189,6 +191,8 @@ def __init__(self, rotor_index=rotor_index, server=server, server_nodes=server_nodes, + queue=queue, + attempted_queues=attempted_queues, species=species, testing=testing, times_rerun=times_rerun, diff --git a/arc/job/adapters/torch_ani.py b/arc/job/adapters/torch_ani.py index 1fd690b230..895bc5cd83 100644 --- a/arc/job/adapters/torch_ani.py +++ b/arc/job/adapters/torch_ani.py @@ -115,6 +115,8 @@ def __init__(self, rotor_index: Optional[int] = None, server: Optional[str] = None, server_nodes: Optional[list] = None, + queue: Optional[str] = None, + attempted_queues: Optional[List[str]] = None, species: Optional[List['ARCSpecies']] = None, testing: bool = False, times_rerun: int = 0, @@ -169,6 +171,8 @@ def __init__(self, rotor_index=rotor_index, server=server, server_nodes=server_nodes, + queue=queue, + attempted_queues=attempted_queues, species=species, testing=testing, times_rerun=times_rerun, diff --git a/arc/job/adapters/ts/autotst_ts.py b/arc/job/adapters/ts/autotst_ts.py index 4b9af91906..6efd1bcad7 100644 --- a/arc/job/adapters/ts/autotst_ts.py +++ b/arc/job/adapters/ts/autotst_ts.py @@ -122,6 +122,8 @@ def __init__(self, rotor_index: Optional[int] = None, server: Optional[str] = None, server_nodes: Optional[list] = None, + queue: Optional[str] = None, + attempted_queues: Optional[List[str]] = None, species: Optional[List[ARCSpecies]] = None, testing: bool = False, times_rerun: int = 0, @@ -174,6 +176,8 @@ def __init__(self, rotor_index=rotor_index, server=server, server_nodes=server_nodes, + queue=queue, + attempted_queues=attempted_queues, species=species, testing=testing, times_rerun=times_rerun, diff --git a/arc/job/adapters/ts/gcn_ts.py b/arc/job/adapters/ts/gcn_ts.py index f970e53e44..814834eab7 100644 --- a/arc/job/adapters/ts/gcn_ts.py +++ b/arc/job/adapters/ts/gcn_ts.py @@ -124,6 +124,8 @@ def __init__(self, rotor_index: Optional[int] = None, server: Optional[str] = None, server_nodes: Optional[list] = None, + queue: Optional[str] = None, + attempted_queues: Optional[List[str]] = None, species: Optional[List['ARCSpecies']] = None, testing: bool = False, times_rerun: int = 0, @@ -175,6 +177,8 @@ def __init__(self, rotor_index=rotor_index, server=server, server_nodes=server_nodes, + queue=queue, + attempted_queues=attempted_queues, species=species, testing=testing, times_rerun=times_rerun, diff --git a/arc/job/adapters/ts/heuristics.py b/arc/job/adapters/ts/heuristics.py index 228d71ec95..b6d5c2a565 100644 --- a/arc/job/adapters/ts/heuristics.py +++ b/arc/job/adapters/ts/heuristics.py @@ -129,6 +129,8 @@ def __init__(self, rotor_index: Optional[int] = None, server: Optional[str] = None, server_nodes: Optional[list] = None, + queue: Optional[str] = None, + attempted_queues: Optional[List[str]] = None, species: Optional[List[ARCSpecies]] = None, testing: bool = False, times_rerun: int = 0, @@ -181,6 +183,8 @@ def __init__(self, rotor_index=rotor_index, server=server, server_nodes=server_nodes, + queue=queue, + attempted_queues=attempted_queues, species=species, testing=testing, times_rerun=times_rerun, diff --git a/arc/job/adapters/ts/kinbot_ts.py b/arc/job/adapters/ts/kinbot_ts.py index 60b0f09a12..84ba908753 100644 --- a/arc/job/adapters/ts/kinbot_ts.py +++ b/arc/job/adapters/ts/kinbot_ts.py @@ -122,6 +122,8 @@ def __init__(self, rotor_index: Optional[int] = None, server: Optional[str] = 
None, server_nodes: Optional[list] = None, + queue: Optional[str] = None, + attempted_queues: Optional[List[str]] = None, species: Optional[List['ARCSpecies']] = None, testing: bool = False, times_rerun: int = 0, @@ -202,6 +204,8 @@ def __init__(self, rotor_index=rotor_index, server=server, server_nodes=server_nodes, + queue=queue, + attempted_queues=attempted_queues, species=species, testing=testing, times_rerun=times_rerun, diff --git a/arc/job/adapters/ts/xtb_gsm.py b/arc/job/adapters/ts/xtb_gsm.py index 33fb7d50ba..15e45c4c29 100644 --- a/arc/job/adapters/ts/xtb_gsm.py +++ b/arc/job/adapters/ts/xtb_gsm.py @@ -151,6 +151,8 @@ def __init__(self, rotor_index: Optional[int] = None, server: Optional[str] = None, server_nodes: Optional[list] = None, + queue: Optional[str] = None, + attempted_queues: Optional[List[str]] = None, species: Optional[List['ARCSpecies']] = None, testing: bool = False, times_rerun: int = 0, @@ -200,6 +202,8 @@ def __init__(self, rotor_index=rotor_index, server=server, server_nodes=server_nodes, + queue=queue, + attempted_queues=attempted_queues, species=species, testing=testing, times_rerun=times_rerun, diff --git a/arc/job/adapters/xtb_adapter.py b/arc/job/adapters/xtb_adapter.py index 98937d9d2f..27cbaf4490 100644 --- a/arc/job/adapters/xtb_adapter.py +++ b/arc/job/adapters/xtb_adapter.py @@ -147,6 +147,8 @@ def __init__(self, rotor_index: Optional[int] = None, server: Optional[str] = None, server_nodes: Optional[list] = None, + queue: Optional[str] = None, + attempted_queues: Optional[List[str]] = None, species: Optional[List['ARCSpecies']] = None, testing: bool = False, times_rerun: int = 0, @@ -197,6 +199,8 @@ def __init__(self, rotor_index=rotor_index, server=server, server_nodes=server_nodes, + queue=queue, + attempted_queues=attempted_queues, species=species, testing=testing, times_rerun=times_rerun, diff --git a/arc/job/factory.py b/arc/job/factory.py index b8aeb7d390..df3d89a701 100644 --- a/arc/job/factory.py +++ b/arc/job/factory.py @@ -66,6 +66,8 @@ def job_factory(job_adapter: str, rotor_index: Optional[int] = None, server: Optional[str] = None, server_nodes: Optional[list] = None, + queue: Optional[str] = None, + attempted_queues: Optional[List[str]] = None, species: Optional[List[ARCSpecies]] = None, testing: bool = False, times_rerun: int = 0, @@ -204,6 +206,8 @@ def job_factory(job_adapter: str, rotor_index=rotor_index, server=server, server_nodes=server_nodes, + queue=queue, + attempted_queues=attempted_queues, species=species, testing=testing, times_rerun=times_rerun, From 7faac86918266c4f0e5989055d0526fdf71e6294 Mon Sep 17 00:00:00 2001 From: Calvin Date: Mon, 12 Feb 2024 21:32:05 +0200 Subject: [PATCH 5/8] ARC Job Local - PBS Max Jobs ARC will recognise if PBS is returning a max job output when attempting to submit a job to the server Removed unused import --- arc/job/local.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arc/job/local.py b/arc/job/local.py index 23280acf9d..5589738fcf 100644 --- a/arc/job/local.py +++ b/arc/job/local.py @@ -256,7 +256,7 @@ def submit_job(path: str, submit_filename=submit_filename, recursion=True, ) - if cluster_soft.lower() == 'pbs' and any('qsub: would exceed' in err_line for err_line in stderr): + if cluster_soft.lower() == 'pbs' and (any('qsub: would exceed' in err_line for err_line in stderr ) or any('qsub: Maximum number of jobs' in err_line for err_line in stderr)): logger.warning(f'Max number of submitted jobs was reached, sleeping...') time.sleep(5 * 60) submit_job(path=path, From 
658217ba4996f9740358c7b1e84a132c363f467e Mon Sep 17 00:00:00 2001 From: Calvin Date: Tue, 13 Feb 2024 21:09:54 +0200 Subject: [PATCH 6/8] ARC Job Trash Update - Trsh Job Queue A troubleshooting function when queue walltime is reached. This will attempt first check if the user has provided queues that can be used. If not, it will then query the server (assuming PBS) to see if there are any queues that are available to user and have higher walltime style changes --- arc/job/trsh.py | 153 ++++++++++++- arc/job/trsh_test.py | 520 ++++++++++++++++++++++++++++--------------- 2 files changed, 488 insertions(+), 185 deletions(-) diff --git a/arc/job/trsh.py b/arc/job/trsh.py index 28ebaf1855..8264be80e7 100644 --- a/arc/job/trsh.py +++ b/arc/job/trsh.py @@ -11,13 +11,14 @@ import re from arc.common import (check_torsion_change, + convert_to_hours, determine_ess, estimate_orca_mem_cpu_requirement, get_logger, get_number_with_ordinal_indicator, is_same_pivot, is_same_sequence_sublist, - is_str_float, + is_str_float ) from arc.exceptions import InputError, SpeciesError, TrshError from arc.imports import settings @@ -1114,6 +1115,156 @@ def trsh_conformer_isomorphism(software: str, break return level_of_theory +def trsh_job_queue(server: str, + job_name: str, + max_time: int = 24, + attempted_queues: list = None, + ) -> Tuple[dict, bool]: + """ A function to troubleshoot job queue issues. This function will attempt to determine if the user has provided a queue that provides more time than the walltime failed queue. + If not, it will attempt to determine if there are any other queues available on the server that provide more time than the walltime failed queue. + + Args: + server (str): Name of the server + job_name (str): Name of the job + max_time (int, optional): The max time that the current queue that the job failed on provied. Defaults to 24, measured in hours. + attempted_queues (list, optional): Any queues that have already been attempted to run the job on. Defaults to None. + + Returns: + Tuple[dict, bool]: A dictionary of the available queues and a boolean indicating if the function was successful. + """ + + server_queues = servers[server].get('queues', dict()) + cluster_soft = servers[server].get('cluster_soft','Undefined') + excluded_queues = servers[server].get('excluded_queues', list()) + + # Check if there are any available queues in server_queues that hasn't been tried yet + if len(server_queues) > 1: + # Make sure that the queue is not already in the attempted_queues list + server_queues = { + queue: walltime for queue, walltime in server_queues.items() + if (attempted_queues is None or queue not in attempted_queues) + and convert_to_hours(walltime) >= max_time + } + if len(server_queues) == 0: + logger.error(f' Could not troubleshoot {job_name} on {server} as all available queues have been tried. 
Will attempt to query the server for additional queues.') + else: + return server_queues, True + # If the server is PBS, query which queues are available + if cluster_soft.lower() == 'pbs': + # First determine which group the current user belongs to + cmd = 'groups' + output = execute_command(cmd, shell=True) + if 'Error' in output: + logger.error(f'Could not troubleshoot {job_name} on {server} as the groups command failed.') + return None, False + else: + # (['users zeus-users vkm-users gaussian grinberg-dana_prj docker'], []) + user_groups = output[0][0].split() + if len(user_groups) == 0: + logger.error(f'Could not troubleshoot {job_name} on {server} as the groups command did not return any groups.') + return None, False + # check if the term '-users' is in the group name, if so, remove it + elif any('-users' in group for group in user_groups): + user_groups = [group.replace('-users', '') for group in user_groups] + # Now query the queues + cmd = 'qstat -q' + output_queues = execute_command(cmd, shell=True) + if 'Error' in output_queues: + logger.error(f'Could not troubleshoot {job_name} on {server} as the qstat command failed.') + return None, False + else: + # Need to parse output + # Example: + # Queue Memory CPU Time Walltime Node Run Que Lm State + # ---------------- ------ -------- -------- ---- ----- ----- ---- ----- + # workq -- -- -- -- 0 0 -- D S + # maytal_q -- -- -- -- 7 0 -- E R + # vkm_all_q -- -- -- -- 0 0 -- D R + # zeus_temp -- -- 24:00:00 -- 0 0 -- D S + # dagan_q -- -- -- -- 0 0 -- E R + # frankel_q -- -- -- -- 0 0 -- E R + # vkm_gm_q -- -- -- -- 0 0 -- E R + # zeus_all_scalar -- -- 24:00:00 -- 0 0 -- D S + # yuval_q -- -- -- -- 0 0 -- E R + # dan_q -- -- -- -- 7 0 -- E R + # zeus_all_q -- -- 24:00:00 -- 4 0 -- E R + # zeus_long_q -- -- 168:00:0 -- 0 0 -- E R + # zeus_short_q -- -- 03:00:00 -- 0 0 -- E R + # gpu_v100_q -- -- 480:00:0 -- 0 0 -- E R + # brandon_q -- -- -- -- 0 0 -- E R + # karp_q -- -- -- -- 0 0 -- E R + # mafat_gm_q -- -- -- -- 0 0 -- E R + # training_q -- -- 24:00:00 -- 0 0 -- E R + # mafat4_q -- -- -- -- 0 0 -- D R + # zeus_new_q -- -- 72:00:00 -- 0 0 -- E R + # zeus_combined_q -- -- 24:00:00 -- 17 0 -- E R + # zeus_comb_short -- -- 03:00:00 -- 0 0 -- E R + # mafat_new_q -- -- -- -- 46 0 -- E R + # dagan_comb_q -- -- -- -- 0 0 -- E R + # dan_comb_q -- -- -- -- 0 0 -- E R + # karp_comb_q -- -- -- -- 0 0 -- D R + # brandon_comb_q -- -- -- -- 0 0 -- E R + # train_gpu_q -- -- 24:00:00 -- 0 0 -- E R + # ----- ----- + # 81 0 + # 1. Get the queue names in the first column, and check if the state is 'E' (enabled) or 'D' (disabled) - Select only enabled queues + # 2. Once we have a list of queues, we need to make sure we can submit to them. We can do this by check qstat -Qf and see output of acl_groups + # 3. 
We also need to get the wall time for each queue + queues = {} + for line in output_queues[0]: + if line.strip() and not line.startswith("Queue") and not line.startswith('----') and not line.startswith('server'): + parts = line.split() + if len(parts) >= 9 and parts[8] == 'E': + queue_name = parts[0] + acl_groups = None + cmd = f'qstat -Qf {queue_name}' + output_specific_queue = execute_command(cmd, shell=True) + # Parse Example: + # + # Queue: mafat_new_q + # queue_type = Execution + # total_jobs = 44 + # state_count = Transit:0 Queued:0 Held:0 Waiting:0 Running:44 Exiting:0 Begu + # n:0 + # resources_default.walltime = 3600:00:00 + # acl_group_enable = True + # acl_groups = grinberg-dana_prj,halupovich_prj,yuvallevy_prj + # default_chunk.qlist = mafat_new_q + # resources_assigned.mem = 376625977kb + # resources_assigned.mpiprocs = 132 + # resources_assigned.ncpus = 3254 + # resources_assigned.nodect = 47 + # max_run_res.ncpus = [o:PBS_ALL=3584] + # enabled = True + # started = True + for line_queue in output_specific_queue[0]: + # Check walltime + if line_queue.strip().startswith('resources_default.walltime'): + walltime = line_queue.split('=')[1].strip() + queues[queue_name] = walltime + if line_queue.strip().startswith('acl_groups'): + acl_groups = True + groups = line_queue.split('=')[1].strip().split(',') + if any(group in user_groups for group in groups): + break # User is in one of the acl_groups, keep the queue + else: + queues.pop(queue_name, None) # User is not in acl_groups, remove the queue + break + if not any(group in queue_name for group in user_groups) and acl_groups is None: + queues.pop(queue_name, None) # Queue name does not contain any of the user's groups, remove the queue + # Check if any of the found queues are part of the excluded queues list + if excluded_queues: + for queue in excluded_queues: + if queue in queues: + queues.pop(queue, None) + if len(queues) == 0: + logger.error(f'Could not troubleshoot {job_name} on {server} as no queues were found.') + return None, False + else: + return queues, True + else: + logger.error(f'Could not troubleshoot queue for {job_name} since the server is {cluster_soft} and not PBS.') + return None, False def trsh_job_on_server(server: str, job_name: str, diff --git a/arc/job/trsh_test.py b/arc/job/trsh_test.py index 1e8f336d95..f62c7fad74 100644 --- a/arc/job/trsh_test.py +++ b/arc/job/trsh_test.py @@ -7,14 +7,14 @@ import os import unittest +from unittest.mock import patch import arc.job.trsh as trsh from arc.common import ARC_PATH from arc.imports import settings from arc.parser import parse_1d_scan_energies - -supported_ess = settings['supported_ess'] +supported_ess = settings["supported_ess"] class TestTrsh(unittest.TestCase): @@ -28,253 +28,312 @@ def setUpClass(cls): A method that is run before all unit tests in this class. 
""" cls.maxDiff = None - path = os.path.join(ARC_PATH, 'arc', 'testing', 'trsh') + path = os.path.join(ARC_PATH, "arc", "testing", "trsh") cls.base_path = {ess: os.path.join(path, ess) for ess in supported_ess} + cls.server = "test_server" + cls.job_name = "test_job" + cls.job_id = "123" + cls.servers = { + "test_server": { + "queue": {"short_queue": "1:00:00", "long_queue": "100:00:00"}, + " cluster_soft": "pbs", + } + } def test_determine_ess_status(self): """Test the determine_ess_status() function""" # Gaussian - path = os.path.join(self.base_path['gaussian'], 'converged.out') + path = os.path.join(self.base_path["gaussian"], "converged.out") status, keywords, error, line = trsh.determine_ess_status( - output_path=path, species_label='OH', job_type='opt') - self.assertEqual(status, 'done') + output_path=path, species_label="OH", job_type="opt" + ) + self.assertEqual(status, "done") self.assertEqual(keywords, list()) - self.assertEqual(error, '') - self.assertEqual(line, '') + self.assertEqual(error, "") + self.assertEqual(line, "") - path = os.path.join(self.base_path['gaussian'], 'l913.out') + path = os.path.join(self.base_path["gaussian"], "l913.out") status, keywords, error, line = trsh.determine_ess_status( - output_path=path, species_label='tst', job_type='composite') - self.assertEqual(status, 'errored') - self.assertEqual(keywords, ['MaxOptCycles', 'GL913']) - self.assertEqual(error, 'Maximum optimization cycles reached.') - self.assertIn('Error termination via Lnk1e', line) - self.assertIn('g09/l913.exe', line) - - path = os.path.join(self.base_path['gaussian'], 'l301_checkfile.out') + output_path=path, species_label="tst", job_type="composite" + ) + self.assertEqual(status, "errored") + self.assertEqual(keywords, ["MaxOptCycles", "GL913"]) + self.assertEqual(error, "Maximum optimization cycles reached.") + self.assertIn("Error termination via Lnk1e", line) + self.assertIn("g09/l913.exe", line) + + path = os.path.join(self.base_path["gaussian"], "l301_checkfile.out") status, keywords, error, line = trsh.determine_ess_status( - output_path=path, species_label='Zr2O4H', job_type='opt') - self.assertEqual(status, 'errored') - self.assertEqual(keywords, ['CheckFile']) - self.assertEqual(error, 'No data on chk file.') - self.assertIn('Error termination via Lnk1e', line) - self.assertIn('g09/l301.exe', line) - - path = os.path.join(self.base_path['gaussian'], 'l301.out') + output_path=path, species_label="Zr2O4H", job_type="opt" + ) + self.assertEqual(status, "errored") + self.assertEqual(keywords, ["CheckFile"]) + self.assertEqual(error, "No data on chk file.") + self.assertIn("Error termination via Lnk1e", line) + self.assertIn("g09/l301.exe", line) + + path = os.path.join(self.base_path["gaussian"], "l301.out") status, keywords, error, line = trsh.determine_ess_status( - output_path=path, species_label='Zr2O4H', job_type='opt') - self.assertEqual(status, 'errored') - self.assertEqual(keywords, ['GL301', 'BasisSet']) - self.assertEqual(error, 'The basis set 6-311G is not appropriate for the this chemistry.') - self.assertIn('Error termination via Lnk1e', line) - self.assertIn('g16/l301.exe', line) - - path = os.path.join(self.base_path['gaussian'], 'l401.out') + output_path=path, species_label="Zr2O4H", job_type="opt" + ) + self.assertEqual(status, "errored") + self.assertEqual(keywords, ["GL301", "BasisSet"]) + self.assertEqual( + error, "The basis set 6-311G is not appropriate for the this chemistry." 
+ ) + self.assertIn("Error termination via Lnk1e", line) + self.assertIn("g16/l301.exe", line) + + path = os.path.join(self.base_path["gaussian"], "l401.out") status, keywords, error, line = trsh.determine_ess_status( - output_path=path, species_label='Zr2O4H', job_type='opt') - self.assertEqual(status, 'errored') - self.assertEqual(keywords, ['CheckFile']) - self.assertEqual(error, 'Basis set data is not on the checkpoint file.') - self.assertIn('Error termination via Lnk1e', line) - self.assertIn('g09/l401.exe', line) - - path = os.path.join(self.base_path['gaussian'], 'l9999.out') + output_path=path, species_label="Zr2O4H", job_type="opt" + ) + self.assertEqual(status, "errored") + self.assertEqual(keywords, ["CheckFile"]) + self.assertEqual(error, "Basis set data is not on the checkpoint file.") + self.assertIn("Error termination via Lnk1e", line) + self.assertIn("g09/l401.exe", line) + + path = os.path.join(self.base_path["gaussian"], "l9999.out") status, keywords, error, line = trsh.determine_ess_status( - output_path=path, species_label='Zr2O4H', job_type='opt') - self.assertEqual(status, 'errored') - self.assertEqual(keywords, ['Unconverged', 'GL9999']) - self.assertEqual(error, 'Unconverged') - self.assertIn('Error termination via Lnk1e', line) - self.assertIn('g16/l9999.exe', line) - - path = os.path.join(self.base_path['gaussian'], 'syntax.out') + output_path=path, species_label="Zr2O4H", job_type="opt" + ) + self.assertEqual(status, "errored") + self.assertEqual(keywords, ["Unconverged", "GL9999"]) + self.assertEqual(error, "Unconverged") + self.assertIn("Error termination via Lnk1e", line) + self.assertIn("g16/l9999.exe", line) + + path = os.path.join(self.base_path["gaussian"], "syntax.out") status, keywords, error, line = trsh.determine_ess_status( - output_path=path, species_label='Zr2O4H', job_type='opt') - self.assertEqual(status, 'errored') - self.assertEqual(keywords, ['Syntax']) - self.assertEqual(error, 'There was a syntax error in the Gaussian input file. Check your Gaussian input file ' - 'template under arc/job/inputs.py. Alternatively, perhaps the level of theory is not ' - 'supported by Gaussian in the specific format it was given.') + output_path=path, species_label="Zr2O4H", job_type="opt" + ) + self.assertEqual(status, "errored") + self.assertEqual(keywords, ["Syntax"]) + self.assertEqual( + error, + "There was a syntax error in the Gaussian input file. Check your Gaussian input file " + "template under arc/job/inputs.py. 
Alternatively, perhaps the level of theory is not " + "supported by Gaussian in the specific format it was given.", + ) self.assertFalse(line) # QChem - path = os.path.join(self.base_path['qchem'], 'H2_opt.out') + path = os.path.join(self.base_path["qchem"], "H2_opt.out") status, keywords, error, line = trsh.determine_ess_status( - output_path=path, species_label='H2', job_type='opt') - self.assertEqual(status, 'done') + output_path=path, species_label="H2", job_type="opt" + ) + self.assertEqual(status, "done") self.assertEqual(keywords, list()) - self.assertEqual(error, '') - self.assertEqual(line, '') + self.assertEqual(error, "") + self.assertEqual(line, "") # Molpro - path = os.path.join(self.base_path['molpro'], 'unrecognized_basis_set.out') + path = os.path.join(self.base_path["molpro"], "unrecognized_basis_set.out") status, keywords, error, line = trsh.determine_ess_status( - output_path=path, species_label='I', job_type='sp') - self.assertEqual(status, 'errored') - self.assertEqual(keywords, ['BasisSet']) - self.assertEqual(error, 'Unrecognized basis set 6-311G**') - self.assertIn(' ? Basis library exhausted', line) # line includes '\n' + output_path=path, species_label="I", job_type="sp" + ) + self.assertEqual(status, "errored") + self.assertEqual(keywords, ["BasisSet"]) + self.assertEqual(error, "Unrecognized basis set 6-311G**") + self.assertIn(" ? Basis library exhausted", line) # line includes '\n' # Orca # test detection of a successful job - path = os.path.join(self.base_path['orca'], 'orca_successful_sp.log') + path = os.path.join(self.base_path["orca"], "orca_successful_sp.log") status, keywords, error, line = trsh.determine_ess_status( - output_path=path, species_label='test', job_type='sp', software='orca') - self.assertEqual(status, 'done') + output_path=path, species_label="test", job_type="sp", software="orca" + ) + self.assertEqual(status, "done") self.assertEqual(keywords, list()) - self.assertEqual(error, '') - self.assertEqual(line, '') + self.assertEqual(error, "") + self.assertEqual(line, "") # test detection of a successful job # notice that the log file in this example has a different format under the line # *** Starting incremental Fock matrix formation *** # compared to the above example. It is important to make sure that ARC's Orca trsh algorithm parse this # log file successfully - path = os.path.join(self.base_path['orca'], 'orca_successful_sp_scf.log') + path = os.path.join(self.base_path["orca"], "orca_successful_sp_scf.log") status, keywords, error, line = trsh.determine_ess_status( - output_path=path, species_label='test', job_type='sp', software='orca') - self.assertEqual(status, 'done') + output_path=path, species_label="test", job_type="sp", software="orca" + ) + self.assertEqual(status, "done") self.assertEqual(keywords, list()) - self.assertEqual(error, '') - self.assertEqual(line, '') + self.assertEqual(error, "") + self.assertEqual(line, "") # test detection of SCF energy diverge issue - path = os.path.join(self.base_path['orca'], 'orca_scf_blow_up_error.log') + path = os.path.join(self.base_path["orca"], "orca_scf_blow_up_error.log") status, keywords, error, line = trsh.determine_ess_status( - output_path=path, species_label='test', job_type='sp', software='orca') - self.assertEqual(status, 'errored') - self.assertEqual(keywords, ['SCF']) - expected_error_msg = 'The SCF energy seems diverged during iterations. ' \ - 'SCF energy after initial iteration is -1076.6615662471. ' \ - 'SCF energy after final iteration is -20006124.68383977. 
' \ - 'The ratio between final and initial SCF energy is 18581.627979509627. ' \ - 'This ratio is greater than the default threshold of 2. ' \ - 'Please consider using alternative methods or larger basis sets.' + output_path=path, species_label="test", job_type="sp", software="orca" + ) + self.assertEqual(status, "errored") + self.assertEqual(keywords, ["SCF"]) + expected_error_msg = ( + "The SCF energy seems diverged during iterations. " + "SCF energy after initial iteration is -1076.6615662471. " + "SCF energy after final iteration is -20006124.68383977. " + "The ratio between final and initial SCF energy is 18581.627979509627. " + "This ratio is greater than the default threshold of 2. " + "Please consider using alternative methods or larger basis sets." + ) self.assertEqual(error, expected_error_msg) - self.assertIn('', line) + self.assertIn("", line) # test detection of insufficient memory causes SCF failure - path = os.path.join(self.base_path['orca'], 'orca_scf_memory_error.log') + path = os.path.join(self.base_path["orca"], "orca_scf_memory_error.log") status, keywords, error, line = trsh.determine_ess_status( - output_path=path, species_label='test', job_type='sp', software='orca') - self.assertEqual(status, 'errored') - self.assertEqual(keywords, ['SCF', 'Memory']) - expected_error_msg = 'Orca suggests to increase per cpu core memory to 789.0 MB.' + output_path=path, species_label="test", job_type="sp", software="orca" + ) + self.assertEqual(status, "errored") + self.assertEqual(keywords, ["SCF", "Memory"]) + expected_error_msg = ( + "Orca suggests to increase per cpu core memory to 789.0 MB." + ) self.assertEqual(error, expected_error_msg) - self.assertEqual(' Error (ORCA_SCF): Not enough memory available!', line) + self.assertEqual(" Error (ORCA_SCF): Not enough memory available!", line) # test detection of insufficient memory causes MDCI failure - path = os.path.join(self.base_path['orca'], 'orca_mdci_memory_error.log') + path = os.path.join(self.base_path["orca"], "orca_mdci_memory_error.log") status, keywords, error, line = trsh.determine_ess_status( - output_path=path, species_label='test', job_type='sp', software='orca') - self.assertEqual(status, 'errored') - self.assertEqual(keywords, ['MDCI', 'Memory']) - expected_error_msg = 'Orca suggests to increase per cpu core memory to 10218 MB.' + output_path=path, species_label="test", job_type="sp", software="orca" + ) + self.assertEqual(status, "errored") + self.assertEqual(keywords, ["MDCI", "Memory"]) + expected_error_msg = ( + "Orca suggests to increase per cpu core memory to 10218 MB." + ) self.assertEqual(error, expected_error_msg) - self.assertIn('Please increase MaxCore', line) + self.assertIn("Please increase MaxCore", line) # test detection of too many cpu cores causes MDCI failure - path = os.path.join(self.base_path['orca'], 'orca_too_many_cores.log') + path = os.path.join(self.base_path["orca"], "orca_too_many_cores.log") status, keywords, error, line = trsh.determine_ess_status( - output_path=path, species_label='test', job_type='sp', software='orca') - self.assertEqual(status, 'errored') - self.assertEqual(keywords, ['MDCI', 'cpu']) - expected_error_msg = 'Orca cannot utilize cpu cores more than electron pairs in a molecule. ' \ - 'The maximum number of cpu cores can be used for this job is 10.' 
+ output_path=path, species_label="test", job_type="sp", software="orca" + ) + self.assertEqual(status, "errored") + self.assertEqual(keywords, ["MDCI", "cpu"]) + expected_error_msg = ( + "Orca cannot utilize cpu cores more than electron pairs in a molecule. " + "The maximum number of cpu cores can be used for this job is 10." + ) self.assertEqual(error, expected_error_msg) - self.assertIn('parallel calculation exceeds', line) + self.assertIn("parallel calculation exceeds", line) # test detection of generic GTOInt failure - path = os.path.join(self.base_path['orca'], 'orca_GTOInt_error.log') + path = os.path.join(self.base_path["orca"], "orca_GTOInt_error.log") status, keywords, error, line = trsh.determine_ess_status( - output_path=path, species_label='test', job_type='sp', software='orca') - self.assertEqual(status, 'errored') - self.assertEqual(keywords, ['GTOInt', 'Memory']) - expected_error_msg = 'GTOInt error in Orca. Assuming memory allocation error.' + output_path=path, species_label="test", job_type="sp", software="orca" + ) + self.assertEqual(status, "errored") + self.assertEqual(keywords, ["GTOInt", "Memory"]) + expected_error_msg = "GTOInt error in Orca. Assuming memory allocation error." self.assertEqual(error, expected_error_msg) - self.assertIn('ORCA finished by error termination in GTOInt', line) + self.assertIn("ORCA finished by error termination in GTOInt", line) # test detection of generic MDCI failure - path = os.path.join(self.base_path['orca'], 'orca_mdci_error.log') + path = os.path.join(self.base_path["orca"], "orca_mdci_error.log") status, keywords, error, line = trsh.determine_ess_status( - output_path=path, species_label='test', job_type='sp', software='orca') - self.assertEqual(status, 'errored') - self.assertEqual(keywords, ['MDCI', 'Memory']) - expected_error_msg = 'MDCI error in Orca. Assuming memory allocation error.' + output_path=path, species_label="test", job_type="sp", software="orca" + ) + self.assertEqual(status, "errored") + self.assertEqual(keywords, ["MDCI", "Memory"]) + expected_error_msg = "MDCI error in Orca. Assuming memory allocation error." self.assertEqual(error, expected_error_msg) - self.assertIn('ORCA finished by error termination in MDCI', line) + self.assertIn("ORCA finished by error termination in MDCI", line) # test detection of generic MDCI failure in Orca version 4.2.x log files - path = os.path.join(self.base_path['orca'], 'orca_mdci_error_2.log') + path = os.path.join(self.base_path["orca"], "orca_mdci_error_2.log") status, keywords, error, line = trsh.determine_ess_status( - output_path=path, species_label='test', job_type='sp', software='orca') - self.assertEqual(status, 'errored') - self.assertEqual(keywords, ['MDCI', 'Memory']) - expected_error_msg = 'MDCI error in Orca. Assuming memory allocation error.' + output_path=path, species_label="test", job_type="sp", software="orca" + ) + self.assertEqual(status, "errored") + self.assertEqual(keywords, ["MDCI", "Memory"]) + expected_error_msg = "MDCI error in Orca. Assuming memory allocation error." 
         self.assertEqual(error, expected_error_msg)
-        self.assertIn('ORCA finished by error termination in MDCI', line)
+        self.assertIn("ORCA finished by error termination in MDCI", line)

         # test detection of MDCI failure in Orca version 4.1.x log files (no memory/cpu suggestions compared to 4.2.x)
-        path = os.path.join(self.base_path['orca'], 'orca_mdci_error_3.log')
+        path = os.path.join(self.base_path["orca"], "orca_mdci_error_3.log")
         status, keywords, error, line = trsh.determine_ess_status(
-            output_path=path, species_label='test', job_type='sp', software='orca')
-        self.assertEqual(status, 'errored')
-        self.assertEqual(keywords, ['MDCI', 'cpu'])
-        expected_error_msg = 'Orca cannot utilize cpu cores more than electron pairs in a molecule. ARC will ' \
-                             'estimate the number of cpu cores needed based on the number of heavy atoms in the ' \
-                             'molecule.'
+            output_path=path, species_label="test", job_type="sp", software="orca"
+        )
+        self.assertEqual(status, "errored")
+        self.assertEqual(keywords, ["MDCI", "cpu"])
+        expected_error_msg = (
+            "Orca cannot utilize cpu cores more than electron pairs in a molecule. ARC will "
+            "estimate the number of cpu cores needed based on the number of heavy atoms in the "
+            "molecule."
+        )
         self.assertEqual(error, expected_error_msg)
-        self.assertIn('Number of processes in parallel calculation exceeds number of pairs', line)
+        self.assertIn(
+            "Number of processes in parallel calculation exceeds number of pairs", line
+        )

         # test detection of multiplicty and charge combination error
-        path = os.path.join(self.base_path['orca'], 'orca_multiplicity_error.log')
+        path = os.path.join(self.base_path["orca"], "orca_multiplicity_error.log")
         status, keywords, error, line = trsh.determine_ess_status(
-            output_path=path, species_label='test', job_type='sp', software='orca')
-        self.assertEqual(status, 'errored')
-        self.assertEqual(keywords, ['Input'])
-        expected_error_msg = 'The multiplicity and charge combination for species test are wrong.'
+            output_path=path, species_label="test", job_type="sp", software="orca"
+        )
+        self.assertEqual(status, "errored")
+        self.assertEqual(keywords, ["Input"])
+        expected_error_msg = (
+            "The multiplicity and charge combination for species test are wrong."
+        )
         self.assertEqual(error, expected_error_msg)
-        self.assertIn('Error : multiplicity', line)
+        self.assertIn("Error : multiplicity", line)

         # test detection of input keyword error
-        path = os.path.join(self.base_path['orca'], 'orca_input_error.log')
+        path = os.path.join(self.base_path["orca"], "orca_input_error.log")
         status, keywords, error, line = trsh.determine_ess_status(
-            output_path=path, species_label='test', job_type='sp', software='orca')
-        self.assertEqual(status, 'errored')
-        self.assertEqual(keywords, ['Syntax'])
-        expected_error_msg = 'There was keyword syntax error in the Orca input file. In particular, keywords ' \
-                             'XTB1 can either be duplicated or illegal. Please check your Orca ' \
-                             'input file template under arc/job/inputs.py. Alternatively, perhaps the level of ' \
-                             'theory or the job option is not supported by Orca in the format it was given.'
+            output_path=path, species_label="test", job_type="sp", software="orca"
+        )
+        self.assertEqual(status, "errored")
+        self.assertEqual(keywords, ["Syntax"])
+        expected_error_msg = (
+            "There was keyword syntax error in the Orca input file. In particular, keywords "
+            "XTB1 can either be duplicated or illegal. Please check your Orca "
+            "input file template under arc/job/inputs.py. Alternatively, perhaps the level of "
+            "theory or the job option is not supported by Orca in the format it was given."
+        )
         self.assertEqual(error, expected_error_msg)
-        self.assertIn('XTB1', line)
+        self.assertIn("XTB1", line)

         # test detection of basis set error (e.g., input contains elements not supported by specified basis)
-        path = os.path.join(self.base_path['orca'], 'orca_basis_error.log')
+        path = os.path.join(self.base_path["orca"], "orca_basis_error.log")
         status, keywords, error, line = trsh.determine_ess_status(
-            output_path=path, species_label='test', job_type='sp', software='orca')
-        self.assertEqual(status, 'errored')
-        self.assertEqual(keywords, ['Basis'])
-        expected_error_msg = 'There was a basis set error in the Orca input file. In particular, basis for atom type ' \
-                             'Br is missing. Please check if specified basis set supports this atom.'
+            output_path=path, species_label="test", job_type="sp", software="orca"
+        )
+        self.assertEqual(status, "errored")
+        self.assertEqual(keywords, ["Basis"])
+        expected_error_msg = (
+            "There was a basis set error in the Orca input file. In particular, basis for atom type "
+            "Br is missing. Please check if specified basis set supports this atom."
+        )
         self.assertEqual(error, expected_error_msg)
-        self.assertIn('There are no CABS', line)
+        self.assertIn("There are no CABS", line)

         # test detection of wavefunction convergence failure
-        path = os.path.join(self.base_path['orca'], 'orca_wavefunction_not_converge_error.log')
+        path = os.path.join(
+            self.base_path["orca"], "orca_wavefunction_not_converge_error.log"
+        )
         status, keywords, error, line = trsh.determine_ess_status(
-            output_path=path, species_label='test', job_type='sp', software='orca')
-        self.assertEqual(status, 'errored')
-        self.assertEqual(keywords, ['Convergence'])
-        expected_error_msg = 'Specified wavefunction method is not converged. Please restart calculation with larger ' \
-                             'max iterations or with different convergence flags.'
+            output_path=path, species_label="test", job_type="sp", software="orca"
+        )
+        self.assertEqual(status, "errored")
+        self.assertEqual(keywords, ["Convergence"])
+        expected_error_msg = (
+            "Specified wavefunction method is not converged. Please restart calculation with larger "
+            "max iterations or with different convergence flags."
+        )
         self.assertEqual(error, expected_error_msg)
-        self.assertIn('This wavefunction IS NOT FULLY CONVERGED!', line)
+        self.assertIn("This wavefunction IS NOT FULLY CONVERGED!", line)

     def test_trsh_ess_job(self):
         """Test the trsh_ess_job() function"""
@@ -343,12 +402,15 @@ def test_trsh_ess_job(self):
                                                                    job_type, software, fine, memory_gb, num_heavy_atoms, cpu_cores, ess_trsh_methods)
         self.assertTrue(couldnt_trsh)
-        self.assertIn("Error: Could not troubleshoot opt for ethanol! Tried troubleshooting with the following methods: ['scf=(NoDIIS)', 'int=(Acc2E=14)', 'checkfile=None', 'scf=(qc)', 'NoSymm', 'scf=(NDamp=30)', 'guess=INDO']; ", output_errors)
+        self.assertIn(
+            "Error: Could not troubleshoot opt for ethanol! Tried troubleshooting with the following methods: ['scf=(NoDIIS)', 'int=(Acc2E=14)', 'checkfile=None', 'scf=(qc)', 'NoSymm', 'scf=(NDamp=30)', 'guess=INDO']; ",
+            output_errors,
+        )

         # Test Q-Chem
-        software = 'qchem'
-        ess_trsh_methods = ['change_node']
-        job_status = {'keywords': ['MaxOptCycles', 'Unconverged']}
+        software = "qchem"
+        ess_trsh_methods = ["change_node"]
+        job_status = {"keywords": ["MaxOptCycles", "Unconverged"]}

         # Q-Chem: test 1
         output_errors, ess_trsh_methods, remove_checkfile, level_of_theory, software, job_type, fine, trsh_keyword, \
             memory, shift, cpu_cores, couldnt_trsh = trsh.trsh_ess_job(label, level_of_theory, server, job_status,
@@ -365,16 +427,18 @@ def test_trsh_ess_job(self):
         self.assertIn('DIIS_GDM', ess_trsh_methods)

         # Test Molpro
-        software = 'molpro'
+        software = "molpro"

         # Molpro: test
-        path = os.path.join(self.base_path['molpro'], 'insufficient_memory.out')
-        label = 'TS'
-        level_of_theory = {'method': 'mrci', 'basis': 'aug-cc-pV(T+d)Z'}
-        server = 'server1'
-        status, keywords, error, line = trsh.determine_ess_status(output_path=path, species_label='TS', job_type='sp')
-        job_status = {'keywords': keywords, 'error': error}
-        job_type = 'sp'
+        path = os.path.join(self.base_path["molpro"], "insufficient_memory.out")
+        label = "TS"
+        level_of_theory = {"method": "mrci", "basis": "aug-cc-pV(T+d)Z"}
+        server = "server1"
+        status, keywords, error, line = trsh.determine_ess_status(
+            output_path=path, species_label="TS", job_type="sp"
+        )
+        job_status = {"keywords": keywords, "error": error}
+        job_type = "sp"
         fine = True
         memory_gb = 32.0
         ess_trsh_methods = ['change_node']
@@ -394,7 +458,7 @@ def test_trsh_ess_job(self):
                                                                    num_heavy_atoms, cpu_cores, ess_trsh_methods)
         self.assertIn('memory', ess_trsh_methods)
         self.assertEqual(memory, 96.0)
-        
+
         # Molpro: Insuffienct Memory 3 Test
         path = os.path.join(self.base_path['molpro'], 'insufficient_memory_3.out')
         status, keywords, error, line = trsh.determine_ess_status(output_path=path,
@@ -634,20 +698,108 @@ def test_scan_quality_check(self):
                                    (2.461024, -0.893605, -0.429469),
                                    (-3.255509, -1.417186, -0.119474))}
         self.assertEqual(actions, {'change conformer': xyz})
+        self.assertEqual(actions, {"change conformer": xyz})

     def test_trsh_scan_job(self):
         """Test troubleshooting problematic 1D rotor scan"""
-        case = {'label': 'CH2OOH',
-                'scan_res': 4.0,
-                'scan': [4, 1, 2, 3],
-                'scan_list': [[4, 1, 2, 3], [1, 2, 3, 6]],
-                'methods': {'freeze': [[5, 1, 2, 3], [2, 1, 4, 5]]},
-                'log_file': os.path.join(ARC_PATH, 'arc', 'testing', 'rotor_scans', 'CH2OOH.out'),
-                }
+        case = {
+            "label": "CH2OOH",
+            "scan_res": 4.0,
+            "scan": [4, 1, 2, 3],
+            "scan_list": [[4, 1, 2, 3], [1, 2, 3, 6]],
+            "methods": {"freeze": [[5, 1, 2, 3], [2, 1, 4, 5]]},
+            "log_file": os.path.join(
+                ARC_PATH, "arc", "testing", "rotor_scans", "CH2OOH.out"
+            ),
+        }
         scan_trsh, scan_res = trsh.trsh_scan_job(**case)
-        self.assertEqual(scan_trsh, 'D 5 4 1 2 F\nD 1 2 3 6 F\nB 2 3 F\n')
+        self.assertEqual(scan_trsh, "D 5 4 1 2 F\nD 1 2 3 6 F\nB 2 3 F\n")
         self.assertEqual(scan_res, 4.0)

-
-if __name__ == '__main__':
+    @patch(
+        "arc.job.trsh.servers",
+        {
+            "test_server": {
+                "cluster_soft": "PBS",
+                "un": "test_user",
+                "queues": {"short_queue": "24:00:00", "middle_queue": "48:00:00", "long_queue": "3600:00:00"},
+            }
+        },
+    )
+    @patch(
+        "arc.job.trsh.execute_command"
+    )
+    def test_user_queue_setting_trsh(self, mock_execute_command):
+        """ Test the trsh_job_queue function with user specified queue """
+        # Mocking the groups and qstat command outputs
+        mock_execute_command.side_effect = [
+            (["users group1"], []),  # Simulates 'groups' command output
+            (
+                ["Queue Memory CPU Time Walltime Node Run Que Lm State"],
+                [],
+            ),  # Simulates 'qstat' command output
+        ]
+
+        # Call the trsh_job_queue function with test data
+        result, success = trsh.trsh_job_queue("test_server", "test_job", 24)
+
+        # Assertions
+        self.assertIn("short_queue", result)
+        self.assertIn("long_queue", result)
+        self.assertTrue(success)
+
+        # Now put in 'short_queue' in attempted_queues
+        result, success = trsh.trsh_job_queue(
+            "test_server", "test_job", 24, attempted_queues=["short_queue"]
+        )
+
+        # Assertions
+        self.assertNotIn("short_queue", result)
+        self.assertIn("long_queue", result)
+        self.assertTrue(success)
+
+        # Now put in 'long_queue' in attempted_queues
+        result, success = trsh.trsh_job_queue(
+            "test_server", "test_job", 24, attempted_queues=["long_queue"]
+        )
+
+        # Assertions
+        self.assertIn("short_queue", result)
+        self.assertNotIn("long_queue", result)
+        self.assertTrue(success)
+
+    @patch('arc.job.trsh.servers', {
+        'test_server': {
+            'cluster_soft': 'PBS',
+            'un': 'test_user',
+            'queue': {},
+        }
+    })
+    @patch('arc.job.trsh.execute_command')
+    def test_query_pbs_trsh_job_queue(self, mock_execute_command):
+        """ Test the trsh_job_queue function with PBS queue """
+        # Setting up the mock responses for execute_command
+        mock_execute_command.side_effect = [
+            (["users group1"], []),
+            (["Queue Memory CPU Time Walltime Node Run Que Lm State",
+              "---------------- ------ -------- -------- ---- ----- ----- ---- -----",
+              "workq              --      --       --       --    0    0  --    D S",
+              "maytal_q           --      --       --       --    7    0  --    E R",
+              # ... add other queue lines as needed
+              ], []),  # Simulates 'qstat -q' command output
+            # Simulate 'qstat -Qf {queue_name}' for each queue
+            (["Queue: maytal_q", "other info", "resources_default.walltime = 48:00:00", "acl_groups = group1"], []),  # For maytal_q
+        ]
+
+        # Call the trsh_job_queue function with test data
+        result, success = trsh.trsh_job_queue("test_server", "test_job", 24, attempted_queues=None)
+
+        # Assertions to verify function behavior
+        self.assertIsNotNone(result)
+        self.assertIn('maytal_q', result.keys())
+        self.assertIn('48:00:00', result.values())
+        self.assertTrue(success)
+
+
+if __name__ == "__main__":
     unittest.main(testRunner=unittest.TextTestRunner(verbosity=2))
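
A note on the queue-selection behaviour exercised by test_user_queue_setting_trsh above: the test only asserts that trsh_job_queue() proposes queues taken from the server's 'queues' mapping while skipping any queue already listed in attempted_queues. The sketch below is a minimal illustration of that filtering rule for readers of this series; it is not the code added to arc/job/trsh.py, and the pick_candidate_queues() helper name is an assumption of this note.

    from typing import Dict, List, Optional

    def pick_candidate_queues(queues: Dict[str, str],
                              attempted_queues: Optional[List[str]] = None,
                              excluded_queues: Optional[List[str]] = None) -> Dict[str, str]:
        """Hypothetical illustration: return queue_name -> max walltime candidates,
        dropping queues that were already attempted or are explicitly excluded."""
        attempted = attempted_queues or []
        excluded = excluded_queues or []
        return {name: walltime for name, walltime in queues.items()
                if name not in attempted and name not in excluded}

    # Mirrors the assertions in test_user_queue_setting_trsh:
    queues = {'short_queue': '24:00:00', 'middle_queue': '48:00:00', 'long_queue': '3600:00:00'}
    candidates = pick_candidate_queues(queues, attempted_queues=['short_queue'])
    assert 'short_queue' not in candidates and 'long_queue' in candidates
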
From 523bd6185487af253d8dd9e764578f73f84fb24e Mon Sep 17 00:00:00 2001
From: Calvin
Date: Mon, 12 Feb 2024 18:41:12 +0200
Subject: [PATCH 7/8] ARC Scheduler Update - ServerTimeLimit

ARC will recognise if the walltime was exceeded

---
 arc/scheduler.py | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/arc/scheduler.py b/arc/scheduler.py
index de71bd95c1..1d1ec71868 100644
--- a/arc/scheduler.py
+++ b/arc/scheduler.py
@@ -751,6 +751,8 @@ def run_job(self,
                 max_job_time: Optional[int] = None,
                 rotor_index: Optional[int] = None,
                 reactions: Optional[List['ARCReaction']] = None,
+                queue: Optional[str] = None,
+                attempted_queues: Optional[list] = None,
                 scan_trsh: Optional[str] = '',
                 shift: Optional[str] = '',
                 trsh: Optional[Union[str, dict, list]] = None,
@@ -848,6 +850,8 @@ def run_job(self,
                          reactions=[reactions] if reactions is not None and not isinstance(reactions, list) else reactions,
                          rotor_index=rotor_index,
                          server_nodes=None,
+                         queue=queue if queue is not None else None,
+                         attempted_queues=attempted_queues if attempted_queues is not None else list(),
                          species=[species] if species is not None and not isinstance(species, list) else species,
                          times_rerun=times_rerun,
                          torsions=torsions,
@@ -963,6 +967,16 @@ def end_job(self, job: 'JobAdapter',
                            f'Was {original_mem} GB, rerunning job with {job.job_memory_gb} GB.')
             job.job_memory_gb = used_mem * 4.5 if used_mem is not None else job.job_memory_gb * 0.5
             self._run_a_job(job=job, label=label)
+        if job.job_status[1]['status'] == 'errored' and job.job_status[1]['keywords'] == ['ServerTimeLimit']:
+            logger.warning(f'Job {job.job_name} errored because of a server time limit. '
+                           f'Rerunning job with {job.max_job_time * 2} hours.')
+            job.max_job_time *= 2
+            run_again = job.troubleshoot_queue()
+            if run_again:
+                self._run_a_job(job=job, label=label)
+            if job_name in self.running_jobs[label]:
+                self.running_jobs[label].pop(self.running_jobs[label].index(job_name))
+            return False

         if not os.path.isfile(job.local_path_to_output_file) and not job.execution_type == 'incore':
             job.rename_output_file()
@@ -1037,6 +1051,8 @@ def _run_a_job(self,
                      max_job_time=job.max_job_time,
                      rotor_index=job.rotor_index,
                      reactions=job.reactions,
+                     queue=job.queue if job.queue is not None else None,
+                     attempted_queues=job.attempted_queues if job.attempted_queues is not None else list(),
                      trsh=job.args['trsh'] if 'trsh' in job.args else {},
                      torsions=job.torsions,
                      times_rerun=job.times_rerun + int(rerun),

From bb545c57621f1d85004bb78063abb873a6d69ff3 Mon Sep 17 00:00:00 2001
From: Calvin
Date: Mon, 12 Feb 2024 18:41:44 +0200
Subject: [PATCH 8/8] ServerTimeLimit Files for Tests

---
 .../calcs/Species/spc1/spc1/err.txt    | 17 +++++++++
 .../calcs/Species/spc1/spc1/input.gjf  | 12 ++++++
 .../calcs/Species/spc1/spc1/submit.sub | 37 +++++++++++++++++++
 3 files changed, 66 insertions(+)
 create mode 100644 arc/testing/test_JobAdapter_ServerTimeLimit/calcs/Species/spc1/spc1/err.txt
 create mode 100644 arc/testing/test_JobAdapter_ServerTimeLimit/calcs/Species/spc1/spc1/input.gjf
 create mode 100644 arc/testing/test_JobAdapter_ServerTimeLimit/calcs/Species/spc1/spc1/submit.sub

diff --git a/arc/testing/test_JobAdapter_ServerTimeLimit/calcs/Species/spc1/spc1/err.txt b/arc/testing/test_JobAdapter_ServerTimeLimit/calcs/Species/spc1/spc1/err.txt
new file mode 100644
index 0000000000..17a55b3536
--- /dev/null
+++ b/arc/testing/test_JobAdapter_ServerTimeLimit/calcs/Species/spc1/spc1/err.txt
@@ -0,0 +1,17 @@
+=>> PBS: job killed: walltime 86415 exceeded limit 86400
+Error: software termination
+  rax fffffffffffffffc, rbx 00007ffc0d4f90d0, rcx ffffffffffffffff
+  rdx 0000000000000000, rsp 00007ffc0d4f9098, rbp 0000000000000001
+  rsi 00007ffc0d4f90d0, rdi 0000000000038f1b, r8 00002b7af22a5700
+  r9 0000000000000000, r10 0000000000000000, r11 0000000000000246
+  r12 00007ffc0d4f90f0, r13 000000000000008f, r14 0000000000000000
+  r15 00007ffc0d4fff40
+Error: software termination
+  rax 0000000000024fa8, rbx 00002ae812e9f2c0, rcx 0000000000035498
+  rdx 00002ae8c4888bd0, rsp 00007ffde70fb680, rbp 00007ffde70fbf70
+  rsi 00002ae8c48be068, rdi 00002ae8c48f3508, r8 00002ae8c49289b0
+  r9 0000000000006a93, r10 0000000000006a95, r11 00002ae812ed4768
+  r12 00002ae812f66508, r13 00002ae812f9b9b0, r14 0000000000006a92
+  r15 00002ae81311f478
+  --- traceback not available
+  --- traceback not available
diff --git a/arc/testing/test_JobAdapter_ServerTimeLimit/calcs/Species/spc1/spc1/input.gjf b/arc/testing/test_JobAdapter_ServerTimeLimit/calcs/Species/spc1/spc1/input.gjf
new file mode 100644
index 0000000000..36f9d855ac
--- /dev/null
+++ b/arc/testing/test_JobAdapter_ServerTimeLimit/calcs/Species/spc1/spc1/input.gjf
@@ -0,0 +1,12 @@
+%chk=check.chk
+%mem=14336mb
+%NProcShared=8
+
+#P opt=(calcfc) cbs-qb3 IOp(2/9=2000)
+
+spc1
+
+0 3
+O       0.00000000    0.00000000    1.00000000
+
+
diff --git a/arc/testing/test_JobAdapter_ServerTimeLimit/calcs/Species/spc1/spc1/submit.sub b/arc/testing/test_JobAdapter_ServerTimeLimit/calcs/Species/spc1/spc1/submit.sub
new file mode 100644
index 0000000000..00b840cd67
--- /dev/null
+++ b/arc/testing/test_JobAdapter_ServerTimeLimit/calcs/Species/spc1/spc1/submit.sub
@@ -0,0 +1,37 @@
+#!/bin/bash -l
+#SBATCH -p normal
+#SBATCH -J server1
+#SBATCH -N 1
+#SBATCH -n 8
+#SBATCH --time=120:00:00
+#SBATCH --mem-per-cpu=15770
+#SBATCH -o out.txt
+#SBATCH -e err.txt
+
+export g16root=/home/gridsan/groups/GRPAPI/Software
+export PATH=$g16root/g16/:$g16root/gv:$PATH
+which g16
+
+echo "============================================================"
+echo "Job ID : $SLURM_JOB_ID"
+echo "Job Name : $SLURM_JOB_NAME"
+echo "Starting on : $(date)"
+echo "Running on node : $SLURMD_NODENAME"
+echo "Current directory : $(pwd)"
+echo "============================================================"
+
+touch initial_time
+
+GAUSS_SCRDIR=/state/partition1/user//$SLURM_JOB_NAME-$SLURM_JOB_ID
+export $GAUSS_SCRDIR
+. $g16root/g16/bsd/g16.profile
+
+mkdir -p $GAUSS_SCRDIR
+
+g16 < input.gjf > input.log
+
+rm -rf $GAUSS_SCRDIR
+
+touch final_time
+
+
\ No newline at end of file
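
A closing note on the ServerTimeLimit flow these last two patches set up: the err.txt fixture begins with the PBS message '=>> PBS: job killed: walltime 86415 exceeded limit 86400', and end_job() now doubles max_job_time, calls job.troubleshoot_queue(), and reruns the job when the job status carries the ['ServerTimeLimit'] keyword. The sketch below shows one way such a keyword could be derived from an error file; it is an illustration only, not ARC's detection code, and the detect_server_time_limit() name and the regex are assumptions of this note.

    import os
    import re
    from typing import List

    def detect_server_time_limit(err_path: str) -> List[str]:
        """Hypothetical check: return ['ServerTimeLimit'] if the scheduler reports that
        the job was killed for exceeding its walltime, else return an empty list."""
        if not os.path.isfile(err_path):
            return []
        with open(err_path, 'r') as f:
            content = f.read()
        # Matches, e.g., "=>> PBS: job killed: walltime 86415 exceeded limit 86400"
        if re.search(r'walltime\s+\d+\s+exceeded\s+limit\s+\d+', content):
            return ['ServerTimeLimit']
        return []

    # With the err.txt fixture from PATCH 8/8 this returns ['ServerTimeLimit'],
    # the keyword end_job() checks before doubling max_job_time and requeueing.
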