Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Add test for checking physical limits and zeroes in NWP data #… #340

Open
wants to merge 18 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
3ee287c
chore: Add test for checking physical limits and zeroes in NWP data #…
glitch401 Jul 3, 2024
1e2df80
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 3, 2024
8105b91
changes to generate test data on the go. remove unnecessary zarr file…
glitch401 Jul 4, 2024
1eafe49
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 4, 2024
d5bc6cf
Fix ValueError message for NWP data containing zeros and outside phys…
glitch401 Jul 4, 2024
d8cfa9d
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 4, 2024
5e68173
Fix ValueError message coding style
glitch401 Jul 4, 2024
466b710
update physical limits in according to pvnet_uk_region/data_config.yaml
glitch401 Jul 5, 2024
692500c
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 5, 2024
0667bab
Update temperature physical limits in OpenNWPIterDataPipe
glitch401 Jul 5, 2024
246d898
Fix NaN check in stack_np_examples_into_batch function
glitch401 Jul 11, 2024
55627eb
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 11, 2024
7ba254d
changes made to adapt for lazy loading
glitch401 Jul 16, 2024
c6ee33d
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 16, 2024
d0c4f6f
moved limits to a constant file
glitch401 Jul 24, 2024
19050c7
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 24, 2024
3fe89fc
Refactor test_merge_numpy_examples_to_batch.py and test_load_nwp.py t…
glitch401 Aug 15, 2024
ace0259
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion ocf_datapipes/batch/merge_numpy_examples_to_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def stack_np_examples_into_batch(dict_list: Sequence[NumpyBatch]) -> NumpyBatch:

nwp_batch[nwp_source] = nwp_source_batch

batch[BatchKey.nwp] = nwp_batch
batch[BatchKey.nwp] = check_for_nans(nwp_batch)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might be wrong, but I was under the impression that we allow for nans currently to be present in batches, which then get filled with zeroes during training? @peterdudfield is this a gsp thing?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we move this to when the NWP gets opened? And have an option to check it or not?
I think that would make it safer and clearer whats going on.

We could have a different issue that checks for nans in the batches, but we need to think how we turn that on and off .e.tc

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@glitch401 would you mind moving this to when the nwp is opened? with an option to do this or not.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you mean, when data element NWP is opened?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like below, in the load stage

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gotcha, will append changes

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i was hoping this would be removed, and it would be mvoed to below

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment is still open

