diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index fe8453b8d..ea61615a7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -69,7 +69,7 @@ jobs: python -m pip install xgboost python -m pip install tritonclient[grpc,http] # install checked out coffea - python -m pip install -q -e '.[dev,parsl,dask,spark]' + python -m pip install -q -e '.[dev,parsl,dask,spark]' --upgrade --upgrade-strategy eager python -m pip list java -version - name: Install dependencies (MacOS) @@ -80,7 +80,7 @@ jobs: python -m pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cpu python -m pip install xgboost # install checked out coffea - python -m pip install -q -e '.[dev,dask,spark]' + python -m pip install -q -e '.[dev,dask,spark]' --upgrade --upgrade-strategy eager python -m pip list java -version - name: Install dependencies (Windows) @@ -91,14 +91,14 @@ jobs: python -m pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cpu python -m pip install xgboost # install checked out coffea - python -m pip install -q -e '.[dev,dask]' + python -m pip install -q -e '.[dev,dask]' --upgrade --upgrade-strategy eager python -m pip list java -version - name: Start triton server with example model if: matrix.os == 'ubuntu-latest' run: | - docker run -d --rm -p 8000:8000 -p 8001:8001 -p 8002:8002 -v ${{ github.workspace }}/tests/samples/triton_models_test:/models nvcr.io/nvidia/tritonserver:23.04-py3 tritonserver --model-repository=/models + docker run -d --rm -p 8000:8000 -p 8001:8001 -p 8002:8002 -v ${{ github.workspace }}/tests/samples/triton_models_test:/models nvcr.io/nvidia/tritonserver:23.04-pyt-python-py3 tritonserver --model-repository=/models - name: Test with pytest run: | diff --git a/pyproject.toml b/pyproject.toml index f5b168c1e..689e03ff0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -37,17 +37,17 @@ classifiers = [ "Topic :: Utilities", ] dependencies = [ - "awkward>=2.4.2", - "uproot>=5.0.10", + "awkward>=2.4.6", + "uproot>=5.1.1", "dask[array]>=2023.4.0", - "dask-awkward>=2023.7.1,!=2023.8.0", - "dask-histogram>=2023.6.0", - "correctionlib>=2.0.0", + "dask-awkward>=2023.10.0", + "dask-histogram>=2023.10.0", + "correctionlib>=2.3.3", "pyarrow>=6.0.0", "fsspec", "matplotlib>=3", - "numba>=0.57.0", - "numpy>=1.22.0,<1.25", # < 1.25 for numba 0.57 series + "numba>=0.58.0", + "numpy>=1.22.0,<1.26", # < 1.26 for numba 0.58 series "scipy>=1.1.0", "tqdm>=4.27.0", "lz4", diff --git a/src/coffea/nanoevents/factory.py b/src/coffea/nanoevents/factory.py index 58dd55ad5..b6656282f 100644 --- a/src/coffea/nanoevents/factory.py +++ b/src/coffea/nanoevents/factory.py @@ -1,6 +1,5 @@ import io import pathlib -import urllib.parse import warnings import weakref from functools import partial @@ -11,7 +10,6 @@ import dask_awkward import fsspec import uproot -from dask_awkward import ImplementsFormTransformation from coffea.nanoevents.mapping import ( CachedMapping, @@ -30,7 +28,9 @@ PHYSLITESchema, TreeMakerSchema, ) -from coffea.nanoevents.util import key_to_tuple, tuple_to_key +from coffea.nanoevents.util import key_to_tuple, quote, tuple_to_key, unquote + +_offsets_label = quote(",!offsets") def _remove_not_interpretable(branch): @@ -64,11 +64,11 @@ def _remove_not_interpretable(branch): def _key_formatter(prefix, form_key, form, attribute): if attribute == "offsets": - form_key += "%2C%21offsets" + form_key += _offsets_label return prefix + f"/{attribute}/{form_key}" -class _map_schema_base(ImplementsFormTransformation): +class _map_schema_base: # ImplementsFormMapping, ImplementsFormMappingInfo def __init__( self, schemaclass=BaseSchema, metadata=None, behavior=None, version=None ): @@ -77,24 +77,50 @@ def __init__( self.metadata = metadata self.version = version - def extract_form_keys_base_columns(self, form_keys): - base_columns = [] - for form_key in form_keys: - base_columns.extend( + def keys_for_buffer_keys(self, buffer_keys): + base_columns = set() + for buffer_key in buffer_keys: + form_key, attribute = self.parse_buffer_key(buffer_key) + operands = unquote(form_key).split(",") + + it_operands = iter(operands) + next(it_operands) + + base_columns.update( [ - acolumn - for acolumn in urllib.parse.unquote(form_key).split(",") - if not acolumn.startswith("!") + name + for name, maybe_transform in zip(operands, it_operands) + if maybe_transform == "!load" ] ) - return list(set(base_columns)) + return base_columns + + def parse_buffer_key(self, buffer_key): + prefix, attribute, form_key = buffer_key.rsplit("/", maxsplit=2) + if attribute == "offsets": + return (form_key[: -len(_offsets_label)], attribute) + else: + return (form_key, attribute) + + @property + def buffer_key(self): + return partial(self._key_formatter, "") def _key_formatter(self, prefix, form_key, form, attribute): if attribute == "offsets": - form_key += "%2C%21offsets" + form_key += _offsets_label return prefix + f"/{attribute}/{form_key}" +class _TranslatedMapping: + def __init__(self, func, mapping): + self._func = func + self._mapping = mapping + + def __getitem__(self, index): + return self._mapping[self._func(index)] + + class _map_schema_uproot(_map_schema_base): def __init__( self, schemaclass=BaseSchema, metadata=None, behavior=None, version=None @@ -125,9 +151,12 @@ def __call__(self, form): }, "form_key": None, } - return awkward.forms.form.from_dict(self.schemaclass(lform, self.version).form) + return ( + awkward.forms.form.from_dict(self.schemaclass(lform, self.version).form), + self, + ) - def create_column_mapping_and_key(self, tree, start, stop, interp_options): + def load_buffers(self, tree, keys, start, stop, interp_options): from functools import partial from coffea.nanoevents.util import tuple_to_key @@ -147,8 +176,15 @@ def create_column_mapping_and_key(self, tree, start, stop, interp_options): use_ak_forth=True, ) mapping.preload_column_source(partition_key[0], partition_key[1], tree) + buffer_key = partial(self._key_formatter, tuple_to_key(partition_key)) + + # The buffer-keys that dask-awkward knows about will not include the + # partition key. Therefore, we must translate the keys here. + def translate_key(index): + form_key, attribute = self.parse_buffer_key(index) + return buffer_key(form_key=form_key, attribute=attribute, form=None) - return mapping, partial(self._key_formatter, tuple_to_key(partition_key)) + return _TranslatedMapping(translate_key, mapping) class _map_schema_parquet(_map_schema_base): @@ -174,31 +210,6 @@ def __call__(self, form): return awkward.forms.form.from_dict(self.schemaclass(lform, self.version).form) - def create_column_mapping_and_key(self, columns, start, stop, interp_options): - from functools import partial - - from coffea.nanoevents.util import tuple_to_key - - uuid = "NO_UUID" - obj_path = "NO_OBJECT_PATH" - - partition_key = ( - str(uuid), - obj_path, - f"{start}-{stop}", - ) - uuidpfn = {uuid: columns} - mapping = PreloadedSourceMapping( - PreloadedOpener(uuidpfn), - start, - stop, - cache={}, - access_log=None, - ) - mapping.preload_column_source(partition_key[0], partition_key[1], columns) - - return mapping, partial(self._key_formatter, tuple_to_key(partition_key)) - class NanoEventsFactory: """A factory class to build NanoEvents objects""" diff --git a/src/coffea/nanoevents/mapping/base.py b/src/coffea/nanoevents/mapping/base.py index c6a5e8e2e..3d87b410c 100644 --- a/src/coffea/nanoevents/mapping/base.py +++ b/src/coffea/nanoevents/mapping/base.py @@ -111,14 +111,19 @@ def __getitem__(self, key): if len(stack) != 1: raise RuntimeError(f"Syntax error in form key {nodes}") out = stack.pop() - try: - out = numpy.array(out) - except ValueError: - if self._debug: - print(out) - raise RuntimeError( - f"Left with non-bare array after evaluating form key {nodes}" - ) + import awkward + + if isinstance(out, awkward.contents.Content): + out = awkward.to_numpy(out) + else: + try: + out = numpy.array(out) + except ValueError: + if self._debug: + print(out) + raise RuntimeError( + f"Left with non-bare array after evaluating form key {nodes}" + ) return out @abstractmethod diff --git a/src/coffea/nanoevents/transforms.py b/src/coffea/nanoevents/transforms.py index e969310b2..2985f9709 100644 --- a/src/coffea/nanoevents/transforms.py +++ b/src/coffea/nanoevents/transforms.py @@ -13,6 +13,15 @@ def to_layout(array): return array.layout +def ensure_array(arraylike): + if isinstance(arraylike, (awkward.contents.Content, awkward.Array)): + return awkward.to_numpy(arraylike) + elif isinstance(arraylike, awkward.index.Index): + return arraylike.data + else: + return numpy.asarray(arraylike) + + def data(stack): """Extract content from array (currently a noop, can probably take place of !content) @@ -96,7 +105,7 @@ def counts2offsets(stack): Signature: counts,!counts2offsets Outputs an array with length one larger than input """ - counts = numpy.array(stack.pop()) + counts = ensure_array(stack.pop()) offsets = numpy.empty(len(counts) + 1, dtype=numpy.int64) offsets[0] = 0 numpy.cumsum(counts, out=offsets[1:]) @@ -123,11 +132,11 @@ def local2global(stack): Signature: index,target_offsets,!local2global Outputs a content array with same shape as index content """ - target_offsets = numpy.asarray(stack.pop()) + target_offsets = ensure_array(stack.pop()) index = stack.pop() index = index.mask[index >= 0] + target_offsets[:-1] index = index.mask[index < target_offsets[1:]] - out = numpy.array(awkward.flatten(awkward.fill_none(index, -1), axis=None)) + out = ensure_array(awkward.flatten(awkward.fill_none(index, -1), axis=None)) if out.dtype != numpy.int64: raise RuntimeError stack.append(out) diff --git a/src/coffea/processor/executor.py b/src/coffea/processor/executor.py index 42df52eeb..18ab1c8c5 100644 --- a/src/coffea/processor/executor.py +++ b/src/coffea/processor/executor.py @@ -1718,7 +1718,7 @@ def _work_function( import dask_awkward to_compute = processor_instance.process(events) - materialized = dask_awkward.necessary_columns(to_compute) + # materialized = dask_awkward.report_necessary_buffers(to_compute) out = dask.compute(to_compute, scheduler="single-threaded")[0] except Exception as e: raise Exception(f"Failed processing file: {item!r}") from e @@ -1734,11 +1734,11 @@ def _work_function( metrics = {} if isinstance(file, uproot.ReadOnlyDirectory): metrics["bytesread"] = file.file.source.num_requested_bytes + # metrics["data_and_shape_buffers"] = set(materialized) + # metrics["shape_only_buffers"] = set(materialized) if schema is not None and issubclass(schema, schemas.BaseSchema): - metrics["columns"] = set(materialized) metrics["entries"] = len(events) else: - metrics["columns"] = set(materialized) metrics["entries"] = events.size metrics["processtime"] = toc - tic return {"out": out, "metrics": metrics, "processed": {item}} diff --git a/tests/test_jetmet_tools.py b/tests/test_jetmet_tools.py index a7ef91385..8be3a97f3 100644 --- a/tests/test_jetmet_tools.py +++ b/tests/test_jetmet_tools.py @@ -837,9 +837,9 @@ def test_corrected_jets_factory(optimization_enabled): **{name: evaluator[name] for name in jec_stack_names[5:6]} ) - print(dak.necessary_columns(jets.eta)) + print(dak.report_necessary_columns(jets.eta)) print( - dak.necessary_columns( + dak.report_necessary_columns( resosf.getScaleFactor( JetEta=jets.eta, )