Skip to content

Commit

Permalink
Merge pull request #16 from aclowes/simple-tasks
Browse files Browse the repository at this point in the history
allow tasks without an associated workflow
  • Loading branch information
aclowes authored Jun 29, 2017
2 parents f3b21fd + a3ad536 commit 4deb5fb
Show file tree
Hide file tree
Showing 21 changed files with 185 additions and 40 deletions.
8 changes: 7 additions & 1 deletion frontend/src/App.css
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@
margin: 0;
}

pre {
margin: 0;
padding: 5px;
display: inline-block;
}

/* Status Colors */

.waiting {
Expand Down Expand Up @@ -41,4 +47,4 @@ g.edgePath path {
stroke: #333;
fill: #333;
stroke-width: 1px;
}
}
4 changes: 2 additions & 2 deletions frontend/src/ExecutionTable.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export default class ExecutionTable extends React.Component {
return this.props.executions.map((execution) => (
<tr key={execution.id}>
<td><Link to={`/workers/${execution.worker.id}`}>{execution.worker.name}</Link></td>
<td>{execution.task.workflow.name}</td>
<td>{execution.task.workflow && execution.task.workflow.name}</td>
<td><Link to={`/tasks/${execution.task.id}`}>{execution.task.name}</Link></td>
<td>{execution.status}</td>
<td>{execution.start_timestamp}</td>
Expand Down Expand Up @@ -45,4 +45,4 @@ export default class ExecutionTable extends React.Component {
</Table>
)
}
}
}
21 changes: 15 additions & 6 deletions frontend/src/TaskDetail.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,15 @@ export default class TaskDetail extends React.Component {
))
}

renderWorkflowLink() {
const task = this.state.task;
if (task.workflow) return (
<Link to={`/workflows/${task.workflow.id}`}>
{task.workflow.name} - v{task.workflow.version}
</Link>
)
}

