Skip to content

Commit

Permalink
Add missing calls for ble sync as well as hook up callbacks, remove d…
Browse files Browse the repository at this point in the history
…ead code and general clean up, added test call to verify things work
  • Loading branch information
mikey0000 committed Aug 17, 2024
1 parent 41a24cc commit 21011b6
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 77 deletions.
2 changes: 1 addition & 1 deletion pymammotion/aliyun/dataclass/dev_by_account_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

@dataclass
class Device(DataClassORJSONMixin):
productModel: str
gmtModified: int
netType: str
nickName: str
Expand All @@ -26,6 +25,7 @@ class Device(DataClassORJSONMixin):
status: int
productImage: Optional[str] = None
categoryImage: Optional[str] = None
productModel: Optional[str] = None


@dataclass
Expand Down
4 changes: 2 additions & 2 deletions pymammotion/http/http.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from dataclasses import dataclass
from typing import Generic, Literal, TypeVar
from typing import Generic, Literal, TypeVar, Optional

from aiohttp import ClientSession
from mashumaro import DataClassDictMixin
Expand All @@ -25,7 +25,7 @@ class Response(DataClassDictMixin, Generic[DataT]):
class LoginResponseUserInformation(DataClassORJSONMixin):
areaCode: str
domainAbbreviation: str
email: str
email: Optional[str]
userId: str
userAccount: str
authType: str
Expand Down
73 changes: 56 additions & 17 deletions pymammotion/mammotion/devices/mammotion.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import logging
from abc import abstractmethod
from enum import Enum
from typing import Any, cast
from typing import Any, cast, Optional, Callable
from uuid import UUID
import base64

Expand Down Expand Up @@ -730,25 +730,69 @@ def __init__(
) -> None:
"""Initialize MammotionBaseCloudDevice."""
super().__init__()
self._ble_sync_task = None
self.is_ready = False
self._mqtt_client = mqtt_client
self.iot_id = iot_id
self.nick_name = nick_name
self._command_futures = {}
self._commands: MammotionCommand = MammotionCommand(device_name)
self.currentID = ""
self.on_ready_callback: Optional[Callable[[], None]] = None
self._operation_lock = asyncio.Lock()

def _on_mqtt_message(self, topic: str, payload: str) -> None:
self._mqtt_client.on_connected = self.on_connected
self._mqtt_client.on_disconnected = self.on_disconnected
self._mqtt_client.on_message = self._on_mqtt_message
self._mqtt_client.on_ready = self.on_ready
if self._mqtt_client.is_connected:
self._ble_sync()
self.run_periodic_sync_task()

# temporary for testing only
# self._start_sync_task = self.loop.call_later(30, lambda: asyncio.ensure_future(self.start_sync(0)))


def on_ready(self):
"""Callback for when MQTT is subscribed to events."""
if self.on_ready_callback:
self.on_ready_callback()

def on_connected(self):
"""Callback for when MQTT connects."""
self._ble_sync()
self.run_periodic_sync_task()

def on_disconnected(self):
"""Callback for when MQTT disconnects."""
pass

def _ble_sync(self):
command_bytes = self._commands.send_todev_ble_sync(3)
self._mqtt_client.get_cloud_client().send_cloud_command(self.iot_id, command_bytes)

async def run_periodic_sync_task(self) -> None:
"""Send ble sync to robot."""
try:
self._ble_sync()
finally:
self.schedule_ble_sync()

def schedule_ble_sync(self):
"""Periodically sync to keep connection alive."""
if self._mqtt_client is not None and self._mqtt_client.is_connected:
self._ble_sync_task = self.loop.call_later(
160, lambda: asyncio.ensure_future(self.run_periodic_sync_task())
)

def _on_mqtt_message(self, topic: str, payload: str, iot_id: str) -> None:
"""Handle incoming MQTT messages."""
_LOGGER.debug("MQTT message received on topic %s: %s", topic, payload)

