Skip to content

Commit

Permalink
Added missing files
Browse files Browse the repository at this point in the history
  • Loading branch information
shiroyuki committed Mar 6, 2024
1 parent cb98ccd commit 6c27b0f
Show file tree
Hide file tree
Showing 7 changed files with 301 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
helpers
Dockerfile-*
!Dockerfile-template
sandbox
Expand Down
149 changes: 149 additions & 0 deletions dnastack/alpha/app/publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
from copy import deepcopy
from typing import List, Optional, Any, Dict, Iterable, Callable

from dnastack.alpha.app.publisher_helper.collection_wrapper import BlobApiMixin, CollectionApiMixin
from dnastack.alpha.app.publisher_helper.data_connect import SearchOperation
from dnastack.alpha.app.publisher_helper.exceptions import NoCollectionError, TooManyCollectionsError
from dnastack.alpha.app.publisher_helper.models import ItemType, TableInfo, BaseItemInfo, \
BlobInfo
from dnastack.client.collections.client import CollectionServiceClient, UnknownCollectionError
from dnastack.client.collections.model import Collection as CollectionModel
from dnastack.client.data_connect import DataConnectClient
from dnastack.client.drs import DrsClient, Blob
from dnastack.client.factory import EndpointRepository
from dnastack.common.logger import get_logger_for
from dnastack.common.simple_stream import SimpleStream
from dnastack.context.helper import use


class Collection(BlobApiMixin):
""" High-level Collection API Client """

def __init__(self,
factory: EndpointRepository,
cs: CollectionServiceClient,
collection: CollectionModel,
dc: DataConnectClient,
no_auth: bool):
self._logger = get_logger_for(self)
self._factory = factory
self._cs = cs
self._collection = collection
self._no_auth = no_auth
self._drs: DrsClient = self._factory.get_one_of(client_class=DrsClient)
self._dc = dc

def get_record(self) -> CollectionModel:
return self._collection

def query(self, query: str):
return SearchOperation(self._dc, self._no_auth, query)

def list_items(self,
*,
limit: Optional[int],
kind: Optional[ItemType] = None,
kinds: Optional[Iterable[ItemType]] = None,
on_has_more_result: Optional[Callable[[int], None]] = None) -> List[BaseItemInfo]:

assert limit >= 0, 'The limit has to be ZERO (no limit) or at least 1 (to impose the limit).'

slug_name = self._collection.slugName.replace("-", "_")

if kinds is None:
kinds = [kind]

limit_fragment = ""
if limit is not None and limit > 1:
limit_fragment = f" LIMIT {limit + 1}"

items = []
for kind in kinds:
if kind == ItemType.TABLE:
query = f"SELECT * FROM collections.{slug_name}._tables{limit_fragment}"
elif kind == ItemType.BLOB:
query = f"SELECT * FROM collections.{slug_name}._files{limit_fragment}"

items.extend([
self.__simplify_item(i)
for i in self._dc.query(query, no_auth=self._no_auth)
])

row_count = len(items)

if 0 < limit < row_count and on_has_more_result and callable(on_has_more_result):
on_has_more_result(row_count)

return items

def find_blob_by_name(self, objectname: str) -> Blob:
return self._find_blob_by_name(objectname=objectname, column_name='metadata_url')

@staticmethod
def __simplify_item(row: Dict[str, Any]) -> BaseItemInfo:
if row['type'] == ItemType.BLOB.value:
return BlobInfo(**row)
elif row['type'] == ItemType.TABLE.value:
row_copy = deepcopy(row)
row_copy['name'] = (
row.get('qualified_table_name')
or row.get('preferred_name')
or row.get('display_name')
or row['name']
)
return TableInfo(**row_copy)
else:
return BaseItemInfo(**row)


class Publisher(CollectionApiMixin):
""" High-level Publisher API Client
.. code-block:: python
from dnastack.alpha.app.explorer import Publisher
# Data Connect
query = Publisher(<url>).query('SELECT * FROM collections.public_datasets.metadata LIMIT 5')
df = query.to_data_frame()
rows = query.to_list()
# DRS (not yet manually tested with collections with blobs)
blob: Optional[Blob] = collection.blob(id='123-456') or collection.blob(name='foo-bar')
blobs: Dict[str, Optional[Blob]] = collection.blobs(ids=['123-456']) or collection.blobs(names=['foo-bar'])
"""

def __init__(self, context_name_or_url: str, *, no_auth: bool = False):
self._logger = get_logger_for(self)
self._context_name_or_url = context_name_or_url
self._factory = use(self._context_name_or_url, no_auth=no_auth)
self._no_auth = no_auth
self._cs: CollectionServiceClient = self._factory.get_one_of(client_class=CollectionServiceClient)
self._dc = self.data_connect()

def collection(self, id_or_slug_name: Optional[str] = None, *, name: Optional[str] = None) -> Collection:
return Collection(self._factory,
self._cs,
self.get_collection_info(id_or_slug_name=id_or_slug_name, name=name),
self._dc,
no_auth=self._no_auth)

def query(self, query: str):
return SearchOperation(self._dc, self._no_auth, query)

def data_connect(self) -> DataConnectClient:
default_no_auth_properties = {'authentication': None, 'fallback_authentications': None}

