Skip to content

Commit

Permalink
ConnectionObserver
Browse files Browse the repository at this point in the history
  • Loading branch information
marcin-usielski committed Jan 24, 2024
1 parent 6986833 commit 83a1e7f
Showing 1 changed file with 29 additions and 29 deletions.
58 changes: 29 additions & 29 deletions moler/connection_observer.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,14 @@ def __init__(self, connection: AbstractMolerConnection = None, runner: Connectio
self.runner: ConnectionObserverRunner = self._get_runner(runner=runner)
self._result = None
self._exception: Exception = None
self._exception_stack_msg = None
self._exception_stack_msg: str = None

self._future = None

self.device_logger = logging.getLogger('moler.{}'.format(self.get_logger_name()))
self.logger = logging.getLogger('moler.connection.{}'.format(self.get_logger_name()))

def _get_runner(self, runner):
def _get_runner(self, runner: ConnectionObserverRunner) -> ConnectionObserverRunner:
"""
:param runner: Runner
Expand Down Expand Up @@ -102,47 +102,47 @@ def __call__(self, timeout=None, *args, **kwargs):
# TODO: raise ConnectionObserverFailedToStart

@property
def _is_done(self):
def _is_done(self) -> bool:
return self.life_status.is_done

@_is_done.setter
def _is_done(self, value):
def _is_done(self, value: bool):
self.life_status.is_done = value
if value:
CommandScheduler.dequeue_running_on_connection(connection_observer=self)

@property
def _is_cancelled(self):
def _is_cancelled(self) -> bool:
return self.life_status.is_cancelled

@_is_cancelled.setter
def _is_cancelled(self, value):
def _is_cancelled(self, value: bool):
self.life_status.is_cancelled = value

@property
def terminating_timeout(self):
def terminating_timeout(self) -> float:
return self.life_status.terminating_timeout

@terminating_timeout.setter
def terminating_timeout(self, value):
def terminating_timeout(self, value: float):
self.life_status.terminating_timeout = value

@property
def timeout(self):
def timeout(self) -> float:
return self.life_status.timeout

@timeout.setter
def timeout(self, value):
def timeout(self, value: float):
# levels_to_go_up=2 : extract caller info to log where .timeout=XXX has been called from
self._log(logging.DEBUG, "Setting {} timeout to {} [sec]".format(ConnectionObserver.__base_str(self), value),
levels_to_go_up=2)
self.life_status.timeout = value

@property
def start_time(self):
def start_time(self) -> float:
return self.life_status.start_time

def get_logger_name(self):
def get_logger_name(self) -> str:
if self.connection and hasattr(self.connection, "name"):
return self.connection.name
else:
Expand Down Expand Up @@ -170,7 +170,7 @@ def start(self, timeout=None, *args, **kwargs):
# or setting self._future will be delayed by nonempty commands queue.
return self

def _validate_start(self, *args, **kwargs):
def _validate_start(self, *args, **kwargs) -> None:
# check base class invariants first
if self.done():
raise WrongUsage("You can't run same {} multiple times. It is already done.".format(self))
Expand Down Expand Up @@ -221,7 +221,7 @@ def __await__(self):
# result = await connection_observer
return self.__iter__()

def await_done(self, timeout=None):
def await_done(self, timeout: float|None=None):
"""
Await completion of connection-observer.
Expand All @@ -245,7 +245,7 @@ def await_done(self, timeout=None):
self.runner.wait_for(connection_observer=self, connection_observer_future=self._future, timeout=timeout)
return self.result()

def cancel(self):
def cancel(self) -> bool:
"""Cancel execution of connection-observer."""
# TODO: call cancel on runner to stop background run of connection-observer
if self.cancelled() or self.done():
Expand All @@ -254,36 +254,36 @@ def cancel(self):
self._is_done = True
return True

def set_end_of_life(self):
def set_end_of_life(self) -> None:
"""
Set end of life of object. Dedicated for runners only!
:return: None
"""
self._is_done = True

def cancelled(self):
def cancelled(self) -> bool:
"""Return True if the connection-observer has been cancelled."""
return self._is_cancelled

def running(self):
def running(self) -> bool:
"""Return True if the connection-observer is currently executing."""
if self.done() and self.life_status._is_running:
self.life_status._is_running = False
return self.life_status._is_running

def done(self):
def done(self) -> bool:
"""Return True if the connection-observer is already done."""
return self._is_done

def set_result(self, result):
def set_result(self, result) -> None:
"""Should be used to set final result"""
if self.done():
raise ResultAlreadySet(self)
self._result = result
self._is_done = True

def connection_closed_handler(self):
def connection_closed_handler(self) -> None:
"""
Called by Moler (ThreadedMolerConnection) when connection is closed.
Expand All @@ -308,7 +308,7 @@ def data_received(self, data, recv_time):
:return: None.
"""

def set_exception(self, exception):
def set_exception(self, exception: Exception) -> None:
"""
Should be used to indicate some failure during observation.
Expand All @@ -318,7 +318,7 @@ def set_exception(self, exception):
self._set_exception_without_done(exception)
self._is_done = True

def _set_exception_without_done(self, exception):
def _set_exception_without_done(self, exception: Exception) -> None:
"""
Should be used to indicate some failure during observation. This method does not finish connection observer
object!
Expand Down Expand Up @@ -358,7 +358,7 @@ def result(self):
raise ResultNotAvailableYet(self)
return self._result

def on_timeout(self):
def on_timeout(self) -> None:
"""Callback called when observer times out"""
msg = ""
for attribute_name in sorted(self.__dict__.keys()):
Expand All @@ -368,20 +368,20 @@ def on_timeout(self):
msg = "Timeout when '{}':'{}'".format(attribute_name, self.__dict__[attribute_name])
self._log(lvl=logging.INFO, msg=msg, levels_to_go_up=2)

def is_command(self):
def is_command(self) -> bool:
"""
:return: True if instance of ConnectionObserver is a command. False if not a command.
"""
return False

def extend_timeout(self, timedelta): # TODO: probably API to remove since we have runner tracking .timeout=XXX
def extend_timeout(self, timedelta: float) -> None: # TODO: probably API to remove since we have runner tracking .timeout=XXX
prev_timeout = self.timeout
self.timeout = self.timeout + timedelta
msg = "Extended timeout from %.2f with delta %.2f to %.2f" % (prev_timeout, timedelta, self.timeout)
self.runner.timeout_change(timedelta)
self._log(logging.INFO, msg)

def on_inactivity(self):
def on_inactivity(self) -> None:
"""
Callback called when no data is received on connection within self.life_status.inactivity_timeout seconds
Expand All @@ -395,7 +395,7 @@ def observer_name(cls):
return name

@staticmethod
def get_unraised_exceptions(remove=True):
def get_unraised_exceptions(remove: bool=True) -> list:
with ConnectionObserver._exceptions_lock:
if remove:
list_of_exceptions = ConnectionObserver._not_raised_exceptions
Expand All @@ -406,7 +406,7 @@ def get_unraised_exceptions(remove=True):
return list_of_exceptions

@staticmethod
def _change_unraised_exception(new_exception, observer, stack_msg):
def _change_unraised_exception(new_exception: Exception, observer, stack_msg: str) -> None:
with ConnectionObserver._exceptions_lock:
old_exception = observer._exception
ConnectionObserver._log_unraised_exceptions(observer)
Expand Down

0 comments on commit 83a1e7f

Please sign in to comment.