From 178ddf6f67912b75bce172e62a4a59147473f812 Mon Sep 17 00:00:00 2001 From: Ayush5120 Date: Sun, 19 Feb 2023 23:20:16 +0530 Subject: [PATCH 1/3] feat: return specific error if no TES available --- pro_tes/exceptions.py | 10 ++++++++++ pro_tes/middleware/middleware.py | 7 ++++--- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/pro_tes/exceptions.py b/pro_tes/exceptions.py index 89ac63b..71f331a 100644 --- a/pro_tes/exceptions.py +++ b/pro_tes/exceptions.py @@ -14,6 +14,8 @@ NotFound, ) +# pylint: disable="too-few-public-methods" + class TaskNotFound(NotFound): """Raised when task with given task identifier was not found.""" @@ -23,6 +25,10 @@ class IdsUnavailableProblem(PyMongoError): """Raised when task identifier is unavailable.""" +class NoTesInstancesAvailable(ValueError): + """Raised when no TES instances are available.""" + + exceptions = { Exception: { "message": "An unexpected error occurred.", @@ -68,4 +74,8 @@ class IdsUnavailableProblem(PyMongoError): "message": "No/few unique task identifiers available.", "code": "500", }, + NoTesInstancesAvailable: { + "message": "No valid TES instances available.", + "code": "500", + }, } diff --git a/pro_tes/middleware/middleware.py b/pro_tes/middleware/middleware.py index 749dc2b..c2e3f94 100644 --- a/pro_tes/middleware/middleware.py +++ b/pro_tes/middleware/middleware.py @@ -3,6 +3,7 @@ import abc from typing import List +from pro_tes.exceptions import NoTesInstancesAvailable from pro_tes.middleware.task_distribution import distance, random # pragma pylint: disable=too-few-public-methods @@ -44,13 +45,13 @@ def modify_request(self, request): request.json["inputs"][index]["url"] ) - if len(self.input_uris) != 0: + if self.input_uris: self.tes_uris = distance.task_distribution(self.input_uris) else: self.tes_uris = random.task_distribution() - if len(self.tes_uris) != 0: + if self.tes_uris: request.json["tes_uri"] = self.tes_uris else: - raise Exception # pylint: disable=broad-exception-raised + raise NoTesInstancesAvailable return request From 366ce031126ace9170b6c113a2e787d6a86d62a1 Mon Sep 17 00:00:00 2001 From: Ayush5120 Date: Sun, 19 Feb 2023 22:26:42 +0530 Subject: [PATCH 2/3] refactor: minor improvements on code clarity --- .../middleware/task_distribution/distance.py | 26 +++++++------------ 1 file changed, 10 insertions(+), 16 deletions(-) diff --git a/pro_tes/middleware/task_distribution/distance.py b/pro_tes/middleware/task_distribution/distance.py index 045a157..545e8f9 100644 --- a/pro_tes/middleware/task_distribution/distance.py +++ b/pro_tes/middleware/task_distribution/distance.py @@ -36,7 +36,7 @@ def task_distribution(input_uri: List) -> List: A list of ranked TES instance. """ foca_conf = current_app.config.foca - tes_uri: List[str] = deepcopy(foca_conf.tes["service_list"]) + tes_uri: List[str] = foca_conf.tes["service_list"] access_uri_combination = get_uri_combination(input_uri, tes_uri) # get the combination of the tes ip and input ip @@ -100,12 +100,10 @@ def get_uri_combination( }, } """ - tes_deployment_list = [] - for uri in tes_uri: - temp_obj = TesDeployment( - tes_uri=uri, stats=TesStats(total_distance=None) - ) - tes_deployment_list.append(temp_obj) + tes_deployment_list = [ + TesDeployment(tes_uri=uri, stats=TesStats(total_distance=None)) + for uri in tes_uri + ] task_param = TaskParams(input_uris=input_uri) access_uri_combination = AccessUriCombination( @@ -139,9 +137,7 @@ def ip_combination(input_uri: List[str], tes_uri: List[str]) -> Dict: uri_no_auth = remove_auth_from_url(uri) try: tes_ip = gethostbyname(urlparse(uri_no_auth).netloc) - except KeyError: - continue - except gaierror: + except (KeyError, gaierror): continue for count, obj_ip in enumerate(obj_ip_list): ips[(index, count)] = (tes_ip, obj_ip) @@ -258,16 +254,14 @@ def rank_tes_instances( Returns: A list of tes uri in increasing order of total distance. """ - combination = [] - for value in access_uri_combination.tes_deployments: - combination.append(value.dict()) + combination = [ + value.dict() for value in access_uri_combination.tes_deployments + ] # sorting the TES uri in decreasing order of total distance ranked_combination = sorted( combination, key=lambda x: x["stats"]["total_distance"] ) - ranked_tes_uri = [] - for value in ranked_combination: - ranked_tes_uri.append(str(value["tes_uri"])) + ranked_tes_uri = [str(value["tes_uri"]) for value in ranked_combination] return ranked_tes_uri From dd0ec34979bb0a35d3a8a59e660fae1a96aa6ba4 Mon Sep 17 00:00:00 2001 From: Ayush5120 Date: Sun, 19 Feb 2023 21:51:40 +0530 Subject: [PATCH 3/3] docs: extend and correct docstrings --- pro_tes/ga4gh/tes/models.py | 8 +-- pro_tes/ga4gh/tes/task_runs.py | 50 ++++++++++++-- pro_tes/middleware/middleware.py | 22 +++++- .../middleware/task_distribution/distance.py | 68 ++++++++++++------- pro_tes/tasks/track_task_progress.py | 9 +-- pro_tes/utils/models.py | 6 +- 6 files changed, 117 insertions(+), 46 deletions(-) diff --git a/pro_tes/ga4gh/tes/models.py b/pro_tes/ga4gh/tes/models.py index e055b42..da82ced 100644 --- a/pro_tes/ga4gh/tes/models.py +++ b/pro_tes/ga4gh/tes/models.py @@ -666,8 +666,8 @@ class TesEndpoint(CustomBaseModel): Args: host: Host at which the TES API is served that is processing this request; note that this should include the path information but - *not* the base path path defined in the TES API specification; - e.g., specify https://my.tes.com/api if the actual API is hosted at + *not* the base path defined in the TES API specification; e.g., + specify https://my.tes.com/api if the actual API is hosted a https://my.tes.com/api/ga4gh/tes/v1. base_path: Override the default path suffix defined in the TES API specification, i.e., `/ga4gh/tes/v1`. @@ -676,8 +676,8 @@ class TesEndpoint(CustomBaseModel): Attributes: host: Host at which the TES API is served that is processing this request; note that this should include the path information but - *not* the base path path defined in the TES API specification; - e.g., specify https://my.tes.com/api if the actual API is hosted at + *not* the base path defined in the TES API specification; e.g., + specify https://my.tes.com/api if the actual API is hosted at https://my.tes.com/api/ga4gh/tes/v1. base_path: Override the default path suffix defined in the TES API specification, i.e., `/ga4gh/tes/v1`. diff --git a/pro_tes/ga4gh/tes/task_runs.py b/pro_tes/ga4gh/tes/task_runs.py index 235e9ed..7cd8ee8 100644 --- a/pro_tes/ga4gh/tes/task_runs.py +++ b/pro_tes/ga4gh/tes/task_runs.py @@ -387,7 +387,7 @@ def _sanitize_request(self, payload: dict) -> Dict: """Sanitize request for use with py-tes. Args: - payloads: Request payload. + payload: Request payload. Returns: Sanitized request payload. @@ -417,7 +417,7 @@ def _sanitize_request(self, payload: dict) -> Dict: return payload def _set_projection(self, view: str) -> Dict: - """Set database projectoin for selected view. + """Set database projection for selected view. Args: view: View path parameter. @@ -453,7 +453,17 @@ def _set_projection(self, view: str) -> Dict: def _update_task( self, payload: dict, db_document: DbDocument, start_time: str, **kwargs ) -> DbDocument: - """Update the task object.""" + """Update the task object. + + Args: + payload: A dictionary containing the payload for the update. + db_document: The document in the database to be updated. + start_time: The starting time of the incoming TES request. + **kwargs: Additional keyword arguments passed along with request. + + Returns: + DbDocument: The updated database document. + """ logs = self._set_logs( payloads=deepcopy(payload), start_time=start_time ) @@ -467,7 +477,15 @@ def _update_task( return db_document def _set_logs(self, payloads: dict, start_time: str) -> Dict: - """Set up the logs for the request.""" + """Create or update `TesTask.logs` and set start time. + + Args: + payload: A dictionary containing the payload for the update. + start_time: The starting time of the incoming TES request. + + Returns: + Task logs with start time set. + """ if "logs" not in payloads.keys(): logs = [ { @@ -491,7 +509,16 @@ def _update_doc_in_db( tes_uri: str, remote_task_id: str, ) -> DbDocument: - """Update the document in the database.""" + """Set end time, task metadata in `TesTask.logs`, and update document. + + Args: + db_connector: The database connector. + tes_uri: The TES URI where the task if forwarded. + remote_task_id: Task identifier at the remote TES instance. + + Returns: + The updated database document. + """ time_now = datetime.now().strftime("%m-%d-%Y %H:%M:%S") tes_endpoint_dict = {"host": tes_uri, "base_path": ""} db_document = db_connector.upsert_fields_in_root_object( @@ -529,7 +556,18 @@ def _update_doc_in_db( def _update_task_metadata( self, db_document: DbDocument, tes_uri: str, remote_task_id: str ) -> DbDocument: - """Update the task metadata.""" + """Update the task metadata. + + Set TES endpoint and remote task identifier in `TesTask.logs.metadata`. + + Args: + db_document: The document in the database to be updated. + tes_uri: The TES URI where the task if forwarded. + remote_task_id: Task identifier at the remote TES instance. + + Returns: + The updated database document. + """ for logs in db_document.task.logs: tesNextTes_obj = TesNextTes(id=remote_task_id, url=tes_uri) if logs.metadata.forwarded_to is None: diff --git a/pro_tes/middleware/middleware.py b/pro_tes/middleware/middleware.py index c2e3f94..da38330 100644 --- a/pro_tes/middleware/middleware.py +++ b/pro_tes/middleware/middleware.py @@ -14,7 +14,16 @@ class AbstractMiddleware(metaclass=abc.ABCMeta): @abc.abstractmethod def modify_request(self, request): - """Modify the request before it is sent to the TES instance.""" + """Modify the incoming task request. + + Abstract method. + + Args: + request: The incoming request object. + + Returns: + The modified request object. + """ class TaskDistributionMiddleware(AbstractMiddleware): @@ -22,6 +31,7 @@ class TaskDistributionMiddleware(AbstractMiddleware): Attributes: tes_uri: TES instance best suited for TES task. + input_uris: A list of input URIs from the incoming request. """ def __init__(self) -> None: @@ -30,13 +40,19 @@ def __init__(self) -> None: self.input_uris: List[str] = [] def modify_request(self, request): - """Add ranked list of TES instances to request body. + """Modify the incoming task request. + + Abstract method Args: request: Incoming request object. Returns: - Tuple of modified request object. + The modified request object. + + Raises: + pro_tes.exceptions.NoTesInstancesAvailable: If no valid TES + instances are available. """ if "inputs" in request.json.keys(): for index in range(len(request.json["inputs"])): diff --git a/pro_tes/middleware/task_distribution/distance.py b/pro_tes/middleware/task_distribution/distance.py index 545e8f9..face507 100644 --- a/pro_tes/middleware/task_distribution/distance.py +++ b/pro_tes/middleware/task_distribution/distance.py @@ -26,14 +26,12 @@ def task_distribution(input_uri: List) -> List: """Task distributor. - Distributes task by selecting the TES instance having minimum - distance between the input files and TES Instance. - Args: input_uri: List of inputs of a TES task request Returns: - A list of ranked TES instance. + A list of ranked TES instances, ordered by the minimum distance + between the input files and each TES instance. """ foca_conf = current_app.config.foca tes_uri: List[str] = foca_conf.tes["service_list"] @@ -66,12 +64,17 @@ def task_distribution(input_uri: List) -> List: def get_uri_combination( input_uri: List, tes_uri: List ) -> AccessUriCombination: - """Create a combination of input uris and tes uri. + """Create a combination of input URIs and TES URIs. Args: - input_uri: List of input uris of TES request. + input_uri: List of input URIs of TES request. tes_uri: List of TES instance. + Returns: + An AccessUriCombination object, which is a combination of: + :class:`pro_tes.middleware.models.AccessUriCombination`: + + Examples: A AccessUriCombination object of the form like: { "task_params": { @@ -116,11 +119,21 @@ def ip_combination(input_uri: List[str], tes_uri: List[str]) -> Dict: """Create a pair of TES IP and Input IP. Args: - input_uri: List of input uris of TES request. - tes_uri: List of TES instance. + input_uri: A list of input URIs for a TES task request. + tes_uri: A list of TES instances to choose from. Returns: - A dictionary of combination of tes ip with all the input ips. + A dictionary where the keys are tuples representing the combination + of TES instance and input URI, and the values are tuples containing + the IP addresses of the TES instance and input URI. + + Example: + { + (0, 0): ('10.0.0.1', '192.168.0.1'), + (0, 1): ('10.0.0.1', '192.168.0.2'), + (1, 0): ('10.0.0.2', '192.168.0.1'), + (1, 1): ('10.0.0.2', '192.168.0.2') + } """ ips = {} @@ -147,19 +160,23 @@ def ip_combination(input_uri: List[str], tes_uri: List[str]) -> Dict: def ip_distance( *args: str, ) -> Dict[str, Dict]: - """Compute ip distance between ip pairs. + """Compute IP distance between IP pairs. - :param *args: IP addresses of the form '8.8.8.8' without schema and - suffixes. + Args: + *args: IP addresses of the form '8.8.8.8' without schema and + suffixes. - :return: A dictionary with a key for each IP address, pointing to a - dictionary containing city, region and country information for the - IP address, as well as a key "distances" pointing to a dictionary - indicating the distances, in kilometers, between all pairs of IPs, - with the tuple of IPs as the keys. IPs that cannot be located are - skipped from the resulting dictionary. - :raises ValueError: No args were passed. + Returns: + A dictionary with a key for each IP address, pointing to a + dictionary containing city, region and country information for the + IP address, as well as a key "distances" pointing to a dictionary + indicating the distances, in kilometers, between all pairs of IPs, + with the tuple of IPs as the keys. IPs that cannot be located are + skipped from the resulting dictionary. + + Raises: + ValueError: No args were passed. """ if not args: raise ValueError("Expected at least one URI or IP address.") @@ -195,16 +212,19 @@ def ip_distance( def calculate_distance( - ips_unique: Dict[Set[str], List[Tuple[int, str]]], tes_uri: List[str] + ips_unique: Dict[Set[str], List[Tuple[int, str]]], + tes_uri: List[str], ) -> Dict[Set[str], float]: """Calculate distances between all IPs. Args: - ips_unique: A dictionary of unique ips. + ips_unique: A dictionary of unique Ips. tes_uri: List of TES instance. Returns: - A dictionary of distances between all ips. + A dictionary of distances between all IP addresses. + The keys are sets of IP addresses, and the values are the distances + between them as floats. """ distances_unique: Dict[Set[str], float] = {} ips_all = frozenset().union(*list(ips_unique.keys())) # type: ignore @@ -246,13 +266,13 @@ def calculate_distance( def rank_tes_instances( access_uri_combination: AccessUriCombination, ) -> List[str]: - """Rank the tes instance based on the total distance. + """Rank TES instances in increasing order of total distance. Args: access_uri_combination: Combination of task_params and tes_deployments. Returns: - A list of tes uri in increasing order of total distance. + A list of TES URI in increasing order of total distance. """ combination = [ value.dict() for value in access_uri_combination.tes_deployments diff --git a/pro_tes/tasks/track_task_progress.py b/pro_tes/tasks/track_task_progress.py index eb36deb..a265bcf 100644 --- a/pro_tes/tasks/track_task_progress.py +++ b/pro_tes/tasks/track_task_progress.py @@ -45,14 +45,11 @@ def task__track_task_progress( # pylint: disable=too-many-arguments but *not* the base path defined in the TES API specification; e.g., specify https://my.tes.com/api if the actual API is hosted at https://my.tes.com/api/ga4gh/tes/v1. - remote_base_path: Override the default path suffix defined in the tes + remote_base_path: Override the default path suffix defined in the TES API specification, i.e., `/ga4gh/tes/v1`. - remote_task_id: task run identifier on remote tes service. - user: User name for basic authentication. + remote_task_id: task run identifier on remote TES service. + user: User-name for basic authentication. password: Password for basic authentication. - - Returns: - Task identifier. """ foca_config: Config = current_app.config.foca controller_config: Dict = foca_config.controllers["post_task"] diff --git a/pro_tes/utils/models.py b/pro_tes/utils/models.py index b03b58a..32803a4 100644 --- a/pro_tes/utils/models.py +++ b/pro_tes/utils/models.py @@ -20,7 +20,7 @@ class TaskModelConverter: - """Convert py-tes to proTES to proTES TES task model. + """Convert py-tes to proTES TES task model. Convert :class:`tes.models.Task` to :class:`pro_tes.ga4gh.tes.models.TesTask` @@ -37,7 +37,7 @@ def __init__(self, task: Task) -> None: self.task: Task = task def convert_task(self) -> TesTask: - """Convert py-tes to proTES TES task to proTES TES task. + """Convert py-tes to proTES TES task. Returns: Instance of :class:`pro_tes.ga4gh.tes.models.TesTask` @@ -66,7 +66,7 @@ def convert_task(self) -> TesTask: ) def convert_state(self) -> TesState: - """Convert py-tes to proTES TES task state to proTES TES task state. + """Convert py-tes to proTES TES task state. Returns: Instance of :class:`pro_tes.ga4gh.tes.models.TesState`