Skip to content

Commit

Permalink
Switched back to mixed async-threading approach for mqtt client library
Browse files Browse the repository at this point in the history
Added new header attributes to messages
Fixed bug affecting sub-device message dispatching
  • Loading branch information
albertogeniola committed Jan 17, 2022
1 parent e0c78fa commit 0c76f41
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 85 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,9 @@ Anyway, feel free to contribute via donations!

## Changelog
#### 0.4.4.3
- Improved async_loop to avoid flooding when IDLE.
- Improved mqtt client and async: switched back to mixed async/threading approach.
- Added triggerSrc and uuid attributes to header
- Fixed a bug affecting message dispatching to sub-devices

<details>
<summary>Older</summary>
Expand Down
4 changes: 3 additions & 1 deletion docs/meross-protocol.rst
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ plug, in sequence.
"timestamp": {{TIMESTAMP}},
"sign": "{{SIGNATURE}}",
"method": "SET",
"namespace": "Appliance.Config.Key"
"namespace": "Appliance.Config.Key",
"triggerSrc": "Android",
"uuid": "{{TARGET_DEVICE_UUID}}"
},
"payload": {
"key": {
Expand Down
3 changes: 2 additions & 1 deletion meross_iot/controller/mixins/hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ async def async_update(self, timeout: Optional[float] = None, *args, **kwargs) -
if target_device is None:
_LOGGER.warning(f"Received data for subdevice {target_device}, which has not been registered with this"
f"hub yet. This update will be ignored.")
await target_device.async_handle_subdevice_notification(namespace=Namespace.HUB_MTS100_ALL, data=d)
else:
await target_device.async_handle_subdevice_notification(namespace=Namespace.HUB_MTS100_ALL, data=d)
except Exception as e:
_LOGGER.exception("Error occurred during subdevice update")

Expand Down
104 changes: 27 additions & 77 deletions meross_iot/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
build_meross_device_from_known_types,
)
from meross_iot.http_api import MerossHttpClient
from meross_iot.model.constants import DEFAULT_MQTT_PORT
from meross_iot.model.constants import DEFAULT_MQTT_PORT, DEFAULT_COMMAND_TIMEOUT
from meross_iot.model.enums import Namespace, OnlineStatus
from meross_iot.model.exception import (
CommandTimeoutError,
Expand Down Expand Up @@ -70,8 +70,7 @@ class MqttConnectionStatus(Enum):
class MerossManager(object):
"""
This class implements a full-features Meross Client, which provides device discovery and registry.
*Note*: The manager must be initialized before invoking any of its discovery/registry methods. As soon as
you create a manager, you shoul call :meth:`async_init`!
*Note*: The manager must be initialized before invoking any of its discovery/registry methods.
"""

def __init__(
Expand Down Expand Up @@ -99,8 +98,6 @@ def __init__(
"""

# Store local attributes
self.__started = False
self.__stop_requested = False
self._http_client = http_client
self._cloud_creds = self._http_client.cloud_credentials
self._auto_reconnect = auto_reconnect
Expand All @@ -111,7 +108,6 @@ def __init__(
self._push_coros = []
self._mqtt_skip_validation = mqtt_skip_cert_validation
self._mqtt_clients = {}
self._mqtt_clients_status = {}
self._mqtt_connected_and_subscribed = {}

# Default proxy setup
Expand All @@ -131,8 +127,6 @@ def __init__(
)
self._user_topic = build_client_user_topic(user_id=self._cloud_creds.user_id)

# Keep track of re-connection errors into a dictionary.
self._mqtt_connection_errors = {}

def _get_client_from_domain_port(self, client: mqtt.Client) -> Tuple[Optional[str], Optional[int]]:
for k, v in self._mqtt_clients.items():
Expand All @@ -158,16 +152,17 @@ async def _async_get_create_mqtt_client(self, domain: str, port: int) -> mqtt.Cl
_LOGGER.info("Proxy configuration set for newly created client")
client.proxy_set(proxy_type=self._proxy_type, proxy_addr=self._proxy_addr, proxy_port=self._proxy_port)
self._mqtt_clients[dict_key] = client

# Init the client
if not client.is_connected():
conn_evt = self._mqtt_connected_and_subscribed.get(dict_key) # type: asyncio.Event
if conn_evt is None:
conn_evt = asyncio.Event()
client.connect(host=domain, port=port, keepalive=30)
self._mqtt_connected_and_subscribed[dict_key] = conn_evt

# Start the client looper
client.loop_start()
# Wait for the client to connect
await conn_evt.wait()

return client

def _new_mqtt_client(self) -> mqtt.Client:
Expand Down Expand Up @@ -229,16 +224,14 @@ def unregister_push_notification_handler_coroutine(
)

def close(self):
if self.__stop_requested:
_LOGGER.warning("Manager already stopping/stopped")
return

_LOGGER.info("Manager stop requested.")
_LOGGER.debug("Canceling pending futures...")
for f in _PENDING_FUTURES:
if not f.cancelled():
f.cancel()
self.__stop_requested = True
# Disconnect from all mqtt clients
for client in self._mqtt_clients.values():
client.disconnect()

def find_devices(
self,
Expand Down Expand Up @@ -284,44 +277,6 @@ def find_devices(
online_status=online_status,
)

async def _async_loop(self) -> None:
"""
Looper logic
"""
_LOGGER.info("Starting Manager Looper")
while not self.__stop_requested:
processed_loops = 0
try:
# Loop over a copy of the dictionary to prevent dictionary changes while running
for key, client in {k: v for k, v in self._mqtt_clients.items() if v}.items():
# Process 100ms every loop
await self._loop.run_in_executor(None, client.loop, 0.1)
processed_loops += 1

# Handle reconnection for this client, if needed
client_status = self._mqtt_clients_status.get(client)
if client_status == MqttConnectionStatus.DISCONNECTED and self._auto_reconnect:
# handle reconnection autonomously on connection drops.
_LOGGER.info("Reconnecting (auto_reconnect flag has been set)")
try:
self._mqtt_clients_status[client] = MqttConnectionStatus.CONNECTING
client.reconnect()
except Exception as e:
_LOGGER.exception("Error occurred while (re)connecting to mqtt server %s", key)
self._mqtt_clients_status[client] = MqttConnectionStatus.DISCONNECTED
self._mqtt_connection_errors[client] = self._mqtt_connection_errors.get(client, 0) + 1

# Wait some time before retrying.
time_to_wait = max(pow(2, self._mqtt_connection_errors[client]), 30.0)
await asyncio.sleep(time_to_wait)
except Exception as e:
_LOGGER.exception("Error occurred while executing looper")
finally:
# In case there is nothing to do, wait a bit
if processed_loops < 1:
await asyncio.sleep(.1)
_LOGGER.debug("Stop flag raised, ending loop")

async def async_device_discovery(
self,
update_subdevice_status: bool = True,
Expand Down Expand Up @@ -443,18 +398,10 @@ async def _async_enroll_new_http_subdev(

async def async_init(self):
"""
Starts the looper logic
@deprecated
Ignored, signature left for backward compatibility
"""
if self.__started:
raise RuntimeError("Async started was already called")
if self.__stop_requested:
raise RuntimeError("This managed has been marked for stop and is still stopping")

def reset_started_flag(_fut):
self.__started = False

_LOGGER.debug("Starting looper task")
self._mqtt_looper_task = self._loop.create_task(self._async_loop()).add_done_callback(reset_started_flag)
pass

async def _async_enroll_new_http_dev(
self, device_info: HttpDeviceInfo
Expand Down Expand Up @@ -507,8 +454,6 @@ async def _async_enroll_new_http_dev(
def _on_connect(self, client: mqtt.Client, userdata, rc, other):
# NOTE! This method is called by the paho-mqtt thread, thus any invocation to the
# asyncio platform must be scheduled via `self._loop.call_soon_threadsafe()` method.
self._mqtt_clients_status[client] = MqttConnectionStatus.CONNECTED

topics = [(self._user_topic, 1), (self._client_response_topic, 1)]

_LOGGER.debug(f"Connected with result code {rc}")
Expand All @@ -519,21 +464,19 @@ def _on_connect(self, client: mqtt.Client, userdata, rc, other):
if result != mqtt.MQTT_ERR_SUCCESS:
_LOGGER.error("Failed to subscribe to topics %s", str(topics))

# Reset reconnection errors
self._mqtt_connection_errors[client] = 0

def _on_disconnect(self, client: mqtt.Client, userdata, rc):
# NOTE! This method is called by the paho-mqtt thread, thus any invocation to the
# asyncio platform must be scheduled via `self._loop.call_soon_threadsafe()` method.
self._mqtt_clients_status[client] = MqttConnectionStatus.DISCONNECTED

_LOGGER.info("Disconnection detected. Reason: %s" % str(rc))

# When a disconnection occurs, we need to set "unavailable" status.
asyncio.run_coroutine_threadsafe(
self._notify_connection_drop(), loop=self._loop
)

conn_evt = self._mqtt_connected_and_subscribed.get(userdata) # type: asyncio.Event
conn_evt.clear()

def _on_unsubscribe(self):
# NOTE! This method is called by the paho-mqtt thread, thus any invocation to the
# asyncio platform must be scheduled via `self._loop.call_soon_threadsafe()` method.
Expand Down Expand Up @@ -804,7 +747,7 @@ async def async_execute_cmd(
method: str,
namespace: Namespace,
payload: dict,
timeout: float = 10.0,
timeout: float = DEFAULT_COMMAND_TIMEOUT,
):
"""
This method sends a command to the MQTT Meross broker.
Expand Down Expand Up @@ -836,7 +779,7 @@ async def async_execute_cmd_client(self,
timeout: float = 10.0):
# Send the message over the network
# Build the mqtt message we will send to the broker
message, message_id = self._build_mqtt_message(method, namespace, payload)
message, message_id = self._build_mqtt_message(method, namespace, payload, destination_device_uuid)

# Create a future and perform the send/waiting to a task
fut = self._loop.create_future()
Expand All @@ -853,6 +796,9 @@ async def async_execute_cmd_client(self,
async def _async_send_and_wait_ack(
self, client: mqtt.Client, future: Future, target_device_uuid: str, message: bytes, timeout: float,
):
if not client.is_connected():
raise Exception("MQTT client not connected.")

client.publish(
topic=build_device_request_topic(target_device_uuid), payload=message
)
Expand All @@ -871,13 +817,14 @@ async def _notify_connection_drop(self):
pushn = OnlinePushNotification(originating_device_uuid=d.uuid, raw_data={'online': {'status': -1}})
await self._handle_and_dispatch_push_notification(pushn)

def _build_mqtt_message(self, method: str, namespace: Namespace, payload: dict):
def _build_mqtt_message(self, method: str, namespace: Namespace, payload: dict, destination_device_uuid: str):
"""
Sends a message to the Meross MQTT broker, respecting the protocol payload.
:param method:
:param namespace:
:param payload:
:param destination_device_uuid:
:return:
"""
Expand Down Expand Up @@ -908,11 +855,14 @@ def _build_mqtt_message(self, method: str, namespace: Namespace, payload: dict):
"namespace": namespace.value, # Example: "Appliance.System.All",
"payloadVersion": 1,
"sign": signature, # Example: "b4236ac6fb399e70c3d61e98fcb68b74",
"timestamp": timestamp
"timestamp": timestamp,
"triggerSrc": "Android",
"uuid": destination_device_uuid
},
"payload": payload,
}
strdata = json.dumps(data)

strdata = json.dumps(data,separators=(',', ':'))
return strdata.encode("utf-8"), messageId

def set_proxy(self, proxy_type, proxy_addr, proxy_port):
Expand Down
3 changes: 0 additions & 3 deletions meross_iot/model/constants.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
LONG_TIMEOUT = 30.0 # For wifi scan
SHORT_TIMEOUT = 10.0 # For any other command

DEFAULT_MEROSS_HTTP_API = "https://iot.meross.com"
DEFAULT_MQTT_HOST = "mqtt.meross.com"
DEFAULT_MQTT_PORT = 443
Expand Down
4 changes: 2 additions & 2 deletions tests/test_garage.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,15 @@ async def test_open_close(self):
await garage.async_close()
else:
await garage.async_open()
await asyncio.sleep(30)
await asyncio.sleep(40)
self.assertEqual(garage.get_is_open(), not is_open)

is_open = garage.get_is_open()
if is_open:
await garage.async_close()
else:
await garage.async_open()
await asyncio.sleep(30)
await asyncio.sleep(40)
self.assertEqual(garage.get_is_open(), not is_open)

async def tearDownAsync(self):
Expand Down

0 comments on commit 0c76f41

Please sign in to comment.