Skip to content

Commit

Permalink
replace paho-mqtt with its async wrapper aiomqtt (to prepare for upgr…
Browse files Browse the repository at this point in the history
…ading PySwitchbot)
  • Loading branch information
fphammerle committed Nov 4, 2023
1 parent 9952b2b commit 4208221
Show file tree
Hide file tree
Showing 13 changed files with 747 additions and 537 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- declare compatibility with `python3.11`

### Changed
- replaced [paho-mqtt](https://github.com/eclipse/paho.mqtt.python)
with its async wrapper [aiomqtt](https://github.com/sbtinstruments/aiomqtt)

### Removed
- compatibility with `python3.7`

Expand Down
1 change: 1 addition & 0 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ black = "*"
mypy = "*"
pylint = "*"
pytest = "*"
pytest-asyncio = "*"
pytest-cov = "*"

# python3.10 compatibility
Expand Down
19 changes: 18 additions & 1 deletion Pipfile.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,9 @@
],
entry_points={"console_scripts": ["switchbot-mqtt = switchbot_mqtt._cli:_main"]},
# >=3.6 variable type hints, f-strings, typing.Collection & * to force keyword-only arguments
# >=3.7 postponed evaluation of type annotations (PEP563) & dataclass
python_requires=">=3.8", # python<3.8 untested
# >=3.7 postponed evaluation of type annotations (PEP563) & asyncio.run
# >=3.8 unittest.mock.AsyncMock
python_requires=">=3.8",
install_requires=[
# >=1.3.0 for btle.BTLEManagementError (could be replaced with BTLEException)
# >=0.1.0 for btle.helperExe
Expand All @@ -83,7 +84,7 @@
# >=0.10.0 for SwitchbotCurtain.{update,get_position}
# >=0.9.0 for SwitchbotCurtain.set_position
"PySwitchbot>=0.10.0,<0.13",
"paho-mqtt<2",
"aiomqtt<2",
],
setup_requires=["setuptools_scm"],
tests_require=["pytest"],
Expand Down
130 changes: 80 additions & 50 deletions switchbot_mqtt/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@

import logging
import socket
import ssl
import typing

import paho.mqtt.client
import aiomqtt

from switchbot_mqtt._actors import _ButtonAutomator, _CurtainMotor
from switchbot_mqtt._actors.base import _MQTTCallbackUserdata

_LOGGER = logging.getLogger(__name__)

Expand All @@ -34,34 +34,54 @@
_MQTT_LAST_WILL_PAYLOAD = "offline"


def _mqtt_on_connect(
mqtt_client: paho.mqtt.client.Client,
userdata: _MQTTCallbackUserdata,
flags: typing.Dict[str, int],
return_code: int,
async def _listen(
*,
mqtt_client: aiomqtt.Client,
topic_callbacks: typing.Iterable[typing.Tuple[str, typing.Callable]],
mqtt_topic_prefix: str,
retry_count: int,
device_passwords: typing.Dict[str, str],
fetch_device_info: bool,
) -> None:
# pylint: disable=unused-argument; callback
# https://github.com/eclipse/paho.mqtt.python/blob/v1.5.0/src/paho/mqtt/client.py#L441
assert return_code == 0, return_code # connection accepted
mqtt_broker_host, mqtt_broker_port, *_ = mqtt_client.socket().getpeername()
# https://www.rfc-editor.org/rfc/rfc5952#section-6
_LOGGER.debug(
"connected to MQTT broker %s:%d",
f"[{mqtt_broker_host}]"
if mqtt_client.socket().family == socket.AF_INET6
else mqtt_broker_host,
mqtt_broker_port,
)
mqtt_client.publish(
topic=userdata.mqtt_topic_prefix + _MQTT_AVAILABILITY_TOPIC,
payload=_MQTT_BIRTH_PAYLOAD,
retain=True,
)
_ButtonAutomator.mqtt_subscribe(mqtt_client=mqtt_client, settings=userdata)
_CurtainMotor.mqtt_subscribe(mqtt_client=mqtt_client, settings=userdata)
async with mqtt_client.messages() as messages:
await mqtt_client.publish(
topic=mqtt_topic_prefix + _MQTT_AVAILABILITY_TOPIC,
payload=_MQTT_BIRTH_PAYLOAD,
retain=True,
)
async for message in messages:
for topic, callback in topic_callbacks:
if message.topic.matches(topic):
await callback(
mqtt_client=mqtt_client,
message=message,
mqtt_topic_prefix=mqtt_topic_prefix,
retry_count=retry_count,
device_passwords=device_passwords,
fetch_device_info=fetch_device_info,
)


def _log_mqtt_connected(mqtt_client: aiomqtt.Client) -> None:
if _LOGGER.getEffectiveLevel() <= logging.DEBUG:
mqtt_socket = (
# aiomqtt neither exposes instance of paho.mqtt.client.Client nor socket publicly.
# level condition to avoid accessing protected `mqtt_client._client` in production.
# pylint: disable=protected-access
mqtt_client._client.socket()
)
(mqtt_broker_host, mqtt_broker_port, *_) = mqtt_socket.getpeername()
# https://github.com/sbtinstruments/aiomqtt/blob/v1.2.1/aiomqtt/client.py#L1089
_LOGGER.debug(
"connected to MQTT broker %s:%d",
f"[{mqtt_broker_host}]"
if mqtt_socket.family == socket.AF_INET6
else mqtt_broker_host,
mqtt_broker_port,
)


def _run( # pylint: disable=too-many-arguments
async def _run( # pylint: disable=too-many-arguments
*,
mqtt_host: str,
mqtt_port: int,
Expand All @@ -73,33 +93,43 @@ def _run( # pylint: disable=too-many-arguments
device_passwords: typing.Dict[str, str],
fetch_device_info: bool,
) -> None:
# https://pypi.org/project/paho-mqtt/
mqtt_client = paho.mqtt.client.Client(
userdata=_MQTTCallbackUserdata(
retry_count=retry_count,
device_passwords=device_passwords,
fetch_device_info=fetch_device_info,
mqtt_topic_prefix=mqtt_topic_prefix,
)
)
mqtt_client.on_connect = _mqtt_on_connect
_LOGGER.info(
"connecting to MQTT broker %s:%d (TLS %s)",
mqtt_host,
mqtt_port,
"disabled" if mqtt_disable_tls else "enabled",
)
if not mqtt_disable_tls:
mqtt_client.tls_set(ca_certs=None) # enable tls trusting default system certs
if mqtt_username:
mqtt_client.username_pw_set(username=mqtt_username, password=mqtt_password)
elif mqtt_password:
if mqtt_password is not None and mqtt_username is None:
raise ValueError("Missing MQTT username")
mqtt_client.will_set(
topic=mqtt_topic_prefix + _MQTT_AVAILABILITY_TOPIC,
payload=_MQTT_LAST_WILL_PAYLOAD,
retain=True,
)
mqtt_client.connect(host=mqtt_host, port=mqtt_port)
# https://github.com/eclipse/paho.mqtt.python/blob/master/src/paho/mqtt/client.py#L1740
mqtt_client.loop_forever()
async with aiomqtt.Client( # raises aiomqtt.MqttError
hostname=mqtt_host,
port=mqtt_port,
# > The settings [...] usually represent a higher security level than
# > when calling the SSLContext constructor directly.
# https://web.archive.org/web/20230714183106/https://docs.python.org/3/library/ssl.html
tls_context=None if mqtt_disable_tls else ssl.create_default_context(),
username=None if mqtt_username is None else mqtt_username,
password=None if mqtt_password is None else mqtt_password,
will=aiomqtt.Will(
topic=mqtt_topic_prefix + _MQTT_AVAILABILITY_TOPIC,
payload=_MQTT_LAST_WILL_PAYLOAD,
retain=True,
),
) as mqtt_client:
_log_mqtt_connected(mqtt_client=mqtt_client)
topic_callbacks: typing.List[typing.Tuple[str, typing.Callable]] = []
for actor_class in (_ButtonAutomator, _CurtainMotor):
async for topic, callback in actor_class.mqtt_subscribe(
mqtt_client=mqtt_client,
mqtt_topic_prefix=mqtt_topic_prefix,
fetch_device_info=fetch_device_info,
):
topic_callbacks.append((topic, callback))
await _listen(
mqtt_client=mqtt_client,
topic_callbacks=topic_callbacks,
mqtt_topic_prefix=mqtt_topic_prefix,
retry_count=retry_count,
device_passwords=device_passwords,
fetch_device_info=fetch_device_info,
)
Loading

0 comments on commit 4208221

Please sign in to comment.