Merge pull request #73 from openclimatefix/issue/rm-old-db
remove old database: major change
peterdudfield authored Jan 4, 2024
2 parents 356247b + f96add7 commit 894dbc6
Showing 12 changed files with 280 additions and 469 deletions.
82 changes: 5 additions & 77 deletions poetry.lock


97 changes: 34 additions & 63 deletions pvconsumer/app.py
@@ -12,19 +12,17 @@
 from typing import List, Optional, Tuple

 import click
-from nowcasting_datamodel.connection import DatabaseConnection
-from nowcasting_datamodel.models.base import Base_Forecast, Base_PV
-from nowcasting_datamodel.models.pv import PVSystemSQL
-from nowcasting_datamodel.read.read import update_latest_input_data_last_updated
+import pandas as pd
 from pvoutput import PVOutput
-from pvsite_datamodel.connection import DatabaseConnection as PVSiteDatabaseConnection
+from pvsite_datamodel.connection import DatabaseConnection
+from pvsite_datamodel.sqlmodels import SiteSQL
 from sqlalchemy.orm import Session

 import pvconsumer
 from pvconsumer.pv_systems import filter_pv_systems_which_have_new_data, get_pv_systems
-from pvconsumer.save import save_to_database, save_to_pv_site_database
+from pvconsumer.save import save_to_pv_site_database
 from pvconsumer.solar_sheffield_passiv import get_all_latest_pv_yield_from_solar_sheffield
-from pvconsumer.utils import FakeDatabaseConnection, format_pv_data
+from pvconsumer.utils import format_pv_data

 logging.basicConfig(
     level=getattr(logging, os.getenv("LOGLEVEL", "INFO")),
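
For orientation, the import changes above collapse the old nowcasting_datamodel PV and forecast databases into a single pvsite_datamodel connection. A minimal standalone sketch of the new setup, using only the calls that appear in this diff (the URL is a placeholder, not from the commit):

    from pvsite_datamodel.connection import DatabaseConnection

    # One pv-site database connection now replaces the separate
    # PV and forecast connections removed above.
    connection = DatabaseConnection(
        url="postgresql://user:password@localhost:5432/pvsite", echo=False
    )

    with connection.get_session() as session:
        pass  # all reads and writes in the app go through this one session
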
@@ -37,20 +35,6 @@
 @click.option(
     "--db-url",
     default=None,
-    envvar="DB_URL",
-    help="The Database URL where pv data will be saved",
-    type=click.STRING,
-)
-@click.option(
-    "--db-url-forecast",
-    default=None,
-    envvar="DB_URL_FORECAST",
-    help="The Database URL where update latest data will be saved",
-    type=click.STRING,
-)
-@click.option(
-    "--db-url-pv-site",
-    default=None,
     envvar="DB_URL_PV_SITE",
     help="The PV site Database URL where update latest data will be saved",
     type=click.STRING,
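
After this change the consumer reads a single database URL: the remaining --db-url option takes its value from the DB_URL_PV_SITE environment variable. A hypothetical invocation, assuming pvconsumer/app.py can be run directly and using a placeholder URL:

    DB_URL_PV_SITE=postgresql://user:password@localhost:5432/pvsite python pvconsumer/app.py --provider pvoutput.org
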
@@ -71,8 +55,6 @@
 )
 def app(
     db_url: str,
-    db_url_forecast: str,
-    db_url_pv_site: Optional[str] = None,
     filename: Optional[str] = None,
     provider: str = "pvoutput.org",
 ):
@@ -87,15 +69,9 @@

     logger.info(f"Running PV Consumer app ({pvconsumer.__version__})")

-    connection = DatabaseConnection(url=db_url, base=Base_PV, echo=False)
-    connection_forecast = DatabaseConnection(url=db_url_forecast, base=Base_Forecast, echo=False)
-
-    if db_url_pv_site is not None:
-        connection_pv_site = PVSiteDatabaseConnection(url=db_url_pv_site, echo=False)
-    else:
-        connection_pv_site = FakeDatabaseConnection()
+    connection = DatabaseConnection(url=db_url, echo=False)

-    with connection.get_session() as session, connection_pv_site.get_session() as session_pv_site:
+    with connection.get_session() as session:
         # 1. Read list of PV systems (from local file)
         # and get their refresh times (refresh times can also be stored locally)
         logger.debug("Read list of PV systems (from local file)")
@@ -107,27 +83,21 @@
             "Find most recent entered data (for each PV system) in OCF database,"
             "and filter pv systems depending on refresh rate"
         )
-        pv_systems = filter_pv_systems_which_have_new_data(pv_systems=pv_systems)
+        pv_systems = filter_pv_systems_which_have_new_data(pv_systems=pv_systems, session=session)

         # 3. Pull data
         pull_data_and_save(
             pv_systems=pv_systems,
             session=session,
             provider=provider,
-            session_pv_site=session_pv_site,
         )

-    # update latest data
-    with connection_forecast.get_session() as session_forecast:
-        update_latest_input_data_last_updated(session=session_forecast, component="pv")
-

 def pull_data_and_save(
-    pv_systems: List[PVSystemSQL],
+    pv_systems: List[SiteSQL],
     session: Session,
     provider: str,
     datetime_utc: Optional[None] = None,
-    session_pv_site: Optional[Session] = None,
 ):
     """
     Pull the pv yield data and save to database
@@ -157,15 +127,16 @@ def pull_data_and_save(
     n_pv_systems_per_batch = 50
     pv_system_chunks = chunks(original_list=pv_systems, n=n_pv_systems_per_batch)

-    all_pv_yields_sql = []
+    all_pv_yields_df = []
+    i = 0
     for pv_system_chunk in pv_system_chunks:
-        # get all the pv system ids from a group of pv systems
-        pv_system_ids = [pv_system_id.pv_system_id for pv_system_id in pv_system_chunk]
-
         if provider == "pvoutput.org":
             # set up pv output.org
             pv_output = PVOutput()

+            # get all the pv system ids from a group of pv systems
+            pv_system_ids = [pv_system.client_site_id for pv_system in pv_system_chunk]
+
             logger.debug(f"Getting data from {provider}")

             # lets take the date of the datetime now.
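
The batching above relies on the repo's own chunks helper, whose body is collapsed at the foot of this diff (only its signature is visible). A plausible reconstruction from the call site and signature, for illustration only:

    from typing import List, Tuple

    def chunks(original_list: List, n: int) -> Tuple[List]:
        """Split original_list into consecutive batches of at most n items each."""
        # Reconstructed for illustration; the actual body is not shown in this diff.
        return tuple(original_list[i : i + n] for i in range(0, len(original_list), n))
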
@@ -185,45 +156,45 @@
         else:
             raise Exception(f"Can not use provider {provider}")

-        for i, pv_system in enumerate(pv_system_chunk):
+        for _, pv_system in enumerate(pv_system_chunk):
+            i = i + 1
             logger.debug(
-                f"Processing {i}th pv system ({pv_system.pv_system_id=}), "
+                f"Processing {i}th pv system ({pv_system.client_site_id=}), "
                 f"out of {len(pv_systems)}"
             )

             # take only the data we need for system id
             pv_yield_df = all_pv_yield_df[
-                all_pv_yield_df["system_id"].astype(int) == pv_system.pv_system_id
+                all_pv_yield_df["system_id"].astype(int) == pv_system.client_site_id
             ]
+            pv_yield_df["site_uuid"] = pv_system.site_uuid

             logger.debug(
                 f"Got {len(pv_yield_df)} pv yield for "
-                f"pv systems {pv_system.pv_system_id} before filtering"
+                f"pv systems {pv_system.client_site_id} before filtering"
             )

             if len(pv_yield_df) == 0:
-                logger.warning(f"Did not find any data for {pv_system.pv_system_id} for {date}")
+                logger.warning(f"Did not find any data for {pv_system.client_site_id} for {date}")
             else:
-                pv_yields_sql = format_pv_data(pv_system=pv_system, pv_yield_df=pv_yield_df)
+                # filter out which is in our database and a funny 0 bug
+                pv_yield_df = format_pv_data(
+                    pv_system=pv_system, pv_yield_df=pv_yield_df, session=session
+                )

-                all_pv_yields_sql = all_pv_yields_sql + pv_yields_sql
+                if len(all_pv_yields_df) == 0:
+                    all_pv_yields_df = pv_yield_df
+                else:
+                    all_pv_yields_df = pd.concat([all_pv_yields_df, pv_yield_df])

-                if len(all_pv_yields_sql) > 100:
+                if len(all_pv_yields_df) > 100:
                     # 4. Save to database - perhaps check no duplicate data. (for each PV system)
-                    save_to_database(session=session, pv_yields=all_pv_yields_sql)
-                    all_pv_yields_sql = []
-
-                    # 5. save to pv sites database
-                    if session_pv_site is not None:
-                        # TODO we are currently doing this every round,
-                        # we might find we have to batch it like the other save method
-                        save_to_pv_site_database(
-                            session=session_pv_site, pv_system=pv_system, pv_yield_df=pv_yield_df
-                        )
-                        session_pv_site.commit()
+                    save_to_pv_site_database(session=session, pv_yield_df=all_pv_yields_df)
+                    all_pv_yields_df = []

     # 4. Save to database - perhaps check no duplicate data. (for each PV system)
-    save_to_database(session=session, pv_yields=all_pv_yields_sql)
+    logger.debug(all_pv_yields_df)
+    save_to_pv_site_database(session=session, pv_yield_df=all_pv_yields_df)


 def chunks(original_list: List, n: int) -> Tuple[List]:
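
The final hunk replaces per-object SQL accumulation with pandas DataFrames: yields are concatenated into a buffer, flushed to the pv-site database whenever it exceeds 100 rows, and flushed once more at the end. A standalone sketch of that accumulate-and-flush pattern (all names and data here are illustrative, not the commit's):

    import pandas as pd

    def flush(batch: pd.DataFrame) -> None:
        """Stand-in for save_to_pv_site_database; just reports the batch size."""
        print(f"saving {len(batch)} rows")

    buffer = pd.DataFrame()
    for chunk in (pd.DataFrame({"x": range(60)}) for _ in range(4)):  # fake incoming data
        buffer = pd.concat([buffer, chunk])
        if len(buffer) > 100:
            flush(buffer)            # save the full batch...
            buffer = pd.DataFrame()  # ...and reset the buffer
    flush(buffer)  # final partial batch
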
