Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adding a very basic FSSpecSource #967

Merged
merged 27 commits into from
Oct 4, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
03af365
Initial fsspec source implementation
nsmith- Aug 25, 2022
baec282
Some basic test
nsmith- Aug 25, 2022
0378ac8
Actually add the test
nsmith- Aug 25, 2022
e86df23
Skip test if XRootD module not available
nsmith- Aug 26, 2022
b989153
Keep a file handle active to speed up individual chunk requests
nsmith- Aug 26, 2022
170c28e
Merge branch 'main' into fsspecsrc
nsmith- Aug 29, 2022
42ff360
sync with https://github.com/scikit-hep/uproot5/pull/692; fix conflicts
lobis Oct 3, 2023
5b8a6b9
add `s3_handler`, `use_threads` to exclusion list for fsspec options
lobis Oct 3, 2023
ffff895
Merge branch 'main' into fsspec
lobis Oct 3, 2023
07804ef
remove aiohttp
lobis Oct 3, 2023
39f87e2
add aiohttp
lobis Oct 3, 2023
84de1e9
add comment, refactor exclusion
lobis Oct 3, 2023
d9a34be
add fsspec to dependencies
lobis Oct 3, 2023
5bbdcb0
move fsspec dependencies to dev instead of test
lobis Oct 3, 2023
49b5e61
load default options
lobis Oct 4, 2023
0c99fa3
fsspec open mode set to "r" from "rb" to avoid error on github spec
lobis Oct 4, 2023
eb3bb52
add s3fs to test deps
lobis Oct 4, 2023
fce968b
rename fsspec test (replace hyphen with underscore)
lobis Oct 4, 2023
cfb4469
add tests for other protocols
lobis Oct 4, 2023
24b1106
remove read-only from fsspec open
lobis Oct 4, 2023
884c2f2
add fsspec dependencies in test instead of dev
lobis Oct 4, 2023
78b9e2f
add skip to github test
lobis Oct 4, 2023
3cc4996
remove top level skip
lobis Oct 4, 2023
339909f
attempt to fix ci
lobis Oct 4, 2023
fa4a295
add back fsspec
lobis Oct 4, 2023
c42767b
remove commented s3fs
lobis Oct 4, 2023
87b4239
Address https://github.com/scikit-hep/uproot5/pull/967#discussion_r13…
lobis Oct 4, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ dev = [
"awkward-pandas;python_version >= \"3.8\"",
]
test = [
"aiohttp",
"fsspec-xrootd",
lobis marked this conversation as resolved.
Show resolved Hide resolved
"lz4",
"minio",
"pytest>=6",
Expand Down
139 changes: 139 additions & 0 deletions src/uproot/source/fsspec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
import uproot.source.chunk


class FSSpecSource(uproot.source.chunk.Source):
    """
    Args:
        file_path (str): A URL for the file to open.
        **kwargs (dict): any extra arguments to be forwarded to the particular
            FileSystem instance constructor. This might include S3 access keys,
            or HTTP headers, etc.

    A :doc:`uproot.source.chunk.Source` that uses FSSpec's cat_ranges feature
    to get many chunks in one request.
    """

    # Uproot-specific options that are stripped from ``**kwargs`` before the
    # remainder is handed to fsspec (this filtering should eventually be done
    # earlier, before the source is constructed).
    _UPROOT_ONLY_OPTIONS = frozenset(
        {
            "file_handler",
            "xrootd_handler",
            "http_handler",
            "s3_handler",
            "object_handler",
            "max_num_elements",
            "num_workers",
            "num_fallback_workers",
            "use_threads",
            "begin_chunk_size",
            "minimal_ttree_metadata",
        }
    )

    def __init__(self, file_path, **kwargs):
        # Imported lazily so that fsspec is only required when this source
        # is actually instantiated.
        import fsspec.core

        # Remove uproot-specific options (should be done earlier)
        # TODO: is timeout always valid?
        opts = {
            key: value
            for key, value in kwargs.items()
            if key not in self._UPROOT_ONLY_OPTIONS
        }
        self._fs, self._file_path = fsspec.core.url_to_fs(file_path, **opts)
        self._file = self._fs.open(self._file_path, "rb")
        self._fh = None
        self._num_requests = 0
        self._num_requested_chunks = 0
        self._num_requested_bytes = 0
        # Enter immediately so a file handle stays active; single-chunk
        # requests then reuse it instead of going through ``cat_file``.
        self.__enter__()

    def __repr__(self):
        # Only the last 10 characters of long paths are shown.
        path = self._file_path
        if len(path) > 10:
            path = "..." + path[-10:]
        return f"<{type(self).__name__} {path!r} at 0x{id(self):012x}>"

    def __enter__(self):
        self._fh = self._file.__enter__()
        return self

    def __exit__(self, exception_type, exception_value, traceback):
        # Drop the handle first so later ``chunk`` calls fall back to
        # ``cat_file`` rather than using an exited file object.
        self._fh = None
        self._file.__exit__(exception_type, exception_value, traceback)

    def chunk(self, start, stop):
        """
        Args:
            start (int): Seek position of the first byte to include.
            stop (int): Seek position of the first byte to exclude
                (one greater than the last byte to include).

        Request a byte range of data from the file as a
        :doc:`uproot.source.chunk.Chunk`.

        The data are read through the active file handle when there is one,
        and through the filesystem's ``cat_file`` otherwise.
        """
        self._num_requests += 1
        self._num_requested_chunks += 1
        self._num_requested_bytes += stop - start
        # Explicit None check: the presence of a handle, not the truthiness
        # of the file object, decides which read path is used.
        if self._fh is not None:
            self._fh.seek(start)
            data = self._fh.read(stop - start)
        else:
            data = self._fs.cat_file(self._file_path, start, stop)
        future = uproot.source.futures.TrivialFuture(data)
        return uproot.source.chunk.Chunk(self, start, stop, future)

    def chunks(self, ranges, notifications):
        """
        Args:
            ranges (list of (int, int) 2-tuples): Intervals to fetch
                as (start, stop) pairs in a single request, if possible.
            notifications (``queue.Queue``): Indicator of completed
                chunks. After each gets filled, it is ``put`` on the
                queue; a listener should ``get`` from this queue
                ``len(ranges)`` times.

        Request a set of byte ranges from the file.

        This method has two outputs:

        * The method returns a list of :doc:`uproot.source.chunk.Chunk`
          objects, one per requested range and in the same order.
        * The method also passes each of the same
          :doc:`uproot.source.chunk.Chunk` objects to a notifier for the
          ``notifications`` queue.

        In this implementation all ranges are fetched with one blocking
        ``cat_ranges`` call, so the data for every chunk have already been
        retrieved (and every notifier has been invoked) by the time this
        method returns.
        """
        self._num_requests += 1
        self._num_requested_chunks += len(ranges)
        self._num_requested_bytes += sum(stop - start for start, stop in ranges)
        data = self._fs.cat_ranges(
            [self._file_path] * len(ranges),
            [start for start, _ in ranges],
            [stop for _, stop in ranges],
        )
        chunks = []
        for item, (start, stop) in zip(data, ranges):
            # NOTE: it's a blocking call in this first implementation, but
            # will be made asynchronous later.
            future = uproot.source.futures.TrivialFuture(item)
            chunk = uproot.source.chunk.Chunk(self, start, stop, future)
            uproot.source.chunk.notifier(chunk, notifications)()
            chunks.append(chunk)
        return chunks

    @property
    def num_bytes(self):
        """
        The number of bytes in the file, as reported by the filesystem.
        """
        return self._fs.size(self._file_path)

    @property
    def closed(self):
        """
        True if the associated file/connection/thread pool is closed; False
        otherwise.

        This implementation does not track closed state and always
        returns False.
        """
        return False
29 changes: 29 additions & 0 deletions tests/test_0692-fsspec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# BSD 3-Clause License; see https://github.com/scikit-hep/uproot4/blob/main/LICENSE

import pytest

import uproot
import uproot.source.fsspec


@pytest.mark.network
def test_open_fsspec_http():
    """Open a ROOT file over HTTPS via FSSpecSource and read one branch."""
    url = "https://github.com/CoffeaTeam/coffea/raw/master/tests/samples/nano_dy.root"
    handler = uproot.source.fsspec.FSSpecSource
    with uproot.open(url, http_handler=handler) as root_file:
        met_pt = root_file["Events/MET_pt"].array(library="np")
        assert len(met_pt) == 40


@pytest.mark.network
@pytest.mark.xrootd
def test_open_fsspec_xrootd():
    """Open a ROOT file over XRootD via FSSpecSource and read 20 entries."""
    # Skip (rather than fail) when the XRootD Python bindings are missing.
    pytest.importorskip("XRootD")
    url = "root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012B_DoubleMuParked.root"
    handler = uproot.source.fsspec.FSSpecSource
    with uproot.open(url, xrootd_handler=handler) as root_file:
        run_numbers = root_file["Events/run"].array(library="np", entry_stop=20)
        assert len(run_numbers) == 20
        assert (run_numbers == 194778).all()