From db68a3918ccd458f064162cea539bf4267619b61 Mon Sep 17 00:00:00 2001 From: chocomega Date: Tue, 11 Apr 2023 20:11:39 +0200 Subject: [PATCH] fix compatibility with python < 3.9 --- README.md | 2 +- statistics_importer.py | 18 +++++++++--------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 0df0fbb..9494d85 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ This Python script retrieves statistics from [MyElectricalData](https://github.c Long Term Statistics will be created in Home Assistant and usable in the Energy Dashboard. ## Pre-requisites -- Python 3.4 or higher +- Python 3.5 or higher - A running instance of Home Assistant 2022.10.0 or higher - A running instance of MyElectricalData 0.8.13-11 or higher - MyElectricalData must be configured with the cache and hourly details enabled, cf. [wiki](https://github.com/m4dm4rtig4n/myelectricaldata/wiki/03.-Configuration) diff --git a/statistics_importer.py b/statistics_importer.py index 4edc5c2..b1ef466 100755 --- a/statistics_importer.py +++ b/statistics_importer.py @@ -1,4 +1,4 @@ -__version__ = '2.0.0' +__version__ = '2.0.1' import aiohttp import argparse @@ -13,7 +13,7 @@ import sys import time import traceback -from typing import Any, Dict, Optional +from typing import Any, Dict, List, Optional import yaml MED_CONFIG_DATE_FORMAT: str = "%Y-%m-%d" @@ -132,7 +132,7 @@ def to_float(s: str) -> float: return math.nan -def get_max_date_from_med_config(usage_point_config: dict[str, str], statistics_key: str) -> datetime: +def get_max_date_from_med_config(usage_point_config: Dict[str, str], statistics_key: str) -> datetime: try: max_date_str = usage_point_config[f"{statistics_key}_max_date"] max_date = datetime.strptime(max_date_str, MED_CONFIG_DATE_FORMAT) @@ -143,7 +143,7 @@ def get_max_date_from_med_config(usage_point_config: dict[str, str], statistics_ return max_date -def create_plan_from_med_config(usage_point_config: dict[str, str]): +def create_plan_from_med_config(usage_point_config: Dict[str, str]): plan_type = PlanType(usage_point_config["plan"]) if plan_type == PlanType.BASE: plan = PlanBase(to_float(usage_point_config["consumption_price_base"]), @@ -157,7 +157,7 @@ def create_plan_from_med_config(usage_point_config: dict[str, str]): return plan -def export_statistics_from_db(db_cursor: sqlite3.Cursor, stat_metadata: StatisticMetadata, start_date: datetime, sum_offset: float, plan: Plan) -> list[StatisticData]: +def export_statistics_from_db(db_cursor: sqlite3.Cursor, stat_metadata: StatisticMetadata, start_date: datetime, sum_offset: float, plan: Plan) -> List[StatisticData]: is_cost = (stat_metadata.unit_of_measurement == Unit.EURO) is_base_tariff = (stat_metadata.tariff_type == TariffType.BASE) # Select the sum of the value column aggregated by hour @@ -222,7 +222,7 @@ async def authenticate(self, access_token: str) -> None: raise Exception( f"authenticate: auth NOT ok, check Home Assistant Long-Lived Access Token") - async def recorder_import_statistics(self, stat_metadata: StatisticMetadata, stats: list[StatisticData]) -> None: + async def recorder_import_statistics(self, stat_metadata: StatisticMetadata, stats: List[StatisticData]) -> None: self._command_id += 1 await self._websocket.send_json({ "id": self._command_id, @@ -243,7 +243,7 @@ async def recorder_import_statistics(self, stat_metadata: StatisticMetadata, sta if not response["success"]: raise Exception(f"recorder_import_statistics: failed") - async def recorder_list_statistic_ids(self) -> list[dict]: + async def recorder_list_statistic_ids(self) -> List[dict]: self._command_id += 1 response = await self._websocket.send_json({ "id": self._command_id, @@ -259,7 +259,7 @@ async def recorder_list_statistic_ids(self) -> list[dict]: return response["result"] - async def recorder_clear_statistics(self, statistic_ids: list[str]) -> None: + async def recorder_clear_statistics(self, statistic_ids: List[str]) -> None: self._command_id += 1 response = await self._websocket.send_json({ "id": self._command_id, @@ -272,7 +272,7 @@ async def recorder_clear_statistics(self, statistic_ids: list[str]) -> None: if not response["success"]: raise Exception(f"recorder_clear_statistics: failed") - async def recorder_statistics_during_period(self, stat_metadata: StatisticMetadata, start_time: datetime, end_time: datetime) -> dict[str, list[StatisticData]]: + async def recorder_statistics_during_period(self, stat_metadata: StatisticMetadata, start_time: datetime, end_time: datetime) -> Dict[str, List[StatisticData]]: self._command_id += 1 await self._websocket.send_json({ "id": self._command_id,