else:
batch[batch_key] = stack_data_list(
Expand All @@ -101,6 +101,16 @@ def stack_np_examples_into_batch(dict_list: Sequence[NumpyBatch]) -> NumpyBatch:
return batch


def check_for_nans(batch: dict[str, NWPNumpyBatch]):
"""Check for NaNs in a batch"""
for keys in batch.keys():
for keys2 in batch[keys].keys():
if keys2 == NWPBatchKey.nwp:
if np.isnan(batch[keys][keys2]).any():
raise ValueError(f"NaNs found in {keys2}")
return batch


def unstack_np_batch_into_examples(batch: NumpyBatch):
"""Splits a single batch into samples.

Expand Down
36 changes: 36 additions & 0 deletions ocf_datapipes/load/nwp/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
"""
Module for defining limits for NWP data.
"""

# limits for NWP data in accordance with https://huggingface.co/openclimatefix/pvnet_uk_region/blob/main/data_config.yaml
NWP_LIMITS = {
"t2m": (200, 350), # Temperature in Kelvin (-100°C to 60°C)
"dswrf": (0, 1500), # Downward short-wave radiation flux, W/m^2
"dlwrf": (0, 750), # Downward long-wave radiation flux, W/m^2
"hcc": (0, 100), # High cloud cover, %
"mcc": (0, 100), # Medium cloud cover, %
"lcc": (0, 100), # Low cloud cover, %
"tcc": (0, 100), # Total cloud cover, %
"sde": (0, 1000), # Snowfall depth, meters
"duvrs": (0, 500), # Direct UV radiation at surface, W/m^2 (positive values only)
"u10": (-200, 200), # U component of 10m wind, m/s
"v10": (-200, 200), # V component of 10m wind, m/s
# UKV NWP channels (additional to ECMWF)
"prate": (0, 2000), # Precipitation rate, , kg/m^2/s (equivalent to 0-2000 mm/day)
"r": (0, 100), # Relative humidity, %
"si10": (0, 250), # Wind speed at 10m, m/s
"t": (200, 350), # Temperature in Kelvin (-100°C to 60°C)
"vis": (0, 100000), # Visibility, meters
# Satellite channels (no direct mapping to physical limits, using placeholder values)
"IR_016": (0, 1000), # Infrared channel
"IR_039": (0, 1000), # Infrared channel
"IR_087": (0, 1000), # Infrared channel
"IR_097": (0, 1000), # Infrared channel
"IR_108": (0, 1000), # Infrared channel
"IR_120": (0, 1000), # Infrared channel
"IR_134": (0, 1000), # Infrared channel
"VIS006": (0, 1000), # Visible channel
"VIS008": (0, 1000), # Visible channel
"WV_062": (0, 1000), # Water vapor channel
"WV_073": (0, 1000), # Water vapor channel
}
93 changes: 92 additions & 1 deletion ocf_datapipes/load/nwp/nwp.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
from pathlib import Path
from typing import Union

import dask
import dask.array
import numpy as np
import xarray as xr
from ocf_blosc2 import Blosc2 # noqa: F401
from torch.utils.data import IterDataPipe, functional_datapipe
Expand All @@ -15,6 +18,8 @@
from ocf_datapipes.load.nwp.providers.merra2 import open_merra2
from ocf_datapipes.load.nwp.providers.ukv import open_ukv

from .constants import NWP_LIMITS

logger = logging.getLogger(__name__)


Expand All @@ -26,15 +31,26 @@ def __init__(
self,
zarr_path: Union[Path, str, list[Path], list[str]],
provider: str = "ukv",
check_for_zeros: bool = False,
check_physical_limits: bool = False,
check_for_nans: bool = False,
):
"""
Opens NWP Zarr and yields it

Args:
zarr_path: Path to the Zarr file
provider: NWP provider
check_for_zeros: Check for zeros in the NWP data
check_physical_limits: Check the physical limits of nwp data (e.g. -100<temperature<100)
check_for_nans: Check for NaNs in the NWP data
"""
self.zarr_path = zarr_path
self.check_for_zeros = check_for_zeros
self.check_physical_limits = check_physical_limits
self.check_for_nans = check_for_nans
self.limits = NWP_LIMITS
glitch401 marked this conversation as resolved.
Show resolved Hide resolved

logger.info(f"Using {provider.lower()}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

very much just a suggestion, but it would be nice to have some control over which variables receive the checks. Intuitively, that should probably be possible by just passing a list of keys to be checked instead of True to check_for_zeroes/check_physical_limits

if provider.lower() == "ukv":
self.open_nwp = open_ukv
Expand All @@ -53,9 +69,84 @@ def __init__(
else:
raise ValueError(f"Unknown provider: {provider}")

def __iter__(self) -> Union[xr.DataArray, xr.Dataset]:
def __iter__(self) -> Union[xr.DataArray, xr.Dataset]: # type: ignore
peterdudfield marked this conversation as resolved.
Show resolved Hide resolved
"""Opens the NWP data"""
logger.debug("Opening NWP data: %s", self.zarr_path)
nwp = self.open_nwp(self.zarr_path)
if self.check_for_zeros:
self.check_if_zeros(nwp)
if self.check_physical_limits:
self.check_if_physical_limits(nwp)
if self.check_for_nans:
self.check_if_nans(nwp)
while True:
yield nwp

def check_if_zeros(self, nwp: Union[xr.DataArray, xr.Dataset]):
"""Checks if the NWP data contains zeros"""

def count_zeros(block):
return (block == 0).sum()

def check_zeros(result):
if result > 0:
raise ValueError(f"NWP data contains {result*100/nwp.size}% zeros")

if isinstance(nwp, xr.DataArray):
if dask.is_dask_collection(nwp.data):
zero_count = nwp.data.map_blocks(count_zeros, dtype=int).compute()
check_zeros(zero_count)
else:
if (nwp.values == 0).any():
raise ValueError(
f"NWP DataArray contains{(nwp.values == 0).sum()*100/nwp.values.size}% "
"zeros"
)
elif isinstance(nwp, xr.Dataset):
for var in nwp:
if dask.is_dask_collection(nwp[var].data):
zero_count = nwp[var].data.map_blocks(count_zeros, dtype=int).compute()
check_zeros(zero_count)
else:
if (nwp[var].values == 0).any():
raise ValueError(
f"NWP Dataset variable{var} "
f"contains {(nwp[var].values == 0).sum()*100/nwp[var].values.size}% "
"zeros"
)

def check_if_physical_limits(self, nwp: Union[xr.DataArray, xr.Dataset]):
"""Checks if the NWP data is within physical limits"""
if isinstance(nwp, xr.DataArray):
var_name = nwp.channel.values[0]
peterdudfield marked this conversation as resolved.
Show resolved Hide resolved
if var_name in self.limits:
lower, upper = self.limits[var_name]
if (nwp < lower).any() or (nwp > upper).any():
raise ValueError(
f"NWP data {var_name} is outside physical limits: ({lower},{upper})"
)
elif isinstance(nwp, xr.Dataset):
for var_name, (lower, upper) in self.limits.items():
if var_name in nwp.channel:
peterdudfield marked this conversation as resolved.
Show resolved Hide resolved
if not ((nwp[var_name] >= lower).all() and (nwp[var_name] <= upper).all()):
raise ValueError(
f"NWP data {var_name} is outside physical limits: ({lower},{upper})"
)

def check_if_nans(self, nwp: Union[xr.DataArray, xr.Dataset]):
peterdudfield marked this conversation as resolved.
Show resolved Hide resolved
"""Checks if the NWP data contains NaNs"""
if isinstance(nwp, xr.DataArray):
if dask.is_dask_collection(nwp.data):
if dask.array.isnan(nwp.data).any().compute():
raise ValueError("NWP data contains NaNs")
else:
if np.isnan(nwp.data).any():
raise ValueError("NWP DataArray contains NaNs")
elif isinstance(nwp, xr.Dataset):
for var in nwp.data_vars:
if dask.is_dask_collection(nwp[var].data):
if dask.array.isnan(nwp[var].data).any().compute():
raise ValueError(f"NWP Dataset variable{var} contains NaNs")
else:
if np.isnan(nwp[var].data).any():
raise ValueError(f"NWP Dataset variable{var} contains NaNs")
74 changes: 73 additions & 1 deletion tests/load/nwp/test_load_nwp.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import zarr
import shutil
import numpy as np
import pandas as pd
from xarray import DataArray

import pytest
from ocf_datapipes.load import OpenNWP


Expand Down Expand Up @@ -102,3 +105,72 @@ def test_load_excarta_local():
raise ValueError(
"The following dimensions are missing: %s" % (str(dim_keys - set(metadata.dims)))
)


def test_check_for_zeros():
# to generate data with zeros and limits:
original_store_path = "tests/data/nwp_data/test.zarr"
original_store = zarr.open(original_store_path, mode="r")
new_store_path = "tests/data/nwp_data/test_with_zeros_n_limits_n_nans.zarr"
# Optionally, clear the destination store if it already exists
shutil.rmtree(new_store_path, ignore_errors=True)
with zarr.open(new_store_path, mode="w") as new_store:
for item in original_store:
zarr.copy(original_store[item], new_store, name=item)

new_store["UKV"][0, 0, 0, 0] = 0
new_store["UKV"][0, 0, 0, 1] = np.random.uniform(190, 360, size=(548,))
new_store["UKV"][0, 0, 0, 2] = np.nan

shutil.copy(
"tests/data/nwp_data/test.zarr/.zmetadata",
"tests/data/nwp_data/test_with_zeros_n_limits_n_nans.zarr/.zmetadata",
)

# positive test case
nwp_datapipe1 = OpenNWP(
zarr_path=new_store_path,
check_for_zeros=True,
)
with pytest.raises(ValueError): # checks for Error raised if NWP DataArray contains zeros
metadata = next(iter(nwp_datapipe1))

# negative test case
nwp_datapipe2 = OpenNWP(zarr_path=original_store_path, check_for_zeros=True)
metadata = next(iter(nwp_datapipe2))
assert metadata is not None


def test_check_physical_limits():
# positive test case
nwp_datapipe1 = OpenNWP(
zarr_path="tests/data/nwp_data/test_with_zeros_n_limits_n_nans.zarr",
check_physical_limits=True,
)
with pytest.raises(
ValueError
): # checks for Error raised if NWP data UKV is outside physical limits
metadata = next(iter(nwp_datapipe1))

# negative test case
nwp_datapipe2 = OpenNWP(zarr_path="tests/data/nwp_data/test.zarr", check_physical_limits=True)
metadata = next(iter(nwp_datapipe2))
assert metadata is not None


def test_check_if_nans():
# positive test case
nwp_datapipe1 = OpenNWP(
zarr_path="tests/data/nwp_data/test_with_zeros_n_limits_n_nans.zarr", check_for_nans=True
)
with pytest.raises(ValueError): # checks for Error raised if NWP DataArray contains nans
metadata = next(iter(nwp_datapipe1))

# negative test case
nwp_datapipe2 = OpenNWP(zarr_path="tests/data/nwp_data/test.zarr", check_for_nans=True)
metadata = next(iter(nwp_datapipe2))
assert metadata is not None

shutil.rmtree(
"tests/data/nwp_data/test_with_zeros_n_limits_n_nans.zarr"
) # removes the zarr file created for testing