# Look up for any similar registered service endpoint.
matched_endpoints = self._factory.all(client_class=DataConnectClient)
if matched_endpoints:
target_endpoint = matched_endpoints[0]
else:
raise RuntimeError('Unable to find a usable Data Connect endpoint')

return DataConnectClient.make(
target_endpoint.copy(update=default_no_auth_properties)
if self._no_auth
else target_endpoint
)
Empty file.
85 changes: 85 additions & 0 deletions dnastack/alpha/app/publisher_helper/collection_wrapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
from typing import Optional, Dict, List

from dnastack.alpha.app.publisher_helper.exceptions import NoCollectionError, TooManyCollectionsError
from dnastack.client.collections.client import UnknownCollectionError
from dnastack.client.collections.model import Collection as CollectionModel
from dnastack.client.drs import Blob
from dnastack.common.simple_stream import SimpleStream


class CollectionApiMixin:
def list_collections(self) -> List[CollectionModel]:
return self._cs.list_collections(no_auth=self._no_auth)

def get_collection_info(self, id_or_slug_name: Optional[str] = None, *, name: Optional[str] = None) -> CollectionModel:
# NOTE: "ID" and "slug name" are unique identifier whereas "name" is not.
assert id_or_slug_name or name, 'One of the arguments MUST be defined.'

if id_or_slug_name is not None:
try:
collection = self._cs.get(id_or_slug_name, no_auth=self._no_auth)
except UnknownCollectionError as e:
raise NoCollectionError(id_or_slug_name) from e
return collection
elif name is not None:
assert name.strip(), 'The name cannot be empty.'
target_collections = SimpleStream(self._cs.list_collections(no_auth=self._no_auth)) \
.filter(lambda endpoint: name == endpoint.name) \
.to_list()
if len(target_collections) == 1:
return target_collections[0]
elif len(target_collections) == 0:
raise NoCollectionError(name)
else:
raise TooManyCollectionsError(target_collections)
else:
raise NotImplementedError()


class BlobApiMixin:
def blob(self, *, id: Optional[str] = None, name: Optional[str] = None) -> Optional[Blob]:
blobs = self.blobs(ids=[id] if id else [], names=[name] if name else [])
if blobs:
return blobs.get(id if id is not None else name)
else:
return None

def blobs(self, *, ids: Optional[List[str]] = None, names: Optional[List[str]] = None) -> Dict[str, Optional[Blob]]:
assert ids or names, 'One of the arguments MUST be defined.'

if ids:
conditions: str = ' OR '.join([
f"(id = '{id}')"
for id in ids
])
elif names:
conditions: str = ' OR '.join([
f"(name = '{name}')"
for name in names
])
else:
raise NotImplementedError()

collection: CollectionModel = self._collection

id_to_name_map: Dict[str, str] = SimpleStream(
self.query(f"SELECT id, name FROM ({collection.itemsQuery}) WHERE {conditions}").load_data()
).to_map(lambda row: row['id'], lambda row: row['name'])

return {
id if ids is not None else id_to_name_map[id]: self._drs.get_blob(id)
for id in id_to_name_map.keys()
}

def _find_blob_by_name(self,
objectname: str,
column_name: str) -> Blob:
collection: CollectionModel = self._collection

db_slug = collection.slugName.replace("-", "_")

# language=sql
q = f"SELECT {column_name} FROM collections.{db_slug}._files WHERE name='{objectname}' LIMIT 1"

results = self.query(q)
return self._drs.get_blob(next(results.load_data())[column_name], no_auth=self._no_auth)
26 changes: 26 additions & 0 deletions dnastack/alpha/app/publisher_helper/data_connect.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from typing import Iterator, Dict, Any, List

from dnastack import DataConnectClient
from dnastack.common.exceptions import DependencyError


class SearchOperation:
def __init__(self, dc: DataConnectClient, no_auth: bool, query: str):
self._dc = dc
self._no_auth = no_auth
self.__query = query

def load_data(self) -> Iterator[Dict[str, Any]]:
return self._dc.query(self.__query, no_auth=self._no_auth)

def to_list(self) -> List[Dict[str, Any]]:
return [row for row in self.load_data()]

def to_data_frame(self):
try:
# We delay the import as late as possible so that the optional dependency (pandas)
# does not block the other functionalities of the library.
import pandas as pd
return pd.DataFrame(self.load_data())
except ImportError:
raise DependencyError('pandas')
6 changes: 6 additions & 0 deletions dnastack/alpha/app/publisher_helper/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
class NoCollectionError(RuntimeError):
pass


class TooManyCollectionsError(RuntimeError):
pass
34 changes: 34 additions & 0 deletions dnastack/alpha/app/publisher_helper/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from abc import ABC
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List

from pydantic import Field, BaseModel


class BaseItemInfo(BaseModel, ABC):
""" Base Simplified Library Item
Based on https://github.com/DNAstack/indexing-service/blob/main/src/main/java/com/dnastack/indexingservice/library/LibraryItem.java.
"""
id: str
name: str
type: str
size: int
size_unit: str
item_updated_time: datetime


class TableInfo(BaseItemInfo):
""" Simplified Library Item """
json_schema: Dict[str, Any]


class BlobInfo(BaseItemInfo):
""" Simplified Library Item """
checksums: List[Dict[str, str]] = Field(default_factory=list)


class ItemType(Enum):
BLOB = 'blob'
TABLE = 'table'

0 comments on commit 6c27b0f

Please sign in to comment.