Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Register backends on the fly #538

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
"ncols",
"ndarray",
"NOQA",
"notreallyatoken",
"nqueries",
"ntuples",
"numpy",
Expand Down Expand Up @@ -101,7 +102,6 @@
],
"python.analysis.typeCheckingMode": "basic",
"python.testing.pytestArgs": [
"--cov=servicex",
"tests"
],
"python.testing.unittestEnabled": false,
Expand Down
36 changes: 34 additions & 2 deletions servicex/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@
import os
import tempfile
from pathlib import Path, PurePath
from typing import List, Optional, Dict
from typing import Callable, List, Optional, Dict

from pydantic import BaseModel, Field, AliasChoices, model_validator
# TODO: allow including this, but current import loop
# from servicex.models import TransformResult
from servicex.protocols import MinioAdapterProtocol

import yaml

Expand All @@ -39,6 +42,13 @@ class Endpoint(BaseModel):
endpoint: str
name: str
token: Optional[str] = ""
# TODO: don't know how to use ServiceXAdapterProtocol here as pydantic can't handle it
adapter: Optional[object] = None
# TODO: TransformResult causes an import loop, so call it object for now.
minio: Optional[Callable[[object], MinioAdapterProtocol]] = None


g_registered_endpoints: List[Endpoint] = []


class Configuration(BaseModel):
Expand Down Expand Up @@ -101,7 +111,9 @@ def read(cls, config_path: Optional[str] = None):
yaml_config = cls._add_from_path(walk_up_tree=True)

if yaml_config:
return Configuration.model_validate(yaml_config)
r = Configuration.model_validate(yaml_config)
r.api_endpoints += g_registered_endpoints
return r
else:
path_extra = f"in {config_path}" if config_path else ""
raise NameError(
Expand Down Expand Up @@ -144,3 +156,23 @@ def _add_from_path(cls, path: Optional[Path] = None, walk_up_tree: bool = False)
dir = dir.parent

return config

@classmethod
def register_endpoint(cls, ep: Endpoint):
'''Store this endpoint registration

Args:
ep: Endpoint object to register
'''
# TODO: This requires exposing Endpoint
# There is no check in this setup that the adaptor is a valid ServiceXAdapterProtocol
# because I couldn't figure out how to make pydantic handle a protocol object.
global g_registered_endpoints
g_registered_endpoints.append(ep)

@classmethod
def clear_registered_endpoints(cls):
'''Clear the list of registered endpoints.
'''
global g_registered_endpoints
g_registered_endpoints = []
52 changes: 52 additions & 0 deletions servicex/protocols.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from pathlib import Path
from typing import List, Protocol

from servicex.models import CachedDataset, TransformRequest, TransformStatus, ResultFile


class ServiceXAdapterProtocol(Protocol):
async def get_transforms(self) -> List[TransformStatus]:
...

def get_code_generators(self):
...

async def get_datasets(
self, did_finder=None, show_deleted=False
) -> List[CachedDataset]:
...

async def get_dataset(self, dataset_id=None) -> CachedDataset:
...

async def delete_dataset(self, dataset_id=None) -> bool:
...

async def delete_transform(self, transform_id=None):
...

async def submit_transform(self, transform_request: TransformRequest) -> str:
...

async def get_transform_status(self, request_id: str) -> TransformStatus:
...


class MinioAdapterProtocol(Protocol):
async def list_bucket(self) -> List[ResultFile]:
...

async def download_file(
self, object_name: str, local_dir: str, shorten_filename: bool = False) -> Path:
...

async def get_signed_url(self, object_name: str) -> str:
...

@classmethod
def for_transform(cls, transform: TransformStatus) -> 'MinioAdapterProtocol':
...

@classmethod
def hash_path(cls, file_name: str) -> str:
...
9 changes: 7 additions & 2 deletions servicex/query_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from abc import ABC
from asyncio import Task, CancelledError
import logging
from typing import List, Optional, Union
from typing import Callable, List, Optional, Union
from servicex.expandable_progress import ExpandableProgress
from rich.logging import RichHandler

Expand All @@ -51,6 +51,7 @@
)
from servicex.query_cache import QueryCache
from servicex.servicex_adapter import ServiceXAdapter
from servicex.protocols import MinioAdapterProtocol

from make_it_sync import make_sync

Expand All @@ -66,6 +67,7 @@ class ServiceXException(Exception):


class Query:

