diff --git a/.github/workflows/validate.yml b/.github/workflows/validate.yml new file mode 100644 index 0000000..4e8a8dd --- /dev/null +++ b/.github/workflows/validate.yml @@ -0,0 +1,22 @@ +name: Validate + +on: + push: + branches: + - main + pull_request: + branches: + - main + schedule: + - cron: "0 0 * * *" + workflow_dispatch: + +jobs: + validate-hacs: + runs-on: "ubuntu-latest" + steps: + - uses: "actions/checkout@v3" + - name: HACS validation + uses: "hacs/action@main" + with: + category: "integration" diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9bef590 --- /dev/null +++ b/.gitignore @@ -0,0 +1,24 @@ +# artifacts +__pycache__ +.pytest* +*.egg-info +*/build/* +*/dist/* + + +# misc +.coverage +.vscode +venv/ +__pycache__/ +.env +.venv +.ipynb_checkpoints/ +.DS_Store +*.py[cod] +coverage.xml + + +# Home Assistant configuration +config/* +!config/configuration.yaml diff --git a/README.md b/README.md new file mode 100644 index 0000000..b2c0135 --- /dev/null +++ b/README.md @@ -0,0 +1,22 @@ +## RS-485 + +透過 rs-485 to tcp 裝置控制特定的 rs-485 裝置 + +目前僅支援: +- rs-485 開關 LP-F8 +- 杜亞窗簾電機 CMD82-5S + +使用方式: +- 輸入 rs-485 to tcp 的 ip 位置與 port +- 選擇要加入的裝置類型 + - 繼電器開關 + - 裝置名稱 + - 從機位置(10進位) + - 按鍵數量 + - 是否包含繼電器(強電) + - 窗簾電機 + - 裝置名稱 + - 從機位置(10進位) + 預設位置為 0x12 0x34 所以要輸入 4660 + +目前功能只開發必要功能,還有很多功能尚未實現 diff --git a/custom_components/rs485_device/__init__.py b/custom_components/rs485_device/__init__.py new file mode 100644 index 0000000..e02f73b --- /dev/null +++ b/custom_components/rs485_device/__init__.py @@ -0,0 +1,86 @@ +"""The RS-485 Device integration.""" + +from __future__ import annotations + +from homeassistant.config_entries import ConfigEntry +from homeassistant.const import ( + CONF_COVERS, + CONF_DEVICE, + CONF_HOST, + CONF_NAME, + CONF_PORT, + CONF_SENSORS, + CONF_SLAVE, + CONF_STATE, + CONF_SWITCHES, + Platform, +) +from homeassistant.core import HomeAssistant +from homeassistant.helpers import device_registry as dr + +from .const import CURTAIN_MODEL, DOMAIN, SENSOR_MODEL, SWITCH_MODEL +from .rs485_tcp_publisher import RS485TcpPublisher + +PLATFORMS: dict[str, list[Platform]] = { + CONF_SWITCHES: [Platform.SWITCH], + CONF_COVERS: [Platform.COVER], + CONF_SENSORS: [Platform.SENSOR], +} + + +async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: + """獲取裝置註冊表。.""" + + # 從 entry.data 中獲取所配置的裝置類型 + device_type = entry.data[CONF_DEVICE] + + device_registry = dr.async_get(hass) + + _model = None + _domain_data = {"watchdog_task": None} + if device_type == CONF_SWITCHES: + _model = SWITCH_MODEL + _domain_data.update( + { + CONF_STATE: None, + CONF_SWITCHES: None, + } + ) + elif device_type == CONF_COVERS: + _model = CURTAIN_MODEL + elif device_type == CONF_SENSORS: + _model = SENSOR_MODEL[0] + + # 在裝置註冊表中創建一個新的裝置 + device = device_registry.async_get_or_create( + config_entry_id=entry.entry_id, + identifiers={(DOMAIN, entry.data[CONF_SLAVE])}, + name=entry.data[CONF_NAME], + model=_model, + ) + + hass.data.setdefault( + DOMAIN, + { + "rs485_tcp_publisher": RS485TcpPublisher( + host=entry.data[CONF_HOST], port=entry.data[CONF_PORT], byte_length=12 + ) + }, + ) + hass.data[DOMAIN][entry.entry_id] = {CONF_DEVICE: device, **_domain_data} + + await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS[device_type]) + + return True + + +async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: + """Unload a config entry.""" + device_type = entry.data[CONF_DEVICE] + + if unload_ok := await hass.config_entries.async_unload_platforms( + entry, PLATFORMS[device_type] + ): + hass.data[DOMAIN].pop(entry.entry_id) + + return unload_ok diff --git a/custom_components/rs485_device/config_flow.py b/custom_components/rs485_device/config_flow.py new file mode 100644 index 0000000..29733c1 --- /dev/null +++ b/custom_components/rs485_device/config_flow.py @@ -0,0 +1,105 @@ +"""Config flow for RS-485 Device integration.""" + +from __future__ import annotations + +import logging +from typing import Any, Optional + +import voluptuous as vol + +from homeassistant.config_entries import ConfigFlow, ConfigFlowResult +from homeassistant.const import ( + CONF_COUNT, + CONF_DEVICE, + CONF_ENTITY_ID, + CONF_HOST, + CONF_NAME, + CONF_PORT, + CONF_SLAVE, + CONF_SWITCHES, +) +import homeassistant.helpers.config_validation as cv +from homeassistant.helpers.entity import async_generate_entity_id + +from .const import DEFAULT_NAME, DEVICE_TYPE, DOMAIN, HAS_RELAY, KEY_COUNT + +_LOGGER = logging.getLogger(__name__) + +STEP_USER_DATA_SCHEMA = vol.Schema( + { + vol.Required(CONF_HOST, default="10.0.4.101"): cv.string, + vol.Required(CONF_PORT, default=4196): int, + vol.Required(CONF_DEVICE, default=CONF_SWITCHES): vol.In(DEVICE_TYPE), + } +) + +STEP_SWITCH_CONFIG_SCHEMA = { + vol.Required(CONF_SLAVE, default=1): cv.positive_int, + vol.Required(CONF_COUNT, default=1): vol.In(KEY_COUNT), + vol.Required(HAS_RELAY, default=True): cv.boolean, +} + + +class RS485DeviceConfigFlow(ConfigFlow, domain=DOMAIN): + """Handle a config flow for RS-485 Device.""" + + VERSION = 1 + + def __init__(self): + """Initialize the config flow.""" + self.rs485_config = {} + + async def async_step_user( + self, user_input: Optional[dict[str, Any]] | None = None + ) -> ConfigFlowResult: + """Handle the user step of the config flow.""" + if user_input is not None: + self.rs485_config.update(user_input) + return await self.async_step_device_config() + + return self.async_show_form(step_id="user", data_schema=STEP_USER_DATA_SCHEMA) + + async def async_step_device_config( + self, user_input: Optional[dict[str, Any]] | None = None + ) -> ConfigFlowResult: + """Handle the initial step.""" + + errors: dict[str, str] = {} + if user_input is not None: + self.rs485_config.update(user_input) + try: + self.rs485_config[CONF_ENTITY_ID] = async_generate_entity_id( + DOMAIN + ".{}", + f"{self.rs485_config[CONF_DEVICE]}.{self.rs485_config[CONF_NAME]}.{self.rs485_config[CONF_SLAVE]}", + hass=self.hass, + ) + return self.async_create_entry( + title=self.rs485_config[CONF_NAME], + data=self.rs485_config, + ) + except ValueError as e: + _LOGGER.exception("Error generating entity ID: %s", e) + errors["base"] = "entity_id_error" + except KeyError as e: + _LOGGER.exception("Missing required input: %s", e) + errors["base"] = "missing_input" + except Exception as e: # pylint: disable=broad-except + _LOGGER.exception("Unexpected exception: %s", e) + errors["base"] = "unknown" + + device_type = self.rs485_config[CONF_DEVICE] + + default_name = DEFAULT_NAME[device_type] + + schema = { + vol.Required(CONF_NAME, default=default_name): cv.string, + vol.Required(CONF_SLAVE, default=1): cv.positive_int, + } + + if device_type == CONF_SWITCHES: + schema.update(STEP_SWITCH_CONFIG_SCHEMA) + + return self.async_show_form( + step_id="device_config", + data_schema=vol.Schema(schema), + ) diff --git a/custom_components/rs485_device/const.py b/custom_components/rs485_device/const.py new file mode 100644 index 0000000..2bfc08a --- /dev/null +++ b/custom_components/rs485_device/const.py @@ -0,0 +1,29 @@ +"""Constants for the RS-485 Device integration.""" + +from typing import Final + +from homeassistant.const import CONF_COVERS, CONF_SENSORS, CONF_SWITCHES + +DOMAIN: Final = "rs485_device" +MODBUS_HUB: Final = "rs-485_device_hub" +CURTAIN_MODEL: Final = "CMD82-5S" +SENSOR_MODEL: Final = ["SD123-HPR05", "SD123-HPR06"] +SWITCH_MODEL: Final = "LP-F8" +DEFAULT_NAME: Final = { + CONF_SWITCHES: "Wall Switch", + CONF_COVERS: "Curtain", + CONF_SENSORS: "Sensor", +} + +# 按鈕數量 +KEY_COUNT: Final = list(range(1, 7)) + +# 含有繼電器 +HAS_RELAY: Final = "has_relay" + +# 設備類型 +DEVICE_TYPE: Final = { + CONF_SWITCHES: CONF_SWITCHES, + CONF_COVERS: CONF_COVERS, + CONF_SENSORS: CONF_SENSORS, +} diff --git a/custom_components/rs485_device/cover.py b/custom_components/rs485_device/cover.py new file mode 100644 index 0000000..a66e0b6 --- /dev/null +++ b/custom_components/rs485_device/cover.py @@ -0,0 +1,233 @@ +"""RS485 Curtain component.""" +import asyncio +from datetime import timedelta +import logging +from typing import Any, Final + +from homeassistant.components.cover import ( + ATTR_POSITION, + CoverDeviceClass, + CoverEntity, + CoverEntityFeature, +) +from homeassistant.config_entries import ConfigEntry +from homeassistant.const import CONF_SLAVE +from homeassistant.core import HomeAssistant +from homeassistant.helpers.device_registry import DeviceInfo +from homeassistant.helpers.entity_platform import AddEntitiesCallback + +from .const import DOMAIN +from .rs485_tcp_publisher import RS485TcpPublisher + +_LOGGER = logging.getLogger(__name__) + +SCAN_INTERVAL = timedelta(seconds=20) + +START_CODE: Final = 0x55 # 起始碼 +READ_CMD: Final = 0x01 # 讀命令 +WRITE_CMD: Final = 0x02 # 寫命令 +CONTROL_CMD: Final = 0x03 # 控制命令 +OPEN_CODE: Final = 0x01 # 開啟碼 +CLOSE_CODE: Final = 0x02 # 關閉碼 +STOP_CODE: Final = 0x03 # 停止碼 +PERCENTAGE_CODE: Final = 0x04 # 百分比碼 + + +async def async_setup_entry( + hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback +) -> None: + """通過配置條目設置開關實體.""" + + # 從 entry.data 中獲取配置數據 + config = { + **entry.data, + "entry_id": entry.entry_id, + } + + async_add_entities([RS485CurtainCover(hass, config)], True) + + +class RS485CurtainCover(CoverEntity): + """表示一个窗帘类的 cover 设备.""" + + _attr_has_entity_name = True + _attr_device_class = CoverDeviceClass.CURTAIN + + def __init__(self, hass: HomeAssistant, config: dict[str, Any]) -> None: + """初始化窗帘 cover 实体.""" + self.hass = hass + self._is_open = False + self._slave: int = config.get(CONF_SLAVE, 0) + self._slave_bytes: int = self._slave.to_bytes(2, byteorder="big") + self._entry_id: str = config.get("entry_id", "") + # self._name: str = f"{config.get(CONF_NAME)}" + self._unique_id: str = f"{self._entry_id}" + self._position = 100 # 完全打开 + self._watching = True + self._publisher: RS485TcpPublisher = self.hass.data[DOMAIN][ + "rs485_tcp_publisher" + ] + self._watchdog_task = self.hass.data[DOMAIN][self._entry_id]["watchdog_task"] + + @property + def name(self) -> str: + """返回实体的名字.""" + return "" + + @property + def unique_id(self) -> str: + """返回實體的唯一 ID.""" + return self._unique_id + + @property + def is_closed(self) -> bool: + """如果窗帘关闭返回 True.""" + return self._position == 0 + + @property + def device_info(self) -> DeviceInfo: + """Return device information for this entity.""" + device = self.hass.data[DOMAIN][self._entry_id]["device"] + return { + "identifiers": device.identifiers, + "name": device.name, + "manufacturer": device.manufacturer, + "model": device.model, + "connections": device.connections, + } + + @property + def supported_features(self) -> CoverEntityFeature: + """返回该实体支持的功能.""" + supported_features = CoverEntityFeature(0) + if self.current_cover_position is not None: + supported_features |= ( + CoverEntityFeature.OPEN + | CoverEntityFeature.CLOSE + | CoverEntityFeature.STOP + | CoverEntityFeature.SET_POSITION + ) + + return supported_features + + @property + def current_cover_position(self) -> int | None: + """返回当前窗帘的位置.""" + return self._position + + async def _watchdogs(self): + """監控 Publisher 是否運行.""" + try: + while True: + if self._publisher.is_running and self._watching: + await asyncio.wait_for( + self._publisher.send_message( + b"\x00\x8C\x00\x00\x00\x06\x55" + + self._slave_bytes + + b"\x01\x02\x01" + ), + timeout=1, + ) + await asyncio.sleep(1) + except asyncio.CancelledError: + _LOGGER.info("Watchdog task was cancelled") + return + + async def _subscribe_callback(self, sub_id: str, data: tuple[int]) -> None: + if sub_id != self._unique_id or len(data) < 12 or data[1] != 140: + return + high_byte, low_byte = data[7:-3][::-1] + _slave = (high_byte << 8) | low_byte + _LOGGER.info("data: %s", _slave) + if _slave == self._slave: + data_length = data[5] + position = self._position + if data_length == 6: + position = 100 - data[-1:][0] + if data_length > 10: + position = data[-1:][0] + + if position != self._position: + self._position = position + else: + self._watching = False + + self.async_write_ha_state() + _LOGGER.info("📡 Curtain Received data: %s 📡", data) + + async def async_added_to_hass(self): + """當實體添加到 Home Assistant 時,設置狀態更新的計劃.""" + await self._publisher.start() + # 訂閱數據 + await self._publisher.subscribe(self._subscribe_callback, self._unique_id) + # 設置 watchdog 任務 + if self._watchdog_task is None: + self._watchdog_task = asyncio.create_task(self._watchdogs()) + + async def async_will_remove_from_hass(self): + """當實體從 Home Assistant 中移除時,取消計劃.""" + self._watchdog_task.cancel() + await self._publisher.unsubscribe(self._unique_id) + sub_length = self._publisher.subscribers_length + + # 如果沒有訂閱者,則關閉 rs-485 伺服器的連接 + if sub_length == 0: + await self._publisher.close() + _LOGGER.info("🚧 Close publisher connect 🚧") + + async def async_update(self): + """更新窗帘的状态.""" + if not self._watching: + _LOGGER.info("Updating the curtain %s") + await self._publisher.send_message( + b"\x00\x8C\x00\x00\x00\x06\x55" + self._slave_bytes + b"\x01\x02\x01" + ) + self.schedule_update_ha_state() + + async def async_stop_cover(self, **kwargs: Any) -> None: + """停止窗簾.""" + _LOGGER.info("Stopping the curtain") + await self._publisher.send_message( + b"\x00\x8C\x00\x00\x00\x05\x55" + self._slave_bytes + b"\x03\x03" + ) + await asyncio.sleep(1) + self._watching = True + self.schedule_update_ha_state() + + async def async_close_cover(self, **kwargs: Any) -> None: + """關閉窗簾.""" + _LOGGER.info("Closing the curtain") + await self._publisher.send_message( + b"\x00\x8C\x00\x00\x00\x05\x55" + self._slave_bytes + b"\x03\x01" + ) + await asyncio.sleep(1) + self._is_open = True + self._position = 0 + self.schedule_update_ha_state() + + async def async_open_cover(self, **kwargs: Any) -> None: + """打開窗簾.""" + _LOGGER.info("Opening the curtain") + await self._publisher.send_message( + b"\x00\x8C\x00\x00\x00\x05\x55" + self._slave_bytes + b"\x03\x02" + ) + await asyncio.sleep(1) + self._is_open = False + self._position = 100 + self.schedule_update_ha_state() + + async def async_set_cover_position(self, **kwargs: Any) -> None: + """设置窗帘的位置.""" + if ATTR_POSITION in kwargs: + position = kwargs[ATTR_POSITION] + _LOGGER.info("Setting the curtain position to %s", position) + await self._publisher.send_message( + b"\x00\x8C\x00\x00\x00\x06\x55" + + self._slave_bytes + + b"\x03\x04" + + bytes([100 - position]) + ) + await asyncio.sleep(1) + self._position = position + self._is_open = position > 0 + self.schedule_update_ha_state() diff --git a/custom_components/rs485_device/manifest.json b/custom_components/rs485_device/manifest.json new file mode 100644 index 0000000..638975b --- /dev/null +++ b/custom_components/rs485_device/manifest.json @@ -0,0 +1,13 @@ +{ + "domain": "rs485_device", + "name": "RS-485 Device", + "codeowners": ["@RayChang"], + "config_flow": true, + "dependencies": [], + "documentation": "https://www.home-assistant.io/integrations/rs485_device", + "homekit": {}, + "iot_class": "local_push", + "requirements": [], + "ssdp": [], + "zeroconf": [] +} diff --git a/custom_components/rs485_device/rs485_tcp_publisher.py b/custom_components/rs485_device/rs485_tcp_publisher.py new file mode 100644 index 0000000..b4b0fa8 --- /dev/null +++ b/custom_components/rs485_device/rs485_tcp_publisher.py @@ -0,0 +1,201 @@ +"""RS485 TCP Publisher.""" +import asyncio +import logging +from typing import Any + +_LOGGER = logging.getLogger(__name__) + + +class RS485TcpPublisher: + """RS485 TCP Publisher.""" + + def __init__( + self, + host: str, + port: int, + byte_length: int = 12, + max_retry_delay: int = 60, + connect_timeout: int = 10, + ) -> None: + """初始化 RS485 TCP Publisher 服務.""" + + self.host = host + self.port = port + self.max_retry_delay = max_retry_delay # 最大重試間隔,單位為秒 + self.connect_timeout = connect_timeout # 連接超時時間,單位為秒 + self.byte_length = byte_length # 用於存儲接收數據的字節長度 + self.connection_task = None # 用於存儲連接任務的引用 + self.subscribers: dict[str, Any] = {} + self.lock = asyncio.Lock() # 增加一個鎖來控制對訂閱者列表的訪問 + self._running = False # 增加一個運行狀態標誌 + self.is_running = False + self.writer = None # 用於存儲當前連接的StreamWriter對象 + + @property + def subscribers_length(self) -> int: + """返回 self.subscribers 的長度作为属性.""" + return len(self.subscribers) + + # def _construct_modbus_message( + # self, + # slave: int, + # function_code: int, + # register: int, + # value: int | None = None, + # length: int | None = None, + # ) -> bytes: + # """Modbus TCP Message.""" + # header = b"\x00\x00\x00\x00\x00\x06" + bytes([slave]) + # func_code = bytes([function_code]) + # register_high = register >> 8 + # register_low = register & 0xFF + + # if function_code in (3, 4) and length is not None: # 讀取寄存器,需要長度參數 + # length_high = length >> 8 + # length_low = length & 0xFF + # message = ( + # header + # + func_code + # + bytes([register_high, register_low, length_high, length_low]) + # ) + # elif function_code == 6 and value is not None: # 寫單個寄存器,需要值參數 + # value_high = value >> 8 + # value_low = value & 0xFF + # message = ( + # header + # + func_code + # + bytes([register_high, register_low, value_high, value_low]) + # ) + # return message + + async def subscribe(self, callback, callback_id=None) -> None: + """訂閱數據,必須提供 ID.""" + if callback_id is None: + _LOGGER.error("訂閱必須包括一個唯一的ID。") + return + async with self.lock: # 使用異步鎖來保護訂閱者列表的修改 + self.subscribers[callback_id] = callback + _LOGGER.info("訂閱者: %s 已添加", callback_id) + + async def unsubscribe(self, callback_id): + """取消訂閱,使用 ID 進行.""" + async with self.lock: + if callback_id in self.subscribers: + del self.subscribers[callback_id] + _LOGGER.info("訂閱者: %s 已移除", callback_id) + else: + _LOGGER.info('沒有找到 ID 為"%s"的訂閱者', callback_id) + + async def send_message(self, message: bytes) -> None: + """向 RS-485 伺服器發送訊息.""" + + _LOGGER.info("💬 Message: %s 💬", message) + if self.writer is None or self.writer.is_closing(): + _LOGGER.error("⛔️ 無有效連線,無法發送訊息。⛔️") + return + + async with self.lock: + try: + self.writer.write(message) + await self.writer.drain() + _LOGGER.info("🚀 訊息已成功發送。 🚀") + except Exception as e: # pylint: disable=broad-except + _LOGGER.error("🚧 發送訊息時出錯: %s 🚧", e) + + # async def read_register(self, slave: int, register: int, length: int) -> None: + # """讀取寄存器。構造並發送Modbus TCP請求讀取保持寄存器的消息.""" + # message = self._construct_modbus_message(slave, 3, register, length=length) + # await self._send_message(message) + + # async def write_register(self, slave: int, register: int, value: int) -> None: + # """寫入寄存器。構造並發送 Modbus TCP 請求寫入保持寄存器的消息.""" + # message = self._construct_modbus_message(slave, 6, register, value=value) + # await self._send_message(message) + + async def _publish(self, data): + """發布數據給所有訂閱者,並返回他們的 ID.""" + tasks = [] + async with self.lock: + for callback_id, callback in self.subscribers.items(): + task = asyncio.create_task(callback(sub_id=callback_id, data=data)) + tasks.append(task) + # results = await asyncio.gather(*tasks, return_exceptions=True) + # for task, result in zip(tasks, results): + # if isinstance(result, Exception): + # _LOGGER.error( + # "Exception in subscriber %s: %s", task.callback_id, result + # ) + + async def _handle_connection(self): + retry_delay = 1 # 初始重試間隔為1秒 + while self._running: + try: + reader, self.writer = await asyncio.wait_for( + asyncio.open_connection(self.host, self.port), + timeout=self.connect_timeout, + ) + _LOGGER.info("成功連接到 %s:%i", self.host, self.port) + self.is_running = True + retry_delay = 1 # 連接成功,重置重試間隔 + await self._manage_connection(reader) + except TimeoutError: + _LOGGER.warning("連接到 %s:%i 超時", self.host, self.port) + except Exception as e: # pylint: disable=broad-except + _LOGGER.error("連線錯誤: %s", e) + finally: + if self._running: # 只有在運行狀態下才輸出重連信息 + _LOGGER.info( + "嘗試重新連接到 %s:%i,等待 %i 秒…", + self.host, + self.port, + retry_delay, + ) + await asyncio.sleep(retry_delay) + retry_delay = min(retry_delay * 2, self.max_retry_delay) + if self.writer: + await self._close_writer() + + async def _manage_connection(self, reader): + try: + while True: + data = await reader.read(self.byte_length) + if not data: + _LOGGER.warning("連線被關閉,準備重新連接…") + break + await self._publish(tuple(data)) + except asyncio.CancelledError: + _LOGGER.info("連線被取消") + + async def _close_writer(self): + if self.writer and not self.writer.is_closing(): + self.writer.close() + await self.writer.wait_closed() + self.is_running = False + + async def start(self): + """建立連線並開始接收數據.""" + if not self._running: + self._running = True + # 創建並啟動一個異步任務進行連接和數據接收 + self.connection_task = asyncio.create_task(self._handle_connection()) + else: + _LOGGER.warning("連接已經建立,無需再次建立") + + async def close(self): + """關閉當前連接並停止嘗試重連.""" + self._running = False # 設置運行狀態為False以停止重連嘗試 + if self.connection_task and not self.connection_task.done(): + self.connection_task.cancel() + try: + await self.connection_task + except asyncio.CancelledError: + _LOGGER.info("Connection task cancelled") + + if self.writer: + try: + self.writer.close() + await self.writer.wait_closed() + _LOGGER.info("連接已關閉") + except Exception as e: # pylint: disable=broad-except + _LOGGER.error("關閉連接時發生錯誤: %s", e) + self.writer = None diff --git a/custom_components/rs485_device/sensor.py b/custom_components/rs485_device/sensor.py new file mode 100644 index 0000000..5c56554 --- /dev/null +++ b/custom_components/rs485_device/sensor.py @@ -0,0 +1 @@ +"""RS485 Sensor component.""" diff --git a/custom_components/rs485_device/strings.json b/custom_components/rs485_device/strings.json new file mode 100644 index 0000000..2753fe8 --- /dev/null +++ b/custom_components/rs485_device/strings.json @@ -0,0 +1,29 @@ +{ + "config": { + "step": { + "user": { + "data": { + "host": "[%key:common::config_flow::data::host%]", + "port": "[%key:common::config_flow::data::port%]", + "device": "[%key:common::config_flow::data::device%]" + } + }, + "device_config": { + "data": { + "name": "[%key:common::config_flow::data::name%]", + "slave": "slave", + "count": "count", + "has_relay": "has_relay" + } + } + }, + "error": { + "entity_id_error": "entity_id_error", + "missing_input": "missing_input", + "unknown": "[%key:common::config_flow::error::unknown%]" + }, + "abort": { + "already_configured": "[%key:common::config_flow::abort::already_configured_device%]" + } + } +} diff --git a/custom_components/rs485_device/switch.py b/custom_components/rs485_device/switch.py new file mode 100644 index 0000000..abef605 --- /dev/null +++ b/custom_components/rs485_device/switch.py @@ -0,0 +1,306 @@ +"""RS485 switch component.""" +import asyncio +from datetime import timedelta +import logging +import math +from typing import Any, Final + +from homeassistant.components.switch import SwitchEntity +from homeassistant.config_entries import ConfigEntry +from homeassistant.const import CONF_COUNT, CONF_SLAVE, CONF_STATE, CONF_SWITCHES +from homeassistant.core import HomeAssistant +from homeassistant.helpers.device_registry import DeviceInfo +from homeassistant.helpers.entity_platform import AddEntitiesCallback + +from .const import DOMAIN, HAS_RELAY +from .rs485_tcp_publisher import RS485TcpPublisher + +_LOGGER = logging.getLogger(__name__) + +SCAN_INTERVAL = timedelta(seconds=5) + +DEFAULT_STATE: Final = 256 +PLACEHOLDER: Final = "00000000" +REGISTER_ADDRESS: Final = 0x1008 + + +async def async_setup_entry( + hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback +) -> None: + """通過配置條目設置開關實體.""" + + # 從 entry.data 中獲取配置數據 + config = { + **entry.data, + "entry_id": entry.entry_id, + } + + switch_count = entry.data.get(CONF_COUNT, 1) + switches = [] + for i in range(switch_count): + switches.append(RS485Switch(hass, config, i + 1)) + async_add_entities(switches, True) + + +class RS485Switch(SwitchEntity): + """表示一個示例開關的實體.""" + + _attr_has_entity_name = True + _attr_should_poll = False + + def __init__( + self, hass: HomeAssistant, config: dict[str, Any], switch_index: int + ) -> None: + """初始化開關.""" + self.hass = hass + self._is_on: bool = False + self._slave: int = config.get(CONF_SLAVE, 0) + self._state: int = DEFAULT_STATE + self._has_relay: bool = config.get(HAS_RELAY, True) + self._entry_id: str = config.get("entry_id", "") + self._index: int = switch_index + self._name: str = f"Button_{self._index}" + self._unique_id: str = f"{self._entry_id}_{self._index}" + self._publisher: RS485TcpPublisher = self.hass.data[DOMAIN][ + "rs485_tcp_publisher" + ] + + @property + def device_info(self) -> DeviceInfo: + """Return device information for this entity.""" + device = self.hass.data[DOMAIN][self._entry_id]["device"] + return { + "identifiers": device.identifiers, + "name": device.name, + "manufacturer": device.manufacturer, + "model": device.model, + "connections": device.connections, + } + + @property + def unique_id(self) -> str: + """返回實體的唯一 ID.""" + return self._unique_id + + @property + def name(self) -> str: + """返回實體的名稱.""" + return self._name + + @property + def is_on(self) -> bool: + """如果開關打開,返回 True.""" + return self._is_on + + def _binary_list_to_int(self, binary_list: list[int]) -> int: + """將二進制列表轉換為整數.""" + high_byte = binary_list[0] + low_byte = binary_list[1] + result = (high_byte << 8) + (low_byte & 0xFF) + return result + + def _construct_modbus_message( + self, + slave: int, + function_code: int, + register: int, + value: int | None = None, + length: int | None = None, + ) -> bytes: + """Modbus TCP Message.""" + header = b"\x00\x00\x00\x00\x00\x06" + bytes([slave]) + func_code = bytes([function_code]) + register_high = register >> 8 + register_low = register & 0xFF + + if function_code in (3, 4) and length is not None: # 讀取寄存器,需要長度參數 + length_high = length >> 8 + length_low = length & 0xFF + message = ( + header + + func_code + + bytes([register_high, register_low, length_high, length_low]) + ) + elif function_code == 6 and value is not None: # 寫單個寄存器,需要值參數 + value_high = value >> 8 + value_low = value & 0xFF + message = ( + header + + func_code + + bytes([register_high, register_low, value_high, value_low]) + ) + return message + + async def _watchdogs(self): + """監控 Publisher 是否運行.""" + read_message = self._construct_modbus_message( + self._slave, 3, REGISTER_ADDRESS, length=1 + ) + watchdog_task: asyncio.Task = self.hass.data[DOMAIN][self._entry_id][ + "watchdog_task" + ] + try: + while True: + _LOGGER.warning( + "❓ Publisher is running?: %s ❓", self._publisher.is_running + ) + if self._publisher.is_running: + await asyncio.sleep(0.1 + self._slave / 10) + await asyncio.wait_for( + self._publisher.send_message(read_message), + timeout=2 * self._slave, + ) + watchdog_task.cancel() + await asyncio.sleep(3) + except asyncio.CancelledError: + _LOGGER.info("Watchdog task was cancelled") + return + + async def _handle_switch(self, is_on: bool) -> None: + """處理開關的切換.""" + self.hass.data[DOMAIN][self._entry_id][CONF_SWITCHES] = self._index + read_message = self._construct_modbus_message( + self._slave, 3, REGISTER_ADDRESS, length=1 + ) + await self._publisher.send_message(read_message) + await asyncio.sleep(0.1) + state = self.hass.data[DOMAIN][self._entry_id][CONF_STATE] + value = state ^ self._index + write_message = self._construct_modbus_message( + self._slave, 6, REGISTER_ADDRESS, value=value + ) + await self._publisher.send_message(write_message) + self.hass.data[DOMAIN][self._entry_id][CONF_STATE] = value + self._is_on = is_on + self.async_write_ha_state() + + async def _subscribe_callback(self, sub_id: str, data: tuple[int]) -> None: + """訂閱回調.""" + + if data[1] == 140 or len(data) < 8: + return + + _length, slave, function_code, *_last = data[5:] + + # [0,0,0,0,0,6,3,3,0,2,13,1] + # 弱電版本的開關不管是按下實體按鈕,或是讀取狀態,都會回傳 6 bytes + # 而有繼電器版本的開關,當按下實體按鈕時,會回傳 6 bytes,讀取狀態時,會回傳 5 bytes + # 所以透過第十一位的值來判斷行為是否為手動觸發或是讀取狀態 + # 當第十一位的值等於 256 時,表示是讀取狀態,所以將 last 的第一位去掉,並改變 length + # 讓接下來的判斷依照有繼電器版本的開關來處理 + if self._has_relay is False and (_last[-2:][0] << 8) == 256: + last = _last[1:] + length = _length - 1 + else: + last = _last + length = _length + + # 如果是手動觸發,則紀錄按下的是哪個按鈕 + # 弱電版本的開關,按下按鈕時會回傳兩筆資料 + # 第一筆是按下的是哪顆按鈕 [0,0,0,0,0,6,3,3,0,2,13,1] + # 第二筆是按鈕的狀態 [0,0,0,0,0,6,3,3,0,2,1,0] + # 因為第二筆的資料判斷到最後一位是 0,則直接跳出 + if length == 6 and function_code == 3: + ls = last[-1:][0] + if ls == 0: + return + m = math.log(ls, 2) + self.hass.data[DOMAIN][self._entry_id][CONF_SWITCHES] = int(m) + 1 + + # 紀錄按下的是哪個按鈕 + switch_index = self.hass.data[DOMAIN][self._entry_id][CONF_SWITCHES] + + if slave == self._slave: + if switch_index == self._index: + _LOGGER.info( + "🚧 Subscribe callback DATA:%s / SLAVE: %s / INDEX: %s / index: %s / LAST: %s 🚧 ", + self._slave, + data, + switch_index, + self._index, + last, + ) + + if function_code == 3: + # step_3-5 + # 如果是讀取寄存器而且是讀取狀態,則將狀態更新到 DOMAIN 裡提供給其他開關使用 + if length == 5: + self.hass.data[DOMAIN][self._entry_id][ + CONF_STATE + ] = self._binary_list_to_int(last[-2:]) + + # step_3-6 + # 如果是按下實體按鈕,則讀取狀態,會進入到 step_3-5 + elif length == 6: + await self._publisher.read_register( + self._slave, REGISTER_ADDRESS, 1 + ) + # 如果是寫入寄存器,則將更新後的狀態更新到 DOMAIN 裡提供給其他開關使用 + elif function_code == 6: + self.hass.data[DOMAIN][self._entry_id][ + CONF_STATE + ] = self._binary_list_to_int(last[-2:]) + + # 這裡是為了讓其他不是在 HA 裡的操作也能更新狀態 + elif (function_code == 3 and length == 5) or function_code == 6: + self.hass.data[DOMAIN][self._entry_id][ + CONF_STATE + ] = self._binary_list_to_int(last[-2:]) + else: + return + + await self.async_update() + + async def async_added_to_hass(self): + """當實體添加到 Home Assistant 時,設置狀態更新的計劃.""" + # 當實體添加到 Home Assistant 時,起始連接 rs-485 伺服器 + await self._publisher.start() + # 訂閱數據 + await self._publisher.subscribe(self._subscribe_callback, self._unique_id) + # 設置 watchdog 任務 + if self.hass.data[DOMAIN][self._entry_id]["watchdog_task"] is None: + self.hass.data[DOMAIN][self._entry_id][ + "watchdog_task" + ] = asyncio.create_task(self._watchdogs()) + # 設置狀態更新的計劃 + _LOGGER.info("🚧 Added to hass 🚧 %s", self._index) + + async def async_will_remove_from_hass(self): + """當實體從 Home Assistant 中移除時,取消計劃.""" + await self._publisher.unsubscribe(self._unique_id) + sub_length = self._publisher.subscribers_length + # 取消狀態更新的計劃 + _LOGGER.info("🚧 Removed from hass 🚧 %s", self._index) + + # 如果沒有訂閱者,則關閉 rs-485 伺服器的連接 + if sub_length == 0: + await self._publisher.close() + _LOGGER.info("🚧 Close publisher connect 🚧") + + async def async_update(self): + """更新開關的狀態.""" + state = self.hass.data[DOMAIN][self._entry_id][CONF_STATE] + switch_index = self.hass.data[DOMAIN][self._entry_id][CONF_SWITCHES] + if switch_index == self._index: + _LOGGER.info( + "🚧 ------- SLAVE: %s / STATE:%s / index: %s ------- 🚧", + self._slave, + state, + self._index, + ) + + if state is not None: + state_str = bin(state % DEFAULT_STATE)[2:] + binary_string = PLACEHOLDER[: len(PLACEHOLDER) - len(state_str)] + state_str + self._is_on = binary_string[::-1][self._index - 1] == "1" + self.async_write_ha_state() + + async def async_turn_on(self, **kwargs): + """異步打開開關.""" + # 實現打開開關的邏輯 + await self._handle_switch(True) + + async def async_turn_off(self, **kwargs): + """異步關閉開關.""" + # 實現關閉開關的邏輯 + await self._handle_switch(False) diff --git a/custom_components/rs485_device/translations/en.json b/custom_components/rs485_device/translations/en.json new file mode 100644 index 0000000..6462a29 --- /dev/null +++ b/custom_components/rs485_device/translations/en.json @@ -0,0 +1,29 @@ +{ + "config": { + "abort": { + "already_configured": "Device is already configured" + }, + "error": { + "entity_id_error": "entity_id_error", + "missing_input": "missing_input", + "unknown": "Unexpected error" + }, + "step": { + "device_config": { + "data": { + "count": "count", + "has_relay": "has_relay", + "name": "Name", + "slave": "slave" + } + }, + "user": { + "data": { + "device": "Device", + "host": "Host", + "port": "Port" + } + } + } + } +} \ No newline at end of file diff --git a/custom_components/rs485_device/translations/zh-Hant.json b/custom_components/rs485_device/translations/zh-Hant.json new file mode 100644 index 0000000..d109efc --- /dev/null +++ b/custom_components/rs485_device/translations/zh-Hant.json @@ -0,0 +1,29 @@ +{ + "config": { + "abort": { + "already_configured": "設備已配置" + }, + "error": { + "entity_id_error": "無法建構實體 ID", + "missing_input": "缺少輸入", + "unknown": "未知錯誤" + }, + "step": { + "user": { + "data": { + "host": "RS-485 to TCP IP 位置", + "port": "RS-485 to TCP IP 連接埠", + "device": "選擇裝置類型" + } + }, + "device_config": { + "data": { + "name": "裝置名稱", + "slave": "開關從機位置", + "count": "開關鍵數", + "has_relay": "是否有繼電器" + } + } + } + } +} diff --git a/hacs.json b/hacs.json new file mode 100644 index 0000000..181b54a --- /dev/null +++ b/hacs.json @@ -0,0 +1,5 @@ +{ + "name": "RS-485 Device", + "homeassistant": "2024.4.3", + "render_readme": true +} diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..799ee16 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,87 @@ +acme==2.8.0 +aiohttp==3.9.4 +aiohttp-cors==0.7.0 +aiohttp-fast-url-dispatcher==0.3.0 +aiohttp-zlib-ng==0.3.1 +aiooui==0.1.5 +aiosignal==1.3.1 +anyio==4.3.0 +astral==2.2 +async-interrupt==1.1.1 +async-timeout==4.0.3 +atomicwrites-homeassistant==1.4.1 +attrs==23.2.0 +awesomeversion==24.2.0 +bcrypt==4.1.2 +bleak==0.21.1 +bleak-retry-connector==3.5.0 +bluetooth-adapters==0.18.0 +bluetooth-auto-recovery==1.4.1 +bluetooth-data-tools==1.19.0 +boto3==1.34.87 +botocore==1.34.87 +btsocket==0.2.0 +certifi==2024.2.2 +cffi==1.16.0 +charset-normalizer==3.3.2 +ciso8601==2.3.1 +cryptography==42.0.5 +dbus-fast==2.21.1 +ecdsa==0.19.0 +envs==1.4 +fnv-hash-fast==0.5.0 +fnvhash==0.1.0 +frozenlist==1.4.1 +h11==0.14.0 +habluetooth==2.8.0 +hass-nabucasa==0.78.0 +home-assistant-bluetooth==1.12.0 +homeassistant==2024.4.3 +httpcore==1.0.5 +httpx==0.27.0 +idna==3.7 +ifaddr==0.2.0 +Jinja2==3.1.3 +jmespath==1.0.1 +josepy==1.14.0 +lru-dict==1.3.0 +MarkupSafe==2.1.5 +multidict==6.0.5 +orjson==3.9.15 +packaging==24.0 +pillow==10.2.0 +psutil==5.9.8 +psutil-home-assistant==0.0.1 +pyasn1==0.6.0 +pycognito==2023.5.0 +pycparser==2.22 +PyJWT==2.8.0 +pyobjc-core==9.2 +pyobjc-framework-Cocoa==9.2 +pyobjc-framework-CoreBluetooth==9.2 +pyobjc-framework-libdispatch==9.2 +pyOpenSSL==24.1.0 +pyRFC3339==1.1 +PyRIC==0.1.6.3 +python-dateutil==2.9.0.post0 +python-jose==3.3.0 +python-slugify==8.0.4 +pytz==2024.1 +PyYAML==6.0.1 +requests==2.31.0 +rsa==4.9 +s3transfer==0.10.1 +setuptools==69.5.1 +six==1.16.0 +sniffio==1.3.1 +snitun==0.36.2 +SQLAlchemy==2.0.29 +text-unidecode==1.3 +typing_extensions==4.11.0 +ulid-transform==0.9.0 +urllib3==1.26.18 +usb-devices==0.4.5 +voluptuous==0.13.1 +voluptuous-serialize==2.6.0 +yarl==1.9.4 +zlib-ng==0.4.3