Skip to content

Commit

Permalink
Merge pull request #264 from CITCOM-project/interaction-terms
Browse files Browse the repository at this point in the history
Estimator return types
  • Loading branch information
christopher-wild authored Feb 28, 2024
2 parents 66c35ac + b8ad419 commit 0d9b1d7
Show file tree
Hide file tree
Showing 12 changed files with 150 additions and 132 deletions.
12 changes: 7 additions & 5 deletions causal_testing/surrogate/surrogate_search_algorithms.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ def search(

# The GA fitness function after including required variables into the function's scope
# Unused arguments are required for pygad's fitness function signature
#pylint: disable=cell-var-from-loop
def fitness_function(ga, solution, idx): # pylint: disable=unused-argument
# pylint: disable=cell-var-from-loop
def fitness_function(ga, solution, idx): # pylint: disable=unused-argument
surrogate.control_value = solution[0] - self.delta
surrogate.treatment_value = solution[0] + self.delta

Expand All @@ -45,8 +45,10 @@ def fitness_function(ga, solution, idx): # pylint: disable=unused-argument
adjustment_dict[adjustment] = solution[i + 1]

ate = surrogate.estimate_ate_calculated(adjustment_dict)

return contradiction_function(ate)
if len(ate) > 1:
raise ValueError(
"Multiple ate values provided but currently only single values supported in this method")
return contradiction_function(ate[0])

gene_types, gene_space = self.create_gene_types(surrogate, specification)

Expand Down Expand Up @@ -82,7 +84,7 @@ def fitness_function(ga, solution, idx): # pylint: disable=unused-argument

@staticmethod
def create_gene_types(
surrogate_model: CubicSplineRegressionEstimator, specification: CausalSpecification
surrogate_model: CubicSplineRegressionEstimator, specification: CausalSpecification
) -> tuple[list, list]:
"""Generate the gene_types and gene_space for a given fitness function and specification
:param surrogate_model: Instance of a CubicSplineRegressionEstimator
Expand Down
57 changes: 29 additions & 28 deletions causal_testing/testing/causal_test_outcome.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,13 @@ class SomeEffect(CausalTestOutcome):
"""An extension of TestOutcome representing that the expected causal effect should not be zero."""

def apply(self, res: CausalTestResult) -> bool:
if res.test_value.type == "ate":
return (0 < res.ci_low() < res.ci_high()) or (res.ci_low() < res.ci_high() < 0)
if res.test_value.type == "coefficient":
ci_low = res.ci_low() if isinstance(res.ci_low(), Iterable) else [res.ci_low()]
ci_high = res.ci_high() if isinstance(res.ci_high(), Iterable) else [res.ci_high()]
return any(0 < ci_low < ci_high or ci_low < ci_high < 0 for ci_low, ci_high in zip(ci_low, ci_high))
if res.test_value.type == "risk_ratio":
return (1 < res.ci_low() < res.ci_high()) or (res.ci_low() < res.ci_high() < 1)
return any(
1 < ci_low < ci_high or ci_low < ci_high < 1 for ci_low, ci_high in zip(res.ci_low(), res.ci_high()))
if res.test_value.type in ('coefficient', 'ate'):
return any(
0 < ci_low < ci_high or ci_low < ci_high < 0 for ci_low, ci_high in zip(res.ci_low(), res.ci_high()))

raise ValueError(f"Test Value type {res.test_value.type} is not valid for this TestOutcome")


Expand All @@ -51,23 +50,20 @@ def __init__(self, atol: float = 1e-10, ctol: float = 0.05):
self.ctol = ctol

def apply(self, res: CausalTestResult) -> bool:
if res.test_value.type == "ate":
return (res.ci_low() < 0 < res.ci_high()) or (abs(res.test_value.value) < self.atol)
if res.test_value.type == "coefficient":
ci_low = res.ci_low() if isinstance(res.ci_low(), Iterable) else [res.ci_low()]
ci_high = res.ci_high() if isinstance(res.ci_high(), Iterable) else [res.ci_high()]
if res.test_value.type == "risk_ratio":
return any(ci_low < 1 < ci_high or np.isclose(value, 1.0, atol=self.atol) for ci_low, ci_high, value in
zip(res.ci_low(), res.ci_high(), res.test_value.value))
if res.test_value.type in ('coefficient', 'ate'):
value = res.test_value.value if isinstance(res.ci_high(), Iterable) else [res.test_value.value]

return (
sum(
not ((ci_low < 0 < ci_high) or abs(v) < self.atol)
for ci_low, ci_high, v in zip(ci_low, ci_high, value)
)
/ len(value)
< self.ctol
sum(
not ((ci_low < 0 < ci_high) or abs(v) < self.atol)
for ci_low, ci_high, v in zip(res.ci_low(), res.ci_high(), value)
)
/ len(value)
< self.ctol
)
if res.test_value.type == "risk_ratio":
return (res.ci_low() < 1 < res.ci_high()) or np.isclose(res.test_value.value, 1.0, atol=self.atol)

raise ValueError(f"Test Value type {res.test_value.type} is not valid for this TestOutcome")


Expand All @@ -93,28 +89,33 @@ def __str__(self):


class Positive(SomeEffect):
"""An extension of TestOutcome representing that the expected causal effect should be positive."""
"""An extension of TestOutcome representing that the expected causal effect should be positive.
Currently only single values are supported for the test value"""

def apply(self, res: CausalTestResult) -> bool:
if res.ci_valid() and not super().apply(res):
return False
if len(res.test_value.value) > 1:
raise ValueError("Positive Effects are currently only supported on single float datatypes")
if res.test_value.type in {"ate", "coefficient"}:
return bool(res.test_value.value > 0)
return bool(res.test_value.value[0] > 0)
if res.test_value.type == "risk_ratio":
return bool(res.test_value.value > 1)
# Dead code but necessary for pylint
return bool(res.test_value.value[0] > 1)
raise ValueError(f"Test Value type {res.test_value.type} is not valid for this TestOutcome")


class Negative(SomeEffect):
"""An extension of TestOutcome representing that the expected causal effect should be negative."""
"""An extension of TestOutcome representing that the expected causal effect should be negative.
Currently only single values are supported for the test value"""

def apply(self, res: CausalTestResult) -> bool:
if res.ci_valid() and not super().apply(res):
return False
if len(res.test_value.value) > 1:
raise ValueError("Negative Effects are currently only supported on single float datatypes")
if res.test_value.type in {"ate", "coefficient"}:
return bool(res.test_value.value < 0)
return bool(res.test_value.value[0] < 0)
if res.test_value.type == "risk_ratio":
return bool(res.test_value.value < 1)
return bool(res.test_value.value[0] < 1)
# Dead code but necessary for pylint
raise ValueError(f"Test Value type {res.test_value.type} is not valid for this TestOutcome")
6 changes: 5 additions & 1 deletion causal_testing/testing/causal_test_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def __init__(
self,
estimator: Estimator,
test_value: TestValue,
confidence_intervals: [float, float] = None,
confidence_intervals: [pd.Series, pd.Series] = None,
effect_modifier_configuration: {Variable: Any} = None,
adequacy=None,
):
Expand Down Expand Up @@ -99,12 +99,16 @@ def to_dict(self, json=False):
def ci_low(self):
"""Return the lower bracket of the confidence intervals."""
if self.confidence_intervals:
if isinstance(self.confidence_intervals[0], pd.Series):
return self.confidence_intervals[0].to_list()
return self.confidence_intervals[0]
return None

def ci_high(self):
"""Return the higher bracket of the confidence intervals."""
if self.confidence_intervals:
if isinstance(self.confidence_intervals[1], pd.Series):
return self.confidence_intervals[1].to_list()
return self.confidence_intervals[1]
return None

Expand Down
61 changes: 29 additions & 32 deletions causal_testing/testing/estimators.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import statsmodels.formula.api as smf
from econml.dml import CausalForestDML
from patsy import dmatrix # pylint: disable = no-name-in-module

from patsy import ModelDesc
from sklearn.ensemble import GradientBoostingRegressor
from statsmodels.regression.linear_model import RegressionResultsWrapper
from statsmodels.tools.sm_exceptions import PerfectSeparationError
Expand Down Expand Up @@ -343,30 +343,28 @@ def add_modelling_assumptions(self):
"do not need to be linear."
)

def estimate_coefficient(self) -> float:
def estimate_coefficient(self) -> tuple[pd.Series, list[pd.Series, pd.Series]]:
"""Estimate the unit average treatment effect of the treatment on the outcome. That is, the change in outcome
caused by a unit change in treatment.
:return: The unit average treatment effect and the 95% Wald confidence intervals.
"""
model = self._run_linear_regression()
newline = "\n"
treatment = [self.treatment]
if str(self.df.dtypes[self.treatment]) == "object":
patsy_md = ModelDesc.from_formula(self.treatment)
if any((self.df.dtypes[factor.name()] == 'object' for factor in patsy_md.rhs_termlist[1].factors)):
design_info = dmatrix(self.formula.split("~")[1], self.df).design_info
treatment = design_info.column_names[design_info.term_name_slices[self.treatment]]
else:
treatment = [self.treatment]
assert set(treatment).issubset(
model.params.index.tolist()
), f"{treatment} not in\n{' ' + str(model.params.index).replace(newline, newline + ' ')}"
unit_effect = model.params[treatment] # Unit effect is the coefficient of the treatment
[ci_low, ci_high] = self._get_confidence_intervals(model, treatment)
if str(self.df.dtypes[self.treatment]) != "object":
unit_effect = unit_effect[0]
ci_low = ci_low[0]
ci_high = ci_high[0]
return unit_effect, [ci_low, ci_high]

def estimate_ate(self) -> tuple[float, list[float, float], float]:
def estimate_ate(self) -> tuple[pd.Series, list[pd.Series, pd.Series]]:
"""Estimate the average treatment effect of the treatment on the outcome. That is, the change in outcome caused
by changing the treatment variable from the control value to the treatment value.
Expand All @@ -384,8 +382,9 @@ def estimate_ate(self) -> tuple[float, list[float, float], float]:

# Perform a t-test to compare the predicted outcome of the control and treated individual (ATE)
t_test_results = model.t_test(individuals.loc["treated"] - individuals.loc["control"])
ate = t_test_results.effect[0]
ate = pd.Series(t_test_results.effect[0])
confidence_intervals = list(t_test_results.conf_int(alpha=self.alpha).flatten())
confidence_intervals = [pd.Series(interval) for interval in confidence_intervals]
return ate, confidence_intervals

def estimate_control_treatment(self, adjustment_config: dict = None) -> tuple[pd.Series, pd.Series]:
Expand Down Expand Up @@ -414,7 +413,7 @@ def estimate_control_treatment(self, adjustment_config: dict = None) -> tuple[pd

return y.iloc[1], y.iloc[0]

def estimate_risk_ratio(self, adjustment_config: dict = None) -> tuple[float, list[float, float]]:
def estimate_risk_ratio(self, adjustment_config: dict = None) -> tuple[pd.Series, list[pd.Series, pd.Series]]:
"""Estimate the risk_ratio effect of the treatment on the outcome. That is, the change in outcome caused
by changing the treatment variable from the control value to the treatment value.
Expand All @@ -423,12 +422,11 @@ def estimate_risk_ratio(self, adjustment_config: dict = None) -> tuple[float, li
if adjustment_config is None:
adjustment_config = {}
control_outcome, treatment_outcome = self.estimate_control_treatment(adjustment_config=adjustment_config)
ci_low = treatment_outcome["mean_ci_lower"] / control_outcome["mean_ci_upper"]
ci_high = treatment_outcome["mean_ci_upper"] / control_outcome["mean_ci_lower"]

return (treatment_outcome["mean"] / control_outcome["mean"]), [ci_low, ci_high]
ci_low = pd.Series(treatment_outcome["mean_ci_lower"] / control_outcome["mean_ci_upper"])
ci_high = pd.Series(treatment_outcome["mean_ci_upper"] / control_outcome["mean_ci_lower"])
return pd.Series(treatment_outcome["mean"] / control_outcome["mean"]), [ci_low, ci_high]

def estimate_ate_calculated(self, adjustment_config: dict = None) -> tuple[float, list[float, float]]:
def estimate_ate_calculated(self, adjustment_config: dict = None) -> tuple[pd.Series, list[pd.Series, pd.Series]]:
"""Estimate the ate effect of the treatment on the outcome. That is, the change in outcome caused
by changing the treatment variable from the control value to the treatment value. Here, we actually
calculate the expected outcomes under control and treatment and divide one by the other. This
Expand All @@ -439,10 +437,9 @@ def estimate_ate_calculated(self, adjustment_config: dict = None) -> tuple[float
if adjustment_config is None:
adjustment_config = {}
control_outcome, treatment_outcome = self.estimate_control_treatment(adjustment_config=adjustment_config)
ci_low = treatment_outcome["mean_ci_lower"] - control_outcome["mean_ci_upper"]
ci_high = treatment_outcome["mean_ci_upper"] - control_outcome["mean_ci_lower"]

return (treatment_outcome["mean"] - control_outcome["mean"]), [ci_low, ci_high]
ci_low = pd.Series(treatment_outcome["mean_ci_lower"] - control_outcome["mean_ci_upper"])
ci_high = pd.Series(treatment_outcome["mean_ci_upper"] - control_outcome["mean_ci_lower"])
return pd.Series(treatment_outcome["mean"] - control_outcome["mean"]), [ci_low, ci_high]

def _run_linear_regression(self) -> RegressionResultsWrapper:
"""Run linear regression of the treatment and adjustment set against the outcome and return the model.
Expand All @@ -456,8 +453,8 @@ def _run_linear_regression(self) -> RegressionResultsWrapper:
def _get_confidence_intervals(self, model, treatment):
confidence_intervals = model.conf_int(alpha=self.alpha, cols=None)
ci_low, ci_high = (
confidence_intervals[0].loc[treatment],
confidence_intervals[1].loc[treatment],
pd.Series(confidence_intervals[0].loc[treatment]),
pd.Series(confidence_intervals[1].loc[treatment]),
)
return [ci_low, ci_high]

Expand Down Expand Up @@ -495,7 +492,7 @@ def __init__(
terms = [treatment] + sorted(list(adjustment_set)) + sorted(list(effect_modifiers))
self.formula = f"{outcome} ~ cr({'+'.join(terms)}, df={basis})"

def estimate_ate_calculated(self, adjustment_config: dict = None) -> float:
def estimate_ate_calculated(self, adjustment_config: dict = None) -> pd.Series:
model = self._run_linear_regression()

x = {"Intercept": 1, self.treatment: self.treatment_value}
Expand All @@ -511,7 +508,7 @@ def estimate_ate_calculated(self, adjustment_config: dict = None) -> float:
x[self.treatment] = self.control_value
control = model.predict(x).iloc[0]

return treatment - control
return pd.Series(treatment - control)


class InstrumentalVariableEstimator(Estimator):
Expand Down Expand Up @@ -567,7 +564,7 @@ def add_modelling_assumptions(self):
"""
)

def estimate_iv_coefficient(self, df):
def estimate_iv_coefficient(self, df) -> float:
"""
Estimate the linear regression coefficient of the treatment on the
outcome.
Expand All @@ -581,7 +578,7 @@ def estimate_iv_coefficient(self, df):
# Estimate the coefficient of I on X by cancelling
return ab / a

def estimate_coefficient(self, bootstrap_size=100):
def estimate_coefficient(self, bootstrap_size=100) -> tuple[pd.Series, list[pd.Series, pd.Series]]:
"""
Estimate the unit ate (i.e. coefficient) of the treatment on the
outcome.
Expand All @@ -590,10 +587,10 @@ def estimate_coefficient(self, bootstrap_size=100):
[self.estimate_iv_coefficient(self.df.sample(len(self.df), replace=True)) for _ in range(bootstrap_size)]
)
bound = ceil((bootstrap_size * self.alpha) / 2)
ci_low = bootstraps[bound]
ci_high = bootstraps[bootstrap_size - bound]
ci_low = pd.Series(bootstraps[bound])
ci_high = pd.Series(bootstraps[bootstrap_size - bound])

return self.estimate_iv_coefficient(self.df), (ci_low, ci_high)
return pd.Series(self.estimate_iv_coefficient(self.df)), [ci_low, ci_high]


class CausalForestEstimator(Estimator):
Expand All @@ -610,7 +607,7 @@ def add_modelling_assumptions(self):
"""
self.modelling_assumptions.append("Non-parametric estimator: no restrictions imposed on the data.")

def estimate_ate(self) -> float:
def estimate_ate(self) -> tuple[pd.Series, list[pd.Series, pd.Series]]:
"""Estimate the average treatment effect.
:return ate, confidence_intervals: The average treatment effect and 95% confidence intervals.
Expand Down Expand Up @@ -638,9 +635,9 @@ def estimate_ate(self) -> float:
model.fit(outcome_df, treatment_df, X=effect_modifier_df, W=confounders_df)

# Obtain the ATE and 95% confidence intervals
ate = model.ate(effect_modifier_df, T0=self.control_value, T1=self.treatment_value)
ate = pd.Series(model.ate(effect_modifier_df, T0=self.control_value, T1=self.treatment_value))
ate_interval = model.ate_interval(effect_modifier_df, T0=self.control_value, T1=self.treatment_value)
ci_low, ci_high = ate_interval[0], ate_interval[1]
ci_low, ci_high = pd.Series(ate_interval[0]), pd.Series(ate_interval[1])
return ate, [ci_low, ci_high]

def estimate_cates(self) -> pd.DataFrame:
Expand Down
Loading

0 comments on commit 0d9b1d7

Please sign in to comment.