diff --git a/api/api.py b/api/api.py index db7a9dc70..6e7b08da5 100644 --- a/api/api.py +++ b/api/api.py @@ -157,12 +157,13 @@ def prefix(path, routes): # Batch jobs - route('/batch', BatchHandler, h='get_all', m=['GET']), - route('/batch', BatchHandler, m=['POST']), + route('/batch', BatchHandler, h='get_all', m=['GET']), + route('/batch', BatchHandler, m=['POST']), prefix('/batch', [ - route('/<:[^/]+>', BatchHandler, h='get', m=['GET']), - route('/<:[^/]+>/run', BatchHandler, h='run', m=['POST']), - route('/<:[^/]+>/cancel', BatchHandler, h='cancel', m=['POST']), + route('/jobs', BatchHandler, h='post_with_jobs', m=['POST']), + route('/<:[^/]+>', BatchHandler, h='get', m=['GET']), + route('/<:[^/]+>/run', BatchHandler, h='run', m=['POST']), + route('/<:[^/]+>/cancel', BatchHandler, h='cancel', m=['POST']), ]), diff --git a/api/dao/containerstorage.py b/api/dao/containerstorage.py index 84f67d20a..7cc3905d0 100644 --- a/api/dao/containerstorage.py +++ b/api/dao/containerstorage.py @@ -372,6 +372,7 @@ def create_job_and_analysis(self, cont_name, cid, analysis, job, origin, uid): try: job = Queue.enqueue_job(job, origin, perm_check_uid=uid) + job.insert() except Exception as e: # NOTE #775 remove unusable analysis - until jobs have a 'hold' state self.delete_el(analysis['_id']) diff --git a/api/jobs/batch.py b/api/jobs/batch.py index a6258aedc..c29e1adac 100644 --- a/api/jobs/batch.py +++ b/api/jobs/batch.py @@ -126,14 +126,21 @@ def update(batch_id, payload): if result.modified_count != 1: raise Exception('Batch job not updated') -def run(batch_job): +def run_preconstructed_jobs(origin, preconstructed_jobs): """ - Creates jobs from proposed inputs, returns jobs enqueued. 
+ Enqueues jobs and returns list of created jobs """ + jobs = [] - proposal = batch_job.get('proposal') - if not proposal: - raise APIStorageException('The batch job is not formatted correctly.') + for preconstructed_job in preconstructed_jobs: + job = Queue.enqueue_job(preconstructed_job, origin) + job.insert() + jobs.append(job) + + return jobs + +def run_container_jobs(batch_job, proposal): + # Create jobs from the containers and gear id provided in the proposal proposed_inputs = proposal.get('inputs', []) proposed_destinations = proposal.get('destinations', []) @@ -184,6 +191,7 @@ def run(batch_job): else: job = Queue.enqueue_job(job_map, origin) + job.insert() job_id = job.id_ @@ -207,12 +215,33 @@ def run(batch_job): else: job = Queue.enqueue_job(job_map, origin) + job.insert() job_id = job.id_ jobs.append(job) job_ids.append(job_id) + return jobs, job_ids + +def run(batch_job): + """ + Creates jobs from proposed inputs, returns jobs enqueued. + """ + + proposal = batch_job.get('proposal') + if not proposal: + raise APIStorageException('The batch job is not formatted correctly.') + preconstructed_jobs = proposal.get('preconstructed_jobs') + + # If Running a batch from already-constructed jobs + if preconstructed_jobs: + origin = batch_job.get('origin') + jobs = run_preconstructed_jobs(origin, preconstructed_jobs) + job_ids = [job.id_ for job in jobs] + else: + jobs, job_ids = run_container_jobs(batch_job, proposal) + update(batch_job['_id'], {'state': 'running', 'jobs': job_ids}) return jobs diff --git a/api/jobs/handlers.py b/api/jobs/handlers.py index ebef965d6..d75029da5 100644 --- a/api/jobs/handlers.py +++ b/api/jobs/handlers.py @@ -241,6 +241,8 @@ def add(self): uid = self.uid job = Queue.enqueue_job(payload, self.origin, perm_check_uid=uid) + job.insert() + return { '_id': job.id_ } @require_admin @@ -629,6 +631,37 @@ def post(self): return batch_proposal + @require_login + def post_with_jobs(self): + """ + Creates a batch from preconstructed jobs + 
""" + payload = self.request.json + jobs_ = payload.get('jobs', []) + + uid = None + if not self.superuser_request: + uid = self.uid + + for job_number, job_ in enumerate(jobs_): + try: + Queue.enqueue_job(job_, self.origin, perm_check_uid=uid) + except InputValidationException as e: + raise InputValidationException("Job {}: {}".format(job_number, str(e))) + + batch_proposal = { + 'proposal': { + 'preconstructed_jobs': jobs_ + }, + 'origin': self.origin, + 'state': 'pending', + '_id': bson.ObjectId() + } + batch.insert(batch_proposal) + batch_proposal['preconstructed_jobs'] = batch_proposal.pop('proposal') + + return batch_proposal + @require_login def run(self, _id): """ diff --git a/api/jobs/queue.py b/api/jobs/queue.py index 89ea8cc10..c7edb56a5 100644 --- a/api/jobs/queue.py +++ b/api/jobs/queue.py @@ -253,7 +253,6 @@ def enqueue_job(job_map, origin, perm_check_uid=None): tags.append(gear_name) job = Job(str(gear['_id']), inputs, destination=destination, tags=tags, config_=config_, now=now_flag, attempt=attempt_n, previous_job_id=previous_job_id, origin=origin, batch=batch) - job.insert() return job @staticmethod diff --git a/api/jobs/rules.py b/api/jobs/rules.py index b111baf59..7ab4d71a1 100644 --- a/api/jobs/rules.py +++ b/api/jobs/rules.py @@ -246,7 +246,8 @@ def create_jobs(db, container_before, container_after, container_type): for pj in potential_jobs: job_map = pj['job'].map() - Queue.enqueue_job(job_map, origin) + job = Queue.enqueue_job(job_map, origin) + job.insert() spawned_jobs.append(pj['rule']['alg']) diff --git a/swagger/paths/batch.yaml b/swagger/paths/batch.yaml index b407f82c3..37407992e 100644 --- a/swagger/paths/batch.yaml +++ b/swagger/paths/batch.yaml @@ -31,6 +31,28 @@ # schema: # $ref: schemas/output/batch-insert.json +/batch/job: + post: + summary: Create a batch job proposal from preconstructed jobs and insert it as 'pending'. 
+ operationId: create_batch_job_from_jobs + tags: + - batch + parameters: + - name: body + in: body + description: '' + schema: + type: array + # Schema file does not exist + # $ref: schemas/input/batch-insert.json + responses: + '200': + description: '' + # Schema file does not exist + # schema: + # $ref: schemas/output/batch-insert.json + + /batch/{BatchId}: parameters: - in: path @@ -94,5 +116,5 @@ '200': description: '' examples: - response: + response: canceled_jobs: 4 diff --git a/tests/integration_tests/python/test_batch.py b/tests/integration_tests/python/test_batch.py index 2bdceeac2..bce6a9892 100644 --- a/tests/integration_tests/python/test_batch.py +++ b/tests/integration_tests/python/test_batch.py @@ -84,6 +84,47 @@ def test_batch(data_builder, as_user, as_admin, as_root): assert r.ok analysis_batch_id = r.json()['_id'] + # try to create a batch with invalid preconstructed jobs + r = as_admin.post('/batch/jobs', json={ + 'jobs': [ + { + 'gear_id': gear, + 'inputs': { + 'dicom': { + 'type': 'acquisition', + 'id': acquisition, + 'name': 'test.zip' + } + }, + 'config': { 'two-digit multiple of ten': 20 }, + 'destination': { + 'type': 'acquisition', + 'id': acquisition + }, + 'tags': [ 'test-tag' ] + } + ] + }) + assert r.status_code == 400 + assert "Job 0" in r.json().get('message') + + # create a batch with preconstructed jobs + r = as_admin.post('/batch/jobs', json={ + 'jobs': [ + { + 'gear_id': gear, + 'config': { 'two-digit multiple of ten': 20 }, + 'destination': { + 'type': 'acquisition', + 'id': acquisition + }, + 'tags': [ 'test-tag' ] + } + ] + }) + assert r.ok + job_batch_id = r.json()['_id'] + # try to get non-existent batch r = as_admin.get('/batch/000000000000000000000000') assert r.status_code == 404 @@ -97,11 +138,21 @@ def test_batch(data_builder, as_user, as_admin, as_root): assert r.ok assert r.json()['state'] == 'pending' + # get batch from jobs + r = as_admin.get('/batch/' + job_batch_id) + assert r.ok + assert r.json()['state'] == 
'pending' + # get batch w/ ?jobs=true r = as_admin.get('/batch/' + batch_id, params={'jobs': 'true'}) assert r.ok assert 'jobs' in r.json() + # get job batch w/ ?jobs=true + r = as_admin.get('/batch/' + job_batch_id, params={'jobs': 'true'}) + assert r.ok + assert 'jobs' in r.json() + # try to cancel non-running batch r = as_admin.post('/batch/' + batch_id + '/cancel') assert r.status_code == 400 @@ -134,6 +185,13 @@ def test_batch(data_builder, as_user, as_admin, as_root): r = as_admin.get('/batch/' + analysis_batch_id) assert r.json()['state'] == 'running' + # run job batch + r = as_admin.post('/batch/' + job_batch_id + '/run') + assert r.ok + + # test batch.state after calling run + r = as_admin.get('/batch/' + job_batch_id) + assert r.json()['state'] == 'running' # Test batch complete # create a batch w/ acquisition target and target_context