Custom rule config (+use gear_id) #961

Open · wants to merge 2 commits into master
26 changes: 9 additions & 17 deletions api/jobs/gears.py
@@ -5,10 +5,10 @@
from __future__ import absolute_import

import bson.objectid
import copy
import datetime
from jsonschema import Draft4Validator, ValidationError
import gears as gear_tools
import pymongo

from .. import config
from .jobs import Job
@@ -39,17 +39,10 @@ def get_gears():
return map(lambda x: x['original'], cursor)

def get_gear(_id):
return config.db.gears.find_one({'_id': bson.ObjectId(_id)})

def get_gear_by_name(name):

# Find a gear from the list by name
gear_doc = list(config.db.gears.find({'gear.name': name}).sort('created', pymongo.DESCENDING))

if len(gear_doc) == 0 :
raise APINotFoundException('Unknown gear ' + name)

return gear_doc[0]
gear = config.db.gears.find_one({'_id': bson.ObjectId(_id)})
if gear is None:
raise APINotFoundException('Cannot find gear {}'.format(_id))
return gear

def get_invocation_schema(gear):
return gear_tools.derive_invocation_schema(gear['gear'])
@@ -104,13 +97,13 @@ def suggest_for_files(gear, files):
return suggested_files

def validate_gear_config(gear, config_):
if len(gear.get('manifest', {}).get('config', {})) > 0:
invocation = gear_tools.derive_invocation_schema(gear['manifest'])
if len(gear.get('gear', {}).get('config', {})) > 0:
invocation = gear_tools.derive_invocation_schema(gear['gear'])
Contributor: Oh, nice catch.

ci = gear_tools.isolate_config_invocation(invocation)
validator = Draft4Validator(ci)

try:
validator.validate(config_)
validator.validate(fill_gear_default_values(gear, config_))
Contributor: Defaults will be filled when the job is spawned, so I would think we wouldn't fill them here. What is the expected behavior if a gear is updated and the update has a new default that the rule didn't explicitly set? If we fill defaults here, it won't use the updated default. It is possible they wouldn't want to use an updated default without any user action.

If I missed a discussion on this, let me know. @kofalt @ambrussimon

Contributor: Given that it's a job rule, I would expect valid defaults to be used, rather than invalid ones from a different gear document. Really, the core issue here is that fill_gear_default_values modifies the map in place, and it's reasonable to call it from places where you don't want the document to change.

Contributor: So I would either not call fill_gear_default_values in places where you don't want to modify the document (quick fix), or (better fix) change fill_gear_default_values to use copy.deepcopy, then check the various call sites.
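For reference, a minimal sketch of that better fix. The loop body is cut off in the diff above, so the "fill only the keys the caller left unset" behavior is an assumption on my part rather than taken from the source:

```python
import copy

def fill_gear_default_values(gear, config_):
    """Return a copy of config_ with missing keys filled from the gear's config defaults."""
    # Work on a deep copy so callers never see their map mutated in place.
    config_ = copy.deepcopy(config_) or {}

    for key, spec in gear['gear'].get('config', {}).iteritems():
        # Assumed: only fill keys the caller did not set explicitly.
        if 'default' in spec and key not in config_:
            config_[key] = spec['default']

    return config_
```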

Contributor Author: Oh, nice catch, that was the original intent... I totally agree, fixing right away.

Contributor: That's a good point, but I was asking whether we want defaults persisted to the database for the rule config when the user didn't explicitly send them.

Contributor Author: @nagem I thought it would be best not to store the default config on the rule, and to have new jobs simply use potentially updated/new defaults, filled at spawn time. I gathered that's the most useful/expected behavior for now, in line with how it works currently. Proper gear versioning is partially in conflict with the "auto-update" feature... at least that's what I thought during implementation. I guess auto-selecting the latest gear in a UI dropdown wouldn't be too complicated, either.

Anyhow, gear versioning or not, I think not storing the defaults is better. Assuming gears have sane defaults for every version, having rules just work out of the box after an update (at least when no custom config was set) seems nice, and that's exactly what Thad wants to hold on to. If the user wants to hardcode the defaults, they are still free to do so using the new rule config...
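To illustrate the behavior being described, here is a rough sketch of the enqueue-time flow, simplified from the queue.py change in this PR. The helper name enqueue_with_defaults is hypothetical and the surrounding plumbing (inputs, destination, tags) is elided:

```python
from .gears import validate_gear_config, fill_gear_default_values

def enqueue_with_defaults(gear, rule_config):
    # The rule stores only what the user explicitly set (possibly nothing).
    config_ = rule_config or {}

    # validate_gear_config fills defaults on a copy before validating, so a
    # sparse rule config still validates without defaults being persisted on
    # the rule document.
    validate_gear_config(gear, config_)

    # Defaults are materialized only on the job document at spawn time, so a
    # gear update with new defaults takes effect for future jobs automatically.
    job_config = {
        'config': fill_gear_default_values(gear, config_),
        'inputs': {},
    }
    return job_config
```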

except ValidationError as err:
key = None
if len(err.relative_path) > 0:
@@ -128,8 +121,7 @@ def fill_gear_default_values(gear, config_):
Given a gear and a config map, fill any missing keys using defaults from the gear's config
"""

if config_ is None:
config_ = {}
config_ = copy.deepcopy(config_) or {}

for k,v in gear['gear'].get('config', {}).iteritems():
if 'default' in v:
15 changes: 5 additions & 10 deletions api/jobs/handlers.py
@@ -24,7 +24,7 @@
from ..web.errors import APIPermissionException, APINotFoundException, InputValidationException
from ..web.request import AccessType

from .gears import validate_gear_config, get_gears, get_gear, get_invocation_schema, remove_gear, upsert_gear, suggest_container, get_gear_by_name, check_for_gear_insertion
from .gears import validate_gear_config, get_gears, get_gear, get_invocation_schema, remove_gear, upsert_gear, suggest_container, check_for_gear_insertion
from .jobs import Job, JobTicket, Logs
from .batch import check_state, update
from .queue import Queue
@@ -147,10 +147,7 @@ def post(self, cid):

validate_data(payload, 'rule-new.json', 'input', 'POST', optional=True)
validate_regexes(payload)
try:
get_gear_by_name(payload['alg'])
except APINotFoundException:
self.abort(400, 'Cannot find gear for alg {}, alg not valid'.format(payload['alg']))
validate_gear_config(get_gear(payload['gear_id']), payload.get('config'))

payload['project_id'] = cid

@@ -200,11 +197,9 @@ def put(self, cid, rid):
updates = self.request.json
validate_data(updates, 'rule-update.json', 'input', 'POST', optional=True)
validate_regexes(updates)
if updates.get('alg'):
try:
get_gear_by_name(updates['alg'])
except APINotFoundException:
self.abort(400, 'Cannot find gear for alg {}, alg not valid'.format(updates['alg']))
gear_id = updates.get('gear_id', doc['gear_id'])
config_ = updates.get('config', doc.get('config'))
validate_gear_config(get_gear(gear_id), config_)

doc.update(updates)
config.db.project_rules.replace_one({'_id': bson.ObjectId(rid)}, doc)
4 changes: 2 additions & 2 deletions api/jobs/queue.py
@@ -157,7 +157,7 @@ def enqueue_job(job_map, origin, perm_check_uid=None):
if gear.get('gear', {}).get('custom', {}).get('flywheel', {}).get('invalid', False):
raise InputValidationException('Gear marked as invalid, will not run!')

config_ = fill_gear_default_values(gear, job_map.get('config', {}))
config_ = job_map.get('config', {})
validate_gear_config(gear, config_)

# Translate maps to FileReferences
@@ -194,7 +194,7 @@

# Config options are stored on the job object under the "config" key
config_ = {
'config': config_,
'config': fill_gear_default_values(gear, config_),
'inputs': { },
'destination': {
'type': destination.type,
25 changes: 15 additions & 10 deletions api/jobs/rules.py
@@ -138,24 +138,25 @@ def eval_rule(rule, file_, container):

return True

def queue_job_legacy(algorithm_id, input_):
def queue_job_legacy(gear_id, input_):
"""
Tie together logic used from the no-manifest, single-file era.
Takes a single FileReference instead of a map.
"""

gear = gears.get_gear_by_name(algorithm_id)
gear = gears.get_gear(gear_id)

if len(gear['gear']['inputs']) != 1:
raise Exception("Legacy gear enqueue attempt of " + algorithm_id + " failed: must have exactly 1 input in manifest")
raise Exception("Legacy gear enqueue attempt of " + gear_id + " failed: must have exactly 1 input in manifest")

input_name = gear['gear']['inputs'].keys()[0]

inputs = {
input_name: input_
}

job = Job(str(gear['_id']), inputs, tags=['auto', algorithm_id])
gear_tag = gear['gear']['name'] + '-' + gear_id
job = Job(str(gear['_id']), inputs, tags=['auto', gear_tag])
return job

def find_type_in_container(container, type_):
@@ -184,22 +185,26 @@ def create_potential_jobs(db, container, container_type, file_):

if 'from_failed_job' not in file_ and eval_rule(rule, file_, container):

alg_name = rule['alg']
gear_id = rule['gear_id']
gear = gears.get_gear(gear_id)
gear_tag = gear['gear']['name'] + '-' + gear_id

if rule.get('match') is None:
input_ = FileReference(type=container_type, id=str(container['_id']), name=file_['name'])
job = queue_job_legacy(alg_name, input_)
job = queue_job_legacy(gear_id, input_)
else:
inputs = { }

for input_name, match_type in rule['match'].iteritems():
match = find_type_in_container(container, match_type)
if match is None:
raise Exception("No type " + match_type + " found for alg rule " + alg_name + " that should have been satisfied")
raise Exception("No type " + match_type + " found for alg rule " + gear_tag + " that should have been satisfied")
inputs[input_name] = FileReference(type=container_type, id=str(container['_id']), name=match['name'])

gear = gears.get_gear_by_name(alg_name)
job = Job(str(gear['_id']), inputs, tags=['auto', alg_name])
job = Job(str(gear['_id']), inputs, tags=['auto', gear_tag])

if 'config' in rule:
job.config = rule['config']

potential_jobs.append({
'job': job,
@@ -248,7 +253,7 @@ def create_jobs(db, container_before, container_after, container_type):
job_map = pj['job'].map()
Queue.enqueue_job(job_map, origin)

spawned_jobs.append(pj['rule']['alg'])
spawned_jobs.append(pj['rule']['gear_id'])

return spawned_jobs

2 changes: 1 addition & 1 deletion swagger/examples/input/rule-new.json
@@ -1,7 +1,7 @@
{
"_id": "5a12f2923306be0016179f47",
"project_id": "57e452791cff88b85f9f9c97",
"alg": "dcm2niix",
"gear_id": "580925ce9e512c57dc8a103c",
"name": "dcm2niix",
"all": [
{
6 changes: 3 additions & 3 deletions swagger/examples/output/rule-list.json
@@ -1,6 +1,6 @@
[
{
"alg": "dcm2niix",
"gear_id": "580925ce9e512c57dc8a103b",
"all": [
{
"regex": true,
@@ -17,7 +17,7 @@
"any": []
},
{
"alg": "dicom-mr-classifier",
"gear_id": "580925ce9e512c57dc8a103c",
"all": [
{
"type": "file.type",
@@ -34,7 +34,7 @@
"any": []
},
{
"alg": "mriqc",
"gear_id": "580925ce9e512c57dc8a103d",
"all": [
{
"type": "file.type",
2 changes: 1 addition & 1 deletion swagger/examples/rules_list.json
@@ -1,6 +1,6 @@
[
{
"alg" : "dicom_mr_classifier",
"gear_id": "580925ce9e512c57dc8a103c",
"all" : [
[
"file.type",
6 changes: 4 additions & 2 deletions swagger/schemas/definitions/rule.json
@@ -29,8 +29,9 @@
"properties": {
"_id": { "type": "string" },
"project_id": { "type": "string" },
"alg": { "type": "string" },
"gear_id": { "type": "string" },
"name": { "type": "string" },
"config": { "type": "object" },
"any": { "$ref": "#/definitions/rule-items" },
"all": { "$ref": "#/definitions/rule-items" },
"disabled": { "type": "boolean" }
@@ -43,8 +44,9 @@
"type": "object",
"properties": {
"_id": { "type": "string" },
"alg": { "type": "string" },
"gear_id": { "type": "string" },
"name": { "type": "string" },
"config": { "type": "object" },
"any": { "$ref": "#/definitions/rule-items" },
"all": { "$ref": "#/definitions/rule-items" },
"disabled": { "type": "boolean" }
2 changes: 1 addition & 1 deletion swagger/schemas/input/rule-new.json
@@ -3,5 +3,5 @@
"title": "Rule",
"type": "object",
"allOf": [{"$ref": "../definitions/rule.json#/definitions/rule-input"}],
"required": ["alg", "name", "any", "all"]
"required": ["gear_id", "name", "any", "all"]
}
5 changes: 2 additions & 3 deletions tests/integration_tests/python/test_jobs.py
@@ -60,7 +60,7 @@ def test_jobs(data_builder, default_payload, as_public, as_user, as_admin, as_ro
job0 = copy.deepcopy(job_data)
job0['gear_id'] = '000000000000000000000000'
r = as_admin.post('/jobs/add', json=job0)
assert r.status_code == 400
assert r.status_code == 404

# add job with explicit destination
r = as_admin.post('/jobs/add', json=job_data)
@@ -358,15 +358,14 @@ def test_failed_job_output(data_builder, default_payload, as_user, as_admin, as_
}
gear = data_builder.create_gear(gear=gear_doc)
gear2 = data_builder.create_gear()
gear2_name = as_admin.get('/gears/' + gear2).json()['gear']['name']
project = data_builder.create_project()
session = data_builder.create_session()
acquisition = data_builder.create_acquisition()
assert as_admin.post('/acquisitions/' + acquisition + '/files', files=file_form('test.zip')).ok

# create rule for text files
r = as_admin.post('/projects/' + project + '/rules', json={
'alg': gear2_name,
'gear_id': gear2,
'name': 'text-trigger',
'any': [],
'all': [{'type': 'file.type', 'value': 'text'}]