Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Switch file reading to use concrete endpoints #48

Merged
merged 16 commits into from
Jun 4, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 90 additions & 2 deletions src/fsspec_xrootd/xrootd.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from __future__ import annotations

Check warning on line 1 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Missing module docstring

import asyncio
import io
import logging
import os.path
import time
import warnings
Expand All @@ -14,18 +15,18 @@
from fsspec.asyn import AsyncFileSystem, _run_coros_in_chunks, sync, sync_wrapper
from fsspec.exceptions import FSTimeoutError
from fsspec.spec import AbstractBufferedFile
from XRootD import client

Check failure on line 18 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Unable to import 'XRootD'
from XRootD.client.flags import (

Check failure on line 19 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Unable to import 'XRootD.client.flags'
DirListFlags,
MkDirFlags,
OpenFlags,
QueryCode,
StatInfoFlags,
)
from XRootD.client.responses import HostList, XRootDStatus

Check failure on line 26 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Unable to import 'XRootD.client.responses'


class ErrorCodes(IntEnum):

Check warning on line 29 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Missing class docstring
INVALID_PATH = 400


Expand All @@ -50,13 +51,13 @@
asyncio.get_running_loop().create_future()
)

def callback(status: XRootDStatus, content: T, servers: HostList) -> None:

Check warning on line 54 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Unused argument 'servers'
if future.cancelled():
return
loop = future.get_loop()
try:
loop.call_soon_threadsafe(future.set_result, (status, content))
except Exception as exc:

Check warning on line 60 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Catching too general exception Exception
loop.call_soon_threadsafe(future.set_exception, exc)

async def wrapped(*args: Any, **kwargs: Any) -> tuple[XRootDStatus, T]:
Expand Down Expand Up @@ -141,7 +142,7 @@
handle: client.File


class ReadonlyFileHandleCache:

Check warning on line 145 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Missing class docstring
def __init__(self, loop: Any, max_items: int | None, ttl: int):
self.loop = loop
self._max_items = max_items
Expand Down Expand Up @@ -169,7 +170,7 @@
item.handle.close(callback=lambda *args: None)
cache.clear()

def close_all(self) -> None:

Check warning on line 173 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Missing function or method docstring
self._close_all(self.loop, self._cache)

async def _close(self, url: str, timeout: int) -> None:
Expand All @@ -182,7 +183,7 @@
close = sync_wrapper(_close)

async def _start_pruner(self) -> None:
self._prune_task = asyncio.create_task(self._pruner())

Check warning on line 186 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Attribute '_prune_task' defined outside __init__

async def _pruner(self) -> None:
while True:
Expand Down Expand Up @@ -219,7 +220,7 @@
return handle


class XRootDFileSystem(AsyncFileSystem): # type: ignore[misc]

Check warning on line 223 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Missing class docstring

Check warning on line 223 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Method '_cp_file' is abstract in class 'AsyncFileSystem' but is not overridden in child class 'XRootDFileSystem'

Check warning on line 223 in src/fsspec_xrootd/xrootd.py

View workflow job for this annotation

GitHub Actions / Format

Method '_pipe_file' is abstract in class 'AsyncFileSystem' but is not overridden in child class 'XRootDFileSystem'
protocol = "root"
root_marker = "/"
default_timeout = 60
Expand All @@ -234,6 +235,8 @@
hostid: str,
asynchronous: bool = False,
loop: asyncio.AbstractEventLoop | None = None,
locate_all_sources: bool = True,
valid_sources: list[str] | None = None,
**storage_options: Any,
) -> None:
"""
Expand All @@ -248,10 +251,23 @@
If true, synchronous methods will not be available in this instance
loop:
Bring your own loop (for sync methods)
locate_all_sources = True: bool
Only active for reading (does nothing for writing). Defaults to True.
Finds all locations at which the file is hosted, and chooses from those. Does
not let the redirector pick the first to respond.
valid_sources = None: list
If given and locate_all_sources is True, fsspec will only reject any file host
not in this list. Entries should be of the form ie: `cmsxrootd-site1.fnal.gov`
(no port number)
"""
super().__init__(self, asynchronous=asynchronous, loop=loop, **storage_options)
self.timeout = storage_options.get("timeout", XRootDFileSystem.default_timeout)
self.hostid = hostid
self.locate_all_sources = locate_all_sources
if valid_sources:
self.valid_sources = valid_sources
else:
self.valid_sources = []
self._myclient = client.FileSystem("root://" + hostid)
if not self._myclient.url.is_valid():
raise ValueError(f"Invalid hostid: {hostid!r}")
Expand Down Expand Up @@ -756,14 +772,34 @@
if not isinstance(path, str):
raise ValueError(f"Path expected to be string, path: {path}")

