Skip to content

Commit

Permalink
Merge pull request #353
Browse files Browse the repository at this point in the history
Background Tasks Start & Stop
  • Loading branch information
thusser authored Mar 26, 2024
2 parents 8b57d23 + 0b37f2e commit 601101e
Show file tree
Hide file tree
Showing 4 changed files with 189 additions and 59 deletions.
39 changes: 39 additions & 0 deletions pyobs/background_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import asyncio
import logging
from typing import Optional, Coroutine, Any, Callable

from pyobs.utils.exceptions import SevereError

log = logging.getLogger(__name__)


class BackgroundTask:
def __init__(self, func: Callable[..., Coroutine[Any, Any, None]], restart: bool) -> None:
self._func: Callable[..., Coroutine[Any, Any, None]] = func
self._restart: bool = restart
self._task: Optional[asyncio.Future] = None

def start(self) -> None:
self._task = asyncio.create_task(self._func())
self._task.add_done_callback(self._callback_function)

def _callback_function(self, args=None) -> None:
try:
exception = self._task.exception()
except asyncio.CancelledError:
return

if isinstance(exception, SevereError):
raise exception
elif exception is not None:
log.error("Exception %s in task %s.", exception, self._func.__name__)

if self._restart:
log.error("Background task for %s has died, restarting...", self._func.__name__)
self.start()
else:
log.error("Background task for %s has died, quitting...", self._func.__name__)

def stop(self) -> None:
if self._task is not None:
self._task.cancel()
80 changes: 21 additions & 59 deletions pyobs/object.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from astroplan import Observer
from astropy.coordinates import EarthLocation

from pyobs.background_task import BackgroundTask
from pyobs.comm import Comm
from pyobs.comm.dummy import DummyComm

