Skip to content

Commit

Permalink
Merge pull request #31 from swainn/master
Browse files Browse the repository at this point in the history
Workflow Node Status
  • Loading branch information
swainn authored Sep 28, 2018
2 parents e4bffbb + 36cbf2c commit ebf9c8e
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 9 deletions.
12 changes: 9 additions & 3 deletions condorpy/htcondor_object_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class HTCondorObjectBase(object):
"""
"""
NULL_CLUSTER_ID = 0

def __init__(self,
host=None,
Expand All @@ -36,7 +37,7 @@ def __init__(self,
"""
object.__setattr__(self, '_cluster_id', 0)
object.__setattr__(self, '_cluster_id', self.NULL_CLUSTER_ID)
object.__setattr__(self, '_remote', None)
object.__setattr__(self, '_remote_input_files', remote_input_files or None)
object.__setattr__(self, '_cwd', working_directory)
Expand All @@ -53,6 +54,10 @@ def cluster_id(self):
"""
return self._cluster_id

@property
def num_jobs(self):
return 1

@property
def scheduler(self):
"""
Expand Down Expand Up @@ -176,14 +181,15 @@ def close_remote(self):
del self._remote

@set_cwd
def _execute(self, args, shell=False):
def _execute(self, args, shell=False, run_in_job_dir=True):
out = None
err = None
if self._remote:
log.info('Executing remote command %s', ' '.join(args))
cmd = ' '.join(args)
try:
cmd = 'cd %s && %s' % (self._remote_id, cmd)
if run_in_job_dir:
cmd = 'cd %s && %s' % (self._remote_id, cmd)
out = '\n'.join(self._remote.execute(cmd))
except RuntimeError as e:
err = str(e)
Expand Down
18 changes: 14 additions & 4 deletions condorpy/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,14 @@ def num_jobs(self, num_jobs):

@property
def status(self):
"""The job status
"""The status
"""
if self.cluster_id == self.NULL_CLUSTER_ID:
return "Unexpanded"

status_dict = self.statuses
#determin job status
# determine job status
status = "Various"
for key, val in status_dict.iteritems():
if val == self.num_jobs:
Expand All @@ -161,6 +164,9 @@ def statuses(self):
"""
Return dictionary of all process statuses
"""
if self.cluster_id == self.NULL_CLUSTER_ID:
return "Unexpanded"

return self._update_status()

@property
Expand Down Expand Up @@ -352,7 +358,7 @@ def _update_status(self, sub_job_num=None):
format = ['-format', '"%d"', 'JobStatus']
cmd = 'condor_q {0} {1} && condor_history {0} {1}'.format(job_id, ' '.join(format))
args = [cmd]
out, err = self._execute(args, shell=True)
out, err = self._execute(args, shell=True, run_in_job_dir=False)
if err:
log.error('Error while updating status for job %s: %s', job_id, err)
raise HTCondorError(err)
Expand All @@ -377,7 +383,11 @@ def _update_status(self, sub_job_num=None):
status_dict[val] = 0

for status_code_str in out:
status_code = int(status_code_str)
status_code = 0
try:
status_code = int(status_code_str)
except ValueError:
pass
key = CONDOR_JOB_STATUSES[status_code]
status_dict[key] += 1

Expand Down
116 changes: 114 additions & 2 deletions condorpy/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ def name(self):
"""
return self._name

@property
def num_jobs(self):
return len(self._node_set)

@property
def config(self):
"""
Expand Down Expand Up @@ -102,6 +106,8 @@ def add_max_jobs_throttle(self, category, max_jobs):
def node_set(self):
"""
"""
if self.cluster_id != self.NULL_CLUSTER_ID:
self.update_node_ids()
return self._node_set

@property
Expand All @@ -113,14 +119,29 @@ def dag_file(self):
@property
def initial_dir(self):
"""
"""
return ''

@property
def status(self):
"""
Returns status of workflow as a whole (DAG status).
"""
if self.cluster_id == self.NULL_CLUSTER_ID:
return "Unexpanded"

return self._update_status()

@property
def statuses(self):
"""
Get status of workflow nodes.
"""
if self.cluster_id == self.NULL_CLUSTER_ID:
return "Unexpanded"

return self._update_statuses()

def _update_status(self, sub_job_num=None):
"""Gets the workflow status.
Expand All @@ -132,7 +153,7 @@ def _update_status(self, sub_job_num=None):
format = ['-format', '"%d"', 'JobStatus']
cmd = 'condor_q {0} {1} && condor_history {0} {1}'.format(job_id, ' '.join(format))
args = [cmd]
out, err = self._execute(args, shell=True)
out, err = self._execute(args, shell=True, run_in_job_dir=False)
if err:
log.error('Error while updating status for job %s: %s', job_id, err)
raise HTCondorError(err)
Expand All @@ -156,6 +177,97 @@ def _update_status(self, sub_job_num=None):

return key

def _update_statuses(self, sub_job_num=None):
"""
Update statuses of jobs nodes in workflow.
"""
# initialize status dictionary
status_dict = dict()

for val in CONDOR_JOB_STATUSES.itervalues():
status_dict[val] = 0

for node in self.node_set:
job = node.job
try:
job_status = job.status
status_dict[job_status] += 1
except (KeyError, HTCondorError):
status_dict['Unexpanded'] += 1

return status_dict

def update_node_ids(self, sub_job_num=None):
"""
Associate Jobs with respective cluster ids.
"""
# Build condor_q and condor_history commands
dag_id = '%s.%s' % (self.cluster_id, sub_job_num) if sub_job_num else str(self.cluster_id)
job_delimiter = '+++'
attr_delimiter = ';;;'

format = [
'-format', '"%d' + attr_delimiter + '"', 'ClusterId',
'-format', '"%v' + attr_delimiter + '"', 'Cmd',
'-format', '"%v' + attr_delimiter + '"', 'Args', # Old way
'-format', '"%v' + job_delimiter + '"', 'Arguments' # New way
]
# Get ID, Executable, and Arguments for each job that is either started to be processed or finished in the workflow
cmd = 'condor_q -constraint DAGManJobID=={0} {1} && condor_history -constraint DAGManJobID=={0} {1}'.format(dag_id, ' '.join(format))

# 'condor_q -constraint DAGManJobID==1018 -format "%d\n" ClusterId -format "%s\n" CMD -format "%s\n" ARGS && condor_history -constraint DAGManJobID==1018 -format "%d\n" ClusterId -format "%s\n" CMD -format "%s\n" ARGS'
_args = [cmd]
out, err = self._execute(_args, shell=True, run_in_job_dir=False)

if err:
log.error('Error while associating ids for jobs dag %s: %s', dag_id, err)
raise HTCondorError(err)
if not out:
log.warning('Error while associating ids for jobs in dag %s: No jobs found for dag.', dag_id)

try:
# Split into one line per job
jobs_out = out.split(job_delimiter)

# Match node to cluster id using combination of cmd and arguments
for node in self._node_set:
job = node.job

# Skip jobs that already have cluster id defined
if job.cluster_id != job.NULL_CLUSTER_ID:
continue

for job_out in jobs_out:
if not job_out or attr_delimiter not in job_out:
continue

# Split line by attributes
cluster_id, cmd, _args, _arguments = job_out.split(attr_delimiter)

# If new form of arguments is used, _args will be 'undefined' and _arguments will not
if _args == 'undefined' and _arguments != 'undefined':
args = _arguments.strip()

# If both are undefined, then there are no arguments
elif _args == 'undefined' and _arguments == 'undefined':
args = None

# Otherwise, using old form and _arguments will be 'undefined' and _args will not.
else:
args = _args.strip()

job_cmd = job.executable
job_args = job.arguments.strip() if job.arguments else None

if job_cmd in cmd and job_args == args:
log.info('Linking cluster_id %s to job with command and arguments: %s %s', cluster_id,
job_cmd, job_args)
job._cluster_id = int(cluster_id)
break

except ValueError as e:
log.warning(e.message)

def add_node(self, node):
"""
"""
Expand Down

0 comments on commit ebf9c8e

Please sign in to comment.