Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Features/auto qc #180

Merged
merged 15 commits into from
Oct 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/unit_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,4 @@ jobs:
shell: bash
run: |
cd $GITHUB_WORKSPACE/src/pypromice
python3 -m unittest -v process/aws.py get.py tx/tx.py
python3 -m unittest -v process/aws.py get.py tx/tx.py qc/static_qc_test.py
PennyHow marked this conversation as resolved.
Show resolved Hide resolved
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,6 @@ src/pypromice/postprocess/latest_timestamps.pkl

# Output positions from transmitted GPS
src/pypromice/postprocess/positions.csv

# sqlite db files
*.db
2 changes: 1 addition & 1 deletion src/pypromice/process/L0toL1.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def toL1(L0, vars_df, T_0=273.15, tilt_threshold=-100):
ds['z_boom_l'] = ds['z_boom_l'] * ((ds['t_l'] + T_0)/T_0)**0.5 # Adjust sonic ranger readings for sensitivity to air temperature

ds = clip_values(ds, vars_df)
for key in ['format', 'hygroclip_t_offset', 'dsr_eng_coef', 'usr_eng_coef',
for key in ['hygroclip_t_offset', 'dsr_eng_coef', 'usr_eng_coef',
'dlr_eng_coef', 'ulr_eng_coef', 'pt_z_coef', 'pt_z_p_coef',
'pt_z_factor', 'pt_antifreeze', 'boom_azimuth', 'nodata',
'conf', 'file']:
Expand Down
483 changes: 244 additions & 239 deletions src/pypromice/process/L1toL2.py

Large diffs are not rendered by default.

15 changes: 6 additions & 9 deletions src/pypromice/process/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,15 +160,12 @@ def writeArr(self, outpath):
if not os.path.isdir(outdir):
os.mkdir(outdir)

f = [l.attrs['format'] for l in self.L0]
if all(f):
col_names = getColNames(self.vars, self.L3.attrs['number_of_booms'],
self.L0[0].attrs['format'],
self.L3.attrs['bedrock'])
else:
col_names = getColNames(self.vars, self.L3.attrs['number_of_booms'],
None,
self.L3.attrs['bedrock'])
col_names = getColNames(
self.vars,
self.L3.attrs['number_of_booms'],
self.L3.attrs['format'],
self.L3.attrs['bedrock'],
)

t = int(pd.Timedelta((self.L3['time'][1] - self.L3['time'][0]).values).total_seconds())
logger.info('Writing to files...')
Expand Down
Empty file added src/pypromice/qc/__init__.py
Empty file.
142 changes: 142 additions & 0 deletions src/pypromice/qc/static_qc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
import logging

import numpy as np
import pandas as pd
import xarray as xr
from typing import Mapping, Optional, Union

__all__ = [
"apply_static_qc",
"find_static_regions",
"count_consecutive_static_values",
"count_consecutive_true",
]

logger = logging.getLogger(__name__)

DEFAULT_VARIABLE_THRESHOLDS = {
"t": {"max_diff": 0.0001, "period": 2},
"p": {"max_diff": 0.0001, "period": 2},
# Relative humidity can be very stable around 100%.
#"rh": {"max_diff": 0.0001, "period": 2},
}


def apply_static_qc(
ds: xr.Dataset,
variable_thresholds: Optional[Mapping] = None,
) -> xr.Dataset:
"""
Detect and filter data points that seems to be static within a certain period.

TODO: It could be nice to have a reference to the logger or description of the behaviour here.
The AWS logger program is know to return the last successfully read value if it fails reading from the sensor.

Parameters
----------
ds : xr.Dataset
Level 1 datset
variable_thresholds : Mapping
Define threshold dict to hold limit values, and the difference values.
Limit values indicate how much a variable has to change to the previous value
period is how many hours a value can stay the same without being set to NaN
* are used to calculate and define all limits, which are then applied to *_u, *_l and *_i

Returns
-------
ds_out : xr.Dataset
Level 1 dataset with difference outliers set to NaN
"""

# the differenceQC is not done on the Windspeed
# Optionally examine flagged data by setting make_plots to True
# This is best done by running aws.py directly and setting 'test_station'
# Plots will be shown before and after flag removal for each var

df = ds.to_dataframe() # Switch to pandas

if variable_thresholds is None:
variable_thresholds = DEFAULT_VARIABLE_THRESHOLDS

logger.debug(f"Running apply_static_qc using {variable_thresholds}")

for k in variable_thresholds.keys():
var_all = [
k + "_u",
k + "_l",
k + "_i",
] # apply to upper, lower boom, and instant
max_diff = variable_thresholds[k]["max_diff"] # loading static limit
period = variable_thresholds[k]["period"] # loading diff period

for v in var_all:
if v in df:
mask = find_static_regions(df[v], period, max_diff)
n_masked = mask.sum()
n_samples = len(mask)
logger.debug(
f"Applying static QC in {v}. Filtering {n_masked}/{n_samples} samples"
)
# setting outliers to NaN
df.loc[mask, v] = np.nan

# Back to xarray, and re-assign the original attrs
ds_out = df.to_xarray()
ds_out = ds_out.assign_attrs(ds.attrs) # Dataset attrs
for x in ds_out.data_vars: # variable-specific attrs
ds_out[x].attrs = ds[x].attrs

return ds_out


def find_static_regions(
data: pd.Series,
min_repeats: int,
max_diff: float,
) -> pd.Series:
"""
Algorithm that ensures values can stay the same within the outliers_mask
"""
consecutive_true_df = count_consecutive_static_values(data, max_diff)
static_regions = consecutive_true_df >= min_repeats
# Ignore entries which already nan in the input data
static_regions[data.isna()] = False
return static_regions


def count_consecutive_static_values(
data: pd.Series,
max_diff: float,
) -> pd.Series:
diff = data.ffill().diff().abs() # forward filling all NaNs!
mask: pd.Series = diff < max_diff
return count_consecutive_true(mask)


def count_consecutive_true(
series: Union[pd.Series, pd.DataFrame]
) -> Union[pd.Series, pd.DataFrame]:
"""
Convert boolean series to integer series where the values represent the number of connective true values.

Examples
--------
>>> count_consecutive_true(pd.Series([False, True, False, False, True, True, True, False, True]))
pd.Series([0, 1, 0, 0, 1, 2, 3, 0, 1])

Parameters
----------
series
Boolean pandas Series or DataFrame

Returns
-------
consecutive_true_count
Integer pandas Series or DataFrame with values representing the number of connective true values.

"""
# assert series.dtype == bool
cumsum = series.cumsum()
is_first = series.astype("int").diff() == 1
offset = (is_first * cumsum).replace(0, np.nan).fillna(method="ffill").fillna(0)
return ((cumsum - offset + 1) * series).astype("int")
146 changes: 146 additions & 0 deletions src/pypromice/qc/static_qc_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
import unittest

import numpy as np
import numpy.testing
import pandas as pd

from pypromice.qc.static_qc import find_static_regions


class StaticQATestCase(unittest.TestCase):
def test_1_hour_static(self):
self._test_1_hour_repeat(10)

def test_1_hour_second_index(self):
self._test_1_hour_repeat(0)

def test_1_hour_last_index(self):
self._test_1_hour_repeat(-2)

def _test_1_hour_repeat(self, index: int):
self.assertTrue(index < -1 or index >= 0)
time_range = pd.date_range(
start="2023-01-26", end="2023-01-27", freq="h", tz="utc", inclusive="left"
)
input_series = pd.Series(index=time_range, data=np.arange(0, len(time_range)))
input_series[index + 1] = input_series[index]
min_repeats = 1
expected_output = input_series.map(lambda _: False)
expected_output[index + 1] = True

static_mask = find_static_regions(
input_series, min_repeats=min_repeats, max_diff=0.001
)

pd.testing.assert_series_equal(expected_output, static_mask, check_names=False)

def test_no_static_period(self):
time_range = pd.date_range(
start="2023-01-26", end="2023-01-27", freq="h", tz="utc", inclusive="left"
)
input_series = pd.Series(index=time_range, data=np.arange(0, len(time_range)))
min_repeats = 1
expected_output = input_series.map(lambda _: False)

static_mask = find_static_regions(
input_series, min_repeats=min_repeats, max_diff=0.001
)

pd.testing.assert_series_equal(expected_output, static_mask, check_names=False)

def test_static_period_longer_than_period_threshold(self):
time_range = pd.date_range(
start="2023-01-26", end="2023-01-28", freq="h", tz="utc", inclusive="left"
)
index_start = 23
index_end = 33
min_repeats = 4
expected_filter_start = 27
expected_filter_end = 33
input_series = pd.Series(index=time_range, data=np.arange(0, len(time_range)))
input_series[index_start:index_end] = input_series[index_start]
expected_output = input_series.map(lambda _: False)
expected_output[expected_filter_start:expected_filter_end] = True

static_mask = find_static_regions(
input_series, min_repeats=min_repeats, max_diff=0.001
)

pd.testing.assert_series_equal(expected_output, static_mask, check_names=False)

def test_period_threshold_longer_than_static_period(self):
time_range = pd.date_range(
start="2023-01-26", end="2023-01-28", freq="h", tz="utc", inclusive="left"
)
index_start = 23
index_end = 27
min_repeats = 10
input_series = pd.Series(index=time_range, data=np.arange(0, len(time_range)))
input_series[index_start:index_end] = input_series[index_start]
expected_output = input_series.map(lambda _: False)

static_mask = find_static_regions(
input_series, min_repeats=min_repeats, max_diff=0.001
)

pd.testing.assert_series_equal(expected_output, static_mask, check_names=False)

def test_static_period_at_the_end(self):
time_range = pd.date_range(
start="2023-01-26", end="2023-01-28", freq="h", tz="utc", inclusive="left"
)
index_start = 23
min_repeats = 4
expected_filter_start = 27
input_series = pd.Series(index=time_range, data=np.arange(0, len(time_range)))
input_series[index_start:] = input_series[index_start]
expected_output = input_series.map(lambda _: False)
expected_output[expected_filter_start:] = True

static_mask = find_static_regions(
input_series, min_repeats=min_repeats, max_diff=0.001
)

pd.testing.assert_series_equal(expected_output, static_mask, check_names=False)

def test_dont_filter_nan_values(self):
time_range = pd.date_range(
start="2023-01-26", end="2023-01-27", freq="h", tz="utc", inclusive="left"
)
input_series = pd.Series(
index=time_range, data=np.zeros_like(time_range, dtype="float")
)
min_repeats = 4
input_series[:] = np.nan
input_series[9] = -11
input_series[10:12] = -10
input_series[15] = -9
# There are >=4 repeats if the nan values are forward filled. [10:15] == -10
# The output mask shouldn't filter nan values.
expected_output = input_series.map(lambda _: False)

static_mask = find_static_regions(
input_series, min_repeats=min_repeats, max_diff=0.001
)

pd.testing.assert_series_equal(expected_output, static_mask, check_names=False)

def test_series_with_nan_values_between_static_values(self):
time_range = pd.date_range(
start="2023-01-26", end="2023-01-27", freq="h", tz="utc", inclusive="left"
)
values = np.zeros_like(time_range, dtype="float")
values[:] = np.nan
values[9] = -11
values[16] = -11
values[17] = -9
series = pd.Series(index=time_range, data=values)
expected_mask = np.zeros_like(values, dtype="bool")
period = 4
# The value and index 16 is the same as 9 which is longer than period
# Note: The station region mask shall not filter nan values
expected_mask[16] = True

output_mask = find_static_regions(series, min_repeats=period, max_diff=0.01)

np.testing.assert_equal(expected_mask, output_mask)