diff --git a/setup.py b/setup.py index 6a7a5111..92b18e3c 100644 --- a/setup.py +++ b/setup.py @@ -30,6 +30,7 @@ include_package_data = True, packages=setuptools.find_packages(where="src"), python_requires=">=3.8", + package_data={"pypromice.qc.percentiles": ["thresholds.csv"]}, install_requires=['numpy>=1.23.0', 'pandas>=1.5.0', 'xarray>=2022.6.0', 'toml', 'scipy>=1.9.0', 'scikit-learn>=1.1.0', 'Bottleneck', 'netcdf4', 'pyDataverse'], entry_points={ 'console_scripts': [ diff --git a/src/pypromice/process/L1toL2.py b/src/pypromice/process/L1toL2.py index 7c66d834..d5297caa 100644 --- a/src/pypromice/process/L1toL2.py +++ b/src/pypromice/process/L1toL2.py @@ -5,17 +5,21 @@ import logging import numpy as np -import urllib.request -from urllib.error import HTTPError, URLError import pandas as pd -import os import xarray as xr +from pypromice.qc.github_data_issues import flagNAN, adjustTime, adjustData +from pypromice.qc.percentiles.outlier_detector import ThresholdBasedOutlierDetector from pypromice.qc.persistence import persistence_qc from pypromice.process.value_clipping import clip_values +__all__ = [ + "toL2", +] + logger = logging.getLogger(__name__) + def toL2( L1: xr.Dataset, vars_df: pd.DataFrame, @@ -61,8 +65,12 @@ def toL2( ds = adjustData(ds) # Adjust data after a user-defined csv files except Exception: logger.exception('Flagging and fixing failed:') + if ds.attrs['format'] == 'TX': - ds = persistence_qc(ds) # Detect and filter data points that seems to be static + ds = persistence_qc(ds) # Flag and remove persistence outliers + # TODO: The configuration should be provided explicitly + outlier_detector = ThresholdBasedOutlierDetector.default() + ds = outlier_detector.filter_data(ds) # Flag and remove percentile outliers T_100 = _getTempK(T_0) ds['rh_u_cor'] = correctHumidity(ds['rh_u'], ds['t_u'], @@ -167,291 +175,6 @@ def toL2( ds = clip_values(ds, vars_df) return ds -def flagNAN(ds_in, - flag_url='https://raw.githubusercontent.com/GEUS-Glaciology-and-Climate/PROMICE-AWS-data-issues/master/flags/', - flag_dir='local/flags/'): - '''Read flagged data from .csv file. For each variable, and downstream - dependents, flag as invalid (or other) if set in the flag .csv - - Parameters - ---------- - ds_in : xr.Dataset - Level 0 dataset - flag_url : str - URL to directory where .csv flag files can be found - flag_dir : str - File directory where .csv flag files can be found - - Returns - ------- - ds : xr.Dataset - Level 0 data with flagged data - ''' - ds = ds_in.copy() - df = None - - df = _getDF(flag_url + ds.attrs["station_id"] + ".csv", - os.path.join(flag_dir, ds.attrs["station_id"] + ".csv"), - # download = False, # only for working on draft local flag'n'fix files - ) - - if isinstance(df, pd.DataFrame): - df.t0 = pd.to_datetime(df.t0).dt.tz_localize(None) - df.t1 = pd.to_datetime(df.t1).dt.tz_localize(None) - - if df.shape[0] > 0: - for i in df.index: - t0, t1, avar = df.loc[i,['t0','t1','variable']] - - if avar == '*': - # Set to all vars if var is "*" - varlist = list(ds.keys()) - elif '*' in avar: - # Reads as regex if contains "*" and other characters (e.g. 
't_i_.*($)') - varlist = pd.DataFrame(columns = list(ds.keys())).filter(regex=(avar)).columns - else: - varlist = avar.split() - - if 'time' in varlist: varlist.remove("time") - - # Set to all times if times are "n/a" - if pd.isnull(t0): - t0 = ds['time'].values[0] - if pd.isnull(t1): - t1 = ds['time'].values[-1] - - for v in varlist: - if v in list(ds.keys()): - logger.info(f'---> flagging {t0} {t1} {v}') - ds[v] = ds[v].where((ds['time'] < t0) | (ds['time'] > t1)) - else: - logger.info(f'---> could not flag {v} not in dataset') - - return ds - - -def adjustTime(ds, - adj_url="https://raw.githubusercontent.com/GEUS-Glaciology-and-Climate/PROMICE-AWS-data-issues/master/adjustments/", - adj_dir='local/adjustments/', - var_list=[], skip_var=[]): - '''Read adjustment data from .csv file. Only applies the "time_shift" adjustment - - Parameters - ---------- - ds : xr.Dataset - Level 0 dataset - adj_url : str - URL to directory where .csv adjustment files can be found - adj_dir : str - File directory where .csv adjustment files can be found - - Returns - ------- - ds : xr.Dataset - Level 0 data with flagged data - ''' - ds_out = ds.copy() - adj_info=None - - adj_info = _getDF(adj_url + ds.attrs["station_id"] + ".csv", - os.path.join(adj_dir, ds.attrs["station_id"] + ".csv"),) - - if isinstance(adj_info, pd.DataFrame): - - - if "time_shift" in adj_info.adjust_function.values: - time_shifts = adj_info.loc[adj_info.adjust_function == "time_shift", :] - # if t1 is left empty, then adjustment is applied until the end of the file - time_shifts.loc[time_shifts.t1.isnull(), "t1"] = pd.to_datetime(ds_out.time.values[-1]).isoformat() - time_shifts.t0 = pd.to_datetime(time_shifts.t0).dt.tz_localize(None) - time_shifts.t1 = pd.to_datetime(time_shifts.t1).dt.tz_localize(None) - - for t0, t1, val in zip( - time_shifts.t0, - time_shifts.t1, - time_shifts.adjust_value, - ): - ds_shifted = ds_out.sel(time=slice(t0,t1)) - ds_shifted['time'] = ds_shifted.time.values + pd.Timedelta(days = val) - - # here we concatenate what was before the shifted part, the shifted - # part and what was after the shifted part - # note that if any data was already present in the target period - # (where the data lands after the shift), it is overwritten - - ds_out = xr.concat( - ( - ds_out.sel(time=slice(pd.to_datetime(ds_out.time.values[0]), - t0 + pd.Timedelta(days = val))), - ds_shifted, - ds_out.sel(time=slice(t1 + pd.Timedelta(days = val), - pd.to_datetime(ds_out.time.values[-1]))) - ), - dim = 'time', - ) - if t0 > pd.Timestamp.now(): - ds_out = ds_out.sel(time=slice(pd.to_datetime(ds_out.time.values[0]), - t0)) - return ds_out - - -def adjustData(ds, - adj_url="https://raw.githubusercontent.com/GEUS-Glaciology-and-Climate/PROMICE-AWS-data-issues/master/adjustments/", - adj_dir='local/adjustments/', - var_list=[], skip_var=[]): - '''Read adjustment data from .csv file. 
For each variable, and downstream - dependents, adjust data accordingly if set in the adjustment .csv - - Parameters - ---------- - ds : xr.Dataset - Level 0 dataset - adj_url : str - URL to directory where .csv adjustment files can be found - adj_dir : str - File directory where .csv adjustment files can be found - - Returns - ------- - ds : xr.Dataset - Level 0 data with flagged data - ''' - ds_out = ds.copy() - adj_info=None - adj_info = _getDF(adj_url + ds.attrs["station_id"] + ".csv", - os.path.join(adj_dir, ds.attrs["station_id"] + ".csv"), - # download = False, # only for working on draft local flag'n'fix files - ) - - if isinstance(adj_info, pd.DataFrame): - # removing potential time shifts from the adjustment list - adj_info = adj_info.loc[adj_info.adjust_function != "time_shift", :] - - # if t1 is left empty, then adjustment is applied until the end of the file - adj_info.loc[adj_info.t0.isnull(), "t0"] = ds_out.time.values[0] - adj_info.loc[adj_info.t1.isnull(), "t1"] = ds_out.time.values[-1] - # making all timestamps timezone naive (compatibility with xarray) - adj_info.t0 = pd.to_datetime(adj_info.t0).dt.tz_localize(None) - adj_info.t1 = pd.to_datetime(adj_info.t1).dt.tz_localize(None) - - # if "*" is in the variable name then we interpret it as regex - selec = adj_info['variable'].str.contains('\*') & (adj_info['variable'] != "*") - for ind in adj_info.loc[selec, :].index: - line_template = adj_info.loc[ind, :].copy() - regex = adj_info.loc[ind, 'variable'] - for var in pd.DataFrame(columns = list(ds.keys())).filter(regex=regex).columns: - line_template.variable = var - line_template.name = adj_info.index.max() + 1 - adj_info = pd.concat((adj_info, line_template.to_frame().transpose()),axis=0) - adj_info = adj_info.drop(labels=ind, axis=0) - - adj_info = adj_info.sort_values(by=["variable", "t0"]) - adj_info.set_index(["variable", "t0"], drop=False, inplace=True) - - if len(var_list) == 0: - var_list = np.unique(adj_info.variable) - else: - adj_info = adj_info.loc[np.isin(adj_info.variable, var_list), :] - var_list = np.unique(adj_info.variable) - - if len(skip_var) > 0: - adj_info = adj_info.loc[~np.isin(adj_info.variable, skip_var), :] - var_list = np.unique(adj_info.variable) - - for var in var_list: - if var not in list(ds_out.keys()): - logger.info(f'could not adjust {var } not in dataset') - continue - for t0, t1, func, val in zip( - adj_info.loc[var].t0, - adj_info.loc[var].t1, - adj_info.loc[var].adjust_function, - adj_info.loc[var].adjust_value, - ): - if (t0 > pd.to_datetime(ds_out.time.values[-1])) | (t1 < pd.to_datetime(ds_out.time.values[0])): - continue - logger.info(f'---> {t0} {t1} {var} {func} {val}') - if func == "add": - ds_out[var].loc[dict(time=slice(t0, t1))] = ds_out[var].loc[dict(time=slice(t0, t1))].values + val - # flagging adjusted values - # if var + "_adj_flag" not in ds_out.columns: - # ds_out[var + "_adj_flag"] = 0 - # msk = ds_out[var].loc[dict(time=slice(t0, t1))])].notnull() - # ind = ds_out[var].loc[dict(time=slice(t0, t1))])].loc[msk].time - # ds_out.loc[ind, var + "_adj_flag"] = 1 - - if func == "multiply": - ds_out[var].loc[dict(time=slice(t0, t1))] = ds_out[var].loc[dict(time=slice(t0, t1))].values * val - if "DW" in var: - ds_out[var].loc[dict(time=slice(t0, t1))] = ds_out[var].loc[dict(time=slice(t0, t1))] % 360 - # flagging adjusted values - # if var + "_adj_flag" not in ds_out.columns: - # ds_out[var + "_adj_flag"] = 0 - # msk = ds_out[var].loc[dict(time=slice(t0, t1))].notnull() - # ind = ds_out[var].loc[dict(time=slice(t0, 
t1))].loc[msk].time - # ds_out.loc[ind, var + "_adj_flag"] = 1 - - if func == "min_filter": - tmp = ds_out[var].loc[dict(time=slice(t0, t1))].values - tmp[tmp < val] = np.nan - - if func == "max_filter": - tmp = ds_out[var].loc[dict(time=slice(t0, t1))].values - tmp[tmp > val] = np.nan - ds_out[var].loc[dict(time=slice(t0, t1))] = tmp - - if func == "upper_perc_filter": - tmp = ds_out[var].loc[dict(time=slice(t0, t1))].copy() - df_w = ds_out[var].loc[dict(time=slice(t0, t1))].resample("14D").quantile(1 - val / 100) - df_w = ds_out[var].loc[dict(time=slice(t0, t1))].resample("14D").var() - for m_start, m_end in zip(df_w.time[:-2], df_w.time[1:]): - msk = (tmp.time >= m_start) & (tmp.time < m_end) - values_month = tmp.loc[msk].values - values_month[values_month < df_w.loc[m_start]] = np.nan - tmp.loc[msk] = values_month - - ds_out[var].loc[dict(time=slice(t0, t1))] = tmp.values - - if func == "biweekly_upper_range_filter": - tmp = ds_out[var].loc[dict(time=slice(t0, t1))].copy() - df_max = ds_out[var].loc[dict(time=slice(t0, t1))].resample("14D").max() - for m_start, m_end in zip(df_max.time[:-2], df_max.time[1:]): - msk = (tmp.time >= m_start) & (tmp.time < m_end) - lim = df_max.loc[m_start] - val - values_month = tmp.loc[msk].values - values_month[values_month < lim] = np.nan - tmp.loc[msk] = values_month - # remaining samples following outside of the last 2 weeks window - msk = tmp.time >= m_end - lim = df_max.loc[m_start] - val - values_month = tmp.loc[msk].values - values_month[values_month < lim] = np.nan - tmp.loc[msk] = values_month - # updating original pandas - ds_out[var].loc[dict(time=slice(t0, t1))] = tmp.values - - if func == "hampel_filter": - tmp = ds_out[var].loc[dict(time=slice(t0, t1))] - tmp = _hampel(tmp, k=7 * 24, t0=val) - ds_out[var].loc[dict(time=slice(t0, t1))] = tmp.values - - if func == "grad_filter": - tmp = ds_out[var].loc[dict(time=slice(t0, t1))].copy() - msk = ds_out[var].loc[dict(time=slice(t0, t1))].copy().diff() - tmp[np.roll(msk.abs() > val, -1)] = np.nan - ds_out[var].loc[dict(time=slice(t0, t1))] = tmp - - if "swap_with_" in func: - var2 = func[10:] - val_var = ds_out[var].loc[dict(time=slice(t0, t1))].values.copy() - val_var2 = ds_out[var2].loc[dict(time=slice(t0, t1))].values.copy() - ds_out[var2].loc[dict(time=slice(t0, t1))] = val_var - ds_out[var].loc[dict(time=slice(t0, t1))] = val_var2 - - if func == "rotate": - ds_out[var].loc[dict(time=slice(t0, t1))] = (ds_out[var].loc[dict(time=slice(t0, t1))].values + val) % 360 - - return ds_out def calcCloudCoverage(T, T_0, eps_overcast, eps_clear, dlr, station_id): '''Calculate cloud cover from T and T_0 @@ -493,6 +216,7 @@ def calcCloudCoverage(T, T_0, eps_overcast, eps_clear, dlr, station_id): cc[cc < 0] = 0 return cc + def calcSurfaceTemperature(T_0, ulr, dlr, emissivity): '''Calculate surface temperature from air temperature, upwelling and downwelling radiation and emissivity @@ -516,6 +240,7 @@ def calcSurfaceTemperature(T_0, ulr, dlr, emissivity): t_surf = ((ulr - (1 - emissivity) * dlr) / emissivity / 5.67e-8)**0.25 - T_0 return t_surf + def calcTilt(tilt_x, tilt_y, deg2rad): '''Calculate station tilt @@ -557,6 +282,7 @@ def calcTilt(tilt_x, tilt_y, deg2rad): # theta_sensor_deg = theta_sensor_rad * rad2deg return phi_sensor_rad, theta_sensor_rad + def correctHumidity(rh, T, T_0, T_100, ews, ei0): #TODO figure out if T replicate is needed '''Correct relative humidity using Groff & Gratch method, where values are set when freezing and remain the original values when not freezing @@ -599,6 +325,7 @@ 
def correctHumidity(rh, T, T_0, T_100, ews, ei0): #TODO f rh_cor = rh.where(~freezing, other = rh*(e_s_wtr / e_s_ice)) return rh_cor + def correctPrecip(precip, wspd): '''Correct precipitation with the undercatch correction method used in Yang et al. (1999) and Box et al. (2022), based on Goodison et al. (1998) @@ -654,6 +381,7 @@ def correctPrecip(precip, wspd): return precip_cor, precip_rate + def calcDeclination(doy, hour, minute): '''Calculate sun declination based on time @@ -702,6 +430,7 @@ def calcHourAngle(hour, minute, lon): return 2 * np.pi * (((hour + minute / 60) / 24 - 0.5) - lon/360) # ; - 15.*timezone/360.) + def calcDirectionDeg(HourAngle_rad): #TODO remove if not plan to use this '''Calculate sun direction as degrees. This is an alternative to _calcHourAngle that is currently not implemented into the offical L0>>L3 @@ -754,6 +483,7 @@ def calcZenith(lat, Declination_rad, HourAngle_rad, deg2rad, rad2deg): ZenithAngle_deg = ZenithAngle_rad * rad2deg return ZenithAngle_rad, ZenithAngle_deg + def calcAngleDiff(ZenithAngle_rad, HourAngle_rad, phi_sensor_rad, theta_sensor_rad): '''Calculate angle between sun and upper sensor (to determine when sun is @@ -822,6 +552,7 @@ def calcAlbedo(usr, dsr_cor, AngleDif_deg, ZenithAngle_deg): albedo = albedo.ffill(dim='time').bfill(dim='time') #TODO remove this line and one above? return albedo, OKalbedos + def calcTOA(ZenithAngle_deg, ZenithAngle_rad): '''Calculate incoming shortwave radiation at the top of the atmosphere, accounting for sunset periods @@ -912,75 +643,6 @@ def calcCorrectionFactor(Declination_rad, phi_sensor_rad, theta_sensor_rad, return CorFac_all -def _getDF(flag_url, flag_file, download=True): - '''Get dataframe from flag or adjust file. First attempt to retrieve from - URL. If this fails then attempt to retrieve from local file - - Parameters - ---------- - flag_url : str - URL address to file - flag_file : str - Local path to file - download : bool - Flag to download file from URL - - Returns - ------- - df : pd.DataFrame - Flag or adjustment dataframe - ''' - - # Download local copy as csv - if download==True: - os.makedirs(os.path.dirname(flag_file), exist_ok = True) - - try: - urllib.request.urlretrieve(flag_url, flag_file) - logger.info(f"Downloaded a {flag_file.split('/')[-2][:-1],} file to {flag_file}") - except (HTTPError, URLError) as e: - logger.info(f"Unable to download {flag_file.split('/')[-2][:-1],} file, using local file: {flag_file}") - else: - logger.info(f"Using local {flag_file.split('/')[-2][:-1],} file: {flag_file}") - - if os.path.isfile(flag_file): - df = pd.read_csv( - flag_file, - comment="#", - skipinitialspace=True, - ).dropna(how='all', axis='rows') - else: - df=None - logger.info(f"No {flag_file.split('/')[-2][:-1]} file to read.") - return df - - -def _hampel(vals_orig, k=7*24, t0=3): - '''Hampel filter - - Parameters - ---------- - vals : pd.DataSeries - Series of values from which to remove outliers - k : int - Size of window, including the sample. For example, 7 is equal to 3 on - either side of value. The default is 7*24. - t0 : int - Threshold value. The default is 3. 
- ''' - #Make copy so original not edited - vals=vals_orig.copy() - - #Hampel Filter - L= 1.4826 - rolling_median=vals.rolling(k).median() - difference=np.abs(rolling_median-vals) - median_abs_deviation=difference.rolling(k).median() - threshold= t0 *L * median_abs_deviation - outlier_idx=difference>threshold - outlier_idx[0:round(k/2)]=False - vals.loc[outlier_idx]=np.nan - return(vals) def _checkSunPos(ds, OKalbedos, sundown, sunonlowerdome, TOA_crit_nopass): '''Check sun position @@ -1025,12 +687,14 @@ def _getTempK(T_0): # Steam point temperature in K''' return T_0+100 + def _getRotation(): #TODO same as L2toL3._getRotation() '''Return degrees-to-radians and radians-to-degrees''' deg2rad = np.pi / 180 rad2deg = 1 / deg2rad return deg2rad, rad2deg + if __name__ == "__main__": # unittest.main() pass diff --git a/src/pypromice/qc/github_data_issues.py b/src/pypromice/qc/github_data_issues.py new file mode 100644 index 00000000..8d963727 --- /dev/null +++ b/src/pypromice/qc/github_data_issues.py @@ -0,0 +1,374 @@ +import logging +import os +import urllib.request +from urllib.error import HTTPError, URLError + +import numpy as np +import pandas as pd +import xarray as xr + +__all__ = [ + 'flagNAN', + 'adjustTime', + 'adjustData', +] + +logger = logging.getLogger(__name__) + + +def flagNAN(ds_in, + flag_url='https://raw.githubusercontent.com/GEUS-Glaciology-and-Climate/PROMICE-AWS-data-issues/master/flags/', + flag_dir='local/flags/'): + '''Read flagged data from .csv file. For each variable, and downstream + dependents, flag as invalid (or other) if set in the flag .csv + + Parameters + ---------- + ds_in : xr.Dataset + Level 0 dataset + flag_url : str + URL to directory where .csv flag files can be found + flag_dir : str + File directory where .csv flag files can be found + + Returns + ------- + ds : xr.Dataset + Level 0 data with flagged data + ''' + ds = ds_in.copy() + df = None + + df = _getDF(flag_url + ds.attrs["station_id"] + ".csv", + os.path.join(flag_dir, ds.attrs["station_id"] + ".csv"), + # download = False, # only for working on draft local flag'n'fix files + ) + + if isinstance(df, pd.DataFrame): + df.t0 = pd.to_datetime(df.t0).dt.tz_localize(None) + df.t1 = pd.to_datetime(df.t1).dt.tz_localize(None) + + if df.shape[0] > 0: + for i in df.index: + t0, t1, avar = df.loc[i,['t0','t1','variable']] + + if avar == '*': + # Set to all vars if var is "*" + varlist = list(ds.keys()) + elif '*' in avar: + # Reads as regex if contains "*" and other characters (e.g. 't_i_.*($)') + varlist = pd.DataFrame(columns = list(ds.keys())).filter(regex=(avar)).columns + else: + varlist = avar.split() + + if 'time' in varlist: varlist.remove("time") + + # Set to all times if times are "n/a" + if pd.isnull(t0): + t0 = ds['time'].values[0] + if pd.isnull(t1): + t1 = ds['time'].values[-1] + + for v in varlist: + if v in list(ds.keys()): + logger.info(f'---> flagging {t0} {t1} {v}') + ds[v] = ds[v].where((ds['time'] < t0) | (ds['time'] > t1)) + else: + logger.info(f'---> could not flag {v} not in dataset') + + return ds + + +def adjustTime(ds, + adj_url="https://raw.githubusercontent.com/GEUS-Glaciology-and-Climate/PROMICE-AWS-data-issues/master/adjustments/", + adj_dir='local/adjustments/', + var_list=[], skip_var=[]): + '''Read adjustment data from .csv file. 
Only applies the "time_shift" adjustment + + Parameters + ---------- + ds : xr.Dataset + Level 0 dataset + adj_url : str + URL to directory where .csv adjustment files can be found + adj_dir : str + File directory where .csv adjustment files can be found + + Returns + ------- + ds : xr.Dataset + Level 0 data with flagged data + ''' + ds_out = ds.copy() + adj_info=None + + adj_info = _getDF(adj_url + ds.attrs["station_id"] + ".csv", + os.path.join(adj_dir, ds.attrs["station_id"] + ".csv"),) + + if isinstance(adj_info, pd.DataFrame): + + + if "time_shift" in adj_info.adjust_function.values: + time_shifts = adj_info.loc[adj_info.adjust_function == "time_shift", :] + # if t1 is left empty, then adjustment is applied until the end of the file + time_shifts.loc[time_shifts.t1.isnull(), "t1"] = pd.to_datetime(ds_out.time.values[-1]).isoformat() + time_shifts.t0 = pd.to_datetime(time_shifts.t0).dt.tz_localize(None) + time_shifts.t1 = pd.to_datetime(time_shifts.t1).dt.tz_localize(None) + + for t0, t1, val in zip( + time_shifts.t0, + time_shifts.t1, + time_shifts.adjust_value, + ): + ds_shifted = ds_out.sel(time=slice(t0,t1)) + ds_shifted['time'] = ds_shifted.time.values + pd.Timedelta(days = val) + + # here we concatenate what was before the shifted part, the shifted + # part and what was after the shifted part + # note that if any data was already present in the target period + # (where the data lands after the shift), it is overwritten + + ds_out = xr.concat( + ( + ds_out.sel(time=slice(pd.to_datetime(ds_out.time.values[0]), + t0 + pd.Timedelta(days = val))), + ds_shifted, + ds_out.sel(time=slice(t1 + pd.Timedelta(days = val), + pd.to_datetime(ds_out.time.values[-1]))) + ), + dim = 'time', + ) + if t0 > pd.Timestamp.now(): + ds_out = ds_out.sel(time=slice(pd.to_datetime(ds_out.time.values[0]), + t0)) + return ds_out + + +def adjustData(ds, + adj_url="https://raw.githubusercontent.com/GEUS-Glaciology-and-Climate/PROMICE-AWS-data-issues/master/adjustments/", + adj_dir='local/adjustments/', + var_list=[], skip_var=[]): + '''Read adjustment data from .csv file. 
For each variable, and downstream + dependents, adjust data accordingly if set in the adjustment .csv + + Parameters + ---------- + ds : xr.Dataset + Level 0 dataset + adj_url : str + URL to directory where .csv adjustment files can be found + adj_dir : str + File directory where .csv adjustment files can be found + + Returns + ------- + ds : xr.Dataset + Level 0 data with flagged data + ''' + ds_out = ds.copy() + adj_info=None + adj_info = _getDF(adj_url + ds.attrs["station_id"] + ".csv", + os.path.join(adj_dir, ds.attrs["station_id"] + ".csv"), + # download = False, # only for working on draft local flag'n'fix files + ) + + if isinstance(adj_info, pd.DataFrame): + # removing potential time shifts from the adjustment list + adj_info = adj_info.loc[adj_info.adjust_function != "time_shift", :] + + # if t1 is left empty, then adjustment is applied until the end of the file + adj_info.loc[adj_info.t0.isnull(), "t0"] = ds_out.time.values[0] + adj_info.loc[adj_info.t1.isnull(), "t1"] = ds_out.time.values[-1] + # making all timestamps timezone naive (compatibility with xarray) + adj_info.t0 = pd.to_datetime(adj_info.t0).dt.tz_localize(None) + adj_info.t1 = pd.to_datetime(adj_info.t1).dt.tz_localize(None) + + # if "*" is in the variable name then we interpret it as regex + selec = adj_info['variable'].str.contains('\*') & (adj_info['variable'] != "*") + for ind in adj_info.loc[selec, :].index: + line_template = adj_info.loc[ind, :].copy() + regex = adj_info.loc[ind, 'variable'] + for var in pd.DataFrame(columns = list(ds.keys())).filter(regex=regex).columns: + line_template.variable = var + line_template.name = adj_info.index.max() + 1 + adj_info = pd.concat((adj_info, line_template.to_frame().transpose()),axis=0) + adj_info = adj_info.drop(labels=ind, axis=0) + + adj_info = adj_info.sort_values(by=["variable", "t0"]) + adj_info.set_index(["variable", "t0"], drop=False, inplace=True) + + if len(var_list) == 0: + var_list = np.unique(adj_info.variable) + else: + adj_info = adj_info.loc[np.isin(adj_info.variable, var_list), :] + var_list = np.unique(adj_info.variable) + + if len(skip_var) > 0: + adj_info = adj_info.loc[~np.isin(adj_info.variable, skip_var), :] + var_list = np.unique(adj_info.variable) + + for var in var_list: + if var not in list(ds_out.keys()): + logger.info(f'could not adjust {var } not in dataset') + continue + for t0, t1, func, val in zip( + adj_info.loc[var].t0, + adj_info.loc[var].t1, + adj_info.loc[var].adjust_function, + adj_info.loc[var].adjust_value, + ): + if (t0 > pd.to_datetime(ds_out.time.values[-1])) | (t1 < pd.to_datetime(ds_out.time.values[0])): + continue + logger.info(f'---> {t0} {t1} {var} {func} {val}') + if func == "add": + ds_out[var].loc[dict(time=slice(t0, t1))] = ds_out[var].loc[dict(time=slice(t0, t1))].values + val + # flagging adjusted values + # if var + "_adj_flag" not in ds_out.columns: + # ds_out[var + "_adj_flag"] = 0 + # msk = ds_out[var].loc[dict(time=slice(t0, t1))])].notnull() + # ind = ds_out[var].loc[dict(time=slice(t0, t1))])].loc[msk].time + # ds_out.loc[ind, var + "_adj_flag"] = 1 + + if func == "multiply": + ds_out[var].loc[dict(time=slice(t0, t1))] = ds_out[var].loc[dict(time=slice(t0, t1))].values * val + if "DW" in var: + ds_out[var].loc[dict(time=slice(t0, t1))] = ds_out[var].loc[dict(time=slice(t0, t1))] % 360 + # flagging adjusted values + # if var + "_adj_flag" not in ds_out.columns: + # ds_out[var + "_adj_flag"] = 0 + # msk = ds_out[var].loc[dict(time=slice(t0, t1))].notnull() + # ind = ds_out[var].loc[dict(time=slice(t0, 
t1))].loc[msk].time + # ds_out.loc[ind, var + "_adj_flag"] = 1 + + if func == "min_filter": + tmp = ds_out[var].loc[dict(time=slice(t0, t1))].values + tmp[tmp < val] = np.nan + + if func == "max_filter": + tmp = ds_out[var].loc[dict(time=slice(t0, t1))].values + tmp[tmp > val] = np.nan + ds_out[var].loc[dict(time=slice(t0, t1))] = tmp + + if func == "upper_perc_filter": + tmp = ds_out[var].loc[dict(time=slice(t0, t1))].copy() + df_w = ds_out[var].loc[dict(time=slice(t0, t1))].resample("14D").quantile(1 - val / 100) + df_w = ds_out[var].loc[dict(time=slice(t0, t1))].resample("14D").var() + for m_start, m_end in zip(df_w.time[:-2], df_w.time[1:]): + msk = (tmp.time >= m_start) & (tmp.time < m_end) + values_month = tmp.loc[msk].values + values_month[values_month < df_w.loc[m_start]] = np.nan + tmp.loc[msk] = values_month + + ds_out[var].loc[dict(time=slice(t0, t1))] = tmp.values + + if func == "biweekly_upper_range_filter": + tmp = ds_out[var].loc[dict(time=slice(t0, t1))].copy() + df_max = ds_out[var].loc[dict(time=slice(t0, t1))].resample("14D").max() + for m_start, m_end in zip(df_max.time[:-2], df_max.time[1:]): + msk = (tmp.time >= m_start) & (tmp.time < m_end) + lim = df_max.loc[m_start] - val + values_month = tmp.loc[msk].values + values_month[values_month < lim] = np.nan + tmp.loc[msk] = values_month + # remaining samples following outside of the last 2 weeks window + msk = tmp.time >= m_end + lim = df_max.loc[m_start] - val + values_month = tmp.loc[msk].values + values_month[values_month < lim] = np.nan + tmp.loc[msk] = values_month + # updating original pandas + ds_out[var].loc[dict(time=slice(t0, t1))] = tmp.values + + if func == "hampel_filter": + tmp = ds_out[var].loc[dict(time=slice(t0, t1))] + tmp = _hampel(tmp, k=7 * 24, t0=val) + ds_out[var].loc[dict(time=slice(t0, t1))] = tmp.values + + if func == "grad_filter": + tmp = ds_out[var].loc[dict(time=slice(t0, t1))].copy() + msk = ds_out[var].loc[dict(time=slice(t0, t1))].copy().diff() + tmp[np.roll(msk.abs() > val, -1)] = np.nan + ds_out[var].loc[dict(time=slice(t0, t1))] = tmp + + if "swap_with_" in func: + var2 = func[10:] + val_var = ds_out[var].loc[dict(time=slice(t0, t1))].values.copy() + val_var2 = ds_out[var2].loc[dict(time=slice(t0, t1))].values.copy() + ds_out[var2].loc[dict(time=slice(t0, t1))] = val_var + ds_out[var].loc[dict(time=slice(t0, t1))] = val_var2 + + if func == "rotate": + ds_out[var].loc[dict(time=slice(t0, t1))] = (ds_out[var].loc[dict(time=slice(t0, t1))].values + val) % 360 + + return ds_out + + +def _getDF(flag_url, flag_file, download=True): + '''Get dataframe from flag or adjust file. First attempt to retrieve from + URL. 
If this fails then attempt to retrieve from local file + + Parameters + ---------- + flag_url : str + URL address to file + flag_file : str + Local path to file + download : bool + Flag to download file from URL + + Returns + ------- + df : pd.DataFrame + Flag or adjustment dataframe + ''' + + # Download local copy as csv + if download==True: + os.makedirs(os.path.dirname(flag_file), exist_ok = True) + + try: + urllib.request.urlretrieve(flag_url, flag_file) + logger.info(f"Downloaded a {flag_file.split('/')[-2][:-1],} file to {flag_file}") + except (HTTPError, URLError) as e: + logger.info(f"Unable to download {flag_file.split('/')[-2][:-1],} file, using local file: {flag_file}") + else: + logger.info(f"Using local {flag_file.split('/')[-2][:-1],} file: {flag_file}") + + if os.path.isfile(flag_file): + df = pd.read_csv( + flag_file, + comment="#", + skipinitialspace=True, + ).dropna(how='all', axis='rows') + else: + df=None + logger.info(f"No {flag_file.split('/')[-2][:-1]} file to read.") + return df + + +def _hampel(vals_orig, k=7*24, t0=3): + '''Hampel filter + + Parameters + ---------- + vals : pd.DataSeries + Series of values from which to remove outliers + k : int + Size of window, including the sample. For example, 7 is equal to 3 on + either side of value. The default is 7*24. + t0 : int + Threshold value. The default is 3. + ''' + #Make copy so original not edited + vals=vals_orig.copy() + + #Hampel Filter + L= 1.4826 + rolling_median=vals.rolling(k).median() + difference=np.abs(rolling_median-vals) + median_abs_deviation=difference.rolling(k).median() + threshold= t0 *L * median_abs_deviation + outlier_idx=difference>threshold + outlier_idx[0:round(k/2)]=False + vals.loc[outlier_idx]=np.nan + return(vals) diff --git a/src/pypromice/qc/percentiles/__init__.py b/src/pypromice/qc/percentiles/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/pypromice/qc/percentiles/compute_thresholds.py b/src/pypromice/qc/percentiles/compute_thresholds.py new file mode 100644 index 00000000..f7899472 --- /dev/null +++ b/src/pypromice/qc/percentiles/compute_thresholds.py @@ -0,0 +1,221 @@ +import sys +from datetime import datetime + +import pandas as pd + +from pypromice.process import AWS +from pathlib import Path +import logging +from pypromice.qc.github_data_issues import adjustTime, flagNAN, adjustData + + +# %% +logger = logging.getLogger("ComputeThreshold") + + +# %% +def compute_all_thresholds( + station_thresholds_root: Path, + thresholds_output_path: Path, + aws_l0_repo_path: Path, + start_time: datetime, + end_time: datetime, +): + logger.info("Computing all thresholds for stations available in the L0 repository") + logger.info(f"station_thresholds_root: {station_thresholds_root}") + logger.info(f"thresholds_output_path: {thresholds_output_path}") + logger.info(f"aws_l0_repo_path: {aws_l0_repo_path}") + logger.info(f"start_time: {start_time}") + logger.info(f"end_time: {end_time}") + + station_thresholds_root.mkdir(parents=True, exist_ok=True) + + # %% + output_paths = [] + for config_path in aws_l0_repo_path.glob("raw/config/*.toml"): + stid = config_path.stem + + logger.info(f"Processing {stid}") + data_path = aws_l0_repo_path.joinpath("raw", stid) + output_path = station_thresholds_root.joinpath(f"{stid}.csv") + try: + if not output_path.exists(): + threshold = find_thresholds( + stid, + config_path, + data_path, + start_time, + end_time, + ) + threshold.to_csv( + path_or_buf=output_path, index=False, float_format="{:.2f}".format + ) + 
output_paths.append(output_path) + except Exception: + logger.exception(f"Failed processing {stid}") + continue + + logger.info("Merge threshold files") + pd.concat(pd.read_csv(p) for p in output_paths).to_csv( + thresholds_output_path, index=False, float_format="{:.2f}".format + ) + + +def find_thresholds( + stid: str, + config_path: Path, + data_path: Path, + start_time: datetime, + end_time: datetime, +) -> pd.DataFrame: + """ + Compute variable threshold for a station using historical distribution quantiles. + + Parameters + ---------- + stid + config_path + data_path + start_time + end_time + + Returns + ------- + Upper and lower thresholds for a set of variables and seasons + + + """ + stid_logger = logger.getChild(stid) + # %% + + stid_logger.info("Read AWS data and get L1") + aws = AWS(config_file=config_path.as_posix(), inpath=data_path.as_posix()) + aws.getL1() + + # %% + stid_logger.info("Apply QC filters on data") + ds = aws.L1A.copy(deep=True) # Reassign dataset + ds = adjustTime(ds) # Adjust time after a user-defined csv files + ds = flagNAN(ds) # Flag NaNs after a user-defined csv files + ds = adjustData(ds) + + # %% + stid_logger.info("Determine thresholds") + df = ( + ds[["rh_u", "wspd_u", "p_u", "t_u"]] + .to_pandas() + .loc[start_time:end_time] + .assign(season=lambda df: (df.index.month // 3) % 4) + ) + + threshold_rows = [] + + # Pressure + p_lo, p_hi = df["p_u"].quantile([0.005, 0.995]) + [-12, 12] + threshold_rows.append( + dict( + stid=stid, + variable_pattern="p_[ul]", + lo=p_lo, + hi=p_hi, + ) + ) + threshold_rows.append( + dict( + stid=stid, + variable_pattern="p_i", + lo=p_lo - 1000, + hi=p_hi - 1000, + ) + ) + + # Wind speed + lo, hi = df["wspd_u"].quantile([0.005, 0.995]) + [-12, 12] + threshold_rows.append( + dict( + stid=stid, + variable_pattern="wspd_[uli]", + lo=lo, + hi=hi, + ) + ) + + # Temperature + season_map = ["winter", "spring", "summer", "fall"] + for season_index, season_df in df[["t_u", "season"]].groupby( + (df.index.month // 3) % 4 + ): + lo, hi = season_df.quantile([0.005, 0.995])["t_u"] + [-9, 9] + + threshold_rows.append( + dict( + stid=stid, + variable_pattern="t_[uli]", + season=season_map[season_index], + lo=lo, + hi=hi, + ) + ) + + threshold = pd.DataFrame(threshold_rows) + stid_logger.info(threshold) + return threshold + # %% + + +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser() + parser.add_argument( + "--l0", + required=True, + type=Path, + help="L0 repository root path", + ) + parser.add_argument( + "--thresholds_output_path", + "-o", + default=Path(__file__).parent.joinpath("thresholds.csv"), + type=Path, + help="Output csv file with thresholds for all stations", + ) + parser.add_argument( + "--station_thresholds_root", + "--str", + default=Path(__file__).parent.joinpath("station_thresholds"), + type=Path, + help="Directory containing threshold files for the individual stations", + ) + parser.add_argument( + "--start_time", + default="2000-01-01", + help="Start time for data series. Format: %Y-%m-%d", + ) + parser.add_argument( + "--end_time", + default="2023-10-01", + help="End time for data series. 
Format: %Y-%m-%d", + ) + args = parser.parse_args() + + logging.basicConfig( + format="%(asctime)s; %(levelname)s; %(name)s; %(message)s", + level=logging.INFO, + stream=sys.stdout, + ) + + thresholds_output_path = args.thresholds_output_path + station_thresholds_root = args.station_thresholds_root + + start_time = datetime.strptime(args.start_time, "%Y-%m-%d") + end_time = datetime.strptime(args.end_time, "%Y-%m-%d") + aws_l0_repo_path = args.l0 + + compute_all_thresholds( + station_thresholds_root=station_thresholds_root, + thresholds_output_path=thresholds_output_path, + aws_l0_repo_path=aws_l0_repo_path, + start_time=start_time, + end_time=end_time, + ) diff --git a/src/pypromice/qc/percentiles/outlier_detector.py b/src/pypromice/qc/percentiles/outlier_detector.py new file mode 100644 index 00000000..2eba53dd --- /dev/null +++ b/src/pypromice/qc/percentiles/outlier_detector.py @@ -0,0 +1,112 @@ +from pathlib import Path + +import attrs +import numpy as np +import pandas as pd +import xarray as xr + + +__all__ = ["ThresholdBasedOutlierDetector"] + +season_month_map = { + "winter": {12, 1, 2}, + "spring": {3, 4, 5}, + "summer": {6, 7, 8}, + "fall": {9, 10, 11}, +} + + +def get_season_index_mask(data_set: pd.DataFrame, season: str) -> np.ndarray: + season_months = season_month_map.get( + season, {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12} + ) + return data_set.index.month.isin(season_months)[:, None] + + +def detect_outliers(data_set: pd.DataFrame, thresholds: pd.DataFrame) -> pd.DataFrame: + masks = [] + + season_index_mask = { + season: get_season_index_mask(data_set, season) + for season in thresholds["season"].unique() + } + + for variable_pattern, pattern_configs in thresholds.groupby("variable_pattern"): + df = data_set.filter(regex=f"^{variable_pattern}$") + mask = None + for _, season_config in pattern_configs.iterrows(): + threshold_mask = (df < season_config.lo) | (df > season_config.hi) + season_mask = threshold_mask & season_index_mask[season_config.season] + + if mask is None: + mask = season_mask + else: + mask |= season_mask + masks.append(mask) + + return pd.concat(masks, axis=1) + + +def filter_data(data_set: pd.DataFrame, thresholds: pd.DataFrame) -> pd.DataFrame: + mask = detect_outliers(data_set, thresholds) + output_data = data_set.copy() + output_data[mask] = np.nan + return output_data + + +@attrs.define +class ThresholdBasedOutlierDetector: + thresholds: pd.DataFrame = attrs.field() + + def filter_data(self, ds: xr.Dataset) -> xr.Dataset: + """ + Filter samples across all variables by assigning to nan + """ + stid = ds.station_id + + stid_thresholds = self.thresholds.query(f"stid == '{stid}'") + if stid_thresholds.empty: + return ds + + data_df = ds.to_dataframe() # Switch to pandas + data_df = filter_data( + data_set=data_df, + thresholds=stid_thresholds, + ) + + ds_out: xr.Dataset = data_df.to_xarray() + ds_out = ds_out.assign_attrs(ds.attrs) # Dataset attrs + for x in ds_out.data_vars: # variable-specific attrs + ds_out[x].attrs = ds[x].attrs + + return ds_out + + @classmethod + def from_csv_config(cls, config_file: Path) -> "ThresholdBasedOutlierDetector": + """ + Instantiate using explicit csv file with explicit thresholds + + The CSV file shall have the format: + + * Comma separated + * First row is header + * Columns + * stid: Station id + * variabel_pattern: regular expression filtering the variable name + * lo: Low threshold + * hi: High threshold + * season: The season of the filter: [, winter, spring, summer, fall]. 
The empty string means all seasons + + """ + return cls(thresholds=pd.read_csv(config_file)) + + @classmethod + def default(cls) -> "ThresholdBasedOutlierDetector": + """ + Instantiate using aws thresholds stored in the python package. + Returns + ------- + + """ + default_thresholds_path = Path(__file__).parent.joinpath("thresholds.csv") + return cls.from_csv_config(default_thresholds_path) diff --git a/src/pypromice/qc/percentiles/thresholds.csv b/src/pypromice/qc/percentiles/thresholds.csv new file mode 100644 index 00000000..9bf50727 --- /dev/null +++ b/src/pypromice/qc/percentiles/thresholds.csv @@ -0,0 +1,312 @@ +stid,variable_pattern,lo,hi,season +NUK_K,p_[ul],877.96,958.79, +NUK_K,p_i,-122.04,-41.21, +NUK_K,wspd_[uli],-12.00,24.74, +NUK_K,t_[uli],-34.22,11.27,winter +NUK_K,t_[uli],-33.10,16.28,spring +NUK_K,t_[uli],-12.57,21.40,summer +NUK_K,t_[uli],-27.04,15.72,fall +SWC_O,p_[ul],831.20,909.80, +SWC_O,p_i,-168.80,-90.20, +SWC_O,wspd_[uli],-11.68,30.80, +SWC_O,t_[uli],-49.84,6.70,winter +SWC_O,t_[uli],-36.90,9.20,spring +SWC_O,t_[uli],-20.90,13.40,summer +SWC_O,t_[uli],-36.82,12.00,fall +KPC_L,p_[ul],924.86,1006.19, +KPC_L,p_i,-75.14,6.19, +KPC_L,wspd_[uli],-11.89,29.23, +KPC_L,t_[uli],-45.45,2.16,winter +KPC_L,t_[uli],-44.70,9.46,spring +KPC_L,t_[uli],-16.81,16.08,summer +KPC_L,t_[uli],-39.66,9.46,fall +EGP,p_[ul],675.50,752.80, +EGP,p_i,-324.50,-247.20, +EGP,wspd_[uli],-12.00,24.10, +EGP,t_[uli],-69.45,-10.18,winter +EGP,t_[uli],-66.58,1.41,spring +EGP,t_[uli],-37.66,8.45,summer +EGP,t_[uli],-62.60,4.00,fall +CEN2,p_[ul],762.74,826.00, +CEN2,p_i,-237.26,-174.00, +CEN2,wspd_[uli],-12.00,26.81, +CEN2,t_[uli],-36.00,-6.10,winter +CEN2,t_[uli],-32.30,2.20,spring +CEN2,t_[uli],-30.50,7.60,summer +CEN2,t_[uli],-53.70,-1.60,fall +MIT,p_[ul],900.19,993.97, +MIT,p_i,-99.81,-6.03, +MIT,wspd_[uli],-12.00,28.26, +MIT,t_[uli],-29.36,10.81,winter +MIT,t_[uli],-31.05,14.54,spring +MIT,t_[uli],-11.51,19.61,summer +MIT,t_[uli],-23.66,14.59,fall +TUN,p_[ul],744.83,807.00, +TUN,p_i,-255.17,-193.00, +TUN,wspd_[uli],-11.85,24.81, +TUN,t_[uli],-51.90,-3.65,winter +TUN,t_[uli],-55.90,1.40,spring +TUN,t_[uli],-31.70,8.90,summer +TUN,t_[uli],-56.40,8.10,fall +KPC_U,p_[ul],864.79,941.47, +KPC_U,p_i,-135.21,-58.53, +KPC_U,wspd_[uli],-12.00,27.73, +KPC_U,t_[uli],-51.46,-0.09,winter +KPC_U,t_[uli],-50.44,6.79,spring +KPC_U,t_[uli],-21.11,12.77,summer +KPC_U,t_[uli],-45.74,6.19,fall +UPE_L,p_[ul],940.00,1023.48, +UPE_L,p_i,-60.00,23.48, +UPE_L,wspd_[uli],-12.00,29.37, +UPE_L,t_[uli],-40.29,11.55,winter +UPE_L,t_[uli],-40.90,15.97,spring +UPE_L,t_[uli],-13.15,19.28,summer +UPE_L,t_[uli],-29.23,15.26,fall +KAN_M,p_[ul],816.70,896.46, +KAN_M,p_i,-183.30,-103.54, +KAN_M,wspd_[uli],-12.00,31.99, +KAN_M,t_[uli],-50.27,6.21,winter +KAN_M,t_[uli],-48.60,11.85,spring +KAN_M,t_[uli],-21.57,12.59,summer +KAN_M,t_[uli],-42.77,11.79,fall +QAS_A,p_[ul],838.61,919.19, +QAS_A,p_i,-161.39,-80.81, +QAS_A,wspd_[uli],-12.00,28.55, +QAS_A,t_[uli],-36.80,3.98,winter +QAS_A,t_[uli],-36.71,17.11,spring +QAS_A,t_[uli],-15.24,18.32,summer +QAS_A,t_[uli],-31.06,13.03,fall +NSE,p_[ul],694.70,784.80, +NSE,p_i,-305.30,-215.20, +NSE,wspd_[uli],-12.00,29.18, +NSE,t_[uli],-56.60,-2.40,winter +NSE,t_[uli],-53.54,-20.79,spring +NSE,t_[uli],-29.20,11.00,summer +NSE,t_[uli],-48.70,6.40,fall +UPE_U,p_[ul],852.55,932.91, +UPE_U,p_i,-147.45,-67.09, +UPE_U,wspd_[uli],-11.93,29.59, +UPE_U,t_[uli],-45.20,6.30,winter +UPE_U,t_[uli],-45.68,13.19,spring +UPE_U,t_[uli],-18.84,14.93,summer +UPE_U,t_[uli],-36.32,11.26,fall +QAS_M,p_[ul],884.52,970.17, 
+QAS_M,p_i,-115.48,-29.83, +QAS_M,wspd_[uli],-12.00,29.35, +QAS_M,t_[uli],-34.30,11.12,winter +QAS_M,t_[uli],-29.77,16.09,spring +QAS_M,t_[uli],-11.57,16.11,summer +QAS_M,t_[uli],-25.79,14.85,fall +QAS_L,p_[ul],924.21,1013.41, +QAS_L,p_i,-75.79,13.41, +QAS_L,wspd_[uli],-12.00,30.08, +QAS_L,t_[uli],-30.44,15.21,winter +QAS_L,t_[uli],-29.98,17.38,spring +QAS_L,t_[uli],-9.46,18.98,summer +QAS_L,t_[uli],-23.72,17.12,fall +NUK_L,p_[ul],897.14,980.03, +NUK_L,p_i,-102.86,-19.97, +NUK_L,wspd_[uli],-12.00,27.23, +NUK_L,t_[uli],-35.82,14.42,winter +NUK_L,t_[uli],-35.35,18.04,spring +NUK_L,t_[uli],-10.63,20.76,summer +NUK_L,t_[uli],-29.31,17.66,fall +NAU,p_[ul],715.60,783.20, +NAU,p_i,-284.40,-216.80, +NAU,wspd_[uli],-11.53,28.77, +NAU,t_[uli],-58.30,-2.90,winter +NAU,t_[uli],-49.70,1.50,spring +NAU,t_[uli],-34.79,7.90,summer +NAU,t_[uli],-55.70,9.50,fall +JAR,p_[ul],881.00,929.00, +JAR,p_i,-119.00,-71.00, +JAR,wspd_[uli],-11.62,29.82, +JAR,t_[uli],-14.60,13.30,summer +FRE,p_[ul],638.31,799.82, +FRE,p_i,-361.69,-200.18, +FRE,wspd_[uli],-12.00,22.62, +FRE,t_[uli],-38.20,10.02,winter +FRE,t_[uli],-36.55,18.07,spring +FRE,t_[uli],-12.61,22.82,summer +FRE,t_[uli],-33.93,17.69,fall +CP1,p_[ul],747.90,826.00, +CP1,p_i,-252.10,-174.00, +CP1,wspd_[uli],-12.00,31.70, +CP1,t_[uli],-58.30,1.10,winter +CP1,t_[uli],-46.40,6.70,spring +CP1,t_[uli],-26.70,10.80,summer +CP1,t_[uli],-49.30,10.40,fall +KAN_U,p_[ul],755.36,834.53, +KAN_U,p_i,-244.64,-165.47, +KAN_U,wspd_[uli],-12.00,33.94, +KAN_U,t_[uli],-53.53,2.20,winter +KAN_U,t_[uli],-52.57,10.38,spring +KAN_U,t_[uli],-26.67,11.61,summer +KAN_U,t_[uli],-47.97,9.90,fall +KAN_L,p_[ul],884.10,967.42, +KAN_L,p_i,-115.90,-32.58, +KAN_L,wspd_[uli],-12.00,28.45, +KAN_L,t_[uli],-41.57,11.24,winter +KAN_L,t_[uli],-40.69,15.09,spring +KAN_L,t_[uli],-13.58,16.95,summer +KAN_L,t_[uli],-33.55,15.56,fall +QAS_Uv3,p_[ul],859.94,946.06, +QAS_Uv3,p_i,-140.06,-53.94, +QAS_Uv3,wspd_[uli],-12.00,29.09, +QAS_Uv3,t_[uli],-36.70,7.70,winter +QAS_Uv3,t_[uli],-28.80,13.10,spring +QAS_Uv3,t_[uli],-12.29,15.20,summer +QAS_Uv3,t_[uli],-25.20,14.90,fall +ZAK_L,p_[ul],887.39,966.32, +ZAK_L,p_i,-112.61,-33.68, +ZAK_L,wspd_[uli],-11.97,25.32, +ZAK_L,t_[uli],-37.76,11.67,winter +ZAK_L,t_[uli],-35.50,16.92,spring +ZAK_L,t_[uli],-13.01,21.22,summer +ZAK_L,t_[uli],-33.80,14.72,fall +NEM,p_[ul],657.30,1013.98, +NEM,p_i,-342.70,13.98, +NEM,wspd_[uli],-12.00,25.85, +NEM,t_[uli],-30.58,7.20,summer +NEM,t_[uli],-58.60,-6.10,fall +NUK_U,p_[ul],832.02,911.63, +NUK_U,p_i,-167.98,-88.37, +NUK_U,wspd_[uli],-12.00,32.44, +NUK_U,t_[uli],-41.31,9.59,winter +NUK_U,t_[uli],-41.67,14.44,spring +NUK_U,t_[uli],-15.56,16.13,summer +NUK_U,t_[uli],-34.98,13.45,fall +TAS_A,p_[ul],847.23,942.82, +TAS_A,p_i,-152.77,-57.18, +TAS_A,wspd_[uli],-12.00,37.72, +TAS_A,t_[uli],-36.02,7.45,winter +TAS_A,t_[uli],-32.13,15.79,spring +TAS_A,t_[uli],-15.13,19.41,summer +TAS_A,t_[uli],-28.88,13.28,fall +HUM,p_[ul],669.63,963.00, +HUM,p_i,-330.37,-37.00, +HUM,wspd_[uli],-11.12,32.84, +HUM,t_[uli],-58.80,-2.63,winter +HUM,t_[uli],-27.90,9.70,summer +HUM,t_[uli],-56.20,9.10,fall +NUK_N,p_[ul],854.24,934.01, +NUK_N,p_i,-145.76,-65.99, +NUK_N,wspd_[uli],-12.00,32.87, +NUK_N,t_[uli],-37.90,7.89,winter +NUK_N,t_[uli],-35.67,15.68,spring +NUK_N,t_[uli],-16.90,14.72,summer +NUK_N,t_[uli],-32.96,12.92,fall +TAS_U,p_[ul],885.32,979.02, +TAS_U,p_i,-114.68,-20.98, +TAS_U,wspd_[uli],-12.00,38.30, +TAS_U,t_[uli],-31.91,10.66,winter +TAS_U,t_[uli],-33.16,13.20,spring +TAS_U,t_[uli],-12.88,16.75,summer +TAS_U,t_[uli],-25.74,14.06,fall 
+KAN_B,p_[ul],920.85,1004.05, +KAN_B,p_i,-79.15,4.05, +KAN_B,wspd_[uli],-12.00,24.36, +KAN_B,t_[uli],-40.72,12.98,winter +KAN_B,t_[uli],-40.23,19.02,spring +KAN_B,t_[uli],-10.71,22.39,summer +KAN_B,t_[uli],-32.22,18.16,fall +TAS_L,p_[ul],721.70,1021.25, +TAS_L,p_i,-278.30,21.25, +TAS_L,wspd_[uli],-12.00,38.73, +TAS_L,t_[uli],-29.33,12.68,winter +TAS_L,t_[uli],-30.18,18.26,spring +TAS_L,t_[uli],-11.96,18.32,summer +TAS_L,t_[uli],-22.81,15.56,fall +NAE,p_[ul],708.75,752.65, +NAE,p_i,-291.25,-247.35, +NAE,wspd_[uli],-11.33,23.10, +NAE,t_[uli],-18.36,5.64,spring +NAE,t_[uli],-21.17,6.71,summer +SCO_U,p_[ul],848.32,931.61, +SCO_U,p_i,-151.68,-68.39, +SCO_U,wspd_[uli],-11.81,22.90, +SCO_U,t_[uli],-41.38,3.73,winter +SCO_U,t_[uli],-41.01,13.32,spring +SCO_U,t_[uli],-15.98,15.61,summer +SCO_U,t_[uli],-36.27,12.11,fall +SWC,p_[ul],831.00,1037.70, +SWC,p_i,-169.00,37.70, +SWC,wspd_[uli],-12.00,30.26, +SWC,t_[uli],-43.60,5.80,winter +SWC,t_[uli],-50.30,11.70,spring +SWC,t_[uli],-19.20,12.90,summer +SWC,t_[uli],-40.10,9.60,fall +NUK_Uv3,p_[ul],830.00,910.00, +NUK_Uv3,p_i,-170.00,-90.00, +NUK_Uv3,wspd_[uli],-12.00,29.36, +NUK_Uv3,t_[uli],-43.00,7.40,winter +NUK_Uv3,t_[uli],-38.00,12.10,spring +NUK_Uv3,t_[uli],-16.30,15.10,summer +NUK_Uv3,t_[uli],-15.80,13.60,fall +DY2,p_[ul],730.70,810.50, +DY2,p_i,-269.30,-189.50, +DY2,wspd_[uli],-11.67,35.46, +DY2,t_[uli],-54.50,-0.90,winter +DY2,t_[uli],-47.30,8.60,spring +DY2,t_[uli],-27.90,10.70,summer +DY2,t_[uli],-45.80,11.70,fall +SDM,p_[ul],650.30,734.00, +SDM,p_i,-349.70,-266.00, +SDM,wspd_[uli],-12.00,32.30, +SDM,t_[uli],-53.80,-3.70,winter +SDM,t_[uli],-47.43,-9.40,spring +SDM,t_[uli],-30.20,10.50,summer +SDM,t_[uli],-46.80,9.90,fall +SCO_L,p_[ul],907.50,993.49, +SCO_L,p_i,-92.50,-6.51, +SCO_L,wspd_[uli],-12.00,21.02, +SCO_L,t_[uli],-39.09,6.94,winter +SCO_L,t_[uli],-37.55,14.65,spring +SCO_L,t_[uli],-12.10,17.57,summer +SCO_L,t_[uli],-32.92,12.85,fall +CEN1,p_[ul],750.48,827.52, +CEN1,p_i,-249.52,-172.48, +CEN1,wspd_[uli],-12.00,27.66, +CEN1,t_[uli],-58.65,-5.02,winter +CEN1,t_[uli],-59.82,4.05,spring +CEN1,t_[uli],-29.83,9.84,summer +CEN1,t_[uli],-51.78,1.83,fall +JAR_O,p_[ul],857.10,936.40, +JAR_O,p_i,-142.90,-63.60, +JAR_O,wspd_[uli],-12.00,29.85, +JAR_O,t_[uli],-46.70,7.90,winter +JAR_O,t_[uli],-35.10,10.00,spring +JAR_O,t_[uli],-18.90,14.50,summer +JAR_O,t_[uli],-33.50,12.70,fall +THU_U,p_[ul],874.94,953.23, +THU_U,p_i,-125.06,-46.77, +THU_U,wspd_[uli],-12.00,33.52, +THU_U,t_[uli],-45.91,1.58,winter +THU_U,t_[uli],-45.70,10.07,spring +THU_U,t_[uli],-17.64,17.01,summer +THU_U,t_[uli],-35.83,8.64,fall +THU_U2,p_[ul],875.63,956.76, +THU_U2,p_i,-124.37,-43.24, +THU_U2,wspd_[uli],-12.00,30.97, +THU_U2,t_[uli],-50.90,4.42,winter +THU_U2,t_[uli],-45.73,11.36,spring +THU_U2,t_[uli],-17.57,16.11,summer +THU_U2,t_[uli],-38.17,10.34,fall +Roof_GEUS,p_[ul],990.00,1038.00, +Roof_GEUS,p_i,-10.00,38.00, +Roof_GEUS,wspd_[uli],-12.00,16.81, +Roof_GEUS,t_[uli],,,spring +Roof_GEUS,t_[uli],2.58,38.21,summer +SDL,p_[ul],714.80,820.00, +SDL,p_i,-285.20,-180.00, +SDL,wspd_[uli],-12.00,30.05, +SDL,t_[uli],-54.80,-0.10,winter +SDL,t_[uli],-26.50,10.80,summer +SDL,t_[uli],-49.90,9.80,fall +THU_L,p_[ul],898.11,980.80, +THU_L,p_i,-101.89,-19.20, +THU_L,wspd_[uli],-12.00,34.21, +THU_L,t_[uli],-45.49,4.93,winter +THU_L,t_[uli],-42.79,13.77,spring +THU_L,t_[uli],-15.51,18.35,summer +THU_L,t_[uli],-34.05,11.15,fall diff --git a/src/pypromice/test/test_percentile.py b/src/pypromice/test/test_percentile.py new file mode 100644 index 00000000..dfb84560 --- /dev/null +++ 
b/src/pypromice/test/test_percentile.py @@ -0,0 +1,229 @@ +import unittest +from datetime import datetime +from typing import List + +import numpy as np +import pandas as pd +import xarray as xr + +from pypromice.qc.percentiles.outlier_detector import ( + detect_outliers, + filter_data, + ThresholdBasedOutlierDetector, +) + + +class PercentileQCTestCase(unittest.TestCase): + def test_column_pattern_matches(self): + self._test_column_pattern("p_i", True) + + def test_column_pattern_no_match(self): + self._test_column_pattern("p_l", False) + + def test_column_pattern_with_prefix(self): + self._test_column_pattern("prefix_p_i", False) + + def test_column_pattern_with_suffix(self): + self._test_column_pattern("p_i_suffix", False) + + def _test_column_pattern(self, column_name: str, expected_output: bool): + season_indices = pd.DatetimeIndex( + [ + datetime(2022, 3, 1), + ] + ) + thresholds = pd.DataFrame( + [ + dict( + stid="stid", variable_pattern="p_[iu]", lo=-100, hi=100, season=None + ), + ] + ) + value_outside_range = -325 + input_data = pd.DataFrame( + index=season_indices, columns=[column_name], data=[value_outside_range] + ) + if expected_output: + expected_mask = pd.DataFrame( + index=season_indices, columns=[column_name], data=[expected_output] + ) + else: + expected_mask = pd.DataFrame(index=season_indices, columns=[], data=[]) + + mask = detect_outliers(input_data, thresholds) + + pd.testing.assert_frame_equal(expected_mask, mask) + + def test_column_pattern_multicolumns(self): + thresholds = pd.DataFrame( + [ + dict( + stid="stid", variable_pattern="p_[iu]", lo=-100, hi=100, season=None + ), + ] + ) + date_index = pd.DatetimeIndex([datetime(2022, 3, 1)]) + input_data = pd.DataFrame( + index=date_index, + data=[ + dict( + p_i=-10, + p_u=1000, + p_j=1000, + ) + ], + ) + # p_j is not in the mask because it doesn't match the pattern + expected_mask = pd.DataFrame( + index=date_index, + data=[ + dict( + p_i=False, + p_u=True, + ) + ], + ) + + mask = detect_outliers(input_data, thresholds) + + pd.testing.assert_frame_equal(expected_mask, mask) + + def test_no_season(self): + season_indices = pd.DatetimeIndex( + [ + datetime(2022, 3, 1), + datetime(2022, 8, 1), + ] + ) + thresholds = pd.DataFrame( + [ + dict(stid="stid", variable_pattern="p_i", lo=-100, hi=100, season=None), + ] + ) + input_data = pd.DataFrame(index=season_indices, columns=["p_i"], data=[0, -243]) + expected_mask = pd.DataFrame( + index=season_indices, columns=["p_i"], data=[False, True] + ) + + mask = detect_outliers(input_data, thresholds) + + pd.testing.assert_frame_equal(expected_mask, mask) + + def test_season_filter_invalid_winter_and_spring(self): + self._test_season_filter( + input_values=[0, 0, 0, 0], expected_mask=[True, True, False, False] + ) + + def test_season_filter_invalid_summer(self): + self._test_season_filter( + input_values=[-10, -10, -10, -10], expected_mask=[False, False, True, False] + ) + + def test_season_filter_valid_season_values(self): + self._test_season_filter( + input_values=[-12, -8, -1, -3], expected_mask=[False, False, False, False] + ) + + def _test_season_filter(self, input_values: List[float], expected_mask: List[bool]): + stid = "A_STID" + thresholds = pd.DataFrame( + [ + dict( + stid=stid, variable_pattern="t_i", lo=-20, hi=-10, season="winter" + ), + dict(stid=stid, variable_pattern="t_i", lo=-10, hi=-1, season="spring"), + dict(stid=stid, variable_pattern="t_i", lo=-5, hi=5, season="summer"), + dict(stid=stid, variable_pattern="t_i", lo=-10, hi=0, season="fall"), + ] + ) + 
season_indices = pd.DatetimeIndex( + [ + datetime(2021, 12, 1), # winter + datetime(2022, 3, 1), # spring + datetime(2022, 6, 1), # summer + datetime(2022, 9, 1), # fall + ] + ) + input_data = pd.DataFrame( + index=season_indices, columns=["t_i"], data=input_values + ) + expected_mask = pd.DataFrame( + index=season_indices, columns=["t_i"], data=expected_mask + ) + + mask = detect_outliers(input_data, thresholds) + + pd.testing.assert_frame_equal(expected_mask, mask) + + def test_remove_outliers(self): + thresholds = pd.DataFrame( + columns=[ + "stid", + "variable_pattern", + "lo", + "hi", + "season", + ], + data=[ + ["stid", "t_[iu]", -40, 0, "winter"], + ["stid", "t_[iu]", -4, 10, "summer"], + ], + ) + date_index = pd.DatetimeIndex( + [ + datetime(2022, 1, 1), + datetime(2022, 8, 1), + ] + ) + input_data = pd.DataFrame( + index=date_index, + data=[ + dict(t_i=-10, p_u=994), + dict(t_i=37, p_u=1024), + ], + ) + mask = detect_outliers(input_data, thresholds) + expected_output_data = input_data.copy() + expected_output_data[mask] = np.nan + + output_data = filter_data(input_data, thresholds) + self.assertIsNot(output_data, input_data) + pd.testing.assert_frame_equal(output_data, expected_output_data) + + +class ThresholdBasedOutlierDetectorTestCase(unittest.TestCase): + def test_default_init(self): + outlier_detector = ThresholdBasedOutlierDetector.default() + self.assertIsInstance(outlier_detector, ThresholdBasedOutlierDetector) + + def test_filter_data_aws_with_threshold(self): + stid = "NUK_K" + index = pd.period_range("2023-10-01", "2023-11-01", freq="1h") + columns = ["p_i", "t_i", "p_l", "wpsd_u", "foo"] + dataset: xr.Dataset = pd.DataFrame( + index=index, + columns=columns, + data=np.random.random((len(index), len(columns))), + ).to_xarray() + dataset = dataset.assign_attrs(dict(station_id=stid)) + outlier_detector = ThresholdBasedOutlierDetector.default() + + dataset_output = outlier_detector.filter_data(dataset) + + self.assertIsInstance(dataset_output, xr.Dataset) + self.assertSetEqual( + set(dict(dataset.items())), + set(dict(dataset_output.items())), + ) + + pass + + def test_filter_data_aws_without_threshold(self): + stid = "non_exsiting" + dataset = xr.Dataset(attrs=dict(station_id=stid)) + outlier_detector = ThresholdBasedOutlierDetector.default() + self.assertNotIn(stid, outlier_detector.thresholds.stid) + + output_dataset = outlier_detector.filter_data(dataset) + + xr.testing.assert_equal(output_dataset, dataset)
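
Usage note (not part of the patch): a minimal sketch of how the percentile-based outlier detector introduced above could be applied after this change is installed, so that the bundled thresholds.csv is available as package data. The station id "QAS_L" is taken from thresholds.csv; the synthetic hourly data and the chosen values are illustrative assumptions only.

    import numpy as np
    import pandas as pd

    from pypromice.qc.percentiles.outlier_detector import ThresholdBasedOutlierDetector

    # Build a small synthetic dataset with variables covered by the thresholds
    index = pd.date_range("2023-06-01", periods=48, freq="1h", name="time")
    ds = pd.DataFrame(
        index=index,
        data={
            "p_u": np.random.normal(950, 5, len(index)),   # air pressure
            "t_u": np.random.normal(5, 2, len(index)),     # air temperature (summer season)
            "wspd_u": np.random.normal(6, 2, len(index)),  # wind speed
        },
    ).to_xarray()
    ds = ds.assign_attrs(station_id="QAS_L")  # must match a stid in thresholds.csv

    # Load the packaged thresholds and set out-of-range samples to NaN
    detector = ThresholdBasedOutlierDetector.default()
    ds_filtered = detector.filter_data(ds)

A station-specific or experimental threshold file could instead be loaded with ThresholdBasedOutlierDetector.from_csv_config(path), using the same columns as thresholds.csv (stid, variable_pattern, lo, hi, season).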