Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Add test for checking physical limits and zeroes in NWP data #… #340

Open
wants to merge 18 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
3ee287c
chore: Add test for checking physical limits and zeroes in NWP data #…
glitch401 Jul 3, 2024
1e2df80
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 3, 2024
8105b91
changes to generate test data on the go. remove unnecessary zarr file…
glitch401 Jul 4, 2024
1eafe49
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 4, 2024
d5bc6cf
Fix ValueError message for NWP data containing zeros and outside phys…
glitch401 Jul 4, 2024
d8cfa9d
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 4, 2024
5e68173
Fix ValueError message coding style
glitch401 Jul 4, 2024
466b710
update physical limits in according to pvnet_uk_region/data_config.yaml
glitch401 Jul 5, 2024
692500c
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 5, 2024
0667bab
Update temperature physical limits in OpenNWPIterDataPipe
glitch401 Jul 5, 2024
246d898
Fix NaN check in stack_np_examples_into_batch function
glitch401 Jul 11, 2024
55627eb
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 11, 2024
7ba254d
changes made to adapt for lazy loading
glitch401 Jul 16, 2024
c6ee33d
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 16, 2024
d0c4f6f
moved limits to a constant file
glitch401 Jul 24, 2024
19050c7
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 24, 2024
3fe89fc
Refactor test_merge_numpy_examples_to_batch.py and test_load_nwp.py t…
glitch401 Aug 15, 2024
ace0259
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ coverage.xml
.pytest_cache/
test.nc

#test data generator
tests/load/nwp/test_data_generator.py

