From f46e4cb440d4d8dd4f2569fc9bbab279ec7c30b0 Mon Sep 17 00:00:00 2001 From: maslyankov Date: Mon, 20 Jan 2025 19:37:08 +0200 Subject: [PATCH] Refactor register value handling in helpers and sensors - Introduced `pack_value` and `unpack_value` functions for packing and unpacking register values, enhancing register format handling. - Updated `NumberRWSensor` to utilize `pack_value` for 16-bit and 32-bit register packing. - Refactored `reg_to_value` method in `Sensor` class to use `unpack_value` for improved register value extraction. - Removed deprecated functions `make_32bit`, `split_to_16bit`, and `signed` from helpers to streamline code. --- src/sunsynk/helpers.py | 85 +++++++++++++++++++++++----------------- src/sunsynk/rwsensors.py | 17 +++++--- src/sunsynk/sensors.py | 12 +----- 3 files changed, 62 insertions(+), 52 deletions(-) diff --git a/src/sunsynk/helpers.py b/src/sunsynk/helpers.py index 94f3236a..7d1c4ec1 100644 --- a/src/sunsynk/helpers.py +++ b/src/sunsynk/helpers.py @@ -13,6 +13,46 @@ NumType = float | int +def pack_value(value: int, bits: int = 16, signed: bool = True) -> int | tuple[int, int]: + """Pack a value into register format. + + Args: + value: The value to pack + bits: Number of bits (16 or 32) + signed: Whether the value should be treated as signed + + Returns: + For 16-bit: single register value + For 32-bit: tuple of (low, high) register values + """ + if bits == 16: + fmt = 'h' if signed else 'H' + return struct.unpack('H', struct.pack(fmt, value))[0] + if bits == 32: + fmt = 'i' if signed else 'I' + return struct.unpack('2H', struct.pack(fmt, value)) + raise ValueError(f"Unsupported number of bits: {bits}") + + +def unpack_value(regs: RegType, signed: bool = True) -> int: + """Unpack register value(s) into an integer. + + Args: + regs: Register values (1 or 2 registers) + signed: Whether to treat as signed value + + Returns: + Unpacked integer value + """ + if len(regs) == 1: + fmt = 'h' if signed else 'H' + return struct.unpack(fmt, struct.pack('H', regs[0]))[0] + if len(regs) == 2: + fmt = 'i' if signed else 'I' + return struct.unpack(fmt, struct.pack('2H', regs[0], regs[1]))[0] + raise ValueError(f"Unsupported number of registers: {len(regs)}") + + def ensure_tuple(val: Any) -> tuple[int, ...]: """Return a tuple.""" if isinstance(val, tuple): @@ -49,28 +89,19 @@ def as_num(val: ValType) -> float | int: return 0 -def signed(val: int | float, bits: int = 16) -> int | float: - """Convert value to signed int using struct packing.""" - if isinstance(val, float): - return val - - if bits == 16: - return struct.unpack('h', struct.pack('H', val))[0] - if bits == 32: - return struct.unpack('i', struct.pack('I', val))[0] - - # Fallback for non-standard bit lengths - sign_bit = 1 << (bits - 1) - if val < sign_bit: - return val - return val - (1 << bits) - - def slug(name: str) -> str: """Create a slug.""" return name.lower().replace(" ", "_").replace("-", "_") +def hex_str(regs: RegType, address: RegType | None = None) -> str: + """Convert register values to hex strings.""" + res = (f"0x{r:04x}" for r in regs) + if address: + res = (f"{k}={v}" for k, v in zip(address, res, strict=True)) + return f"{{{' '.join(res)}}}" + + class SSTime: """Deals with inverter time format conversion complexities.""" @@ -125,23 +156,3 @@ def str_value(self, value: str) -> None: def patch_bitmask(value: int, patch: int, bitmask: int) -> int: """Combine bitmask values.""" return (patch & bitmask) + (value & (0xFFFF - bitmask)) - - -def hex_str(regs: RegType, address: RegType | None = None) -> str: - """Convert register values to hex strings.""" - res = (f"0x{r:04x}" for r in regs) - if address: - res = (f"{k}={v}" for k, v in zip(address, res, strict=True)) - return f"{{{' '.join(res)}}}" - - -def make_32bit(low16_value: int, high16_value: int, signed: bool = True) -> int: - """Convert two 16-bit registers into a 32-bit value using struct packing.""" - fmt = ' Tuple[int, int]: - """Split a 32-bit value into two 16-bit values using struct unpacking.""" - fmt = ' RegType: minv = resolve_num(resolve, self.min, 0) maxv = resolve_num(resolve, self.max, 100) val = int(max(minv, min(maxv, fval)) / abs(self.factor)) + if len(self.address) == 1: - if val < 0: - val = 0x10000 + val - return self.reg(val) + return self.reg(pack_value(val, bits=16, signed=self.factor < 0)) if len(self.address) == 2: - low, high = split_to_16bit(val, signed=self.factor < 0) + low, high = pack_value(val, bits=32, signed=self.factor < 0) # type: ignore return self.reg(low, high) raise NotImplementedError(f"Address length not supported: {self.address}") diff --git a/src/sunsynk/sensors.py b/src/sunsynk/sensors.py index 38abde09..cc548a27 100644 --- a/src/sunsynk/sensors.py +++ b/src/sunsynk/sensors.py @@ -12,9 +12,8 @@ ValType, ensure_tuple, int_round, - signed, slug, - make_32bit, + unpack_value, ) _LOGGER = logging.getLogger(__name__) @@ -39,14 +38,7 @@ def id(self) -> str: def reg_to_value(self, regs: RegType) -> ValType: """Return the value from the registers.""" regs = self.masked(regs) - if len(regs) == 1: - val = regs[0] - if self.factor < 0: # Indicates this register is signed - val = signed(val, bits=16) - elif len(regs) == 2: - val = make_32bit(regs[0], regs[1], signed=self.factor < 0) - else: - raise ValueError(f"Unsupported register length: {len(regs)}") + val = unpack_value(regs, signed=self.factor < 0) val = int_round(val * abs(self.factor)) _LOGGER.debug("%s=%s%s %s", self.id, val, self.unit, regs) return val