From 6add967033bf1955f4c41398cac0201747827c0b Mon Sep 17 00:00:00 2001 From: Harsha Kethineni Date: Wed, 7 Feb 2018 12:25:39 -0600 Subject: [PATCH 1/4] Create batch with premade jobs --- api/api.py | 11 +- api/jobs/batch.py | 134 +++++++++++-------- api/jobs/handlers.py | 21 +++ tests/integration_tests/python/test_batch.py | 35 +++++ 4 files changed, 137 insertions(+), 64 deletions(-) diff --git a/api/api.py b/api/api.py index db7a9dc70..6e7b08da5 100644 --- a/api/api.py +++ b/api/api.py @@ -157,12 +157,13 @@ def prefix(path, routes): # Batch jobs - route('/batch', BatchHandler, h='get_all', m=['GET']), - route('/batch', BatchHandler, m=['POST']), + route('/batch', BatchHandler, h='get_all', m=['GET']), + route('/batch', BatchHandler, m=['POST']), prefix('/batch', [ - route('/<:[^/]+>', BatchHandler, h='get', m=['GET']), - route('/<:[^/]+>/run', BatchHandler, h='run', m=['POST']), - route('/<:[^/]+>/cancel', BatchHandler, h='cancel', m=['POST']), + route('/jobs', BatchHandler, h='post_with_jobs', m=['POST']), + route('/<:[^/]+>', BatchHandler, h='get', m=['GET']), + route('/<:[^/]+>/run', BatchHandler, h='run', m=['POST']), + route('/<:[^/]+>/cancel', BatchHandler, h='cancel', m=['POST']), ]), diff --git a/api/jobs/batch.py b/api/jobs/batch.py index a6258aedc..285be566a 100644 --- a/api/jobs/batch.py +++ b/api/jobs/batch.py @@ -134,84 +134,100 @@ def run(batch_job): proposal = batch_job.get('proposal') if not proposal: raise APIStorageException('The batch job is not formatted correctly.') - proposed_inputs = proposal.get('inputs', []) - proposed_destinations = proposal.get('destinations', []) - - gear_id = batch_job['gear_id'] - gear = gears.get_gear(gear_id) - gear_name = gear['gear']['name'] - - config_ = batch_job.get('config') - origin = batch_job.get('origin') - tags = proposal.get('tags', []) - tags.append('batch') - - if gear.get('category') == 'analysis': - analysis_base = proposal.get('analysis', {}) - if not analysis_base.get('label'): - time_now = datetime.datetime.utcnow() - analysis_base['label'] = {'label': '{} {}'.format(gear_name, time_now)} - an_storage = AnalysisStorage() - acq_storage = AcquisitionStorage() - - jobs = [] - job_ids = [] - - job_defaults = { - 'config': config_, - 'gear_id': gear_id, - 'tags': tags, - 'batch': str(batch_job.get('_id')), - 'inputs': {} - } + preconstructed_jobs = proposal.get('preconstructed_jobs') + + # If Running a batch from already-constructed jobs + if preconstructed_jobs: + origin = batch_job.get('origin') + jobs = [] + job_ids = [] + + for preconstructed_job in preconstructed_jobs: + job = Queue.enqueue_job(preconstructed_job, origin) + job_id = job.id_ + jobs.append(job) + job_ids.append(job_id) - for inputs in proposed_inputs: + # Otherwise create jobs from the containers and gear id provided in the proposal + else: + proposed_inputs = proposal.get('inputs', []) + proposed_destinations = proposal.get('destinations', []) + + gear_id = batch_job['gear_id'] + gear = gears.get_gear(gear_id) + gear_name = gear['gear']['name'] - job_map = copy.deepcopy(job_defaults) - job_map['inputs'] = inputs + config_ = batch_job.get('config') + origin = batch_job.get('origin') + tags = proposal.get('tags', []) + tags.append('batch') if gear.get('category') == 'analysis': + analysis_base = proposal.get('analysis', {}) + if not analysis_base.get('label'): + time_now = datetime.datetime.utcnow() + analysis_base['label'] = {'label': '{} {}'.format(gear_name, time_now)} + an_storage = AnalysisStorage() + acq_storage = AcquisitionStorage() - analysis = 
copy.deepcopy(analysis_base) + jobs = [] + job_ids = [] - # Create analysis - acquisition_id = inputs.values()[0].get('id') - session_id = acq_storage.get_container(acquisition_id, projection={'session':1}).get('session') - result = an_storage.create_job_and_analysis('sessions', session_id, analysis, job_map, origin, None) - job = result.get('job') - job_id = result.get('job_id') + job_defaults = { + 'config': config_, + 'gear_id': gear_id, + 'tags': tags, + 'batch': str(batch_job.get('_id')), + 'inputs': {} + } - else: + for inputs in proposed_inputs: - job = Queue.enqueue_job(job_map, origin) - job_id = job.id_ + job_map = copy.deepcopy(job_defaults) + job_map['inputs'] = inputs + if gear.get('category') == 'analysis': - jobs.append(job) - job_ids.append(job_id) + analysis = copy.deepcopy(analysis_base) - for dest in proposed_destinations: + # Create analysis + acquisition_id = inputs.values()[0].get('id') + session_id = acq_storage.get_container(acquisition_id, projection={'session':1}).get('session') + result = an_storage.create_job_and_analysis('sessions', session_id, analysis, job_map, origin, None) + job = result.get('job') + job_id = result.get('job_id') - job_map = copy.deepcopy(job_defaults) - job_map['destination'] = dest + else: - if gear.get('category') == 'analysis': + job = Queue.enqueue_job(job_map, origin) + job_id = job.id_ - analysis = copy.deepcopy(analysis_base) - # Create analysis - result = an_storage.create_job_and_analysis('sessions', bson.ObjectId(dest['id']), analysis, job_map, origin, None) - job = result.get('job') - job_id = result.get('job_id') + jobs.append(job) + job_ids.append(job_id) - else: + for dest in proposed_destinations: - job = Queue.enqueue_job(job_map, origin) - job_id = job.id_ + job_map = copy.deepcopy(job_defaults) + job_map['destination'] = dest + + if gear.get('category') == 'analysis': + + analysis = copy.deepcopy(analysis_base) + # Create analysis + result = an_storage.create_job_and_analysis('sessions', bson.ObjectId(dest['id']), analysis, job_map, origin, None) + job = result.get('job') + job_id = result.get('job_id') - jobs.append(job) - job_ids.append(job_id) + else: + + job = Queue.enqueue_job(job_map, origin) + job_id = job.id_ + + + jobs.append(job) + job_ids.append(job_id) update(batch_job['_id'], {'state': 'running', 'jobs': job_ids}) return jobs diff --git a/api/jobs/handlers.py b/api/jobs/handlers.py index ebef965d6..9369bd676 100644 --- a/api/jobs/handlers.py +++ b/api/jobs/handlers.py @@ -629,6 +629,27 @@ def post(self): return batch_proposal + @require_login + def post_with_jobs(self): + """ + Creates a batch from preconstructed jobs + """ + payload = self.request.json + jobs_ = payload.get('jobs', []) + + batch_proposal = { + 'proposal': { + 'preconstructed_jobs': jobs_ + }, + 'origin': self.origin, + 'state': 'pending', + '_id': bson.ObjectId() + } + batch.insert(batch_proposal) + batch_proposal['preconstructed_jobs'] = batch_proposal.pop('proposal') + + return batch_proposal + @require_login def run(self, _id): """ diff --git a/tests/integration_tests/python/test_batch.py b/tests/integration_tests/python/test_batch.py index 2bdceeac2..5b8c53023 100644 --- a/tests/integration_tests/python/test_batch.py +++ b/tests/integration_tests/python/test_batch.py @@ -84,6 +84,23 @@ def test_batch(data_builder, as_user, as_admin, as_root): assert r.ok analysis_batch_id = r.json()['_id'] + # create a batch with preconstructed jobs + r = as_admin.post('/batch/jobs', json={ + 'jobs': [ + { + 'gear_id': gear, + 'config': { 'two-digit 
multiple of ten': 20 }, + 'destination': { + 'type': 'acquisition', + 'id': acquisition + }, + 'tags': [ 'test-tag' ] + } + ] + }) + assert r.ok + job_batch_id = r.json()['_id'] + # try to get non-existent batch r = as_admin.get('/batch/000000000000000000000000') assert r.status_code == 404 @@ -97,11 +114,21 @@ def test_batch(data_builder, as_user, as_admin, as_root): assert r.ok assert r.json()['state'] == 'pending' + # get batch from jobs + r = as_admin.get('/batch/' + job_batch_id) + assert r.ok + assert r.json()['state'] == 'pending' + # get batch w/ ?jobs=true r = as_admin.get('/batch/' + batch_id, params={'jobs': 'true'}) assert r.ok assert 'jobs' in r.json() + # get job batch w/ ?jobs=true + r = as_admin.get('/batch/' + job_batch_id, params={'jobs': 'true'}) + assert r.ok + assert 'jobs' in r.json() + # try to cancel non-running batch r = as_admin.post('/batch/' + batch_id + '/cancel') assert r.status_code == 400 @@ -134,6 +161,14 @@ def test_batch(data_builder, as_user, as_admin, as_root): r = as_admin.get('/batch/' + analysis_batch_id) assert r.json()['state'] == 'running' + # run job batch + r = as_admin.post('/batch/' + job_batch_id + '/run') + print r.json() + assert r.ok + + # test batch.state after calling run + r = as_admin.get('/batch/' + job_batch_id) + assert r.json()['state'] == 'running' # Test batch complete # create a batch w/ acquisition target and target_context From 049998c85807af583f4483531ee91627ef751747 Mon Sep 17 00:00:00 2001 From: Harsha Kethineni Date: Wed, 7 Feb 2018 13:05:44 -0600 Subject: [PATCH 2/4] Validate job on batch propose --- api/jobs/handlers.py | 10 ++++++++ api/jobs/queue.py | 16 +++++++++---- tests/integration_tests/python/test_batch.py | 24 ++++++++++++++++++++ 3 files changed, 46 insertions(+), 4 deletions(-) diff --git a/api/jobs/handlers.py b/api/jobs/handlers.py index 9369bd676..714eb4d2d 100644 --- a/api/jobs/handlers.py +++ b/api/jobs/handlers.py @@ -637,6 +637,16 @@ def post_with_jobs(self): payload = self.request.json jobs_ = payload.get('jobs', []) + uid = None + if not self.superuser_request: + uid = self.uid + + for job_number, job_ in enumerate(jobs_): + try: + Queue.validate_job(job_, self.origin, create_job=False, perm_check_uid=uid) + except InputValidationException as e: + raise InputValidationException("Job {}: {}".format(job_number, str(e))) + batch_proposal = { 'proposal': { 'preconstructed_jobs': jobs_ diff --git a/api/jobs/queue.py b/api/jobs/queue.py index 89ea8cc10..c18b1e189 100644 --- a/api/jobs/queue.py +++ b/api/jobs/queue.py @@ -128,11 +128,10 @@ def retry(job, force=False): return new_id - @staticmethod - def enqueue_job(job_map, origin, perm_check_uid=None): + def validate_job(job_map, origin, create_job=False, perm_check_uid=None): """ - Using a payload for a proposed job, creates and returns (but does not insert) + Using a payload for a proposed job, creates and returns(if create_job is True) (but does not insert) a Job object. 
This preparation includes:
          - confirms gear exists
          - validates config against gear manifest
@@ -251,8 +250,17 @@ def enqueue_job(job_map, origin, perm_check_uid=None):
         if gear_name not in tags:
             tags.append(gear_name)
 
+        if create_job:
+            job = Job(str(gear['_id']), inputs, destination=destination, tags=tags, config_=config_, now=now_flag, attempt=attempt_n, previous_job_id=previous_job_id, origin=origin, batch=batch)
+            return job
+        return True
 
-        job = Job(str(gear['_id']), inputs, destination=destination, tags=tags, config_=config_, now=now_flag, attempt=attempt_n, previous_job_id=previous_job_id, origin=origin, batch=batch)
+    @staticmethod
+    def enqueue_job(job_map, origin, perm_check_uid=None):
+        """
+        Validates, creates, inserts, and returns a job
+        """
+        job = Queue.validate_job(job_map, origin, create_job=True, perm_check_uid=perm_check_uid)
         job.insert()
         return job
 
diff --git a/tests/integration_tests/python/test_batch.py b/tests/integration_tests/python/test_batch.py
index 5b8c53023..bce6a9892 100644
--- a/tests/integration_tests/python/test_batch.py
+++ b/tests/integration_tests/python/test_batch.py
@@ -84,6 +84,30 @@ def test_batch(data_builder, as_user, as_admin, as_root):
     assert r.ok
     analysis_batch_id = r.json()['_id']
 
+    # try to create a batch with invalid preconstructed jobs
+    r = as_admin.post('/batch/jobs', json={
+        'jobs': [
+            {
+                'gear_id': gear,
+                'inputs': {
+                    'dicom': {
+                        'type': 'acquisition',
+                        'id': acquisition,
+                        'name': 'test.zip'
+                    }
+                },
+                'config': { 'two-digit multiple of ten': 20 },
+                'destination': {
+                    'type': 'acquisition',
+                    'id': acquisition
+                },
+                'tags': [ 'test-tag' ]
+            }
+        ]
+    })
+    assert r.status_code == 400
+    assert "Job 0" in r.json().get('message')
+
     # create a batch with preconstructed jobs
     r = as_admin.post('/batch/jobs', json={
         'jobs': [

From 0578fe24d9868ba857856370b4cab2b771c3de18 Mon Sep 17 00:00:00 2001
From: Harsha Kethineni
Date: Mon, 12 Feb 2018 10:14:48 -0600
Subject: [PATCH 3/4] Swagger doc for batch-jobs-create

---
 swagger/paths/batch.yaml | 24 +++++++++++++++++++++++-
 1 file changed, 23 insertions(+), 1 deletion(-)

diff --git a/swagger/paths/batch.yaml b/swagger/paths/batch.yaml
index b407f82c3..37407992e 100644
--- a/swagger/paths/batch.yaml
+++ b/swagger/paths/batch.yaml
@@ -31,6 +31,28 @@
 #           schema:
 #             $ref: schemas/output/batch-insert.json
 
+/batch/jobs:
+  post:
+    summary: Create a batch job proposal from preconstructed jobs and insert it as 'pending'. 
+ operationId: create_batch_job_from_jobs + tags: + - batch + parameters: + - name: body + in: body + description: '' + schema: + type: array + # Schema file does not exist + # $ref: schemas/input/batch-insert.json + responses: + '200': + description: '' + # Schema file does not exist + # schema: + # $ref: schemas/output/batch-insert.json + + /batch/{BatchId}: parameters: - in: path @@ -94,5 +116,5 @@ '200': description: '' examples: - response: + response: canceled_jobs: 4 From 1bb0e5e5daefda94b201c8b49a2d9775a6649d3a Mon Sep 17 00:00:00 2001 From: Harsha Kethineni Date: Mon, 19 Feb 2018 13:48:19 -0600 Subject: [PATCH 4/4] Reverted enqueue_job; added job.insert() where needed --- api/dao/containerstorage.py | 1 + api/jobs/batch.py | 161 +++++++++++++++++++----------------- api/jobs/handlers.py | 4 +- api/jobs/queue.py | 17 +--- api/jobs/rules.py | 3 +- 5 files changed, 97 insertions(+), 89 deletions(-) diff --git a/api/dao/containerstorage.py b/api/dao/containerstorage.py index 84f67d20a..7cc3905d0 100644 --- a/api/dao/containerstorage.py +++ b/api/dao/containerstorage.py @@ -372,6 +372,7 @@ def create_job_and_analysis(self, cont_name, cid, analysis, job, origin, uid): try: job = Queue.enqueue_job(job, origin, perm_check_uid=uid) + job.insert() except Exception as e: # NOTE #775 remove unusable analysis - until jobs have a 'hold' state self.delete_el(analysis['_id']) diff --git a/api/jobs/batch.py b/api/jobs/batch.py index 285be566a..c29e1adac 100644 --- a/api/jobs/batch.py +++ b/api/jobs/batch.py @@ -126,108 +126,121 @@ def update(batch_id, payload): if result.modified_count != 1: raise Exception('Batch job not updated') -def run(batch_job): +def run_preconstructed_jobs(origin, preconstructed_jobs): """ - Creates jobs from proposed inputs, returns jobs enqueued. 
+ Enqueues jobs and returns list of created jobs """ + jobs = [] - proposal = batch_job.get('proposal') - if not proposal: - raise APIStorageException('The batch job is not formatted correctly.') - preconstructed_jobs = proposal.get('preconstructed_jobs') - - # If Running a batch from already-constructed jobs - if preconstructed_jobs: - origin = batch_job.get('origin') - jobs = [] - job_ids = [] + for preconstructed_job in preconstructed_jobs: + job = Queue.enqueue_job(preconstructed_job, origin) + job.insert() + jobs.append(job) - for preconstructed_job in preconstructed_jobs: - job = Queue.enqueue_job(preconstructed_job, origin) - job_id = job.id_ - jobs.append(job) - job_ids.append(job_id) + return jobs - # Otherwise create jobs from the containers and gear id provided in the proposal - else: - proposed_inputs = proposal.get('inputs', []) - proposed_destinations = proposal.get('destinations', []) +def run_container_jobs(batch_job, proposal): + # Create jobs from the containers and gear id provided in the proposal + proposed_inputs = proposal.get('inputs', []) + proposed_destinations = proposal.get('destinations', []) + + gear_id = batch_job['gear_id'] + gear = gears.get_gear(gear_id) + gear_name = gear['gear']['name'] + + config_ = batch_job.get('config') + origin = batch_job.get('origin') + tags = proposal.get('tags', []) + tags.append('batch') + + if gear.get('category') == 'analysis': + analysis_base = proposal.get('analysis', {}) + if not analysis_base.get('label'): + time_now = datetime.datetime.utcnow() + analysis_base['label'] = {'label': '{} {}'.format(gear_name, time_now)} + an_storage = AnalysisStorage() + acq_storage = AcquisitionStorage() + + jobs = [] + job_ids = [] + + job_defaults = { + 'config': config_, + 'gear_id': gear_id, + 'tags': tags, + 'batch': str(batch_job.get('_id')), + 'inputs': {} + } - gear_id = batch_job['gear_id'] - gear = gears.get_gear(gear_id) - gear_name = gear['gear']['name'] + for inputs in proposed_inputs: - config_ = batch_job.get('config') - origin = batch_job.get('origin') - tags = proposal.get('tags', []) - tags.append('batch') + job_map = copy.deepcopy(job_defaults) + job_map['inputs'] = inputs if gear.get('category') == 'analysis': - analysis_base = proposal.get('analysis', {}) - if not analysis_base.get('label'): - time_now = datetime.datetime.utcnow() - analysis_base['label'] = {'label': '{} {}'.format(gear_name, time_now)} - an_storage = AnalysisStorage() - acq_storage = AcquisitionStorage() - jobs = [] - job_ids = [] + analysis = copy.deepcopy(analysis_base) - job_defaults = { - 'config': config_, - 'gear_id': gear_id, - 'tags': tags, - 'batch': str(batch_job.get('_id')), - 'inputs': {} - } + # Create analysis + acquisition_id = inputs.values()[0].get('id') + session_id = acq_storage.get_container(acquisition_id, projection={'session':1}).get('session') + result = an_storage.create_job_and_analysis('sessions', session_id, analysis, job_map, origin, None) + job = result.get('job') + job_id = result.get('job_id') - for inputs in proposed_inputs: - - job_map = copy.deepcopy(job_defaults) - job_map['inputs'] = inputs + else: - if gear.get('category') == 'analysis': + job = Queue.enqueue_job(job_map, origin) + job.insert() + job_id = job.id_ - analysis = copy.deepcopy(analysis_base) - # Create analysis - acquisition_id = inputs.values()[0].get('id') - session_id = acq_storage.get_container(acquisition_id, projection={'session':1}).get('session') - result = an_storage.create_job_and_analysis('sessions', session_id, analysis, job_map, origin, 
None) - job = result.get('job') - job_id = result.get('job_id') + jobs.append(job) + job_ids.append(job_id) - else: + for dest in proposed_destinations: - job = Queue.enqueue_job(job_map, origin) - job_id = job.id_ + job_map = copy.deepcopy(job_defaults) + job_map['destination'] = dest + if gear.get('category') == 'analysis': - jobs.append(job) - job_ids.append(job_id) + analysis = copy.deepcopy(analysis_base) - for dest in proposed_destinations: + # Create analysis + result = an_storage.create_job_and_analysis('sessions', bson.ObjectId(dest['id']), analysis, job_map, origin, None) + job = result.get('job') + job_id = result.get('job_id') - job_map = copy.deepcopy(job_defaults) - job_map['destination'] = dest + else: - if gear.get('category') == 'analysis': + job = Queue.enqueue_job(job_map, origin) + job.insert() + job_id = job.id_ - analysis = copy.deepcopy(analysis_base) - # Create analysis - result = an_storage.create_job_and_analysis('sessions', bson.ObjectId(dest['id']), analysis, job_map, origin, None) - job = result.get('job') - job_id = result.get('job_id') + jobs.append(job) + job_ids.append(job_id) - else: + return jobs, job_ids - job = Queue.enqueue_job(job_map, origin) - job_id = job.id_ +def run(batch_job): + """ + Creates jobs from proposed inputs, returns jobs enqueued. + """ + proposal = batch_job.get('proposal') + if not proposal: + raise APIStorageException('The batch job is not formatted correctly.') + preconstructed_jobs = proposal.get('preconstructed_jobs') - jobs.append(job) - job_ids.append(job_id) + # If Running a batch from already-constructed jobs + if preconstructed_jobs: + origin = batch_job.get('origin') + jobs = run_preconstructed_jobs(origin, preconstructed_jobs) + job_ids = [job.id_ for job in jobs] + else: + jobs, job_ids = run_container_jobs(batch_job, proposal) update(batch_job['_id'], {'state': 'running', 'jobs': job_ids}) return jobs diff --git a/api/jobs/handlers.py b/api/jobs/handlers.py index 714eb4d2d..d75029da5 100644 --- a/api/jobs/handlers.py +++ b/api/jobs/handlers.py @@ -241,6 +241,8 @@ def add(self): uid = self.uid job = Queue.enqueue_job(payload, self.origin, perm_check_uid=uid) + job.insert() + return { '_id': job.id_ } @require_admin @@ -643,7 +645,7 @@ def post_with_jobs(self): for job_number, job_ in enumerate(jobs_): try: - Queue.validate_job(job_, self.origin, create_job=False, perm_check_uid=uid) + Queue.enqueue_job(job_, self.origin, perm_check_uid=uid) except InputValidationException as e: raise InputValidationException("Job {}: {}".format(job_number, str(e))) diff --git a/api/jobs/queue.py b/api/jobs/queue.py index c18b1e189..c7edb56a5 100644 --- a/api/jobs/queue.py +++ b/api/jobs/queue.py @@ -128,10 +128,11 @@ def retry(job, force=False): return new_id + @staticmethod - def validate_job(job_map, origin, create_job=False, perm_check_uid=None): + def enqueue_job(job_map, origin, perm_check_uid=None): """ - Using a payload for a proposed job, creates and returns(if create_job is True) (but does not insert) + Using a payload for a proposed job, creates and returns (but does not insert) a Job object. 
This preparation includes:
          - confirms gear exists
          - validates config against gear manifest
@@ -250,18 +251,8 @@ def validate_job(job_map, origin, create_job=False, perm_check_uid=None):
         if gear_name not in tags:
             tags.append(gear_name)
 
-        if create_job:
-            job = Job(str(gear['_id']), inputs, destination=destination, tags=tags, config_=config_, now=now_flag, attempt=attempt_n, previous_job_id=previous_job_id, origin=origin, batch=batch)
-            return job
-        return True
-
-    @staticmethod
-    def enqueue_job(job_map, origin, perm_check_uid=None):
-        """
-        Validates, creates, inserts, and returns a job
-        """
-        job = Queue.validate_job(job_map, origin, create_job=True, perm_check_uid=perm_check_uid)
-        job.insert()
+        job = Job(str(gear['_id']), inputs, destination=destination, tags=tags, config_=config_, now=now_flag, attempt=attempt_n, previous_job_id=previous_job_id, origin=origin, batch=batch)
         return job
 
diff --git a/api/jobs/rules.py b/api/jobs/rules.py
index b111baf59..7ab4d71a1 100644
--- a/api/jobs/rules.py
+++ b/api/jobs/rules.py
@@ -246,7 +246,8 @@ def create_jobs(db, container_before, container_after, container_type):
     for pj in potential_jobs:
 
         job_map = pj['job'].map()
-        Queue.enqueue_job(job_map, origin)
+        job = Queue.enqueue_job(job_map, origin)
+        job.insert()
 
         spawned_jobs.append(pj['rule']['alg'])
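
A quick end-to-end sketch of the client flow these patches add: propose a batch from
preconstructed jobs via POST /batch/jobs, then enqueue it via POST /batch/{BatchId}/run.
The payload mirrors the one posted in test_batch.py; the API root, auth header, gear id,
and acquisition id below are placeholder assumptions, not values from this changeset.

    import requests

    API_ROOT = 'https://localhost:8443/api'  # assumed API root
    session = requests.Session()
    session.headers['Authorization'] = 'scitran-user <api-key>'  # assumed auth scheme
    session.verify = False

    # Propose a batch from preconstructed jobs. Each job payload is validated
    # up front (enqueue_job is called without inserting), and the proposal is
    # stored with state 'pending'.
    r = session.post(API_ROOT + '/batch/jobs', json={
        'jobs': [
            {
                'gear_id': '<gear-id>',  # placeholder: an existing gear
                'config': {'two-digit multiple of ten': 20},
                'destination': {'type': 'acquisition', 'id': '<acquisition-id>'},
                'tags': ['test-tag']
            }
        ]
    })
    r.raise_for_status()
    batch_id = r.json()['_id']

    # Run the batch: each proposed job is enqueued and inserted, and the
    # batch state moves from 'pending' to 'running'.
    r = session.post(API_ROOT + '/batch/' + batch_id + '/run')
    r.raise_for_status()
    print(r.json())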