Skip to content

Commit

Permalink
Merge branch 'circleci'
Browse files Browse the repository at this point in the history
  • Loading branch information
aclowes committed Feb 9, 2017
2 parents 24f926d + 6554ae5 commit c6107ec
Show file tree
Hide file tree
Showing 16 changed files with 78 additions and 58 deletions.
13 changes: 13 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[*]
end_of_line = lf
insert_final_newline = true
trim_trailing_whitespace = true
charset = utf-8

[*.py]
indent_style = space
indent_size = 4

[*.{js,jsx,less,html,json,css,yaml,yml}]
indent_style = space
indent_size = 2
2 changes: 2 additions & 0 deletions .flake8
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[flake8]
max-line-length = 120
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# build
*.egg-info/
*build/
*coverage/
.coverage

# node packages
*node_modules/
Expand Down
27 changes: 12 additions & 15 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,14 @@ Broker
Components
----------

- Web server provides a user interface.
- Worker schedules and executes tasks.
- Postgres 9.5+ database stores state.
Web Servers
The website provides a user interface to view the workflows and tasks running within them.
It allows you to run an existing workflow or re-run a failed task. The web server also provides
a REST API to programatically create and run workflows.

Workers
The worker schedules and executes tasks. The worker uses ``subprocess.Popen`` to run tasks and
capture stdout and stderr.

Concepts
--------
Expand All @@ -45,13 +50,13 @@ Workflow
acyclic graph (DAG). Workflows can be scheduled to run on a regular basis and they are versioned
so they can change over time.

Task
A shell command that specifies the upstream tasks it depends on, the number times to retry, and a
timeout.

Run
A manually triggered or scheduled run of a Workflow.

Task
A shell command that specifies the upstream tasks it depends on, the number times to retry, and a
timeout. The task is given environment variables configured in the workflow and run.

Execution
A single execution of a Task's command, capturing the exit code and standard output and error.

Expand Down Expand Up @@ -186,12 +191,4 @@ Load some examples and run the worker to process them::
.. _create-react-app: https://github.com/facebookincubator/create-react-app
.. _Django: https://airflow.incubator.apache.org/

TODO
----

- WSGI + static file server wrapped in a ``yawn webserver`` command
- Config file for database connection, etc
- Python API / wrapper for creating workflows, submitting tasks
- submit run in UI
- edit parameters on run?
- show env given to the execution?
11 changes: 8 additions & 3 deletions circle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,15 @@ machine:
version: 3.5.2

dependencies:
cache_directories:
- frontend/node_modules
override:
- pip install -e .[test]
- cd frontend && npm install
- pip install -e .[test]
- cd frontend && npm install

