From 0189a3f4f8cb869c2a102a0006e58d75032f4127 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Mon, 9 Sep 2024 11:32:19 -0400 Subject: [PATCH 01/20] Log frames that are not ACKed --- bellows/ash.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/bellows/ash.py b/bellows/ash.py index 66c0e516..c45d9233 100644 --- a/bellows/ash.py +++ b/bellows/ash.py @@ -631,7 +631,9 @@ async def _send_data_frame(self, frame: AshFrame) -> None: await ack_future except NotAcked: _LOGGER.debug( - "NCP responded with NAK. Retrying (attempt %d)", attempt + 1 + "NCP responded with NAK to %r. Retrying (attempt %d)", + frame, + attempt + 1, ) # For timing purposes, NAK can be treated as an ACK @@ -650,9 +652,10 @@ async def _send_data_frame(self, frame: AshFrame) -> None: raise except asyncio.TimeoutError: _LOGGER.debug( - "No ACK received in %0.2fs (attempt %d)", + "No ACK received in %0.2fs (attempt %d) for %r", self._t_rx_ack, attempt + 1, + frame, ) # If a DATA frame acknowledgement is not received within the # current timeout value, then t_rx_ack is doubled. From 2e39c048e9952b48099191f93fb1f85fbcc1cf11 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Mon, 9 Sep 2024 11:32:49 -0400 Subject: [PATCH 02/20] Move command locking and prioritization into the protocol handler --- bellows/ezsp/__init__.py | 23 +------------- bellows/ezsp/protocol.py | 68 ++++++++++++++++++++++++++++++++++------ 2 files changed, 59 insertions(+), 32 deletions(-) diff --git a/bellows/ezsp/__init__.py b/bellows/ezsp/__init__.py index 273add3b..1ff511d3 100644 --- a/bellows/ezsp/__init__.py +++ b/bellows/ezsp/__init__.py @@ -12,8 +12,6 @@ from typing import Any, Callable, Generator import urllib.parse -from zigpy.datastructures import PriorityDynamicBoundedSemaphore - if sys.version_info[:2] < (3, 11): from async_timeout import timeout as asyncio_timeout # pragma: no cover else: @@ -41,8 +39,6 @@ NETWORK_OPS_TIMEOUT = 10 NETWORK_COORDINATOR_STARTUP_RESET_WAIT = 1 -MAX_COMMAND_CONCURRENCY = 1 - class EZSP: _BY_VERSION = { @@ -66,7 +62,6 @@ def __init__(self, device_config: dict): self._ezsp_version = v4.EZSPv4.VERSION self._gw = None self._protocol = None - self._send_sem = PriorityDynamicBoundedSemaphore(value=MAX_COMMAND_CONCURRENCY) self._stack_status_listeners: collections.defaultdict[ t.sl_Status, list[asyncio.Future] @@ -190,21 +185,6 @@ def close(self): self._gw.close() self._gw = None - def _get_command_priority(self, name: str) -> int: - return { - # Deprioritize any commands that send packets - "set_source_route": -1, - "setExtendedTimeout": -1, - "send_unicast": -1, - "send_multicast": -1, - "send_broadcast": -1, - # Prioritize watchdog commands - "nop": 999, - "readCounters": 999, - "readAndClearCounters": 999, - "getValue": 999, - }.get(name, 0) - async def _command(self, name: str, *args: Any, **kwargs: Any) -> Any: command = getattr(self._protocol, name) @@ -217,8 +197,7 @@ async def _command(self, name: str, *args: Any, **kwargs: Any) -> Any: ) raise EzspError("EZSP is not running") - async with self._send_sem(priority=self._get_command_priority(name)): - return await command(*args, **kwargs) + return await command(*args, **kwargs) async def _list_command( self, name, item_frames, completion_frame, spos, *args, **kwargs diff --git a/bellows/ezsp/protocol.py b/bellows/ezsp/protocol.py index f9eca74e..a1af868a 100644 --- a/bellows/ezsp/protocol.py +++ b/bellows/ezsp/protocol.py @@ -6,6 +6,7 @@ import functools import logging import sys +import time from typing import TYPE_CHECKING, Any, AsyncGenerator, Callable, Iterable import zigpy.state @@ -15,6 +16,8 @@ else: from asyncio import timeout as asyncio_timeout # pragma: no cover +from zigpy.datastructures import PriorityDynamicBoundedSemaphore + from bellows.config import CONF_EZSP_POLICIES from bellows.exception import InvalidCommandError import bellows.types as t @@ -23,7 +26,9 @@ from bellows.uart import Gateway LOGGER = logging.getLogger(__name__) + EZSP_CMD_TIMEOUT = 6 # Sum of all ASH retry timeouts: 0.4 + 0.8 + 1.6 + 3.2 +MAX_COMMAND_CONCURRENCY = 1 class ProtocolHandler(abc.ABC): @@ -42,6 +47,9 @@ def __init__(self, cb_handler: Callable, gateway: Gateway) -> None: for name, (cmd_id, tx_schema, rx_schema) in self.COMMANDS.items() } self.tc_policy = 0 + self._send_semaphore = PriorityDynamicBoundedSemaphore( + value=MAX_COMMAND_CONCURRENCY + ) # Cached by `set_extended_timeout` so subsequent calls are a little faster self._address_table_size: int | None = None @@ -65,18 +73,58 @@ def _ezsp_frame_rx(self, data: bytes) -> tuple[int, int, bytes]: def _ezsp_frame_tx(self, name: str) -> bytes: """Serialize the named frame.""" + def _get_command_priority(self, name: str) -> int: + return { + # Deprioritize any commands that send packets + "setSourceRoute": -1, + "setExtendedTimeout": -1, + "sendUnicast": -1, + "sendMulticast": -1, + "sendBroadcast": -1, + # Prioritize watchdog commands + "nop": 999, + "readCounters": 999, + "readAndClearCounters": 999, + "getValue": 999, + }.get(name, 0) + async def command(self, name, *args, **kwargs) -> Any: """Serialize command and send it.""" - LOGGER.debug("Sending command %s: %s %s", name, args, kwargs) - data = self._ezsp_frame(name, *args, **kwargs) - cmd_id, _, rx_schema = self.COMMANDS[name] - future = asyncio.get_running_loop().create_future() - self._awaiting[self._seq] = (cmd_id, rx_schema, future) - self._seq = (self._seq + 1) % 256 - - async with asyncio_timeout(EZSP_CMD_TIMEOUT): - await self._gw.send_data(data) - return await future + delay_time = 0 + was_delayed = False + + if self._send_semaphore.locked(): + LOGGER.debug( + "Send semaphore is locked, delaying before sending %s(%r, %r)", + name, + args, + kwargs, + ) + delay_time = time.monotonic() + was_delayed = True + + async with self._send_semaphore(priority=self._get_command_priority(name)): + if was_delayed: + LOGGER.debug( + "Sending command %s: %s %s after %0.2fs delay", + name, + args, + kwargs, + time.monotonic() - delay_time, + ) + else: + LOGGER.debug("Sending command %s: %s %s", name, args, kwargs) + + data = self._ezsp_frame(name, *args, **kwargs) + cmd_id, _, rx_schema = self.COMMANDS[name] + + future = asyncio.get_running_loop().create_future() + self._awaiting[self._seq] = (cmd_id, rx_schema, future) + self._seq = (self._seq + 1) % 256 + + async with asyncio_timeout(EZSP_CMD_TIMEOUT): + await self._gw.send_data(data) + return await future async def update_policies(self, policy_config: dict) -> None: """Set up the policies for what the NCP should do.""" From fba2c6fd69cdf22d330fc0116903f0f9c4ea536b Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Mon, 9 Sep 2024 11:47:30 -0400 Subject: [PATCH 03/20] Rename `delay_time` to `send_time` --- bellows/ezsp/protocol.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/bellows/ezsp/protocol.py b/bellows/ezsp/protocol.py index a1af868a..88fd3e00 100644 --- a/bellows/ezsp/protocol.py +++ b/bellows/ezsp/protocol.py @@ -90,27 +90,28 @@ def _get_command_priority(self, name: str) -> int: async def command(self, name, *args, **kwargs) -> Any: """Serialize command and send it.""" - delay_time = 0 - was_delayed = False + delayed = False + send_time = None if self._send_semaphore.locked(): + delayed = True + send_time = time.monotonic() + LOGGER.debug( "Send semaphore is locked, delaying before sending %s(%r, %r)", name, args, kwargs, ) - delay_time = time.monotonic() - was_delayed = True async with self._send_semaphore(priority=self._get_command_priority(name)): - if was_delayed: + if delayed: LOGGER.debug( "Sending command %s: %s %s after %0.2fs delay", name, args, kwargs, - time.monotonic() - delay_time, + time.monotonic() - send_time, ) else: LOGGER.debug("Sending command %s: %s %s", name, args, kwargs) From b243e940fe2b4c318d4f4d46d1b685170454115f Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Tue, 10 Sep 2024 13:25:27 -0400 Subject: [PATCH 04/20] Cancel all pending futures when the connection is lost --- bellows/ash.py | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/bellows/ash.py b/bellows/ash.py index c45d9233..e3aa949e 100644 --- a/bellows/ash.py +++ b/bellows/ash.py @@ -368,12 +368,22 @@ def connection_made(self, transport): self._ezsp_protocol.connection_made(self) def connection_lost(self, exc): + self._cancel_pending_data_frames() self._ezsp_protocol.connection_lost(exc) def eof_received(self): self._ezsp_protocol.eof_received() + def _cancel_pending_data_frames( + self, exc: BaseException = RuntimeError("Connection has been closed") + ): + for fut in self._pending_data_frames.values(): + if not fut.done(): + fut.set_exception(exc) + def close(self): + self._cancel_pending_data_frames() + if self._transport is not None: self._transport.close() @@ -539,11 +549,7 @@ def ack_frame_received(self, frame: AckFrame) -> None: self._handle_ack(frame) def nak_frame_received(self, frame: NakFrame) -> None: - err = NotAcked(frame=frame) - - for fut in self._pending_data_frames.values(): - if not fut.done(): - fut.set_exception(err) + self._cancel_pending_data_frames(NotAcked(frame=frame)) def rst_frame_received(self, frame: RstFrame) -> None: self._ncp_reset_code = None @@ -558,12 +564,7 @@ def error_frame_received(self, frame: ErrorFrame) -> None: self._enter_failed_state(self._ncp_reset_code) def _enter_failed_state(self, reset_code: t.NcpResetCode) -> None: - exc = NcpFailure(code=reset_code) - - for fut in self._pending_data_frames.values(): - if not fut.done(): - fut.set_exception(exc) - + self._cancel_pending_data_frames(NcpFailure(code=reset_code)) self._ezsp_protocol.reset_received(reset_code) def _write_frame( From ad866bb3b0a6f97462f298f0f9dd82879d62de61 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Tue, 10 Sep 2024 13:55:01 -0400 Subject: [PATCH 05/20] Increase ACK_TIMEOUTS from 4 to 5 --- bellows/ash.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bellows/ash.py b/bellows/ash.py index e3aa949e..50864e2c 100644 --- a/bellows/ash.py +++ b/bellows/ash.py @@ -62,7 +62,7 @@ class Reserved(enum.IntEnum): # Maximum number of consecutive timeouts allowed while waiting to receive an ACK before # going to the FAILED state. The value 0 prevents the NCP from entering the error state # due to timeouts. -ACK_TIMEOUTS = 4 +ACK_TIMEOUTS = 5 def generate_random_sequence(length: int) -> bytes: From 8068e6148ce015083bd065354d4c22cd2c8967b7 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Tue, 10 Sep 2024 14:01:48 -0400 Subject: [PATCH 06/20] Increase the EZSP command timeout to 10s --- bellows/ezsp/protocol.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bellows/ezsp/protocol.py b/bellows/ezsp/protocol.py index 88fd3e00..8c26c33e 100644 --- a/bellows/ezsp/protocol.py +++ b/bellows/ezsp/protocol.py @@ -27,7 +27,7 @@ LOGGER = logging.getLogger(__name__) -EZSP_CMD_TIMEOUT = 6 # Sum of all ASH retry timeouts: 0.4 + 0.8 + 1.6 + 3.2 +EZSP_CMD_TIMEOUT = 10 MAX_COMMAND_CONCURRENCY = 1 From 3f61c3079720462346c0ff4ea737a62bcfa57056 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Tue, 10 Sep 2024 14:02:01 -0400 Subject: [PATCH 07/20] Do not count the ASH send time in the EZSP command timeout --- bellows/ezsp/protocol.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/bellows/ezsp/protocol.py b/bellows/ezsp/protocol.py index 8c26c33e..7dedb8a8 100644 --- a/bellows/ezsp/protocol.py +++ b/bellows/ezsp/protocol.py @@ -123,8 +123,9 @@ async def command(self, name, *args, **kwargs) -> Any: self._awaiting[self._seq] = (cmd_id, rx_schema, future) self._seq = (self._seq + 1) % 256 + await self._gw.send_data(data) + async with asyncio_timeout(EZSP_CMD_TIMEOUT): - await self._gw.send_data(data) return await future async def update_policies(self, policy_config: dict) -> None: From e72670acb2e2ad3af3e874696f80b61dbac10667 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Tue, 10 Sep 2024 16:09:05 -0400 Subject: [PATCH 08/20] Set the NCP state to `FAILED` when we soft fail --- bellows/ash.py | 1 + 1 file changed, 1 insertion(+) diff --git a/bellows/ash.py b/bellows/ash.py index 50864e2c..914b0a83 100644 --- a/bellows/ash.py +++ b/bellows/ash.py @@ -564,6 +564,7 @@ def error_frame_received(self, frame: ErrorFrame) -> None: self._enter_failed_state(self._ncp_reset_code) def _enter_failed_state(self, reset_code: t.NcpResetCode) -> None: + self._ncp_state == NcpState.FAILED self._cancel_pending_data_frames(NcpFailure(code=reset_code)) self._ezsp_protocol.reset_received(reset_code) From 6e88e990e93b2a1a6c05d2cd2b653a4052883a4b Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Tue, 10 Sep 2024 16:15:34 -0400 Subject: [PATCH 09/20] Always handle ACK information, even if the frame is invalid --- bellows/ash.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/bellows/ash.py b/bellows/ash.py index 914b0a83..06156eb0 100644 --- a/bellows/ash.py +++ b/bellows/ash.py @@ -489,7 +489,7 @@ def data_received(self, data: bytes) -> None: f"Unexpected reserved byte found: 0x{reserved_byte:02X}" ) # pragma: no cover - def _handle_ack(self, frame: DataFrame | AckFrame) -> None: + def _handle_ack(self, frame: DataFrame | AckFrame | NakFrame) -> None: # Note that ackNum is the number of the next frame the receiver expects and it # is one greater than the last frame received. for ack_num_offset in range(-TX_K, 0): @@ -504,14 +504,19 @@ def _handle_ack(self, frame: DataFrame | AckFrame) -> None: def frame_received(self, frame: AshFrame) -> None: _LOGGER.debug("Received frame %r", frame) + # If a frame has ACK information (DATA, ACK, or NAK), it should be used even if + # the frame is out of sequence or invalid if isinstance(frame, DataFrame): + self._handle_ack(frame) self.data_frame_received(frame) - elif isinstance(frame, RStackFrame): - self.rstack_frame_received(frame) elif isinstance(frame, AckFrame): + self._handle_ack(frame) self.ack_frame_received(frame) elif isinstance(frame, NakFrame): + self._handle_ack(frame) self.nak_frame_received(frame) + elif isinstance(frame, RStackFrame): + self.rstack_frame_received(frame) elif isinstance(frame, RstFrame): self.rst_frame_received(frame) elif isinstance(frame, ErrorFrame): @@ -523,7 +528,6 @@ def data_frame_received(self, frame: DataFrame) -> None: # The Host may not piggyback acknowledgments and should promptly send an ACK # frame when it receives a DATA frame. if frame.frm_num == self._rx_seq: - self._handle_ack(frame) self._rx_seq = (frame.frm_num + 1) % 8 self._write_frame(AckFrame(res=0, ncp_ready=0, ack_num=self._rx_seq)) @@ -546,7 +550,7 @@ def rstack_frame_received(self, frame: RStackFrame) -> None: self._ezsp_protocol.reset_received(frame.reset_code) def ack_frame_received(self, frame: AckFrame) -> None: - self._handle_ack(frame) + pass def nak_frame_received(self, frame: NakFrame) -> None: self._cancel_pending_data_frames(NotAcked(frame=frame)) From d384755ebf9fd1c7bfba5fdf3dd6568ff792a434 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Tue, 10 Sep 2024 16:16:14 -0400 Subject: [PATCH 10/20] Remove stale constants from `Gateway` --- bellows/uart.py | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/bellows/uart.py b/bellows/uart.py index ee2aea08..d48838d5 100644 --- a/bellows/uart.py +++ b/bellows/uart.py @@ -19,21 +19,6 @@ class Gateway(asyncio.Protocol): - FLAG = b"\x7E" # Marks end of frame - ESCAPE = b"\x7D" - XON = b"\x11" # Resume transmission - XOFF = b"\x13" # Stop transmission - SUBSTITUTE = b"\x18" - CANCEL = b"\x1A" # Terminates a frame in progress - STUFF = 0x20 - RANDOMIZE_START = 0x42 - RANDOMIZE_SEQ = 0xB8 - - RESERVED = FLAG + ESCAPE + XON + XOFF + SUBSTITUTE + CANCEL - - class Terminator: - pass - def __init__(self, application, connected_future=None, connection_done_future=None): self._application = application From 4dbe60a9f71791b17e7af851de50ad6fa71f2e84 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Tue, 10 Sep 2024 16:23:59 -0400 Subject: [PATCH 11/20] Guard to make sure we can't send data while the transport is closing --- bellows/ash.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/bellows/ash.py b/bellows/ash.py index 06156eb0..2f9c8212 100644 --- a/bellows/ash.py +++ b/bellows/ash.py @@ -579,6 +579,9 @@ def _write_frame( prefix: tuple[Reserved] = (), suffix: tuple[Reserved] = (Reserved.FLAG,), ) -> None: + if self._transport is None or self._transport.is_closing(): + raise NcpFailure("Transport is closed, cannot send frame") + if _LOGGER.isEnabledFor(logging.DEBUG): prefix_str = "".join([f"{r.name} + " for r in prefix]) suffix_str = "".join([f" + {r.name}" for r in suffix]) From ffd813ea04173c8dfa2285882e64a2ef469e9a57 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Tue, 10 Sep 2024 16:59:20 -0400 Subject: [PATCH 12/20] Fix unit tests --- tests/test_ash.py | 13 +++++++++++-- tests/test_uart.py | 1 + 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/tests/test_ash.py b/tests/test_ash.py index 9b479c2d..348f98b1 100644 --- a/tests/test_ash.py +++ b/tests/test_ash.py @@ -86,14 +86,21 @@ def send_reset(self) -> None: class FakeTransport: - def __init__(self, receiver): + def __init__(self, receiver) -> None: self.receiver = receiver - self.paused = False + self.paused: bool = False + self.closing: bool = False def write(self, data: bytes) -> None: if not self.paused: self.receiver.data_received(data) + def close(self) -> None: + self.closing = True + + def is_closing(self) -> bool: + return self.closing + class FakeTransportOneByteAtATime(FakeTransport): def write(self, data: bytes) -> None: @@ -317,6 +324,7 @@ async def test_sequence(): loop = asyncio.get_running_loop() ezsp = MagicMock() transport = MagicMock() + transport.is_closing.return_value = False protocol = ash.AshProtocol(ezsp) protocol._write_frame = MagicMock(wraps=protocol._write_frame) @@ -408,6 +416,7 @@ async def test_ash_protocol_startup(caplog): ezsp = MagicMock() transport = MagicMock() + transport.is_closing.return_value = False protocol = ash.AshProtocol(ezsp) protocol._write_frame = MagicMock(wraps=protocol._write_frame) diff --git a/tests/test_uart.py b/tests/test_uart.py index aadacdf8..fdd404fb 100644 --- a/tests/test_uart.py +++ b/tests/test_uart.py @@ -188,6 +188,7 @@ def test_eof_received(gw): async def test_connection_lost_reset_error_propagation(monkeypatch): app = MagicMock() transport = MagicMock() + transport.is_closing.return_value = False async def mockconnect(loop, protocol_factory, **kwargs): protocol = protocol_factory() From 1cf8130b410326b54b065f81e915cad2c5fe33f2 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Tue, 10 Sep 2024 23:41:32 -0400 Subject: [PATCH 13/20] Send a NAK frame on any parsing error --- bellows/ash.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/bellows/ash.py b/bellows/ash.py index 2f9c8212..9d506fb2 100644 --- a/bellows/ash.py +++ b/bellows/ash.py @@ -4,6 +4,7 @@ import asyncio import binascii from collections.abc import Coroutine +import contextlib import dataclasses import enum import logging @@ -409,7 +410,9 @@ def _unstuff_bytes(data: bytes) -> bytes: for c in data: if escaped: byte = c ^ 0b00100000 - assert byte in RESERVED_BYTES + if byte not in RESERVED_BYTES: + raise ParsingError(f"Invalid escaped byte: 0x{byte:02X}") + out.append(byte) escaped = False elif c == Reserved.ESCAPE: @@ -457,14 +460,19 @@ def data_received(self, data: bytes) -> None: if not frame_bytes: continue - data = self._unstuff_bytes(frame_bytes) - try: + data = self._unstuff_bytes(frame_bytes) frame = parse_frame(data) except Exception: _LOGGER.debug( "Failed to parse frame %r", frame_bytes, exc_info=True ) + + with contextlib.suppress(NcpFailure): + self._write_frame( + NakFrame(res=0, ncp_ready=0, ack_num=self._rx_seq), + prefix=(Reserved.CANCEL,), + ) else: self.frame_received(frame) elif reserved_byte == Reserved.CANCEL: From f58c320f0efe46de9a81fcd8c88720fe07c494cb Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Wed, 11 Sep 2024 12:48:42 -0400 Subject: [PATCH 14/20] Reset the random seed every ASH test invocation --- tests/test_ash.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/tests/test_ash.py b/tests/test_ash.py index 348f98b1..627c9fcf 100644 --- a/tests/test_ash.py +++ b/tests/test_ash.py @@ -11,11 +11,6 @@ import bellows.types as t -@pytest.fixture(autouse=True, scope="function") -def random_seed(): - random.seed(0) - - class AshNcpProtocol(ash.AshProtocol): def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) @@ -502,6 +497,7 @@ async def test_ash_protocol_startup(caplog): ], ) async def test_ash_end_to_end(transport_cls: type[FakeTransport]) -> None: + random.seed(2) asyncio.get_running_loop() host_ezsp = MagicMock() From 8b54e139fc7e8d4d17241a028dd8fbfa52a842e9 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Wed, 11 Sep 2024 12:49:49 -0400 Subject: [PATCH 15/20] Remove unnecessary `asyncio.get_running_loop()` --- tests/test_ash.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_ash.py b/tests/test_ash.py index 627c9fcf..f97a8437 100644 --- a/tests/test_ash.py +++ b/tests/test_ash.py @@ -498,7 +498,6 @@ async def test_ash_protocol_startup(caplog): ) async def test_ash_end_to_end(transport_cls: type[FakeTransport]) -> None: random.seed(2) - asyncio.get_running_loop() host_ezsp = MagicMock() ncp_ezsp = MagicMock() From 7fcc6008f3394e9a45abff86ef76a96cb9cc0441 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Wed, 11 Sep 2024 13:51:22 -0400 Subject: [PATCH 16/20] Add a few more unit tests for coverage --- tests/test_ash.py | 4 ++++ tests/test_ezsp_v4.py | 27 ++++++++++++++++++++++++++- 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/tests/test_ash.py b/tests/test_ash.py index f97a8437..0f3037e5 100644 --- a/tests/test_ash.py +++ b/tests/test_ash.py @@ -177,6 +177,10 @@ def test_stuffing(): assert ash.AshProtocol._stuff_bytes(b"\x7F") == b"\x7F" assert ash.AshProtocol._unstuff_bytes(b"\x7F") == b"\x7F" + with pytest.raises(ash.ParsingError): + # AB is not a sequence of bytes that can be unescaped + assert ash.AshProtocol._unstuff_bytes(b"\x7D\xAB") + def test_pseudo_random_data_sequence(): assert ash.PSEUDO_RANDOM_DATA_SEQUENCE.startswith(b"\x42\x21\xA8\x54\x2A") diff --git a/tests/test_ezsp_v4.py b/tests/test_ezsp_v4.py index 1fec0187..fc31d251 100644 --- a/tests/test_ezsp_v4.py +++ b/tests/test_ezsp_v4.py @@ -1,5 +1,6 @@ +import asyncio import logging -from unittest.mock import MagicMock, call, patch +from unittest.mock import AsyncMock, MagicMock, call, patch import pytest import zigpy.state @@ -515,3 +516,27 @@ async def test_set_extended_timeout_bad_table_size(ezsp_f) -> None: assert ezsp_f.getConfigurationValue.mock_calls == [ call(t.EzspConfigId.CONFIG_ADDRESS_TABLE_SIZE) ] + + +async def test_send_concurrency(ezsp_f, caplog) -> None: + async def send_data(data: bytes) -> None: + await asyncio.sleep(0.1) + + rsp_data = bytearray(data) + rsp_data[1] |= 0x80 + + ezsp_f.__call__(rsp_data) + + ezsp_f._gw.send_data = AsyncMock(side_effect=send_data) + + with caplog.at_level(logging.DEBUG): + await asyncio.gather( + ezsp_f.command("nop"), + ezsp_f.command("nop"), + ezsp_f.command("nop"), + ezsp_f.command("nop"), + ) + + # All but the first queue up + assert caplog.text.count("Send semaphore is locked, delaying before sending") == 3 + assert caplog.text.count("s delay") == 3 From ddc876d38e2db5db04beb9908b789278ce46229f Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Wed, 11 Sep 2024 15:51:29 -0400 Subject: [PATCH 17/20] Null out the transport when we are done with it --- bellows/ash.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/bellows/ash.py b/bellows/ash.py index 9d506fb2..deab78ad 100644 --- a/bellows/ash.py +++ b/bellows/ash.py @@ -369,6 +369,7 @@ def connection_made(self, transport): self._ezsp_protocol.connection_made(self) def connection_lost(self, exc): + self._transport = None self._cancel_pending_data_frames() self._ezsp_protocol.connection_lost(exc) @@ -387,6 +388,7 @@ def close(self): if self._transport is not None: self._transport.close() + self._transport = None @staticmethod def _stuff_bytes(data: bytes) -> bytes: From fb601d5d29e93cff64dee86b4acdf92dc89b0982 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Fri, 13 Sep 2024 10:34:05 -0400 Subject: [PATCH 18/20] Fix typo when setting ncp_state --- bellows/ash.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bellows/ash.py b/bellows/ash.py index deab78ad..d5c1761c 100644 --- a/bellows/ash.py +++ b/bellows/ash.py @@ -578,7 +578,7 @@ def error_frame_received(self, frame: ErrorFrame) -> None: self._enter_failed_state(self._ncp_reset_code) def _enter_failed_state(self, reset_code: t.NcpResetCode) -> None: - self._ncp_state == NcpState.FAILED + self._ncp_state = NcpState.FAILED self._cancel_pending_data_frames(NcpFailure(code=reset_code)) self._ezsp_protocol.reset_received(reset_code) From 02ef7a1e6791195d3c639739ea77fe52db18a7e8 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Fri, 13 Sep 2024 10:39:08 -0400 Subject: [PATCH 19/20] Fix typo with buffer truncation --- bellows/ash.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bellows/ash.py b/bellows/ash.py index d5c1761c..1349e36d 100644 --- a/bellows/ash.py +++ b/bellows/ash.py @@ -432,7 +432,7 @@ def data_received(self, data: bytes) -> None: _LOGGER.debug( "Truncating buffer to %s bytes, it is growing too fast", MAX_BUFFER_SIZE ) - self._buffer = self._buffer[:MAX_BUFFER_SIZE] + self._buffer = self._buffer[-MAX_BUFFER_SIZE:] while self._buffer: if self._discarding_until_next_flag: From 9f6f286254a5f1bc816ee02a03fd4a9b28088dd4 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Fri, 13 Sep 2024 10:42:40 -0400 Subject: [PATCH 20/20] Fix unit test to account for retries after NCP failure --- tests/test_ash.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/test_ash.py b/tests/test_ash.py index 0f3037e5..cb7c356a 100644 --- a/tests/test_ash.py +++ b/tests/test_ash.py @@ -557,8 +557,9 @@ async def test_ash_end_to_end(transport_cls: type[FakeTransport]) -> None: send_task = asyncio.create_task(host.send_data(b"ncp NAKing")) await asyncio.sleep(host._t_rx_ack) - # It'll still succeed - await send_task + # The NCP is in a failed state, we can't send it + with pytest.raises(ash.NcpFailure): + await send_task ncp_ezsp.data_received.reset_mock() host_ezsp.data_received.reset_mock()