Skip to content

Commit

Permalink
Merge pull request #900 from agoose77/agoose77/feat-support-shape-touched
Browse files Browse the repository at this point in the history

feat: support `shape_touched` from Dask
  • Loading branch information
lgray authored Oct 17, 2023
2 parents 1aa0fbd + 0a525d0 commit 8662c9f
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 69 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ jobs:
python -m pip install xgboost
python -m pip install tritonclient[grpc,http]
# install checked out coffea
python -m pip install -q -e '.[dev,parsl,dask,spark]'
python -m pip install -q -e '.[dev,parsl,dask,spark]' --upgrade --upgrade-strategy eager
python -m pip list
java -version
- name: Install dependencies (MacOS)
Expand All @@ -80,7 +80,7 @@ jobs:
python -m pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cpu
python -m pip install xgboost
# install checked out coffea
python -m pip install -q -e '.[dev,dask,spark]'
python -m pip install -q -e '.[dev,dask,spark]' --upgrade --upgrade-strategy eager
python -m pip list
java -version
- name: Install dependencies (Windows)
Expand All @@ -91,14 +91,14 @@ jobs:
python -m pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cpu
python -m pip install xgboost
# install checked out coffea
python -m pip install -q -e '.[dev,dask]'
python -m pip install -q -e '.[dev,dask]' --upgrade --upgrade-strategy eager
python -m pip list
java -version
- name: Start triton server with example model
if: matrix.os == 'ubuntu-latest'
run: |
docker run -d --rm -p 8000:8000 -p 8001:8001 -p 8002:8002 -v ${{ github.workspace }}/tests/samples/triton_models_test:/models nvcr.io/nvidia/tritonserver:23.04-py3 tritonserver --model-repository=/models
docker run -d --rm -p 8000:8000 -p 8001:8001 -p 8002:8002 -v ${{ github.workspace }}/tests/samples/triton_models_test:/models nvcr.io/nvidia/tritonserver:23.04-pyt-python-py3 tritonserver --model-repository=/models
- name: Test with pytest
run: |
Expand Down
14 changes: 7 additions & 7 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,17 @@ classifiers = [
"Topic :: Utilities",
]
dependencies = [
"awkward>=2.4.2",
"uproot>=5.0.10",
"awkward>=2.4.6",
"uproot>=5.1.1",
"dask[array]>=2023.4.0",
"dask-awkward>=2023.7.1,!=2023.8.0",
"dask-histogram>=2023.6.0",
"correctionlib>=2.0.0",
"dask-awkward>=2023.10.0",
"dask-histogram>=2023.10.0",
"correctionlib>=2.3.3",
"pyarrow>=6.0.0",
"fsspec",
"matplotlib>=3",
"numba>=0.57.0",
"numpy>=1.22.0,<1.25", # < 1.25 for numba 0.57 series
"numba>=0.58.0",
"numpy>=1.22.0,<1.26", # < 1.26 for numba 0.58 series
"scipy>=1.1.0",
"tqdm>=4.27.0",
"lz4",
Expand Down
95 changes: 53 additions & 42 deletions src/coffea/nanoevents/factory.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import io
import pathlib
import urllib.parse
import warnings
import weakref
from functools import partial
Expand All @@ -11,7 +10,6 @@
import dask_awkward
import fsspec
import uproot
from dask_awkward import ImplementsFormTransformation

from coffea.nanoevents.mapping import (
CachedMapping,
Expand All @@ -30,7 +28,9 @@
PHYSLITESchema,
TreeMakerSchema,
)
from coffea.nanoevents.util import key_to_tuple, tuple_to_key
from coffea.nanoevents.util import key_to_tuple, quote, tuple_to_key, unquote

_offsets_label = quote(",!offsets")


def _remove_not_interpretable(branch):
Expand Down Expand Up @@ -64,11 +64,11 @@ def _remove_not_interpretable(branch):

def _key_formatter(prefix, form_key, form, attribute):
if attribute == "offsets":
form_key += "%2C%21offsets"
form_key += _offsets_label
return prefix + f"/{attribute}/{form_key}"


class _map_schema_base(ImplementsFormTransformation):
class _map_schema_base: # ImplementsFormMapping, ImplementsFormMappingInfo
def __init__(
self, schemaclass=BaseSchema, metadata=None, behavior=None, version=None
):
Expand All @@ -77,24 +77,50 @@ def __init__(
self.metadata = metadata
self.version = version

def extract_form_keys_base_columns(self, form_keys):
base_columns = []
for form_key in form_keys:
base_columns.extend(
def keys_for_buffer_keys(self, buffer_keys):
base_columns = set()
for buffer_key in buffer_keys:
form_key, attribute = self.parse_buffer_key(buffer_key)
operands = unquote(form_key).split(",")

it_operands = iter(operands)
next(it_operands)

base_columns.update(
[
acolumn
for acolumn in urllib.parse.unquote(form_key).split(",")
if not acolumn.startswith("!")
name
for name, maybe_transform in zip(operands, it_operands)
if maybe_transform == "!load"
]
)
return list(set(base_columns))
return base_columns

def parse_buffer_key(self, buffer_key):
prefix, attribute, form_key = buffer_key.rsplit("/", maxsplit=2)
if attribute == "offsets":
return (form_key[: -len(_offsets_label)], attribute)
else:
return (form_key, attribute)

@property
def buffer_key(self):
return partial(self._key_formatter, "")

def _key_formatter(self, prefix, form_key, form, attribute):
if attribute == "offsets":
form_key += "%2C%21offsets"
form_key += _offsets_label
return prefix + f"/{attribute}/{form_key}"


class _TranslatedMapping:
def __init__(self, func, mapping):
self._func = func
self._mapping = mapping

def __getitem__(self, index):
return self._mapping[self._func(index)]


class _map_schema_uproot(_map_schema_base):
def __init__(
self, schemaclass=BaseSchema, metadata=None, behavior=None, version=None
Expand Down Expand Up @@ -125,9 +151,12 @@ def __call__(self, form):
},
"form_key": None,
}
return awkward.forms.form.from_dict(self.schemaclass(lform, self.version).form)
return (
awkward.forms.form.from_dict(self.schemaclass(lform, self.version).form),
self,
)

def create_column_mapping_and_key(self, tree, start, stop, interp_options):
def load_buffers(self, tree, keys, start, stop, interp_options):
from functools import partial

from coffea.nanoevents.util import tuple_to_key
Expand All @@ -147,8 +176,15 @@ def create_column_mapping_and_key(self, tree, start, stop, interp_options):
use_ak_forth=True,
)
mapping.preload_column_source(partition_key[0], partition_key[1], tree)
buffer_key = partial(self._key_formatter, tuple_to_key(partition_key))

# The buffer-keys that dask-awkward knows about will not include the
# partition key. Therefore, we must translate the keys here.
def translate_key(index):
form_key, attribute = self.parse_buffer_key(index)
return buffer_key(form_key=form_key, attribute=attribute, form=None)

return mapping, partial(self._key_formatter, tuple_to_key(partition_key))
return _TranslatedMapping(translate_key, mapping)


class _map_schema_parquet(_map_schema_base):
Expand All @@ -174,31 +210,6 @@ def __call__(self, form):

return awkward.forms.form.from_dict(self.schemaclass(lform, self.version).form)

def create_column_mapping_and_key(self, columns, start, stop, interp_options):
from functools import partial

from coffea.nanoevents.util import tuple_to_key

uuid = "NO_UUID"
obj_path = "NO_OBJECT_PATH"

partition_key = (
str(uuid),
obj_path,
f"{start}-{stop}",
)
uuidpfn = {uuid: columns}
mapping = PreloadedSourceMapping(
PreloadedOpener(uuidpfn),
start,
stop,
cache={},
access_log=None,
)
mapping.preload_column_source(partition_key[0], partition_key[1], columns)

return mapping, partial(self._key_formatter, tuple_to_key(partition_key))


class NanoEventsFactory:
"""A factory class to build NanoEvents objects"""
Expand Down
21 changes: 13 additions & 8 deletions src/coffea/nanoevents/mapping/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,19 @@ def __getitem__(self, key):
if len(stack) != 1:
raise RuntimeError(f"Syntax error in form key {nodes}")
out = stack.pop()
try:
out = numpy.array(out)
except ValueError:
if self._debug:
print(out)
raise RuntimeError(
f"Left with non-bare array after evaluating form key {nodes}"
)
import awkward

if isinstance(out, awkward.contents.Content):
out = awkward.to_numpy(out)
else:
try:
out = numpy.array(out)
except ValueError:
if self._debug:
print(out)
raise RuntimeError(
f"Left with non-bare array after evaluating form key {nodes}"
)
return out

@abstractmethod
Expand Down
15 changes: 12 additions & 3 deletions src/coffea/nanoevents/transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,15 @@ def to_layout(array):
return array.layout


def ensure_array(arraylike):
    """Convert an array-like object to a plain array.

    Dispatch, in order:

    * ``awkward.contents.Content`` or ``awkward.Array`` -> ``awkward.to_numpy``
    * ``awkward.index.Index`` -> its underlying ``data`` attribute
    * anything else -> ``numpy.asarray``
    """
    if isinstance(arraylike, (awkward.contents.Content, awkward.Array)):
        return awkward.to_numpy(arraylike)
    if isinstance(arraylike, awkward.index.Index):
        return arraylike.data
    return numpy.asarray(arraylike)


def data(stack):
"""Extract content from array
(currently a noop, can probably take place of !content)
Expand Down Expand Up @@ -96,7 +105,7 @@ def counts2offsets(stack):
Signature: counts,!counts2offsets
Outputs an array with length one larger than input
"""
counts = numpy.array(stack.pop())
counts = ensure_array(stack.pop())
offsets = numpy.empty(len(counts) + 1, dtype=numpy.int64)
offsets[0] = 0
numpy.cumsum(counts, out=offsets[1:])
Expand All @@ -123,11 +132,11 @@ def local2global(stack):
Signature: index,target_offsets,!local2global
Outputs a content array with same shape as index content
"""
target_offsets = numpy.asarray(stack.pop())
target_offsets = ensure_array(stack.pop())
index = stack.pop()
index = index.mask[index >= 0] + target_offsets[:-1]
index = index.mask[index < target_offsets[1:]]
out = numpy.array(awkward.flatten(awkward.fill_none(index, -1), axis=None))
out = ensure_array(awkward.flatten(awkward.fill_none(index, -1), axis=None))
if out.dtype != numpy.int64:
raise RuntimeError
stack.append(out)
Expand Down
6 changes: 3 additions & 3 deletions src/coffea/processor/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1718,7 +1718,7 @@ def _work_function(
import dask_awkward

to_compute = processor_instance.process(events)
materialized = dask_awkward.necessary_columns(to_compute)
# materialized = dask_awkward.report_necessary_buffers(to_compute)
out = dask.compute(to_compute, scheduler="single-threaded")[0]
except Exception as e:
raise Exception(f"Failed processing file: {item!r}") from e
Expand All @@ -1734,11 +1734,11 @@ def _work_function(
metrics = {}
if isinstance(file, uproot.ReadOnlyDirectory):
metrics["bytesread"] = file.file.source.num_requested_bytes
# metrics["data_and_shape_buffers"] = set(materialized)
# metrics["shape_only_buffers"] = set(materialized)
if schema is not None and issubclass(schema, schemas.BaseSchema):
metrics["columns"] = set(materialized)
metrics["entries"] = len(events)
else:
metrics["columns"] = set(materialized)
metrics["entries"] = events.size
metrics["processtime"] = toc - tic
return {"out": out, "metrics": metrics, "processed": {item}}
Expand Down
4 changes: 2 additions & 2 deletions tests/test_jetmet_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -837,9 +837,9 @@ def test_corrected_jets_factory(optimization_enabled):
**{name: evaluator[name] for name in jec_stack_names[5:6]}
)

print(dak.necessary_columns(jets.eta))
print(dak.report_necessary_columns(jets.eta))
print(
dak.necessary_columns(
dak.report_necessary_columns(
resosf.getScaleFactor(
JetEta=jets.eta,
)
Expand Down

0 comments on commit 8662c9f

Please sign in to comment.