Skip to content

Commit

Permalink
Reverted enqueue_job; added job.insert() where needed
Browse files Browse the repository at this point in the history
  • Loading branch information
hkethi002 committed Feb 19, 2018
1 parent fb96ce4 commit 054dd24
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 89 deletions.
1 change: 1 addition & 0 deletions api/dao/containerstorage.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ def create_job_and_analysis(self, cont_name, cid, analysis, job, origin, uid):
try:

job = Queue.enqueue_job(job, origin, perm_check_uid=uid)
job.insert()
except Exception as e:
# NOTE #775 remove unusable analysis - until jobs have a 'hold' state
self.delete_el(analysis['_id'])
Expand Down
161 changes: 87 additions & 74 deletions api/jobs/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,108 +126,121 @@ def update(batch_id, payload):
if result.modified_count != 1:
raise Exception('Batch job not updated')

def run_preconstructed_jobs(origin, preconstructed_jobs):
    """
    Enqueue already-constructed job maps and return the created jobs.

    Args:
        origin (dict): origin map recorded on each created job.
        preconstructed_jobs (list): job payload maps, one per job to create.

    Returns:
        list: the created Job objects (each already inserted into the queue).
    """
    jobs = []
    for preconstructed_job in preconstructed_jobs:
        # enqueue_job validates and builds the Job but does not persist it;
        # the explicit insert() is required (see commit: reverted enqueue_job)
        job = Queue.enqueue_job(preconstructed_job, origin)
        job.insert()
        jobs.append(job)
    return jobs

# Otherwise create jobs from the containers and gear id provided in the proposal
else:
proposed_inputs = proposal.get('inputs', [])
proposed_destinations = proposal.get('destinations', [])
def run_container_jobs(batch_job, proposal):
    """
    Create jobs from the containers and gear id provided in the proposal.

    For analysis gears, each job is created through an analysis document
    (AnalysisStorage.create_job_and_analysis); otherwise the job is enqueued
    and inserted directly.

    Args:
        batch_job (dict): the batch document ('gear_id', 'config', 'origin', '_id').
        proposal (dict): proposal map with 'inputs', 'destinations', 'tags',
            and optionally 'analysis'.

    Returns:
        tuple: (jobs, job_ids) — the created Job objects and their ids.
    """
    proposed_inputs = proposal.get('inputs', [])
    proposed_destinations = proposal.get('destinations', [])

    gear_id = batch_job['gear_id']
    gear = gears.get_gear(gear_id)
    gear_name = gear['gear']['name']

    config_ = batch_job.get('config')
    origin = batch_job.get('origin')
    tags = proposal.get('tags', [])
    tags.append('batch')

    if gear.get('category') == 'analysis':
        analysis_base = proposal.get('analysis', {})
        # Give the analysis a default label when the proposal omits one
        if not analysis_base.get('label'):
            time_now = datetime.datetime.utcnow()
            analysis_base['label'] = {'label': '{} {}'.format(gear_name, time_now)}
        an_storage = AnalysisStorage()
        acq_storage = AcquisitionStorage()

    jobs = []
    job_ids = []

    job_defaults = {
        'config': config_,
        'gear_id': gear_id,
        'tags': tags,
        'batch': str(batch_job.get('_id')),
        'inputs': {}
    }

    for inputs in proposed_inputs:

        job_map = copy.deepcopy(job_defaults)
        job_map['inputs'] = inputs

        if gear.get('category') == 'analysis':

            analysis = copy.deepcopy(analysis_base)

            # Create analysis; the analysis path inserts the job itself
            # NOTE(review): inputs.values()[0] is Python-2-only (dict_values
            # is not subscriptable on Python 3) — confirm runtime version
            acquisition_id = inputs.values()[0].get('id')
            session_id = acq_storage.get_container(acquisition_id, projection={'session':1}).get('session')
            result = an_storage.create_job_and_analysis('sessions', session_id, analysis, job_map, origin, None)
            job = result.get('job')
            job_id = result.get('job_id')

        else:

            job = Queue.enqueue_job(job_map, origin)
            job.insert()
            job_id = job.id_

        jobs.append(job)
        job_ids.append(job_id)

    for dest in proposed_destinations:

        job_map = copy.deepcopy(job_defaults)
        job_map['destination'] = dest

        if gear.get('category') == 'analysis':

            analysis = copy.deepcopy(analysis_base)

            # Create analysis
            result = an_storage.create_job_and_analysis('sessions', bson.ObjectId(dest['id']), analysis, job_map, origin, None)
            job = result.get('job')
            job_id = result.get('job_id')

        else:

            job = Queue.enqueue_job(job_map, origin)
            job.insert()
            job_id = job.id_

        jobs.append(job)
        job_ids.append(job_id)

    return jobs, job_ids

