Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add graceful service shutdown #53

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
2 changes: 2 additions & 0 deletions aiomisc/service/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from .base import Service, ServiceMeta, SimpleServer
from .graceful import GracefulMixin, GracefulService
from .profiler import Profiler
from .tcp import TCPServer
from .tls import TLSServer
Expand All @@ -7,6 +8,7 @@


__all__ = (
'GracefulMixin', 'GracefulService',
"MemoryTracer",
"Profiler",
"Service",
Expand Down
125 changes: 125 additions & 0 deletions aiomisc/service/graceful.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import asyncio
import logging

try:
from asyncio import create_task
except ImportError:
from asyncio import ensure_future as create_task # type: ignore
from asyncio import Task
from typing import Coroutine, Dict, Optional

from . import Service
from ..timeout import timeout, Number

log = logging.getLogger(__name__)


class GracefulMixin:

__tasks: Dict[Task, bool] = {}

def create_graceful_task(
self, coro: Coroutine, *, cancel: bool = False,
) -> Task:
"""
Creates a task that will either be awaited or cancelled and awaited
upon service stop.
:param coro:
:param cancel: whether to cancel or await the task on service stop
(default `False`)
:return: created task
"""
task = create_task(coro)
# __tasks may be cleared before the task finishes
task.add_done_callback(lambda task: self.__tasks.pop(task, None))
self.__tasks[task] = cancel
return task

async def graceful_shutdown(
self, *,
wait_timeout: Number = None,
cancel_on_timeout: bool = True,
) -> None:
if self.__tasks:
items = list(self.__tasks.items())
to_cancel = [task for task, cancel in items if cancel]
to_wait = [task for task, cancel in items if not cancel]

log.info(
'Graceful shutdown: cancel %d and wait for %d tasks',
len(to_cancel), len(to_wait)
)

await asyncio.wait([
self.__cancel_tasks(*to_cancel),
self.__finish_tasks(
*to_wait,
wait_timeout=wait_timeout,
cancel_on_timeout=cancel_on_timeout,
)
])

self.__tasks.clear()

async def __cancel_tasks(self, *tasks: Task) -> None:
await self.__wait_tasks(*tasks, cancel=True)

async def __finish_tasks(
self, *tasks: Task,
wait_timeout: Optional[Number],
cancel_on_timeout: bool,
) -> None:

if wait_timeout is None:
await self.__wait_tasks(*tasks, cancel=False)
return

try:
await timeout(wait_timeout)(self.__wait_tasks)(
*tasks, cancel=False,
)
except asyncio.TimeoutError:
log.info('Graceful shutdown: wait timeouted')
if not cancel_on_timeout:
return

to_cancel = [task for task in tasks if not task.done()]
log.info(
'Graceful shutdown: cancel %d tasks after timeout',
len(to_cancel),
)
for task in to_cancel:
task.cancel()

@staticmethod
async def __wait_tasks(*tasks: Task, cancel: bool) -> None:
if not tasks:
return

to_stop = []

for task in tasks:
if task.done():
continue

if cancel:
task.cancel()

to_stop.append(task)

await asyncio.wait(to_stop)


class GracefulService(Service, GracefulMixin):

graceful_wait_timeout = None # type: float # in seconds
cancel_on_timeout = True # type: bool

async def start(self) -> None:
raise NotImplementedError

async def stop(self, exception: Exception = None) -> None:
await self.graceful_shutdown(
wait_timeout=self.graceful_wait_timeout,
cancel_on_timeout=self.cancel_on_timeout,
)
38 changes: 38 additions & 0 deletions docs/source/locale/ru/LC_MESSAGES/services.po
Original file line number Diff line number Diff line change
Expand Up @@ -306,3 +306,41 @@ msgstr "Все опции для клиента:"
#: ../../source/services.rst:610
msgid "You will find full specification of options in the `Raven documentation`_."
msgstr "Вы можете найти полное описание параметров в `документации Raven`_."

#: ../../source/services.rst:618
msgid ""
"``GracefulService`` allows creation of tasks with `create_graceful_task(coro)` "
"that will be either awaited (default) with an optional timeout or cancelled and "
"awaited upon service stop."
msgstr ""
"``GracefulService`` позволяет запускать задачи c помощью `create_graceful_task(coro)`, "
"завершение которых (либо отмену с завершением) сервис выждет при остановке."

#: ../../source/services.rst:622
msgid ""
"Optional service parameter ``graceful_wait_timeout`` (default ``None``) "
"specifies the allowed wait time in seconds for tasks created with "
"``create_graceful_task(coro, cancel=False)``."
msgstr ""
"Необязательный параметр сервиса ``graceful_wait_timeout`` "
"(по умолчанию ``None``) указывает время выжидания завершения (в секундах) "
"для задач, запущенных с ``create_graceful_task(coro, cancel=False)``."

#: ../../source/services.rst:626
msgid ""
"Optional service parameter ``cancel_on_timeout`` (default ``True``) "
"specifies whether to cancel tasks (without further waiting) that didn't "
"complete within ``graceful_wait_timeout``."
msgstr ""
"Необязательный параметр сервиса ``cancel_on_timeout`` (по умолчанию ``True``) "
"указывает отменять ли задачи (без последующего ожидания), которые не "
"успели завершиться в течение ``graceful_wait_timeout``."

#: ../../source/services.rst:630
msgid ""
"Tasks created with ``create_graceful_task(coro, cancel=True)`` are cancelled "
"and awaited when service stops."
msgstr ""
"При остановке сервиса, задачи, запущенные с помощью "
"``create_graceful_task(coro, cancel=True)`` отменяются, и производится "
"ожидание их завершения."
51 changes: 51 additions & 0 deletions docs/source/services.rst
Original file line number Diff line number Diff line change
Expand Up @@ -610,3 +610,54 @@ Full configuration:
You will find full specification of options in the `Raven documentation`_.

.. _Raven documentation: https://docs.sentry.io/clients/python/advanced/#client-arguments


GracefulService
+++++++++++++++

``GracefulService`` allows creation of tasks with `create_graceful_task(coro)`
that will be either awaited (default) with an optional timeout or cancelled and
awaited upon service stop.

Optional service parameter ``graceful_wait_timeout`` (default ``None``)
specifies the allowed wait time in seconds for tasks created with
``create_graceful_task(coro, cancel=False)``.

Optional service parameter ``cancel_on_timeout`` (default ``True``)
specifies whether to cancel tasks (without further waiting) that didn't
complete within ``graceful_wait_timeout``.

Tasks created with ``create_graceful_task(coro, cancel=True)`` are cancelled
and awaited when the service stops.

.. code-block:: python

import asyncio
from aiomisc.service import GracefulService

class SwanService(GracefulService):

graceful_wait_timeout = 10
cancel_on_timeout = False

async def fly(self):
await asyncio.sleep(1)
print('Flew to a lake')

async def duckify(self):
await asyncio.sleep(1)
print('Became ugly duck')

async def start(self):
self.create_graceful_task(self.fly())
self.create_graceful_task(self.duckify(), cancel=True)

service = SwanService()
await service.start()
await service.stop()

Output example:

.. code-block::

Flew to a lake
Loading