diff --git a/src/abcd_pyhf/abcd.py b/src/abcd_pyhf/abcd.py new file mode 100644 index 0000000..4cbeff3 --- /dev/null +++ b/src/abcd_pyhf/abcd.py @@ -0,0 +1,147 @@ +import numpy as np +import matplotlib.pyplot as plt + +import pyhf +import pyhf.contrib.viz.brazil + +from pyhf_util import signal_region, poi_name, signal_uncertainty_name, bkg_normalization_name, create_model, get_data, get_par_bounds, fixed_poi_fit_scan, hypotest_scan, poi_upper_limit + + +pyhf.set_backend('numpy', 'minuit') + + +class ABCD: + def __init__(self, observed_yields, signal_yields=None, signal_uncertainty=None): + self._observed_yields = observed_yields + self._signal_yields = signal_yields + self._signal_uncertainty = signal_uncertainty + + @property + def observed_yields(self): + return self._observed_yields + + @property + def signal_yields(self): + return self._signal_yields + + @property + def signal_uncertainty(self): + return self._signal_uncertainty + + @property + def blinded(self): + return signal_region not in self.observed_yields + + @property + def model(self): + return create_model(self.signal_yields, self.signal_uncertainty, self.blinded) + + @property + def data(self): + return get_data(self.observed_yields, self.model) + + @property + def init_pars(self): + return self.model.config.suggested_init() + + @property + def par_bounds(self): + return get_par_bounds(self.observed_yields, self.model) + + @property + def fixed_params(self): + return self.model.config.suggested_fixed() + + @property + def bkg_only_fit(self): + pars = pyhf.infer.mle.fixed_poi_fit(poi_val=0, + data=self.data, + pdf=self.model, + init_pars=self.init_pars, + par_bounds=self.par_bounds, + fixed_params=self.fixed_params, + return_uncertainties=True) + return pars + + @property + def fit(self): + pars = pyhf.infer.mle.fit(data=self.data, + pdf=self.model, + init_pars=self.init_pars, + par_bounds=self.par_bounds, + fixed_params=self.fixed_params, + return_uncertainties=True) + return pars + + @property + def bkg_only_signal_region_estimate(self): + return tuple(self.bkg_only_fit[self.model.config.par_order.index(bkg_normalization_name)]) + + @property + def _twice_nll_scan(self): + if not hasattr(self, '_twice_nll_scan_result'): + poi_values, pars_set = fixed_poi_fit_scan(self.data, self.model, self.init_pars, self.par_bounds, self.fixed_params) + best_fit_pars = np.array(self.fit).T[0] + best_fit_twice_nll = pyhf.infer.mle.twice_nll(pars=best_fit_pars, data=self.data, pdf=self.model) + setattr(self, '_twice_nll_scan_result', (poi_values, np.array([pyhf.infer.mle.twice_nll(pars, self.data, self.model) - best_fit_twice_nll for pars in pars_set]))) + return getattr(self, '_twice_nll_scan_result') + + @property + def twice_nll(self): + return self._twice_nll_scan + + @property + def twice_nll_plot(self): + plt.xlabel(r'$\mu$') + plt.xlim(0, self.par_bounds[self.model.config.par_names().index(poi_name)][1]) + plt.ylabel(r'$-2 \ln L$') + plt.ylim(0, 5) + poi_values, twice_nll_values = self.twice_nll + return plt.plot(poi_values, twice_nll_values)[0] + + def signal_region_yield_p_value(self, signal_region_yield=None): + if signal_region_yield is None: + signal_region_yield = observed_yields[signal_region] + rng = np.random.default_rng() + n_samples = 1000000 + size = 100 + rates = rng.normal(*self.bkg_only_signal_region_estimate, size=n_samples) + running_total = 0 + for rate in rates: + if rate <= 0: + pass + else: + running_total += np.sum(rng.poisson(rate, size=size) >= signal_region_yield) + return running_total / (n_samples * size) + + def q0_p_value(self): + return pyhf.infer.hypotest(0, self.data, self.model, self.init_pars, self.par_bounds, self.fixed_params, test_stat='q0') + + @property + def _hypotest_scan(self): + if not hasattr(self, '_hypotest_scan_result'): + setattr(self, '_hypotest_scan_result', hypotest_scan(self.data, self.model, self.init_pars, self.par_bounds, self.fixed_params, return_tail_probs=True, return_expected_set=True)) + return getattr(self, '_hypotest_scan_result') + + @property + def clsb(self): + return self._hypotest_scan[0], self._hypotest_scan[2][0] + + @property + def clb(self): + return self._hypotest_scan[0], self._hypotest_scan[2][1] + + @property + def cls(self): + return self._hypotest_scan[0], self._hypotest_scan[1], self._hypotest_scan[3] + + @property + def upper_limit(self, cl=0.95): + poi, cls_observed, cls_expected_set = self.cls + return poi_upper_limit(poi, cls_observed), [poi_upper_limit(poi, cls_expected) for cls_expected in cls_expected_set] + + @property + def brazil_plot(self): + mu, cls_observed, cls_expected_set = self.cls + results = list(zip(cls_observed, cls_expected_set.T)) + pyhf.contrib.viz.brazil.plot_results(mu, results) diff --git a/src/abcd_pyhf/pyhf_util.py b/src/abcd_pyhf/pyhf_util.py new file mode 100644 index 0000000..32ca175 --- /dev/null +++ b/src/abcd_pyhf/pyhf_util.py @@ -0,0 +1,142 @@ +import math +import multiprocessing.pool + +import numpy as np + +import pyhf + + +signal_region = 'A' +control_regions = ['B', 'C', 'D'] +all_regions = [signal_region] + control_regions + +poi_name = 'mu' +signal_uncertainty_name = 'systematic_uncertainty' + +bkg_normalization_name = 'mu_b' +bkg_scale_factor_1_name = 'tau_B' +bkg_scale_factor_2_name = 'tau_C' + + +def normfactor(name): + return {'name': name, 'type': 'normfactor', 'data': None} + + +bkg_normalization = normfactor(bkg_normalization_name) +bkg_modifiers = { + signal_region: [bkg_normalization], + control_regions[0]: [bkg_normalization, + normfactor(bkg_scale_factor_1_name)], + control_regions[1]: [bkg_normalization, + normfactor(bkg_scale_factor_2_name)], + control_regions[2]: [bkg_normalization, + normfactor(bkg_scale_factor_1_name), + normfactor(bkg_scale_factor_2_name)], +} + + +def create_model(signal_yield, signal_uncertainty, blinded): + signal_modifiers = [ + normfactor('mu'), + {'name': signal_uncertainty_name, 'type': 'normsys', + 'data': {'hi': 1 + signal_uncertainty, 'lo': 1 - signal_uncertainty}} + ] + regions_to_include = control_regions if blinded is True else all_regions + spec = { + 'channels': [ + {'name': region, + 'samples': [ + {'name': 'signal', + 'data': [signal_yield[region] / signal_yield[signal_region]], + 'modifiers': signal_modifiers + }, + {'name': 'background', + 'data': [1], + 'modifiers': bkg_modifiers[region] + } + ] + } for region in regions_to_include + ] + } + return pyhf.Model(spec) + + +def get_data(observed_yields, model): + if signal_region not in observed_yields: + data = [observed_yields[region] for region in control_regions] + model.config.auxdata + else: + data = [observed_yields[region] for region in all_regions] + model.config.auxdata + return data + + +def get_par_bounds(observed_yields, model): + if signal_region not in observed_yields: + background_normalization_estimate = observed_yields[control_regions[0]] * observed_yields[control_regions[1]] / observed_yields[control_regions[2]] + bkg_scale_factor_1_estimate = observed_yields[control_regions[0]] / background_normalization_estimate + bkg_scale_factor_2_estimate = observed_yields[control_regions[1]] / background_normalization_estimate + else: + background_normalization_estimate = observed_yields[signal_region] + bkg_scale_factor_1_estimate = observed_yields[control_regions[0]] / observed_yields[signal_region] + bkg_scale_factor_2_estimate = observed_yields[control_regions[1]] / observed_yields[signal_region] + background_normalization_max = background_normalization_estimate + 5 * math.sqrt(background_normalization_estimate) + poi_max = math.ceil(background_normalization_max) + par_bounds = model.config.suggested_bounds() + par_bounds[model.config.par_order.index(poi_name)] = (0, poi_max) + par_bounds[model.config.par_order.index(bkg_normalization_name)] = (0, background_normalization_max) + par_bounds[model.config.par_order.index(bkg_scale_factor_1_name)] = (0, 5 * bkg_scale_factor_1_estimate) + par_bounds[model.config.par_order.index(bkg_scale_factor_2_name)] = (0, 5 * bkg_scale_factor_2_estimate) + return par_bounds + + +def fixed_poi_fit_scan(data, model, init_pars, par_bounds, fixed_params, poi_values=None): + if poi_values is None: + poi_max = par_bounds[model.config.par_order.index(poi_name)][1] + poi_values = np.linspace(0, poi_max, int(poi_max) + 1) + other_args = (data, model, init_pars, par_bounds, fixed_params) + with multiprocessing.pool.Pool() as pool: + results = pool.starmap(pyhf.infer.mle.fixed_poi_fit, zip(poi_values, *([arg] * len(poi_values) for arg in other_args))) + return poi_values, np.array(results) + + +def hypotest_scan(data, model, init_pars, par_bounds, fixed_params, poi_values=None, calctype='asymptotics', return_tail_probs=False, return_expected=False, return_expected_set=False): + if poi_values is None: + poi_max = par_bounds[model.config.par_order.index(poi_name)][1] + poi_values = np.linspace(0, poi_max, int(poi_max) + 1) + other_args = [data, model, init_pars, par_bounds, fixed_params, calctype, return_tail_probs, return_expected, return_expected_set] + with multiprocessing.pool.Pool() as pool: + results = pool.starmap(pyhf.infer.hypotest, zip(poi_values, *[[arg] * len(poi_values) for arg in other_args])) + cls_observed = [] + if return_tail_probs: + tail_probs = [] + if return_expected: + cls_expected = [] + if return_expected_set: + cls_expected_set = [] + for result in results: + if len(result) > 1: + i = 0 + cls_observed.append(result[i]) + i += 1 + if return_tail_probs: + tail_probs.append(result[i]) + i += 1 + if return_expected: + cls_expected.append(result[i]) + i += 1 + if return_expected_set: + cls_expected_set.append(result[i]) + i += 1 + else: + cls_observed.append(result) + return_values = [np.array(poi_values), np.array(cls_observed)] + if return_tail_probs: + return_values.append(np.array(tail_probs).T) + if return_expected: + return_values.append(np.array(cls_expected)) + if return_expected_set: + return_values.append(np.array(cls_expected_set).T) + return return_values + + +def poi_upper_limit(poi, cls, cl=0.95): + return np.interp(1 - cl, cls[::-1], poi[::-1])