Skip to content

Commit

Permalink
Merge pull request #1 from kbase/db-specific-search
Browse files Browse the repository at this point in the history
Changes to support CDM data import
  • Loading branch information
jeff-cohere authored Aug 14, 2024
2 parents 6000f85 + ee92b7f commit db00daa
Show file tree
Hide file tree
Showing 6 changed files with 219 additions and 72 deletions.
48 changes: 48 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
name: tests

# This action is triggered:
# 1. when someone creates a pull request (to any branch)
# 2. when changes are merged into the main branch (via a pull request)
on:
push:
branches: [ main ]
pull_request:
branches: [ '*' ]

jobs:
test:
runs-on: ${{ matrix.os }}
container: ${{ matrix.container }}

# we support Linux and macOS
strategy:
matrix:
os: [ubuntu-latest, macos-latest]

# Steps for running tests and analysis.
steps:
- name: Checking out repository (${{ matrix.os }})
uses: actions/checkout@v4
with:
token: ${{ secrets.GITHUB_TOKEN }}
submodules: recursive

- name: Setting up Python 3.12 (${{ matrix.os }})
uses: actions/setup-python@v5
with:
python-version: "3.12"

- name: Installing dtspy dependencies (${{ matrix.os }})
run: python3 -m pip install -r requirements.txt

- name: Running tests (${{ matrix.os }})
run: coverage run -m unittest discover
env:
DTS_KBASE_DEV_TOKEN: ${{ secrets.DTS_KBASE_DEV_TOKEN }}

# add this when ready
#- if: ${{ matrix.os == 'ubuntu-latest' }}
# name: Uploading coverage report to codecov.io
# uses: codecov/[email protected]
# with:
# token: ${{ secrets.CODECOV_TOKEN }}
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
# dtspy

