diff --git a/aiomisc/service/__init__.py b/aiomisc/service/__init__.py index 46f0ca81..98c304a2 100644 --- a/aiomisc/service/__init__.py +++ b/aiomisc/service/__init__.py @@ -1,4 +1,5 @@ from .base import Service, ServiceMeta, SimpleServer +from .graceful import GracefulMixin, GracefulService from .profiler import Profiler from .tcp import TCPServer from .tls import TLSServer @@ -7,6 +8,7 @@ __all__ = ( + 'GracefulMixin', 'GracefulService', "MemoryTracer", "Profiler", "Service", diff --git a/aiomisc/service/graceful.py b/aiomisc/service/graceful.py new file mode 100644 index 00000000..66a17fa1 --- /dev/null +++ b/aiomisc/service/graceful.py @@ -0,0 +1,125 @@ +import asyncio +import logging + +try: + from asyncio import create_task +except ImportError: + from asyncio import ensure_future as create_task # type: ignore +from asyncio import Task +from typing import Coroutine, Dict, Optional + +from . import Service +from ..timeout import timeout, Number + +log = logging.getLogger(__name__) + + +class GracefulMixin: + + __tasks: Dict[Task, bool] = {} + + def create_graceful_task( + self, coro: Coroutine, *, cancel: bool = False, + ) -> Task: + """ + Creates a task that will either be awaited or cancelled and awaited + upon service stop. + :param coro: + :param cancel: whether to cancel or await the task on service stop + (default `False`) + :return: created task + """ + task = create_task(coro) + # __tasks may be cleared before the task finishes + task.add_done_callback(lambda task: self.__tasks.pop(task, None)) + self.__tasks[task] = cancel + return task + + async def graceful_shutdown( + self, *, + wait_timeout: Number = None, + cancel_on_timeout: bool = True, + ) -> None: + if self.__tasks: + items = list(self.__tasks.items()) + to_cancel = [task for task, cancel in items if cancel] + to_wait = [task for task, cancel in items if not cancel] + + log.info( + 'Graceful shutdown: cancel %d and wait for %d tasks', + len(to_cancel), len(to_wait) + ) + + await asyncio.wait([ + self.__cancel_tasks(*to_cancel), + self.__finish_tasks( + *to_wait, + wait_timeout=wait_timeout, + cancel_on_timeout=cancel_on_timeout, + ) + ]) + + self.__tasks.clear() + + async def __cancel_tasks(self, *tasks: Task) -> None: + await self.__wait_tasks(*tasks, cancel=True) + + async def __finish_tasks( + self, *tasks: Task, + wait_timeout: Optional[Number], + cancel_on_timeout: bool, + ) -> None: + + if wait_timeout is None: + await self.__wait_tasks(*tasks, cancel=False) + return + + try: + await timeout(wait_timeout)(self.__wait_tasks)( + *tasks, cancel=False, + ) + except asyncio.TimeoutError: + log.info('Graceful shutdown: wait timeouted') + if not cancel_on_timeout: + return + + to_cancel = [task for task in tasks if not task.done()] + log.info( + 'Graceful shutdown: cancel %d tasks after timeout', + len(to_cancel), + ) + for task in to_cancel: + task.cancel() + + @staticmethod + async def __wait_tasks(*tasks: Task, cancel: bool) -> None: + if not tasks: + return + + to_stop = [] + + for task in tasks: + if task.done(): + continue + + if cancel: + task.cancel() + + to_stop.append(task) + + await asyncio.wait(to_stop) + + +class GracefulService(Service, GracefulMixin): + + graceful_wait_timeout = None # type: float # in seconds + cancel_on_timeout = True # type: bool + + async def start(self) -> None: + raise NotImplementedError + + async def stop(self, exception: Exception = None) -> None: + await self.graceful_shutdown( + wait_timeout=self.graceful_wait_timeout, + cancel_on_timeout=self.cancel_on_timeout, + ) diff --git a/docs/source/locale/ru/LC_MESSAGES/services.po b/docs/source/locale/ru/LC_MESSAGES/services.po index 429daeb7..fea1492d 100644 --- a/docs/source/locale/ru/LC_MESSAGES/services.po +++ b/docs/source/locale/ru/LC_MESSAGES/services.po @@ -306,3 +306,41 @@ msgstr "Все опции для клиента:" #: ../../source/services.rst:610 msgid "You will find full specification of options in the `Raven documentation`_." msgstr "Вы можете найти полное описание параметров в `документации Raven`_." + +#: ../../source/services.rst:618 +msgid "" +"``GracefulService`` allows creation of tasks with `create_graceful_task(coro)` " +"that will be either awaited (default) with an optional timeout or cancelled and " +"awaited upon service stop." +msgstr "" +"``GracefulService`` позволяет запускать задачи c помощью `create_graceful_task(coro)`, " +"завершение которых (либо отмену с завершением) сервис выждет при остановке." + +#: ../../source/services.rst:622 +msgid "" +"Optional service parameter ``graceful_wait_timeout`` (default ``None``) " +"specifies the allowed wait time in seconds for tasks created with " +"``create_graceful_task(coro, cancel=False)``." +msgstr "" +"Необязательный параметр сервиса ``graceful_wait_timeout`` " +"(по умолчанию ``None``) указывает время выжидания завершения (в секундах) " +"для задач, запущенных с ``create_graceful_task(coro, cancel=False)``." + +#: ../../source/services.rst:626 +msgid "" +"Optional service parameter ``cancel_on_timeout`` (default ``True``) " +"specifies whether to cancel tasks (without further waiting) that didn't " +"complete within ``graceful_wait_timeout``." +msgstr "" +"Необязательный параметр сервиса ``cancel_on_timeout`` (по умолчанию ``True``) " +"указывает отменять ли задачи (без последующего ожидания), которые не " +"успели завершиться в течение ``graceful_wait_timeout``." + +#: ../../source/services.rst:630 +msgid "" +"Tasks created with ``create_graceful_task(coro, cancel=True)`` are cancelled " +"and awaited when service stops." +msgstr "" +"При остановке сервиса, задачи, запущенные с помощью " +"``create_graceful_task(coro, cancel=True)`` отменяются, и производится " +"ожидание их завершения." diff --git a/docs/source/services.rst b/docs/source/services.rst index 110a24ff..6cebdb18 100644 --- a/docs/source/services.rst +++ b/docs/source/services.rst @@ -610,3 +610,54 @@ Full configuration: You will find full specification of options in the `Raven documentation`_. .. _Raven documentation: https://docs.sentry.io/clients/python/advanced/#client-arguments + + +GracefulService ++++++++++++++++ + +``GracefulService`` allows creation of tasks with `create_graceful_task(coro)` +that will be either awaited (default) with an optional timeout or cancelled and +awaited upon service stop. + +Optional service parameter ``graceful_wait_timeout`` (default ``None``) +specifies the allowed wait time in seconds for tasks created with +``create_graceful_task(coro, cancel=False)``. + +Optional service parameter ``cancel_on_timeout`` (default ``True``) +specifies whether to cancel tasks (without further waiting) that didn't +complete within ``graceful_wait_timeout``. + +Tasks created with ``create_graceful_task(coro, cancel=True)`` are cancelled +and awaited when the service stops. + +.. code-block:: python + + import asyncio + from aiomisc.service import GracefulService + + class SwanService(GracefulService): + + graceful_wait_timeout = 10 + cancel_on_timeout = False + + async def fly(self): + await asyncio.sleep(1) + print('Flew to a lake') + + async def duckify(self): + await asyncio.sleep(1) + print('Became ugly duck') + + async def start(self): + self.create_graceful_task(self.fly()) + self.create_graceful_task(self.duckify(), cancel=True) + + service = SwanService() + await service.start() + await service.stop() + +Output example: + +.. code-block:: + + Flew to a lake diff --git a/tests/test_graceful.py b/tests/test_graceful.py new file mode 100644 index 00000000..de785166 --- /dev/null +++ b/tests/test_graceful.py @@ -0,0 +1,166 @@ +import asyncio +from asyncio import CancelledError, Task + +import pytest + +from aiomisc.service.graceful import GracefulMixin, GracefulService + + +async def test_graceful(): + class Graceful(GracefulMixin): + pass + + async def pho(): + pass + + graceful = Graceful() + + task = graceful.create_graceful_task(pho()) + assert isinstance(task, Task) + + await asyncio.sleep(0.1) + + # Check that done and popped itself from the task store + assert task.done() + assert not graceful._GracefulMixin__tasks + + +@pytest.mark.parametrize( + 'sleep,timeout,error', [ + (0.2, 0.1, CancelledError), + (0.1, 0.2, None), + ] +) +async def test_graceful_timeout(sleep, timeout, error): + class Graceful(GracefulMixin): + pass + + async def pho(): + await asyncio.sleep(sleep) + + graceful = Graceful() + + task = graceful.create_graceful_task(pho()) + assert isinstance(task, Task) + + await graceful.graceful_shutdown(wait_timeout=timeout) + assert task.done() + + if not error: + assert not task.exception() + else: + with pytest.raises(error): + assert task.exception() + + assert not graceful._GracefulMixin__tasks + + +@pytest.mark.parametrize( + 'cancel,error', [ + (False, None), + (True, CancelledError), + ] +) +async def test_graceful_shutdown(cancel, error): + class Graceful(GracefulMixin): + pass + + async def pho(): + await asyncio.sleep(0.1) + + graceful = Graceful() + + task = graceful.create_graceful_task(pho(), cancel=cancel) + assert isinstance(task, Task) + + await graceful.graceful_shutdown() + assert task.done() + + if not error: + assert not task.exception() + else: + with pytest.raises(error): + task.exception() + + assert not graceful._GracefulMixin__tasks + + +async def test_graceful_service(): + class TestService(GracefulService): + + task_wait = None + task_cancel = None + + async def start(self): + self.task_wait = self.create_graceful_task(self.pho()) + self.task_cancel = self.create_graceful_task( + self.pho(), cancel=True, + ) + + async def pho(self): + await asyncio.sleep(0.1) + + service = TestService() + + await service.start() + await service.stop() + + assert service.task_wait.done() + assert service.task_cancel.done() + + assert not service.task_wait.exception() + with pytest.raises(CancelledError): + assert not service.task_cancel.exception() + + assert not service._GracefulMixin__tasks + + +async def test_graceful_service_with_timeout_cancel(): + class TestService(GracefulService): + + graceful_wait_timeout = 0.1 + + task_wait = None + + async def start(self): + self.task_wait = self.create_graceful_task(self.pho()) + + async def pho(self): + await asyncio.sleep(0.2) + + service = TestService() + + await service.start() + await service.stop() + + assert service.task_wait.cancelled() + assert not service._GracefulMixin__tasks + + +async def test_graceful_service_with_timeout_no_cancel(): + class TestService(GracefulService): + + graceful_wait_timeout = 0.1 + cancel_on_timeout = False + + task_wait = None + + async def start(self): + self.task_wait = self.create_graceful_task(self.pho()) + + async def pho(self): + await asyncio.sleep(0.2) + return 123 + + service = TestService() + + await service.start() + await service.stop() + + assert not service.task_wait.done() + assert not service._GracefulMixin__tasks + + await asyncio.sleep(0.1) + + assert service.task_wait.done() + assert service.task_wait.result() == 123