Propose batch from pre-constructed jobs #1069

Open · wants to merge 4 commits into base: master
api/api.py: 6 additions & 5 deletions
@@ -157,12 +157,13 @@ def prefix(path, routes):

    # Batch jobs

    route('/batch', BatchHandler, h='get_all', m=['GET']),
    route('/batch', BatchHandler, m=['POST']),
    prefix('/batch', [
+       route('/jobs', BatchHandler, h='post_with_jobs', m=['POST']),
        route('/<:[^/]+>', BatchHandler, h='get', m=['GET']),
        route('/<:[^/]+>/run', BatchHandler, h='run', m=['POST']),
        route('/<:[^/]+>/cancel', BatchHandler, h='cancel', m=['POST']),
    ]),


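Taken together with the existing batch routes, the intended client flow is: propose a batch from fully specified job maps via the new route, then inspect or run it by id. A minimal sketch, assuming an authenticated `requests` session; the base URL, auth header, and placeholder ids below are illustrative assumptions, not part of this diff:

import requests

API = 'https://flywheel.example/api'             # assumed base URL
session = requests.Session()
session.headers['Authorization'] = '<api-key>'   # placeholder auth; scheme not shown in this diff

# New route in this PR: propose a batch directly from preconstructed job maps.
jobs = [{
    'gear_id': '<gear-id>',                                     # placeholder values
    'destination': {'type': 'acquisition', 'id': '<acq-id>'},
}]
r = session.post(API + '/batch/jobs', json={'jobs': jobs})      # BatchHandler.post_with_jobs
batch_id = r.json()['_id']

# Existing routes are unchanged: inspect, run, or cancel the proposed batch by id.
session.get(API + '/batch/' + batch_id)                         # BatchHandler.get
session.post(API + '/batch/' + batch_id + '/run')               # BatchHandler.run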
api/dao/containerstorage.py: 1 addition & 0 deletions
@@ -372,6 +372,7 @@ def create_job_and_analysis(self, cont_name, cid, analysis, job, origin, uid):
        try:

            job = Queue.enqueue_job(job, origin, perm_check_uid=uid)
+            job.insert()
        except Exception as e:
            # NOTE #775 remove unusable analysis - until jobs have a 'hold' state
            self.delete_el(analysis['_id'])
api/jobs/batch.py: 34 additions & 5 deletions
@@ -126,14 +126,21 @@ def update(batch_id, payload):
    if result.modified_count != 1:
        raise Exception('Batch job not updated')

-def run(batch_job):
+def run_preconstructed_jobs(origin, preconstructed_jobs):
    """
-    Creates jobs from proposed inputs, returns jobs enqueued.
+    Enqueues jobs and returns list of created jobs
    """
    jobs = []

-    proposal = batch_job.get('proposal')
-    if not proposal:
-        raise APIStorageException('The batch job is not formatted correctly.')
+    for preconstructed_job in preconstructed_jobs:
+        job = Queue.enqueue_job(preconstructed_job, origin)
+        job.insert()
+        jobs.append(job)
+
+    return jobs
+
+def run_container_jobs(batch_job, proposal):
    # Create jobs from the containers and gear id provided in the proposal
    proposed_inputs = proposal.get('inputs', [])
    proposed_destinations = proposal.get('destinations', [])

@@ -184,6 +191,7 @@ def run(batch_job):
        else:

            job = Queue.enqueue_job(job_map, origin)
+            job.insert()
            job_id = job.id_


@@ -207,12 +215,33 @@ def run(batch_job):
        else:

            job = Queue.enqueue_job(job_map, origin)
+            job.insert()
            job_id = job.id_


        jobs.append(job)
        job_ids.append(job_id)

+    return jobs, job_ids
+
+def run(batch_job):
+    """
+    Creates jobs from proposed inputs, returns jobs enqueued.
+    """
+
+    proposal = batch_job.get('proposal')
+    if not proposal:
+        raise APIStorageException('The batch job is not formatted correctly.')
+    preconstructed_jobs = proposal.get('preconstructed_jobs')
+
+    # If running a batch from already-constructed jobs
+    if preconstructed_jobs:
+        origin = batch_job.get('origin')
+        jobs = run_preconstructed_jobs(origin, preconstructed_jobs)
+        job_ids = [job.id_ for job in jobs]
+    else:
+        jobs, job_ids = run_container_jobs(batch_job, proposal)
+
    update(batch_job['_id'], {'state': 'running', 'jobs': job_ids})
    return jobs

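With this split, `run()` only inspects the stored proposal and dispatches to one of the two helpers. For reference, a sketch of the two batch-document shapes it distinguishes; only the key names are taken from this diff, while the ids and origin values are illustrative assumptions:

# Batch proposed via the new POST /batch/jobs handler -> run_preconstructed_jobs()
batch_from_jobs = {
    '_id': '<batch-id>',
    'origin': {'type': 'user', 'id': '<uid>'},     # assumed origin shape
    'state': 'pending',
    'proposal': {
        'preconstructed_jobs': [
            {'gear_id': '<gear-id>',
             'destination': {'type': 'acquisition', 'id': '<acq-id>'}},
        ],
    },
}

# Batch proposed via the existing POST /batch endpoint -> run_container_jobs()
batch_from_containers = {
    '_id': '<batch-id>',
    'origin': {'type': 'user', 'id': '<uid>'},
    'state': 'pending',
    'proposal': {
        'inputs': ['<per-container input maps>'],
        'destinations': ['<per-container destination maps>'],
    },
}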
api/jobs/handlers.py: 33 additions & 0 deletions
@@ -241,6 +241,8 @@ def add(self):
            uid = self.uid

        job = Queue.enqueue_job(payload, self.origin, perm_check_uid=uid)
+        job.insert()
+
        return { '_id': job.id_ }

    @require_admin
@@ -629,6 +631,37 @@ def post(self):

        return batch_proposal

+    @require_login
+    def post_with_jobs(self):
+        """
+        Creates a batch from preconstructed jobs
+        """
+        payload = self.request.json
+        jobs_ = payload.get('jobs', [])
+
+        uid = None
+        if not self.superuser_request:
+            uid = self.uid
+
+        for job_number, job_ in enumerate(jobs_):
+            try:
+                Queue.enqueue_job(job_, self.origin, perm_check_uid=uid)
+            except InputValidationException as e:
+                raise InputValidationException("Job {}: {}".format(job_number, str(e)))
+
+        batch_proposal = {
+            'proposal': {
+                'preconstructed_jobs': jobs_
+            },
+            'origin': self.origin,
+            'state': 'pending',
+            '_id': bson.ObjectId()
+        }
+        batch.insert(batch_proposal)
+        batch_proposal['preconstructed_jobs'] = batch_proposal.pop('proposal')
+
+        return batch_proposal
+
    @require_login
    def run(self, _id):
        """
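The request body `post_with_jobs` expects is an object with a `jobs` array of complete job maps; each entry is passed through `Queue.enqueue_job()` purely to validate and permission-check it, and a failing entry rejects the whole request with a "Job N: ..." message before anything is stored. A sketch of such a payload, with placeholder ids:

payload = {
    'jobs': [
        {
            'gear_id': '<gear-id>',                                    # placeholder values
            'config': {'<param>': '<value>'},
            'destination': {'type': 'acquisition', 'id': '<acq-id>'},
            'tags': ['batch'],
        },
    ],
}

Because the jobs are only validated here and kept inside the batch proposal, nothing is actually enqueued until the existing /batch/<id>/run endpoint is called.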
api/jobs/queue.py: 0 additions & 1 deletion
@@ -253,7 +253,6 @@ def enqueue_job(job_map, origin, perm_check_uid=None):
        tags.append(gear_name)

        job = Job(str(gear['_id']), inputs, destination=destination, tags=tags, config_=config_, now=now_flag, attempt=attempt_n, previous_job_id=previous_job_id, origin=origin, batch=batch)
-        job.insert()
        return job

    @staticmethod
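Removing `job.insert()` here changes the contract of `Queue.enqueue_job()`: it now only validates inputs and constructs the `Job`, and each caller that actually wants the job queued persists it explicitly, which is why the other files in this diff add `job.insert()` after their calls. A minimal sketch of the new calling convention (a fragment; `job_map`, `origin`, and `uid` are whatever the caller already has in scope):

# Validate and build the Job without touching the database.
job = Queue.enqueue_job(job_map, origin, perm_check_uid=uid)

# Persist only when the job should actually enter the queue; validation-only
# callers such as BatchHandler.post_with_jobs skip this step.
job.insert()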
api/jobs/rules.py: 2 additions & 1 deletion
@@ -246,7 +246,8 @@ def create_jobs(db, container_before, container_after, container_type):

    for pj in potential_jobs:
        job_map = pj['job'].map()
-        Queue.enqueue_job(job_map, origin)
+        job = Queue.enqueue_job(job_map, origin)
+        job.insert()

        spawned_jobs.append(pj['rule']['alg'])

swagger/paths/batch.yaml: 23 additions & 1 deletion
@@ -31,6 +31,28 @@
# schema:
# $ref: schemas/output/batch-insert.json

+/batch/jobs:
+  post:
+    summary: Create a batch job proposal from preconstructed jobs and insert it as 'pending'.
+    operationId: create_batch_job_from_jobs
+    tags:
+    - batch
+    parameters:
+      - name: body
+        in: body
+        description: ''
+        schema:
+          type: array
+          # Schema file does not exist
+          # $ref: schemas/input/batch-insert.json
+    responses:
+      '200':
+        description: ''
+        # Schema file does not exist
+        # schema:
+        #   $ref: schemas/output/batch-insert.json
+
+
/batch/{BatchId}:
  parameters:
    - in: path
@@ -94,5 +116,5 @@
      '200':
        description: ''
        examples:
          response:
            canceled_jobs: 4
tests/integration_tests/python/test_batch.py: 59 additions & 0 deletions
@@ -84,6 +84,47 @@ def test_batch(data_builder, as_user, as_admin, as_root):
    assert r.ok
    analysis_batch_id = r.json()['_id']

+    # try to create a batch with invalid preconstructed jobs
+    r = as_admin.post('/batch/jobs', json={
+        'jobs': [
+            {
+                'gear_id': gear,
+                'inputs': {
+                    'dicom': {
+                        'type': 'acquisition',
+                        'id': acquisition,
+                        'name': 'test.zip'
+                    }
+                },
+                'config': { 'two-digit multiple of ten': 20 },
+                'destination': {
+                    'type': 'acquisition',
+                    'id': acquisition
+                },
+                'tags': [ 'test-tag' ]
+            }
+        ]
+    })
+    assert r.status_code == 400
+    assert "Job 0" in r.json().get('message')
+
+    # create a batch with preconstructed jobs
+    r = as_admin.post('/batch/jobs', json={
+        'jobs': [
+            {
+                'gear_id': gear,
+                'config': { 'two-digit multiple of ten': 20 },
+                'destination': {
+                    'type': 'acquisition',
+                    'id': acquisition
+                },
+                'tags': [ 'test-tag' ]
+            }
+        ]
+    })
+    assert r.ok
+    job_batch_id = r.json()['_id']
+
    # try to get non-existent batch
    r = as_admin.get('/batch/000000000000000000000000')
    assert r.status_code == 404
@@ -97,11 +138,21 @@ def test_batch(data_builder, as_user, as_admin, as_root):
    assert r.ok
    assert r.json()['state'] == 'pending'

+    # get batch from jobs
+    r = as_admin.get('/batch/' + job_batch_id)
+    assert r.ok
+    assert r.json()['state'] == 'pending'
+
    # get batch w/ ?jobs=true
    r = as_admin.get('/batch/' + batch_id, params={'jobs': 'true'})
    assert r.ok
    assert 'jobs' in r.json()

+    # get job batch w/ ?jobs=true
+    r = as_admin.get('/batch/' + job_batch_id, params={'jobs': 'true'})
+    assert r.ok
+    assert 'jobs' in r.json()
+
    # try to cancel non-running batch
    r = as_admin.post('/batch/' + batch_id + '/cancel')
    assert r.status_code == 400
@@ -134,6 +185,14 @@ def test_batch(data_builder, as_user, as_admin, as_root):
    r = as_admin.get('/batch/' + analysis_batch_id)
    assert r.json()['state'] == 'running'

+    # run job batch
+    r = as_admin.post('/batch/' + job_batch_id + '/run')
+    print r.json()
+    assert r.ok
+
+    # test batch.state after calling run
+    r = as_admin.get('/batch/' + job_batch_id)
+    assert r.json()['state'] == 'running'

    # Test batch complete
    # create a batch w/ acquisition target and target_context