Skip to content

Commit

Permalink
Merge commit 'e2f65873d8f50af0f345ed0a953d14f56b5f9d7d' into issue/py…
Browse files Browse the repository at this point in the history
…thon.3-12
  • Loading branch information
peterdudfield committed Dec 21, 2023
2 parents ce63741 + e2f6587 commit c6c4bd2
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 26 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[bumpversion]
commit = True
tag = True
current_version = 0.0.20
current_version = 0.0.21
message = Bump version: {current_version} → {new_version} [skip ci]

[bumpversion:file:pvnet_app/__init__.py]
Expand Down
2 changes: 1 addition & 1 deletion pvnet_app/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
"""PVNet App"""
__version__ = "0.0.20"
__version__ = "0.0.21"
7 changes: 6 additions & 1 deletion pvnet_app/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@
from pvnet_app.utils import (
worker_init_fn, populate_data_config_sources, convert_dataarray_to_forecasts, preds_to_dataarray
)
from pvnet_app.data import regrid_nwp_data, download_sat_data, download_nwp_data
from pvnet_app.data import (
download_sat_data, download_nwp_data, preprocess_sat_data, regrid_nwp_data,
)

# ---------------------------------------------------------------------------
# GLOBAL SETTINGS
Expand Down Expand Up @@ -177,6 +179,9 @@ def app(
logger.info("Downloading satellite data")
download_sat_data()

# Process the 5/15 minutely satellite data
preprocess_sat_data(t0)

# Download NWP data
logger.info("Downloading NWP data")
download_nwp_data()
Expand Down
1 change: 0 additions & 1 deletion pvnet_app/consts.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
sat_path = "sat.zarr"
sat_15_path = "sat_15.zarr"
nwp_path = "nwp.zarr"
51 changes: 43 additions & 8 deletions pvnet_app/data.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,65 @@
import pandas as pd
import xarray as xr
import xesmf as xe
import logging
import os
import fsspec
from datetime import timedelta
import ocf_blosc2

from pvnet_app.consts import sat_path, sat_15_path, nwp_path
from pvnet_app.consts import sat_path, nwp_path

logger = logging.getLogger(__name__)

this_dir = os.path.dirname(os.path.abspath(__file__))

sat_5_path = "sat_5_min.zarr"
sat_15_path = "sat_15_min.zarr"


def download_sat_data():
"""Download the sat data"""

# Clean out old files
os.system(f"rm -r {sat_path} {sat_5_path} {sat_15_path}")

fs = fsspec.open(os.environ["SATELLITE_ZARR_PATH"]).fs
fs.get(os.environ["SATELLITE_ZARR_PATH"], "sat.zarr.zip")
os.system(f"rm -r {sat_path}")
os.system(f"unzip sat.zarr.zip -d {sat_path}")
fs.get(os.environ["SATELLITE_ZARR_PATH"], "sat_5_min.zarr.zip")

os.system(f"unzip sat_5_min.zarr.zip -d {sat_5_path}")

# Also download 15-minute satellite if it exists
sat_latest_15 = os.environ["SATELLITE_ZARR_PATH"].replace("sat.zarr", "sat_15.zarr")
if fs.exists(sat_latest_15):
sat_15_dl_path = os.environ["SATELLITE_ZARR_PATH"].replace("sat.zarr", "sat_15.zarr")
if fs.exists(sat_15_dl_path):
logger.info("Downloading 15-minute satellite data")
fs.get(sat_latest_15, "sat_15.zarr")
os.system(f"unzip sat_15.zarr.zip -d {sat_15_path}")
fs.get(sat_15_dl_path, "sat_15_min.zarr.zip")
os.system(f"unzip sat_15_min.zarr.zip -d {sat_15_path}")


def preprocess_sat_data(t0):
"""Select and/or combine the 5 and 15-minutely satellite data"""

ds_sat_5 = xr.open_zarr(sat_5_path)
latest_time_5 = pd.to_datetime(ds_sat_5.time.max().values)
sat_delay_5 = t0 - latest_time_5
logger.info(f"Latest 5-minute timestamp is {latest_time_5} for t0 time {t0}.")

if sat_delay_5 < timedelta(minutes=60):
logger.info(f"5-min satellite delay is only {sat_delay_5} - Using 5-minutely data.")
os.system(f"mv {sat_5_path} {sat_path}")
else:
logger.info(f"5-min satellite delay is {sat_delay_5} - Switching to 15-minutely data.")

ds_sat_15 = xr.open_zarr(sat_15_path)
latest_time_15 = pd.to_datetime(ds_sat_15.time.max().values)
logger.info(f"Latest 15-minute timestamp is {latest_time_15} for t0 time {t0}.")

logger.debug("Resampling 15 minute data to 5 mins")
#ds_sat_15.resample(time="5T").interpolate("linear").to_zarr(sat_path)
ds_sat_15.attrs["source"] = "15-minute"
ds_sat_15.to_zarr(sat_path)


def download_nwp_data():
"""Download the NWP data"""
fs = fsspec.open(os.environ["NWP_ZARR_PATH"]).fs
Expand Down
4 changes: 2 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
torch[cpu]>=2.1.1
PVNet-summation>=0.0.9
pvnet>=2.4.0
ocf_datapipes>=2.2.2
pvnet==2.4.0
ocf_datapipes==2.2.14
nowcasting_datamodel>=1.5.22
fsspec[s3]
xarray
Expand Down
32 changes: 26 additions & 6 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def engine_url():
url = postgres.get_connection_url()
os.environ["DB_URL"] = url

database_connection = DatabaseConnection(url, echo=True)
database_connection = DatabaseConnection(url, echo=False)

engine = database_connection.engine

Expand All @@ -51,7 +51,7 @@ def engine_url():

@pytest.fixture()
def db_connection(engine_url):
database_connection = DatabaseConnection(engine_url, echo=True)
database_connection = DatabaseConnection(engine_url, echo=False)

engine = database_connection.engine
# connection = engine.connect()
Expand Down Expand Up @@ -118,15 +118,14 @@ def nwp_data():


@pytest.fixture()
def sat_data():
def sat_5_data():
# Load dataset which only contains coordinates, but no data
ds = xr.open_zarr(
f"{os.path.dirname(os.path.abspath(__file__))}/test_data/non_hrv_shell.zarr"
)

# Change times so they lead up to present. Delayed by at most 1 hour
t0_datetime_utc = time_before_present(timedelta(minutes=0)).floor(timedelta(minutes=30))
t0_datetime_utc = t0_datetime_utc - timedelta(minutes=30)
# Change times so they lead up to present. Delayed by 30-60 mins
t0_datetime_utc = time_before_present(timedelta(minutes=30)).floor(timedelta(minutes=30))
ds.time.values[:] = pd.date_range(
t0_datetime_utc - timedelta(minutes=5 * (len(ds.time) - 1)),
t0_datetime_utc,
Expand All @@ -146,6 +145,27 @@ def sat_data():
return ds


@pytest.fixture()
def sat_5_data_delayed(sat_5_data):
# Set the most recent timestamp to 2 - 2.5 hours ago
t_most_recent = time_before_present(timedelta(hours=2)).floor(timedelta(minutes=30))
offset = sat_5_data.time.max().values - t_most_recent
sat_5_delayed = sat_5_data.copy(deep=True)
sat_5_delayed["time"] = sat_5_data.time.values - offset
return sat_5_delayed


@pytest.fixture()
def sat_15_data(sat_5_data):
freq = timedelta(minutes=15)
times_15 = pd.date_range(
pd.to_datetime(sat_5_data.time.min().values).ceil(freq),
pd.to_datetime(sat_5_data.time.max().values).floor(freq),
freq=freq,
)
return sat_5_data.sel(time=times_15)


@pytest.fixture()
def gsp_yields_and_systems(db_session):
"""Create gsp yields and systems"""
Expand Down
12 changes: 6 additions & 6 deletions tests/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
)


def test_app(db_session, nwp_data, sat_data, gsp_yields_and_systems, me_latest):
def test_app(db_session, nwp_data, sat_5_data, gsp_yields_and_systems, me_latest):
# Environment variable DB_URL is set in engine_url, which is called by db_session
# set NWP_ZARR_PATH
# save nwp_data to temporary file, and set NWP_ZARR_PATH
Expand All @@ -21,7 +21,7 @@ def test_app(db_session, nwp_data, sat_data, gsp_yields_and_systems, me_latest):

with tempfile.TemporaryDirectory() as tmpdirname:
# The app loads sat and NWP data from environment variable
# Save out data and set paths
# Save out data, and set paths as environmental variables
temp_nwp_path = f"{tmpdirname}/nwp.zarr"
os.environ["NWP_ZARR_PATH"] = temp_nwp_path
nwp_data.to_zarr(temp_nwp_path)
Expand All @@ -30,17 +30,17 @@ def test_app(db_session, nwp_data, sat_data, gsp_yields_and_systems, me_latest):
temp_sat_path = f"{tmpdirname}/sat.zarr.zip"
os.environ["SATELLITE_ZARR_PATH"] = temp_sat_path
store = zarr.storage.ZipStore(temp_sat_path, mode="x")
sat_data.to_zarr(store)
sat_5_data.to_zarr(store)
store.close()

# Set model version
os.environ["SAVE_GSP_SUM"] = "True"

# Run prediction
# This import needs to come after the environ vars have been set
from pvnet_app.app import app
app(gsp_ids=list(range(1, 318)), num_workers=2)

# Check forecasts have been made
# (317 GSPs + 1 National + GSP-sum) = 319 forecasts
# Doubled for historic and forecast
Expand All @@ -54,4 +54,4 @@ def test_app(db_session, nwp_data, sat_data, gsp_yields_and_systems, me_latest):
# 318 GSPs * 16 time steps in forecast
assert len(db_session.query(ForecastValueSQL).all()) == 319 * 16
assert len(db_session.query(ForecastValueLatestSQL).all()) == 319 * 16
assert len(db_session.query(ForecastValueSevenDaysSQL).all()) == 319 * 16
assert len(db_session.query(ForecastValueSevenDaysSQL).all()) == 319 * 16

0 comments on commit c6c4bd2

Please sign in to comment.