Skip to content

Commit

Permalink
Fixes #90 and readies Uproot lazyarray for Awkward ArrayCache weakref…
Browse files Browse the repository at this point in the history
…s. (#92)

* Fixes #90 and readies Uproot lazyarray for Awkward ArrayCache weakrefs.

* Apply black formatting.
  • Loading branch information
jpivarski authored Sep 8, 2020
1 parent 0429ae0 commit fc5b4ae
Show file tree
Hide file tree
Showing 8 changed files with 230 additions and 34 deletions.
4 changes: 3 additions & 1 deletion tests/test_0017-multi-basket-multi-branch-fetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,9 @@ def test_branch_array_4(file_handler):

def test_cache():
with uproot4.open(
skhep_testdata.data_path("uproot-sample-6.20.04-uncompressed.root")
skhep_testdata.data_path("uproot-sample-6.20.04-uncompressed.root"),
object_cache=100,
array_cache="100 MB",
) as f:
assert f.cache_key == "db4be408-93ad-11ea-9027-d201a8c0beef:/"
assert f["sample"].cache_key == "db4be408-93ad-11ea-9027-d201a8c0beef:/sample;1"
Expand Down
8 changes: 6 additions & 2 deletions tests/test_0018-array-fetching-interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ def test_compute():
awkward1 = pytest.importorskip("awkward1")

with uproot4.open(
skhep_testdata.data_path("uproot-sample-6.20.04-uncompressed.root")
skhep_testdata.data_path("uproot-sample-6.20.04-uncompressed.root"),
object_cache=100,
array_cache="100 MB",
)["sample"] as sample:
result = sample.arrays(["stuff", "i4"], aliases={"stuff": "abs(i4) + Ai8"})
assert result.tolist() == [
Expand Down Expand Up @@ -151,7 +153,9 @@ def test_arrays():
)

with uproot4.open(
skhep_testdata.data_path("uproot-sample-6.20.04-uncompressed.root")
skhep_testdata.data_path("uproot-sample-6.20.04-uncompressed.root"),
object_cache=100,
array_cache="100 MB",
)["sample"] as sample:
result = sample.arrays(["I4", "F4"], aliases={"I4": "i4", "F4": "f4"})
assert result.tolist() == [
Expand Down
3 changes: 3 additions & 0 deletions uproot4/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@
from uproot4.cache import LRUCache
from uproot4.cache import LRUArrayCache

object_cache = LRUCache(100)
array_cache = LRUArrayCache("100 MB")

from uproot4.source.file import MemmapSource
from uproot4.source.file import MultithreadedFileSource
from uproot4.source.http import HTTPSource
Expand Down
110 changes: 110 additions & 0 deletions uproot4/_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -492,3 +492,113 @@ def awkward_form_remove_uproot(awkward1, form):
)
else:
raise RuntimeError("unrecognized form: {0}".format(type(form)))


# FIXME: Until we get Awkward reading these bytes directly, rather than
# going through ak.from_iter, the integer dtypes will be int64 and the
# floating dtypes will be float64 because that's what ak.from_iter makes.
def awkward_form_of_iter(awkward1, form):
if isinstance(form, awkward1.forms.BitMaskedForm):
return awkward1.forms.BitMaskedForm(
form.mask,
awkward_form_of_iter(awkward1, form.content),
form.valid_when,
form.lsb_order,
form.has_identities,
form.parameters,
)
elif isinstance(form, awkward1.forms.ByteMaskedForm):
return awkward1.forms.ByteMaskedForm(
form.mask,
awkward_form_of_iter(awkward1, form.content),
form.valid_when,
form.has_identities,
form.parameters,
)
elif isinstance(form, awkward1.forms.EmptyForm):
return awkward1.forms.EmptyForm(form.has_identities, form.parameters,)
elif isinstance(form, awkward1.forms.IndexedForm):
return awkward1.forms.IndexedForm(
form.index,
awkward_form_of_iter(awkward1, form.content),
form.has_identities,
form.parameters,
)
elif isinstance(form, awkward1.forms.IndexedOptionForm):
return awkward1.forms.IndexedOptionForm(
form.index,
awkward_form_of_iter(awkward1, form.content),
form.has_identities,
form.parameters,
)
elif isinstance(form, awkward1.forms.ListForm):
return awkward1.forms.ListForm(
form.starts,
form.stops,
awkward_form_of_iter(awkward1, form.content),
form.has_identities,
form.parameters,
)
elif isinstance(form, awkward1.forms.ListOffsetForm):
return awkward1.forms.ListOffsetForm(
form.offsets,
awkward_form_of_iter(awkward1, form.content),
form.has_identities,
form.parameters,
)
elif isinstance(form, awkward1.forms.NumpyForm):
if form.parameter("__array__") in ("char", "byte"):
f = form
else:
d = form.to_numpy()
if issubclass(d.type, numpy.integer):
d = numpy.dtype(numpy.int64)
elif issubclass(d.type, numpy.floating):
d = numpy.dtype(numpy.float64)
f = awkward1.forms.Form.from_numpy(d)
out = awkward1.forms.NumpyForm(
form.inner_shape,
f.itemsize,
f.format,
form.has_identities,
form.parameters,
)
return out
elif isinstance(form, awkward1.forms.RecordForm):
return awkward1.forms.RecordForm(
dict(
(k, awkward_form_of_iter(awkward1, v)) for k, v in form.contents.items()
),
form.has_identities,
form.parameters,
)
elif isinstance(form, awkward1.forms.RegularForm):
return awkward1.forms.RegularForm(
awkward_form_of_iter(awkward1, form.content),
form.size,
form.has_identities,
form.parameters,
)
elif isinstance(form, awkward1.forms.UnionForm):
return awkward1.forms.UnionForm(
form.tags,
form.index,
[awkward_form_of_iter(awkward1, x) for x in form.contents],
form.has_identities,
form.parameters,
)
elif isinstance(form, awkward1.forms.UnmaskedForm):
return awkward1.forms.UnmaskedForm(
awkward_form_of_iter(awkward1, form.content),
form.has_identities,
form.parameters,
)
elif isinstance(form, awkward1.forms.VirtualForm):
return awkward1.forms.VirtualForm(
awkward_form_of_iter(awkward1, form.form),
form.has_length,
form.has_identities,
form.parameters,
)
else:
raise RuntimeError("unrecognized form: {0}".format(type(form)))
70 changes: 53 additions & 17 deletions uproot4/behaviors/TBranch.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ def __enter__(self):
def __exit__(self, exception_type, exception_value, traceback):
pass

def __getattr__(self, attr):
return getattr(self.hasbranches, attr)

def __getitem__(self, where):
return self.hasbranches[where]


def iterate(
files,
Expand Down Expand Up @@ -257,7 +263,7 @@ def concatenate(
language=uproot4.language.python.PythonLanguage(),
decompression_executor=None,
interpretation_executor=None,
array_cache=None,
array_cache=uproot4.array_cache,
library="ak",
how=None,
custom_classes=None,
Expand Down Expand Up @@ -414,7 +420,7 @@ def lazy(
step_size="100 MB",
decompression_executor=None,
interpretation_executor=None,
array_cache="100 MB",
array_cache=uproot4.array_cache,
library="ak",
custom_classes=None,
allow_missing=False,
Expand Down Expand Up @@ -522,16 +528,20 @@ def lazy(
* :doc:`uproot4.behaviors.TBranch.lazy` (this function): returns a lazily
read array from ``TTrees``.
"""
import awkward1

files = _regularize_files(files)
decompression_executor, interpretation_executor = _regularize_executors(
decompression_executor, interpretation_executor
)
array_cache = _regularize_array_cache(array_cache, None)
library = uproot4.interpretation.library._regularize_library_lazy(library)
import awkward1

if type(array_cache) == dict:
array_cache = _WrapDict(array_cache)

if array_cache is not None:
array_cache = awkward1.layout.ArrayCache(array_cache)
layout_array_cache = awkward1.layout.ArrayCache(array_cache)

real_options = dict(options)
if "num_workers" not in real_options:
Expand Down Expand Up @@ -635,9 +645,10 @@ def real_filter_branch(branch):
names = []
for key in common_keys:
branch = obj[key]
form = branchid_interpretation[branch.cache_key].awkward_form(
obj.file, index_format="i64"
)
interpretation = branchid_interpretation[branch.cache_key]
form = interpretation.awkward_form(obj.file, index_format="i64")
if isinstance(interpretation, uproot4.interpretation.objects.AsObjects):
form = uproot4._util.awkward_form_of_iter(awkward1, form)
generator = awkward1.layout.ArrayGenerator(
branch.array,
(
Expand All @@ -650,19 +661,21 @@ def real_filter_branch(branch):
"ak",
),
{},
uproot4._util.awkward_form_remove_uproot(awkward1, form),
uproot4._util.awkward_form_remove_uproot(
awkward1, form
), # , interpretation
length,
)
cache_key = "{0}:{1}:{2}-{3}:{4}".format(
branch.cache_key,
branchid_interpretation[branch.cache_key].cache_key,
interpretation.cache_key,
start,
stop,
library.name,
)
global_cache_key.append(cache_key)
virtualarray = awkward1.layout.VirtualArray(
generator, cache=array_cache, cache_key=cache_key
generator, cache=layout_array_cache, cache_key=cache_key
)
fields.append(virtualarray)
names.append(key)
Expand All @@ -672,11 +685,7 @@ def real_filter_branch(branch):
global_offsets.append(global_offsets[-1] + length)

out = awkward1.partition.IrregularlyPartitionedArray(partitions, global_offsets[1:])
out = awkward1.Array(out)

return library.wrap_awkward_lazy(
out, common_keys, global_offsets, ",".join(global_cache_key)
)
return awkward1.Array(out, cache=array_cache)


Report = collections.namedtuple(
Expand Down Expand Up @@ -860,7 +869,7 @@ def arrays(
arrays; if None, the global ``uproot4.interpretation_executor`` is
used.
array_cache (None, MutableMapping, or memory size): Cache of arrays;
if None, do not use a cache; if a memory size, create a new cache
if None, use the file's cache; if a memory size, create a new cache
of this size.
library (str or :doc:`uproot4.interpretation.library.Library`): The library
that is used to represent arrays. Options are ``"np"`` for NumPy,
Expand Down Expand Up @@ -1807,7 +1816,7 @@ def array(
arrays; if None, the global ``uproot4.interpretation_executor`` is
used.
array_cache (None, MutableMapping, or memory size): Cache of arrays;
if None, do not use a cache; if a memory size, create a new cache
if None, use the file's cache; if a memory size, create a new cache
of this size.
library (str or :doc:`uproot4.interpretation.library.Library`): The library
that is used to represent arrays. Options are ``"np"`` for NumPy,
Expand Down Expand Up @@ -3118,3 +3127,30 @@ def _regularize_step_size(
return _hasbranches_num_entries_for(
hasbranches, target_num_bytes, entry_start, entry_stop, branchid_interpretation
)


class _WrapDict(MutableMapping):
def __init__(self, dict):
self.dict = dict

def __str__(self):
return str(self.dict)

def __repr__(self):
return repr(self.dict)

def __getitem__(self, where):
return self.dict[where]

def __setitem__(self, where, what):
self.dict[where] = what

def __delitem__(self, where):
del self.dict[where]

def __iter__(self, where):
for x in self.dict:
yield x

def __len__(self):
return len(self.dict)
57 changes: 51 additions & 6 deletions uproot4/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ class LRUCache(MutableMapping):
Get and set (or explicitly remove) items with ``MutableMapping`` syntax:
square bracket subscripting.
LRUCache is thread-safe for getting, setting, and deleting, but not for
iterating.
LRUCache is thread-safe for all options: getting, setting, deleting,
iterating, listing keys, values, and items.
This cache is insensitive to the size of the objects it stores, and hence
is a better ``object_cache`` than an ``array_cache``.
Expand Down Expand Up @@ -78,6 +78,48 @@ def current(self):
"""
return self._current

def keys(self):
"""
Returns a copy of the keys currently in the cache, in least-recently
used order.
The list ascends from least-recently used to most-recently used: index
``0`` is the least-recently used and index ``-1`` is the most-recently
used.
(Calling this method does not change the order.)
"""
with self._lock:
return list(self._order)

def values(self):
"""
Returns a copy of the values currently in the cache, in least-recently
used order.
The list ascends from least-recently used to most-recently used: index
``0`` is the least-recently used and index ``-1`` is the most-recently
used.
(Calling this method does not change the order.)
"""
with self._lock:
return [self._data[where] for where in self._order]

def items(self):
"""
Returns a copy of the items currently in the cache, in least-recently
used order.
The list ascends from least-recently used to most-recently used: index
``0`` is the least-recently used and index ``-1`` is the most-recently
used.
(Calling this method does not change the order.)
"""
with self._lock:
return [(where, self._data[where]) for where in self._order]

def __getitem__(self, where):
with self._lock:
out = self._data[where]
Expand Down Expand Up @@ -106,11 +148,14 @@ def __delitem__(self, where):
self._order.remove(where)

def __iter__(self):
for x in self._order:
with self._lock:
order = list(self._order)
for x in order:
yield x

def __len__(self):
return len(self._order)
with self._lock:
return len(self._order)


class LRUArrayCache(LRUCache):
Expand All @@ -129,8 +174,8 @@ class LRUArrayCache(LRUCache):
Get and set (or explicitly remove) items with ``MutableMapping`` syntax:
square bracket subscripting.
LRUArrayCache is thread-safe for getting, setting, and deleting, but not
for iterating.
LRUArrayCache is thread-safe for all options: getting, setting, deleting,
iterating, listing keys, values, and items.
This cache is sensitive to the size of the objects it stores, but only if
those objects have meaningful ``nbytes``. It is therefore a better
Expand Down
Loading

0 comments on commit fc5b4ae

Please sign in to comment.