glitch401 marked this conversation as resolved.
Show resolved Hide resolved
# Translations
*.mo
*.pot
Expand Down
100 changes: 99 additions & 1 deletion ocf_datapipes/load/nwp/nwp.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,85 @@ def __init__(
self,
zarr_path: Union[Path, str, list[Path], list[str]],
provider: str = "ukv",
check_for_zeros: bool = False,
check_physical_limits: bool = False,
):
"""
Opens NWP Zarr and yields it

Args:
zarr_path: Path to the Zarr file
provider: NWP provider
check_for_zeros: Check for zeros in the NWP data
check_physical_limits: Check the physical limits of nwp data (e.g. -100<temperature<100)
"""
self.zarr_path = zarr_path
self.check_for_zeros = check_for_zeros
self.check_physical_limits = check_physical_limits
self.limits = {
"temperature": (-100, 60), # Celsius
"specific_humidity": (0, 0.03), # kg/kg
"relative_humidity": (0, 100), # Percentage
"pressure": (0, 1100), # hPa (sea level pressure)
"u_wind": (-200, 200), # m/s
"v_wind": (-200, 200), # m/s
"geopotential": (0, 100000), # m^2/s^2
"total_precipitation": (0, 2000), # mm/day
"convective_precipitation": (0, 1000), # mm/day
"snowfall": (0, 1000), # mm water equivalent/day
"graupel": (0, 500), # mm water equivalent/day
"cloud_cover": (0, 100), # Percentage
"surface_temperature": (-90, 60), # Celsius
"sea_surface_temperature": (-2, 35), # Celsius
"soil_temperature": (-50, 60), # Celsius
"soil_moisture": (0, 1), # m^3/m^3
"visibility": (0, 100000), # meters
"wind_gust": (0, 250), # m/s
"solar_radiation": (0, 1500), # W/m^2
"longwave_radiation": (0, 750), # W/m^2
"evaporation": (0, 50), # mm/day
"potential_evaporation": (0, 100), # mm/day
"boundary_layer_height": (0, 5000), # meters
"cape": (0, 10000), # J/kg
"cin": (0, 1000), # J/kg
"lifted_index": (-15, 15), # Kelvin
"total_column_water": (0, 100), # kg/m^2
"ozone_concentration": (0, 1000), # Dobson units
"dew_point_temperature": (-100, 35), # Celsius
"wet_bulb_temperature": (-100, 35), # Celsius
"potential_temperature": (0, 1000), # Kelvin
"equivalent_potential_temperature": (0, 1000), # Kelvin
"vorticity": (-1e-3, 1e-3), # 1/s
"divergence": (-1e-3, 1e-3), # 1/s
"vertical_velocity": (-50, 50), # m/s
"cloud_base_height": (0, 20000), # meters
"cloud_top_height": (0, 20000), # meters
"cloud_water_content": (0, 5), # g/kg
"ice_water_content": (0, 5), # g/kg
"surface_roughness": (0, 10), # meters
"albedo": (0, 1), # dimensionless
"friction_velocity": (0, 5), # m/s
"sensible_heat_flux": (-500, 500), # W/m^2
"latent_heat_flux": (-500, 500), # W/m^2
"momentum_flux": (-10, 10), # N/m^2
"surface_pressure": (300, 1100), # hPa
"mean_sea_level_pressure": (870, 1090), # hPa
"tropopause_pressure": (50, 500), # hPa
"tropopause_temperature": (-100, 0), # Celsius
"precipitable_water": (0, 100), # mm
"total_cloud_cover": (0, 100), # Percentage
"low_cloud_cover": (0, 100), # Percentage
"medium_cloud_cover": (0, 100), # Percentage
"high_cloud_cover": (0, 100), # Percentage
"convective_available_potential_energy": (0, 10000), # J/kg
"convective_inhibition": (0, 1000), # J/kg
"storm_relative_helicity": (-1000, 1000), # m^2/s^2
"bulk_richardson_number": (-10, 10), # dimensionless
"lifted_condensation_level": (0, 5000), # meters
"level_of_free_convection": (0, 20000), # meters
"equilibrium_level": (0, 20000), # meters
"UKV": (250, 330), # UKV specific
glitch401 marked this conversation as resolved.
Show resolved Hide resolved
}
logger.info(f"Using {provider.lower()}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

very much just a suggestion, but it would be nice to have some control over which variables receive the checks. Intuitively, that should probably be possible by just passing a list of keys to be checked instead of True to check_for_zeroes/check_physical_limits

if provider.lower() == "ukv":
self.open_nwp = open_ukv
Expand All @@ -53,9 +123,37 @@ def __init__(
else:
raise ValueError(f"Unknown provider: {provider}")

def __iter__(self) -> Union[xr.DataArray, xr.Dataset]:
def __iter__(self) -> Union[xr.DataArray, xr.Dataset]: # type: ignore
peterdudfield marked this conversation as resolved.
Show resolved Hide resolved
"""Opens the NWP data"""
logger.debug("Opening NWP data: %s", self.zarr_path)
nwp = self.open_nwp(self.zarr_path)
if self.check_for_zeros:
self.check_if_zeros(nwp)
if self.check_physical_limits:
self.check_if_physical_limits(nwp)
while True:
yield nwp

def check_if_zeros(self, nwp: Union[xr.DataArray, xr.Dataset]):
"""Checks if the NWP data contains zeros"""
if isinstance(nwp, xr.DataArray):
if (nwp.values == 0).any():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like it will not be performed lazily (as in, this will load the whole dataArray into memory to check the values), which we really want to avoid in this place because at this point the arrays we operate on are often massive

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've made some changes to accommodate lazy loading. Have leveraged Dask arrays, as its often used with xarray for the same

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good way to go about it! There is a danger that some of our data might not fit anyway, but since it can be turned on and off that's fine.

I was wondering if it's worth exploring implementing this check downstream, somewhere after spacial and temporal crop and before normalisation, so that it operates on samples instead? And then maybe skip ones with too many zeroes/nans/out of physical bounds values and give a userWarning/log info of how many were skipped as a proxy for understanding how much of the data is corrupted. Thoughts @Sukh-P @peterdudfield?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like that idea, less chance of a chance of running into memory issues by loading a chunk, I guess the only draw back would be doing processing on some data you are going to chuck anyway but in this case that processing gets it down to a more manageable size

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@AUdaltsova I'm trying to understand if this is acceptable w.r.t the scope of this PR? 🤔

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@glitch401 might be! @peterdudfield happy to merge this then? :)

raise ValueError("NWP DataArray contains zeros")
if isinstance(nwp, xr.Dataset):
for var in nwp:
if (nwp[var].values == 0).any():
raise ValueError(f"NWP Dataset variable{var} contains zeros")
glitch401 marked this conversation as resolved.
Show resolved Hide resolved

def check_if_physical_limits(self, nwp: Union[xr.DataArray, xr.Dataset]):
"""Checks if the NWP data is within physical limits"""
if isinstance(nwp, xr.DataArray):
var_name = nwp.name
if var_name in self.limits:
lower, upper = self.limits[var_name]
if (nwp < lower).any() or (nwp > upper).any():
raise ValueError(f"NWP data {var_name} is outside physical limits")
glitch401 marked this conversation as resolved.
Show resolved Hide resolved
elif isinstance(nwp, xr.Dataset):
for var_name, (lower, upper) in self.limits.items():
if var_name in nwp.variables:
if not ((nwp[var_name] >= lower).all() and (nwp[var_name] <= upper).all()):
raise ValueError(f"NWP data {var_name} is outside physical limits")
61 changes: 60 additions & 1 deletion tests/load/nwp/test_load_nwp.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import zarr
import shutil
import numpy as np
import pandas as pd
from xarray import DataArray

import pytest
from ocf_datapipes.load import OpenNWP


Expand Down Expand Up @@ -102,3 +105,59 @@ def test_load_excarta_local():
raise ValueError(
"The following dimensions are missing: %s" % (str(dim_keys - set(metadata.dims)))
)


def test_check_for_zeros():
# to generate data with zeros and limits:
original_store_path = "tests/data/nwp_data/test.zarr"
original_store = zarr.open(original_store_path, mode="r")
new_store_path = "tests/data/nwp_data/test_with_zeros_n_limits.zarr"
# Optionally, clear the destination store if it already exists
shutil.rmtree(new_store_path, ignore_errors=True)
with zarr.open(new_store_path, mode="w") as new_store:
for item in original_store:
zarr.copy(original_store[item], new_store, name=item)

new_store["UKV"][0, 0, 0, 0] = 0
new_store["UKV"][0, 0, 0, 1] = np.random.uniform(240, 350, size=(548,))
shutil.copy(
"tests/data/nwp_data/test.zarr/.zmetadata",
"tests/data/nwp_data/test_with_zeros_n_limits.zarr/.zmetadata",
)
shutil.copy(
"tests/data/nwp_data/test.zarr/.zattrs",
"tests/data/nwp_data/test_with_zeros_n_limits.zarr/.zattrs",
)

# positive test case
nwp_datapipe1 = OpenNWP(
zarr_path=new_store_path,
check_for_zeros=True,
)
with pytest.raises(ValueError): # checks for Error raised if NWP DataArray contains zeros
metadata = next(iter(nwp_datapipe1))

# negative test case
nwp_datapipe2 = OpenNWP(zarr_path=original_store_path, check_for_zeros=True)
metadata = next(iter(nwp_datapipe2))
assert metadata is not None


def test_check_physical_limits():
# positive test case
nwp_datapipe1 = OpenNWP(
zarr_path="tests/data/nwp_data/test_with_zeros_n_limits.zarr", check_physical_limits=True
)
with pytest.raises(
ValueError
): # checks for Error raised if NWP data UKV is outside physical limits
metadata = next(iter(nwp_datapipe1))

# negative test case
nwp_datapipe2 = OpenNWP(zarr_path="tests/data/nwp_data/test.zarr", check_physical_limits=True)
metadata = next(iter(nwp_datapipe2))
assert metadata is not None

shutil.rmtree(
"tests/data/nwp_data/test_with_zeros_n_limits.zarr"
) # removes the zarr file created for testing