Skip to content

Commit

Permalink
refactor: Refactored retrieving active tariffs for meters
Browse files Browse the repository at this point in the history
  • Loading branch information
BottlecapDave committed Oct 1, 2023
1 parent 588e7ed commit ac2b6a7
Show file tree
Hide file tree
Showing 10 changed files with 145 additions and 134 deletions.
56 changes: 23 additions & 33 deletions custom_components/octopus_energy/coordinators/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,38 +58,6 @@ async def async_check_valid_tariff(hass, client: OctopusEnergyApiClient, tariff_
except:
_LOGGER.debug(f"Failed to retrieve product info for '{tariff_parts.product_code}'")

def get_current_electricity_agreement_tariff_codes(current: datetime, account_info):
tariff_codes = {}
if account_info is not None and len(account_info["electricity_meter_points"]) > 0:
for point in account_info["electricity_meter_points"]:
active_tariff_code = get_active_tariff_code(current, point["agreements"])
# The type of meter (ie smart vs dumb) can change the tariff behaviour, so we
# have to enumerate the different meters being used for each tariff as well.
for meter in point["meters"]:
is_smart_meter = meter["is_smart_meter"]
if active_tariff_code is not None:
key = (point["mpan"], is_smart_meter)
if key not in tariff_codes:
tariff_codes[(point["mpan"], is_smart_meter)] = active_tariff_code

return tariff_codes

def get_current_gas_agreement_tariff_codes(current: datetime, account_info):
tariff_codes = {}
if account_info is not None and len(account_info["gas_meter_points"]) > 0:
for point in account_info["gas_meter_points"]:
active_tariff_code = get_active_tariff_code(current, point["agreements"])
# The type of meter (ie smart vs dumb) can change the tariff behaviour, so we
# have to enumerate the different meters being used for each tariff as well.
for meter in point["meters"]:
is_smart_meter = meter["is_smart_meter"]
if active_tariff_code is not None:
key = (point["mprn"], is_smart_meter)
if key not in tariff_codes:
tariff_codes[(point["mprn"], is_smart_meter)] = active_tariff_code

return tariff_codes

def raise_rate_events(now: datetime,
rates: list,
additional_attributes: "dict[str, Any]",
Expand Down Expand Up @@ -123,4 +91,26 @@ def raise_rate_events(now: datetime,

event_data = { "rates": next_rates }
event_data.update(additional_attributes)
fire_event(next_event_key, event_data)
fire_event(next_event_key, event_data)

def get_electricity_meter_tariff_code_and_is_smart_meter(current: datetime, account_info, target_mpan: str, target_serial_number: str):
if len(account_info["electricity_meter_points"]) > 0:
for point in account_info["electricity_meter_points"]:
active_tariff_code = get_active_tariff_code(current, point["agreements"])
# The type of meter (ie smart vs dumb) can change the tariff behaviour, so we
# have to enumerate the different meters being used for each tariff as well.
for meter in point["meters"]:
if active_tariff_code is not None and point["mpan"] == target_mpan and meter["serial_number"] == target_serial_number:
return (active_tariff_code, meter["is_smart_meter"])

return None

def get_gas_meter_tariff_code(current: datetime, account_info, target_mprn: str, target_serial_number: str):
if len(account_info["gas_meter_points"]) > 0:
for point in account_info["gas_meter_points"]:
active_tariff_code = get_active_tariff_code(current, point["agreements"])
for meter in point["meters"]:
if active_tariff_code is not None and point["mprn"] == target_mprn and meter["serial_number"] == target_serial_number:
return active_tariff_code

return None
28 changes: 9 additions & 19 deletions custom_components/octopus_energy/coordinators/electricity_rates.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import logging
from datetime import datetime, timedelta
from typing import Callable, Any
from custom_components.octopus_energy.utils import get_active_tariff_code

from homeassistant.util.dt import (now, as_utc)
from homeassistant.helpers.update_coordinator import (
Expand All @@ -23,7 +22,7 @@

from ..api_client import OctopusEnergyApiClient

from . import raise_rate_events
from . import get_electricity_meter_tariff_code_and_is_smart_meter, raise_rate_events
from ..intelligent import adjust_intelligent_rates

_LOGGER = logging.getLogger(__name__)
Expand All @@ -36,18 +35,6 @@ def __init__(self, last_retrieved: datetime, rates: list):
self.last_retrieved = last_retrieved
self.rates = rates

def get_tariff_code_and_is_smart_meter(current: datetime, account_info, target_mpan: str, target_serial_number: str):
if len(account_info["electricity_meter_points"]) > 0:
for point in account_info["electricity_meter_points"]:
active_tariff_code = get_active_tariff_code(current, point["agreements"])
# The type of meter (ie smart vs dumb) can change the tariff behaviour, so we
# have to enumerate the different meters being used for each tariff as well.
for meter in point["meters"]:
if active_tariff_code is not None and point["mpan"] == target_mpan and meter["serial_number"] == target_serial_number:
return (active_tariff_code, meter["is_smart_meter"])

return None

async def async_refresh_electricity_rates_data(
current: datetime,
client: OctopusEnergyApiClient,
Expand All @@ -62,21 +49,24 @@ async def async_refresh_electricity_rates_data(
period_from = as_utc((current - timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0))
period_to = as_utc((current + timedelta(days=2)).replace(hour=0, minute=0, second=0, microsecond=0))

result = get_tariff_code_and_is_smart_meter(current, account_info, target_mpan, target_serial_number)
result = get_electricity_meter_tariff_code_and_is_smart_meter(current, account_info, target_mpan, target_serial_number)
if result is None:
return None

tariff_code: str = result[0]
is_smart_meter: bool = result[1]

new_rates: list = None
if ((current.minute % 30) == 0 or
existing_rates_result is None or
existing_rates_result.rates[-1]["valid_from"] < period_from):
try:
new_rates = await client.async_get_electricity_rates(result[0], result[1], period_from, period_to)
new_rates = await client.async_get_electricity_rates(tariff_code, is_smart_meter, period_from, period_to)
except:
_LOGGER.debug(f'Failed to retrieve electricity rates for {target_mpan}/{target_serial_number} ({result[0]})')
_LOGGER.debug(f'Failed to retrieve electricity rates for {target_mpan}/{target_serial_number} ({tariff_code})')

if new_rates is not None:
_LOGGER.debug(f'Electricity rates retrieved for {target_mpan}/{target_serial_number} ({result[0]});')
_LOGGER.debug(f'Electricity rates retrieved for {target_mpan}/{target_serial_number} ({tariff_code});')

if dispatches is not None:
new_rates = adjust_intelligent_rates(new_rates,
Expand All @@ -87,7 +77,7 @@ async def async_refresh_electricity_rates_data(

raise_rate_events(current,
new_rates,
{ "mpan": target_mpan, "serial_number": target_serial_number, "tariff_code": result[0] },
{ "mpan": target_mpan, "serial_number": target_serial_number, "tariff_code": tariff_code },
fire_event,
EVENT_ELECTRICITY_PREVIOUS_DAY_RATES,
EVENT_ELECTRICITY_CURRENT_DAY_RATES,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
from datetime import datetime, timedelta
from custom_components.octopus_energy.utils import get_active_tariff_code
from custom_components.octopus_energy.coordinators import get_electricity_meter_tariff_code_and_is_smart_meter

from homeassistant.util.dt import (now, as_utc)
from homeassistant.helpers.update_coordinator import (
Expand All @@ -27,16 +27,6 @@ def __init__(self, last_retrieved: datetime, standing_charge: {}):
self.last_retrieved = last_retrieved
self.standing_charge = standing_charge

def get_tariff_code(current: datetime, account_info, target_mpan: str, target_serial_number: str):
if len(account_info["electricity_meter_points"]) > 0:
for point in account_info["electricity_meter_points"]:
active_tariff_code = get_active_tariff_code(current, point["agreements"])
# The type of meter (ie smart vs dumb) can change the tariff behaviour, so we
# have to enumerate the different meters being used for each tariff as well.
for meter in point["meters"]:
if active_tariff_code is not None and point["mpan"] == target_mpan and meter["serial_number"] == target_serial_number:
return active_tariff_code

async def async_refresh_electricity_standing_charges_data(
current: datetime,
client: OctopusEnergyApiClient,
Expand All @@ -49,10 +39,12 @@ async def async_refresh_electricity_standing_charges_data(
period_to = period_from + timedelta(days=1)

if (account_info is not None):
tariff_code = get_tariff_code(current, account_info, target_mpan, target_serial_number)
if tariff_code is None:
result = get_electricity_meter_tariff_code_and_is_smart_meter(current, account_info, target_mpan, target_serial_number)
if result is None:
return None

tariff_code: str = result[0]

new_standing_charge = None
if ((current.minute % 30) == 0 or
existing_standing_charges_result is None or
Expand Down
18 changes: 2 additions & 16 deletions custom_components/octopus_energy/coordinators/gas_rates.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import logging
from datetime import datetime, timedelta
from typing import Callable, Any
from custom_components.octopus_energy.utils import get_active_tariff_code

from homeassistant.util.dt import (now, as_utc)
from homeassistant.helpers.update_coordinator import (
Expand All @@ -22,8 +21,7 @@

from ..api_client import OctopusEnergyApiClient

from . import raise_rate_events
from ..intelligent import adjust_intelligent_rates
from . import get_gas_meter_tariff_code, raise_rate_events

_LOGGER = logging.getLogger(__name__)

Expand All @@ -35,18 +33,6 @@ def __init__(self, last_retrieved: datetime, rates: list):
self.last_retrieved = last_retrieved
self.rates = rates

def get_tariff_code(current: datetime, account_info, target_mprn: str, target_serial_number: str):
if len(account_info["gas_meter_points"]) > 0:
for point in account_info["gas_meter_points"]:
active_tariff_code = get_active_tariff_code(current, point["agreements"])
# The type of meter (ie smart vs dumb) can change the tariff behaviour, so we
# have to enumerate the different meters being used for each tariff as well.
for meter in point["meters"]:
if active_tariff_code is not None and point["mprn"] == target_mprn and meter["serial_number"] == target_serial_number:
return active_tariff_code

return None

async def async_refresh_gas_rates_data(
current: datetime,
client: OctopusEnergyApiClient,
Expand All @@ -60,7 +46,7 @@ async def async_refresh_gas_rates_data(
period_from = as_utc((current - timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0))
period_to = as_utc((current + timedelta(days=2)).replace(hour=0, minute=0, second=0, microsecond=0))

tariff_code = get_tariff_code(current, account_info, target_mprn, target_serial_number)
tariff_code = get_gas_meter_tariff_code(current, account_info, target_mprn, target_serial_number)
if tariff_code is None:
return None

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
from datetime import datetime, timedelta
from custom_components.octopus_energy.utils import get_active_tariff_code
from custom_components.octopus_energy.coordinators import get_gas_meter_tariff_code

from homeassistant.util.dt import (now, as_utc)
from homeassistant.helpers.update_coordinator import (
Expand All @@ -27,16 +27,6 @@ def __init__(self, last_retrieved: datetime, standing_charge: {}):
self.last_retrieved = last_retrieved
self.standing_charge = standing_charge

def get_tariff_code(current: datetime, account_info, target_mprn: str, target_serial_number: str):
if len(account_info["gas_meter_points"]) > 0:
for point in account_info["gas_meter_points"]:
active_tariff_code = get_active_tariff_code(current, point["agreements"])
# The type of meter (ie smart vs dumb) can change the tariff behaviour, so we
# have to enumerate the different meters being used for each tariff as well.
for meter in point["meters"]:
if active_tariff_code is not None and point["mprn"] == target_mprn and meter["serial_number"] == target_serial_number:
return active_tariff_code

async def async_refresh_gas_standing_charges_data(
current: datetime,
client: OctopusEnergyApiClient,
Expand All @@ -49,7 +39,7 @@ async def async_refresh_gas_standing_charges_data(
period_to = period_from + timedelta(days=1)

if (account_info is not None):
tariff_code = get_tariff_code(current, account_info, target_mprn, target_serial_number)
tariff_code = get_gas_meter_tariff_code(current, account_info, target_mprn, target_serial_number)
if tariff_code is None:
return None

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import logging
from datetime import timedelta

from ..coordinators import get_current_electricity_agreement_tariff_codes
from ..intelligent import async_mock_intelligent_data, clean_previous_dispatches, is_intelligent_tariff, mock_intelligent_dispatches
from ..intelligent import async_mock_intelligent_data, clean_previous_dispatches, has_intelligent_tariff, mock_intelligent_dispatches

from homeassistant.util.dt import (utcnow)
from homeassistant.helpers.update_coordinator import (
Expand Down Expand Up @@ -53,17 +52,13 @@ async def async_update_intelligent_dispatches_data():
client: OctopusEnergyApiClient = hass.data[DOMAIN][DATA_CLIENT]
if (DATA_ACCOUNT in hass.data[DOMAIN]):

tariff_codes = get_current_electricity_agreement_tariff_codes(current, hass.data[DOMAIN][DATA_ACCOUNT])

dispatches = None
for ((meter_point), tariff_code) in tariff_codes.items():
if is_intelligent_tariff(tariff_code):
try:
dispatches = await client.async_get_intelligent_dispatches(account_id)
_LOGGER.debug(f'Intelligent dispatches retrieved for {tariff_code}')
except:
_LOGGER.debug('Failed to retrieve intelligent dispatches')
break
if has_intelligent_tariff(current, hass.data[DOMAIN][DATA_ACCOUNT]):
try:
dispatches = await client.async_get_intelligent_dispatches(account_id)
_LOGGER.debug(f'Intelligent dispatches retrieved for account {account_id}')
except:
_LOGGER.debug('Failed to retrieve intelligent dispatches for account {account_id}')

if await async_mock_intelligent_data(hass):
dispatches = mock_intelligent_dispatches()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import logging
from datetime import timedelta

from . import get_current_electricity_agreement_tariff_codes
from ..intelligent import async_mock_intelligent_data, clean_previous_dispatches, is_intelligent_tariff, mock_intelligent_settings
from ..intelligent import async_mock_intelligent_data, clean_previous_dispatches, has_intelligent_tariff, mock_intelligent_settings

from homeassistant.util.dt import (utcnow)
from homeassistant.helpers.update_coordinator import (
Expand Down Expand Up @@ -39,18 +38,12 @@ async def async_update_intelligent_settings_data():
client: OctopusEnergyApiClient = hass.data[DOMAIN][DATA_CLIENT]
if (DATA_ACCOUNT in hass.data[DOMAIN]):

tariff_codes = get_current_electricity_agreement_tariff_codes(current, hass.data[DOMAIN][DATA_ACCOUNT])
_LOGGER.debug(f'tariff_codes: {tariff_codes}')

settings = None
for ((meter_point), tariff_code) in tariff_codes.items():
if is_intelligent_tariff(tariff_code):
try:
settings = await client.async_get_intelligent_settings(account_id)
_LOGGER.debug(f'Intelligent settings retrieved for {tariff_code}')
except:
_LOGGER.debug('Failed to retrieve intelligent dispatches')
break
if has_intelligent_tariff(current, hass.data[DOMAIN][DATA_ACCOUNT]):
try:
settings = await client.async_get_intelligent_settings(account_id)
_LOGGER.debug(f'Intelligent settings retrieved for account {account_id}')
except:
_LOGGER.debug('Failed to retrieve intelligent dispatches for account {account_id}')

if await async_mock_intelligent_data(hass):
settings = mock_intelligent_settings()
Expand Down
11 changes: 10 additions & 1 deletion custom_components/octopus_energy/intelligent/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from homeassistant.helpers import storage

from ..utils import get_tariff_parts
from ..utils import get_active_tariff_code, get_tariff_parts

from ..const import DOMAIN

Expand Down Expand Up @@ -84,6 +84,15 @@ def is_intelligent_tariff(tariff_code: str):

return parts is not None and "INTELLI" in parts.product_code

def has_intelligent_tariff(current: datetime, account_info):
if account_info is not None and len(account_info["electricity_meter_points"]) > 0:
for point in account_info["electricity_meter_points"]:
tariff_code = get_active_tariff_code(current, point["agreements"])
if tariff_code is not None and is_intelligent_tariff(tariff_code):
return True

return False

def __get_dispatch(rate, dispatches, expected_source: str):
for dispatch in dispatches:
if (expected_source is None or dispatch["source"] == expected_source) and dispatch["start"] <= rate["valid_from"] and dispatch["end"] >= rate["valid_to"]:
Expand Down
15 changes: 1 addition & 14 deletions tests/integration/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,17 +83,4 @@ def create_rate_data(period_from, period_to, expected_rates: list):
if (rate_index > (len(expected_rates) - 1)):
rate_index = 0

return rates

async def async_get_tracker_tariff(api_key, tariff_code, target_date):
async with aiohttp.ClientSession() as client:
auth = aiohttp.BasicAuth(api_key, '')
url = f'https://octopus.energy/api/v1/tracker/{tariff_code}/daily/past/1/0'
async with client.get(url, auth=auth) as response:
text = await response.text()
data = json.loads(text)
for period in data["periods"]:
valid_from = parse_datetime(f'{period["date"]}T00:00:00Z')
valid_to = parse_datetime(f'{period["date"]}T00:00:00Z') + timedelta(days=1)
if (valid_from <= target_date and valid_to >= target_date):
return period
return rates
Loading

0 comments on commit ac2b6a7

Please sign in to comment.