json_str = json.dumps(payload)
payload = json.loads(json_str)

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(self._handle_mqtt_message(topic, payload))
loop.close()
self._handle_mqtt_message(topic, payload)

async def _send_command(self, key: str, retry: int | None = None) -> bytes | None:
"""Send command to device via MQTT and read response."""
Expand Down Expand Up @@ -848,29 +892,24 @@ def _extract_encoded_message(self, payload: dict) -> str:
_LOGGER.error("Error extracting encoded message. Payload: %s", payload)
return ""

async def _parse_mqtt_response(self, topic: str, payload: dict) -> None:
def _parse_mqtt_response(self, topic: str, payload: dict) -> None:
"""Parse the MQTT response."""
if topic.endswith("/app/down/thing/events"):
print (f"Thing event received")
_LOGGER.debug(f"Thing event received")
event = ThingEventMessage.from_dicts(payload)
params = event.params
if params.identifier == "device_protobuf_msg_event":
print (f"Protobuf reveice")
_LOGGER.debug(f"Protobuf event")
binary_data = base64.b64decode(params.value.get('content', ''))
self._update_raw_data(cast(bytes, binary_data))
new_msg = LubaMsg().parse(cast(bytes, binary_data))
if self._notify_future and not self._notify_future.done():
self._notify_future.set_result(new_msg)
await self._state_manager.notification(new_msg)
asyncio.run(self._state_manager.notification(new_msg))

async def _handle_mqtt_message(self, topic: str, payload: dict) -> None:
def _handle_mqtt_message(self, topic: str, payload: dict) -> None:
"""Async handler for incoming MQTT messages."""
await self._parse_mqtt_response(topic=topic, payload=payload)
# message_id = self._extract_message_id(payload)
# print (f"Received message id: {self.currentID}")
# message_id = self.currentID
# if message_id and message_id in self._command_futures:
# print (f"Start parsing response")
self._parse_mqtt_response(topic=topic, payload=payload)


async def _disconnect(self):
Expand Down
63 changes: 16 additions & 47 deletions pymammotion/mqtt/mammotion_mqtt.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
"""MammotionMQTT."""

import asyncio
import hashlib
import hmac
import json
import logging
from logging import getLogger
from typing import Callable, Optional, cast
from typing import Callable, Optional, cast, Awaitable

from linkkit.linkkit import LinkKit
from paho.mqtt.client import Client, MQTTMessage, MQTTv311, connack_string
Expand All @@ -22,7 +22,6 @@
class MammotionMQTT:
"""MQTT client for pymammotion."""

_cloud_client = None

def __init__(
self,
Expand All @@ -35,8 +34,10 @@ def __init__(
):
"""Create instance of MammotionMQTT."""
super().__init__()

self.is_connected = False
self.is_ready = False
self.on_connected: Optional[Callable[[], None]] = None
self.on_ready: Optional[Callable[[], None]] = None
self.on_error: Optional[Callable[[str], None]] = None
self.on_disconnected: Optional[Callable[[], None]] = None
self.on_message: Optional[Callable[[str, str, str], None]] = None
Expand Down Expand Up @@ -76,22 +77,6 @@ def __init__(
# self._mqtt_host = "public.itls.eu-central-1.aliyuncs.com"
self._mqtt_host = f"{self._product_key}.iot-as-mqtt.{region_id}.aliyuncs.com"

self._client = Client(
client_id=self._mqtt_client_id,
protocol=MQTTv311,
)
self._client.on_message = self._on_message
self._client.on_connect = self._on_connect
self._client.on_disconnect = self._on_disconnect
self._client.username_pw_set(self._mqtt_username, self._mqtt_password)
self._client.enable_logger(logger.getChild("paho"))

# region Connection handling
def connect(self):
"""Connect to MQTT Server."""
logger.info("Connecting...")
self._client.connect(host=self._mqtt_host)
self._client.loop_forever()

def connect_async(self):
"""Connect async to MQTT Server."""
Expand All @@ -106,8 +91,8 @@ def disconnect(self):
"""Disconnect from MQTT Server."""
logger.info("Disconnecting...")
self._linkkit_client.disconnect()
self._client.disconnect()
self._client.loop_stop()
# self._client.disconnect()
# self._client.loop_stop()

def _thing_on_thing_enable(self, user_data):
"""Is called when Thing is enabled."""
Expand Down Expand Up @@ -139,6 +124,10 @@ def _thing_on_thing_enable(self, user_data):
),
)


