diff --git a/README.md b/README.md index a018311..a0403c3 100644 --- a/README.md +++ b/README.md @@ -44,8 +44,8 @@ Para rodar o bloqueador, você vai precisar de: * **Um conversor de DTMF para FSK (talvez).** Também disponível no Mercado Livre a preços que vão de 30 a 150 reais. Eu tenho uma linha fixa tradicional da [Vivo](https://www.vivo.com.br) (i.e., não é aquela linha que brota do Vivo fibra) e, nessas linhas, a Vivo utiliza modulação DTMF para a identificação de chamadas. A - [Wikipedia](https://en.wikipedia.org/wiki/Caller_ID) no entanto aponta que outras operadoras podem usar outros - padrões (FSK e V23 FSK), então o conversor pode não ser necessário no seu caso. + [Wikipedia](https://en.wikipedia.org/wiki/Caller_ID) no entanto aponta que outras operadoras **no Brasil** + podem usar outros padrões (FSK e V23 FSK), então o conversor pode não ser necessário no seu caso. Depois de montado, o hardware do meu bloqueador ficou com essa cara aqui: @@ -136,6 +136,9 @@ _Voilà!_ O bloqueador de chamadas está rodando! Mas isso não quer dizer, clar forma de se assegurar que o bloqueador está de fato funcionando, por hora, é fazer uma ligação para o próprio número. Se tudo estiver funcionando, ela deve aparecer no lugar do aviso de que não há chamadas para mostrar. +Note que o Docker vai se encarregar de inicializar o bloqueador novamente toda vez que o Raspberry Pi for reiniciado, +então você não precisa se preocupar com criar scripts de inicialização (se é que estava preocupado ;-)). + Limitações ========== diff --git a/callblocker/blocker/__init__.py b/callblocker/blocker/__init__.py index c02f9ee..1365c98 100644 --- a/callblocker/blocker/__init__.py +++ b/callblocker/blocker/__init__.py @@ -7,6 +7,7 @@ class BootstrapMode(Enum): SERVER = 'server' FAKE_SERVER = 'fake_server' COMMAND = 'command' + CUSTOM = 'custom' #: This is an ugly hack. See the blocker.bootstrap module documentation for more information. diff --git a/callblocker/blocker/api/serializer_extensions.py b/callblocker/blocker/api/serializer_extensions.py index 5cecfe7..c8c163a 100644 --- a/callblocker/blocker/api/serializer_extensions.py +++ b/callblocker/blocker/api/serializer_extensions.py @@ -1,7 +1,8 @@ from functools import reduce from rest_framework.exceptions import ValidationError -from rest_framework.fields import SkipField, Field +from rest_framework.fields import SkipField, Field, ChoiceField +from rest_framework.serializers import Serializer from rest_framework.settings import api_settings from rest_framework.utils import html from rest_framework_bulk import BulkListSerializer @@ -10,6 +11,7 @@ # We have to patch BulkListSerializer to deal with https://github.com/miki725/django-rest-framework-bulk/issues/68 # This may break with newer versions of restframework, so it would be nice to get rid of it, eventually. + class PatchedBulkListSerializer(BulkListSerializer): def to_internal_value(self, data): """ @@ -59,6 +61,22 @@ def to_internal_value(self, data): return ret +class EnumField(ChoiceField): + def __init__(self, enum, **kwargs): + self.enum = enum + kwargs['choices'] = [(e.value, e.name) for e in enum] + super(EnumField, self).__init__(**kwargs) + + def to_representation(self, obj): + return obj.name + + def to_internal_value(self, data): + try: + return self.enum[data] + except KeyError: + self.fail('invalid_choice', input=data) + + class GeneratedCharField(Field): def __init__(self, fields, fun=lambda x, y: x + y, **kwargs): @@ -87,3 +105,23 @@ def to_internal_value(self, data): def to_representation(self, value): return value + + +class ExceptionField(Field): + def __init__(self, **kwargs): + # Our typical use case for Exceptions is reporting. Clients are not usually + # allowed to set them. + kwargs['read_only'] = True + super().__init__(**kwargs) + + def to_representation(self, value): + # Well... can't get any simpler than this. + return f'{type(value).__name__}: {str(value)}' + + +class ROSerializer(Serializer): + def create(self, validated_data): + raise NotImplemented('Cannot create readonly objects.') + + def update(self, instance, validated_data): + raise NotImplemented('Cannot update readonly objects.') diff --git a/callblocker/blocker/api/serializers.py b/callblocker/blocker/api/serializers.py index 257b725..70d725c 100644 --- a/callblocker/blocker/api/serializers.py +++ b/callblocker/blocker/api/serializers.py @@ -4,8 +4,10 @@ from rest_framework.validators import UniqueValidator from rest_framework_bulk import BulkSerializerMixin -from callblocker.blocker.api.serializer_extensions import GeneratedCharField, PatchedBulkListSerializer +from callblocker.blocker.api.serializer_extensions import GeneratedCharField, PatchedBulkListSerializer, EnumField, \ + ExceptionField, ROSerializer from callblocker.blocker.models import Call, Source, Caller +from callblocker.core.service import ServiceState class CallSerializer(ModelSerializer): @@ -101,3 +103,15 @@ def create(self, validated_data): if validated_data.get('source') is None: validated_data['source'] = Source.predef_source(Source.USER) return super().create(validated_data) + + +class ServiceStatusSerializer(ROSerializer): + state = EnumField(ServiceState) + exception = ExceptionField() + traceback = serializers.CharField() + + +class ServiceSerializer(ROSerializer): + id = serializers.CharField(max_length=20) + name = serializers.CharField(max_length=80) + status = ServiceStatusSerializer() diff --git a/callblocker/blocker/api/views.py b/callblocker/blocker/api/views.py index 5e6fb21..8e9c2b5 100644 --- a/callblocker/blocker/api/views.py +++ b/callblocker/blocker/api/views.py @@ -5,20 +5,24 @@ from django.db.models import Count, Value, FloatField from django.db.models import Q from django.db.models.functions import Greatest, Lower +from django.http import Http404 +from rest_framework import status from rest_framework.decorators import api_view, parser_classes -from rest_framework.exceptions import ValidationError +from rest_framework.exceptions import ValidationError, APIException from rest_framework.generics import get_object_or_404 from rest_framework.mixins import RetrieveModelMixin, ListModelMixin from rest_framework.pagination import LimitOffsetPagination from rest_framework.parsers import JSONParser from rest_framework.response import Response from rest_framework.status import HTTP_400_BAD_REQUEST, HTTP_202_ACCEPTED -from rest_framework.viewsets import ModelViewSet, GenericViewSet +from rest_framework.viewsets import ModelViewSet, GenericViewSet, ViewSet from rest_framework_bulk import BulkUpdateModelMixin, BulkDestroyModelMixin -from callblocker.blocker.services import services -from callblocker.blocker.api.serializers import CallerSerializer, CallSerializer, CallerPOSTSerializer, SourceSerializer +from callblocker.blocker.api.serializers import CallerSerializer, CallSerializer, CallerPOSTSerializer, \ + SourceSerializer, ServiceSerializer from callblocker.blocker.models import Caller, Call, Source +from callblocker.blocker.services import services +from callblocker.core.service import ServiceState class CallerViewSet(ModelViewSet, BulkUpdateModelMixin, BulkDestroyModelMixin): @@ -144,9 +148,55 @@ class SourceViewSet(ListModelMixin, RetrieveModelMixin, GenericViewSet): queryset = Source.objects.all() -@api_view(['GET']) -def health_status(request): - return Response(services().health()) +class ServicesViewset(ViewSet): + serializer_class = ServiceSerializer + parser_classes = [JSONParser] + + # The user can either start or terminate a service. Nothing else. + ALLOWED_TARGET_STATES = [ServiceState.READY, ServiceState.TERMINATED] + + def list(self, _): + return Response(ServiceSerializer(instance=services().services, many=True).data) + + def retrieve(self, _, pk): + service = self._get_service_or_404(pk) + return Response(ServiceSerializer(instance=service).data) + + def partial_update(self, request, pk): + service = self._get_service_or_404(pk) + content = request.data + + try: + target = self._get_or_400(content, 'status', 'state') + target = ServiceState[target.upper()] + except KeyError: + return Response(f'Invalid target state {target}.', status=status.HTTP_400_BAD_REQUEST) + + if target not in self.ALLOWED_TARGET_STATES: + return Response(f'Cannot set service to {target}.') + + # Either start or stop. + if target == ServiceState.READY: + service.start() + elif target == ServiceState.TERMINATED: + service.stop() + else: + # Should never happen. + raise Exception(f'Bad target state {target}.') + + return Response(status=status.HTTP_202_ACCEPTED) + + def _get_or_400(self, content, *path): + element = path[0] + if element not in content: + raise APIException(detail=f'Missing element {element} in {str(content)}') + return self._get_or_400(content[element], *path[1:]) if len(path) > 1 else content[element] + + def _get_service_or_404(self, pk): + service = getattr(services(), pk, None) + if service is None: + raise Http404(f'No services match {pk}.') + return service @api_view(['POST']) diff --git a/callblocker/blocker/callmonitor.py b/callblocker/blocker/callmonitor.py index 4622e13..4d61efb 100644 --- a/callblocker/blocker/callmonitor.py +++ b/callblocker/blocker/callmonitor.py @@ -32,7 +32,7 @@ def parse_cid(self, string: str) -> Caller: class CallMonitor(AsyncioService): - default_name = 'call monitor' + name = 'call monitor' def __init__(self, provider: TelcoProvider, modem: Modem, aio_loop: AbstractEventLoop): super().__init__(aio_loop=aio_loop) @@ -41,7 +41,7 @@ def __init__(self, provider: TelcoProvider, modem: Modem, aio_loop: AbstractEven self.stream = modem.event_stream() async def _event_loop(self): - self._startup_event.set() + self._signal_started() with self.stream as stream: async for event in stream: await self._process_event(event) diff --git a/callblocker/blocker/services.py b/callblocker/blocker/services.py index a634046..4363966 100644 --- a/callblocker/blocker/services.py +++ b/callblocker/blocker/services.py @@ -13,9 +13,9 @@ from callblocker.core.modem import Modem, PySerialDevice from callblocker.core.service import AsyncioEventLoop from callblocker.core.servicegroup import ServiceGroupSpec, ServiceGroup -#: Server mode services. from callblocker.core.tests.fakeserial import CX930xx_fake, ScriptedModem +#: Server mode services. server = ServiceGroupSpec( aio_loop=lambda _: ( AsyncioEventLoop() diff --git a/callblocker/blocker/tests/test_api.py b/callblocker/blocker/tests/test_blocker_api.py similarity index 100% rename from callblocker/blocker/tests/test_api.py rename to callblocker/blocker/tests/test_blocker_api.py diff --git a/callblocker/blocker/tests/test_service_api.py b/callblocker/blocker/tests/test_service_api.py new file mode 100644 index 0000000..e8d7cb9 --- /dev/null +++ b/callblocker/blocker/tests/test_service_api.py @@ -0,0 +1,94 @@ +import json + +from rest_framework import status + +import callblocker +from callblocker.blocker import services, BootstrapMode +from callblocker.blocker.services import bootstrap +from callblocker.core.service import Service, ServiceStatus, ServiceState +from callblocker.core.servicegroup import ServiceGroupSpec + + +class FlippinService(Service): + def __init__(self, name): + self.state = ServiceState.INITIAL + self._name = name + self.exception = None + self.traceback = None + + def start(self) -> 'Service': + self.state = ServiceState.READY + return self + + def sync_start(self, timeout=None): + self.start() + + def stop(self) -> 'Service': + self.state = ServiceState.TERMINATED + return self + + def sync_stop(self, timeout=None): + self.stop() + + def status(self) -> ServiceStatus: + return ServiceStatus( + self.state, + self.exception, + self.traceback + ) + + @property + def name(self) -> str: + return self._name + + +def test_provides_correct_service_status(api_client): + spec = ServiceGroupSpec( + fp1=lambda _: FlippinService('FlippingService 1'), + fp2=lambda _: FlippinService('FlippingService 2'), + fp3=lambda _: FlippinService('FlippingService 3') + ) + setattr(services, 'custom', spec) + callblocker.blocker.bootstrap_mode(BootstrapMode.CUSTOM) + bootstrap('custom') + + summary = api_client.get('/api/services/').json() + + for i, element in enumerate(summary, start=1): + assert element['id'] == f'fp{i}' + assert element['name'] == f'FlippingService {i}' + assert element['status']['state'] == 'READY' + + fp1 = services.services().fp1 + fp1.state = ServiceState.ERRORED + fp1.exception = EOFError('Ooops!') + fp1.traceback = 'Ooops' + + fp1_summary = api_client.get('/api/services/fp1/').json() + assert fp1_summary['id'] == 'fp1' + assert fp1_summary['status']['state'] == 'ERRORED' + assert fp1_summary['status']['exception'] == 'EOFError: Ooops!' + assert fp1_summary['status']['traceback'] == 'Ooops' + + +def test_starts_stops_service(api_client): + spec = ServiceGroupSpec( + fp1=lambda _: FlippinService('FlippingService 1') + ) + setattr(services, 'custom', spec) + callblocker.blocker.bootstrap_mode(BootstrapMode.CUSTOM) + bootstrap('custom') + + assert api_client.get('/api/services/fp1/').json()['status']['state'] == 'READY' + + for target in ['TERMINATED', 'READY']: + result = api_client.patch( + '/api/services/fp1/', + data=json.dumps({ + 'status': {'state': target} + }), + content_type='application/json' + ) + + assert result.status_code == status.HTTP_202_ACCEPTED + assert services.services().fp1.status().state == ServiceState[target] diff --git a/callblocker/conftest.py b/callblocker/conftest.py index 049bcf7..adc9db1 100644 --- a/callblocker/conftest.py +++ b/callblocker/conftest.py @@ -30,6 +30,6 @@ def api_client(): @pytest.fixture() def aio_loop(): loop = AsyncioEventLoop() - loop.start() + loop.sync_start() yield loop - loop.stop(10) + loop.sync_stop(10) diff --git a/callblocker/core/console.py b/callblocker/core/console.py index 465877d..716334b 100644 --- a/callblocker/core/console.py +++ b/callblocker/core/console.py @@ -5,8 +5,9 @@ import argparse import asyncio import sys -from asyncio import CancelledError +from asyncio import CancelledError, AbstractEventLoop from cmd import Cmd +from typing import Optional from callblocker.core import modems from callblocker.core.modem import Modem, ModemException, ModemEvent, PySerialDevice @@ -20,8 +21,11 @@ class BaseModemConsole(Cmd, AsyncioService): command which stops the event loop and quits the console. """ - def __init__(self, stdout, modem: Modem): - super().__init__(stdout=stdout) + name = 'modem console' + + def __init__(self, stdout, modem: Modem, aio_loop: AbstractEventLoop): + Cmd.__init__(self, stdout=stdout) + AsyncioService.__init__(self, aio_loop=aio_loop) self.stream = modem.event_stream() def do_exit(self, _): @@ -42,8 +46,8 @@ async def _event_loop(self): except CancelledError: pass - def _handle_termination(self): - super()._handle_termination() + def _signal_terminated(self): + super()._signal_terminated() status = self.status() if status == ServiceState.ERRORED: print('Modem monitoring loop died with an exception:\n\n %s \n\nExecution aborted.' % str(status.exception)) @@ -53,8 +57,8 @@ def _handle_termination(self): class ModemConsole(BaseModemConsole): - def __init__(self, stdout, modem: Modem): - super().__init__(stdout=stdout, modem=modem) + def __init__(self, stdout, modem: Modem, aio_loop: AbstractEventLoop): + super().__init__(stdout=stdout, modem=modem, aio_loop=aio_loop) def do_lscommand(self, _): print('Valid commands are: %s' % ', '.join(self.modem.modem_type.COMMANDS), file=self.stdout) @@ -113,7 +117,7 @@ def main(): ) modem.start() - console = ModemConsole(stdout=sys.stdout, modem=modem) + console = ModemConsole(stdout=sys.stdout, modem=modem, aio_loop=aio_loop.aio_loop) console.start() console.cmdloop('Type "help" for available commands.') diff --git a/callblocker/core/modem.py b/callblocker/core/modem.py index 9bd2938..ee4e864 100644 --- a/callblocker/core/modem.py +++ b/callblocker/core/modem.py @@ -78,7 +78,7 @@ async def connect(self, aio_loop): class Modem(AsyncioService): - default_name = 'modem' + name = 'modem' def __init__(self, modem_type: ModemType, @@ -153,7 +153,7 @@ async def _event_loop(self): # Connects to the modem. await self._connect() # Service is ready. - self._startup_event.set() + self._signal_started() try: while True: event = await self._read_event() diff --git a/callblocker/core/service.py b/callblocker/core/service.py index fb36fa5..da5f200 100644 --- a/callblocker/core/service.py +++ b/callblocker/core/service.py @@ -6,7 +6,7 @@ from asyncio import AbstractEventLoop, Task from enum import Enum from threading import Thread, Event -from typing import Optional, Callable +from typing import Optional from callblocker.core.concurrency import with_monitor, synchronized @@ -64,22 +64,27 @@ class Service(ABC): def start(self) -> 'Service': """ Starts the current service if it has never been started, or attempts to restart it - if the service died because of an error or was otherwise terminated. - - To ease client code, the start procedure should be *synchronous*. Meaning that this - method should not return until the service can be sure that it is fully initialized. - The meaning of "fully initialized" is service-specific. + if the service died because of an error or was otherwise terminated. This operation + is asynchronous. :return: the service itself. - :raise ValueError: if :meth:`Service.status` returns a :meth:`ServiceStatus` with - `state == ServiceState.RUNNING`. + :raise ValueError: if :meth:`Service.status` returns a :meth:`ServiceState` that is + either STARTING, READY, or STOPPED. """ pass + @abstractmethod + def sync_start(self, timeout=None) -> 'Service': + pass + @abstractmethod def stop(self) -> 'Service': pass + @abstractmethod + def sync_stop(self, timeout=None) -> 'Service': + pass + @abstractmethod def status(self) -> ServiceStatus: """ @@ -89,6 +94,9 @@ def status(self) -> ServiceStatus: @abstractproperty def name(self) -> str: + """ + :return: a human-readable name for the service. + """ pass @@ -97,21 +105,16 @@ class BaseService(Service): """Base implementation for a service which runs on a separate thread, possibly as part of an asyncio event loop.""" - def __init__(self, name=None): - self._name = self.default_name if name is None else name + def __init__(self): self._error = {} self._state = ServiceState.INITIAL - # We don't need asyncio events as the waiting will always be blocking. - self._startup_event = Event() - self._shutdown_event = Event() - - @property - def name(self): - return self._name + # Events for those who want synchronous start/stop. + self.startup = Event() + self.shutdown = Event() @synchronized - def start(self, timeout=None) -> Service: + def start(self) -> Service: self._disallow_states(*ServiceState.running_states()) try: @@ -126,23 +129,31 @@ def start(self, timeout=None) -> Service: self._error = {} # 2. clears the startup and shutdown events BEFORE firing the event loop. - self._startup_event.clear() - self._shutdown_event.clear() + self.startup.clear() + self.shutdown.clear() - # 3. Starts the event loop and waits for the startup signal (this is deadlock-prone, but I guess - # you'll figure out soon enough if there's a screw up as the service won't start). + # 3. Starts the event loop. logger.info(f'Starting service {self.name}') - self._start_event_loop(self._handle_termination) - self._startup_event.wait(timeout) - - # Once the startup signal is given, sets the state to running. Clearly, the service may have - # already died in the meantime, but the "death signal" won't be missed as we're holding the - # same monitor lock as _handle_termination. - self._state = ServiceState.READY - logger.info(f'{self.name} is now running') + self._start_event_loop() return self except: - self._abort() + self._signal_terminated() + raise + + def sync_start(self, timeout=None): + self.start() + self.startup.wait(timeout=timeout) + + @synchronized + def _signal_started(self) -> Service: + self._allow_states(ServiceState.STARTING) + # Once the startup signal is given, sets the state to running. Clearly, the service may have + # already died in the meantime, but the "death signal" won't be missed as we're holding the + # same monitor lock as _handle_termination. + self._state = ServiceState.READY + logger.info(f'{self.name} is now running') + self.startup.set() + return self @synchronized def stop(self, timeout=None) -> Service: @@ -163,43 +174,36 @@ def stop(self, timeout=None) -> Service: # 2. Halts the event loop. If the service is already dead, this should be a no-op. self._halt_event_loop() - # 3. Waits for shutdown. Again, if the service is already dead, this should return immediately. - self._shutdown_event.wait(timeout) - - # 4. We can't simply set the state to TERMINATED as there might have been an error in the meantime. - # We are sure that the error has been captured, as the event loop must capture the error before - # setting _shutdown_event. There will be a thread stuck at the monitor lock waiting to set the service - # state to ERRORED, but we also do it here so that there are no time windows in which the state of - # the service is inconsistent (i.e. it says TERMINATED when it is actually ERRORED). - self._handle_termination() - logger.info(f'{self.name} has been terminated') return self except: - self._abort() + self._signal_terminated() + raise + + def sync_stop(self, timeout=None): + self.stop() + self.shutdown.wait(timeout=timeout) - def _abort(self): - self._capture_error() - self._state = ServiceState.ERRORED - raise + @synchronized + def _signal_terminated(self): + # Service had to be running. + self._allow_states(*ServiceState.running_states()) + self._state = ServiceState.ERRORED if self._error else ServiceState.TERMINATED + logger.info(f'{self.name} has terminated with state {self._state}') + self.shutdown.set() def status(self) -> ServiceStatus: return ServiceStatus(self._state, **self._error) - @abstractproperty - def default_name(self) -> str: - """Services must define a default name which will be used when no name is supplied.""" - pass - @abstractmethod - def _start_event_loop(self, on_termination: Callable[[], None]): + def _start_event_loop(self): """ Asynchronously fires the event loop for this service, potentially in a separate thread. Event loop implementations MUST: - 1. signal when the service is ready by setting the _startup_event :class:`Event`; - 2. signal when the service dies/stops by setting the _shutdown_event :class:`Event`; + 1. signal when the service is ready by calling _signal_started; + 2. signal when the service dies/stops by calling _signal_terminated; 3. call _capture_error whenever the service terminates due to an unhandled exception, - and do so BEFORE setting the _shutdown_event. A typical event loop would look like this: + and do so BEFORE calling _signal_terminated. A typical event loop would look like this: .. code-block:: python @@ -208,8 +212,7 @@ def _start_event_loop(self, on_termination: Callable[[], None]): except: self._capture_error() finally: - self._shutdown_event.set() - on_termination() + self._signal_terminated() :param on_termination: a callback to be invoked upon service termination (either due to an error, or @@ -225,10 +228,6 @@ def _halt_event_loop(self): """ pass - @synchronized - def _handle_termination(self): - self._state = ServiceState.ERRORED if self._error else ServiceState.TERMINATED - def _capture_error(self): exc_type, value, tb = sys.exc_info() self._error = {'exception': value, 'traceback': traceback.format_tb(tb)} @@ -252,14 +251,13 @@ class AsyncioService(BaseService): set _startup_event as part of event loop initialization. """ - def __init__(self, aio_loop: AbstractEventLoop, name: str = None): - super().__init__(name=name) + def __init__(self, aio_loop: AbstractEventLoop): + super().__init__() self.aio_loop = aio_loop self.future = None - def _start_event_loop(self, callback): + def _start_event_loop(self): self.future = asyncio.run_coroutine_threadsafe(self._wrapped_loop(self._event_loop()), self.aio_loop) - self.future.add_done_callback(lambda _: callback()) async def _wrapped_loop(self, coro): try: @@ -271,7 +269,7 @@ async def _wrapped_loop(self, coro): except: self._capture_error() finally: - self._shutdown_event.set() + self._signal_terminated() def _halt_event_loop(self): self.future.cancel() @@ -291,26 +289,25 @@ class ThreadedService(BaseService): set _startup_event as part of event loop initialization. """ - def __init__(self, name: str = None): - super().__init__(name=name) + def __init__(self): + super().__init__() - def _start_event_loop(self, callback): + def _start_event_loop(self): self.thread = Thread( - target=self._wrap_loop(callback), - name=f'{self.name} event loop', + target=self._wrap_loop(), + name=f'{self.name} thread', daemon=True ) self.thread.start() - def _wrap_loop(self, callback): + def _wrap_loop(self): def wrapped_loop(): try: self._event_loop() except: self._capture_error() finally: - self._shutdown_event.set() - callback() + self._signal_terminated() return wrapped_loop @@ -323,24 +320,24 @@ class AsyncioEventLoop(ThreadedService): """ A :class:`ThreadedService` which spawns an asyncio event loop in a separate thread. """ - default_name = 'asyncio' + name = 'asyncio event loop' - def __init__(self, name=None): - super().__init__(name=name) + def __init__(self): + super().__init__() self._aio_loop = None def _event_loop(self): self._aio_loop = asyncio.new_event_loop() asyncio.set_event_loop(self._aio_loop) - self._startup_event.set() + self._signal_started() self._aio_loop.run_forever() def _halt_event_loop(self): # Stopping this loop will cancel all tasks. async def cancel_tasks(): - current = Task.current_task(self.aio_loop) + current = Task.current_task(self._aio_loop) tasks = [ - task for task in Task.all_tasks(self.aio_loop) + task for task in Task.all_tasks(self._aio_loop) if task is not current ] @@ -354,16 +351,16 @@ async def cancel_tasks(): # cancelled, and do not expect the loop to do anything else after # that. def stop_loop(_): - self.aio_loop.call_soon_threadsafe(self.aio_loop.stop) - self._shutdown_event.set() + self._aio_loop.call_soon_threadsafe(self._aio_loop.stop) # Some contortionism is needed. asyncio.run_coroutine_threadsafe( - cancel_tasks(), self.aio_loop + cancel_tasks(), self._aio_loop ).add_done_callback( stop_loop ) @property def aio_loop(self) -> AbstractEventLoop: + self._allow_states(ServiceState.READY) return self._aio_loop diff --git a/callblocker/core/servicegroup.py b/callblocker/core/servicegroup.py index 62d00f5..c49634d 100644 --- a/callblocker/core/servicegroup.py +++ b/callblocker/core/servicegroup.py @@ -1,43 +1,32 @@ import logging -from typing import Dict, Any, TypeVar - -from callblocker.core.service import ServiceState +from collections import OrderedDict, namedtuple +from typing import TypeVar logger = logging.getLogger(__name__) T = TypeVar('T', bound='Service') +ServiceEntry = namedtuple('ServiceEntry', ['service', 'deps']) + class ServiceGroup(object): def __init__(self): - self.keys = [] + self._services = OrderedDict() + + @property + def services(self): + return self._services.values() - def register_service(self, name: str, service: T) -> T: - self.keys.append(name) - setattr(self, name, service) + def register_service(self, service: T) -> T: + self._services[service.id] = service + setattr(self, service.id, service) return service def shutdown(self): - for service in self.keys: + for service in self.services: service.stop() - def health(self) -> Dict[str, Any]: - report = [] - for key in self.keys: - service = getattr(self, key) - status = service.status() - task_report = { - 'name': service.name, - 'status': status.state.name - } - if service.state == ServiceState.ERRORED: - task_report['exception'] = str(status.exception) - task_report['traceback'] = status.traceback - report.append(task_report) - - return {'services': report} - class ServiceGroupSpec(object): def __init__(self, **kwargs): @@ -46,6 +35,10 @@ def __init__(self, **kwargs): def bootstrap(self) -> ServiceGroup: group = ServiceGroup() for key, initializer in self.services.items(): - group.register_service(key, initializer(group)).start() + instance = initializer(group) + # I could complicate this by patching in a mixin with a readonly property + # or using descriptors, but this is simply easier. + instance.id = key + group.register_service(instance).sync_start() return group diff --git a/callblocker/core/tests/fakeserial.py b/callblocker/core/tests/fakeserial.py index 9412f70..84785a4 100644 --- a/callblocker/core/tests/fakeserial.py +++ b/callblocker/core/tests/fakeserial.py @@ -71,7 +71,7 @@ class ScriptedModem(SerialDeviceFactory, Protocol, AsyncioService): this command). """ - default_name = 'fake modem' + name = 'fake modem' def __init__(self, aio_loop: AbstractEventLoop, command_mode=False): AsyncioService.__init__(self, aio_loop) @@ -185,7 +185,7 @@ def close(self): # ------------------- Management methods ---------------------------------- async def _event_loop(self): - self._startup_event.set() + self._signal_started() # If the queue is empty and we're not in command mode, # we're done. while (not self.script.empty()) or self.command_mode: diff --git a/callblocker/core/tests/test_modem.py b/callblocker/core/tests/test_modem.py index d9d6466..f0ed160 100644 --- a/callblocker/core/tests/test_modem.py +++ b/callblocker/core/tests/test_modem.py @@ -37,7 +37,7 @@ async def collect(): return events try: - modem.start() + modem.sync_start() actual = asyncio.run_coroutine_threadsafe(collect(), loop=loop).result() assert actual == expected finally: @@ -52,7 +52,7 @@ def test_sync_command(fake_serial, aio_loop): loop = aio_loop.aio_loop modem = Modem(modem_type=CX930xx_fake, device_factory=fake_serial, aio_loop=loop) - modem.start() + modem.sync_start() asyncio.run_coroutine_threadsafe(modem.sync_command('ATZ'), loop=loop).result() asyncio.run_coroutine_threadsafe(modem.sync_command('AT+VCID=1'), loop=loop).result() status = modem.status() diff --git a/callblocker/core/tests/test_service.py b/callblocker/core/tests/test_service.py index ce10434..607bcae 100644 --- a/callblocker/core/tests/test_service.py +++ b/callblocker/core/tests/test_service.py @@ -7,7 +7,7 @@ class BuggyAsyncService(AsyncioService): - default_name = 'buggy asyncio' + name = 'buggy asyncio' def __init__(self, aio_loop): super().__init__(aio_loop) @@ -24,14 +24,14 @@ async def setter(): async def _event_loop(self): self.should_error = False self.running.clear() - self._startup_event.set() + self._signal_started() await self.running.wait() if self.should_error: raise Exception("Oh I'm so buggy.") class BuggyThreadedService(ThreadedService): - default_name = 'buggy threaded' + name = 'buggy threaded' def __init__(self): super().__init__() @@ -45,7 +45,7 @@ def die(self): def _event_loop(self): self.should_error = False self.running.clear() - self._startup_event.set() + self._signal_started() # Waits till stop or die. self.running.wait() if self.should_error: @@ -65,9 +65,9 @@ def test_threaded_service(): def service_test(service: BaseService): assert service.status().state == ServiceState.INITIAL - service.start() + service.sync_start(10) - # Start is synchronous so we should see this immediately. + # We did a synchronous start so we should see this immediately. assert service.status().state == ServiceState.READY service.die() @@ -83,9 +83,10 @@ def service_test(service: BaseService): assert service.status().exception.args[0] == "Oh I'm so buggy." # Restarting the service should bring it back to running. - service.start() - assert service.status().state == ServiceState.READY + service.sync_start() + assert service.status().state == ServiceState.READY + service.startup.wait(10) # Trying to start a started service should cause an exception. try: # Trying to stop a dead service should raise an exception. @@ -94,9 +95,9 @@ def service_test(service: BaseService): pass # Normal termination. - service.stop() + service.sync_stop(10) - # Since stop is synchronous, we should see the correct state. + # Since we did a synchronous stop, we should see the correct state. assert service.status().state == ServiceState.TERMINATED assert service.status().exception is None assert service.status().traceback is None diff --git a/callblocker/urls.py b/callblocker/urls.py index dd84586..c4bba5d 100644 --- a/callblocker/urls.py +++ b/callblocker/urls.py @@ -43,11 +43,16 @@ basename='caller-calls' ) +bulk_router.register( + r'services', + api_views.ServicesViewset, + basename='services' +) + urlpatterns = [ url(r'^api/', include(bulk_router.urls)), url(r'^api/', include(nested_router.urls)), path('api/modem/', api_views.modem), - path('api/status/', api_views.health_status), path('admin/', admin.site.urls) ]