From 5b9f3a543583ec7378e89c68f322b98017386af6 Mon Sep 17 00:00:00 2001 From: Simon Hugosson Date: Fri, 9 Dec 2022 14:26:45 +0100 Subject: [PATCH 01/12] Add function to trigger task --- django_leek/api.py | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/django_leek/api.py b/django_leek/api.py index f8c51f8..982d7c1 100644 --- a/django_leek/api.py +++ b/django_leek/api.py @@ -24,22 +24,28 @@ def __init__(self, a_callable, *args, **kwargs): self.task_callable = a_callable self.args = args self.kwargs = kwargs - + def __call__(self): return self.task_callable(*self.args, **self.kwargs) +def start_task_with_id(task_id): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect((HOST, PORT)) + sock.send("{}".format(task_id).encode()) + received = sock.recv(1024) + sock.close() + return json.loads(received.decode()) + + def push_task_to_queue(a_callable, *args, **kwargs): """Original API""" pool_name = kwargs.pop('pool_name', None) - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + new_task = Task(a_callable, *args, **kwargs) queued_task = helpers.save_task_to_db(new_task, pool_name) - sock.connect((HOST, PORT)) - sock.send("{}".format(queued_task.id).encode()) - received = sock.recv(1024) - sock.close() - return json.loads(received.decode()) + + return start_task_with_id(queued_task.id) def query_task(task_id: int) -> models.Task: From 98507c28b93ba15142256603c997ab0ec65c91ff Mon Sep 17 00:00:00 2001 From: Simon Hugosson Date: Fri, 9 Dec 2022 14:39:25 +0100 Subject: [PATCH 02/12] Add function to list tasks --- django_leek/api.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/django_leek/api.py b/django_leek/api.py index 982d7c1..b8fdeb8 100644 --- a/django_leek/api.py +++ b/django_leek/api.py @@ -1,4 +1,5 @@ import socket +import sys from functools import wraps import json @@ -14,9 +15,18 @@ def task(self, f, pool=None): @wraps(f) def _offload(*args, **kwargs): return push_task_to_queue(f, pool_name=pool_name, *args, **kwargs) + f.offload = _offload return f + def list_tasks(self): + for db_task in models.Task.objects.all().order_by('queued_at'): + try: + task = helpers.unpack(db_task.pickled_task) + yield db_task, task + except (ModuleNotFoundError, AttributeError): # things that can happen during unpickle + print("could not unpickle task", db_task.id, file=sys.stderr) + class Task(object): def __init__(self, a_callable, *args, **kwargs): From 05e0f768d4dfe3c76612155c1346b22d3cf2f45e Mon Sep 17 00:00:00 2001 From: Simon Hugosson Date: Fri, 9 Dec 2022 14:40:33 +0100 Subject: [PATCH 03/12] Refactor: Formatting --- django_leek/server.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/django_leek/server.py b/django_leek/server.py index 76b60b7..283a00f 100644 --- a/django_leek/server.py +++ b/django_leek/server.py @@ -76,11 +76,11 @@ def handle(self): response = None try: task_id = int(data.decode()) - + # Connection are closed by tasks, force it to reconnect django.db.connections.close_all() task = load_task(task_id=task_id) - + # Ensure pool got a worker processing it pool_name = task.pool or self.DEFAULT_POOL pool = self.pools.get(pool_name) @@ -95,13 +95,12 @@ def handle(self): response = {'task': 'queued', 'task_id': task_id} except Exception as e: log.exception("failed to queue task") - response = (False, "TaskServer Put: {}".format(e).encode(),) - response = { - 'task': 'failed to queue', - 'task_id': task_id, - 'error': str(e) - } - + response = ( + False, + "TaskServer Put: {}".format(e).encode(), + ) + response = {'task': 'failed to queue', 'task_id': task_id, 'error': str(e)} + self.request.send(json.dumps(response).encode()) except OSError as e: From 155c106266466132b727308f7b06558de572cff5 Mon Sep 17 00:00:00 2001 From: Simon Hugosson Date: Fri, 9 Dec 2022 14:46:16 +0100 Subject: [PATCH 04/12] Automatically skip tasks already finished --- django_leek/server.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/django_leek/server.py b/django_leek/server.py index 283a00f..53c5178 100644 --- a/django_leek/server.py +++ b/django_leek/server.py @@ -31,6 +31,10 @@ def target(queue): django.db.connection.close() task = load_task(task_id=task_id) + if task.finished(): + print('skipping ', task_id, "as it is already finished") + continue + pickled_task = helpers.unpack(task.pickled_task) try: task.started_at = timezone.now() From fbf91569c113caad4822577e7537fd4d11eadee8 Mon Sep 17 00:00:00 2001 From: Simon Hugosson Date: Mon, 12 Dec 2022 08:59:47 +0100 Subject: [PATCH 05/12] pylint django settings module --- .pylintrc | 2 -- .travis.yml | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/.pylintrc b/.pylintrc index 050fde8..9630cd9 100644 --- a/.pylintrc +++ b/.pylintrc @@ -1,5 +1,3 @@ [MASTER] errors-only=yes load-plugins=pylint_django -django-settings-module=test_app.settings - diff --git a/.travis.yml b/.travis.yml index 4a3cfc7..2df51fe 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,6 +5,6 @@ python: - "3.10" cache: pip script: - - pylint django_leek + - pylint django_leek --django-settings-module=test_app.settings - coverage run $(which django-admin) test --pythonpath=. --settings=django_leek.settings - python -m coverage_shield From 5183ffe8abb7f4cd89a92b58ea26c172ad29e707 Mon Sep 17 00:00:00 2001 From: Simon Hugosson Date: Mon, 12 Dec 2022 09:06:36 +0100 Subject: [PATCH 06/12] pylint django settings module --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 2df51fe..20a59c8 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,6 +5,6 @@ python: - "3.10" cache: pip script: - - pylint django_leek --django-settings-module=test_app.settings + - pylint django_leek --django-settings-module=django_leek.settings - coverage run $(which django-admin) test --pythonpath=. --settings=django_leek.settings - python -m coverage_shield From ad19c3ee4e352a8a88d931265b4b453a7439a431 Mon Sep 17 00:00:00 2001 From: Simon Hugosson Date: Mon, 12 Dec 2022 09:18:09 +0100 Subject: [PATCH 07/12] python path --- .pylintrc | 1 + .travis.yml | 5 +++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/.pylintrc b/.pylintrc index 9630cd9..928c42e 100644 --- a/.pylintrc +++ b/.pylintrc @@ -1,3 +1,4 @@ [MASTER] errors-only=yes load-plugins=pylint_django +django-settings-module=test_app.settings diff --git a/.travis.yml b/.travis.yml index 20a59c8..bcd7511 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,6 +5,7 @@ python: - "3.10" cache: pip script: - - pylint django_leek --django-settings-module=django_leek.settings - - coverage run $(which django-admin) test --pythonpath=. --settings=django_leek.settings + - export PYTHONPATH=. + - pylint django_leek + - coverage run $(which django-admin) test --settings=django_leek.settings - python -m coverage_shield From 7daf2486daa1716c3eca63d1b698193d9a5b39ba Mon Sep 17 00:00:00 2001 From: Simon Hugosson Date: Mon, 12 Dec 2022 09:59:05 +0100 Subject: [PATCH 08/12] version 1.0.4 --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 51b1b7e..d76c5c9 100644 --- a/setup.py +++ b/setup.py @@ -11,7 +11,7 @@ setup( name='django-leek', - version='1.0.3', + version='1.0.4', packages=find_packages(exclude=['test_app']), install_requires=['django>=1.11'], include_package_data=True, From cdd4e1a7176df4405acc99d9d068ba0c8c591e96 Mon Sep 17 00:00:00 2001 From: Simon Hugosson Date: Mon, 12 Dec 2022 11:37:48 +0100 Subject: [PATCH 09/12] Add parameter to list only (un)finished tasks --- django_leek/api.py | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/django_leek/api.py b/django_leek/api.py index b8fdeb8..b00db3d 100644 --- a/django_leek/api.py +++ b/django_leek/api.py @@ -1,10 +1,11 @@ +import json import socket import sys from functools import wraps -import json +from typing import Iterator, Optional -from . import models from . import helpers +from . import models from .settings import HOST, PORT @@ -19,14 +20,6 @@ def _offload(*args, **kwargs): f.offload = _offload return f - def list_tasks(self): - for db_task in models.Task.objects.all().order_by('queued_at'): - try: - task = helpers.unpack(db_task.pickled_task) - yield db_task, task - except (ModuleNotFoundError, AttributeError): # things that can happen during unpickle - print("could not unpickle task", db_task.id, file=sys.stderr) - class Task(object): def __init__(self, a_callable, *args, **kwargs): @@ -39,7 +32,7 @@ def __call__(self): return self.task_callable(*self.args, **self.kwargs) -def start_task_with_id(task_id): +def start_task_with_id(task_id: int): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.connect((HOST, PORT)) sock.send("{}".format(task_id).encode()) @@ -60,3 +53,15 @@ def push_task_to_queue(a_callable, *args, **kwargs): def query_task(task_id: int) -> models.Task: return helpers.load_task(task_id) + + +def list_tasks(finished: Optional[bool] = None) -> Iterator[tuple[models.Task, Task]]: + db_tasks = models.Task.objects.all().order_by('queued_at') + if finished is not None: + db_tasks = db_tasks.filter(finished_at__isnull=not finished) + for db_task in db_tasks: + try: + task = helpers.unpack(db_task.pickled_task) + yield db_task, task + except (ModuleNotFoundError, AttributeError): # things that can happen during unpickle + print("could not unpickle task", db_task.id, file=sys.stderr) From 0049d13ed2042ce60033f126658ad720ac25df63 Mon Sep 17 00:00:00 2001 From: Simon Hugosson Date: Mon, 12 Dec 2022 12:01:42 +0100 Subject: [PATCH 10/12] python 3.6 typing --- django_leek/api.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/django_leek/api.py b/django_leek/api.py index b00db3d..2d0dcdb 100644 --- a/django_leek/api.py +++ b/django_leek/api.py @@ -2,7 +2,7 @@ import socket import sys from functools import wraps -from typing import Iterator, Optional +from typing import Iterator, Optional, Tuple from . import helpers from . import models @@ -55,7 +55,7 @@ def query_task(task_id: int) -> models.Task: return helpers.load_task(task_id) -def list_tasks(finished: Optional[bool] = None) -> Iterator[tuple[models.Task, Task]]: +def list_tasks(finished: Optional[bool] = None) -> Iterator[Tuple[models.Task, Task]]: db_tasks = models.Task.objects.all().order_by('queued_at') if finished is not None: db_tasks = db_tasks.filter(finished_at__isnull=not finished) From faba0f9d884a9aa52bb71c650d8d9da95843ca72 Mon Sep 17 00:00:00 2001 From: Simon Hugosson Date: Mon, 12 Dec 2022 14:31:38 +0100 Subject: [PATCH 11/12] Remove unused responses --- django_leek/server.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/django_leek/server.py b/django_leek/server.py index 53c5178..f508cc7 100644 --- a/django_leek/server.py +++ b/django_leek/server.py @@ -77,7 +77,7 @@ def handle(self): # assume a serialized task log.info('Got a task') - response = None + task_id = None try: task_id = int(data.decode()) @@ -99,10 +99,6 @@ def handle(self): response = {'task': 'queued', 'task_id': task_id} except Exception as e: log.exception("failed to queue task") - response = ( - False, - "TaskServer Put: {}".format(e).encode(), - ) response = {'task': 'failed to queue', 'task_id': task_id, 'error': str(e)} self.request.send(json.dumps(response).encode()) From a4476cbcee33a161d80db33fec8348e11a960321 Mon Sep 17 00:00:00 2001 From: Simon Hugosson Date: Mon, 12 Dec 2022 14:40:02 +0100 Subject: [PATCH 12/12] More consistent log statements --- django_leek/server.py | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/django_leek/server.py b/django_leek/server.py index f508cc7..536a6ac 100644 --- a/django_leek/server.py +++ b/django_leek/server.py @@ -1,30 +1,29 @@ -from datetime import datetime -from sys import platform import json import logging -import socketserver import multiprocessing import queue +import socketserver import threading +from sys import platform -from .helpers import load_task -from . import helpers -from django.utils import timezone import django +from django.utils import timezone +from . import helpers +from .helpers import load_task log = logging.getLogger(__name__) def target(queue): django.setup() - log.info('Worker Starts') + log.info("Worker Starts") while True: task_id = queue.get() if task_id is None: return - log.info('running task...') + log.info("running task...") # workaround to solve problems with django + psycopg2 # solution found here: https://stackoverflow.com/a/36580629/10385696 @@ -32,7 +31,7 @@ def target(queue): task = load_task(task_id=task_id) if task.finished(): - print('skipping ', task_id, "as it is already finished") + log.info("skipping %s as it is already finished", task_id) continue pickled_task = helpers.unpack(task.pickled_task) @@ -44,7 +43,7 @@ def target(queue): task.pickled_return = helpers.serialize(return_value) task.save() - log.info('...successfully') + log.info("...successfully") except Exception as e: log.exception("...task failed") task.finished_at = timezone.now() @@ -76,7 +75,7 @@ def handle(self): data = self.request.recv(5000).strip() # assume a serialized task - log.info('Got a task') + log.info("Got a task") task_id = None try: task_id = int(data.decode()) @@ -90,7 +89,7 @@ def handle(self): pool = self.pools.get(pool_name) if pool is None or not pool.worker.is_alive(): # Spawn new pool - log.info('Spawning new pool: {}'.format(pool_name)) + log.info("Spawning new pool: {}".format(pool_name)) self.pools[pool_name] = Pool() self.pools[pool_name].worker.start() @@ -110,5 +109,5 @@ def handle(self): @staticmethod def stop(): for name, pool in TaskSocketServer.pools.items(): - print('Stopping pool:', name) + log.info("Stopping pool: %s", name) pool.stop()