From 63a0fe3eeab87827113bec10bd651a161650ef40 Mon Sep 17 00:00:00 2001 From: Jason Antman Date: Mon, 23 Oct 2023 20:22:52 -0400 Subject: [PATCH] working? --- py_vista_turbo_serial/communicator.py | 32 ++++- py_vista_turbo_serial/events.py | 34 ++++- .../examples/pushover_example.py | 24 ++-- py_vista_turbo_serial/messages.py | 135 ++++++++++++++---- py_vista_turbo_serial/state_monitor.py | 23 ++- py_vista_turbo_serial/tests/test_events.py | 1 - py_vista_turbo_serial/tests/test_messages.py | 15 +- 7 files changed, 191 insertions(+), 73 deletions(-) diff --git a/py_vista_turbo_serial/communicator.py b/py_vista_turbo_serial/communicator.py index a70c13f..be1202b 100644 --- a/py_vista_turbo_serial/communicator.py +++ b/py_vista_turbo_serial/communicator.py @@ -36,11 +36,14 @@ """ import logging -from typing import List, Generator +from typing import List, Generator, Dict from serial import Serial -from py_vista_turbo_serial.messages import MessagePacket +from py_vista_turbo_serial.messages import ( + MessagePacket, ArmingStatusRequest, ZoneStatusRequest, + ZoneDescriptorRequest, ZoneDescriptorReport +) logger = logging.getLogger(__name__) @@ -57,7 +60,12 @@ def __init__(self, port: str, timeout_sec: int = 1): port, baudrate=9600, timeout=timeout_sec ) # default 8N1 logger.debug('Serial is connected') - self.outgoing: List[str] = [] + self.zones: Dict[int, str] = {} + self.outgoing: List[str] = [ + ArmingStatusRequest.generate(), + ZoneDescriptorRequest.generate(), + ZoneStatusRequest.generate(), + ] def __del__(self): logger.debug('Closing serial port') @@ -76,14 +84,28 @@ def communicate(self) -> Generator[MessagePacket, None, None]: logger.info('Sending message: %s', msg) self.serial.write(bytes(msg, 'ascii') + b'\r\n') # this might be better with select(), but let's try this... + asked_for_zones: bool = False + have_zones: bool = False + zone_msg: str = ZoneDescriptorRequest.generate() while True: # @TODO handle exception on timeout line = self.serial.readline().decode().strip() if line == '': continue logger.debug('Got line: %s', line) - yield MessagePacket.parse(line) - if self.outgoing: + pkt: MessagePacket = MessagePacket.parse(line, self.zones) + if isinstance(pkt, ZoneDescriptorReport): + self.zones[pkt.zone_num] = pkt.zone_name + if pkt.zone_num == 0: + have_zones = True + logger.info('Zones: %s', self.zones) + yield pkt + if ( + self.outgoing and + ((not asked_for_zones) or (asked_for_zones and have_zones)) + ): msg = self.outgoing.pop(0) logger.info('Sending message: %s', msg) self.serial.write(bytes(msg, 'ascii') + b'\r\n') + if msg == zone_msg: + asked_for_zones = True diff --git a/py_vista_turbo_serial/events.py b/py_vista_turbo_serial/events.py index a286ee6..51bc577 100644 --- a/py_vista_turbo_serial/events.py +++ b/py_vista_turbo_serial/events.py @@ -35,30 +35,49 @@ ################################################################################## """ -from typing import Union +from typing import Union, Dict, Optional EventTypes: type = Union['SystemEvent', 'UnknownEvent'] +def subclasses_recursive(cls): + direct = cls.__subclasses__() + indirect = [] + for subclass in direct: + indirect.extend(subclasses_recursive(subclass)) + return direct + indirect + + class SystemEvent: NAME: str = 'Unknown Event' CODE: int = 0 - def __init__(self, zone_or_user: int): + IS_ZONE: bool = True + + def __init__(self, zone_or_user: int, zone_or_user_name: Optional[str]): self.zone_or_user: int = zone_or_user + self.zone_or_user_name: Optional[str] = zone_or_user_name def __repr__(self) -> str: - return f'<{self.NAME}(zone_or_user={self.zone_or_user})>' + if not self.IS_ZONE: + return f'<{self.NAME}(user={self.zone_or_user})>' + if self.zone_or_user_name is None: + return f'<{self.NAME}(zone_num={self.zone_or_user})>' + return (f'<{self.NAME}(zone={self.zone_or_user} ' + f'"{self.zone_or_user_name}")>') @classmethod def event_for_code( - cls, event_code: int, zone_or_user: int + cls, event_code: int, zone_or_user: int, zones: Dict[int, str] ) -> EventTypes: - for klass in cls.__subclasses__(): + for klass in subclasses_recursive(cls): if klass.CODE == event_code: - return klass(zone_or_user) + if klass.IS_ZONE: + return klass(zone_or_user, zones.get(zone_or_user)) + else: + return klass(zone_or_user, None) return UnknownEvent(event_code, zone_or_user) @@ -144,7 +163,7 @@ class OtherTroubleRestore(SystemEvent): class ArmDisarmEvent(SystemEvent): - pass + IS_ZONE: bool = False class ArmStay(ArmDisarmEvent): @@ -185,6 +204,7 @@ class AcRestore(SystemEvent): class AlarmCancel(SystemEvent): NAME: str = 'Alarm Cancel' CODE: int = 0x20 + IS_ZONE: bool = False class OtherBypass(SystemEvent): diff --git a/py_vista_turbo_serial/examples/pushover_example.py b/py_vista_turbo_serial/examples/pushover_example.py index 217f9fe..6d0ce83 100644 --- a/py_vista_turbo_serial/examples/pushover_example.py +++ b/py_vista_turbo_serial/examples/pushover_example.py @@ -46,9 +46,7 @@ from datetime import datetime from py_vista_turbo_serial.communicator import Communicator -from py_vista_turbo_serial.messages import ( - MessagePacket, ArmingStatusRequest, ZoneStatusRequest -) +from py_vista_turbo_serial.messages import MessagePacket import requests @@ -94,6 +92,9 @@ def __init__( def _do_notify_pushover(self, title, message, sound=None): """Build Pushover API request arguments and call _send_pushover""" + if self._dry_run: + logger.warning('DRY RUN - don\'t actually send') + return d = { 'data': { 'token': self._app_token, @@ -106,9 +107,6 @@ def _do_notify_pushover(self, title, message, sound=None): if sound is not None: d['data']['sound'] = sound logger.info('Sending Pushover notification: %s', d) - if self._dry_run: - logger.warning('DRY RUN - don\'t actually send') - return for i in range(0, 2): try: self._send_pushover(d) @@ -183,6 +181,7 @@ def __init__( 'PushoverAlarmNotifier initializing at ' + datetime.now().strftime('%Y-%m-%dT%H:%M:%S') ) + self._dry_run: bool = dry_run self.notifier: PushoverNotifier = PushoverNotifier( store=self.store, app_token=os.environ['PUSHOVER_APIKEY'], @@ -198,8 +197,6 @@ def run(self): 'PushoverAlarmNotifier starting run loop at ' + datetime.now().strftime('%Y-%m-%dT%H:%M:%S') ) - self.panel.send_message(ArmingStatusRequest.generate()) - self.panel.send_message(ZoneStatusRequest.generate()) message: MessagePacket dt: str for message in self.panel.communicate(): @@ -212,6 +209,9 @@ def run(self): else: self.store.add(str(message) + ' (' + dt + ')') """ + if self._dry_run: + logger.info(str(message)) + continue self.store.add(str(message) + ' (' + dt + ')') @@ -239,19 +239,19 @@ def parse_args(argv): return args -def set_log_info(l: logging.Logger): +def set_log_info(lgr: logging.Logger): """set logger level to INFO""" set_log_level_format( - l, + lgr, logging.INFO, '%(asctime)s %(levelname)s:%(name)s:%(message)s' ) -def set_log_debug(l: logging.Logger): +def set_log_debug(lgr: logging.Logger): """set logger level to DEBUG, and debug-level output format""" set_log_level_format( - l, + lgr, logging.DEBUG, "%(asctime)s [%(levelname)s %(filename)s:%(lineno)s - " "%(name)s.%(funcName)s() ] %(message)s" diff --git a/py_vista_turbo_serial/messages.py b/py_vista_turbo_serial/messages.py index 899ce1b..98ab0ad 100644 --- a/py_vista_turbo_serial/messages.py +++ b/py_vista_turbo_serial/messages.py @@ -123,13 +123,15 @@ class MessagePacket: MSG_SUBTYPES: List[str] = [] def __init__( - self, raw_message: str, data: str = '', from_panel: bool = True + self, raw_message: str, zones: Dict[int, str], data: str = '', + from_panel: bool = True ): """ raw_message cannot be an empty string and should be just one line without leading or trailing whitespace or terminator (CR/LF) """ self.raw_message: str = raw_message + self._zones: Dict[int, str] = zones self._data: str = data self.from_panel: bool = from_panel @@ -158,7 +160,7 @@ def _parse_message(cls, raw_message: str) -> Tuple[str, str, str]: return msgtype, subtype, data @classmethod - def parse(cls, raw_message: str) -> 'MessagePacket': + def parse(cls, raw_message: str, zones: Dict[int, str]) -> 'MessagePacket': msgtype: str subtype: str data: str @@ -167,11 +169,12 @@ def parse(cls, raw_message: str) -> 'MessagePacket': for klass in cls.__subclasses__(): if msgtype in klass.MSG_TYPES and subtype in klass.MSG_SUBTYPES: return klass( - raw_message=raw_message, data=data, from_panel=from_panel + raw_message=raw_message, data=data, from_panel=from_panel, + zones=zones ) return UnknownMessage( raw_message=raw_message, data=data, from_panel=from_panel, - msg_type=msgtype, msg_subtype=subtype + msg_type=msgtype, msg_subtype=subtype, zones=zones ) @classmethod @@ -185,10 +188,10 @@ def generate(cls) -> str: class UnknownMessage(MessagePacket): def __init__( - self, raw_message: str, data: str, from_panel: bool, msg_type: str, + self, raw_message: str, data: str, zones: Dict[int, str], from_panel: bool, msg_type: str, msg_subtype: str ): - super().__init__(raw_message, data, from_panel) + super().__init__(raw_message, zones, data, from_panel) self.msg_type: str = msg_type self.msg_subtype: str = msg_subtype @@ -203,8 +206,11 @@ class ArmAwayMessage(MessagePacket): MSG_SUBTYPES: List[str] = ['A', 'a'] - def __init__(self, raw_message: str, data: str, from_panel: bool): - super().__init__(raw_message, data, from_panel) + def __init__( + self, raw_message: str, zones: Dict[int, str], data: str, + from_panel: bool + ): + super().__init__(raw_message, zones, data, from_panel) self.user: int = int(data[0:2], 16) self.user_code: str = data[2:] @@ -219,8 +225,11 @@ class ArmHomeMessage(MessagePacket): MSG_SUBTYPES: List[str] = ['H', 'h'] - def __init__(self, raw_message: str, data: str, from_panel: bool): - super().__init__(raw_message, data, from_panel) + def __init__( + self, raw_message: str, zones: Dict[int, str], data: str, + from_panel: bool + ): + super().__init__(raw_message, zones, data, from_panel) self.user: int = int(data[0:2], 16) self.user_code: str = data[2:] @@ -235,8 +244,11 @@ class DisarmMessage(MessagePacket): MSG_SUBTYPES: List[str] = ['D', 'd'] - def __init__(self, raw_message: str, data: str, from_panel: bool): - super().__init__(raw_message, data, from_panel) + def __init__( + self, raw_message: str, zones: Dict[int, str], data: str, + from_panel: bool + ): + super().__init__(raw_message, zones, data, from_panel) self.user: int = int(data[0:2], 16) self.user_code: str = data[2:] @@ -251,8 +263,11 @@ class ArmingStatusRequest(MessagePacket): MSG_SUBTYPES: List[str] = ['s'] - def __init__(self, raw_message: str, data: str, from_panel: bool): - super().__init__(raw_message, data, from_panel) + def __init__( + self, raw_message: str, zones: Dict[int, str], data: str, + from_panel: bool + ): + super().__init__(raw_message, zones, data, from_panel) def __repr__(self): return f'' @@ -268,8 +283,11 @@ class ArmingStatusReport(MessagePacket): MSG_SUBTYPES: List[str] = ['S'] - def __init__(self, raw_message: str, data: str, from_panel: bool): - super().__init__(raw_message, data, from_panel) + def __init__( + self, raw_message: str, zones: Dict[int, str], data: str, + from_panel: bool + ): + super().__init__(raw_message, zones, data, from_panel) self.partition_state: Dict[int, PartitionState] = {} for idx, val in enumerate(data): if val == 'H': @@ -302,8 +320,11 @@ class ZoneStatusRequest(MessagePacket): MSG_SUBTYPES: List[str] = ['s'] - def __init__(self, raw_message: str, data: str, from_panel: bool): - super().__init__(raw_message, data, from_panel) + def __init__( + self, raw_message: str, zones: Dict[int, str], data: str, + from_panel: bool + ): + super().__init__(raw_message, zones, data, from_panel) def __repr__(self): return f'' @@ -319,8 +340,11 @@ class ZoneStatusReport(MessagePacket): MSG_SUBTYPES: List[str] = ['S'] - def __init__(self, raw_message: str, data: str, from_panel: bool): - super().__init__(raw_message, data, from_panel) + def __init__( + self, raw_message: str, zones: Dict[int, str], data: str, + from_panel: bool + ): + super().__init__(raw_message, zones, data, from_panel) self.zones: Dict[int, ZoneState] = {} for idx, val in enumerate(data): self.zones[idx + 1] = ZoneState(int(val, 16)) @@ -332,14 +356,59 @@ def __repr__(self): return f'' +class ZoneDescriptorRequest(MessagePacket): + + MSG_TYPES: List[str] = ['Z'] + + MSG_SUBTYPES: List[str] = ['D'] + + def __init__( + self, raw_message: str, zones: Dict[int, str], data: str, + from_panel: bool + ): + super().__init__(raw_message, zones, data, from_panel) + + def __repr__(self): + return f'' + + @classmethod + def generate(cls) -> str: + return '08ZD009A' + + +class ZoneDescriptorReport(MessagePacket): + + MSG_TYPES: List[str] = ['z'] + + MSG_SUBTYPES: List[str] = ['d'] + + def __init__( + self, raw_message: str, zones: Dict[int, str], data: str, + from_panel: bool + ): + super().__init__(raw_message, zones, data, from_panel) + self.data: str = data + assert data[3] == '"' + assert data[-1] == '"' + self.zone_num: int = int(data[0:3]) + self.zone_name: str = data[4:-2].strip() + + def __repr__(self): + return (f'') + + class ZonePartitionRequest(MessagePacket): MSG_TYPES: List[str] = ['z'] MSG_SUBTYPES: List[str] = ['p'] - def __init__(self, raw_message: str, data: str, from_panel: bool): - super().__init__(raw_message, data, from_panel) + def __init__( + self, raw_message: str, zones: Dict[int, str], data: str, + from_panel: bool + ): + super().__init__(raw_message, zones, data, from_panel) def __repr__(self): return f'' @@ -355,8 +424,11 @@ class ZonePartitionReport(MessagePacket): MSG_SUBTYPES: List[str] = ['P'] - def __init__(self, raw_message: str, data: str, from_panel: bool): - super().__init__(raw_message, data, from_panel) + def __init__( + self, raw_message: str, zones: Dict[int, str], data: str, + from_panel: bool + ): + super().__init__(raw_message, zones, data, from_panel) self.partitions: Dict[int, int] = {} for idx, val in enumerate(data): self.partitions[idx + 1] = int(val) @@ -375,12 +447,15 @@ class SystemEventNotification(MessagePacket): MSG_SUBTYPES: List[str] = ['Q', 'q'] - def __init__(self, raw_message: str, data: str, from_panel: bool): - super().__init__(raw_message, data, from_panel) + def __init__( + self, raw_message: str, zones: Dict[int, str], data: str, + from_panel: bool + ): + super().__init__(raw_message, zones, data, from_panel) self._event_type: int = int(data[0:2], 16) - self.zone_or_user: int = int(data[2:4]) + self.zone_or_user: int = int(data[2:4], 16) + 1 self.event_type: EventTypes = SystemEvent.event_for_code( - self._event_type, self.zone_or_user + self._event_type, self.zone_or_user, zones ) self.minute: int = int(data[4:6]) self.hour: int = int(data[6:8]) @@ -388,6 +463,4 @@ def __init__(self, raw_message: str, data: str, from_panel: bool): self.month: int = int(data[10:12]) def __repr__(self): - return (f'') + return self.event_type.__repr__() diff --git a/py_vista_turbo_serial/state_monitor.py b/py_vista_turbo_serial/state_monitor.py index 61b0557..b6aa3b0 100644 --- a/py_vista_turbo_serial/state_monitor.py +++ b/py_vista_turbo_serial/state_monitor.py @@ -39,19 +39,18 @@ import sys import argparse import logging -from typing import Dict, List, Set +from typing import Dict, Set from py_vista_turbo_serial.communicator import Communicator from py_vista_turbo_serial.messages import ( MessagePacket, ArmAwayMessage, ArmHomeMessage, DisarmMessage, - ArmingStatusRequest, ArmingStatusReport, PartitionState, ZoneStatusRequest, - ZoneStatusReport, ZoneState, ZonePartitionRequest, ZonePartitionReport, + ArmingStatusReport, PartitionState, + ZoneStatusReport, ZoneState, ZonePartitionReport, SystemEventNotification ) from py_vista_turbo_serial.events import ( AlarmEvent, OtherAlarmRestore, RfLowBattery, RfLowBatteryRestore, - OtherTrouble, OtherTroubleRestore, ArmDisarmEvent, ArmStay, Arm, Disarm, - LowBattery, LowBatteryRestore, AcFail, AcRestore, AlarmCancel, + OtherTrouble, OtherTroubleRestore, LowBattery, LowBatteryRestore, OtherBypass, OtherUnbypass, FaultEvent, FaultRestoreEvent, SystemEvent ) @@ -246,12 +245,6 @@ def _handle_event(self, evt: SystemEvent): ) def run(self): - # @TODO every N minutes/hours we want to re-send the status requests - # and re-populate our stored state - # queue commands to get initial state - self.panel.send_message(ArmingStatusRequest.generate()) - self.panel.send_message(ZoneStatusRequest.generate()) - self.panel.send_message(ZonePartitionRequest.generate()) is_ready: bool = False message: MessagePacket for message in self.panel.communicate(): @@ -329,19 +322,19 @@ def parse_args(argv): return args -def set_log_info(l: logging.Logger): +def set_log_info(lgr: logging.Logger): """set logger level to INFO""" set_log_level_format( - l, + lgr, logging.INFO, '%(asctime)s %(levelname)s:%(name)s:%(message)s' ) -def set_log_debug(l: logging.Logger): +def set_log_debug(lgr: logging.Logger): """set logger level to DEBUG, and debug-level output format""" set_log_level_format( - l, + lgr, logging.DEBUG, "%(asctime)s [%(levelname)s %(filename)s:%(lineno)s - " "%(name)s.%(funcName)s() ] %(message)s" diff --git a/py_vista_turbo_serial/tests/test_events.py b/py_vista_turbo_serial/tests/test_events.py index 5b4c00c..0c5ac87 100644 --- a/py_vista_turbo_serial/tests/test_events.py +++ b/py_vista_turbo_serial/tests/test_events.py @@ -34,4 +34,3 @@ Jason Antman ################################################################################## """ - diff --git a/py_vista_turbo_serial/tests/test_messages.py b/py_vista_turbo_serial/tests/test_messages.py index 9cb5c7b..401b770 100644 --- a/py_vista_turbo_serial/tests/test_messages.py +++ b/py_vista_turbo_serial/tests/test_messages.py @@ -42,7 +42,7 @@ UnknownMessage, ArmAwayMessage, ArmHomeMessage, DisarmMessage, ArmingStatusRequest, ArmingStatusReport, PartitionState, ZoneStatusRequest, ZoneStatusReport, ZoneState, ZonePartitionRequest, ZonePartitionReport, - SystemEventNotification + SystemEventNotification, ZoneDescriptorRequest ) from py_vista_turbo_serial.events import FaultEvent @@ -261,6 +261,17 @@ def test_from_panel(self): assert res.zones == expected +class TestZoneDescriptorRequest: + + def test_to_panel(self): + res = MessagePacket.parse('08zd005A') + assert isinstance(res, ZoneDescriptorRequest) + assert res.from_panel is False + + def test_generate(self): + assert ZoneDescriptorRequest.generate() == '08zd005A' + + class TestZonePartitionRequest: def test_to_panel(self): @@ -295,7 +306,7 @@ def test_zone_14_open(self): ) assert isinstance(res, SystemEventNotification) assert res._event_type == 0x2b - assert res.zone_or_user == 14 + assert res.zone_or_user == 21 assert res.minute == 23 assert res.hour == 10 assert res.day == 21