Feat: Updates GET workflows to v3 (#178)
* updates get_workflow_list

* updates

* fix pytest

* remove a print

* changelog

* review suggestions
dapineyro authored Jan 16, 2025
1 parent 01ba2e8 commit 4b1e1a6
Showing 18 changed files with 191 additions and 118 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,11 @@
## lifebit-ai/cloudos-cli: changelog

+## v2.15.0 (2025-01-16)
+
+### Feature
+
+- Updates GET workflows endpoint to v3.
+
 ## v2.14.0 (2024-12-18)
 
 - Adds the new `--accelerate-file-staging` parameter to job submission to add support for AWS S3 mountpoint for quicker file staging.
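For context, the request shape this release moves to looks roughly like the sketch below. The endpoint and parameter names are taken from the diff in this commit; the workspace id, API key, and the use of plain `requests` instead of the repo's retrying wrapper are illustrative assumptions.

```python
import requests

# Illustrative sketch: the v3 endpoint takes pagination and archive filters
# as query parameters and returns the list under the 'workflows' key.
url = "https://cloudos.lifebit.ai/api/v3/workflows"
params = {"teamId": "<WORKSPACE_ID>",   # placeholder workspace id
          "pageSize": 10,
          "page": 1,
          "archived.status": "false"}
r = requests.get(url, params=params, headers={"apikey": "<API_KEY>"})
r.raise_for_status()
workflows = r.json()["workflows"]
```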
2 changes: 1 addition & 1 deletion cloudos/_version.py
@@ -1 +1 @@
-__version__ = '2.14.0'
+__version__ = '2.15.0'
120 changes: 74 additions & 46 deletions cloudos/clos.py
@@ -340,65 +340,54 @@ def get_curated_workflow_list(self, workspace_id, get_all=True, page=1, verify=T
r : list
A list of dicts, each corresponding to a workflow.
"""
data = {"search": "",
"page": page,
"filters": [
[
{
"isPredefined": True,
"isCurated": True,
"isFeatured": False,
"isModule": False
},
{
"isPredefined": True,
"isCurated": False,
"isFeatured": False,
"isModule": False
},
{
"isPredefined": True,
"isCurated": True,
"isFeatured": True,
"isModule": False
}
]
]
}
headers = {
"Content-type": "application/json",
"apikey": self.apikey
}
r = retry_requests_post("{}/api/v1/workflows/getByType?teamId={}".format(self.cloudos_url,
workspace_id),
json=data, headers=headers, verify=verify)
r = retry_requests_get(
"{}/api/v3/workflows?search=&groups[]=curated&groups[]=featured&groups[]=predefined&page={}&teamId={}".format(
self.cloudos_url, page, workspace_id),
headers=headers, verify=verify)
if r.status_code >= 400:
raise BadRequestException(r)
content = json.loads(r.content)
if get_all:
-            workflows_collected = len(content['pipelines'])
-            workflows_to_get = content['total']
+            workflows_collected = len(content['workflows'])
+            workflows_to_get = content['paginationMetadata']['Pagination-Count']
if workflows_to_get <= workflows_collected or workflows_collected == 0:
-                return content['pipelines']
+                return content['workflows']
if workflows_to_get > workflows_collected:
-                return content['pipelines'] + self.get_curated_workflow_list(workspace_id,
+                return content['workflows'] + self.get_curated_workflow_list(workspace_id,
get_all=True,
page=page+1,
verify=verify)
else:
-            return content['pipelines']
+            return content['workflows']
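
The updated `get_curated_workflow_list` accumulates pages recursively: each call fetches one page of the v3 endpoint and, while the server-reported `Pagination-Count` exceeds what the current page returned, concatenates the result of fetching the next page. A minimal standalone sketch of that pattern (the `fetch_page` helper is hypothetical, standing in for the GET call above):

```python
def collect_all(fetch_page, page=1):
    """Recursively accumulate a paginated listing.

    fetch_page(page) -> (items, total) is a hypothetical helper; 'total'
    mirrors content['paginationMetadata']['Pagination-Count'] above.
    """
    items, total = fetch_page(page)
    if total <= len(items) or len(items) == 0:
        return items
    # More items remain on later pages: fetch them and concatenate.
    return items + collect_all(fetch_page, page + 1)
```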

-    def get_workflow_list(self, workspace_id, verify=True):
+    def get_workflow_list(self, workspace_id, verify=True, get_all=True,
+                          page=1, page_size=10, max_page_size=1000,
+                          archived_status=False):
"""Get all the workflows from a CloudOS workspace.
Parameters
----------
        workspace_id : string
            The CloudOS workspace id from which to collect the workflows.
-        verify: [bool|string]
+        verify : [bool|string]
Whether to use SSL verification or not. Alternatively, if
a string is passed, it will be interpreted as the path to
the SSL certificate file.
+        get_all : bool
+            Whether to get all available workflows or just the
+            indicated page.
+        page : int
+            The page number to retrieve, from the paginated response.
+        page_size : int
+            The number of workflows per page, from 1 to 1000.
+        max_page_size : int
+            Max page size defined by the API server. It is currently 1000.
+        archived_status : bool
+            Whether to retrieve archived workflows or not.
Returns
-------
@@ -409,12 +398,41 @@ def get_workflow_list(self, workspace_id, verify=True):
"Content-type": "application/json",
"apikey": self.apikey
}
r = retry_requests_get("{}/api/v1/workflows?teamId={}".format(self.cloudos_url,
workspace_id),
headers=headers, verify=verify)
archived_status = str(archived_status).lower()
r = retry_requests_get(
"{}/api/v3/workflows?teamId={}&pageSize={}&page={}&archived.status={}".format(
self.cloudos_url, workspace_id, page_size, page, archived_status),
headers=headers, verify=verify)
if r.status_code >= 400:
raise BadRequestException(r)
-        return json.loads(r.content)
+        content = json.loads(r.content)
+        if get_all:
+            total_workflows = content['paginationMetadata']['Pagination-Count']
+            if total_workflows <= max_page_size:
+                r = retry_requests_get(
+                    "{}/api/v3/workflows?teamId={}&pageSize={}&page={}&archived.status={}".format(
+                        self.cloudos_url, workspace_id, total_workflows, 1, archived_status),
+                    headers=headers, verify=verify)
+                if r.status_code >= 400:
+                    raise BadRequestException(r)
+                return json.loads(r.content)['workflows']
+            else:
+                n_pages = (total_workflows // max_page_size) + int((total_workflows % max_page_size) > 0)
+                for p in range(n_pages):
+                    p += 1
+                    r = retry_requests_get(
+                        "{}/api/v3/workflows?teamId={}&pageSize={}&page={}&archived.status={}".format(
+                            self.cloudos_url, workspace_id, max_page_size, p, archived_status),
+                        headers=headers, verify=verify)
+                    if r.status_code >= 400:
+                        raise BadRequestException(r)
+                    if p == 1:
+                        all_content = json.loads(r.content)['workflows']
+                    else:
+                        all_content += json.loads(r.content)['workflows']
+                return all_content
+        else:
+            return content['workflows']
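
The chunked branch above derives the page count with floor division plus a remainder check, which is a ceiling division in disguise. A quick worked example with the arithmetic from this diff:

```python
# Same arithmetic as the diff: ceil(total / max_page_size) without math.ceil.
total_workflows = 2500
max_page_size = 1000
n_pages = (total_workflows // max_page_size) + int((total_workflows % max_page_size) > 0)
assert n_pages == 3  # pages of 1000, 1000 and 500 workflows
```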

@staticmethod
def process_workflow_list(r, all_fields=False):
@@ -435,11 +453,10 @@ def process_workflow_list(r, all_fields=False):
"""
COLUMNS = ['_id',
'name',
-                   'isModule',
'archived.status',
'mainFile',
'workflowType',
'parameters',
+                   'group',
'repository.name',
'repository.platform',
'repository.url',
@@ -511,14 +528,25 @@ def is_module(self, workflow_name, workspace_id, verify=True):
"""
my_workflows_r = self.get_workflow_list(workspace_id, verify=verify)
my_workflows = self.process_workflow_list(my_workflows_r)
-        is_module = my_workflows.loc[
+        group = my_workflows.loc[
(my_workflows['name'] == workflow_name) & (my_workflows['archived.status'] == False),
-            'isModule']
-        if len(is_module) == 0:
+            'group']
+        if len(group) == 0:
raise ValueError(f'No workflow found with name: {workflow_name}')
-        if len(is_module) > 1:
+        if len(group) > 1:
raise ValueError(f'More than one workflow found with name: {workflow_name}')
-        return is_module.values[0]
+        module_groups = ['system-tools',
+                         'data-factory-data-connection-etl',
+                         'data-factory',
+                         'data-factory-omics-etl',
+                         'drug-discovery',
+                         'data-factory-omics-insights',
+                         'intermediate'
+                         ]
+        if group.values[0] in module_groups:
+            return True
+        else:
+            return False
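
Since the v3 response replaces the boolean `isModule` field with a `group` label, `is_module` now maps a fixed set of group names back to a boolean. A hedged usage sketch (the API key, URL, workspace id and workflow name are placeholders; the constructor signature matches the tests in this commit):

```python
from cloudos.clos import Cloudos

# Placeholders: substitute a real API key, URL, workspace id and workflow name.
clos = Cloudos(apikey="<API_KEY>", cromwell_token=None,
               cloudos_url="https://cloudos.lifebit.ai")
if clos.is_module("my-workflow", workspace_id="<WORKSPACE_ID>"):
    print("Runs as a CloudOS module")
```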

def get_project_list(self, workspace_id, verify=True):
"""Get all the project from a CloudOS workspace.
14 changes: 3 additions & 11 deletions cloudos/jobs/job.py
@@ -143,18 +143,8 @@ def fetch_cloudos_id(self,
if resource not in allowed_resources:
raise ValueError('Your specified resource is not supported. ' +
f'Use one of the following: {allowed_resources}')
-        headers = {
-            "Content-type": "application/json",
-            "apikey": apikey
-        }
-        r = retry_requests_get("{}/api/v1/{}?teamId={}".format(cloudos_url,
-                                                               resource,
-                                                               workspace_id),
-                               headers=headers, verify=verify)
-        if r.status_code >= 400:
-            raise BadRequestException(r)
-        content = json.loads(r.content)
         if resource == 'workflows':
+            content = self.get_workflow_list(workspace_id, verify=verify)
for element in content:
if (element["name"] == name and
element["repository"]["platform"] == repository_platform and
@@ -167,6 +157,8 @@ def fetch_cloudos_id(self,
elif "importsFile" in element.keys() and element["importsFile"] == importsfile:
return element["_id"]
elif resource == 'projects':
+            r = self.get_project_list(workspace_id, verify=verify)
+            content = json.loads(r.content)
# New API projects endpoint spec
if type(content) is dict:
for element in content["projects"]:
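The net effect of this hunk is that `fetch_cloudos_id` no longer builds its own v1 request; it delegates to the resource-specific helpers so pagination and the v3 endpoint live in one place. A condensed sketch of the resulting control flow (hypothetical function name, error handling omitted):

```python
import json

def resolve_content(self, resource, workspace_id, verify=True):
    # Hypothetical condensation of the dispatch introduced above.
    if resource == 'workflows':
        # Already returns the parsed list of workflow dicts (v3, paginated).
        return self.get_workflow_list(workspace_id, verify=verify)
    elif resource == 'projects':
        r = self.get_project_list(workspace_id, verify=verify)
        return json.loads(r.content)
```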
12 changes: 9 additions & 3 deletions tests/test_clos/test_detect_workflow.py
@@ -9,6 +9,9 @@
APIKEY = 'vnoiweur89u2ongs'
CLOUDOS_URL = 'http://cloudos.lifebit.ai'
WORKSPACE_ID = 'lv89ufc838sdig'
+PAGE_SIZE = 10
+PAGE = 1
+ARCHIVED_STATUS = "false"


@mock.patch('cloudos.clos', mock.MagicMock())
@@ -19,17 +22,20 @@ def test_detect_workflow():
API request is mocked and replicated with json files
"""
json_data = load_json_file(INPUT)
params = {"teamId": WORKSPACE_ID}
params = {"teamId": WORKSPACE_ID,
"pageSize": PAGE_SIZE,
"page": PAGE,
"archived.status": ARCHIVED_STATUS}
header = {
"Accept": "application/json, text/plain, */*",
"Content-Type": "application/json;charset=UTF-8",
"apikey": APIKEY
}
search_str = f"teamId={WORKSPACE_ID}"
search_str = f"teamId={WORKSPACE_ID}&pageSize={PAGE_SIZE}&page={PAGE}&archived.status={ARCHIVED_STATUS}"
# mock GET method with the .json
responses.add(
responses.GET,
url=f"{CLOUDOS_URL}/api/v1/workflows?{search_str}",
url=f"{CLOUDOS_URL}/api/v3/workflows?{search_str}",
body=json_data,
headers=header,
match=[matchers.query_param_matcher(params)],
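Because the mock is registered with `matchers.query_param_matcher`, it only matches when every v3 query parameter agrees, which is why `pageSize`, `page`, and `archived.status` must now be declared alongside `teamId`. A minimal illustration with the `responses` library (values as in this test; the empty `workflows` body is a stand-in for the fixture .json):

```python
import responses
from responses import matchers

responses.add(
    responses.GET,
    url="http://cloudos.lifebit.ai/api/v3/workflows",
    json={"workflows": []},  # stand-in body for the fixture .json
    match=[matchers.query_param_matcher({"teamId": "lv89ufc838sdig",
                                         "pageSize": 10,
                                         "page": 1,
                                         "archived.status": "false"})],
    status=200)
```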
23 changes: 17 additions & 6 deletions tests/test_clos/test_get_curated_workflow_list.py
@@ -2,6 +2,7 @@
import json
import pytest
import responses
+from responses import matchers
from cloudos.clos import Cloudos
from cloudos.utils.errors import BadRequestException
from tests.functions_for_pytest import load_json_file
@@ -10,6 +11,7 @@
APIKEY = 'vnoiweur89u2ongs'
CLOUDOS_URL = 'http://cloudos.lifebit.ai'
WORKSPACE_ID = 'lv89ufc838sdig'
+PAGE = 1


@mock.patch('cloudos.clos', mock.MagicMock())
@@ -20,12 +22,21 @@ def test_get_curated_workflow_list_correct_response():
API request is mocked and replicated with json files
"""
create_json = load_json_file(OUTPUT)
search_str = f"teamId={WORKSPACE_ID}"
params = {"teamId": WORKSPACE_ID,
"groups[]": ["curated", "featured", "predefined"],
"page": PAGE}
header = {
"Accept": "application/json, text/plain, */*",
"Content-Type": "application/json;charset=UTF-8",
"apikey": APIKEY}
search_str = f"search=&groups[]=curated&groups[]=featured&groups[]=predefined&page={PAGE}&teamId={WORKSPACE_ID}"
# mock GET method with the .json
responses.add(
responses.POST,
url=f"{CLOUDOS_URL}/api/v1/workflows/getByType?{search_str}",
responses.GET,
url=f"{CLOUDOS_URL}/api/v3/workflows?{search_str}",
body=create_json,
headers=header,
match=[matchers.query_param_matcher(params)],
status=200)
# start cloudOS service
clos = Cloudos(apikey=APIKEY, cromwell_token=None, cloudos_url=CLOUDOS_URL)
Expand All @@ -46,11 +57,11 @@ def test_get_curated_workflow_list_incorrect_response():
error_message = {"statusCode": 400, "code": "BadRequest",
"message": "Bad Request.", "time": "2022-11-23_17:31:07"}
error_json = json.dumps(error_message)
search_str = f"teamId={WORKSPACE_ID}"
search_str = f"search=&groups[]=curated&groups[]=featured&groups[]=predefined&page={PAGE}&teamId={WORKSPACE_ID}"
# mock GET method with the .json
responses.add(
responses.POST,
url=f"{CLOUDOS_URL}/api/v1/workflows/getByType?{search_str}",
responses.GET,
url=f"{CLOUDOS_URL}/api/v3/workflows?{search_str}",
body=error_json,
status=400)
# raise 400 error
21 changes: 15 additions & 6 deletions tests/test_clos/test_get_workflow_list.py
@@ -11,6 +11,9 @@
APIKEY = 'vnoiweur89u2ongs'
CLOUDOS_URL = 'http://cloudos.lifebit.ai'
WORKSPACE_ID = 'lv89ufc838sdig'
+PAGE_SIZE = 10
+PAGE = 1
+ARCHIVED_STATUS = "false"


@mock.patch('cloudos.clos', mock.MagicMock())
@@ -21,17 +24,20 @@ def test_get_workflow_list_correct_response():
API request is mocked and replicated with json files
"""
create_json = load_json_file(INPUT)
params = {"teamId": WORKSPACE_ID}
params = {"teamId": WORKSPACE_ID,
"pageSize": PAGE_SIZE,
"page": PAGE,
"archived.status": ARCHIVED_STATUS}
header = {
"Accept": "application/json, text/plain, */*",
"Content-Type": "application/json;charset=UTF-8",
"apikey": APIKEY
}
search_str = f"teamId={WORKSPACE_ID}"
search_str = f"teamId={WORKSPACE_ID}&pageSize={PAGE_SIZE}&page={PAGE}&archived.status={ARCHIVED_STATUS}"
# mock GET method with the .json
responses.add(
responses.GET,
url=f"{CLOUDOS_URL}/api/v1/workflows?{search_str}",
url=f"{CLOUDOS_URL}/api/v3/workflows?{search_str}",
body=create_json,
headers=header,
match=[matchers.query_param_matcher(params)],
@@ -55,17 +61,20 @@ def test_get_workflow_list_incorrect_response():
error_message = {"statusCode": 400, "code": "BadRequest",
"message": "Bad Request.", "time": "2022-11-23_17:31:07"}
error_json = json.dumps(error_message)
params = {"teamId": WORKSPACE_ID}
params = {"teamId": WORKSPACE_ID,
"pageSize": PAGE_SIZE,
"page": PAGE,
"archived.status": ARCHIVED_STATUS}
header = {
"Accept": "application/json, text/plain, */*",
"Content-Type": "application/json;charset=UTF-8",
"apikey": APIKEY
}
search_str = f"teamId={WORKSPACE_ID}"
search_str = f"teamId={WORKSPACE_ID}&pageSize={PAGE_SIZE}&page={PAGE}&archived.status={ARCHIVED_STATUS}"
# mock GET method with the .json
responses.add(
responses.GET,
url=f"{CLOUDOS_URL}/api/v1/workflows?{search_str}",
url=f"{CLOUDOS_URL}/api/v3/workflows?{search_str}",
body=error_json,
headers=header,
match=[matchers.query_param_matcher(params)],