From 482c379fac76c6a2177002a21f7c6eeb701fc7ea Mon Sep 17 00:00:00 2001 From: Komal Thareja Date: Fri, 23 Aug 2024 17:23:16 -0400 Subject: [PATCH] added support for am --- .../external_api/artifact_manager.py | 319 ++++++++++++++ fabrictestbed/fabric_manager.py | 390 ++++++++++++++++++ fabrictestbed/token_manager/__init__.py | 0 fabrictestbed/token_manager/token_manager.py | 341 +++++++++++++++ 4 files changed, 1050 insertions(+) create mode 100644 fabrictestbed/external_api/artifact_manager.py create mode 100644 fabrictestbed/fabric_manager.py create mode 100644 fabrictestbed/token_manager/__init__.py create mode 100644 fabrictestbed/token_manager/token_manager.py diff --git a/fabrictestbed/external_api/artifact_manager.py b/fabrictestbed/external_api/artifact_manager.py new file mode 100644 index 0000000..148d29c --- /dev/null +++ b/fabrictestbed/external_api/artifact_manager.py @@ -0,0 +1,319 @@ +#!/usr/bin/env python3 +# MIT License +# +# Copyright (c) 2020 FABRIC Testbed +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# +# Author Komal Thareja (kthare10@renci.org) +import enum +import json +import os +from typing import List +from urllib.parse import quote + +import requests + + +class Visibility(enum.Enum): + Author = enum.auto() + Project = enum.auto() + Public = enum.auto() + + def __repr__(self): + return self.name + + def __str__(self): + return self.name + + +class ArtifactManagerError(Exception): + """ + Custom exception for errors raised by the ArtifactManager. + """ + pass + + +class ArtifactManager: + def __init__(self, api_url: str, token: str): + """ + Initializes the ArtifactManager with the API base URL and authentication token. + + :param api_url: The base URL for the artifact API. The URL should start with 'https'. + If not provided, it will be automatically prefixed. + :param token: The authentication token required for accessing the API. + This token will be included in the Authorization header for all API requests. + """ + if "https" not in api_url: + self.api_url = f"https://{api_url}/api" + else: + self.api_url = f"{api_url}/api" + + # Set the headers for API requests + self.headers = { + 'Authorization': f'Bearer {token}', + 'Accept': 'application/json', + 'Content-Type': 'application/json' + } + + def create_artifact(self, artifact_title: str, description_short: str, description_long: str, authors: List[str], + project_id: str, tags: List[str], visibility: Visibility = Visibility.Author) -> str: + """ + Creates a new artifact in the FABRIC Testbed. + + :param artifact_title: The title of the artifact to create. + :param description_short: A brief description of the artifact. + :param description_long: A detailed description of the artifact. + :param authors: A list of authors associated with the artifact. + :param project_id: The unique identifier for the project associated with the artifact. + :param tags: A list of tags to associate with the artifact for easier searching. + :param visibility: The visibility level of the artifact. Defaults to 'Author' level visibility. + :return: The ID of the newly created artifact. + :raises ArtifactManagerError: If the artifact creation fails. + """ + create_artifact_url = f"{self.api_url}/artifacts" + + data = { + "authors": authors, + "description_short": description_short, + "description_long": description_long, + "project_uuid": project_id, + "title": artifact_title, + "visibility": str(visibility).lower() + } + if tags: + data["tags"] = tags + + response = requests.post(create_artifact_url, headers=self.headers, json=data) + self.raise_for_status(response=response) + + return response.json() + + def update_artifact(self, artifact_id: str, artifact_title: str, description_short: str, description_long: str, + authors: List[str], project_id: str, tags: List[str], + visibility: Visibility = Visibility.Author): + """ + Updates an existing artifact in the FABRIC Testbed. + + :param artifact_id: The unique identifier of the artifact to update. + :param artifact_title: The updated title of the artifact. + :param description_short: The updated short description of the artifact. + :param description_long: The updated long description of the artifact. + :param authors: A list of updated authors associated with the artifact. + :param project_id: The unique identifier for the project associated with the artifact. + :param tags: A list of updated tags to associate with the artifact. + :param visibility: The updated visibility level of the artifact. Defaults to 'Author' level visibility. + :return: None + :raises ArtifactManagerError: If the artifact update fails. + """ + update_artifact_url = f"{self.api_url}/artifacts/{artifact_id}" + + data = { + "authors": authors, + "description_short": description_short, + "description_long": description_long, + "project_uuid": project_id, + "title": artifact_title, + "visibility": str(visibility).lower() + } + if tags: + data["tags"] = tags + + response = requests.put(update_artifact_url, headers=self.headers, json=data) + self.raise_for_status(response=response) + + def upload_file_to_artifact(self, artifact_id: str, file_path: str, storage_type: str = "fabric", + storage_repo: str = "renci") -> dict: + """ + Uploads a file to an existing artifact in the FABRIC Testbed. + + :param artifact_id: The ID of the artifact to which the file will be uploaded. + :param file_path: The local path to the file that will be uploaded. + :param storage_type: The type of storage to use (default: 'fabric'). + :param storage_repo: The storage repository to use (default: 'renci'). + :return: The response from the API as a JSON object. + :raises ArtifactManagerError: If the file upload fails. + """ + upload_content_url = f"{self.api_url}/contents" + + headers = self.headers.copy() + headers.pop("Content-Type") + + # Prepare the multipart form data + files = { + 'file': (os.path.basename(file_path), open(file_path, 'rb'), 'application/gzip'), + 'data': (None, json.dumps({ + "artifact": artifact_id, + "storage_type": storage_type, + "storage_repo": storage_repo + }), 'application/json') + } + + response = requests.post(upload_content_url, headers=headers, files=files) + self.raise_for_status(response=response) + + return response.json() + + def list_artifacts(self, search: str = None) -> List[dict]: + """ + Lists all artifacts in the FABRIC Testbed artifact repository, with optional filtering by search term. + Fetches all pages of results. + + :param search: Optional search filter (e.g., tag, project name). + If provided, only artifacts matching the search filter will be returned. + :return: A list of artifacts as a list of JSON objects. + :raises ArtifactManagerError: If the listing of artifacts fails. + """ + list_url = f"{self.api_url}/artifacts" + + params = {} + if search: + params['search'] = search + + all_artifacts = [] + page = 1 + + while True: + params['page'] = page + response = requests.get(list_url, headers=self.headers, params=params) + self.raise_for_status(response=response) + + data = response.json() + all_artifacts.extend(data['results']) + + if not data.get('next'): + break + page += 1 + + return all_artifacts + + def get_artifact(self, artifact_id: str) -> dict: + """ + Retrieves the details of a specific artifact by its ID. + + :param artifact_id: The unique identifier of the artifact to retrieve. + :return: A dictionary representing the artifact's details. + :raises ArtifactManagerError: If the retrieval of the artifact fails. + """ + get_url = f"{self.api_url}/artifacts/{artifact_id}" + + response = requests.get(get_url, headers=self.headers) + self.raise_for_status(response=response) + + return response.json() + + def download_artifact(self, urn: str, version: str, download_dir: str) -> str: + """ + Downloads a specific artifact by its URN from the artifact repository and saves it in a version-specific subdirectory. + + :param urn: The unique identifier of the specific version of the artifact to download. + :param version: The version of the artifact to create a subdirectory for. + :param download_dir: The directory where the downloaded artifact will be saved. + :return: The file path of the downloaded artifact. + :raises ArtifactManagerError: If the download fails. + """ + # Construct the download URL + download_url = f"{self.api_url}/contents/download/{urn}" + + # Define headers to match the curl command + headers = { + 'accept': 'application/json' + } + + try: + response = requests.get(download_url, headers=headers, stream=True) + self.raise_for_status(response) # Raises HTTPError for bad responses + + # Extract the file name from headers or URL + content_disposition = response.headers.get('content-disposition', '') + file_name = content_disposition.split('filename=')[-1].strip( + '\"') if 'filename=' in content_disposition else 'artifact' + + # Remove all file extensions for directory name + base_name = file_name + while '.' in base_name: + base_name = os.path.splitext(base_name)[0] + + # Create directory structure + outer_dir = os.path.join(download_dir, base_name) + version_dir = os.path.join(outer_dir, version) + + os.makedirs(version_dir, exist_ok=True) + + # Define the file path in the version directory + file_path = os.path.join(version_dir, file_name) + + # Save the file to the specified directory + with open(file_path, 'wb') as file: + for chunk in response.iter_content(chunk_size=8192): + if chunk: + file.write(chunk) + + print(f"Downloaded {urn} to {file_path}") + return file_path + except requests.HTTPError as e: + print(f"HTTPError: {e}") + raise ArtifactManagerError(f"HTTPError: {e}") + except Exception as e: + print(f"Exception: {e}") + raise ArtifactManagerError(f"Exception: {e}") + + def delete_artifact(self, artifact_id: str): + """ + Deletes a specific artifact by its ID from the artifact repository. + + :param artifact_id: The unique identifier of the artifact to delete. + :raises ArtifactManagerError: If the deletion fails. + """ + delete_url = f"{self.api_url}/artifacts/{artifact_id}" + + response = requests.delete(delete_url, headers=self.headers) + self.raise_for_status(response=response) + + def get_tags(self) -> List[str]: + """ + Retrieves all defined tags from the FABRIC Testbed artifact repository. + + :return: A list of all tags as strings. + :raises ArtifactManagerError: If the retrieval of tags fails. + """ + get_url = f"{self.api_url}/meta/tags" + + response = requests.get(get_url, headers=self.headers) + self.raise_for_status(response=response) + + return response.json().get("results") + + @staticmethod + def raise_for_status(response: requests.Response): + """ + Checks the response status and raises an ArtifactManagerError if the request was unsuccessful. + + :param response: The response object returned from the API request. + :raises ArtifactManagerError: If the response contains an HTTP error. + """ + try: + response.raise_for_status() + except requests.HTTPError as e: + try: + message = response.json() + except json.JSONDecodeError: + message = {"message": "Unknown error occurred while processing the request."} + + raise ArtifactManagerError(f"Error {response.status_code}: {e}. Message: {message}") diff --git a/fabrictestbed/fabric_manager.py b/fabrictestbed/fabric_manager.py new file mode 100644 index 0000000..73d055a --- /dev/null +++ b/fabrictestbed/fabric_manager.py @@ -0,0 +1,390 @@ +#!/usr/bin/env python3 +# MIT License +# +# Copyright (c) 2020 FABRIC Testbed +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# +# +# Author: Komal Thareja (kthare10@renci.org) +import os +from typing import List, Tuple, Union + +from fabric_cf.orchestrator.orchestrator_proxy import Status + +from fabrictestbed.external_api.artifact_manager import ArtifactManager, Visibility +from fabrictestbed.util.utils import Utils + +from fabrictestbed.external_api.core_api import CoreApi + +from fabrictestbed.util.constants import Constants + +from fabrictestbed.slice_manager import SliceManager + + +class FabricManagerException(Exception): + """Custom exception for FabricManager-related errors.""" + pass + + +class FabricManager(SliceManager): + """ + FabricManager extends SliceManager to integrate capabilities for interacting with + the FABRIC testbed, including managing SSH keys, retrieving user/project information, + and handling artifacts. + + :param cm_host: Credential Manager host + :param oc_host: Orchestrator host + :param core_api_host: Core API host for user/project info + :param am_host: Artifact Manager host for managing artifacts + :param token_location: Location of the token file + :param project_id: Identifier for the project + :param scope: Scope of the API access (default is 'all') + :param initialize: Flag to initialize the SliceManager + :param project_name: Name of the project + :param auto_refresh: Flag to enable automatic token refresh + """ + def __init__(self, *, cm_host: str = None, oc_host: str = None, core_api_host: str = None, am_host: str = None, + token_location: str = None, project_id: str = None, scope: str = "all", initialize: bool = True, + project_name: str = None, auto_refresh: bool = True): + super().__init__(cm_host=cm_host, oc_host=oc_host, token_location=token_location, + project_id=project_id, scope=scope, initialize=initialize, project_name=project_name, + auto_refresh=auto_refresh) + if core_api_host is None: + core_api_host = os.environ.get(Constants.FABRIC_CORE_API_HOST) + + if core_api_host is None is None: + raise FabricManagerException(f"Invalid initialization parameters: oc_host: {oc_host}") + + self.core_api_host = core_api_host + + if am_host is None: + am_host = os.environ.get(Constants.FABRIC_AM_HOST) + + if am_host is None is None: + raise FabricManagerException(f"Invalid initialization parameters: am_host: {am_host}") + + self.am_host = am_host + + def get_ssh_keys(self, uuid: str = None, email: str = None) -> list: + """ + Retrieve SSH keys associated with a user. + + :param uuid: User's UUID + :param email: User's email address + :return: List of SSH keys + :raises FabricManagerException: If there is an error in retrieving SSH keys. + """ + try: + core_api_proxy = CoreApi(core_api_host=self.core_api_host, token=self.ensure_valid_token()) + return core_api_proxy.get_ssh_keys(uuid=uuid, email=email) + + except Exception as e: + error_message = Utils.extract_error_message(exception=e) + raise FabricManagerException(error_message) + + def create_ssh_keys(self, key_type: str, description: str, comment: str = "ssh-key-via-api", + store_pubkey: bool = True) -> list: + """ + Create new SSH keys for a user. + + :param key_type: Type of the SSH key (e.g., 'sliver' or 'bastion') + :param description: Description of the key + :param comment: Comment to associate with the SSH key + :param store_pubkey: Flag indicating whether the public key should be stored + :return: List of SSH keys + :raises FabricManagerException: If there is an error in creating SSH keys. + """ + try: + core_api_proxy = CoreApi(core_api_host=self.core_api_host, token=self.ensure_valid_token()) + return core_api_proxy.create_ssh_keys(key_type=key_type, comment=comment, store_pubkey=store_pubkey, + description=description) + + except Exception as e: + error_message = Utils.extract_error_message(exception=e) + raise FabricManagerException(error_message) + + def get_user_info(self, uuid: str = None, email: str = None) -> dict: + """ + Retrieve user information using Core API. + + :param uuid: User's UUID + :param email: User's email address + :return: Dictionary containing user information + :raises FabricManagerException: If there is an error in retrieving user information. + """ + try: + core_api_proxy = CoreApi(core_api_host=self.core_api_host, token=self.ensure_valid_token()) + return core_api_proxy.get_user_info(uuid=uuid, email=email) + + except Exception as e: + error_message = Utils.extract_error_message(exception=e) + raise FabricManagerException(error_message) + + def get_project_info(self, project_name: str = "all", project_id: str = "all", uuid: str = None) -> list: + """ + Retrieve information about the user's projects. + + :param project_name: Name of the project (default is "all") + :param project_id: Identifier of the project (default is "all") + :param uuid: User's UUID + :return: List of projects + :raises FabricManagerException: If there is an error in retrieving project information. + """ + try: + core_api_proxy = CoreApi(core_api_host=self.core_api_host, token=self.ensure_valid_token()) + return core_api_proxy.get_user_projects(project_name=project_name, project_id=project_id, uuid=uuid) + + except Exception as e: + error_message = Utils.extract_error_message(exception=e) + raise FabricManagerException(error_message) + + def get_metrics_overview(self, excluded_projects: List[str] = None, + authenticated: bool = False) -> Tuple[Status, Union[list, Exception]]: + """ + Retrieve an overview of metrics. + + :param excluded_projects: List of projects to exclude from the metrics + :param authenticated: Flag indicating whether to retrieve metrics for a specific user + :return: Tuple containing the status and a list of metrics or an exception + """ + try: + token = self.ensure_valid_token() if authenticated else None + return self.oc_proxy.get_metrics_overview(token=token, excluded_projects=excluded_projects) + + except Exception as e: + error_message = Utils.extract_error_message(exception=e) + return Status.FAILURE, FabricManagerException(error_message) + + def create_artifact(self, artifact_title: str, description_short: str, description_long: str, authors: List[str], + tags: List[str], visibility: Visibility = Visibility.Author, + update_existing: bool = True) -> dict: + """ + Create a new artifact or update an existing one. + + :param artifact_title: Title of the artifact + :param description_short: Short description of the artifact + :param description_long: Long description of the artifact + :param authors: List of authors associated with the artifact + :param tags: List of tags associated with the artifact + :param visibility: Visibility level of the artifact + :param update_existing: Flag indicating whether to update an existing artifact + :return: Dictionary containing the artifact details + :raises FabricManagerException: If there is an error in creating or updating the artifact. + """ + try: + am_proxy = ArtifactManager(api_url=self.am_host, token=self.ensure_valid_token()) + existing_artifacts = am_proxy.list_artifacts(search=artifact_title) + + artifact = None + if update_existing: + for e in existing_artifacts: + if self.project_id in e.get("project_uuid"): + artifact = e + break + + authors = [self.get_user_id()] if not authors else authors.append(self.get_user_id()) + + if not artifact: + artifact = am_proxy.create_artifact( + artifact_title=artifact_title, + description_short=description_short, + description_long=description_long, + tags=tags, + visibility=visibility, + authors=authors, + project_id=self.project_id + ) + else: + am_proxy.update_artifact(artifact_id=artifact.get("uuid"), + artifact_title=artifact_title, + description_short=description_short, + description_long=description_long, + tags=tags, + visibility=visibility, + authors=authors, project_id=self.project_id) + return artifact + + except Exception as e: + error_message = Utils.extract_error_message(exception=e) + raise FabricManagerException(error_message) + + def list_artifacts(self, search: str = None) -> list: + """ + List artifacts based on a search query. + + :param search: Search query for filtering artifacts + :return: List of artifacts + :raises FabricManagerException: If there is an error in listing the artifacts. + """ + try: + am_proxy = ArtifactManager(api_url=self.am_host, token=self.ensure_valid_token()) + return am_proxy.list_artifacts(search=search) + except Exception as e: + raise FabricManagerException(Utils.extract_error_message(exception=e)) + + def delete_artifact(self, artifact_id: str = None, artifact_title: str = None): + """ + Delete an artifact by its ID or title. + + This method deletes an artifact from the system. Either the `artifact_id` or `artifact_title` + must be provided to identify the artifact to be deleted. If `artifact_id` is not provided, + the method will search for the artifact using `artifact_title` and then delete it. + + :param artifact_id: The unique identifier of the artifact to be deleted. + :param artifact_title: The title of the artifact to be deleted. + :raises ValueError: If neither `artifact_id` nor `artifact_title` is provided. + :raises FabricManagerException: If an error occurs during the deletion process. + """ + if artifact_id is None and artifact_title is None: + raise ValueError("Either artifact_id or artifact_title must be specified!") + + try: + am_proxy = ArtifactManager(api_url=self.am_host, token=self.ensure_valid_token()) + if not artifact_id: + existing_artifacts = am_proxy.list_artifacts(search=artifact_title) + + artifact = None + for e in existing_artifacts: + if self.project_id in e.get("project_uuid"): + artifact = e + break + if artifact: + artifact_id = artifact.get("uuid") + + if artifact_id: + am_proxy.delete_artifact(artifact_id=artifact_id) + except Exception as e: + error_message = Utils.extract_error_message(exception=e) + raise FabricManagerException(error_message) + + def get_artifact(self, artifact_id: str = None, artifact_title: str = None): + """ + Retrieve an artifact by its ID or title. + + This method retrieves an artifact from the system based on either its `artifact_id` or `artifact_title`. + If `artifact_id` is not provided, the method will search for the artifact using `artifact_title`. + + :param artifact_id: The unique identifier of the artifact to retrieve. + :param artifact_title: The title of the artifact to retrieve. + :return: A dictionary containing the artifact details. + :raises ValueError: If neither `artifact_id` nor `artifact_title` is provided. + :raises FabricManagerException: If an error occurs during the retrieval process. + """ + if artifact_id is None and artifact_title is None: + raise ValueError("Either artifact_id or artifact_title must be specified!") + + try: + am_proxy = ArtifactManager(api_url=self.am_host, token=self.ensure_valid_token()) + if not artifact_id: + existing_artifacts = am_proxy.list_artifacts(search=artifact_title) + + artifact = None + for e in existing_artifacts: + if self.project_id in e.get("project_uuid"): + artifact = e + break + return artifact + am_proxy.get_artifact(artifact_id=artifact_id) + except Exception as e: + error_message = Utils.extract_error_message(exception=e) + raise FabricManagerException(error_message) + + def get_tags(self): + """ + Retrieve all tags associated with artifacts. + + This method returns a list of all tags that are associated with artifacts in the system. + Tags are useful for categorizing and searching for artifacts. + + :return: A list of tags. + :raises FabricManagerException: If an error occurs while retrieving the tags. + """ + try: + am_proxy = ArtifactManager(api_url=self.am_host, token=self.ensure_valid_token()) + return am_proxy.get_tags() + except Exception as e: + error_message = Utils.extract_error_message(exception=e) + raise FabricManagerException(error_message) + + def upload_file_to_artifact(self, file_to_upload: str, artifact_id: str = None, artifact_title: str = None) -> dict: + """ + Upload a file to an existing artifact. + + This method uploads a file to an artifact identified by either its `artifact_id` or `artifact_title`. + If `artifact_id` is not provided, the method will search for the artifact using `artifact_title` before uploading the file. + + :param file_to_upload: The path to the file that should be uploaded. + :param artifact_id: The unique identifier of the artifact to which the file will be uploaded. + :param artifact_title: The title of the artifact to which the file will be uploaded. + :return: A dictionary containing the details of the uploaded file. + :raises ValueError: If neither `artifact_id` nor `artifact_title` is provided. + :raises FabricManagerException: If an error occurs during the upload process. + """ + if artifact_id is None and artifact_title is None: + raise ValueError("Either artifact_id or artifact_title must be specified!") + + try: + am_proxy = ArtifactManager(api_url=self.am_host, token=self.ensure_valid_token()) + if not artifact_id: + artifact = self.get_artifact(artifact_id=artifact_id, artifact_title=artifact_title) + if artifact: + artifact_id = artifact.get(artifact.get("uuid")) + return am_proxy.upload_file_to_artifact(artifact_id=artifact_id, file_path=file_to_upload) + except Exception as e: + error_message = Utils.extract_error_message(exception=e) + raise FabricManagerException(error_message) + + def download_artifact(self, download_dir: str, artifact_id: str = None, artifact_title: str = None, + version: str = None): + """ + Download an artifact to a specified directory. + + This method downloads an artifact identified by either its `artifact_id` or `artifact_title` to the specified `download_dir`. + If `artifact_id` is not provided, the method will search for the artifact using `artifact_title`. + + :param download_dir: The directory where the artifact will be downloaded. + :param artifact_id: The unique identifier of the artifact to download. + :param artifact_title: The title of the artifact to download. + :param version: The specific version of the artifact to download (optional). + :return: The path to the downloaded artifact. + :raises ValueError: If neither `artifact_id` nor `artifact_title` is provided. + :raises FabricManagerException: If an error occurs during the download process. + """ + if artifact_id is None and artifact_title is None: + raise ValueError("Either artifact_id or artifact_title must be specified!") + try: + am_proxy = ArtifactManager(api_url=self.am_host, token=self.ensure_valid_token()) + artifact = self.get_artifact(artifact_id=artifact_id, artifact_title=artifact_title) + artifact_urn = None + for v in artifact.get("versions"): + if not version: + version = v.get("version") + artifact_urn = v.get("urn") + break + if version in v: + artifact_urn = v.get("urn") + break + if not artifact_urn: + raise ValueError(f"Requested version: {version} not found for the artifact: " + f"{artifact_id}/{artifact_title}!") + return am_proxy.download_artifact(urn=artifact_urn, download_dir=download_dir, version=version) + except Exception as e: + error_message = Utils.extract_error_message(exception=e) + raise FabricManagerException(error_message) diff --git a/fabrictestbed/token_manager/__init__.py b/fabrictestbed/token_manager/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/fabrictestbed/token_manager/token_manager.py b/fabrictestbed/token_manager/token_manager.py new file mode 100644 index 0000000..d0b49e7 --- /dev/null +++ b/fabrictestbed/token_manager/token_manager.py @@ -0,0 +1,341 @@ +#!/usr/bin/env python3 +# MIT License +# +# Copyright (c) 2020 FABRIC Testbed +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# +# +# Author: Komal Thareja (kthare10@renci.org) +import json +import logging +import os +from abc import ABC +from datetime import datetime, timezone, timedelta +from typing import Tuple, List, Union, Any + +from fabric_cm.credmgr.credmgr_proxy import CredmgrProxy, Status, TokenType +from fabrictestbed.slice_manager import CmStatus + +from fabrictestbed.util.utils import Utils + +from fabrictestbed.util.constants import Constants + + +class TokenManagerException(Exception): + pass + + +class TokenManager(ABC): + def __init__(self, *, cm_host: str = None, token_location: str = None, project_id: str = None, scope: str = "all", + project_name: str = None, auto_refresh: bool = True, initialize: bool = True): + """ + Initialize a TokenManager instance. + + This constructor sets up the TokenManager with the necessary parameters for managing tokens, including + optional initialization of the manager. It also configures settings related to the project and scope. + + @param cm_host: (Optional) The host address of the credential manager. If not provided, it may be + retrieved from environment variables or other sources. + @param token_location: (Optional) The location of the token file. This is where the token is stored + or retrieved from. + @param project_id: (Optional) The ID of the project associated with the token. This can be used to + filter or manage tokens for a specific project. + @param scope: (Optional) The scope of the token's validity. Defaults to "all". It determines the + extent or range of access the token provides. + @param project_name: (Optional) The name of the project associated with the token. This can be used + to filter or manage tokens for a specific project. + @param auto_refresh: (Optional) A flag indicating whether the token should be automatically refreshed + when it expires. Defaults to True. + @param initialize: (Optional) A flag indicating whether the manager should be initialized upon + creation. Defaults to True. If set to False, initialization tasks are skipped. + + @return: None + """ + + self.auto_refresh = auto_refresh + self.logger = logging.getLogger() + self.initialized = False + if cm_host is None: + cm_host = os.environ.get(Constants.FABRIC_CREDMGR_HOST) + self.cm_proxy = CredmgrProxy(credmgr_host=cm_host) + self.token_location = token_location + self.tokens = {} + self.project_id = project_id + if self.project_id is None: + self.project_id = os.environ.get(Constants.FABRIC_PROJECT_ID) + self.project_name = project_name + if self.project_name is None: + self.project_name = os.environ.get(Constants.FABRIC_PROJECT_NAME) + self.scope = scope + if self.token_location is None: + self.token_location = os.environ.get(Constants.FABRIC_TOKEN_LOCATION) + + if cm_host is None or self.token_location is None: + raise TokenManagerException(f"Invalid initialization parameters: cm_host: {cm_host}, " + f"token_location: {self.token_location}") + + # Try to load the project_id or project_name from the Token + if project_id is None and project_name is None: + self._extract_project_and_user_info_from_token(cm_host=cm_host) + + # Validate the required parameters are set + if self.project_id is None and self.project_name is None: + raise TokenManagerException(f"Invalid initialization parameters: project_id={self.project_id}, " + f"project_name={self.project_name}") + + self.user_id = None + self.user_email = None + + if initialize: + self.initialize() + + def initialize(self): + """ + Initialize the Slice Manager object + - Load the tokens + - Refresh if needed + """ + if not self.initialized: + self._load_tokens() + self.initialized = True + + def _check_initialized(self): + """ + Check if Slice Manager has been initialized + @raises Exception if slice manager has been initialized + """ + if not self.initialized: + raise TokenManagerException("Fabric Client has not been initialized!") + + def get_refresh_token(self) -> str: + """ + Get Refresh Token + @return refresh token + """ + return self.tokens.get(CredmgrProxy.REFRESH_TOKEN, None) + + def get_id_token(self) -> str: + """ + Get Id token + @return id token + """ + return self.tokens.get(CredmgrProxy.ID_TOKEN, None) + + def set_token_location(self, *, token_location: str): + """ + Set token location: path of the file where tokens should be saved + @param token_location file name along with complete path where tokens should be stored + """ + self.token_location = token_location + + def _extract_project_and_user_info_from_token(self, cm_host: str): + """ + Extract project and user information from the identity token. + + This method determines the project ID, project name, user ID, and user email + by decoding the identity token, if these details are not explicitly provided. + + @param: cm_host (str): The hostname of the credential manager (CM) to be used for decoding the token. + + Notes: + - This method assumes that tokens have already been loaded. + - If project and user information is successfully extracted from the token, it will be stored in the + instance variables `project_id`, `project_name`, `user_id`, and `user_email`. + """ + self._load_tokens(refresh=False) + if self.get_id_token() is not None: + logging.info("Project Id/Name not specified, trying to determine it from the token") + decoded_token = Utils.decode_token(cm_host=cm_host, token=self.get_id_token()) + if decoded_token.get("projects") and len(decoded_token.get("projects")): + self.project_id = decoded_token.get("projects")[0].get("uuid") + self.project_name = decoded_token.get("projects")[0].get("name") + self.user_id = decoded_token.get("uuid") + self.user_email = decoded_token.get("email") + + def _load_tokens(self, refresh: bool = True): + """ + Load Fabric Tokens from the tokens.json if it exists + Otherwise, this is the first attempt, create the tokens and save them + @note this function is invoked when reloading the tokens to ensure tokens + from the token file are read instead of the local variables + """ + # Load the tokens from the JSON + if os.path.exists(self.token_location): + with open(self.token_location, 'r') as stream: + self.tokens = json.loads(stream.read()) + refresh_token = self.get_refresh_token() + else: + # First time login, use environment variable to load the tokens + refresh_token = os.environ.get(Constants.CILOGON_REFRESH_TOKEN) + # Renew the tokens to ensure any project_id changes are taken into account + if refresh and self.auto_refresh and refresh_token: + self.refresh_tokens(refresh_token=refresh_token) + + def _should_renew(self) -> bool: + """ + Check if tokens should be renewed + Returns true if tokens are at least 30 minutes old + @return true if tokens should be renewed; false otherwise + """ + self._check_initialized() + + id_token = self.get_id_token() + created_at = self.tokens.get(CredmgrProxy.CREATED_AT, None) + + created_at_time = datetime.strptime(created_at, CredmgrProxy.TIME_FORMAT) + now = datetime.now(timezone.utc) + + if id_token is None or now - created_at_time >= timedelta(minutes=180): + return True + + return False + + def create_token(self, scope: str = "all", project_id: str = None, project_name: str = None, file_name: str = None, + life_time_in_hours: int = 4, comment: str = "Created via API", + browser_name: str = "chrome") -> Tuple[Status, Union[dict, TokenManagerException]]: + """ + Create token + @param project_id: Project Id + @param project_name: Project Name + @param scope: scope + @param file_name: File name + @param life_time_in_hours: Token lifetime in hours + @param comment: comment associated with the token + @param browser_name: Browser name; allowed values: chrome, firefox, safari, edge + @returns Tuple of Status, token json or Exception + @raises Exception in case of failure + """ + try: + return self.cm_proxy.create(scope=scope, project_id=project_id, project_name=project_name, + file_name=file_name, life_time_in_hours=life_time_in_hours, comment=comment, + browser_name=browser_name) + except Exception as e: + error_message = Utils.extract_error_message(exception=e) + return Status.FAILURE, TokenManagerException(error_message) + + def refresh_tokens(self, *, refresh_token: str) -> Tuple[str, str]: + """ + Refresh tokens + User is expected to invoke refresh token API before invoking any other APIs to ensure the token is not expired. + User is also expected to update the returned refresh token in the JupyterHub environment. + @returns tuple of id token and refresh token + @note this exposes an API for the user to refresh tokens explicitly only. CredMgrProxy::refresh already + updates the refresh tokens to the token file atomically. + """ + try: + status, tokens = self.cm_proxy.refresh(project_id=self.project_id, scope=self.scope, + refresh_token=refresh_token, file_name=self.token_location, + project_name=self.project_name) + if status == CmStatus.OK: + self.tokens = tokens + return tokens.get(CredmgrProxy.ID_TOKEN, None), tokens.get(CredmgrProxy.REFRESH_TOKEN, None) + else: + error_message = Utils.extract_error_message(exception=tokens) + raise TokenManagerException(error_message) + except Exception as e: + error_message = Utils.extract_error_message(exception=e) + raise TokenManagerException(error_message) + + def revoke_token(self, *, refresh_token: str = None, id_token: str = None, token_hash: str = None, + token_type: TokenType = TokenType.Refresh) -> Tuple[Status, Any]: + """ + Revoke a refresh token + @param refresh_token Refresh Token to be revoked + @param id_token Identity Token + @param token_hash Token Hash + @param token_type type of the token being revoked + @return Tuple of the status and revoked refresh token + """ + if refresh_token is None: + refresh_token = self.get_refresh_token() + if id_token is None: + id_token = self.get_id_token() + if token_hash is None: + token_hash = Utils.generate_sha256(token=id_token) + + try: + return self.cm_proxy.revoke(refresh_token=refresh_token, identity_token=id_token, token_hash=token_hash, + token_type=token_type) + except Exception as e: + error_message = Utils.extract_error_message(exception=e) + return Status.FAILURE, TokenManagerException(error_message) + + def token_revoke_list(self, *, project_id: str) -> Tuple[Status, Union[TokenManagerException, List[str]]]: + """ + Get Token Revoke list for a project + @param project_id project_id + @return token revoke list + """ + try: + return self.cm_proxy.token_revoke_list(project_id=project_id) + except Exception as e: + error_message = Utils.extract_error_message(exception=e) + return Status.FAILURE, TokenManagerException(error_message) + + def clear_token_cache(self, *, file_name: str = None): + """ + Clear the cached token + Should be invoked when the user changes projects + @return: + """ + cache_file_name = file_name + if cache_file_name is None: + cache_file_name = self.token_location + status, exception = self.cm_proxy.clear_token_cache(file_name=cache_file_name) + if status == CmStatus.OK: + return Status.OK, None + return Status.FAILURE, f"Failed to clear token cache: {Utils.extract_error_message(exception=exception)}" + + def ensure_valid_token(self) -> str: + """ + Ensures the token is valid and renews it if required. + @return valid identity token + """ + if self._should_renew(): + self._load_tokens() + return self.get_id_token() + + def get_user_id(self) -> str: + """ + Retrieve the user ID associated with the current session. + + This method returns the user ID if it has already been determined. If the user ID + has not been set and an identity token is available, it will attempt to extract + the user ID by decoding the token using the credential manager proxy. + + @return: The user ID if available; otherwise, None. + """ + if not self.user_id and self.get_id_token() and self.cm_proxy: + self._extract_project_and_user_info_from_token(cm_host=self.cm_proxy.host) + return self.user_id + + def get_user_email(self) -> str: + """ + Retrieve the user email associated with the current session. + + This method returns the user email if it has already been determined. If the user email + has not been set and an identity token is available, it will attempt to extract + the user email by decoding the token using the credential manager proxy. + + @return: The user email if available; otherwise, None. + """ + if not self.user_email and self.get_id_token() and self.cm_proxy: + self._extract_project_and_user_info_from_token(cm_host=self.cm_proxy.host) + return self.user_email