Skip to content

Commit

Permalink
Refresh state every 8 hours and catch some Websocket errors (#1150)
Browse files Browse the repository at this point in the history
* Refresh state every 8 hours and catch some Websocket errors

* [pre-commit.ci lite] apply automatic fixes

---------

Co-authored-by: pre-commit-ci-lite[bot] <117423508+pre-commit-ci-lite[bot]@users.noreply.github.com>
  • Loading branch information
springfall2008 and pre-commit-ci-lite[bot] authored May 28, 2024
1 parent 469a309 commit cedd9ee
Showing 1 changed file with 49 additions and 10 deletions.
59 changes: 49 additions & 10 deletions apps/predbat/predbat.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
if not "PRED_GLOBAL" in globals():
PRED_GLOBAL = {}

THIS_VERSION = "v7.20.4"
THIS_VERSION = "v7.20.5"
PREDBAT_FILES = ["predbat.py"]
TIME_FORMAT = "%Y-%m-%dT%H:%M:%S%z"
TIME_FORMAT_SECONDS = "%Y-%m-%dT%H:%M:%S.%f%z"
Expand All @@ -46,6 +46,7 @@
CONFIG_ROOTS_HA = ["/homeassistant", "/conf", "/config", "./"]
TIME_FORMAT_HA = "%Y-%m-%dT%H:%M:%S%z"
TIMEOUT = 60 * 5
CONFIG_REFRESH_PERIOD = 60 * 8

# 240v x 100 amps x 3 phases / 1000 to kW / 60 minutes in an hour is the maximum kWh in a 1 minute period
MAX_INCREMENT = 240 * 100 * 3 / 1000 / 60
Expand Down Expand Up @@ -3921,7 +3922,7 @@ def alt_charge_discharge_enable(self, direction, enable):
solax_modes = SOLAX_SOLIS_MODES_NEW if self.base.get_arg("solax_modbus_new", True) else SOLAX_SOLIS_MODES

entity_id = self.base.get_arg("energy_control_switch", indirect=False, index=self.id)
switch = solax_modes.get(self.get_state_wrapper(entity_id), 0)
switch = solax_modes.get(self.base.get_state_wrapper(entity_id), 0)

if direction == "charge":
if enable:
Expand Down Expand Up @@ -5770,6 +5771,10 @@ def record_status(self, message, debug="", had_errors=False, notify=False, extra
"error": (had_errors or self.had_errors),
},
)

self.log("Info: record_status {}".format(message + extra))
print("Info: record_status {}".format(message + extra))

self.previous_status = message
if had_errors:
self.had_errors = True
Expand Down Expand Up @@ -14805,6 +14810,9 @@ def load_user_config(self, quiet=True, register=False):
else:
self.expose_config(item["name"], ha_value, quiet=quiet)

# Update the last time we refreshed the config
self.set_state_wrapper(entity_id=self.prefix + ".config_refresh", state=self.now_utc.strftime(TIME_FORMAT))

# Register HA services
if register:
self.watch_list = self.get_arg("watch_list", [], indirect=False)
Expand Down Expand Up @@ -15090,6 +15098,7 @@ async def terminate(self):
"""
self.log("Predbat terminating")
self.stop_thread = True
await asyncio.sleep(0)
if hasattr(self, "pool"):
if self.pool:
self.pool.close()
Expand All @@ -15101,6 +15110,7 @@ def update_time_loop(self, cb_args):
"""
Called every 15 seconds
"""
self.check_entity_refresh()
if self.update_pending and not self.prediction_started:
self.prediction_started = True
self.load_user_config()
Expand All @@ -15116,6 +15126,30 @@ def update_time_loop(self, cb_args):
self.prediction_started = False
self.prediction_started = False

def check_entity_refresh(self):
"""
Check if we need to refresh the config entities with HA
"""
# Check if we need to refresh the config entities with HA
config_refresh = self.get_state_wrapper(entity_id=self.prefix + ".config_refresh")
config_refresh_stamp = None
if config_refresh:
try:
config_refresh_stamp = datetime.strptime(config_refresh, TIME_FORMAT)
except (ValueError, TypeError):
config_refresh_stamp = None

age = CONFIG_REFRESH_PERIOD
if config_refresh_stamp:
tdiff = self.now_utc - config_refresh_stamp
age = tdiff.seconds / 60 + tdiff.days * 60 * 24
if age >= CONFIG_REFRESH_PERIOD:
self.log("Info: Refresh config entities due to their age of {} minutes".format(age))
self.update_pending = True
else:
self.log("Info: Refresh config entities as config_refresh state is unknown")
self.update_pending = True

def run_time_loop(self, cb_args):
"""
Called every N minutes
Expand All @@ -15124,9 +15158,12 @@ def run_time_loop(self, cb_args):
config_changed = False
self.prediction_started = True
self.ha_interface.update_states()
self.check_entity_refresh()

if self.update_pending:
self.load_user_config()
config_changed = True

self.update_pending = False
try:
self.update_pred(scheduled=True)
Expand Down Expand Up @@ -15193,8 +15230,7 @@ async def socketLoop(self):

url = "{}/api/websocket".format(self.ha_url)
self.log("Info: Start socket for url {}".format(url))
session = ClientSession()
if session:
async with ClientSession() as session:
try:
async with session.ws_connect(url) as websocket:
await websocket.send_json({"type": "auth", "access_token": self.ha_key})
Expand All @@ -15220,7 +15256,6 @@ async def socketLoop(self):
async for message in websocket:
if self.base.stop_thread:
self.log("Info: Web socket stopping")
await session.close()
break

if message.type == WSMsgType.TEXT:
Expand Down Expand Up @@ -15262,17 +15297,21 @@ async def socketLoop(self):
else:
self.log("Info: Web Socket unknown message {}".format(data))
except Exception as e:
self.log("Warn Web Socket exception {}".format(e))
pass
self.log("Warn Web Socket exception in update loop: {}".format(e))
break

elif message.type == WSMsgType.CLOSED:
break
elif message.type == WSMsgType.ERROR:
break

except Exception as e:
self.log("Warn: Web Socket exception {}".format(e))
self.log("Warn: Web Socket exception in startup: {}".format(e))
continue

self.log("Warn: Web Socket closed, will try to reconnect in 5 seconds")
await asyncio.sleep(5)
if not self.base.stop_thread:
self.log("Warn: Web Socket closed, will try to reconnect in 5 seconds")
await asyncio.sleep(5)

def get_state(self, entity_id=None, default=None, attribute=None, refresh=False):
"""
Expand Down

0 comments on commit cedd9ee

Please sign in to comment.