renderExecution() {
if (this.state.execution === 0) {
return <div>No executions</div>
Expand All @@ -63,7 +72,7 @@ export default class TaskDetail extends React.Component {
<dd>{execution.status}</dd>
<dt>Worker</dt>
<dd>
<Link to={`/worker/${execution.worker.id}`}>
<Link to={`/workers/${execution.worker.id}`}>
{execution.worker.name}
</Link>
</dd>
Expand Down Expand Up @@ -102,14 +111,14 @@ export default class TaskDetail extends React.Component {
<dl className="dl-horizontal">
<dt>Workflow</dt>
<dd>
<Link to={`/workflows/${task.workflow.id}`}>
{task.workflow.name} - v{task.workflow.version}
</Link>
{this.renderWorkflowLink()}
</dd>
<dt>Task Name</dt>
<dd>{task.name}</dd>
<dt>Command</dt>
<dd>{JSON.stringify(task.command)}</dd>
<dd>
<pre>{task.command}</pre>
</dd>
<dt>Max Retries</dt>
<dd>{task.max_retries}</dd>
<dt>Timeout</dt>
Expand All @@ -130,4 +139,4 @@ export default class TaskDetail extends React.Component {
)
}
}
}
}
4 changes: 3 additions & 1 deletion frontend/src/tests/__snapshots__/TaskDetail.test.js.snap
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ exports[`TaskDetail success 1`] = `
Command
</dt>
<dd>
"echo Starting..."
<pre>
echo Starting...
</pre>
</dd>
<dt>
Max Retries
Expand Down
4 changes: 2 additions & 2 deletions yawn/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ def run():

name = WorkflowName.objects.create(name='workflow1')
workflow = name.new_version(parameters={'parent': True, 'child': False})
task1 = Template.objects.create(workflow=workflow, name='task1', command=[''])
task2 = Template.objects.create(workflow=workflow, name='task2', command=[''])
task1 = Template.objects.create(workflow=workflow, name='task1', command='')
task2 = Template.objects.create(workflow=workflow, name='task2', command='')
task2.upstream.add(task1)

return workflow.submit_run(parameters={'child': True})
7 changes: 3 additions & 4 deletions yawn/manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,15 @@ def main():
# check if yawn is in installed apps, and bail if it is not
if 'yawn' not in settings.INSTALLED_APPS:
print("Please check your DJANGO_SETTINGS_MODULE environment variable.\n"
"Make sure 'yawn' must be in your INSTALLED_APPS.\n"
"Make sure 'yawn' is in your INSTALLED_APPS.\n"
"Generally, your settings file should start with 'from yawn.settings.base import *'")
sys.exit(1)

print('\nYAWN workflow management tool')

print('YAWN workflow management tool')
if os.environ['DJANGO_SETTINGS_MODULE'] == 'yawn.settings.debug':
print(' Running in DEBUG mode')

# run the django manage.py commandline
# run the django manage.py command line
execute_from_command_line(sys.argv)


Expand Down
21 changes: 15 additions & 6 deletions yawn/management/commands/exec.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
import argparse
"""
This helper command will import any python callable on your python path and
call it with the supplied arguments.
Use `yawn.task.decorators.make_task` to
"""
import importlib

from django.core.management.base import BaseCommand
Expand All @@ -8,15 +13,19 @@ class Command(BaseCommand):
help = 'Execute a python callable'

def add_arguments(self, parser):
parser.add_argument('module')
parser.add_argument('callable')
parser.add_argument('arguments', nargs=argparse.REMAINDER)
parser.add_argument('module', help='The python module to import, i.e. animal.bird')
parser.add_argument('callable', help='The python callable to invoke, i.e. Swallow')
parser.add_argument('argument', nargs='*', help='Arguments to pass to the callable')

def handle(self, *args, **options):
self.stdout.write('Importing module %s' % options['module'])
module_ = importlib.import_module(options['module'])

self.stdout.write('Calling %s("%s")' % (options['callable'], '", "'.join(options['arguments'])))
getattr(module_, options['callable'])(*options['arguments'])
arguments = ''
if options['argument']:
arguments = "'{}'".format("', '".join(options['argument']))

self.stdout.write('Calling %s(%s)' % (options['callable'], arguments))
getattr(module_, options['callable'])(*options['argument'])

self.stdout.write('Execution complete')
2 changes: 1 addition & 1 deletion yawn/management/commands/webserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def __call__(self, environ, start_response):
and we return index.html, and then react-router interprets the path.
"""
path = decode_path_info(environ['PATH_INFO'])
if path.startswith('/api/'):
if path.startswith('/api'):
return self.application(environ, start_response)
static_file = self.files.get(path)
if static_file is None:
Expand Down
42 changes: 42 additions & 0 deletions yawn/migrations/0002_tasks_without_workflows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11.2 on 2017-06-29 16:44
from __future__ import unicode_literals

from django.db import migrations, models
import django.db.models.deletion


class Migration(migrations.Migration):

dependencies = [
('yawn', '0001_initial'),
]

operations = [
migrations.AlterField(
model_name='queue',
name='name',
field=models.TextField(unique=True),
),
migrations.AlterField(
model_name='task',
name='run',
field=models.ForeignKey(null=True, on_delete=django.db.models.deletion.PROTECT, to='yawn.Run'),
),
migrations.AlterField(
model_name='template',
name='name',
field=models.TextField(),
),
migrations.AlterField(
model_name='template',
name='workflow',
field=models.ForeignKey(editable=False, null=True, on_delete=django.db.models.deletion.PROTECT,
to='yawn.Workflow'),
),
migrations.AlterField(
model_name='workflowname',
name='name',
field=models.TextField(unique=True),
),
]
31 changes: 31 additions & 0 deletions yawn/task/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import shlex

from yawn.task.models import Template, Task
from yawn.workflow.models import Workflow # noqa - needed to load the foreign key!
from yawn.worker.models import Queue


def delay(func, *args, timeout=None, max_retries=0, queue=None):
arguments = [shlex.quote(arg) for arg in args]
command = 'yawn exec {0.__module__} {0.__name__} {1}'.format(
func, ' '.join(arguments)).strip()
task_name = '{0.__module__}.{0.__name__}({1})'.format(
func, ', '.join(arguments))

if queue:
queue_obj, _ = Queue.objects.get_or_create(name=queue)
else:
queue_obj = Queue.get_default_queue()

template, _ = Template.objects.get_or_create(
name=task_name,
command=command,
queue=queue_obj,
max_retries=max_retries,
timeout=timeout
)
task = Task.objects.create(
template=template
)
task.enqueue()
return task
9 changes: 5 additions & 4 deletions yawn/task/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@


class Template(models.Model):
workflow = models.ForeignKey('Workflow', models.PROTECT, editable=False)
workflow = models.ForeignKey('Workflow', models.PROTECT, editable=False, null=True)
queue = models.ForeignKey(Queue, models.PROTECT)
name = models.SlugField(allow_unicode=True, db_index=False)
name = models.TextField()

command = models.TextField()
max_retries = models.IntegerField(default=0)
Expand Down Expand Up @@ -41,7 +41,7 @@ class Task(models.Model):
UPSTREAM_FAILED = 'upstream_failed'
STATUS_CHOICES = [(x, x) for x in (WAITING, QUEUED, RUNNING, SUCCEEDED, FAILED, UPSTREAM_FAILED)]

run = models.ForeignKey('Run', models.PROTECT)
run = models.ForeignKey('Run', models.PROTECT, null=True)
template = models.ForeignKey(Template, models.PROTECT)
status = models.TextField(choices=STATUS_CHOICES, default=WAITING)

Expand Down Expand Up @@ -177,7 +177,8 @@ def mark_finished(self, exit_code=None, lost=False):
self.task.save()
with transaction.atomic():
self.task.update_downstream()
self.task.run.update_status()
if self.task.run:
self.task.run.update_status()

self.stop_timestamp = functions.Now()
# need to be careful not to overwrite stdout/stderr
Expand Down
4 changes: 2 additions & 2 deletions yawn/task/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@


class SimpleWorkflowSerializer(serializers.ModelSerializer):
name = serializers.SlugField(source='name.name', read_only=True)
name = serializers.CharField(source='name.name', read_only=True)

class Meta:
model = Workflow
fields = ('id', 'name', 'version')


class TaskSerializer(serializers.ModelSerializer):
name = serializers.SlugField(source='template.name', read_only=True)
name = serializers.CharField(source='template.name', read_only=True)
workflow = SimpleWorkflowSerializer(source='template.workflow', read_only=True)

class Meta:
Expand Down
35 changes: 35 additions & 0 deletions yawn/task/tests/test_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from yawn.task.helpers import delay


def some_function(*args):
pass


class SomeClass:
pass


def test_quoted_arg():
task = delay(SomeClass, 'A small "taste" of chaos')
assert task.template.command == 'yawn exec yawn.task.tests.test_helpers ' \
'SomeClass \'A small "taste" of chaos\''


def test_many_args():
task = delay(some_function, 'something', '1')
assert task.template.command == "yawn exec yawn.task.tests.test_helpers " \
"some_function something 1"


def test_deduplication():
task1 = delay(some_function)
task2 = delay(some_function)
assert task1.template_id == task2.template_id


def test_queued():
task = delay(SomeClass, queue='queue', max_retries=2, timeout=10)
assert task.message_set.count() == 1
assert task.message_set.first().queue.name == 'queue'
assert task.template.max_retries == 2
assert task.template.timeout == 10
14 changes: 13 additions & 1 deletion yawn/task/tests/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
A succeeded but B failed, mark C and D upstream_failed
"""
from yawn.worker.models import Queue, Worker
from yawn.task.models import Task, Execution
from yawn.task.models import Task, Execution, Template


def test_first_queued(run):
Expand Down Expand Up @@ -91,3 +91,15 @@ def test_execution_output(run):
execution.refresh_from_db()
assert execution.stdout == 'foobar'
assert execution.stderr == 'blah'


def test_task_without_workflow():
template = Template.objects.create(name='task1', command='')
task = Task.objects.create(template=template)
worker = Worker.objects.create(name='worker1')

# asserts that a task can be run without having a workflow associated
execution = task.start_execution(worker)
execution.mark_finished(exit_code=0)
task.refresh_from_db()
assert task.status == Task.SUCCEEDED
2 changes: 1 addition & 1 deletion yawn/worker/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ def start_tasks(self):
self.executor.start_subprocess(
execution_id=execution.id,
command=execution.task.template.command,
environment=execution.task.run.parameters,
environment=execution.task.run.parameters if execution.task.run else {},
timeout=execution.task.template.timeout
)

Expand Down
2 changes: 1 addition & 1 deletion yawn/worker/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def __str__(self):
class Queue(models.Model):
"""Arbitrary tag defining where tasks run."""

name = models.SlugField(unique=True, allow_unicode=True)
name = models.TextField(unique=True)

_default = None

Expand Down
2 changes: 1 addition & 1 deletion yawn/worker/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def update(self, instance, validated_data):


class MessageSerializer(serializers.ModelSerializer):
queue = serializers.SlugField(source='queue.name')
queue = serializers.CharField(source='queue.name')

class Meta:
model = Message
Expand Down
2 changes: 1 addition & 1 deletion yawn/worker/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class WorkerViewSet(viewsets.GenericViewSet,
"""
Worker endpoint, GET(list)
"""
queryset = Worker.objects.all().order_by('id')
queryset = Worker.objects.all().order_by('-id')

serializer_class = WorkerSerializer

Expand Down
Loading

0 comments on commit 4deb5fb

Please sign in to comment.