Skip to content

Commit

Permalink
Queue change - PBS (ReactionMechanismGenerator#719)
Browse files Browse the repository at this point in the history
A new troubleshoot function that recognises, on PBS, when a job
failed due to exceeding its walltime. If this is the case, it first
checks whether the user has provided multiple queue names and
walltimes to use. If so, it checks whether the walltimes of these
queues are sufficient. If not, it queries PBS to determine whether
there are queues available that provide the user with sufficient time.
  • Loading branch information
alongd authored Feb 14, 2024
2 parents ca95070 + bb545c5 commit 1f561ed
Show file tree
Hide file tree
Showing 31 changed files with 1,033 additions and 206 deletions.
12 changes: 12 additions & 0 deletions arc/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -1705,3 +1705,15 @@ def is_xyz_mol_match(mol: 'Molecule',
if element not in element_dict_xyz or element_dict_xyz[element] != count:
return False
return True

def convert_to_hours(time_str: str) -> float:
    """Convert a walltime string in ``HH:MM:SS`` format to hours.

    Args:
        time_str (str): A time string in format HH:MM:SS.

    Returns:
        float: The time in hours.
    """
    hours, minutes, seconds = (int(part) for part in time_str.split(':'))
    return hours + minutes / 60 + seconds / 3600
10 changes: 10 additions & 0 deletions arc/common_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1369,6 +1369,16 @@ def test_is_xyz_mol_match(self):
'symbols': ('C', 'C', 'C', 'C', 'H', 'H', 'H', 'H', 'H', 'H', 'H', 'H', 'H', 'H')}
self.assertFalse(common.is_xyz_mol_match(mol1, xyz2))
self.assertTrue(common.is_xyz_mol_match(mol2, xyz2))

def test_convert_to_hours(self):
    """Test the convert_to_hours() function"""
    self.assertEqual(common.convert_to_hours('0:00:00'), 0.0)
    self.assertEqual(common.convert_to_hours('3600:00:00'), 3600.0)
    self.assertAlmostEqual(common.convert_to_hours('190:40:10'), 190.66944444444442)


@classmethod
def tearDownClass(cls):
Expand Down
85 changes: 69 additions & 16 deletions arc/job/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import numpy as np
import pandas as pd

from arc.common import ARC_PATH, get_logger, read_yaml_file, save_yaml_file, torsions_to_scans
from arc.common import ARC_PATH, get_logger, read_yaml_file, save_yaml_file, torsions_to_scans, convert_to_hours
from arc.exceptions import JobError
from arc.imports import local_arc_path, pipe_submit, settings, submit_scripts
from arc.job.local import (change_mode,
Expand All @@ -33,7 +33,7 @@
rename_output,
submit_job,
)
from arc.job.trsh import trsh_job_on_server
from arc.job.trsh import trsh_job_on_server, trsh_job_queue
from arc.job.ssh import SSHClient
from arc.job.trsh import determine_ess_status
from arc.species.converter import xyz_to_str
Expand Down Expand Up @@ -493,21 +493,36 @@ def write_submit_script(self) -> None:
# If your server has different node architectures, implement something similar here.
architecture = '\n#$ -l harpertown' if self.cpu_cores <= 8 else '\n#$ -l magnycours'

# Extract the 'queue' dictionary from servers[self.server], defaulting to an empty dictionary if 'queue' is not a key
default_queue, _ = next(iter(servers[self.server].get('queues', {}).items()), (None, None))
if default_queue and default_queue not in self.attempted_queues:
self.attempted_queues.append(default_queue)

submit_script = submit_scripts[self.server][self.job_adapter] if self.workers is None \
else pipe_submit[self.server]

queue = self.queue if self.queue is not None else default_queue

format_params = {
"name": self.job_server_name,
"un": servers[self.server]['un'],
"queue": queue,
"t_max": self.format_max_job_time(time_format=t_max_format[servers[self.server]['cluster_soft']]),
"memory": int(self.submit_script_memory) if isinstance(self.submit_script_memory, (int, float)) else self.submit_script_memory,
"cpus": self.cpu_cores,
"architecture": architecture,
"max_task_num": self.workers,
"arc_path": ARC_PATH,
"hdf5_path": os.path.join(self.remote_path, 'data.hdf5'),
"pwd": self.remote_path if self.server.lower() != 'local' else self.local_path,
}

if queue is None:
logger.warning(f'Queue not defined for server {self.server}. Assuming the queue name is defined in your submit.py script.')
del format_params['queue']

try:
submit_script = submit_script.format(
name=self.job_server_name,
un=servers[self.server]['un'],
t_max=self.format_max_job_time(time_format=t_max_format[servers[self.server]['cluster_soft']]),
memory=int(self.submit_script_memory) if (isinstance(self.submit_script_memory, int) or isinstance(self.submit_script_memory, float)) else self.submit_script_memory,
cpus=self.cpu_cores,
architecture=architecture,
max_task_num=self.workers,
arc_path=ARC_PATH,
hdf5_path=os.path.join(self.remote_path, 'data.hdf5'),
pwd=self.remote_path if self.server.lower() != 'local' else self.local_path,
)
submit_script = submit_script.format(**format_params)
except KeyError:
if self.workers is None:
submit_scripts_for_printing = {server: [software for software in values.keys()]
Expand Down Expand Up @@ -872,6 +887,7 @@ def determine_job_status(self):
"""
Determine the Job's status. Updates self.job_status.
"""
cluster_soft = servers[self.server]['cluster_soft'].lower()
if self.job_status[0] == 'errored':
return
self.job_status[0] = self._check_job_server_status() if self.execution_type != 'incore' else 'done'
Expand All @@ -898,6 +914,20 @@ def determine_job_status(self):
'time limit.'
self.job_status[1]['line'] = line
break
elif 'job killed' in line and 'exceeded limit' in line and cluster_soft == 'pbs':
# =>> PBS: job killed: walltime 10837 exceeded limit 10800
logger.warning(f'Looks like the job was killed on {self.server} due to time limit. '
f'Got: {line}')
time_limit = int(line.split('limit')[1].split()[0])/3600
new_max_job_time = self.max_job_time - time_limit if self.max_job_time > time_limit else 1
logger.warning(f'Setting max job time to {new_max_job_time} (was {time_limit})')
self.max_job_time = new_max_job_time
self.job_status[1]['status'] = 'errored'
self.job_status[1]['keywords'] = ['ServerTimeLimit']
self.job_status[1]['error'] = 'Job killed by the server since it reached the maximal ' \
'time limit.'
self.job_status[1]['line'] = line
break
elif self.job_status[0] == 'running':
self.job_status[1]['status'] = 'running'

Expand All @@ -913,7 +943,7 @@ def _get_additional_job_info(self):
local_file_path_1 = os.path.join(self.local_path, 'out.txt')
local_file_path_2 = os.path.join(self.local_path, 'err.txt')
local_file_path_3 = os.path.join(self.local_path, 'job.log')
if self.server != 'local' and self.remote_path is not None:
if self.server != 'local' and self.remote_path is not None and not self.testing:
remote_file_path_1 = os.path.join(self.remote_path, 'out.txt')
remote_file_path_2 = os.path.join(self.remote_path, 'err.txt')
remote_file_path_3 = os.path.join(self.remote_path, 'job.log')
Expand Down Expand Up @@ -948,7 +978,7 @@ def _check_job_server_status(self) -> str:
"""
Possible statuses: ``initializing``, ``running``, ``errored on node xx``, ``done``.
"""
if self.server != 'local':
if self.server != 'local' and not self.testing:
with SSHClient(self.server) as ssh:
return ssh.check_job_status(self.job_id)
else:
Expand Down Expand Up @@ -1317,6 +1347,29 @@ def troubleshoot_server(self):
# resubmit job
self.execute()

def troubleshoot_queue(self) -> bool:
    """Troubleshoot queue errors.

    Queries the server (via ``trsh_job_queue``) for queues that were not yet
    attempted and, if any are available, switches this job to the one with the
    shortest walltime and records it as attempted.

    Returns:
        bool: Whether to run the job again.
    """
    queues, run_job = trsh_job_queue(job_name=self.job_name,
                                     server=self.server,
                                     max_time=24,
                                     attempted_queues=self.attempted_queues,
                                     )
    # Guard against an empty dict as well as None: indexing the sorted list
    # below would raise an IndexError for an empty ``queues``.
    if queues:
        # Pick the candidate queue with the shortest walltime.
        # NOTE(review): queues are not filtered by self.max_job_time here —
        # presumably trsh_job_queue() already returns only suitable queues; confirm.
        sorted_queues = sorted(queues.items(), key=lambda entry: convert_to_hours(entry[1]))
        self.queue = sorted_queues[0][0]
        if self.queue not in self.attempted_queues:
            self.attempted_queues.append(self.queue)
    return run_job

def save_output_file(self,
key: Optional[str] = None,
val: Optional[Union[float, dict, np.ndarray]] = None,
Expand Down
56 changes: 56 additions & 0 deletions arc/job/adapter_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import time
import shutil
import unittest
from unittest.mock import patch

import pandas as pd

Expand Down Expand Up @@ -193,6 +194,33 @@ def setUpClass(cls):
species=[ARCSpecies(label='spc1', xyz=['O 0 0 1'])],
testing=True,
)
cls.job_5 = GaussianAdapter(execution_type='queue',
job_name='spc1',
job_type='opt',
job_id='123456',
job_num=101,
job_server_name = 'server3',
level=Level(method='cbs-qb3'),
project='test',
project_directory=os.path.join(ARC_PATH, 'arc', 'testing', 'test_JobAdapter_ServerTimeLimit'),
species=[ARCSpecies(label='spc1', xyz=['O 0 0 1'])],
server='server3',
testing=True,
)
cls.job_6 = GaussianAdapter(execution_type='queue',
job_name='spc1',
job_type='opt',
job_id='123456',
job_num=101,
job_server_name = 'server1',
level=Level(method='cbs-qb3'),
project='test',
project_directory=os.path.join(ARC_PATH, 'arc', 'testing', 'test_JobAdapter_ServerTimeLimit'),
species=[ARCSpecies(label='spc1', xyz=['O 0 0 1'])],
testing=True,
queue='short_queue',
attempted_queues=['short_queue']
)

def test_determine_job_array_parameters(self):
"""Test determining job array parameters"""
Expand Down Expand Up @@ -404,6 +432,34 @@ def test_get_file_property_dictionary(self):
'source': 'input_files',
'make_x': True})

