-
Notifications
You must be signed in to change notification settings - Fork 129
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: support shape_touched
from Dask
#900
Changes from 30 commits
b8fc7fe
1b4bd50
daa8529
df97041
66c8710
f31815b
2353a23
17a6f2f
bd07d03
a6848a0
9c90205
379a508
04b5a1a
f19c11b
7051d2e
3fe957b
d3fb1c9
33d2e68
9d94cb0
c49a8bc
0d9c913
bb4df59
e0694ad
e7384f9
92efdb2
8ae3cd5
45a0060
14d2cc2
ab3599e
faff41e
864f709
9b96f7b
0a525d0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,7 +11,6 @@ | |
import dask_awkward | ||
import fsspec | ||
import uproot | ||
from dask_awkward import ImplementsFormTransformation | ||
|
||
from coffea.nanoevents.mapping import ( | ||
CachedMapping, | ||
|
@@ -68,7 +67,7 @@ def _key_formatter(prefix, form_key, form, attribute): | |
return prefix + f"/{attribute}/{form_key}" | ||
|
||
|
||
class _map_schema_base(ImplementsFormTransformation): | ||
class _map_schema_base: # ImplementsFormMapping, ImplementsFormMappingInfo | ||
def __init__( | ||
self, schemaclass=BaseSchema, metadata=None, behavior=None, version=None | ||
): | ||
|
@@ -77,24 +76,50 @@ def __init__( | |
self.metadata = metadata | ||
self.version = version | ||
|
||
def extract_form_keys_base_columns(self, form_keys): | ||
base_columns = [] | ||
for form_key in form_keys: | ||
base_columns.extend( | ||
def keys_for_buffer_keys(self, buffer_keys): | ||
base_columns = set() | ||
for buffer_key in buffer_keys: | ||
form_key, attribute = self.parse_buffer_key(buffer_key) | ||
operands = urllib.parse.unquote(form_key).split(",") | ||
|
||
it_operands = iter(operands) | ||
next(it_operands) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This function behaves like |
||
|
||
base_columns.update( | ||
[ | ||
acolumn | ||
for acolumn in urllib.parse.unquote(form_key).split(",") | ||
if not acolumn.startswith("!") | ||
name | ||
for name, maybe_transform in zip(operands, it_operands) | ||
if maybe_transform == "!load" | ||
] | ||
) | ||
return list(set(base_columns)) | ||
return base_columns | ||
|
||
def parse_buffer_key(self, buffer_key): | ||
prefix, attribute, form_key = buffer_key.rsplit("/", maxsplit=2) | ||
if attribute == "offsets": | ||
return (form_key[: -len("%2C%21offsets")], attribute) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This seems to also be in the previous code as well, but some of these string splitting and urllib name mangling things were put into https://github.com/CoffeaTeam/coffea/blob/master/src/coffea/nanoevents/util.py and could be used here |
||
else: | ||
return (form_key, attribute) | ||
|
||
@property | ||
def buffer_key(self): | ||
return partial(self._key_formatter, "") | ||
|
||
def _key_formatter(self, prefix, form_key, form, attribute): | ||
if attribute == "offsets": | ||
form_key += "%2C%21offsets" | ||
return prefix + f"/{attribute}/{form_key}" | ||
|
||
|
||
class _TranslatedMapping: | ||
def __init__(self, func, mapping): | ||
self._func = func | ||
self._mapping = mapping | ||
|
||
def __getitem__(self, index): | ||
return self._mapping[self._func(index)] | ||
|
||
|
||
class _map_schema_uproot(_map_schema_base): | ||
def __init__( | ||
self, schemaclass=BaseSchema, metadata=None, behavior=None, version=None | ||
|
@@ -125,9 +150,12 @@ def __call__(self, form): | |
}, | ||
"form_key": None, | ||
} | ||
return awkward.forms.form.from_dict(self.schemaclass(lform, self.version).form) | ||
return ( | ||
awkward.forms.form.from_dict(self.schemaclass(lform, self.version).form), | ||
self, | ||
) | ||
|
||
def create_column_mapping_and_key(self, tree, start, stop, interp_options): | ||
def load_buffers(self, tree, keys, start, stop, interp_options): | ||
from functools import partial | ||
|
||
from coffea.nanoevents.util import tuple_to_key | ||
|
@@ -147,8 +175,15 @@ def create_column_mapping_and_key(self, tree, start, stop, interp_options): | |
use_ak_forth=True, | ||
) | ||
mapping.preload_column_source(partition_key[0], partition_key[1], tree) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've deliberately tried not to touch this logic at all. Hopefully, by presenting the correct interface to uproot, internal refactoring can happen here in future without any side effects. |
||
buffer_key = partial(self._key_formatter, tuple_to_key(partition_key)) | ||
|
||
return mapping, partial(self._key_formatter, tuple_to_key(partition_key)) | ||
# The buffer-keys that dask-awkward knows about will not include the | ||
# partition key. Therefore, we must translate the keys here. | ||
def translate_key(index): | ||
form_key, attribute = self.parse_buffer_key(index) | ||
return buffer_key(form_key=form_key, attribute=attribute, form=None) | ||
|
||
return _TranslatedMapping(translate_key, mapping) | ||
|
||
|
||
class _map_schema_parquet(_map_schema_base): | ||
|
@@ -174,31 +209,6 @@ def __call__(self, form): | |
|
||
return awkward.forms.form.from_dict(self.schemaclass(lform, self.version).form) | ||
|
||
def create_column_mapping_and_key(self, columns, start, stop, interp_options): | ||
from functools import partial | ||
|
||
from coffea.nanoevents.util import tuple_to_key | ||
|
||
uuid = "NO_UUID" | ||
obj_path = "NO_OBJECT_PATH" | ||
|
||
partition_key = ( | ||
str(uuid), | ||
obj_path, | ||
f"{start}-{stop}", | ||
) | ||
uuidpfn = {uuid: columns} | ||
mapping = PreloadedSourceMapping( | ||
PreloadedOpener(uuidpfn), | ||
start, | ||
stop, | ||
cache={}, | ||
access_log=None, | ||
) | ||
mapping.preload_column_source(partition_key[0], partition_key[1], columns) | ||
|
||
return mapping, partial(self._key_formatter, tuple_to_key(partition_key)) | ||
|
||
|
||
class NanoEventsFactory: | ||
"""A factory class to build NanoEvents objects""" | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1718,7 +1718,7 @@ def _work_function( | |
import dask_awkward | ||
|
||
to_compute = processor_instance.process(events) | ||
materialized = dask_awkward.necessary_columns(to_compute) | ||
# materialized = dask_awkward.report_necessary_buffers(to_compute) | ||
agoose77 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
out = dask.compute(to_compute, scheduler="single-threaded")[0] | ||
except Exception as e: | ||
raise Exception(f"Failed processing file: {item!r}") from e | ||
|
@@ -1734,11 +1734,11 @@ def _work_function( | |
metrics = {} | ||
if isinstance(file, uproot.ReadOnlyDirectory): | ||
metrics["bytesread"] = file.file.source.num_requested_bytes | ||
# metrics["data_and_shape_buffers"] = set(materialized) | ||
# metrics["shape_only_buffers"] = set(materialized) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are these placeholders for something in the future? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is just to get tests to run - executor is going to be significantly changed in a subsequent PR. |
||
if schema is not None and issubclass(schema, schemas.BaseSchema): | ||
metrics["columns"] = set(materialized) | ||
metrics["entries"] = len(events) | ||
else: | ||
metrics["columns"] = set(materialized) | ||
metrics["entries"] = events.size | ||
metrics["processtime"] = toc - tic | ||
return {"out": out, "metrics": metrics, "processed": {item}} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -837,9 +837,9 @@ def test_corrected_jets_factory(optimization_enabled): | |
**{name: evaluator[name] for name in jec_stack_names[5:6]} | ||
) | ||
|
||
print(dak.necessary_columns(jets.eta)) | ||
print(dak.report_necessary_columns(jets.eta)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @lgray note that @douglasdavis this is something I didn't fully think about when we made The difference will be that for remapped forms, the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It therefore might mean that we should remove that alias for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I actually prefer the new behavior since users will probably expect to get a list of columns in the file they're reading! But seeing the remapped keys is nice for debugging. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we make it so that we get a version or two of dask-awkward before There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @lgray I'm not 100% clear if we're on the same page.
It would be possible to restore the old behavior, but it will require a reasonable amount of work on the uproot side (I am guessing).
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK I suppose for my most typical use case I never see keys with |
||
print( | ||
dak.necessary_columns( | ||
dak.report_necessary_columns( | ||
resosf.getScaleFactor( | ||
JetEta=jets.eta, | ||
) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any reason not to inherit the protocol(s)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
They weren't exported yet! I think a new release includes them. Let me check.