Skip to content

Commit

Permalink
- remove blocking imports
Browse files Browse the repository at this point in the history
- refactor mqtt client
  • Loading branch information
tolwi committed Sep 5, 2024
1 parent 8f05d35 commit 8d161da
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 37 deletions.
2 changes: 1 addition & 1 deletion custom_components/ecoflow_cloud/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry):
device_option.power_step)
device.configure(hass, device_option.refresh_period, device_option.diagnostic_mode)

api_client.start()
await hass.async_add_executor_job(api_client.start)
hass.data[ECOFLOW_DOMAIN][entry.entry_id] = api_client
await hass.config_entries.async_forward_entry_setups(entry, _PLATFORMS)
await api_client.quota_all(None)
Expand Down
87 changes: 53 additions & 34 deletions custom_components/ecoflow_cloud/api/ecoflow_mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
import random
import ssl
import time
from _socket import SocketType
from typing import Any

from homeassistant.core import callback

from custom_components.ecoflow_cloud.api import EcoflowMqttInfo
from custom_components.ecoflow_cloud.devices import BaseDevice
import paho.mqtt.client as mqtt_client

_LOGGER = logging.getLogger(__name__)

Expand All @@ -17,26 +19,33 @@ class EcoflowMQTTClient:
def __init__(self, mqtt_info: EcoflowMqttInfo, devices: dict[str, BaseDevice]):

from ..devices import BaseDevice

self.connected = False
self.__mqtt_info = mqtt_info
self.__devices: dict[str, BaseDevice] = devices
self.__client: mqtt_client.Client = mqtt_client.Client(client_id=self.__mqtt_info.client_id, reconnect_on_failure=True)

from homeassistant.components.mqtt.async_client import AsyncMQTTClient
self.__client: AsyncMQTTClient = AsyncMQTTClient(
client_id=self.__mqtt_info.client_id,
reconnect_on_failure=True,
clean_session=True)

# self.__client._connect_timeout = 15.0
self.__client.setup()
self.__client.username_pw_set(self.__mqtt_info.username, self.__mqtt_info.password)
self.__client.tls_set(certfile=None, keyfile=None, cert_reqs=ssl.CERT_REQUIRED)
self.__client.tls_insecure_set(False)
self.__client.on_connect = self.on_connect
self.__client.on_disconnect = self.on_disconnect
self.__client.on_message = self.on_message
# self.__client.on_socket_close = self.on_socket_close
self.__client.on_connect = self._on_connect
self.__client.on_disconnect = self._on_disconnect
self.__client.on_message = self._on_message
self.__client.on_socket_close = self._on_socket_close

_LOGGER.info(
f"Connecting to MQTT Broker {self.__mqtt_info.url}:{self.__mqtt_info.port} with client id {self.__mqtt_info.client_id} and username {self.__mqtt_info.username}")
self.__client.connect_async(self.__mqtt_info.url, self.__mqtt_info.port)
self.__client.connect(self.__mqtt_info.url, self.__mqtt_info.port, keepalive=15)
self.__client.loop_start()

def is_connected(self):
return self.connected
return self.__client.is_connected()

def reconnect(self) -> bool:
try:
Expand All @@ -49,22 +58,22 @@ def reconnect(self) -> bool:
_LOGGER.error(e)
return False

def on_connect(self, client, userdata, flags, rc):
@callback
def _on_socket_close(self, client, userdata: Any, sock: SocketType) -> None:
_LOGGER.error(f"Unexpected MQTT Socket disconnection : {str(sock)}")

@callback
def _on_connect(self, client, userdata, flags, rc):
if rc == 0:
self.connected = True
for (sn, device) in self.__devices.items():
_LOGGER.debug(f"Add Topics for {sn}")
for topic in device.device_info.topics():
self.__client.subscribe(topic, 1)
_LOGGER.info(f"Subscribed to MQTT topics {topic}")
target_topics = [(topic, 1) for topic in self.__target_topics()]
self.__client.subscribe(target_topics)
_LOGGER.info(f"Subscribed to MQTT topics {target_topics}")
else:
self.__log_with_reason("connect", client, userdata, rc)

#
# def on_socket_close(self, client, userdata, socket):
# _LOGGER.error(f"Unexpected MQTT Socket disconnection : {str(socket)}")