self.fs = fs
# Ensure any read-only handle is closed
fs.invalidate_cache(path)

# Try opening with given pathname before trying to locate all sources (if requested)
self._myFile = client.File()
status, _ = self._myFile.open(
status, _n = self._myFile.open(
fs.unstrip_protocol(path),
self.mode,
timeout=self.timeout,
)
if not status.ok and "r" in mode and self.fs.locate_all_sources:
self._hosts = self._locate_sources(path)
# Try hosts until you find an openable file
for i_host in range(len(self._hosts)):
self._myFile = client.File()
status, _n = self._myFile.open(
fs.protocol + "://" + self._hosts[i_host] + "/" + path,
self.mode,
timeout=self.timeout,
)
if status.ok:
# Move hosts that tried and failed to self._dismissed_hosts
self._dismissed_hosts = self._hosts[:i_host]
self._hosts = self._hosts[i_host:]
break
# If above loop cannot find source OR locate_all_sources is off and we
# could not read file initially, end up here
if not status.ok:
raise OSError(f"File did not open properly: {status.message}")

Expand All @@ -773,7 +809,6 @@
self.metaOffset = _deets.size

self.path = path
self.fs = fs
self.mode = mode
self.blocksize = (
self.DEFAULT_BLOCK_SIZE if block_size in ["default", None] else block_size
Expand Down Expand Up @@ -815,6 +850,59 @@
self.location = None
self.offset = 0

def _locate_sources(self, logical_filename: str) -> list[str]:
"""Find hosts that have the desired file.

Gets a list of hosts from the XRootD server that was provided when the
XRootDFile object was instantiated. Note that this implies it will only find
more hosts of the given file if self.fs is a redirector. Implementation of a
solution from the Pepper project in this issue:

(https://github.com/CoffeaTeam/fsspec-xrootd/issues/36).

If valid_sources is a non-empty list in fs.storage_options, will only return domain names
that are also in valid_sources

Parameters
----------
logical_filename: The logical filename of the file. (ex: "//store/mc/other/stuff/file.root")

Returns
-------
List of domain names that host the file
"""
myclient = self.fs._myclient
# From Pepper:
# The flag PrefName (to get domain names instead of IP addresses) does
# not exist in the Python bindings. However, MAKEPATH has the same value
status, loc = myclient.locate(logical_filename, client.flags.OpenFlags.MAKEPATH)
if loc is None:
raise OSError("XRootD error: " + status.message)
hosts = []
for r in loc:
if len(r.address.split(":")) > 1:
# Strip off the port number if necessary
clean_address = "".join(r.address.split(":")[:-1])
else:
clean_address = r.address
if (clean_address in self.fs.valid_sources) or (
len(self.fs.valid_sources) == 0
):
hosts.append(clean_address)
logging.debug(f"Added host {clean_address} to _hosts")
else:
logging.debug(
f"Host {clean_address} not in valid_sources {self.fs.valid_sources}"
)
if len(hosts) == 0:
err_msg = f"XRootD error: No hosts for file {logical_filename} found using XRootD server {self.fs.storage_options['hostid']}"
if len(self.fs.valid_sources) > 0:
vld_src_msg = f" and valid sources {self.fs.valid_sources}"
raise OSError(err_msg + vld_src_msg)
else:
raise OSError(err_msg)
return hosts

def _fetch_range(self, start: int, end: int) -> Any:
status, data = self._myFile.read(
self.metaOffset + start, self.metaOffset + end - start, timeout=self.timeout
Expand Down
Loading