From 92d4eb274eab14d5b3063f01f2c3af8227ac1610 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Christoph=20Sp=C3=B6rk?=
Date: Sat, 16 Nov 2024 13:37:42 +0100
Subject: [PATCH] refactor(statistics): Merge import statistics into one method

Fixes #273 by removing the 24h batch imports in favor of a single call to
/bewegungsdaten.

Also adds a convenience script that purges the last x days of statistics
from the recorder database, for validating the missed-data scenario.
---
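
Example invocation of the purge script (the database path and sensor id
below are the script's illustrative defaults; adjust them to your setup).
Stop Home Assistant and back up the recorder database before running it:

    python utils/purge_last_x_days.py \
        -db home-assistant_v2.db \
        -d 3 \
        -s sensor.at00100000000000000010000XXXXXXX_statistics

On the next update the sensor should detect the now-older last statistic
and re-import the purged window with a single /bewegungsdaten call.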
 custom_components/wnsm/statistics_sensor.py | 188 +++++---------------
 utils/purge_last_x_days.py                  |  30 ++++
 2 files changed, 79 insertions(+), 139 deletions(-)
 create mode 100644 utils/purge_last_x_days.py

diff --git a/custom_components/wnsm/statistics_sensor.py b/custom_components/wnsm/statistics_sensor.py
index b6cadf8..f0e73ba 100644
--- a/custom_components/wnsm/statistics_sensor.py
+++ b/custom_components/wnsm/statistics_sensor.py
@@ -119,13 +119,13 @@ async def async_update(self):
             if not self.is_last_inserted_stat_valid(last_inserted_stat):
                 # No previous data - start from scratch
                 _LOGGER.warning("Starting import of historical data. This might take some time.")
-                await self._import_bewegungsdaten(smartmeter)
+                await self._initial_import_statistics(smartmeter)
             else:
                 start_off_point = self.prepare_start_off_point(last_inserted_stat)
                 if start_off_point is None:
                     return
                 start, _sum = start_off_point
-                await self._import_statistics(smartmeter, start, _sum)
+                await self._incremental_import_statistics(smartmeter, start, _sum)
 
             # XXX: Note that the state of this sensor must never be an integer value, such as 0!
             # If it is set to any number, home assistant will assume that a negative consumption
@@ -144,34 +144,8 @@ async def async_update(self):
             self._available = False
             _LOGGER.exception("Error retrieving data from smart meter api - Error: %s" % e)
 
-    async def _import_historical_data(self, smartmeter: Smartmeter):
-        """Initialize the statistics by fetching three years of data"""
-        recording = await self.get_historic_data(smartmeter)
-        _LOGGER.debug(f"Mapped historical data: {recording}")
-        factor = 1.0
-        if recording['unitOfMeasurement'] == 'WH':
-            factor = 1e-3
-        else:
-            raise NotImplementedError(f'Unit {recording["unitOfMeasurement"]}" is not yet implemented. Please report!')
-
-        dates = defaultdict(Decimal)
-        if 'values' not in recording:
-            raise ValueError("WienerNetze does not report historical data (yet)")
-        for value in recording['values']:
-            reading = Decimal(value['messwert'] * factor)
-            ts = dt_util.parse_datetime(value['zeitVon'])
-            ts_to = dt_util.parse_datetime(value['zeitBis'])
-            qual = value['qualitaet']
-            if qual != 'VAL':
-                _LOGGER.warning(f"Historic data with different quality than 'VAL' detected: {value}")
-            if ts.minute % 15 != 0 or ts.second != 0 or ts.microsecond != 0:
-                _LOGGER.warning(f"Unexpected time detected in historic data: {value}")
-            if (ts_to - ts) != timedelta(minutes=15):
-                _LOGGER.warning(f"Unexpected time step detected in historic data: {value}")
-            dates[ts.replace(minute=0)] += reading
-
-        statistics = []
-        metadata = StatisticMetaData(
+    def get_statistics_metadata(self):
+        return StatisticMetaData(
             source="recorder",
             statistic_id=self._id,
             name=self.name,
@@ -179,133 +153,69 @@ async def _import_historical_data(self, smartmeter: Smartmeter):
             has_mean=False,
             has_sum=True,
         )
-        _LOGGER.debug(metadata)
-        total_usage = Decimal(0)
-        for ts, usage in sorted(dates.items(), key=itemgetter(0)):
-            total_usage += usage
-            statistics.append(StatisticData(start=ts, sum=total_usage, state=usage))
-        if len(statistics) > 0:
-            _LOGGER.debug(f"Importing statistics from {statistics[0]} to {statistics[-1]}")
-            async_import_statistics(self.hass, metadata, statistics)
+    async def _initial_import_statistics(self, smartmeter: Smartmeter):
+        return await self._import_statistics(smartmeter)
+
+    async def _incremental_import_statistics(self, smartmeter: Smartmeter, start: datetime, total_usage: Decimal):
+        return await self._import_statistics(smartmeter, start=start, total_usage=total_usage)
+
+    async def _import_statistics(self, smartmeter: Smartmeter, start: datetime = None, end: datetime = None, total_usage: Decimal = Decimal(0)):
+        """Fetch bewegungsdaten between start and end and import them as hourly statistics"""
+
+        start = start if start is not None else datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0) - timedelta(days=365 * 3)
+        end = end if end is not None else datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
+
+        if start.tzinfo is None:
+            raise ValueError("start datetime must be timezone-aware!")
-
-    async def _import_bewegungsdaten(self, smartmeter: Smartmeter):
-        """Initialize the statistics by fetching three years of data"""
-        recording = await self.get_bewegungsdaten(smartmeter)
-        _LOGGER.debug(f"Mapped historical data: {recording}")
-        if recording['unitOfMeasurement'] == 'WH':
+        _LOGGER.debug("Selecting data up to %s" % end)
+        if start > end:
+            _LOGGER.warning(f"Ignoring async update since the last import happened in the future (should not happen): {start} > {end}")
+            return
+
+        bewegungsdaten = await self.get_bewegungsdaten(smartmeter, start, end)
+        _LOGGER.debug(f"Mapped historical data: {bewegungsdaten}")
+        if bewegungsdaten['unitOfMeasurement'] == 'WH':
             factor = 1e-3
-        elif recording['unitOfMeasurement'] == 'KWH':
+        elif bewegungsdaten['unitOfMeasurement'] == 'KWH':
            factor = 1.0
         else:
-            raise NotImplementedError(f'Unit {recording["unitOfMeasurement"]}" is not yet implemented. Please report!')
+            raise NotImplementedError(f'Unit {bewegungsdaten["unitOfMeasurement"]} is not yet implemented. Please report!')
 
         dates = defaultdict(Decimal)
-        if 'values' not in recording:
+        if 'values' not in bewegungsdaten:
             raise ValueError("WienerNetze does not report historical data (yet)")
-        for value in recording['values']:
-            reading = Decimal(value['wert'] * factor)
+        total_consumption = sum(v.get("wert") or 0 for v in bewegungsdaten['values'])
+        # Check whether the whole batch can be skipped ('wert' may be None for pending measurements).
+        if total_consumption == 0:
+            _LOGGER.debug(f"Batch of data starting at {start} does not contain any bewegungsdaten; nothing to import yet.")
+            return
+
+        last_ts = start
+        for value in bewegungsdaten['values']:
             ts = dt_util.parse_datetime(value['zeitpunktVon'])
-            ts_to = dt_util.parse_datetime(value['zeitpunktBis'])
+            if ts < last_ts:
+                # This should prevent any issues with ambiguous values though...
+                _LOGGER.warning(f"Timestamp from API ({ts}) is less than previously collected timestamp ({last_ts}), ignoring value!")
+                continue
+            last_ts = ts
+            if value['wert'] is None:
+                # Usually this means that the measurement is not yet in the WSTW database.
+                continue
+            reading = Decimal(value['wert'] * factor)
             if ts.minute % 15 != 0 or ts.second != 0 or ts.microsecond != 0:
                 _LOGGER.warning(f"Unexpected time detected in historic data: {value}")
-            if (ts_to - ts) != timedelta(minutes=15):
-                _LOGGER.warning(f"Unexpected time step detected in historic data: {value}")
             dates[ts.replace(minute=0)] += reading
+            if value['geschaetzt']:
+                _LOGGER.debug(f"Estimated value found for {ts}: {reading}")
 
         statistics = []
-        metadata = StatisticMetaData(
-            source="recorder",
-            statistic_id=self._id,
-            name=self.name,
-            unit_of_measurement=self._attr_unit_of_measurement,
-            has_mean=False,
-            has_sum=True,
-        )
-        _LOGGER.debug(metadata)
+        metadata = self.get_statistics_metadata()
 
-        total_usage = Decimal(0)
         for ts, usage in sorted(dates.items(), key=itemgetter(0)):
             total_usage += usage
             statistics.append(StatisticData(start=ts, sum=total_usage, state=float(usage)))
         if len(statistics) > 0:
             _LOGGER.debug(f"Importing statistics from {statistics[0]} to {statistics[-1]}")
             async_import_statistics(self.hass, metadata, statistics)
-
-    async def _import_statistics(self, smartmeter: Smartmeter, start: datetime, total_usage: Decimal):
-        """Import hourly consumption data into the statistics module, using start date and sum"""
-        # Have to be sure that the start datetime is aware of timezone, because we need to compare
-        # it to other timezone aware datetimes in this function
-        if start.tzinfo is None:
-            raise ValueError("start datetime must be timezone-aware!")
-        # Have to be sure that full minutes are used. otherwise, the API returns a different
-        # interval
-        start = start.replace(minute=0, second=0, microsecond=0)
-
-        statistics = []
-        metadata = StatisticMetaData(
-            source="recorder",
-            statistic_id=self._id,
-            name=self.name,
-            unit_of_measurement=self._attr_unit_of_measurement,
-            has_mean=False,
-            has_sum=True,
-        )
-        _LOGGER.debug(metadata)
-
-        now = datetime.now(timezone.utc).replace(minute=0, second=0, microsecond=0)
-        _LOGGER.debug("Selecting data up to %s" % now)
-        while start < now:
-            _LOGGER.debug(f"Select 24h of Data, using sum={total_usage:.3f}, start={start}")
-            bewegungsdaten = await self.get_bewegungsdaten(smartmeter, start)
-            _LOGGER.debug(bewegungsdaten)
-            last_ts = start
-            start += timedelta(hours=24)  # Next batch. Setting this here should avoid endless loops
-
-            if 'values' not in bewegungsdaten:
-                # before, this indicated an error. Since 2023-12-15 no more values means that
-                # data is not available (yet) - but could still indicate an error...
-                _LOGGER.debug(
-                    f"No more values in API response. Possibly the end of the available data is reached. Original response: {bewegungsdaten}")
-                # We break the loop here and hope that in the next iteration, data will be collected
-                # again...
-                # TODO: has to be carefully checked if the API can return a response without values
-                # between responses WITH values! Otherwise, we produce here an endless wait for that
-                # values to turn up.
-                break
-
-            total_consumption = sum([ v.get("wert", 0) for v in bewegungsdaten['values']])
-            # Can actually check, if the whole batch can be skipped.
-            if total_consumption == 0:
-                _LOGGER.debug(f"Batch of data starting at {start} does not contain any consumption, skipping")
-                continue
-
-            for v in bewegungsdaten['values']:
-                # Timestamp has to be aware of timezone, parse_datetime does that.
-                ts = dt_util.parse_datetime(v['zeitpunktVon'])
-                if ts < last_ts:
-                    # This should prevent any issues with ambiguous values though...
-                    _LOGGER.warning(
-                        f"Timestamp from API ({ts}) is less than previously collected timestamp ({last_ts}), ignoring value!")
-                    continue
-                last_ts = ts
-                if v['wert'] is None:
-                    # Usually this means that the measurement is not yet in the WSTW database.
-                    # But could also be an error? Dunno...
-                    # For now, we ignore these values, possibly that means we loose hours if these
-                    # values come back later.
-                    # However, it is not trivial (or even impossible?) to insert statistic values
-                    # in between existing values, thus we can not do much.
-                    continue
-                usage = Decimal(v['wert'])  # Convert to kWh ...
-                total_usage += usage
-                if v['geschaetzt']:
-                    # Can we do anything special here?
-                    _LOGGER.debug(f"Estimated Value found for {ts}: {usage}")
-
-                statistics.append(StatisticData(start=ts, sum=total_usage, state=float(usage)))
-
-        _LOGGER.debug(statistics)
-
-        # Import the statistics data
-        async_import_statistics(self.hass, metadata, statistics)
diff --git a/utils/purge_last_x_days.py b/utils/purge_last_x_days.py
new file mode 100644
index 0000000..c27b5f1
--- /dev/null
+++ b/utils/purge_last_x_days.py
@@ -0,0 +1,30 @@
+import sqlite3
+
+
+def purge(database: str, days: int, sensor_id: str):
+    conn = sqlite3.connect(database)
+    cursor = conn.cursor()
+
+    # Parameterized query; never interpolate sensor_id or days into the SQL string
+    query = """
+    DELETE FROM statistics
+    WHERE metadata_id IN (
+        SELECT id FROM statistics_meta WHERE statistic_id = ?
+    )
+    AND start_ts >= strftime('%s', 'now', ?);
+    """
+
+    cursor.execute(query, (sensor_id, f"-{days} days"))
+
+    # Commit the changes and close the connection
+    conn.commit()
+    conn.close()
+
+if __name__ == "__main__":
+    import argparse
+    parser = argparse.ArgumentParser(description='Purge the last x days of data from the statistics table')
+    parser.add_argument('-db', '--database', type=str, help='Path to the SQLite database', default="home-assistant_v2.db")
+    parser.add_argument('-d', '--days', type=int, help='Number of days to purge', default=2)
+    parser.add_argument('-s', '--sensor', type=str, help='statistic_id in the statistics_meta table', default="sensor.at00100000000000000010000XXXXXXX_statistics")
+    args = parser.parse_args()
+    purge(args.database, args.days, args.sensor)