Expand Down Expand Up @@ -270,33 +271,31 @@ def __init__(
self._opened = False

# background tasks
self._background_tasks: Dict[
Callable[..., Coroutine[Any, Any, None]], Tuple[Optional[asyncio.Task[bool]], bool]
] = {}
self._watchdog_task: Optional[asyncio.Task[None]] = None
self._background_tasks: List[Tuple[BackgroundTask, bool]] = []

def add_background_task(self, func: Callable[..., Coroutine[Any, Any, None]], restart: bool = True) -> None:
def add_background_task(self, func: Callable[..., Coroutine[Any, Any, None]],
restart: bool = True, autostart: bool = True) -> BackgroundTask:
"""Add a new function that should be run in the background.
MUST be called in constructor of derived class or at least before calling open() on the object.
Args:
func: Func to add.
restart: Whether to restart this function.
autostart: Whether to start this function when the module is opened
Returns:
Background task
"""

# create thread
self._background_tasks[func] = (None, restart)
background_task = BackgroundTask(func, restart)
self._background_tasks.append((background_task, autostart))

return background_task

async def open(self) -> None:
"""Open module."""

# start background tasks
for func, (task, restart) in self._background_tasks.items():
log.info("Starting background task for %s...", func.__name__)
self._background_tasks[func] = (asyncio.create_task(self._background_func(func)), restart)
if len(self._background_tasks) > 0:
self._watchdog_task = asyncio.create_task(self._watchdog())
self._perform_background_task_autostart()

# open child objects
for obj in self._child_objects:
Expand All @@ -309,6 +308,11 @@ async def open(self) -> None:
# success
self._opened = True

def _perform_background_task_autostart(self) -> None:
todo = filter(lambda b: b[1] is True, self._background_tasks)
for task, _ in todo:
task.start()

@property
def opened(self) -> bool:
"""Whether object has been opened."""
Expand All @@ -322,58 +326,16 @@ async def close(self) -> None:
if hasattr(obj, "close"):
await obj.close()

# join watchdog and then all threads
if self._watchdog_task and not self._watchdog_task.done():
self._watchdog_task.cancel()
for func, (task, restart) in self._background_tasks.items():
if task and not task.done():
task.cancel()

@staticmethod
async def _background_func(target: Callable[..., Coroutine[Any, Any, None]]) -> None:
"""Run given function.
Args:
target: Function to run.
"""
try:
await target()

except asyncio.CancelledError:
# task was canceled
return
self._stop_background_tasks()

except:
log.exception("Exception in thread method %s." % target.__name__)
def _stop_background_tasks(self) -> None:
for task, _ in self._background_tasks:
task.stop()

def quit(self) -> None:
"""Can be overloaded to quit program."""
pass

async def _watchdog(self) -> None:
"""Watchdog thread that tries to restart threads if they quit."""

while True:
# get dead taks that need to be restarted
dead = {}
for func, (task, restart) in self._background_tasks.items():
if task.done():
dead[func] = restart

# restart dead tasks or quit
for func, restart in dead.items():
if restart:
log.error("Background task for %s has died, restarting...", func.__name__)
del self._background_tasks[func]
self._background_tasks[func] = (asyncio.create_task(self._background_func(func)), restart)
else:
log.error("Background task for %s has died, quitting...", func.__name__)
self.quit()
return

# sleep a little
await asyncio.sleep(1)

@overload
def get_object(
self,
Expand Down
76 changes: 76 additions & 0 deletions tests/test_background_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import asyncio
import logging
from unittest.mock import AsyncMock, Mock

import pytest
import pyobs.utils.exceptions as exc
from pyobs.background_task import BackgroundTask


@pytest.mark.asyncio
async def test_callback_canceled(caplog):
test_function = AsyncMock()
task = asyncio.create_task(test_function())
task.exception = Mock(side_effect=asyncio.CancelledError())

bg_task = BackgroundTask(test_function, False)
bg_task._task = task

with caplog.at_level(logging.ERROR):
bg_task._callback_function()

assert len(caplog.messages) == 0


@pytest.mark.asyncio
async def test_callback_exception(caplog):
test_function = AsyncMock()
test_function.__name__ = "test_function"

task = asyncio.create_task(test_function())
task.exception = Mock(return_value=Exception("TestError"))

bg_task = BackgroundTask(test_function, False)
bg_task._task = task

with caplog.at_level(logging.ERROR):
bg_task._callback_function()

assert caplog.messages[0] == "Exception TestError in task test_function."
assert caplog.messages[1] == "Background task for test_function has died, quitting..."


@pytest.mark.asyncio
async def test_callback_pyobs_error():
test_function = AsyncMock()
test_function.__name__ = "test_function"

task = asyncio.create_task(test_function())
task.exception = Mock(return_value=exc.SevereError(exc.ImageError("TestError")))

bg_task = BackgroundTask(test_function, False)
bg_task._task = task

with pytest.raises(exc.SevereError):
bg_task._callback_function()


@pytest.mark.asyncio
async def test_callback_restart(caplog):
test_function = AsyncMock()
test_function.__name__ = "test_function"

task = asyncio.create_task(test_function())
task.exception = Mock(return_value=None)

bg_task = BackgroundTask(test_function, True)
bg_task._task = task

bg_task.start = Mock()

with caplog.at_level(logging.ERROR):
bg_task._callback_function()

assert caplog.messages[0] == "Background task for test_function has died, restarting..."
bg_task.start.assert_called_once()

53 changes: 53 additions & 0 deletions tests/test_object.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from unittest.mock import AsyncMock

import pyobs
from pyobs.background_task import BackgroundTask
from pyobs.object import Object


def test_add_background_task():
obj = Object()
test_function = AsyncMock()

task = obj.add_background_task(test_function, False, False)

assert task._func == test_function
assert task._restart is False

assert obj._background_tasks[0] == (task, False)


def test_perform_background_task_autostart(mocker):
mocker.patch("pyobs.background_task.BackgroundTask.start")

obj = Object()
test_function = AsyncMock()

obj.add_background_task(test_function, False, True)
obj._perform_background_task_autostart()

pyobs.background_task.BackgroundTask.start.assert_called_once()


def test_perform_background_task_no_autostart(mocker):
mocker.patch("pyobs.background_task.BackgroundTask.start")

obj = Object()
test_function = AsyncMock()

obj.add_background_task(test_function, False, False)
obj._perform_background_task_autostart()

pyobs.background_task.BackgroundTask.start.assert_not_called()


def test_stop_background_task(mocker):
mocker.patch("pyobs.background_task.BackgroundTask.stop")

obj = Object()
test_function = AsyncMock()

obj.add_background_task(test_function, False, False)
obj._stop_background_tasks()

pyobs.background_task.BackgroundTask.stop.assert_called_once()

0 comments on commit 601101e

Please sign in to comment.