diff --git a/README.rst b/README.rst
index c4d73a1e..2067b932 100644
--- a/README.rst
+++ b/README.rst
@@ -22,8 +22,6 @@ Use:
 5. Visit http://localhost:8090
 6. Eat cake :cake:
 
-.. warning:: There is a option to run all subtyping methods in the background even if the user doesn't select them. This is enabled by DEFAULT. You can disable the option by setting ``BACKLOG_ENABLED = False`` in app/config.py
-
 Submodule Build Statuses:
 -------------------------
 
diff --git a/app/config.py b/app/config.py
index 95ac015d..dd64407b 100644
--- a/app/config.py
+++ b/app/config.py
@@ -19,7 +19,7 @@ PAN_TIMEOUT = 100000
 
 # if BACKLOG_ENABLED = True, then all analyses modules will be run in the
 # in the background for every submitted file
-BACKLOG_ENABLED = True
+BACKLOG_ENABLED = False
 
 DATASTORE = '/datastore'
 if not os.path.isdir(DATASTORE):
diff --git a/app/middleware/models.py b/app/middleware/models.py
index c082083d..5f95e9cb 100644
--- a/app/middleware/models.py
+++ b/app/middleware/models.py
@@ -1,5 +1,5 @@
 import sys
-import copy
+import traceback
 import config
 import redis
 import dill
@@ -28,11 +28,12 @@ def model_to_json(model):
     """
     Converts models to json for the front-end.
     """
+    #TODO: can access the list directly, no longer need this.
     # Validate the model submitted before processing.
     assert isinstance(model, list)
     # model.validate()
     # Conversion.
-    print("model_to_json() called with model: {0}".format(str(model)))
+    # print("model_to_json() called with model: {0}".format(str(model)))
     return model
     # if isinstance(model, models.Base):
     #     return _convert_model(model)
@@ -61,7 +62,7 @@ def store(pipeline):
     d[pipeline_id]['analysis'] = "Subtyping"
     d[pipeline_id]['file'] = pipeline.files
 
-    print '_store_pipeline(): finished'
+    print('store(): finished storing pipeline with id {0} ({1} files, {2} final jobs).'.format(pipeline.sig, len(pipeline.files), len(pipeline._expand())))
     return d
 
 def load(pipeline_id):
@@ -143,31 +144,64 @@ def __init__(self, rq_job, name="", transitory=True, backlog=True, display=False
         self.transitory = transitory
         self.backlog = backlog
         self.display = display
+        self.times = None
 
     def refetch(self):
-        # Start a Redis connection.
-        redis_url = config.REDIS_URL
-        redis_connection = redis.from_url(redis_url)
+        if self.times:
+            # If timings exist, the job is done. Don't refetch.
+            pass
+        elif self.rq_job.exc_info == 'job not found':
+            # If called on a job whose result_ttl has elapsed.
+            print('model.Job.refetch(): job {0} was already determined to not be found.'.format(self.name))
+        else:
+            # Start a Redis connection.
+            redis_url = config.REDIS_URL
+            redis_connection = redis.from_url(redis_url)
 
-        # While you can call rq_job.result without refetching, you must refetch
-        # do get the start and stop times.
-        job = fetch_job(self.rq_job.get_id(), redis_connection)
-        self.rq_job = job
+            # While you can call rq_job.result without refetching, you must refetch
+            # to get the start and stop times.
+            job = fetch_job(self.rq_job.get_id(), redis_connection)
+            self.rq_job = job
 
     def time(self):
-        self.refetch()
-        job = self.rq_job
-
-        assert job.is_finished
-        start = job.started_at
-        stop = job.ended_at
-        try:
-            timedelta = stop - start
-            sec = timedelta.total_seconds()
-        except:
-            print('model.Job.time(): could not calculate time for {0} of type {1} with content {2}'.format(self.name, type(self.rq_job), self.rq_job))
-            sec = 0
-        return (start,stop,sec)
+        # Only necessary if called from Pipeline.cache.
+        if not self.times:
+            self.refetch()
+
+        # If we've already saved the timings.
+        if self.times:
+            return self.times
+        elif self.rq_job.exc_info == 'job not found':
+            # If called on a job whose result_ttl has elapsed.
+            # Should only ever see this called once per job.
+            print('model.Job.time(): job {0} could not be found.'.format(self.name))
+            self.times = (0,0,0)
+            return self.times
+        elif not self.rq_job.is_finished:
+            # Should never see this.
+            print('model.Job.time(): time was called for an unfinished job named {0} of type {1} with content {2}'.format(self.name, type(self.rq_job), self.rq_job))
+            return (-1,-1,-1)
+        else:
+            try:
+                job = self.rq_job
+                assert job.is_finished
+                start = job.started_at
+                stop = job.ended_at
+                timedelta = stop - start
+                sec = timedelta.total_seconds()
+                self.times = (start,stop,sec)
+                return (start,stop,sec)
+            except Exception, e:
+                # Try to grab the traceback.
+                # This doesn't really happen due to the above tests.
+                try:
+                    exc_info = sys.exc_info()
+                finally:
+                    print('model.Job.time(): could not calculate time for {0} of type {1} with content {2}'.format(self.name, type(self.rq_job), self.rq_job))
+                    traceback.print_exception(*exc_info)
+                    del exc_info
+                self.times = (0,0,0)
+                return self.times
 
 class Pipeline():
     def __init__(self, jobs=None, files=None, func=None, options=None, date=None):
@@ -181,68 +215,66 @@ def __init__(self, jobs=None, files=None, func=None, options=None, date=None):
         now = datetime.now()
         now = now.strftime("%Y-%m-%d-%H-%M-%S-%f")
         date = now
-        self.jobs = {} # {'somename': instance of RQ.Job} Only used when enqueing.
-        self.final_jobs = [] # Jobs for every file in the request.
-        self.cache = [] # For temporary storage of RQ.Jobs.
+
+        # Variables for generating run signature.
         self.sig = None
-        self.files = []
+        self.files = files
         self.func = func # Additional attribute for storing pipeline function.
         self.options = options
         self.signature() # Create & Store a signature for the pipeline.
+
+        # Additional variables.
+        self.jobs = {} # {'somename': instance of RQ.Job} Only used when enqueing.
+        self.final_jobs = {} # Jobs for every file in the request.
+        self.cache = [] # For temporary storage of RQ.Jobs.
         self.date = date
         self.done = False # Bypass for the self.complete() method.
 
-    def cache_jobs(self):
+    def cache_jobs(self, filename):
         """
-        Copy current jobs to cache.
+        Copy current jobs to cache. Called once per file iteration.
         """
-        self.cache += [self.jobs]
+        self.cache += [{filename:self.jobs}]
         self.jobs = {}
 
     def merge_jobs(self, drop=True):
-        """
-
+        """Merges all the jobs into self.final_jobs of the form {filename: [models.Job]}.
         """
         # If the jobs dictionary is not empty.
         if self.jobs:
-            self.cache_jobs()
-        # Actual merge. Notice were converting to list.
-        self.final_jobs = [
-            j # Where j is our custom Job class, not an rq_job
-            for d in self.cache
-            for j in d.values()
-        ]
-
-        # Drop the backlog jobs, makes for faster status checking.
-        if drop:
-            self.final_jobs = [
-                j
-                for j in self.final_jobs
-                if not j.backlog
-            ]
-            self.cache = [
-                j
-                for d in self.cache
-                for j in d.values()
-                if not j.backlog
-            ]
+            raise Exception('models.Pipeline.merge_jobs(): called before caching all jobs.')
+
+        for outer_d in self.cache:
+            new_l = []
+            # {filename: {(str)func: Job}} should only have 1 inner dict/filename.
+            assert len(outer_d.keys()) == 1
+            assert len(outer_d.values()) == 1
+            filename = outer_d.keys()[0]
+            inner_d = outer_d.values()[0]
+            # Gather the models.Job instances.
+            for j in inner_d.values():
+                if not drop:
+                    new_l.append(j)
+                elif drop and not j.backlog:
+                    new_l.append(j)
+            self.final_jobs.update({filename:new_l})
+        ret = { f:len(l) for f,l in self.final_jobs.items() }
+        print("merge_jobs(): merged with {0}.".format(ret))
+
+    def _expand(self):
+        r = [j for l in self.final_jobs.values() for j in l]
+        return r
 
     def refetch(self):
         '''Refetch method for the Pipeline class. Removes jobs that are finished
         and can no longer be found. Also updates itself on Redis DB.
         '''
-        new_finals = []
-        for j in self.final_jobs:
-            j.refetch()
-            if not j.rq_job.exc_info == 'job not found':
-                new_finals.append(j)
-        self.final_jobs = new_finals
-        new_cache = []
-        for j in self.cache:
+        # new_finals = []
+        for j in self._expand():
             j.refetch()
-            if not j.rq_job.exc_info == 'job not found':
-                new_cache.append(j)
-        self.cache = new_cache
+            # if not j.rq_job.exc_info == 'job not found':
+            #     new_finals.append(j)
+        # self.final_jobs = new_finals
 
     def complete(self):
         """
@@ -251,39 +283,91 @@ def complete(self):
         if self.done:
             return True
         else:
-            print("complete() checking status for: {0} with {1} # of final jobs.".format(self.sig, len(self.final_jobs)))
-            for j in self.final_jobs:
-                # Refetch job status.
-                j.refetch()
+            print("complete() checking status for: {0} with {1} # of final jobs.".format(self.sig, len(self._expand())))
+            self.refetch()
+            exc_info = None
+            notcomplete = False
+            for j in self._expand():
                 # Type check.
                 assert isinstance(j, Job)
                 rq_job = j.rq_job
-                if j.backlog:
+                if j.times:
+                    # We've checked this before and it's complete.
+                    continue
+                elif j.backlog:
                     # Some backlog job, we don't care (though Sentry will catch it).
-                    # print("complete(): job {0} is in backlog.".format(j.name))
+                    # This doesn't really get called due to dropping backlog by default.
                     continue
                 elif rq_job.exc_info == 'job not found':
                     # Job finished, but the result_ttl timed out.
                     print("complete(): job {0} is finished but the result_ttl timed out.".format(j.name))
                     continue
+                elif rq_job.is_finished and not j.times:
+                    # The job is done, save timings.
+                    print("complete(): saving timing for job {0}.".format(j.name))
+                    j.time()
+                elif j.transitory and rq_job.is_finished:
+                    # Some of the elif is extra, but checks that this is
+                    # a transitory job that is finished.
+                    print("complete(): job {0} is j.transitory and rq_job.is_finished.".format(j.name))
+                    continue
                 elif rq_job.is_failed:
                     # If the job failed, return the error.
+                    # Failed jobs last forever (result_ttl=-1) in RQ.
                     print("complete(): job {0} is failed with exc_info {1}.".format(j.name, rq_job.exc_info))
-                    return rq_job.exc_info
+                    exc_info = rq_job.exc_info
                 elif not j.transitory and not rq_job.is_finished:
                     # One of the jobs hasn't finished.
+                    # The only reason this works is that there's always a non-
+                    # transitory job to be run after whatever this job is.
                     print("complete(): job {0} is still pending with var: {1}.".format(j.name, rq_job.is_finished))
-                    return False
-            print("complete() pipeline {0} is complete.".format(self.sig))
-            # Pipeline complete, update + save jobs.
-            self.refetch()
-            self.done = True
-            store(self)
-            return True
+                    notcomplete = True
+            # Will always store what has been updated for timings.
+            if exc_info:
+                store(self)
+                return exc_info
+            elif notcomplete:
+                # The pipeline is not yet complete.
+                store(self)
+                return False
+            else:
+                print("complete(): pipeline {0} is complete.".format(self.sig))
+                # Pipeline complete.
+                self.done = True
+                store(self)
+                return True
+
+    def timings(self):
+        '''Looks through Pipeline.final_jobs and returns timings; also finds the
+        overall runtime from start to finish.
+        '''
+        assert self.done
+        # l is the actual return list.
+        l = [{'{0}|{1}'.format(f,j.name): j.time()} for f,l in self.final_jobs.items() for j in l]
+        # Sanity check.
+        assert len(l) == len(self._expand())
+        # Tabulate starts and stops.
+        starts = [i.values()[0][0] for i in l if i.values()[0][0]]
+        stops = [i.values()[0][1] for i in l if i.values()[0][1]]
+        # Calculate min/max datetime.date values.
+        mn = starts[0]
+        for i in starts:
+            if i < mn:
+                mn = i
+        mx = stops[0]
+        for i in stops:
+            if i > mx:
+                mx = i
+        # Append total runtime.
+        sec = (mx-mn).total_seconds()
+        l.append({'total': (mn,mx,sec)})
+        return l
 
     def _completed_jobs(self):
+        '''Returns jobs that are required for the frontend and are complete.
+        '''
         completed_jobs = [
-            j for j in self.final_jobs
+            j for j in self._expand()
             if j.display and not j.backlog and j.rq_job.is_finished and not j.rq_job.is_failed
         ]
         return completed_jobs
 
@@ -311,27 +395,6 @@ def to_json(self):
             l += list_json
         return jsonify(l)
 
-    def timings(self):
-        assert self.done
-        # l is the actual return list.
-        l = [{j.name: j.time()} for j in self.cache]
-        # Tabulate starts and stops.
-        starts = [i.values()[0][0] for i in l if i.values()[0][0]]
-        stops = [i.values()[0][1] for i in l if i.values()[0][1]]
-        # Calculate min/max datetime.date values.
-        mn = starts[0]
-        for i in starts:
-            if i < mn:
-                mn = i
-        mx = stops[0]
-        for i in stops:
-            if i > mx:
-                mx = i
-        # Append total runtime.
-        sec = (mx-mn).total_seconds()
-        l.append({'total': (mn,mx,sec)})
-        return l
-
     def _function_signature(self):
         """
         Generates signatures for functions.
diff --git a/app/modules/spfy.py b/app/modules/spfy.py
index 5e5d2bb8..82e77ad0 100644
--- a/app/modules/spfy.py
+++ b/app/modules/spfy.py
@@ -585,27 +585,6 @@ def blob_savvy_enqueue(single_dict, pipeline):
 
     return jobs
 
-def blob_savvy(args_dict, pipeline):
-    '''
-    Handles enqueuing of all files in a given directory or just a single file.
-    '''
-    d = {}
-    if os.path.isdir(args_dict['i']):
-        for f in os.listdir(args_dict['i']):
-            single_dict = dict(args_dict.items() +
-                               {'i': os.path.join(args_dict['i'], f)}.items())
-            d.update(
-                blob_savvy_enqueue(
-                    single_dict,
-                    pipeline
-                )
-            )
-    else:
-        d.update(blob_savvy_enqueue(args_dict, pipeline))
-
-    return d
-
-
 def spfy(args_dict, pipeline):
     '''
     '''
@@ -614,6 +593,6 @@ def spfy(args_dict, pipeline):
 
     #print 'Starting blob_savvy call'
     #logger.info('args_dict: ' + str(args_dict))
-    jobs_dict = blob_savvy(args_dict, pipeline)
+    jobs_dict = blob_savvy_enqueue(args_dict, pipeline)
 
     return jobs_dict
diff --git a/app/routes/ra_posts.py b/app/routes/ra_posts.py
index 4bd58e8d..1a1ef570 100644
--- a/app/routes/ra_posts.py
+++ b/app/routes/ra_posts.py
@@ -227,8 +227,6 @@ def upload():
     # Get a list of files submitted.
     uploaded_files = request.files.getlist("file")
-    names = [secure_filename(file.filename) for file in uploaded_files]
-    names = sorted(names)
 
     print 'upload(): about to enqueue files'
     #set up constants for identifying this sessions
     now = datetime.now()
@@ -236,34 +234,57 @@
     now = now.strftime("%Y-%m-%d-%H-%M-%S-%f")
 
     jobs_dict = {}
 
-    pipeline = Pipeline(
-        files = str(names),
-        func = spfy,
-        options = options,
-        date = now
-    )
-
+    # Create a list of files to iterate spfy() calls over.
+    l = []
     for file in uploaded_files:
+        # Sanity check.
        if file:
-            # for saving file
+            # For saving file.
             filename = os.path.join(current_app.config[
                 'DATASTORE'], now + '-' + secure_filename(file.filename))
             file.save(filename)
             print 'Uploaded File Saved at', str(filename)
 
+            # Handle compressed files.
             if tarfile.is_tarfile(filename):
                 # set filename to dir for spfy call
                 filename = handle_tar(filename, now)
             elif zipfile.is_zipfile(filename):
                 filename = handle_zip(filename, now)
-            # for enqueing task
-            jobs_enqueued = spfy(
-                args_dict = {'i': filename, 'pi':options['pi'], 'options':options},
-                pipeline = pipeline
-            )
-            jobs_dict.update(jobs_enqueued)
-            pipeline.cache_jobs()
+
+            # The compressed file was extracted.
+            if os.path.isdir(filename):
+                # Walk and append.
+                for bname in os.listdir(filename):
+                    f = os.path.join(filename, secure_filename(bname))
+                    l.append(f)
+            else:
+                # Add the single file.
+                l.append(filename)
+
+    # Gather names for Pipeline creation.
+    names = [secure_filename(f) for f in l]
+    names = sorted(names)
+    print('upload(): {0}'.format(names))
+
+    pipeline = Pipeline(
+        files = names,
+        func = spfy,
+        options = options,
+        date = now
+    )
+
+    # Iterate over the collected files and enqueue them.
+    for f in l:
+        # for enqueuing task
+        jobs_enqueued = spfy(
+            args_dict = {'i': f, 'pi':options['pi'], 'options':options},
+            pipeline = pipeline
+        )
+        jobs_dict.update(jobs_enqueued)
+        pipeline.cache_jobs(f)
+
     print 'upload(): all files enqueued, returning...'
     pipeline.merge_jobs()
     print("upload() pipeline jobs: {0}".format(str(pipeline.final_jobs)))
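
The patch above reorganizes the Pipeline bookkeeping around per-file job lists: upload() caches jobs per file via cache_jobs(filename), merge_jobs() folds that cache into a {filename: [models.Job]} mapping, _expand() flattens the mapping for status checks, and timings() reports per-job runtimes keyed as 'filename|jobname' plus an overall total. The snippet below is a minimal, self-contained sketch of that flow under simplified assumptions; FakeJob, FakePipeline, the 'job_ectyper' job name, and the hard-coded sample files and timestamps are illustrative stand-ins, not the RQ/Redis-backed classes in app/middleware/models.py.

from datetime import datetime, timedelta

class FakeJob(object):
    """Stand-in for models.Job: just a name plus precomputed (start, stop, seconds)."""
    def __init__(self, name, start, stop):
        self.name = name
        self.backlog = False
        self.times = (start, stop, (stop - start).total_seconds())

    def time(self):
        return self.times

class FakePipeline(object):
    """Stand-in for models.Pipeline showing only the per-file bookkeeping."""
    def __init__(self):
        self.jobs = {}        # {'func_name': FakeJob} for the file currently being enqueued
        self.cache = []       # [{filename: jobs_dict}], one entry per cache_jobs() call
        self.final_jobs = {}  # {filename: [FakeJob]} after merge_jobs()

    def cache_jobs(self, filename):
        # Snapshot the jobs enqueued for this file, then reset for the next file.
        self.cache.append({filename: self.jobs})
        self.jobs = {}

    def merge_jobs(self):
        # Fold the cache into {filename: [job]}, dropping backlog jobs.
        for outer in self.cache:
            (filename, inner), = outer.items()
            self.final_jobs[filename] = [j for j in inner.values() if not j.backlog]

    def _expand(self):
        # Flatten {filename: [job]} into a single job list for status checks.
        return [j for jobs in self.final_jobs.values() for j in jobs]

    def timings(self):
        # Per-job timings keyed as 'filename|jobname', plus an overall 'total' entry.
        per_job = [{'{0}|{1}'.format(f, j.name): j.time()}
                   for f, jobs in self.final_jobs.items() for j in jobs]
        starts = [list(d.values())[0][0] for d in per_job]
        stops = [list(d.values())[0][1] for d in per_job]
        total = (max(stops) - min(starts)).total_seconds()
        per_job.append({'total': (min(starts), max(stops), total)})
        return per_job

if __name__ == '__main__':
    t0 = datetime(2017, 1, 1, 12, 0, 0)
    p = FakePipeline()
    for i, name in enumerate(['sample1.fasta', 'sample2.fasta']):
        # One hypothetical subtyping job per file.
        p.jobs = {'job_ectyper': FakeJob('job_ectyper', t0, t0 + timedelta(seconds=30 + i))}
        p.cache_jobs(name)
    p.merge_jobs()
    print(len(p._expand()))  # 2
    print(p.timings())

Running the sketch prints the flattened job count and a timing list ending in a 'total' entry, mirroring the shape of what Pipeline.timings() returns once complete() has marked the pipeline done.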