diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9a387fcbc..ea61615a7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -26,7 +26,7 @@ jobs: name: pre-commit runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - uses: actions/setup-python@v4 - uses: pre-commit/action@v3.0.0 with: @@ -45,7 +45,7 @@ jobs: name: test coffea (${{ matrix.os }}) - python ${{ matrix.python-version }}, JDK${{ matrix.java-version }} steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v4 with: @@ -69,7 +69,7 @@ jobs: python -m pip install xgboost python -m pip install tritonclient[grpc,http] # install checked out coffea - python -m pip install -q -e '.[dev,parsl,dask,spark]' + python -m pip install -q -e '.[dev,parsl,dask,spark]' --upgrade --upgrade-strategy eager python -m pip list java -version - name: Install dependencies (MacOS) @@ -80,7 +80,7 @@ jobs: python -m pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cpu python -m pip install xgboost # install checked out coffea - python -m pip install -q -e '.[dev,dask,spark]' + python -m pip install -q -e '.[dev,dask,spark]' --upgrade --upgrade-strategy eager python -m pip list java -version - name: Install dependencies (Windows) @@ -91,14 +91,14 @@ jobs: python -m pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cpu python -m pip install xgboost # install checked out coffea - python -m pip install -q -e '.[dev,dask]' + python -m pip install -q -e '.[dev,dask]' --upgrade --upgrade-strategy eager python -m pip list java -version - name: Start triton server with example model if: matrix.os == 'ubuntu-latest' run: | - docker run -d --rm -p 8000:8000 -p 8001:8001 -p 8002:8002 -v ${{ github.workspace }}/tests/samples/triton_models_test:/models nvcr.io/nvidia/tritonserver:23.04-py3 tritonserver --model-repository=/models + docker run -d --rm -p 8000:8000 -p 8001:8001 -p 8002:8002 -v ${{ github.workspace }}/tests/samples/triton_models_test:/models nvcr.io/nvidia/tritonserver:23.04-pyt-python-py3 tritonserver --model-repository=/models - name: Test with pytest run: | @@ -119,7 +119,7 @@ jobs: touch build/html/.nojekyll - name: Deploy documentation if: github.event_name == 'push' && matrix.os == 'ubuntu-latest' && matrix.python-version == 3.11 - uses: crazy-max/ghaction-github-pages@v3 + uses: crazy-max/ghaction-github-pages@v4 with: target_branch: gh-pages build_dir: docs/build/html @@ -135,7 +135,7 @@ jobs: name: test coffea-workqueue steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Set up Conda uses: conda-incubator/setup-miniconda@v2 env: @@ -185,7 +185,7 @@ jobs: name: deploy release steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v4 with: diff --git a/.github/workflows/pr.yml b/.github/workflows/pr.yml index 99d3f9e26..44dbedb0b 100644 --- a/.github/workflows/pr.yml +++ b/.github/workflows/pr.yml @@ -17,6 +17,6 @@ jobs: name: Validate PR title runs-on: ubuntu-latest steps: - - uses: amannn/action-semantic-pull-request@v5.2.0 + - uses: amannn/action-semantic-pull-request@v5.3.0 env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index a4d511b07..baa961304 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -5,14 +5,14 @@ ci: for 
more information, see https://pre-commit.ci autofix_prs: true autoupdate_branch: '' - autoupdate_commit_msg: '[pre-commit.ci] pre-commit autoupdate' + autoupdate_commit_msg: 'ci(pre-commit): pre-commit autoupdate' autoupdate_schedule: weekly skip: [] submodules: false repos: - repo: https://github.com/psf/black - rev: 23.7.0 + rev: 23.9.1 hooks: - id: black @@ -24,7 +24,7 @@ repos: args: ["--profile", "black", "--filter-files"] - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v4.4.0 + rev: v4.5.0 hooks: - id: check-case-conflict - id: check-merge-conflict @@ -37,24 +37,24 @@ repos: - id: trailing-whitespace - repo: https://github.com/asottile/pyupgrade - rev: v3.9.0 + rev: v3.15.0 hooks: - id: pyupgrade args: ["--py38-plus"] - repo: https://github.com/asottile/setup-cfg-fmt - rev: v2.4.0 + rev: v2.5.0 hooks: - id: setup-cfg-fmt - repo: https://github.com/pycqa/flake8 - rev: 6.0.0 + rev: 6.1.0 hooks: - id: flake8 exclude: coffea/processor/templates - repo: https://github.com/codespell-project/codespell - rev: v2.2.5 + rev: v2.2.6 hooks: - id: codespell args: ["--skip=*.ipynb","-L hist,Hist,nd,SubJet,subjet,Subjet,PTD,ptd,fPt,fpt,Ser,ser"] diff --git a/pyproject.toml b/pyproject.toml index 454ed3319..689e03ff0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -37,17 +37,17 @@ classifiers = [ "Topic :: Utilities", ] dependencies = [ - "awkward>=2.3.3", - "uproot>=5.0.10", + "awkward>=2.4.6", + "uproot>=5.1.1", "dask[array]>=2023.4.0", - "dask-awkward>=2023.7.1,!=2023.8.0", - "dask-histogram>=2023.6.0", - "correctionlib>=2.0.0", + "dask-awkward>=2023.10.0", + "dask-histogram>=2023.10.0", + "correctionlib>=2.3.3", "pyarrow>=6.0.0", "fsspec", "matplotlib>=3", - "numba>=0.57.0", - "numpy>=1.22.0,<1.25", # < 1.25 for numba 0.57 series + "numba>=0.58.0", + "numpy>=1.22.0,<1.26", # < 1.26 for numba 0.58 series "scipy>=1.1.0", "tqdm>=4.27.0", "lz4", diff --git a/src/coffea/analysis_tools.py b/src/coffea/analysis_tools.py index 66b92fe2b..facf14e97 100644 --- a/src/coffea/analysis_tools.py +++ b/src/coffea/analysis_tools.py @@ -418,7 +418,7 @@ def variations(self): class NminusOneToNpz: - """Object to be returned by NmiusOne.to_npz()""" + """Object to be returned by NminusOne.to_npz()""" def __init__(self, file, labels, nev, masks, saver): self._file = file @@ -494,11 +494,17 @@ def maskscutflow(self): return self._maskscutflow def compute(self): - self._nevonecut = list(dask.compute(*self._nevonecut)) - self._nevcutflow = list(dask.compute(*self._nevcutflow)) - self._masksonecut = list(dask.compute(*self._masksonecut)) - self._maskscutflow = list(dask.compute(*self._maskscutflow)) - numpy.savez( + self._nevonecut, self._nevcutflow = dask.compute( + self._nevonecut, self._nevcutflow + ) + self._masksonecut, self._maskscutflow = dask.compute( + self._masksonecut, self._maskscutflow + ) + self._nevonecut = list(self._nevonecut) + self._nevcutflow = list(self._nevcutflow) + self._masksonecut = list(self._masksonecut) + self._maskscutflow = list(self._maskscutflow) + self._saver( self._file, labels=self._labels, nevonecut=self._nevonecut, @@ -538,7 +544,7 @@ def result(self): labels = ["initial"] + [f"N - {i}" for i in self._names] + ["N"] return NminusOneResult(labels, self._nev, self._masks) - def to_npz(self, file, compressed=False, compute=True): + def to_npz(self, file, compressed=False, compute=False): """Saves the results of the N-1 selection to a .npz file Parameters @@ -554,7 +560,7 @@ def to_npz(self, file, compressed=False, compute=True): compute : bool, optional Whether to 
immediately start writing or to return an object that the user can choose when to start writing by calling compute(). - Default is True. + Default is False. Returns ------- @@ -580,22 +586,29 @@ def print(self): """Prints the statistics of the N-1 selection""" if self._delayed_mode: + warnings.warn( + "Printing the N-1 selection statistics is going to compute dask_awkward objects." + ) self._nev = list(dask.compute(*self._nev)) + nev = self._nev print("N-1 selection stats:") for i, name in enumerate(self._names): - print( - f"Ignoring {name:<20}: pass = {nev[i+1]:<20}\ - all = {nev[0]:<20}\ - -- eff = {nev[i+1]*100/nev[0]:.1f} %" + stats = ( + f"Ignoring {name:<20}" + f"pass = {nev[i+1]:<20}" + f"all = {nev[0]:<20}" + f"-- eff = {nev[i+1]*100/nev[0]:.1f} %" ) + print(stats) - if True: - print( - f"All cuts {'':<20}: pass = {nev[-1]:<20}\ - all = {nev[0]:<20}\ - -- eff = {nev[-1]*100/nev[0]:.1f} %" - ) + stats_all = ( + f"All cuts {'':<20}" + f"pass = {nev[-1]:<20}" + f"all = {nev[0]:<20}" + f"-- eff = {nev[-1]*100/nev[0]:.1f} %" + ) + print(stats_all) def yieldhist(self): """Returns the N-1 selection yields as a ``hist.Hist`` object @@ -610,13 +623,13 @@ def yieldhist(self): labels = ["initial"] + [f"N - {i}" for i in self._names] + ["N"] if not self._delayed_mode: h = hist.Hist(hist.axis.Integer(0, len(labels), name="N-1")) - h.fill(numpy.arange(len(labels)), weight=self._nev) + h.fill(numpy.arange(len(labels), dtype=int), weight=self._nev) else: h = hist.dask.Hist(hist.axis.Integer(0, len(labels), name="N-1")) for i, weight in enumerate(self._masks, 1): h.fill(dask_awkward.full_like(weight, i, dtype=int), weight=weight) - h.fill(dask_awkward.zeros_like(weight)) + h.fill(dask_awkward.zeros_like(weight, dtype=int)) return h, labels @@ -712,7 +725,7 @@ def plot_vars( hist.axis.Integer(0, len(labels), name="N-1"), ) arr = awkward.flatten(var) - h.fill(arr, awkward.zeros_like(arr)) + h.fill(arr, awkward.zeros_like(arr, dtype=int)) for i, mask in enumerate(self.result().masks, 1): arr = awkward.flatten(var[mask]) h.fill(arr, awkward.full_like(arr, i, dtype=int)) @@ -725,7 +738,7 @@ def plot_vars( hist.axis.Integer(0, len(labels), name="N-1"), ) arr = dask_awkward.flatten(var) - h.fill(arr, dask_awkward.zeros_like(arr)) + h.fill(arr, dask_awkward.zeros_like(arr, dtype=int)) for i, mask in enumerate(self.result().masks, 1): arr = dask_awkward.flatten(var[mask]) h.fill(arr, dask_awkward.full_like(arr, i, dtype=int)) @@ -780,7 +793,7 @@ def result(self): self._maskscutflow, ) - def to_npz(self, file, compressed=False, compute=True): + def to_npz(self, file, compressed=False, compute=False): """Saves the results of the cutflow to a .npz file Parameters @@ -796,7 +809,7 @@ def to_npz(self, file, compressed=False, compute=True): compute : bool, optional Whether to immediately start writing or to return an object that the user can choose when to start writing by calling compute(). - Default is True. + Default is False. Returns ------- @@ -824,19 +837,27 @@ def print(self): """Prints the statistics of the Cutflow""" if self._delayed_mode: - self._nevonecut = list(dask.compute(*self._nevonecut)) - self._nevcutflow = list(dask.compute(*self._nevcutflow)) + warnings.warn( + "Printing the cutflow statistics is going to compute dask_awkward objects." 
+ ) + self._nevonecut, self._nevcutflow = dask.compute( + self._nevonecut, self._nevcutflow + ) + nevonecut = self._nevonecut nevcutflow = self._nevcutflow + print("Cutflow stats:") for i, name in enumerate(self._names): - print( - f"Cut {name:<20}: pass = {nevonecut[i+1]:<20}\ - cumulative pass = {nevcutflow[i+1]:<20}\ - all = {nevonecut[0]:<20}\ - -- eff = {nevonecut[i+1]*100/nevonecut[0]:.1f} %\ - -- cumulative eff = {nevcutflow[i+1]*100/nevcutflow[0]:.1f} %" + stats = ( + f"Cut {name:<20}:" + f"pass = {nevonecut[i+1]:<20}" + f"cumulative pass = {nevcutflow[i+1]:<20}" + f"all = {nevonecut[0]:<20}" + f"-- eff = {nevonecut[i+1]*100/nevonecut[0]:.1f} %{'':<20}" + f"-- cumulative eff = {nevcutflow[i+1]*100/nevcutflow[0]:.1f} %" ) + print(stats) def yieldhist(self): """Returns the cutflow yields as ``hist.Hist`` objects @@ -856,8 +877,8 @@ def yieldhist(self): honecut = hist.Hist(hist.axis.Integer(0, len(labels), name="onecut")) hcutflow = honecut.copy() hcutflow.axes.name = ("cutflow",) - honecut.fill(numpy.arange(len(labels)), weight=self._nevonecut) - hcutflow.fill(numpy.arange(len(labels)), weight=self._nevcutflow) + honecut.fill(numpy.arange(len(labels), dtype=int), weight=self._nevonecut) + hcutflow.fill(numpy.arange(len(labels), dtype=int), weight=self._nevcutflow) else: honecut = hist.dask.Hist(hist.axis.Integer(0, len(labels), name="onecut")) @@ -868,12 +889,12 @@ def yieldhist(self): honecut.fill( dask_awkward.full_like(weight, i, dtype=int), weight=weight ) - honecut.fill(dask_awkward.zeros_like(weight)) + honecut.fill(dask_awkward.zeros_like(weight, dtype=int)) for i, weight in enumerate(self._maskscutflow, 1): hcutflow.fill( dask_awkward.full_like(weight, i, dtype=int), weight=weight ) - hcutflow.fill(dask_awkward.zeros_like(weight)) + hcutflow.fill(dask_awkward.zeros_like(weight, dtype=int)) return honecut, hcutflow, labels @@ -975,8 +996,8 @@ def plot_vars( hcutflow.axes.name = name, "cutflow" arr = awkward.flatten(var) - honecut.fill(arr, awkward.zeros_like(arr)) - hcutflow.fill(arr, awkward.zeros_like(arr)) + honecut.fill(arr, awkward.zeros_like(arr, dtype=int)) + hcutflow.fill(arr, awkward.zeros_like(arr, dtype=int)) for i, mask in enumerate(self.result().masksonecut, 1): arr = awkward.flatten(var[mask]) @@ -998,8 +1019,8 @@ def plot_vars( hcutflow.axes.name = name, "cutflow" arr = dask_awkward.flatten(var) - honecut.fill(arr, dask_awkward.zeros_like(arr)) - hcutflow.fill(arr, dask_awkward.zeros_like(arr)) + honecut.fill(arr, dask_awkward.zeros_like(arr, dtype=int)) + hcutflow.fill(arr, dask_awkward.zeros_like(arr, dtype=int)) for i, mask in enumerate(self.result().masksonecut, 1): arr = dask_awkward.flatten(var[mask]) diff --git a/src/coffea/lookup_tools/rochester_lookup.py b/src/coffea/lookup_tools/rochester_lookup.py index 3e8bf6eab..fece55d6d 100644 --- a/src/coffea/lookup_tools/rochester_lookup.py +++ b/src/coffea/lookup_tools/rochester_lookup.py @@ -1,4 +1,5 @@ import awkward +import dask_awkward as dak import numpy from coffea.lookup_tools.dense_lookup import dense_lookup @@ -75,7 +76,7 @@ def _error(self, func, *args): newargs = args + (0, 0) default = func(*newargs) - result = numpy.zeros_like(default) + result = awkward.zeros_like(default) for s in range(self._nsets): oneOver = 1.0 / self._members[s] for m in range(self._members[s]): @@ -226,12 +227,27 @@ def _kExtra(self, kpt, eta, nl, u, s=0, m=0): cbN_flat = awkward.flatten(cbN) cbS_flat = awkward.flatten(cbS) - invcdf = awkward.unflatten( - doublecrystalball.ppf( - u_flat, cbA_flat, cbA_flat, cbN_flat, 
cbN_flat, loc, cbS_flat - ), - counts, - ) + args = (u_flat, cbA_flat, cbA_flat, cbN_flat, cbN_flat, loc, cbS_flat) + + if any(isinstance(arg, dak.Array) for arg in args): + + def apply(*args): + args_lz = [ + awkward.typetracer.length_zero_if_typetracer(arg) for arg in args + ] + out = awkward.Array(doublecrystalball.ppf(*args_lz)) + if awkward.backend(args[0]) == "typetracer": + out = awkward.Array( + out.layout.to_typetracer(forget_length=True), + behavior=out.behavior, + ) + return out + + invcdf = dak.map_partitions(apply, *args) + else: + invcdf = doublecrystalball.ppf(*args) + + invcdf = awkward.unflatten(invcdf, counts) x = awkward.where( mask, diff --git a/src/coffea/nanoevents/factory.py b/src/coffea/nanoevents/factory.py index 507cf6600..073d891c4 100644 --- a/src/coffea/nanoevents/factory.py +++ b/src/coffea/nanoevents/factory.py @@ -1,6 +1,5 @@ import io import pathlib -import urllib.parse import warnings import weakref from functools import partial @@ -11,7 +10,6 @@ import dask_awkward import fsspec import uproot -from dask_awkward import ImplementsFormTransformation from coffea.nanoevents.mapping import ( CachedMapping, @@ -31,7 +29,9 @@ PHYSLITESchema, TreeMakerSchema, ) -from coffea.nanoevents.util import key_to_tuple, tuple_to_key +from coffea.nanoevents.util import key_to_tuple, quote, tuple_to_key, unquote + +_offsets_label = quote(",!offsets") def _remove_not_interpretable(branch): @@ -70,11 +70,11 @@ def _remove_not_interpretable(branch): def _key_formatter(prefix, form_key, form, attribute): if attribute == "offsets": - form_key += "%2C%21offsets" + form_key += _offsets_label return prefix + f"/{attribute}/{form_key}" -class _map_schema_base(ImplementsFormTransformation): +class _map_schema_base: # ImplementsFormMapping, ImplementsFormMappingInfo def __init__( self, schemaclass=BaseSchema, metadata=None, behavior=None, version=None ): @@ -83,24 +83,50 @@ def __init__( self.metadata = metadata self.version = version - def extract_form_keys_base_columns(self, form_keys): - base_columns = [] - for form_key in form_keys: - base_columns.extend( + def keys_for_buffer_keys(self, buffer_keys): + base_columns = set() + for buffer_key in buffer_keys: + form_key, attribute = self.parse_buffer_key(buffer_key) + operands = unquote(form_key).split(",") + + it_operands = iter(operands) + next(it_operands) + + base_columns.update( [ - acolumn - for acolumn in urllib.parse.unquote(form_key).split(",") - if not acolumn.startswith("!") + name + for name, maybe_transform in zip(operands, it_operands) + if maybe_transform == "!load" ] ) - return list(set(base_columns)) + return base_columns + + def parse_buffer_key(self, buffer_key): + prefix, attribute, form_key = buffer_key.rsplit("/", maxsplit=2) + if attribute == "offsets": + return (form_key[: -len(_offsets_label)], attribute) + else: + return (form_key, attribute) + + @property + def buffer_key(self): + return partial(self._key_formatter, "") def _key_formatter(self, prefix, form_key, form, attribute): if attribute == "offsets": - form_key += "%2C%21offsets" + form_key += _offsets_label return prefix + f"/{attribute}/{form_key}" +class _TranslatedMapping: + def __init__(self, func, mapping): + self._func = func + self._mapping = mapping + + def __getitem__(self, index): + return self._mapping[self._func(index)] + + class _map_schema_uproot(_map_schema_base): def __init__( self, schemaclass=BaseSchema, metadata=None, behavior=None, version=None @@ -131,9 +157,12 @@ def __call__(self, form): }, "form_key": None, } - return 
awkward.forms.form.from_dict(self.schemaclass(lform, self.version).form) + return ( + awkward.forms.form.from_dict(self.schemaclass(lform, self.version).form), + self, + ) - def create_column_mapping_and_key(self, tree, start, stop, interp_options): + def load_buffers(self, tree, keys, start, stop, interp_options): from functools import partial from coffea.nanoevents.util import tuple_to_key @@ -153,8 +182,15 @@ def create_column_mapping_and_key(self, tree, start, stop, interp_options): use_ak_forth=True, ) mapping.preload_column_source(partition_key[0], partition_key[1], tree) + buffer_key = partial(self._key_formatter, tuple_to_key(partition_key)) + + # The buffer-keys that dask-awkward knows about will not include the + # partition key. Therefore, we must translate the keys here. + def translate_key(index): + form_key, attribute = self.parse_buffer_key(index) + return buffer_key(form_key=form_key, attribute=attribute, form=None) - return mapping, partial(self._key_formatter, tuple_to_key(partition_key)) + return _TranslatedMapping(translate_key, mapping) class _map_schema_parquet(_map_schema_base): @@ -180,31 +216,6 @@ def __call__(self, form): return awkward.forms.form.from_dict(self.schemaclass(lform, self.version).form) - def create_column_mapping_and_key(self, columns, start, stop, interp_options): - from functools import partial - - from coffea.nanoevents.util import tuple_to_key - - uuid = "NO_UUID" - obj_path = "NO_OBJECT_PATH" - - partition_key = ( - str(uuid), - obj_path, - f"{start}-{stop}", - ) - uuidpfn = {uuid: columns} - mapping = PreloadedSourceMapping( - PreloadedOpener(uuidpfn), - start, - stop, - cache={}, - access_log=None, - ) - mapping.preload_column_source(partition_key[0], partition_key[1], columns) - - return mapping, partial(self._key_formatter, tuple_to_key(partition_key)) - class NanoEventsFactory: """A factory class to build NanoEvents objects""" @@ -238,7 +249,7 @@ def from_root( treepath="/Events", entry_start=None, entry_stop=None, - chunks_per_file=1, + chunks_per_file=uproot._util.unset, runtime_cache=None, persistent_cache=None, schemaclass=NanoAODSchema, @@ -274,7 +285,7 @@ def from_root( metadata : dict, optional Arbitrary metadata to add to the `base.NanoEvents` object uproot_options : dict, optional - Any options to pass to ``uproot.open`` + Any options to pass to ``uproot.open`` or ``uproot.dask`` access_log : list, optional Pass a list instance to record which branches were lazily accessed by this instance use_ak_forth: @@ -339,6 +350,7 @@ def from_root( ak_add_doc=True, filter_branch=_remove_not_interpretable, steps_per_file=chunks_per_file, + **uproot_options, ) else: opener = partial( @@ -349,6 +361,7 @@ def from_root( ak_add_doc=True, filter_branch=_remove_not_interpretable, steps_per_file=chunks_per_file, + **uproot_options, ) return cls(map_schema, opener, None, cache=None, is_dask=True) elif permit_dask and not schemaclass.__dask_capable__: diff --git a/src/coffea/nanoevents/mapping/base.py b/src/coffea/nanoevents/mapping/base.py index c6a5e8e2e..3d87b410c 100644 --- a/src/coffea/nanoevents/mapping/base.py +++ b/src/coffea/nanoevents/mapping/base.py @@ -111,14 +111,19 @@ def __getitem__(self, key): if len(stack) != 1: raise RuntimeError(f"Syntax error in form key {nodes}") out = stack.pop() - try: - out = numpy.array(out) - except ValueError: - if self._debug: - print(out) - raise RuntimeError( - f"Left with non-bare array after evaluating form key {nodes}" - ) + import awkward + + if isinstance(out, awkward.contents.Content): + out = 
awkward.to_numpy(out) + else: + try: + out = numpy.array(out) + except ValueError: + if self._debug: + print(out) + raise RuntimeError( + f"Left with non-bare array after evaluating form key {nodes}" + ) return out @abstractmethod diff --git a/src/coffea/nanoevents/methods/physlite.py b/src/coffea/nanoevents/methods/physlite.py index c0efcdc39..751c5d03f 100644 --- a/src/coffea/nanoevents/methods/physlite.py +++ b/src/coffea/nanoevents/methods/physlite.py @@ -2,6 +2,7 @@ from numbers import Number import awkward +import dask_awkward import numpy from coffea.nanoevents.methods import base, vector @@ -38,7 +39,30 @@ def _element_link(target_collection, eventindex, index, key): return target_collection._apply_global_index(global_index) +def _element_link_method(self, link_name, target_name, _dask_array_): + if _dask_array_ is not None: + target = _dask_array_.behavior["__original_array__"]()[target_name] + links = _dask_array_[link_name] + return _element_link( + target, + _dask_array_._eventindex, + links.m_persIndex, + links.m_persKey, + ) + links = self[link_name] + return _element_link( + self._events()[target_name], + self._eventindex, + links.m_persIndex, + links.m_persKey, + ) + + def _element_link_multiple(events, obj, link_field, with_name=None): + # currently not working in dask because: + # - we don't know the resulting type beforehand + # - also not the targets, so no way to find out which columns to load? + # - could consider to treat the case of truth collections by just loading all truth columns link = obj[link_field] key = link.m_persKey index = link.m_persIndex @@ -64,22 +88,46 @@ def where(unique_keys): return out -def _get_target_offsets(offsets, event_index): +def _get_target_offsets(load_column, event_index): + if isinstance(load_column, dask_awkward.Array) and isinstance( + event_index, dask_awkward.Array + ): + # wrap in map_partitions if dask arrays + return dask_awkward.map_partitions( + _get_target_offsets, load_column, event_index + ) + + offsets = load_column.layout.offsets.data + if isinstance(event_index, Number): return offsets[event_index] + # let the necessary column optimization know that we need to load this + # column to get the offsets + if awkward.backend(load_column) == "typetracer": + awkward.typetracer.touch_data(load_column) + + # necessary to stick it into the `NumpyArray` constructor + # if typetracer is passed through + offsets = awkward.typetracer.length_zero_if_typetracer( + load_column.layout.offsets.data + ) + def descend(layout, depth, **kwargs): if layout.purelist_depth == 1: return awkward.contents.NumpyArray(offsets)[layout] - return awkward.transform(descend, event_index) + return awkward.transform(descend, event_index.layout) def _get_global_index(target, eventindex, index): - load_column = target[ - target.fields[0] - ] # awkward is eager-mode now (will need to dask this) - target_offsets = _get_target_offsets(load_column.layout.offsets, eventindex) + for field in target.fields: + # fetch first column to get offsets from + # (but try to avoid the double-jagged ones if possible) + load_column = target[field] + if load_column.ndim < 3: + break + target_offsets = _get_target_offsets(load_column, eventindex) return target_offsets + index @@ -140,12 +188,12 @@ class Muon(Particle): """ @property - def trackParticle(self): - return _element_link( - self._events().CombinedMuonTrackParticles, - self._eventindex, - self["combinedTrackParticleLink.m_persIndex"], - self["combinedTrackParticleLink.m_persKey"], + def trackParticle(self, 
_dask_array_=None): + return _element_link_method( + self, + "combinedTrackParticleLink", + "CombinedMuonTrackParticles", + _dask_array_, ) @@ -159,21 +207,25 @@ class Electron(Particle): """ @property - def trackParticles(self): - links = self.trackParticleLinks - return _element_link( - self._events().GSFTrackParticles, - self._eventindex, - links.m_persIndex, - links.m_persKey, + def trackParticles(self, _dask_array_=None): + return _element_link_method( + self, "trackParticleLinks", "GSFTrackParticles", _dask_array_ ) @property - def trackParticle(self): - trackParticles = self.trackParticles - return self.trackParticles[ - tuple([slice(None) for i in range(trackParticles.ndim - 1)] + [0]) - ] + def trackParticle(self, _dask_array_=None): + trackParticles = _element_link_method( + self, "trackParticleLinks", "GSFTrackParticles", _dask_array_ + ) + # Ellipsis (..., 0) slicing not supported yet by dask_awkward + slicer = tuple([slice(None) for i in range(trackParticles.ndim - 1)] + [0]) + return trackParticles[slicer] + + @property + def caloClusters(self, _dask_array_=None): + return _element_link_method( + self, "caloClusterLinks", "CaloCalTopoClusters", _dask_array_ + ) _set_repr_name("Electron") diff --git a/src/coffea/nanoevents/schemas/base.py b/src/coffea/nanoevents/schemas/base.py index 09812eee0..8a1f2251e 100644 --- a/src/coffea/nanoevents/schemas/base.py +++ b/src/coffea/nanoevents/schemas/base.py @@ -105,7 +105,6 @@ class BaseSchema: """ __dask_capable__ = True - behavior = {} def __init__(self, base_form, *args, **kwargs): params = dict(base_form.get("parameters", {})) diff --git a/src/coffea/nanoevents/schemas/physlite.py b/src/coffea/nanoevents/schemas/physlite.py index 1b9b89205..368dc7d8a 100644 --- a/src/coffea/nanoevents/schemas/physlite.py +++ b/src/coffea/nanoevents/schemas/physlite.py @@ -53,6 +53,7 @@ class PHYSLITESchema(BaseSchema): "GSFTrackParticles": "TrackParticle", "InDetTrackParticles": "TrackParticle", "MuonSpectrometerTrackParticles": "TrackParticle", + "CaloCalTopoClusters": "NanoCollection", } """Default configuration for mixin types, based on the collection name. @@ -79,7 +80,12 @@ def _build_collections(self, branch_forms): key_fields = key.split("/")[-1].split(".") top_key = key_fields[0] sub_key = ".".join(key_fields[1:]) - objname = top_key.replace("Analysis", "").replace("AuxDyn", "") + if ak_form["class"] == "RecordArray" and not ak_form["fields"]: + # skip empty records (e.g. the branches ending in "." only containing the base class) + continue + objname = ( + top_key.replace("Analysis", "").replace("AuxDyn", "").replace("Aux", "") + ) zip_groups[objname].append(((key, sub_key), ak_form)) @@ -97,6 +103,10 @@ def _build_collections(self, branch_forms): # zip the forms contents = {} for objname, keys_and_form in zip_groups.items(): + if len(keys_and_form) == 1: + # don't zip if there is only one item + contents[objname] = keys_and_form[0][1] + continue to_zip = {} for (key, sub_key), form in keys_and_form: if "." 
in sub_key: @@ -118,14 +128,21 @@ def _build_collections(self, branch_forms): to_zip, objname, self.mixins.get(objname, None), - bypass=True, - ) - content = contents[objname]["content"] - content["parameters"] = dict( - content.get("parameters", {}), collection_name=objname + bypass=False, ) except NotImplementedError: warnings.warn(f"Can't zip collection {objname}") + if "content" in contents[objname]: + # in this case we were able to zip everything together to a ListOffsetArray(RecordArray) + assert "List" in contents[objname]["class"] + content = contents[objname]["content"] + else: + # in this case this was not possible (e.g. because we also had non-list fields) + assert contents[objname]["class"] == "RecordArray" + content = contents[objname] + content["parameters"] = dict( + content.get("parameters", {}), collection_name=objname + ) return contents @staticmethod diff --git a/src/coffea/nanoevents/transforms.py b/src/coffea/nanoevents/transforms.py index 6363ac8f3..8ffe292af 100644 --- a/src/coffea/nanoevents/transforms.py +++ b/src/coffea/nanoevents/transforms.py @@ -13,6 +13,15 @@ def to_layout(array): return array.layout +def ensure_array(arraylike): + if isinstance(arraylike, (awkward.contents.Content, awkward.Array)): + return awkward.to_numpy(arraylike) + elif isinstance(arraylike, awkward.index.Index): + return arraylike.data + else: + return numpy.asarray(arraylike) + + def data(stack): """Extract content from array (currently a noop, can probably take place of !content) @@ -96,7 +105,7 @@ def counts2offsets(stack): Signature: counts,!counts2offsets Outputs an array with length one larger than input """ - counts = numpy.array(stack.pop()) + counts = ensure_array(stack.pop()) offsets = numpy.empty(len(counts) + 1, dtype=numpy.int64) offsets[0] = 0 numpy.cumsum(counts, out=offsets[1:]) @@ -123,11 +132,11 @@ def local2global(stack): Signature: index,target_offsets,!local2global Outputs a content array with same shape as index content """ - target_offsets = numpy.asarray(stack.pop()) + target_offsets = ensure_array(stack.pop()) index = stack.pop() index = index.mask[index >= 0] + target_offsets[:-1] index = index.mask[index < target_offsets[1:]] - out = numpy.array(awkward.flatten(awkward.fill_none(index, -1), axis=None)) + out = ensure_array(awkward.flatten(awkward.fill_none(index, -1), axis=None)) if out.dtype != numpy.int64: raise RuntimeError stack.append(out) diff --git a/src/coffea/processor/executor.py b/src/coffea/processor/executor.py index 618b1c741..18ab1c8c5 100644 --- a/src/coffea/processor/executor.py +++ b/src/coffea/processor/executor.py @@ -694,7 +694,7 @@ class FuturesExecutor(ExecutorBase): An accumulator to collect the output of the function pool : concurrent.futures.Executor class or instance, optional The type of futures executor to use, defaults to ProcessPoolExecutor. 
- You can pass an instance instead of a class to re-use an executor + You can pass an instance instead of a class to reuse an executor workers : int, optional Number of parallel processes for futures (default 1) status : bool, optional @@ -1718,7 +1718,7 @@ def _work_function( import dask_awkward to_compute = processor_instance.process(events) - materialized = dask_awkward.necessary_columns(to_compute) + # materialized = dask_awkward.report_necessary_buffers(to_compute) out = dask.compute(to_compute, scheduler="single-threaded")[0] except Exception as e: raise Exception(f"Failed processing file: {item!r}") from e @@ -1734,11 +1734,11 @@ def _work_function( metrics = {} if isinstance(file, uproot.ReadOnlyDirectory): metrics["bytesread"] = file.file.source.num_requested_bytes + # metrics["data_and_shape_buffers"] = set(materialized) + # metrics["shape_only_buffers"] = set(materialized) if schema is not None and issubclass(schema, schemas.BaseSchema): - metrics["columns"] = set(materialized) metrics["entries"] = len(events) else: - metrics["columns"] = set(materialized) metrics["entries"] = events.size metrics["processtime"] = toc - tic return {"out": out, "metrics": metrics, "processed": {item}} diff --git a/tests/test_analysis_tools.py b/tests/test_analysis_tools.py index 1e8c46ec1..bb3221432 100644 --- a/tests/test_analysis_tools.py +++ b/tests/test_analysis_tools.py @@ -513,14 +513,14 @@ def test_packed_selection_nminusone(): ): assert np.all(mask == truth) - nminusone.to_npz("nminusone.npz", compressed=False) + nminusone.to_npz("nminusone.npz", compressed=False).compute() with np.load("nminusone.npz") as file: assert np.all(file["labels"] == labels) assert np.all(file["nev"] == nev) assert np.all(file["masks"] == masks) os.remove("nminusone.npz") - nminusone.to_npz("nminusone.npz", compressed=True) + nminusone.to_npz("nminusone.npz", compressed=True).compute() with np.load("nminusone.npz") as file: assert np.all(file["labels"] == labels) assert np.all(file["nev"] == nev) @@ -619,7 +619,7 @@ def test_packed_selection_cutflow(): ): assert np.all(mask == truth) - cutflow.to_npz("cutflow.npz", compressed=False) + cutflow.to_npz("cutflow.npz", compressed=False).compute() with np.load("cutflow.npz") as file: assert np.all(file["labels"] == labels) assert np.all(file["nevonecut"] == nevonecut) @@ -628,7 +628,7 @@ def test_packed_selection_cutflow(): assert np.all(file["maskscutflow"] == maskscutflow) os.remove("cutflow.npz") - cutflow.to_npz("cutflow.npz", compressed=True) + cutflow.to_npz("cutflow.npz", compressed=True).compute() with np.load("cutflow.npz") as file: assert np.all(file["labels"] == labels) assert np.all(file["nevonecut"] == nevonecut) @@ -854,14 +854,14 @@ def test_packed_selection_nminusone_dak(optimization_enabled): ): assert np.all(mask.compute() == truth.compute()) - nminusone.to_npz("nminusone.npz", compressed=False) + nminusone.to_npz("nminusone.npz", compressed=False).compute() with np.load("nminusone.npz") as file: assert np.all(file["labels"] == labels) assert np.all(file["nev"] == list(dask.compute(*nev))) assert np.all(file["masks"] == list(dask.compute(*masks))) os.remove("nminusone.npz") - nminusone.to_npz("nminusone.npz", compressed=True) + nminusone.to_npz("nminusone.npz", compressed=True).compute() with np.load("nminusone.npz") as file: assert np.all(file["labels"] == labels) assert np.all(file["nev"] == list(dask.compute(*nev))) @@ -978,7 +978,7 @@ def test_packed_selection_cutflow_dak(optimization_enabled): ): assert np.all(mask.compute() == truth.compute()) - 
cutflow.to_npz("cutflow.npz", compressed=False) + cutflow.to_npz("cutflow.npz", compressed=False).compute() with np.load("cutflow.npz") as file: assert np.all(file["labels"] == labels) assert np.all(file["nevonecut"] == list(dask.compute(*nevonecut))) @@ -987,7 +987,7 @@ def test_packed_selection_cutflow_dak(optimization_enabled): assert np.all(file["maskscutflow"] == list(dask.compute(*maskscutflow))) os.remove("cutflow.npz") - cutflow.to_npz("cutflow.npz", compressed=True) + cutflow.to_npz("cutflow.npz", compressed=True).compute() with np.load("cutflow.npz") as file: assert np.all(file["labels"] == labels) assert np.all(file["nevonecut"] == list(dask.compute(*nevonecut))) @@ -1109,14 +1109,14 @@ def test_packed_selection_nminusone_dak_uproot_only(optimization_enabled): ): assert np.all(mask.compute() == truth.compute()) - nminusone.to_npz("nminusone.npz", compressed=False) + nminusone.to_npz("nminusone.npz", compressed=False).compute() with np.load("nminusone.npz") as file: assert np.all(file["labels"] == labels) assert np.all(file["nev"] == list(dask.compute(*nev))) assert np.all(file["masks"] == list(dask.compute(*masks))) os.remove("nminusone.npz") - nminusone.to_npz("nminusone.npz", compressed=True) + nminusone.to_npz("nminusone.npz", compressed=True).compute() with np.load("nminusone.npz") as file: assert np.all(file["labels"] == labels) assert np.all(file["nev"] == list(dask.compute(*nev))) @@ -1233,7 +1233,7 @@ def test_packed_selection_cutflow_dak_uproot_only(optimization_enabled): ): assert np.all(mask.compute() == truth.compute()) - cutflow.to_npz("cutflow.npz", compressed=False) + cutflow.to_npz("cutflow.npz", compressed=False).compute() with np.load("cutflow.npz") as file: assert np.all(file["labels"] == labels) assert np.all(file["nevonecut"] == list(dask.compute(*nevonecut))) @@ -1242,7 +1242,7 @@ def test_packed_selection_cutflow_dak_uproot_only(optimization_enabled): assert np.all(file["maskscutflow"] == list(dask.compute(*maskscutflow))) os.remove("cutflow.npz") - cutflow.to_npz("cutflow.npz", compressed=True) + cutflow.to_npz("cutflow.npz", compressed=True).compute() with np.load("cutflow.npz") as file: assert np.all(file["labels"] == labels) assert np.all(file["nevonecut"] == list(dask.compute(*nevonecut))) diff --git a/tests/test_jetmet_tools.py b/tests/test_jetmet_tools.py index a7ef91385..8be3a97f3 100644 --- a/tests/test_jetmet_tools.py +++ b/tests/test_jetmet_tools.py @@ -837,9 +837,9 @@ def test_corrected_jets_factory(optimization_enabled): **{name: evaluator[name] for name in jec_stack_names[5:6]} ) - print(dak.necessary_columns(jets.eta)) + print(dak.report_necessary_columns(jets.eta)) print( - dak.necessary_columns( + dak.report_necessary_columns( resosf.getScaleFactor( JetEta=jets.eta, ) diff --git a/tests/test_lookup_tools.py b/tests/test_lookup_tools.py index f4d5a3b30..7fde363de 100644 --- a/tests/test_lookup_tools.py +++ b/tests/test_lookup_tools.py @@ -372,8 +372,6 @@ def test_jec_txt_effareas(): def test_rochester(): - pytest.xfail("weird side effect from running other tests... 
passes by itself") - rochester_data = lookup_tools.txt_converters.convert_rochester_file( "tests/samples/RoccoR2018.txt.gz", loaduncs=True ) @@ -390,27 +388,29 @@ def test_rochester(): # test against nanoaod events = NanoEventsFactory.from_root( - os.path.abspath("tests/samples/nano_dimuon.root") + {os.path.abspath("tests/samples/nano_dimuon.root"): "Events"}, + permit_dask=True, ).events() data_k = rochester.kScaleDT( events.Muon.charge, events.Muon.pt, events.Muon.eta, events.Muon.phi ) - data_k = np.array(ak.flatten(data_k)) + data_k = ak.flatten(data_k).compute().to_numpy() assert all(np.isclose(data_k, official_data_k)) data_err = rochester.kScaleDTerror( events.Muon.charge, events.Muon.pt, events.Muon.eta, events.Muon.phi ) - data_err = np.array(ak.flatten(data_err), dtype=float) + data_err = ak.flatten(data_err).compute().to_numpy() assert all(np.isclose(data_err, official_data_err, atol=1e-8)) # test against mc events = NanoEventsFactory.from_root( - os.path.abspath("tests/samples/nano_dy.root") + {os.path.abspath("tests/samples/nano_dy.root"): "Events"}, + permit_dask=True, ).events() hasgen = ~np.isnan(ak.fill_none(events.Muon.matched_gen.pt, np.nan)) - mc_rand = ak.unflatten(mc_rand, ak.num(hasgen)) + mc_rand = ak.unflatten(dak.from_awkward(ak.Array(mc_rand), 1), ak.num(hasgen)) mc_kspread = rochester.kSpreadMC( events.Muon.charge[hasgen], events.Muon.pt[hasgen], @@ -426,10 +426,10 @@ def test_rochester(): events.Muon.nTrackerLayers[~hasgen], mc_rand[~hasgen], ) - mc_k = np.array(ak.flatten(ak.ones_like(events.Muon.pt))) - hasgen_flat = np.array(ak.flatten(hasgen)) - mc_k[hasgen_flat] = np.array(ak.flatten(mc_kspread)) - mc_k[~hasgen_flat] = np.array(ak.flatten(mc_ksmear)) + mc_k = ak.flatten(ak.ones_like(events.Muon.pt)).compute().to_numpy() + hasgen_flat = ak.flatten(hasgen).compute().to_numpy() + mc_k[hasgen_flat] = ak.flatten(mc_kspread).compute().to_numpy() + mc_k[~hasgen_flat] = ak.flatten(mc_ksmear).compute().to_numpy() assert all(np.isclose(mc_k, official_mc_k)) mc_errspread = rochester.kSpreadMCerror( @@ -447,9 +447,9 @@ def test_rochester(): events.Muon.nTrackerLayers[~hasgen], mc_rand[~hasgen], ) - mc_err = np.array(ak.flatten(ak.ones_like(events.Muon.pt))) - mc_err[hasgen_flat] = np.array(ak.flatten(mc_errspread)) - mc_err[~hasgen_flat] = np.array(ak.flatten(mc_errsmear)) + mc_err = ak.flatten(ak.ones_like(events.Muon.pt)).compute().to_numpy() + mc_err[hasgen_flat] = ak.flatten(mc_errspread).compute().to_numpy() + mc_err[~hasgen_flat] = ak.flatten(mc_errsmear).compute().to_numpy() assert all(np.isclose(mc_err, official_mc_err, atol=1e-8)) diff --git a/tests/test_nanoevents_physlite.py b/tests/test_nanoevents_physlite.py index f82471198..95f58491d 100644 --- a/tests/test_nanoevents_physlite.py +++ b/tests/test_nanoevents_physlite.py @@ -1,19 +1,17 @@ import os -import numpy as np +import dask import pytest from coffea.nanoevents import NanoEventsFactory, PHYSLITESchema -pytestmark = pytest.mark.skip(reason="uproot is upset with this file...") - def _events(): path = os.path.abspath("tests/samples/DAOD_PHYSLITE_21.2.108.0.art.pool.root") factory = NanoEventsFactory.from_root( {path: "CollectionTree"}, schemaclass=PHYSLITESchema, - permit_dask=False, + permit_dask=True, ) return factory.events() @@ -23,54 +21,21 @@ def events(): return _events() +def test_load_single_field_of_linked(events): + with dask.config.set({"awkward.raise-failed-meta": True}): + events.Electrons.caloClusters.calE.compute() + + @pytest.mark.parametrize("do_slice", [False, True]) def 
test_electron_track_links(events, do_slice): if do_slice: - events = events[np.random.randint(2, size=len(events)).astype(bool)] - for event in events: - for electron in event.Electrons: + events = events[::2] + trackParticles = events.Electrons.trackParticles.compute() + for i, event in enumerate(events[["Electrons", "GSFTrackParticles"]].compute()): + for j, electron in enumerate(event.Electrons): for link_index, link in enumerate(electron.trackParticleLinks): track_index = link.m_persIndex - print(track_index) - print(event.GSFTrackParticles) - print(electron.trackParticleLinks) - print(electron.trackParticles) - assert ( event.GSFTrackParticles[track_index].z0 - == electron.trackParticles[link_index].z0 - ) - - -# from MetaData/EventFormat -_hash_to_target_name = { - 13267281: "TruthPhotons", - 342174277: "TruthMuons", - 368360608: "TruthNeutrinos", - 375408000: "TruthTaus", - 394100163: "TruthElectrons", - 614719239: "TruthBoson", - 660928181: "TruthTop", - 779635413: "TruthBottom", -} - - -def test_truth_links_toplevel(events): - children_px = events.TruthBoson.children.px - for i_event, event in enumerate(events): - for i_particle, particle in enumerate(event.TruthBoson): - for i_link, link in enumerate(particle.childLinks): - assert ( - event[_hash_to_target_name[link.m_persKey]][link.m_persIndex].px - == children_px[i_event][i_particle][i_link] - ) - - -def test_truth_links(events): - for i_event, event in enumerate(events): - for i_particle, particle in enumerate(event.TruthBoson): - for i_link, link in enumerate(particle.childLinks): - assert ( - event[_hash_to_target_name[link.m_persKey]][link.m_persIndex].px - == particle.children[i_link].px + == trackParticles[i][j][link_index].z0 )
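
A few of the changes above follow general dask/awkward patterns that are easier to see in isolation. First, the `compute()` refactor in the npz-saver objects replaces four separate `dask.compute(*list)` calls with two calls that pass the lists whole: a single `dask.compute` merges the task graphs of all its arguments, so upstream tasks shared between the one-cut and cumulative counts evaluate once, and each input list comes back with its nesting intact. A minimal sketch with `dask.delayed` stand-ins:

```python
import dask


@dask.delayed
def count_pass(x):
    return x + 1  # stand-in for a per-cut event count


nev = [count_pass(10), count_pass(20)]
masks = [count_pass(30), count_pass(40)]

# One call merges both graphs (shared tasks run once) and returns results
# with the same nested structure as the inputs.
nev, masks = dask.compute(nev, masks)
print(nev, masks)  # [11, 21] [31, 41]
```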
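Second, `to_npz` now defaults to `compute=False`, returning the `NminusOneToNpz`/`CutflowToNpz` object without writing anything; the updated tests accordingly call `.compute()` to trigger the write. A short usage sketch with toy masks:

```python
import numpy as np

from coffea.analysis_tools import PackedSelection

selection = PackedSelection()
selection.add("cut1", np.array([True, True, False, True]))
selection.add("cut2", np.array([True, False, True, True]))

nminusone = selection.nminusone("cut1", "cut2")

# With the new default (compute=False) this only builds the saver object;
# nothing is written until compute() is called.
saver = nminusone.to_npz("nminusone.npz", compressed=False)
saver.compute()
```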
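The `rochester_lookup` change is an instance of a general recipe for calling a NumPy-only function (here the double crystal-ball `ppf`) on dask_awkward arrays: wrap it in `dak.map_partitions`, and inside the wrapper handle dask-awkward's metadata-only "typetracer" pass by substituting length-zero concrete arrays so the function can run and reveal its output type, then convert the result back to a typetracer. The same recipe in generic form, with `numpy.exp` as an illustrative stand-in for the wrapped function:

```python
import awkward as ak
import dask_awkward as dak
import numpy as np


def wrap_numpy_only(func, *args):
    """Run a NumPy-only `func` over dask_awkward partitions."""

    def apply(*parts):
        # In the typetracer pass there is no real data; give `func`
        # length-zero stand-ins so it can still infer the output type.
        parts_lz = [ak.typetracer.length_zero_if_typetracer(p) for p in parts]
        out = ak.Array(func(*parts_lz))
        if ak.backend(parts[0]) == "typetracer":
            out = ak.Array(
                out.layout.to_typetracer(forget_length=True),
                behavior=out.behavior,
            )
        return out

    return dak.map_partitions(apply, *args)


arr = dak.from_awkward(ak.Array([1.0, 2.0, 3.0]), npartitions=1)
print(wrap_numpy_only(np.exp, arr).compute())
```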
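In `factory.py`, `uproot_options` is now forwarded to `uproot.dask` as well as to `uproot.open`, so source-level options behave the same in delayed mode. For example (the `timeout` value here is illustrative; any option `uproot` understands can be passed through):

```python
from coffea.nanoevents import NanoAODSchema, NanoEventsFactory

events = NanoEventsFactory.from_root(
    {"tests/samples/nano_dy.root": "Events"},
    schemaclass=NanoAODSchema,
    permit_dask=True,
    uproot_options={"timeout": 120},  # now reaches uproot.dask too
).events()
print(events.Muon.pt.compute())
```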
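The PHYSLITE `_get_target_offsets` rewrite shows two patterns for code that must serve both eager and delayed inputs: the function re-enters itself through `map_partitions` when handed dask collections, and it calls `awkward.typetracer.touch_data` so dask-awkward's necessary-column optimization knows that a column whose offsets are read straight off the layout really must be loaded (raw buffer reads are invisible to the tracer). A rough demonstration of the touch mechanism, approximating how the optimizer observes it; the exact `typetracer_with_report` invocation is a best-effort sketch against awkward 2.x:

```python
import awkward as ak

form = ak.Array([[1.0, 2.0], [3.0]]).layout.form_with_key()
meta, report = ak.typetracer.typetracer_with_report(form)

# Reading buffers directly off the layout would leave no trace in the
# report; touch_data marks the column's data as genuinely needed.
ak.typetracer.touch_data(ak.Array(meta))
print(report.data_touched)
```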
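The element-link properties (`Muon.trackParticle`, `Electron.trackParticles`, the new `Electron.caloClusters`) all funnel through `_element_link_method`, relying on dask-awkward's convention that a behavior method with a `_dask_array_` parameter receives the backing dask collection when accessed on a delayed array, and `None` when eager. Stripped to its essentials, the hook looks like this (a toy `Point` mixin, not a coffea class):

```python
import awkward as ak
import dask_awkward as dak

behavior = {}


@ak.mixin_class(behavior)
class Point:
    @property
    def r2(self, _dask_array_=None):
        # dask-awkward fills `_dask_array_` when this property is accessed
        # on a dask_awkward.Array; plain awkward access leaves it as None.
        src = self if _dask_array_ is None else _dask_array_
        return src.x**2 + src.y**2


points = ak.Array([{"x": 3.0, "y": 4.0}], with_name="Point", behavior=behavior)
print(points.r2)  # eager

dpoints = dak.from_awkward(points, npartitions=1)
print(dpoints.r2.compute())  # delayed, same result
```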
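Finally, the test updates track dask-awkward's rename of its inspection helpers: `necessary_columns` is now `report_necessary_columns`, and a buffer-level `report_necessary_buffers` also exists (referenced, commented out, in `executor.py`). A quick check of the renamed API:

```python
import awkward as ak
import dask_awkward as dak

records = dak.from_awkward(
    ak.Array([{"x": 1.0, "y": 2.0}, {"x": 3.0, "y": 4.0}]), npartitions=1
)
# Maps each input layer to the columns the computation actually reads:
print(dak.report_necessary_columns(records.x))
```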