-
Notifications
You must be signed in to change notification settings - Fork 129
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat!: switch over to dask-based processing idioms, improve dataset handling #882
Changes from 43 commits
0e24cb7
dcb6f74
a03beb9
95c5fc8
1f870c2
c5d2d57
d4d371c
e1f11cf
1e191bf
02cbbbe
daf5e52
c9101bb
0c11976
b78f640
21cd240
dbed7d4
f8653eb
153c0ae
9fb3026
5a45e4a
acb9372
00fb57c
6d5d800
e0f1726
885bbf0
3283256
2e3a16b
8983524
4c8d917
cf55bf8
35656c6
5793cb8
84135da
9c598bb
2069786
e3959f4
0443724
707195e
2c65b36
cc22be8
dfed4b9
a661987
080f2af
d91e6b6
4a60053
98a719c
8f669a0
5ec1068
5e127c2
1de78e6
5ee0b20
009fdd0
e00e691
7644fe7
9fee7d3
9ba1442
1055e8b
8625484
54b27b6
734507c
1dc29db
3ca9e8f
059061a
3385bdb
0e2baf1
0812318
21f72d9
d0b5957
c6baa74
59ef352
fcefe19
25df8b1
c8a87b3
31e74eb
0487671
9c970e4
ef91531
6ff199a
533efea
681782d
3ca2d96
5d1ed9d
3bd760a
a29e910
13921ab
8e4424d
fe7984f
87332b8
f846c70
40c0649
6e5cadd
f04b60f
42af84b
4ba8c0a
d116351
dd21cbc
c079634
f09b64f
126d42d
2049913
5240110
e64e3c0
d826030
dd31a2a
66ffcd7
c551b93
6986a10
95b9949
a166a81
66f9120
a10a6e7
0bcda6c
0aa91ed
2457e08
02001b3
ba88fe4
6e22866
6d0722c
994ed4a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -85,6 +85,11 @@ servicex = [ | |
"servicex>=2.5.3", | ||
"func-adl_servicex", | ||
] | ||
rucio = [ | ||
"rucio-clients>=32;python_version>'3.8'", | ||
"rucio-clients<32;python_version<'3.9'", | ||
"cmd2", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just to check, is some of the interactive console stuff available via There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. switched to full rich cli, removed cmd2 dependency |
||
] | ||
dev = [ | ||
"pre-commit", | ||
"flake8", | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
from coffea.dataset_tools.apply_processor import apply_to_fileset, apply_to_one_dataset | ||
from coffea.dataset_tools.manipulations import max_chunks, slice_chunks | ||
from coffea.dataset_tools.preprocess import preprocess | ||
|
||
__all__ = [ | ||
"preprocess", | ||
"apply_to_one_dataset", | ||
lgray marked this conversation as resolved.
Show resolved
Hide resolved
|
||
"apply_to_fileset", | ||
"max_chunks", | ||
"slice_chunks", | ||
] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
import copy | ||
from typing import Callable, Union | ||
|
||
from coffea.nanoevents import NanoAODSchema, NanoEventsFactory | ||
from coffea.processor import ProcessorABC | ||
|
||
|
||
def apply_to_one_dataset( | ||
data_manipulation: Union[ProcessorABC, Callable], | ||
lgray marked this conversation as resolved.
Show resolved
Hide resolved
|
||
dataset, | ||
schemaclass=NanoAODSchema, | ||
metadata={}, | ||
): | ||
files = dataset["files"] | ||
events = NanoEventsFactory.from_root( | ||
files, | ||
metadata=metadata, | ||
schemaclass=schemaclass, | ||
).events() | ||
if isinstance(data_manipulation, ProcessorABC): | ||
return data_manipulation.process(events) | ||
elif isinstance(data_manipulation, Callable): | ||
return data_manipulation(events) | ||
else: | ||
raise ValueError("data_manipulation must either be a ProcessorABC or Callable") | ||
|
||
|
||
def apply_to_fileset( | ||
data_manipulation: Union[ProcessorABC, Callable], fileset, schemaclass=NanoAODSchema | ||
): | ||
out = {} | ||
for name, dataset in fileset.items(): | ||
metadata = copy.deepcopy(dataset.get("metadata", {})) | ||
metadata["dataset"] = name | ||
lgray marked this conversation as resolved.
Show resolved
Hide resolved
|
||
out[name] = apply_to_one_dataset( | ||
data_manipulation, dataset, schemaclass, metadata | ||
) | ||
return out |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not much reason to test workqueue if taskvine tests that it can eat dask graphs.
Perhapse we might want to keep some sort of smoke test for various dask backends just to ensure they play well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's been clear in the past that it's nice to have coffea do its own full-stack tests for things like this.
I don't think we've achieved the level of parity where tests on dask distributed completely confirm they will work on taskvine or the other way around.
@btovar do you think that's achievable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, definitely. I can make a pr. Further, since tasvine is orthogonal to coffea+dask, should this more in something like an examples directory?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@btovar a test that also serves as a worked-out example would be great! I don't think it's so orthogonal since, for instance, dask and distributed have deeper ties than dask and taskvine (so far as I know right now), but I still test that distributed works in coffea tests for the essential ingredients.