From 3967cb6c792b12a24cd1bc78ba07b685a753084f Mon Sep 17 00:00:00 2001 From: GermanHydrogen Date: Wed, 20 Mar 2024 10:09:12 +0100 Subject: [PATCH] Refactored dummy roof --- pyobs/modules/roof/dummyroof.py | 75 ++++++++++++---------------- tests/modules/roof/test_dummyroof.py | 74 ++++++++++++++++++++++++--- 2 files changed, 97 insertions(+), 52 deletions(-) diff --git a/pyobs/modules/roof/dummyroof.py b/pyobs/modules/roof/dummyroof.py index ffdefbf8..58b894ac 100644 --- a/pyobs/modules/roof/dummyroof.py +++ b/pyobs/modules/roof/dummyroof.py @@ -17,12 +17,15 @@ class DummyRoof(BaseRoof, IRoof): __module__ = "pyobs.modules.roof" + _ROOF_CLOSED_PERCENTAGE = 0 + _ROOF_OPEN_PERCENTAGE = 100 + def __init__(self, **kwargs: Any): """Creates a new dummy root.""" BaseRoof.__init__(self, **kwargs) # dummy state - self.open_percentage = 0 + self._open_percentage: int = self._ROOF_CLOSED_PERCENTAGE # allow to abort motion self._lock_motion = asyncio.Lock() @@ -44,34 +47,30 @@ async def init(self, **kwargs: Any) -> None: AcquireLockFailed: If current motion could not be aborted. """ - # already open? - if self.open_percentage != 100: - # acquire lock - async with LockWithAbort(self._lock_motion, self._abort_motion): - # change status - await self._change_motion_status(MotionStatus.INITIALIZING) + if self._is_open(): + return + + async with LockWithAbort(self._lock_motion, self._abort_motion): + await self._change_motion_status(MotionStatus.INITIALIZING) - # open roof - while self.open_percentage < 100: - # open more - self.open_percentage += 1 + await self._move_roof(self._ROOF_OPEN_PERCENTAGE) - # abort? - if self._abort_motion.is_set(): - await self._change_motion_status(MotionStatus.IDLE) - return + await self._change_motion_status(MotionStatus.IDLE) + self.comm.send_event(RoofOpenedEvent()) - # wait a little - await asyncio.sleep(0.1) + def _is_open(self): + return self._open_percentage == self._ROOF_OPEN_PERCENTAGE - # open fully - self.open_percentage = 100 + async def _move_roof(self, target_pos: int) -> None: + step = 1 if target_pos > self._open_percentage else -1 - # change status + while self._open_percentage != target_pos: + if self._abort_motion.is_set(): await self._change_motion_status(MotionStatus.IDLE) + return - # send event - self.comm.send_event(RoofOpenedEvent()) + self._open_percentage += step + await asyncio.sleep(0.1) @timeout(15) async def park(self, **kwargs: Any) -> None: @@ -81,35 +80,23 @@ async def park(self, **kwargs: Any) -> None: AcquireLockFailed: If current motion could not be aborted. """ - # already closed? - if self.open_percentage != 0: - # acquire lock - async with LockWithAbort(self._lock_motion, self._abort_motion): - # change status - await self._change_motion_status(MotionStatus.PARKING) - - # send event - self.comm.send_event(RoofClosingEvent()) + if self._is_closed(): + return - # close roof - while self.open_percentage > 0: - # close more - self.open_percentage -= 1 + async with LockWithAbort(self._lock_motion, self._abort_motion): + await self._change_motion_status(MotionStatus.PARKING) + self.comm.send_event(RoofClosingEvent()) - # abort? - if self._abort_motion.is_set(): - await self._change_motion_status(MotionStatus.IDLE) - return + await self._move_roof(self._ROOF_CLOSED_PERCENTAGE) - # wait a little - await asyncio.sleep(0.1) + await self._change_motion_status(MotionStatus.PARKED) - # change status - await self._change_motion_status(MotionStatus.PARKED) + def _is_closed(self): + return self._open_percentage == self._ROOF_CLOSED_PERCENTAGE def get_percent_open(self) -> float: """Get the percentage the roof is open.""" - return self.open_percentage + return self._open_percentage async def stop_motion(self, device: Optional[str] = None, **kwargs: Any) -> None: """Stop the motion. diff --git a/tests/modules/roof/test_dummyroof.py b/tests/modules/roof/test_dummyroof.py index 16be5465..62dd30ae 100644 --- a/tests/modules/roof/test_dummyroof.py +++ b/tests/modules/roof/test_dummyroof.py @@ -2,13 +2,28 @@ import pytest -from pyobs.events import RoofOpenedEvent +import pyobs +from pyobs.events import RoofOpenedEvent, RoofClosingEvent from pyobs.modules.roof import DummyRoof from pyobs.utils.enums import MotionStatus @pytest.mark.asyncio -async def test_init(mocker): +async def test_open(mocker) -> None: + mocker.patch("pyobs.modules.roof.BaseRoof.open") + roof = DummyRoof() + roof.comm.register_event = AsyncMock() + + await roof.open() + + pyobs.modules.roof.BaseRoof.open.assert_called_once() + + assert roof.comm.register_event.call_args_list[0][0][0] == RoofOpenedEvent + assert roof.comm.register_event.call_args_list[1][0][0] == RoofClosingEvent + + +@pytest.mark.asyncio +async def test_init(mocker) -> None: mocker.patch("asyncio.sleep") roof = DummyRoof() @@ -17,31 +32,74 @@ async def test_init(mocker): await roof.init() - assert roof.open_percentage == 100 roof._change_motion_status.assert_awaited_with(MotionStatus.IDLE) roof.comm.send_event(RoofOpenedEvent()) @pytest.mark.asyncio -async def test_park(mocker): +async def test_park(mocker) -> None: mocker.patch("asyncio.sleep") roof = DummyRoof() - roof.open_percentage = 100 + roof._open_percentage = 100 roof._change_motion_status = AsyncMock() roof.comm.send_event = Mock() await roof.park() - assert roof.open_percentage == 0 roof._change_motion_status.assert_awaited_with(MotionStatus.PARKED) @pytest.mark.asyncio -async def test_stop_motion(): +async def test_move_roof_open(mocker) -> None: + mocker.patch("asyncio.sleep") + + roof = DummyRoof() + + await roof._move_roof(roof._ROOF_OPEN_PERCENTAGE) + + assert roof._open_percentage == 100 + + +@pytest.mark.asyncio +async def test_move_roof_closed(mocker) -> None: + mocker.patch("asyncio.sleep") + + roof = DummyRoof() + + await roof._move_roof(roof._ROOF_CLOSED_PERCENTAGE) + + assert roof._open_percentage == 0 + + +@pytest.mark.asyncio +async def test_move_roof_abort(mocker) -> None: + mocker.patch("asyncio.sleep") + + roof = DummyRoof() + + roof._abort_motion.set() + await roof._move_roof(roof._ROOF_OPEN_PERCENTAGE) + + assert roof._open_percentage == 0 + + +@pytest.mark.asyncio +async def test_move_roof_open(mocker) -> None: + mocker.patch("asyncio.sleep") + + roof = DummyRoof() + + await roof._move_roof(roof._ROOF_OPEN_PERCENTAGE) + + assert roof._open_percentage == 100 + + +@pytest.mark.asyncio +async def test_stop_motion() -> None: roof = DummyRoof() roof._change_motion_status = AsyncMock() await roof.stop_motion() - roof._change_motion_status.assert_awaited_with(MotionStatus.IDLE) \ No newline at end of file + roof._change_motion_status.assert_awaited_with(MotionStatus.IDLE)