Skip to content

Commit

Permalink
various observer pattern changes
Browse files Browse the repository at this point in the history
  • Loading branch information
magnuselden authored and magnuselden committed Sep 17, 2024
1 parent 22ba8df commit 6891252
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from peaqevcore.common.models.observer_types import ObserverTypes

from custom_components.peaqhvac.service.observer.const import (
COMMAND_WAIT, TIMEOUT)
COMMAND_WAIT)
from custom_components.peaqhvac.service.observer.models.command import \
Command
from custom_components.peaqhvac.service.observer.models.observer_model import \
Expand All @@ -18,14 +18,15 @@
_LOGGER = logging.getLogger(__name__)


DISPATCH_DELAY_TIMEOUT = 5
COMMAND_VALIDITY = 10

class IObserver:
"""
Observer class handles updates throughout peaq.
Attach to hub class and subscribe to updates (string matches) in other classes connected to the hub.
When broadcasting, you may use one argument that the of-course needs to correspond to your receiving function.
"""

def __init__(self):
self.model = ObserverModel()
self._lock = asyncio.Lock()
Expand Down Expand Up @@ -56,16 +57,16 @@ def add(self, command: ObserverTypes|str, func):
self.model.subscribers[command] = [func]

async def async_broadcast(self, command: ObserverTypes|str, argument=None):
command = self._check_and_convert_enum_type(command)
self.broadcast(command, argument)

def broadcast(self, command: ObserverTypes|str, argument=None):
_LOGGER.debug(f"received broadcast: {command} - {argument}")
command = self._check_and_convert_enum_type(command)
_expiration = time.time() + TIMEOUT
_expiration = time.time() + COMMAND_VALIDITY
cc = Command(command, _expiration, argument)
if cc not in self.model.broadcast_queue:
self.model.broadcast_queue.append(cc)
if cc not in self.model.dispatch_delay_queue.keys():
_LOGGER.debug(f"received broadcast: {command} - {argument}")
self.model.broadcast_queue.append(cc)

async def async_dispatch(self, *args):
q: Command
Expand All @@ -74,13 +75,22 @@ async def async_dispatch(self, *args):
await self.async_dequeue_and_broadcast(q)

async def async_dequeue_and_broadcast(self, command: Command):
if await self.async_ok_to_broadcast(command):
async with self._lock:
for func in self.model.subscribers.get(command.command, []):
_LOGGER.debug(f"broadcasting {command.command} with {command.argument}")
await self.async_broadcast_separator(func, command)
if command in self.model.broadcast_queue:
self.model.broadcast_queue.remove(command)
#if await self.async_ok_to_broadcast(command):
async with self._lock:
for func in self.model.subscribers.get(command.command, []):
_LOGGER.debug(f"broadcasting {command.command} with {command.argument}")
await self.async_broadcast_separator(func, command)
if command in self.model.broadcast_queue:
self.model.broadcast_queue.remove(command)
await self.async_update_dispatch_delay(command)

async def async_update_dispatch_delay(self, command: Command):
q: Command
old_items = [k for k, v in self.model.dispatch_delay_queue.items() if time.time() - v > DISPATCH_DELAY_TIMEOUT]
for old in old_items:
self.model.dispatch_delay_queue.pop(old)
if command not in self.model.dispatch_delay_queue.keys():
self.model.dispatch_delay_queue[command] = time.time()

@abstractmethod
async def async_broadcast_separator(self, func, command):
Expand Down Expand Up @@ -121,12 +131,12 @@ async def async_call_func(func: Callable, command: Command) -> None:
except Exception as e:
_LOGGER.error(f"async_call_func for {func} with command {command}: {e}")

async def async_ok_to_broadcast(self, command: Command) -> bool:
if command not in self.model.wait_queue.keys():
self.model.wait_queue[command] = time.time()
return True
if time.time() - self.model.wait_queue.get(command, 0) > COMMAND_WAIT:
self.model.wait_queue[command] = time.time()
return True
_LOGGER.debug(f"Catched command {command} in wait_queue")
return False
# async def async_ok_to_broadcast(self, command: Command) -> bool:
# if command not in self.model.wait_queue.keys():
# self.model.wait_queue[command] = time.time()
# return True
# if time.time() - self.model.wait_queue.get(command, 0) > COMMAND_WAIT:
# self.model.wait_queue[command] = time.time()
# return True
# _LOGGER.debug(f"Catched command {command} in wait_queue")
# return False
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ class ObserverModel:
subscribers: dict = field(default_factory=lambda: {})
broadcast_queue: list[Command] = field(default_factory=lambda: [])
wait_queue: dict[Command, float] = field(default_factory=lambda: {})
dispatch_delay_queue: dict[Command,float] = field(default_factory=lambda: {})
active: bool = False

0 comments on commit 6891252

Please sign in to comment.