Skip to content

Commit

Permalink
WIP initial code
Browse files Browse the repository at this point in the history
  • Loading branch information
jantman committed Oct 13, 2023
1 parent fe01c3b commit 72abdb4
Show file tree
Hide file tree
Showing 7 changed files with 722 additions and 12 deletions.
87 changes: 87 additions & 0 deletions py_vista_turbo_serial/communicator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
"""
The latest version of this package is available at:
<http://github.com/jantman/py-vista-turbo-serial>
##################################################################################
Copyright 2018 Jason Antman <[email protected]> <http://www.jasonantman.com>
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the “Software”), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
of the Software, and to permit persons to whom the Software is furnished to do
so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
##################################################################################
While not legally required, I sincerely request that anyone who finds
bugs please submit them at <https://github.com/jantman/py-vista-turbo-serial> or
to me via email, and that you send any contributions or improvements
either as a pull request on GitHub, or to me via email.
##################################################################################
AUTHORS:
Jason Antman <[email protected]> <http://www.jasonantman.com>
##################################################################################
"""

import logging
from typing import List, Generator

from serial import Serial

from py_vista_turbo_serial.messages import MessagePacket

logger = logging.getLogger(__name__)


class Communicator:
"""
Class for handling communication with the alarm.
"""

def __init__(self, port: str, timeout_sec: int = 1):
self._port: str = port
logger.debug('Opening serial connection on %s', port)
self.serial: Serial = Serial(
port, baudrate=9600, timeout=timeout_sec
) # default 8N1
logger.debug('Serial is connected')
self.outgoing: List[str] = []

def __del__(self):
logger.debug('Closing serial port')
self.serial.close()
logger.debug('Serial port closed')

def send_message(self, msg: str):
logger.debug('Enqueueing message: %s', msg)
self.outgoing.append(msg)

def communicate(self) -> Generator[MessagePacket, None, None]:
logger.info('Entering communicate() loop')
# at start, if we have a message to send, send it
if self.outgoing:
msg = self.outgoing.pop(0)
logger.info('Sending message: %s', msg)
self.serial.write(msg + '\r\n')
# this might be better with select(), but let's try this...
while True:
# @TODO handle exception on timeout
line = self.serial.readline().decode().strip()
logger.debug('Got line: %s', line)
yield MessagePacket.parse(line)
if self.outgoing:
msg = self.outgoing.pop(0)
logger.info('Sending message: %s', msg)
self.serial.write(msg + '\r\n')
215 changes: 215 additions & 0 deletions py_vista_turbo_serial/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
"""
The latest version of this package is available at:
<http://github.com/jantman/py-vista-turbo-serial>
##################################################################################
Copyright 2018 Jason Antman <[email protected]> <http://www.jasonantman.com>
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the “Software”), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
of the Software, and to permit persons to whom the Software is furnished to do
so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
##################################################################################
While not legally required, I sincerely request that anyone who finds
bugs please submit them at <https://github.com/jantman/py-vista-turbo-serial> or
to me via email, and that you send any contributions or improvements
either as a pull request on GitHub, or to me via email.
##################################################################################
AUTHORS:
Jason Antman <[email protected]> <http://www.jasonantman.com>
##################################################################################
"""


class UnknownEventException(Exception):
pass


class SystemEvent:

NAME: str = 'Unknown Event'

CODE: int = 0

def __init__(self, zone_or_user: int):
self.zone_or_user: int = zone_or_user

def __repr__(self) -> str:
return f'<{self.NAME}(zone_or_user={self.zone_or_user})>'

@classmethod
def event_for_code(
cls, event_code: int, zone_or_user: int
) -> 'SystemEvent':
for klass in cls.__subclasses__():
if klass.CODE == event_code:
return klass(zone_or_user)
raise UnknownEventException(f'Unknown event code: {event_code}')


class AlarmEvent(SystemEvent):
pass


class PerimeterAlarm(AlarmEvent):
NAME: str = 'Perimeter Alarm'
CODE: int = 0


class EntryExitAlarm(AlarmEvent):
NAME: str = 'Entry/Exit Alarm'
CODE: int = 1


class InteriorFollowerAlarm(AlarmEvent):
NAME: str = 'Interior Follower Alarm'
CODE: int = 4


class FireAlarm(AlarmEvent):
NAME: str = 'Fire Alarm'
CODE: int = 6


class AudiblePanicAlarm(AlarmEvent):
NAME: str = 'Audible Panic Alarm'
CODE: int = 7


