From c2e39225d79a2edf839567526ec68d156f80bfdf Mon Sep 17 00:00:00 2001 From: Kimmo Huoman Date: Tue, 9 Feb 2016 19:01:10 +0200 Subject: [PATCH] Addresses some of the comments in issue #14. - Convert all values received by `Packet.parse_msg` to integers. - Replace `RadioPacket.sender` and `RadioPacket.destination` with lists of integers. Create corresponding functions for receiving hex and integer values. - Move commonly used stuff to `enocean.utils` --- enocean/protocol/eep.py | 12 +++--- enocean/protocol/packet.py | 76 ++++++++++++++++---------------------- enocean/utils.py | 34 +++++++++++++++++ tests/test_eep.py | 2 + 4 files changed, 73 insertions(+), 51 deletions(-) create mode 100644 enocean/utils.py diff --git a/enocean/protocol/eep.py b/enocean/protocol/eep.py index 86b01e8..9edde0b 100644 --- a/enocean/protocol/eep.py +++ b/enocean/protocol/eep.py @@ -4,6 +4,8 @@ from bs4 import BeautifulSoup import logging +import enocean.utils + logger = logging.getLogger('enocean.protocol.eep') path = os.path.dirname(os.path.realpath(__file__)) @@ -20,10 +22,6 @@ def __init__(self): logger.warn('Cannot load protocol file!') pass - def _get_hex(self, nmb): - ''' Get hex-representation of number ''' - return '0x%02X' % (nmb) - def _get_raw(self, source, bitarray): ''' Get raw data as integer, based on offset and size ''' offset = int(source['offset']) @@ -125,17 +123,17 @@ def find_profile(self, rorg, func, type, direction=None): logger.warn('EEP.xml not loaded!') return None - rorg = self.soup.find('telegram', {'rorg': self._get_hex(rorg)}) + rorg = self.soup.find('telegram', {'rorg': '0x%s' % enocean.utils._to_hex_string(rorg)}) if not rorg: logger.warn('Cannot find rorg in EEP!') return None - func = rorg.find('profiles', {'func': self._get_hex(func)}) + func = rorg.find('profiles', {'func': '0x%s' % enocean.utils._to_hex_string(func)}) if not func: logger.warn('Cannot find func in EEP!') return None - profile = func.find('profile', {'type': self._get_hex(type)}) + profile = func.find('profile', {'type': '0x%s' % enocean.utils._to_hex_string(type)}) if not profile: logger.warn('Cannot find type in EEP!') return None diff --git a/enocean/protocol/packet.py b/enocean/protocol/packet.py index e3b963a..e0c2bc6 100644 --- a/enocean/protocol/packet.py +++ b/enocean/protocol/packet.py @@ -2,6 +2,7 @@ from __future__ import print_function, unicode_literals, division import logging +import enocean.utils from enocean.protocol import crc8 from enocean.protocol.eep import EEP from enocean.protocol.constants import PACKET, RORG, PARSE_RESULT, DB0, DB2, DB3 @@ -45,45 +46,20 @@ def __unicode__(self): def __eq__(self, other): return self.type == other.type and self.rorg == other.rorg and self.data == other.data and self.optional == other.optional - def _get_bit(self, byte, bit): - ''' Get bit value from byte ''' - return (byte >> bit) & 0x01 - - def _combine_hex(self, data): - ''' Combine list of integer values to one big integer ''' - output = 0x00 - for i, d in enumerate(reversed(data)): - output |= (d << i * 8) - return output - - def _to_bitarray(self, data, width=8): - ''' Convert data (list of integers, bytearray or integer) to bitarray ''' - if isinstance(data, list) or isinstance(data, bytearray): - data = self._combine_hex(data) - return [True if digit == '1' else False for digit in bin(data)[2:].zfill(width)] - - def _from_bitarray(self, data): - ''' Convert bit array back to integer ''' - return int(''.join(['1' if x else '0' for x in data]), 2) - - def _to_hex_string(self, data): - ''' Convert list of integers to a hex string, separated by ":" ''' - return ':'.join([('%02X' % o) for o in data]) - def _data_to_bitdata(self): ''' Updates the bit_data member based on data member.''' if self.rorg == RORG.RPS or self.rorg == RORG.BS1: - self.bit_data = self._to_bitarray(self.data[1], 8) + self.bit_data = enocean.utils._to_bitarray(self.data[1], 8) if self.rorg == RORG.BS4: - self.bit_data = self._to_bitarray(self.data[1:5], 32) + self.bit_data = enocean.utils._to_bitarray(self.data[1:5], 32) def _bitdata_to_data(self): ''' Updates the data member based on bit_data member.''' if self.rorg in [RORG.RPS, RORG.BS1]: - self.data[1] = self._from_bitarray(self.bit_data) + self.data[1] = enocean.utils._from_bitarray(self.bit_data) if self.rorg == RORG.BS4: for byte in range(4): - self.data[byte+1] = self._from_bitarray(self.bit_data[byte*8:(byte+1)*8]) + self.data[byte+1] = enocean.utils._from_bitarray(self.bit_data[byte*8:(byte+1)*8]) @staticmethod def parse_msg(buf): @@ -101,7 +77,7 @@ def parse_msg(buf): # Valid buffer starts from 0x55 # Convert to list, as index -method isn't defined for bytearray - buf = buf[list(buf).index(0x55):] + buf = [ord(x) if not isinstance(x, int) else x for x in buf[list(buf).index(0x55):]] try: data_len = (buf[1] << 8) | buf[2] opt_len = buf[3] @@ -211,11 +187,11 @@ def parse(self): self.status = self.data[-1] if self.rorg == RORG.VLD: self.status = self.optional[-1] - self.bit_status = self._to_bitarray(self.status) + self.bit_status = enocean.utils._to_bitarray(self.status) if self.rorg in [RORG.RPS, RORG.BS1, RORG.BS4]: # These message types should have repeater count in the last for bits of status. - self.repeater_count = self._from_bitarray(self.bit_status[4:]) + self.repeater_count = enocean.utils._from_bitarray(self.bit_status[4:]) return self.parsed def select_eep(self, func, type, direction=None): @@ -242,7 +218,7 @@ def set_eep(self, data): self._data_to_bitdata() # data is a dict with EEP description keys self.bit_data, self.bit_status = self.eep.set_values(self._profile, self.bit_data, self.bit_status, data) - self.status = self._from_bitarray(self.bit_status) + self.status = enocean.utils._from_bitarray(self.bit_status) # update data based on bit_data self._bitdata_to_data() @@ -258,11 +234,9 @@ def build(self): class RadioPacket(Packet): - destination = 0 - destination_hex = '' + destination = [0xFF, 0xFF, 0xFF, 0xFF] dBm = 0 - sender = 0 - sender_hex = '' + sender = [0xFF, 0xFF, 0xFF, 0xFF] learn = True contains_eep = False @@ -277,12 +251,26 @@ def create(rorg, func, type, direction=None, learn=False, **kwargs): return Packet.create(PACKET.RADIO, rorg, func, type, direction, destination, sender, learn, **kwargs) + @property + def sender_int(self): + return enocean.utils._combine_hex(self.sender) + + @property + def sender_hex(self): + return enocean.utils._to_hex_string(self.sender) + + @property + def destination_int(self): + return enocean.utils._combine_hex(self.destination) + + @property + def destination_hex(self): + return enocean.utils._to_hex_string(self.destination) + def parse(self): - self.destination = self._combine_hex(self.optional[1:5]) - self.destination_hex = self._to_hex_string(self.optional[1:5]) + self.destination = self.optional[1:5] self.dBm = -self.optional[5] - self.sender = self._combine_hex(self.data[-5:-1]) - self.sender_hex = self._to_hex_string(self.data[-5:-1]) + self.sender = self.data[-5:-1] # Default to learn == True, as some devices don't have a learn button self.learn = True @@ -298,9 +286,9 @@ def parse(self): self.contains_eep = self.bit_data[DB0.BIT_7] if self.contains_eep: # Get rorg_func and rorg_type from an unidirectional learn packet - self.rorg_func = self._from_bitarray(self.bit_data[DB3.BIT_7:DB3.BIT_1]) - self.rorg_type = self._from_bitarray(self.bit_data[DB3.BIT_1:DB2.BIT_2]) - self.rorg_manufacturer = self._from_bitarray(self.bit_data[DB2.BIT_2:DB0.BIT_7]) + self.rorg_func = enocean.utils._from_bitarray(self.bit_data[DB3.BIT_7:DB3.BIT_1]) + self.rorg_type = enocean.utils._from_bitarray(self.bit_data[DB3.BIT_1:DB2.BIT_2]) + self.rorg_manufacturer = enocean.utils._from_bitarray(self.bit_data[DB2.BIT_2:DB0.BIT_7]) logger.debug('learn received, EEP detected, RORG: 0x%02X, FUNC: 0x%02X, TYPE: 0x%02X, Manufacturer: 0x%02X' % (self.rorg, self.rorg_func, self.rorg_type, self.rorg_manufacturer)) return super(RadioPacket, self).parse() diff --git a/enocean/utils.py b/enocean/utils.py new file mode 100644 index 0000000..3dff665 --- /dev/null +++ b/enocean/utils.py @@ -0,0 +1,34 @@ +# -*- encoding: utf-8 -*- +from __future__ import print_function, unicode_literals, division + + +def _get_bit(byte, bit): + ''' Get bit value from byte ''' + return (byte >> bit) & 0x01 + + +def _combine_hex(data): + ''' Combine list of integer values to one big integer ''' + output = 0x00 + for i, d in enumerate(reversed(data)): + output |= (d << i * 8) + return output + + +def _to_bitarray(data, width=8): + ''' Convert data (list of integers, bytearray or integer) to bitarray ''' + if isinstance(data, list) or isinstance(data, bytearray): + data = _combine_hex(data) + return [True if digit == '1' else False for digit in bin(data)[2:].zfill(width)] + + +def _from_bitarray(data): + ''' Convert bit array back to integer ''' + return int(''.join(['1' if x else '0' for x in data]), 2) + + +def _to_hex_string(data): + ''' Convert list of integers to a hex string, separated by ":" ''' + if isinstance(data, int): + return '%02X' % data + return ':'.join([('%02X' % o) for o in data]) diff --git a/tests/test_eep.py b/tests/test_eep.py index b12b6ed..191c60e 100644 --- a/tests/test_eep.py +++ b/tests/test_eep.py @@ -26,6 +26,8 @@ def test_temperature(): assert p.rorg_type is 0x05 assert p.status == 0x00 assert p.repeater_count == 0 + assert p.sender == [0x01, 0x81, 0xB7, 0x44] + assert p.sender_hex == '01:81:B7:44' def test_magnetic_switch():