From 83a1e7f5217ca0e4f1d0f2a1727731c65d4d68e8 Mon Sep 17 00:00:00 2001 From: Marcin Usielski Date: Wed, 24 Jan 2024 10:41:55 +0100 Subject: [PATCH] ConnectionObserver --- moler/connection_observer.py | 58 ++++++++++++++++++------------------ 1 file changed, 29 insertions(+), 29 deletions(-) diff --git a/moler/connection_observer.py b/moler/connection_observer.py index c94ef9bd5..d6523a9ad 100644 --- a/moler/connection_observer.py +++ b/moler/connection_observer.py @@ -49,14 +49,14 @@ def __init__(self, connection: AbstractMolerConnection = None, runner: Connectio self.runner: ConnectionObserverRunner = self._get_runner(runner=runner) self._result = None self._exception: Exception = None - self._exception_stack_msg = None + self._exception_stack_msg: str = None self._future = None self.device_logger = logging.getLogger('moler.{}'.format(self.get_logger_name())) self.logger = logging.getLogger('moler.connection.{}'.format(self.get_logger_name())) - def _get_runner(self, runner): + def _get_runner(self, runner: ConnectionObserverRunner) -> ConnectionObserverRunner: """ :param runner: Runner @@ -102,47 +102,47 @@ def __call__(self, timeout=None, *args, **kwargs): # TODO: raise ConnectionObserverFailedToStart @property - def _is_done(self): + def _is_done(self) -> bool: return self.life_status.is_done @_is_done.setter - def _is_done(self, value): + def _is_done(self, value: bool): self.life_status.is_done = value if value: CommandScheduler.dequeue_running_on_connection(connection_observer=self) @property - def _is_cancelled(self): + def _is_cancelled(self) -> bool: return self.life_status.is_cancelled @_is_cancelled.setter - def _is_cancelled(self, value): + def _is_cancelled(self, value: bool): self.life_status.is_cancelled = value @property - def terminating_timeout(self): + def terminating_timeout(self) -> float: return self.life_status.terminating_timeout @terminating_timeout.setter - def terminating_timeout(self, value): + def terminating_timeout(self, value: float): self.life_status.terminating_timeout = value @property - def timeout(self): + def timeout(self) -> float: return self.life_status.timeout @timeout.setter - def timeout(self, value): + def timeout(self, value: float): # levels_to_go_up=2 : extract caller info to log where .timeout=XXX has been called from self._log(logging.DEBUG, "Setting {} timeout to {} [sec]".format(ConnectionObserver.__base_str(self), value), levels_to_go_up=2) self.life_status.timeout = value @property - def start_time(self): + def start_time(self) -> float: return self.life_status.start_time - def get_logger_name(self): + def get_logger_name(self) -> str: if self.connection and hasattr(self.connection, "name"): return self.connection.name else: @@ -170,7 +170,7 @@ def start(self, timeout=None, *args, **kwargs): # or setting self._future will be delayed by nonempty commands queue. return self - def _validate_start(self, *args, **kwargs): + def _validate_start(self, *args, **kwargs) -> None: # check base class invariants first if self.done(): raise WrongUsage("You can't run same {} multiple times. It is already done.".format(self)) @@ -221,7 +221,7 @@ def __await__(self): # result = await connection_observer return self.__iter__() - def await_done(self, timeout=None): + def await_done(self, timeout: float|None=None): """ Await completion of connection-observer. @@ -245,7 +245,7 @@ def await_done(self, timeout=None): self.runner.wait_for(connection_observer=self, connection_observer_future=self._future, timeout=timeout) return self.result() - def cancel(self): + def cancel(self) -> bool: """Cancel execution of connection-observer.""" # TODO: call cancel on runner to stop background run of connection-observer if self.cancelled() or self.done(): @@ -254,7 +254,7 @@ def cancel(self): self._is_done = True return True - def set_end_of_life(self): + def set_end_of_life(self) -> None: """ Set end of life of object. Dedicated for runners only! @@ -262,28 +262,28 @@ def set_end_of_life(self): """ self._is_done = True - def cancelled(self): + def cancelled(self) -> bool: """Return True if the connection-observer has been cancelled.""" return self._is_cancelled - def running(self): + def running(self) -> bool: """Return True if the connection-observer is currently executing.""" if self.done() and self.life_status._is_running: self.life_status._is_running = False return self.life_status._is_running - def done(self): + def done(self) -> bool: """Return True if the connection-observer is already done.""" return self._is_done - def set_result(self, result): + def set_result(self, result) -> None: """Should be used to set final result""" if self.done(): raise ResultAlreadySet(self) self._result = result self._is_done = True - def connection_closed_handler(self): + def connection_closed_handler(self) -> None: """ Called by Moler (ThreadedMolerConnection) when connection is closed. @@ -308,7 +308,7 @@ def data_received(self, data, recv_time): :return: None. """ - def set_exception(self, exception): + def set_exception(self, exception: Exception) -> None: """ Should be used to indicate some failure during observation. @@ -318,7 +318,7 @@ def set_exception(self, exception): self._set_exception_without_done(exception) self._is_done = True - def _set_exception_without_done(self, exception): + def _set_exception_without_done(self, exception: Exception) -> None: """ Should be used to indicate some failure during observation. This method does not finish connection observer object! @@ -358,7 +358,7 @@ def result(self): raise ResultNotAvailableYet(self) return self._result - def on_timeout(self): + def on_timeout(self) -> None: """Callback called when observer times out""" msg = "" for attribute_name in sorted(self.__dict__.keys()): @@ -368,20 +368,20 @@ def on_timeout(self): msg = "Timeout when '{}':'{}'".format(attribute_name, self.__dict__[attribute_name]) self._log(lvl=logging.INFO, msg=msg, levels_to_go_up=2) - def is_command(self): + def is_command(self) -> bool: """ :return: True if instance of ConnectionObserver is a command. False if not a command. """ return False - def extend_timeout(self, timedelta): # TODO: probably API to remove since we have runner tracking .timeout=XXX + def extend_timeout(self, timedelta: float) -> None: # TODO: probably API to remove since we have runner tracking .timeout=XXX prev_timeout = self.timeout self.timeout = self.timeout + timedelta msg = "Extended timeout from %.2f with delta %.2f to %.2f" % (prev_timeout, timedelta, self.timeout) self.runner.timeout_change(timedelta) self._log(logging.INFO, msg) - def on_inactivity(self): + def on_inactivity(self) -> None: """ Callback called when no data is received on connection within self.life_status.inactivity_timeout seconds @@ -395,7 +395,7 @@ def observer_name(cls): return name @staticmethod - def get_unraised_exceptions(remove=True): + def get_unraised_exceptions(remove: bool=True) -> list: with ConnectionObserver._exceptions_lock: if remove: list_of_exceptions = ConnectionObserver._not_raised_exceptions @@ -406,7 +406,7 @@ def get_unraised_exceptions(remove=True): return list_of_exceptions @staticmethod - def _change_unraised_exception(new_exception, observer, stack_msg): + def _change_unraised_exception(new_exception: Exception, observer, stack_msg: str) -> None: with ConnectionObserver._exceptions_lock: old_exception = observer._exception ConnectionObserver._log_unraised_exceptions(observer)