job = Queue.enqueue_job(job_map, origin)
job_id = job.id_
def run(batch_job):
    """
    Creates jobs from proposed inputs, returns jobs enqueued.

    Raises:
        APIStorageException: when the batch document has no 'proposal'.
    """

    proposal = batch_job.get('proposal')
    if not proposal:
        raise APIStorageException('The batch job is not formatted correctly.')
    preconstructed_jobs = proposal.get('preconstructed_jobs')

    # If running a batch from already-constructed jobs
    if preconstructed_jobs:
        origin = batch_job.get('origin')
        jobs = run_preconstructed_jobs(origin, preconstructed_jobs)
        # BUG FIX: was `jobs_ids = ...` (typo), leaving `job_ids` undefined
        # in this branch and raising NameError at the update() call below
        job_ids = [job.id_ for job in jobs]
    else:
        jobs, job_ids = run_container_jobs(batch_job, proposal)

    update(batch_job['_id'], {'state': 'running', 'jobs': job_ids})
    return jobs
Expand Down
3 changes: 2 additions & 1 deletion api/jobs/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ def add(self):
uid = self.uid

job = Queue.enqueue_job(payload,self.origin, perm_check_uid=uid)
job.insert()

return { '_id': job.id_ }

Expand Down Expand Up @@ -713,7 +714,7 @@ def post_with_jobs(self):

for job_number, job_ in enumerate(jobs_):
try:
Queue.validate_job(job_, self.origin, create_job=False, perm_check_uid=uid)
Queue.enqueue_job(job_, self.origin, perm_check_uid=uid)
except InputValidationException as e:
raise InputValidationException("Job {}: {}".format(job_number, str(e)))

Expand Down
17 changes: 4 additions & 13 deletions api/jobs/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,11 @@ def retry(job, force=False):

return new_id


@staticmethod
def validate_job(job_map, origin, create_job=False, perm_check_uid=None):
def enqueue_job(job_map, origin, perm_check_uid=None):
"""
Using a payload for a proposed job, creates and returns(if create_job is True) (but does not insert)
Using a payload for a proposed job, creates and returns (but does not insert)
a Job object. This preperation includes:
- confirms gear exists
- validates config against gear manifest
Expand Down Expand Up @@ -250,18 +251,8 @@ def validate_job(job_map, origin, create_job=False, perm_check_uid=None):

if gear_name not in tags:
tags.append(gear_name)
if create_job:
job = Job(str(gear['_id']), inputs, destination=destination, tags=tags, config_=config_, now=now_flag, attempt=attempt_n, previous_job_id=previous_job_id, origin=origin, batch=batch)
return job
return True

@staticmethod
def enqueue_job(job_map, origin, perm_check_uid=None):
"""
Validates, Creates, Inserts, and Returns job
"""
job = Queue.validate_job(job_map, origin, create_job=True, perm_check_uid=perm_check_uid)
job.insert()
job = Job(str(gear['_id']), inputs, destination=destination, tags=tags, config_=config_, now=now_flag, attempt=attempt_n, previous_job_id=previous_job_id, origin=origin, batch=batch)
return job

@staticmethod
Expand Down
3 changes: 2 additions & 1 deletion api/jobs/rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,8 @@ def create_jobs(db, container_before, container_after, container_type):

for pj in potential_jobs:
job_map = pj['job'].map()
Queue.enqueue_job(job_map, origin)
job = Queue.enqueue_job(job_map, origin)
job.insert()

spawned_jobs.append(pj['rule']['alg'])

Expand Down

0 comments on commit 054dd24

Please sign in to comment.