Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: docstring and minor refactoring #138

Merged
merged 3 commits into from
Mar 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions pro_tes/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
NotFound,
)

# pylint: disable="too-few-public-methods"


class TaskNotFound(NotFound):
"""Raised when task with given task identifier was not found."""
Expand All @@ -23,6 +25,10 @@ class IdsUnavailableProblem(PyMongoError):
"""Raised when task identifier is unavailable."""


class NoTesInstancesAvailable(ValueError):
"""Raised when no TES instances are available."""


exceptions = {
Exception: {
"message": "An unexpected error occurred.",
Expand Down Expand Up @@ -68,4 +74,8 @@ class IdsUnavailableProblem(PyMongoError):
"message": "No/few unique task identifiers available.",
"code": "500",
},
NoTesInstancesAvailable: {
"message": "No valid TES instances available.",
"code": "500",
},
}
8 changes: 4 additions & 4 deletions pro_tes/ga4gh/tes/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -666,8 +666,8 @@ class TesEndpoint(CustomBaseModel):
Args:
host: Host at which the TES API is served that is processing this
request; note that this should include the path information but
*not* the base path path defined in the TES API specification;
e.g., specify https://my.tes.com/api if the actual API is hosted at
*not* the base path defined in the TES API specification; e.g.,
specify https://my.tes.com/api if the actual API is hosted at
https://my.tes.com/api/ga4gh/tes/v1.
base_path: Override the default path suffix defined in the TES API
specification, i.e., `/ga4gh/tes/v1`.
Expand All @@ -676,8 +676,8 @@ class TesEndpoint(CustomBaseModel):
Attributes:
host: Host at which the TES API is served that is processing this
request; note that this should include the path information but
*not* the base path path defined in the TES API specification;
e.g., specify https://my.tes.com/api if the actual API is hosted at
*not* the base path defined in the TES API specification; e.g.,
specify https://my.tes.com/api if the actual API is hosted at
https://my.tes.com/api/ga4gh/tes/v1.
base_path: Override the default path suffix defined in the TES API
specification, i.e., `/ga4gh/tes/v1`.
Expand Down
50 changes: 44 additions & 6 deletions pro_tes/ga4gh/tes/task_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ def _sanitize_request(self, payload: dict) -> Dict:
"""Sanitize request for use with py-tes.

Args:
payloads: Request payload.
payload: Request payload.

Returns:
Sanitized request payload.
Expand Down Expand Up @@ -417,7 +417,7 @@ def _sanitize_request(self, payload: dict) -> Dict:
return payload

def _set_projection(self, view: str) -> Dict:
"""Set database projectoin for selected view.
"""Set database projection for selected view.

Args:
view: View path parameter.
Expand Down Expand Up @@ -453,7 +453,17 @@ def _set_projection(self, view: str) -> Dict:
def _update_task(
self, payload: dict, db_document: DbDocument, start_time: str, **kwargs
) -> DbDocument:
"""Update the task object."""
"""Update the task object.

Args:
payload: A dictionary containing the payload for the update.
db_document: The document in the database to be updated.
start_time: The starting time of the incoming TES request.
**kwargs: Additional keyword arguments passed along with request.

Returns:
DbDocument: The updated database document.
"""
logs = self._set_logs(
payloads=deepcopy(payload), start_time=start_time
)
Expand All @@ -467,7 +477,15 @@ def _update_task(
return db_document

def _set_logs(self, payloads: dict, start_time: str) -> Dict:
"""Set up the logs for the request."""
"""Create or update `TesTask.logs` and set start time.

Args:
payloads: A dictionary containing the payload for the update.
start_time: The starting time of the incoming TES request.

Returns:
Task logs with start time set.
"""
if "logs" not in payloads.keys():
logs = [
{
Expand All @@ -491,7 +509,16 @@ def _update_doc_in_db(
tes_uri: str,
remote_task_id: str,
) -> DbDocument:
"""Update the document in the database."""
"""Set end time, task metadata in `TesTask.logs`, and update document.

Args:
db_connector: The database connector.
tes_uri: The TES URI to which the task is forwarded.
remote_task_id: Task identifier at the remote TES instance.

Returns:
The updated database document.
"""
time_now = datetime.now().strftime("%m-%d-%Y %H:%M:%S")
tes_endpoint_dict = {"host": tes_uri, "base_path": ""}
db_document = db_connector.upsert_fields_in_root_object(
Expand Down Expand Up @@ -529,7 +556,18 @@ def _update_doc_in_db(
def _update_task_metadata(
self, db_document: DbDocument, tes_uri: str, remote_task_id: str
) -> DbDocument:
"""Update the task metadata."""
"""Update the task metadata.
uniqueg marked this conversation as resolved.
Show resolved Hide resolved

