From 9f03fe990485f19ce199fbb0533d32dc68ec3de2 Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Fri, 12 Jan 2024 11:21:34 -0800 Subject: [PATCH] move coalesce to ensembleframe API --- src/tape/ensemble.py | 89 ------------------------- src/tape/ensemble_frame.py | 77 +++++++++++++++++++++ tests/tape_tests/test_ensemble.py | 43 ------------ tests/tape_tests/test_ensemble_frame.py | 44 ++++++++++++ 4 files changed, 121 insertions(+), 132 deletions(-) diff --git a/src/tape/ensemble.py b/src/tape/ensemble.py index 90bf1a7c..44d963c7 100644 --- a/src/tape/ensemble.py +++ b/src/tape/ensemble.py @@ -650,95 +650,6 @@ def assign(self, table="object", temporary=False, **kwargs): raise ValueError(f"{table} is not one of 'object' or 'source'") return self - def coalesce(self, input_cols, output_col, table="object", drop_inputs=False): - """Combines multiple input columns into a single output column, with - values equal to the first non-nan value encountered in the input cols. - - Parameters - ---------- - input_cols: `list` - The list of column names to coalesce into a single column. - output_col: `str`, optional - The name of the coalesced output column. - table: `str`, optional - "source" or "object", the table in which the input columns are - located. - drop_inputs: `bool`, optional - Determines whether the input columns are dropped or preserved. If - a mapped column is an input and dropped, the output column is - automatically assigned to replace that column mapping internally. - - Returns - ------- - ensemble: `tape.ensemble.Ensemble` - An ensemble object. - - """ - # we shouldn't need to sync for this - if table == "object": - table_ddf = self.object - elif table == "source": - table_ddf = self.source - else: - raise ValueError(f"{table} is not one of 'object' or 'source'") - - def coalesce_partition(df, input_cols, output_col): - """Coalescing function for a single partition (pandas dataframe)""" - - # Create a subset dataframe per input column - # Rename column to output to allow combination - input_dfs = [] - for col in input_cols: - col_df = df[[col]] - input_dfs.append(col_df.rename(columns={col: output_col})) - - # Combine each dataframe - coal_df = input_dfs.pop() - while input_dfs: - coal_df = coal_df.combine_first(input_dfs.pop()) - - # Assign the output column to the partition dataframe - out_df = df.assign(**{output_col: coal_df[output_col]}) - - return out_df - - table_ddf = table_ddf.map_partitions(lambda x: coalesce_partition(x, input_cols, output_col)) - - # Drop the input columns if wanted - if drop_inputs: - # First check to see if any dropped columns were critical columns - current_map = self.make_column_map().map - cols_to_update = [key for key in current_map if current_map[key] in input_cols] - - # Theoretically a user could assign multiple critical columns in the input cols, this is very - # likely to be a mistake, so we throw a warning here to alert them. 
- if len(cols_to_update) > 1: - warnings.warn( - """Warning: Coalesce (with column dropping) is needing to update more than one - critical column mapping, please check that the resulting mapping is set as intended""" - ) - - # Update critical columns to the new output column as needed - if len(cols_to_update): # if not zero - new_map = current_map - for col in cols_to_update: - new_map[col] = output_col - - new_colmap = self.make_column_map() - new_colmap.map = new_map - - # Update the mapping - self.update_column_mapping(new_colmap) - - table_ddf = table_ddf.drop(columns=input_cols) - - if table == "object": - self.update_frame(table_ddf) - elif table == "source": - self.update_frame(table_ddf) - - return self - def calc_nobs(self, by_band=False, label="nobs", temporary=True): """Calculates the number of observations per lightcurve. diff --git a/src/tape/ensemble_frame.py b/src/tape/ensemble_frame.py index b270932e..352285ca 100644 --- a/src/tape/ensemble_frame.py +++ b/src/tape/ensemble_frame.py @@ -1,5 +1,7 @@ from collections.abc import Sequence +import warnings + import dask.dataframe as dd import dask @@ -843,6 +845,81 @@ def convert_flux_to_mag( return result + def coalesce(self, input_cols, output_col, drop_inputs=False): + """Combines multiple input columns into a single output column, with + values equal to the first non-nan value encountered in the input cols. + + Parameters + ---------- + input_cols: `list` + The list of column names to coalesce into a single column. + output_col: `str`, optional + The name of the coalesced output column. + drop_inputs: `bool`, optional + Determines whether the input columns are dropped or preserved. If + a mapped column is an input and dropped, the output column is + automatically assigned to replace that column mapping internally. + + Returns + ------- + ensemble: `tape.ensemble.Ensemble` + An ensemble object. + + """ + + def coalesce_partition(df, input_cols, output_col): + """Coalescing function for a single partition (pandas dataframe)""" + + # Create a subset dataframe per input column + # Rename column to output to allow combination + input_dfs = [] + for col in input_cols: + col_df = df[[col]] + input_dfs.append(col_df.rename(columns={col: output_col})) + + # Combine each dataframe + coal_df = input_dfs.pop() + while input_dfs: + coal_df = coal_df.combine_first(input_dfs.pop()) + + # Assign the output column to the partition dataframe + out_df = df.assign(**{output_col: coal_df[output_col]}) + + return out_df + + table_ddf = self.map_partitions(lambda x: coalesce_partition(x, input_cols, output_col)) + + # Drop the input columns if wanted + if drop_inputs: + if self.ensemble is not None: + # First check to see if any dropped columns were critical columns + current_map = self.ensemble.make_column_map().map + cols_to_update = [key for key in current_map if current_map[key] in input_cols] + + # Theoretically a user could assign multiple critical columns in the input cols, this is very + # likely to be a mistake, so we throw a warning here to alert them. 
+ if len(cols_to_update) > 1: + warnings.warn( + """Warning: Coalesce (with column dropping) is needing to update more than one + critical column mapping, please check that the resulting mapping is set as intended""" + ) + + # Update critical columns to the new output column as needed + if len(cols_to_update): # if not zero + new_map = current_map + for col in cols_to_update: + new_map[col] = output_col + + new_colmap = self.ensemble.make_column_map() + new_colmap.map = new_map + + # Update the mapping + self.ensemble.update_column_mapping(new_colmap) + + table_ddf = table_ddf.drop(columns=input_cols) + + return table_ddf + @classmethod def from_parquet(cl, path, index=None, columns=None, label=None, ensemble=None, **kwargs): """Returns an EnsembleFrame constructed from loading a parquet file. diff --git a/tests/tape_tests/test_ensemble.py b/tests/tape_tests/test_ensemble.py index c2a8945c..6ee17f75 100644 --- a/tests/tape_tests/test_ensemble.py +++ b/tests/tape_tests/test_ensemble.py @@ -1537,49 +1537,6 @@ def test_assign(dask_client, legacy): assert new_source.iloc[i]["band2"] == new_source.iloc[i]["band"] + "2" -@pytest.mark.parametrize("drop_inputs", [True, False]) -def test_coalesce(dask_client, drop_inputs): - ens = Ensemble(client=dask_client) - - # Generate some data that needs to be coalesced - - source_dict = { - "id": [0, 0, 0, 0, 0], - "time": [1, 2, 3, 4, 5], - "flux1": [5, np.nan, np.nan, 10, np.nan], - "flux2": [np.nan, 3, np.nan, np.nan, 7], - "flux3": [np.nan, np.nan, 4, np.nan, np.nan], - "error": [1, 1, 1, 1, 1], - "band": ["g", "g", "g", "g", "g"], - } - - # map flux_col to one of the flux columns at the start - col_map = ColumnMapper(id_col="id", time_col="time", flux_col="flux1", err_col="error", band_col="band") - ens.from_source_dict(source_dict, column_mapper=col_map) - - ens.coalesce(["flux1", "flux2", "flux3"], "flux", table="source", drop_inputs=drop_inputs) - - # Coalesce should return this exact flux array - assert list(ens.source["flux"].values.compute()) == [5.0, 3.0, 4.0, 10.0, 7.0] - - if drop_inputs: - # The column mapping should be updated - assert ens.make_column_map().map["flux_col"] == "flux" - - # The columns to drop should be dropped - for col in ["flux1", "flux2", "flux3"]: - assert col not in ens.source.columns - - # Test for the drop warning - with pytest.warns(UserWarning): - ens.coalesce(["time", "flux"], "bad_col", table="source", drop_inputs=drop_inputs) - - else: - # The input columns should still be present - for col in ["flux1", "flux2", "flux3"]: - assert col in ens.source.columns - - @pytest.mark.parametrize("zero_point", [("zp_mag", "zp_flux"), (25.0, 10**10)]) @pytest.mark.parametrize("zp_form", ["flux", "mag", "magnitude", "lincc"]) @pytest.mark.parametrize("out_col_name", [None, "mag"]) diff --git a/tests/tape_tests/test_ensemble_frame.py b/tests/tape_tests/test_ensemble_frame.py index 1937f457..962f9b2c 100644 --- a/tests/tape_tests/test_ensemble_frame.py +++ b/tests/tape_tests/test_ensemble_frame.py @@ -2,6 +2,7 @@ import numpy as np import pandas as pd from tape import ( + Ensemble, ColumnMapper, EnsembleFrame, ObjectFrame, @@ -358,3 +359,46 @@ def test_object_and_source_joins(parquet_ensemble): # Now the same form of join (in terms of left/right) but produce a SourceFrame. This is # because frame1.join(frame2) will yield frame1's type regardless of left vs right. 
assert type(source_frame.join(object_frame, how="right")) is SourceFrame + + +@pytest.mark.parametrize("drop_inputs", [True, False]) +def test_coalesce(dask_client, drop_inputs): + ens = Ensemble(client=dask_client) + + # Generate some data that needs to be coalesced + + source_dict = { + "id": [0, 0, 0, 0, 0], + "time": [1, 2, 3, 4, 5], + "flux1": [5, np.nan, np.nan, 10, np.nan], + "flux2": [np.nan, 3, np.nan, np.nan, 7], + "flux3": [np.nan, np.nan, 4, np.nan, np.nan], + "error": [1, 1, 1, 1, 1], + "band": ["g", "g", "g", "g", "g"], + } + + # map flux_col to one of the flux columns at the start + col_map = ColumnMapper(id_col="id", time_col="time", flux_col="flux1", err_col="error", band_col="band") + ens.from_source_dict(source_dict, column_mapper=col_map) + + ens.source.coalesce(["flux1", "flux2", "flux3"], "flux", drop_inputs=drop_inputs).update_ensemble() + + # Coalesce should return this exact flux array + assert list(ens.source["flux"].values.compute()) == [5.0, 3.0, 4.0, 10.0, 7.0] + + if drop_inputs: + # The column mapping should be updated + assert ens.make_column_map().map["flux_col"] == "flux" + + # The columns to drop should be dropped + for col in ["flux1", "flux2", "flux3"]: + assert col not in ens.source.columns + + # Test for the drop warning + with pytest.warns(UserWarning): + ens.source.coalesce(["time", "flux"], "bad_col", drop_inputs=drop_inputs) + + else: + # The input columns should still be present + for col in ["flux1", "flux2", "flux3"]: + assert col in ens.source.columns