Merge pull request #287 from superphy/pipeline-timings
MERGE: fixes to time multiple files
kevinkle authored Apr 10, 2018
2 parents 9f8e8d9 + 406df62 commit 074b634
Showing 5 changed files with 203 additions and 142 deletions.
2 changes: 0 additions & 2 deletions README.rst
@@ -22,8 +22,6 @@ Use:
5. Visit http://localhost:8090
6. Eat cake :cake:

.. warning:: There is an option to run all subtyping methods in the background even if the user doesn't select them. This is enabled by default. You can disable the option by setting ``BACKLOG_ENABLED = False`` in app/config.py

Submodule Build Statuses:
-------------------------

2 changes: 1 addition & 1 deletion app/config.py
@@ -19,7 +19,7 @@
PAN_TIMEOUT = 100000
# If BACKLOG_ENABLED = True, then all analysis modules will be run in
# the background for every submitted file
BACKLOG_ENABLED = True
BACKLOG_ENABLED = False

DATASTORE = '/datastore'
if not os.path.isdir(DATASTORE):
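
The flag above is what the README warning (removed in this diff) referred to. A minimal sketch of how submission code might consume it; `on_file_submitted`, `enqueue`, and `all_modules` are hypothetical stand-ins, not part of this diff:

    import config  # app config; defines BACKLOG_ENABLED

    def on_file_submitted(file, selected_modules):
        # 'enqueue' and 'all_modules' are hypothetical stand-ins for the
        # app's real job-spawning machinery (not shown in this diff).
        for module in selected_modules:
            enqueue(module, file)
        if config.BACKLOG_ENABLED:
            # Also run every unselected module in the background; such jobs
            # are flagged backlog=True so the frontend ignores them.
            for module in all_modules:
                if module not in selected_modules:
                    enqueue(module, file, backlog=True)
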
263 changes: 163 additions & 100 deletions app/middleware/models.py
@@ -1,5 +1,5 @@
import sys
import copy
import traceback
import config
import redis
import dill
@@ -28,11 +28,12 @@ def model_to_json(model):
"""
Converts models to json for the front-end.
"""
#TODO: can access the list directly, no longer need this.
# Validate the model submitted before processing.
assert isinstance(model, list)
# model.validate()
# Conversion.
print("model_to_json() called with model: {0}".format(str(model)))
# print("model_to_json() called with model: {0}".format(str(model)))
return model
# if isinstance(model, models.Base):
# return _convert_model(model)
@@ -61,7 +62,7 @@ def store(pipeline):
d[pipeline_id]['analysis'] = "Subtyping"

d[pipeline_id]['file'] = pipeline.files
print '_store_pipeline(): finished'
print('store(): finished storing pipeline with id {0} ({1} files, {2} final jobs).'.format(pipeline.sig, len(pipeline.files), len(pipeline._expand())))
return d
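
The hunk only shows the record dict being assembled; given the module's `redis` and `dill` imports, a plausible sketch of the surrounding persistence step (assumed, not shown in this diff; `_save_record` is a hypothetical helper):

    import dill
    import redis
    import config

    def _save_record(pipeline_id, d):
        # Hypothetical helper: serialize the record and write it back to Redis.
        conn = redis.from_url(config.REDIS_URL)
        conn.set(pipeline_id, dill.dumps(d))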

def load(pipeline_id):
@@ -143,31 +144,64 @@ def __init__(self, rq_job, name="", transitory=True, backlog=True, display=False
self.transitory = transitory
self.backlog = backlog
self.display = display
self.times = None

def refetch(self):
# Start a Redis connection.
redis_url = config.REDIS_URL
redis_connection = redis.from_url(redis_url)
if self.times:
# If timings exist, the job is done. Don't refetch.
pass
elif self.rq_job.exc_info == 'job not found':
# If called on a job whose result_ttl has elapsed.
print('model.Job.refetch(): job {0} was already determined to not be found.'.format(self.name))
else:
# Start a Redis connection.
redis_url = config.REDIS_URL
redis_connection = redis.from_url(redis_url)

# While you can call rq_job.result without refetching, you must refetch
# to get the start and stop times.
job = fetch_job(self.rq_job.get_id(), redis_connection)
self.rq_job = job
# While you can call rq_job.result without refetching, you must refetch
# to get the start and stop times.
job = fetch_job(self.rq_job.get_id(), redis_connection)
self.rq_job = job

def time(self):
self.refetch()
job = self.rq_job

assert job.is_finished
start = job.started_at
stop = job.ended_at
try:
timedelta = stop - start
sec = timedelta.total_seconds()
except:
print('model.Job.time(): could not calculate time for {0} of type {1} with content {2}'.format(self.name, type(self.rq_job), self.rq_job))
sec = 0
return (start,stop,sec)
# Only necessary if called from Pipeline.cache.
if not self.times:
self.refetch()

# If we've already saved the timings.
if self.times:
return self.times
elif self.rq_job.exc_info == 'job not found':
# If called on a job whose result_ttl has elapsed.
# Should only ever see this called once per job.
print('model.Job.time(): job {0} could not be found.'.format(self.name))
self.times = (0,0,0)
return self.times
elif not self.rq_job.is_finished:
# Should never see this.
print('model.Job.time(): time was called for an unfinished job named {0} of type {1} with content {2}'.format(self.name, type(self.rq_job), self.rq_job))
return (-1,-1,-1)
else:
try:
job = self.rq_job
assert job.is_finished
start = job.started_at
stop = job.ended_at
timedelta = stop - start
sec = timedelta.total_seconds()
self.times = (start,stop,sec)
return (start,stop,sec)
except Exception:
# Try to grab the traceback.
# This doesn't really happen due to the checks above.
try:
exc_info = sys.exc_info()
finally:
print('model.Job.time(): could not calculate time for {0} of type {1} with content {2}'.format(self.name, type(self.rq_job), self.rq_job))
traceback.print_exception(*exc_info)
del exc_info
self.times = (0,0,0)
return self.times
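
The rewritten time() therefore memoizes a (started_at, ended_at, seconds) tuple and returns sentinel tuples instead of raising: (0, 0, 0) when timings are unrecoverable and (-1, -1, -1) for an unfinished job. A minimal usage sketch, where `job` stands in for a real models.Job instance:

    start, stop, sec = job.time()
    if sec == -1:
        # Unfinished job; complete() should normally prevent this path.
        pass
    elif (start, stop, sec) == (0, 0, 0):
        # result_ttl expired or timing failed; no duration is recoverable.
        pass
    else:
        print('{0} ran for {1:.2f} seconds'.format(job.name, sec))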

class Pipeline():
def __init__(self, jobs=None, files=None, func=None, options=None, date=None):
@@ -181,68 +215,66 @@ def __init__(self, jobs=None, files=None, func=None, options=None, date=None):
now = datetime.now()
now = now.strftime("%Y-%m-%d-%H-%M-%S-%f")
date = now
self.jobs = {} # {'somename': instance of RQ.Job} Only used when enqueuing.
self.final_jobs = [] # Jobs for every file in the request.
self.cache = [] # For temporary storage of RQ.Jobs.

# Variables for generating run signature.
self.sig = None
self.files = []
self.files = files
self.func = func # Additional attribute for storing pipeline function.
self.options = options
self.signature() # Create & Store a signature for the pipeline.

# Additional variables.
self.jobs = {} # {'somename': instance of RQ.Job} Only used when enqueuing.
self.final_jobs = {} # Jobs for every file in the request.
self.cache = [] # For temporary storage of RQ.Jobs.
self.date = date
self.done = False # Bypass for the self.complete() method.

def cache_jobs(self):
def cache_jobs(self, filename):
"""
Copy current jobs to cache.
Copy current jobs to cache. Called once per file iteration.
"""
self.cache += [self.jobs]
self.cache += [{filename:self.jobs}]
self.jobs = {}

def merge_jobs(self, drop=True):
"""
"""Merges all the jobs into a self.final_jobs of form {filename:[models.Job]}.
"""
# If the jobs dictionary is not empty.
if self.jobs:
self.cache_jobs()
# Actual merge. Notice we're converting to a list.
self.final_jobs = [
j # Where j is our custom Job class, not an rq_job
for d in self.cache
for j in d.values()
]

# Drop the backlog jobs, makes for faster status checking.
if drop:
self.final_jobs = [
j
for j in self.final_jobs
if not j.backlog
]
self.cache = [
j
for d in self.cache
for j in d.values()
if not j.backlog
]
raise Exception('models.Pipeline.merge_jobs(): called before caching all jobs.')

for outer_d in self.cache:
new_l = []
# {filename: {(str)func: Job}} should only have 1 inner dict/filename.
assert len(outer_d.keys()) == 1
assert len(outer_d.values()) == 1
filename = outer_d.keys()[0]
inner_d = outer_d.values()[0]
# Gather the models.Job instances.
for j in inner_d.values():
if not drop:
new_l.append(j)
elif drop and not j.backlog:
new_l.append(j)
self.final_jobs.update({filename:new_l})
ret = { f:len(l) for f,l in self.final_jobs.items() }
print("merge_jobs(): merged with {0}.".format(ret))

def _expand(self):
r = [j for l in self.final_jobs.values() for j in l]
return r

def refetch(self):
'''Refetch method for the Pipeline class. Removes jobs that are finished
and can no longer be found. Also updates itself on Redis DB.
'''
new_finals = []
for j in self.final_jobs:
j.refetch()
if not j.rq_job.exc_info == 'job not found':
new_finals.append(j)
self.final_jobs = new_finals
new_cache = []
for j in self.cache:
# new_finals = []
for j in self._expand():
j.refetch()
if not j.rq_job.exc_info == 'job not found':
new_cache.append(j)
self.cache = new_cache
# if not j.rq_job.exc_info == 'job not found':
# new_finals.append(j)
# self.final_jobs = new_finals

def complete(self):
"""
@@ -251,39 +283,91 @@ def complete(self):
if self.done:
return True
else:
print("complete() checking status for: {0} with {1} # of final jobs.".format(self.sig, len(self.final_jobs)))
for j in self.final_jobs:
# Refetch job status.
j.refetch()
print("complete() checking status for: {0} with {1} # of final jobs.".format(self.sig, len(self._expand())))
self.refetch()
exc_info = None
notcomplete = False
for j in self._expand():
# Type check.
assert isinstance(j, Job)
rq_job = j.rq_job
if j.backlog:
if j.times:
# We've checked this before and it's complete.
continue
elif j.backlog:
# Some backlog job, we don't care (though Sentry will catch it).
# print("complete(): job {0} is in backlog.".format(j.name))
# This doesn't really get called due to dropping backlog by default.
continue
elif rq_job.exc_info == 'job not found':
# Job finished, but the result_ttl timed out.
print("complete(): job {0} is finished but the result_ttl timed out.".format(j.name))
continue
elif rq_job.is_finished and not j.times:
# The job is done, save timings.
print("complete(): saving timing for job {0}.".format(j.name))
j.time()
elif j.transitory and rq_job.is_finished:
# Some of the elif is extra, but checks that this is
# a transitory job that is finished.
print("complete(): job {0} is j.transitory and rq_job.is_finished.".format(j.name))
continue
elif rq_job.is_failed:
# If the job failed, return the error.
# Failed jobs last forever (result_ttl=-1) in RQ.
print("complete(): job {0} is failed with exc_info {1}.".format(j.name, rq_job.exc_info))
return rq_job.exc_info
exc_info = rq_job.exc_info
elif not j.transitory and not rq_job.is_finished:
# One of the jobs hasn't finished.
# The only reason this works is that there's always a non-
# transitory job to be run after whatever this job is.
print("complete(): job {0} is still pending with var: {1}.".format(j.name, rq_job.is_finished))
return False
print("complete() pipeline {0} is complete.".format(self.sig))
# Pipeline complete, update + save jobs.
self.refetch()
self.done = True
store(self)
return True
notcomplete = True
# Will always store what has been updated for timings.
if exc_info:
store(self)
return exc_info
elif notcomplete:
# complete() is not complete
store(self)
return False
else:
print("complete(): pipeline {0} is complete.".format(self.sig))
# Pipeline complete.
self.done = True
store(self)
return True
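
complete() now has three outcomes: True once everything is finished, False while any non-transitory job is pending, and the rq exc_info string if a job failed. A hedged sketch of how a caller might poll it (the handler shape is invented; load() and to_json() are this module's own):

    pipeline = load(pipeline_id)
    status = pipeline.complete()
    if status is True:
        response = pipeline.to_json()     # all display jobs are done
    elif status is False:
        response = 'pending'              # check again later
    else:
        response = status                 # exc_info string from a failed job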

def timings(self):
'''Looks through Pipeline.final_jobs and returns timings, also finding the
overall runtime from start to finish.
'''
assert self.done
# l is the actual return list.
l = [{'{0}|{1}'.format(f,j.name): j.time()} for f,l in self.final_jobs.items() for j in l]
# Sanity check.
assert len(l) == len(self._expand())
# Tabulate starts and stops.
starts = [i.values()[0][0] for i in l if i.values()[0][0]]
stops = [i.values()[0][1] for i in l if i.values()[0][1]]
# Calculate min/max datetime values.
mn = starts[0]
for i in starts:
if i < mn:
mn = i
mx = stops[0]
for i in stops:
if i > mx:
mx = i
# Append total runtime.
sec = (mx-mn).total_seconds()
l.append({'total': (mn,mx,sec)})
return l
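
Each 'filename|jobname' key maps to a (start, stop, seconds) tuple, and the appended 'total' entry spans the earliest start to the latest stop. A self-contained sketch with invented times, showing why the wall-clock total can be less than the sum of per-job durations when jobs overlap:

    from datetime import datetime, timedelta

    # Invented per-job (start, stop, seconds) tuples.
    t0 = datetime(2018, 4, 10, 12, 0, 0)
    rows = [
        {'a.fasta|job_ectyper': (t0, t0 + timedelta(seconds=12.5), 12.5)},
        {'b.fasta|job_ectyper': (t0 + timedelta(seconds=8.2),
                                 t0 + timedelta(seconds=48.2), 40.0)},
    ]
    starts = [list(r.values())[0][0] for r in rows]
    stops = [list(r.values())[0][1] for r in rows]
    # 48.2 seconds of wall-clock time, although the jobs sum to 52.5.
    total = (max(stops) - min(starts)).total_seconds()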

def _completed_jobs(self):
'''Returns jobs that are required for the frontend and are complete.
'''
completed_jobs = [
j for j in self.final_jobs
j for j in self._expand()
if j.display and not j.backlog and j.rq_job.is_finished and not j.rq_job.is_failed
]
return completed_jobs
@@ -311,27 +395,6 @@ def to_json(self):
l += list_json
return jsonify(l)

def timings(self):
assert self.done
# l is the actual return list.
l = [{j.name: j.time()} for j in self.cache]
# Tabulate starts and stops.
starts = [i.values()[0][0] for i in l if i.values()[0][0]]
stops = [i.values()[0][1] for i in l if i.values()[0][1]]
# Calculate min/max datetime.date values.
mn = starts[0]
for i in starts:
if i < mn:
mn = i
mx = stops[0]
for i in stops:
if i > mx:
mx = i
# Append total runtime.
sec = (mx-mn).total_seconds()
l.append({'total': (mn,mx,sec)})
return l

def _function_signature(self):
"""
Generates signatures for functions.