Skip to content

Commit

Permalink
personal use case of custom modifier
Browse files Browse the repository at this point in the history
  • Loading branch information
lorenzennio committed Dec 8, 2023
1 parent bb302c2 commit f4bf46f
Showing 1 changed file with 70 additions and 149 deletions.
219 changes: 70 additions & 149 deletions src/pyhf/experimental/modifiers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from pyhf import events, get_backend
from pyhf.parameters import ParamViewer

import numpy as np

log = logging.getLogger(__name__)

__all__ = ["add_custom_modifier"]
Expand Down Expand Up @@ -35,81 +37,83 @@ class BaseBuilder:
...


def _allocate_new_param(
p: dict[str, Sequence[float]]
) -> dict[str, str | bool | int | Sequence[float]]:
return {
"paramset_type": "unconstrained",
"n_parameters": 1,
"is_shared": True,
"inits": p["inits"],
"bounds": p["bounds"],
"is_scalar": True,
"fixed": False,
}


def make_func(expression: str, deps: list[str]) -> Callable[[Sequence[float]], Any]:
    """Compile a numexpr expression string into a callable of parameter values.

    Args:
        expression: numexpr-compatible expression over the names in ``deps``.
        deps: Parameter names, in the order their values will be supplied.

    Returns:
        Callable that maps a sequence of parameter values (aligned with
        ``deps``) to the evaluated expression result.
    """

    def _evaluate(values: Sequence[float]) -> Any:
        bindings = dict(zip(deps, values))
        return ne.evaluate(expression, local_dict=bindings)

    return _evaluate
def add(funcname, par_names, newparams, input_set=None, namespace=None):
namespace = namespace or {}

def make_func(
expression: str, namespace=namespace
) -> Callable[[Sequence[float]], Any]:
def func(deps: Sequence[float]) -> Any:
if expression in namespace:
parvals = dict(zip(par_names, deps))
return namespace[expression](parvals)()
return ne.evaluate(
expression, local_dict=dict(zip(par_names, deps), **namespace)
)

def make_builder(
func_name: str, deps: list[str], new_params: dict[str, dict[str, Sequence[float]]]
) -> BaseBuilder:
class _builder(BaseBuilder):
is_shared = False
return func

def _allocate_new_param(p):
param_dict = {
'paramset_type': p['paramset_type']
if 'paramset_type' in p.keys()
else 'unconstrained',
'n_parameters': 1,
'is_shared': True,
'inits': p['inits'],
'bounds': p['bounds'],
'is_scalar': True,
'fixed': False,
'auxdata': p['auxdata'] if 'auxdata' in p.keys() else (0.0,),
}
return param_dict

class _builder:
is_shared = True

def __init__(self, config):
self.builder_data = {"funcs": {}}
self.builder_data = {'funcs': {}}
self.config = config
self.required_parsets = {}

def collect(self, thismod, nom):
maskval = bool(thismod)
maskval = True if thismod else False
mask = [maskval] * len(nom)
return {"mask": mask}
return {'mask': mask}

