move coalesce to ensembleframe API
dougbrn committed Jan 12, 2024
1 parent 3c20f13 commit 9f03fe9
Showing 4 changed files with 121 additions and 132 deletions.
89 changes: 0 additions & 89 deletions src/tape/ensemble.py
@@ -650,95 +650,6 @@ def assign(self, table="object", temporary=False, **kwargs):
raise ValueError(f"{table} is not one of 'object' or 'source'")
return self

def coalesce(self, input_cols, output_col, table="object", drop_inputs=False):
"""Combines multiple input columns into a single output column, with
values equal to the first non-nan value encountered in the input cols.
Parameters
----------
input_cols: `list`
The list of column names to coalesce into a single column.
output_col: `str`, optional
The name of the coalesced output column.
table: `str`, optional
"source" or "object", the table in which the input columns are
located.
drop_inputs: `bool`, optional
Determines whether the input columns are dropped or preserved. If
a mapped column is an input and dropped, the output column is
automatically assigned to replace that column mapping internally.
Returns
-------
ensemble: `tape.ensemble.Ensemble`
An ensemble object.
"""
# we shouldn't need to sync for this
if table == "object":
table_ddf = self.object
elif table == "source":
table_ddf = self.source
else:
raise ValueError(f"{table} is not one of 'object' or 'source'")

def coalesce_partition(df, input_cols, output_col):
"""Coalescing function for a single partition (pandas dataframe)"""

# Create a subset dataframe per input column
# Rename column to output to allow combination
input_dfs = []
for col in input_cols:
col_df = df[[col]]
input_dfs.append(col_df.rename(columns={col: output_col}))

# Combine each dataframe
coal_df = input_dfs.pop()
while input_dfs:
coal_df = coal_df.combine_first(input_dfs.pop())

# Assign the output column to the partition dataframe
out_df = df.assign(**{output_col: coal_df[output_col]})

return out_df

table_ddf = table_ddf.map_partitions(lambda x: coalesce_partition(x, input_cols, output_col))

# Drop the input columns if wanted
if drop_inputs:
# First check to see if any dropped columns were critical columns
current_map = self.make_column_map().map
cols_to_update = [key for key in current_map if current_map[key] in input_cols]

# Theoretically a user could assign multiple critical columns in the input cols, this is very
# likely to be a mistake, so we throw a warning here to alert them.
if len(cols_to_update) > 1:
warnings.warn(
"""Warning: Coalesce (with column dropping) is needing to update more than one
critical column mapping, please check that the resulting mapping is set as intended"""
)

# Update critical columns to the new output column as needed
if len(cols_to_update): # if not zero
new_map = current_map
for col in cols_to_update:
new_map[col] = output_col

new_colmap = self.make_column_map()
new_colmap.map = new_map

# Update the mapping
self.update_column_mapping(new_colmap)

table_ddf = table_ddf.drop(columns=input_cols)

if table == "object":
self.update_frame(table_ddf)
elif table == "source":
self.update_frame(table_ddf)

return self

def calc_nobs(self, by_band=False, label="nobs", temporary=True):
"""Calculates the number of observations per lightcurve.
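For reference, the call pattern changes as follows: the removed Ensemble-level method took a `table` argument, while the frame-level replacement added to ensemble_frame.py below is called on ens.source or ens.object directly and pushed back to the parent Ensemble with update_ensemble(). A minimal before/after sketch (not part of the diff), assuming ens is an Ensemble populated as in the tests further down:

# Before this commit (Ensemble-level API, removed above):
ens.coalesce(["flux1", "flux2", "flux3"], "flux", table="source", drop_inputs=True)

# After this commit (frame-level API, added below); update_ensemble() pushes the
# coalesced SourceFrame back onto the parent Ensemble, as in the updated test:
ens.source.coalesce(["flux1", "flux2", "flux3"], "flux", drop_inputs=True).update_ensemble()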
77 changes: 77 additions & 0 deletions src/tape/ensemble_frame.py
@@ -1,5 +1,7 @@
from collections.abc import Sequence

import warnings

import dask.dataframe as dd

import dask
@@ -843,6 +845,81 @@ def convert_flux_to_mag(

return result

def coalesce(self, input_cols, output_col, drop_inputs=False):
"""Combines multiple input columns into a single output column, with
values equal to the first non-nan value encountered in the input cols.
Parameters
----------
input_cols: `list`
The list of column names to coalesce into a single column.
output_col: `str`, optional
The name of the coalesced output column.
drop_inputs: `bool`, optional
Determines whether the input columns are dropped or preserved. If
a mapped column is an input and dropped, the output column is
automatically assigned to replace that column mapping internally.
Returns
-------
ensemble: `tape.ensemble.Ensemble`
An ensemble object.
"""

def coalesce_partition(df, input_cols, output_col):
"""Coalescing function for a single partition (pandas dataframe)"""

# Create a subset dataframe per input column
# Rename column to output to allow combination
input_dfs = []
for col in input_cols:
col_df = df[[col]]
input_dfs.append(col_df.rename(columns={col: output_col}))

# Combine each dataframe
coal_df = input_dfs.pop()
while input_dfs:
coal_df = coal_df.combine_first(input_dfs.pop())

# Assign the output column to the partition dataframe
out_df = df.assign(**{output_col: coal_df[output_col]})

return out_df

table_ddf = self.map_partitions(lambda x: coalesce_partition(x, input_cols, output_col))

# Drop the input columns if wanted
if drop_inputs:
if self.ensemble is not None:
# First check to see if any dropped columns were critical columns
current_map = self.ensemble.make_column_map().map
cols_to_update = [key for key in current_map if current_map[key] in input_cols]

# Theoretically a user could assign multiple critical columns in the input cols, this is very
# likely to be a mistake, so we throw a warning here to alert them.
if len(cols_to_update) > 1:
warnings.warn(
    """Warning: Coalesce (with column dropping) needs to update more than one
    critical column mapping; please check that the resulting mapping is set as intended"""
)

# Update critical columns to the new output column as needed
if len(cols_to_update): # if not zero
new_map = current_map
for col in cols_to_update:
new_map[col] = output_col

new_colmap = self.ensemble.make_column_map()
new_colmap.map = new_map

# Update the mapping
self.ensemble.update_column_mapping(new_colmap)

table_ddf = table_ddf.drop(columns=input_cols)

return table_ddf

@classmethod
def from_parquet(cl, path, index=None, columns=None, label=None, ensemble=None, **kwargs):
"""Returns an EnsembleFrame constructed from loading a parquet file.
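The new method keeps the per-partition logic in plain pandas and applies it across the Dask partitions with map_partitions. The following standalone sketch (toy data borrowed from the tests below; not part of the repository) shows that pattern and the expected coalesced values:

import dask.dataframe as dd
import numpy as np
import pandas as pd

def coalesce_partition(df, input_cols, output_col):
    """Coalesce input_cols into output_col for a single pandas partition."""
    # Rename each input column to the output name so combine_first can align them.
    input_dfs = [df[[col]].rename(columns={col: output_col}) for col in input_cols]
    # Chain combine_first so NaNs in one column are filled from the others.
    coal_df = input_dfs.pop()
    while input_dfs:
        coal_df = coal_df.combine_first(input_dfs.pop())
    return df.assign(**{output_col: coal_df[output_col]})

pdf = pd.DataFrame({
    "flux1": [5, np.nan, np.nan, 10, np.nan],
    "flux2": [np.nan, 3, np.nan, np.nan, 7],
    "flux3": [np.nan, np.nan, 4, np.nan, np.nan],
})
ddf = dd.from_pandas(pdf, npartitions=2)
out = ddf.map_partitions(lambda x: coalesce_partition(x, ["flux1", "flux2", "flux3"], "flux"))
print(out["flux"].compute().tolist())  # [5.0, 3.0, 4.0, 10.0, 7.0]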
43 changes: 0 additions & 43 deletions tests/tape_tests/test_ensemble.py
@@ -1537,49 +1537,6 @@ def test_assign(dask_client, legacy):
assert new_source.iloc[i]["band2"] == new_source.iloc[i]["band"] + "2"


@pytest.mark.parametrize("drop_inputs", [True, False])
def test_coalesce(dask_client, drop_inputs):
ens = Ensemble(client=dask_client)

# Generate some data that needs to be coalesced

source_dict = {
"id": [0, 0, 0, 0, 0],
"time": [1, 2, 3, 4, 5],
"flux1": [5, np.nan, np.nan, 10, np.nan],
"flux2": [np.nan, 3, np.nan, np.nan, 7],
"flux3": [np.nan, np.nan, 4, np.nan, np.nan],
"error": [1, 1, 1, 1, 1],
"band": ["g", "g", "g", "g", "g"],
}

# map flux_col to one of the flux columns at the start
col_map = ColumnMapper(id_col="id", time_col="time", flux_col="flux1", err_col="error", band_col="band")
ens.from_source_dict(source_dict, column_mapper=col_map)

ens.coalesce(["flux1", "flux2", "flux3"], "flux", table="source", drop_inputs=drop_inputs)

# Coalesce should return this exact flux array
assert list(ens.source["flux"].values.compute()) == [5.0, 3.0, 4.0, 10.0, 7.0]

if drop_inputs:
# The column mapping should be updated
assert ens.make_column_map().map["flux_col"] == "flux"

# The columns to drop should be dropped
for col in ["flux1", "flux2", "flux3"]:
assert col not in ens.source.columns

# Test for the drop warning
with pytest.warns(UserWarning):
ens.coalesce(["time", "flux"], "bad_col", table="source", drop_inputs=drop_inputs)

else:
# The input columns should still be present
for col in ["flux1", "flux2", "flux3"]:
assert col in ens.source.columns


@pytest.mark.parametrize("zero_point", [("zp_mag", "zp_flux"), (25.0, 10**10)])
@pytest.mark.parametrize("zp_form", ["flux", "mag", "magnitude", "lincc"])
@pytest.mark.parametrize("out_col_name", [None, "mag"])
44 changes: 44 additions & 0 deletions tests/tape_tests/test_ensemble_frame.py
@@ -2,6 +2,7 @@
import numpy as np
import pandas as pd
from tape import (
Ensemble,
ColumnMapper,
EnsembleFrame,
ObjectFrame,
@@ -358,3 +359,46 @@ def test_object_and_source_joins(parquet_ensemble):
# Now the same form of join (in terms of left/right) but produce a SourceFrame. This is
# because frame1.join(frame2) will yield frame1's type regardless of left vs right.
assert type(source_frame.join(object_frame, how="right")) is SourceFrame


@pytest.mark.parametrize("drop_inputs", [True, False])
def test_coalesce(dask_client, drop_inputs):
ens = Ensemble(client=dask_client)

# Generate some data that needs to be coalesced

source_dict = {
"id": [0, 0, 0, 0, 0],
"time": [1, 2, 3, 4, 5],
"flux1": [5, np.nan, np.nan, 10, np.nan],
"flux2": [np.nan, 3, np.nan, np.nan, 7],
"flux3": [np.nan, np.nan, 4, np.nan, np.nan],
"error": [1, 1, 1, 1, 1],
"band": ["g", "g", "g", "g", "g"],
}

# map flux_col to one of the flux columns at the start
col_map = ColumnMapper(id_col="id", time_col="time", flux_col="flux1", err_col="error", band_col="band")
ens.from_source_dict(source_dict, column_mapper=col_map)

ens.source.coalesce(["flux1", "flux2", "flux3"], "flux", drop_inputs=drop_inputs).update_ensemble()

# Coalesce should return this exact flux array
assert list(ens.source["flux"].values.compute()) == [5.0, 3.0, 4.0, 10.0, 7.0]

if drop_inputs:
# The column mapping should be updated
assert ens.make_column_map().map["flux_col"] == "flux"

# The columns to drop should be dropped
for col in ["flux1", "flux2", "flux3"]:
assert col not in ens.source.columns

# Test for the drop warning
with pytest.warns(UserWarning):
ens.source.coalesce(["time", "flux"], "bad_col", drop_inputs=drop_inputs)

else:
# The input columns should still be present
for col in ["flux1", "flux2", "flux3"]:
assert col in ens.source.columns
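
When drop_inputs=True and one of the inputs is a mapped critical column, coalesce re-points that mapping at the output column before dropping it, which is what the flux_col assertion above checks. A minimal illustration of that update step, using a plain dict standing in for the test's ColumnMapper (illustrative only, not the actual ColumnMapper object):

# Mapping as configured by the test's ColumnMapper (shown here as a plain dict).
current_map = {"id_col": "id", "time_col": "time", "flux_col": "flux1",
               "err_col": "error", "band_col": "band"}
input_cols = ["flux1", "flux2", "flux3"]
output_col = "flux"

# Same update logic as in coalesce(): any mapped column about to be dropped
# is re-pointed at the coalesced output column.
cols_to_update = [key for key in current_map if current_map[key] in input_cols]
for key in cols_to_update:
    current_map[key] = output_col

assert current_map["flux_col"] == "flux"  # matches the test's expectation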
