Skip to content

Commit

Permalink
Get history directly from HA when possible (#1077)
Browse files Browse the repository at this point in the history
* Get history directly from HA when possible

---------

Co-authored-by: pre-commit-ci-lite[bot] <117423508+pre-commit-ci-lite[bot]@users.noreply.github.com>
  • Loading branch information
springfall2008 and pre-commit-ci-lite[bot] authored May 11, 2024
1 parent 84a7bf6 commit 8ea3157
Showing 1 changed file with 85 additions and 11 deletions.
96 changes: 85 additions & 11 deletions apps/predbat/predbat.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
PREDICT_STEP = 5
RUN_EVERY = 5
CONFIG_ROOTS = ["/config", "/conf", "/homeassistant"]
TIME_FORMAT_HA = "%Y-%m-%dT%H:%M:%S%z"
TIMEOUT = 60 * 2

# 240v x 100 amps x 3 phases / 1000 to kW / 60 minutes in an hour is the maximum kWh in a 1 minute period
MAX_INCREMENT = 240 * 100 * 3 / 1000 / 60
Expand Down Expand Up @@ -5039,17 +5041,7 @@ def get_history_wrapper(self, entity_id, days=None):
"""
Async function to get history from HA using Async task
"""
for retry in range(5):
if retry:
time.sleep(retry * 5)

if days:
history = self.get_history(entity_id=entity_id, days=days)
else:
history = self.get_history(entity_id=entity_id)
if history is not None:
break
self.log("WARN: Unable to fetch history for {} retry {}".format(entity_id, retry))
history = self.ha_interface.get_history(entity_id, days=days, now=self.now)

if history is None:
self.log("Error: Failure to fetch history for {}".format(entity_id))
Expand Down Expand Up @@ -13366,6 +13358,7 @@ def update_time(self, print=True):
now_utc += timedelta(minutes=self.simulate_offset)

self.now_utc = now_utc
self.now = now
self.midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
self.midnight_utc = now_utc.replace(hour=0, minute=0, second=0, microsecond=0)

Expand Down Expand Up @@ -14355,6 +14348,8 @@ def initialize(self):
self.midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
self.minutes_now = int((now - self.midnight).seconds / 60)

self.ha_interface = HAInterface(self)

try:
self.reset()
self.sanity()
Expand Down Expand Up @@ -14478,3 +14473,82 @@ def run_time_loop_balance(self, cb_args):
self.log("ERROR: Exception raised {}".format(e))
self.record_status("ERROR: Exception raised {}".format(e))
raise


class HAInterface:
"""
Direct interface to Home Assistant
"""

def __init__(self, base):
self.ha_key = os.environ.get("SUPERVISOR_TOKEN", None)
self.ha_url = "http://supervisor/core"
self.base = base
self.log = base.log
if not self.ha_key:
self.log("WARN: Supervisor token not found, will use direct HA API")

def get_history(self, sensor, now, days=30):
"""
Get the history for a sensor from Home Assistant.

:param sensor: The sensor to get the history for.
:return: The history for the sensor.
"""
if not self.ha_key:
return self.base.get_history(sensor, days=days)

start = now - timedelta(days=days)
end = now
res = self.api_call("/api/history/period/{}".format(start.strftime(TIME_FORMAT_HA)), {"filter_entity_id": sensor, "end_time": end.strftime(TIME_FORMAT_HA)})
return res

def set_state(self, entity_id, state, attributes=None):
"""
Set the state of an entity in Home Assistant.
"""
if not self.ha_key:
if attributes:
return self.base.set_state(entity_id, state=state)
else:
return self.base.set_state(entity_id, state=state, attributes=attributes)

data = {"state": state}
if attributes:
data["attributes"] = attributes
self.api_call("/api/states/{}".format(entity_id), data, post=True)

def api_call(self, endpoint, data_in=None, post=False):
"""
Make an API call to Home Assistant.

:param endpoint: The API endpoint to call.
:param data_in: The data to send in the body of the request.
:param post: True if this is a POST request, False for GET.
:return: The response from the API.
"""
url = self.ha_url + endpoint
headers = {
"Authorization": "Bearer " + self.ha_key,
"Content-Type": "application/json",
"Accept": "application/json",
}
if post:
if data_in:
response = requests.post(url, headers=headers, json=data_in, timeout=TIMEOUT)
else:
response = requests.post(url, headers=headers, timeout=TIMEOUT)
else:
if data_in:
response = requests.get(url, headers=headers, params=data_in, timeout=TIMEOUT)
else:
response = requests.get(url, headers=headers, timeout=TIMEOUT)
try:
data = response.json()
except requests.exceptions.JSONDecodeError:
self.log("Warn: Failed to decode response from {}".format(url))
data = None
except (requests.Timeout, requests.exceptions.ReadTimeout):
self.log("Warn: Timeout from {}".format(url))
data = None
return data

0 comments on commit 8ea3157

Please sign in to comment.