Skip to content

Commit

Permalink
Improve connection handling
Browse files Browse the repository at this point in the history
  • Loading branch information
hooverd-uits committed Feb 14, 2022
1 parent 47ba972 commit 720d5fd
Showing 1 changed file with 53 additions and 29 deletions.
82 changes: 53 additions & 29 deletions src/main.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import asyncio
from functools import partial
import json
import logging
import os
from typing import Optional
import websockets

from dataclasses import dataclass
from functools import partial
from typing import Optional

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
from websockets.client import connect
from websockets.exceptions import ConnectionClosed

logging.basicConfig()
logger = logging.getLogger(__name__)
Expand All @@ -33,42 +33,66 @@ def __post_init__(self):
self.value = float(self.value)


hs_ws_url = os.environ["HUBITAT_ELEVATION_WEBSOCKET_URL"]
influxdb_url = os.environ["INFLUXDB_URL"]
token = os.environ["INFLUXDB_TOKEN"]
org = os.environ["INFLUXDB_ORG"]
bucket = os.environ["INFLUXDB_BUCKET"]
@dataclass
class HubitatConfig:
url: str


@dataclass
class InfluxDBConfig:
url: str
token: str
org: str
bucket: str


hubitat_config = HubitatConfig(url=os.environ["HUBITAT_ELEVATION_WEBSOCKET_URL"])

influxdb_config = InfluxDBConfig(
url=os.environ["INFLUXDB_URL"],
token=os.environ["INFLUXDB_TOKEN"],
org=os.environ["INFLUXDB_ORG"],
bucket=os.environ["INFLUXDB_BUCKET"],
)


client = InfluxDBClient(url=influxdb_url, token=token, org=org)
client = InfluxDBClient(
url=influxdb_config.url, token=influxdb_config.token, org=influxdb_config.org
)
write_api = client.write_api(write_options=SYNCHRONOUS)


async def write_event(record):
return asyncio.get_running_loop().run_in_executor(
None, partial(write_api.write, bucket, record=record)
None, partial(write_api.write, influxdb_config.bucket, record=record)
)


async def he_event():
async with websockets.connect(hs_ws_url) as websocket:
loop = asyncio.get_running_loop()

async for message in websocket:
try:
event = HubitatElevationEvent(**json.loads(message))
logger.info(
"%s (%s): %s"
% (event.displayName, event.deviceId, event.descriptionText)
async for websocket in connect(hubitat_config.url):
# loop = asyncio.get_running_loop()
try:
async for message in websocket:
try:
event = HubitatElevationEvent(**json.loads(message))
logger.info(
"%s (%s): %s"
% (event.displayName, event.deviceId, event.descriptionText)
)
except Exception as ex:
logger.error(
"Failed parsing message from HE websocket: %s" % message
)

p = (
Point("home")
.tag("device", event.displayName)
.field(event.name, event.value)
)
except Exception as ex:
logger.error("Failed parsing message from HE websocket: %s" % message)

p = (
Point("home")
.tag("device", event.displayName)
.field(event.name, event.value)
)
await write_event(p)
await write_event(p)
except ConnectionClosed:
logger.error("Connection closed... reconnecting.")
continue


asyncio.run(he_event())

0 comments on commit 720d5fd

Please sign in to comment.