Skip to content

Commit

Permalink
Do not read XNCP flow control on older radios (#659)
Browse files Browse the repository at this point in the history
* Only request XNCP flow control type if the feature is supported

* Ignore XNCP for older gateways

* Add a unit test
  • Loading branch information
puddly authored Dec 8, 2024
1 parent acd2aa5 commit b7f3ef9
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 15 deletions.
4 changes: 4 additions & 0 deletions bellows/ezsp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ async def version(self):
)

async def get_xncp_features(self) -> xncp.FirmwareFeatures:
# Some older gateways seem to have their own XNCP protocol. Ignore them.
if self._ezsp_version < 13:
return FirmwareFeatures.NONE

try:
self._xncp_features = await self.xncp_get_supported_firmware_features()
except InvalidCommandError:
Expand Down
4 changes: 2 additions & 2 deletions bellows/ezsp/xncp.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ class XncpCommandPayload(t.Struct):


class FlowControlType(t.enum8):
Software = 0x00
Hardware = 0x01
SOFTWARE = 0x00
HARDWARE = 0x01


@register_command(XncpCommandId.GET_SUPPORTED_FEATURES_REQ)
Expand Down
13 changes: 4 additions & 9 deletions bellows/zigbee/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,7 @@
CONF_USE_THREAD,
CONFIG_SCHEMA,
)
from bellows.exception import (
ControllerError,
EzspError,
InvalidCommandError,
StackAlreadyRunning,
)
from bellows.exception import ControllerError, EzspError, StackAlreadyRunning
import bellows.ezsp
from bellows.ezsp.xncp import FirmwareFeatures
import bellows.multicast
Expand Down Expand Up @@ -299,9 +294,9 @@ async def load_network_info(self, *, load_devices=False) -> None:
can_burn_userdata_custom_eui64 = await ezsp.can_burn_userdata_custom_eui64()
can_rewrite_custom_eui64 = await ezsp.can_rewrite_custom_eui64()

try:
flow_control = await self._ezsp.xncp_get_flow_control_type()
except InvalidCommandError:
if FirmwareFeatures.FLOW_CONTROL_TYPE in ezsp._xncp_features:
flow_control = await ezsp.xncp_get_flow_control_type()
else:
flow_control = None

self.state.network_info = zigpy.state.NetworkInfo(
Expand Down
22 changes: 20 additions & 2 deletions tests/test_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from bellows.exception import ControllerError, EzspError
import bellows.ezsp as ezsp
from bellows.ezsp.v9.commands import GetTokenDataRsp
from bellows.ezsp.xncp import FirmwareFeatures
from bellows.ezsp.xncp import FirmwareFeatures, FlowControlType
import bellows.types
import bellows.types as t
import bellows.types.struct
Expand Down Expand Up @@ -1874,7 +1874,7 @@ def zigpy_backup() -> zigpy.backups.NetworkBackup:
metadata={
"ezsp": {
"stack_version": 8,
"flow_control": None,
"flow_control": "hardware",
"can_burn_userdata_custom_eui64": True,
"can_rewrite_custom_eui64": True,
}
Expand All @@ -1890,6 +1890,24 @@ async def test_load_network_info(
) -> None:
await app.load_network_info(load_devices=True)

zigpy_backup.network_info.metadata["ezsp"]["flow_control"] = None

assert app.state.node_info == zigpy_backup.node_info
assert app.state.network_info == zigpy_backup.network_info


async def test_load_network_info_xncp_flow_control(
app: ControllerApplication,
ieee: zigpy_t.EUI64,
zigpy_backup: zigpy.backups.NetworkBackup,
) -> None:
app._ezsp._xncp_features |= FirmwareFeatures.FLOW_CONTROL_TYPE
app._ezsp.xncp_get_flow_control_type = AsyncMock(
return_value=FlowControlType.HARDWARE
)

await app.load_network_info(load_devices=True)

assert app.state.node_info == zigpy_backup.node_info
assert app.state.network_info == zigpy_backup.network_info

Expand Down
10 changes: 8 additions & 2 deletions tests/test_xncp.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,20 +171,22 @@ async def test_xncp_get_flow_control_type(ezsp_f: EZSP) -> None:
t.EmberStatus.SUCCESS,
xncp.XncpCommand.from_payload(
xncp.GetFlowControlTypeRsp(
flow_control_type=xncp.FlowControlType.Hardware
flow_control_type=xncp.FlowControlType.HARDWARE
)
).serialize(),
]
)

assert await ezsp_f.xncp_get_flow_control_type() == xncp.FlowControlType.Hardware
assert await ezsp_f.xncp_get_flow_control_type() == xncp.FlowControlType.HARDWARE
assert customFrame.mock_calls == [
call(xncp.XncpCommand.from_payload(xncp.GetFlowControlTypeReq()).serialize())
]


async def test_xncp_get_xncp_features_fixes(ezsp_f: EZSP) -> None:
"""Test XNCP `get_xncp_features`, with fixes."""
ezsp_f._ezsp_version = 13

ezsp_f._mock_commands["customFrame"] = customFrame = AsyncMock(
return_value=[
t.EmberStatus.SUCCESS,
Expand Down Expand Up @@ -220,6 +222,10 @@ async def test_xncp_get_xncp_features_fixes(ezsp_f: EZSP) -> None:
| xncp.FirmwareFeatures.MEMBER_OF_ALL_GROUPS
)

# XNCP is ignored for older EmberZNet
ezsp_f._ezsp_version = 8
assert (await ezsp_f.get_xncp_features()) == xncp.FirmwareFeatures.NONE

assert customFrame.mock_calls == [
call(xncp.XncpCommand.from_payload(xncp.GetSupportedFeaturesReq()).serialize()),
call(xncp.XncpCommand.from_payload(xncp.GetSupportedFeaturesReq()).serialize()),
Expand Down

0 comments on commit b7f3ef9

Please sign in to comment.