Skip to content

Commit

Permalink
Refactored dummy roof
Browse files Browse the repository at this point in the history
  • Loading branch information
GermanHydrogen committed Mar 20, 2024
1 parent 8705dd8 commit 3967cb6
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 52 deletions.
75 changes: 31 additions & 44 deletions pyobs/modules/roof/dummyroof.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@ class DummyRoof(BaseRoof, IRoof):

__module__ = "pyobs.modules.roof"

_ROOF_CLOSED_PERCENTAGE = 0
_ROOF_OPEN_PERCENTAGE = 100

def __init__(self, **kwargs: Any):
"""Creates a new dummy root."""
BaseRoof.__init__(self, **kwargs)

# dummy state
self.open_percentage = 0
self._open_percentage: int = self._ROOF_CLOSED_PERCENTAGE

# allow to abort motion
self._lock_motion = asyncio.Lock()
Expand All @@ -44,34 +47,30 @@ async def init(self, **kwargs: Any) -> None:
AcquireLockFailed: If current motion could not be aborted.
"""

# already open?
if self.open_percentage != 100:
# acquire lock
async with LockWithAbort(self._lock_motion, self._abort_motion):
# change status
await self._change_motion_status(MotionStatus.INITIALIZING)
if self._is_open():
return

async with LockWithAbort(self._lock_motion, self._abort_motion):
await self._change_motion_status(MotionStatus.INITIALIZING)

# open roof
while self.open_percentage < 100:
# open more
self.open_percentage += 1
await self._move_roof(self._ROOF_OPEN_PERCENTAGE)

# abort?
if self._abort_motion.is_set():
await self._change_motion_status(MotionStatus.IDLE)
return
await self._change_motion_status(MotionStatus.IDLE)
self.comm.send_event(RoofOpenedEvent())

# wait a little
await asyncio.sleep(0.1)
def _is_open(self):
return self._open_percentage == self._ROOF_OPEN_PERCENTAGE

# open fully
self.open_percentage = 100
async def _move_roof(self, target_pos: int) -> None:
step = 1 if target_pos > self._open_percentage else -1

# change status
while self._open_percentage != target_pos:
if self._abort_motion.is_set():
await self._change_motion_status(MotionStatus.IDLE)
return

# send event
self.comm.send_event(RoofOpenedEvent())
self._open_percentage += step
await asyncio.sleep(0.1)

@timeout(15)
async def park(self, **kwargs: Any) -> None:
Expand All @@ -81,35 +80,23 @@ async def park(self, **kwargs: Any) -> None:
AcquireLockFailed: If current motion could not be aborted.
"""

# already closed?
if self.open_percentage != 0:
# acquire lock
async with LockWithAbort(self._lock_motion, self._abort_motion):
# change status
await self._change_motion_status(MotionStatus.PARKING)

# send event
self.comm.send_event(RoofClosingEvent())
if self._is_closed():
return

# close roof
while self.open_percentage > 0:
# close more
self.open_percentage -= 1
async with LockWithAbort(self._lock_motion, self._abort_motion):
await self._change_motion_status(MotionStatus.PARKING)
self.comm.send_event(RoofClosingEvent())

# abort?
if self._abort_motion.is_set():
await self._change_motion_status(MotionStatus.IDLE)
return
await self._move_roof(self._ROOF_CLOSED_PERCENTAGE)

# wait a little
await asyncio.sleep(0.1)
await self._change_motion_status(MotionStatus.PARKED)

# change status
await self._change_motion_status(MotionStatus.PARKED)
def _is_closed(self):
return self._open_percentage == self._ROOF_CLOSED_PERCENTAGE

def get_percent_open(self) -> float:
"""Get the percentage the roof is open."""
return self.open_percentage
return self._open_percentage

async def stop_motion(self, device: Optional[str] = None, **kwargs: Any) -> None:
"""Stop the motion.
Expand Down
74 changes: 66 additions & 8 deletions tests/modules/roof/test_dummyroof.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,28 @@

import pytest

from pyobs.events import RoofOpenedEvent
import pyobs
from pyobs.events import RoofOpenedEvent, RoofClosingEvent
from pyobs.modules.roof import DummyRoof
from pyobs.utils.enums import MotionStatus


@pytest.mark.asyncio
async def test_init(mocker):
async def test_open(mocker) -> None:
mocker.patch("pyobs.modules.roof.BaseRoof.open")
roof = DummyRoof()
roof.comm.register_event = AsyncMock()

await roof.open()

pyobs.modules.roof.BaseRoof.open.assert_called_once()

assert roof.comm.register_event.call_args_list[0][0][0] == RoofOpenedEvent
assert roof.comm.register_event.call_args_list[1][0][0] == RoofClosingEvent


@pytest.mark.asyncio
async def test_init(mocker) -> None:
mocker.patch("asyncio.sleep")

roof = DummyRoof()
Expand All @@ -17,31 +32,74 @@ async def test_init(mocker):

await roof.init()

assert roof.open_percentage == 100
roof._change_motion_status.assert_awaited_with(MotionStatus.IDLE)
roof.comm.send_event(RoofOpenedEvent())


@pytest.mark.asyncio
async def test_park(mocker):
async def test_park(mocker) -> None:
mocker.patch("asyncio.sleep")

roof = DummyRoof()
roof.open_percentage = 100
roof._open_percentage = 100

roof._change_motion_status = AsyncMock()
roof.comm.send_event = Mock()

await roof.park()

assert roof.open_percentage == 0
roof._change_motion_status.assert_awaited_with(MotionStatus.PARKED)


@pytest.mark.asyncio
async def test_stop_motion():
async def test_move_roof_open(mocker) -> None:
mocker.patch("asyncio.sleep")

roof = DummyRoof()

await roof._move_roof(roof._ROOF_OPEN_PERCENTAGE)

assert roof._open_percentage == 100


@pytest.mark.asyncio
async def test_move_roof_closed(mocker) -> None:
mocker.patch("asyncio.sleep")

roof = DummyRoof()

await roof._move_roof(roof._ROOF_CLOSED_PERCENTAGE)

assert roof._open_percentage == 0


@pytest.mark.asyncio
async def test_move_roof_abort(mocker) -> None:
mocker.patch("asyncio.sleep")

roof = DummyRoof()

roof._abort_motion.set()
await roof._move_roof(roof._ROOF_OPEN_PERCENTAGE)

assert roof._open_percentage == 0


@pytest.mark.asyncio
async def test_move_roof_open(mocker) -> None:
mocker.patch("asyncio.sleep")

roof = DummyRoof()

await roof._move_roof(roof._ROOF_OPEN_PERCENTAGE)

assert roof._open_percentage == 100


@pytest.mark.asyncio
async def test_stop_motion() -> None:
roof = DummyRoof()
roof._change_motion_status = AsyncMock()
await roof.stop_motion()

roof._change_motion_status.assert_awaited_with(MotionStatus.IDLE)
roof._change_motion_status.assert_awaited_with(MotionStatus.IDLE)

0 comments on commit 3967cb6

Please sign in to comment.