test:
override:
- pytest
- pytest --cov=yawn yawn
- flake8 yawn
post:
- bash <(curl -s https://codecov.io/bash)
6 changes: 3 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"""

# Always prefer setuptools over distutils
from setuptools import setup, find_packages
from setuptools import setup
# To use a consistent encoding
from codecs import open
from os import path
Expand Down Expand Up @@ -85,8 +85,8 @@
extras_require={
# 'dev': ['check-manifest'],
'test': [
'coverage',
'pytest',
'pytest-cov',
'flake8',
'pyyaml',
],
Expand All @@ -113,4 +113,4 @@
'yawn=yawn.manage:main',
],
},
)
)
21 changes: 18 additions & 3 deletions yawn/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,21 @@ def client():
from rest_framework.test import APIClient

user = User.objects.create_user('test_user')
client = APIClient()
client.force_authenticate(user)
return client
api_client = APIClient()
api_client.force_authenticate(user)
return api_client


@pytest.fixture()
def run():
"""Setup a workflow and run to test with"""
from yawn.workflow.models import WorkflowName
from yawn.task.models import Template

name = WorkflowName.objects.create(name='workflow1')
workflow = name.new_version(parameters={'parent': True, 'child': False})
task1 = Template.objects.create(workflow=workflow, name='task1', command=[''])
task2 = Template.objects.create(workflow=workflow, name='task2', command=[''])
task2.upstream.add(task1)

return workflow.submit_run(parameters={'child': True})
21 changes: 15 additions & 6 deletions yawn/migrations/0001_initial.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@


class Migration(migrations.Migration):

initial = True

dependencies = [
Expand All @@ -21,7 +20,9 @@ class Migration(migrations.Migration):
name='Execution',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('status', models.TextField(choices=[('running', 'running'), ('succeeded', 'succeeded'), ('failed', 'failed'), ('killed', 'killed'), ('lost', 'lost')], default='running')),
('status', models.TextField(
choices=[('running', 'running'), ('succeeded', 'succeeded'), ('failed', 'failed'),
('killed', 'killed'), ('lost', 'lost')], default='running')),
('start_timestamp', models.DateTimeField(default=django.db.models.functions.base.Now)),
('stop_timestamp', models.DateTimeField(null=True)),
('exit_code', models.IntegerField(null=True)),
Expand All @@ -48,15 +49,20 @@ class Migration(migrations.Migration):
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('submitted_time', models.DateTimeField()),
('scheduled_time', models.DateTimeField(null=True)),
('status', models.TextField(choices=[('running', 'running'), ('succeeded', 'succeeded'), ('failed', 'failed')], default='running')),
('status',
models.TextField(choices=[('running', 'running'), ('succeeded', 'succeeded'), ('failed', 'failed')],
default='running')),
('parameters', django.contrib.postgres.fields.jsonb.JSONField(default=dict)),
],
),
migrations.CreateModel(
name='Task',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('status', models.TextField(choices=[('waiting', 'waiting'), ('queued', 'queued'), ('running', 'running'), ('succeeded', 'succeeded'), ('failed', 'failed'), ('upstream_failed', 'upstream_failed')], default='waiting')),
('status', models.TextField(
choices=[('waiting', 'waiting'), ('queued', 'queued'), ('running', 'running'),
('succeeded', 'succeeded'), ('failed', 'failed'), ('upstream_failed', 'upstream_failed')],
default='waiting')),
('run', models.ForeignKey(on_delete=django.db.models.deletion.PROTECT, to='yawn.Run')),
],
),
Expand All @@ -77,7 +83,8 @@ class Migration(migrations.Migration):
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('name', models.TextField()),
('status', models.TextField(choices=[('active', 'active'), ('exited', 'exited'), ('lost', 'lost')], default='active')),
('status', models.TextField(choices=[('active', 'active'), ('exited', 'exited'), ('lost', 'lost')],
default='active')),
('start_timestamp', models.DateTimeField(default=django.db.models.functions.base.Now)),
('last_heartbeat', models.DateTimeField(default=django.db.models.functions.base.Now)),
],
Expand All @@ -98,7 +105,9 @@ class Migration(migrations.Migration):
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('name', models.SlugField(allow_unicode=True, unique=True)),
('current_version', models.OneToOneField(null=True, on_delete=django.db.models.deletion.CASCADE, related_name='is_current', to='yawn.Workflow')),
('current_version',
models.OneToOneField(null=True, on_delete=django.db.models.deletion.CASCADE, related_name='is_current',
to='yawn.Workflow')),
],
),
migrations.AddField(
Expand Down
2 changes: 1 addition & 1 deletion yawn/settings/debug.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from yawn.settings.base import *
from yawn.settings.base import * # NOQA

SECRET_KEY = 'example secret key, change me'
DEBUG = True
Expand Down
2 changes: 1 addition & 1 deletion yawn/settings/test.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .base import *
from .base import * # NOQA

SECRET_KEY = 'secret key for tests'

Expand Down
18 changes: 1 addition & 17 deletions yawn/task/tests/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,8 @@
A & B succeed, don't submit C(failed)
A succeeded but B failed, mark C and D upstream_failed
"""
import pytest

from yawn.worker.models import Queue, Worker
from yawn.task.models import Template, Task, Execution
from yawn.workflow.models import WorkflowName


@pytest.fixture()
def run():
"""Setup a workflow and run to test with"""
name = WorkflowName.objects.create(name='workflow1')
workflow = name.new_version(parameters={'parent': True, 'child': False})
task1 = Template.objects.create(workflow=workflow, name='task1', command=[''])
task2 = Template.objects.create(workflow=workflow, name='task2', command=[''])
task2.upstream.add(task1)

run = workflow.submit_run(parameters={'child': True})
return run
from yawn.task.models import Task, Execution


def test_first_queued(run):
Expand Down
2 changes: 0 additions & 2 deletions yawn/task/tests/test_views.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
from yawn.worker.models import Worker
# to make the fixture available:
from yawn.task.tests.test_models import run


def test_get_task(client, run):
Expand Down
4 changes: 2 additions & 2 deletions yawn/worker/main.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import enum
import signal
import typing
import typing # NOQA
import collections

from django.db import transaction
from django.db.models import functions

from yawn.task.models import Task, Execution
from yawn.worker.models import Worker, Queue
from yawn.worker.executor import Manager, Result
from yawn.worker.executor import Manager, Result # NOQA
from yawn.workflow.models import Workflow
from yawn.utilities import logger
from yawn.utilities.cron import Crontab
Expand Down
2 changes: 1 addition & 1 deletion yawn/worker/tests/test_models.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from yawn.task.models import Template, Task, Execution
from yawn.task.models import Template, Task, Execution # NOQA
from yawn.worker.models import Worker
from yawn.workflow.models import WorkflowName

Expand Down
1 change: 0 additions & 1 deletion yawn/workflow/tests/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,3 @@ def test_statuses():
task2.save()
run.update_status()
assert run.status == Run.SUCCEEDED

2 changes: 0 additions & 2 deletions yawn/workflow/tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
from django.utils.dateparse import parse_datetime

from yawn.workflow.models import WorkflowName
# to make the fixture available:
from yawn.task.tests.test_models import run


@pytest.fixture()
Expand Down

0 comments on commit c6107ec

Please sign in to comment.