Skip to content
This repository has been archived by the owner on Jan 14, 2025. It is now read-only.

Commit

Permalink
Merge pull request #212 from lincc-frameworks/tape_ensemble_refactor_…
Browse files Browse the repository at this point in the history
…working

Make convert_flux_to_mag part of the EnsembleFrame
  • Loading branch information
wilsonbb authored Aug 29, 2023
2 parents d033607 + 9a61392 commit 17f9cc1
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 123 deletions.
58 changes: 1 addition & 57 deletions src/tape/ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -1094,63 +1094,7 @@ def from_source_dict(self, source_dict, column_mapper=None, npartitions=1, **kwa
self._source_dirty = False
self._object_dirty = False
return self

def convert_flux_to_mag(self, flux_col, zero_point, err_col=None, zp_form="mag", out_col_name=None):
"""Converts a flux column into a magnitude column.
Parameters
----------
flux_col: 'str'
The name of the ensemble flux column to convert into magnitudes.
zero_point: 'str'
The name of the ensemble column containing the zero point
information for column transformation.
err_col: 'str', optional
The name of the ensemble column containing the errors to propagate.
Errors are propagated using the following approximation:
Err= (2.5/log(10))*(flux_error/flux), which holds mainly when the
error in flux is much smaller than the flux.
zp_form: `str`, optional
The form of the zero point column, either "flux" or
"magnitude"/"mag". Determines how the zero point (zp) is applied in
the conversion. If "flux", then the function is applied as
mag=-2.5*log10(flux/zp), or if "magnitude", then
mag=-2.5*log10(flux)+zp.
out_col_name: 'str', optional
The name of the output magnitude column, if None then the output
is just the flux column name + "_mag". The error column is also
generated as the out_col_name + "_err".
Returns
----------
ensemble: `tape.ensemble.Ensemble`
The ensemble object with a new magnitude (and error) column.
"""
if out_col_name is None:
out_col_name = flux_col + "_mag"

if zp_form == "flux": # mag = -2.5*np.log10(flux/zp)
self._source = self._source.assign(
**{out_col_name: lambda x: -2.5 * np.log10(x[flux_col] / x[zero_point])}
)

elif zp_form == "magnitude" or zp_form == "mag": # mag = -2.5*np.log10(flux) + zp
self._source = self._source.assign(
**{out_col_name: lambda x: -2.5 * np.log10(x[flux_col]) + x[zero_point]}
)

else:
raise ValueError(f"{zp_form} is not a valid zero_point format.")

# Calculate Errors
if err_col is not None:
self._source = self._source.assign(
**{out_col_name + "_err": lambda x: (2.5 / np.log(10)) * (x[err_col] / x[flux_col])}
)

return self


def _generate_object_table(self):
"""Generate the object table from the source table."""
counts = self._source.groupby([self._id_col, self._band_col])[self._time_col].aggregate("count")
Expand Down
63 changes: 63 additions & 0 deletions src/tape/ensemble_frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from dask.dataframe.core import get_parallel_type
from dask.dataframe.extensions import make_array_nonempty

import numpy as np
import pandas as pd

class _Frame(dd.core._Frame):
Expand Down Expand Up @@ -126,6 +127,68 @@ def from_tapeframe(
result.label = label
result.ensemble = ensemble
return result

def convert_flux_to_mag(self,
flux_col,
zero_point,
err_col=None,
zp_form="mag",
out_col_name=None,
):
"""Converts this EnsembleFrame's flux column into a magnitude column, returning a new
EnsembleFrame.
Parameters
----------
flux_col: 'str'
The name of the EnsembleFrame flux column to convert into magnitudes.
zero_point: 'str'
The name of the EnsembleFrame column containing the zero point
information for column transformation.
err_col: 'str', optional
The name of the EnsembleFrame column containing the errors to propagate.
Errors are propagated using the following approximation:
Err= (2.5/log(10))*(flux_error/flux), which holds mainly when the
error in flux is much smaller than the flux.
zp_form: `str`, optional
The form of the zero point column, either "flux" or
"magnitude"/"mag". Determines how the zero point (zp) is applied in
the conversion. If "flux", then the function is applied as
mag=-2.5*log10(flux/zp), or if "magnitude", then
mag=-2.5*log10(flux)+zp.
out_col_name: 'str', optional
The name of the output magnitude column, if None then the output
is just the flux column name + "_mag". The error column is also
generated as the out_col_name + "_err".
Returns
----------
result: `tape.EnsembleFrame`
A new EnsembleFrame object with a new magnitude (and error) column.
"""
if out_col_name is None:
out_col_name = flux_col + "_mag"

result = None
if zp_form == "flux": # mag = -2.5*np.log10(flux/zp)
result = self.assign(
**{out_col_name: lambda x: -2.5 * np.log10(x[flux_col] / x[zero_point])}
)

elif zp_form == "magnitude" or zp_form == "mag": # mag = -2.5*np.log10(flux) + zp
result = self.assign(
**{out_col_name: lambda x: -2.5 * np.log10(x[flux_col]) + x[zero_point]}
)
else:
raise ValueError(f"{zp_form} is not a valid zero_point format.")

# Calculate Errors
if err_col is not None:
result = result.assign(
**{out_col_name + "_err": lambda x: (2.5 / np.log(10)) * (x[err_col] / x[flux_col])}
)

return result

"""
Dask Dataframes are constructed indirectly using method dispatching and inference on the
underlying data. So to ensure our subclasses behave correctly, we register the methods
Expand Down
17 changes: 10 additions & 7 deletions tests/tape_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,18 @@ def ensemble_from_source_dict(dask_client):
ens = Ensemble(client=dask_client)

# Create some fake data with two IDs (8001, 8002), two bands ["g", "b"]
# a few time steps, and flux.
# a few time steps, flux, and data for zero point calculations.
source_dict = {
"id": [8001, 8001, 8001, 8001, 8002, 8002, 8002, 8002, 8002],
"time": [10.1, 10.2, 10.2, 11.1, 11.2, 11.3, 11.4, 15.0, 15.1],
"band": ["g", "g", "b", "g", "b", "g", "g", "g", "g"],
"err": [1.0, 2.0, 1.0, 3.0, 2.0, 3.0, 4.0, 5.0, 6.0],
"flux": [1.0, 2.0, 5.0, 3.0, 1.0, 2.0, 3.0, 4.0, 5.0],
"id": [8001, 8001, 8002, 8002, 8002],
"time": [1, 2, 3, 4, 5],
"flux": [30.5, 70, 80.6, 30.2, 60.3],
"zp_mag": [25.0, 25.0, 25.0, 25.0, 25.0],
"zp_flux": [10**10, 10**10, 10**10, 10**10, 10**10],
"error": [10, 10, 10, 10, 10],
"band": ["g", "g", "b", "b", "b"],
}
cmap = ColumnMapper(id_col="id", time_col="time", flux_col="flux", err_col="err", band_col="band")
# map flux_col to one of the flux columns at the start
cmap = ColumnMapper(id_col="id", time_col="time", flux_col="flux", err_col="error", band_col="band")
ens.from_source_dict(source_dict, column_mapper=cmap)

return ens, source_dict
55 changes: 0 additions & 55 deletions tests/tape_tests/test_ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -706,61 +706,6 @@ def test_coalesce(dask_client, drop_inputs):
for col in ["flux1", "flux2", "flux3"]:
assert col in ens._source.columns


@pytest.mark.parametrize("zp_form", ["flux", "mag", "magnitude", "lincc"])
@pytest.mark.parametrize("err_col", [None, "error"])
@pytest.mark.parametrize("out_col_name", [None, "mag"])
def test_convert_flux_to_mag(dask_client, zp_form, err_col, out_col_name):
ens = Ensemble(client=dask_client)

source_dict = {
"id": [0, 0, 0, 0, 0],
"time": [1, 2, 3, 4, 5],
"flux": [30.5, 70, 80.6, 30.2, 60.3],
"zp_mag": [25.0, 25.0, 25.0, 25.0, 25.0],
"zp_flux": [10**10, 10**10, 10**10, 10**10, 10**10],
"error": [10, 10, 10, 10, 10],
"band": ["g", "g", "g", "g", "g"],
}

if out_col_name is None:
output_column = "flux_mag"
else:
output_column = out_col_name

# map flux_col to one of the flux columns at the start
col_map = ColumnMapper(id_col="id", time_col="time", flux_col="flux", err_col="error", band_col="band")
ens.from_source_dict(source_dict, column_mapper=col_map)

if zp_form == "flux":
ens.convert_flux_to_mag("flux", "zp_flux", err_col, zp_form, out_col_name)

res_mag = ens._source.compute()[output_column].to_list()[0]
assert pytest.approx(res_mag, 0.001) == 21.28925

if err_col is not None:
res_err = ens._source.compute()[output_column + "_err"].to_list()[0]
assert pytest.approx(res_err, 0.001) == 0.355979
else:
assert output_column + "_err" not in ens._source.columns

elif zp_form == "mag" or zp_form == "magnitude":
ens.convert_flux_to_mag("flux", "zp_mag", err_col, zp_form, out_col_name)

res_mag = ens._source.compute()[output_column].to_list()[0]
assert pytest.approx(res_mag, 0.001) == 21.28925

if err_col is not None:
res_err = ens._source.compute()[output_column + "_err"].to_list()[0]
assert pytest.approx(res_err, 0.001) == 0.355979
else:
assert output_column + "_err" not in ens._source.columns

else:
with pytest.raises(ValueError):
ens.convert_flux_to_mag("flux", "zp_mag", err_col, zp_form, "mag")


def test_find_day_gap_offset(dask_client):
ens = Ensemble(client=dask_client)

Expand Down
60 changes: 56 additions & 4 deletions tests/tape_tests/test_ensemble_frame.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
""" Test EnsembleFrame (inherited from Dask.DataFrame) creation and manipulations. """
import pandas as pd
from tape import Ensemble, EnsembleFrame, TapeFrame
from tape import ColumnMapper, EnsembleFrame, TapeFrame

import pytest

Expand All @@ -26,7 +26,7 @@ def test_from_dict(data_fixture, request):

# The calculation for finding the max flux from the data. Note that the
# inherited dask compute method must be called to obtain the result.
assert ens_frame.flux.max().compute() == 5.0
assert ens_frame.flux.max().compute() == 80.6

@pytest.mark.parametrize(
"data_fixture",
Expand All @@ -52,7 +52,7 @@ def test_from_pandas(data_fixture, request):

# The calculation for finding the max flux from the data. Note that the
# inherited dask compute method must be called to obtain the result.
assert ens_frame.flux.max().compute() == 5.0
assert ens_frame.flux.max().compute() == 80.6


@pytest.mark.parametrize(
Expand Down Expand Up @@ -102,4 +102,56 @@ def test_frame_propagation(data_fixture, request):
# Test that the inherited dask.DataFrame.compute method returns
# the underlying TapeFrame.
assert isinstance(ens_frame.compute(), TapeFrame)
assert len(ens_frame) == len(ens_frame.compute())
assert len(ens_frame) == len(ens_frame.compute())

@pytest.mark.parametrize(
"data_fixture",
[
"ensemble_from_source_dict",
],
)
@pytest.mark.parametrize("err_col", [None, "error"])
@pytest.mark.parametrize("zp_form", ["flux", "mag", "magnitude", "lincc"])
@pytest.mark.parametrize("out_col_name", [None, "mag"])
def test_convert_flux_to_mag(data_fixture, request, err_col, zp_form, out_col_name):
ens, data = request.getfixturevalue(data_fixture)

if out_col_name is None:
output_column = "flux_mag"
else:
output_column = out_col_name

ens_frame = EnsembleFrame.from_dict(data, npartitions=1)
ens_frame.label = TEST_LABEL
ens_frame.ensemble = ens

if zp_form == "flux":
ens_frame = ens_frame.convert_flux_to_mag("flux", "zp_flux", err_col, zp_form, out_col_name)

res_mag = ens_frame.compute()[output_column].to_list()[0]
assert pytest.approx(res_mag, 0.001) == 21.28925

if err_col is not None:
res_err = ens_frame.compute()[output_column + "_err"].to_list()[0]
assert pytest.approx(res_err, 0.001) == 0.355979
else:
assert output_column + "_err" not in ens_frame.columns

elif zp_form == "mag" or zp_form == "magnitude":
ens_frame = ens_frame.convert_flux_to_mag("flux", "zp_mag", err_col, zp_form, out_col_name)

res_mag = ens_frame.compute()[output_column].to_list()[0]
assert pytest.approx(res_mag, 0.001) == 21.28925

if err_col is not None:
res_err = ens_frame.compute()[output_column + "_err"].to_list()[0]
assert pytest.approx(res_err, 0.001) == 0.355979
else:
assert output_column + "_err" not in ens_frame.columns

else:
with pytest.raises(ValueError):
ens_frame.convert_flux_to_mag("flux", "zp_mag", err_col, zp_form, "mag")

# Verify that if we converted to a new frame, it's still an EnsembleFrame.
assert isinstance(ens_frame, EnsembleFrame)

0 comments on commit 17f9cc1

Please sign in to comment.