From cea6064356d313646656a7ddae4ab4207cda8cad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=2E=20L=C3=B3pez?= Date: Thu, 8 Feb 2024 12:55:31 +0100 Subject: [PATCH] 3.x branch opened. Everything can change, API changes everywhere --- .gitignore | 2 +- Pipfile | 2 +- custom_components/delorian/manifest.json | 4 +- custom_components/delorian/sensor.py | 45 +-- ha-historical-sensor.sublime-project | 40 +++ .../recorderutil.py | 161 --------- homeassistant_historical_sensor/sensor.py | 205 ++++-------- homeassistant_historical_sensor/state.py | 36 +- .../timemachine.py | 310 ++++++++++++++++++ pyproject.toml | 2 +- 10 files changed, 473 insertions(+), 334 deletions(-) create mode 100644 ha-historical-sensor.sublime-project delete mode 100644 homeassistant_historical_sensor/recorderutil.py create mode 100644 homeassistant_historical_sensor/timemachine.py diff --git a/.gitignore b/.gitignore index 5068f44..c18ce16 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,6 @@ *.egg-info/ *.pyc -*.sublime-project +*.sublime-workspace __pycache__ dist/ home-assistant-historical-sensor.sublime-workspace diff --git a/Pipfile b/Pipfile index c51e50d..20ab419 100644 --- a/Pipfile +++ b/Pipfile @@ -6,10 +6,10 @@ name = "pypi" [packages] [dev-packages] +# homeassistant-historical-sensor = {editable = true, path = "."} black = "*" build = "*" homeassistant = ">=2024.1.0" -homeassistant-historical-sensor = {editable = true, path = "."} ipdb = "*" ipython = "*" isort = "*" diff --git a/custom_components/delorian/manifest.json b/custom_components/delorian/manifest.json index 79a3aea..b7c3bc5 100644 --- a/custom_components/delorian/manifest.json +++ b/custom_components/delorian/manifest.json @@ -12,7 +12,7 @@ "iot_class": "cloud_polling", "issue_tracker": "https://github.com/ldotlopez/ha-historical-sensor/issues", "requirements": [ - "homeassistant-historical-sensor==2.0.0rc4" + "homeassistant-historical-sensor==3.0.0a1" ], - "version": "2.0.0rc4" + "version": "3.0.0a1" } diff --git a/custom_components/delorian/sensor.py b/custom_components/delorian/sensor.py index 7077c6d..32bffe5 100644 --- a/custom_components/delorian/sensor.py +++ b/custom_components/delorian/sensor.py @@ -23,9 +23,10 @@ # Important methods include comments about code itself and reasons behind them # -import itertools +import math import statistics from datetime import datetime, timedelta +from zoneinfo import ZoneInfo from homeassistant.components.recorder.models import StatisticData, StatisticMetaData from homeassistant.components.sensor import SensorDeviceClass, SensorEntity @@ -41,6 +42,7 @@ HistoricalState, PollUpdateMixin, ) +from homeassistant_historical_sensor.state import group_by_interval from .api import API from .const import DOMAIN, NAME @@ -92,18 +94,29 @@ async def async_update_historical(self): # This functions is equivaled to the `Sensor.async_update` from # HomeAssistant core # - # Important: You must provide datetime with tzinfo + # Important: ts is in UTC - hist_states = [ + upstream_data = self.api.fetch( + start=datetime.now() - timedelta(days=3), step=timedelta(minutes=15) + ) + + upstream_data_with_timestamps = [ + ( + dt.timestamp() if dt.tzinfo else dtutil.as_local(dt).timestamp(), + state, + ) + for (dt, state) in upstream_data + ] + + historical_states = [ HistoricalState( state=state, - dt=dtutil.as_local(dt), # Add tzinfo, required by HistoricalSensor - ) - for (dt, state) in self.api.fetch( - start=datetime.now() - timedelta(days=3), step=timedelta(minutes=15) + ts=ts, ) + for (ts, state) in 
upstream_data_with_timestamps ] - self._attr_historical_states = hist_states + + self._attr_historical_states = historical_states @property def statistic_id(self) -> str: @@ -131,24 +144,18 @@ async def async_calculate_statistic_data( accumulated = latest["sum"] if latest else 0 - def hour_block_for_hist_state(hist_state: HistoricalState) -> datetime: - # XX:00:00 states belongs to previous hour block - if hist_state.dt.minute == 0 and hist_state.dt.second == 0: - dt = hist_state.dt - timedelta(hours=1) - return dt.replace(minute=0, second=0, microsecond=0) - - else: - return hist_state.dt.replace(minute=0, second=0, microsecond=0) - ret = [] - for dt, collection_it in itertools.groupby( - hist_states, key=hour_block_for_hist_state + for block_ts, collection_it in group_by_interval( + hist_states, granurality=60 * 60 ): collection = list(collection_it) + mean = statistics.mean([x.state for x in collection]) partial_sum = sum([x.state for x in collection]) accumulated = accumulated + partial_sum + dt = datetime.fromtimestamp(block_ts).replace(tzinfo=ZoneInfo("UTC")) + ret.append( StatisticData( start=dt, diff --git a/ha-historical-sensor.sublime-project b/ha-historical-sensor.sublime-project new file mode 100644 index 0000000..a3de4d7 --- /dev/null +++ b/ha-historical-sensor.sublime-project @@ -0,0 +1,40 @@ +{ + "folders": + [ + { + "file_exclude_patterns": [ + "*.pyc", + "*.swp", + "Pipfile.lock" + ], + "folder_exclude_patterns": [ + "*.egg-info", + ".mypy_cache", + ".venv", + "__pycache__", + "dist", + ], + "follow_symlinks": true, + "path": ".", + } + ], + "settings": { + "python_interpreter": "${project_path}/.venv/bin/python", + + "sublack.black_command": "${project_path}/.venv/bin/black", + "sublack.black_on_save": true, + + "isort.sort_on_save": false, + + "SublimeLinter.linters.flake8.executable": "${project_path}/.venv/bin/flake8", + "SublimeLinter.linters.flake8.disable": false, + + "SublimeLinter.linters.mypy.executable": "${project_path}/.venv/bin/mypy", + "SublimeLinter.linters.mypy.disable": false, + // "SublimeLinter.linters.mypy.args": ["--ignore-missing-imports"], + + "SublimeLinter.linters.pycodestyle.executable": "${project_path}/.venv/bin/pycodestyle", + "SublimeLinter.linters.pycodestyle.disable": true, + } + +} diff --git a/homeassistant_historical_sensor/recorderutil.py b/homeassistant_historical_sensor/recorderutil.py deleted file mode 100644 index b13302f..0000000 --- a/homeassistant_historical_sensor/recorderutil.py +++ /dev/null @@ -1,161 +0,0 @@ -# Copyright (C) 2021-2023 Luis López -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License -# as published by the Free Software Foundation; either version 2 -# of the License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, -# USA. 
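For illustration, a self-contained sketch of the hour-block bucketing that the statistics code above relies on. It mirrors the behaviour of the `blockize`/`group_by_interval` helpers added in `state.py` further down; the timestamps and values are made up and nothing here depends on Home Assistant:

import itertools
from math import ceil


def hour_block(ts: float, granularity: int = 60 * 60) -> int:
    # Same rule as blockize(): a sample taken exactly on the boundary
    # (leftover == 0) is accounted to the *previous* block.
    ts = ceil(ts)
    block, leftover = divmod(ts, granularity)
    if leftover == 0:
        block -= 1
    return block * granularity


# Quarter-hourly samples at XX:15, XX:30, XX:45 and the next XX:00:00.
samples = [(1707130800 + 900 * i, 0.25) for i in range(1, 5)]

for block_ts, group in itertools.groupby(samples, key=lambda s: hour_block(s[0])):
    print(block_ts, sum(value for _, value in group))

# All four samples, including the one sitting exactly on the next hour,
# land in the same block: "1707130800 1.0"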
- - -import logging -from contextlib import contextmanager -from typing import Literal - -from homeassistant.components import recorder -from homeassistant.components.recorder import Recorder, db_schema -from homeassistant.components.recorder.statistics import ( - StatisticsRow, - get_last_statistics, -) -from homeassistant.const import STATE_UNAVAILABLE, STATE_UNKNOWN -from homeassistant.core import HomeAssistant -from homeassistant.helpers.entity import Entity -from sqlalchemy import Select, not_, or_, select -from sqlalchemy.orm import Session - -_LOGGER = logging.getLogger(__name__) - - -@contextmanager -def hass_recorder_session(hass: HomeAssistant): - r = recorder.get_instance(hass) - with recorder.util.session_scope(session=r.get_session()) as session: - yield session - - -async def hass_get_entity_states_metadata_id( - hass: HomeAssistant, entity: Entity -) -> int | None: - rec = recorder.get_instance(hass) - return await rec.async_add_executor_job( - recorder_get_entity_states_metadata_id, rec, entity - ) - - -def recorder_get_entity_states_metadata_id(rec: Recorder, entity: Entity) -> int | None: - with rec.get_session() as sess: - return rec.states_meta_manager.get(entity.entity_id, sess, True) - - -async def get_last_statistics_wrapper( - hass: HomeAssistant, - statistic_id: str, - *, - convert_units: bool = True, - types: set[Literal["last_reset", "max", "mean", "min", "state", "sum"]] = { - "last_reset", - "max", - "mean", - "min", - "state", - "sum", - }, -) -> StatisticsRow | None: - res = await recorder.get_instance(hass).async_add_executor_job( - get_last_statistics, - hass, - 1, - statistic_id, - convert_units, - types, - ) - if not res: - return None - - return res[statistic_id][0] - - -def _entity_id_states_stmt(session: Session, entity: Entity) -> Select: - return ( - select(db_schema.States) - .join(db_schema.StatesMeta) - .where(db_schema.StatesMeta.entity_id == entity.entity_id) - ) - - -def get_entity_states_meta(session: Session, entity: Entity) -> db_schema.StatesMeta: - # Don't re-use _entity_id_states_stmt. - # It's posible to have a StatesMeta for the current entity but zero States in the - # database. 
- # In that case the _entity_id_states_stmt will return zero rows but it doesn't mean - # that we need to create a new StatesMeta - - res = session.execute( - select(db_schema.StatesMeta).where( - db_schema.StatesMeta.entity_id == entity.entity_id - ) - ).scalar() - - if res: - return res - - else: - ret = db_schema.StatesMeta(entity_id=entity.entity_id) - session.add(ret) - session.commit() - - return ret - - -def delete_entity_invalid_states(session: Session, entity: Entity) -> int: - stmt = _entity_id_states_stmt(session, entity).order_by( - db_schema.States.last_updated_ts.asc() - ) - - prev = None - to_delete = [] - - for state in session.execute(stmt).scalars(): - if state.state in [STATE_UNKNOWN, STATE_UNAVAILABLE]: - to_delete.append(state) - else: - state.old_state_id = prev.state_id if prev else None # type: ignore[attr-defined] - session.add(state) - prev = state - - for state in to_delete: - session.delete(state) - - session.commit() - - return len(to_delete) - - -def get_entity_latest_state(session: Session, entity: Entity): - stmt = ( - _entity_id_states_stmt(session, entity) - .where( - not_( - or_( - db_schema.States.state == STATE_UNAVAILABLE, - db_schema.States.state == STATE_UNKNOWN, - ) - ) - ) - .order_by(db_schema.States.last_updated_ts.desc()) - ) - return session.execute(stmt).scalar() - - -def save_states(session: Session, states: list[db_schema.States]): - session.add_all(states) - session.commit() diff --git a/homeassistant_historical_sensor/sensor.py b/homeassistant_historical_sensor/sensor.py index f22d809..b37b0fa 100644 --- a/homeassistant_historical_sensor/sensor.py +++ b/homeassistant_historical_sensor/sensor.py @@ -14,7 +14,6 @@ # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, # USA. - import logging from abc import abstractmethod from datetime import datetime, timedelta @@ -33,19 +32,10 @@ ) from homeassistant.components.sensor import SensorEntity from homeassistant.helpers.event import async_call_later, async_track_time_interval -from homeassistant.util import dt as dtutil +from . import timemachine as tm from .consts import DELAY_ON_MISSING_STATES_METADATA from .patches import _build_attributes, _stringify_state -from .recorderutil import ( - delete_entity_invalid_states, - get_entity_latest_state, - get_entity_states_meta, - get_last_statistics_wrapper, - hass_get_entity_states_metadata_id, - hass_recorder_session, - save_states, -) from .state import HistoricalState _LOGGER = logging.getLogger(__name__) @@ -131,14 +121,17 @@ async def async_update_historical(self): """ raise NotImplementedError() - async def _schedule_on_missing_states_metadata(self, fn) -> bool: - metadata_id = await hass_get_entity_states_metadata_id(self.hass, self) - if metadata_id is not None: + async def _schedule_on_missing_states_meta(self, fn) -> bool: + states_metadata_id = await tm.hass_get_entity_states_metadata_id( + self.hass, self + ) + + if states_metadata_id is not None: return False - _LOGGER.debug( - f"{self.entity_id} not yet fully ready, StatesMeta object is not ready." - + f"Retry in {DELAY_ON_MISSING_STATES_METADATA} seconds" + _LOGGER.warning( + f"{self.entity_id}: not yet fully ready, states meta information is " + + f"unavailablele, retring in {DELAY_ON_MISSING_STATES_METADATA} seconds." 
) async_call_later(self.hass, DELAY_ON_MISSING_STATES_METADATA, fn) @@ -150,52 +143,41 @@ async def async_write_ha_historical_states(self, _=None): This method writes `self.historical_states` into database """ - if await self._schedule_on_missing_states_metadata( + if await self._schedule_on_missing_states_meta( self.async_write_ha_historical_states ): return - _LOGGER.debug(f"{self.entity_id} states medata ready") - - hist_states = self.historical_states - if any([True for x in hist_states if x.dt.tzinfo is None]): - _LOGGER.error("historical_states MUST include tzinfo") - return - - hist_states = list(sorted(hist_states, key=lambda x: x.dt)) - _LOGGER.debug( - f"{self.entity_id}: " - + f"{len(hist_states)} historical states present in sensor" - ) - - if not hist_states: - return + _LOGGER.debug(f"{self.entity_id} states meta ready") # Write states - n = len(await self._async_write_states(hist_states)) + n = len(await self._async_write_states(self.historical_states)) _LOGGER.debug(f"{self.entity_id}: {n} states written into the database") - # Write statistics - n = len(await self._async_write_statistics(hist_states)) + # # Write statistics + n = len(await self._async_write_statistics(self.historical_states)) _LOGGER.debug(f"{self.entity_id}: {n} statistics points written into database") async def _async_write_states( self, hist_states: list[HistoricalState] - ) -> list[HistoricalState]: + ) -> list[db_schema.States]: return await recorder.get_instance(self.hass).async_add_executor_job( self._recorder_write_states, hist_states ) def _recorder_write_states( self, hist_states: list[HistoricalState] - ) -> list[HistoricalState]: - with hass_recorder_session(self.hass) as session: + ) -> list[db_schema.States]: + if not hist_states: + return [] + + with tm.hass_recorder_session(self.hass) as session: # # Delete invalid states # try: - n_states = delete_entity_invalid_states(session, self) + n_states = len(tm.delete_invalid_states(session, self)) _LOGGER.debug(f"{self.entity_id}: cleaned {n_states} invalid states") except sqlalchemy.exc.IntegrityError: @@ -206,61 +188,14 @@ def _recorder_write_states( + "This is not critical just unsightly for some graphs " ) - # - # Check latest state in the database - # - - try: - latest = get_entity_latest_state(session, self) - - except sqlalchemy.exc.DatabaseError: - _LOGGER.debug( - "Error: Current recorder schema is not supported. " - + "This error is fatal, please file a bug" - ) - return [] - - # - # Drop historical states older than lastest db state - # - - # About deleting intersecting states instead of drop incomming - # overlapping states: This approach has been tested several times and - # always ends up causing unexpected failures. Sometimes the database - # schema changes and sometimes, depending on the engine, integrity - # failures appear. It is better to discard the new overlapping states - # than to delete them from the database. 
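The removed comment block above explains why 2.x simply discarded overlapping incoming states. The 3.x code below takes the opposite approach and deletes the overlapping database rows, which only works if the `old_state_id` chain is repaired afterwards (see `_rebuild_states_chain` and `delete_states_in_period` in `timemachine.py`). A minimal in-memory sketch of that re-linking idea, using a simplified stand-in for the real `States` rows:

from dataclasses import dataclass


@dataclass
class Row:
    state_id: int
    last_updated_ts: float
    old_state_id: int | None = None


def relink(rows: list[Row]) -> None:
    # Recorder states form a linked list: each row points to the previous
    # one through old_state_id. Rebuild that chain in chronological order.
    rows.sort(key=lambda r: r.last_updated_ts)
    prev = None
    for row in rows:
        row.old_state_id = prev.state_id if prev else None
        prev = row


rows = [Row(1, 100.0), Row(2, 200.0), Row(3, 300.0), Row(4, 400.0)]
relink(rows)

# Delete the rows that overlap an incoming period (200..300) and relink:
rows = [r for r in rows if not (200.0 <= r.last_updated_ts <= 300.0)]
relink(rows)
assert [r.old_state_id for r in rows] == [None, 1]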
- - if latest: - cutoff = dtutil.utc_from_timestamp(latest.last_updated_ts or 0) - _LOGGER.debug( - f"{self.entity_id}: " - + f"lastest state found at {cutoff} ({latest.state})" - ) - hist_states = [x for x in hist_states if x.dt > cutoff] - - else: - _LOGGER.debug(f"{self.entity_id}: no previous state found") - - # - # Check if there are any states left - # - if not hist_states: - _LOGGER.debug(f"{self.entity_id}: no new states") - return [] - - n_hist_states = len(hist_states) - _LOGGER.debug(f"{self.entity_id}: found {n_hist_states} new states") - # # Build recorder States # - state_meta = get_entity_states_meta(session, self) db_states: list[db_schema.States] = [] - for idx, hist_state in enumerate(hist_states): - attrs_as_dict = _build_attributes(self) - attrs_as_dict.update(hist_state.attributes) + base_attrs_dict = _build_attributes(self) + for hist_state in hist_states: + attrs_as_dict = base_attrs_dict | hist_state.attributes attrs_as_str = db_schema.JSON_DUMP(attrs_as_dict) attrs_as_bytes = ( @@ -275,26 +210,18 @@ def _recorder_write_states( hash=attrs_hash, shared_attrs=attrs_as_str ) - ts = dtutil.as_timestamp(hist_state.dt) state = db_schema.States( - # entity_id=self.entity_id, - states_meta_rel=state_meta, - last_changed_ts=ts, - last_updated_ts=ts, - old_state=db_states[idx - 1] if idx else latest, + last_changed_ts=hist_state.ts, + last_updated_ts=hist_state.ts, state=_stringify_state(self, hist_state.state), state_attributes=state_attributes, ) - # _LOGGER.debug( - # f"new state: " - # f"dt={dtutil.as_local(hist_state.dt)} value={hist_state.state}" - # ) db_states.append(state) - save_states(session, db_states) + ret = tm.save_states(session, self, db_states, overwrite_overlaping=True) - return hist_states + return ret async def _async_write_statistics( self, hist_states: list[HistoricalState] @@ -303,58 +230,58 @@ async def _async_write_statistics( _LOGGER.debug(f"{self.entity_id}: statistics are not enabled") return [] - statistics_meta = self.get_statistic_metadata() + if not hist_states: + return [] + + statistics_metadata = self.get_statistic_metadata() - latest = await get_last_statistics_wrapper( - self.hass, statistics_meta["statistic_id"] + hist_states = list(sorted(hist_states, key=lambda x: x.ts)) + + latest_stats_data = await tm.hass_get_last_statistic( + self.hass, statistics_metadata ) - # Don't do this, see notes above "About deleting intersecting states" - # - # def delete_statistics(): - # with recorder.session_scope( - # session=self.recorder.get_session() - # ) as session: - # start_cutoff = hist_states[0].when - timedelta(hours=1) - # end_cutoff = hist_states[-1].when - # qs = ( - # session.query(db_schema.Statistics) - # .join( - # db_schema.StatisticsMeta, - # db_schema.Statistics.metadata_id == db_schema.StatisticsMeta.id, - # isouter=True, - # ) - # .filter(db_schema.Statistics.start >= start_cutoff) - # .filter(db_schema.Statistics.start < end_cutoff) - # ) - # stats = [x.id for x in qs] # - # clear_statistics(self.recorder, stats) - # _LOGGER.debug(f"Cleared {len(stats)} statistics") + # Handle overlaping stats. 
# - # await self.recorder.async_add_executor_job(delete_statistics) - hist_states = self.historical_states - if latest is not None: - cutoff = dtutil.utc_from_timestamp(latest["start"]) + timedelta(hours=1) - hist_states = [x for x in hist_states if x.dt > cutoff] + overwrite = True + + if overwrite: + + def _delete_stats_since(ts: int): + with tm.hass_recorder_session(self.hass) as session: + return tm.delete_statistics_since( + session, statistics_metadata["statistic_id"], since=ts + ) + + deleted_statistics = await recorder.get_instance( + self.hass + ).async_add_executor_job(_delete_stats_since, hist_states[0].ts) + + _LOGGER.debug( + f"{statistics_metadata['statistic_id']}: " + + f"deleted {len(deleted_statistics)} statistics" + ) + + else: + if latest_stats_data is not None: + cutoff = latest_stats_data["start"] + 60 * 60 + hist_states = [x for x in hist_states if x.ts > cutoff] # # Calculate stats # statistics_data = await self.async_calculate_statistic_data( - hist_states, latest=latest + hist_states, latest=latest_stats_data ) - # for stat in statistics_data: - # tmp = dict(stat) - # start_dt = dtutil.as_local(tmp.pop("start")) - # _LOGGER.debug(f"new statistic: start={start_dt}, value={tmp!r}") - if valid_statistic_id(self.statistic_id): - async_add_external_statistics(self.hass, statistics_meta, statistics_data) + async_add_external_statistics( + self.hass, statistics_metadata, statistics_data + ) else: - async_import_statistics(self.hass, statistics_meta, statistics_data) + async_import_statistics(self.hass, statistics_metadata, statistics_data) return hist_states diff --git a/homeassistant_historical_sensor/state.py b/homeassistant_historical_sensor/state.py index b7fe601..d3d908c 100644 --- a/homeassistant_historical_sensor/state.py +++ b/homeassistant_historical_sensor/state.py @@ -16,26 +16,42 @@ # USA. 
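A note on the `valid_statistic_id` branch a few lines up: ids containing a colon denote external statistics and go through `async_add_external_statistics`, while plain entity ids go through `async_import_statistics`. The ids below are only examples of the two formats:

from homeassistant.components.recorder.statistics import valid_statistic_id

# External statistics: "<source>:<object_id>", not tied to an entity.
assert valid_statistic_id("delorian:energy_consumption")

# Entity-backed statistics reuse the entity_id and are imported instead.
assert not valid_statistic_id("sensor.delorian_energy")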
+import functools
+import itertools
+from collections.abc import Iterator
 from dataclasses import asdict, dataclass, field
-from datetime import datetime
+from math import ceil
 from typing import Any
 
-from homeassistant.util import dt as dtutil
-
 
 @dataclass
 class HistoricalState:
     state: Any
-    dt: datetime
+    ts: float
     attributes: dict[str, Any] = field(default_factory=dict)
 
     def asdict(self):
         return asdict(self)
 
-    def as_value_and_timestamp(self):
-        if not self.dt.tzinfo:
-            raise ValueError(f"{self}.dt is missing tzinfo")
-
-        utc = dtutil.as_utc(self.dt)
-        ts = dtutil.utc_to_timestamp(utc)
-
-        return self.state, ts
+
+def group_by_interval(
+    historical_states: list[HistoricalState], **blockize_kwargs
+) -> Iterator[Any]:
+    fn = functools.partial(blockize, **blockize_kwargs)
+    yield from itertools.groupby(historical_states, key=fn)
+
+
+def blockize(
+    hist_state: HistoricalState,
+    *,
+    granurality: int = 60 * 60,
+    border_in_previous_block: bool = True,
+) -> int:
+    ts = ceil(hist_state.ts)
+    block = ts // granurality
+    leftover = ts % granurality
+
+    if border_in_previous_block and leftover == 0:
+        block = block - 1
+
+    return block * granurality
diff --git a/homeassistant_historical_sensor/timemachine.py b/homeassistant_historical_sensor/timemachine.py
new file mode 100644
index 0000000..b70b1c7
--- /dev/null
+++ b/homeassistant_historical_sensor/timemachine.py
@@ -0,0 +1,310 @@
+#!/usr/bin/env python3
+
+import logging
+from contextlib import contextmanager
+from typing import Literal, cast
+
+from homeassistant.components import recorder
+from homeassistant.components.recorder import Recorder, db_schema
+from homeassistant.components.recorder.models import StatisticData, StatisticMetaData
+from homeassistant.components.recorder.statistics import (
+    StatisticsRow,
+    get_last_statistics,
+)
+from homeassistant.const import STATE_UNAVAILABLE, STATE_UNKNOWN
+from homeassistant.core import HomeAssistant
+from homeassistant.helpers.entity import Entity
+from sqlalchemy import Select, and_, not_, or_, select
+from sqlalchemy.orm import Session
+
+_LOGGER = logging.getLogger(__name__)
+
+
+@contextmanager
+def hass_recorder_session(hass: HomeAssistant):
+    r = recorder.get_instance(hass)
+    with recorder_session(r) as session:
+        yield session
+
+
+@contextmanager
+def recorder_session(rec: Recorder):
+    with recorder.util.session_scope(session=rec.get_session()) as session:
+        yield session
+
+
+async def hass_get_entity_states_metadata_id(
+    hass: HomeAssistant, entity: Entity
+) -> int | None:
+    r = recorder.get_instance(hass)
+    return await r.async_add_executor_job(
+        recorder_get_entity_states_metadata_id, r, entity.entity_id
+    )
+
+
+def recorder_get_entity_states_metadata_id(rec: Recorder, entity_id: str) -> int | None:
+    with recorder_session(rec) as sess:
+        return rec.states_meta_manager.get(entity_id, sess, True)
+
+
+def get_states_meta(session: Session, entity_id: str) -> db_schema.StatesMeta:
+    stmt = select(db_schema.StatesMeta).where(
+        db_schema.StatesMeta.entity_id == entity_id
+    )
+
+    return session.execute(stmt).scalar_one()
+
+
+async def hass_get_last_statistic(
+    hass: HomeAssistant,
+    statistics_metadata: StatisticMetaData,
+    *,
+    convert_units: bool = True,
+    types: set[Literal["last_reset", "max", "mean", "min", "state", "sum"]] = {
+        "last_reset",
+        "max",
+        "mean",
+        "min",
+        "state",
+        "sum",
+    },
+) -> StatisticsRow | None:
+    res = await recorder.get_instance(hass).async_add_executor_job(
+        get_last_statistics,
+        hass,
+        1,
+        statistics_metadata["statistic_id"],
+        
convert_units, + types, + ) + if not res: + return None + + return res[statistics_metadata["statistic_id"]][0] + + +def _build_entity_states_stmt(entity: Entity) -> Select: + return ( + select(db_schema.States) + .join(db_schema.StatesMeta) + .where(db_schema.StatesMeta.entity_id == entity.entity_id) + ) + + +def _rebuild_states_chain( + session: Session, entity: Entity, *, since: float = 0 +) -> None: + stmt = _build_entity_states_stmt(entity).order_by( + db_schema.States.last_updated_ts.asc() + ) + + if since: + prev = get_last_state(session, entity, before=since) + + else: + prev = None + + for state in session.execute(stmt).scalars(): + state.old_state_id = prev.state_id if prev else None + prev = state + + +def delete_invalid_states(session: Session, entity: Entity) -> list[db_schema.States]: + stmt = _build_entity_states_stmt(entity) + stmt = stmt.where(db_schema.States.state.in_([STATE_UNKNOWN, STATE_UNAVAILABLE])) + + deleted_states = list(session.execute(stmt).scalars()) + for state in deleted_states: + session.delete(state) + + _rebuild_states_chain(session, entity) + + return deleted_states + + +def delete_states_in_period( + session: Session, entity: Entity, *, start: float, end: float +) -> list[db_schema.States]: + """ + Delete all states between two points in time + """ + + # Link states just outside the period + + first_state_after_end = session.execute( + _build_entity_states_stmt(entity).where(db_schema.States.last_updated_ts > end) + ).scalar() + + last_state_before_start = session.execute( + _build_entity_states_stmt(entity).where( + db_schema.States.last_updated_ts < start + ) + ).scalar() + + if first_state_after_end: + first_state_after_end.old_state_id = ( + last_state_before_start.state_id if last_state_before_start else None + ) + + # Execute deletion backwards in time to delink safely the chain + + delete_stmt = _build_entity_states_stmt(entity) + delete_stmt = delete_stmt.where( + and_( + db_schema.States.last_updated_ts >= start, + db_schema.States.last_updated_ts <= end, + ) + ).order_by(db_schema.States.last_updated_ts.desc()) + + deleted_states = cast( + list[db_schema.States], list(session.execute(delete_stmt).scalars()) + ) + + for state in deleted_states: + session.delete(state) + + return deleted_states + + +def get_last_state( + session: Session, entity: Entity, *, before: float | None = None +) -> db_schema.States: + """ + Get last state from database + If `before` is passed the lastest state will be the last previous to the time + specified in `before` + """ + stmt = _build_entity_states_stmt(entity) + if before: + stmt = stmt.where(db_schema.States.last_updated_ts < before) + + stmt = stmt.where( + not_( + or_( + db_schema.States.state == STATE_UNAVAILABLE, + db_schema.States.state == STATE_UNKNOWN, + ) + ) + ).order_by(db_schema.States.last_updated_ts.desc()) + + state = cast(db_schema.States, session.execute(stmt).scalar()) + + return state + + +def save_states( + session: Session, + entity: Entity, + states: list[db_schema.States], + *, + overwrite_overlaping: bool = False, +) -> list[db_schema.States]: + # Initial checks: + # - at least one state + # - states meta information available + + if not states: + return [] + + states_meta = get_states_meta(session, entity.entity_id) + if not states_meta: + _LOGGER.error( + f"{entity.entity_id}: " + + "states meta information is NOT available (it should be!). 
This is a bug"
+        )
+        return []
+
+    # Ensure ordered data
+
+    states = list(sorted(states, key=lambda x: x.last_updated_ts))
+
+    # Add some data to states
+
+    for x in states:
+        x.states_meta_rel = states_meta
+        x.metadata_id = states_meta.metadata_id
+        x.entity_id = states_meta.entity_id
+
+        assert x.last_updated_ts is not None
+
+    # Handle overlapping states
+
+    if overwrite_overlaping:
+        deleted = delete_states_in_period(
+            session,
+            entity,
+            start=states[0].last_updated_ts,
+            end=states[-1].last_updated_ts,
+        )
+        _LOGGER.debug(
+            f"{entity.entity_id}: deleted {len(deleted)} overlapping existing states"
+        )
+
+    else:
+        last_existing_state = get_last_state(session, entity)
+
+        if last_existing_state:
+            assert last_existing_state.last_updated_ts is not None
+
+            n_prev = len(states)
+            states = [
+                x
+                for x in states
+                if x.last_updated_ts > last_existing_state.last_updated_ts
+            ]
+            n_post = len(states)
+
+            _LOGGER.debug(
+                f"{entity.entity_id}: discarded {n_prev - n_post} overlapping new states"
+            )
+
+        if not states:
+            return []
+
+    # Insert states and rebuild chain
+
+    for state in states:
+        session.add(state)
+
+    _rebuild_states_chain(session, entity, since=states[0].last_updated_ts)
+
+    return states
+
+
+def delete_statistics_since(
+    session: Session,
+    statistic_id: str,
+    *,
+    since: float,
+) -> list[db_schema.Statistics]:
+    # SELECT *
+    # FROM statistics LEFT OUTER JOIN statistics_meta
+    # ON statistics.metadata_id = statistics_meta.id
+    # WHERE statistics_meta.statistic_id = 'sensor.delorian';
+
+    stmt = (
+        select(db_schema.Statistics)
+        .join(
+            db_schema.StatisticsMeta,
+            db_schema.Statistics.metadata_id == db_schema.StatisticsMeta.id,
+            isouter=True,
+        )
+        .filter(db_schema.StatisticsMeta.statistic_id == statistic_id)
+        .filter(db_schema.Statistics.start_ts >= since)
+    )
+
+    deleted_statistics = list(x for x in session.execute(stmt).scalars())
+    for x in deleted_statistics:
+        session.delete(x)
+
+    return deleted_statistics
+
+
+def save_statistics_data(
+    session: Session,
+    statistics_metadata: StatisticMetaData,
+    statistics_data: list[StatisticData],
+    *,
+    overwrite_overlaping: bool = False,
+):
+    raise NotImplementedError()
+    return []
diff --git a/pyproject.toml b/pyproject.toml
index a3005c4..225374f 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -2,7 +2,7 @@
 
 [project]
 name = "homeassistant-historical-sensor"
-version = "2.0.0rc6"
+version = "3.0.0a1"
 dependencies = [
     "importlib-metadata; python_version >= '3.11'",
 ]
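Finally, for orientation, a condensed sketch of what an integration sensor looks like against the 3.x API introduced by this patch, modelled on the delorian example above. `fetch_readings` is a hypothetical stand-in for a real upstream client, and the mixin order follows the example integration:

from datetime import datetime, timedelta, timezone

from homeassistant.components.sensor import SensorEntity
from homeassistant_historical_sensor import (
    HistoricalSensor,
    HistoricalState,
    PollUpdateMixin,
)


async def fetch_readings(start: datetime) -> list[tuple[datetime, float]]:
    # Hypothetical upstream API; replace with the integration's real client.
    return [(start + timedelta(hours=i), 1.0) for i in range(72)]


class MySensor(PollUpdateMixin, HistoricalSensor, SensorEntity):
    _attr_has_entity_name = True
    _attr_name = "My historical sensor"

    async def async_update_historical(self) -> None:
        readings = await fetch_readings(
            start=datetime.now(timezone.utc) - timedelta(days=3)
        )

        # 3.x stores plain UTC timestamps (floats) instead of datetimes.
        self._attr_historical_states = [
            HistoricalState(state=value, ts=when.timestamp())
            for when, value in readings
        ]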