![Tests](https://github.com/kbase/dtspy/actions/workflows/tests.yml/badge.svg)

Python client for the Data Transfer Service
174 changes: 102 additions & 72 deletions dts/client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import base64
from frictionless.resources import JsonResource
import io
import requests
from requests.auth import AuthBase
import logging
Expand All @@ -21,12 +20,13 @@ class KBaseAuth(AuthBase):
def __init__(self, api_key):
self.api_key = api_key

def __call__(self, r):
token = base64.b64encode(bytes(self.api_key + '\n', 'utf-8'))
r.headers['Authorization'] = f'Bearer {token.decode('utf-8')}'
return r
def __call__(self, request):
b64_token = base64.b64encode(bytes(self.api_key + '\n', 'utf-8'))
token = b64_token.decode('utf-8')
request.headers['Authorization'] = f'Bearer {token}'
return request

class Client(object):
class Client:
"""`Client`: A client for performing file transfers with the Data Transfer System"""
def __init__(self,
api_key = None,
Expand All @@ -39,6 +39,8 @@ def __init__(self,
self.connect(server = server, port = port, api_key = api_key)
else:
self.uri = None
self.name = None
self.version = None

def connect(self,
api_key = None,
Expand All @@ -48,11 +50,11 @@ def connect(self,
* Connects the client to the given DTS `server` via the given `port` using the given
(unencoded) `api_key`."""
if type(api_key) != str:
if not isinstance(api_key, str):
raise TypeError('api_key must be an unencoded API key.')
if type(server) != str:
if not isinstance(server, str):
raise TypeError('server must be a URI for a DTS server.')
if port and type(port) != int:
if port and not isinstance(port, int):
raise TypeError('port must be an integer')
self.auth = KBaseAuth(api_key)
if port:
Expand Down Expand Up @@ -91,105 +93,133 @@ def databases(self):
except Exception as err:
logger.error(f'Other error occurred: {err}')
return None
else:
results = response.json()
return [Database(id = r['id'],
name = r['name'],
organization = r['organization'],
url = r['url']) for r in results]
results = response.json()
return [Database(id = r['id'],
name = r['name'],
organization = r['organization'],
url = r['url']) for r in results]

def search(self,
database = None,
query = None,
status = None,
offset = 0,
limit = None,
specific = None,
):
"""
`client.search(database = None,
query = None,
status = None,
offset = 0,
limit = None) -> `list` of `frictionless.DataResource` objects
limit = None,
specific = None) -> `list` of `frictionless.DataResource` objects
* Performs a synchronous search of the database with the given name using the
given query string.
Optional arguments:
* query: a search string that is directly interpreted by the database
* status: filters for files based on their status:
* `"staged"` means "search only for files that are already in the source database staging area"
* `"archived"` means "search only for files that are archived and not staged"
* `"unstaged"` means "search only for files that are not staged"
* offset: a 0-based index from which to start retrieving results (default: 0)
* limit: if given, the maximum number of results to retrieve
* specific: a dictionary mapping database-specific search parameters to their values
"""
params = {
'database': database,
'query': query,
}
if not self.uri:
raise RuntimeError('dts.Client: not connected.')
if type(database) != str:
if query:
if not isinstance(query, str):
# we also accept numeric values
if isinstance(query, int) or isinstance(query, float):
query = str(query)
else:
raise RuntimeError('search: query must be a string or a number.')
else:
raise RuntimeError('search: missing query.')
if not isinstance(database, str):
raise TypeError('search: database must be a string.')
if type(offset) != int or offset < 0:
raise TypeError('search: invalid offset: %s.'%offset)
if status:
if status not in ['staged', 'unstaged']:
raise TypeError(f'search: invalid status: {status}.')
params['status'] = status
if offset:
if not str(offset).isdigit():
raise TypeError('search: offset must be numeric')
if int(offset) < 0:
raise ValueError(f'search: offset must be non-negative')
params['offset'] = int(offset)
if limit:
if type(limit) != int:
raise TypeError('search: limit must be an int.')
elif limit < 1:
raise TypeError(f'search: invalid number of retrieved results: {N}')
if not str(limit).isdigit():
raise TypeError('search: limit must be numeric')
if int(limit) < 1:
raise ValueError(f'search: limit must be greater than 1')
params['limit'] = int(limit)
if specific:
if not isinstance(specific, dict):
raise TypeError('search: specific must be a dict.')
params['specific'] = specific
try:
params = {
'database': database,
'query': query,
'status': status,
'offset': offset,
'limit': limit,
}
response = requests.get(url=f'{self.uri}/files', params=params, auth=self.auth)
response = requests.post(url=f'{self.uri}/files',
json=params,
auth=self.auth)
response.raise_for_status()
except HTTPError as http_err:
logger.error(f'HTTP error occurred: {http_err}')
except (HTTPError, requests.exceptions.HTTPError) as err:
logger.error(f'HTTP error occurred: {err}')
return None
except Exception as err:
logger.error(f'Other error occurred: {err}')
return None
else:
return [JsonResource(r) for r in response.json()['resources']]
return [JsonResource(r) for r in response.json()['resources']]

def transfer(self,
file_ids = None,
source = None,
destination = None):
destination = None,
timeout = None):
"""
`client.transfer(file_ids = None,
source = None,
destination = None) -> UUID
destination = None,
timeout = None) -> UUID
* Submits a request to transfer files from a source to a destination database. the
files in the source database are identified by a list of string file_ids.
"""
if not self.uri:
raise RuntimeError('dts.Client: not connected.')
if type(source) != str:
if not isinstance(source, str):
raise TypeError('transfer: source database name must be a string.')
if type(destination) != str:
if not isinstance(destination, str):
raise TypeError('transfer: destination database name must be a string.')
if type(file_ids) != list:
raise TypeError('batch: sequences must be a list of string file IDs.')
if not isinstance(file_ids, list):
raise TypeError('transfer: file_ids must be a list of string file IDs.')
if timeout and not isinstance(timeout, int) and not isinstance(timeout, float):
raise TypeError('transfer: timeout must be a number of seconds.')
try:
response = requests.post(f'{self.uri}/transfers',
data={
source: source,
destination: destination,
file_ids: file_ids,
})
response = requests.post(url=f'{self.uri}/transfers',
json={
'source': source,
'destination': destination,
'file_ids': file_ids,
},
auth=self.auth,
timeout=timeout)
response.raise_for_status()
except HTTPError as http_err:
logger.error(f'HTTP error occurred: {http_err}')
except (HTTPError, requests.exceptions.HTTPError) as err:
logger.error(f'HTTP error occurred: {err}')
return None
except Exception as err:
logger.error(f'Other error occurred: {err}')
return None
else:
return uuid.UUID(response.json()["id"])
return uuid.UUID(response.json()["id"])

def transferStatus(self, id):
"""`client.transferStatus(id)` -> TransferStatus
def transfer_status(self, id):
"""`client.transfer_status(id)` -> TransferStatus
* Returns status information for the transfer with the given identifier.
Possible statuses are:
Expand All @@ -205,43 +235,43 @@ def transferStatus(self, id):
if not self.uri:
raise RuntimeError('dts.Client: not connected.')
try:
response = requests.get(f'{self.uri}/transfers/{str(id)}')
response = requests.get(url=f'{self.uri}/transfers/{id}',
auth=self.auth)
response.raise_for_status()
except HTTPError as http_err:
except (HTTPError, requests.exceptions.HTTPError) as err:
logger.error(f'HTTP error occurred: {http_err}')
return None
except Exception as err:
logger.error(f'Other error occurred: {err}')
return None
else:
results = response.json()
return TransferStatus(
id = response['id'],
status = response['status'],
message = response['message'] if 'message' in response else None,
num_files = response['num_files'],
num_files_transferred = response['num_files_transferred'],
)

def deleteTransfer(self, id):
results = response.json()
return TransferStatus(
id = results.get('id'),
status = results.get('status'),
message = results.get('message'),
num_files = results.get('num_files'),
num_files_transferred = results.get('num_files_transferred'),
)

def cancel_transfer(self, id):
"""
`client.deleteTransfer(id) -> None
`client.cancel_transfer(id) -> None
* Deletes a file transfer, canceling
"""
if not self.uri:
raise RuntimeError('dts.Client: not connected.')
try:
response = requests.delete(f'{self.uri}/transfers/{str(id)}')
response = requests.delete(url=f'{self.uri}/transfers/{id}',
auth=self.auth)
response.raise_for_status()
except HTTPError as http_err:
except (HTTPError, requests.exceptions.HTTPError) as err:
logger.error(f'HTTP error occurred: {http_err}')
return None
except Exception as err:
logger.error(f'Other error occurred: {err}')
return None
else:
return None
return None

def __repr__(self):
if self.uri:
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ certifi==2024.7.4
chardet==5.2.0
charset-normalizer==3.3.2
click==8.1.7
coverage==7.6.0
frictionless==5.17.0
humanize==4.9.0
idna==3.7
Expand Down
Empty file added test/__init__.py
Empty file.
Loading

0 comments on commit db00daa

Please sign in to comment.