Merge pull request #20 from Volumental/tech/start-list-tasks
Tech/start list tasks
simonhugosson authored Dec 12, 2022
2 parents 44c9337 + a4476cb commit 8952249
Showing 5 changed files with 53 additions and 34 deletions.
1 change: 0 additions & 1 deletion .pylintrc
@@ -2,4 +2,3 @@
errors-only=yes
load-plugins=pylint_django
django-settings-module=test_app.settings

3 changes: 2 additions & 1 deletion .travis.yml
@@ -5,6 +5,7 @@ python:
- "3.10"
cache: pip
script:
- export PYTHONPATH=.
- pylint django_leek
- coverage run $(which django-admin) test --pythonpath=. --settings=django_leek.settings
- coverage run $(which django-admin) test --settings=django_leek.settings
- python -m coverage_shield
39 changes: 30 additions & 9 deletions django_leek/api.py
@@ -1,9 +1,11 @@
import json
import socket
import sys
from functools import wraps
import json
from typing import Iterator, Optional, Tuple

from . import models
from . import helpers
from . import models
from .settings import HOST, PORT


@@ -14,6 +16,7 @@ def task(self, f, pool=None):
@wraps(f)
def _offload(*args, **kwargs):
return push_task_to_queue(f, pool_name=pool_name, *args, **kwargs)

f.offload = _offload
return f

@@ -24,23 +27,41 @@ def __init__(self, a_callable, *args, **kwargs):
self.task_callable = a_callable
self.args = args
self.kwargs = kwargs

def __call__(self):
return self.task_callable(*self.args, **self.kwargs)


def start_task_with_id(task_id: int):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((HOST, PORT))
sock.send("{}".format(task_id).encode())
received = sock.recv(1024)
sock.close()
return json.loads(received.decode())


def push_task_to_queue(a_callable, *args, **kwargs):
"""Original API"""
pool_name = kwargs.pop('pool_name', None)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

new_task = Task(a_callable, *args, **kwargs)
queued_task = helpers.save_task_to_db(new_task, pool_name)
sock.connect((HOST, PORT))
sock.send("{}".format(queued_task.id).encode())
received = sock.recv(1024)
sock.close()
return json.loads(received.decode())

return start_task_with_id(queued_task.id)


def query_task(task_id: int) -> models.Task:
return helpers.load_task(task_id)


def list_tasks(finished: Optional[bool] = None) -> Iterator[Tuple[models.Task, Task]]:
db_tasks = models.Task.objects.all().order_by('queued_at')
if finished is not None:
db_tasks = db_tasks.filter(finished_at__isnull=not finished)
for db_task in db_tasks:
try:
task = helpers.unpack(db_task.pickled_task)
yield db_task, task
except (ModuleNotFoundError, AttributeError): # things that can happen during unpickle
print("could not unpickle task", db_task.id, file=sys.stderr)
42 changes: 20 additions & 22 deletions django_leek/server.py
@@ -1,36 +1,39 @@
from datetime import datetime
from sys import platform
import json
import logging
import socketserver
import multiprocessing
import queue
import socketserver
import threading
from sys import platform

from .helpers import load_task
from . import helpers
from django.utils import timezone
import django
from django.utils import timezone

from . import helpers
from .helpers import load_task

log = logging.getLogger(__name__)


def target(queue):
django.setup()
log.info('Worker Starts')
log.info("Worker Starts")
while True:
task_id = queue.get()
if task_id is None:
return

log.info('running task...')
log.info("running task...")

# workaround to solve problems with django + psycopg2
# solution found here: https://stackoverflow.com/a/36580629/10385696
django.db.connection.close()

task = load_task(task_id=task_id)
if task.finished():
log.info("skipping %s as it is already finished", task_id)
continue

pickled_task = helpers.unpack(task.pickled_task)
try:
task.started_at = timezone.now()
@@ -40,7 +43,7 @@ def target(queue):
task.pickled_return = helpers.serialize(return_value)
task.save()

log.info('...successfully')
log.info("...successfully")
except Exception as e:
log.exception("...task failed")
task.finished_at = timezone.now()
@@ -72,21 +75,21 @@ def handle(self):
data = self.request.recv(5000).strip()

# assume a serialized task
log.info('Got a task')
response = None
log.info("Got a task")
task_id = None
try:
task_id = int(data.decode())

# Connection are closed by tasks, force it to reconnect
django.db.connections.close_all()
task = load_task(task_id=task_id)

# Ensure pool got a worker processing it
pool_name = task.pool or self.DEFAULT_POOL
pool = self.pools.get(pool_name)
if pool is None or not pool.worker.is_alive():
# Spawn new pool
log.info('Spawning new pool: {}'.format(pool_name))
log.info("Spawning new pool: {}".format(pool_name))
self.pools[pool_name] = Pool()
self.pools[pool_name].worker.start()

@@ -95,13 +98,8 @@ def handle(self):
response = {'task': 'queued', 'task_id': task_id}
except Exception as e:
log.exception("failed to queue task")
response = (False, "TaskServer Put: {}".format(e).encode(),)
response = {
'task': 'failed to queue',
'task_id': task_id,
'error': str(e)
}

response = {'task': 'failed to queue', 'task_id': task_id, 'error': str(e)}

self.request.send(json.dumps(response).encode())

except OSError as e:
@@ -111,5 +109,5 @@
@staticmethod
def stop():
for name, pool in TaskSocketServer.pools.items():
print('Stopping pool:', name)
log.info("Stopping pool: %s", name)
pool.stop()
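To illustrate the JSON protocol the handler now speaks, here is a sketch of a raw socket client (an assumption mirroring what api.start_task_with_id does; it presumes a running server on HOST/PORT from django_leek.settings):

# Illustrative raw client for TaskSocketServer (not part of the diff).
import json
import socket

from django_leek.settings import HOST, PORT

def queue_by_id(task_id: int) -> dict:
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect((HOST, PORT))
    sock.send(str(task_id).encode())
    # The server replies with {'task': 'queued', 'task_id': ...} on success or
    # {'task': 'failed to queue', 'task_id': ..., 'error': ...} on failure.
    reply = json.loads(sock.recv(1024).decode())
    sock.close()
    return reply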
2 changes: 1 addition & 1 deletion setup.py
@@ -11,7 +11,7 @@

setup(
name='django-leek',
version='1.0.3',
version='1.0.4',
packages=find_packages(exclude=['test_app']),
install_requires=['django>=1.11'],
include_package_data=True,
