From d1e9936bcc60029df9a5026c317db268de847c7a Mon Sep 17 00:00:00 2001 From: Peter Dudfield <34686298+peterdudfield@users.noreply.github.com> Date: Mon, 2 Dec 2024 12:40:33 +0000 Subject: [PATCH 1/2] add regridding (#150) * add regridding * lint * lint * update to run more effeciently in app * fix * remove hard coding * lint * lint * lint * fix bug * fix --- .../data/mo_global/india_coords.nc | Bin 0 -> 3224 bytes india_forecast_app/data/nwp.py | 62 ++++++++++++++++++ india_forecast_app/models/pvnet/model.py | 43 ++++++++---- india_forecast_app/models/pvnet/utils.py | 39 +++++++++-- tests/data/test_nwp.py | 30 +++++++++ 5 files changed, 154 insertions(+), 20 deletions(-) create mode 100644 india_forecast_app/data/mo_global/india_coords.nc create mode 100644 india_forecast_app/data/nwp.py create mode 100644 tests/data/test_nwp.py diff --git a/india_forecast_app/data/mo_global/india_coords.nc b/india_forecast_app/data/mo_global/india_coords.nc new file mode 100644 index 0000000000000000000000000000000000000000..b97874c4aa754768fff0db989cfd7f161ebad112 GIT binary patch literal 3224 zcmb7GX>b%}5DkYR1QZBYIAZRDBaqEzvpIINZ$+Yjz|K8|$Yngh0Lr0Uf=XG62%?rC z2#9hk2wI2~DvIEVfPyz7pi)2y3Zk4|Uw_^H@t;#QH8Z<2{dK>7@AcQ~+S-Ome59%i z-KW$KpD=R5#JXG5{SkFGtsgt)wts(6&-2y!@6k-%{P&v%@@I2(rTf&OwIl26-Qo2U zDPmGmvN{{gxS;Rlxk-vV9ah)>u17KG{2%ceZ&dHAqsj66^ff6dw&VPsuiHJ;Ew*2- z2hVQq{oF^}V?W6A@!#KhY;e)J*eUt_%%so$JiECzc2@pBclQUID*cwR3-aFO32%&= ze(^v|y+m=4r2ZVye?J=VcWAi9`02r`x1bS!k1N9c-dp|4QY3R6G@;Mlo{1EWhtyDP z+NsBHM>CF#=Cs;~0k~4)w+QM3EoGgSR87O?XeH~Vr9Y{@oPji1w>71+?^m>z_1i3% zGw@J#9okABZNE&7-nslsw3B?=>u`P1Uh+zJ8&gE2OMV@mx<5JZU{7?EJUbFRy`M)X z$+vTxDMzk*sSP?y-Wir&7#WiP)s)`!OURTuGWVTXd->RCWJx_)vna9C$dbCUi)i64 z$d>xDHT)xxBX#E7K5k>y&c(=)dUI0(1t%tbj$Elbcl9qzUMao`xl(_gN^Z)L>yalo z5QC=?6s#mlLX6(|<`d#GYnis&JI=%LeVkDk(xo-@2K*_1y{fK5tKd$OR^{+g3|9GD;+`UdvLE}`=AU2 zrT=A_+o+Q$6CRX}C3?6h6F!u^Me-YuGT}w2#bBxgA>l`8s0!D49YVsB&<53=yFWxo z_!9c-;(?q`j-p(6Q$9cggL2_d`BEw*6Xn9A@}Kp>2n(OWHTp$_g;(LZM5Kub3%|nO zX`-M)cvjIv69E;%w~BkI(YsI~ysOx)8;J_hZ)FCR+ZdI?!^+W*E2OhgDSWJaQ@KHE z6qUlusw>GZ2BAv$S#=Z9@g7tOPpj6eXz5R)O88p!hk|e1MpO%Lt8e&N9lKF2{H!yCL=0*k4~k= z%|}#tAKj&aMpXD;(}{rDiyF~E%^g(8Nz{lQYF^h1qegTQYep~TBPRNY-Ke@d>>9*G zC$SgFE`}f`dWl^k`Qpq9-h2d?C8RyX*%Lec=@29T%O& z+pzz}MQ`y4;~y8@#p~EV;-bIAb?k=;mwhd9i!Kz23H)ASCJ}iB5>Kcy6A7W ziH)q=#9OSN#2)5x;vn-WagzI*xXkz(sY(_fY=Y5Yp*j}9C?bGozzFmFF=|ObePK8} zPmDUA8^(B^55@z^h6WyjL2Od~V=QAIFgCCrjcu$8V>jbze9JsBe&T)^muRe|!feX( z#7tK-QFFm8WPdlyiJ;41_GWxdTN@C}o7w-&an#ljOr2gQnDg`w0_IZIx4E8mX1>9A zo4c5I<~Q7b^90$zFqjue7`3pP=zRyQcG~!0O&r-vKw9 zgt!K73C|%{%X2*30qo1}P3(_uJ^Pe9mHonfL@hh*XSfS#5bcM%hWCB$YrNlc-_wQ* z_jA^}`y&~`0JvwhvA}Ccw%h?;E1sub7SA=0U{>?Rt6`t_`m_GM!R&k99qd2eWTMNp z@E+#)ap2I`>Di&izTObN?aMxj&zE?yqE>` xr.Dataset: + """This function loads the NWP data, then regrids and saves it back out if the data is not + on the same grid as expected. The data is resaved in-place. + """ + + logger.info(f"Regridding NWP data to expected grid to {target_coords_path}") + + ds_raw = nwp_ds + + # These are the coords we are aiming for + ds_target_coords = xr.load_dataset(target_coords_path) + + # Check if regridding step needs to be done + needs_regridding = not ( + ds_raw.latitude.equals(ds_target_coords.latitude) + and ds_raw.longitude.equals(ds_target_coords.longitude) + ) + + if not needs_regridding: + logger.info(f"No NWP regridding required - skipping this step") + return ds_raw + + # flip latitude, so its in ascending order + if ds_raw.latitude[0] > ds_raw.latitude[-1]: + ds_raw = ds_raw.reindex(latitude=ds_raw.latitude[::-1]) + + # clip to india coordindates + ds_raw = ds_raw.sel( + latitude=slice(0, 40), + longitude=slice(65, 100), + ) + + # regrid + logger.info(f"Regridding NWP to expected grid") + ds_regridded = ds_raw.interp( + latitude=ds_target_coords.latitude, longitude=ds_target_coords.longitude + ) + + # rechunking + ds_regridded["variable"] = ds_regridded["variable"].astype(str) + + # Rechunk to these dimensions when saving + save_chunk_dict = { + "step": 5, + "latitude": 100, + "longitude": 100, + "x": 100, + "y": 100, + } + + ds_regridded = ds_regridded.chunk( + {k: save_chunk_dict[k] for k in list(ds_raw.xindexes) if k in save_chunk_dict} + ) + + return ds_regridded diff --git a/india_forecast_app/models/pvnet/model.py b/india_forecast_app/models/pvnet/model.py index d6604cc..829eb55 100644 --- a/india_forecast_app/models/pvnet/model.py +++ b/india_forecast_app/models/pvnet/model.py @@ -35,6 +35,7 @@ wind_path, ) from .utils import ( + NWPProcessAndCacheConfig, download_satellite_data, populate_data_config_sources, process_and_cache_nwp, @@ -214,26 +215,42 @@ def _prepare_data_sources(self): satellite_source_file_path = os.getenv("SATELLITE_ZARR_PATH", None) # only load nwp that we need - nwp_paths = [] - nwp_source_file_paths = [] + nwp_configs = [] nwp_keys = self.config["input_data"]["nwp"].keys() if "ecmwf" in nwp_keys: - nwp_ecmwf_source_file_path = os.environ["NWP_ECMWF_ZARR_PATH"] - nwp_source_file_paths.append(nwp_ecmwf_source_file_path) - nwp_paths.append(nwp_ecmwf_path) + + nwp_configs.append( + NWPProcessAndCacheConfig( + source_nwp_path=os.environ["NWP_ECMWF_ZARR_PATH"], + dest_nwp_path=nwp_ecmwf_path, + source="ecmwf", + ) + ) + if "gfs" in nwp_keys: - nwp_gfs_source_file_path = os.environ["NWP_GFS_ZARR_PATH"] - nwp_source_file_paths.append(nwp_gfs_source_file_path) - nwp_paths.append(nwp_gfs_path) + + nwp_configs.append( + NWPProcessAndCacheConfig( + source_nwp_path=os.environ["NWP_GFS_ZARR_PATH"], + dest_nwp_path=nwp_gfs_path, + source="gfs", + ) + ) + if "mo_global" in nwp_keys: - nwp_mo_global_source_file_path = os.environ["NWP_MO_GLOBAL_ZARR_PATH"] - nwp_source_file_paths.append(nwp_mo_global_source_file_path) - nwp_paths.append(nwp_mo_global_path) + nwp_configs.append( + NWPProcessAndCacheConfig( + source_nwp_path=os.environ["NWP_MO_GLOBAL_ZARR_PATH"], + dest_nwp_path=nwp_mo_global_path, + source="mo_global", + config=self.config["input_data"]["nwp"]["mo_global"] + ) + ) # Remove local cached zarr if already exists - for nwp_source_file_path, nwp_path in zip(nwp_source_file_paths, nwp_paths, strict=False): + for nwp_config in nwp_configs: # Process/cache remote zarr locally - process_and_cache_nwp(nwp_source_file_path, nwp_path) + process_and_cache_nwp(nwp_config) if use_satellite and "satellite" in self.config["input_data"].keys(): shutil.rmtree(satellite_path, ignore_errors=True) download_satellite_data(satellite_source_file_path) diff --git a/india_forecast_app/models/pvnet/utils.py b/india_forecast_app/models/pvnet/utils.py index 32b02b2..59e4fb8 100644 --- a/india_forecast_app/models/pvnet/utils.py +++ b/india_forecast_app/models/pvnet/utils.py @@ -9,7 +9,11 @@ import xarray as xr import yaml from ocf_datapipes.batch import BatchKey +from ocf_datapipes.config.model import NWP from ocf_datapipes.utils.consts import ELEVATION_MEAN, ELEVATION_STD +from pydantic import BaseModel + +from india_forecast_app.data.nwp import regrid_nwp_data from .consts import ( nwp_ecmwf_path, @@ -25,6 +29,15 @@ log = logging.getLogger(__name__) +class NWPProcessAndCacheConfig(BaseModel): + """Configuration for processing and caching NWP data""" + + source_nwp_path: str + dest_nwp_path: str + source: str + config: Optional[NWP] = None + + def worker_init_fn(worker_id): """ Clear reference to the loop and thread. @@ -92,11 +105,15 @@ def populate_data_config_sources(input_path, output_path): return config -def process_and_cache_nwp(source_nwp_path: str, dest_nwp_path: str): +def process_and_cache_nwp(nwp_config: NWPProcessAndCacheConfig): """Reads zarr file, renames t variable to t2m and saves zarr to new destination""" + source_nwp_path = nwp_config.source_nwp_path + dest_nwp_path = nwp_config.dest_nwp_path + log.info( - f"Processing and caching NWP data for {source_nwp_path}, " f"and saving to {dest_nwp_path}" + f"Processing and caching NWP data for {source_nwp_path} " + f"and saving to {dest_nwp_path} for {nwp_config.source}" ) if os.path.exists(dest_nwp_path): @@ -115,10 +132,7 @@ def process_and_cache_nwp(source_nwp_path: str, dest_nwp_path: str): if ds[v].dtype == object: ds[v].encoding.clear() - is_gfs = "gfs" in source_nwp_path.lower() - is_ecmwf = "ecmwf" in source_nwp_path.lower() - - if is_ecmwf: + if nwp_config.source == "ecmwf": # Rename t variable to t2m variables = list(ds.variable.values) new_variables = [] @@ -134,12 +148,23 @@ def process_and_cache_nwp(source_nwp_path: str, dest_nwp_path: str): ds.__setitem__("variable", new_variables) # Hack to resolve some NWP data format differences between providers - elif is_gfs: + elif nwp_config.source == "gfs": data_var = ds[list(ds.data_vars.keys())[0]] # # Use .to_dataset() to split the data variable based on 'variable' dim ds = data_var.to_dataset(dim="variable") ds = ds.rename({"t2m": "t"}) + + if nwp_config.source == "mo_global": + + # only select the variables we need + nwp_channels = list(nwp_config.config.nwp_channels) + ds = ds.sel(variable=nwp_channels) + + # regrid data + ds = regrid_nwp_data(ds, "india_forecast_app/data/mo_global/india_coords.nc") + # Save destination path + log.info(f"Saving NWP data to {dest_nwp_path}") ds.to_zarr(dest_nwp_path, mode="a") diff --git a/tests/data/test_nwp.py b/tests/data/test_nwp.py new file mode 100644 index 0000000..1e7b794 --- /dev/null +++ b/tests/data/test_nwp.py @@ -0,0 +1,30 @@ +""" Tests for the nwp regridding module """ +import os +import tempfile + +import xarray as xr + +from india_forecast_app.data.nwp import regrid_nwp_data + + +def test_regrid_nwp_data(nwp_mo_global_data): + """Test the regridding of the nwp data""" + + # create a temporary dir + with tempfile.TemporaryDirectory() as temp_dir: + + # save mo data to zarr + nwp_zarr = os.environ["NWP_MO_GLOBAL_ZARR_PATH"] + + # regrid the data + nwp_xr = xr.open_zarr(nwp_zarr) + nwp_xr_regridded = regrid_nwp_data( + nwp_xr, "india_forecast_app/data/mo_global/india_coords.nc" + ) + + # check the data is different in latitude and longitude + assert not nwp_xr_regridded.latitude.equals(nwp_xr.latitude) + assert not nwp_xr_regridded.longitude.equals(nwp_xr.longitude) + + assert len(nwp_xr_regridded.latitude) == 225 + assert len(nwp_xr_regridded.longitude) == 150 From b2a7af8ef22ce80fa7dd4aace0719d38ce522d5e Mon Sep 17 00:00:00 2001 From: BumpVersion Action Date: Mon, 2 Dec 2024 12:45:00 +0000 Subject: [PATCH 2/2] =?UTF-8?q?Bump=20version:=201.1.21=20=E2=86=92=201.1.?= =?UTF-8?q?22=20[ci=20skip]?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .bumpversion.cfg | 2 +- india_forecast_app/__init__.py | 2 +- pyproject.toml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.bumpversion.cfg b/.bumpversion.cfg index 97e9305..3c1d506 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 1.1.21 +current_version = 1.1.22 commit = True tag = True message = Bump version: {current_version} → {new_version} [ci skip] diff --git a/india_forecast_app/__init__.py b/india_forecast_app/__init__.py index d466ad4..3429e00 100644 --- a/india_forecast_app/__init__.py +++ b/india_forecast_app/__init__.py @@ -1,2 +1,2 @@ """India Forecast App""" -__version__ = "1.1.21" +__version__ = "1.1.22" diff --git a/pyproject.toml b/pyproject.toml index 8014a5b..d8d42e5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "india_forecast_app" -version = "1.1.21" +version = "1.1.22" description = "Runs wind and PV forecasts for India and saves to database" authors = ["Chris Briggs "] readme = "README.md"