diff --git a/broadlink/__init__.py b/broadlink/__init__.py index 070f06db..1d4ffb2a 100644 --- a/broadlink/__init__.py +++ b/broadlink/__init__.py @@ -12,7 +12,7 @@ from .hub import s3 from .light import lb1, lb2 from .remote import rm, rm4, rm4mini, rm4pro, rmmini, rmminib, rmpro -from .sensor import a1 +from .sensor import a1, a2 from .switch import bg1, ehc31, mp1, mp1s, sp1, sp2, sp2s, sp3, sp3s, sp4, sp4b SUPPORTED_TYPES = { @@ -142,7 +142,10 @@ 0x653C: ("RM4 pro", "Broadlink"), }, a1: { - 0x2714: ("e-Sensor", "Broadlink"), + 0x2714: ("A1", "Broadlink"), + }, + a2: { + 0x4F60: ("A2", "Broadlink"), }, mp1: { 0x4EB5: ("MP1-1K4S", "Broadlink"), diff --git a/broadlink/cover.py b/broadlink/cover.py index 23727a86..1d8b41ef 100644 --- a/broadlink/cover.py +++ b/broadlink/cover.py @@ -63,31 +63,32 @@ class dooya2(Device): TYPE = "DT360E-2" - def _send(self, operation: int, data: bytes): + def _send(self, operation: int, data: bytes = b""): """Send a command to the device.""" - packet = bytearray(14) + packet = bytearray(12) packet[0x02] = 0xA5 packet[0x03] = 0xA5 packet[0x04] = 0x5A packet[0x05] = 0x5A packet[0x08] = operation packet[0x09] = 0x0B - - data_len = len(data) - packet[0x0A] = data_len & 0xFF - packet[0x0B] = data_len >> 8 - packet += bytes(data) + if data: + data_len = len(data) + packet[0x0A] = data_len & 0xFF + packet[0x0B] = data_len >> 8 + packet += bytes(2) + packet.extend(data) checksum = sum(packet, 0xBEAF) & 0xFFFF - packet[6] = checksum & 0xFF - packet[7] = checksum >> 8 + packet[0x06] = checksum & 0xFF + packet[0x07] = checksum >> 8 packet_len = len(packet) - 2 - packet[0] = packet_len & 0xFF - packet[1] = packet_len >> 8 + packet[0x00] = packet_len & 0xFF + packet[0x01] = packet_len >> 8 - resp = self.send_packet(0x6a, packet) + resp = self.send_packet(0x6A, packet) e.check_error(resp[0x22:0x24]) payload = self.decrypt(resp[0x38:]) return payload @@ -119,31 +120,32 @@ class wser(Device): TYPE = "WSER" - def _send(self, operation: int, data: bytes): + def _send(self, operation: int, data: bytes = b""): """Send a command to the device.""" - packet = bytearray(14) + packet = bytearray(12) packet[0x02] = 0xA5 packet[0x03] = 0xA5 packet[0x04] = 0x5A packet[0x05] = 0x5A packet[0x08] = operation packet[0x09] = 0x0B - - data_len = len(data) - packet[0x0A] = data_len & 0xFF - packet[0x0B] = data_len >> 8 - packet += bytes(data) + if data: + data_len = len(data) + packet[0x0A] = data_len & 0xFF + packet[0x0B] = data_len >> 8 + packet += bytes(2) + packet.extend(data) checksum = sum(packet, 0xBEAF) & 0xFFFF - packet[6] = checksum & 0xFF - packet[7] = checksum >> 8 + packet[0x06] = checksum & 0xFF + packet[0x07] = checksum >> 8 packet_len = len(packet) - 2 - packet[0] = packet_len & 0xFF - packet[1] = packet_len >> 8 + packet[0x00] = packet_len & 0xFF + packet[0x01] = packet_len >> 8 - resp = self.send_packet(0x6a, packet) + resp = self.send_packet(0x6A, packet) e.check_error(resp[0x22:0x24]) payload = self.decrypt(resp[0x38:]) return payload @@ -156,24 +158,24 @@ def get_position(self) -> int: def open(self) -> int: """Open the curtain.""" - resp = self._send(2, [0x4a, 0x31, 0xa0]) + resp = self._send(2, [0x4A, 0x31, 0xA0]) position = resp[0x0E] return position def close(self) -> int: """Close the curtain.""" - resp = self._send(2, [0x61, 0x32, 0xa0]) + resp = self._send(2, [0x61, 0x32, 0xA0]) position = resp[0x0E] return position def stop(self) -> int: """Stop the curtain.""" - resp = self._send(2, [0x4c, 0x73, 0xa0]) + resp = self._send(2, [0x4C, 0x73, 0xA0]) position = resp[0x0E] return position def set_position(self, position: int) -> int: """Set the position of the curtain.""" - resp = self._send(2, [position, 0x70, 0xa0]) + resp = self._send(2, [position, 0x70, 0xA0]) position = resp[0x0E] return position diff --git a/broadlink/sensor.py b/broadlink/sensor.py index 33e7587d..21bae0b9 100644 --- a/broadlink/sensor.py +++ b/broadlink/sensor.py @@ -1,6 +1,4 @@ """Support for sensors.""" -import struct - from . import exceptions as e from .device import Device @@ -29,19 +27,62 @@ def check_sensors(self) -> dict: def check_sensors_raw(self) -> dict: """Return the state of the sensors in raw format.""" packet = bytearray([0x1]) - response = self.send_packet(0x6A, packet) - e.check_error(response[0x22:0x24]) - payload = self.decrypt(response[0x38:]) - data = payload[0x4:] + resp = self.send_packet(0x6A, packet) + e.check_error(resp[0x22:0x24]) + data = self.decrypt(resp[0x38:]) + + return { + "temperature": data[0x04] + data[0x05] / 10.0, + "humidity": data[0x06] + data[0x07] / 10.0, + "light": data[0x08], + "air_quality": data[0x0A], + "noise": data[0x0C], + } + + +class a2(Device): + """Controls a Broadlink A2.""" + + TYPE = "A2" - temperature = struct.unpack("> 8 + packet += bytes(2) + packet.extend(data) + + checksum = sum(packet, 0xBEAF) & 0xFFFF + packet[0x06] = checksum & 0xFF + packet[0x07] = checksum >> 8 + + packet_len = len(packet) - 2 + packet[0x00] = packet_len & 0xFF + packet[0x01] = packet_len >> 8 + + resp = self.send_packet(0x6A, packet) + e.check_error(resp[0x22:0x24]) + payload = self.decrypt(resp[0x38:]) + return payload + + def check_sensors_raw(self) -> dict: + """Return the state of the sensors in raw format.""" + data = self._send(1) return { - "temperature": temperature, - "humidity": humidity, - "light": data[0x4], - "air_quality": data[0x6], - "noise": data[0x8], + "temperature": data[0x13] * 256 + data[0x14], + "humidity": data[0x15] * 256 + data[0x16], + "pm10": data[0x0D] * 256 + data[0x0E], + "pm2_5": data[0x0F] * 256 + data[0x10], + "pm1": data[0x11] * 256 + data[0x12], }