class SilentPanicAlarm(AlarmEvent):
NAME: str = 'Silent Panic Alarm'
CODE: int = 8


class Aux24hrAlarm(AlarmEvent):
NAME: str = '24-Hour Auxiliary'
CODE: int = 9


class DuressAlarm(AlarmEvent):
NAME: str = 'Duress Alarm'
CODE: int = 0x0c


class OtherAlarmRestore(SystemEvent):
NAME: str = 'Other Alarm Restore'
CODE: int = 0x0e


class RfLowBattery(SystemEvent):
NAME: str = 'RF Low Battery'
CODE: int = 0x0f


class RfLowBatteryRestore(SystemEvent):
NAME: str = 'RF Low Battery Restore'
CODE: int = 0x10


class OtherTrouble(SystemEvent):
NAME: str = 'Other Trouble'
CODE: int = 0x11


class OtherTroubleRestore(SystemEvent):
NAME: str = 'Other Trouble Restore'
CODE: int = 0x12


class ArmDisarmEvent(SystemEvent):
pass


class ArmStay(ArmDisarmEvent):
NAME: str = 'Arm Stay/Home'
CODE: int = 0x15


class Disarm(ArmDisarmEvent):
NAME: str = 'Disarm'
CODE: int = 0x16


class Arm(ArmDisarmEvent):
NAME: str = 'Arm'
CODE: int = 0x18


class LowBattery(SystemEvent):
NAME: str = 'Low Battery'
CODE: int = 0x1a


class LowBatteryRestore(SystemEvent):
NAME: str = 'Low Battery Restore'
CODE: int = 0x1b


class AcFail(SystemEvent):
NAME: str = 'AC Fail'
CODE: int = 0x1c


class AcRestore(SystemEvent):
NAME: str = 'AC Restore'
CODE: int = 0x1d


class AlarmCancel(SystemEvent):
NAME: str = 'Alarm Cancel'
CODE: int = 0x20


class OtherBypass(SystemEvent):
NAME: str = 'Other Bypass'
CODE: int = 0x21


class OtherUnbypass(SystemEvent):
NAME: str = 'Other Unbypass'
CODE: int = 0x22


class DayNightAlarm(SystemEvent):
NAME: str = 'Day/Night Alarm'
CODE: int = 0x23


class DayNightRestore(SystemEvent):
NAME: str = 'Day/Night Restore'
CODE: int = 0x24


class FailToDisarm(SystemEvent):
NAME: str = 'Fail to Disarm'
CODE: int = 0x27


class FailToArm(SystemEvent):
NAME: str = 'Fail to Arm'
CODE: int = 0x28


class FaultEvent(SystemEvent):
NAME: str = 'Fault'
CODE: int = 0x2b


class FaultRestoreEvent(SystemEvent):
NAME: str = 'Fault Restore'
CODE: int = 0x2c
15 changes: 6 additions & 9 deletions py_vista_turbo_serial/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
from enum import Enum
from functools import total_ordering

from py_vista_turbo_serial.events import SystemEvent

logger = logging.getLogger(__name__)


Expand All @@ -51,13 +53,6 @@ class PartitionState(Enum):
AWAY = 3


class SystemEventType(Enum):

PERIMETER_ALARM = 0
ENTRY_EXIT_ALARM = 1
FAULT = 0x2b


@total_ordering
class ZoneState:

Expand Down Expand Up @@ -380,14 +375,16 @@ class SystemEventNotification(MessagePacket):
def __init__(self, raw_message: str, data: str, from_panel: bool):
super().__init__(raw_message, data, from_panel)
self._event_type: int = int(data[0:2], 16)
self.event_type: SystemEventType = SystemEventType(self._event_type)
self.zone_or_user: int = int(data[2:4])
self.event_type: SystemEvent = SystemEvent.event_for_code(
self._event_type, self.zone_or_user
)
self.minute: int = int(data[4:6])
self.hour: int = int(data[6:8])
self.day: int = int(data[8:10])
self.month: int = int(data[10:12])

def __repr__(self):
return (f'<SystemEvent(Type={self.event_type.name} Zone/User='
return (f'<SystemEvent(Type={self.event_type.NAME} Zone/User='
f'{self.zone_or_user} Time={self.minute}:{self.hour} '
f'{self.month}/{self.day})>')
Loading

0 comments on commit 72abdb4

Please sign in to comment.