diff --git a/harmonize_wq/harmonize.py b/harmonize_wq/harmonize.py index d29819c..f8fbb39 100644 --- a/harmonize_wq/harmonize.py +++ b/harmonize_wq/harmonize.py @@ -648,12 +648,17 @@ def harmonize(df_in, char_val, units_out=None, errors='raise', # Note: just phosphorus right now # Total is TP (digested) from the whole water sample (vs total dissolved) # Dissolved is TDP (total) filtered water digested (vs undigested DIP) - if out_col == 'Phosphorus': - frac_dict = {'TP_Phosphorus': ['Total'], - 'TDP_Phosphorus': ['Dissolved'], - 'Other_Phosphorus': [''],} - # Make columns for Sample Fractions, loudly if unexpected (not in dict) + if out_col in ['Phosphorus', 'Nitrogen']: + # NOTE: only top level fractions, while TADA has lower for: + #'Chlorophyll a', 'Turbidity', 'Fecal Coliform', 'Escherichia coli' + if out_col=='Phosphorus': + frac_dict = {'TP_Phosphorus': ['Total'], + 'TDP_Phosphorus': ['Dissolved'], + 'Other_Phosphorus': [''],} + else: + frac_dict = 'TADA' frac_dict = wqp.fraction(frac_dict) # Run sample fraction on WQP + df_out = wqp.df diff --git a/harmonize_wq/tests/test_harmonize_WQP.py b/harmonize_wq/tests/test_harmonize_WQP.py index 66ae02a..ef8744e 100644 --- a/harmonize_wq/tests/test_harmonize_WQP.py +++ b/harmonize_wq/tests/test_harmonize_WQP.py @@ -666,7 +666,7 @@ def test_harmonize_nitrogen(): actual = harmonize.harmonize(NARROW_RESULTS3, 'Nitrogen') # Test that the dataframe has expected type, size, cols, and rows assert isinstance(actual, pandas.core.frame.DataFrame) # Test type - assert actual.size == 16482 # Test size + assert actual.size == 16728 # Test size assert 'Nitrogen' in actual.columns # Check for column assert len(actual['Nitrogen'].dropna()) == 182 # Number of results # Confirm orginal data was not altered @@ -708,7 +708,11 @@ def test_harmonize_nitrogen(): # TODO: add test case where 'g/kg' # TODO: add test case where 'cm3/g @STP' # TODO: add test case where 'cm3/g STP' - + + # check sample fraction, everything went to total mixed 
forms + assert len(actual['Nitrogen'].dropna()) == 182, "Fraction issue" + fract_col = 'TOTAL NITROGEN_ MIXED FORMS' + assert len(actual[fract_col].dropna()) == 182, "Fraction issue" #@pytest.mark.skip(reason="no change") def test_harmonize_conductivity(): @@ -1220,8 +1224,8 @@ def test_split_table(harmonized_tables): 'DetectionQuantitationLimitMeasure/MeasureValue', 'DetectionQuantitationLimitMeasure/MeasureUnitCode', 'ProviderName', 'QA_flag', 'Nitrogen', 'Speciation', - 'Conductivity', 'Activity_datetime', - 'Depth'] + 'TOTAL NITROGEN_ MIXED FORMS', 'Conductivity', + 'Activity_datetime', 'Depth'] assert list(actual_main.columns) == expected expected = ['ActivityStartDate', 'ActivityStartTime/Time', 'ActivityStartTime/TimeZoneCode', diff --git a/harmonize_wq/wq_data.py b/harmonize_wq/wq_data.py index ffba069..e500762 100644 --- a/harmonize_wq/wq_data.py +++ b/harmonize_wq/wq_data.py @@ -792,7 +792,7 @@ def replace_unit_by_dict(self, val_dict, mask=None): for item in val_dict.items(): self._replace_in_col(col, item[0], item[1], mask) - def fraction(self, frac_dict=None, suffix=None, + def fraction(self, frac_dict=None, catch_all=None, suffix=None, fract_col='ResultSampleFractionText'): """Create columns for sample fractions using frac_dict to set names. @@ -801,6 +801,8 @@ def fraction(self, frac_dict=None, suffix=None, frac_dict : dict, optional Dictionary where {fraction_name : new_col}. The default None starts with an empty dictionary. + catch_all : str, optional + Name for new field to map sample fractions not mapped by frac_dict suffix : str, optional String to add to the end of any new column name. The default None, uses out_col attribute. @@ -817,83 +819,85 @@ def fraction(self, frac_dict=None, suffix=None, -------- Not fully implemented with TADA table yet. 
""" + # Check for sample fraction column + harmonize.df_checks(self.df, [fract_col]) + c_mask = self.c_mask + + fracs = list(set(self.df[c_mask][fract_col])) # List of fracs in data + + if ' ' in fracs: + #TODO: new col instead of overwrite + # Replace bad sample fraction w/ nan + self.df = self._replace_in_col(fract_col, ' ', nan, c_mask) + fracs.remove(' ') + + df_out = self.df # Set var for easier referencing + char = list(set(df_out[self.c_mask]['CharacteristicName']))[0] + + + # Deal with lack of args if suffix is None: suffix = self.out_col + if catch_all is None: + catch_all = f'Other_{suffix}' - catch_all = f'Other_{suffix}' + # Set up dict for what sample fraction to what col if frac_dict is None: - frac_dict = {catch_all: ''} - else: - if catch_all not in frac_dict.keys(): - frac_dict[catch_all] = [''] + frac_dict = {} + elif frac_dict=='TADA': + # Get dictionary for updates from TADA (note keys are all caps) + tada = domains.harmonize_TADA_dict()[char.upper()] + frac_dict = {} + for key in tada: + # Add keys another level down + frac_dict[key] = list(tada[key]) + # Add their values + frac_dict[key] += [x for v in tada[key].values() for x in v] + #else: dict was already provided + if catch_all not in frac_dict.keys(): + frac_dict[catch_all] = ['', nan] + # Make sure catch_all exists if not isinstance(frac_dict[catch_all], list): frac_dict[catch_all] = [frac_dict[catch_all]] - # Get all domain values - #accepted_fracs = list(domains.get_domain_dict('ResultSampleFraction').keys()) - for key in domains.get_domain_dict('ResultSampleFraction').keys(): - # Check against expected fractions and add others to catch_all - if key not in [x for v in frac_dict.values() for x in v]: - frac_dict[catch_all] += [key] - # Flatten for some uses - samp_fract_set = sorted({x for v in frac_dict.values() for x in v}) - - # Check for sample fraction column - harmonize.df_checks(self.df, [fract_col]) - # Replace bad sample fraction w/ nan - self.df = 
self._replace_in_col(fract_col, ' ', nan, c_mask) - - df_out = self.df # Set var for easier referencing - - # Clean up sample fraction column based on charName - # Get char - c_dict = domains.out_col_lookup() - char = list(c_dict.keys())[list(c_dict.values()).index(self.out_col)] - # Get dictionary for updates from TADA - harmonize_dict = domains.harmonize_TADA_dict() - # TADA keys are all caps - harmonize_fract = harmonize_dict[char.upper()] - # Loop through dictionary making updates to sample fraction - for fract_set in harmonize_fract.values(): - for row in fract_set.items(): - fract_mask = df_out[c_mask][fract_col].isin(row[1]) # Mask by values - df_out[c_mask][fract_mask][fract_col] = row[0] # Update to key - # Compare df_out againt self.df to add QA flag if changed - cond_change = ~(df_out[fract_col] == self.df[fract_col]) - cond_na = df_out[fract_col].notna() - df_out[cond_change & cond_na] - # TODO: LEFT OFF ABOVE IS STILL EMPTY - - self.df = df_out - - # Make column for any unexpected Sample Fraction values, loudly - for s_f in set(df_out[c_mask][fract_col].dropna()): - if s_f not in samp_fract_set: - char = f"{s_f.replace(' ', '_')}_{suffix}" - frac_dict[char] = s_f - warn(f'Warning: "{char}" column for {s_f}, may be error') - # TODO: add QA_flag - # Test we didn't skip any SampleFraction - samp_fract_set = sorted({x for v in frac_dict.values() for x in v}) - for s_f in set(df_out[c_mask][fract_col].dropna()): - assert s_f in samp_fract_set, f'{s_f} check in {fract_col}' - # Create out columns for each sample fraction + # First cut to make the keys work as column names (loop over a key snapshot: keys are renamed in place) + for key in list(frac_dict): + frac_dict[key.replace(',', '_')] = frac_dict.pop(key) + for key in list(frac_dict): + if key == self.out_col: + #TODO: prevent it from over-writing any col + # If it is the same col name as the out_col add '_1' + frac_dict[key+'_1'] = frac_dict.pop(key) + + # Compare sample fractions against expected + init_fracs = [x for v in frac_dict.values() for x in v] + not_init = 
[frac for frac in fracs if frac not in init_fracs] + if len(not_init)>0: + # TODO: when to add QA_flag? + smp = f'{char} sample fractions not in frac_dict' + solution = f'expected domains, mapped to "{catch_all}"' + print(f'{len(not_init)} {smp}') + # Compare against domains + all_fracs = list(domains.get_domain_dict('ResultSampleFraction')) + add_fracs = [frac for frac in not_init if frac in all_fracs] + # Add new fractions to frac_dict mapped to catch_all + if len(add_fracs)>0: + print(f'{len(add_fracs)} {smp} found in {solution}') + frac_dict[catch_all] += add_fracs + bad_fracs = [frac for frac in not_init if frac not in all_fracs] + if len(bad_fracs)>0: + warn(f'{len(bad_fracs)} {smp} or {solution}') + frac_dict[catch_all] += bad_fracs + + # Loop through dictionary making updates based on sample fraction for frac in frac_dict.items(): - col = frac[0] # New column name - for smp_frac in frac[1]: - if smp_frac in set(df_out.loc[c_mask, fract_col].dropna()): - # New subset mask for sample frac - f_mask = c_mask & (df_out[fract_col]==smp_frac) - # Copy measure to new col (new col name from char_list) - df_out.loc[f_mask, col] = df_out.loc[f_mask, suffix] - elif smp_frac == '': - # Values where sample fraction missing go to catch all - if df_out.loc[c_mask, fract_col].isnull().values.any(): - # New subset mask - f_mask = c_mask & (df_out[fract_col].isnull()) - # Copy measure to new col - df_out.loc[f_mask, col] = df_out.loc[f_mask, suffix] + frac_mask = df_out[fract_col].isin(frac[1]) & c_mask + # Make sure they exist in the data + if any(frac_mask): + # add col and copy results over + df_out.loc[frac_mask, frac[0]] = df_out.loc[frac_mask, self.out_col] + self.df = df_out return frac_dict