Skip to content

Commit

Permalink
Merge branch 'master' into local_executors_to_dask
Browse files Browse the repository at this point in the history
  • Loading branch information
lgray authored Nov 9, 2023
2 parents 5793cb8 + 6e94679 commit 84135da
Show file tree
Hide file tree
Showing 18 changed files with 73 additions and 108 deletions.
101 changes: 27 additions & 74 deletions src/coffea/nanoevents/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,7 @@
TrivialUprootOpener,
UprootSourceMapping,
)
from coffea.nanoevents.schemas import (
BaseSchema,
DelphesSchema,
NanoAODSchema,
PFNanoAODSchema,
PHYSLITESchema,
TreeMakerSchema,
)
from coffea.nanoevents.schemas import BaseSchema, NanoAODSchema
from coffea.nanoevents.util import key_to_tuple, quote, tuple_to_key, unquote

_offsets_label = quote(",!offsets")
Expand Down Expand Up @@ -240,7 +233,7 @@ def __setstate__(self, state):
def from_root(
cls,
file,
treepath="/Events",
treepath=uproot._util.unset,
entry_start=None,
entry_stop=None,
chunks_per_file=uproot._util.unset,
Expand All @@ -252,7 +245,7 @@ def from_root(
access_log=None,
iteritems_options={},
use_ak_forth=True,
permit_dask=False,
delayed=True,
):
"""Quickly build NanoEvents from a root file
Expand Down Expand Up @@ -284,45 +277,27 @@ def from_root(
Pass a list instance to record which branches were lazily accessed by this instance
use_ak_forth:
Toggle using awkward_forth to interpret branches in root file.
permit_dask:
Allow nanoevents to use dask as a backend.
delayed:
Nanoevents will use dask as a backend to construct a delayed task graph representing your analysis.
"""

if treepath is not uproot._util.unset and not isinstance(
file, uproot.reading.ReadOnlyDirectory
):
raise ValueError(
"""Specification of treename by argument to from_root is no longer supported in coffea 2023.
Please use one of the allow types for "files" specified by uproot: https://github.com/scikit-hep/uproot5/blob/v5.1.2/src/uproot/_dask.py#L109-L132
"""
)

if (
permit_dask
delayed
and not isinstance(schemaclass, FunctionType)
and schemaclass.__dask_capable__
):
behavior = None
if schemaclass is BaseSchema:
from coffea.nanoevents.methods import base

behavior = base.behavior
elif schemaclass is NanoAODSchema:
from coffea.nanoevents.methods import nanoaod

behavior = nanoaod.behavior
elif schemaclass is PFNanoAODSchema:
from coffea.nanoevents.methods import nanoaod

behavior = nanoaod.behavior
elif schemaclass is TreeMakerSchema:
from coffea.nanoevents.methods import base, vector

behavior = {}
behavior.update(base.behavior)
behavior.update(vector.behavior)
elif schemaclass is PHYSLITESchema:
from coffea.nanoevents.methods import physlite

behavior = physlite.behavior
elif schemaclass is DelphesSchema:
from coffea.nanoevents.methods import delphes

behavior = delphes.behavior

map_schema = _map_schema_uproot(
schemaclass=schemaclass,
behavior=dict(behavior),
behavior=dict(schemaclass.behavior()),
metadata=metadata,
version="latest",
)
Expand Down Expand Up @@ -360,7 +335,7 @@ def from_root(
**uproot_options,
)
return cls(map_schema, opener, None, cache=None, is_dask=True)
elif permit_dask and not schemaclass.__dask_capable__:
elif delayed and not schemaclass.__dask_capable__:
warnings.warn(
f"{schemaclass} is not dask capable despite allowing dask, generating non-dask nanoevents"
)
Expand All @@ -369,7 +344,7 @@ def from_root(
tree = file[treepath]
elif "<class 'uproot.rootio.ROOTDirectory'>" == str(type(file)):
raise RuntimeError(
"The file instance (%r) is an uproot3 type, but this module is only compatible with uproot4 or higher"
"The file instance (%r) is an uproot3 type, but this module is only compatible with uproot5 or higher"
% file
)
else:
Expand Down Expand Up @@ -414,7 +389,7 @@ def from_root(
def from_parquet(
cls,
file,
treepath="/Events",
treepath=uproot._util.unset,
entry_start=None,
entry_stop=None,
runtime_cache=None,
Expand All @@ -424,7 +399,7 @@ def from_parquet(
parquet_options={},
skyhook_options={},
access_log=None,
permit_dask=False,
delayed=True,
):
"""Quickly build NanoEvents from a parquet file
Expand Down Expand Up @@ -453,6 +428,8 @@ def from_parquet(
Any options to pass to ``pyarrow.parquet.ParquetFile``
access_log : list, optional
Pass a list instance to record which branches were lazily accessed by this instance
delayed:
Nanoevents will use dask as a backend to construct a delayed task graph representing your analysis.
"""
import pyarrow
import pyarrow.dataset as ds
Expand All @@ -468,37 +445,13 @@ def from_parquet(
)

if (
permit_dask
delayed
and not isinstance(schemaclass, FunctionType)
and schemaclass.__dask_capable__
):
behavior = None
if schemaclass is BaseSchema:
from coffea.nanoevents.methods import base

behavior = base.behavior
elif schemaclass is NanoAODSchema:
from coffea.nanoevents.methods import nanoaod

behavior = nanoaod.behavior
elif schemaclass is TreeMakerSchema:
from coffea.nanoevents.methods import base, vector

behavior = {}
behavior.update(base.behavior)
behavior.update(vector.behavior)
elif schemaclass is PHYSLITESchema:
from coffea.nanoevents.methods import physlite

behavior = physlite.behavior
elif schemaclass is DelphesSchema:
from coffea.nanoevents.methods import delphes

behavior = delphes.behavior

map_schema = _map_schema_parquet(
schemaclass=schemaclass,
behavior=dict(behavior),
behavior=dict(schemaclass.behavior()),
metadata=metadata,
version="latest",
)
Expand All @@ -511,7 +464,7 @@ def from_parquet(
else:
raise TypeError("Invalid file type (%s)" % (str(type(file))))
return cls(map_schema, opener, None, cache=None, is_dask=True)
elif permit_dask and not schemaclass.__dask_capable__:
elif delayed and not schemaclass.__dask_capable__:
warnings.warn(
f"{schemaclass} is not dask capable despite allowing dask, generating non-dask nanoevents"
)
Expand Down Expand Up @@ -722,7 +675,7 @@ def events(self):

events = self._events()
if events is None:
behavior = dict(self._schema.behavior)
behavior = dict(self._schema.behavior())
behavior["__events_factory__"] = self
events = awkward.from_buffers(
self._schema.form,
Expand Down
4 changes: 2 additions & 2 deletions src/coffea/nanoevents/schemas/auto.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ def __init__(self, base_form: Dict[str, Any]):
v for v in output.values()
]

@property
def behavior(self):
@classmethod
def behavior(cls):
"""Behaviors necessary to implement this schema"""
from coffea.nanoevents.methods import base, candidate

Expand Down
4 changes: 2 additions & 2 deletions src/coffea/nanoevents/schemas/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ def form(self):
"""Awkward form of this schema"""
return self._form

@property
def behavior(self):
@classmethod
def behavior(cls):
"""Behaviors necessary to implement this schema"""
from coffea.nanoevents.methods import base

Expand Down
4 changes: 2 additions & 2 deletions src/coffea/nanoevents/schemas/delphes.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,8 @@ def _preprocess_branch_form(objname, form):

return output

@property
def behavior(self):
@classmethod
def behavior(cls):
"""Behaviors necessary to implement this schema"""
from coffea.nanoevents.methods import delphes

Expand Down
4 changes: 2 additions & 2 deletions src/coffea/nanoevents/schemas/nanoaod.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,8 @@ def _build_collections(self, field_names, input_contents):

return output.keys(), output.values()

@property
def behavior(self):
@classmethod
def behavior(cls):
"""Behaviors necessary to implement this schema"""
from coffea.nanoevents.methods import nanoaod

Expand Down
4 changes: 2 additions & 2 deletions src/coffea/nanoevents/schemas/pdune.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,8 @@ def _build_collections(self, branch_forms):
# }
# return form

@property
def behavior(self):
@classmethod
def behavior(cls):
"""Behaviors necessary to implement this schema"""
from coffea.nanoevents.methods import pdune

Expand Down
4 changes: 2 additions & 2 deletions src/coffea/nanoevents/schemas/physlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,8 @@ def _create_eventindex_form(base_form, key):
}
return form

@property
def behavior(self):
@classmethod
def behavior(cls):
"""Behaviors necessary to implement this schema"""
from coffea.nanoevents.methods import physlite

Expand Down
4 changes: 2 additions & 2 deletions src/coffea/nanoevents/schemas/treemaker.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,8 @@ def _build_collections(self, branch_forms):

return branch_forms

@property
def behavior(self):
@classmethod
def behavior(cls):
"""Behaviors necessary to implement this schema"""
from coffea.nanoevents.methods import base, vector

Expand Down
2 changes: 1 addition & 1 deletion src/coffea/processor/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1779,7 +1779,7 @@ def _work_function(
schemaclass=schema,
metadata=metadata,
access_log=materialized,
permit_dask=True,
delayed=True,
)
events = factory.events()[item.entrystart : item.entrystop]
elif format == "parquet":
Expand Down
4 changes: 2 additions & 2 deletions tests/test_analysis_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@
fname = "tests/samples/nano_dy.root"
eagerevents = NanoEventsFactory.from_root(
{os.path.abspath(fname): "Events"},
schemaclass=NanoAODSchema.v6,
schemaclass=NanoAODSchema,
metadata={"dataset": "DYJets"},
delayed=False,
).events()
dakevents = NanoEventsFactory.from_root(
{os.path.abspath(fname): "Events"},
schemaclass=NanoAODSchema,
metadata={"dataset": "DYJets"},
permit_dask=True,
).events()
uprootevents = uproot.dask({fname: "Events"})

Expand Down
1 change: 0 additions & 1 deletion tests/test_fix823.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ def test_explicit_delete_after_assign():
{testfile: "Events"},
metadata={"dataset": "nano_dy"},
schemaclass=NanoAODSchema,
permit_dask=True,
).events()

genpart = events["GenPart"]
Expand Down
1 change: 0 additions & 1 deletion tests/test_jetmet_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -724,7 +724,6 @@ def test_corrected_jets_factory(optimization_enabled):
events = NanoEventsFactory.from_root(
{os.path.abspath("tests/samples/nano_dy.root"): "Events"},
metadata={},
permit_dask=True,
).events()

jec_stack_names = [
Expand Down
2 changes: 0 additions & 2 deletions tests/test_lookup_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,6 @@ def test_rochester():
# test against nanoaod
events = NanoEventsFactory.from_root(
{os.path.abspath("tests/samples/nano_dimuon.root"): "Events"},
permit_dask=True,
).events()

data_k = rochester.kScaleDT(
Expand All @@ -406,7 +405,6 @@ def test_rochester():
# test against mc
events = NanoEventsFactory.from_root(
{os.path.abspath("tests/samples/nano_dy.root"): "Events"},
permit_dask=True,
).events()

hasgen = ~np.isnan(ak.fill_none(events.Muon.matched_gen.pt, np.nan))
Expand Down
Loading

0 comments on commit 84135da

Please sign in to comment.