diff --git a/broadlink/remote.py b/broadlink/remote.py index 8161927f..89cb71f5 100644 --- a/broadlink/remote.py +++ b/broadlink/remote.py @@ -5,47 +5,45 @@ from . import exceptions as e from .device import Device -TICK = 32.84 -IR_TOKEN = 0x26 - - -def durations_to_ir_data(durations) -> None: - """Convert a microsecond duration sequence into a Broadlink IR packet - - This can then be passed into send_data. - """ - result = bytearray() - result.append(IR_TOKEN) - result.append(0) - result.append(len(durations) % 256) - result.append(len(durations) / 256) - for dur in durations: - num = int(round(dur / TICK)) - if num > 255: + +def pulses_to_data(pulses: t.List[int], tick: int = 32.84) -> None: + """Convert a microsecond duration sequence into a Broadlink IR packet.""" + result = bytearray(4) + result[0x00] = 0x26 + + for pulse in pulses: + div, mod = divmod(int(pulse // tick), 256) + if div: result.append(0) - result.append(num / 256) - result.append(num % 256) - return result + result.append(div) + result.append(mod) + data_len = len(result) - 4 + result[0x02] = data_len & 0xFF + result[0x03] = data_len >> 8 + + return result -def data_to_durations(bytes): - """Parse a Broadlink packet into a microsecond duration sequence - Supports both IR and RF. - """ +def data_to_pulses(data: bytes, tick: int = 32.84) -> t.List[int]: + """Parse a Broadlink packet into a microsecond duration sequence.""" result = [] - # XXX: Should check IR/RF token, repeat and length bytes. index = 4 - while index < len(bytes): - chunk = bytes[index] + end = min(256 * data[0x03] + data[0x02] + 4, len(data)) + + while index < end: + chunk = data[index] index += 1 + if chunk == 0: - chunk = bytes[index] - chunk = 256 * chunk + bytes[index + 1] + try: + chunk = 256 * data[index] + data[index + 1] + except IndexError: + raise ValueError("Malformed data.") index += 2 - result.append(int(round(chunk * TICK))) - if chunk == 0x0D05: - break + + result.append(int(chunk * tick)) + return result diff --git a/cli/broadlink_cli b/cli/broadlink_cli index b4c3c1a4..35317ee4 100755 --- a/cli/broadlink_cli +++ b/cli/broadlink_cli @@ -1,13 +1,13 @@ #!/usr/bin/env python3 import argparse import base64 -import codecs import time +import typing as t import broadlink from broadlink.const import DEFAULT_PORT from broadlink.exceptions import ReadError, StorageError -from broadlink.remote import data_to_durations, durations_to_ir_data +from broadlink.remote import data_to_pulses, pulses_to_data TIMEOUT = 30 @@ -16,20 +16,17 @@ def auto_int(x): return int(x, 0) -def format_durations(data): - result = '' - for i in range(0, len(data)): - if len(result) > 0: - result += ' ' - result += ('+' if i % 2 == 0 else '-') + str(data[i]) - return result +def format_pulses(pulses: t.List[int]) -> str: + """Format pulses.""" + return " ".join( + f"+{pulse}" if i % 2 == 0 else f"-{pulse}" + for i, pulse in enumerate(pulses) + ) -def parse_durations(str): - result = [] - for s in str.split(): - result.append(abs(int(s))) - return result +def parse_pulses(data: t.List[str]) -> t.List[int]: + """Parse pulses.""" + return [abs(int(s)) for s in data] parser = argparse.ArgumentParser(fromfile_prefix_chars='@') @@ -79,8 +76,8 @@ if args.joinwifi: if args.convert: data = bytearray.fromhex(''.join(args.data)) - durations = data_to_durations(data) - print(format_durations(durations)) + pulses = data_to_pulses(data) + print(format_pulses(pulses)) if args.temperature: print(dev.check_temperature()) if args.humidity: @@ -92,8 +89,11 @@ if args.sensors: for key in data: print("{} {}".format(key, data[key])) if args.send: - data = durations_to_ir_data(parse_durations(' '.join(args.data))) \ - if args.durations else bytearray.fromhex(''.join(args.data)) + data = ( + pulses_to_data(parse_pulses(args.data)) + if args.durations + else bytes.fromhex(''.join(args.data)) + ) dev.send_data(data) if args.learn or (args.learnfile and not args.rflearn): dev.enter_learning() @@ -111,17 +111,19 @@ if args.learn or (args.learnfile and not args.rflearn): print("No data received...") exit(1) - learned_str = format_durations(data_to_durations(bytearray(data))) \ - if args.durations \ - else ''.join(format(x, '02x') for x in bytearray(data)) - if args.learn: - print(learned_str) - decode_hex = codecs.getdecoder("hex_codec") - print("Base64: " + base64.b64encode(data).decode('ascii')) + print("Packet found!") + raw_fmt = data.hex() + base64_fmt = base64.b64encode(data).decode('ascii') + pulse_fmt = format_pulses(data_to_pulses(data)) + + print("Raw:", raw_fmt) + print("Base64:", base64_fmt) + print("Pulses:", pulse_fmt) + if args.learnfile: print("Saving to {}".format(args.learnfile)) with open(args.learnfile, "w") as text_file: - text_file.write(learned_str) + text_file.write(pulse_fmt if args.durations else raw_fmt) if args.check: if dev.check_power(): print('* ON *') @@ -205,14 +207,15 @@ if args.rflearn: exit(1) print("Packet found!") - learned = format_durations(data_to_durations(bytearray(data))) \ - if args.durations \ - else ''.join(format(x, '02x') for x in bytearray(data)) - if args.learnfile is None: - print(learned) - decode_hex = codecs.getdecoder("hex_codec") - print("Base64: {}".format(str(base64.b64encode(decode_hex(learned)[0])))) - if args.learnfile is not None: + raw_fmt = data.hex() + base64_fmt = base64.b64encode(data).decode('ascii') + pulse_fmt = format_pulses(data_to_pulses(data)) + + print("Raw:", raw_fmt) + print("Base64:", base64_fmt) + print("Pulses:", pulse_fmt) + + if args.learnfile: print("Saving to {}".format(args.learnfile)) with open(args.learnfile, "w") as text_file: - text_file.write(learned) + text_file.write(pulse_fmt if args.durations else raw_fmt)