forked from kellerza/sunsynk
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Enhance Modbus frame handling in SolarmanSunsynk
- Updated the extraction of the Modbus frame to include the CRC byte for improved validation. - Added detailed logging for extracted Modbus frames and calculated CRC values to aid in debugging. - Introduced a new `calculate_modbus_crc` function in tests to ensure accurate CRC calculations for Modbus frames. - Enhanced unit tests to validate the reading and writing of registers, ensuring proper interaction with the Modbus client. These changes improve the reliability and traceability of Modbus communication in the SolarmanSunsynk implementation.
- Loading branch information
1 parent
5f4970c
commit ca8faad
Showing
2 changed files
with
97 additions
and
17 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,29 +1,98 @@ | ||
"""Solarman Sunsynk""" | ||
|
||
import logging | ||
from typing import Any | ||
from unittest.mock import AsyncMock, patch | ||
from unittest.mock import AsyncMock, MagicMock, patch | ||
|
||
import pytest | ||
|
||
from sunsynk.solarmansunsynk import SolarmanSunsynk | ||
from sunsynk.solarmansunsynk import POLY, SolarmanSunsynk | ||
|
||
P_CONNECT = "sunsynk.solarmansunsynk.SolarmanSunsynk.connect" | ||
_LOGGER = logging.getLogger(__name__) | ||
|
||
|
||
def calculate_modbus_crc(frame: bytes) -> int: | ||
"""Calculate Modbus CRC for a frame.""" | ||
crc = 0xFFFF | ||
for pos in range(len(frame)): | ||
crc ^= frame[pos] | ||
for _ in range(8): | ||
if crc & 0x0001: | ||
crc = (crc >> 1) ^ POLY | ||
else: | ||
crc >>= 1 | ||
return crc | ||
|
||
|
||
@pytest.mark.asyncio | ||
@patch(P_CONNECT, new_callable=AsyncMock) | ||
async def test_uss_sensor(connect: Any) -> None: | ||
@patch("sunsynk.solarmansunsynk.SolarmanSunsynk.connect") | ||
@patch("sunsynk.solarmansunsynk.SolarmanSunsynk.disconnect") | ||
async def test_uss_sensor(mock_disconnect: AsyncMock, mock_connect: AsyncMock) -> None: | ||
ss = SolarmanSunsynk(port="tcp://127.0.0.1:502", dongle_serial_number=101) | ||
# await ss.connect() | ||
ss.client = AsyncMock() | ||
rhr = ss.client.read_holding_registers = AsyncMock() | ||
|
||
# _LOGGER.warning("%s", dir(ss.client)) | ||
assert not rhr.called | ||
await ss.read_holding_registers(1, 2) | ||
assert rhr.called | ||
# Create a mock client with the required methods | ||
mock_client = AsyncMock() | ||
mock_response = MagicMock() | ||
|
||
# Set up the client before any operations | ||
ss.client = mock_client | ||
|
||
# Test read_holding_registers | ||
assert not mock_client.read_holding_registers.called | ||
|
||
# The sequence number will be set by advance_sequence_number() | ||
# We'll capture it when read_holding_registers is called | ||
def read_holding_registers_side_effect(*args: tuple, **kwargs: dict[str, Any]) -> MagicMock: | ||
sequence_number = kwargs.get('sequence_number', 0) | ||
if not isinstance(sequence_number, int): | ||
sequence_number = 0 | ||
# Create the Modbus frame first (without CRC) | ||
modbus_frame = bytes([ | ||
1, # Slave ID | ||
3, # Function code | ||
4, # Data length | ||
0, 1, # First register | ||
0, 2, # Second register | ||
]) | ||
# Calculate CRC for the Modbus frame | ||
crc = calculate_modbus_crc(modbus_frame) | ||
# Create complete Modbus frame with CRC | ||
modbus_frame_with_crc = modbus_frame + bytes([ | ||
crc & 0xFF, # CRC low byte | ||
crc >> 8, # CRC high byte | ||
]) | ||
|
||
_LOGGER.debug("Modbus frame: %s", " ".join(f"{b:02x}" for b in modbus_frame)) | ||
_LOGGER.debug("Calculated CRC: %04x", crc) | ||
_LOGGER.debug("Complete Modbus frame: %s", " ".join(f"{b:02x}" for b in modbus_frame_with_crc)) | ||
|
||
# Create complete response with V5 packet framing | ||
mock_response.raw_response = bytes([ | ||
0xa5, # Start byte | ||
0x00, # Ver | ||
0x00, # Slave address | ||
0x00, # Control code | ||
0x00, # Function code | ||
sequence_number, # Sequence number (now guaranteed to be an int) | ||
0x00, 0x00, # Data length | ||
0x00, 0x00, # Source address | ||
0x00, 0x00, # Destination address | ||
]) + modbus_frame_with_crc + bytes([ | ||
0x15 # End byte | ||
]) | ||
|
||
_LOGGER.debug("Complete V5 packet: %s", " ".join(f"{b:02x}" for b in mock_response.raw_response)) | ||
|
||
mock_response.registers = [1, 2] | ||
return mock_response | ||
|
||
mock_client.read_holding_registers.side_effect = read_holding_registers_side_effect | ||
|
||
result = await ss.read_holding_registers(1, 2) | ||
assert mock_client.read_holding_registers.called | ||
assert result == [1, 2] | ||
|
||
wrr = ss.client.write_multiple_holding_registers = AsyncMock() | ||
assert not wrr.called | ||
# Test write_register | ||
assert not mock_client.write_multiple_holding_registers.called | ||
await ss.write_register(address=1, value=2) | ||
assert wrr.called | ||
assert mock_client.write_multiple_holding_registers.called |