if self.on_ready:
self.is_ready = True
self.on_ready()
# self._linkkit_client.query_ota_firmware()
# command = MammotionCommand(device_name="Luba")
# self._cloud_client.send_cloud_command(command.get_report_cfg())
Expand All @@ -158,39 +147,19 @@ def _thing_on_topic_message(self, topic, payload, qos, user_data):

def _thing_on_connect(self, session_flag, rc, user_data):
"""Is called on thing connect."""
self.is_connected = True
if self.on_connected is not None:
self.on_connected()
logger.debug("on_connect, session_flag:%d, rc:%d", session_flag, rc)

# self._linkkit_client.subscribe_topic(f"/sys/{self._product_key}/{self._device_name}/#")

def _on_connect(self, _client, _userdata, _flags: dict, rc: int):
"""Is called when on connect."""
if rc == 0:
logger.debug("Connected")
self._client.subscribe(f"/sys/{self._product_key}/{self._device_name}/#")
self._client.subscribe(f"/sys/{self._product_key}/{self._device_name}/app/down/account/bind_reply")

self._client.publish(
f"/sys/{self._product_key}/{self._device_name}/app/up/account/bind",
json.dumps(
{
"id": "msgid1",
"version": "1.0",
"request": {"clientId": self._mqtt_username},
"params": {"iotToken": self._iot_token},
}
),
)

if self.on_connected:
self.on_connected()
else:
logger.error("Could not connect %s", connack_string(rc))
if self.on_error:
self.on_error(connack_string(rc))

def _on_disconnect(self, _client, _userdata, rc: int):
"""Is called on disconnect."""
logger.info("Disconnected")
self.is_connected = False
self.is_ready = False
logger.debug(rc)
if self.on_disconnected:
self.on_disconnected()
Expand Down
12 changes: 2 additions & 10 deletions tests/login_and_mqtt_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ async def sync_status_and_map(cloud_device: MammotionBaseCloudDevice):
await asyncio.sleep(1)
await cloud_device.start_sync(0)
await asyncio.sleep(2)
await cloud_device.start_map_sync()
# await cloud_device.start_map_sync()

while(True):
print(cloud_device.luba_msg)
Expand All @@ -52,15 +52,13 @@ async def sync_status_and_map(cloud_device: MammotionBaseCloudDevice):
asyncio.set_event_loop(event_loop)
cloud_client = event_loop.run_until_complete(run())



_mammotion_mqtt = MammotionMQTT(region_id=cloud_client._region.data.regionId,
product_key=cloud_client._aep_response.data.productKey,
device_name=cloud_client._aep_response.data.deviceName,
device_secret=cloud_client._aep_response.data.deviceSecret, iot_token=cloud_client._session_by_authcode_response.data.iotToken, client_id=cloud_client._client_id)


_mammotion_mqtt._cloud_client = cloud_client
#mammotion.connect() blocks further calls
_mammotion_mqtt.connect_async()

_devices_list = []
Expand All @@ -74,11 +72,5 @@ async def sync_status_and_map(cloud_device: MammotionBaseCloudDevice):
)
_devices_list.append(dev)

#Assign callback based on iotId
_mammotion_mqtt.on_message = lambda topic, payload, iot_id: [
device._on_mqtt_message(topic, payload) for device in _devices_list if device.iot_id == iot_id
]

sync = event_loop.run_until_complete(sync_status_and_map(_devices_list[0]))

event_loop.run_forever()

0 comments on commit 21011b6

Please sign in to comment.