Set TES endpoint and remote task identifier in `TesTask.logs.metadata`.

Args:
db_document: The document in the database to be updated.
tes_uri: The TES URI to which the task is forwarded.
remote_task_id: Task identifier at the remote TES instance.

Returns:
The updated database document.
"""
for logs in db_document.task.logs:
tesNextTes_obj = TesNextTes(id=remote_task_id, url=tes_uri)
if logs.metadata.forwarded_to is None:
Expand Down
29 changes: 23 additions & 6 deletions pro_tes/middleware/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import abc
from typing import List

from pro_tes.exceptions import NoTesInstancesAvailable
from pro_tes.middleware.task_distribution import distance, random

# pragma pylint: disable=too-few-public-methods
Expand All @@ -13,14 +14,24 @@ class AbstractMiddleware(metaclass=abc.ABCMeta):

@abc.abstractmethod
def modify_request(self, request):
"""Modify the request before it is sent to the TES instance."""
"""Modify the incoming task request.

Abstract method.

Args:
request: The incoming request object.

Returns:
The modified request object.
"""


class TaskDistributionMiddleware(AbstractMiddleware):
"""Inject task distribution logic.

Attributes:
tes_uri: TES instance best suited for TES task.
input_uris: A list of input URIs from the incoming request.
"""

def __init__(self) -> None:
Expand All @@ -29,13 +40,19 @@ def __init__(self) -> None:
self.input_uris: List[str] = []

def modify_request(self, request):
"""Add ranked list of TES instances to request body.
"""Modify the incoming task request.

Adds the ranked list of TES instances to the request body as `tes_uri`.

Args:
request: Incoming request object.

Returns:
Tuple of modified request object.
The modified request object.

Raises:
pro_tes.exceptions.NoTesInstancesAvailable: If no valid TES
instances are available.
"""
uniqueg marked this conversation as resolved.
Show resolved Hide resolved
if "inputs" in request.json.keys():
for index in range(len(request.json["inputs"])):
Expand All @@ -44,13 +61,13 @@ def modify_request(self, request):
request.json["inputs"][index]["url"]
)

if len(self.input_uris) != 0:
if self.input_uris:
self.tes_uris = distance.task_distribution(self.input_uris)
else:
self.tes_uris = random.task_distribution()

if len(self.tes_uris) != 0:
if self.tes_uris:
request.json["tes_uri"] = self.tes_uris
else:
raise Exception # pylint: disable=broad-exception-raised
raise NoTesInstancesAvailable
return request
94 changes: 54 additions & 40 deletions pro_tes/middleware/task_distribution/distance.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,15 @@
def task_distribution(input_uri: List) -> List:
"""Task distributor.

Distributes task by selecting the TES instance having minimum
distance between the input files and TES Instance.

Args:
input_uri: List of inputs of a TES task request

Returns:
A list of ranked TES instance.
A list of ranked TES instances, ordered by the minimum distance
between the input files and each TES instance.
"""
foca_conf = current_app.config.foca
tes_uri: List[str] = deepcopy(foca_conf.tes["service_list"])
tes_uri: List[str] = foca_conf.tes["service_list"]
access_uri_combination = get_uri_combination(input_uri, tes_uri)

# get the combination of the tes ip and input ip
Expand Down Expand Up @@ -66,12 +64,17 @@ def task_distribution(input_uri: List) -> List:
def get_uri_combination(
input_uri: List, tes_uri: List
) -> AccessUriCombination:
"""Create a combination of input uris and tes uri.
"""Create a combination of input URIs and TES URIs.

Args:
input_uri: List of input uris of TES request.
input_uri: List of input URIs of TES request.
tes_uri: List of TES instances.

Returns:
An :class:`pro_tes.middleware.models.AccessUriCombination` object
combining the task parameters and the TES deployments.

