refactor(statistics): Merge import statistics into one method
Fixes #273 by removing 24h batch imports in favor of one call to /bewegungsdaten

Adds a convenience script for validating the missed-data-for-x-days scenario
DarwinsBuddy committed Nov 17, 2024
1 parent 618e163 commit 92d4eb2
Showing 2 changed files with 79 additions and 139 deletions.
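
For context, a minimal sketch of the merged method's default import window (a hypothetical standalone helper; the defaults mirror _import_statistics in the diff below, everything else is omitted):

from datetime import datetime, timedelta, timezone

def default_import_window(start=None, end=None):
    # With no start-off point (initial import) the window reaches back three
    # years; both bounds are truncated to midnight UTC so that a single
    # /bewegungsdaten call can cover the whole range.
    midnight = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
    start = start if start is not None else midnight - timedelta(days=365 * 3)
    end = end if end is not None else midnight
    if start.tzinfo is None:
        raise ValueError("start datetime must be timezone-aware!")
    return start, end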
188 changes: 49 additions & 139 deletions custom_components/wnsm/statistics_sensor.py
@@ -119,13 +119,13 @@ async def async_update(self):
if not self.is_last_inserted_stat_valid(last_inserted_stat):
# No previous data - start from scratch
_LOGGER.warning("Starting import of historical data. This might take some time.")
await self._import_bewegungsdaten(smartmeter)
await self._initial_import_statistics(smartmeter)
else:
start_off_point = self.prepare_start_off_point(last_inserted_stat)
if start_off_point is None:
return
start, _sum = start_off_point
await self._import_statistics(smartmeter, start, _sum)
await self._incremental_import_statistics(smartmeter, start, _sum)

# XXX: Note that the state of this sensor must never be an integer value, such as 0!
# If it is set to any number, home assistant will assume that a negative consumption
@@ -144,168 +144,78 @@ async def async_update(self):
self._available = False
_LOGGER.exception("Error retrieving data from smart meter api - Error: %s" % e)

async def _import_historical_data(self, smartmeter: Smartmeter):
"""Initialize the statistics by fetching three years of data"""
recording = await self.get_historic_data(smartmeter)
_LOGGER.debug(f"Mapped historical data: {recording}")
factor = 1.0
if recording['unitOfMeasurement'] == 'WH':
factor = 1e-3
else:
raise NotImplementedError(f'Unit {recording["unitOfMeasurement"]}" is not yet implemented. Please report!')

dates = defaultdict(Decimal)
if 'values' not in recording:
raise ValueError("WienerNetze does not report historical data (yet)")
for value in recording['values']:
reading = Decimal(value['messwert'] * factor)
ts = dt_util.parse_datetime(value['zeitVon'])
ts_to = dt_util.parse_datetime(value['zeitBis'])
qual = value['qualitaet']
if qual != 'VAL':
_LOGGER.warning(f"Historic data with different quality than 'VAL' detected: {value}")
if ts.minute % 15 != 0 or ts.second != 0 or ts.microsecond != 0:
_LOGGER.warning(f"Unexpected time detected in historic data: {value}")
if (ts_to - ts) != timedelta(minutes=15):
_LOGGER.warning(f"Unexpected time step detected in historic data: {value}")
dates[ts.replace(minute=0)] += reading

statistics = []
metadata = StatisticMetaData(
def get_statistics_metadata(self):
return StatisticMetaData(
source="recorder",
statistic_id=self._id,
name=self.name,
unit_of_measurement=self._attr_unit_of_measurement,
has_mean=False,
has_sum=True,
)
_LOGGER.debug(metadata)

total_usage = Decimal(0)
for ts, usage in sorted(dates.items(), key=itemgetter(0)):
total_usage += usage
statistics.append(StatisticData(start=ts, sum=total_usage, state=usage))
if len(statistics) > 0:
_LOGGER.debug(f"Importing statistics from {statistics[0]} to {statistics[-1]}")
async_import_statistics(self.hass, metadata, statistics)
async def _initial_import_statistics(self, smartmeter: Smartmeter):
return await self._import_statistics(smartmeter)

async def _incremental_import_statistics(self, smartmeter: Smartmeter, start: datetime, total_usage: Decimal):
return await self._import_statistics(smartmeter, start=start, total_usage=total_usage)

async def _import_statistics(self, smartmeter: Smartmeter, start: datetime = None, end: datetime = None, total_usage: Decimal = Decimal(0)):
"""Import statistics"""

start = start if start is not None else datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0) - timedelta(days=365 * 3)
end = end if end is not None else datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
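# Defaults above: with no explicit start (initial import) the window reaches
# back three years; truncating both bounds to midnight UTC avoids the partial
# intervals the API returns for non-aligned timestamps.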

if start.tzinfo is None:
raise ValueError("start datetime must be timezone-aware!")

async def _import_bewegungsdaten(self, smartmeter: Smartmeter):
"""Initialize the statistics by fetching three years of data"""
recording = await self.get_bewegungsdaten(smartmeter)
_LOGGER.debug(f"Mapped historical data: {recording}")
if recording['unitOfMeasurement'] == 'WH':
_LOGGER.debug("Selecting data up to %s" % end)
if start > end:
_LOGGER.warning(f"Ignoring async update since last import happened in the future (should not happen) {start} > {end}")
return

bewegungsdaten = await self.get_bewegungsdaten(smartmeter, start, end)
_LOGGER.debug(f"Mapped historical data: {bewegungsdaten}")
if bewegungsdaten['unitOfMeasurement'] == 'WH':
factor = 1e-3
elif recording['unitOfMeasurement'] == 'KWH':
elif bewegungsdaten['unitOfMeasurement'] == 'KWH':
factor = 1.0
else:
raise NotImplementedError(f'Unit {recording["unitOfMeasurement"]}" is not yet implemented. Please report!')
raise NotImplementedError(f'Unit {bewegungsdaten["unitOfMeasurement"]} is not yet implemented. Please report!')

dates = defaultdict(Decimal)
if 'values' not in recording:
if 'values' not in bewegungsdaten:
raise ValueError("WienerNetze does not report historical data (yet)")
for value in recording['values']:
reading = Decimal(value['wert'] * factor)
total_consumption = sum(v.get("wert") or 0 for v in bewegungsdaten['values'])
# Check whether the whole batch can be skipped ('wert' may be None for
# measurements that are not yet in the database).
if total_consumption == 0:
_LOGGER.debug(f"Batch of data starting at {start} does not contain any bewegungsdaten; nothing to import yet.")
return

last_ts = start
for value in bewegungsdaten['values']:
ts = dt_util.parse_datetime(value['zeitpunktVon'])
ts_to = dt_util.parse_datetime(value['zeitpunktBis'])
if ts < last_ts:
# This should prevent issues with out-of-order or ambiguous timestamps.
_LOGGER.warning(f"Timestamp from API ({ts}) is less than previously collected timestamp ({last_ts}), ignoring value!")
continue
last_ts = ts
if value['wert'] is None:
# Usually this means that the measurement is not yet in the WSTW database.
continue
reading = Decimal(value['wert'] * factor)
if ts.minute % 15 != 0 or ts.second != 0 or ts.microsecond != 0:
_LOGGER.warning(f"Unexpected time detected in historic data: {value}")
if (ts_to - ts) != timedelta(minutes=15):
_LOGGER.warning(f"Unexpected time step detected in historic data: {value}")
dates[ts.replace(minute=0)] += reading
if value['geschaetzt']:
_LOGGER.debug(f"Not seen that before: Estimated Value found for {ts}: {reading}")

statistics = []
metadata = StatisticMetaData(
source="recorder",
statistic_id=self._id,
name=self.name,
unit_of_measurement=self._attr_unit_of_measurement,
has_mean=False,
has_sum=True,
)
_LOGGER.debug(metadata)
metadata = self.get_statistics_metadata()

total_usage = Decimal(0)
for ts, usage in sorted(dates.items(), key=itemgetter(0)):
total_usage += usage
statistics.append(StatisticData(start=ts, sum=total_usage, state=float(usage)))
if len(statistics) > 0:
_LOGGER.debug(f"Importing statistics from {statistics[0]} to {statistics[-1]}")
async_import_statistics(self.hass, metadata, statistics)

async def _import_statistics(self, smartmeter: Smartmeter, start: datetime, total_usage: Decimal):
"""Import hourly consumption data into the statistics module, using start date and sum"""
# Have to be sure that the start datetime is aware of timezone, because we need to compare
# it to other timezone aware datetimes in this function
if start.tzinfo is None:
raise ValueError("start datetime must be timezone-aware!")
# Have to be sure that full minutes are used. otherwise, the API returns a different
# interval
start = start.replace(minute=0, second=0, microsecond=0)

statistics = []
metadata = StatisticMetaData(
source="recorder",
statistic_id=self._id,
name=self.name,
unit_of_measurement=self._attr_unit_of_measurement,
has_mean=False,
has_sum=True,
)
_LOGGER.debug(metadata)

now = datetime.now(timezone.utc).replace(minute=0, second=0, microsecond=0)
_LOGGER.debug("Selecting data up to %s" % now)
while start < now:
_LOGGER.debug(f"Select 24h of Data, using sum={total_usage:.3f}, start={start}")
bewegungsdaten = await self.get_bewegungsdaten(smartmeter, start)
_LOGGER.debug(bewegungsdaten)
last_ts = start
start += timedelta(hours=24) # Next batch. Setting this here should avoid endless loops

if 'values' not in bewegungsdaten:
# before, this indicated an error. Since 2023-12-15 no more values means that
# data is not available (yet) - but could still indicate an error...
_LOGGER.debug(
f"No more values in API response. Possibly the end of the available data is reached. Original response: {bewegungsdaten}")
# We break the loop here and hope that in the next iteration, data will be collected
# again...
# TODO: has to be carefully checked if the API can return a response without values
# between responses WITH values! Otherwise, we produce here an endless wait for that
# values to turn up.
break

total_consumption = sum([ v.get("wert", 0) for v in bewegungsdaten['values']])
# Can actually check, if the whole batch can be skipped.
if total_consumption == 0:
_LOGGER.debug(f"Batch of data starting at {start} does not contain any consumption, skipping")
continue

for v in bewegungsdaten['values']:
# Timestamp has to be aware of timezone, parse_datetime does that.
ts = dt_util.parse_datetime(v['zeitpunktVon'])
if ts < last_ts:
# This should prevent any issues with ambiguous values though...
_LOGGER.warning(
f"Timestamp from API ({ts}) is less than previously collected timestamp ({last_ts}), ignoring value!")
continue
last_ts = ts
if v['wert'] is None:
# Usually this means that the measurement is not yet in the WSTW database.
# But could also be an error? Dunno...
# For now, we ignore these values, possibly that means we loose hours if these
# values come back later.
# However, it is not trivial (or even impossible?) to insert statistic values
# in between existing values, thus we can not do much.
continue
usage = Decimal(v['wert']) # Convert to kWh ...
total_usage += usage
if v['geschaetzt']:
# Can we do anything special here?
_LOGGER.debug(f"Estimated Value found for {ts}: {usage}")

statistics.append(StatisticData(start=ts, sum=total_usage, state=float(usage)))

_LOGGER.debug(statistics)

# Import the statistics data
async_import_statistics(self.hass, metadata, statistics)
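
The core of the merged import, aggregating 15-minute readings into hourly buckets with a running sum, can be sketched standalone like this (a hypothetical helper; plain dicts stand in for StatisticData, and the quality and interval checks from the real code are omitted):

from collections import defaultdict
from decimal import Decimal
from operator import itemgetter

def to_hourly_statistics(values, factor=Decimal("0.001"), total_usage=Decimal(0)):
    # values: (zeitpunktVon as datetime, wert) pairs of 15-minute readings.
    dates = defaultdict(Decimal)
    for ts, wert in values:
        if wert is None:  # measurement not yet in the WSTW database
            continue
        dates[ts.replace(minute=0)] += Decimal(wert) * factor
    # Emit hourly records carrying the bucket value as state and a running
    # sum, matching the StatisticData fields used above.
    statistics = []
    for ts, usage in sorted(dates.items(), key=itemgetter(0)):
        total_usage += usage
        statistics.append({"start": ts, "sum": total_usage, "state": float(usage)})
    return statistics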
30 changes: 30 additions & 0 deletions utils/purge_last_x_days.py
@@ -0,0 +1,30 @@
import sqlite3


def purge(database: str, days: int, sensor_id: str):
    conn = sqlite3.connect(database)
    cursor = conn.cursor()

    # Bind the statistic id and day offset as parameters instead of
    # interpolating them into the SQL string.
    query = """
    DELETE FROM statistics
    WHERE metadata_id IN (
        SELECT id FROM statistics_meta WHERE statistic_id = ?
    )
    AND start_ts >= strftime('%s', 'now', ?);
    """

    # Execute the query
    cursor.execute(query, (sensor_id, f'-{days} days'))

    # Commit the changes and close the connection
    conn.commit()
    conn.close()


if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser(description='Purge the last x days of data from the statistics table')
    parser.add_argument('-db', '--database', type=str, help='Path to the SQLite database', default="home-assistant_v2.db")
    parser.add_argument('-d', '--days', type=int, help='Number of days to purge', default=2)
    parser.add_argument('-s', '--sensor', type=str, help='statistic_id in the statistics_meta table', default="sensor.at00100000000000000010000XXXXXXX_statistics")
    args = parser.parse_args()
    purge(args.database, args.days, args.sensor)
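
Possible usage of the script (stop Home Assistant first so the recorder releases the database; the path and statistic id below are placeholders):

# CLI equivalent: python utils/purge_last_x_days.py --database home-assistant_v2.db --days 3 --sensor <statistic_id>
purge("home-assistant_v2.db", 3, "sensor.at00100000000000000010000XXXXXXX_statistics")

On the next async_update the incremental import should resume from the last remaining statistic, so the purged range is re-fetched from /bewegungsdaten, which is the missed-data scenario this script is meant to validate.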
