Commit 8ea3017 (parent 3feb74f): 2 changed files with 289 additions and 0 deletions.
@@ -0,0 +1,147 @@
import numpy as np
import matplotlib.pyplot as plt

import pyhf
import pyhf.contrib.viz.brazil

from pyhf_util import (signal_region, poi_name, signal_uncertainty_name,
                       bkg_normalization_name, create_model, get_data,
                       get_par_bounds, fixed_poi_fit_scan, hypotest_scan,
                       poi_upper_limit)


pyhf.set_backend('numpy', 'minuit')


class ABCD:
    # Wraps an ABCD (data-driven background estimate) analysis in a single pyhf model.
    def __init__(self, observed_yields, signal_yields=None, signal_uncertainty=None):
        self._observed_yields = observed_yields
        self._signal_yields = signal_yields
        self._signal_uncertainty = signal_uncertainty

    @property
    def observed_yields(self):
        return self._observed_yields

    @property
    def signal_yields(self):
        return self._signal_yields

    @property
    def signal_uncertainty(self):
        return self._signal_uncertainty

    @property
    def blinded(self):
        # Blinded as long as no yield is supplied for the signal region.
        return signal_region not in self.observed_yields

    @property
    def model(self):
        return create_model(self.signal_yields, self.signal_uncertainty, self.blinded)

    @property
    def data(self):
        return get_data(self.observed_yields, self.model)

    @property
    def init_pars(self):
        return self.model.config.suggested_init()

    @property
    def par_bounds(self):
        return get_par_bounds(self.observed_yields, self.model)

    @property
    def fixed_params(self):
        return self.model.config.suggested_fixed()

    @property
    def bkg_only_fit(self):
        # Background-only fit: signal strength fixed to zero.
        pars = pyhf.infer.mle.fixed_poi_fit(poi_val=0,
                                            data=self.data,
                                            pdf=self.model,
                                            init_pars=self.init_pars,
                                            par_bounds=self.par_bounds,
                                            fixed_params=self.fixed_params,
                                            return_uncertainties=True)
        return pars

    @property
    def fit(self):
        # Unconditional maximum likelihood fit.
        pars = pyhf.infer.mle.fit(data=self.data,
                                  pdf=self.model,
                                  init_pars=self.init_pars,
                                  par_bounds=self.par_bounds,
                                  fixed_params=self.fixed_params,
                                  return_uncertainties=True)
        return pars

    @property
    def bkg_only_signal_region_estimate(self):
        # Best-fit value and uncertainty of the background normalization from the background-only fit.
        return tuple(self.bkg_only_fit[self.model.config.par_order.index(bkg_normalization_name)])

    @property
    def _twice_nll_scan(self):
        # Scan of -2 ln L (relative to the best fit) over the signal strength, cached after the first call.
        if not hasattr(self, '_twice_nll_scan_result'):
            poi_values, pars_set = fixed_poi_fit_scan(self.data, self.model, self.init_pars, self.par_bounds, self.fixed_params)
            best_fit_pars = np.array(self.fit).T[0]
            best_fit_twice_nll = pyhf.infer.mle.twice_nll(pars=best_fit_pars, data=self.data, pdf=self.model)
            setattr(self, '_twice_nll_scan_result', (poi_values, np.array([pyhf.infer.mle.twice_nll(pars, self.data, self.model) - best_fit_twice_nll for pars in pars_set])))
        return getattr(self, '_twice_nll_scan_result')

    @property
    def twice_nll(self):
        return self._twice_nll_scan

    @property
    def twice_nll_plot(self):
        plt.xlabel(r'$\mu$')
        plt.xlim(0, self.par_bounds[self.model.config.par_names().index(poi_name)][1])
        plt.ylabel(r'$-2 \ln L$')
        plt.ylim(0, 5)
        poi_values, twice_nll_values = self.twice_nll
        return plt.plot(poi_values, twice_nll_values)[0]
    def signal_region_yield_p_value(self, signal_region_yield=None):
        # Toy-based tail probability: sample the background rate from the background-only
        # estimate (Gaussian) and count Poisson draws at or above the signal-region yield.
        if signal_region_yield is None:
            signal_region_yield = self.observed_yields[signal_region]
        rng = np.random.default_rng()
        n_samples = 1000000
        size = 100
        rates = rng.normal(*self.bkg_only_signal_region_estimate, size=n_samples)
        running_total = 0
        for rate in rates:
            if rate > 0:  # non-positive sampled rates contribute no counts
                running_total += np.sum(rng.poisson(rate, size=size) >= signal_region_yield)
        return running_total / (n_samples * size)
    def q0_p_value(self):
        return pyhf.infer.hypotest(0, self.data, self.model, self.init_pars, self.par_bounds, self.fixed_params, test_stat='q0')

    @property
    def _hypotest_scan(self):
        if not hasattr(self, '_hypotest_scan_result'):
            setattr(self, '_hypotest_scan_result', hypotest_scan(self.data, self.model, self.init_pars, self.par_bounds, self.fixed_params, return_tail_probs=True, return_expected_set=True))
        return getattr(self, '_hypotest_scan_result')

    @property
    def clsb(self):
        return self._hypotest_scan[0], self._hypotest_scan[2][0]

    @property
    def clb(self):
        return self._hypotest_scan[0], self._hypotest_scan[2][1]

    @property
    def cls(self):
        return self._hypotest_scan[0], self._hypotest_scan[1], self._hypotest_scan[3]
    @property
    def upper_limit(self):
        # A property cannot take arguments, so the confidence level is fixed at the
        # 95% default of poi_upper_limit.
        poi, cls_observed, cls_expected_set = self.cls
        return poi_upper_limit(poi, cls_observed), [poi_upper_limit(poi, cls_expected) for cls_expected in cls_expected_set]

    @property
    def brazil_plot(self):
        mu, cls_observed, cls_expected_set = self.cls
        results = list(zip(cls_observed, cls_expected_set.T))
        pyhf.contrib.viz.brazil.plot_results(mu, results)
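A minimal usage sketch of the class above; the yields, signal expectations, and 10% signal uncertainty are invented for illustration only:

analysis = ABCD(
    observed_yields={'B': 30, 'C': 20, 'D': 15},              # hypothetical yields; no 'A' entry, so the analysis is blinded
    signal_yields={'A': 10.0, 'B': 1.0, 'C': 1.0, 'D': 0.1},  # hypothetical signal expectation per region
    signal_uncertainty=0.1,
)
print(analysis.blinded)                          # True
print(analysis.bkg_only_signal_region_estimate)  # fitted background normalization and its uncertainty
observed_limit, expected_limits = analysis.upper_limit  # 95% CL upper limits on mu (runs a parallel CLs scan)
analysis.brazil_plot                             # draws the CLs Brazil band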
@@ -0,0 +1,142 @@
import math
import multiprocessing.pool

import numpy as np

import pyhf


signal_region = 'A'
control_regions = ['B', 'C', 'D']
all_regions = [signal_region] + control_regions

poi_name = 'mu'
signal_uncertainty_name = 'systematic_uncertainty'

bkg_normalization_name = 'mu_b'
bkg_scale_factor_1_name = 'tau_B'
bkg_scale_factor_2_name = 'tau_C'


def normfactor(name):
    return {'name': name, 'type': 'normfactor', 'data': None}


# Background expectations: mu_b in A, mu_b * tau_B in B, mu_b * tau_C in C and
# mu_b * tau_B * tau_C in D, which encodes the ABCD relation A = B * C / D.
bkg_normalization = normfactor(bkg_normalization_name)
bkg_modifiers = {
    signal_region: [bkg_normalization],
    control_regions[0]: [bkg_normalization,
                         normfactor(bkg_scale_factor_1_name)],
    control_regions[1]: [bkg_normalization,
                         normfactor(bkg_scale_factor_2_name)],
    control_regions[2]: [bkg_normalization,
                         normfactor(bkg_scale_factor_1_name),
                         normfactor(bkg_scale_factor_2_name)],
}


def create_model(signal_yield, signal_uncertainty, blinded):
    signal_modifiers = [
        normfactor(poi_name),
        {'name': signal_uncertainty_name, 'type': 'normsys',
         'data': {'hi': 1 + signal_uncertainty, 'lo': 1 - signal_uncertainty}}
    ]
    regions_to_include = control_regions if blinded else all_regions
    spec = {
        'channels': [
            {'name': region,
             'samples': [
                 {'name': 'signal',
                  # Signal yields are normalized to the signal region, so mu counts
                  # the signal events expected in region A.
                  'data': [signal_yield[region] / signal_yield[signal_region]],
                  'modifiers': signal_modifiers
                  },
                 {'name': 'background',
                  'data': [1],  # unit nominal, scaled entirely by the normalization factors
                  'modifiers': bkg_modifiers[region]
                  }
             ]
             } for region in regions_to_include
        ]
    }
    return pyhf.Model(spec)
def get_data(observed_yields, model):
    if signal_region not in observed_yields:
        data = [observed_yields[region] for region in control_regions] + model.config.auxdata
    else:
        data = [observed_yields[region] for region in all_regions] + model.config.auxdata
    return data


def get_par_bounds(observed_yields, model):
    if signal_region not in observed_yields:
        # Blinded: estimate the signal-region background from the control regions via A = B * C / D.
        background_normalization_estimate = observed_yields[control_regions[0]] * observed_yields[control_regions[1]] / observed_yields[control_regions[2]]
        bkg_scale_factor_1_estimate = observed_yields[control_regions[0]] / background_normalization_estimate
        bkg_scale_factor_2_estimate = observed_yields[control_regions[1]] / background_normalization_estimate
    else:
        background_normalization_estimate = observed_yields[signal_region]
        bkg_scale_factor_1_estimate = observed_yields[control_regions[0]] / observed_yields[signal_region]
        bkg_scale_factor_2_estimate = observed_yields[control_regions[1]] / observed_yields[signal_region]
    background_normalization_max = background_normalization_estimate + 5 * math.sqrt(background_normalization_estimate)
    poi_max = math.ceil(background_normalization_max)
    par_bounds = model.config.suggested_bounds()
    par_bounds[model.config.par_order.index(poi_name)] = (0, poi_max)
    par_bounds[model.config.par_order.index(bkg_normalization_name)] = (0, background_normalization_max)
    par_bounds[model.config.par_order.index(bkg_scale_factor_1_name)] = (0, 5 * bkg_scale_factor_1_estimate)
    par_bounds[model.config.par_order.index(bkg_scale_factor_2_name)] = (0, 5 * bkg_scale_factor_2_estimate)
    return par_bounds
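# Numeric aside with invented control-region yields (illustration only): how the
# blinded branch of get_par_bounds sizes its bounds.
example_yields = {'B': 30, 'C': 20, 'D': 15}                                   # hypothetical yields
a_estimate = example_yields['B'] * example_yields['C'] / example_yields['D']   # A = B * C / D = 40.0
tau_b_estimate = example_yields['B'] / a_estimate                              # 0.75
tau_c_estimate = example_yields['C'] / a_estimate                              # 0.5
mu_b_upper_bound = a_estimate + 5 * a_estimate ** 0.5                          # ~71.6, the bound set on mu_b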
def fixed_poi_fit_scan(data, model, init_pars, par_bounds, fixed_params, poi_values=None):
    if poi_values is None:
        poi_max = par_bounds[model.config.par_order.index(poi_name)][1]
        poi_values = np.linspace(0, poi_max, int(poi_max) + 1)
    other_args = (data, model, init_pars, par_bounds, fixed_params)
    with multiprocessing.pool.Pool() as pool:
        results = pool.starmap(pyhf.infer.mle.fixed_poi_fit, zip(poi_values, *([arg] * len(poi_values) for arg in other_args)))
    return poi_values, np.array(results)


def hypotest_scan(data, model, init_pars, par_bounds, fixed_params, poi_values=None, calctype='asymptotics', return_tail_probs=False, return_expected=False, return_expected_set=False):
    if poi_values is None:
        poi_max = par_bounds[model.config.par_order.index(poi_name)][1]
        poi_values = np.linspace(0, poi_max, int(poi_max) + 1)
    other_args = [data, model, init_pars, par_bounds, fixed_params, calctype, return_tail_probs, return_expected, return_expected_set]
    with multiprocessing.pool.Pool() as pool:
        results = pool.starmap(pyhf.infer.hypotest, zip(poi_values, *[[arg] * len(poi_values) for arg in other_args]))
    cls_observed = []
    if return_tail_probs:
        tail_probs = []
    if return_expected:
        cls_expected = []
    if return_expected_set:
        cls_expected_set = []
    for result in results:
        if len(result) > 1:
            i = 0
            cls_observed.append(result[i])
            i += 1
            if return_tail_probs:
                tail_probs.append(result[i])
                i += 1
            if return_expected:
                cls_expected.append(result[i])
                i += 1
            if return_expected_set:
                cls_expected_set.append(result[i])
                i += 1
        else:
            cls_observed.append(result)
    return_values = [np.array(poi_values), np.array(cls_observed)]
    if return_tail_probs:
        return_values.append(np.array(tail_probs).T)
    if return_expected:
        return_values.append(np.array(cls_expected))
    if return_expected_set:
        return_values.append(np.array(cls_expected_set).T)
    return return_values
def poi_upper_limit(poi, cls, cl=0.95):
    # Reverse both arrays so cls is increasing, as np.interp requires, then
    # interpolate the poi value at which the CLs curve crosses 1 - cl.
    return np.interp(1 - cl, cls[::-1], poi[::-1])
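As a rough numeric check of the interpolation in poi_upper_limit, with an invented CLs scan (the values below are not from any fit):

import numpy as np

poi = np.array([0.0, 1.0, 2.0, 3.0, 4.0, 5.0])    # hypothetical scan points
cls = np.array([1.0, 0.8, 0.4, 0.1, 0.03, 0.01])  # hypothetical observed CLs values
upper_limit = np.interp(1 - 0.95, cls[::-1], poi[::-1])
print(upper_limit)  # ~3.71, where CLs crosses 0.05 between mu = 3 and mu = 4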