
feat!: switch over to dask-based processing idioms, improve dataset handling #882

Merged: 119 commits, Dec 12, 2023

Commits
0e24cb7
some first movement on dask-task-graph style runner/executor
lgray Jun 12, 2023
dcb6f74
changes for preprocessing prototype
lgray Jun 27, 2023
a03beb9
new dask-based dataset pre-processor
lgray Aug 22, 2023
95c5fc8
Added the rucio utils functions from pocketcoffea
valsdav Aug 23, 2023
1f870c2
Added dataset querying function
valsdav Aug 23, 2023
c5d2d57
Working on interface for datasets query
valsdav Aug 28, 2023
d4d371c
Querying and listing implemented: selected of results
valsdav Aug 28, 2023
e1f11cf
Printing sites availability for replicas
valsdav Aug 28, 2023
1e191bf
Added replica site selection
valsdav Aug 28, 2023
02cbbbe
Added saving
valsdav Aug 28, 2023
daf5e52
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 28, 2023
c9101bb
Formatting and flake8
valsdav Aug 28, 2023
0c11976
Merge branch 'local_executors_to_dask' of github.com:valsdav/coffea i…
valsdav Aug 28, 2023
b78f640
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 28, 2023
21cd240
Fixed comments spelling
valsdav Aug 28, 2023
dbed7d4
py 3.9 for cirrus
lgray Aug 28, 2023
f8653eb
Merge pull request #883 from valsdav/local_executors_to_dask
lgray Aug 28, 2023
153c0ae
Switched to rucio-clients
valsdav Aug 30, 2023
9fb3026
Merge branch 'local_executors_to_dask' of github.com:valsdav/coffea i…
valsdav Aug 30, 2023
5a45e4a
Added some docs to the cli
valsdav Aug 30, 2023
acb9372
Merge pull request #884 from valsdav/local_executors_to_dask
lgray Aug 30, 2023
00fb57c
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 30, 2023
6d5d800
roll back test to py3.8
lgray Aug 30, 2023
e0f1726
math.ceil instead of integer division
lgray Aug 30, 2023
885bbf0
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 30, 2023
3283256
Forgot to remove the slash.
lgray Aug 30, 2023
2e3a16b
Merge branch 'master' into local_executors_to_dask
lgray Sep 6, 2023
8983524
Merge branch 'master' into local_executors_to_dask
lgray Oct 17, 2023
4c8d917
make rucio-clients an extra
lgray Oct 17, 2023
cf55bf8
Merge branch 'master' into local_executors_to_dask
lgray Oct 18, 2023
35656c6
Merge branch 'master' into local_executors_to_dask
lgray Oct 20, 2023
5793cb8
Merge branch 'master' into local_executors_to_dask
lgray Nov 6, 2023
84135da
Merge branch 'master' into local_executors_to_dask
lgray Nov 9, 2023
9c598bb
two wrappers to apply processor wrapped code to datasets
lgray Nov 10, 2023
2069786
let preprocess deal with missing files in a configurable way, add in …
lgray Nov 10, 2023
e3959f4
remove old tests and switch to new tools in tests
lgray Nov 21, 2023
0443724
Merge branch 'master' into local_executors_to_dask
lgray Nov 21, 2023
707195e
disable workqueue tests (to be replaced with taskvine)
lgray Nov 21, 2023
2c65b36
patch up ci
lgray Nov 21, 2023
cc22be8
more testwq removal
lgray Nov 21, 2023
dfed4b9
adjustments to removing executor / lazydataframe
lgray Nov 21, 2023
a661987
accumulator tests
lgray Nov 22, 2023
080f2af
also allow Callables in apply_to_fileset
lgray Nov 22, 2023
d91e6b6
Merge branch 'master' into local_executors_to_dask
lgray Nov 27, 2023
4a60053
add dataset tools test
lgray Nov 27, 2023
98a719c
Make scope of dataset_query less cms-only. Edits language
valsdav Nov 28, 2023
8f669a0
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 28, 2023
5ec1068
Merge branch 'master' into local_executors_to_dask
lgray Nov 28, 2023
5e127c2
remove accumulator concept
lgray Nov 28, 2023
1de78e6
typing for apply_to_dataset/fileset, use setdefault
lgray Nov 28, 2023
5ee0b20
typing for preprocess
lgray Nov 28, 2023
009fdd0
flake8
lgray Nov 28, 2023
e00e691
fix up typing
lgray Nov 28, 2023
7644fe7
more typing for apply_processor
lgray Nov 28, 2023
9fee7d3
being pedantic about types
lgray Nov 29, 2023
9ba1442
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 29, 2023
1055e8b
Merge branch 'master' into local_executors_to_dask
lgray Nov 29, 2023
8625484
Merge branch 'master' into local_executors_to_dask
lgray Nov 30, 2023
54b27b6
Merge branch 'master' into local_executors_to_dask
lgray Nov 30, 2023
734507c
Merge branch 'master' into local_executors_to_dask
lgray Dec 2, 2023
1dc29db
Merge branch 'master' into local_executors_to_dask
lgray Dec 2, 2023
3ca9e8f
Merge branch 'master' into local_executors_to_dask
lgray Dec 4, 2023
059061a
taskvine test was using old location of NanoAODSchema
lgray Dec 4, 2023
3385bdb
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Dec 4, 2023
0e2baf1
lint: no longer need to import processor
lgray Dec 4, 2023
0812318
Merge branch 'master' into local_executors_to_dask
lgray Dec 6, 2023
21f72d9
Merge branch 'master' into local_executors_to_dask
lgray Dec 6, 2023
d0b5957
Merge remote-tracking branch 'origin/local_executors_to_dask' into lo…
valsdav Dec 6, 2023
c6baa74
Merge branch 'local_executors_to_dask' of github.com:valsdav/coffea i…
valsdav Dec 6, 2023
59ef352
Getting rucio client from config in environmental variable
valsdav Dec 6, 2023
fcefe19
Merge branch 'master' into local_executors_to_dask
lgray Dec 6, 2023
25df8b1
Added preprocess command to cli
valsdav Dec 6, 2023
c8a87b3
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Dec 6, 2023
31e74eb
save json as gzipped, add some options
lgray Dec 6, 2023
0487671
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Dec 6, 2023
9c970e4
flake: drop getpass since it is not used
lgray Dec 6, 2023
ef91531
Merge branch 'master' into local_executors_to_dask
lgray Dec 7, 2023
6ff199a
Merge branch 'local_executors_to_dask' of github.com:CoffeaTeam/coffe…
valsdav Dec 7, 2023
533efea
Merge remote-tracking branch 'fork/local_executors_to_dask' into loca…
valsdav Dec 7, 2023
681782d
add failed-tail processing for uproot reports
lgray Dec 7, 2023
3ca2d96
add failed-tail stuff to __all__ of dataset_tools
lgray Dec 7, 2023
5d1ed9d
Merge branch 'master' into local_executors_to_dask
lgray Dec 7, 2023
3bd760a
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Dec 7, 2023
a29e910
lint
lgray Dec 7, 2023
13921ab
typo
lgray Dec 7, 2023
8e4424d
typo, and fileset entrypoint needs dict of reports.
lgray Dec 8, 2023
fe7984f
adapt apply_processor to possibility of reports
lgray Dec 8, 2023
87332b8
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Dec 8, 2023
f846c70
fix bugs
lgray Dec 8, 2023
40c0649
Merge branch 'local_executors_to_dask' of github.com:CoffeaTeam/coffe…
valsdav Dec 8, 2023
6e5cadd
Added processing of multiple datasets to get replicas in CLI
valsdav Dec 8, 2023
f04b60f
Added Select all options to cli
valsdav Dec 8, 2023
42af84b
lint
lgray Dec 8, 2023
4ba8c0a
Moved from cmd2 to pure rich interface for the CLI
valsdav Dec 11, 2023
d116351
Merge branch 'local_executors_to_dask' of github.com:valsdav/coffea i…
valsdav Dec 11, 2023
dd21cbc
Updated help message
valsdav Dec 11, 2023
c079634
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Dec 11, 2023
f09b64f
linting
valsdav Dec 11, 2023
126d42d
Merge branch 'local_executors_to_dask' of github.com:valsdav/coffea i…
valsdav Dec 11, 2023
2049913
typo
lgray Dec 11, 2023
5240110
Adding non-cli interaction from datacard
valsdav Dec 11, 2023
e64e3c0
Merge branch 'local_executors_to_dask' of github.com:valsdav/coffea i…
valsdav Dec 11, 2023
d826030
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Dec 11, 2023
dd31a2a
better defaults and typos
valsdav Dec 11, 2023
66ffcd7
Merge branch 'master' into local_executors_to_dask
lgray Dec 11, 2023
c551b93
Merge remote-tracking branch 'origin/local_executors_to_dask' into lo…
valsdav Dec 11, 2023
6986a10
Adding docs and dataset_discovery notebook
valsdav Dec 11, 2023
95b9949
typo
lgray Dec 11, 2023
a166a81
more docs in the notebook
valsdav Dec 11, 2023
66f9120
Merge branch 'local_executors_to_dask' of github.com:valsdav/coffea i…
valsdav Dec 11, 2023
a10a6e7
Merge branch 'master' into local_executors_to_dask
lgray Dec 12, 2023
0bcda6c
Merge branch 'master' into local_executors_to_dask
lgray Dec 12, 2023
0aa91ed
Merge branch 'local_executors_to_dask' of github.com:CoffeaTeam/coffe…
lgray Dec 12, 2023
2457e08
Merge pull request #940 from valsdav/local_executors_to_dask
lgray Dec 12, 2023
02001b3
actually pass uproot_options down
lgray Dec 12, 2023
ba88fe4
fix get_failed_steps_for_fileset/dataset
lgray Dec 12, 2023
6e22866
properly check each input file for steps
lgray Dec 12, 2023
6d0722c
adjust uproot pins, add test
lgray Dec 12, 2023
994ed4a
typing and docs
lgray Dec 12, 2023
2,003 changes: 2,003 additions & 0 deletions binder/dataset_discovery.ipynb

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion pyproject.toml
@@ -38,7 +38,7 @@ classifiers = [
]
dependencies = [
"awkward>=2.5.1rc1",
"uproot>=5.2.0rc3",
"uproot>=5.2.0rc4",
"dask[array]>=2023.4.0",
"dask-awkward>=2023.12.1",
"dask-histogram>=2023.10.0",
@@ -85,6 +85,10 @@ servicex = [
"servicex>=2.5.3",
"func-adl_servicex",
]
rucio = [
"rucio-clients>=32;python_version>'3.8'",
"rucio-clients<32;python_version<'3.9'",
]
dev = [
"pre-commit",
"flake8",
13 changes: 9 additions & 4 deletions src/coffea/analysis_tools.py
@@ -19,7 +19,7 @@
import coffea.util


class WeightStatistics(coffea.processor.AccumulatorABC):
class WeightStatistics:
def __init__(self, sumw=0.0, sumw2=0.0, minw=numpy.inf, maxw=-numpy.inf, n=0):
self.sumw = sumw
self.sumw2 = sumw2
@@ -40,6 +40,13 @@ def add(self, other):
self.maxw = max(self.maxw, other.maxw)
self.n += other.n

def __add__(self, other):
temp = WeightStatistics(self.sumw, self.sumw2, self.minw, self.maxw, self.n)
return temp.add(other)

def __iadd__(self, other):
return self.add(other)


class Weights:
"""Container for event weights and associated systematic shifts
@@ -62,7 +69,7 @@ def __init__(self, size, storeIndividual=False):
self._weight = None if size is None else numpy.ones(size)
self._weights = {}
self._modifiers = {}
self._weightStats = coffea.processor.dict_accumulator()
self._weightStats = {}
self._storeIndividual = storeIndividual

@property
@@ -102,8 +109,6 @@ def __add_delayed(self, name, weight, weightUp, weightDown, shift):
if self._storeIndividual:
self._weights[name] = weight
self.__add_variation(name, weight, weightUp, weightDown, shift)
if isinstance(self._weightStats, coffea.processor.dict_accumulator):
self._weightStats = {}
self._weightStats[name] = {
"sumw": dask_awkward.to_dask_array(weight).sum(),
"sumw2": dask_awkward.to_dask_array(weight**2).sum(),
18 changes: 18 additions & 0 deletions src/coffea/dataset_tools/__init__.py
@@ -0,0 +1,18 @@
from coffea.dataset_tools.apply_processor import apply_to_dataset, apply_to_fileset
from coffea.dataset_tools.manipulations import (
get_failed_steps_for_dataset,
get_failed_steps_for_fileset,
max_chunks,
slice_chunks,
)
from coffea.dataset_tools.preprocess import preprocess

__all__ = [
"preprocess",
"apply_to_dataset",
"apply_to_fileset",
"max_chunks",
"slice_chunks",
"get_failed_steps_for_dataset",
"get_failed_steps_for_fileset",
]
126 changes: 126 additions & 0 deletions src/coffea/dataset_tools/apply_processor.py
@@ -0,0 +1,126 @@
from __future__ import annotations

import copy
from typing import Any, Callable, Dict, Hashable, List, Set, Tuple, Union

import dask.base
import dask_awkward

from coffea.dataset_tools.preprocess import (
DatasetSpec,
DatasetSpecOptional,
FilesetSpec,
FilesetSpecOptional,
)
from coffea.nanoevents import BaseSchema, NanoAODSchema, NanoEventsFactory
from coffea.processor import ProcessorABC

DaskOutputBaseType = Union[
dask.base.DaskMethodsMixin,
Dict[Hashable, dask.base.DaskMethodsMixin],
Set[dask.base.DaskMethodsMixin],
List[dask.base.DaskMethodsMixin],
Tuple[dask.base.DaskMethodsMixin],
]

# NOTE TO USERS: You can use nested python containers as arguments to dask.compute!
DaskOutputType = Union[DaskOutputBaseType, Tuple[DaskOutputBaseType, ...]]

GenericHEPAnalysis = Callable[[dask_awkward.Array], DaskOutputType]


def apply_to_dataset(
data_manipulation: ProcessorABC | GenericHEPAnalysis,
dataset: DatasetSpec | DatasetSpecOptional,
schemaclass: BaseSchema = NanoAODSchema,
metadata: dict[Hashable, Any] = {},
uproot_options: dict[str, Any] = {},
) -> DaskOutputType | tuple[DaskOutputType, dask_awkward.Array]:
"""
Apply the supplied function or processor to the supplied dataset.

Parameters
----------
data_manipulation : ProcessorABC or GenericHEPAnalysis
The user analysis code to run on the input dataset
dataset: DatasetSpec | DatasetSpecOptional
The data to be acted upon by the data manipulation passed in.
schemaclass: BaseSchema, default NanoAODSchema
The nanoevents schema to interpret the input dataset with.
metadata: dict[Hashable, Any], default {}
Metadata for the dataset that is accessible by the input analysis. Should also be dask-serializable.
uproot_options: dict[str, Any], default {}
Options to pass to uproot. Pass at least {"allow_read_errors_with_report": True} to turn on file access reports.

Returns
-------
out : DaskOutputType
The output of the analysis workflow applied to the dataset
report : dask_awkward.Array, optional
The file access report for running the analysis on the input dataset. Needs to be computed simultaneously with the analysis to be accurate.
"""
files = dataset["files"]
events = NanoEventsFactory.from_root(
files,
metadata=metadata,
schemaclass=schemaclass,
uproot_options=uproot_options,
).events()

report = None
if isinstance(events, tuple):
events, report = events

out = None
if isinstance(data_manipulation, ProcessorABC):
out = data_manipulation.process(events)
elif isinstance(data_manipulation, Callable):
out = data_manipulation(events)
else:
raise ValueError("data_manipulation must either be a ProcessorABC or Callable")

if report is not None:
return out, report
return out
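The `events`/`report` unpacking above is the crux of the new report plumbing: `from_root` yields a bare events array normally, or an `(events, report)` pair when `{"allow_read_errors_with_report": True}` is passed. A coffea-free sketch of just that dispatch (`run_on_events` and its stub inputs are hypothetical, not part of the PR):

```python
from typing import Any, Callable, Tuple, Union


def run_on_events(
    data_manipulation: Callable[[Any], Any],
    events_or_pair: Union[Any, Tuple[Any, Any]],
):
    # Unpack an optional file-access report, mirroring apply_to_dataset.
    report = None
    events = events_or_pair
    if isinstance(events_or_pair, tuple):
        events, report = events_or_pair

    if callable(data_manipulation):
        out = data_manipulation(events)
    else:
        raise ValueError("data_manipulation must either be a ProcessorABC or Callable")

    # Return a pair only when a report exists, so callers that did not
    # enable reports keep the simple single-value return type.
    if report is not None:
        return out, report
    return out


# Without a report: a bare result comes back.
print(run_on_events(len, [1, 2, 3]))  # 3
# With a report: a (result, report) pair comes back.
print(run_on_events(len, ([1, 2, 3], "report")))  # (3, 'report')
```

Keeping the no-report return type unchanged is what makes report collection opt-in rather than a breaking change for existing callers.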


def apply_to_fileset(
data_manipulation: ProcessorABC | GenericHEPAnalysis,
fileset: FilesetSpec | FilesetSpecOptional,
schemaclass: BaseSchema = NanoAODSchema,
uproot_options: dict[str, Any] = {},
) -> dict[str, DaskOutputType] | tuple[dict[str, DaskOutputType], dask_awkward.Array]:
"""
Apply the supplied function or processor to the supplied fileset (set of datasets).

Parameters
----------
data_manipulation : ProcessorABC or GenericHEPAnalysis
The user analysis code to run on the input dataset
fileset: FilesetSpec | FilesetSpecOptional
The data to be acted upon by the data manipulation passed in. Metadata within the fileset should be dask-serializable.
schemaclass: BaseSchema, default NanoAODSchema
The nanoevents schema to interpret the input dataset with.
uproot_options: dict[str, Any], default {}
Options to pass to uproot. Pass at least {"allow_read_errors_with_report": True} to turn on file access reports.

Returns
-------
out : dict[str, DaskOutputType]
The output of the analysis workflow applied to the datasets, keyed by dataset name.
report : dask_awkward.Array, optional
The file access report for running the analysis on the input dataset. Needs to be computed simultaneously with the analysis to be accurate.
"""
out = {}
report = {}
for name, dataset in fileset.items():
metadata = copy.deepcopy(dataset.get("metadata", {}))
metadata.setdefault("dataset", name)
dataset_out = apply_to_dataset(
data_manipulation, dataset, schemaclass, metadata, uproot_options
)
if isinstance(dataset_out, tuple):
out[name], report[name] = dataset_out
else:
out[name] = dataset_out
if len(report) > 0:
return out, report
return out
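The fileset entry point is a thin loop over `apply_to_dataset`: deep-copy each dataset's metadata, inject the dataset name via `setdefault`, and split per-dataset reports out into their own dict. A coffea-free sketch of that control flow (`apply_to_fileset_sketch` and the stub `process` functions are hypothetical stand-ins):

```python
import copy


def apply_to_fileset_sketch(process, fileset):
    # Mirror apply_to_fileset: run `process` per dataset, keyed by name.
    out, report = {}, {}
    for name, dataset in fileset.items():
        # Deep-copy so injecting the name never mutates the caller's fileset.
        metadata = copy.deepcopy(dataset.get("metadata", {}))
        metadata.setdefault("dataset", name)
        result = process(dataset["files"], metadata)
        if isinstance(result, tuple):
            out[name], report[name] = result
        else:
            out[name] = result
    # Match the real API: only return reports when at least one exists.
    return (out, report) if report else out


fileset = {
    "ttbar": {"files": ["a.root", "b.root"], "metadata": {"xsec": 831.76}},
    "wjets": {"files": ["c.root"]},
}


def count_files(files, metadata):
    return len(files)


print(apply_to_fileset_sketch(count_files, fileset))  # {'ttbar': 2, 'wjets': 1}
```

The `setdefault` call is a deliberate choice: user-supplied metadata wins, and the dataset name is only filled in when the user did not set one themselves.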