diff --git a/bin/getL0tx b/bin/getL0tx index f9b4bf80..e306a5ee 100755 --- a/bin/getL0tx +++ b/bin/getL0tx @@ -89,7 +89,7 @@ if __name__ == '__main__': typ, accountDetails = mail_server.login(account, password) if typ != 'OK': print('Not able to sign in!') - raise + raise Exception('Not able to sign in!') # Grab new emails result, data = mail_server.select(mailbox='"[Gmail]/All Mail"', readonly=True) diff --git a/bin/getL3 b/bin/getL3 index d7571c33..875fba13 100755 --- a/bin/getL3 +++ b/bin/getL3 @@ -1,5 +1,7 @@ #!/usr/bin/env python +import logging import os +import sys from argparse import ArgumentParser from pypromice.process import AWS @@ -24,16 +26,22 @@ if __name__ == '__main__': """Executed from the command line""" args = parse_arguments() + logging.basicConfig( + format="%(asctime)s; %(levelname)s; %(name)s; %(message)s", + level=logging.INFO, + stream=sys.stdout, + ) + station_name = args.config_file.split('/')[-1].split('.')[0] station_path = os.path.join(args.inpath, station_name) if os.path.exists(station_path): - pAWS = AWS(args.config_file, station_path, args.variables, args.metadata) + aws = AWS(args.config_file, station_path, args.variables, args.metadata) else: - pAWS = AWS(args.config_file, args.inpath, args.variables, args.metadata) + aws = AWS(args.config_file, args.inpath, args.variables, args.metadata) - pAWS.process() - pAWS.write(args.outpath) + aws.process() + aws.write(args.outpath) else: """Executed on import""" diff --git a/src/pypromice/process/L0toL1.py b/src/pypromice/process/L0toL1.py index 742f2730..d0b09a7d 100644 --- a/src/pypromice/process/L0toL1.py +++ b/src/pypromice/process/L0toL1.py @@ -7,6 +7,8 @@ import xarray as xr import re +from pypromice.process.value_clipping import clip_values + def toL1(L0, vars_df, T_0=273.15, tilt_threshold=-100): '''Process one Level 0 (L0) product to Level 1 @@ -113,6 +115,13 @@ def toL1(L0, vars_df, T_0=273.15, tilt_threshold=-100): ds['z_boom_l'] = _reformatArray(ds['z_boom_l']) # Reformat boom height ds['z_boom_l'] = ds['z_boom_l'] * ((ds['t_l'] + T_0)/T_0)**0.5 # Adjust sonic ranger readings for sensitivity to air temperature + ds = clip_values(ds, vars_df) + for key in ['format', 'hygroclip_t_offset', 'dsr_eng_coef', 'usr_eng_coef', + 'dlr_eng_coef', 'ulr_eng_coef', 'pt_z_coef', 'pt_z_p_coef', + 'pt_z_factor', 'pt_antifreeze', 'boom_azimuth', 'nodata', + 'conf', 'file']: + ds.attrs.pop(key, None) + return ds def addTimeShift(ds, vars_df): diff --git a/src/pypromice/process/L1toL2.py b/src/pypromice/process/L1toL2.py index ba8693c1..1510a757 100644 --- a/src/pypromice/process/L1toL2.py +++ b/src/pypromice/process/L1toL2.py @@ -9,7 +9,10 @@ import os import xarray as xr -def toL2(L1, T_0=273.15, ews=1013.246, ei0=6.1071, eps_overcast=1., +from pypromice.process.value_clipping import clip_values + + +def toL2(L1, vars_df: pd.DataFrame, T_0=273.15, ews=1013.246, ei0=6.1071, eps_overcast=1., eps_clear=9.36508e-6, emissivity=0.97): '''Process one Level 1 (L1) product to Level 2 @@ -17,6 +20,8 @@ def toL2(L1, T_0=273.15, ews=1013.246, ei0=6.1071, eps_overcast=1., ---------- L1 : xarray.Dataset Level 1 dataset + vars_df : pd.DataFrame + Metadata dataframe T_0 : float, optional Ice point temperature in K. The default is 273.15. 
ews : float, optional @@ -144,7 +149,9 @@ def toL2(L1, T_0=273.15, ews=1013.246, ei0=6.1071, eps_overcast=1., if hasattr(ds,'t_i'): if ~ds['t_i'].isnull().all(): # Instantaneous msg processing ds['rh_i_cor'] = correctHumidity(ds['rh_i'], ds['t_i'], # Correct relative humidity - T_0, T_100, ews, ei0) + T_0, T_100, ews, ei0) + + ds = clip_values(ds, vars_df) return ds def flagNAN(ds_in, @@ -205,7 +212,8 @@ def flagNAN(ds_in, print('---> flagging',t0, t1, v) ds[v] = ds[v].where((ds['time'] < t0) | (ds['time'] > t1)) else: - print('---> could not flag', v,', not in dataset') + print('---> could not flag', v,', not in dataset') + return ds diff --git a/src/pypromice/process/aws.py b/src/pypromice/process/aws.py index bc8b0e0e..4fc895ed 100644 --- a/src/pypromice/process/aws.py +++ b/src/pypromice/process/aws.py @@ -2,34 +2,35 @@ """ AWS data processing module """ +import logging +from functools import reduce from importlib import metadata import os, unittest, toml, datetime, uuid, pkg_resources -from typing import Optional +from typing import Sequence, Optional import numpy as np import warnings + warnings.simplefilter(action='ignore', category=FutureWarning) import pandas as pd import xarray as xr from datetime import timedelta -try: - from L0toL1 import toL1 - from L1toL2 import toL2 - from L2toL3 import toL3 -except: - from pypromice.process.L0toL1 import toL1 - from pypromice.process.L1toL2 import toL2 - from pypromice.process.L2toL3 import toL3 +from pypromice.process.L0toL1 import toL1 +from pypromice.process.L1toL2 import toL2 +from pypromice.process.L2toL3 import toL3 pd.set_option('display.precision', 2) xr.set_options(keep_attrs=True) +logger = logging.getLogger(__name__) + #------------------------------------------------------------------------------ + class AWS(object): '''AWS object to load and process PROMICE AWS data''' - + def __init__(self, config_file, inpath, var_file=None, meta_file=None): '''Object initialisation @@ -40,84 +41,82 @@ def __init__(self, config_file, inpath, var_file=None, meta_file=None): inpath : str Input file path var_file: str, optional - Variables look-up table file path. If not given then pypromice's + Variables look-up table file path. If not given then pypromice's variables file is used. The default is None. meta_file: str, optional - Metadata info file path. If not given then pypromice's + Metadata info file path. If not given then pypromice's metadata file is used. The default is None. 
''' assert(os.path.isfile(config_file)), "cannot find "+config_file assert(os.path.isdir(inpath)), "cannot find "+inpath - print('\nAWS object initialising...') - + logger.info('AWS object initialising...') + # Load config, variables CSF standards, and L0 files self.config = self.loadConfig(config_file, inpath) self.vars = getVars(var_file) self.meta = getMeta(meta_file) - # Hard-wire the msg_lat and msg_lon here - # Prevents having to list these vars in the individual station toml files - config_keys = list(self.config.keys()) - for i in config_keys: - self.config[i]['columns'].extend(['msg_lat', 'msg_lon']) - # Load config file L0 = self.loadL0() self.L0=[] for l in L0: n = getColNames(self.vars, l.attrs['number_of_booms'], l.attrs['format']) self.L0.append(popCols(l, n)) - + + self.L1 = None + self.L1A = None + self.L2 = None + self.L3 = None + def process(self): '''Perform L0 to L3 data processing''' try: - print(f'Commencing {self.L0.attrs["number_of_booms"]}-boom processing...') + logger.info(f'Commencing {self.L0.attrs["number_of_booms"]}-boom processing...') except: - print(f'Commencing {self.L0[0].attrs["number_of_booms"]}-boom processing...') + logger.info(f'Commencing {self.L0[0].attrs["number_of_booms"]}-boom processing...') self.getL1() self.getL2() self.getL3() - + def write(self, outpath): '''Write L3 data to .csv and .nc file''' if os.path.isdir(outpath): self.writeArr(outpath) else: - print(f'Outpath f{outpath} does not exist. Unable to save to file') + logger.info(f'Outpath f{outpath} does not exist. Unable to save to file') pass - + def getL1(self): '''Perform L0 to L1 data processing''' - print('Level 1 processing...') + logger.info('Level 1 processing...') self.L0 = [addBasicMeta(item, self.vars) for item in self.L0] self.L1 = [toL1(item, self.vars) for item in self.L0] - self.L1A = mergeVars(self.L1, self.vars) + self.L1A = reduce(xr.Dataset.combine_first, self.L1) def getL2(self): '''Perform L1 to L2 data processing''' - print('Level 2 processing...') - self.L2 = toL2(self.L1A) - self.L2 = clipValues(self.L2, self.vars) + logger.info('Level 2 processing...') + self.L2 = toL2(self.L1A, vars_df=self.vars) def getL3(self): '''Perform L2 to L3 data processing, including resampling and metadata and attribute population''' - print('Level 3 processing...') + logger.info('Level 3 processing...') self.L3 = toL3(self.L2) - + # Resample L3 product f = [l.attrs['format'] for l in self.L0] if 'raw' in f or 'STM' in f: - print('Resampling to 10 minute') + logger.info('Resampling to 10 minute') self.L3 = resampleL3(self.L3, '10min') else: - self.L3 = resampleL3(self.L3, '60min') - print('Resampling to hour') - - # Re-format time + self.L3 = resampleL3(self.L3, '60min') + logger.info('Resampling to hour') + + # Re-format time t = self.L3['time'].values self.L3['time'] = list(t) - + # Switch gps_lon to negative (degrees_east) # Do this here, and NOT in addMeta, otherwise we switch back to positive # when calling getMeta in joinL3! 
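As an aside on the `getL1` step above: the per-file L1 datasets are now folded together with `functools.reduce` and `xarray.Dataset.combine_first`, which covers the merging part of the removed `mergeVars` helper (the clipping and attribute clean-up now live in `clip_values` and `toL1`). A toy sketch of that merge semantics, with made-up variable names and values, would be:

```python
# Illustration only (not part of the diff): earlier datasets in the list take
# precedence where time stamps overlap; missing stamps are filled from later ones.
from functools import reduce

import pandas as pd
import xarray as xr

times_a = pd.date_range("2023-01-01 00:00", periods=3, freq="h")
times_b = pd.date_range("2023-01-01 02:00", periods=3, freq="h")
ds_a = xr.Dataset({"p_u": ("time", [990.0, 991.0, 992.0])}, coords={"time": times_a})
ds_b = xr.Dataset({"p_u": ("time", [800.0, 993.0, 994.0])}, coords={"time": times_b})

merged = reduce(xr.Dataset.combine_first, [ds_a, ds_b])
print(merged["p_u"].values)  # [990. 991. 992. 993. 994.] -- ds_a wins at 02:00
```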
PJW @@ -126,18 +125,18 @@ def getL3(self): # Add variable attributes and metadata self.L3 = self.addAttributes(self.L3) - + # Round all values to specified decimals places - self.L3 = roundValues(self.L3, self.vars) - + self.L3 = roundValues(self.L3, self.vars) + def addAttributes(self, L3): '''Add variable and attribute metadata - + Parameters ---------- L3 : xr.Dataset Level-3 data object - + Returns ------- L3 : xr.Dataset @@ -149,7 +148,7 @@ def addAttributes(self, L3): def writeArr(self, outpath): '''Write L3 data to .nc and .csv hourly and daily files - + Parameters ---------- outpath : str @@ -157,44 +156,44 @@ def writeArr(self, outpath): L3 : AWS.L3 Level-3 data object ''' - outdir = os.path.join(outpath, self.L3.attrs['station_id']) + outdir = os.path.join(outpath, self.L3.attrs['station_id']) if not os.path.isdir(outdir): os.mkdir(outdir) - + f = [l.attrs['format'] for l in self.L0] if all(f): - col_names = getColNames(self.vars, self.L3.attrs['number_of_booms'], + col_names = getColNames(self.vars, self.L3.attrs['number_of_booms'], self.L0[0].attrs['format'], self.L3.attrs['bedrock']) else: - col_names = getColNames(self.vars, self.L3.attrs['number_of_booms'], + col_names = getColNames(self.vars, self.L3.attrs['number_of_booms'], None, self.L3.attrs['bedrock']) - + t = int(pd.Timedelta((self.L3['time'][1] - self.L3['time'][0]).values).total_seconds()) - print('Writing to files...') + logger.info('Writing to files...') if t == 600: out_csv = os.path.join(outdir, self.L3.attrs['station_id']+'_10min.csv') out_nc = os.path.join(outdir, self.L3.attrs['station_id']+'_10min.nc') else: out_csv = os.path.join(outdir, self.L3.attrs['station_id']+'_hour.csv') - out_nc = os.path.join(outdir, self.L3.attrs['station_id']+'_hour.nc') + out_nc = os.path.join(outdir, self.L3.attrs['station_id']+'_hour.nc') writeCSV(out_csv, self.L3, col_names) col_names = col_names + ['lat', 'lon', 'alt'] - writeNC(out_nc, self.L3, col_names) - print(f'Written to {out_csv}') - print(f'Written to {out_nc}') - + writeNC(out_nc, self.L3, col_names) + logger.info(f'Written to {out_csv}') + logger.info(f'Written to {out_nc}') + def loadConfig(self, config_file, inpath): '''Load configuration from .toml file - + Parameters ---------- config_file : str TOML file path inpath : str - Input folder directory where L0 files can be found - + Input folder directory where L0 files can be found + Returns ------- conf : dict @@ -202,62 +201,48 @@ def loadConfig(self, config_file, inpath): ''' conf = getConfig(config_file, inpath) return conf - + def loadL0(self): - '''Load level 0 (L0) data from associated TOML-formatted + '''Load level 0 (L0) data from associated TOML-formatted config file and L0 data file - Try readL0file() using the config with msg_lat & msg_lon appended. The - specific ParserError except will occur when the number of columns in - the tx file does not match the expected columns. In this case, remove - msg_lat & msg_lon from the config and call readL0file() again. These - station files either have no data after Nov 2022 (when msg_lat & - msg_lon were added to processing), or for whatever reason these fields + Try readL0file() using the config with msg_lat & msg_lon appended. The + specific ParserError except will occur when the number of columns in + the tx file does not match the expected columns. In this case, remove + msg_lat & msg_lon from the config and call readL0file() again. 
These + station files either have no data after Nov 2022 (when msg_lat & + msg_lon were added to processing), or for whatever reason these fields did not exist in the modem message and were not added. - + Returns ------- ds_list : list List of L0 xr.Dataset objects ''' - c = self.config - if len(c.keys()) == 1: # one file in this config - target = c[list(c.keys())[0]] + ds_list = [] + for k in self.config.keys(): + target = self.config[k] try: - ds = self.readL0file(target) + ds_list.append(self.readL0file(target)) + except pd.errors.ParserError as e: - # ParserError: Too many columns specified: expected 40 and found 38 - print(f'-----> No msg_lat or msg_lon for {list(c.keys())[0]}') + logger.info(f'-----> No msg_lat or msg_lon for {k}') for item in ['msg_lat', 'msg_lon']: - target['columns'].remove(item) # Also removes from self.config - ds = self.readL0file(target) - print(f'L0 data successfully loaded from {list(c.keys())[0]}') - return [ds] - else: - ds_list = [] - for k in c.keys(): - try: - ds_list.append(self.readL0file(c[k])) - except pd.errors.ParserError as e: - - # ParserError: Too many columns specified: expected 40 and found 38 - print(f'-----> No msg_lat or msg_lon for {k}') - for item in ['msg_lat', 'msg_lon']: - c[k]['columns'].remove(item) # Also removes from self.config - ds_list.append(self.readL0file(c[k])) - print(f'L0 data successfully loaded from {k}') - return ds_list + target['columns'].remove(item) # Also removes from self.config + ds_list.append(self.readL0file(target)) + logger.info(f'L0 data successfully loaded from {k}') + return ds_list def readL0file(self, conf): '''Read L0 .txt file to Dataset object using config dictionary and populate with initial metadata - + Parameters ---------- conf : dict - Configuration parameters - + Configuration parameters + Returns ------- ds : xr.Dataset @@ -271,18 +256,18 @@ def readL0file(self, conf): #------------------------------------------------------------------------------ -def getConfig(config_file, inpath): - '''Load configuration from .toml file. PROMICE .toml files support defining - features at the top level which apply to all nested properties, but do not +def getConfig(config_file, inpath, default_columns: Sequence[str] = ('msg_lat', 'msg_lon')): + '''Load configuration from .toml file. PROMICE .toml files support defining + features at the top level which apply to all nested properties, but do not overwrite nested properties if they are defined - + Parameters ---------- config_file : str TOML file path inpath : str Input folder directory where L0 files can be found - + Returns ------- conf : dict @@ -298,6 +283,7 @@ def getConfig(config_file, inpath): conf[s]['conf'] = config_file conf[s]['file'] = os.path.join(inpath, s) + conf[s]["columns"].extend(default_columns) for t in top: conf.pop(t) # Delete all top level keys beause each file # should carry all properties with it @@ -309,7 +295,7 @@ def getConfig(config_file, inpath): def getL0(infile, nodata, cols, skiprows, file_version, delimiter=',', comment='#', time_offset: Optional[float] = None) -> xr.Dataset: ''' Read L0 data file into pandas DataFrame object - + Parameters ---------- infile : str @@ -328,15 +314,14 @@ def getL0(infile, nodata, cols, skiprows, file_version, Notifier of commented sections in L0 file time_offset : Optional[float] Time offset in hours for correcting for non utc time data. 
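Relatedly, `getConfig` above now takes a `default_columns` argument (defaulting to `msg_lat`, `msg_lon`) and appends those names to every file entry's `columns` list, replacing the hard-wiring that previously sat in `AWS.__init__`; `loadL0` then strips them again when a `ParserError` occurs. A rough sketch of just that column-extension step, using an invented and heavily abbreviated station config:

```python
# Illustration only: mimics the conf[s]["columns"].extend(default_columns) step.
# The station/file names and fields below are invented; real configs carry more keys.
import toml

toy_config = """
station_id = "XXX_X"
nodata = ["-999", "NAN"]

["XXX_X_1.txt"]
format = "TX"
skiprows = 0
columns = ["time", "rec", "p_u", "t_u", "rh_u"]
"""

conf = toml.loads(toy_config)
default_columns = ("msg_lat", "msg_lon")

for section in (k for k, v in conf.items() if isinstance(v, dict)):
    conf[section]["columns"].extend(default_columns)

print(conf["XXX_X_1.txt"]["columns"][-2:])  # ['msg_lat', 'msg_lon']
```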
- Returns ------- ds : xarray.Dataset L0 Dataset ''' - if file_version == 1: - df = pd.read_csv(infile, comment=comment, index_col=0, - na_values=nodata, names=cols, + if file_version == 1: + df = pd.read_csv(infile, comment=comment, index_col=0, + na_values=nodata, names=cols, sep=delimiter, skiprows=skiprows, skip_blank_lines=True, usecols=range(len(cols)), @@ -348,7 +333,7 @@ def getL0(infile, nodata, cols, skiprows, file_version, format='%Y%j%H%M' ) df = df.set_index('time') - + else: df = pd.read_csv(infile, comment=comment, index_col=0, na_values=nodata, names=cols, parse_dates=True, @@ -359,16 +344,16 @@ def getL0(infile, nodata, cols, skiprows, file_version, try: df.index = pd.to_datetime(df.index) except ValueError as e: - print("\n", infile) - print("\nValueError:") - print(e) - print('\n\t\t> Trying pd.to_datetime with format=mixed') + logger.info("\n", infile) + logger.info("\nValueError:") + logger.info(e) + logger.info('\t\t> Trying pd.to_datetime with format=mixed') try: df.index = pd.to_datetime(df.index, format='mixed') except Exception as e: - print("\nDateParseError:") - print(e) - print('\n\t\t> Trying again removing apostrophes in timestamp (old files format)') + logger.info("\nDateParseError:") + logger.info(e) + logger.info('\t\t> Trying again removing apostrophes in timestamp (old files format)') df.index = pd.to_datetime(df.index.str.replace("\"","")) if time_offset is not None: @@ -384,16 +369,16 @@ def getL0(infile, nodata, cols, skiprows, file_version, return ds def addBasicMeta(ds, vars_df): - ''' Use a variable lookup table DataFrame to add the basic metadata + ''' Use a variable lookup table DataFrame to add the basic metadata to the xarray dataset. This is later amended to finalise L3 - + Parameters ---------- ds : xr.Dataset Dataset to add metadata to vars_df : pd.DataFrame Metadata dataframe - + Returns ------- ds : xr.Dataset @@ -409,7 +394,7 @@ def addBasicMeta(ds, vars_df): def populateMeta(ds, conf, skip): '''Populate L0 Dataset with metadata dictionary - + Parameters ---------- ds : xarray.Dataset @@ -417,8 +402,8 @@ def populateMeta(ds, conf, skip): conf : dict Metadata dictionary skip : list - List of column names to skip parsing to metadata - + List of column names to skip parsing to metadata + Returns ------- ds : xarray.Dataset @@ -433,7 +418,7 @@ def populateMeta(ds, conf, skip): def writeCSV(outfile, Lx, csv_order): '''Write data product to CSV file - + Parameters ---------- outfile : str @@ -444,14 +429,14 @@ def writeCSV(outfile, Lx, csv_order): List order of variables ''' Lcsv = Lx.to_dataframe().dropna(how='all') - if csv_order is not None: + if csv_order is not None: names = [c for c in csv_order if c in list(Lcsv.columns)] Lcsv = Lcsv[names] Lcsv.to_csv(outfile) - + def writeNC(outfile, Lx, col_names=None): '''Write data product to NetCDF file - + Parameters ---------- outfile : str @@ -459,18 +444,18 @@ def writeNC(outfile, Lx, col_names=None): Lx : xr.Dataset Dataset to write to file ''' - if os.path.isfile(outfile): + if os.path.isfile(outfile): os.remove(outfile) - if col_names is not None: + if col_names is not None: names = [c for c in col_names if c in list(Lx.keys())] else: names = list(Lx.keys()) - Lx[names].to_netcdf(outfile, mode='w', format='NETCDF4', compute=True) - + Lx[names].to_netcdf(outfile, mode='w', format='NETCDF4', compute=True) + def writeAll(outpath, station_id, l3_h, l3_d, l3_m, csv_order=None): - '''Write L3 hourly, daily and monthly datasets to .nc and .csv + '''Write L3 hourly, daily and monthly datasets to .nc 
and .csv files - + outpath : str Output file path station_id : str @@ -491,105 +476,12 @@ def writeAll(outpath, station_id, l3_h, l3_d, l3_m, csv_order=None): outfile_m = os.path.join(outpath, station_id + '_month') for o,l in zip([outfile_h, outfile_d, outfile_m], [l3_h ,l3_d, l3_m]): writeCSV(o+'.csv',l, csv_order) - writeNC(o+'.nc',l) - -def mergeVars(ds_list, variables, cols=['lo','hi','OOL']): #TODO find way to make this one line as described - '''Merge dataset by variable attributes from lookup table file. This could - be as simple as: - ds = xr.open_mfdataset(infile_list, combine='by_coords', mask_and_scale=False).load() - Except that some files have overlapping times. - - Parameters - ---------- - ds_list : list - List of xarray.Dataset objects - varaibles : pandas.DataFrame - Variable look up table - cols : str, optional - Variable column names to merge by. The default is ['lo','hi','OOL']. - - Returns - ------- - ds : xarray.Dataset - Dataset with merged attributes - ''' - # Combine Dataset objects - ds = ds_list[0] - if len(ds_list) > 1: - for d in ds_list[1:]: - ds = ds.combine_first(d) - - # Get variables - df = variables[cols] - df = df.dropna(how='all') - - # Remove outliers - ds = clipValues(ds, df, cols) - - # Clean up metadata - for k in ['format', 'hygroclip_t_offset', 'dsr_eng_coef', 'usr_eng_coef', - 'dlr_eng_coef', 'ulr_eng_coef', 'pt_z_coef', 'pt_z_p_coef', - 'pt_z_factor', 'pt_antifreeze', 'boom_azimuth', 'nodata', - 'conf', 'file']: - if k in ds.attrs.keys(): - ds.attrs.pop(k) - return ds + writeNC(o+'.nc',l) -def clipValues(ds, df, cols=['lo','hi','OOL']): - '''Clip values in dataset to defined "hi" and "lo" variables from dataframe. - There is a special treatment here for rh_u and rh_l variables, where values - are clipped and not assigned to NaN. 
This is for replication purposes - - Parameters - ---------- - ds : xarray.Dataset - Dataset to clip hi-lo range to - df : pandas.DataFrame - Dataframe to retrieve attribute hi-lo values from - - Returns - ------- - ds : xarray.Dataset - Dataset with clipped data - ''' - df = df[cols] - df = df.dropna(how='all') - lo = cols[0] - hi = cols[1] - ool = cols[2] - for var in df.index: - if var not in list(ds.variables): - continue - - if var in ['rh_u_cor', 'rh_l_cor']: - ds[var] = ds[var].where(ds[var] >= df.loc[var, lo], other = 0) - ds[var] = ds[var].where(ds[var] <= df.loc[var, hi], other = 100) - - # Mask out invalid corrections based on uncorrected var - var_uncor=var.split('_cor')[0] - ds[var] = ds[var].where(~np.isnan(ds[var_uncor]), other=np.nan) - - else: - if ~np.isnan(df.loc[var, lo]): - ds[var] = ds[var].where(ds[var] >= df.loc[var, lo]) - if ~np.isnan(df.loc[var, hi]): - ds[var] = ds[var].where(ds[var] <= df.loc[var, hi]) - - other_vars = df.loc[var][ool] - if isinstance(other_vars, str) and ~ds[var].isnull().all(): - for o in other_vars.split(): - if o not in list(ds.variables): - continue - else: - if ~np.isnan(df.loc[var, lo]): - ds[o] = ds[o].where(ds[var] >= df.loc[var, lo]) - if ~np.isnan(df.loc[var, hi]): - ds[o] = ds[o].where(ds[var] <= df.loc[var, hi]) - return ds -def popCols(ds, names): +def popCols(ds, names): '''Populate dataset with all given variable names - + Parammeters ----------- ds : xr.Dataset @@ -599,11 +491,11 @@ def popCols(ds, names): ''' for v in names: if v not in list(ds.variables): - ds[v] = (('time'), np.arange(ds['time'].size)*np.nan) + ds[v] = (('time'), np.arange(ds['time'].size)*np.nan) return ds def getColNames(vars_df, booms=None, data_type=None, bedrock=False): - '''Get all variable names for a given data type, based on a variables + '''Get all variable names for a given data type, based on a variables look-up table Parameters @@ -611,10 +503,10 @@ def getColNames(vars_df, booms=None, data_type=None, bedrock=False): vars_df : pd.DataFrame Variables look-up table booms : int, optional - Number of booms. If this parameter is empty then all variables + Number of booms. If this parameter is empty then all variables regardless of boom type will be passed. The default is None. data_type : str, optional - Data type, "tx", "STM" or "raw". If this parameter is empty then all + Data type, "tx", "STM" or "raw". If this parameter is empty then all variables regardless of data type will be passed. The default is None. 
Returns @@ -626,12 +518,12 @@ def getColNames(vars_df, booms=None, data_type=None, bedrock=False): vars_df = vars_df.loc[vars_df['station_type'].isin(['one-boom','all'])] elif booms==2: vars_df = vars_df.loc[vars_df['station_type'].isin(['two-boom','all'])] - + if data_type=='TX': vars_df = vars_df.loc[vars_df['data_type'].isin(['TX','all'])] elif data_type=='STM' or data_type=='raw': vars_df = vars_df.loc[vars_df['data_type'].isin(['raw','all'])] - + col_names = list(vars_df.index) if isinstance(bedrock, str): bedrock = (bedrock.lower() == 'true') @@ -645,9 +537,9 @@ def getColNames(vars_df, booms=None, data_type=None, bedrock=False): return col_names def roundValues(ds, df, col='max_decimals'): - '''Round all variable values in data array based on pre-defined rounding + '''Round all variable values in data array based on pre-defined rounding value in variables look-up table DataFrame - + Parameters ---------- ds : xr.Dataset @@ -655,28 +547,28 @@ def roundValues(ds, df, col='max_decimals'): df : pd.Dataframe Variable look-up table with rounding values col : str - Column in variable look-up table that contains rounding values. The + Column in variable look-up table that contains rounding values. The default is "max_decimals" ''' df = df[col] df = df.dropna(how='all') for var in df.index: - if var not in list(ds.variables): + if var not in list(ds.variables): continue if df[var] is not np.nan: ds[var] = ds[var].round(decimals=int(df[var])) return ds - + def addVars(ds, variables): '''Add variable attributes from file to dataset - + Parameters ---------- ds : xarray.Dataset Dataset to add variable attributes to variables : pandas.DataFrame Variables lookup table file - + Returns ------- ds : xarray.Dataset @@ -688,19 +580,19 @@ def addVars(ds, variables): ds[k].attrs['long_name'] = variables.loc[k]['long_name'] ds[k].attrs['units'] = variables.loc[k]['units'] ds[k].attrs['coverage_content_type'] = variables.loc[k]['coverage_content_type'] - ds[k].attrs['coordinates'] = variables.loc[k]['coordinates'] + ds[k].attrs['coordinates'] = variables.loc[k]['coordinates'] return ds def addMeta(ds, meta): '''Add metadata attributes from file to dataset - + Parameters ---------- ds : xarray.Dataset Dataset to add metadata attributes to - meta : str + meta : dict Metadata file - + Returns ------- ds : xarray.Dataset @@ -708,15 +600,15 @@ def addMeta(ds, meta): ''' ds['lon'] = ds['gps_lon'].mean() ds['lon'].attrs = ds['gps_lon'].attrs - + ds['lat'] = ds['gps_lat'].mean() ds['lat'].attrs = ds['gps_lat'].attrs - + ds['alt'] = ds['gps_alt'].mean() ds['alt'].attrs = ds['gps_alt'].attrs # for k in ds.keys(): # for each var - # if 'units' in ds[k].attrs: + # if 'units' in ds[k].attrs: # if ds[k].attrs['units'] == 'C': # ds[k].attrs['units'] = 'degrees_C' @@ -726,8 +618,8 @@ def addMeta(ds, meta): ds.attrs['date_created'] = str(datetime.datetime.now().isoformat()) ds.attrs['date_modified'] = ds.attrs['date_created'] ds.attrs['date_issued'] = ds.attrs['date_created'] - ds.attrs['date_metadata_modified'] = ds.attrs['date_created'] - + ds.attrs['date_metadata_modified'] = ds.attrs['date_created'] + ds.attrs['geospatial_bounds'] = "POLYGON((" + \ f"{ds['lat'].min().values} {ds['lon'].min().values}, " + \ f"{ds['lat'].min().values} {ds['lon'].max().values}, " + \ @@ -744,38 +636,38 @@ def addMeta(ds, meta): ds.attrs['geospatial_vertical_positive'] = 'up' ds.attrs['time_coverage_start'] = str(ds['time'][0].values) ds.attrs['time_coverage_end'] = str(ds['time'][-1].values) - + try: ds.attrs['source']= 'pypromice v' + 
str(metadata.version('pypromice')) except: ds.attrs['source'] = 'pypromice' - + # https://www.digi.com/resources/documentation/digidocs/90001437-13/reference/r_iso_8601_duration_format.htm try: ds.attrs['time_coverage_duration'] = str(pd.Timedelta((ds['time'][-1] - ds['time'][0]).values).isoformat()) ds.attrs['time_coverage_resolution'] = str(pd.Timedelta((ds['time'][1] - ds['time'][0]).values).isoformat()) except: ds.attrs['time_coverage_duration'] = str(pd.Timedelta(0).isoformat()) - ds.attrs['time_coverage_resolution'] = str(pd.Timedelta(0).isoformat()) + ds.attrs['time_coverage_resolution'] = str(pd.Timedelta(0).isoformat()) # Note: int64 dtype (long int) is incompatible with OPeNDAP access via THREDDS for NetCDF files # See https://stackoverflow.com/questions/48895227/output-int32-time-dimension-in-netcdf-using-xarray ds.time.encoding["dtype"] = "i4" # 32-bit signed integer #ds.time.encoding["calendar"] = 'proleptic_gregorian' # this is default - - # Load metadata attributes and add to Dataset + + # Load metadata attributes and add to Dataset [_addAttr(ds, key, value) for key,value in meta.items()] - + # Check attribute formating for k,v in ds.attrs.items(): if not isinstance(v, str) or not isinstance(v, int): - ds.attrs[k]=str(v) + ds.attrs[k]=str(v) return ds def getVars(v_file=None): '''Load variables.csv file - + Parameters ---------- v_file : str @@ -792,9 +684,10 @@ def getVars(v_file=None): else: return pd.read_csv(v_file, index_col=0, comment="#") + def getMeta(m_file=None, delimiter=','): #TODO change to DataFrame output to match variables.csv '''Load metadata table - + Parameters ---------- m_file : str @@ -811,33 +704,33 @@ def getMeta(m_file=None, delimiter=','): if m_file is None: with pkg_resources.resource_stream('pypromice', 'process/metadata.csv') as stream: lines = stream.read().decode("utf-8") - lines = lines.split("\n") - else: + lines = lines.split("\n") + else: with open(m_file, 'r') as f: lines = f.readlines() for l in lines[1:]: try: - meta[l.split(',')[0]] = l.split(delimiter)[1].split('\n')[0].replace(';',',') + meta[l.split(',')[0]] = l.split(delimiter)[1].split('\n')[0].replace(';',',') except IndexError: pass return meta - + def resampleL3(ds_h, t): - '''Resample L3 AWS data, e.g. hourly to daily average. This uses pandas + '''Resample L3 AWS data, e.g. hourly to daily average. This uses pandas DataFrame resampling at the moment as a work-around to the xarray Dataset resampling. 
As stated, xarray resampling is a lengthy process that takes ~2-3 minutes per operation: ds_d = ds_h.resample({'time':"1D"}).mean() This has now been fixed, so needs implementing: https://github.com/pydata/xarray/issues/4498#event-6610799698 - + Parameters ---------- ds_h : xarray.Dataset L3 AWS daily dataset t : str - Resample factor, same variable definition as in + Resample factor, same variable definition as in pandas.DataFrame.resample() - + Returns ------- ds_d : xarray.Dataset @@ -851,37 +744,37 @@ def resampleL3(ds_h, t): df_d[var] = _calcWindDir(df_d['wspd_x_'+var.split('_')[1]], df_d['wspd_y_'+var.split('_')[1]]) else: - print(var,'in dataframe but not','wspd_x_'+var.split('_')[1],'wspd_x_'+var.split('_')[1]) - vals = [xr.DataArray(data=df_d[c], dims=['time'], + logger.info(var,'in dataframe but not','wspd_x_'+var.split('_')[1],'wspd_x_'+var.split('_')[1]) + vals = [xr.DataArray(data=df_d[c], dims=['time'], coords={'time':df_d.index}, attrs=ds_h[c].attrs) for c in df_d.columns] - ds_d = xr.Dataset(dict(zip(df_d.columns,vals)), attrs=ds_h.attrs) + ds_d = xr.Dataset(dict(zip(df_d.columns,vals)), attrs=ds_h.attrs) return ds_d def _calcWindDir(wspd_x, wspd_y): '''Calculate wind direction in degrees - + Parameters ---------- wspd_x : xarray.DataArray Wind speed in X direction wspd_y : xarray.DataArray Wind speed in Y direction - + Returns ------- wdir : xarray.DataArray Wind direction''' deg2rad = np.pi / 180 - rad2deg = 1 / deg2rad - wdir = np.arctan2(wspd_x, wspd_y) * rad2deg - wdir = (wdir + 360) % 360 + rad2deg = 1 / deg2rad + wdir = np.arctan2(wspd_x, wspd_y) * rad2deg + wdir = (wdir + 360) % 360 return wdir def _addAttr(ds, key, value): '''Add attribute to xarray dataset - + ds : xr.Dataset Dataset to add attribute to key : str @@ -893,14 +786,14 @@ def _addAttr(ds, key, value): ds[key.split('.')[0]].attrs[key.split('.')[1]] = str(value) except: pass - # print(f'Unable to add metadata to {key.split(".")[0]}') + # logger.info(f'Unable to add metadata to {key.split(".")[0]}') else: - ds.attrs[key] = value + ds.attrs[key] = value #------------------------------------------------------------------------------ -class TestProcess(unittest.TestCase): +class TestProcess(unittest.TestCase): def testgetVars(self): '''Test variable table lookup retrieval''' @@ -908,24 +801,24 @@ def testgetVars(self): self.assertIsInstance(v, pd.DataFrame) self.assertTrue(v.columns[0] in 'standard_name') self.assertTrue(v.columns[2] in 'units') - - def testgetMeta(self): + + def testgetMeta(self): '''Test AWS names retrieval''' m = getMeta() self.assertIsInstance(m, dict) self.assertTrue('references' in m) - + def testAddAll(self): '''Test variable and metadata attributes added to Dataset''' d = xr.Dataset() - v = getVars() + v = getVars() att = list(v.index) att1 = ['gps_lon', 'gps_lat', 'gps_alt', 'albedo', 'p'] for a in att: d[a]=[0,1] for a in att1: d[a]=[0,1] - d['time'] = [datetime.datetime.now(), + d['time'] = [datetime.datetime.now(), datetime.datetime.now()-timedelta(days=365)] d.attrs['station_id']='TEST' meta = getMeta() @@ -941,7 +834,7 @@ def testL0toL3(self): pAWS = AWS(os.path.join(os.path.dirname(pypromice.__file__),'test/test_config1.toml'), os.path.join(os.path.dirname(pypromice.__file__),'test')) except: - pAWS = AWS('../test/test_config1.toml', '../test/') + pAWS = AWS('../test/test_config1.toml', '../test/') pAWS.process() self.assertIsInstance(pAWS.L3, xr.Dataset) self.assertTrue(pAWS.L3.attrs['station_id']=='TEST1') diff --git a/src/pypromice/process/value_clipping.py 
b/src/pypromice/process/value_clipping.py
new file mode 100644
index 00000000..3511617c
--- /dev/null
+++ b/src/pypromice/process/value_clipping.py
@@ -0,0 +1,59 @@
+import numpy as np
+import pandas
+import xarray
+
+
+def clip_values(
+    ds: xarray.Dataset,
+    var_configurations: pandas.DataFrame,
+):
+    """
+    Clip values in dataset to defined "hi" and "lo" variables from dataframe.
+    There is a special treatment here for rh_u and rh_l variables, where values
+    are clipped and not assigned to NaN. This is for replication purposes
+
+    Parameters
+    ----------
+    ds : `xarray.Dataset`
+        Dataset to clip hi-lo range to
+    var_configurations : `pandas.DataFrame`
+        Dataframe to retrieve attribute hi-lo values from
+
+    Returns
+    -------
+    ds : `xarray.Dataset`
+        Dataset with clipped data
+    """
+    cols = ["lo", "hi", "OOL"]
+    assert set(cols) <= set(var_configurations.columns)
+
+    variable_limits = var_configurations[cols].dropna(how="all")
+    for var, row in variable_limits.iterrows():
+        if var not in list(ds.variables):
+            continue
+
+        if var in ["rh_u_cor", "rh_l_cor"]:
+            ds[var] = ds[var].where(ds[var] >= row.lo, other=0)
+            ds[var] = ds[var].where(ds[var] <= row.hi, other=100)
+
+            # Mask out invalid corrections based on uncorrected var
+            var_uncor = var.split("_cor")[0]
+            ds[var] = ds[var].where(~np.isnan(ds[var_uncor]), other=np.nan)
+
+        else:
+            if ~np.isnan(row.lo):
+                ds[var] = ds[var].where(ds[var] >= row.lo)
+            if ~np.isnan(row.hi):
+                ds[var] = ds[var].where(ds[var] <= row.hi)
+
+        other_vars = row.OOL
+        if isinstance(other_vars, str) and ~ds[var].isnull().all():
+            for o in other_vars.split():
+                if o not in list(ds.variables):
+                    continue
+                else:
+                    if ~np.isnan(row.lo):
+                        ds[o] = ds[o].where(ds[var] >= row.lo)
+                    if ~np.isnan(row.hi):
+                        ds[o] = ds[o].where(ds[var] <= row.hi)
+    return ds
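For reference, a minimal sketch of how the new `clip_values` helper behaves in isolation; the variable names and limits below are invented for illustration (they are not taken from `variables.csv`), and the OOL behaviour assumes the `ds[o]` masking shown above:

```python
# Toy example (not part of the diff): clip a variable to its lo/hi range and
# propagate the flag to a dependent variable listed in its OOL column.
import numpy as np
import pandas as pd
import xarray as xr

from pypromice.process.value_clipping import clip_values

var_configurations = pd.DataFrame(
    {
        "lo": [-80.0, 0.0],
        "hi": [40.0, 100.0],
        "OOL": ["rh_u", np.nan],  # out-of-range t_u also flags rh_u
    },
    index=["t_u", "rh_u"],
)

ds = xr.Dataset(
    {
        "t_u": ("time", [-90.0, -5.0, 55.0]),  # first and last values out of range
        "rh_u": ("time", [50.0, 60.0, 70.0]),
    },
    coords={"time": pd.date_range("2023-01-01", periods=3, freq="h")},
)

clipped = clip_values(ds, var_configurations)
print(clipped["t_u"].values)   # [nan -5. nan]
print(clipped["rh_u"].values)  # [nan 60. nan] -- masked where t_u was flagged
```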