Skip to content

Commit

Permalink
Catch some websocket errors (#1148)
Browse files Browse the repository at this point in the history
  • Loading branch information
springfall2008 authored May 27, 2024
1 parent 3f7eea8 commit 469a309
Showing 1 changed file with 72 additions and 70 deletions.
142 changes: 72 additions & 70 deletions apps/predbat/predbat.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
if not "PRED_GLOBAL" in globals():
PRED_GLOBAL = {}

THIS_VERSION = "v7.20.3"
THIS_VERSION = "v7.20.4"
PREDBAT_FILES = ["predbat.py"]
TIME_FORMAT = "%Y-%m-%dT%H:%M:%S%z"
TIME_FORMAT_SECONDS = "%Y-%m-%dT%H:%M:%S.%f%z"
Expand Down Expand Up @@ -15103,7 +15103,6 @@ def update_time_loop(self, cb_args):
"""
if self.update_pending and not self.prediction_started:
self.prediction_started = True
self.ha_interface.update_states()
self.load_user_config()
self.update_pending = False
try:
Expand Down Expand Up @@ -15196,78 +15195,81 @@ async def socketLoop(self):
self.log("Info: Start socket for url {}".format(url))
session = ClientSession()
if session:
async with session.ws_connect(url) as websocket:
await websocket.send_json({"type": "auth", "access_token": self.ha_key})
sid = 1

# Subscribe to all state changes
await websocket.send_json({"id": sid, "type": "subscribe_events", "event_type": "state_changed"})
sid += 1

# Listen for services
await websocket.send_json({"id": sid, "type": "subscribe_events", "event_type": "call_service"})
sid += 1

# Fire events to say we have registered services
for item in self.base.SERVICE_REGISTER_LIST:
await websocket.send_json(
{"id": sid, "type": "fire_event", "event_type": "service_registered", "event_data": {"service": item["service"], "domain": item["domain"]}}
)
sid += 1
try:
async with session.ws_connect(url) as websocket:
await websocket.send_json({"type": "auth", "access_token": self.ha_key})
sid = 1

self.log("Info: Web Socket active")
# Subscribe to all state changes
await websocket.send_json({"id": sid, "type": "subscribe_events", "event_type": "state_changed"})
sid += 1

async for message in websocket:
if self.base.stop_thread:
self.log("Info: Web socket stopping")
await session.close()
break
# Listen for services
await websocket.send_json({"id": sid, "type": "subscribe_events", "event_type": "call_service"})
sid += 1

if message.type == WSMsgType.TEXT:
try:
data = json.loads(message.data)
message_type = data.get("type", "")
if message_type == "event":
event_info = data.get("event", {})
event_type = event_info.get("event_type", "")
if event_type == "state_changed":
event_data = event_info.get("data", {})
old_state = event_data.get("old_state")
new_state = event_data.get("new_state")
if new_state:
self.update_state_item(new_state)
# Only trigger on value change or you get too many updates
if new_state.get("state", None) != old_state.get("state", None):
await self.base.trigger_watch_list(
new_state["entity_id"], event_data.get("attribute", None), event_data.get("old_state", None), new_state
)
elif event_type == "call_service":
service_data = event_info.get("data", {})
await self.base.trigger_callback(service_data)
else:
self.log("Info: Web Socket unknown message {}".format(data))
elif message_type == "result":
success = data.get("success", False)
if not success:
self.log("Warn: Web Socket result failed {}".format(data))
elif message_type == "auth_required":
pass
elif message_type == "auth_ok":
# Fire events to say we have registered services
for item in self.base.SERVICE_REGISTER_LIST:
await websocket.send_json(
{"id": sid, "type": "fire_event", "event_type": "service_registered", "event_data": {"service": item["service"], "domain": item["domain"]}}
)
sid += 1

self.log("Info: Web Socket active")

async for message in websocket:
if self.base.stop_thread:
self.log("Info: Web socket stopping")
await session.close()
break

if message.type == WSMsgType.TEXT:
try:
data = json.loads(message.data)
if data:
message_type = data.get("type", "")
if message_type == "event":
event_info = data.get("event", {})
event_type = event_info.get("event_type", "")
if event_type == "state_changed":
event_data = event_info.get("data", {})
old_state = event_data.get("old_state", {})
new_state = event_data.get("new_state", {})
if new_state:
self.update_state_item(new_state)
# Only trigger on value change or you get too many updates
if new_state.get("state", None) != old_state.get("state", None):
await self.base.trigger_watch_list(
new_state["entity_id"], event_data.get("attribute", None), event_data.get("old_state", None), new_state
)
elif event_type == "call_service":
service_data = event_info.get("data", {})
await self.base.trigger_callback(service_data)
else:
self.log("Info: Web Socket unknown message {}".format(data))
elif message_type == "result":
success = data.get("success", False)
if not success:
self.log("Warn: Web Socket result failed {}".format(data))
elif message_type == "auth_required":
pass
elif message_type == "auth_ok":
pass
elif message_type == "auth_invalid":
self.log("Warn: Web Socket auth failed, check your ha_key setting")
self.websocket_active = False
raise Exception("Web Socket auth failed")
else:
self.log("Info: Web Socket unknown message {}".format(data))
except Exception as e:
self.log("Warn Web Socket exception {}".format(e))
pass
elif message_type == "auth_invalid":
self.log("Warn: Web Socket auth failed, check your ha_key setting")
self.websocket_active = False
raise Exception("Web Socket auth failed")
else:
self.log("Info: Web Socket unknown message {}".format(data))

except Exception as e:
self.log("Warn Web Socket exception {}".format(e))
pass
elif message.type == WSMsgType.CLOSED:
break
elif message.type == WSMsgType.ERROR:
break
elif message.type == WSMsgType.CLOSED:
break
elif message.type == WSMsgType.ERROR:
break
except Exception as e:
self.log("Warn: Web Socket exception {}".format(e))

self.log("Warn: Web Socket closed, will try to reconnect in 5 seconds")
await asyncio.sleep(5)
Expand Down

0 comments on commit 469a309

Please sign in to comment.