From 9a613923af007a2c12f1dbed9b1ed40300382c90 Mon Sep 17 00:00:00 2001 From: Wilson Beebe Date: Tue, 29 Aug 2023 10:38:43 -0700 Subject: [PATCH] Make convert_flux_to_mag part of the EnsembleFrame --- src/tape/ensemble.py | 58 +---------------------- src/tape/ensemble_frame.py | 63 +++++++++++++++++++++++++ tests/tape_tests/conftest.py | 17 ++++--- tests/tape_tests/test_ensemble.py | 55 --------------------- tests/tape_tests/test_ensemble_frame.py | 60 +++++++++++++++++++++-- 5 files changed, 130 insertions(+), 123 deletions(-) diff --git a/src/tape/ensemble.py b/src/tape/ensemble.py index f1693918..839d39a7 100644 --- a/src/tape/ensemble.py +++ b/src/tape/ensemble.py @@ -1094,63 +1094,7 @@ def from_source_dict(self, source_dict, column_mapper=None, npartitions=1, **kwa self._source_dirty = False self._object_dirty = False return self - - def convert_flux_to_mag(self, flux_col, zero_point, err_col=None, zp_form="mag", out_col_name=None): - """Converts a flux column into a magnitude column. - - Parameters - ---------- - flux_col: 'str' - The name of the ensemble flux column to convert into magnitudes. - zero_point: 'str' - The name of the ensemble column containing the zero point - information for column transformation. - err_col: 'str', optional - The name of the ensemble column containing the errors to propagate. - Errors are propagated using the following approximation: - Err= (2.5/log(10))*(flux_error/flux), which holds mainly when the - error in flux is much smaller than the flux. - zp_form: `str`, optional - The form of the zero point column, either "flux" or - "magnitude"/"mag". Determines how the zero point (zp) is applied in - the conversion. If "flux", then the function is applied as - mag=-2.5*log10(flux/zp), or if "magnitude", then - mag=-2.5*log10(flux)+zp. - out_col_name: 'str', optional - The name of the output magnitude column, if None then the output - is just the flux column name + "_mag". The error column is also - generated as the out_col_name + "_err". - - Returns - ---------- - ensemble: `tape.ensemble.Ensemble` - The ensemble object with a new magnitude (and error) column. - - """ - if out_col_name is None: - out_col_name = flux_col + "_mag" - - if zp_form == "flux": # mag = -2.5*np.log10(flux/zp) - self._source = self._source.assign( - **{out_col_name: lambda x: -2.5 * np.log10(x[flux_col] / x[zero_point])} - ) - - elif zp_form == "magnitude" or zp_form == "mag": # mag = -2.5*np.log10(flux) + zp - self._source = self._source.assign( - **{out_col_name: lambda x: -2.5 * np.log10(x[flux_col]) + x[zero_point]} - ) - - else: - raise ValueError(f"{zp_form} is not a valid zero_point format.") - - # Calculate Errors - if err_col is not None: - self._source = self._source.assign( - **{out_col_name + "_err": lambda x: (2.5 / np.log(10)) * (x[err_col] / x[flux_col])} - ) - - return self - + def _generate_object_table(self): """Generate the object table from the source table.""" counts = self._source.groupby([self._id_col, self._band_col])[self._time_col].aggregate("count") diff --git a/src/tape/ensemble_frame.py b/src/tape/ensemble_frame.py index 1894fe2a..70098c13 100644 --- a/src/tape/ensemble_frame.py +++ b/src/tape/ensemble_frame.py @@ -7,6 +7,7 @@ from dask.dataframe.core import get_parallel_type from dask.dataframe.extensions import make_array_nonempty +import numpy as np import pandas as pd class _Frame(dd.core._Frame): @@ -126,6 +127,68 @@ def from_tapeframe( result.label = label result.ensemble = ensemble return result + + def convert_flux_to_mag(self, + flux_col, + zero_point, + err_col=None, + zp_form="mag", + out_col_name=None, + ): + """Converts this EnsembleFrame's flux column into a magnitude column, returning a new + EnsembleFrame. + + Parameters + ---------- + flux_col: 'str' + The name of the EnsembleFrame flux column to convert into magnitudes. + zero_point: 'str' + The name of the EnsembleFrame column containing the zero point + information for column transformation. + err_col: 'str', optional + The name of the EnsembleFrame column containing the errors to propagate. + Errors are propagated using the following approximation: + Err= (2.5/log(10))*(flux_error/flux), which holds mainly when the + error in flux is much smaller than the flux. + zp_form: `str`, optional + The form of the zero point column, either "flux" or + "magnitude"/"mag". Determines how the zero point (zp) is applied in + the conversion. If "flux", then the function is applied as + mag=-2.5*log10(flux/zp), or if "magnitude", then + mag=-2.5*log10(flux)+zp. + out_col_name: 'str', optional + The name of the output magnitude column, if None then the output + is just the flux column name + "_mag". The error column is also + generated as the out_col_name + "_err". + Returns + ---------- + result: `tape.EnsembleFrame` + A new EnsembleFrame object with a new magnitude (and error) column. + """ + if out_col_name is None: + out_col_name = flux_col + "_mag" + + result = None + if zp_form == "flux": # mag = -2.5*np.log10(flux/zp) + result = self.assign( + **{out_col_name: lambda x: -2.5 * np.log10(x[flux_col] / x[zero_point])} + ) + + elif zp_form == "magnitude" or zp_form == "mag": # mag = -2.5*np.log10(flux) + zp + result = self.assign( + **{out_col_name: lambda x: -2.5 * np.log10(x[flux_col]) + x[zero_point]} + ) + else: + raise ValueError(f"{zp_form} is not a valid zero_point format.") + + # Calculate Errors + if err_col is not None: + result = result.assign( + **{out_col_name + "_err": lambda x: (2.5 / np.log(10)) * (x[err_col] / x[flux_col])} + ) + + return result + """ Dask Dataframes are constructed indirectly using method dispatching and inference on the underlying data. So to ensure our subclasses behave correctly, we register the methods diff --git a/tests/tape_tests/conftest.py b/tests/tape_tests/conftest.py index 51f02018..15174293 100644 --- a/tests/tape_tests/conftest.py +++ b/tests/tape_tests/conftest.py @@ -108,15 +108,18 @@ def ensemble_from_source_dict(dask_client): ens = Ensemble(client=dask_client) # Create some fake data with two IDs (8001, 8002), two bands ["g", "b"] - # a few time steps, and flux. + # a few time steps, flux, and data for zero point calculations. source_dict = { - "id": [8001, 8001, 8001, 8001, 8002, 8002, 8002, 8002, 8002], - "time": [10.1, 10.2, 10.2, 11.1, 11.2, 11.3, 11.4, 15.0, 15.1], - "band": ["g", "g", "b", "g", "b", "g", "g", "g", "g"], - "err": [1.0, 2.0, 1.0, 3.0, 2.0, 3.0, 4.0, 5.0, 6.0], - "flux": [1.0, 2.0, 5.0, 3.0, 1.0, 2.0, 3.0, 4.0, 5.0], + "id": [8001, 8001, 8002, 8002, 8002], + "time": [1, 2, 3, 4, 5], + "flux": [30.5, 70, 80.6, 30.2, 60.3], + "zp_mag": [25.0, 25.0, 25.0, 25.0, 25.0], + "zp_flux": [10**10, 10**10, 10**10, 10**10, 10**10], + "error": [10, 10, 10, 10, 10], + "band": ["g", "g", "b", "b", "b"], } - cmap = ColumnMapper(id_col="id", time_col="time", flux_col="flux", err_col="err", band_col="band") + # map flux_col to one of the flux columns at the start + cmap = ColumnMapper(id_col="id", time_col="time", flux_col="flux", err_col="error", band_col="band") ens.from_source_dict(source_dict, column_mapper=cmap) return ens, source_dict \ No newline at end of file diff --git a/tests/tape_tests/test_ensemble.py b/tests/tape_tests/test_ensemble.py index 41567e2f..49f92238 100644 --- a/tests/tape_tests/test_ensemble.py +++ b/tests/tape_tests/test_ensemble.py @@ -706,61 +706,6 @@ def test_coalesce(dask_client, drop_inputs): for col in ["flux1", "flux2", "flux3"]: assert col in ens._source.columns - -@pytest.mark.parametrize("zp_form", ["flux", "mag", "magnitude", "lincc"]) -@pytest.mark.parametrize("err_col", [None, "error"]) -@pytest.mark.parametrize("out_col_name", [None, "mag"]) -def test_convert_flux_to_mag(dask_client, zp_form, err_col, out_col_name): - ens = Ensemble(client=dask_client) - - source_dict = { - "id": [0, 0, 0, 0, 0], - "time": [1, 2, 3, 4, 5], - "flux": [30.5, 70, 80.6, 30.2, 60.3], - "zp_mag": [25.0, 25.0, 25.0, 25.0, 25.0], - "zp_flux": [10**10, 10**10, 10**10, 10**10, 10**10], - "error": [10, 10, 10, 10, 10], - "band": ["g", "g", "g", "g", "g"], - } - - if out_col_name is None: - output_column = "flux_mag" - else: - output_column = out_col_name - - # map flux_col to one of the flux columns at the start - col_map = ColumnMapper(id_col="id", time_col="time", flux_col="flux", err_col="error", band_col="band") - ens.from_source_dict(source_dict, column_mapper=col_map) - - if zp_form == "flux": - ens.convert_flux_to_mag("flux", "zp_flux", err_col, zp_form, out_col_name) - - res_mag = ens._source.compute()[output_column].to_list()[0] - assert pytest.approx(res_mag, 0.001) == 21.28925 - - if err_col is not None: - res_err = ens._source.compute()[output_column + "_err"].to_list()[0] - assert pytest.approx(res_err, 0.001) == 0.355979 - else: - assert output_column + "_err" not in ens._source.columns - - elif zp_form == "mag" or zp_form == "magnitude": - ens.convert_flux_to_mag("flux", "zp_mag", err_col, zp_form, out_col_name) - - res_mag = ens._source.compute()[output_column].to_list()[0] - assert pytest.approx(res_mag, 0.001) == 21.28925 - - if err_col is not None: - res_err = ens._source.compute()[output_column + "_err"].to_list()[0] - assert pytest.approx(res_err, 0.001) == 0.355979 - else: - assert output_column + "_err" not in ens._source.columns - - else: - with pytest.raises(ValueError): - ens.convert_flux_to_mag("flux", "zp_mag", err_col, zp_form, "mag") - - def test_find_day_gap_offset(dask_client): ens = Ensemble(client=dask_client) diff --git a/tests/tape_tests/test_ensemble_frame.py b/tests/tape_tests/test_ensemble_frame.py index ce82712e..a75d96bc 100644 --- a/tests/tape_tests/test_ensemble_frame.py +++ b/tests/tape_tests/test_ensemble_frame.py @@ -1,6 +1,6 @@ """ Test EnsembleFrame (inherited from Dask.DataFrame) creation and manipulations. """ import pandas as pd -from tape import Ensemble, EnsembleFrame, TapeFrame +from tape import ColumnMapper, EnsembleFrame, TapeFrame import pytest @@ -26,7 +26,7 @@ def test_from_dict(data_fixture, request): # The calculation for finding the max flux from the data. Note that the # inherited dask compute method must be called to obtain the result. - assert ens_frame.flux.max().compute() == 5.0 + assert ens_frame.flux.max().compute() == 80.6 @pytest.mark.parametrize( "data_fixture", @@ -52,7 +52,7 @@ def test_from_pandas(data_fixture, request): # The calculation for finding the max flux from the data. Note that the # inherited dask compute method must be called to obtain the result. - assert ens_frame.flux.max().compute() == 5.0 + assert ens_frame.flux.max().compute() == 80.6 @pytest.mark.parametrize( @@ -102,4 +102,56 @@ def test_frame_propagation(data_fixture, request): # Test that the inherited dask.DataFrame.compute method returns # the underlying TapeFrame. assert isinstance(ens_frame.compute(), TapeFrame) - assert len(ens_frame) == len(ens_frame.compute()) \ No newline at end of file + assert len(ens_frame) == len(ens_frame.compute()) + +@pytest.mark.parametrize( + "data_fixture", + [ + "ensemble_from_source_dict", + ], +) +@pytest.mark.parametrize("err_col", [None, "error"]) +@pytest.mark.parametrize("zp_form", ["flux", "mag", "magnitude", "lincc"]) +@pytest.mark.parametrize("out_col_name", [None, "mag"]) +def test_convert_flux_to_mag(data_fixture, request, err_col, zp_form, out_col_name): + ens, data = request.getfixturevalue(data_fixture) + + if out_col_name is None: + output_column = "flux_mag" + else: + output_column = out_col_name + + ens_frame = EnsembleFrame.from_dict(data, npartitions=1) + ens_frame.label = TEST_LABEL + ens_frame.ensemble = ens + + if zp_form == "flux": + ens_frame = ens_frame.convert_flux_to_mag("flux", "zp_flux", err_col, zp_form, out_col_name) + + res_mag = ens_frame.compute()[output_column].to_list()[0] + assert pytest.approx(res_mag, 0.001) == 21.28925 + + if err_col is not None: + res_err = ens_frame.compute()[output_column + "_err"].to_list()[0] + assert pytest.approx(res_err, 0.001) == 0.355979 + else: + assert output_column + "_err" not in ens_frame.columns + + elif zp_form == "mag" or zp_form == "magnitude": + ens_frame = ens_frame.convert_flux_to_mag("flux", "zp_mag", err_col, zp_form, out_col_name) + + res_mag = ens_frame.compute()[output_column].to_list()[0] + assert pytest.approx(res_mag, 0.001) == 21.28925 + + if err_col is not None: + res_err = ens_frame.compute()[output_column + "_err"].to_list()[0] + assert pytest.approx(res_err, 0.001) == 0.355979 + else: + assert output_column + "_err" not in ens_frame.columns + + else: + with pytest.raises(ValueError): + ens_frame.convert_flux_to_mag("flux", "zp_mag", err_col, zp_form, "mag") + + # Verify that if we converted to a new frame, it's still an EnsembleFrame. + assert isinstance(ens_frame, EnsembleFrame) \ No newline at end of file