Skip to content

Commit

Permalink
704 type marker back data (#143)
Browse files Browse the repository at this point in the history
* Creating backdata unit test

* Testing updating return flag when imputation_flag exists

- Updating unit test data
- initial fir flag now overwritten with bir ...

* frozen change pre-commit fix

* update imputation flags from back data

* updated filepath for input data

* refactored: fillna with a forward or backward fill method is deprecated, now using ffill or bfill

* replaced fillna(fillmethod) with either ffill or bfill

* removed final fill method to either ffill or bfill

* Dealing with pandas infer dtype warning

* Adding extra unit test cases

* question columns are correct for back data cases

* Corrected columns in unit test data

* update filtering to use defined bool

* Docstrings and tidying up old comments

---------

Co-authored-by: Wil Roberts <[email protected]>
  • Loading branch information
Jday7879 and robertswh authored Jan 10, 2025
1 parent 9f82e82 commit 77649b5
Show file tree
Hide file tree
Showing 23 changed files with 573 additions and 79 deletions.
24 changes: 21 additions & 3 deletions mbs_results/imputation/apply_imputation_link.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,14 @@ def create_and_merge_imputation_values(
# constructed has to come first to use the result for forward
# impute from constructed
imputation_config = {
# "backdata": {
# "intermediate_column": "backdata",
# "marker": "backdata",
# # doesn't actually apply a fill so can be forward or back
# "fill_column": target,
# "fill_method": "ffill",
# "link_column": cumulative_forward_link,
# },
"c": {
"intermediate_column": "constructed",
"marker": "c",
Expand Down Expand Up @@ -145,9 +153,19 @@ def create_impute(df, group, imputation_spec):
fill_column = imputation_spec["fill_column"]
fill_method = imputation_spec["fill_method"]
link_column = imputation_spec["link_column"]
df[column_name] = (
df.groupby(group)[fill_column].fillna(method=fill_method) * df[link_column]
)
imputation_spec["marker"]

if fill_method == "ffill":
df[column_name] = df.groupby(group)[fill_column].ffill() * df[link_column]
elif fill_method == "bfill":
df[column_name] = df.groupby(group)[fill_column].bfill() * df[link_column]

if "hold_period_0_values" in df.columns:
df.loc[df["hold_period_0_values"].notnull(), column_name] = df.loc[
df["hold_period_0_values"].notnull(), "hold_period_0_values"
]
df.drop(columns="hold_period_0_values", inplace=True)

return df


Expand Down
109 changes: 79 additions & 30 deletions mbs_results/imputation/imputation_flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ def generate_imputation_marker(
reference: str,
strata: str,
auxiliary: str,
back_data_period: str,
time_difference=1,
**kwargs,
) -> pd.DataFrame:
Expand All @@ -35,7 +36,10 @@ def generate_imputation_marker(
Column name containing strata information (sic).
auxiliary : str
Column name containing auxiliary data.
time_difference: int
back_data_period : pd.Timestamp
Time period used as the back data period. This period's data
should not be changed.
time_difference: int, Optional
lookup distance for matched pairs
kwargs : mapping, optional
A dictionary of keyword arguments passed into func.
Expand All @@ -47,17 +51,26 @@ def generate_imputation_marker(
i.e. the type of imputation method that should be used to fill
missing returns.
"""

if f"{target}_man" in df.columns:
flags = ["r", "mc", "fir", "bir", "fimc", "fic", "c"]
# Check order from Specs
else:
flags = ["r", "fir", "bir", "fic", "c"]

create_imputation_logical_columns(
df, target, period, reference, strata, auxiliary, time_difference
df,
target,
period,
reference,
strata,
auxiliary,
back_data_period,
time_difference,
)

select_cols = [f"{i}_flag_{target}" for i in flags]
df.to_csv("temp.csv")
first_condition_met = [np.where(i)[0][0] for i in df[select_cols].values]
df[f"imputation_flags_{target}"] = [flags[i] for i in first_condition_met]
df.drop(columns=select_cols, inplace=True)
Expand All @@ -72,6 +85,7 @@ def create_imputation_logical_columns(
reference: str,
strata: str,
auxiliary: str,
back_data_period: str,
time_difference: int = 1,
):
"""
Expand Down Expand Up @@ -110,31 +124,57 @@ def create_imputation_logical_columns(

df.sort_values([reference, strata, period], inplace=True)

df[f"r_flag_{target}"] = df[target].notna()
if f"imputation_flags_{target}" in df.columns:
# Case where back data is present
backdata_r_mask = df[f"backdata_flags_{target}"] == "r"
backdata_fir_mask = df[f"backdata_flags_{target}"] == "fir"
backdata_fimc_mask = df[f"backdata_flags_{target}"] == "fimc"
backdata_c_mask = df[f"backdata_flags_{target}"] == "c"
backdata_fic_mask = df[f"backdata_flags_{target}"] == "fic"

else:
df["is_backdata"] = df[reference] != df[reference]
backdata_r_mask = df[reference] != df[reference]
backdata_fir_mask = df[reference] != df[reference]
backdata_fimc_mask = df[reference] != df[reference]
backdata_c_mask = df[reference] != df[reference]
backdata_fic_mask = df[reference] != df[reference]
print(backdata_r_mask)

# if target is not na and the row is not back data, OR if the backdata flag is 'r'
df[f"r_flag_{target}"] = (df[target].notna() & ~df["is_backdata"]) | backdata_r_mask

if f"{target}_man" in df.columns:
df[f"mc_flag_{target}"] = df[f"{target}_man"].notna()

df[f"fir_flag_{target}"] = flag_rolling_impute(
df, time_difference, strata, reference, target, period
)
df[f"fir_flag_{target}"] = (
flag_rolling_impute(df, time_difference, strata, reference, target, period)
& ~df["is_backdata"]
) | backdata_fir_mask

df[f"bir_flag_{target}"] = flag_rolling_impute(
df, -time_difference, strata, reference, target, period
)
df[f"bir_flag_{target}"] = (
flag_rolling_impute(df, -time_difference, strata, reference, target, period)
& ~df["is_backdata"]
) | backdata_r_mask

if f"{target}_man" in df.columns:
df[f"fimc_flag_{target}"] = flag_rolling_impute(
df, time_difference, strata, reference, f"{target}_man", period
df[f"fimc_flag_{target}"] = (
flag_rolling_impute(
df, time_difference, strata, reference, f"{target}_man", period
)
| backdata_fimc_mask
)

df = imputation_overlaps_mc(df, target, reference, strata)

construction_conditions = df[target].isna() & df[auxiliary].notna()
construction_conditions = (
df[target].isna() & df[auxiliary].notna() & ~df["is_backdata"]
) | backdata_c_mask
df[f"c_flag_{target}"] = np.where(construction_conditions, True, False)

df[f"fic_flag_{target}"] = flag_rolling_impute(
df, time_difference, strata, reference, auxiliary, period
df[f"fic_flag_{target}"] = (
flag_rolling_impute(df, time_difference, strata, reference, auxiliary, period)
| backdata_fic_mask
)

return df
Expand Down Expand Up @@ -173,11 +213,15 @@ def imputation_overlaps_mc(df, target, reference, strata):
df[column] = np.where(
df[imputation_marker_column] & df[f"mc_flag_{target}"], False, None
)
df[column] = (
df.groupby([strata, reference])[column].fillna(
method=direction_single_string + "fill"
)
).fillna(True)
if direction_single_string == "b":
df[column] = (
df.groupby([strata, reference])[column].bfill().astype(bool)
).fillna(True)
elif direction_single_string == "f":
df[column] = (
df.groupby([strata, reference])[column].ffill().astype(bool)
).fillna(True)

df[imputation_marker_column] = df[imputation_marker_column] & df[column]
df.drop(
columns=[column],
Expand Down Expand Up @@ -220,23 +264,28 @@ def flag_rolling_impute(
pd.Series
"""

if time_difference < 0:
fillmethod = "bfill"
elif time_difference > 0:
fillmethod = "ffill"

df["fill_group"] = (
(df[period] - pd.DateOffset(months=1) != df.shift(1)[period])
| (df[strata].diff(1) != 0)
| (df[reference].diff(1) != 0)
).cumsum()

boolean_column = (
df.groupby(["fill_group"])[target]
.fillna(method=fillmethod)
.notnull()
.mul(df["fill_group"] == df.shift(time_difference)["fill_group"])
)
if time_difference < 0:
boolean_column = (
df.groupby(["fill_group"])[target]
.bfill()
.notnull()
.mul(df["fill_group"] == df.shift(time_difference)["fill_group"])
)

elif time_difference > 0:
boolean_column = (
df.groupby(["fill_group"])[target]
.ffill()
.notnull()
.mul(df["fill_group"] == df.shift(time_difference)["fill_group"])
)

df.drop(columns="fill_group", inplace=True)

return boolean_column
Loading

0 comments on commit 77649b5

Please sign in to comment.