Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support shape_touched from Dask #900

Merged
merged 33 commits into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
b8fc7fe
wip: initial commit
agoose77 Oct 3, 2023
1b4bd50
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 3, 2023
daa8529
fix: rename function
agoose77 Oct 4, 2023
df97041
Merge remote-tracking branch 'agoose77/agoose77/feat-support-shape-to…
agoose77 Oct 4, 2023
66c8710
fix: use report_necessary_buffers
agoose77 Oct 4, 2023
f31815b
Merge branch 'master' into agoose77/feat-support-shape-touched
agoose77 Oct 4, 2023
2353a23
fix: properly parse form keys
agoose77 Oct 4, 2023
17a6f2f
Merge remote-tracking branch 'agoose77/agoose77/feat-support-shape-to…
agoose77 Oct 4, 2023
bd07d03
hack: convert Content to array
agoose77 Oct 5, 2023
a6848a0
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 5, 2023
9c90205
fix: ensure layout nodes converted to arrays
agoose77 Oct 5, 2023
379a508
Merge remote-tracking branch 'agoose77/agoose77/feat-support-shape-to…
agoose77 Oct 5, 2023
04b5a1a
adjust coffea pins to latest releases and pre-releases
lgray Oct 7, 2023
f19c11b
use pytorch-only triton image
lgray Oct 7, 2023
7051d2e
streamline version requirements
lgray Oct 7, 2023
3fe957b
Merge branch 'master' into agoose77/feat-support-shape-touched
lgray Oct 7, 2023
d3fb1c9
Merge branch 'master' into agoose77/feat-support-shape-touched
lgray Oct 7, 2023
33d2e68
fix: don't import protocol
agoose77 Oct 8, 2023
9d94cb0
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 8, 2023
c49a8bc
Merge branch 'master' into agoose77/feat-support-shape-touched
lgray Oct 10, 2023
0d9c913
remove deprecated interface definition
lgray Oct 11, 2023
bb4df59
Update tests/test_jetmet_tools.py
agoose77 Oct 11, 2023
e0694ad
Update tests/test_jetmet_tools.py
agoose77 Oct 11, 2023
e7384f9
remove further remnants of old remapping interface
lgray Oct 11, 2023
92efdb2
refactor: make key translation obvious
agoose77 Oct 11, 2023
8ae3cd5
fix typo from refactor
lgray Oct 12, 2023
45a0060
update pins (note uncapped numpy and numba skooch)
lgray Oct 14, 2023
14d2cc2
try to convince pip to upgrade numpy upon installing coffea
lgray Oct 14, 2023
ab3599e
be more insistent
lgray Oct 14, 2023
faff41e
numba 0.58 pins numpy from above < 1.26
lgray Oct 14, 2023
864f709
clean up usage of quoted ",!offsets"
lgray Oct 17, 2023
9b96f7b
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 17, 2023
0a525d0
flake8 lint
lgray Oct 17, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 23 additions & 7 deletions src/coffea/lookup_tools/rochester_lookup.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import awkward
import dask_awkward as dak
import numpy

from coffea.lookup_tools.dense_lookup import dense_lookup
Expand Down Expand Up @@ -75,7 +76,7 @@ def _error(self, func, *args):

newargs = args + (0, 0)
default = func(*newargs)
result = numpy.zeros_like(default)
result = awkward.zeros_like(default)
for s in range(self._nsets):
oneOver = 1.0 / self._members[s]
for m in range(self._members[s]):
Expand Down Expand Up @@ -226,12 +227,27 @@ def _kExtra(self, kpt, eta, nl, u, s=0, m=0):
cbN_flat = awkward.flatten(cbN)
cbS_flat = awkward.flatten(cbS)

invcdf = awkward.unflatten(
doublecrystalball.ppf(
u_flat, cbA_flat, cbA_flat, cbN_flat, cbN_flat, loc, cbS_flat
),
counts,
)
args = (u_flat, cbA_flat, cbA_flat, cbN_flat, cbN_flat, loc, cbS_flat)

if any(isinstance(arg, dak.Array) for arg in args):

def apply(*args):
args_lz = [
awkward.typetracer.length_zero_if_typetracer(arg) for arg in args
]
out = awkward.Array(doublecrystalball.ppf(*args_lz))
if awkward.backend(args[0]) == "typetracer":
out = awkward.Array(
out.layout.to_typetracer(forget_length=True),
behavior=out.behavior,
)
return out

invcdf = dak.map_partitions(apply, *args)
else:
invcdf = doublecrystalball.ppf(*args)