def on_disconnect(self, client, userdata, rc):
@callback
def _on_disconnect(self, client, userdata, rc):
if not self.connected:
# from homeassistant/components/mqtt/client.py
# This function is re-entrant and may be called multiple times
Expand All @@ -73,17 +82,33 @@ def on_disconnect(self, client, userdata, rc):
self.connected = False
if rc != 0:
self.__log_with_reason("disconnect", client, userdata, rc)
time.sleep(15)
time.sleep(5)

def on_message(self, client, userdata, message):
@callback
def _on_message(self, client, userdata, message):
try:
for (sn, device) in self.__devices.items():
if device.update_data(message.payload, message.topic):
_LOGGER.debug(f"Message for {sn} and Topic {message.topic}")
except UnicodeDecodeError as error:
_LOGGER.error(f"UnicodeDecodeError: {error}. Ignoring message and waiting for the next one.")

def send_get_message(self, device_sn: str, command: dict):
payload = self.__prepare_payload(command)
self.__send(self.__devices[device_sn].device_info.get_topic, json.dumps(payload))

def send_set_message(self, device_sn: str, mqtt_state: dict[str, Any], command: dict):
self.__devices[device_sn].data.update_to_target_state(mqtt_state)
payload = self.__prepare_payload(command)
self.__send(self.__devices[device_sn].device_info.set_topic, json.dumps(payload))

def stop(self):
self.__client.unsubscribe(self.__target_topics())
self.__client.loop_stop()
self.__client.disconnect()

def __log_with_reason(self, action: str, client, userdata, rc):
import paho.mqtt.client as mqtt_client
_LOGGER.error(f"MQTT {action}: {mqtt_client.error_string(rc)} ({self.__mqtt_info.client_id}) - {userdata}")

message_id = 999900000 + random.randint(10000, 99999)
Expand All @@ -105,15 +130,9 @@ def __send(self, topic: str, message: str):
except Exception as error:
_LOGGER.debug(error, "Error on topic " + topic + " and message " + message)

def send_get_message(self, device_sn: str, command: dict):
payload = self.__prepare_payload(command)
self.__send(self.__devices[device_sn].device_info.get_topic, json.dumps(payload))

def send_set_message(self, device_sn: str, mqtt_state: dict[str, Any], command: dict):
self.__devices[device_sn].data.update_to_target_state(mqtt_state)
payload = self.__prepare_payload(command)
self.__send(self.__devices[device_sn].device_info.set_topic, json.dumps(payload))

def stop(self):
self.__client.loop_stop()
self.__client.disconnect()
def __target_topics(self) -> list[str]:
topics = []
for (sn, device) in self.__devices.items():
for topic in device.device_info.topics():
topics.append(topic)
return topics
2 changes: 1 addition & 1 deletion custom_components/ecoflow_cloud/api/public_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ async def login(self):
_LOGGER.info(f"Requesting IoT MQTT credentials")
response = await self.call_api("/certification")
self._accept_mqqt_certification(response)
self.mqtt_info.client_id = f"HomeAssistant_{self.group}_{datetime.strftime(dt.now(), '%Y%m%d')}"
self.mqtt_info.client_id = f"Hassio-{self.group.replace(' ', '-')}-{datetime.strftime(dt.now(), '%Y%m%d')}"

async def fetch_all_available_devices(self) -> list[EcoflowDeviceInfo]:
_LOGGER.info(f"Requesting all devices")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
DecivoltSensorEntity, InWattsSolarSensorEntity, LevelSensorEntity,
MiscSensorEntity, RemainSensorEntity, StatusSensorEntity, ReconnectStatusSensorEntity,
)
from .proto import ecopacket_pb2 as ecopacket, powerstream_pb2 as powerstream
from ...api import EcoflowApiClient

# from ..number import MinBatteryLevelEntity, MaxBatteryLevelEntity
Expand Down Expand Up @@ -112,6 +111,7 @@ def selects(self, client: EcoflowApiClient) -> list[BaseSelectEntity]:

def _prepare_data(self, raw_data) -> dict[str, any]:
raw = {"params": {}}
from .proto import ecopacket_pb2 as ecopacket, powerstream_pb2 as powerstream
try:
payload =raw_data

Expand Down

0 comments on commit 8d161da

Please sign in to comment.