Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support shape_touched from Dask #900

Merged
merged 33 commits into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
b8fc7fe
wip: initial commit
agoose77 Oct 3, 2023
1b4bd50
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 3, 2023
daa8529
fix: rename function
agoose77 Oct 4, 2023
df97041
Merge remote-tracking branch 'agoose77/agoose77/feat-support-shape-to…
agoose77 Oct 4, 2023
66c8710
fix: use report_necessary_buffers
agoose77 Oct 4, 2023
f31815b
Merge branch 'master' into agoose77/feat-support-shape-touched
agoose77 Oct 4, 2023
2353a23
fix: properly parse form keys
agoose77 Oct 4, 2023
17a6f2f
Merge remote-tracking branch 'agoose77/agoose77/feat-support-shape-to…
agoose77 Oct 4, 2023
bd07d03
hack: convert Content to array
agoose77 Oct 5, 2023
a6848a0
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 5, 2023
9c90205
fix: ensure layout nodes converted to arrays
agoose77 Oct 5, 2023
379a508
Merge remote-tracking branch 'agoose77/agoose77/feat-support-shape-to…
agoose77 Oct 5, 2023
04b5a1a
adjust coffea pins to latest releases and pre-releases
lgray Oct 7, 2023
f19c11b
use pytorch-only triton image
lgray Oct 7, 2023
7051d2e
streamline version requirements
lgray Oct 7, 2023
3fe957b
Merge branch 'master' into agoose77/feat-support-shape-touched
lgray Oct 7, 2023
d3fb1c9
Merge branch 'master' into agoose77/feat-support-shape-touched
lgray Oct 7, 2023
33d2e68
fix: don't import protocol
agoose77 Oct 8, 2023
9d94cb0
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 8, 2023
c49a8bc
Merge branch 'master' into agoose77/feat-support-shape-touched
lgray Oct 10, 2023
0d9c913
remove deprecated interface definition
lgray Oct 11, 2023
bb4df59
Update tests/test_jetmet_tools.py
agoose77 Oct 11, 2023
e0694ad
Update tests/test_jetmet_tools.py
agoose77 Oct 11, 2023
e7384f9
remove further remnants of old remapping interface
lgray Oct 11, 2023
92efdb2
refactor: make key translation obvious
agoose77 Oct 11, 2023
8ae3cd5
fix typo from refactor
lgray Oct 12, 2023
45a0060
update pins (note uncapped numpy and numba skooch)
lgray Oct 14, 2023
14d2cc2
try to convince pip to upgrade numpy upon installing coffea
lgray Oct 14, 2023
ab3599e
be more insistent
lgray Oct 14, 2023
faff41e
numba 0.58 pins numpy from above < 1.26
lgray Oct 14, 2023
864f709
clean up usage of quoted ",!offsets"
lgray Oct 17, 2023
9b96f7b
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 17, 2023
0a525d0
flake8 lint
lgray Oct 17, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ jobs:
- name: Start triton server with example model
if: matrix.os == 'ubuntu-latest'
run: |
docker run -d --rm -p 8000:8000 -p 8001:8001 -p 8002:8002 -v ${{ github.workspace }}/tests/samples/triton_models_test:/models nvcr.io/nvidia/tritonserver:23.04-py3 tritonserver --model-repository=/models
docker run -d --rm -p 8000:8000 -p 8001:8001 -p 8002:8002 -v ${{ github.workspace }}/tests/samples/triton_models_test:/models nvcr.io/nvidia/tritonserver:23.04-pyt-python-py3 tritonserver --model-repository=/models

