diff --git a/pvnet_app/app.py b/pvnet_app/app.py index c1bf285..99eedf1 100644 --- a/pvnet_app/app.py +++ b/pvnet_app/app.py @@ -40,7 +40,11 @@ import pvnet_app from pvnet_app.utils import ( - worker_init_fn, populate_data_config_sources, convert_dataarray_to_forecasts + worker_init_fn, + populate_data_config_sources, + convert_dataarray_to_forecasts, + find_min_satellite_delay_config, + save_yaml_config, ) from pvnet_app.data import ( download_all_sat_data, download_all_nwp_data, preprocess_sat_data, preprocess_nwp_data, @@ -105,6 +109,24 @@ # Remove extra models if not configured to run them if os.getenv("RUN_EXTRA_MODELS", "false").lower() == "false": models_dict = {"pvnet_v2": models_dict["pvnet_v2"]} + + +# Pull the data configs from huggingface +data_config_filenames = [] +for model in models_dict.values(): + data_config_filenames.append( + PVNetBaseModel.get_data_config( + model["pvnet"]["name"], + revision=model["pvnet"]["version"], + ) + ) + +# Find the config with satellite delay suitable for all models running +common_config = find_min_satellite_delay_config(data_config_filenames) + +common_config_path = "common_config_path.yaml" + +save_yaml_config(common_config, common_config_path) # --------------------------------------------------------------------------- # LOGGER @@ -184,12 +206,6 @@ def app( # --------------------------------------------------------------------------- # Prepare data sources - - # Pull the data config from huggingface - data_config_filename = PVNetBaseModel.get_data_config( - models_dict[ "pvnet_v2"]["pvnet"]["name"], - revision=models_dict[ "pvnet_v2"]["pvnet"]["version"], - ) # Make pands Series of most recent GSP effective capacities logger.info("Loading GSP metadata") @@ -213,7 +229,7 @@ def app( download_all_sat_data() # Process the 5/15 minutely satellite data - preprocess_sat_data(t0, data_config_filename) + preprocess_sat_data(t0, common_config_path) # Download NWP data logger.info("Downloading NWP data") @@ -230,7 +246,7 
@@ def app( temp_dir = tempfile.TemporaryDirectory() populated_data_config_filename = f"{temp_dir.name}/data_config.yaml" - populate_data_config_sources(data_config_filename, populated_data_config_filename) + populate_data_config_sources(common_config_path, populated_data_config_filename) # Location and time datapipes location_pipe = IterableWrapper([gsp_id_to_loc(gsp_id) for gsp_id in gsp_ids]) @@ -316,8 +332,7 @@ def app( # Write predictions to database logger.info("Writing to database") - connection = db_connection# DatabaseConnection(url=os.environ["DB_URL"]) - with connection.get_session() as session: + with db_connection.get_session() as session: for model_name, forecast_compiler in forecast_compilers.items(): diff --git a/pvnet_app/utils.py b/pvnet_app/utils.py index 1ebc7d5..318f754 100644 --- a/pvnet_app/utils.py +++ b/pvnet_app/utils.py @@ -1,11 +1,14 @@ +from datetime import timezone, datetime import fsspec.asyn import yaml import os +import copy import xarray as xr import numpy as np import pandas as pd from sqlalchemy.orm import Session import logging + from nowcasting_datamodel.models import ( ForecastSQL, ForecastValue, @@ -16,7 +19,7 @@ ) from nowcasting_datamodel.read.read_models import get_model -from datetime import timezone, datetime + from pvnet_app.consts import sat_path, nwp_ukv_path, nwp_ecmwf_path @@ -39,6 +42,19 @@ def worker_init_fn(worker_id): fsspec.asyn.iothread[0] = None fsspec.asyn.loop[0] = None + +def load_yaml_config(path): + """Load config file from path""" + with open(path) as file: + config = yaml.load(file, Loader=yaml.FullLoader) + return config + + +def save_yaml_config(config, path): + """Save config file to path""" + with open(path, 'w') as file: + yaml.dump(config, file, default_flow_style=False) + def populate_data_config_sources(input_path, output_path): """Resave the data config and replace the source filepaths @@ -47,8 +63,7 @@ def populate_data_config_sources(input_path, output_path): input_path: Path to input 
datapipes configuration file output_path: Location to save the output configuration file """ - with open(input_path) as infile: - config = yaml.load(infile, Loader=yaml.FullLoader) + config = load_yaml_config(input_path) production_paths = { "gsp": os.environ["DB_URL"], @@ -76,9 +91,27 @@ def populate_data_config_sources(input_path, output_path): # We do not need to set PV path right now. This currently done through datapipes # TODO - Move the PV path to here - with open(output_path, 'w') as outfile: - yaml.dump(config, outfile, default_flow_style=False) + save_yaml_config(config, output_path) + + +def find_min_satellite_delay_config(config_paths): + """Find the config with the minimum satellite delay from a list of config paths""" + # Load all the configs + configs = [load_yaml_config(config_path) for config_path in config_paths] + + min_sat_delay = np.inf + for config in configs: + + min_sat_delay = min( + min_sat_delay, + config["input_data"]["satellite"]["live_delay_minutes"] + ) + + config = configs[0] + config["input_data"]["satellite"]["live_delay_minutes"] = min_sat_delay + return config + + def preds_to_dataarray(preds, model, valid_times, gsp_ids): """Put numpy array of predictions into a dataarray""" @@ -191,3 +224,7 @@ def convert_dataarray_to_forecasts( forecasts.append(forecast) return forecasts + + + + \ No newline at end of file