Skip to content

Commit

Permalink
DLHub Removal + making https download more robust (#423)
Browse files · Browse the repository at this point in the history
* Removing DLHub and fixing https download bug

* Add https streaming download with monitor + flake8 fixes

* Make it easier to test locally with confidential client credentials

* Minimizing requirements

* Updating GitHub Actions to latest versions (there were breaking issues with caching)

* Pinning pydantic to versions below 2.0
  • Loading branch information
blaiszik authored Jan 15, 2024
1 parent 5456145 commit 0ebdf89
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 98 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ jobs:
CLIENT_SECRET: ${{ secrets.CLIENT_SECRET }}
name: build
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
cache : 'pip'
Expand Down
71 changes: 4 additions & 67 deletions foundry/foundry.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import numpy as np
import pandas as pd
from pydantic import ValidationError
from typing import Any, Dict, List
from typing import Any, Dict, List, AnyStr
import logging
import warnings
import os
Expand All @@ -14,7 +14,6 @@

from mdf_connect_client import MDFConnectClient
from mdf_forge import Forge
from dlhub_sdk import DLHubClient
from globus_sdk import AuthClient

from .auth import PubAuths
Expand All @@ -37,15 +36,14 @@ class Foundry(FoundryBase):
"""Foundry Client Base Class
Foundry object used for all interactions with Foundry datasets and models. Interfaces with MDF Connect Client,
Globus Compute, Globus Auth, Globus Transfer, Globus Search, DLHub, and relevant Globus Endpoints
Globus Compute, Globus Auth, Globus Transfer, Globus Search, and relevant Globus Endpoints
"""

dlhub_client: Any
forge_client: Any
connect_client: Any
transfer_client: Any
auth_client: Any
index = ""
index: AnyStr = ""
auths: Any

def __init__(
Expand Down Expand Up @@ -84,9 +82,7 @@ def __init__(
"search",
"petrel",
"transfer",
"dlhub",
"openid",
"https://auth.globus.org/scopes/facd7ccc-c5f4-42aa-916b-a0e270e2c2a9/all", # funcx
"https://auth.globus.org/scopes/f10a69a9-338c-4e5b-baa1-0dc92359ab47/https", # Eagle HTTPS
"https://auth.globus.org/scopes/82f1b5c6-6e9b-11e5-ba47-22000b92c6ec/https", # NCSA HTTPS
"https://auth.globus.org/scopes/d31d4f5d-be37-4adc-a761-2f716b7af105/action_all", # Globus Search Lambda
Expand Down Expand Up @@ -115,7 +111,7 @@ def __init__(
search_client=self.auths["search"],
transfer_client=self.auths["transfer"],
data_mdf_authorizer=self.auths["data_mdf"],
petrel_authorizer=self.auths["petrel"],
petrel_authorizer=self.auths["petrel"]
)

self.transfer_client = self.auths['transfer']
Expand All @@ -132,19 +128,6 @@ def __init__(
authorizer=self.auths["mdf_connect"], test=test
)

self.dlhub_client = DLHubClient(
dlh_authorizer=self.auths["dlhub"],
search_authorizer=self.auths["search_authorizer"],
fx_authorizer=self.auths[
"https://auth.globus.org/scopes/facd7ccc-c5f4-42aa-916b-a0e270e2c2a9/all"
],
openid_authorizer=self.auths['openid'],
sl_authorizer=self.auths[
"https://auth.globus.org/scopes/d31d4f5d-be37-4adc-a761-2f716b7af105/action_all"
],
force_login=False,
)

def load(self, name, download=True, globus=False, verbose=False, metadata=None, authorizers=None, **kwargs):
"""Load the metadata for a Foundry dataset into the client
Expand Down Expand Up @@ -250,22 +233,6 @@ def list(self):
"""
return self.search()

def run(self, name, inputs, funcx_endpoint=None, **kwargs):
"""Run a model on inputted data
Args:
name (str): DLHub model name
inputs: Data to send to DLHub as inputs (should be JSON serializable, example types include dict, list,
np.ndarray, etc)
funcx_endpoint (str) (optional): UUID for the funcx endpoint to run the model on, if not the default (eg River)
Returns:
Results after invocation via the DLHub service
"""
if funcx_endpoint is not None:
self.dlhub_client.fx_endpoint = funcx_endpoint
return self.dlhub_client.run(name, inputs=inputs, **kwargs)

def load_data(self, source_id=None, globus=True, as_hdf5=False, splits=[]):
"""Load in the data associated with the prescribed dataset
Expand Down Expand Up @@ -469,36 +436,6 @@ def publish_dataset(
res = None
return res

def publish_model(self, title, creators, short_name, servable_type, serv_options, affiliations=None, paper_doi=None):
"""Simplified publishing method for servables
Args:
title (string): title for the servable
creators (string | list): either the creator's name (FamilyName, GivenName) or a list of the creators' names
short_name (string): shorthand name for the servable
servable_type (string): the type of the servable, must be a member of ("static_method",
"class_method",
"keras",
"pytorch",
"tensorflow",
"sklearn")
serv_options (dict): the servable_type specific arguments that are necessary for publishing. arguments can
be found at https://dlhub-sdk.readthedocs.io/en/latest/source/dlhub_sdk.models.servables.html
under the appropriate ``create_model`` signature. use the argument names as keys and their values as
the values.
affiliations (list): list of affiliations for each author
paper_doi (str): DOI of a paper that describes the servable
Returns:
(string): task id of this submission, can be used to check for success
Raises:
ValueError: If the given servable_type is not in the list of acceptable types
Exception: If the serv_options are incomplete or the request to publish results in an error
"""
return self.dlhub_client.easy_publish(title, creators, short_name, servable_type, serv_options, affiliations,
paper_doi)

def check_status(self, source_id, short=False, raw=False):
"""Check the status of your submission.
Expand Down
44 changes: 35 additions & 9 deletions foundry/https_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,29 +53,55 @@ def _get_files(tc, ep, queue, max_depth):


# TODO (wardlt): Avoid passing dictionaries, as documenting their content is tedious
def download_file(item, https_config):
def download_file(item, https_config, base_directory="data/", timeout=1800):
"""Download a file to disk
Args:
item: Dictionary defining the path to the file
https_config: Configuration defining the URL of the server and the name of the dataset
"""
url = f"{https_config['base_url']}{item['path']}{item['name']}"
base_url = https_config.get('base_url', '').rstrip('/')
path = item.get('path', '').strip('/')

# build destination path for data file
destination = os.path.join("data/", https_config['source_id'], item['name'])
# Extracting the name and subdirectory from the item
name = item.get('name', '')
subdirectory = name.split('/')[0] if '/' in name else ''

# Avoid duplication of subdirectory in path
if subdirectory and path.endswith(subdirectory):
full_path = f"{path}/{name.split('/', 1)[-1]}".strip('/')
else:
full_path = '/'.join([path, name]).strip('/')

url = f"{base_url}/{full_path}"

# build destination path for data file
destination = os.path.join(base_directory, https_config['source_id'], item['name'])
parent_path = os.path.split(destination)[0]

# if parent directories don't exist, create them
if not os.path.exists(parent_path):
os.makedirs(parent_path)

response = requests.get(url)

# write file to local destination
with open(destination, "wb") as f:
f.write(response.content)
try:
with requests.get(url, stream=True, timeout=timeout) as response:
response.raise_for_status()

downloaded_size = 0
print(f"\rStarting Download of: {url}")

with open(destination, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
downloaded_size += len(chunk)
# Calculate and print the download progress
print(f"\rDownloading... {downloaded_size/(1<<20):,.2f} MB", end="")
return destination
except requests.exceptions.RequestException as e:
print(f"Error downloading file: {e}")
except IOError as e:
print(f"Error writing file to disk: {e}")

# TODO (wardlt): Should we just return the key?
return {destination + " status": True}
2 changes: 1 addition & 1 deletion foundry/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ class FoundryConfig(BaseModel):
metadata_file: Optional[str] = "foundry_metadata.json"
destination_endpoint: Optional[str] = None
local: Optional[bool] = False
local_cache_dir = "./data"
local_cache_dir: Optional[str] = "./data"
metadata_key: Optional[str] = "foundry"
organization: Optional[str] = "foundry"

Expand Down
5 changes: 1 addition & 4 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
globus-sdk>=3,<4
dlhub_sdk>=2.0.3
requests>=2.18.4
tqdm>=4.19.4
six>=1.11.0
h5py>=2.10.0
numpy>=1.15.4
pandas>=0.23.4
scikit-learn>=1.0
pydantic>=1.6.1
pydantic<2.0.0
mdf_forge>=0.8.0
mdf-connect-client>=0.4.0
json2table>=1.1.5
Expand Down
3 changes: 1 addition & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@
install_requires=[
"mdf_forge>=0.8.0",
"globus-sdk>=3,<4",
"dlhub_sdk>=1.0.0",
"numpy>=1.15.4",
"pandas>=0.23.4",
"pydantic>=1.4",
"pydantic<2.0.0",
"mdf_connect_client>=0.4.0",
"h5py>=2.10.0",
"json2table"
Expand Down
23 changes: 10 additions & 13 deletions tests/test_foundry.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,30 +14,29 @@
from foundry import Foundry
from foundry.auth import PubAuths
from foundry.https_upload import upload_to_endpoint
from dlhub_sdk import DLHubClient
from globus_sdk import AuthClient
from mdf_connect_client import MDFConnectClient


client_id = os.getenv("CLIENT_ID")
client_secret = os.getenv("CLIENT_SECRET")
is_gha = os.getenv("GITHUB_ACTIONS")
confidential_login = (os.getenv("GITHUB_ACTIONS") or (client_id and client_secret))

services = [
"data_mdf",
"mdf_connect",
"search",
"petrel",
"transfer",
"dlhub",
"openid",
"https://auth.globus.org/scopes/facd7ccc-c5f4-42aa-916b-a0e270e2c2a9/all", # funcx
"https://auth.globus.org/scopes/f10a69a9-338c-4e5b-baa1-0dc92359ab47/https", # Eagle HTTPS
"https://auth.globus.org/scopes/82f1b5c6-6e9b-11e5-ba47-22000b92c6ec/https", # NCSA HTTPS
"https://auth.globus.org/scopes/d31d4f5d-be37-4adc-a761-2f716b7af105/action_all", # Globus Search Lambda
]

if is_gha:
if confidential_login:
# Use confidential login if the tests are being run on GitHub Actions or
# if a client ID and secret are provided
auths = mdf_toolbox.confidential_login(client_id=client_id,
client_secret=client_secret,
services=services, make_clients=True)
Expand All @@ -46,6 +45,7 @@
client_secret=client_secret,
services=["search"], make_clients=False)
else:
# Otherwise try to allow the user to login directly
auths = mdf_toolbox.login(services=services, make_clients=True)
search_auth = mdf_toolbox.login(services=["search"], make_clients=False)

Expand Down Expand Up @@ -211,16 +211,13 @@ def test_foundry_init():
assert isinstance(f.forge_client, Forge)
assert isinstance(f.connect_client, MDFConnectClient)

if not is_gha:
assert isinstance(f.dlhub_client, DLHubClient)
if not confidential_login:

f2 = Foundry(authorizers=auths, no_browser=False, no_local_server=True)
assert isinstance(f2.dlhub_client, DLHubClient)
assert isinstance(f2.forge_client, Forge)
assert isinstance(f2.connect_client, MDFConnectClient)

f3 = Foundry(authorizers=auths, no_browser=True, no_local_server=False)
assert isinstance(f3.dlhub_client, DLHubClient)
assert isinstance(f3.forge_client, Forge)
assert isinstance(f3.connect_client, MDFConnectClient)

Expand Down Expand Up @@ -330,7 +327,7 @@ def test_dataframe_load_doi():
_delete_test_data(f)


@pytest.mark.skipif(bool(is_gha), reason="Test does not succeed on GHA - no Globus endpoint")
@pytest.mark.skipif(bool(confidential_login), reason="Test does not succeed on GHA - no Globus endpoint")
def test_download_globus():
f = Foundry(authorizers=auths, no_browser=True, no_local_server=True)
_delete_test_data(f)
Expand All @@ -340,7 +337,7 @@ def test_download_globus():
_delete_test_data(f)


@pytest.mark.skipif(bool(is_gha), reason="Test does not succeed on GHA - no Globus endpoint")
@pytest.mark.skipif(bool(confidential_login), reason="Test does not succeed on GHA - no Globus endpoint")
def test_globus_dataframe_load():
f = Foundry(authorizers=auths, no_browser=True, no_local_server=True)

Expand All @@ -358,7 +355,7 @@ def test_globus_dataframe_load():
_delete_test_data(f)


@pytest.mark.skipif(bool(is_gha), reason="Not run as part of GHA CI")
@pytest.mark.skipif(bool(confidential_login), reason="Not run as part of GHA CI")
def test_publish_with_https():
"""System test: Assess the end-to-end publication of a dataset via HTTPS
"""
Expand Down Expand Up @@ -456,7 +453,7 @@ def test_ACL_creation_and_deletion():
pass


@pytest.mark.skipif(bool(is_gha), reason="Not run as part of GHA CI")
@pytest.mark.skipif(bool(confidential_login), reason="Not run as part of GHA CI")
def test_publish_with_globus():
# TODO: automate dealing with curation and cleaning after tests

Expand Down

0 comments on commit 0ebdf89

Please sign in to comment.