Skip to content

Commit

Permalink
fix compatibility with python < 3.9
Browse files Browse the repository at this point in the history
  • Loading branch information
chocomega committed Apr 11, 2023
1 parent c6f9c94 commit db68a39
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 10 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ This Python script retrieves statistics from [MyElectricalData](https://github.c
Long Term Statistics will be created in Home Assistant and usable in the Energy Dashboard.

## Pre-requisites
- Python 3.4 or higher
- Python 3.5 or higher
- A running instance of Home Assistant 2022.10.0 or higher
- A running instance of MyElectricalData 0.8.13-11 or higher
- MyElectricalData must be configured with the cache and hourly details enabled, cf. [wiki](https://github.com/m4dm4rtig4n/myelectricaldata/wiki/03.-Configuration)
Expand Down
18 changes: 9 additions & 9 deletions statistics_importer.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = '2.0.0'
__version__ = '2.0.1'

import aiohttp
import argparse
Expand All @@ -13,7 +13,7 @@
import sys
import time
import traceback
from typing import Any, Dict, Optional
from typing import Any, Dict, List, Optional
import yaml

MED_CONFIG_DATE_FORMAT: str = "%Y-%m-%d"
Expand Down Expand Up @@ -132,7 +132,7 @@ def to_float(s: str) -> float:
return math.nan


def get_max_date_from_med_config(usage_point_config: dict[str, str], statistics_key: str) -> datetime:
def get_max_date_from_med_config(usage_point_config: Dict[str, str], statistics_key: str) -> datetime:
try:
max_date_str = usage_point_config[f"{statistics_key}_max_date"]
max_date = datetime.strptime(max_date_str, MED_CONFIG_DATE_FORMAT)
Expand All @@ -143,7 +143,7 @@ def get_max_date_from_med_config(usage_point_config: dict[str, str], statistics_
return max_date


def create_plan_from_med_config(usage_point_config: dict[str, str]):
def create_plan_from_med_config(usage_point_config: Dict[str, str]):
plan_type = PlanType(usage_point_config["plan"])
if plan_type == PlanType.BASE:
plan = PlanBase(to_float(usage_point_config["consumption_price_base"]),
Expand All @@ -157,7 +157,7 @@ def create_plan_from_med_config(usage_point_config: dict[str, str]):
return plan


def export_statistics_from_db(db_cursor: sqlite3.Cursor, stat_metadata: StatisticMetadata, start_date: datetime, sum_offset: float, plan: Plan) -> list[StatisticData]:
def export_statistics_from_db(db_cursor: sqlite3.Cursor, stat_metadata: StatisticMetadata, start_date: datetime, sum_offset: float, plan: Plan) -> List[StatisticData]:
is_cost = (stat_metadata.unit_of_measurement == Unit.EURO)
is_base_tariff = (stat_metadata.tariff_type == TariffType.BASE)
# Select the sum of the value column aggregated by hour
Expand Down Expand Up @@ -222,7 +222,7 @@ async def authenticate(self, access_token: str) -> None:
raise Exception(
f"authenticate: auth NOT ok, check Home Assistant Long-Lived Access Token")

async def recorder_import_statistics(self, stat_metadata: StatisticMetadata, stats: list[StatisticData]) -> None:
async def recorder_import_statistics(self, stat_metadata: StatisticMetadata, stats: List[StatisticData]) -> None:
self._command_id += 1
await self._websocket.send_json({
"id": self._command_id,
Expand All @@ -243,7 +243,7 @@ async def recorder_import_statistics(self, stat_metadata: StatisticMetadata, sta
if not response["success"]:
raise Exception(f"recorder_import_statistics: failed")

async def recorder_list_statistic_ids(self) -> list[dict]:
async def recorder_list_statistic_ids(self) -> List[dict]:
self._command_id += 1
response = await self._websocket.send_json({
"id": self._command_id,
Expand All @@ -259,7 +259,7 @@ async def recorder_list_statistic_ids(self) -> list[dict]:

return response["result"]

async def recorder_clear_statistics(self, statistic_ids: list[str]) -> None:
async def recorder_clear_statistics(self, statistic_ids: List[str]) -> None:
self._command_id += 1
response = await self._websocket.send_json({
"id": self._command_id,
Expand All @@ -272,7 +272,7 @@ async def recorder_clear_statistics(self, statistic_ids: list[str]) -> None:
if not response["success"]:
raise Exception(f"recorder_clear_statistics: failed")

async def recorder_statistics_during_period(self, stat_metadata: StatisticMetadata, start_time: datetime, end_time: datetime) -> dict[str, list[StatisticData]]:
async def recorder_statistics_during_period(self, stat_metadata: StatisticMetadata, start_time: datetime, end_time: datetime) -> Dict[str, List[StatisticData]]:
self._command_id += 1
await self._websocket.send_json({
"id": self._command_id,
Expand Down

0 comments on commit db68a39

Please sign in to comment.