Skip to content

Commit

Permalink
Stability fix for async issue (#1070)
Browse files Browse the repository at this point in the history
* Stability fix for async issue

* [pre-commit.ci lite] apply automatic fixes

* Fixes

* [pre-commit.ci lite] apply automatic fixes

---------

Co-authored-by: pre-commit-ci-lite[bot] <117423508+pre-commit-ci-lite[bot]@users.noreply.github.com>
  • Loading branch information
springfall2008 and pre-commit-ci-lite[bot] authored May 10, 2024
1 parent 48e8004 commit 3c90fa2
Showing 1 changed file with 39 additions and 69 deletions.
108 changes: 39 additions & 69 deletions apps/predbat/predbat.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
if not "PRED_GLOBAL" in globals():
PRED_GLOBAL = {}

THIS_VERSION = "v7.18.2"
THIS_VERSION = "v7.18.3"
PREDBAT_FILES = ["predbat.py"]
TIME_FORMAT = "%Y-%m-%dT%H:%M:%S%z"
TIME_FORMAT_SECONDS = "%Y-%m-%dT%H:%M:%S.%f%z"
Expand Down Expand Up @@ -85,6 +85,7 @@
"friendly_name": "Predbat Active",
"type": "switch",
"default": False,
"restore": False,
},
{
"name": "pv_metric10_weight",
Expand Down Expand Up @@ -2522,10 +2523,10 @@ def find_charge_curve(self, discharge):
if soc_kwh_sensor and charge_rate_sensor and battery_power_sensor and predbat_status_sensor:
battery_power_sensor = battery_power_sensor.replace("number.", "sensor.") # Workaround as old template had number.
self.log("Find {} curve with sensors {} and {} and {} and {}".format(curve_type, soc_kwh_sensor, charge_rate_sensor, predbat_status_sensor, battery_power_sensor))
soc_kwh_data = self.base.get_history_async(entity_id=soc_kwh_sensor, days=self.base.max_days_previous)
charge_rate_data = self.base.get_history_async(entity_id=charge_rate_sensor, days=self.base.max_days_previous)
predbat_status_data = self.base.get_history_async(entity_id=predbat_status_sensor, days=self.base.max_days_previous)
battery_power_data = self.base.get_history_async(entity_id=battery_power_sensor, days=self.base.max_days_previous)
soc_kwh_data = self.base.get_history_wrapper(entity_id=soc_kwh_sensor, days=self.base.max_days_previous)
charge_rate_data = self.base.get_history_wrapper(entity_id=charge_rate_sensor, days=self.base.max_days_previous)
predbat_status_data = self.base.get_history_wrapper(entity_id=predbat_status_sensor, days=self.base.max_days_previous)
battery_power_data = self.base.get_history_wrapper(entity_id=battery_power_sensor, days=self.base.max_days_previous)

if soc_kwh_data and charge_rate_data and charge_rate_data and battery_power_data:
soc_kwh = self.base.minute_data(
Expand Down Expand Up @@ -4380,24 +4381,13 @@ class PredBat(hass.Hass):
The battery prediction class itself
"""

def run_async(self, func):
"""
Run async func as sync
"""
task = self.create_task(func)
count = 0
while not task.done() and count < 60 * 100:
time.sleep(0.01)
count += 1
if not task.done():
raise Exception("Timeout waiting for async function to complete")
return task.result()

def call_notify(self, message):
"""
Sync wrapper for call_notify
"""
return self.run_async(self.async_call_notify(message))
for device in self.notify_devices:
self.call_service("notify/" + device, message=message)
return True

async def async_call_notify(self, message):
"""
Expand Down Expand Up @@ -5025,27 +5015,20 @@ def load_car_energy(self, now_utc):
self.log("Car charging hold {} threshold {}".format(self.car_charging_hold, self.car_charging_threshold * 60.0))
return self.car_charging_energy

async def get_history_async_hook(self, entity_id, days):
def get_history_wrapper(self, entity_id, days=None):
"""
Async function to get history from HA
Async function to get history from HA using Async task
"""
if days:
history = await self.get_history(entity_id=entity_id, days=days)
history = self.get_history(entity_id=entity_id, days=days)
else:
history = await self.get_history(entity_id=entity_id)
return history
history = self.get_history(entity_id=entity_id)

def get_history_async(self, entity_id, days=None):
"""
Async function to get history from HA using Async task
"""
result = self.run_async(self.get_history_async_hook(entity_id, days))

if result is None:
if history is None:
self.log("Failure to fetch history for {}".format(entity_id))
raise ValueError
else:
return result
return history

def minute_data_import_export(self, now_utc, key, scale=1.0, required_unit=None, increment=True, smoothing=True):
"""
Expand All @@ -5062,7 +5045,7 @@ def minute_data_import_export(self, now_utc, key, scale=1.0, required_unit=None,
import_today = {}
for entity_id in entity_ids:
try:
history = self.get_history_async(entity_id=entity_id, days=self.max_days_previous)
history = self.get_history_wrapper(entity_id=entity_id, days=self.max_days_previous)
except (ValueError, TypeError):
history = []

Expand Down Expand Up @@ -5098,7 +5081,7 @@ def minute_data_load(self, now_utc, entity_name, max_days_previous, required_uni
load_minutes = {}
age_days = None
for entity_id in entity_ids:
history = self.get_history_async(entity_id=entity_id, days=max_days_previous)
history = self.get_history_wrapper(entity_id=entity_id, days=max_days_previous)
if history:
item = history[0][0]
try:
Expand Down Expand Up @@ -12988,13 +12971,13 @@ def fetch_inverter_data(self):
self.log("Base charge window {}".format(self.window_as_text(self.charge_window, self.charge_limit_percent)))
self.log("Base discharge window {}".format(self.window_as_text(self.discharge_window, self.discharge_limits)))

def manual_select(self, config_item, value):
async def async_manual_select(self, config_item, value):
"""
Wrapper for async manual times
Async wrapper for selection on manual times dropdown
"""
return self.run_async(self.async_manual_select(config_item, value))
return await self.run_in_executor(self.manual_select, config_item, value)

async def async_manual_select(self, config_item, value):
def manual_select(self, config_item, value):
"""
Selection on manual times dropdown
"""
Expand Down Expand Up @@ -13032,22 +13015,16 @@ async def async_manual_select(self, config_item, value):

if not item_value:
item_value = "off"
await self.async_manual_times(config_item, new_value=item_value)
self.manual_times(config_item, new_value=item_value)

# Update other drop downs that may need this time excluding
for item in CONFIG_ITEMS:
if item["name"] != config_item and item.get("manual"):
value = item.get("value", "")
if value and value != "reset" and exclude_list:
await self.async_manual_times(item["name"], exclude=exclude_list)
self.manual_times(item["name"], exclude=exclude_list)

def manual_times(self, config_item, exclude=[], new_value=None):
"""
Wrapper for async manual times
"""
return self.run_async(self.async_manual_times(config_item=config_item, exclude=exclude, new_value=new_value))

async def async_manual_times(self, config_item, exclude=[], new_value=None):
"""
Update manual times sensor
"""
Expand Down Expand Up @@ -13103,13 +13080,12 @@ async def async_manual_times(self, config_item, exclude=[], new_value=None):
item["options"] = time_values
if not values:
values = "off"
await self.async_expose_config(config_item, values, force=True)
self.expose_config(config_item, values, force=True)

if time_overrides:
time_txt = []
for minute in time_overrides:
time_txt.append(self.time_abs_str(minute))
self.log("Manual override: {} times now {}".format(config_item, time_txt))
return time_overrides

def fetch_config_options(self):
Expand Down Expand Up @@ -13518,13 +13494,13 @@ def download_predbat_file_from_github(self, tag, filename, new_filename):
self.log("WARN: Downloading Predbat file failed, URL {}".format(url))
return None

def download_predbat_version(self, version):
async def async_download_predbat_version(self, version):
"""
Sync wrapper for async download_predbat_version
"""
return self.run_async(self.async_download_predbat_version(version))
return await self.run_in_executor(self.download_predbat_version, version)

async def async_download_predbat_version(self, version):
def download_predbat_version(self, version):
"""
Download a version of Predbat from GitHub

Expand All @@ -13538,7 +13514,7 @@ async def async_download_predbat_version(self, version):
self.log("WARN: Predbat update requested for the same version as we are running ({}), no update required".format(version))
return

await self.async_expose_config("version", True, force=True, in_progress=True)
self.expose_config("version", True, force=True, in_progress=True)
tag_split = version.split(" ")
this_path = os.path.dirname(__file__)
self.log("Split returns {}".format(tag_split))
Expand Down Expand Up @@ -13576,7 +13552,7 @@ async def async_download_predbat_version(self, version):
self.pool = None

# Notify that we are about to update
await self.async_call_notify("Predbat: update to: {}".format(version))
self.call_notify("Predbat: update to: {}".format(version))

# Perform the update
self.log("Perform the update.....")
Expand Down Expand Up @@ -13743,13 +13719,10 @@ def get_ha_config(self, name, default):
return value, default
return None, default

def expose_config(self, name, value, quiet=True, event=False, force=False, in_progress=False):
"""
Wrapper for async expose config
"""
return self.run_async(self.async_expose_config(name, value, quiet, event, force, in_progress))

async def async_expose_config(self, name, value, quiet=True, event=False, force=False, in_progress=False):
return await self.run_in_executor(self.expose_config, name, value, quiet, event, force, in_progress)

def expose_config(self, name, value, quiet=True, event=False, force=False, in_progress=False):
"""
Share the config with HA
"""
Expand Down Expand Up @@ -13804,7 +13777,7 @@ async def async_expose_config(self, name, value, quiet=True, event=False, force=
options = item["options"]
if value not in options:
options.append(value)
old_state = await self.get_state(entity_id=entity)
old_state = self.get_state(entity_id=entity)
if old_state and old_state != value:
self.set_state(entity_id=entity, state=old_state, attributes={"friendly_name": item["friendly_name"], "options": options, "icon": icon})
self.set_state(entity_id=entity, state=value, attributes={"friendly_name": item["friendly_name"], "options": options, "icon": icon})
Expand Down Expand Up @@ -13858,13 +13831,10 @@ def dashboard_item(self, entity, state, attributes):
if entity not in self.dashboard_index:
self.dashboard_index.append(entity)

def update_save_restore_list(self):
"""
Sync wrapper for sync update_save_restore_list
"""
return self.run_async(self.async_update_save_restore_list())

async def async_update_save_restore_list(self):
return await self.run_in_executor(self.update_save_restore_list)

def update_save_restore_list(self):
"""
Update list of current Predbat settings
"""
Expand All @@ -13883,11 +13853,11 @@ async def async_update_save_restore_list(self):
for root, dirs, files in os.walk(self.save_restore_dir):
for name in files:
filepath = os.path.join(root, name)
if filepath.endswith(".yaml"):
if filepath.endswith(".yaml") and not name.startswith("."):
PREDBAT_SAVE_RESTORE.append(name)
item = self.config_index.get("saverestore", None)
item["options"] = PREDBAT_SAVE_RESTORE
await self.async_expose_config("saverestore", None)
self.expose_config("saverestore", None)

async def async_restore_settings_yaml(self, filename):
"""
Expand Down Expand Up @@ -14028,7 +13998,7 @@ def load_previous_value_from_ha(self, entity):
ha_value = self.get_state(entity)
if ha_value is not None:
return ha_value
history = self.get_history_async(entity_id=entity)
history = self.get_history_wrapper(entity_id=entity)
if history:
history = history[0]
ha_value = history[-1]["state"]
Expand Down

0 comments on commit 3c90fa2

Please sign in to comment.