From 430cb79bcc93b90c1bd77480586de63a723c4bc5 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Sat, 5 Feb 2022 16:49:24 -0600 Subject: [PATCH] Use asyncio.Protocol/asyncio.DatagramTransport directly - Fixes log spam when the device is online --- README.md | 2 +- pywizlight/bulb.py | 144 +++++++++++++++++++++++----------------- pywizlight/discovery.py | 6 +- requirements.txt | 1 - setup.py | 2 +- 5 files changed, 89 insertions(+), 66 deletions(-) diff --git a/README.md b/README.md index 333b8b1..ec0780b 100644 --- a/README.md +++ b/README.md @@ -203,7 +203,7 @@ The discovery works with an UDP Broadcast request and collects all bulbs in the ## Async I/O -For async I/O this component uses https://github.com/jsbronder/asyncio-dgram, which internally uses asyncio DatagramTransport, which allows completely non-blocking UDP transport +For async I/O this component uses python's built-in asyncio DatagramTransport, which allows completely non-blocking UDP transport ## Classes diff --git a/pywizlight/bulb.py b/pywizlight/bulb.py index 85e3cf4..9e654e8 100755 --- a/pywizlight/bulb.py +++ b/pywizlight/bulb.py @@ -3,10 +3,7 @@ import json import logging import socket -from time import time -from typing import Any, Dict, Tuple, Optional, Union, List - -from asyncio_dgram.aio import connect as connect_dgram, DatagramStream +from typing import Any, Callable, Dict, List, Optional, Tuple, Union, cast from pywizlight.bulblibrary import BulbClass, BulbType from pywizlight.exceptions import ( @@ -263,10 +260,26 @@ def get_colortemp(self) -> Optional[int]: return None -async def receiveUDPwithTimeout(stream: DatagramStream, timeout: float) -> bytes: - """Get message with timeout value.""" - data, remote_addr = await asyncio.wait_for(stream.recv(), timeout) - return data +class WizProtocol(asyncio.DatagramProtocol): + def __init__( + self, + on_response: Callable[[bytes, Tuple[str, int]], None], + ) -> None: + """Init the discovery protocol.""" + self.transport = None + self.on_response = on_response + + def datagram_received(self, data: bytes, addr: Tuple[str, int]) -> None: + """Trigger on_response.""" + self.on_response(data, addr) + + def error_received(self, ex: Optional[Exception]) -> None: + """Handle error.""" + _LOGGER.debug("WizProtocol error: %s", ex) + + def connection_lost(self, ex: Optional[Exception]) -> None: + """The connection is lost.""" + _LOGGER.debug("WizProtocol connection lost: %s", ex) class wizlight: @@ -289,6 +302,11 @@ def __init__( self.bulbtype: Optional[BulbType] = None self.whiteRange: Optional[List[float]] = None self.extwhiteRange: Optional[List[float]] = None + self.transport: Optional[asyncio.DatagramTransport] = None + self.protocol: Optional[WizProtocol] = None + self.lock = asyncio.Lock() + self.loop = asyncio.get_event_loop() + self.response_future: Optional[asyncio.Future] = None # check the state on init if connect_on_init: self._check_connection() @@ -302,6 +320,23 @@ def status(self) -> Optional[bool]: # ------------------ Non properties -------------- # + async def _ensure_connection(self) -> None: + """Ensure we are connected.""" + if self.transport: + return + transport_proto = await self.loop.create_datagram_endpoint( + lambda: WizProtocol(on_response=self._on_response), + remote_addr=(self.ip, self.port), + ) + self.transport = cast(asyncio.DatagramTransport, transport_proto[0]) + self.protocol = cast(WizProtocol, transport_proto[1]) + + def _on_response(self, message: bytes, addr: Tuple[str, int]) -> None: + """Handle a response from the device.""" + _LOGGER.debug("%s: << %s", self.ip, message) + if self.response_future and not self.response_future.done(): + self.response_future.set_result(message) + def _check_connection(self) -> None: """Check the connection to the bulb.""" message = r'{"method":"getPilot","params":{}}' @@ -471,65 +506,50 @@ async def lightSwitch(self) -> None: async def sendUDPMessage(self, message: str) -> BulbResponse: """Send the UDP message to the bulb.""" - connid = hex(int(time() * 10000000))[2:] - # overall 10 sec. for timeout + await self._ensure_connection() timeout = 10 + data = message.encode("utf-8") send_interval = 0.5 max_send_datagrams = int(timeout / send_interval) - - stream = None - - try: - _LOGGER.debug( - f"[wizlight {self.ip}, connid {connid}] connecting to UDP port " - f"with send_interval of {send_interval} sec.." - ) - stream = await asyncio.wait_for( - connect_dgram((self.ip, self.port)), timeout - ) - _LOGGER.debug( - f"[wizlight {self.ip}, connid {connid}] listening for response datagram" - ) - - receive_task = asyncio.create_task(receiveUDPwithTimeout(stream, timeout)) - - for i in range(max_send_datagrams): + assert self.transport is not None + async with self.lock: + self.response_future = asyncio.Future() + for send in range(max_send_datagrams): + attempt = send + 1 _LOGGER.debug( - f"[wizlight {self.ip}, connid {connid}] sending command datagram {i}: {message}" - ) - asyncio.create_task(stream.send(bytes(message, "utf-8"))) - done, pending = await asyncio.wait( - [receive_task], timeout=send_interval + "%s: >> %s (%s/%s)", self.ip, data, attempt, max_send_datagrams ) - if done: + self.transport.sendto(data, (self.ip, self.port)) + try: + response = await asyncio.wait_for( + asyncio.shield(self.response_future), timeout=send_interval + ) + except asyncio.TimeoutError: + _LOGGER.debug( + "%s: Timed out waiting for response to %s (%s/%s)", + self.ip, + message, + attempt, + max_send_datagrams, + ) + if attempt == max_send_datagrams: + raise WizLightTimeOutError("The request to the bulb timed out") + else: break - await receive_task - data = receive_task.result() + resp = json.loads(response.decode()) + if "error" not in resp: + return resp + elif resp["error"]["code"] == -32601: + raise WizLightMethodNotFound("Method not found; maybe older bulb FW?") + else: + raise WizLightConnectionError(f'Error recieved: {resp["error"]}') + + async def async_close(self): + """Close the transport.""" + self.transport.close() - except asyncio.TimeoutError: - _LOGGER.debug( - f"[wizlight {self.ip}, connid {connid}] Failed to do UDP call(s) to wiz light - Timeout Error!", - exc_info=False, - ) - raise WizLightTimeOutError("The request to the bulb timed out") - finally: - if stream: - stream.close() - else: - raise WizLightConnectionError( - "Bulb is offline or IP address is not correct." - ) - if data is not None and len(data) is not None: - resp = dict(json.loads(data.decode())) - if "error" not in resp: - _LOGGER.debug( - f"[wizlight {self.ip}, connid {connid}] response received: {resp}" - ) - return resp - elif resp["error"]["code"] == -32601: - raise WizLightMethodNotFound( - "Cant found the methode. Maybe older bulb FW?" - ) - # exception should be created - raise ValueError(f"Can't read response from the bulb. Debug: {data!r}") + def __del__(self): + """Close the connection when the object is destroyed.""" + if self.transport: + self.loop.call_soon_threadsafe(self.transport.close) diff --git a/pywizlight/discovery.py b/pywizlight/discovery.py index 840c88b..b72135c 100644 --- a/pywizlight/discovery.py +++ b/pywizlight/discovery.py @@ -51,7 +51,11 @@ class BroadcastProtocol(asyncio.DatagramProtocol): """Protocol that sends an UDP broadcast message for bulb discovery.""" def __init__( - self, loop: AbstractEventLoop, registry: BulbRegistry, broadcast_address: str, future: Future + self, + loop: AbstractEventLoop, + registry: BulbRegistry, + broadcast_address: str, + future: Future, ) -> None: """Init discovery function.""" self.loop = loop diff --git a/requirements.txt b/requirements.txt index d4cd9ef..b98f660 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1 @@ -asyncio-dgram click \ No newline at end of file diff --git a/setup.py b/setup.py index 1a7fc33..976e6be 100644 --- a/setup.py +++ b/setup.py @@ -34,6 +34,6 @@ def get_version() -> str: "License :: OSI Approved :: MIT License", "Operating System :: OS Independent", ], - install_requires=["asyncio-dgram", "click"], + install_requires=["click"], python_requires=">=3.7", )