Better job name for qstat monitoring #73

Status: Open. Wants to merge 3 commits into base: master.
6 changes: 3 additions & 3 deletions bin/submit_to_cluster
@@ -29,7 +29,7 @@ if args.project is None:
try:
proj_name = environ['MINDLABPROJ']
if proj_name == 'NA':
-raise KeyError('Force an error')
+raise KeyError('MINDLABPROJ not defined')
except KeyError:
msg = ('You must specify a project name either by means of the '
'--project flag or by setting the MINDLABPROJ environment '
@@ -40,8 +40,8 @@ else:
# basic check of the memory-argument
if args.total_memory is not None:
if (not args.total_memory.startswith(
-tuple('%d' % ii for ii in range(1, 10))) or
-not args.total_memory.upper().endswith(('M', 'G', 'T'))):
+tuple('%d' % ii for ii in range(1, 10))) or not
+args.total_memory.upper().endswith(('M', 'G', 'T'))):
raise ValueError('Bad memory format: {:s}'.format(args.total_memory))

job = ClusterJob(args.exec_cmd, proj_name=proj_name, n_threads=args.n_threads,
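The reflowed condition above only moves the trailing "not"; the accepted format is unchanged. As a hypothetical illustration (not part of the diff), the check accepts strings that start with a non-zero digit and end in M, G or T:

    # Hypothetical helper mirroring the total_memory format check above:
    # the spec must start with a non-zero digit and end in M, G or T.
    def valid_total_memory(spec):
        return (spec.startswith(tuple('%d' % ii for ii in range(1, 10))) and
                spec.upper().endswith(('M', 'G', 'T')))

    for spec in ('4G', '512m', '2T', '0G', '4GB'):
        print(spec, valid_total_memory(spec))
    # 4G True, 512m True, 2T True, 0G False, 4GB False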
2 changes: 1 addition & 1 deletion stormdb/__init__.py
@@ -2,4 +2,4 @@
STORM database access and cluster processing tools.
"""

-__version__ = '0.7.dev0'
+__version__ = '0.8'
30 changes: 15 additions & 15 deletions stormdb/access.py
@@ -106,8 +106,8 @@ def _get_login_code(self):
try:
with open(os.path.expanduser(self._stormdblogin), 'r') as fid:
if self._verbose:
-print('Reading login credentials from ' +
-self._stormdblogin)
+print('Reading login credentials from {:s}'.format(
+self._stormdblogin))
self._login_code = fid.readline()
except IOError:
print('Login credentials not found, please enter them here')
@@ -193,7 +193,7 @@ def _send_request(self, url, verbose=None):

try:
req = requests.get(full_url)
-except:
+except ConnectionError:
print('hyades00 is not responding, it may be down.')
print('Contact a system administrator for confirmation.')
raise
@@ -240,12 +240,12 @@ def get_subjects(self,
raise ValueError(
'You can only specify a modality OR a series, not both.')
type_err = '{} must be a string, not {}.'
-if (has_modality is not None
-and not isinstance(has_modality, string_types)):
+if (has_modality is not None and not
+isinstance(has_modality, string_types)):
raise ValueError(
type_err.format('has_modality', type(has_modality)))
-if (has_series is not None
-and not isinstance(has_series, string_types)):
+if (has_series is not None and not
+isinstance(has_series, string_types)):
raise ValueError(type_err.format('has_series', type(has_series)))

# using 'subjecs' here would return only numeric ID, not code
@@ -278,8 +278,8 @@ def get_subjects(self,
subj_list = [
ser['subjectcode'] for ser in all_series
if ser['subjectcode'] not in used and (
-used.append(ser['subjectcode']) or True)
-and ser['subjectcode'] in subj_list
+used.append(ser['subjectcode']) or True) and (
+ser['subjectcode'] in subj_list)
]
# The following also works, but is slower?
# pop_inds = []
@@ -297,8 +297,8 @@
subj_list = [
ser['subjectcode'] for ser in all_series
if ser['subjectcode'] not in used and (
-used.append(ser['subjectcode']) or True)
-and ser['subjectcode'] in subj_list
+used.append(ser['subjectcode']) or True) and (
+ser['subjectcode'] in subj_list)
]

return (subj_list)
@@ -618,7 +618,7 @@ def filter_series(self,
format(study_metas['name'],
study_metas['comparison'],
study_metas['value'])
-except:
+except KeyError:
print('Problem with study_metas:')
print(study_metas)
raise
@@ -653,7 +653,7 @@ def filter_series(self,
key_val_pair[1].sort(key=lambda x: os.path.splitext(x)[0])

elif 'path' in key_val_pair[0]:
-m = re.search('\d{3}\.(.+?)/files', key_val_pair[1])
+m = re.search(r'\d{3}\.(.+?)/files', key_val_pair[1])
info.append(['seriename', m.group(1)])
info.append(key_val_pair)
info_dict = {key: value for (key, value) in info}
@@ -664,8 +664,8 @@ def filter_series(self,
study_date_range = [study_date_range, study_date_range]
info_dict_list = [
s for s in info_dict_list
-if s['study'][:8] >= study_date_range[0]
-and s['study'][:8] <= study_date_range[1]
+if s['study'][:8] >= study_date_range[0] and (
+s['study'][:8] <= study_date_range[1])
]
return (info_dict_list)

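The rewrapped comprehensions in get_subjects() above keep the same "used.append(...) or True" idiom for order-preserving de-duplication; only the line breaks and parentheses move. A standalone sketch of the idiom, using made-up subject codes:

    # list.append() returns None, so "or True" keeps the condition truthy
    # while recording the code as seen; the codes below are made up.
    codes = ['0001_ABC', '0002_DEF', '0001_ABC', '0003_GHI', '0002_DEF']
    seen = []
    unique = [c for c in codes if c not in seen and (seen.append(c) or True)]
    print(unique)  # ['0001_ABC', '0002_DEF', '0003_GHI']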
38 changes: 20 additions & 18 deletions stormdb/cluster.py
@@ -14,7 +14,7 @@
import re
import math
from six import string_types
-from os.path import expanduser
+from os.path import expanduser, basename
from .access import Query
from .base import enforce_path_exists

@@ -103,19 +103,19 @@ def get_memlimit_per_process(self, queue):
if queue not in self.queues:
raise ValueError('Unknown queue: {:s}'.format(queue))

-lim = self._query('qconf -sq ' + queue +
-'| grep h_vmem | awk {\'print $2\'}')[0]
+lim = self._query('qconf -sq ' + queue + (
+'| grep h_vmem | awk {\'print $2\'}'))[0]

-_, lim_int, lim_units = re.split('(\d+)', lim)
+_, lim_int, lim_units = re.split(r'(\d+)', lim)
assert isinstance(int(lim_int), int)
assert isinstance(lim_units, string_types)

return (lim)

def _check_parallel_env(self, queue, pe_name):
"""Check that a PE is in the pe_list for a given queue"""
-pes = self._query('qconf -sq ' + queue +
-'| grep pe_list')[0] # just one line
+pes = self._query('qconf -sq ' + queue + (
+'| grep pe_list'))[0] # just one line
pe_list = pes.split()[1:]
if pe_name not in pe_list:
raise ValueError('Queue \'{0}\' does not support the \'{1}\' '
@@ -238,14 +238,14 @@ def __init__(self,
'Maximum number of parallel threads is one (1) when total '
'memory consumption is specified.')
# XXX would be nice with some sanity checking here...
-_, totmem, totmem_unit = re.split('(\d+)', self.total_memory)
-_, memlim, memlim_unit = re.split('(\d+)', h_vmem)
+_, totmem, totmem_unit = re.split(r'(\d+)', self.total_memory)
+_, memlim, memlim_unit = re.split(r'(\d+)', h_vmem)

if totmem_unit != memlim_unit:
units = dict(k=1e3, m=1e6, g=1e9, t=1e12)
try:
ratio = units[totmem_unit.lower()] /\
-units[memlim_unit.lower()]
+units[memlim_unit.lower()]
except KeyError:
raise ValueError('Something is wrong with the memory units'
', likely {:s}'.format(self.total_memory))
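As a side note, a small sketch (with made-up values) of what the raw-string re.split calls in this hunk return, and how the unit ratio is then formed:

    import re

    _, totmem, totmem_unit = re.split(r'(\d+)', '12G')   # -> '', '12', 'G'
    _, memlim, memlim_unit = re.split(r'(\d+)', '8G')    # -> '', '8', 'G'

    # unit conversion as in the diff: both units looked up in the same table
    units = dict(k=1e3, m=1e6, g=1e9, t=1e12)
    ratio = units[totmem_unit.lower()] / units[memlim_unit.lower()]
    print(totmem, totmem_unit, ratio)  # 12 G 1.0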
@@ -260,7 +260,9 @@
opt_threaded_flag = "#$ -pe threaded {:d}".format(self.n_threads)

if job_name is None:
-job_name = 'py-wrapper'
+# self._cmd may contain multiple \n-separated commands: take the 1st,
+# then keep only the basename and drop any arguments that follow
+job_name = basename(self._cmd.split('\n')[0]).split(' ')[0]
log_name_prefix = job_name

if working_dir is not None and isinstance(working_dir, string_types):
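This hunk is the change behind the PR title: instead of the fixed 'py-wrapper' label, the job name is derived from the first submitted command, so qstat shows which program is actually running. A minimal sketch of the same derivation, using a made-up command string:

    from os.path import basename

    # Hypothetical multi-line command, as ClusterJob might receive it
    cmd = '/usr/local/bin/maxfilter -f raw.fif -o raw_sss.fif\necho done'

    # first command line -> basename -> first whitespace-separated token
    job_name = basename(cmd.split('\n')[0]).split(' ')[0]
    print(job_name)  # 'maxfilter'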
@@ -308,9 +310,9 @@ def _initialise_cmd(self, value):
def _create_qsub_script(self, job_name, cwd_flag, opt_threaded_flag,
opt_h_vmem_flag, log_name_prefix):
"""All variables should be defined"""
-if (self.cmd is None or self.queue is None or job_name is None
-or cwd_flag is None or opt_threaded_flag is None
-or opt_h_vmem_flag is None):
+if (self.cmd is None or self.queue is None or job_name is None or
+cwd_flag is None or opt_threaded_flag is None or
+opt_h_vmem_flag is None):
raise ValueError('This should not happen, please report an Issue!')

self._qsub_script =\
Expand Down Expand Up @@ -365,7 +367,7 @@ def submit(self, fake=False, sh_file='~/submit_job.sh'):
else:
# py2-3 safety
output = output.decode('ascii', 'ignore').rstrip()
-m = re.search('(\d+)', output)
+m = re.search(r'(\d+)', output)
self._jobid = m.group(1)
if self._cleanup_qsub_job:
self._delete_qsub_job()
@@ -385,8 +387,8 @@ def _check_status(self):
' | awk \'{print $5, $8}\'')[0] # ONLY

if len(output) == 0:
-if (self._submitted and not self._running and not self._completed
-and not self._waiting):
+if (self._submitted and not self._running and not
+self._completed and not self._waiting):
self._status_msg = ('Submission failed, see log for'
' output errors!')
elif self._submitted and not self._completed:
@@ -483,8 +485,8 @@ def verbose(self, value):
def kill(self, jobid=None):
"""Kill (delete) all the jobs in the batch."""
for job in self._joblist:
-if (jobid is None
-or (jobid is not None and int(job._jobid) == int(jobid))):
+if (jobid is None or (
+jobid is not None and int(job._jobid) == int(jobid))):
job.kill()

def build_cmd(self):