def test_determine_job_status(self):
    """Test determining the job status"""
    self.job_5.determine_job_status()
    status = self.job_5.job_status
    self.assertEqual(status[0], 'done')
    self.assertEqual(status[1]['status'], 'errored')
    self.assertEqual(status[1]['keywords'], ['ServerTimeLimit'])

@patch('arc.job.trsh.servers',
       {'local': {'cluster_soft': 'PBS',
                  'un': 'test_user',
                  'queues': {'short_queue': '24:00:0',
                             'middle_queue': '48:00:00',
                             'long_queue': '3600:00:00'},
                  },
        })
def test_troubleshoot_queue(self):
    """Test troubleshooting a queue job"""
    self.job_6.troubleshoot_queue()
    self.assertEqual(self.job_6.queue, 'middle_queue')
    # 'short_queue' was already attempted and 'middle_queue' is the new pick.
    # Membership checks (not equality) are used because a user's settings.py
    # may define additional queues at class setup time.
    self.assertIn('short_queue', self.job_6.attempted_queues)
    self.assertIn('middle_queue', self.job_6.attempted_queues)



@classmethod
def tearDownClass(cls):
"""
Expand Down
4 changes: 4 additions & 0 deletions arc/job/adapters/cfour.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ def __init__(self,
rotor_index: Optional[int] = None,
server: Optional[str] = None,
server_nodes: Optional[list] = None,
queue: Optional[str] = None,
attempted_queues: Optional[List[str]] = None,
species: Optional[List['ARCSpecies']] = None,
testing: bool = False,
times_rerun: int = 0,
Expand Down Expand Up @@ -177,6 +179,8 @@ def __init__(self,
rotor_index=rotor_index,
server=server,
server_nodes=server_nodes,
queue=queue,
attempted_queues=attempted_queues,
species=species,
testing=testing,
times_rerun=times_rerun,
Expand Down
6 changes: 5 additions & 1 deletion arc/job/adapters/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ def _initialize_adapter(obj: 'JobAdapter',
rotor_index: Optional[int] = None,
server: Optional[str] = None,
server_nodes: Optional[list] = None,
queue: Optional[str] = None,
attempted_queues: Optional[List[str]] = None,
species: Optional[List['ARCSpecies']] = None,
testing: bool = False,
times_rerun: int = 0,
Expand Down Expand Up @@ -173,6 +175,8 @@ def _initialize_adapter(obj: 'JobAdapter',
obj.rotor_index = rotor_index
obj.run_time = None
obj.server = server
obj.queue = queue
obj.attempted_queues = attempted_queues or list()
obj.server_nodes = server_nodes or list()
obj.species = [species] if species is not None and not isinstance(species, list) else species
obj.submit_script_memory = None
Expand Down Expand Up @@ -225,7 +229,7 @@ def _initialize_adapter(obj: 'JobAdapter',
obj.species_label = None

obj.args = set_job_args(args=obj.args, level=obj.level, job_name=obj.job_name)
if obj.execution_type != 'incore' and obj.job_adapter in obj.ess_settings.keys():
if obj.execution_type != 'incore' and obj.job_adapter in obj.ess_settings.keys() and obj.server is None:
if 'server' in obj.args['trsh']:
obj.server = obj.args['trsh']['server']
elif obj.job_adapter in obj.ess_settings.keys():
Expand Down
4 changes: 4 additions & 0 deletions arc/job/adapters/gaussian.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ def __init__(self,
rotor_index: Optional[int] = None,
server: Optional[str] = None,
server_nodes: Optional[list] = None,
queue: Optional[str] = None,
attempted_queues: Optional[List[str]] = None,
species: Optional[List['ARCSpecies']] = None,
testing: bool = False,
times_rerun: int = 0,
Expand Down Expand Up @@ -191,6 +193,8 @@ def __init__(self,
rotor_index=rotor_index,
server=server,
server_nodes=server_nodes,
queue=queue,
attempted_queues=attempted_queues,
species=species,
testing=testing,
times_rerun=times_rerun,
Expand Down
4 changes: 4 additions & 0 deletions arc/job/adapters/molpro.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ def __init__(self,
rotor_index: Optional[int] = None,
server: Optional[str] = None,
server_nodes: Optional[list] = None,
queue: Optional[str] = None,
attempted_queues: Optional[List[str]] = None,
species: Optional[List['ARCSpecies']] = None,
testing: bool = False,
times_rerun: int = 0,
Expand Down Expand Up @@ -190,6 +192,8 @@ def __init__(self,
rotor_index=rotor_index,
server=server,
server_nodes=server_nodes,
queue=queue,
attempted_queues=attempted_queues,
species=species,
testing=testing,
times_rerun=times_rerun,
Expand Down
4 changes: 4 additions & 0 deletions arc/job/adapters/obabel.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ def __init__(self,
rotor_index: Optional[int] = None,
server: Optional[str] = None,
server_nodes: Optional[list] = None,
queue: Optional[str] = None,
attempted_queues: Optional[List[str]] = None,
species: Optional[List['ARCSpecies']] = None,
testing: bool = False,
times_rerun: int = 0,
Expand Down Expand Up @@ -162,6 +164,8 @@ def __init__(self,
rotor_index=rotor_index,
server=server,
server_nodes=server_nodes,
queue=queue,
attempted_queues=attempted_queues,
species=species,
testing=testing,
times_rerun=times_rerun,
Expand Down
4 changes: 4 additions & 0 deletions arc/job/adapters/orca.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ def __init__(self,
rotor_index: Optional[int] = None,
server: Optional[str] = None,
server_nodes: Optional[list] = None,
queue: Optional[str] = None,
attempted_queues: Optional[List[str]] = None,
species: Optional[List['ARCSpecies']] = None,
testing: bool = False,
times_rerun: int = 0,
Expand Down Expand Up @@ -195,6 +197,8 @@ def __init__(self,
rotor_index=rotor_index,
server=server,
server_nodes=server_nodes,
queue=queue,
attempted_queues=attempted_queues,
species=species,
testing=testing,
times_rerun=times_rerun,
Expand Down
4 changes: 4 additions & 0 deletions arc/job/adapters/psi_4.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ def __init__(self,
rotor_index: Optional[int] = None,
server: Optional[str] = None,
server_nodes: Optional[list] = None,
queue: Optional[str] = None,
attempted_queues: Optional[List[str]] = None,
species: Optional[List['ARCSpecies']] = None,
testing: bool = False,
times_rerun: int = 0,
Expand Down Expand Up @@ -242,6 +244,8 @@ def __init__(self,
self.rotor_index = rotor_index
self.server = server
self.server_nodes = server_nodes or list()
self.queue = queue
self.attempted_queues = attempted_queues or list()
self.species = [species] if species is not None and not isinstance(species, list) else species
self.testing = testing
self.torsions = [torsions] if torsions is not None and not isinstance(torsions[0], list) else torsions
Expand Down
Loading

0 comments on commit 1f561ed

Please sign in to comment.