def __init__(
self,
dataset_identifier: DID,
Expand All @@ -74,6 +76,7 @@ def __init__(
sx_adapter: ServiceXAdapter,
config: Configuration,
query_cache: Optional[QueryCache],
minio_generator: Callable[[TransformRequest], MinioAdapterProtocol],
servicex_polling_interval: int = 5,
minio_polling_interval: int = 5,
result_format: ResultFormat = ResultFormat.parquet,
Expand Down Expand Up @@ -116,6 +119,8 @@ def __init__(
self.files_completed = None
self._return_qastle = True

self.minio_generator = minio_generator

self.request_id = None
self.ignore_cache = ignore_cache
self.fail_if_incomplete = fail_if_incomplete
Expand Down Expand Up @@ -483,7 +488,7 @@ async def retrieve_current_transform_status(self):
# status. This includes the minio host and credentials. We use the
# transform id as the bucket.
if not self.minio:
self.minio = MinioAdapter.for_transform(self.current_status)
self.minio = self.minio_generator(self.current_status)

async def download_files(
self,
Expand Down
19 changes: 14 additions & 5 deletions servicex/servicex_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
)
from servicex.types import DID
from servicex.dataset_group import DatasetGroup
from servicex.minio_adapter import MinioAdapter

from make_it_sync import make_sync
from servicex.databinder_models import ServiceXSpec, General, Sample
Expand Down Expand Up @@ -236,6 +237,7 @@ def __init__(self, backend=None, url=None, config_path=None):
will search in local directory and up in enclosing directories
"""
self.config = Configuration.read(config_path)
# TODO: Remove this as an instance var (no reason to carry it around?).
self.endpoints = self.config.endpoint_dict()

if not url and not backend:
Expand All @@ -250,13 +252,18 @@ def __init__(self, backend=None, url=None, config_path=None):

if url:
self.servicex = ServiceXAdapter(url)
self.minio_generator = MinioAdapter.for_transform
elif backend:
if backend not in self.endpoints:
raise ValueError(f"Backend {backend} not defined in .servicex file")
self.servicex = ServiceXAdapter(
self.endpoints[backend].endpoint,
refresh_token=self.endpoints[backend].token,
ep = self.endpoints[backend]
self.servicex = (
ep.adapter if ep.adapter is not None else ServiceXAdapter(
self.endpoints[backend].endpoint,
refresh_token=self.endpoints[backend].token,
)
)
self.minio_generator = MinioAdapter.for_transform if ep.minio is None else ep.minio

self.query_cache = QueryCache(self.config)
self.code_generators = set(self.get_code_generators(backend).keys())
Expand Down Expand Up @@ -364,19 +371,21 @@ def generic_query(
if real_codegen not in self.code_generators:
raise NameError(
f"{codegen} code generator not supported by serviceX "
f"deployment at {self.servicex.url}"
f"deployment at {self.servicex.url}. Supported codegens are "
f"[{', '.join(self.code_generators)}]"
)

qobj = Query(
dataset_identifier=dataset_identifier,
sx_adapter=self.servicex,
title=title,
minio_generator=self.minio_generator,
codegen=real_codegen,
config=self.config,
query_cache=self.query_cache,
result_format=result_format,
ignore_cache=ignore_cache,
query_string_generator=query,
fail_if_incomplete=fail_if_incomplete
fail_if_incomplete=fail_if_incomplete,
)
return qobj
10 changes: 9 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
TransformStatus,
TransformedResults,
)
from servicex.minio_adapter import MinioAdapter

from servicex.dataset_identifier import FileListDataset
from servicex.minio_adapter import MinioAdapter
Expand Down Expand Up @@ -72,7 +73,8 @@ def python_dataset(dummy_parquet_file):
result_format=ResultFormat.parquet,
sx_adapter=None, # type: ignore
config=None, # type: ignore
query_cache=None # type: ignore
query_cache=None, # type: ignore
minio_generator=MinioAdapter.for_transform,
) # type: ignore

def foo():
Expand Down Expand Up @@ -218,3 +220,9 @@ def codegen_list():
'python': 'http://servicex-code-gen-python:8000',
'uproot': 'http://servicex-code-gen-uproot:8000',
'uproot-raw': 'http://servicex-code-gen-uproot-raw:8000'}


@fixture(autouse=True)
def clear_registered_endpoints():
from servicex.configuration import Configuration
Configuration.clear_registered_endpoints()
59 changes: 58 additions & 1 deletion tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from unittest.mock import patch
import pytest

from servicex.configuration import Configuration
from servicex.configuration import Configuration, Endpoint


@patch('servicex.configuration.tempfile.gettempdir', return_value="./mytemp")
Expand All @@ -52,6 +52,21 @@ def test_config_read(tempdir):
Configuration.read(config_path="invalid.yaml")


@patch('servicex.configuration.tempfile.gettempdir', return_value="./mytemp")
def test_config_endpoint_dict(tempdir):
os.environ['UserName'] = "p_higgs"
c = Configuration.read(config_path="tests/example_config.yaml")
endpoints = c.endpoint_dict()
assert len(endpoints) == 3
assert "servicex-uc-af" in endpoints

# Make sure we get back what we expect
ep = endpoints["servicex-uc-af"]
assert ep.endpoint == "https://servicex.af.uchicago.edu"
assert ep.name == "servicex-uc-af"
assert ep.token == "notreallyatoken"


@patch('servicex.configuration.tempfile.gettempdir', return_value="./mytemp")
def test_default_cache_path(tempdir):

Expand All @@ -66,3 +81,45 @@ def test_default_cache_path(tempdir):
c = Configuration.read(config_path="tests/example_config_no_cache_path.yaml")
assert c.cache_path == "mytemp/servicex_p_higgs"
del os.environ['USER']


@patch("servicex.configuration.tempfile.gettempdir", return_value="./mytemp")
def test_config_register(tempdir):
Configuration.register_endpoint(
Endpoint(
endpoint="https://servicex.cern.ch",
name="servicex-cern",
token="notreallyatoken2",
)
)

os.environ["UserName"] = "p_higgs"
c = Configuration.read(config_path="tests/example_config.yaml")
endpoints = c.endpoint_dict()
assert len(endpoints) == 4
assert "servicex-cern" in endpoints

# Make sure we get back what we expect
ep = endpoints["servicex-cern"]
assert ep.endpoint == "https://servicex.cern.ch"


@patch("servicex.configuration.tempfile.gettempdir", return_value="./mytemp")
def test_config_register_adaptor(tempdir):
'Make sure we can do this with an adaptor'
class MyAdaptor:
pass

Configuration.register_endpoint(
Endpoint(
name="my-adaptor",
adapter=MyAdaptor,
endpoint="",
)
)

os.environ["UserName"] = "p_higgs"
c = Configuration.read(config_path="tests/example_config.yaml")
endpoints = c.endpoint_dict()
assert len(endpoints) == 4
assert "my-adaptor" in endpoints
Loading
Loading