-
Notifications
You must be signed in to change notification settings - Fork 129
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: support shape_touched
from Dask
#900
Changes from 21 commits
b8fc7fe
1b4bd50
daa8529
df97041
66c8710
f31815b
2353a23
17a6f2f
bd07d03
a6848a0
9c90205
379a508
04b5a1a
f19c11b
7051d2e
3fe957b
d3fb1c9
33d2e68
9d94cb0
c49a8bc
0d9c913
bb4df59
e0694ad
e7384f9
92efdb2
8ae3cd5
45a0060
14d2cc2
ab3599e
faff41e
864f709
9b96f7b
0a525d0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,7 +11,6 @@ | |
import dask_awkward | ||
import fsspec | ||
import uproot | ||
from dask_awkward import ImplementsFormTransformation | ||
|
||
from coffea.nanoevents.mapping import ( | ||
CachedMapping, | ||
|
@@ -68,7 +67,7 @@ def _key_formatter(prefix, form_key, form, attribute): | |
return prefix + f"/{attribute}/{form_key}" | ||
|
||
|
||
class _map_schema_base(ImplementsFormTransformation): | ||
class _map_schema_base: # ImplementsFormMapping, ImplementsFormMappingInfo | ||
def __init__( | ||
self, schemaclass=BaseSchema, metadata=None, behavior=None, version=None | ||
): | ||
|
@@ -77,17 +76,34 @@ def __init__( | |
self.metadata = metadata | ||
self.version = version | ||
|
||
def extract_form_keys_base_columns(self, form_keys): | ||
base_columns = [] | ||
for form_key in form_keys: | ||
base_columns.extend( | ||
def keys_for_buffer_keys(self, buffer_keys): | ||
base_columns = set() | ||
for buffer_key in buffer_keys: | ||
form_key, attribute = self.parse_buffer_key(buffer_key) | ||
operands = urllib.parse.unquote(form_key).split(",") | ||
|
||
it_operands = iter(operands) | ||
next(it_operands) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This function behaves like |
||
|
||
base_columns.update( | ||
[ | ||
acolumn | ||
for acolumn in urllib.parse.unquote(form_key).split(",") | ||
if not acolumn.startswith("!") | ||
name | ||
for name, maybe_transform in zip(operands, it_operands) | ||
if maybe_transform == "!load" | ||
] | ||
) | ||
return list(set(base_columns)) | ||
return base_columns | ||
|
||
def parse_buffer_key(self, buffer_key): | ||
prefix, attribute, form_key = buffer_key.rsplit("/", maxsplit=2) | ||
if attribute == "offsets": | ||
return (form_key[: -len("%2C%21offsets")], attribute) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This seems to also be in the previous code as well, but some of these string splitting and urllib name mangling things were put into https://github.com/CoffeaTeam/coffea/blob/master/src/coffea/nanoevents/util.py and could be used here |
||
else: | ||
return (form_key, attribute) | ||
|
||
@property | ||
def buffer_key(self): | ||
return partial(self._key_formatter, "") | ||
|
||
def _key_formatter(self, prefix, form_key, form, attribute): | ||
if attribute == "offsets": | ||
|
@@ -125,7 +141,41 @@ def __call__(self, form): | |
}, | ||
"form_key": None, | ||
} | ||
return awkward.forms.form.from_dict(self.schemaclass(lform, self.version).form) | ||
return ( | ||
awkward.forms.form.from_dict(self.schemaclass(lform, self.version).form), | ||
self, | ||
) | ||
|
||
def load_buffers(self, tree, keys, start, stop, interp_options): | ||
from functools import partial | ||
|
||
from coffea.nanoevents.util import tuple_to_key | ||
|
||
partition_key = ( | ||
str(tree.file.uuid), | ||
tree.object_path, | ||
f"{start}-{stop}", | ||
) | ||
uuidpfn = {partition_key[0]: tree.file.file_path} | ||
mapping = UprootSourceMapping( | ||
TrivialUprootOpener(uuidpfn, interp_options), | ||
start, | ||
stop, | ||
cache={}, | ||
access_log=None, | ||
use_ak_forth=True, | ||
) | ||
mapping.preload_column_source(partition_key[0], partition_key[1], tree) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've deliberately tried not to touch this logic at all. Hopefully, by presenting the correct interface to uproot, internal refactoring can happen here in future without any side effects. |
||
buffer_key = partial(self._key_formatter, tuple_to_key(partition_key)) | ||
|
||
class TranslateBufferKeys: | ||
def __getitem__(this, key): | ||
form_key, attribute = self.parse_buffer_key(key) | ||
return mapping[ | ||
buffer_key(form_key=form_key, attribute=attribute, form=None) | ||
] | ||
|
||
return TranslateBufferKeys() | ||
|
||
def create_column_mapping_and_key(self, tree, start, stop, interp_options): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @lgray can we remove this from the base definition, or does someone out there likely use it? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. interface is completely private and rather obscure to boot, so I am fine to remove it. |
||
from functools import partial | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1718,7 +1718,7 @@ def _work_function( | |
import dask_awkward | ||
|
||
to_compute = processor_instance.process(events) | ||
materialized = dask_awkward.necessary_columns(to_compute) | ||
# materialized = dask_awkward.report_necessary_buffers(to_compute) | ||
agoose77 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
out = dask.compute(to_compute, scheduler="single-threaded")[0] | ||
except Exception as e: | ||
raise Exception(f"Failed processing file: {item!r}") from e | ||
|
@@ -1734,11 +1734,11 @@ def _work_function( | |
metrics = {} | ||
if isinstance(file, uproot.ReadOnlyDirectory): | ||
metrics["bytesread"] = file.file.source.num_requested_bytes | ||
# metrics["data_and_shape_buffers"] = set(materialized) | ||
# metrics["shape_only_buffers"] = set(materialized) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are these placeholders for something in the future? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is just to get tests to run - executor is going to be significantly changed in a subsequent PR. |
||
if schema is not None and issubclass(schema, schemas.BaseSchema): | ||
metrics["columns"] = set(materialized) | ||
metrics["entries"] = len(events) | ||
else: | ||
metrics["columns"] = set(materialized) | ||
metrics["entries"] = events.size | ||
metrics["processtime"] = toc - tic | ||
return {"out": out, "metrics": metrics, "processed": {item}} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any reason not to inherit the protocol(s)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
They weren't exported yet! I think a new release includes them. Let me check.