remove old database: major change #73

Merged · 15 commits · Jan 4, 2024
95 changes: 32 additions & 63 deletions pvconsumer/app.py
@@ -12,19 +12,17 @@
from typing import List, Optional, Tuple

import click
from nowcasting_datamodel.connection import DatabaseConnection
from nowcasting_datamodel.models.base import Base_Forecast, Base_PV
from nowcasting_datamodel.models.pv import PVSystemSQL
from nowcasting_datamodel.read.read import update_latest_input_data_last_updated
import pandas as pd
from pvoutput import PVOutput
from pvsite_datamodel.connection import DatabaseConnection as PVSiteDatabaseConnection
from pvsite_datamodel.connection import DatabaseConnection
from pvsite_datamodel.sqlmodels import SiteSQL
from sqlalchemy.orm import Session

import pvconsumer
from pvconsumer.pv_systems import filter_pv_systems_which_have_new_data, get_pv_systems
from pvconsumer.save import save_to_database, save_to_pv_site_database
from pvconsumer.save import save_to_pv_site_database
from pvconsumer.solar_sheffield_passiv import get_all_latest_pv_yield_from_solar_sheffield
from pvconsumer.utils import FakeDatabaseConnection, format_pv_data
from pvconsumer.utils import format_pv_data

logging.basicConfig(
level=getattr(logging, os.getenv("LOGLEVEL", "INFO")),
@@ -37,20 +35,6 @@
@click.option(
"--db-url",
default=None,
envvar="DB_URL",
help="The Database URL where pv data will be saved",
type=click.STRING,
)
@click.option(
"--db-url-forecast",
default=None,
envvar="DB_URL_FORECAST",
help="The Database URL where update latest data will be saved",
type=click.STRING,
)
@click.option(
"--db-url-pv-site",
default=None,
envvar="DB_URL_PV_SITE",
help="The PV site Database URL where update latest data will be saved",
type=click.STRING,
@@ -70,9 +54,7 @@
type=click.STRING,
)
def app(
db_url: str,
db_url_forecast: str,
db_url_pv_site: Optional[str] = None,
db_url: Optional[str] = None,
filename: Optional[str] = None,
provider: str = "pvoutput.org",
):
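After this change the app needs only one database URL, read from --db-url or the DB_URL environment variable. A minimal invocation sketch using click's test runner; the connection string is a placeholder, not a real deployment value:

```python
# Illustrative only: exercising the simplified CLI. A reachable pv-site
# database would be needed for the app to actually run to completion.
from click.testing import CliRunner

from pvconsumer.app import app

runner = CliRunner()
result = runner.invoke(
    app,
    ["--db-url", "postgresql://user:password@localhost:5432/pvsite"],
)
print(result.exit_code, result.output)
```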
@@ -87,15 +69,9 @@ def app(

logger.info(f"Running PV Consumer app ({pvconsumer.__version__})")

connection = DatabaseConnection(url=db_url, base=Base_PV, echo=False)
connection_forecast = DatabaseConnection(url=db_url_forecast, base=Base_Forecast, echo=False)

if db_url_pv_site is not None:
connection_pv_site = PVSiteDatabaseConnection(url=db_url_pv_site, echo=False)
else:
connection_pv_site = FakeDatabaseConnection()
connection = DatabaseConnection(url=db_url, echo=False)

with connection.get_session() as session, connection_pv_site.get_session() as session_pv_site:
with connection.get_session() as session:
# 1. Read list of PV systems (from local file)
# and get their refresh times (refresh times can also be stored locally)
logger.debug("Read list of PV systems (from local file)")
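With the old Base_PV and Base_Forecast connections gone, everything goes through the single pvsite_datamodel connection. A minimal sketch of that pattern, assuming get_session() is a context manager yielding a SQLAlchemy session and SiteSQL is an ordinary ORM model (both consistent with the diff above); the URL is a placeholder:

```python
from pvsite_datamodel.connection import DatabaseConnection
from pvsite_datamodel.sqlmodels import SiteSQL

# In the app this URL comes from --db-url / DB_URL.
connection = DatabaseConnection(
    url="postgresql://user:password@localhost:5432/pvsite", echo=False
)

with connection.get_session() as session:
    # Sites replace the old PVSystemSQL rows: client_site_id maps to the
    # provider's system id, site_uuid keys the saved yields.
    for site in session.query(SiteSQL).all():
        print(site.site_uuid, site.client_site_id)
```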
@@ -107,27 +83,21 @@
"Find most recent entered data (for each PV system) in OCF database,"
"and filter pv systems depending on refresh rate"
)
pv_systems = filter_pv_systems_which_have_new_data(pv_systems=pv_systems)
pv_systems = filter_pv_systems_which_have_new_data(pv_systems=pv_systems, session=session)

# 3. Pull data
pull_data_and_save(
pv_systems=pv_systems,
session=session,
provider=provider,
session_pv_site=session_pv_site,
)

# update latest data
with connection_forecast.get_session() as session_forecast:
update_latest_input_data_last_updated(session=session_forecast, component="pv")


def pull_data_and_save(
pv_systems: List[PVSystemSQL],
pv_systems: List[SiteSQL],
session: Session,
provider: str,
datetime_utc: Optional[datetime] = None,
session_pv_site: Optional[Session] = None,
):
"""
Pull the PV yield data and save to the database
@@ -157,15 +127,15 @@ def pull_data_and_save(
n_pv_systems_per_batch = 50
pv_system_chunks = chunks(original_list=pv_systems, n=n_pv_systems_per_batch)

all_pv_yields_sql = []
all_pv_yields_df = []
for pv_system_chunk in pv_system_chunks:
# get all the pv system ids from a group of pv systems
pv_system_ids = [pv_system_id.pv_system_id for pv_system_id in pv_system_chunk]

if provider == "pvoutput.org":
# set up pvoutput.org
pv_output = PVOutput()

# get all the pv system ids from a group of pv systems
pv_system_ids = [pv_system.client_site_id for pv_system in pv_system_chunk]

logger.debug(f"Getting data from {provider}")

# let's take the date of the datetime now.
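A hedged sketch of that date handling, assuming the optional datetime_utc argument from the signature above defaults to the current UTC time when not supplied (resolve_query_date is a hypothetical helper name):

```python
from datetime import datetime, timezone
from typing import Optional

def resolve_query_date(datetime_utc: Optional[datetime] = None):
    """Fall back to now (UTC) when no override is given, then query the
    provider by calendar date."""
    if datetime_utc is None:
        datetime_utc = datetime.now(timezone.utc)
    return datetime_utc.date()

print(resolve_query_date())  # today's date, e.g. 2024-01-04
```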
@@ -187,43 +157,42 @@

for i, pv_system in enumerate(pv_system_chunk):
logger.debug(
f"Processing {i}th pv system ({pv_system.pv_system_id=}), "
f"Processing {i}th pv system ({pv_system.client_site_id=}), "
f"out of {len(pv_systems)}"
)

# take only the data we need for system id
pv_yield_df = all_pv_yield_df[
all_pv_yield_df["system_id"].astype(int) == pv_system.pv_system_id
all_pv_yield_df["system_id"].astype(int) == pv_system.client_site_id
]
pv_yield_df["site_uuid"] = pv_system.site_uuid

logger.debug(
f"Got {len(pv_yield_df)} pv yield for "
f"pv systems {pv_system.pv_system_id} before filtering"
f"pv systems {pv_system.client_site_id} before filtering"
)

if len(pv_yield_df) == 0:
logger.warning(f"Did not find any data for {pv_system.pv_system_id} for {date}")
logger.warning(f"Did not find any data for {pv_system.client_site_id} for {date}")
else:
pv_yields_sql = format_pv_data(pv_system=pv_system, pv_yield_df=pv_yield_df)
# filter out rows already in our database, and work around a spurious 0 bug
pv_yield_df = format_pv_data(
pv_system=pv_system, pv_yield_df=pv_yield_df, session=session
)

all_pv_yields_sql = all_pv_yields_sql + pv_yields_sql
if len(all_pv_yields_df) == 0:
all_pv_yields_df = pv_yield_df
else:
all_pv_yields_df = pd.concat([all_pv_yields_df, pv_yield_df])

if len(all_pv_yields_sql) > 100:
if len(all_pv_yields_df) > 100:
# 4. Save to database - perhaps check no duplicate data. (for each PV system)
save_to_database(session=session, pv_yields=all_pv_yields_sql)
all_pv_yields_sql = []

# 5. save to pv sites database
if session_pv_site is not None:
# TODO we are currently doing this every round,
# we might find we have to batch it like the other save method
save_to_pv_site_database(
session=session_pv_site, pv_system=pv_system, pv_yield_df=pv_yield_df
)
session_pv_site.commit()
save_to_pv_site_database(session=session, pv_yield_df=all_pv_yields_df)
all_pv_yields_df = []

# 4. Save to database - perhaps check no duplicate data. (for each PV system)
save_to_database(session=session, pv_yields=all_pv_yields_sql)
logger.debug(all_pv_yields_df)
save_to_pv_site_database(session=session, pv_yield_df=all_pv_yields_df)
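The loop above accumulates per-system frames and flushes to the pv-site database once more than 100 rows have built up, with one final save after the loop so the remainder is not dropped. A self-contained sketch of that accumulate-and-flush pattern; flush_in_batches and the dummy frames are hypothetical stand-ins:

```python
import pandas as pd

def flush_in_batches(frames, save, threshold: int = 100):
    """Accumulate DataFrames and call save(buffer) whenever more than
    `threshold` rows have built up; flush any remainder at the end."""
    buffer = pd.DataFrame()
    for frame in frames:
        buffer = pd.concat([buffer, frame])
        if len(buffer) > threshold:
            save(buffer)
            buffer = pd.DataFrame()
    if len(buffer) > 0:
        save(buffer)

# Usage sketch with dummy data standing in for real pv yields.
frames = [pd.DataFrame({"generation_kw": range(60)}) for _ in range(3)]
flush_in_batches(frames, save=lambda df: print(f"saving {len(df)} rows"))
```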


def chunks(original_list: List, n: int) -> Tuple[List]:
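The PR does not show the body of chunks; a plausible implementation matching the signature above:

```python
from typing import List, Tuple

def chunks(original_list: List, n: int) -> Tuple[List]:
    """Split original_list into consecutive sub-lists of at most n items."""
    return tuple(
        original_list[i : i + n] for i in range(0, len(original_list), n)
    )

# e.g. chunks(list(range(5)), n=2) -> ([0, 1], [2, 3], [4])
```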