From 596708be86c985c40681c09be6b860a99177adb9 Mon Sep 17 00:00:00 2001
From: James Fulton
Date: Tue, 16 Apr 2024 15:32:03 +0000
Subject: [PATCH 1/9] update reqs

---
 requirements.txt | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/requirements.txt b/requirements.txt
index 83c559a..0c13a9e 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,9 +2,9 @@ pydantic
 pytorch-lightning==2.1.3
 torch[cpu]==2.2.0
 PVNet-summation==0.1.3
-pvnet==3.0.11
-ocf_datapipes==3.2.11
-nowcasting_datamodel>=1.5.30
+pvnet==3.0.25
+ocf_datapipes==3.3.18
+nowcasting_datamodel>=1.5.39
 fsspec[s3]
 xarray
 zarr

From b9b0ffefb706c8773dc789777d5841ced5567213 Mon Sep 17 00:00:00 2001
From: James Fulton
Date: Tue, 16 Apr 2024 15:32:59 +0000
Subject: [PATCH 2/9] better handling of satellite delays

---
 pvnet_app/app.py   |  79 ++++++++---------
 pvnet_app/data.py  | 124 ++++++++++++++++++------
 tests/conftest.py  |  56 ++++++------
 tests/test_data.py | 210 ++++++++++++++++++++++++++++-----------------
 4 files changed, 295 insertions(+), 174 deletions(-)

diff --git a/pvnet_app/app.py b/pvnet_app/app.py
index 6d6ac39..b4599c4 100644
--- a/pvnet_app/app.py
+++ b/pvnet_app/app.py
@@ -27,13 +27,14 @@
 from ocf_datapipes.load import OpenGSPFromDatabase
 from ocf_datapipes.training.pvnet import construct_sliced_data_pipeline
 from ocf_datapipes.utils.consts import ELEVATION_MEAN, ELEVATION_STD
-from ocf_datapipes.batch import BatchKey, stack_np_examples_into_batch
+from ocf_datapipes.batch import (
+    BatchKey, stack_np_examples_into_batch, batch_to_tensor, copy_batch_to_device
+)
 from pvnet_summation.models.base_model import BaseModel as SummationBaseModel
 from torch.utils.data import DataLoader
 from torch.utils.data.datapipes.iter import IterableWrapper
 
 import pvnet
-from pvnet.data.utils import batch_to_tensor, copy_batch_to_device
 from pvnet.models.base_model import BaseModel as PVNetBaseModel
 from pvnet.utils import GSPLocationLookup
 
@@ -61,8 +62,8 @@
 batch_size = 10
 
 # Huggingfacehub model repo and commit for PVNet (GSP-level model)
-default_model_name = "openclimatefix/pvnet_v2"
-default_model_version = "5ed2b179974993d8804a1e60fdc850dc547e9025"
+default_model_name = "openclimatefix/pvnet_uk_region"
+default_model_version = "9cc2bf5859e129b3816041b657c8875d31ced0d6"
 
 # Huggingfacehub model repo and commit for PVNet summation (GSP sum to national model)
 # If summation_model_name is set to None, a simple sum is computed instead
@@ -164,10 +165,42 @@ def app(
     logger.info(f"Making forecast for GSP IDs: {gsp_ids}")
 
     # ---------------------------------------------------------------------------
-    # 1. Prepare data sources
+    # 1. set up model
+    logger.info(f"Loading model: {model_name} - {model_version}")
 
-    # Make pands Series of most recent GSP effective capacities
+    model = PVNetBaseModel.from_pretrained(
+        model_name,
+        revision=model_version,
+    ).to(device)
+
+    if summation_model_name is not None:
+        summation_model = SummationBaseModel.from_pretrained(
+            summation_model_name,
+            revision=summation_model_version,
+        ).to(device)
+
+        if (
+            summation_model.pvnet_model_name != model_name
+            or summation_model.pvnet_model_version != model_version
+        ):
+            warnings.warn(
+                f"The PVNet version running in this app is {model_name}/{model_version}. "
+                "The summation model running in this app was trained on outputs from PVNet version "
+                f"{summation_model.pvnet_model_name}/{summation_model.pvnet_model_version}. "
+                "Combining these models may lead to an error if the shape of PVNet output doesn't "
+                "match the expected shape of the summation model. Combining may lead to unreliable "
+                "results even if the shapes match."
+            )
+
+    # ---------------------------------------------------------------------------
+    # 2. Prepare data sources
+
+    # Pull the data config from huggingface
+    data_config_filename = PVNetBaseModel.get_data_config(
+        model_name,
+        revision=model_version,
+    )
+
+    # Make pandas Series of most recent GSP effective capacities
     logger.info("Loading GSP metadata")
 
     ds_gsp = next(iter(OpenGSPFromDatabase()))
 
@@ -190,7 +223,7 @@ def app(
         download_all_sat_data()
 
         # Process the 5/15 minutely satellite data
-        preprocess_sat_data(t0)
+        preprocess_sat_data(t0, data_config_filename)
 
     # Download NWP data
     logger.info("Downloading NWP data")
@@ -202,12 +235,6 @@ def app(
     # ---------------------------------------------------------------------------
     # 2. Set up data loader
     logger.info("Creating DataLoader")
-
-    # Pull the data config from huggingface
-    data_config_filename = PVNetBaseModel.get_data_config(
-        model_name,
-        revision=model_version,
-    )
 
     # Populate the data config with production data paths
     temp_dir = tempfile.TemporaryDirectory()
@@ -253,33 +280,7 @@ def app(
 
     dataloader = DataLoader(batch_datapipe, **dataloader_kwargs)
 
-    # ---------------------------------------------------------------------------
-    # 3. set up model
-    logger.info(f"Loading model: {model_name} - {model_version}")
-
-    model = PVNetBaseModel.from_pretrained(
-        model_name,
-        revision=model_version,
-    ).to(device)
-
-    if summation_model_name is not None:
-        summation_model = SummationBaseModel.from_pretrained(
-            summation_model_name,
-            revision=summation_model_version,
-        ).to(device)
-
-        if (
-            summation_model.pvnet_model_name != model_name
-            or summation_model.pvnet_model_version != model_version
-        ):
-            warnings.warn(
-                f"The PVNet version running in this app is {model_name}/{model_version}. "
-                "The summation model running in this app was trained on outputs from PVNet version "
-                f"{summation_model.pvnet_model_name}/{summation_model.pvnet_model_version}. "
-                "Combining these models may lead to an error if the shape of PVNet output doesn't "
-                "match the expected shape of the summation model. Combining may lead to unreliable "
-                "results even if the shapes match."
-            )
 
+
     # 4. Make prediction
     logger.info("Processing batches")

diff --git a/pvnet_app/data.py b/pvnet_app/data.py
index b86f06f..bbc2f53 100644
--- a/pvnet_app/data.py
+++ b/pvnet_app/data.py
@@ -7,6 +7,7 @@
 import fsspec
 from datetime import timedelta
 import ocf_blosc2
+from ocf_datapipes.config.load import load_yaml_configuration
 
 from pvnet_app.consts import sat_path, nwp_ukv_path, nwp_ecmwf_path
 
@@ -38,47 +39,122 @@ def download_all_sat_data():
         logger.info(f"Downloading 15-minute satellite data {sat_15_dl_path}")
         fs.get(sat_15_dl_path, "sat_15_min.zarr.zip")
         os.system(f"unzip sat_15_min.zarr.zip -d {sat_15_path}")
+
+
+def _get_latest_time_and_mins_delay(sat_zarr_path, t0):
+    ds_sat = xr.open_zarr(sat_zarr_path)
+    latest_time = pd.to_datetime(ds_sat.time.max().item())
+    delay = t0 - latest_time
+    delay_mins = int(delay.total_seconds() / 60)
+    return latest_time, delay_mins
+
 
-def preprocess_sat_data(t0):
+def combine_5_and_15_sat_data(t0, max_sat_delay_allowed_mins):
     """Select and/or combine the 5 and 15-minutely satellite data"""
-    use_15_minute = False
-    if not os.path.exists(sat_5_path):
-        use_15_minute = True
-        logger.debug(f"5-minute satellite data not found at {sat_5_path}. "
-                     f"Using 15-minute data.")
+    use_5_minute = os.path.exists(sat_5_path)
+
+    if not use_5_minute:
+        logger.info(f"5-minute satellite data not found at {sat_5_path}. Trying 15-minute data.")
     else:
-        ds_sat_5 = xr.open_zarr(sat_5_path)
-        latest_time_5 = pd.to_datetime(ds_sat_5.time.max().values)
-        sat_delay_5 = t0 - latest_time_5 #Timedelta for the delay
-        sat_delay_minutes = int(sat_delay_5.total_seconds() / 60) #To see the timedelta in minutes
+        latest_time_5, delay_mins_5 = _get_latest_time_and_mins_delay(sat_5_path, t0)
         logger.info(f"Latest 5-minute timestamp is {latest_time_5} for t0 time {t0}.")
-
-        if sat_delay_minutes < 60:
-            logger.info(f"5-min satellite delay is only {sat_delay_minutes} minutes - Using 5-minutely data.")
+
+        if delay_mins_5 <= max_sat_delay_allowed_mins:
+            logger.info(
+                f"5-min satellite delay is only {delay_mins_5} minutes. "
+                f"Maximum delay for this model is {max_sat_delay_allowed_mins} minutes - "
+                "Using 5-minutely data."
+            )
             os.system(f"mv {sat_5_path} {sat_path}")
         else:
-            use_15_minute = True
-            logger.info(f"5-min satellite delay is {sat_delay_minutes} minutes - "
-                        f"Switching to 15-minutely data.")
+            logger.info(
+                f"5-min satellite delay is {delay_mins_5} minutes. "
+                f"Maximum delay for this model is {max_sat_delay_allowed_mins} minutes - "
+                "Trying 15-minutely data."
+            )
+            use_5_minute = False
 
-    if use_15_minute:
-        logger.info(f"Using 15-minute satellite data")
+    if not use_5_minute:
+        # Make sure the 15-minute data is actually there
+        if not os.path.exists(sat_15_path):
+            raise ValueError(f"15-minute satellite data not found at {sat_15_path}")
 
-        ds_sat_15 = xr.open_zarr(sat_15_path)
-        latest_time_15 = pd.to_datetime(ds_sat_15.time.max().values)
+        latest_time_15, delay_mins_15 = _get_latest_time_and_mins_delay(sat_15_path, t0)
         logger.info(f"Latest 15-minute timestamp is {latest_time_15} for t0 time {t0}.")
 
-        logger.debug("Resampling 15 minute data to 5 mins")
+        # If the 15-minute satellite data is too delayed the run fails
+        if delay_mins_15 > max_sat_delay_allowed_mins:
+            raise ValueError(
+                f"15-min satellite delay is {delay_mins_15} minutes. "
+                f"Maximum delay for this model is {max_sat_delay_allowed_mins} minutes"
+            )
+
+        ds_sat_15 = xr.open_zarr(sat_15_path)
+
+        #logger.debug("Resampling 15 minute data to 5 mins")
         #ds_sat_15.resample(time="5T").interpolate("linear").to_zarr(sat_path)
         ds_sat_15.attrs["source"] = "15-minute"
-        logger.debug(f"Saving 15 minute data to {sat_path}")
         ds_sat_15.to_zarr(sat_path)
+
+
+def extend_satellite_data_with_nans(t0, min_sat_delay_used_mins):
+    """Fill the satellite data with NaNs if needed by the model"""
+
+    # Check how the expected satellite delay compares with the satellite data available and fill
+    # if required
+    latest_time, delay_mins = _get_latest_time_and_mins_delay(sat_path, t0)
+
+    if min_sat_delay_used_mins < delay_mins:
+        fill_mins = delay_mins - min_sat_delay_used_mins
+        logger.info(f"Filling most recent {fill_mins} mins with NaNs")
+
+        # Load into memory so we can delete it on disk
+        ds_sat = xr.open_zarr(sat_path).compute()
+
+        # Pad the time axis with NaNs
+        fill_times = pd.date_range(
+            latest_time + timedelta(minutes=5),
+            latest_time + timedelta(minutes=fill_mins),
+            freq="5min"
+        )
+
+        ds_sat = ds_sat.reindex(time=np.concatenate([ds_sat.time, fill_times]), fill_value=np.nan)
+
+        # Re-save in place
+        os.system(f"rm -rf {sat_path}")
+        ds_sat.to_zarr(sat_path)
+
+
+def preprocess_sat_data(t0, data_config_filename):
+
+    # Find the max delay w.r.t t0 that this model was trained with
+    data_config = load_yaml_configuration(data_config_filename)
+
+    # Take into account how recently the model tries to slice data from
+    max_sat_delay_allowed_mins = data_config.input_data.satellite.live_delay_minutes
+
+    # Take into account the dropout the model was trained with, if any
+    if data_config.input_data.satellite.dropout_fraction > 0:
+        max_sat_delay_allowed_mins = max(
+            max_sat_delay_allowed_mins,
+            np.abs(data_config.input_data.satellite.dropout_timedeltas_minutes).max()
+        )
+
+    # The model will not ever try to use data more recent than this
+    min_sat_delay_used_mins = data_config.input_data.satellite.live_delay_minutes
+
+    # Deal with switching between the 5 and 15 minutely satellite data
+    combine_5_and_15_sat_data(t0, max_sat_delay_allowed_mins)
+
+    # Extend the satellite data with NaNs if needed by the model
+    extend_satellite_data_with_nans(t0, min_sat_delay_used_mins)
 
-    return use_15_minute
-
 
 def _download_nwp_data(source, destination):
     fs = fsspec.open(source).fs

diff --git a/tests/conftest.py b/tests/conftest.py
index c2041a6..75ba2bc 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -20,9 +20,9 @@
 
 xr.set_options(keep_attrs=True)
 
-
-def time_before_present(dt: timedelta):
-    return pd.Timestamp.now(tz=None) - dt
+@pytest.fixture()
+def test_t0():
+    return pd.Timestamp.now(tz=None).floor(timedelta(minutes=30))
 
 
 @pytest.fixture(scope="session")
@@ -81,12 +81,12 @@ def db_session(db_connection, engine_url):
         s.rollback()
 
 
-def make_nwp_data(shell_path, varname):
+def make_nwp_data(shell_path, varname, test_t0):
     # Load dataset which only contains coordinates, but no data
     ds = xr.open_zarr(shell_path)
 
     # Last init time was at least 2 hours ago and floor to 3-hour interval
-    t0_datetime_utc = time_before_present(timedelta(hours=2)).floor(timedelta(hours=3))
+    t0_datetime_utc = (test_t0 - timedelta(hours=2)).floor(timedelta(hours=3))
     ds.init_time.values[:] = pd.date_range(
         t0_datetime_utc - timedelta(hours=3 * (len(ds.init_time) - 1)),
         t0_datetime_utc,
@@ -116,34 +116,35 @@ def make_nwp_data(shell_path, varname):
 @pytest.fixture
-def nwp_ukv_data():
+def nwp_ukv_data(test_t0):
     return make_nwp_data(
         shell_path=f"{os.path.dirname(os.path.abspath(__file__))}/test_data/nwp_ukv_shell.zarr",
         varname="UKV",
+        test_t0=test_t0,
     )
 
 
 @pytest.fixture
-def nwp_ecmwf_data():
+def nwp_ecmwf_data(test_t0):
     return make_nwp_data(
         shell_path=f"{os.path.dirname(os.path.abspath(__file__))}/test_data/nwp_ecmwf_shell.zarr",
-        varname="UKV",
+        varname="ECMWF_UK",
+        test_t0=test_t0,
     )
 
 
-@pytest.fixture()
-def sat_5_data():
+def _get_sat_data(test_t0, delay_mins, freq_mins):
     # Load dataset which only contains coordinates, but no data
     ds = xr.open_zarr(
         f"{os.path.dirname(os.path.abspath(__file__))}/test_data/non_hrv_shell.zarr"
     )
 
-    # Change times so they lead up to present. Delayed by 30-60 mins
-    t0_datetime_utc = time_before_present(timedelta(minutes=30)).floor(timedelta(minutes=30))
+    # Change times so they lead up to present
+    t0_datetime_utc = test_t0 - timedelta(minutes=delay_mins)
     ds.time.values[:] = pd.date_range(
-        t0_datetime_utc - timedelta(minutes=5 * (len(ds.time) - 1)),
+        t0_datetime_utc - timedelta(minutes=freq_mins * (len(ds.time) - 1)),
         t0_datetime_utc,
-        freq=timedelta(minutes=5),
+        freq=timedelta(minutes=freq_mins),
     )
 
     # Add data to dataset
@@ -158,34 +159,27 @@ def sat_5_data():
     return ds
 
 
+@pytest.fixture()
+def sat_5_data(test_t0):
+    return _get_sat_data(test_t0, delay_mins=10, freq_mins=5)
+
+
 @pytest.fixture()
-def sat_5_data_delayed(sat_5_data):
-    # Set the most recent timestamp to 2 - 2.5 hours ago
-    t_most_recent = time_before_present(timedelta(hours=2)).floor(timedelta(minutes=30))
-    offset = sat_5_data.time.max().values - t_most_recent
-    sat_5_delayed = sat_5_data.copy(deep=True)
-    sat_5_delayed["time"] = sat_5_data.time.values - offset
-    return sat_5_delayed
+def sat_5_data_delayed(test_t0):
+    return _get_sat_data(test_t0, delay_mins=120, freq_mins=5)
 
 
 @pytest.fixture()
-def sat_15_data(sat_5_data):
-    freq = timedelta(minutes=15)
-    times_15 = pd.date_range(
-        pd.to_datetime(sat_5_data.time.min().values).ceil(freq),
-        pd.to_datetime(sat_5_data.time.max().values).floor(freq),
-        freq=freq,
-    )
-    return sat_5_data.sel(time=times_15)
+def sat_15_data(test_t0):
+    return _get_sat_data(test_t0, delay_mins=0, freq_mins=15)
 
 
 @pytest.fixture()
-def gsp_yields_and_systems(db_session):
+def gsp_yields_and_systems(db_session, test_t0):
     """Create gsp yields and systems"""
 
     # GSP data is mostly up to date
-    t0_datetime_utc = time_before_present(timedelta(minutes=0)).floor(timedelta(minutes=30))
+    t0_datetime_utc = test_t0
 
     # this pv systems has same coordiantes as the first gsp
     gsp_yields = []

diff --git a/tests/test_data.py b/tests/test_data.py
index ddc0938..f70990f 100644
--- a/tests/test_data.py
+++ b/tests/test_data.py
@@ -10,131 +10,181 @@
 Note that I'm not sure these tests will work in parallel, due to files being saved
 in the same places
 """
-
-from pvnet_app.data import download_all_sat_data, preprocess_sat_data, sat_path, sat_5_path, sat_15_path
-import zarr
 import os
-import pandas as pd
 import tempfile
+import pytest
+import zarr
+import numpy as np
+import pandas as pd
+import xarray as xr
+from datetime import timedelta
+from pvnet.models.base_model import BaseModel as PVNetBaseModel
+from pvnet_app.data import (
+    download_all_sat_data, preprocess_sat_data, sat_path, sat_5_path, sat_15_path
+)
+from pvnet_app.app import default_model_name, default_model_version
 
 
+@pytest.fixture()
+def data_config_filename():
+    # Pull the data config from huggingface
+    filename = PVNetBaseModel.get_data_config(
+        default_model_name,
+        revision=default_model_version,
+    )
+    return filename
+
+
-def save_to_zarr_zip(sat_5_data, filename):
+def save_to_zarr_zip(ds, filename):
     encoding = {"data": {"dtype": "int16"}}
     with zarr.ZipStore(filename) as store:
-        sat_5_data.to_zarr(store, compute=True, mode="w", encoding=encoding, consolidated=True)
+        ds.to_zarr(store, compute=True, mode="w", encoding=encoding, consolidated=True)
 
 
+def check_timesteps(sat_path, expected_mins, skip_nans=False):
+    ds_sat = xr.open_zarr(sat_path)
+
+    if not isinstance(expected_mins, list):
+        expected_mins = [expected_mins]
+
+    dts = pd.to_datetime(ds_sat.time).diff()[1:]
+    assert (np.isin(dts, [np.timedelta64(m, "m") for m in expected_mins])).all(), dts
+
+
-def test_download_sat_data(sat_5_data):
-    """1. Download just 5 minute sat"""
+def test_download_sat_5_data(sat_5_data):
+    """Download only the 5 minute satellite data"""
 
     # make temporary directory
     with tempfile.TemporaryDirectory() as tmpdirname:
-        # tempfile
-        filename = os.path.join(tmpdirname, "latest.zarr.zip")
 
-        # zip sat_5_data file to 'sat_5_data.zarr.zip'
-        save_to_zarr_zip(sat_5_data, filename=filename)
+        # Change to temporary working directory
+        os.chdir(tmpdirname)
+
+        # Make 5-minutely satellite data available
+        save_to_zarr_zip(sat_5_data, filename="latest.zarr.zip")
 
-        os.environ["SATELLITE_ZARR_PATH"] = filename
+        os.environ["SATELLITE_ZARR_PATH"] = "latest.zarr.zip"
         download_all_sat_data()
-
-        # assert that the file 'sat_5_path' exists
+
+        # Assert that the file 'sat_5_path' exists
         assert os.path.exists(sat_5_path)
         assert not os.path.exists(sat_15_path)
+
+        # Check the satellite data is 5-minutely
+        check_timesteps(sat_5_path, expected_mins=5)
 
 
-def test_download_sat_15_data(sat_5_data):
-    """2. Download just 15 minute sat"""
+def test_download_sat_15_data(sat_15_data):
+    """Download only the 15 minute satellite data"""
+
     # make temporary directory
     with tempfile.TemporaryDirectory() as tmpdirname:
-        # tempfile
-        filename = os.path.join(tmpdirname, "latest_15.zarr.zip")
-
-        # zip sat_5_data file to 'sat_5_data.zarr.zip'
-        save_to_zarr_zip(sat_5_data, filename=filename)
-
-        os.environ["SATELLITE_ZARR_PATH"] = os.path.join(tmpdirname, "latest.zarr.zip")
+
+        # Change to temporary working directory
+        os.chdir(tmpdirname)
+
+        # Make 15-minutely satellite data available
+        save_to_zarr_zip(sat_15_data, filename="latest_15.zarr.zip")
+
+        os.environ["SATELLITE_ZARR_PATH"] = "latest.zarr.zip"
+
         download_all_sat_data()
-
+
+        # Assert that the file 'sat_15_path' exists
         assert not os.path.exists(sat_5_path)
         assert os.path.exists(sat_15_path)
+
+        # Check the satellite data is 15-minutely
+        check_timesteps(sat_15_path, expected_mins=15)
 
 
-def test_download_sat_both_data(sat_5_data):
-    """3. Download 5 minute sat, then 15 minute sa"""
+def test_download_sat_both_data(sat_5_data, sat_15_data):
+    """Download 5 minute sat and 15 minute satellite data"""
 
     with tempfile.TemporaryDirectory() as tmpdirname:
-        # tempfile
-        filename = os.path.join(tmpdirname, "latest.zarr.zip")
-        save_to_zarr_zip(sat_5_data, filename=filename)
-
-        filename = os.path.join(tmpdirname, "latest_15.zarr.zip")
-        save_to_zarr_zip(sat_5_data, filename=filename)
-
-        os.environ["SATELLITE_ZARR_PATH"] = os.path.join(tmpdirname, "latest.zarr.zip")
+
+        # Change to temporary working directory
+        os.chdir(tmpdirname)
+
+        # Make 5- and 15-minutely satellite data available
+        save_to_zarr_zip(sat_5_data, filename="latest.zarr.zip")
+        save_to_zarr_zip(sat_15_data, filename="latest_15.zarr.zip")
+
+        os.environ["SATELLITE_ZARR_PATH"] = "latest.zarr.zip"
+
         download_all_sat_data()
 
         assert os.path.exists(sat_5_path)
         assert os.path.exists(sat_15_path)
+
+        # Check this satellite data is 5-minutely
+        check_timesteps(sat_5_path, expected_mins=5)
+
+        # Check this satellite data is 15-minutely
+        check_timesteps(sat_15_path, expected_mins=15)
 
 
-def test_preprocess_sat_data(sat_5_data):
-    """4. Download and process 5 minute"""
+def test_preprocess_sat_data(sat_5_data, data_config_filename, test_t0):
+    """Download and process only the 5 minute satellite data"""
 
     # make temporary directory
     with tempfile.TemporaryDirectory() as tmpdirname:
-        # tempfile
-        filename = os.path.join(tmpdirname, "latest.zarr.zip")
-
-        # zip sat_5_data file to 'sat_5_data.zarr.zip'
-        save_to_zarr_zip(sat_5_data, filename=filename)
-
-        os.environ["SATELLITE_ZARR_PATH"] = filename
+
+        # Change to temporary working directory
+        os.chdir(tmpdirname)
+
+        # Make 5-minutely satellite data available
+        save_to_zarr_zip(sat_5_data, filename="latest.zarr.zip")
+
+        os.environ["SATELLITE_ZARR_PATH"] = "latest.zarr.zip"
        download_all_sat_data()
-        use_15_minute = preprocess_sat_data(pd.Timestamp.now(tz=None))
-        assert use_15_minute == False
+
+        preprocess_sat_data(test_t0, data_config_filename)
+
+        # Check the satellite data is 5-minutely
+        check_timesteps(sat_path, expected_mins=5)
 
 
-def test_preprocess_sat_15_data(sat_5_data):
-    """5. Download and process 15 minute"""
+def test_preprocess_sat_15_data(sat_15_data, data_config_filename, test_t0):
    """Download and process only the 15 minute satellite data"""
 
     # make temporary directory
     with tempfile.TemporaryDirectory() as tmpdirname:
-        # tempfile
-        filename = os.path.join(tmpdirname, "latest_15.zarr.zip")
-
-        save_to_zarr_zip(sat_5_data, filename=filename)
-
-        os.environ["SATELLITE_ZARR_PATH"] = os.path.join(tmpdirname, "latest.zarr.zip")
+
+        # Change to temporary working directory
+        os.chdir(tmpdirname)
+
+        # Make 15-minutely satellite data available
+        save_to_zarr_zip(sat_15_data, filename="latest_15.zarr.zip")
+
+        os.environ["SATELLITE_ZARR_PATH"] = "latest.zarr.zip"
         download_all_sat_data()
-        assert not os.path.exists(sat_5_path)
-        assert os.path.exists(sat_15_path)
+
+        preprocess_sat_data(test_t0, data_config_filename)
+
+        # Check the satellite data being used is 15-minutely
+        check_timesteps(sat_path, expected_mins=15)
 
-        use_15_minute = preprocess_sat_data(pd.Timestamp.now(tz=None))
-        assert use_15_minute == True
 
-        # assert that the file 'sat_5_path' exists
-        assert os.path.exists(sat_path)
-        assert not os.path.exists(sat_5_path)
-        assert os.path.exists(sat_15_path)
-
-
-def test_preprocess_old_sat_5_data_(sat_5_data):
-    """6. Download and process 5 and 15 minute, then use 15 minute"""
+def test_preprocess_old_sat_5_data(sat_5_data_delayed, sat_15_data, data_config_filename, test_t0):
+    """Download and process 5 and 15 minute satellite data. Use the 15 minute data since the
+    5 minute data is too delayed
+    """
 
     # make temporary directory
     with tempfile.TemporaryDirectory() as tmpdirname:
-        # tempfile
-        filename = os.path.join(tmpdirname, "latest.zarr.zip")
-        save_to_zarr_zip(sat_5_data, filename=filename)
-
-        filename = os.path.join(tmpdirname, "latest_15.zarr.zip")
-        save_to_zarr_zip(sat_5_data, filename=filename)
-
-        os.environ["SATELLITE_ZARR_PATH"] = os.path.join(tmpdirname, "latest.zarr.zip")
+
+        # Change to temporary working directory
+        os.chdir(tmpdirname)
+
+        save_to_zarr_zip(sat_5_data_delayed, filename="latest.zarr.zip")
+        save_to_zarr_zip(sat_15_data, filename="latest_15.zarr.zip")
+
+        os.environ["SATELLITE_ZARR_PATH"] = "latest.zarr.zip"
         download_all_sat_data()
-        assert os.path.exists(sat_5_path)
-        assert os.path.exists(sat_15_path)
+
+        preprocess_sat_data(test_t0, data_config_filename)
 
-        use_15_minute = preprocess_sat_data(pd.Timestamp.now(tz=None) + pd.Timedelta(days=1))
-        assert use_15_minute == True
+        # Check the satellite data being used is 15-minutely
+        check_timesteps(sat_path, expected_mins=15)

From f2b4f38170033ef7095ec9077c076a29180f9a41 Mon Sep 17 00:00:00 2001
From: James Fulton
Date: Wed, 17 Apr 2024 09:14:04 +0000
Subject: [PATCH 3/9] turn off satellite checking in pipeline

---
 pvnet_app/app.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/pvnet_app/app.py b/pvnet_app/app.py
index b4599c4..f620d06 100644
--- a/pvnet_app/app.py
+++ b/pvnet_app/app.py
@@ -256,7 +256,6 @@ def app(
             location_pipe=location_pipe,
             t0_datapipe=t0_datapipe,
             production=True,
-            check_satellite_no_zeros=True,
         )
         .batch(batch_size)
         .map(stack_np_examples_into_batch)

From 60b856f65d4681d3f27f56542d7a7822965fc0a8 Mon Sep 17 00:00:00 2001
From: James Fulton
Date: Wed, 17 Apr 2024 09:15:12 +0000
Subject: [PATCH 4/9] update reqs

---
 requirements.txt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/requirements.txt b/requirements.txt
index 0c13a9e..2114b16 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -3,7 +3,7 @@ pytorch-lightning==2.1.3
 torch[cpu]==2.2.0
 PVNet-summation==0.1.3
 pvnet==3.0.25
-ocf_datapipes==3.3.18
+ocf_datapipes==3.3.19
 nowcasting_datamodel>=1.5.39
 fsspec[s3]
 xarray

From f0e0f84298307d3c1aa36da3977198dd9f902ca4 Mon Sep 17 00:00:00 2001
From: James Fulton
Date: Wed, 17 Apr 2024 13:23:40 +0000
Subject: [PATCH 5/9] update reqs

---
 requirements.txt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/requirements.txt b/requirements.txt
index 2114b16..3b6ebfb 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,7 +1,7 @@
 pydantic
 pytorch-lightning==2.1.3
 torch[cpu]==2.2.0
-PVNet-summation==0.1.3
+PVNet-summation==0.1.4
 pvnet==3.0.25
 ocf_datapipes==3.3.19
 nowcasting_datamodel>=1.5.39

From 09530473c5219afae86636da63ab9ef46793d0d7 Mon Sep 17 00:00:00 2001
From: peterdudfield
Date: Thu, 18 Apr 2024 19:37:49 +0100
Subject: [PATCH 6/9] add print statements

---
 pvnet_app/app.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/pvnet_app/app.py b/pvnet_app/app.py
index f620d06..c4ee22b 100644
--- a/pvnet_app/app.py
+++ b/pvnet_app/app.py
@@ -429,7 +429,7 @@ def app(
         sql_forecasts = convert_dataarray_to_forecasts(
             da_abs_all, session, model_name=model_name_ocf_db, version=pvnet_app.__version__
         )
-
+        print(f'Saving {len(sql_forecasts)} forecasts')
         save_sql_forecasts(
             forecasts=sql_forecasts,
             session=session,
@@ -447,6 +447,7 @@ def app(
             version=pvnet_app.__version__
         )
 
+        print(f'Saving {len(sql_forecasts)} forecasts, for gsp sum')
         save_sql_forecasts(
             forecasts=sql_forecasts,
             session=session,

From 477e58a83ddd310722be54382f37357b6d7e1ac8 Mon Sep 17 00:00:00 2001
From: peterdudfield
Date: Thu, 18 Apr 2024 19:49:24 +0100
Subject: [PATCH 7/9] comment out saving gsp forecasts

---
 pvnet_app/app.py | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/pvnet_app/app.py b/pvnet_app/app.py
index c4ee22b..ee874fd 100644
--- a/pvnet_app/app.py
+++ b/pvnet_app/app.py
@@ -430,13 +430,13 @@ def app(
             da_abs_all, session, model_name=model_name_ocf_db, version=pvnet_app.__version__
         )
         print(f'Saving {len(sql_forecasts)} forecasts')
-        save_sql_forecasts(
-            forecasts=sql_forecasts,
-            session=session,
-            update_national=True,
-            update_gsp=True,
-            apply_adjuster=apply_adjuster,
-        )
+        # save_sql_forecasts(
+        #     forecasts=sql_forecasts,
+        #     session=session,
+        #     update_national=True,
+        #     update_gsp=True,
+        #     apply_adjuster=apply_adjuster,
+        # )
 
     if save_gsp_sum:
         # Save the sum of GSPs independently - mainly for summation model monitoring

From aea3a19b548a0cbb62610542e12b1f1e2f509061 Mon Sep 17 00:00:00 2001
From: peterdudfield
Date: Thu, 18 Apr 2024 19:59:33 +0100
Subject: [PATCH 8/9] move os environ load into app

---
 pvnet_app/app.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/pvnet_app/app.py b/pvnet_app/app.py
index ee874fd..7e39a33 100644
--- a/pvnet_app/app.py
+++ b/pvnet_app/app.py
@@ -76,7 +76,6 @@
 # If environmental variable is true, the sum-of-GSPs will be computed and saved under a different
 # model name. This can be useful to compare against the summation model and therefore monitor its
 # performance in production
-save_gsp_sum = os.getenv("SAVE_GSP_SUM", "False").lower() == "true"
 gsp_sum_model_name_ocf_db = "pvnet_gsp_sum"
 
 # ---------------------------------------------------------------------------
@@ -138,6 +137,8 @@ def app(
     # Without this line the dataloader will hang if multiple workers are used
     dask.config.set(scheduler='single-threaded')
 
+    save_gsp_sum = os.getenv("SAVE_GSP_SUM", "False").lower() == "true"
+
     logger.info(f"Using `pvnet` library version: {pvnet.__version__}")
     logger.info(f"Using {num_workers} workers")
     logger.info(f"Using adjduster: {use_adjuster}")
@@ -437,6 +438,8 @@ def app(
         #     update_gsp=True,
         #     apply_adjuster=apply_adjuster,
         # )
+
+        print(f'{save_gsp_sum=}')
 
     if save_gsp_sum:
         # Save the sum of GSPs independently - mainly for summation model monitoring

From 95f16a2ea9ff912732b4ac7e81dec2ef7fa0b2fd Mon Sep 17 00:00:00 2001
From: peterdudfield
Date: Thu, 18 Apr 2024 20:09:28 +0100
Subject: [PATCH 9/9] tidy up

---
 pvnet_app/app.py | 27 +++++++++++----------------
 1 file changed, 11 insertions(+), 16 deletions(-)

diff --git a/pvnet_app/app.py b/pvnet_app/app.py
index 7e39a33..1dec588 100644
--- a/pvnet_app/app.py
+++ b/pvnet_app/app.py
@@ -73,11 +73,6 @@
 model_name_ocf_db = "pvnet_v2"
 use_adjuster = os.getenv("USE_ADJUSTER", "True").lower() == "true"
 
-# If environmental variable is true, the sum-of-GSPs will be computed and saved under a different
-# model name. This can be useful to compare against the summation model and therefore monitor its
-# performance in production
-gsp_sum_model_name_ocf_db = "pvnet_gsp_sum"
-
 # ---------------------------------------------------------------------------
 # LOGGER
@@ -137,6 +132,10 @@ def app(
     # Without this line the dataloader will hang if multiple workers are used
     dask.config.set(scheduler='single-threaded')
 
+    # If environment variable is true, the sum-of-GSPs will be computed and saved under a different
+    # model name. This can be useful to compare against the summation model and therefore monitor its
+    # performance in production
+    gsp_sum_model_name_ocf_db = "pvnet_gsp_sum"
     save_gsp_sum = os.getenv("SAVE_GSP_SUM", "False").lower() == "true"
 
     logger.info(f"Using `pvnet` library version: {pvnet.__version__}")
@@ -430,16 +429,13 @@ def app(
         sql_forecasts = convert_dataarray_to_forecasts(
             da_abs_all, session, model_name=model_name_ocf_db, version=pvnet_app.__version__
         )
-        print(f'Saving {len(sql_forecasts)} forecasts')
-        # save_sql_forecasts(
-        #     forecasts=sql_forecasts,
-        #     session=session,
-        #     update_national=True,
-        #     update_gsp=True,
-        #     apply_adjuster=apply_adjuster,
-        # )
-
-        print(f'{save_gsp_sum=}')
+        save_sql_forecasts(
+            forecasts=sql_forecasts,
+            session=session,
+            update_national=True,
+            update_gsp=True,
+            apply_adjuster=apply_adjuster,
+        )
 
     if save_gsp_sum:
         # Save the sum of GSPs independently - mainly for summation model monitoring
@@ -450,7 +446,6 @@ def app(
             version=pvnet_app.__version__
         )
 
-        print(f'Saving {len(sql_forecasts)} forecasts, for gsp sum')
         save_sql_forecasts(
             forecasts=sql_forecasts,
             session=session,
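
Note on the satellite handling introduced in PATCH 2/9: preprocess_sat_data derives the maximum
tolerable satellite delay from the model's data config and then chooses between the 5- and
15-minutely feeds. The snippet below is a minimal illustrative sketch of that decision rule, not
code from the patches; the SatelliteConfig dataclass is a hypothetical stand-in for the
input_data.satellite section of the real ocf_datapipes config.

    from dataclasses import dataclass, field
    from typing import List

    @dataclass
    class SatelliteConfig:
        # Hypothetical stand-in for data_config.input_data.satellite
        live_delay_minutes: int = 30
        dropout_fraction: float = 0.0
        dropout_timedeltas_minutes: List[int] = field(default_factory=list)

    def max_allowed_delay_mins(cfg: SatelliteConfig) -> int:
        """Largest delay (w.r.t. t0) of the most recent satellite timestamp
        that the model saw during training, and so can tolerate live."""
        max_delay = cfg.live_delay_minutes
        if cfg.dropout_fraction > 0:
            # Dropout timedeltas are negative offsets from t0, e.g. [-60, -30]
            max_delay = max(max_delay, max(abs(m) for m in cfg.dropout_timedeltas_minutes))
        return max_delay

    def choose_sat_source(delay_5_mins, delay_15_mins, cfg: SatelliteConfig) -> str:
        """Prefer 5-minutely data when fresh enough, fall back to 15-minutely,
        and fail if both feeds are more delayed than the model can handle."""
        limit = max_allowed_delay_mins(cfg)
        if delay_5_mins is not None and delay_5_mins <= limit:
            return "5-minute"
        if delay_15_mins is not None and delay_15_mins <= limit:
            return "15-minute"
        raise ValueError(f"All satellite data is delayed by more than {limit} minutes")

    # Example: trained with a 30 minute live delay and dropout at -60/-30 minutes
    cfg = SatelliteConfig(live_delay_minutes=30, dropout_fraction=0.1,
                          dropout_timedeltas_minutes=[-60, -30])
    assert max_allowed_delay_mins(cfg) == 60
    assert choose_sat_source(45, 10, cfg) == "5-minute"    # 45 <= 60, so 5-minutely wins
    assert choose_sat_source(120, 10, cfg) == "15-minute"  # 5-minutely too stale

Whichever source is selected, extend_satellite_data_with_nans then reindexes the saved zarr
forward in 5-minute steps with NaN fill, so the time axis always spans the window the model
expects even when the feed is delayed.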