diff --git a/.github/workflows/unit_test.yml b/.github/workflows/unit_test.yml index 8f7fa440..41619708 100644 --- a/.github/workflows/unit_test.yml +++ b/.github/workflows/unit_test.yml @@ -30,4 +30,4 @@ jobs: shell: bash run: | cd $GITHUB_WORKSPACE/src/pypromice - python3 -m unittest -v process/aws.py get.py tx/tx.py + python3 -m unittest -v process/aws.py get.py tx/tx.py qc/static_qc_test.py diff --git a/.gitignore b/.gitignore index a64f5e91..065beda1 100644 --- a/.gitignore +++ b/.gitignore @@ -26,3 +26,6 @@ src/pypromice/postprocess/latest_timestamps.pkl # Output positions from transmitted GPS src/pypromice/postprocess/positions.csv + +# sqlite db files +*.db diff --git a/src/pypromice/process/L0toL1.py b/src/pypromice/process/L0toL1.py index d0b09a7d..5a209344 100644 --- a/src/pypromice/process/L0toL1.py +++ b/src/pypromice/process/L0toL1.py @@ -116,7 +116,7 @@ def toL1(L0, vars_df, T_0=273.15, tilt_threshold=-100): ds['z_boom_l'] = ds['z_boom_l'] * ((ds['t_l'] + T_0)/T_0)**0.5 # Adjust sonic ranger readings for sensitivity to air temperature ds = clip_values(ds, vars_df) - for key in ['format', 'hygroclip_t_offset', 'dsr_eng_coef', 'usr_eng_coef', + for key in ['hygroclip_t_offset', 'dsr_eng_coef', 'usr_eng_coef', 'dlr_eng_coef', 'ulr_eng_coef', 'pt_z_coef', 'pt_z_p_coef', 'pt_z_factor', 'pt_antifreeze', 'boom_azimuth', 'nodata', 'conf', 'file']: diff --git a/src/pypromice/process/L1toL2.py b/src/pypromice/process/L1toL2.py index 1510a757..079a7804 100644 --- a/src/pypromice/process/L1toL2.py +++ b/src/pypromice/process/L1toL2.py @@ -2,6 +2,8 @@ """ AWS Level 1 (L1) to Level 2 (L2) data processing """ +import logging + import numpy as np import urllib.request from urllib.error import HTTPError, URLError @@ -9,11 +11,21 @@ import os import xarray as xr +from pypromice.qc.static_qc import apply_static_qc from pypromice.process.value_clipping import clip_values +logger = logging.getLogger(__name__) -def toL2(L1, vars_df: pd.DataFrame, T_0=273.15, ews=1013.246, ei0=6.1071, eps_overcast=1., - eps_clear=9.36508e-6, emissivity=0.97): +def toL2( + L1: xr.Dataset, + vars_df: pd.DataFrame, + T_0=273.15, + ews=1013.246, + ei0=6.1071, + eps_overcast=1.0, + eps_clear=9.36508e-6, + emissivity=0.97, +) -> xr.Dataset: '''Process one Level 1 (L1) product to Level 2 Parameters @@ -22,19 +34,19 @@ def toL2(L1, vars_df: pd.DataFrame, T_0=273.15, ews=1013.246, ei0=6.1071, eps_ov Level 1 dataset vars_df : pd.DataFrame Metadata dataframe - T_0 : float, optional + T_0 : float Ice point temperature in K. The default is 273.15. - ews : float, optional - Saturation pressure (normal atmosphere) at steam point temperature. + ews : float + Saturation pressure (normal atmosphere) at steam point temperature. The default is 1013.246. - ei0 : float, optional - Saturation pressure (normal atmosphere) at ice-point temperature. The + ei0 : float + Saturation pressure (normal atmosphere) at ice-point temperature. The default is 6.1071. - eps_overcast : int, optional + eps_overcast : int Cloud overcast. The default is 1.. - eps_clear : float, optional + eps_clear : float Cloud clear. The default is 9.36508e-6. - emissivity : float, optional + emissivity : float Emissivity. The default is 0.97. Returns @@ -47,80 +59,81 @@ def toL2(L1, vars_df: pd.DataFrame, T_0=273.15, ews=1013.246, ei0=6.1071, eps_ov ds = adjustTime(ds) # Adjust time after a user-defined csv files ds = flagNAN(ds) # Flag NaNs after a user-defined csv files ds = adjustData(ds) # Adjust data after a user-defined csv files - except Exception as e: - print('Flagging and fixing failed:') - print(e) - - T_100 = _getTempK(T_0) - ds['rh_u_cor'] = correctHumidity(ds['rh_u'], ds['t_u'], - T_0, T_100, ews, ei0) - + except Exception: + logger.exception('Flagging and fixing failed:') + if ds.attrs['format'] == 'TX': + ds = apply_static_qc(ds) # Detect and filter data points that seems to be static + + T_100 = _getTempK(T_0) + ds['rh_u_cor'] = correctHumidity(ds['rh_u'], ds['t_u'], + T_0, T_100, ews, ei0) + # Determiune cloud cover for on-ice stations if not ds.attrs['bedrock']: cc = calcCloudCoverage(ds['t_u'], T_0, eps_overcast, eps_clear, # Calculate cloud coverage - ds['dlr'], ds.attrs['station_id']) + ds['dlr'], ds.attrs['station_id']) ds['cc'] = (('time'), cc.data) else: # Default cloud cover for bedrock station for which tilt should be 0 anyway. cc = 0.8 - + # Determine surface temperature ds['t_surf'] = calcSurfaceTemperature(T_0, ds['ulr'], ds['dlr'], # Calculate surface temperature emissivity) if not ds.attrs['bedrock']: ds['t_surf'] = ds['t_surf'].where(ds['t_surf'] <= 0, other = 0) - - # Determine station position relative to sun + + # Determine station position relative to sun doy = ds['time'].to_dataframe().index.dayofyear.values # Gather variables to calculate sun pos hour = ds['time'].to_dataframe().index.hour.values minute = ds['time'].to_dataframe().index.minute.values - + if hasattr(ds, 'latitude') and hasattr(ds, 'longitude'): lat = ds.attrs['latitude'] # TODO Why is mean GPS lat lon not preferred for calcs? lon = ds.attrs['longitude'] else: lat = ds['gps_lat'].mean() lon = ds['gps_lon'].mean() - - deg2rad, rad2deg = _getRotation() # Get degree-radian conversions - phi_sensor_rad, theta_sensor_rad = calcTilt(ds['tilt_x'], ds['tilt_y'], # Calculate station tilt - deg2rad) - + + deg2rad, rad2deg = _getRotation() # Get degree-radian conversions + phi_sensor_rad, theta_sensor_rad = calcTilt(ds['tilt_x'], ds['tilt_y'], # Calculate station tilt + deg2rad) + Declination_rad = calcDeclination(doy, hour, minute) # Calculate declination - HourAngle_rad = calcHourAngle(hour, minute, lon) # Calculate hour angle + HourAngle_rad = calcHourAngle(hour, minute, lon) # Calculate hour angle ZenithAngle_rad, ZenithAngle_deg = calcZenith(lat, Declination_rad, # Calculate zenith - HourAngle_rad, deg2rad, + HourAngle_rad, deg2rad, rad2deg) - + # Correct Downwelling shortwave radiation - DifFrac = 0.2 + 0.8 * cc + DifFrac = 0.2 + 0.8 * cc CorFac_all = calcCorrectionFactor(Declination_rad, phi_sensor_rad, # Calculate correction - theta_sensor_rad, HourAngle_rad, - ZenithAngle_rad, ZenithAngle_deg, - lat, DifFrac, deg2rad) + theta_sensor_rad, HourAngle_rad, + ZenithAngle_rad, ZenithAngle_deg, + lat, DifFrac, deg2rad) ds['dsr_cor'] = ds['dsr'].copy(deep=True) * CorFac_all # Apply correction AngleDif_deg = calcAngleDiff(ZenithAngle_rad, HourAngle_rad, # Calculate angle between sun and sensor - phi_sensor_rad, theta_sensor_rad) - + phi_sensor_rad, theta_sensor_rad) + ds['albedo'], OKalbedos = calcAlbedo(ds['usr'], ds['dsr_cor'], # Determine albedo AngleDif_deg, ZenithAngle_deg) # Correct upwelling and downwelling shortwave radiation sunonlowerdome =(AngleDif_deg >= 90) & (ZenithAngle_deg <= 90) # Determine when sun is in FOV of lower sensor, assuming sensor measures only diffuse radiation - ds['dsr_cor'] = ds['dsr_cor'].where(~sunonlowerdome, + ds['dsr_cor'] = ds['dsr_cor'].where(~sunonlowerdome, other=ds['dsr'] / DifFrac) # Apply to downwelling ds['usr_cor'] = ds['usr'].copy(deep=True) - ds['usr_cor'] = ds['usr_cor'].where(~sunonlowerdome, + ds['usr_cor'] = ds['usr_cor'].where(~sunonlowerdome, other=ds['albedo'] * ds['dsr'] / DifFrac) # Apply to upwelling bad = (ZenithAngle_deg > 95) | (ds['dsr_cor'] <= 0) | (ds['usr_cor'] <= 0) # Set to zero for solar zenith angles larger than 95 deg or either values are (less than) zero ds['dsr_cor'][bad] = 0 ds['usr_cor'][bad] = 0 - ds['dsr_cor'] = ds['usr_cor'].copy(deep=True) / ds['albedo'] # Correct DWR using more reliable USWR when sun not in sight of upper sensor - ds['albedo'] = ds['albedo'].where(OKalbedos) #TODO remove? - + ds['dsr_cor'] = ds['usr_cor'].copy(deep=True) / ds['albedo'] # Correct DWR using more reliable USWR when sun not in sight of upper sensor + ds['albedo'] = ds['albedo'].where(OKalbedos) #TODO remove? + # Remove data where TOA shortwave radiation invalid - isr_toa = calcTOA(ZenithAngle_deg, ZenithAngle_rad) # Calculate TOA shortwave radiation + isr_toa = calcTOA(ZenithAngle_deg, ZenithAngle_rad) # Calculate TOA shortwave radiation TOA_crit_nopass = (ds['dsr_cor'] > (0.9 * isr_toa + 10)) # Determine filter ds['dsr_cor'][TOA_crit_nopass] = np.nan # Apply filter and interpolate ds['usr_cor'][TOA_crit_nopass] = np.nan @@ -129,24 +142,24 @@ def toL2(L1, vars_df: pd.DataFrame, T_0=273.15, ews=1013.246, ei0=6.1071, eps_ov # # Check sun position # sundown = ZenithAngle_deg >= 90 - # _checkSunPos(ds, OKalbedos, sundown, sunonlowerdome, TOA_crit_nopass) + # _checkSunPos(ds, OKalbedos, sundown, sunonlowerdome, TOA_crit_nopass) if hasattr(ds, 'correct_precip'): # Correct precipitation precip_flag=ds.attrs['correct_precip'] else: precip_flag=True - if ~ds['precip_u'].isnull().all() and precip_flag: - ds['precip_u_cor'], ds['precip_u_rate'] = correctPrecip(ds['precip_u'], + if ~ds['precip_u'].isnull().all() and precip_flag: + ds['precip_u_cor'], ds['precip_u_rate'] = correctPrecip(ds['precip_u'], ds['wspd_u']) - if ds.attrs['number_of_booms']==2: + if ds.attrs['number_of_booms']==2: ds['rh_l_cor'] = correctHumidity(ds['rh_l'], ds['t_l'], # Correct relative humidity - T_0, T_100, ews, ei0) - + T_0, T_100, ews, ei0) + if ~ds['precip_l'].isnull().all() and precip_flag: # Correct precipitation - ds['precip_l_cor'], ds['precip_l_rate']= correctPrecip(ds['precip_l'], + ds['precip_l_cor'], ds['precip_l_rate']= correctPrecip(ds['precip_l'], ds['wspd_l']) - - if hasattr(ds,'t_i'): + + if hasattr(ds,'t_i'): if ~ds['t_i'].isnull().all(): # Instantaneous msg processing ds['rh_i_cor'] = correctHumidity(ds['rh_i'], ds['t_i'], # Correct relative humidity T_0, T_100, ews, ei0) @@ -154,12 +167,12 @@ def toL2(L1, vars_df: pd.DataFrame, T_0=273.15, ews=1013.246, ei0=6.1071, eps_ov ds = clip_values(ds, vars_df) return ds -def flagNAN(ds_in, +def flagNAN(ds_in, flag_url='https://raw.githubusercontent.com/GEUS-Glaciology-and-Climate/PROMICE-AWS-data-issues/master/flags/', flag_dir='local/flags/'): - '''Read flagged data from .csv file. For each variable, and downstream + '''Read flagged data from .csv file. For each variable, and downstream dependents, flag as invalid (or other) if set in the flag .csv - + Parameters ---------- ds_in : xr.Dataset @@ -168,7 +181,7 @@ def flagNAN(ds_in, URL to directory where .csv flag files can be found flag_dir : str File directory where .csv flag files can be found - + Returns ------- ds : xr.Dataset @@ -176,20 +189,20 @@ def flagNAN(ds_in, ''' ds = ds_in.copy() df = None - + df = _getDF(flag_url + ds.attrs["station_id"] + ".csv", os.path.join(flag_dir, ds.attrs["station_id"] + ".csv"), # download = False, # only for working on draft local flag'n'fix files ) - + if isinstance(df, pd.DataFrame): df.t0 = pd.to_datetime(df.t0).dt.tz_localize(None) df.t1 = pd.to_datetime(df.t1).dt.tz_localize(None) - - if df.shape[0] > 0: + + if df.shape[0] > 0: for i in df.index: t0, t1, avar = df.loc[i,['t0','t1','variable']] - + if avar == '*': # Set to all vars if var is "*" varlist = list(ds.keys()) @@ -197,32 +210,32 @@ def flagNAN(ds_in, # Reads as regex if contains "*" and other characters (e.g. 't_i_.*($)') varlist = pd.DataFrame(columns = list(ds.keys())).filter(regex=(avar)).columns else: - varlist = avar.split() - + varlist = avar.split() + if 'time' in varlist: varlist.remove("time") - + # Set to all times if times are "n/a" - if pd.isnull(t0): + if pd.isnull(t0): t0 = ds['time'].values[0] - if pd.isnull(t1): + if pd.isnull(t1): t1 = ds['time'].values[-1] - + for v in varlist: if v in list(ds.keys()): - print('---> flagging',t0, t1, v) + logger.info(f'---> flagging {t0} {t1} {v}') ds[v] = ds[v].where((ds['time'] < t0) | (ds['time'] > t1)) else: - print('---> could not flag', v,', not in dataset') + logger.info(f'---> could not flag {v} not in dataset') return ds -def adjustTime(ds, - adj_url="https://raw.githubusercontent.com/GEUS-Glaciology-and-Climate/PROMICE-AWS-data-issues/master/adjustments/", - adj_dir='local/adjustments/', +def adjustTime(ds, + adj_url="https://raw.githubusercontent.com/GEUS-Glaciology-and-Climate/PROMICE-AWS-data-issues/master/adjustments/", + adj_dir='local/adjustments/', var_list=[], skip_var=[]): '''Read adjustment data from .csv file. Only applies the "time_shift" adjustment - + Parameters ---------- ds : xr.Dataset @@ -231,7 +244,7 @@ def adjustTime(ds, URL to directory where .csv adjustment files can be found adj_dir : str File directory where .csv adjustment files can be found - + Returns ------- ds : xr.Dataset @@ -239,22 +252,20 @@ def adjustTime(ds, ''' ds_out = ds.copy() adj_info=None - + adj_info = _getDF(adj_url + ds.attrs["station_id"] + ".csv", - os.path.join(adj_dir, ds.attrs["station_id"] + ".csv"), - # download = False, - verbose = False) - + os.path.join(adj_dir, ds.attrs["station_id"] + ".csv"),) + if isinstance(adj_info, pd.DataFrame): - + if "time_shift" in adj_info.adjust_function.values: time_shifts = adj_info.loc[adj_info.adjust_function == "time_shift", :] # if t1 is left empty, then adjustment is applied until the end of the file time_shifts.loc[time_shifts.t1.isnull(), "t1"] = pd.to_datetime(ds_out.time.values[-1]).isoformat() time_shifts.t0 = pd.to_datetime(time_shifts.t0).dt.tz_localize(None) time_shifts.t1 = pd.to_datetime(time_shifts.t1).dt.tz_localize(None) - + for t0, t1, val in zip( time_shifts.t0, time_shifts.t1, @@ -262,12 +273,12 @@ def adjustTime(ds, ): ds_shifted = ds_out.sel(time=slice(t0,t1)) ds_shifted['time'] = ds_shifted.time.values + pd.Timedelta(days = val) - + # here we concatenate what was before the shifted part, the shifted # part and what was after the shifted part - # note that if any data was already present in the target period + # note that if any data was already present in the target period # (where the data lands after the shift), it is overwritten - + ds_out = xr.concat( ( ds_out.sel(time=slice(pd.to_datetime(ds_out.time.values[0]), @@ -284,13 +295,13 @@ def adjustTime(ds, return ds_out -def adjustData(ds, - adj_url="https://raw.githubusercontent.com/GEUS-Glaciology-and-Climate/PROMICE-AWS-data-issues/master/adjustments/", - adj_dir='local/adjustments/', +def adjustData(ds, + adj_url="https://raw.githubusercontent.com/GEUS-Glaciology-and-Climate/PROMICE-AWS-data-issues/master/adjustments/", + adj_dir='local/adjustments/', var_list=[], skip_var=[]): - '''Read adjustment data from .csv file. For each variable, and downstream + '''Read adjustment data from .csv file. For each variable, and downstream dependents, adjust data accordingly if set in the adjustment .csv - + Parameters ---------- ds : xr.Dataset @@ -299,30 +310,30 @@ def adjustData(ds, URL to directory where .csv adjustment files can be found adj_dir : str File directory where .csv adjustment files can be found - + Returns ------- ds : xr.Dataset Level 0 data with flagged data ''' ds_out = ds.copy() - adj_info=None + adj_info=None adj_info = _getDF(adj_url + ds.attrs["station_id"] + ".csv", os.path.join(adj_dir, ds.attrs["station_id"] + ".csv"), # download = False, # only for working on draft local flag'n'fix files ) - + if isinstance(adj_info, pd.DataFrame): # removing potential time shifts from the adjustment list adj_info = adj_info.loc[adj_info.adjust_function != "time_shift", :] - + # if t1 is left empty, then adjustment is applied until the end of the file adj_info.loc[adj_info.t0.isnull(), "t0"] = ds_out.time.values[0] adj_info.loc[adj_info.t1.isnull(), "t1"] = ds_out.time.values[-1] # making all timestamps timezone naive (compatibility with xarray) adj_info.t0 = pd.to_datetime(adj_info.t0).dt.tz_localize(None) adj_info.t1 = pd.to_datetime(adj_info.t1).dt.tz_localize(None) - + # if "*" is in the variable name then we interpret it as regex selec = adj_info['variable'].str.contains('\*') & (adj_info['variable'] != "*") for ind in adj_info.loc[selec, :].index: @@ -333,23 +344,23 @@ def adjustData(ds, line_template.name = adj_info.index.max() + 1 adj_info = pd.concat((adj_info, line_template.to_frame().transpose()),axis=0) adj_info = adj_info.drop(labels=ind, axis=0) - + adj_info = adj_info.sort_values(by=["variable", "t0"]) adj_info.set_index(["variable", "t0"], drop=False, inplace=True) - + if len(var_list) == 0: var_list = np.unique(adj_info.variable) else: adj_info = adj_info.loc[np.isin(adj_info.variable, var_list), :] var_list = np.unique(adj_info.variable) - + if len(skip_var) > 0: adj_info = adj_info.loc[~np.isin(adj_info.variable, skip_var), :] var_list = np.unique(adj_info.variable) - + for var in var_list: if var not in list(ds_out.keys()): - print('could not adjust',var,', not in dataset') + logger.info(f'could not adjust {var } not in dataset') continue for t0, t1, func, val in zip( adj_info.loc[var].t0, @@ -359,7 +370,7 @@ def adjustData(ds, ): if (t0 > pd.to_datetime(ds_out.time.values[-1])) | (t1 < pd.to_datetime(ds_out.time.values[0])): continue - print('--->',t0, t1, var, func, val) + logger.info(f'---> {t0} {t1} {var} {func} {val}') if func == "add": ds_out[var].loc[dict(time=slice(t0, t1))] = ds_out[var].loc[dict(time=slice(t0, t1))].values + val # flagging adjusted values @@ -368,7 +379,7 @@ def adjustData(ds, # msk = ds_out[var].loc[dict(time=slice(t0, t1))])].notnull() # ind = ds_out[var].loc[dict(time=slice(t0, t1))])].loc[msk].time # ds_out.loc[ind, var + "_adj_flag"] = 1 - + if func == "multiply": ds_out[var].loc[dict(time=slice(t0, t1))] = ds_out[var].loc[dict(time=slice(t0, t1))].values * val if "DW" in var: @@ -379,16 +390,16 @@ def adjustData(ds, # msk = ds_out[var].loc[dict(time=slice(t0, t1))].notnull() # ind = ds_out[var].loc[dict(time=slice(t0, t1))].loc[msk].time # ds_out.loc[ind, var + "_adj_flag"] = 1 - + if func == "min_filter": tmp = ds_out[var].loc[dict(time=slice(t0, t1))].values tmp[tmp < val] = np.nan - + if func == "max_filter": tmp = ds_out[var].loc[dict(time=slice(t0, t1))].values tmp[tmp > val] = np.nan ds_out[var].loc[dict(time=slice(t0, t1))] = tmp - + if func == "upper_perc_filter": tmp = ds_out[var].loc[dict(time=slice(t0, t1))].copy() df_w = ds_out[var].loc[dict(time=slice(t0, t1))].resample("14D").quantile(1 - val / 100) @@ -398,9 +409,9 @@ def adjustData(ds, values_month = tmp.loc[msk].values values_month[values_month < df_w.loc[m_start]] = np.nan tmp.loc[msk] = values_month - + ds_out[var].loc[dict(time=slice(t0, t1))] = tmp.values - + if func == "biweekly_upper_range_filter": tmp = ds_out[var].loc[dict(time=slice(t0, t1))].copy() df_max = ds_out[var].loc[dict(time=slice(t0, t1))].resample("14D").max() @@ -418,33 +429,33 @@ def adjustData(ds, tmp.loc[msk] = values_month # updating original pandas ds_out[var].loc[dict(time=slice(t0, t1))] = tmp.values - + if func == "hampel_filter": tmp = ds_out[var].loc[dict(time=slice(t0, t1))] tmp = _hampel(tmp, k=7 * 24, t0=val) ds_out[var].loc[dict(time=slice(t0, t1))] = tmp.values - + if func == "grad_filter": tmp = ds_out[var].loc[dict(time=slice(t0, t1))].copy() msk = ds_out[var].loc[dict(time=slice(t0, t1))].copy().diff() tmp[np.roll(msk.abs() > val, -1)] = np.nan ds_out[var].loc[dict(time=slice(t0, t1))] = tmp - + if "swap_with_" in func: var2 = func[10:] val_var = ds_out[var].loc[dict(time=slice(t0, t1))].values.copy() val_var2 = ds_out[var2].loc[dict(time=slice(t0, t1))].values.copy() ds_out[var2].loc[dict(time=slice(t0, t1))] = val_var ds_out[var].loc[dict(time=slice(t0, t1))] = val_var2 - + if func == "rotate": ds_out[var].loc[dict(time=slice(t0, t1))] = (ds_out[var].loc[dict(time=slice(t0, t1))].values + val) % 360 - + return ds_out def calcCloudCoverage(T, T_0, eps_overcast, eps_clear, dlr, station_id): '''Calculate cloud cover from T and T_0 - + Parameters ---------- T : xarray.DataArray @@ -462,7 +473,7 @@ def calcCloudCoverage(T, T_0, eps_overcast, eps_clear, dlr, station_id): overcast and cloud clear assumptions are pre-defined. Currently KAN_M and KAN_U are special cases, but this will need to be done for all stations eventually - + Returns ------- cc : xarray.DataArray @@ -475,17 +486,17 @@ def calcCloudCoverage(T, T_0, eps_overcast, eps_clear, dlr, station_id): LR_overcast = 305 + 4*T LR_clear = 220 + 3.5*T else: - LR_overcast = eps_overcast * 5.67e-8 *(T + T_0)**4 - LR_clear = eps_clear * 5.67e-8 * (T + T_0)**6 + LR_overcast = eps_overcast * 5.67e-8 *(T + T_0)**4 + LR_clear = eps_clear * 5.67e-8 * (T + T_0)**6 cc = (dlr - LR_clear) / (LR_overcast - LR_clear) cc[cc > 1] = 1 cc[cc < 0] = 0 return cc def calcSurfaceTemperature(T_0, ulr, dlr, emissivity): - '''Calculate surface temperature from air temperature, upwelling and + '''Calculate surface temperature from air temperature, upwelling and downwelling radiation and emissivity - + Parameters ---------- T_0 : xarray.DataArray @@ -496,7 +507,7 @@ def calcSurfaceTemperature(T_0, ulr, dlr, emissivity): Downwelling longwave radiation emissivity : int Assumed emissivity - + Returns ------- xarray.DataArray @@ -504,10 +515,10 @@ def calcSurfaceTemperature(T_0, ulr, dlr, emissivity): ''' t_surf = ((ulr - (1 - emissivity) * dlr) / emissivity / 5.67e-8)**0.25 - T_0 return t_surf - + def calcTilt(tilt_x, tilt_y, deg2rad): '''Calculate station tilt - + Parameters ---------- tilt_x : xarray.DataArray @@ -516,7 +527,7 @@ def calcTilt(tilt_x, tilt_y, deg2rad): Y tilt inclinometer measurements deg2rad : float Degrees to radians conversion - + Returns ------- phi_sensor_rad : xarray.DataArray @@ -527,12 +538,12 @@ def calcTilt(tilt_x, tilt_y, deg2rad): # Tilt as radians tx = tilt_x * deg2rad ty = tilt_y * deg2rad - + # Calculate cartesian coordinates X = np.sin(tx) * np.cos(tx) * np.sin(ty)**2 + np.sin(tx) * np.cos(ty)**2 Y = np.sin(ty) * np.cos(ty) * np.sin(tx)**2 + np.sin(ty) * np.cos(tx)**2 Z = np.cos(tx) * np.cos(ty) + np.sin(tx)**2 * np.sin(ty)**2 - + # Calculate spherical coordinates phi_sensor_rad = -np.pi /2 - np.arctan(Y/X) phi_sensor_rad[X > 0] += np.pi @@ -541,10 +552,10 @@ def calcTilt(tilt_x, tilt_y, deg2rad): phi_sensor_rad[phi_sensor_rad < 0] += 2*np.pi # Total tilt of the sensor, i.e. 0 when horizontal - theta_sensor_rad = np.arccos(Z / (X**2 + Y**2 + Z**2)**0.5) + theta_sensor_rad = np.arccos(Z / (X**2 + Y**2 + Z**2)**0.5) # phi_sensor_deg = phi_sensor_rad * rad2deg #TODO take these out if not needed # theta_sensor_deg = theta_sensor_rad * rad2deg - return phi_sensor_rad, theta_sensor_rad + return phi_sensor_rad, theta_sensor_rad def correctHumidity(rh, T, T_0, T_100, ews, ei0): #TODO figure out if T replicate is needed '''Correct relative humidity using Groff & Gratch method, where values are @@ -564,15 +575,15 @@ def correctHumidity(rh, T, T_0, T_100, ews, ei0): #TODO f Saturation pressure (normal atmosphere) at steam point temperature ei0 : float Saturation pressure (normal atmosphere) at ice-point temperature - + Returns ------- rh_cor : xarray.DataArray Corrected relative humidity - ''' - # Convert to hPa (Groff & Gratch) + ''' + # Convert to hPa (Groff & Gratch) e_s_wtr = 10**(-7.90298 * (T_100 / (T + T_0) - 1) - + 5.02808 * np.log10(T_100 / (T + T_0)) + + 5.02808 * np.log10(T_100 / (T + T_0)) - 1.3816E-7 * (10**(11.344 * (1 - (T + T_0) / T_100)) - 1) + 8.1328E-3 * (10**(-3.49149 * (T_100/(T + T_0) - 1)) -1) + np.log10(ews)) @@ -580,38 +591,38 @@ def correctHumidity(rh, T, T_0, T_100, ews, ei0): #TODO f - 3.56654 * np.log10(T_0 / (T + T_0)) + 0.876793 * (1 - (T + T_0) / T_0) + np.log10(ei0)) - - # Define freezing point. Why > -100? - freezing = (T < 0) & (T > -100).values - # Set to Groff & Gratch values when freezing, otherwise just rh - rh_cor = rh.where(~freezing, other = rh*(e_s_wtr / e_s_ice)) - return rh_cor + # Define freezing point. Why > -100? + freezing = (T < 0) & (T > -100).values + + # Set to Groff & Gratch values when freezing, otherwise just rh + rh_cor = rh.where(~freezing, other = rh*(e_s_wtr / e_s_ice)) + return rh_cor def correctPrecip(precip, wspd): - '''Correct precipitation with the undercatch correction method used in + '''Correct precipitation with the undercatch correction method used in Yang et al. (1999) and Box et al. (2022), based on Goodison et al. (1998) - - Yang, D., Ishida, S., Goodison, B. E., and Gunther, T.: Bias correction of - daily precipitation measurements for Greenland, + + Yang, D., Ishida, S., Goodison, B. E., and Gunther, T.: Bias correction of + daily precipitation measurements for Greenland, https://doi.org/10.1029/1998jd200110, 1999. - + Box, J., Wehrle, A., van As, D., Fausto, R., Kjeldsen, K., Dachauer, A., - Ahlstrom, A. P., and Picard, G.: Greenland Ice Sheet rainfall, heat and - albedo feedback imapacts from the Mid-August 2021 atmospheric river, - Geophys. Res. Lett. 49 (11), e2021GL097356, + Ahlstrom, A. P., and Picard, G.: Greenland Ice Sheet rainfall, heat and + albedo feedback imapacts from the Mid-August 2021 atmospheric river, + Geophys. Res. Lett. 49 (11), e2021GL097356, https://doi.org/10.1029/2021GL097356, 2022. - - Goodison, B. E., Louie, P. Y. T., and Yang, D.: Solid Precipitation + + Goodison, B. E., Louie, P. Y. T., and Yang, D.: Solid Precipitation Measurement Intercomparison, WMO, 1998 - + Parameters ---------- precip : xarray.DataArray Cumulative precipitation measurements wspd : xarray.DataArray Wind speed measurements - + Returns ------- precip_cor : xarray.DataArray @@ -625,27 +636,27 @@ def correctPrecip(precip, wspd): # Fix all values below 1.02 to 1.02 corr = corr.where(corr>1.02, other=1.02) - # Fill nan values in precip with preceding value - precip = precip.ffill(dim='time') - + # Fill nan values in precip with preceding value + precip = precip.ffill(dim='time') + # Calculate precipitation rate precip_rate = precip.diff(dim='time', n=1) - + # Apply correction to rate precip_rate = precip_rate*corr - + # Flag rain bucket reset - precip_rate = precip_rate.where(precip_rate>-0.01, other=np.nan) + precip_rate = precip_rate.where(precip_rate>-0.01, other=np.nan) b = precip_rate.to_dataframe('precip_flag').notna().to_xarray() - + # Get corrected cumulative precipitation, reset if rain bucket flag precip_cor = precip_rate.cumsum()-precip_rate.cumsum().where(~b['precip_flag']).ffill(dim='time').fillna(0).astype(float) - + return precip_cor, precip_rate - + def calcDeclination(doy, hour, minute): '''Calculate sun declination based on time - + Parameters ---------- doy : int @@ -654,7 +665,7 @@ def calcDeclination(doy, hour, minute): Hour of day minute : int Minute of hour - + Returns ------- float @@ -673,7 +684,7 @@ def calcHourAngle(hour, minute, lon): '''Calculate hour angle of sun based on time and longitude. Make sure that time is set to UTC and longitude is positive when west. Hour angle should be 0 at noon - + Parameters ---------- hour : int @@ -682,7 +693,7 @@ def calcHourAngle(hour, minute, lon): Minute of hour lon : float Longitude - + Returns ------- float @@ -692,15 +703,15 @@ def calcHourAngle(hour, minute, lon): # ; - 15.*timezone/360.) def calcDirectionDeg(HourAngle_rad): #TODO remove if not plan to use this - '''Calculate sun direction as degrees. This is an alternative to + '''Calculate sun direction as degrees. This is an alternative to _calcHourAngle that is currently not implemented into the offical L0>>L3 workflow. Here, 180 degrees is at noon (NH), as opposed to HourAngle - + Parameters ---------- HourAngle_rad : float Sun hour angle in radians - + Returns ------- DirectionSun_deg @@ -713,7 +724,7 @@ def calcDirectionDeg(HourAngle_rad): #T def calcZenith(lat, Declination_rad, HourAngle_rad, deg2rad, rad2deg): '''Calculate sun zenith in radians and degrees - + Parameters ---------- lat : float @@ -726,7 +737,7 @@ def calcZenith(lat, Declination_rad, HourAngle_rad, deg2rad, rad2deg): Degrees to radians conversion rad2deg : float Radians to degrees conversion - + Returns ------- ZenithAngle_rad : float @@ -739,32 +750,32 @@ def calcZenith(lat, Declination_rad, HourAngle_rad, deg2rad, rad2deg): * np.cos(HourAngle_rad) + np.sin(lat * deg2rad) * np.sin(Declination_rad)) - - ZenithAngle_deg = ZenithAngle_rad * rad2deg + + ZenithAngle_deg = ZenithAngle_rad * rad2deg return ZenithAngle_rad, ZenithAngle_deg -def calcAngleDiff(ZenithAngle_rad, HourAngle_rad, phi_sensor_rad, +def calcAngleDiff(ZenithAngle_rad, HourAngle_rad, phi_sensor_rad, theta_sensor_rad): '''Calculate angle between sun and upper sensor (to determine when sun is in sight of upper sensor - + Parameters ---------- ZenithAngle_rad : float Zenith angle in radians - HourAngle_rad : float + HourAngle_rad : float Sun hour angle in radians phi_sensor_rad : xarray.DataArray Spherical tilt coordinates theta_sensor_rad : xarray.DataArray Total tilt of sensor, where 0 is horizontal - + Returns ------- float Angle between sun and sensor ''' - return 180 / np.pi * np.arccos(np.sin(ZenithAngle_rad) + return 180 / np.pi * np.arccos(np.sin(ZenithAngle_rad) * np.cos(HourAngle_rad + np.pi) * np.sin(theta_sensor_rad) * np.cos(phi_sensor_rad) @@ -773,12 +784,12 @@ def calcAngleDiff(ZenithAngle_rad, HourAngle_rad, phi_sensor_rad, * np.sin(theta_sensor_rad) * np.sin(phi_sensor_rad) + np.cos(ZenithAngle_rad) - * np.cos(theta_sensor_rad)) + * np.cos(theta_sensor_rad)) def calcAlbedo(usr, dsr_cor, AngleDif_deg, ZenithAngle_deg): - '''Calculate surface albedo based on upwelling and downwelling shorwave + '''Calculate surface albedo based on upwelling and downwelling shorwave flux, the angle between the sun and sensor, and the sun zenith - + Parameters ---------- usr : xarray.DataArray @@ -789,7 +800,7 @@ def calcAlbedo(usr, dsr_cor, AngleDif_deg, ZenithAngle_deg): Angle between sun and sensor in degrees ZenithAngle_deg: float Zenith angle in degrees - + Returns ------- albedo : xarray.DataArray @@ -797,55 +808,55 @@ def calcAlbedo(usr, dsr_cor, AngleDif_deg, ZenithAngle_deg): OKalbedos : xarray.DataArray Valid albedo measurements ''' - albedo = usr / dsr_cor - + albedo = usr / dsr_cor + # NaN bad data - OKalbedos = (AngleDif_deg < 70) & (ZenithAngle_deg < 70) & (albedo < 1) & (albedo > 0) - albedo[~OKalbedos] = np.nan - - # Interpolate all. Note "use_coordinate=False" is used here to force - # comparison against the GDL code when that is run with *only* a TX file. - # Should eventually set to default (True) and interpolate based on time, - # not index. - albedo = albedo.interpolate_na(dim='time', use_coordinate=False) + OKalbedos = (AngleDif_deg < 70) & (ZenithAngle_deg < 70) & (albedo < 1) & (albedo > 0) + albedo[~OKalbedos] = np.nan + + # Interpolate all. Note "use_coordinate=False" is used here to force + # comparison against the GDL code when that is run with *only* a TX file. + # Should eventually set to default (True) and interpolate based on time, + # not index. + albedo = albedo.interpolate_na(dim='time', use_coordinate=False) albedo = albedo.ffill(dim='time').bfill(dim='time') #TODO remove this line and one above? return albedo, OKalbedos - + def calcTOA(ZenithAngle_deg, ZenithAngle_rad): '''Calculate incoming shortwave radiation at the top of the atmosphere, accounting for sunset periods - + Parameters ---------- ZenithAngle_deg : float Zenith angle in degrees ZenithAngle_rad : float Zenith angle in radians - + Returns ------- isr_toa : float Incoming shortwave radiation at the top of the atmosphere ''' sundown = ZenithAngle_deg >= 90 - + # Incoming shortware radiation at the top of the atmosphere - isr_toa = 1372 * np.cos(ZenithAngle_rad) + isr_toa = 1372 * np.cos(ZenithAngle_rad) isr_toa[sundown] = 0 - return isr_toa - -def calcCorrectionFactor(Declination_rad, phi_sensor_rad, theta_sensor_rad, - HourAngle_rad, ZenithAngle_rad, ZenithAngle_deg, - lat, DifFrac, deg2rad): - '''Calculate correction factor for direct beam radiation, as described + return isr_toa + +def calcCorrectionFactor(Declination_rad, phi_sensor_rad, theta_sensor_rad, + HourAngle_rad, ZenithAngle_rad, ZenithAngle_deg, + lat, DifFrac, deg2rad): + '''Calculate correction factor for direct beam radiation, as described here: http://solardat.uoregon.edu/SolarRadiationBasics.html - + Offset correction (where solar zenith angles larger than 110 degrees) not implemented as it should not improve the accuracy of well-calibrated instruments. It would go something like this: ds['dsr'] = ds['dsr'] - ds['dwr_offset'] SRout = SRout - SRout_offset - + Parameters ---------- Declination_rad : float @@ -854,11 +865,11 @@ def calcCorrectionFactor(Declination_rad, phi_sensor_rad, theta_sensor_rad, Spherical tilt coordinates theta_sensor_rad : xarray.DataArray Total tilt of sensor, where 0 is horizontal - HourAngle_rad : float + HourAngle_rad : float Sun hour angle in radians - ZenithAngle_rad : float + ZenithAngle_rad : float Zenith angle in radians - ZenithAngle_deg : float + ZenithAngle_deg : float Zenith Angle in degrees lat : float Latitude @@ -866,7 +877,7 @@ def calcCorrectionFactor(Declination_rad, phi_sensor_rad, theta_sensor_rad, Fractional cloud cover deg2rad : float Degrees to radians conversion - + Returns ------- CorFac_all : xarray.DataArray @@ -891,20 +902,20 @@ def calcCorrectionFactor(Declination_rad, phi_sensor_rad, theta_sensor_rad, * np.sin(theta_sensor_rad) \ * np.sin(phi_sensor_rad + np.pi) \ * np.sin(HourAngle_rad) \ - + CorFac = np.cos(ZenithAngle_rad) / CorFac # sun out of field of view upper sensor CorFac[(CorFac < 0) | (ZenithAngle_deg > 90)] = 1 - + # Calculating ds['dsr'] over a horizontal surface corrected for station/sensor tilt CorFac_all = CorFac / (1 - DifFrac + CorFac * DifFrac) - + return CorFac_all -def _getDF(flag_url, flag_file, download=True, verbose=True): - '''Get dataframe from flag or adjust file. First attempt to retrieve from - URL. If this fails then attempt to retrieve from local file - +def _getDF(flag_url, flag_file, download=True): + '''Get dataframe from flag or adjust file. First attempt to retrieve from + URL. If this fails then attempt to retrieve from local file + Parameters ---------- flag_url : str @@ -913,59 +924,53 @@ def _getDF(flag_url, flag_file, download=True, verbose=True): Local path to file download : bool Flag to download file from URL - + Returns ------- df : pd.DataFrame Flag or adjustment dataframe ''' - + # Download local copy as csv if download==True: - os.makedirs(os.path.dirname(flag_file), exist_ok = True) - + os.makedirs(os.path.dirname(flag_file), exist_ok = True) + try: urllib.request.urlretrieve(flag_url, flag_file) - if verbose: print('Downloaded a', - flag_file.split('/')[-2][:-1], - f'file to {flag_file}') + logger.info(f"Downloaded a {flag_file.split('/')[-2][:-1],} file to {flag_file}") except (HTTPError, URLError) as e: - if verbose: print('Unable to download', - flag_file.split('/')[-2][:-1], - f'file, using local file: {flag_file}') + logger.info(f"Unable to download {flag_file.split('/')[-2][:-1],} file, using local file: {flag_file}") else: - if verbose: print('Using local', - flag_file.split('/')[-2][:-1], - f'file: {flag_file}') + logger.info(f"Using local {flag_file.split('/')[-2][:-1],} file: {flag_file}") if os.path.isfile(flag_file): df = pd.read_csv( - flag_file, - comment="#", + flag_file, + comment="#", skipinitialspace=True, - ).dropna(how='all', axis='rows') + ).dropna(how='all', axis='rows') else: df=None - if verbose: print('No', flag_file.split('/')[-2][:-1], 'file to read.') + logger.info(f"No {flag_file.split('/')[-2][:-1]} file to read.") return df def _hampel(vals_orig, k=7*24, t0=3): '''Hampel filter - + Parameters ---------- vals : pd.DataSeries Series of values from which to remove outliers k : int - Size of window, including the sample. For example, 7 is equal to 3 on + Size of window, including the sample. For example, 7 is equal to 3 on either side of value. The default is 7*24. t0 : int Threshold value. The default is 3. ''' #Make copy so original not edited vals=vals_orig.copy() - + #Hampel Filter L= 1.4826 rolling_median=vals.rolling(k).median() @@ -975,11 +980,11 @@ def _hampel(vals_orig, k=7*24, t0=3): outlier_idx=difference>threshold outlier_idx[0:round(k/2)]=False vals.loc[outlier_idx]=np.nan - return(vals) + return(vals) def _checkSunPos(ds, OKalbedos, sundown, sunonlowerdome, TOA_crit_nopass): '''Check sun position - + Parameters ---------- ds : xarray.Dataset @@ -1005,15 +1010,15 @@ def _checkSunPos(ds, OKalbedos, sundown, sunonlowerdome, TOA_crit_nopass): print('Mean net SR change by corrections:', (ds['dsr_cor']-ds['usr_cor']-ds['dsr']+ds['usr']).sum().values/valid.values, "W/m2") - + def _getTempK(T_0): #TODO same as L2toL3._getTempK() '''Return steam point temperature in Kelvins - + Parameters ---------- T_0 : float Ice point temperature in K - + Returns ------- float @@ -1021,11 +1026,11 @@ def _getTempK(T_0): # return T_0+100 def _getRotation(): #TODO same as L2toL3._getRotation() - '''Return degrees-to-radians and radians-to-degrees''' + '''Return degrees-to-radians and radians-to-degrees''' deg2rad = np.pi / 180 rad2deg = 1 / deg2rad return deg2rad, rad2deg -if __name__ == "__main__": - # unittest.main() - pass +if __name__ == "__main__": + # unittest.main() + pass diff --git a/src/pypromice/process/aws.py b/src/pypromice/process/aws.py index 4fc895ed..61ba6a5d 100644 --- a/src/pypromice/process/aws.py +++ b/src/pypromice/process/aws.py @@ -160,15 +160,12 @@ def writeArr(self, outpath): if not os.path.isdir(outdir): os.mkdir(outdir) - f = [l.attrs['format'] for l in self.L0] - if all(f): - col_names = getColNames(self.vars, self.L3.attrs['number_of_booms'], - self.L0[0].attrs['format'], - self.L3.attrs['bedrock']) - else: - col_names = getColNames(self.vars, self.L3.attrs['number_of_booms'], - None, - self.L3.attrs['bedrock']) + col_names = getColNames( + self.vars, + self.L3.attrs['number_of_booms'], + self.L3.attrs['format'], + self.L3.attrs['bedrock'], + ) t = int(pd.Timedelta((self.L3['time'][1] - self.L3['time'][0]).values).total_seconds()) logger.info('Writing to files...') diff --git a/src/pypromice/qc/__init__.py b/src/pypromice/qc/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/pypromice/qc/static_qc.py b/src/pypromice/qc/static_qc.py new file mode 100644 index 00000000..6f9eb5ef --- /dev/null +++ b/src/pypromice/qc/static_qc.py @@ -0,0 +1,142 @@ +import logging + +import numpy as np +import pandas as pd +import xarray as xr +from typing import Mapping, Optional, Union + +__all__ = [ + "apply_static_qc", + "find_static_regions", + "count_consecutive_static_values", + "count_consecutive_true", +] + +logger = logging.getLogger(__name__) + +DEFAULT_VARIABLE_THRESHOLDS = { + "t": {"max_diff": 0.0001, "period": 2}, + "p": {"max_diff": 0.0001, "period": 2}, + # Relative humidity can be very stable around 100%. + #"rh": {"max_diff": 0.0001, "period": 2}, +} + + +def apply_static_qc( + ds: xr.Dataset, + variable_thresholds: Optional[Mapping] = None, +) -> xr.Dataset: + """ + Detect and filter data points that seems to be static within a certain period. + + TODO: It could be nice to have a reference to the logger or description of the behaviour here. + The AWS logger program is know to return the last successfully read value if it fails reading from the sensor. + + Parameters + ---------- + ds : xr.Dataset + Level 1 datset + variable_thresholds : Mapping + Define threshold dict to hold limit values, and the difference values. + Limit values indicate how much a variable has to change to the previous value + period is how many hours a value can stay the same without being set to NaN + * are used to calculate and define all limits, which are then applied to *_u, *_l and *_i + + Returns + ------- + ds_out : xr.Dataset + Level 1 dataset with difference outliers set to NaN + """ + + # the differenceQC is not done on the Windspeed + # Optionally examine flagged data by setting make_plots to True + # This is best done by running aws.py directly and setting 'test_station' + # Plots will be shown before and after flag removal for each var + + df = ds.to_dataframe() # Switch to pandas + + if variable_thresholds is None: + variable_thresholds = DEFAULT_VARIABLE_THRESHOLDS + + logger.debug(f"Running apply_static_qc using {variable_thresholds}") + + for k in variable_thresholds.keys(): + var_all = [ + k + "_u", + k + "_l", + k + "_i", + ] # apply to upper, lower boom, and instant + max_diff = variable_thresholds[k]["max_diff"] # loading static limit + period = variable_thresholds[k]["period"] # loading diff period + + for v in var_all: + if v in df: + mask = find_static_regions(df[v], period, max_diff) + n_masked = mask.sum() + n_samples = len(mask) + logger.debug( + f"Applying static QC in {v}. Filtering {n_masked}/{n_samples} samples" + ) + # setting outliers to NaN + df.loc[mask, v] = np.nan + + # Back to xarray, and re-assign the original attrs + ds_out = df.to_xarray() + ds_out = ds_out.assign_attrs(ds.attrs) # Dataset attrs + for x in ds_out.data_vars: # variable-specific attrs + ds_out[x].attrs = ds[x].attrs + + return ds_out + + +def find_static_regions( + data: pd.Series, + min_repeats: int, + max_diff: float, +) -> pd.Series: + """ + Algorithm that ensures values can stay the same within the outliers_mask + """ + consecutive_true_df = count_consecutive_static_values(data, max_diff) + static_regions = consecutive_true_df >= min_repeats + # Ignore entries which already nan in the input data + static_regions[data.isna()] = False + return static_regions + + +def count_consecutive_static_values( + data: pd.Series, + max_diff: float, +) -> pd.Series: + diff = data.ffill().diff().abs() # forward filling all NaNs! + mask: pd.Series = diff < max_diff + return count_consecutive_true(mask) + + +def count_consecutive_true( + series: Union[pd.Series, pd.DataFrame] +) -> Union[pd.Series, pd.DataFrame]: + """ + Convert boolean series to integer series where the values represent the number of connective true values. + + Examples + -------- + >>> count_consecutive_true(pd.Series([False, True, False, False, True, True, True, False, True])) + pd.Series([0, 1, 0, 0, 1, 2, 3, 0, 1]) + + Parameters + ---------- + series + Boolean pandas Series or DataFrame + + Returns + ------- + consecutive_true_count + Integer pandas Series or DataFrame with values representing the number of connective true values. + + """ + # assert series.dtype == bool + cumsum = series.cumsum() + is_first = series.astype("int").diff() == 1 + offset = (is_first * cumsum).replace(0, np.nan).fillna(method="ffill").fillna(0) + return ((cumsum - offset + 1) * series).astype("int") diff --git a/src/pypromice/qc/static_qc_test.py b/src/pypromice/qc/static_qc_test.py new file mode 100644 index 00000000..e9aac828 --- /dev/null +++ b/src/pypromice/qc/static_qc_test.py @@ -0,0 +1,146 @@ +import unittest + +import numpy as np +import numpy.testing +import pandas as pd + +from pypromice.qc.static_qc import find_static_regions + + +class StaticQATestCase(unittest.TestCase): + def test_1_hour_static(self): + self._test_1_hour_repeat(10) + + def test_1_hour_second_index(self): + self._test_1_hour_repeat(0) + + def test_1_hour_last_index(self): + self._test_1_hour_repeat(-2) + + def _test_1_hour_repeat(self, index: int): + self.assertTrue(index < -1 or index >= 0) + time_range = pd.date_range( + start="2023-01-26", end="2023-01-27", freq="h", tz="utc", inclusive="left" + ) + input_series = pd.Series(index=time_range, data=np.arange(0, len(time_range))) + input_series[index + 1] = input_series[index] + min_repeats = 1 + expected_output = input_series.map(lambda _: False) + expected_output[index + 1] = True + + static_mask = find_static_regions( + input_series, min_repeats=min_repeats, max_diff=0.001 + ) + + pd.testing.assert_series_equal(expected_output, static_mask, check_names=False) + + def test_no_static_period(self): + time_range = pd.date_range( + start="2023-01-26", end="2023-01-27", freq="h", tz="utc", inclusive="left" + ) + input_series = pd.Series(index=time_range, data=np.arange(0, len(time_range))) + min_repeats = 1 + expected_output = input_series.map(lambda _: False) + + static_mask = find_static_regions( + input_series, min_repeats=min_repeats, max_diff=0.001 + ) + + pd.testing.assert_series_equal(expected_output, static_mask, check_names=False) + + def test_static_period_longer_than_period_threshold(self): + time_range = pd.date_range( + start="2023-01-26", end="2023-01-28", freq="h", tz="utc", inclusive="left" + ) + index_start = 23 + index_end = 33 + min_repeats = 4 + expected_filter_start = 27 + expected_filter_end = 33 + input_series = pd.Series(index=time_range, data=np.arange(0, len(time_range))) + input_series[index_start:index_end] = input_series[index_start] + expected_output = input_series.map(lambda _: False) + expected_output[expected_filter_start:expected_filter_end] = True + + static_mask = find_static_regions( + input_series, min_repeats=min_repeats, max_diff=0.001 + ) + + pd.testing.assert_series_equal(expected_output, static_mask, check_names=False) + + def test_period_threshold_longer_than_static_period(self): + time_range = pd.date_range( + start="2023-01-26", end="2023-01-28", freq="h", tz="utc", inclusive="left" + ) + index_start = 23 + index_end = 27 + min_repeats = 10 + input_series = pd.Series(index=time_range, data=np.arange(0, len(time_range))) + input_series[index_start:index_end] = input_series[index_start] + expected_output = input_series.map(lambda _: False) + + static_mask = find_static_regions( + input_series, min_repeats=min_repeats, max_diff=0.001 + ) + + pd.testing.assert_series_equal(expected_output, static_mask, check_names=False) + + def test_static_period_at_the_end(self): + time_range = pd.date_range( + start="2023-01-26", end="2023-01-28", freq="h", tz="utc", inclusive="left" + ) + index_start = 23 + min_repeats = 4 + expected_filter_start = 27 + input_series = pd.Series(index=time_range, data=np.arange(0, len(time_range))) + input_series[index_start:] = input_series[index_start] + expected_output = input_series.map(lambda _: False) + expected_output[expected_filter_start:] = True + + static_mask = find_static_regions( + input_series, min_repeats=min_repeats, max_diff=0.001 + ) + + pd.testing.assert_series_equal(expected_output, static_mask, check_names=False) + + def test_dont_filter_nan_values(self): + time_range = pd.date_range( + start="2023-01-26", end="2023-01-27", freq="h", tz="utc", inclusive="left" + ) + input_series = pd.Series( + index=time_range, data=np.zeros_like(time_range, dtype="float") + ) + min_repeats = 4 + input_series[:] = np.nan + input_series[9] = -11 + input_series[10:12] = -10 + input_series[15] = -9 + # There are >=4 repeats if the nan values are forward filled. [10:15] == -10 + # The output mask shouldn't filter nan values. + expected_output = input_series.map(lambda _: False) + + static_mask = find_static_regions( + input_series, min_repeats=min_repeats, max_diff=0.001 + ) + + pd.testing.assert_series_equal(expected_output, static_mask, check_names=False) + + def test_series_with_nan_values_between_static_values(self): + time_range = pd.date_range( + start="2023-01-26", end="2023-01-27", freq="h", tz="utc", inclusive="left" + ) + values = np.zeros_like(time_range, dtype="float") + values[:] = np.nan + values[9] = -11 + values[16] = -11 + values[17] = -9 + series = pd.Series(index=time_range, data=values) + expected_mask = np.zeros_like(values, dtype="bool") + period = 4 + # The value and index 16 is the same as 9 which is longer than period + # Note: The station region mask shall not filter nan values + expected_mask[16] = True + + output_mask = find_static_regions(series, min_repeats=period, max_diff=0.01) + + np.testing.assert_equal(expected_mask, output_mask)