diff --git a/src/pypromice/process/value_clipping.py b/src/pypromice/process/value_clipping.py index 23ed14e0..6ff60c54 100644 --- a/src/pypromice/process/value_clipping.py +++ b/src/pypromice/process/value_clipping.py @@ -1,7 +1,12 @@ +from typing import Dict, Set, Mapping + import numpy as np import pandas +import pandas as pd import xarray +from pypromice.utilities.dependency_graph import DependencyGraph + def clip_values( ds: xarray.Dataset, @@ -26,27 +31,43 @@ def clip_values( """ cols = ["lo", "hi", "OOL"] assert set(cols) <= set(var_configurations.columns) + # TODO: Check if this is necessary + # variable_limits = var_configurations[cols].dropna(how="all") - variable_limits = var_configurations[cols].dropna(how="all") - for var, row in variable_limits.iterrows(): + variable_limits = var_configurations[cols].assign( + dependents=lambda df: df.OOL.fillna("").str.split(), + # Find the closure of dependents using the DependencyGraph class + dependents_closure=lambda df: DependencyGraph.from_child_mapping( + df.dependents + ).child_closure_mapping(), + ) + for var, row in variable_limits.iterrows(): if var not in list(ds.variables): continue + # TODO: Check if this is necessary + # I guess the nan flagging is already handled below + # What if rh_u_cor is nan? + # What if row.lo/hi is nan? + + if var in ["rh_u_cor", "rh_l_cor"]: + ds[var] = ds[var].where(ds[var] >= row.lo, other=0) + ds[var] = ds[var].where(ds[var] <= row.hi, other=100) + + # Mask out invalid corrections based on uncorrected var + var_uncor = var.rstrip("_cor") + ds[var] = ds[var].where(~np.isnan(ds[var_uncor]), other=np.nan) + + else: + if ~np.isnan(row.lo): + ds[var] = ds[var].where(ds[var] >= row.lo) + if ~np.isnan(row.hi): + ds[var] = ds[var].where(ds[var] <= row.hi) - if ~np.isnan(row.lo): - ds[var] = ds[var].where(ds[var] >= row.lo) - if ~np.isnan(row.hi): - ds[var] = ds[var].where(ds[var] <= row.hi) - - other_vars = row.OOL - if isinstance(other_vars, str) and ~ds[var].isnull().all(): - for o in other_vars.split(): - if o not in list(ds.variables): - continue - else: - if ~np.isnan(row.lo): - ds[var] = ds[var].where(ds[var] >= row.lo) - if ~np.isnan(row.hi): - ds[var] = ds[var].where(ds[var] <= row.hi) + # Flag dependents as NaN if parent is NaN + for o in row.dependents_closure: + if o not in list(ds.variables): + continue + ds[o] = ds[o].where(ds[var].notnull()) return ds diff --git a/src/pypromice/resources/variables.csv b/src/pypromice/resources/variables.csv index 7630e813..cc31d35a 100644 --- a/src/pypromice/resources/variables.csv +++ b/src/pypromice/resources/variables.csv @@ -11,15 +11,15 @@ qh_u,specific_humidity,Specific humidity (upper boom),kg/kg,modelResult,time,FAL rh_l,relative_humidity,Relative humidity (lower boom),%,physicalMeasurement,time,FALSE,,0,100,rh_l_cor,two-boom,1,1,1,4 rh_l_cor,relative_humidity_corrected,Relative humidity (lower boom) - corrected,%,modelResult,time,FALSE,L2 or later,0,150,dshf_l dlhf_l qh_l,two-boom,0,1,1,4 qh_l,specific_humidity,Specific humidity (lower boom),kg/kg,modelResult,time,FALSE,L2 or later,0,100,,two-boom,0,1,1,4 -wspd_u,wind_speed,Wind speed (upper boom),m s-1,physicalMeasurement,time,FALSE,,0,100,"wdir_u wspd_x_u wspd_y_u dshf_u dlhf_u qh_u, precip_u",all,1,1,1,4 -wspd_l,wind_speed,Wind speed (lower boom),m s-1,physicalMeasurement,time,FALSE,,0,100,"wdir_l wspd_x_l wspd_y_l dshf_l dlhf_l qh_l , precip_l",two-boom,1,1,1,4 +wspd_u,wind_speed,Wind speed (upper boom),m s-1,physicalMeasurement,time,FALSE,,0,100,wdir_u wspd_x_u wspd_y_u dshf_u dlhf_u qh_u precip_u,all,1,1,1,4 +wspd_l,wind_speed,Wind speed (lower boom),m s-1,physicalMeasurement,time,FALSE,,0,100,wdir_l wspd_x_l wspd_y_l dshf_l dlhf_l qh_l precip_l,two-boom,1,1,1,4 wdir_u,wind_from_direction,Wind from direction (upper boom),degrees,physicalMeasurement,time,FALSE,,1,360,wspd_x_u wspd_y_u,all,1,1,1,4 wdir_std_u,wind_from_direction_standard_deviation,Wind from direction (standard deviation),degrees,qualityInformation,time,FALSE,L0 or L2,,,,one-boom,1,1,0,4 wdir_l,wind_from_direction,Wind from direction (lower boom),degrees,physicalMeasurement,time,FALSE,,1,360,wspd_x_l wspd_y_l,two-boom,1,1,1,4 -wspd_x_u,wind_speed_from_x_direction,Wind speed from x direction (upper boom),m s-1,modelResult,time,FALSE,L0 or L2,-100,100,wdir_u wspd_u,all,0,1,1,4 -wspd_y_u,wind_speed_from_y_direction,Wind speed from y direction (upper boom),m s-1,modelResult,time,FALSE,L0 or L2,-100,100,wdir_u wspd_u,all,0,1,1,4 -wspd_x_l,wind_speed_from_x_direction,Wind speed from x direction (lower boom),m s-1,modelResult,time,FALSE,L0 or L2,-100,100,wdir_l wspd_l,two-boom,0,1,1,4 -wspd_y_l,wind_speed_from_y_direction,Wind speed from y direction (lower boom),m s-1,modelResult,time,FALSE,L0 or L2,-100,100,wdir_l wspd_l,two-boom,0,1,1,4 +wspd_x_u,wind_speed_from_x_direction,Wind speed from x direction (upper boom),m s-1,modelResult,time,FALSE,L0 or L2,-100,100,"",all,0,1,1,4 +wspd_y_u,wind_speed_from_y_direction,Wind speed from y direction (upper boom),m s-1,modelResult,time,FALSE,L0 or L2,-100,100,"",all,0,1,1,4 +wspd_x_l,wind_speed_from_x_direction,Wind speed from x direction (lower boom),m s-1,modelResult,time,FALSE,L0 or L2,-100,100,"",two-boom,0,1,1,4 +wspd_y_l,wind_speed_from_y_direction,Wind speed from y direction (lower boom),m s-1,modelResult,time,FALSE,L0 or L2,-100,100,"",two-boom,0,1,1,4 dsr,surface_downwelling_shortwave_flux,Downwelling shortwave radiation,W m-2,physicalMeasurement,time,FALSE,,-10,1500,albedo dsr_cor usr_cor,all,1,1,1,4 dsr_cor,surface_downwelling_shortwave_flux_corrected,Downwelling shortwave radiation - corrected,W m-2,modelResult,time,FALSE,L2 or later,,,,all,0,1,1,4 usr,surface_upwelling_shortwave_flux,Upwelling shortwave radiation,W m-2,physicalMeasurement,time,FALSE,,-10,1000,albedo dsr_cor usr_cor,all,1,1,1,4 @@ -102,5 +102,5 @@ rh_i,relative_humidity,Relative humidity (instantaneous),%,physicalMeasurement,t rh_i_cor,relative_humidity_corrected,Relative humidity (instantaneous) – corrected,%,modelResult,time,TRUE,L2 or later,0,100,,all,0,1,1,4 wspd_i,wind_speed,Wind speed (instantaneous),m s-1,physicalMeasurement,time,TRUE,,0,100,wdir_i wspd_x_i wspd_y_i,all,1,1,1,4 wdir_i,wind_from_direction,Wind from direction (instantaneous),degrees,physicalMeasurement,time,TRUE,,1,360,wspd_x_i wspd_y_i,all,1,1,1,4 -wspd_x_i,wind_speed_from_x_direction,Wind speed from x direction (instantaneous),m s-1,modelResult,time,TRUE,L2 or later,-100,100,wdir_i wspd_i,all,0,1,1,4 -wspd_y_i,wind_speed_from_y_direction,Wind speed from y direction (instantaneous),m s-1,modelResult,time,TRUE,L2 or later,-100,100,wdir_i wspd_i,all,0,1,1,4 +wspd_x_i,wind_speed_from_x_direction,Wind speed from x direction (instantaneous),m s-1,modelResult,time,TRUE,L2 or later,-100,100,"",all,0,1,1,4 +wspd_y_i,wind_speed_from_y_direction,Wind speed from y direction (instantaneous),m s-1,modelResult,time,TRUE,L2 or later,-100,100,"",all,0,1,1,4 diff --git a/src/pypromice/utilities/dependency_graph.py b/src/pypromice/utilities/dependency_graph.py new file mode 100644 index 00000000..fbd27630 --- /dev/null +++ b/src/pypromice/utilities/dependency_graph.py @@ -0,0 +1,101 @@ +from typing import Mapping, Set, MutableMapping, Optional + +import attr + +__all__ = [ + "DependencyNode", + "DependencyGraph", +] + + +@attr.define +class DependencyNode: + name: str = attr.field() + parents: Set["DependencyNode"] = attr.field(factory=set) + children: Set["DependencyNode"] = attr.field(factory=set) + + def add_child(self, child: "DependencyNode"): + self.children.add(child) + child.parents.add(self) + + def get_children_closure(self, closure: Optional[Set[str]] = None) -> Set[str]: + is_root = closure is None + if closure is None: + closure = set() + if self.name in closure: + return closure + closure.add(self.name) + for child in self.children: + closure |= child.get_children_closure(closure) + + if is_root: + closure.remove(self.name) + return closure + + def get_parents_closure(self, closure: Optional[Set[str]] = None) -> Set[str]: + is_root = closure is None + if closure is None: + closure = set() + if self.name in closure: + return closure + closure.add(self.name) + for parent in self.parents: + closure |= parent.get_parents_closure(closure) + + if is_root: + closure.remove(self.name) + return closure + + def __hash__(self): + return hash(self.name) + + +@attr.define +class DependencyGraph: + nodes: MutableMapping[str, DependencyNode] = attr.field(factory=dict) + + def add_node(self, name: str) -> DependencyNode: + if name not in self.nodes: + self.nodes[name] = DependencyNode(name) + return self.nodes[name] + + def add_edge(self, parent: str, child: str): + parent_node = self.add_node(parent) + child_node = self.add_node(child) + parent_node.add_child(child_node) + + @classmethod + def from_child_mapping(cls, mapping: Mapping[str, Set[str]]) -> "DependencyGraph": + graph = cls() + for var, children in mapping.items(): + graph.add_node(var) + for child in children: + graph.add_edge(var, child) + return graph + + @classmethod + def from_parent_mapping(cls, mapping: Mapping[str, Set[str]]) -> "DependencyGraph": + graph = cls() + for var, parents in mapping.items(): + graph.add_node(var) + for parent in parents: + graph.add_edge(parent, var) + return graph + + def child_mapping(self) -> Mapping[str, Set[str]]: + return { + node.name: {child.name for child in node.children} + for node in self.nodes.values() + } + + def child_closure_mapping(self) -> Mapping[str, Set[str]]: + return {node.name: node.get_children_closure() for node in self.nodes.values()} + + def parent_mapping(self) -> Mapping[str, Set[str]]: + return { + node.name: {parent.name for parent in node.parents} + for node in self.nodes.values() + } + + def parent_closure_mapping(self) -> Mapping[str, Set[str]]: + return {node.name: node.get_parents_closure() for node in self.nodes.values()} diff --git a/tests/unit/test_value_clippping.py b/tests/unit/test_value_clippping.py new file mode 100644 index 00000000..ceb8e2ef --- /dev/null +++ b/tests/unit/test_value_clippping.py @@ -0,0 +1,200 @@ +import unittest + +import numpy as np +import pandas as pd +import xarray as xr + +import pypromice.resources +from pypromice.process.L1toL2 import get_directional_wind_speed +from pypromice.process.value_clipping import clip_values + + +class ClipValuesTestCase(unittest.TestCase): + def test_flag_wdir_on_nan_wspd(self): + n_entries = 4 + df_in = pd.DataFrame( + data={ + "wspd_u": n_entries * [np.nan], + "wspd_i": n_entries * [np.nan], + "wspd_l": n_entries * [np.nan], + "wdir_u": np.random.rand(n_entries) * 360, + "wdir_i": np.random.rand(n_entries) * 360, + "wdir_l": np.random.rand(n_entries) * 360, + }, + index=pd.date_range("2021-01-01", periods=n_entries, freq="H"), + ) + ds = xr.Dataset(df_in) + ds.attrs["number_of_booms"] = 2 + ds_out = get_directional_wind_speed(ds.copy()) + vars = pypromice.resources.load_variables(None) + + ds_out = clip_values(ds_out, vars) + # Convert to dataframe for easier comparison + df_out = ds_out.to_dataframe() + + # Assert all dir values are set nan + self.assertTrue(df_out["wdir_u"].isna().all()) + self.assertTrue(df_out["wdir_l"].isna().all()) + self.assertTrue(df_out["wdir_i"].isna().all()) + + def test_flag_wdir_zero_wspd(self): + n_entries = 4 + df_in = pd.DataFrame( + data={ + "wspd_u": n_entries * [0], + "wspd_i": n_entries * [0], + "wspd_l": n_entries * [0], + "wdir_u": np.random.rand(n_entries) * 360, + "wdir_i": np.random.rand(n_entries) * 360, + "wdir_l": np.random.rand(n_entries) * 360, + }, + index=pd.date_range("2021-01-01", periods=n_entries, freq="H"), + ) + ds = xr.Dataset(df_in) + ds.attrs["number_of_booms"] = 2 + ds_out = get_directional_wind_speed(ds.copy()) + vars = pypromice.resources.load_variables(None) + + ds_out = clip_values(ds_out, vars) + + # Convert to dataframe for easier comparison + df_out = ds_out.to_dataframe() + # Assert all dir values are set nan + self.assertTrue(df_out["wdir_u"].isna().all()) + self.assertTrue(df_out["wdir_l"].isna().all()) + self.assertTrue(df_out["wdir_i"].isna().all()) + + def test_flagging_depended_on_wspd(self): + # This unit test tests variable conditions for flagging wdir values + # Nan, 0, and negative values should be flagged while positive values should not + n_entries = 4 + df_in = pd.DataFrame( + data={ + "wspd_u": [np.nan, 0, 10, -3], + "wdir_u": [0, 180, 90, 270], + }, + index=pd.date_range("2021-01-01", periods=n_entries, freq="H"), + ) + ds = xr.Dataset(df_in) + ds.attrs["number_of_booms"] = 1 + ds_out = get_directional_wind_speed(ds.copy()) + vars = pypromice.resources.load_variables(None) + + ds_out = clip_values(ds_out, vars) + + # Convert to dataframe for easier comparison + df_out = ds_out.to_dataframe() + + expected_dataframe = pd.DataFrame( + data={ + "wspd_u": [np.nan, 0, 10, np.nan], + "wdir_u": [np.nan, np.nan, 90, np.nan], + "wspd_x_u": [np.nan, np.nan, 10, np.nan], + "wspd_y_u": [np.nan, np.nan, 0, np.nan], + }, + index=df_out.index, + ) + self.assertEqual( + df_out.columns.tolist(), + expected_dataframe.columns.tolist(), + ) + pd.testing.assert_frame_equal( + df_out, + expected_dataframe, + check_dtype=True, + check_like=True, + ) + + def test_recursive_flagging(self): + fields = ["a", "b", "c"] + variable_config = pd.DataFrame( + columns=["field", "lo", "hi", "OOL"], + data=[ + ["a", 0, 10, "b"], + ["b", 100, 110, ""], + ["c", 200, 210, "a"], + ], + ).set_index("field") + data_index = pd.RangeIndex(4) + data = pd.DataFrame( + columns=fields, + data=[ + [0, 100, 215], # c is out of range + [5, 115, 200], # b is out of range + [10, 100, 200], # All a withing range + [15, 100, 200], # a is out of range + ], + dtype=float, + index=data_index, + ) + expected_output = pd.DataFrame( + columns=fields, + data=[ + [np.nan, np.nan, np.nan], # c is nan -> a -> b + [5, np.nan, 200], # b is nan + [10, 100, 200], + [np.nan, np.nan, 200], # a is nan -> b + ], + dtype=float, + index=data.index, + ) + + data_set = xr.Dataset(data) + + data_set_out = clip_values(data_set, variable_config) + data_frame_out = data_set_out.to_dataframe() + + pd.testing.assert_frame_equal( + data_frame_out, + expected_output, + check_names=False, + check_dtype=True, + ) + + def test_circular_dependencies(self): + fields = ["a", "b", "c"] + variable_config = pd.DataFrame( + columns=["field", "lo", "hi", "OOL"], + data=[ + ["a", 0, 10, "b"], + ["b", 100, 110, "c"], + ["c", 200, 210, "a"], + ], + ).set_index("field") + data_index = pd.RangeIndex(4) + data = pd.DataFrame( + columns=fields, + data=[ + [0, 100, 215], # c is out of range + [5, 115, 200], # b is out of range + [10, 100, 200], # All a withing range + [15, 100, 200], # a is out of range + ], + dtype=float, + index=data_index, + ) + # All variables a dependent due to circular dependency + expected_output = pd.DataFrame( + columns=fields, + data=[ + [np.nan, np.nan, np.nan], + [np.nan, np.nan, np.nan], + [10, 100, 200], + [np.nan, np.nan, np.nan], + ], + dtype=float, + index=data.index, + ) + + data_set = xr.Dataset(data) + + data_set_out = clip_values(data_set, variable_config) + data_frame_out = data_set_out.to_dataframe() + + pd.testing.assert_frame_equal( + data_frame_out, + expected_output, + check_names=False, + check_dtype=True, + ) +