- name: Test with pytest
run: |
Expand Down
8 changes: 4 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ classifiers = [
"Topic :: Utilities",
]
dependencies = [
"awkward>=2.4.2",
"uproot>=5.0.10",
"awkward>=2.4.5",
"uproot>=5.1.0rc1",
"dask[array]>=2023.4.0",
"dask-awkward>=2023.7.1,!=2023.8.0",
"dask-histogram>=2023.6.0",
"dask-awkward>=2023.10a1",
"dask-histogram>=2023.7a0",
"correctionlib>=2.0.0",
"pyarrow>=6.0.0",
"fsspec",
Expand Down
72 changes: 61 additions & 11 deletions src/coffea/nanoevents/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import dask_awkward
import fsspec
import uproot
from dask_awkward import ImplementsFormTransformation

from coffea.nanoevents.mapping import (
CachedMapping,
Expand Down Expand Up @@ -68,7 +67,7 @@ def _key_formatter(prefix, form_key, form, attribute):
return prefix + f"/{attribute}/{form_key}"


class _map_schema_base(ImplementsFormTransformation):
class _map_schema_base: # ImplementsFormMapping, ImplementsFormMappingInfo
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason not to inherit the protocol(s)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They weren't exported yet! I think a new release includes them. Let me check.

def __init__(
self, schemaclass=BaseSchema, metadata=None, behavior=None, version=None
):
Expand All @@ -77,17 +76,34 @@ def __init__(
self.metadata = metadata
self.version = version

def extract_form_keys_base_columns(self, form_keys):
base_columns = []
for form_key in form_keys:
base_columns.extend(
def keys_for_buffer_keys(self, buffer_keys):
base_columns = set()
for buffer_key in buffer_keys:
form_key, attribute = self.parse_buffer_key(buffer_key)
operands = urllib.parse.unquote(form_key).split(",")

it_operands = iter(operands)
next(it_operands)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function behaves like extract_form_key_base_columns, but supports operands e.g. attribute lookup.


base_columns.update(
[
acolumn
for acolumn in urllib.parse.unquote(form_key).split(",")
if not acolumn.startswith("!")
name
for name, maybe_transform in zip(operands, it_operands)
if maybe_transform == "!load"
]
)
return list(set(base_columns))
return base_columns

def parse_buffer_key(self, buffer_key):
prefix, attribute, form_key = buffer_key.rsplit("/", maxsplit=2)
if attribute == "offsets":
return (form_key[: -len("%2C%21offsets")], attribute)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to also be in the previous code as well, but some of these string splitting and urllib name mangling things were put into https://github.com/CoffeaTeam/coffea/blob/master/src/coffea/nanoevents/util.py and could be used here

else:
return (form_key, attribute)

@property
def buffer_key(self):
return partial(self._key_formatter, "")

def _key_formatter(self, prefix, form_key, form, attribute):
if attribute == "offsets":
Expand Down Expand Up @@ -125,7 +141,41 @@ def __call__(self, form):
},
"form_key": None,
}
return awkward.forms.form.from_dict(self.schemaclass(lform, self.version).form)
return (
awkward.forms.form.from_dict(self.schemaclass(lform, self.version).form),
self,
)

def load_buffers(self, tree, keys, start, stop, interp_options):
from functools import partial

from coffea.nanoevents.util import tuple_to_key

partition_key = (
str(tree.file.uuid),
tree.object_path,
f"{start}-{stop}",
)
uuidpfn = {partition_key[0]: tree.file.file_path}
mapping = UprootSourceMapping(
TrivialUprootOpener(uuidpfn, interp_options),
start,
stop,
cache={},
access_log=None,
use_ak_forth=True,
)
mapping.preload_column_source(partition_key[0], partition_key[1], tree)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've deliberately tried not to touch this logic at all. Hopefully, by presenting the correct interface to uproot, internal refactoring can happen here in future without any side effects.

buffer_key = partial(self._key_formatter, tuple_to_key(partition_key))

class TranslateBufferKeys:
def __getitem__(this, key):
form_key, attribute = self.parse_buffer_key(key)
return mapping[
buffer_key(form_key=form_key, attribute=attribute, form=None)
]

return TranslateBufferKeys()

def create_column_mapping_and_key(self, tree, start, stop, interp_options):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lgray can we remove this from the base definition, or does someone out there likely use it?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

interface is completely private and rather obscure to boot, so I am fine to remove it.

from functools import partial
Expand Down
21 changes: 13 additions & 8 deletions src/coffea/nanoevents/mapping/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,19 @@ def __getitem__(self, key):
if len(stack) != 1:
raise RuntimeError(f"Syntax error in form key {nodes}")
out = stack.pop()
try:
out = numpy.array(out)
except ValueError:
if self._debug:
print(out)
raise RuntimeError(
f"Left with non-bare array after evaluating form key {nodes}"
)
import awkward

if isinstance(out, awkward.contents.Content):
out = awkward.to_numpy(out)
else:
try:
out = numpy.array(out)
except ValueError:
if self._debug:
print(out)
raise RuntimeError(
f"Left with non-bare array after evaluating form key {nodes}"
)
return out

@abstractmethod
Expand Down
15 changes: 12 additions & 3 deletions src/coffea/nanoevents/transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,15 @@ def to_layout(array):
return array.layout


def ensure_array(arraylike):
if isinstance(arraylike, (awkward.contents.Content, awkward.Array)):
return awkward.to_numpy(arraylike)
elif isinstance(arraylike, awkward.index.Index):
return arraylike.data
else:
return numpy.asarray(arraylike)


def data(stack):
"""Extract content from array
(currently a noop, can probably take place of !content)
Expand Down Expand Up @@ -96,7 +105,7 @@ def counts2offsets(stack):
Signature: counts,!counts2offsets
Outputs an array with length one larger than input
"""
counts = numpy.array(stack.pop())
counts = ensure_array(stack.pop())
offsets = numpy.empty(len(counts) + 1, dtype=numpy.int64)
offsets[0] = 0
numpy.cumsum(counts, out=offsets[1:])
Expand All @@ -123,11 +132,11 @@ def local2global(stack):
Signature: index,target_offsets,!local2global
Outputs a content array with same shape as index content
"""
target_offsets = numpy.asarray(stack.pop())
target_offsets = ensure_array(stack.pop())
index = stack.pop()
index = index.mask[index >= 0] + target_offsets[:-1]
index = index.mask[index < target_offsets[1:]]
out = numpy.array(awkward.flatten(awkward.fill_none(index, -1), axis=None))
out = ensure_array(awkward.flatten(awkward.fill_none(index, -1), axis=None))
if out.dtype != numpy.int64:
raise RuntimeError
stack.append(out)
Expand Down
6 changes: 3 additions & 3 deletions src/coffea/processor/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1718,7 +1718,7 @@ def _work_function(
import dask_awkward

to_compute = processor_instance.process(events)
materialized = dask_awkward.necessary_columns(to_compute)
# materialized = dask_awkward.report_necessary_buffers(to_compute)
agoose77 marked this conversation as resolved.
Show resolved Hide resolved
out = dask.compute(to_compute, scheduler="single-threaded")[0]
except Exception as e:
raise Exception(f"Failed processing file: {item!r}") from e
Expand All @@ -1734,11 +1734,11 @@ def _work_function(
metrics = {}
if isinstance(file, uproot.ReadOnlyDirectory):
metrics["bytesread"] = file.file.source.num_requested_bytes
# metrics["data_and_shape_buffers"] = set(materialized)
# metrics["shape_only_buffers"] = set(materialized)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these placeholders for something in the future?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just to get tests to run - executor is going to be significantly changed in a subsequent PR.

if schema is not None and issubclass(schema, schemas.BaseSchema):
metrics["columns"] = set(materialized)
metrics["entries"] = len(events)
else:
metrics["columns"] = set(materialized)
metrics["entries"] = events.size
metrics["processtime"] = toc - tic
return {"out": out, "metrics": metrics, "processed": {item}}
Expand Down
4 changes: 2 additions & 2 deletions tests/test_jetmet_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -837,9 +837,9 @@ def test_corrected_jets_factory(optimization_enabled):
**{name: evaluator[name] for name in jec_stack_names[5:6]}
)

print(dak.necessary_columns(jets.eta))
print(dak.report_necessary_buffers(jets.eta))
agoose77 marked this conversation as resolved.
Show resolved Hide resolved
print(
dak.necessary_columns(
dak.report_necessary_buffers(
agoose77 marked this conversation as resolved.
Show resolved Hide resolved
resosf.getScaleFactor(
JetEta=jets.eta,
)
Expand Down