diff --git a/src/pypromice/process/L1toL2.py b/src/pypromice/process/L1toL2.py
index d986f391..73ca1a7d 100644
--- a/src/pypromice/process/L1toL2.py
+++ b/src/pypromice/process/L1toL2.py
@@ -66,25 +66,34 @@ def toL2(
     except Exception:
         logger.exception('Flagging and fixing failed:')

-    if ds.attrs['format'] == 'TX':
-        ds = persistence_qc(ds)                                # Flag and remove persistence outliers
-        # TODO: The configuration should be provided explicitly
-        outlier_detector = ThresholdBasedOutlierDetector.default()
-        ds = outlier_detector.filter_data(ds)                  # Flag and remove percentile outliers
+    ds = persistence_qc(ds)                                    # Flag and remove persistence outliers
+    # if ds.attrs['format'] == 'TX':
+    #     # TODO: The configuration should be provided explicitly
+    #     outlier_detector = ThresholdBasedOutlierDetector.default()
+    #     ds = outlier_detector.filter_data(ds)                # Flag and remove percentile outliers
+
+    # filtering gps_lat, gps_lon and gps_alt based on the difference to a baseline elevation
+    # right now the baseline elevation is the gap-filled monthly median elevation
+    baseline_elevation = (ds.gps_alt.to_series().resample('M').median()
+                          .reindex(ds.time.to_series().index, method='nearest')
+                          .ffill().bfill())
+    mask = (np.abs(ds.gps_alt - baseline_elevation) < 100) & ds.gps_alt.notnull()
+    ds[['gps_alt','gps_lon', 'gps_lat']] = ds[['gps_alt','gps_lon', 'gps_lat']].where(mask)
+
+    # removing dlr and ulr that are missing t_rad
+    # this is done now because t_rad can be filtered either manually or with persistence
+    ds['dlr'] = ds.dlr.where(ds.t_rad.notnull())
+    ds['ulr'] = ds.ulr.where(ds.t_rad.notnull())

     T_100 = _getTempK(T_0)
     ds['rh_u_cor'] = correctHumidity(ds['rh_u'], ds['t_u'],
                                      T_0, T_100, ews, ei0)

     # Determine cloud cover for on-ice stations
-    if not ds.attrs['bedrock']:
-        cc = calcCloudCoverage(ds['t_u'], T_0, eps_overcast, eps_clear,  # Calculate cloud coverage
-                               ds['dlr'], ds.attrs['station_id'])
-        ds['cc'] = (('time'), cc.data)
-    else:
-        # Default cloud cover for bedrock station for which tilt should be 0 anyway.
-        cc = 0.8
-
+    cc = calcCloudCoverage(ds['t_u'], T_0, eps_overcast, eps_clear,      # Calculate cloud coverage
+                           ds['dlr'], ds.attrs['station_id'])
+    ds['cc'] = (('time'), cc.data)
+
     # Determine surface temperature
     ds['t_surf'] = calcSurfaceTemperature(T_0, ds['ulr'], ds['dlr'],     # Calculate surface temperature
                                           emissivity)
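A note on the GPS screening introduced above: the +/- 100 m window around the gap-filled monthly median elevation can be exercised in isolation. A minimal sketch on synthetic data (all names are local to the sketch; only the resample/reindex recipe mirrors the hunk):

```python
import numpy as np
import pandas as pd
import xarray as xr

# synthetic hourly GPS altitude around 900 m with two transmission glitches
time = pd.date_range("2021-01-01", periods=24 * 90, freq="H")
alt = xr.DataArray(900 + np.random.randn(len(time)), dims="time",
                   coords={"time": time})
alt[100] = 2500.0
alt[2000] = -100.0

# gap-filled monthly median elevation, then a +/- 100 m window around it
baseline = (alt.to_series().resample('M').median()
            .reindex(time, method='nearest').ffill().bfill())
mask = (np.abs(alt - baseline) < 100) & alt.notnull()
print(int((~mask).sum()))  # 2 -> only the glitches fall outside the window
```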
@@ -102,6 +111,11 @@ def toL2(
     else:
         lat = ds['gps_lat'].mean()
         lon = ds['gps_lon'].mean()
+
+    # smoothing tilt and rot
+    ds['tilt_x'] = smoothTilt(ds['tilt_x'])
+    ds['tilt_y'] = smoothTilt(ds['tilt_y'])
+    ds['rot'] = smoothRot(ds['rot'])

     deg2rad, rad2deg = _getRotation()                                    # Get degree-radian conversions
     phi_sensor_rad, theta_sensor_rad = calcTilt(ds['tilt_x'], ds['tilt_y'],  # Calculate station tilt
@@ -112,13 +126,15 @@
     ZenithAngle_rad, ZenithAngle_deg = calcZenith(lat, Declination_rad,  # Calculate zenith
                                                   HourAngle_rad, deg2rad,
                                                   rad2deg)
-
+
+    # Correct downwelling shortwave radiation
     DifFrac = 0.2 + 0.8 * cc
     CorFac_all = calcCorrectionFactor(Declination_rad, phi_sensor_rad,   # Calculate correction
                                       theta_sensor_rad, HourAngle_rad,
                                       ZenithAngle_rad, ZenithAngle_deg,
                                       lat, DifFrac, deg2rad)
+    CorFac_all = xr.where(ds['cc'].notnull(), CorFac_all, 1)
     ds['dsr_cor'] = ds['dsr'].copy(deep=True) * CorFac_all               # Apply correction

     AngleDif_deg = calcAngleDiff(ZenithAngle_rad, HourAngle_rad,         # Calculate angle between sun and sensor
@@ -145,9 +161,9 @@
     TOA_crit_nopass = (ds['dsr_cor'] > (0.9 * isr_toa + 10))             # Determine filter
     ds['dsr_cor'][TOA_crit_nopass] = np.nan                              # Apply filter and interpolate
     ds['usr_cor'][TOA_crit_nopass] = np.nan
-    ds['dsr_cor'] = ds['dsr_cor'].interpolate_na(dim='time', use_coordinate=False)
-    ds['usr_cor'] = ds['usr_cor'].interpolate_na(dim='time', use_coordinate=False)
-
+
+    ds['dsr_cor'] = ds.dsr_cor.where(ds.dsr.notnull())
+    ds['usr_cor'] = ds.usr_cor.where(ds.usr.notnull())
     # # Check sun position
     # sundown = ZenithAngle_deg >= 90
     # _checkSunPos(ds, OKalbedos, sundown, sunonlowerdome, TOA_crit_nopass)
@@ -241,6 +257,65 @@ def calcSurfaceTemperature(T_0, ulr, dlr, emissivity):
     return t_surf


+def smoothTilt(da: xr.DataArray, threshold=0.2):
+    '''Smooth the station tilt
+
+    Parameters
+    ----------
+    da : xarray.DataArray
+        either X or Y tilt inclinometer measurements
+    threshold : float
+        threshold used in a standard-deviation based filter
+
+    Returns
+    -------
+    xarray.DataArray
+        either X or Y smoothed tilt inclinometer measurements
+    '''
+    # we calculate the moving standard deviation over a 3-day sliding window
+    # hourly resampling is necessary to make sure the same threshold can be used
+    # for 10 min and hourly data
+    moving_std_gap_filled = da.to_series().resample('H').median().rolling(
+        3*24, center=True, min_periods=2
+    ).std().reindex(da.time, method='bfill').values
+    # we select the good timestamps and gapfill assuming that
+    # - when tilt goes missing the last available value is used
+    # - when tilt is not available for the very first time steps, the first
+    #   good value is used for backfill
+    return da.where(
+        moving_std_gap_filled < threshold
+    ).ffill(dim='time').bfill(dim='time')
+
+
+def smoothRot(da: xr.DataArray, threshold=4):
+    '''Smooth the station rotation
+
+    Parameters
+    ----------
+    da : xarray.DataArray
+        rotation measurements from inclinometer
+    threshold : float
+        threshold used in a standard-deviation based filter
+
+    Returns
+    -------
+    tuple
+        ('time', smoothed rotation values), ready to be assigned to an xarray.Dataset variable
+    '''
+    moving_std_gap_filled = da.to_series().resample('H').median().rolling(
+        3*24, center=True, min_periods=2
+    ).std().reindex(da.time, method='bfill').values
+    # same as for tilt, followed by:
+    # - a resampling to daily values
+    # - a two-week median smoothing
+    # - a resampling from these daily values back to the original temporal resolution
+    return ('time', (da.where(moving_std_gap_filled < threshold).ffill(dim='time')
+                     .to_series().resample('D').median()
+                     .rolling(7*2, center=True, min_periods=2).median()
+                     .reindex(da.time, method='bfill').values
+                     ))
+
+
 def calcTilt(tilt_x, tilt_y, deg2rad):
     '''Calculate station tilt

@@ -323,7 +398,6 @@ def correctHumidity(rh, T, T_0, T_100, ews, ei0):                       #TODO f

     # Set to Groff & Gratch values when freezing, otherwise just rh
     rh_cor = rh.where(~freezing, other = rh*(e_s_wtr / e_s_ice))
-    rh_cor = rh_cor.where(T.notnull())

     return rh_cor
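Both smoothing helpers above start from the same step: a moving standard deviation computed on hourly medians, so that a single threshold works for 10 min and hourly files alike. A quick sketch of that step on synthetic tilt data, using the 0.2 threshold from smoothTilt (names are local to the sketch):

```python
import numpy as np
import pandas as pd
import xarray as xr

# one month of 10 min tilt readings: stable at 0.5 deg, noisy during a site visit
time = pd.date_range("2021-06-01", periods=6 * 24 * 30, freq="10min")
tilt = xr.DataArray(0.5 + 0.01 * np.random.randn(len(time)),
                    dims="time", coords={"time": time})
tilt[1000:1150] += np.random.randn(150)  # noisy interval

# hourly median -> 3-day moving std -> back to the original timestamps
std = (tilt.to_series().resample('H').median()
       .rolling(3 * 24, center=True, min_periods=2).std()
       .reindex(time, method='bfill').values)
smoothed = tilt.where(std < 0.2).ffill(dim='time').bfill(dim='time')
```

Values inside the noisy interval exceed the threshold and are replaced by the last stable reading, mirroring the gap-filling assumptions spelled out in the smoothTilt comments.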
diff --git a/src/pypromice/process/L2toL3.py b/src/pypromice/process/L2toL3.py
index 9b8a4cee..a71a4028 100755
--- a/src/pypromice/process/L2toL3.py
+++ b/src/pypromice/process/L2toL3.py
@@ -161,7 +161,7 @@ def calcHeatFlux(T_0, T_h, Tsurf_h, rho_atm, WS_h, z_WS, z_T, nu, q_h, p_h,
     es_0 : int
         Saturation vapour pressure at the melting point (hPa). Default is 6.1071.
     eps : int
-        Default is 0.622.
+        Ratio of molar masses of vapour and dry air (0.622).
     gamma : int
         Flux profile correction (Paulson & Dyer). Default is 16..
     L_sub : int
@@ -313,7 +313,7 @@ def calcHumid(T_0, T_100, T_h, es_0, es_100, eps, p_h, RH_cor_h):
     T_h : xarray.DataArray
         Air temperature
     eps : int
-        DESCRIPTION
+        Ratio of molar masses of vapour and dry air (0.622)
     es_0 : float
         Saturation vapour pressure at the melting point (hPa)
     es_100 : float
diff --git a/src/pypromice/process/aws.py b/src/pypromice/process/aws.py
index 7fa1e3de..58440595 100644
--- a/src/pypromice/process/aws.py
+++ b/src/pypromice/process/aws.py
@@ -224,7 +224,7 @@ def loadL0(self):
             except pd.errors.ParserError as e:
                 # ParserError: Too many columns specified: expected 40 and found 38
-                logger.info(f'-----> No msg_lat or msg_lon for {k}')
+                # logger.info(f'-----> No msg_lat or msg_lon for {k}')
                 for item in ['msg_lat', 'msg_lon']:
                     target['columns'].remove(item)  # Also removes from self.config
                 ds_list.append(self.readL0file(target))

@@ -723,7 +723,7 @@ def resampleL3(ds_h, t):
     Parameters
     ----------
     ds_h : xarray.Dataset
-        L3 AWS daily dataset
+        L3 AWS dataset either at 10 min (for raw data) or hourly (for tx data)
     t : str
         Resample factor, same variable definition as in pandas.DataFrame.resample()

@@ -731,9 +731,10 @@
     Returns
     -------
     ds_d : xarray.Dataset
-        L3 AWS hourly dataset
+        L3 AWS dataset resampled to the frequency defined by t
     '''
     df_d = ds_h.to_dataframe().resample(t).mean()
+    # recalculating wind direction from averaged directional wind speeds
     for var in ['wdir_u','wdir_l','wdir_i']:
         if var in df_d.columns:
             df_d[var] = _calcWindDir(df_d['wspd_x_'+var.split('_')[1]],
@@ -742,12 +743,68 @@
                                      df_d['wspd_y_'+var.split('_')[1]])
         else:
             logger.info(var,'in dataframe but not','wspd_x_'+var.split('_')[1],'wspd_x_'+var.split('_')[1])
+
+    # recalculating relative humidity from average vapour pressure and average
+    # saturation vapour pressure
+    for var in ['rh_u','rh_l']:
+        lvl = var.split('_')[1]
+        if var in df_d.columns:
+            if ('t_'+lvl in ds_h.keys()):
+                es_wtr, es_cor = calculateSaturationVaporPressure(ds_h['t_'+lvl])
+                p_vap = ds_h[var] / 100 * es_wtr
+
+                df_d[var] = (p_vap.to_series().resample(t).mean() \
+                             / es_wtr.to_series().resample(t).mean())*100
+                df_d[var+'_cor'] = (p_vap.to_series().resample(t).mean() \
+                                    / es_cor.to_series().resample(t).mean())*100
+
     vals = [xr.DataArray(data=df_d[c], dims=['time'],
                          coords={'time':df_d.index}, attrs=ds_h[c].attrs) for c in df_d.columns]
     ds_d = xr.Dataset(dict(zip(df_d.columns,vals)), attrs=ds_h.attrs)
     return ds_d


+def calculateSaturationVaporPressure(t, T_0=273.15, T_100=373.15, es_0=6.1071,
+                                     es_100=1013.246, eps=0.622):
+    '''Calculate saturation vapour pressure with regard to water and ice
+
+    Parameters
+    ----------
+    t : xarray.DataArray
+        Air temperature (C)
+    T_0 : float
+        Melting point temperature in Kelvin. Default is 273.15.
+    T_100 : float
+        Steam point temperature in Kelvin. Default is 373.15.
+    es_0 : float
+        Saturation vapour pressure at the melting point (hPa)
+    es_100 : float
+        Saturation vapour pressure at the steam point temperature (hPa)
+
+    Returns
+    -------
+    xarray.DataArray
+        Saturation vapour pressure with regard to water above 0 C (hPa)
+    xarray.DataArray
+        Saturation vapour pressure where subfreezing timestamps are with regard to ice (hPa)
+    '''
+    # Saturation vapour pressure above 0 C (hPa)
+    es_wtr = 10**(-7.90298 * (T_100 / (t + T_0) - 1) + 5.02808 * np.log10(T_100 / (t + T_0))
+                  - 1.3816E-7 * (10**(11.344 * (1 - (t + T_0) / T_100)) - 1)
+                  + 8.1328E-3 * (10**(-3.49149 * (T_100 / (t + T_0) - 1)) - 1) + np.log10(es_100))
+
+    # Saturation vapour pressure below 0 C (hPa)
+    es_ice = 10**(-9.09718 * (T_0 / (t + T_0) - 1) - 3.56654
+                  * np.log10(T_0 / (t + T_0)) + 0.876793
+                  * (1 - (t + T_0) / T_0)
+                  + np.log10(es_0))
+
+    # Saturation vapour pressure (hPa)
+    es_cor = xr.where(t < 0, es_ice, es_wtr)
+
+    return es_wtr, es_cor
+
+
 def _calcWindDir(wspd_x, wspd_y):
     '''Calculate wind direction in degrees
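Why resampleL3 now averages vapour pressure rather than RH itself: RH is a ratio against a strongly temperature-dependent saturation pressure, so a plain mean of RH is biased whenever temperature varies within the averaging window. A small numeric check, assuming the calculateSaturationVaporPressure added above is importable from pypromice.process.aws:

```python
import pandas as pd
import xarray as xr
from pypromice.process.aws import calculateSaturationVaporPressure

# two 30 min samples at very different temperatures
t = xr.DataArray([-20.0, 0.0], dims="time",
                 coords={"time": pd.date_range("2021-01-01", periods=2, freq="30min")})
rh = xr.DataArray([50.0, 100.0], dims="time", coords=t.coords)

es_wtr, _ = calculateSaturationVaporPressure(t)
p_vap = rh / 100 * es_wtr

# hourly RH from averaged vapour pressures vs a naive mean of RH
print(float(p_vap.mean() / es_wtr.mean()) * 100)  # ~91%, weighted toward the warm sample
print(float(rh.mean()))                           # 75%
```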
diff --git a/src/pypromice/process/variables.csv b/src/pypromice/process/variables.csv
index 7ecc27d7..2960259b 100644
--- a/src/pypromice/process/variables.csv
+++ b/src/pypromice/process/variables.csv
@@ -89,4 +89,4 @@ wspd_i,wind_speed,Wind speed (instantaneous),m s-1,0,100,wdir_i wspd_x_i wspd_y_
 wdir_i,wind_from_direction,Wind from direction (instantaneous),degrees,1,360,wspd_x_i wspd_y_i,all,all,4,physicalMeasurement,time lat lon alt,True,For meteorological observations
 wspd_x_i,wind_speed_from_x_direction,Wind speed from x direction (instantaneous),m s-1,-100,100,wdir_i wspd_i,all,all,4,modelResult,time lat lon alt,True,For meteorological observations
 wspd_y_i,wind_speed_from_y_direction,Wind speed from y direction (instantaneous),m s-1,-100,100,wdir_i wspd_i,all,all,4,modelResult,time lat lon alt,True,For meteorological observations
-msg_i,message,Message string (instantaneous),-,,,,all,all,,qualityInformation,time lat lon alt,True,For meteorological observations
+msg_i,message,Message string (instantaneous),-,,,,all,,,qualityInformation,time lat lon alt,True,L0 only
diff --git a/src/pypromice/qc/percentiles/thresholds.csv b/src/pypromice/qc/percentiles/thresholds.csv
index 9bf50727..6152165c 100644
--- a/src/pypromice/qc/percentiles/thresholds.csv
+++ b/src/pypromice/qc/percentiles/thresholds.csv
@@ -122,8 +122,8 @@ JAR,p_[ul],881.00,929.00,
 JAR,p_i,-119.00,-71.00,
 JAR,wspd_[uli],-11.62,29.82,
 JAR,t_[uli],-14.60,13.30,summer
-FRE,p_[ul],638.31,799.82,
-FRE,p_i,-361.69,-200.18,
+FRE,p_[ul],800,1000,
+FRE,p_i,-200,0,
 FRE,wspd_[uli],-12.00,22.62,
 FRE,t_[uli],-38.20,10.02,winter
 FRE,t_[uli],-36.55,18.07,spring
diff --git a/src/pypromice/qc/persistence.py b/src/pypromice/qc/persistence.py
index d2ea5ef3..f59bf45c 100644
--- a/src/pypromice/qc/persistence.py
+++ b/src/pypromice/qc/persistence.py
@@ -14,11 +14,15 @@
 logger = logging.getLogger(__name__)

+# period is given in hours: 2 persistent 10 min values will be flagged if period < 0.333
 DEFAULT_VARIABLE_THRESHOLDS = {
     "t": {"max_diff": 0.0001, "period": 2},
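The periods in the thresholds above are wall-clock hours, independent of the sampling rate. A sketch of how a custom threshold drives persistence_qc (synthetic dataset; the internals of find_persistent_regions are not shown in this diff, so the flagged count is only indicative):

```python
import numpy as np
import pandas as pd
import xarray as xr
from pypromice.qc.persistence import persistence_qc

# one day of 10 min air temperature with a 3 h stuck-sensor episode
time = pd.date_range("2021-01-01", periods=144, freq="10min")
t_u = 5 + np.sin(np.arange(144) / 10.0)
t_u[60:78] = t_u[60]  # 18 identical samples = 3 h
ds = xr.Dataset({"t_u": ("time", t_u)}, coords={"time": time})

# flag any 't' variable that stays constant for more than 2 h
ds_qc = persistence_qc(ds, variable_thresholds={"t": {"max_diff": 0.0001, "period": 2}})
# roughly the portion of the stuck episode beyond the 2 h mark comes back as NaN
print(int(ds_qc.t_u.isnull().sum()))
```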
"p": {"max_diff": 0.0001, "period": 2}, - # Relative humidity can be very stable around 100%. - #"rh": {"max_diff": 0.0001, "period": 2}, + 'gps_lat_lon':{"max_diff": 0.000001, "period": 6}, # gets special handling to remove simultaneously constant gps_lat and gps_lon + 'gps_alt':{"max_diff": 0.0001, "period": 6}, + 't_rad':{"max_diff": 0.0001, "period": 2}, + "rh": {"max_diff": 0.0001, "period": 2}, # gets special handling to allow constant 100% + "wspd": {"max_diff": 0.0001, "period": 6}, } @@ -58,27 +62,46 @@ def persistence_qc( if variable_thresholds is None: variable_thresholds = DEFAULT_VARIABLE_THRESHOLDS - logger.debug(f"Running persistence_qc using {variable_thresholds}") + logger.info(f"Running persistence_qc using {variable_thresholds}") for k in variable_thresholds.keys(): - var_all = [ - k + "_u", - k + "_l", - k + "_i", - ] # apply to upper, lower boom, and instant + if k in ['t','p','rh','wspd','wdir', 'z_boom']: + var_all = [ + k + "_u", + k + "_l", + k + "_i", + ] # apply to upper, lower boom, and instant + else: + var_all = [k] max_diff = variable_thresholds[k]["max_diff"] # loading persistent limit period = variable_thresholds[k]["period"] # loading diff period for v in var_all: if v in df: mask = find_persistent_regions(df[v], period, max_diff) + if 'rh' in v: + mask = mask & (df[v]<99) n_masked = mask.sum() n_samples = len(mask) - logger.debug( + logger.info( f"Applying persistent QC in {v}. Filtering {n_masked}/{n_samples} samples" ) # setting outliers to NaN df.loc[mask, v] = np.nan + elif v == 'gps_lat_lon': + mask = ( + find_persistent_regions(df['gps_lon'], period, max_diff) + & find_persistent_regions(df['gps_lat'], period, max_diff) + ) + + n_masked = mask.sum() + n_samples = len(mask) + logger.info( + f"Applying persistent QC in {v}. Filtering {n_masked}/{n_samples} samples" + ) + # setting outliers to NaN + df.loc[mask, 'gps_lon'] = np.nan + df.loc[mask, 'gps_lat'] = np.nan # Back to xarray, and re-assign the original attrs ds_out = df.to_xarray() @@ -110,33 +133,34 @@ def count_consecutive_persistent_values( ) -> pd.Series: diff = data.ffill().diff().abs() # forward filling all NaNs! mask: pd.Series = diff < max_diff - return count_consecutive_true(mask) + return duration_consecutive_true(mask) -def count_consecutive_true( - series: Union[pd.Series, pd.DataFrame] -) -> Union[pd.Series, pd.DataFrame]: +def duration_consecutive_true( + series: pd.Series, +) -> pd.Series: """ - Convert boolean series to integer series where the values represent the number of connective true values. + From a boolean series, calculates the duration, in hours, of the periods with connective true values. Examples -------- - >>> count_consecutive_true(pd.Series([False, True, False, False, True, True, True, False, True])) + >>> duration_consecutive_true(pd.Series([False, True, False, False, True, True, True, False, True])) pd.Series([0, 1, 0, 0, 1, 2, 3, 0, 1]) Parameters ---------- - series + pd.Series Boolean pandas Series or DataFrame Returns ------- - consecutive_true_count + pd.Series Integer pandas Series or DataFrame with values representing the number of connective true values. """ # assert series.dtype == bool - cumsum = series.cumsum() + cumsum = ((series.index - series.index[0]).total_seconds()/3600).to_series(index=series.index) is_first = series.astype("int").diff() == 1 offset = (is_first * cumsum).replace(0, np.nan).fillna(method="ffill").fillna(0) - return ((cumsum - offset + 1) * series).astype("int") + + return (cumsum - offset) * series