From 9e1497e1b5e66ebe17b96c2def8d93c31672b365 Mon Sep 17 00:00:00 2001 From: Marco Braun Date: Sun, 19 Nov 2023 17:50:09 -0500 Subject: [PATCH] Updated docstring, pre-commit passed --- tests/test_aggregate.py | 147 +++++++++++++++++++++++----------- xscen/aggregate.py | 169 +++++++++++++++++++++++++++------------- 2 files changed, 216 insertions(+), 100 deletions(-) diff --git a/tests/test_aggregate.py b/tests/test_aggregate.py index 196ec36d..5350a312 100644 --- a/tests/test_aggregate.py +++ b/tests/test_aggregate.py @@ -12,8 +12,10 @@ class TestClimatologicalMean: def _format(self, s): import xclim - op_format = (dict.fromkeys(("mean", "std", "var", "sum"), "adj") | - dict.fromkeys(("max", "min"), "noun")) + + op_format = dict.fromkeys(("mean", "std", "var", "sum"), "adj") | dict.fromkeys( + ("max", "min"), "noun" + ) return xclim.core.formatting.default_formatter.format_field(s, op_format[s]) def test_daily(self): @@ -566,8 +568,10 @@ def test_global(self, lonstart, method, exp): class TestClimatologicalOp: def _format(self, s): import xclim - op_format = (dict.fromkeys(("mean", "std", "var", "sum"), "adj") | - dict.fromkeys(("max", "min"), "noun")) + + op_format = dict.fromkeys(("mean", "std", "var", "sum"), "adj") | dict.fromkeys( + ("max", "min"), "noun" + ) return xclim.core.formatting.default_formatter.format_field(s, op_format[s]) def test_daily(self): @@ -579,10 +583,12 @@ def test_daily(self): as_dataset=True, ) with pytest.raises(NotImplementedError): - xs.climatological_op(ds, op='mean') + xs.climatological_op(ds, op="mean") @pytest.mark.parametrize("xrfreq", ["MS", "AS-JAN"]) - @pytest.mark.parametrize("op", ["max", "mean", "median", "min", "std", "sum", "var", "linregress"]) + @pytest.mark.parametrize( + "op", ["max", "mean", "median", "min", "std", "sum", "var", "linregress"] + ) def test_all_default(self, xrfreq, op): o = 12 if xrfreq == "MS" else 1 @@ -594,17 +600,25 @@ def test_all_default(self, xrfreq, op): as_dataset=True, ) out = xs.climatological_op(ds, op=op) - expected = (dict.fromkeys(("max", "mean", "median", "min"), np.arange(1, o + 1)) | - dict.fromkeys(("std", "var"), np.zeros(o)) | - dict({"sum": np.arange(1, o + 1) * 30}) | - dict({"linregress": np.array([ - np.zeros(o), - np.arange(1, o + 1), - np.zeros(o), - np.ones(o), - np.zeros(o), - np.zeros(o)]).T}) - ) + expected = ( + dict.fromkeys(("max", "mean", "median", "min"), np.arange(1, o + 1)) + | dict.fromkeys(("std", "var"), np.zeros(o)) + | dict({"sum": np.arange(1, o + 1) * 30}) + | dict( + { + "linregress": np.array( + [ + np.zeros(o), + np.arange(1, o + 1), + np.zeros(o), + np.ones(o), + np.zeros(o), + np.zeros(o), + ] + ).T + } + ) + ) # Test output variable name, values, length, horizon assert list(out.data_vars.keys()) == [f"tas_clim_{op}"] np.testing.assert_array_equal(out[f"tas_clim_{op}"], expected[op]) @@ -612,7 +626,7 @@ def test_all_default(self, xrfreq, op): np.testing.assert_array_equal(out.time[0], ds.time[0]) assert (out.horizon == "2001-2030").all() # Test metadata - operation = self._format(op) if op not in ['median', 'linregress'] else op + operation = self._format(op) if op not in ["median", "linregress"] else op assert ( out[f"tas_clim_{op}"].attrs["description"] == f"Climatological 30-year {operation} of {ds.tas.attrs['description']}" @@ -624,7 +638,9 @@ def test_all_default(self, xrfreq, op): assert out.attrs["cat:processing_level"] == "climatology" @pytest.mark.parametrize("xrfreq", ["MS", "AS-JAN"]) - @pytest.mark.parametrize("op", ['max', 'mean', 'median', 'min', 'std', 'sum', 
'var', 'linregress']) + @pytest.mark.parametrize( + "op", ["max", "mean", "median", "min", "std", "sum", "var", "linregress"] + ) def test_options(self, xrfreq, op): o = 12 if xrfreq == "MS" else 1 @@ -635,20 +651,42 @@ def test_options(self, xrfreq, op): freq=xrfreq, as_dataset=True, ) - out = xs.climatological_op(ds, op=op, window=15, stride=5, to_level="for_testing") - expected = (dict.fromkeys(("max", "mean", "median", "min"), - np.tile(np.arange(1, o + 1), len(np.unique(out.horizon.values)))) | - dict.fromkeys(("std", "var"), - np.tile(np.zeros(o), len(np.unique(out.horizon.values)))) | - dict({"sum": np.tile(np.arange(1, o + 1) * 15, len(np.unique(out.horizon.values)))}) | - dict({"linregress": np.tile(np.array([ - np.zeros(o), - np.arange(1, o + 1), - np.zeros(o), - np.ones(o), - np.zeros(o), - np.zeros(o)]), len(np.unique(out.horizon.values))).T}) + out = xs.climatological_op( + ds, op=op, window=15, stride=5, to_level="for_testing" + ) + expected = ( + dict.fromkeys( + ("max", "mean", "median", "min"), + np.tile(np.arange(1, o + 1), len(np.unique(out.horizon.values))), + ) + | dict.fromkeys( + ("std", "var"), np.tile(np.zeros(o), len(np.unique(out.horizon.values))) + ) + | dict( + { + "sum": np.tile( + np.arange(1, o + 1) * 15, len(np.unique(out.horizon.values)) ) + } + ) + | dict( + { + "linregress": np.tile( + np.array( + [ + np.zeros(o), + np.arange(1, o + 1), + np.zeros(o), + np.ones(o), + np.zeros(o), + np.zeros(o), + ] + ), + len(np.unique(out.horizon.values)), + ).T + } + ) + ) # Test output values np.testing.assert_array_equal( out[f"tas_clim_{op}"], @@ -660,7 +698,7 @@ def test_options(self, xrfreq, op): out.horizon.values ) # Test metadata - operation = self._format(op) if op not in ['median', 'linregress'] else op + operation = self._format(op) if op not in ["median", "linregress"] else op assert ( out[f"tas_clim_{op}"].attrs["description"] == f"Climatological 15-year {operation} of {ds.tas.attrs['description']}" @@ -682,7 +720,7 @@ def test_minperiods(self, op): ) ds = ds.where(ds["time"].dt.strftime("%Y-%m-%d") != "2030-12-01") - op = 'mean' + op = "mean" out = xs.climatological_op(ds, op=op, window=30) assert all(np.isreal(out[f"tas_clim_{op}"])) assert len(out.time) == 4 @@ -693,7 +731,7 @@ def test_minperiods(self, op): assert np.sum(np.isnan(out[f"tas_clim_{op}"])) == 1 # min_periods as float - out = xs.climatological_op(ds, op=op, window=30, min_periods=.5) + out = xs.climatological_op(ds, op=op, window=30, min_periods=0.5) assert "minimum of 15 years of data" in out[f"tas_clim_{op}"].attrs["history"] assert np.sum(np.isnan(out[f"tas_clim_{op}"])) == 0 @@ -721,7 +759,9 @@ def test_periods(self, op): with pytest.raises(ValueError): xs.climatological_op(ds, op=op) - out = xs.climatological_op(ds, op='mean', periods=[["2001", "2010"], ["2021", "2030"]]) + out = xs.climatological_op( + ds, op="mean", periods=[["2001", "2010"], ["2021", "2030"]] + ) assert len(out.time) == 2 assert {"2001-2010", "2021-2030"}.issubset(out.horizon.values) @@ -735,7 +775,7 @@ def test_calendars(self, cal): as_dataset=True, ) - out = xs.climatological_op(ds.convert_calendar(cal, align_on="date"), op='mean') + out = xs.climatological_op(ds.convert_calendar(cal, align_on="date"), op="mean") assert out.time.dt.calendar == cal def test_periods_as_dim(self): @@ -746,11 +786,30 @@ def test_periods_as_dim(self): freq="MS", as_dataset=True, ) - out = xs.climatological_op(ds, op="mean", window=10, stride=5, periods_as_dim=True) + out = xs.climatological_op( + ds, op="mean", window=10, stride=5, 
periods_as_dim=True + ) assert (out.tas_clim_mean.values == np.tile(np.arange(1, 13), (5, 1))).all() - assert out.dims == {'period': 5, 'month': 12} - assert out.time.dims == ('period', 'month') - assert (out.period.values == ['2001-2010', '2006-2015', '2011-2020', '2016-2025', '2021-2030']).all() - assert (out.month.values == - ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', - 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']).all() + assert out.dims == {"period": 5, "month": 12} + assert out.time.dims == ("period", "month") + assert ( + out.period.values + == ["2001-2010", "2006-2015", "2011-2020", "2016-2025", "2021-2030"] + ).all() + assert ( + out.month.values + == [ + "Jan", + "Feb", + "Mar", + "Apr", + "May", + "Jun", + "Jul", + "Aug", + "Sep", + "Oct", + "Nov", + "Dec", + ] + ).all() diff --git a/xscen/aggregate.py b/xscen/aggregate.py index bd51498b..e5e5d063 100644 --- a/xscen/aggregate.py +++ b/xscen/aggregate.py @@ -1,4 +1,5 @@ # noqa: D100 +import calendar import datetime import logging import warnings @@ -11,13 +12,12 @@ import geopandas as gpd import numpy as np import pandas as pd +import scipy import shapely import xarray as xr import xclim as xc import xclim.core.calendar import xesmf as xe -import scipy -import calendar from shapely.geometry import Polygon from xclim.core.indicator import Indicator @@ -202,14 +202,14 @@ def climatological_mean( # return ds_rolling return climatological_op( ds, - op='mean', + op="mean", window=window, min_periods=min_periods, stride=interval, periods=periods, rename_variables=False, to_level=to_level, - periods_as_dim=False + periods_as_dim=False, ) @@ -226,8 +226,7 @@ def climatological_op( to_level: str = "climatology", periods_as_dim: bool = False, ) -> xr.Dataset: - """Perform 'op' over the 'year' dimension for given time periods and windows, - respecting the temporal resolution of ds. + """Perform 'op' over the 'year' dimension for given time periods, respecting the temporal resolution of ds. Parameters ---------- @@ -241,10 +240,11 @@ def climatological_op( the following are recommended and tested: ['max', 'mean', 'median', 'min', 'std', 'sum', 'var', 'linregress']. Operations beyond methods of xarray.core.rolling.DatasetRolling include: - - 'linregress' : Computes the linear regression over the periods or it's windows, using - scipy.stats.linregress employing years as regressors. - Here the output has a new dimension 'linreg_param' with coordinates: - ['slope', 'intercept', 'rvalue', 'pvalue', 'stderr', 'intercept_stderr']. + + - 'linregress' : Computes the linear regression over the periods or it's windows, using + scipy.stats.linregress employing years as regressors. + Here the output has a new dimension 'linreg_param' with coordinates: + ['slope', 'intercept', 'rvalue', 'pvalue', 'stderr', 'intercept_stderr']. window : int Number of years to use for the time periods. If left at None and periods is given, window will be the size of the first period. Hence, if periods are of @@ -277,7 +277,6 @@ def climatological_op( Returns a Dataset with resultf from the climatological operation. """ - # Daily data is not supported if len(ds.time) > 3 and xr.infer_freq(ds.time) == "D": raise NotImplementedError( @@ -326,7 +325,9 @@ def climatological_op( if 0 < min_periods <= 1: min_periods = int(np.floor(min_periods * window)) else: - raise ValueError(f"When 'min_periods' is passed as 'float' it must be between 0 and 1. Got {min_periods}.") + raise ValueError( + f"When 'min_periods' is passed as 'float' it must be between 0 and 1. Got {min_periods}." 
+            )
 
     # set min_periods
     min_periods = min_periods or window
@@ -336,36 +337,53 @@ def climatological_op(
     # if op is a dict unpack it
     if isinstance(op, dict):
         op, op_kwargs = list(op.items())[0]
-        op_kwargs['keep_attrs'] = True if 'keep_attrs' not in op_kwargs else op_kwargs['keep_attrs']
+        op_kwargs["keep_attrs"] = (
+            True if "keep_attrs" not in op_kwargs else op_kwargs["keep_attrs"]
+        )
     else:
-        op_kwargs = {'keep_attrs': True}  # ToDo: Is ther a global setting in xscen so we don't need this?
+        op_kwargs = {
+            "keep_attrs": True
+        }  # ToDo: Is there a global setting in xscen so we don't need this?
 
     # special case for averaging standard deviations: need to convert to variance before averaging
-    ds_has_std = ('std' in ds_unstack.data_vars or any(
-        ['standard deviation' in ds_unstack[vv].attrs['description'].lower() for vv in ds_unstack.data_vars]))
-    if op == 'mean' and ds_has_std:
+    ds_has_std = "std" in ds_unstack.data_vars or any(
+        [
+            "standard deviation" in ds_unstack[vv].attrs["description"].lower()
+            for vv in ds_unstack.data_vars
+        ]
+    )
+    if op == "mean" and ds_has_std:
         for vv in ds_unstack.data_vars:
-            if 'std' in vv or 'standard deviation' in ds_unstack[vv].attrs['description'].lower():
+            if (
+                "std" in vv
+                or "standard deviation" in ds_unstack[vv].attrs["description"].lower()
+            ):
                 ds_unstack[vv] = np.square(ds_unstack[vv])
 
     # Compute climatolgical operation
     concats = []
     for period in periods:
         # Rolling average
-        ds_rolling = ds_unstack.sel(year=slice(period[0], period[1])).rolling(year=window, min_periods=min_periods)
+        ds_rolling = ds_unstack.sel(year=slice(period[0], period[1])).rolling(
+            year=window, min_periods=min_periods
+        )
 
         # apply operation on rolling object
         if hasattr(ds_rolling, op) and callable(getattr(ds_rolling, op)):
-            if op not in ['max', 'mean', 'median', 'min', 'std', 'sum', 'var']:
+            if op not in ["max", "mean", "median", "min", "std", "sum", "var"]:
                 warnings.warn(
                     f"The requested operation '{op}' has not been tested and may produce unexpected results."
) ds_rolling = getattr(ds_rolling, op)(**op_kwargs) # revert variance to std, where applicable - if op == 'mean' and ds_has_std: + if op == "mean" and ds_has_std: for vv in ds_rolling.data_vars: - if 'std' in vv or 'standard deviation' in ds_rolling[vv].attrs['description'].lower(): + if ( + "std" in vv + or "standard deviation" + in ds_rolling[vv].attrs["description"].lower() + ): ds_rolling[vv] = np.sqrt(ds_rolling[vv]) # Select the windows at provided stride, starting from the first full window's operation result @@ -378,17 +396,29 @@ def _ulinregress(x, y, **kwargs): valid_x = ~np.isnan(x) valid_y = ~np.isnan(y) mask = valid_x & valid_y - if np.sum(mask) >= kwargs.get('min_periods', 1): - reg = scipy.stats.linregress(x, y, alternative=kwargs.get('alternative', 'two-sided')) - out = np.array([reg.slope, reg.intercept, reg.rvalue, - reg.pvalue, reg.stderr, reg.intercept_stderr]) + if np.sum(mask) >= kwargs.get("min_periods", 1): + reg = scipy.stats.linregress( + x, y, alternative=kwargs.get("alternative", "two-sided") + ) + out = np.array( + [ + reg.slope, + reg.intercept, + reg.rvalue, + reg.pvalue, + reg.stderr, + reg.intercept_stderr, + ] + ) else: out = np.full(6, np.nan) return out # prepare kwargs - linreg_kwargs = {k: v for k, v in op_kwargs.items() if 'keep_attrs' not in k} - linreg_kwargs['min_periods'] = min_periods + linreg_kwargs = { + k: v for k, v in op_kwargs.items() if "keep_attrs" not in k + } + linreg_kwargs["min_periods"] = min_periods # unwrap DatasetRolling object and select years subset dsr_construct = ds_rolling.construct(window_dim="window", keep_attrs=True) @@ -396,10 +426,17 @@ def _ulinregress(x, y, **kwargs): # construct array to use years as x values (==regressors) in xr.apply_ufunc years_as_x_values = xr.DataArray( - np.arange(dsr_construct.window.size).repeat(dsr_construct.year.size).reshape( - dsr_construct.window.size, dsr_construct.year.size) + dsr_construct.year.values - window + 1, - dims=['window', 'year'], - coords={'window': dsr_construct.window.values, 'year': dsr_construct.year.values} + np.arange(dsr_construct.window.size) + .repeat(dsr_construct.year.size) + .reshape(dsr_construct.window.size, dsr_construct.year.size) + + dsr_construct.year.values + - window + + 1, + dims=["window", "year"], + coords={ + "window": dsr_construct.window.values, + "year": dsr_construct.year.values, + }, ) # apply linregress along windows @@ -417,7 +454,14 @@ def _ulinregress(x, y, **kwargs): kwargs=linreg_kwargs, ) # label new coords - ds_rolling.coords['linreg_param'] = ['slope', 'intercept', 'rvalue', 'pvalue', 'stderr', 'intercept_stderr'] + ds_rolling.coords["linreg_param"] = [ + "slope", + "intercept", + "rvalue", + "pvalue", + "stderr", + "intercept_stderr", + ] else: raise ValueError(f"Operation '{op}' not implemented.") @@ -464,19 +508,27 @@ def _ulinregress(x, y, **kwargs): # update data_vars names, attrs, history if rename_variables: - ds_rolling = ds_rolling.rename_vars({vv: f"{vv}_clim_{op}" for vv in ds_rolling.data_vars}) + ds_rolling = ds_rolling.rename_vars( + {vv: f"{vv}_clim_{op}" for vv in ds_rolling.data_vars} + ) for vv in ds_rolling.data_vars: for a in ["description", "long_name"]: try: - op_format = (dict.fromkeys(("mean", "std", "var", "sum"), "adj") | - dict.fromkeys(("max", "min"), "noun")) - operation = xc.core.formatting.default_formatter.format_field(op, op_format[op]) + op_format = dict.fromkeys( + ("mean", "std", "var", "sum"), "adj" + ) | dict.fromkeys(("max", "min"), "noun") + operation = 
xc.core.formatting.default_formatter.format_field( + op, op_format[op] + ) except (ValueError, KeyError): operation = op update_attr( - ds_rolling[vv], a, _("Climatological {window}-year {operation} of {attr}."), - window=window, operation=operation + ds_rolling[vv], + a, + _("Climatological {window}-year {operation} of {attr}."), + window=window, + operation=operation, ) new_history = ( @@ -497,30 +549,35 @@ def _ulinregress(x, y, **kwargs): if periods_as_dim: # restructure output to have periods as a dimension instead of stacked horizons per year/season/month new_coords = { - 1: {'year': ['ANN']}, - 4: {'season': ['MAM', 'JJA', 'SON', 'DJF']}, - 12: {'month': calendar.month_abbr[1:]}, + 1: {"year": ["ANN"]}, + 4: {"season": ["MAM", "JJA", "SON", "DJF"]}, + 12: {"month": calendar.month_abbr[1:]}, } new_dim_size = len(np.unique(ds_rolling.time.dt.month)) mindex_coords = xr.Coordinates.from_pandas_multiindex( - pd.MultiIndex.from_arrays( - [ds_rolling.horizon.values, ds_rolling.time.dt.month.values], - names=["period", "months"] - ), dim='time') - out = (ds_rolling - .assign(tmp_time=xr.DataArray(ds_rolling.time.values.copy(), dims=['time'])) - .assign_coords(mindex_coords) - .unstack('time') - .rename({'months': next(iter(new_coords[new_dim_size]))}) - .assign_coords(new_coords[new_dim_size]) - .drop_vars('horizon') - .set_coords('tmp_time') - .rename({'tmp_time': 'time'}) - ) + pd.MultiIndex.from_arrays( + [ds_rolling.horizon.values, ds_rolling.time.dt.month.values], + names=["period", "months"], + ), + dim="time", + ) + out = ( + ds_rolling.assign( + tmp_time=xr.DataArray(ds_rolling.time.values.copy(), dims=["time"]) + ) + .assign_coords(mindex_coords) + .unstack("time") + .rename({"months": next(iter(new_coords[new_dim_size]))}) + .assign_coords(new_coords[new_dim_size]) + .drop_vars("horizon") + .set_coords("tmp_time") + .rename({"tmp_time": "time"}) + ) return out else: return ds_rolling + @parse_config def compute_deltas( ds: xr.Dataset,
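A usage sketch for the new entry point, mirroring the calls made in tests/test_aggregate.py above. It assumes an xscen build that includes this patch; the synthetic monthly dataset and its "units"/"description" attributes are stand-ins for the timeseries fixture used by the tests.

import numpy as np
import pandas as pd
import xarray as xr
import xscen as xs

# Monthly values cycling 1..12 over 2001-2030, so each month's climatological
# mean equals its month index (same idea as the test fixture).
time = pd.date_range("2001-01-01", "2030-12-01", freq="MS")
tas = xr.DataArray(
    np.tile(np.arange(1.0, 13.0), 30),
    dims="time",
    coords={"time": time},
    # 'description' is read by climatological_op when screening for standard
    # deviations and when rewriting the output metadata.
    attrs={"units": "K", "description": "Mean temperature"},
)
ds = xr.Dataset({"tas": tas})

# Defaults: one 30-year climatology per month, renamed to 'tas_clim_mean',
# with horizon "2001-2030" and cat:processing_level set to "climatology".
clim = xs.climatological_op(ds, op="mean")

# Overlapping 15-year windows every 5 years; a float min_periods means
# "at least that fraction of the window must contain data" (here, 7 years).
clim_win = xs.climatological_op(ds, op="mean", window=15, stride=5, min_periods=0.5)

# Same computation reshaped to 'period' x 'month' dimensions instead of
# horizons stacked along 'time'.
clim_dim = xs.climatological_op(ds, op="mean", window=10, stride=5, periods_as_dim=True)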
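The 'linregress' branch above exposes the six scipy.stats.linregress statistics along a new 'linreg_param' coordinate, with the years of each window used as regressors. A sketch of how that output could be consumed, using a monthly dataset like the synthetic ds above; the dict form of op, which forwards extra keyword arguments, follows the unpacking logic at the top of climatological_op.

# Regression over 15-year windows every 5 years: 'tas_clim_linregress' gains a
# 'linreg_param' dimension with slope, intercept, rvalue, pvalue, stderr and
# intercept_stderr.
trend = xs.climatological_op(ds, op="linregress", window=15, stride=5)
slope = trend.tas_clim_linregress.sel(linreg_param="slope")
significant = trend.tas_clim_linregress.sel(linreg_param="pvalue") < 0.05

# op given as a dict forwards its keyword arguments; here 'alternative' reaches
# scipy.stats.linregress through _ulinregress for a one-sided test.
trend_1s = xs.climatological_op(
    ds, op={"linregress": {"alternative": "greater"}}, window=15, stride=5
)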