
Feature/smoothing and extrapolating gps coordinates #268

Merged
1 change: 1 addition & 0 deletions environment.yml
@@ -72,6 +72,7 @@ dependencies:
 - setuptools=68.2.2=py38h06a4308_0
 - six=1.16.0=pyh6c4a22f_0
 - sqlite=3.41.2=h5eee18b_0
+- statsmodels=0.13.2=py39h2bbff1b_0
 - tbb=2021.8.0=hdb19cb5_0
 - threadpoolctl=3.4.0=pyhc1e730c_0
 - tk=8.6.12=h1ccaba5_0
2 changes: 1 addition & 1 deletion setup.py
@@ -35,7 +35,7 @@
         "pypromice.qc.percentiles": ["thresholds.csv"],
         "pypromice.postprocess": ["station_configurations.toml", "positions_seed.csv"],
     },
-    install_requires=['numpy~=1.23', 'pandas>=1.5.0', 'xarray>=2022.6.0', 'toml', 'scipy>=1.9.0', 'Bottleneck', 'netcdf4', 'pyDataverse==0.3.1', 'eccodes', 'scikit-learn>=1.1.0'],
+    install_requires=['numpy~=1.23', 'pandas>=1.5.0', 'xarray>=2022.6.0', 'toml', 'scipy>=1.9.0', 'Bottleneck', 'netcdf4', 'pyDataverse==0.3.1', 'eccodes', 'scikit-learn>=1.1.0', 'statsmodels==0.13.2'],
     # extras_require={'postprocess': ['eccodes','scikit-learn>=1.1.0']},
     entry_points={
         'console_scripts': [
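For context, the statsmodels dependency added here provides the LOWESS smoother imported in L2toL3.py below. A minimal sketch of the call pattern (synthetic data, hypothetical values):

    import numpy as np
    from statsmodels.nonparametric.smoothers_lowess import lowess

    x = np.arange(100, dtype=float)                                    # e.g. numeric timestamps
    y = 0.1*x + np.random.default_rng(0).normal(scale=0.5, size=100)   # noisy trend
    fitted = lowess(y, x, frac=1/3, it=0, is_sorted=True)              # (n, 2) array of (x, smoothed y)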
250 changes: 151 additions & 99 deletions src/pypromice/process/L2toL3.py
@@ -2,14 +2,18 @@
 """
 AWS Level 2 (L2) to Level 3 (L3) data processing
 """
+import pandas as pd
 import numpy as np
 import xarray as xr
+from statsmodels.nonparametric.smoothers_lowess import lowess
+import logging
+logger = logging.getLogger(__name__)
 
-def toL3(L2, T_0=273.15, z_0=0.001, R_d=287.05, eps=0.622, es_0=6.1071,
-         es_100=1013.246):
+def toL3(L2, T_0=273.15):
     '''Process one Level 2 (L2) product to Level 3 (L3) meaning calculating all
     derived variables:
-        - Sensible fluxes
+        - Turbulent fluxes
+        - smoothed and inter/extrapolated GPS coordinates
 
     Parameters
@@ -18,24 +22,13 @@ def toL3(L2, T_0=273.15, z_0=0.001, R_d=287.05, eps=0.622, es_0=6.1071,
         L2 AWS data
     T_0 : int
         Steam point temperature. Default is 273.15.
-    z_0 : int
-        Aerodynamic surface roughness length for momention, assumed constant
-        for all ice/snow surfaces. Default is 0.001.
-    R_d : int
-        Gas constant of dry air. Default is 287.05.
-    eps : int
-        Default is 0.622.
-    es_0 : int
-        Saturation vapour pressure at the melting point (hPa). Default is 6.1071.
-    es_100 : int
-        Saturation vapour pressure at steam point temperature (hPa). Default is
-        1013.246.
     '''
     ds = L2
     ds.attrs['level'] = 'L3'
 
     T_100 = _getTempK(T_0)                                       # Get steam point temperature as K
 
+    # Turbulent heat flux calculation
     # Upper boom bulk calculation
     T_h_u = ds['t_u'].copy()                                     # Copy for processing
     p_h_u = ds['p_u'].copy()
@@ -45,54 +38,91 @@
     z_WS_u = ds['z_boom_u'].copy() + 0.4                         # Get height of Anemometer
     z_T_u = ds['z_boom_u'].copy() - 0.1                          # Get height of thermometer
 
-    rho_atm_u = 100 * p_h_u / R_d / (T_h_u + T_0)                # Calculate atmospheric density
-    nu_u = calcVisc(T_h_u, T_0, rho_atm_u)                       # Calculate kinematic viscosity
-    q_h_u = calcHumid(T_0, T_100, T_h_u, es_0, es_100, eps,      # Calculate specific humidity
-                      p_h_u, RH_cor_h_u)
+    q_h_u = calcSpHumid(T_0, T_100, T_h_u, p_h_u, RH_cor_h_u)    # Calculate specific humidity
 
     if not ds.attrs['bedrock']:
-        SHF_h_u, LHF_h_u = calcHeatFlux(T_0, T_h_u, Tsurf_h, rho_atm_u, WS_h_u,    # Calculate latent and sensible heat fluxes
-                                        z_WS_u, z_T_u, nu_u, q_h_u, p_h_u)
-        SHF_h_u, LHF_h_u = cleanHeatFlux(SHF_h_u, LHF_h_u, T_h_u, Tsurf_h, p_h_u,  # Clean heat flux values
-                                         WS_h_u, RH_cor_h_u, ds['z_boom_u'])
+        SHF_h_u, LHF_h_u = calcHeatFlux(T_0, T_h_u, Tsurf_h, WS_h_u,               # Calculate latent and sensible heat fluxes
+                                        z_WS_u, z_T_u, q_h_u, p_h_u)
 
         ds['dshf_u'] = (('time'), SHF_h_u.data)
         ds['dlhf_u'] = (('time'), LHF_h_u.data)
 
     q_h_u = 1000 * q_h_u                                         # Convert sp.humid from kg/kg to g/kg
-    q_h_u = cleanSpHumid(q_h_u, T_h_u, Tsurf_h, p_h_u, RH_cor_h_u)    # Clean sp.humid values
     ds['qh_u'] = (('time'), q_h_u.data)
 
     # Lower boom bulk calculation
-    if ds.attrs['number_of_booms']==2:
+    # ds['wdir_l'] = _calcWindDir(ds['wspd_x_l'], ds['wspd_y_l'])     # Calculate wind direction
 
+    if ds.attrs['number_of_booms']==2:
         T_h_l = ds['t_l'].copy()                                 # Copy for processing
         p_h_l = ds['p_l'].copy()
         WS_h_l = ds['wspd_l'].copy()
         RH_cor_h_l = ds['rh_l_cor'].copy()
         z_WS_l = ds['z_boom_l'].copy() + 0.4                     # Get height of W
         z_T_l = ds['z_boom_l'].copy() - 0.1                      # Get height of thermometer
 
-        rho_atm_l = 100 * p_h_l / R_d / (T_h_l + T_0)            # Calculate atmospheric density
-        nu_l = calcVisc(T_h_l, T_0, rho_atm_l)                   # Calculate kinematic viscosity
-        q_h_l = calcHumid(T_0, T_100, T_h_l, es_0, es_100, eps,  # Calculate sp.humidity
-                          p_h_l, RH_cor_h_l)
+        q_h_l = calcSpHumid(T_0, T_100, T_h_l, p_h_l, RH_cor_h_l)     # Calculate sp.humidity
 
         if not ds.attrs['bedrock']:
-            SHF_h_l, LHF_h_l = calcHeatFlux(T_0, T_h_l, Tsurf_h, rho_atm_l, WS_h_l,    # Calculate latent and sensible heat fluxes
-                                            z_WS_l, z_T_l, nu_l, q_h_l, p_h_l)
-            SHF_h_l, LHF_h_l = cleanHeatFlux(SHF_h_l, LHF_h_l, T_h_l, Tsurf_h, p_h_l,  # Clean heat flux values
-                                             WS_h_l, RH_cor_h_l, ds['z_boom_l'])
+            SHF_h_l, LHF_h_l = calcHeatFlux(T_0, T_h_l, Tsurf_h, WS_h_l,               # Calculate latent and sensible heat fluxes
+                                            z_WS_l, z_T_l, q_h_l, p_h_l)
 
             ds['dshf_l'] = (('time'), SHF_h_l.data)
             ds['dlhf_l'] = (('time'), LHF_h_l.data)
         q_h_l = 1000 * q_h_l                                     # Convert sp.humid from kg/kg to g/kg
-        q_h_l = cleanSpHumid(q_h_l, T_h_l, Tsurf_h, p_h_l, RH_cor_h_l)    # Clean sp.humid values
 
         ds['qh_l'] = (('time'), q_h_l.data)
 
+    # Smoothing and inter/extrapolation of GPS coordinates
+    for var in ['gps_lat', 'gps_lon', 'gps_alt']:
+        ds[var.replace('gps_','')] = gpsCoordinatePostprocessing(ds, var)
+
+    # adding average coordinate as attribute
+    for v in ['lat','lon','alt']:
+        ds.attrs[v+'_avg'] = ds[v].mean(dim='time').item()
     return ds
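For reference, a minimal sketch of the revised entry point in use, assuming ds_l2 is an L2 xarray.Dataset carrying the attributes used above (bedrock, number_of_booms, station_id and the static coordinate attributes):

    ds_l3 = toL3(ds_l2)    # adds dshf/dlhf/qh variables, lat/lon/alt and the *_avg attributes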

+def gpsCoordinatePostprocessing(ds, var):
+    # saving the static value of 'lat','lon' or 'alt' stored in the attributes,
+    # as it might be the only coordinate available for certain stations (e.g. bedrock)
+    var_out = var.replace('gps_','')
+
+    if var_out == 'alt':
+        if 'altitude' in list(ds.attrs.keys()):
+            static_value = float(ds.attrs['altitude'])
+        else:
+            print('no standard altitude for', ds.attrs['station_id'])
+            static_value = np.nan
+    elif var_out == 'lat':
+        static_value = float(ds.attrs['latitude'])
+    elif var_out == 'lon':
+        static_value = float(ds.attrs['longitude'])
+
+    # if there are no gps observations, then we use the static value repeated
+    # for each time stamp
+    if var not in ds.data_vars:
+        print('no', var, 'at', ds.attrs['station_id'])
+        return ('time', np.ones_like(ds['t_u'].data)*static_value)
+
+    if ds[var].isnull().all():
+        print('no', var, 'at', ds.attrs['station_id'])
+        return ('time', np.ones_like(ds['t_u'].data)*static_value)
+
+    # here we detect potential relocation of the station in the form of a
+    # break in the general trend of the latitude, longitude and altitude
+    # in the future, this could/should be listed in an external file to
+    # avoid missed relocations or sensor issues interpreted as a relocation
+    if var == 'gps_alt':
+        _, breaks = find_breaks(ds[var].to_series(), alpha=8)
+    else:
+        _, breaks = find_breaks(ds[var].to_series(), alpha=6)
+
+    # smoothing and inter/extrapolation of the coordinate
+    return ('time', piecewise_smoothing_and_interpolation(ds[var].to_series(), breaks))
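For reference, the helper returns an xarray-style (dims, data) tuple, so its output can be assigned straight into the dataset, as the loop in toL3 above does. A minimal sketch, assuming ds is a station dataset with the usual pypromice attributes:

    ds['lat'] = gpsCoordinatePostprocessing(ds, 'gps_lat')    # xarray accepts the ('time', values) tuple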


-def calcHeatFlux(T_0, T_h, Tsurf_h, rho_atm, WS_h, z_WS, z_T, nu, q_h, p_h,
+def calcHeatFlux(T_0, T_h, Tsurf_h, WS_h, z_WS, z_T, q_h, p_h,
                  kappa=0.4, WS_lim=1., z_0=0.001, g=9.82, es_0=6.1071, eps=0.622,
                  gamma=16., L_sub=2.83e6, L_dif_max=0.01, c_pd=1005., aa=0.7,
-                 bb=0.75, cc=5., dd=0.35):
+                 bb=0.75, cc=5., dd=0.35, R_d=287.05):
     '''Calculate latent and sensible heat flux using the bulk calculation
     method
 
@@ -128,9 +158,14 @@ def calcHeatFlux(T_0, T_h, Tsurf_h, rho_atm, WS_h, z_WS, z_T, nu, q_h, p_h,
     g : int
         Gravitational acceleration (m/s2). Default is 9.82.
     es_0 : int
         Saturation vapour pressure at the melting point (hPa). Default is 6.1071.
+    es_100 : int
+        Saturation vapour pressure at steam point temperature (hPa). Default is
+        1013.246.
+    eps : int
+        Ratio of molar masses of vapor and dry air (0.622).
+    R_d : int
+        Gas constant of dry air. Default is 287.05.
     gamma : int
         Flux profile correction (Paulson & Dyer). Default is 16..
     L_sub : int
@@ -151,6 +186,9 @@ def calcHeatFlux(T_0, T_h, Tsurf_h, rho_atm, WS_h, z_WS, z_T, nu, q_h, p_h,
     dd : int
         Flux profile correction constants (Holtslag & De Bruin '88). Default is
         0.35.
+    z_0 : int
+        Aerodynamic surface roughness length for momentum, assumed constant
+        for all ice/snow surfaces. Default is 0.001.

     Returns
     -------
@@ -159,6 +197,9 @@ def calcHeatFlux(T_0, T_h, Tsurf_h, rho_atm, WS_h, z_WS, z_T, nu, q_h, p_h,
     LHF_h : xarray.DataArray
         Latent heat flux
     '''
+    rho_atm = 100 * p_h / R_d / (T_h + T_0)    # Calculate atmospheric density
+    nu = calcVisc(T_h, T_0, rho_atm)           # Calculate kinematic viscosity
+
     SHF_h = xr.zeros_like(T_h)                 # Create empty xarrays
     LHF_h = xr.zeros_like(T_h)
     L = xr.full_like(T_h, 1E5)
@@ -244,7 +285,11 @@
         # If n_elements(where(L_dif > L_dif_max)) eq 1 then break
         if np.all(L_dif <= L_dif_max):
             break
 
+    HF_nan = np.isnan(p_h) | np.isnan(T_h) | np.isnan(Tsurf_h) \
+             | np.isnan(q_h) | np.isnan(WS_h) | np.isnan(z_T)
+    SHF_h[HF_nan] = np.nan
+    LHF_h[HF_nan] = np.nan
     return SHF_h, LHF_h
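A minimal usage sketch of the revised calcHeatFlux signature (synthetic inputs, hypothetical values; rho_atm and nu are now derived internally rather than passed in):

    import numpy as np
    import xarray as xr

    n = 24
    T_h   = xr.DataArray(np.full(n, -5.0), dims='time')     # air temperature (C)
    Tsurf = xr.DataArray(np.full(n, -7.0), dims='time')     # surface temperature (C)
    WS    = xr.DataArray(np.full(n, 4.0),  dims='time')     # wind speed (m/s)
    p     = xr.DataArray(np.full(n, 980.0), dims='time')    # air pressure (hPa)
    q     = xr.DataArray(np.full(n, 2e-3), dims='time')     # specific humidity (kg/kg)
    z_WS  = xr.DataArray(np.full(n, 2.5),  dims='time')     # anemometer height (m)
    z_T   = xr.DataArray(np.full(n, 2.0),  dims='time')     # thermometer height (m)

    SHF, LHF = calcHeatFlux(273.15, T_h, Tsurf, WS, z_WS, z_T, q, p)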

 def calcVisc(T_h, T_0, rho_atm):
@@ -270,9 +315,8 @@ def calcVisc(T_h, T_0, rho_atm):
     # Kinematic viscosity of air in m^2/s
     return mu / rho_atm
 
-def calcHumid(T_0, T_100, T_h, es_0, es_100, eps, p_h, RH_cor_h):
+def calcSpHumid(T_0, T_100, T_h, p_h, RH_cor_h, es_0=6.1071, es_100=1013.246, eps=0.622):
     '''Calculate specific humidity
-
     Parameters
     ----------
     T_0 : float
@@ -315,72 +359,80 @@ def calcHumid(T_0, T_100, T_h, es_0, es_100, eps, p_h, RH_cor_h):
     freezing = T_h < 0
     q_sat[freezing] = eps * es_ice[freezing] / (p_h[freezing] - (1 - eps) * es_ice[freezing])
 
+    q_nan = np.isnan(T_h) | np.isnan(p_h)
+    q_sat[q_nan] = np.nan
+
     # Convert to kg/kg
     return RH_cor_h * q_sat / 100
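Similarly, a minimal sketch of the renamed helper, whose physical constants are now keyword defaults (hypothetical values):

    import numpy as np
    import xarray as xr

    T_h = xr.DataArray(np.array([-5.0, 1.0]), dims='time')      # air temperature (C)
    p   = xr.DataArray(np.array([980.0, 985.0]), dims='time')   # air pressure (hPa)
    rh  = xr.DataArray(np.array([80.0, 90.0]), dims='time')     # corrected relative humidity (%)

    q = calcSpHumid(273.15, 373.15, T_h, p, rh)                 # kg/kg; toL3 converts to g/kg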

-def cleanHeatFlux(SHF, LHF, T, Tsurf, p, WS, RH_cor, z_boom):
-    '''Find invalid heat flux data values and replace with NaNs, based on
-    air temperature, surface temperature, air pressure, wind speed,
-    corrected relative humidity, and boom height
+def find_breaks(df, alpha):
+    '''Detects potential relocation of the station from the GPS measurements.
+    The code first makes a forward linear interpolation of the coordinates and
+    then looks for important jumps in latitude, longitude and altitude. The jumps
+    that are higher than a given threshold (expressed as a multiple of the
+    standard deviation) are mostly caused by the station being moved during
+    maintenance. To avoid misclassification, only the jumps detected in May-Sept.
+    are kept.
 
     Parameters
     ----------
-    SHF : xarray.DataArray
-        Sensible heat flux
-    LHF : xarray.DataArray
-        Latent heat flux
-    T : xarray.DataArray
-        Air temperature
-    Tsurf : xarray.DataArray
-        Surface temperature
-    p : xarray.DataArray
-        Air pressure
-    WS : xarray.DataArray
-        Wind speed
-    RH_cor : xarray.DataArray
-        Relative humidity corrected
-    z_boom : xarray.DataArray
-        Boom height
-
-    Returns
-    -------
-    SHF : xarray.DataArray
-        Sensible heat flux corrected
-    LHF : xarray.DataArray
-        Latent heat flux corrected
+    df : pandas.Series
+        series of observed latitude, longitude or elevation
+    alpha: float
+        coefficient to be applied to the standard deviation of the daily
+        coordinate fluctuation
     '''
-    HF_nan = np.isnan(p) | np.isnan(T) | np.isnan(Tsurf) \
-        | np.isnan(RH_cor) | np.isnan(WS) | np.isnan(z_boom)
-    SHF[HF_nan] = np.nan
-    LHF[HF_nan] = np.nan
-    return SHF, LHF
-
-def cleanSpHumid(q_h, T, Tsurf, p, RH_cor):
-    '''Find invalid specific humidity data values and replace with NaNs,
-    based on air temperature, surface temperature, air pressure,
-    and corrected relative humidity
+    diff = df.resample('D').median().interpolate(
+        method='linear', limit_area='inside', limit_direction='forward').diff()
+    thresh = diff.std() * alpha
+    list_diff = diff.loc[diff.abs()>thresh].reset_index()
+    list_diff = list_diff.loc[list_diff.time.dt.month.isin([5,6,7,8,9])]
+    list_diff['year'] = list_diff.time.dt.year
+    list_diff = list_diff.groupby('year').max()
+    return diff, [None]+list_diff.time.to_list()+[None]
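A minimal sketch of the break detection on synthetic data (hypothetical values; note the index must be named 'time' for the reset_index/groupby logic above to work):

    import numpy as np
    import pandas as pd

    t = pd.date_range('2020-01-01', '2021-12-31', freq='D', name='time')
    lat = pd.Series(66.0 + np.random.default_rng(0).normal(scale=1e-4, size=t.size), index=t)
    lat.loc['2020-07-15':] += 0.05              # simulated relocation during maintenance

    _, breaks = find_breaks(lat, alpha=6)       # expected: [None, Timestamp('2020-07-15'), None]
    smoothed = piecewise_smoothing_and_interpolation(lat, breaks)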


+def piecewise_smoothing_and_interpolation(df_in, breaks):
+    '''Smoothes, inter- or extrapolates the gps observations. The processing is
+    done piecewise so that each period between station relocations is done
+    separately (no smoothing of the jump due to relocation). Locally Weighted
+    Scatterplot Smoothing (lowess) is then used to smooth the available
+    observations. Then this smoothed curve is interpolated linearly over internal
+    gaps. Eventually, this interpolated curve is extrapolated linearly for
+    timestamps before the first valid measurement and after the last valid
+    measurement.
 
     Parameters
     ----------
-    q_h : xarray.DataArray
-        Specific humidity
-    T : xarray.DataArray
-        Air temperature
-    Tsurf : xarray.DataArray
-        Surface temperature
-    p : xarray.DataArray
-        Air pressure
-    RH_cor : xarray.DataArray
-        Relative humidity corrected
-
-    Returns
-    -------
-    q_h : xarray.DataArray
-        Specific humidity corrected'''
-    q_nan = np.isnan(T) | np.isnan(RH_cor) | np.isnan(p) | np.isnan(Tsurf)
-    q_h[q_nan] = np.nan
-    return q_h
-
+    df_in : pandas.Series
+        series of observed latitude, longitude or elevation
+    breaks: list
+        List of timestamps of station relocation. First and last item should be
+        None so that they can be used in slice(breaks[i], breaks[i+1])
+    '''
+    df_all = pd.Series()                        # series gathering all the smoothed pieces
+    for i in range(len(breaks)-1):
+        df = df_in.loc[slice(breaks[i], breaks[i+1])].copy()
+
+        y_sm = lowess(df,
+                      pd.to_numeric(df.index),
+                      is_sorted=True, frac=1/3, it=0,
+                      )
+        df.loc[df.notnull()] = y_sm[:,1]
+        df = df.interpolate(method='linear', limit_area='inside')
+
+        last_valid_6_months = slice(df.last_valid_index()-pd.to_timedelta('180D'), None)
+        df.loc[last_valid_6_months] = (df.loc[last_valid_6_months].interpolate(axis=0,
+            method='spline', order=1, limit_direction='forward', fill_value="extrapolate")).values
+
+        first_valid_6_months = slice(None, df.first_valid_index()+pd.to_timedelta('180D'))
+        df.loc[first_valid_6_months] = (df.loc[first_valid_6_months].interpolate(axis=0,
+            method='spline', order=1, limit_direction='backward', fill_value="extrapolate")).values
+        df_all = pd.concat((df_all, df))
+
+    df_all = df_all[~df_all.index.duplicated(keep='first')]
Review comment (Contributor):

    I don't see how this function can cause duplicated indices

Reply (Member, author):

    Because for each iteration/segment we select:

        df = df_in.loc[slice(breaks[i], breaks[i+1])].copy()

    which includes the timestamps of both breaks. So at one iteration the end break receives
    an estimated value, and in the next iteration/segment that same break is a start break and
    receives another value. We therefore need to drop duplicates at the end.
    I'm changing to keep='last' so that if a timestamp is a break, then it receives the value
    estimated within the segment where it is a start break (and not an end break).
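To illustrate the overlap described above, a minimal sketch with hypothetical dates:

    import pandas as pd

    s = pd.Series(range(5), index=pd.date_range('2020-01-01', periods=5, name='time'))
    breaks = [None, pd.Timestamp('2020-01-03'), None]
    pieces = [s.loc[slice(breaks[i], breaks[i+1])] for i in range(len(breaks)-1)]
    combined = pd.concat(pieces)

    print(combined.index.duplicated().any())    # True: 2020-01-03 falls in both segments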

+    return df_all.values

 def _calcAtmosDens(p_h, R_d, T_h, T_0):    # TODO: check this shouldn't be in this step somewhere
     '''Calculate atmospheric density'''