Skip to content

Commit

Permalink
Merge pull request #99 from bdraco/protocol
Browse files Browse the repository at this point in the history
Use asyncio.Protocol/asyncio.DatagramTransport directly
  • Loading branch information
sbidy authored Feb 6, 2022
2 parents 6162384 + 430cb79 commit 60a59f2
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 66 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ The discovery works with an UDP Broadcast request and collects all bulbs in the

## Async I/O

For async I/O this component uses https://github.com/jsbronder/asyncio-dgram, which internally uses asyncio DatagramTransport, which allows completely non-blocking UDP transport
For async I/O this component uses python's built-in asyncio DatagramTransport, which allows completely non-blocking UDP transport

## Classes

Expand Down
144 changes: 82 additions & 62 deletions pywizlight/bulb.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@
import json
import logging
import socket
from time import time
from typing import Any, Dict, Tuple, Optional, Union, List

from asyncio_dgram.aio import connect as connect_dgram, DatagramStream
from typing import Any, Callable, Dict, List, Optional, Tuple, Union, cast

from pywizlight.bulblibrary import BulbClass, BulbType
from pywizlight.exceptions import (
Expand Down Expand Up @@ -263,10 +260,26 @@ def get_colortemp(self) -> Optional[int]:
return None


async def receiveUDPwithTimeout(stream: DatagramStream, timeout: float) -> bytes:
"""Get message with timeout value."""
data, remote_addr = await asyncio.wait_for(stream.recv(), timeout)
return data
class WizProtocol(asyncio.DatagramProtocol):
def __init__(
self,
on_response: Callable[[bytes, Tuple[str, int]], None],
) -> None:
"""Init the discovery protocol."""
self.transport = None
self.on_response = on_response

def datagram_received(self, data: bytes, addr: Tuple[str, int]) -> None:
"""Trigger on_response."""
self.on_response(data, addr)

def error_received(self, ex: Optional[Exception]) -> None:
"""Handle error."""
_LOGGER.debug("WizProtocol error: %s", ex)

def connection_lost(self, ex: Optional[Exception]) -> None:
"""The connection is lost."""
_LOGGER.debug("WizProtocol connection lost: %s", ex)


class wizlight:
Expand All @@ -289,6 +302,11 @@ def __init__(
self.bulbtype: Optional[BulbType] = None
self.whiteRange: Optional[List[float]] = None
self.extwhiteRange: Optional[List[float]] = None
self.transport: Optional[asyncio.DatagramTransport] = None
self.protocol: Optional[WizProtocol] = None
self.lock = asyncio.Lock()
self.loop = asyncio.get_event_loop()
self.response_future: Optional[asyncio.Future] = None
# check the state on init
if connect_on_init:
self._check_connection()
Expand All @@ -302,6 +320,23 @@ def status(self) -> Optional[bool]:

# ------------------ Non properties -------------- #

async def _ensure_connection(self) -> None:
"""Ensure we are connected."""
if self.transport:
return
transport_proto = await self.loop.create_datagram_endpoint(
lambda: WizProtocol(on_response=self._on_response),
remote_addr=(self.ip, self.port),
)
self.transport = cast(asyncio.DatagramTransport, transport_proto[0])
self.protocol = cast(WizProtocol, transport_proto[1])

def _on_response(self, message: bytes, addr: Tuple[str, int]) -> None:
"""Handle a response from the device."""
_LOGGER.debug("%s: << %s", self.ip, message)
if self.response_future and not self.response_future.done():
self.response_future.set_result(message)

def _check_connection(self) -> None:
"""Check the connection to the bulb."""
message = r'{"method":"getPilot","params":{}}'
Expand Down Expand Up @@ -471,65 +506,50 @@ async def lightSwitch(self) -> None:

async def sendUDPMessage(self, message: str) -> BulbResponse:
"""Send the UDP message to the bulb."""
connid = hex(int(time() * 10000000))[2:]
# overall 10 sec. for timeout
await self._ensure_connection()
timeout = 10
data = message.encode("utf-8")
send_interval = 0.5
max_send_datagrams = int(timeout / send_interval)

stream = None

try:
_LOGGER.debug(
f"[wizlight {self.ip}, connid {connid}] connecting to UDP port "
f"with send_interval of {send_interval} sec.."
)
stream = await asyncio.wait_for(
connect_dgram((self.ip, self.port)), timeout
)
_LOGGER.debug(
f"[wizlight {self.ip}, connid {connid}] listening for response datagram"
)

receive_task = asyncio.create_task(receiveUDPwithTimeout(stream, timeout))

for i in range(max_send_datagrams):
assert self.transport is not None
async with self.lock:
self.response_future = asyncio.Future()
for send in range(max_send_datagrams):
attempt = send + 1
_LOGGER.debug(
f"[wizlight {self.ip}, connid {connid}] sending command datagram {i}: {message}"
)
asyncio.create_task(stream.send(bytes(message, "utf-8")))
done, pending = await asyncio.wait(
[receive_task], timeout=send_interval
"%s: >> %s (%s/%s)", self.ip, data, attempt, max_send_datagrams
)
if done:
self.transport.sendto(data, (self.ip, self.port))
try:
response = await asyncio.wait_for(
asyncio.shield(self.response_future), timeout=send_interval
)
except asyncio.TimeoutError:
_LOGGER.debug(
"%s: Timed out waiting for response to %s (%s/%s)",
self.ip,
message,
attempt,
max_send_datagrams,
)
if attempt == max_send_datagrams:
raise WizLightTimeOutError("The request to the bulb timed out")
else:
break

await receive_task
data = receive_task.result()
resp = json.loads(response.decode())
if "error" not in resp:
return resp
elif resp["error"]["code"] == -32601:
raise WizLightMethodNotFound("Method not found; maybe older bulb FW?")
else:
raise WizLightConnectionError(f'Error recieved: {resp["error"]}')

async def async_close(self):
"""Close the transport."""
self.transport.close()

except asyncio.TimeoutError:
_LOGGER.debug(
f"[wizlight {self.ip}, connid {connid}] Failed to do UDP call(s) to wiz light - Timeout Error!",
exc_info=False,
)
raise WizLightTimeOutError("The request to the bulb timed out")
finally:
if stream:
stream.close()
else:
raise WizLightConnectionError(
"Bulb is offline or IP address is not correct."
)
if data is not None and len(data) is not None:
resp = dict(json.loads(data.decode()))
if "error" not in resp:
_LOGGER.debug(
f"[wizlight {self.ip}, connid {connid}] response received: {resp}"
)
return resp
elif resp["error"]["code"] == -32601:
raise WizLightMethodNotFound(
"Cant found the methode. Maybe older bulb FW?"
)
# exception should be created
raise ValueError(f"Can't read response from the bulb. Debug: {data!r}")
def __del__(self):
"""Close the connection when the object is destroyed."""
if self.transport:
self.loop.call_soon_threadsafe(self.transport.close)
6 changes: 5 additions & 1 deletion pywizlight/discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,11 @@ class BroadcastProtocol(asyncio.DatagramProtocol):
"""Protocol that sends an UDP broadcast message for bulb discovery."""

def __init__(
self, loop: AbstractEventLoop, registry: BulbRegistry, broadcast_address: str, future: Future
self,
loop: AbstractEventLoop,
registry: BulbRegistry,
broadcast_address: str,
future: Future,
) -> None:
"""Init discovery function."""
self.loop = loop
Expand Down
1 change: 0 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
asyncio-dgram
click
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,6 @@ def get_version() -> str:
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
],
install_requires=["asyncio-dgram", "click"],
install_requires=["click"],
python_requires=">=3.7",
)

0 comments on commit 60a59f2

Please sign in to comment.