invcdf = awkward.unflatten(invcdf, counts)

x = awkward.where(
mask,
Expand Down
71 changes: 65 additions & 6 deletions src/coffea/nanoevents/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,23 +77,48 @@ def __init__(
self.metadata = metadata
self.version = version

def extract_form_keys_base_columns(self, form_keys):
base_columns = []
for form_key in form_keys:
base_columns.extend(
def keys_for_buffer_keys(self, buffer_keys):
base_columns = set()
for buffer_key in buffer_keys:
form_key, attribute = self.parse_buffer_key(buffer_key)
base_columns.update(
[
acolumn
for acolumn in urllib.parse.unquote(form_key).split(",")
if not acolumn.startswith("!")
]
)
return list(set(base_columns))
return base_columns

def parse_buffer_key(self, buffer_key):
prefix, attribute, form_key = buffer_key.rsplit("/", maxsplit=2)
if attribute == "offsets":
return (form_key[: -len("%2C%21offsets")], attribute)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to also be in the previous code as well, but some of these string splitting and urllib name mangling things were put into https://github.com/CoffeaTeam/coffea/blob/master/src/coffea/nanoevents/util.py and could be used here

else:
return (form_key, attribute)

@property
def buffer_key(self):
return partial(self._key_formatter, "")

def _key_formatter(self, prefix, form_key, form, attribute):
if attribute == "offsets":
form_key += "%2C%21offsets"
return prefix + f"/{attribute}/{form_key}"

# TODO: deprecate
def extract_form_keys_base_columns(self, form_keys):
base_columns = []
for form_key in form_keys:
base_columns.extend(
[
acolumn
for acolumn in urllib.parse.unquote(form_key).split(",")
if not acolumn.startswith("!")
]
)
return list(set(base_columns))


class _map_schema_uproot(_map_schema_base):
def __init__(
Expand Down Expand Up @@ -125,7 +150,41 @@ def __call__(self, form):
},
"form_key": None,
}
return awkward.forms.form.from_dict(self.schemaclass(lform, self.version).form)
return (
awkward.forms.form.from_dict(self.schemaclass(lform, self.version).form),
self,
)

def load_buffers(self, tree, keys, start, stop, interp_options):
from functools import partial

from coffea.nanoevents.util import tuple_to_key

partition_key = (
str(tree.file.uuid),
tree.object_path,
f"{start}-{stop}",
)
uuidpfn = {partition_key[0]: tree.file.file_path}
mapping = UprootSourceMapping(
TrivialUprootOpener(uuidpfn, interp_options),
start,
stop,
cache={},
access_log=None,
use_ak_forth=True,
)
mapping.preload_column_source(partition_key[0], partition_key[1], tree)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've deliberately tried not to touch this logic at all. Hopefully, by presenting the correct interface to uproot, internal refactoring can happen here in future without any side effects.

buffer_key = partial(self._key_formatter, tuple_to_key(partition_key))

class TranslateBufferKeys:
def __getitem__(this, key):
form_key, attribute = self.parse_buffer_key(key)
return mapping[
buffer_key(form_key=form_key, attribute=attribute, form=None)
]

return TranslateBufferKeys()

def create_column_mapping_and_key(self, tree, start, stop, interp_options):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lgray can we remove this from the base definition, or does someone out there likely use it?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

interface is completely private and rather obscure to boot, so I am fine to remove it.

from functools import partial
Expand Down
6 changes: 3 additions & 3 deletions src/coffea/processor/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1718,7 +1718,7 @@ def _work_function(
import dask_awkward

to_compute = processor_instance.process(events)
materialized = dask_awkward.necessary_columns(to_compute)
# materialized = dask_awkward.report_necessary_buffers(to_compute)
agoose77 marked this conversation as resolved.
Show resolved Hide resolved
out = dask.compute(to_compute, scheduler="single-threaded")[0]
except Exception as e:
raise Exception(f"Failed processing file: {item!r}") from e
Expand All @@ -1734,11 +1734,11 @@ def _work_function(
metrics = {}
if isinstance(file, uproot.ReadOnlyDirectory):
metrics["bytesread"] = file.file.source.num_requested_bytes
# metrics["data_and_shape_buffers"] = set(materialized)
# metrics["shape_only_buffers"] = set(materialized)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these placeholders for something in the future?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just to get tests to run - executor is going to be significantly changed in a subsequent PR.

if schema is not None and issubclass(schema, schemas.BaseSchema):
metrics["columns"] = set(materialized)
metrics["entries"] = len(events)
else:
metrics["columns"] = set(materialized)
metrics["entries"] = events.size
metrics["processtime"] = toc - tic
return {"out": out, "metrics": metrics, "processed": {item}}
Expand Down
4 changes: 2 additions & 2 deletions tests/test_jetmet_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -837,9 +837,9 @@ def test_corrected_jets_factory(optimization_enabled):
**{name: evaluator[name] for name in jec_stack_names[5:6]}
)

print(dak.necessary_columns(jets.eta))
print(dak.report_necessary_buffers(jets.eta))
agoose77 marked this conversation as resolved.
Show resolved Hide resolved
print(
dak.necessary_columns(
dak.report_necessary_buffers(
agoose77 marked this conversation as resolved.
Show resolved Hide resolved
resosf.getScaleFactor(
JetEta=jets.eta,
)
Expand Down
28 changes: 14 additions & 14 deletions tests/test_lookup_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,8 +372,6 @@ def test_jec_txt_effareas():


def test_rochester():
pytest.xfail("weird side effect from running other tests... passes by itself")

rochester_data = lookup_tools.txt_converters.convert_rochester_file(
"tests/samples/RoccoR2018.txt.gz", loaduncs=True
)
Expand All @@ -390,27 +388,29 @@ def test_rochester():

# test against nanoaod
events = NanoEventsFactory.from_root(
os.path.abspath("tests/samples/nano_dimuon.root")
{os.path.abspath("tests/samples/nano_dimuon.root"): "Events"},
permit_dask=True,
).events()

data_k = rochester.kScaleDT(
events.Muon.charge, events.Muon.pt, events.Muon.eta, events.Muon.phi
)
data_k = np.array(ak.flatten(data_k))
data_k = ak.flatten(data_k).compute().to_numpy()
assert all(np.isclose(data_k, official_data_k))
data_err = rochester.kScaleDTerror(
events.Muon.charge, events.Muon.pt, events.Muon.eta, events.Muon.phi
)
data_err = np.array(ak.flatten(data_err), dtype=float)
data_err = ak.flatten(data_err).compute().to_numpy()
assert all(np.isclose(data_err, official_data_err, atol=1e-8))

# test against mc
events = NanoEventsFactory.from_root(
os.path.abspath("tests/samples/nano_dy.root")
{os.path.abspath("tests/samples/nano_dy.root"): "Events"},
permit_dask=True,
).events()

hasgen = ~np.isnan(ak.fill_none(events.Muon.matched_gen.pt, np.nan))
mc_rand = ak.unflatten(mc_rand, ak.num(hasgen))
mc_rand = ak.unflatten(dak.from_awkward(ak.Array(mc_rand), 1), ak.num(hasgen))
mc_kspread = rochester.kSpreadMC(
events.Muon.charge[hasgen],
events.Muon.pt[hasgen],
Expand All @@ -426,10 +426,10 @@ def test_rochester():
events.Muon.nTrackerLayers[~hasgen],
mc_rand[~hasgen],
)
mc_k = np.array(ak.flatten(ak.ones_like(events.Muon.pt)))
hasgen_flat = np.array(ak.flatten(hasgen))
mc_k[hasgen_flat] = np.array(ak.flatten(mc_kspread))
mc_k[~hasgen_flat] = np.array(ak.flatten(mc_ksmear))
mc_k = ak.flatten(ak.ones_like(events.Muon.pt)).compute().to_numpy()
hasgen_flat = ak.flatten(hasgen).compute().to_numpy()
mc_k[hasgen_flat] = ak.flatten(mc_kspread).compute().to_numpy()
mc_k[~hasgen_flat] = ak.flatten(mc_ksmear).compute().to_numpy()
assert all(np.isclose(mc_k, official_mc_k))

mc_errspread = rochester.kSpreadMCerror(
Expand All @@ -447,9 +447,9 @@ def test_rochester():
events.Muon.nTrackerLayers[~hasgen],
mc_rand[~hasgen],
)
mc_err = np.array(ak.flatten(ak.ones_like(events.Muon.pt)))
mc_err[hasgen_flat] = np.array(ak.flatten(mc_errspread))
mc_err[~hasgen_flat] = np.array(ak.flatten(mc_errsmear))
mc_err = ak.flatten(ak.ones_like(events.Muon.pt)).compute().to_numpy()
mc_err[hasgen_flat] = ak.flatten(mc_errspread).compute().to_numpy()
mc_err[~hasgen_flat] = ak.flatten(mc_errsmear).compute().to_numpy()
assert all(np.isclose(mc_err, official_mc_err, atol=1e-8))


Expand Down