Examples:
An AccessUriCombination object of the form:
{
"task_params": {
Expand Down Expand Up @@ -100,12 +103,10 @@ def get_uri_combination(
},
}
"""
tes_deployment_list = []
for uri in tes_uri:
temp_obj = TesDeployment(
tes_uri=uri, stats=TesStats(total_distance=None)
)
tes_deployment_list.append(temp_obj)
tes_deployment_list = [
TesDeployment(tes_uri=uri, stats=TesStats(total_distance=None))
for uri in tes_uri
]

task_param = TaskParams(input_uris=input_uri)
access_uri_combination = AccessUriCombination(
Expand All @@ -118,11 +119,21 @@ def ip_combination(input_uri: List[str], tes_uri: List[str]) -> Dict:
"""Create a pair of TES IP and Input IP.

Args:
input_uri: List of input uris of TES request.
tes_uri: List of TES instance.
input_uri: A list of input URIs for a TES task request.
tes_uri: A list of TES instances to choose from.

Returns:
A dictionary of combination of tes ip with all the input ips.
A dictionary where the keys are tuples representing the combination
of TES instance and input URI, and the values are tuples containing
the IP addresses of the TES instance and input URI.

Example:
{
(0, 0): ('10.0.0.1', '192.168.0.1'),
(0, 1): ('10.0.0.1', '192.168.0.2'),
(1, 0): ('10.0.0.2', '192.168.0.1'),
(1, 1): ('10.0.0.2', '192.168.0.2')
}
"""
ips = {}

Expand All @@ -139,9 +150,7 @@ def ip_combination(input_uri: List[str], tes_uri: List[str]) -> Dict:
uri_no_auth = remove_auth_from_url(uri)
try:
tes_ip = gethostbyname(urlparse(uri_no_auth).netloc)
except KeyError:
continue
except gaierror:
except (KeyError, gaierror):
continue
for count, obj_ip in enumerate(obj_ip_list):
ips[(index, count)] = (tes_ip, obj_ip)
Expand All @@ -151,19 +160,23 @@ def ip_combination(input_uri: List[str], tes_uri: List[str]) -> Dict:
def ip_distance(
*args: str,
) -> Dict[str, Dict]:
"""Compute ip distance between ip pairs.
"""Compute IP distance between IP pairs.

:param *args: IP addresses of the form '8.8.8.8' without schema and
suffixes.
Args:
*args: IP addresses of the form '8.8.8.8' without schema and
suffixes.

:return: A dictionary with a key for each IP address, pointing to a
dictionary containing city, region and country information for the
IP address, as well as a key "distances" pointing to a dictionary
indicating the distances, in kilometers, between all pairs of IPs,
with the tuple of IPs as the keys. IPs that cannot be located are
skipped from the resulting dictionary.

:raises ValueError: No args were passed.
Returns:
A dictionary with a key for each IP address, pointing to a
dictionary containing city, region and country information for the
IP address, as well as a key "distances" pointing to a dictionary
indicating the distances, in kilometers, between all pairs of IPs,
with the tuple of IPs as the keys. IPs that cannot be located are
skipped from the resulting dictionary.

Raises:
ValueError: No args were passed.
"""
if not args:
raise ValueError("Expected at least one URI or IP address.")
Expand Down Expand Up @@ -199,16 +212,19 @@ def ip_distance(


def calculate_distance(
ips_unique: Dict[Set[str], List[Tuple[int, str]]], tes_uri: List[str]
ips_unique: Dict[Set[str], List[Tuple[int, str]]],
tes_uri: List[str],
) -> Dict[Set[str], float]:
"""Calculate distances between all IPs.

Args:
ips_unique: A dictionary of unique ips.
ips_unique: A dictionary of unique IPs.
tes_uri: List of TES instances.

Returns:
A dictionary of distances between all ips.
A dictionary of distances between all IP addresses.
The keys are sets of IP addresses, and the values are the distances
between them as floats.
"""
distances_unique: Dict[Set[str], float] = {}
ips_all = frozenset().union(*list(ips_unique.keys())) # type: ignore
Expand Down Expand Up @@ -250,24 +266,22 @@ def calculate_distance(
def rank_tes_instances(
access_uri_combination: AccessUriCombination,
) -> List[str]:
"""Rank the tes instance based on the total distance.
"""Rank TES instances in increasing order of total distance.

Args:
access_uri_combination: Combination of task_params and tes_deployments.

Returns:
A list of tes uri in increasing order of total distance.
A list of TES URIs in increasing order of total distance.
"""
combination = []
for value in access_uri_combination.tes_deployments:
combination.append(value.dict())
combination = [
value.dict() for value in access_uri_combination.tes_deployments
]

# sort the TES URIs in increasing order of total distance
ranked_combination = sorted(
combination, key=lambda x: x["stats"]["total_distance"]
)

ranked_tes_uri = []
for value in ranked_combination:
ranked_tes_uri.append(str(value["tes_uri"]))
ranked_tes_uri = [str(value["tes_uri"]) for value in ranked_combination]
return ranked_tes_uri
Loading