diff --git a/nowcasting_datamodel/save/save.py b/nowcasting_datamodel/save/save.py index bc316c98..731a4991 100644 --- a/nowcasting_datamodel/save/save.py +++ b/nowcasting_datamodel/save/save.py @@ -25,6 +25,7 @@ def save( update_gsp: Optional[bool] = True, apply_adjuster: Optional[bool] = True, save_to_last_seven_days: Optional[bool] = True, + remove_non_distinct_last_seven_days: bool = False, ): """ Save forecast to database @@ -40,6 +41,12 @@ def save( :param update_gsp: Optional (default true), to update all the GSP forecasts :param apply_adjuster: Optional (default true), to apply the adjuster :param save_to_last_seven_days: Optional (default true), to save to the last seven days table + :param remove_non_distinct_last_seven_days: Optional (default False), to only keep distinct + forecast values in the forecast_value_last_seven_days table + Please note that this does remove some data, which might be needed when calculating + metrics. Another solution to this is to make a forecast <--> forecast_value a + many-to-many relationship. This means one forecast will still have the full + range of forecast values associated with it. 
""" use_adjuster_env_var = bool(os.getenv("USE_ADJUSTER", "True").lower() in ["true", "1"]) @@ -67,7 +74,11 @@ def save( if save_to_last_seven_days: logger.debug("Saving to last seven days table") - save_all_forecast_values_seven_days(session=session, forecasts=forecasts) + save_all_forecast_values_seven_days( + session=session, + forecasts=forecasts, + remove_non_distinct=remove_non_distinct_last_seven_days, + ) session.commit() @@ -111,23 +122,30 @@ def save_pv_system(session: Session, pv_system: PVSystem) -> PVSystemSQL: return pv_system -def save_all_forecast_values_seven_days(session: Session, forecasts: List[ForecastSQL]): +def save_all_forecast_values_seven_days( + session: Session, forecasts: List[ForecastSQL], remove_non_distinct: bool = False +): """ Save all the forecast values in the last seven days table :param session: database sessions :param forecasts: list of forecasts + :param remove_non_distinct: Optional (default False), to only keep distinct forecast values + If the last saved forecast value is the same as the current one, it will not be saved """ # get all values together forecast_values_last_7_days = [] for forecast in forecasts: for forecast_value in forecast.forecast_values: - forecast_values_last_7_days.append( - change_forecast_value_to_forecast_last_7_days(forecast_value) + forecast_value_last_7_days = change_forecast_value_to_forecast_last_7_days( + forecast_value ) + forecast_values_last_7_days.append(forecast_value_last_7_days) # add them to the database add_forecast_last_7_days_and_remove_old_data( - session=session, forecast_values=forecast_values_last_7_days + session=session, + forecast_values=forecast_values_last_7_days, + remove_non_distinct=remove_non_distinct, ) diff --git a/nowcasting_datamodel/save/update.py b/nowcasting_datamodel/save/update.py index 1083e281..d0e2cbd8 100644 --- a/nowcasting_datamodel/save/update.py +++ b/nowcasting_datamodel/save/update.py @@ -9,7 +9,7 @@ from sqlalchemy.orm.session import Session from 
nowcasting_datamodel import N_GSP -from nowcasting_datamodel.models import ForecastValueSevenDaysSQL +from nowcasting_datamodel.models import ForecastValueSevenDaysSQL, LocationSQL, MLModelSQL from nowcasting_datamodel.models.forecast import ( ForecastSQL, ForecastValueLatestSQL, @@ -296,14 +296,84 @@ def change_forecast_value_to_forecast_last_7_days( return forecast_new +def remove_non_distinct_forecast_values( + session: Session, start_datetime: datetime, end_datetime: datetime +): + """ + Remove any non-distinct forecast values + + We remove any non-distinct values from ForecastValueSevenDaysSQL. The values are distinct on + - target_time + - location_id + - model_name + - expected_power_generation_megawatts + + We keep the first value, and remove any values from later on. + + :param session: database session + :param start_datetime: start datetime + :param end_datetime: end datetime + """ + + logger.debug(f"Removing non distinct forecast values for {start_datetime} to {end_datetime}") + + # 1. sub query, these are the values we want to keep + sub_query = session.query(ForecastValueSevenDaysSQL.uuid) + + # distinct + sub_query = sub_query.distinct( + ForecastValueSevenDaysSQL.target_time, + ForecastSQL.location_id, + MLModelSQL.name, + ForecastValueSevenDaysSQL.expected_power_generation_megawatts, + ) + + # join + sub_query = sub_query.join(ForecastSQL) + sub_query = sub_query.join(LocationSQL) + sub_query = sub_query.join(MLModelSQL) + + # filter on time + sub_query = sub_query.filter(ForecastValueSevenDaysSQL.target_time >= start_datetime) + sub_query = sub_query.filter(ForecastValueSevenDaysSQL.target_time < end_datetime) + + # order by + sub_query = sub_query.order_by( + ForecastValueSevenDaysSQL.target_time, + ForecastSQL.location_id, + MLModelSQL.name, + ForecastValueSevenDaysSQL.expected_power_generation_megawatts, + ForecastValueSevenDaysSQL.created_utc, # get the first one + ) + + sub_query = sub_query.subquery() + + # 2. 
main query + query = session.query(ForecastValueSevenDaysSQL) + + # filter on time + query = query.filter(ForecastValueSevenDaysSQL.target_time >= start_datetime) + query = query.filter(ForecastValueSevenDaysSQL.target_time < end_datetime) + + # select uuid not in subquery + query = query.filter(ForecastValueSevenDaysSQL.uuid.notin_(sub_query)) + + # delete all results + query.delete() + + def add_forecast_last_7_days_and_remove_old_data( - forecast_values: List[ForecastValueSevenDaysSQL], session: Session + forecast_values: List[ForecastValueSevenDaysSQL], + session: Session, + remove_non_distinct: bool = False, ): """ Add forecast values and delete old values :param forecast_values: :param session: + :param remove_non_distinct: Optional (default False), to only keep distinct forecast values + If the last saved forecast value is the same as the current one, it will not be saved :return: """ @@ -313,7 +383,16 @@ def add_forecast_last_7_days_and_remove_old_data( # remove old data now_minus_7_days = datetime.now(tz=timezone.utc) - timedelta(days=7) + # remove any duplicate forecast values + if remove_non_distinct: + for i in range(0, 24 * 10): + start_datetime = now_minus_7_days + timedelta(hours=i) + end_datetime = start_datetime + timedelta(hours=1) + remove_non_distinct_forecast_values(session, start_datetime, end_datetime) + logger.debug(f"Removing data before {now_minus_7_days}") query = session.query(ForecastValueSevenDaysSQL) query = query.where(ForecastValueSevenDaysSQL.target_time < now_minus_7_days) query.delete() + + session.commit() diff --git a/tests/save/test_save.py b/tests/save/test_save.py index 41f33700..b993e124 100644 --- a/tests/save/test_save.py +++ b/tests/save/test_save.py @@ -62,6 +62,43 @@ def test_save(db_session, latest_me): assert forecast_latest_values[0].properties is not None +@freeze_time("2024-01-01 00:00:00") +def test_save_distinct_last_seven_days(db_session, latest_me): + # Make sure save works where no forecast already exists + 
forecasts = make_fake_forecasts( + gsp_ids=range(0, 10), + session=db_session, + t0_datetime_utc=datetime(2024, 1, 1, tzinfo=timezone.utc), + ) + save(session=db_session, forecasts=forecasts, remove_non_distinct_last_seven_days=True) + + forecast_values = db_session.query(ForecastValueSQL).all() + assert np.max([fv.adjust_mw for fv in forecast_values]) != 0.0 + + # 10 forecast, + 10 historic ones + assert len(db_session.query(ForecastSQL).all()) == 20 + assert len(db_session.query(ForecastValueSQL).all()) == 10 * N_FAKE_FORECASTS + assert len(db_session.query(ForecastValueLatestSQL).all()) == 10 * N_FAKE_FORECASTS + assert len(db_session.query(ForecastValueSevenDaysSQL).all()) == 10 * N_FAKE_FORECASTS + + # Make sure save works where forecast exists already + forecasts = make_fake_forecasts(gsp_ids=range(0, 10), session=db_session) + save(session=db_session, forecasts=forecasts, remove_non_distinct_last_seven_days=True) + + # 20 forecast, + 10 historic ones + assert len(db_session.query(ForecastSQL).all()) == 30 + forecast_values = db_session.query(ForecastValueSQL).all() + assert len(forecast_values) == 20 * N_FAKE_FORECASTS + forecast_values_latest = db_session.query(ForecastValueLatestSQL).all() + assert len(forecast_values_latest) == 10 * N_FAKE_FORECASTS + forecast_values_seven_days = db_session.query(ForecastValueSevenDaysSQL).all() + assert len(forecast_values_seven_days) == 10 * (N_FAKE_FORECASTS + 49) + + assert forecast_values[0].properties is not None + assert forecast_values_latest[0].properties is not None + assert forecast_values_seven_days[0].properties is not None + + @freeze_time("2024-01-01 00:00:00") def test_save_no_adjuster(db_session): # Make sure save works where no forecast already exists @@ -116,7 +153,9 @@ def test_save_all_forecast_values_seven_days(db_session): now = datetime.now(tz=timezone.utc) forecasts = make_fake_forecasts(gsp_ids=range(0, 3), session=db_session, t0_datetime_utc=now) - 
save_all_forecast_values_seven_days(session=db_session, forecasts=forecasts) + save_all_forecast_values_seven_days( + session=db_session, forecasts=forecasts, remove_non_distinct=False + ) assert len(db_session.query(ForecastValueSevenDaysSQL).all()) == 3 * 112 diff --git a/tests/save/test_update.py b/tests/save/test_update.py index 7537f380..cd83230a 100644 --- a/tests/save/test_update.py +++ b/tests/save/test_update.py @@ -326,7 +326,9 @@ def test_change_forecast_value_to_forecast_last_7_days(db_session): forecast=forecast_value ) add_forecast_last_7_days_and_remove_old_data( - forecast_values=[forecast_value_last_seven_days], session=db_session + forecast_values=[forecast_value_last_seven_days], + session=db_session, + remove_non_distinct=False, ) assert len(db_session.query(ForecastValueSevenDaysSQL).all()) == 1