def append(self, key, channel, sample, thismod, defined_sample):
def append(self, key, channel, sample, thismod, defined_samp):
self.builder_data.setdefault(key, {}).setdefault(sample, {}).setdefault(
"data", {"mask": []}
'data', {'mask': []}
)
nom = (
defined_sample["data"]
if defined_sample
defined_samp['data']
if defined_samp
else [0.0] * self.config.channel_nbins[channel]
)
mod_data = self.collect(thismod, nom)
self.builder_data[key][sample]["data"]["mask"] += mod_data["mask"]
moddata = self.collect(thismod, nom)
self.builder_data[key][sample]['data']['mask'] += moddata['mask']
if thismod:
if thismod["name"] != func_name:
self.builder_data["funcs"].setdefault(
thismod["name"], thismod["data"]["expr"]
if thismod['name'] != funcname:
self.builder_data['funcs'].setdefault(
thismod['name'], thismod['data']['expr']
)
self.required_parsets = {
k: [_allocate_new_param(v)] for k, v in new_params.items()
k: [_allocate_new_param(v)] for k, v in newparams.items()
}

def finalize(self):
return self.builder_data

return _builder


def make_applier(
func_name: str, deps: list[str], new_params: dict[str, dict[str, Sequence[float]]]
) -> BaseApplier:
class _applier(BaseApplier):
name = func_name
op_code = "multiplication"
class _applier:
name = funcname
op_code = 'addition'

def __init__(self, modifiers, pdfconfig, builder_data, batch_size=None):
self.funcs = [make_func(v, deps) for v in builder_data["funcs"].values()]
self.funcs = [make_func(f) for f in builder_data['funcs'].values()]

self.batch_size = batch_size
pars_for_applier = deps
_mod_names = [f"{mtype}/{m}" for m, mtype in modifiers]
pars_for_applier = par_names
_modnames = [f'{mtype}/{m}' for m, mtype in modifiers]

parfield_shape = (
(self.batch_size, pdfconfig.npars)
Expand All @@ -119,25 +123,25 @@ def __init__(self, modifiers, pdfconfig, builder_data, batch_size=None):
self.param_viewer = ParamViewer(
parfield_shape, pdfconfig.par_map, pars_for_applier
)
self._custom_mod_mask = [
[[builder_data[mod_name][s]["data"]["mask"]] for s in pdfconfig.samples]
for mod_name in _mod_names
self._custommod_mask = [
[[builder_data[modname][s]['data']['mask']] for s in pdfconfig.samples]
for modname in _modnames
]
self._precompute()
events.subscribe("tensorlib_changed")(self._precompute)
events.subscribe('tensorlib_changed')(self._precompute)

def _precompute(self):
tensorlib, _ = get_backend()
if not self.param_viewer.index_selection:
return
self.custom_mod_mask = tensorlib.tile(
tensorlib.astensor(self._custom_mod_mask),
self.custommod_mask = tensorlib.tile(
tensorlib.astensor(self._custommod_mask),
(1, 1, self.batch_size or 1, 1),
)
self.custom_mod_mask_bool = tensorlib.astensor(
self.custom_mod_mask, dtype="bool"
self.custommod_mask_bool = tensorlib.astensor(
self.custommod_mask, dtype="bool"
)
self.custom_mod_default = tensorlib.ones(self.custom_mod_mask.shape)
self.custommod_default = tensorlib.zeros(self.custommod_mask.shape)

def apply(self, pars):
"""
Expand All @@ -147,100 +151,17 @@ def apply(self, pars):
if not self.param_viewer.index_selection:
return
tensorlib, _ = get_backend()
if self.batch_size is None:
deps = self.param_viewer.get(pars)
results = tensorlib.astensor([f(deps) for f in self.funcs])
results = tensorlib.einsum(
"msab,m->msab", self.custom_mod_mask, results
)
else:
deps = self.param_viewer.get(pars)
results = tensorlib.astensor([f(deps) for f in self.funcs])
results = tensorlib.einsum(
"msab,ma->msab", self.custom_mod_mask, results
)
deps = self.param_viewer.get(pars)
out = tensorlib.astensor([f(deps) for f in self.funcs])
results = np.zeros_like(self.custommod_mask)
np.place(results, self.custommod_mask, out)
results = tensorlib.where(
self.custom_mod_mask_bool, results, self.custom_mod_default
self.custommod_mask_bool, results, self.custommod_default
)
return results

return _applier


def add_custom_modifier(
func_name: str, deps: list[str], new_params: dict[str, dict[str, Sequence[float]]]
) -> dict[str, tuple[BaseBuilder, BaseApplier]]:
r"""
Add a custom modifier type with the modifier data defined through a custom
numexpr string expression.
Example:
>>> import pyhf
>>> import pyhf.experimental.modifiers
>>> pyhf.set_backend("numpy")
>>> new_params = {
... "m1": {"inits": (1.0,), "bounds": ((-5.0, 5.0),)},
... "m2": {"inits": (1.0,), "bounds": ((-5.0, 5.0),)},
... }
>>> expanded_pyhf = pyhf.experimental.modifiers.add_custom_modifier(
... "custom", ["m1", "m2"], new_params
... )
>>> model = pyhf.Model(
... {
... "channels": [
... {
... "name": "singlechannel",
... "samples": [
... {
... "name": "signal",
... "data": [10, 20],
... "modifiers": [
... {
... "name": "f2",
... "type": "custom",
... "data": {"expr": "m1"},
... },
... ],
... },
... {
... "name": "background",
... "data": [100, 150],
... "modifiers": [
... {
... "name": "f1",
... "type": "custom",
... "data": {"expr": "m1+(m2**2)"},
... },
... ],
... },
... ],
... }
... ]
... },
... modifier_set=expanded_pyhf,
... poi_name="m1",
... validate=False,
... batch_size=1,
... )
>>> model.config.modifiers
[('f1', 'custom'), ('f2', 'custom')]
Args:
func_name (:obj:`str`): The name of the custom modifier type.
deps (:obj:`list`): The names of the new parameters of the modifier
function.
new_params (:obj:`dict`): The new parameters.
Returns:
:obj:`dict`: The updated ``pyhf.modifiers.histfactory_set`` with the added
custom modifier type.
.. versionadded:: 0.8.0
"""
_builder = make_builder(func_name, deps, new_params)
_applier = make_applier(func_name, deps, new_params)

modifier_set = {_applier.name: (_builder, _applier)}
modifier_set.update(**pyhf.modifiers.histfactory_set)
modifier_set.update(
**(input_set if input_set is not None else pyhf.modifiers.histfactory_set)
)
return modifier_set

0 comments on commit f4bf46f

Please sign in to comment.