From c71fc5bda59ddd302124daf25eaa61be2fd1a3a8 Mon Sep 17 00:00:00 2001 From: Tuen Lee Date: Thu, 18 Apr 2024 23:08:22 +0000 Subject: [PATCH] add tag for socket 1 and 2 from transaction #94 --- custom_components/alfen_wallbox/alfen.py | 84 ++++++++++++++++++++++- custom_components/alfen_wallbox/sensor.py | 66 ++++++++++++++++++ 2 files changed, 148 insertions(+), 2 deletions(-) diff --git a/custom_components/alfen_wallbox/alfen.py b/custom_components/alfen_wallbox/alfen.py index 9d9c948..b9c3cfd 100644 --- a/custom_components/alfen_wallbox/alfen.py +++ b/custom_components/alfen_wallbox/alfen.py @@ -84,6 +84,8 @@ def __init__(self, self.number_socket = 1 self._hass = hass self.max_allowed_phases = 1 + self.latest_tag = None + self.transaction_offset = 0 disable_warnings() # Default ciphers needed as of python 3.10 @@ -159,6 +161,9 @@ async def async_update(self): try: self.updating = True await self._get_all_properties_value() + await self._get_transaction() + + finally: self.updating = False @@ -194,7 +199,7 @@ async def _post(self, cmd, payload=None, allowed_login=True) -> ClientResponse | self.wait = False return None - async def _get(self, url, allowed_login=True) -> ClientResponse | None: + async def _get(self, url, allowed_login=True, json_decode=True) -> ClientResponse | None: """Send a GET request to the API.""" try: async with self._session.get(url, timeout=TIMEOUT, ssl=self.ssl) as response: @@ -204,7 +209,10 @@ async def _get(self, url, allowed_login=True) -> ClientResponse | None: return await self._get(url, False) response.raise_for_status() - _resp = await response.json(content_type=None) + if json_decode: + _resp = await response.json(content_type=None) + else: + _resp = await response.text() return _resp except TimeoutError as e: _LOGGER.warning("Timeout on GET") @@ -304,6 +312,78 @@ async def reboot_wallbox(self): response = await self._post(cmd=CMD, payload={PARAM_COMMAND: "reboot"}) _LOGGER.debug(f"Reboot response {response}") + async def _get_transaction(self): + _LOGGER.debug("Get Transaction") + offset = self.transaction_offset + transactionLoop = True + counter = 0 + while transactionLoop: + response = await self._get(url=self.__get_url("transactions?offset="+ str(offset)), json_decode=False) + #_LOGGER.debug(response) + # split this text into lines with \n + lines = str(response).splitlines() + for line in lines: + if line is None: + transactionLoop = False + break + + try: + if "version" in line: + line = line.split(":2,", 2)[1] + + if "txstart" in line: + tid = line.split(":", 2)[0].split("_", 2)[0] + socket = line.split(", ", 2)[1] + tag = line.split("kWh ", 2)[1].split(" ", 2)[0] + + + if self.latest_tag is None: + self.latest_tag = {} + self.latest_tag[socket,"start"] = tag + + elif "txstop" in line: + tid = line.split(":", 2)[0].split("_", 2)[0] + socket = line.split(", ", 2)[1] + tag = line.split("kWh ", 2)[1].split(" ", 2)[0] + + if self.latest_tag is None: + self.latest_tag = {} + self.latest_tag[socket,"stop"] = tag + elif "mv" in line: + tid = line.split("_", 2)[0] + elif 'dto' in line: + continue + else: + _LOGGER.debug("Unknown line" + line) + continue + + + except IndexError: + break + + + # check if tid is integer + try: + offset = int(tid) + if self.transaction_offset == offset: + counter += 1 + else: + self.transaction_offset = offset + counter = 0 + + if counter == 2: + _LOGGER.debug(self.latest_tag) + transactionLoop = False + break + except ValueError: + continue + + # check if last line is reached + if line == lines[-1]: + break + + + async def async_request(self, method: str, cmd: str, json_data=None) -> ClientResponse | None: """Send a request to the API.""" try: diff --git a/custom_components/alfen_wallbox/sensor.py b/custom_components/alfen_wallbox/sensor.py index 398d2c1..d6b38f7 100644 --- a/custom_components/alfen_wallbox/sensor.py +++ b/custom_components/alfen_wallbox/sensor.py @@ -1124,6 +1124,22 @@ class AlfenSensorDescription( state_class=SensorStateClass.MEASUREMENT, device_class=SensorDeviceClass.CURRENT, ), + AlfenSensorDescription( + key="custom_tag_socket_1_start", + name="Tag Socket 1 Start", + icon="mdi:badge-account-outline", + api_param=None, + unit=None, + round_digits=None, + ), + AlfenSensorDescription( + key="custom_tag_socket_1_stop", + name="Tag Socket 1 Stop", + icon="mdi:badge-account-outline", + api_param=None, + unit=None, + round_digits=None, + ), ) ALFEN_SENSOR_DUAL_SOCKET_TYPES: Final[tuple[AlfenSensorDescription, ...]] = ( @@ -1410,6 +1426,22 @@ class AlfenSensorDescription( state_class=SensorStateClass.MEASUREMENT, device_class=SensorDeviceClass.CURRENT, ), + AlfenSensorDescription( + key="custom_tag_socket_2_start", + name="Tag Socket 2 Start", + icon="mdi:badge-account-outline", + api_param=None, + unit=None, + round_digits=None, + ), + AlfenSensorDescription( + key="custom_tag_socket_2_stop", + name="Tag Socket 2 Stop", + icon="mdi:badge-account-outline", + api_param=None, + unit=None, + round_digits=None, + ), ) @@ -1605,6 +1637,40 @@ def state(self) -> StateType: if voltage_l1 is not None and current_l1 is not None and voltage_l2 is not None and current_l2 is not None and voltage_l3 is not None and current_l3 is not None: return round((float(voltage_l1) * float(current_l1) + float(voltage_l2) * float(current_l2) + float(voltage_l3) * float(current_l3)), 2) + if self.entity_description.key == "custom_tag_socket_1_start": + if self._device.latest_tag is None: + return "No Tag" + for (key,value) in self._device.latest_tag.items(): + if key[0] == "socket 1" and key[1] == "start": + return value + return "No Tag" + + if self.entity_description.key == "custom_tag_socket_1_stop": + if self._device.latest_tag is None: + return "No Tag" + for (key,value) in self._device.latest_tag.items(): + if key[0] == "socket 1" and key[1] == "stop": + return value + return "No Tag" + + if self.entity_description.key == "custom_tag_socket_2_start": + if self._device.latest_tag is None: + return "No Tag" + for (key,value) in self._device.latest_tag.items(): + if key[0] == "socket 2" and key[1] == "start": + return value + return "No Tag" + + if self.entity_description.key == "custom_tag_socket_2_stop": + if self._device.latest_tag is None: + return "No Tag" + for (key,value) in self._device.latest_tag.items(): + if key[0] == "socket 2" and key[1] == "stop": + return value + return "No Tag" + + + for prop in self._device.properties: if prop[ID] == self.entity_description.api_param: # some exception of return value