From df295b65e37872e87849bf41335788dc6f45023a Mon Sep 17 00:00:00 2001
From: Nick Tyler <tylern@lbl.gov>
Date: Mon, 26 Aug 2024 16:20:34 -0700
Subject: [PATCH] Use sfapi_client

---
 pyproject.toml                                 |  14 +
 python/SuperfacilityAPI/README.md              | 184 ----
 python/SuperfacilityAPI/SuperfacilityAPI.py    | 843 ------------------
 .../SuperfacilityAccessToken.py                | 152 ----
 python/SuperfacilityAPI/SuperfacilityErrors.py |  54 --
 python/SuperfacilityAPI/__init__.py            |   3 -
 python/SuperfacilityAPI/api_version.py         |   1 -
 python/SuperfacilityAPI/bin/sfapi              | 309 -------
 python/SuperfacilityAPI/nersc_systems.py       |  32 -
 setup.py                                       |  50 --
 sfapi_tests.ipynb                              | 228 -----
 src/SuperfacilityConnector/__init__.py         |   1 +
 src/SuperfacilityConnector/sfapi.py            | 255 ++++
 13 files changed, 270 insertions(+), 1856 deletions(-)
 create mode 100644 pyproject.toml
 delete mode 100644 python/SuperfacilityAPI/README.md
 delete mode 100644 python/SuperfacilityAPI/SuperfacilityAPI.py
 delete mode 100644 python/SuperfacilityAPI/SuperfacilityAccessToken.py
 delete mode 100644 python/SuperfacilityAPI/SuperfacilityErrors.py
 delete mode 100644 python/SuperfacilityAPI/__init__.py
 delete mode 100644 python/SuperfacilityAPI/api_version.py
 delete mode 100644 python/SuperfacilityAPI/bin/sfapi
 delete mode 100644 python/SuperfacilityAPI/nersc_systems.py
 delete mode 100644 setup.py
 delete mode 100644 sfapi_tests.ipynb
 create mode 100644 src/SuperfacilityConnector/__init__.py
 create mode 100644 src/SuperfacilityConnector/sfapi.py

diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..2fc50f8
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,14 @@
+[project]
+name = "SuperfacilityConnector"
+version = "0.1.0"
+description = "Connector for the NERSC Superfacility API"
+readme = "README.md"
+requires-python = ">=3.10"
+dependencies = ["ruff>=0.6.1", "click>=8.1.7", "sfapi-client>=0.1.0"]
+
+[project.scripts]
+sfapi = "SuperfacilityConnector.sfapi:cli"
+
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
diff --git a/python/SuperfacilityAPI/README.md b/python/SuperfacilityAPI/README.md
deleted file mode 100644
index 1d2f1b4..0000000
--- a/python/SuperfacilityAPI/README.md
+++ /dev/null
@@ -1,184 +0,0 @@
-## SuperfacilityAPI
-
-### __init__
-```
-
-SuperfacilityAPI
-
-Parameters
-----------
-client_id : str, optional
-    Client ID obtained from iris, by default None
-private_key : str, optional
-    Private key obtained from iris, by default None
-
-```
-
-### delete_job
-```
-
-Removes job from queue
-
-Parameters
-----------
-site : str, optional
-    Site to remove job from, by default 'cori'
-jobid : int, optional
-    Jobid to remove, by default None
-
-Returns
--------
-Dict
-
-```
-
-### get_jobs
-```
-
-Used to get information about slurm jobs on a system
-
-Parameters
-----------
-site : str, optional
-    NERSC site where slurm job is running, by default 'cori'
-sacct : bool, optional
-    Whether to use sacct[true] or squeue[false], by default True
-jobid : int, optional
-    Slurm job id to get information for, by default None
-user : int, optional
-    Username to get information for, by default None
-
-Returns
--------
-Dict
-
-```
-
-### groups
-```
-
-Get the groups for your accout
-
-Returns
--------
-Dict
-
-```
-
-### ls
-```
-
-ls comand on a site
-
-Parameters
-----------
-site : str, optional
-    Name of the site you want to ls at, by default 'cori'
-remote_path : str, optional
-    Path on the system, by default None
-
-Returns
--------
-Dict
-
-```
-
-### post_job
-```
-
-Adds a new job to the queue
-
-Parameters
-----------
-site : str, optional
-    Site to 
add job to, by default 'cori' -script : str, optional - Path or script to call sbatch on, by default None -isPath : bool, optional - Is the script a path on the site or a file, by default True - -Returns -------- -int - slurm jobid - -``` - -### projects -``` - -Get information about your projects - -Parameters ----------- -repo_name : str, optional - Get information about a specific project, by default None - -Returns -------- -Dict - -``` - -### roles -``` - -Get roles for your account - -Returns -------- -Dict - -``` - -### status -``` - -Gets status of NERSC systems - -Parameters ----------- -name : str, optional - Name of system to get the status for, by default None - Can be combined with notes/outages/planned to get detailed status -notes : bool, optional - Get notes on the status, by default False -outages : bool, optional - Get current outages, by default False -planned : bool, optional - Get planned outages, by default False -new : bool, optional - Get newest version of the status, by default False - -Returns -------- -Dict - -``` - -### system_names -``` - -Returns list of all systems at NERSC - -Returns -------- -List - -``` - -### tasks -``` - -Used to get SuperfacilityAPI tasks - -Parameters ----------- -task_id : int, optional - SuperfacilityAPI task number, by default None - -Returns -------- -Dict - -``` \ No newline at end of file diff --git a/python/SuperfacilityAPI/SuperfacilityAPI.py b/python/SuperfacilityAPI/SuperfacilityAPI.py deleted file mode 100644 index 768cd48..0000000 --- a/python/SuperfacilityAPI/SuperfacilityAPI.py +++ /dev/null @@ -1,843 +0,0 @@ -from typing import Dict, List -from authlib.integrations.requests_client import ( - OAuth2Session, - OAuthError -) -from authlib.oauth2.rfc7523 import PrivateKeyJWT -import requests -import sys -from time import sleep -from datetime import datetime -import json -import logging -from pathlib import Path -import urllib.parse -from . 
import SuperfacilityAccessToken - -# Configurations in differnt files -from .SuperfacilityErrors import ( - permissions_warning, - warning_fourOfour, - no_client, - FourOfourException, - InternalServerError, - NoClientException, - SuperfacilityCmdFailed, - SuperfacilitySiteDown, - ApiTokenError -) -from .api_version import API_VERSION -from .nersc_systems import ( - NERSC_DEFAULT_COMPUTE, - nersc_systems, - NerscCompute, - NerscFilesystems -) - -from enum import Flag, auto, Enum - -global HAVE_PANDAS -try: - import pandas as pd - HAVE_PANDAS = True -except ImportError: - HAVE_PANDAS = False - - -sacct_columns = ['account', 'admincomment', 'alloccpus', 'allocnodes', 'alloctres', 'associd', 'avecpu', - 'avecpufreq', 'avediskread', 'avediskwrite', 'avepages', 'averss', 'avevmsize', 'blockid', - 'cluster', 'comment', 'constraints', 'consumedenergy', 'consumedenergyraw', 'cputime', 'cputimeraw', - 'dbindex', 'derivedexitcode', 'elapsed', 'elapsedraw', 'eligible', 'end', 'exitcode', 'flags', - 'gid', 'group', 'jobid', 'jobidraw', 'jobname', 'layout', 'maxdiskread', 'maxdiskreadnode', - 'maxdiskreadtask', 'maxdiskwrite', 'maxdiskwritenode', 'maxdiskwritetask', 'maxpages', - 'maxpagesnode', 'maxpagestask', 'maxrss', 'maxrssnode', 'maxrsstask', 'maxvmsize', 'maxvmsizenode', - 'maxvmsizetask', 'mcslabel', 'mincpu', 'mincpunode', 'mincputask', 'ncpus', 'nnodes', - 'nodelist', 'ntasks', 'priority', 'partition', 'qos', 'qosraw', 'reason', 'reqcpufreq', - 'reqcpufreqmin', 'reqcpufreqmax', 'reqcpufreqgov', 'reqcpus', 'reqmem', 'reqnodes', 'reqtres', - 'reservation', 'reservationid', 'reserved', 'resvcpu', 'resvcpuraw', 'start', 'state', 'submit', - 'suspended', 'systemcpu', 'systemcomment', 'timelimit', 'timelimitraw', 'totalcpu', 'tresusageinave', - 'tresusageinmax', 'tresusageinmaxnode', 'tresusageinmaxtask', 'tresusageinmin', 'tresusageinminnode', - 'tresusageinmintask', 'tresusageintot', 'tresusageoutave', 'tresusageoutmax', 'tresusageoutmaxnode', - 'tresusageoutmaxtask', 'tresusageoutmin', 'tresusageoutminnode', 'tresusageoutmintask', - 'tresusageouttot', 'uid', 'user', 'usercpu', 'wckey', 'wckeyid', 'workdir', ] - -squeue_columns = ['account', 'tres_per_node', 'min_cpus', 'min_tmp_disk', 'end_time', 'features', 'group', - 'over_subscribe', 'jobid', 'name', 'comment', 'time_limit', 'min_memory', 'req_nodes', - 'command', 'priority', 'qos', 'reason', '', 'st', 'user', 'reservation', 'wckey', 'exc_nodes', - 'nice', 's:c:t', 'exec_host', 'cpus', 'nodes', 'dependency', 'array_job_id', 'sockets_per_node', - 'cores_per_socket', 'threads_per_core', 'array_task_id', 'time_left', 'time', 'nodelist', - 'contiguous', 'partition', 'nodelist(reason)', 'start_time', 'state', 'uid', 'submit_time', 'licenses', 'core_spec', 'schednodes', 'work_dir', ] - - -class NerscSystemState(Flag): - ACTIVE = auto() - DOWN = auto() - DEGRADED = auto() - MAINTNAINCE = auto() - UNKNOWN = auto() - - def __str__(self): - return f'{self.name.lower()}' - - -class SuperfacilityAPI: - _status = None - access_token = None - - def __init__(self, token=None, base_url=None): - """SuperfacilityAPI - - Parameters - ---------- - client_id : str, optional - Client ID obtained from iris, by default None - private_key : str, optional - Private key obtained from iris, by default None - """ - self.API_VERSION = API_VERSION - if base_url is None: - # Base url for sfapi requests - self.base_url = f'https://api.nersc.gov/api/v{self.API_VERSION}' - else: - self.base_url = base_url - self.headers = {'accept': 'application/json', - 'Content-Type': 
'application/x-www-form-urlencoded'} - self.access_token = token - - def __generic_get(self, sub_url: str, header: Dict = None) -> Dict: - """PRIVATE: Used to make a GET request to the api given a fully qualified sub url. - - - Parameters - ---------- - sub_url : str - Url of the specific funtion to request. - - Returns - ------- - Dict - Dictionary given by requests.Responce.json() - """ - logging.debug(f"__generic_get {sub_url}") - json_resp = {} - if isinstance(self.access_token, str): - self.headers['Authorization'] = f'Bearer {self.access_token}' - elif isinstance(self.access_token, SuperfacilityAccessToken): - self.headers['Authorization'] = f'Bearer {self.access_token.token}' - else: - raise PermissionError("No Token Provided") - - try: - logging.debug( - f"Getting from {self.base_url+sub_url}") - # Perform a get request - resp = requests.get( - self.base_url+sub_url, headers=self.headers if header is None else header) - - status = resp.status_code - # Raise error based on reposnce status [200 OK] [500 err] - resp.raise_for_status() - json_resp = resp.json() - except requests.exceptions.HTTPError as err: - if status == 404: - logging.warning(warning_fourOfour.format( - self.base_url+sub_url)) - raise FourOfourException( - f"404 not found {self.base_url+sub_url}") - elif status == 500: - logging.warning(f"500 Internal Server Error {err}") - raise InternalServerError(f"500 Internal Server Error {err}") - elif status == 403: - logging.warning( - f"The security token included in the request is invalid. {err}") - raise ApiTokenError( - f"The security token included in the request is invalid. {err}") - except requests.exceptions.TooManyRedirects as err: - logging.warning(f"TooManyRedirects {err}") - raise InternalServerError(f"TooManyRedirects {err}") - - return json_resp - - def __generic_post(self, sub_url: str, header: Dict = None, data: Dict = None) -> Dict: - """PRIVATE: Used to make a POST request to the api given a fully qualified sub url. - - - Parameters - ---------- - sub_url : str - Url of the specific funtion to request. - - Returns - ------- - Dict - Dictionary given by requests.Responce.json() - """ - logging.debug(f"__generic_post {sub_url}") - json_resp = {} - - if isinstance(self.access_token, str): - self.headers['Authorization'] = f'Bearer {self.access_token}' - elif isinstance(self.access_token, SuperfacilityAccessToken): - self.headers['Authorization'] = f'Bearer {self.access_token.token}' - else: - raise PermissionError("No Token Provided") - - try: - logging.debug( - f"Sending {data} to {self.base_url+sub_url}") - # Perform a get request - resp = requests.post( - self.base_url+sub_url, - headers=self.headers if header is None else header, - data="" if data is None else urllib.parse.urlencode(data)) - status = resp.status_code - # Raise error based on reposnce status [200 OK] [500 err] - resp.raise_for_status() - except requests.exceptions.HTTPError as err: - if status == 404: - logging.warning(warning_fourOfour.format( - self.base_url+sub_url)) - raise FourOfourException( - f"404 not found {self.base_url+sub_url}") - elif status == 403: - logging.warning( - f"The security token included in the request is invalid. {err}") - raise ApiTokenError( - f"The security token included in the request is invalid. 
{err}") - elif status == 500: - if self.access_token is None: - logging.warning(no_client) - raise NoClientException(no_client) - logging.warning(f"500 Internal Server Error") - raise InternalServerError(f"500 Internal Server Error") - - json_resp = resp.json() - return json_resp - - def __generic_delete(self, sub_url: str, header: Dict = None) -> Dict: - """PRIVATE: Used to make a DELETE request to the api given a fully qualified sub url. - - - Parameters - ---------- - sub_url : str - Url of the specific funtion to request. - - Returns - ------- - Dict - Dictionary given by requests.Responce.json() - """ - logging.debug(f"__generic_delete {sub_url}") - json_resp = {} - - if isinstance(self.access_token, str): - self.headers['Authorization'] = f'Bearer {self.access_token}' - elif isinstance(self.access_token, SuperfacilityAccessToken): - self.headers['Authorization'] = f'Bearer {self.access_token.token}' - else: - raise PermissionError("No Token Provided") - - try: - # Perform a get request - resp = requests.delete( - self.base_url+sub_url, - headers=self.headers if header is None else header) - status = resp.status_code - # Raise error based on reposnce status [200 OK] [500 err] - resp.raise_for_status() - except requests.exceptions.HTTPError as err: - if status == 404: - logging.warning(warning_fourOfour.format( - self.base_url+sub_url)) - raise FourOfourException( - f"404 not found {self.base_url+sub_url}") - elif status == 403: - logging.warning( - f"The security token included in the request is invalid. {err}") - raise ApiTokenError( - f"The security token included in the request is invalid. {err}") - elif status == 500: - if self.access_token is None: - logging.warning(no_client) - raise NoClientException(no_client) - - logging.warning(f"500 Internal Server Error") - raise InternalServerError(f"500 Internal Server Error") - - json_resp = resp.json() - return json_resp - - def __get_system_status(self) -> None: - """Gets the system status and all systems and stores them. 
- """ - logging.debug("Getting full status") - self._status = self.__generic_get('/status/') - logging.debug(f"Putting {self._status} into the systems") - self.systems = [system['name'] for system in self._status] - - def system_names(self) -> List: - """Returns list of all systems at NERSC - - Returns - ------- - List - """ - self.__get_system_status() - return self.systems - - def status(self, name: str = None, notes: bool = False, - outages: bool = False, planned: bool = False, - new: bool = False) -> Dict: - """Gets status of NERSC systems - - Parameters - ---------- - name : str, optional - Name of system to get the status for, by default None - Can be combined with notes/outages/planned to get detailed status - notes : bool, optional - Get notes on the status, by default False - outages : bool, optional - Get current outages, by default False - planned : bool, optional - Get planned outages, by default False - new : bool, optional - Get newest version of the status, by default False - - Returns - ------- - Dict - """ - sub_url = '/status' - if notes: - sub_url = '/status/notes' - - if outages: - sub_url = '/status/outages' - - if planned: - sub_url = '/status/outages/planned' - - if name is not None and name in nersc_systems: - sub_url = f'{sub_url}/{name}' - - if name == "muller": - return {'name': 'muller', 'full_name': 'muller', 'description': 'System is active', - 'system_type': 'compute', 'notes': [], 'status': 'active', 'updated_at': 'never'} - - if sub_url == '/status' and not new: - if self._status is None: - self.__get_system_status() - return self._status - - return self.__generic_get(sub_url) - - def system_status(self, name: str = "perlmutter"): - """system_status - - Args: - name (str, optional): Name of the system to check status. Defaults to "perlmutter". - - Returns: - NerscSystemState: State of the system as an enum - """ - # Default to unknown state - state = NerscSystemState.UNKNOWN - # Call the status command to get current status - data = self.status(name=name) - # If there's an error return unknown state - if not isinstance(data, dict): - return state - - # Active comes up for up and degraded so we split them based on descrition - if data['status'] == 'active': - return NerscSystemState.ACTIVE - elif data['status'] == 'degraded': - return NerscSystemState.DEGRADED - else: - state = NerscSystemState.DOWN - if data['description'] == "Scheduled Maintenance": - state = NerscSystemState.MAINTNAINCE - - return state - - def check_status(self, name: str = "perlmutter"): - """Check Status - - Args: - name (str, optional): Name to get status od. Defaults to "perlmutter". 
- - Returns: - bool: Gives bool value if site is up/down, true/false - """ - # Get status enum - current_status = self.system_status(name=name) - - down = (NerscSystemState.DOWN | NerscSystemState.MAINTNAINCE | - NerscSystemState.UNKNOWN) - # Check if status is any of the down states and return false - if current_status in down: - logging.debug(f"{name} is {current_status}") - return False - - return True - - def ls(self, remote_path: str, site: str = NERSC_DEFAULT_COMPUTE) -> Dict: - """ls comand on a site - - Parameters - ---------- - site : str, optional - Name of the site you want to ls at, by default NERSC_DEFAULT_COMPUTE - remote_path : str, optional - Path on the system, by default None - - Returns - ------- - Dict - """ - if remote_path is None: - return None - - sub_url = f'/utilities/ls' - path = remote_path.replace("/", "%2F") - - sub_url = f'{sub_url}/{site}/{path}' - - return self.__generic_get(sub_url) - - def projects(self) -> Dict: - """Get information about your projects - - Parameters - ---------- - repo_name : str, optional - Get information about a specific project, by default None - - Returns - ------- - Dict - """ - - sub_url = '/account/projects' - - return self.__generic_get(sub_url) - - def get_groups(self, groups: str = None) -> Dict: - """Get information about your groups - - Parameters - ---------- - repo_name : str, optional - Get information about a specific project, by default None - - Returns - ------- - Dict - """ - - sub_url = '/account/groups' - if groups is not None: - sub_url = f'/account/groups/{groups}' - - return self.__generic_get(sub_url) - - def create_groups(self, name: str = "", repo_name: str = ""): - """Create new groups - - Parameters - ---------- - repo_name : str, optional - Get information about a specific project, by default None - - Returns - ------- - Dict - """ - - sub_url = '/account/groups' - - data = {"name": name, "repo_name": repo_name} - - return self.__generic_post(sub_url, data=data) - - def roles(self, ) -> Dict: - """Get roles for your account - - Returns - ------- - Dict - """ - sub_url = '/account/roles' - - return self.__generic_get(sub_url) - - def tasks(self, task_id: int = None) -> Dict: - """Used to get SuperfacilityAPI tasks - - Parameters - ---------- - task_id : int, optional - SuperfacilityAPI task number, by default None - - Returns - ------- - Dict - """ - sub_url = '/tasks' - if task_id is not None: - sub_url = f'{sub_url}/{task_id}' - - return self.__generic_get(sub_url) - - def get_jobs(self, site: str = NERSC_DEFAULT_COMPUTE, sacct: bool = True, - jobid: int = None, user: str = None, partition: str = None) -> Dict: - """Used to get information about slurm jobs on a system - - Parameters - ---------- - site : str, optional - NERSC site where slurm job is running, by default NERSC_DEFAULT_COMPUTE - sacct : bool, optional - Whether to use sacct[true] or squeue[false], by default True - jobid : int, optional - Slurm job id to get information for, by default None - user : int, optional - Username to get information for, by default None - - Returns - ------- - Dict - - """ - - if site not in NerscCompute: - return {'status': "", 'output': [], 'error': ""} - - if not self.check_status(name=site): - logging.debug(site) - raise SuperfacilitySiteDown( - f'{site} is down, Reason: {self.system_status(name=site)}') - # return {'status': "", 'output': [], 'error': ""} - - sub_url = f'/compute/jobs/{site}' - if jobid is not None: - sub_url = f'{sub_url}/{jobid}' - - sub_url = f'{sub_url}?sacct={"true" if sacct else 
"false"}' - - if user is not None: - sub_url = f'{sub_url}&kwargs=user%3D{user}' - elif partition is not None: - sub_url = f'{sub_url}&kwargs=partition%3D{partition}' - - return self.__generic_get(sub_url) - - def squeue(self, - site: str = NERSC_DEFAULT_COMPUTE, - jobid: int = None, - user: str = None, - partition: str = None, - dataframe: bool = False): - """squeue - - Returns similar information as squeue command line - - Args: - site (str, optional): _description_. Defaults to NERSC_DEFAULT_COMPUTE. - sacct (bool, optional): _description_. Defaults to True. - jobid (int, optional): _description_. Defaults to None. - user (str, optional): _description_. Defaults to None. - partition (str, optional): _description_. Defaults to None. - """ - - jobs = self.get_jobs(site=site, - jobid=jobid, - user=user, - partition=partition, - sacct=False) - if 'output' in jobs: - jobs = jobs['output'] - - if dataframe and HAVE_PANDAS: - if len(jobs) == 0: - return pd.DataFrame(columns=squeue_columns) - else: - return pd.DataFrame(jobs) - - return jobs - - def sacct(self, - site: str = NERSC_DEFAULT_COMPUTE, - jobid: int = None, - user: str = None, - partition: str = None, - dataframe: bool = False): - """sacct - - Returns similar information as sacct command line - - Args: - site (str, optional): _description_. Defaults to NERSC_DEFAULT_COMPUTE. - sacct (bool, optional): _description_. Defaults to True. - jobid (int, optional): _description_. Defaults to None. - user (str, optional): _description_. Defaults to None. - partition (str, optional): _description_. Defaults to None. - """ - - jobs = self.get_jobs(site=site, - jobid=jobid, - user=user, - partition=partition, - sacct=True) - if 'output' in jobs: - jobs = jobs['output'] - - if dataframe and HAVE_PANDAS: - if len(jobs) == 0: - return pd.DataFrame(columns=sacct_columns) - else: - return pd.DataFrame(jobs) - - return jobs - - def post_job(self, site: str = NERSC_DEFAULT_COMPUTE, - script: str = None, isPath: bool = True, - run_async: bool = False, - timeout: int = 30, - sleeptime: int = 2) -> int: - """Adds a new job to the queue - - Parameters - ---------- - site : str, optional - Site to add job to, by default NERSC_DEFAULT_COMPUTE - script : str, optional - Path or script to call sbatch on, by default None - isPath : bool, optional - Is the script a path on the site or a file, by default True - - Returns - ------- - int - slurm jobid - """ - - job_info = {'error': None, 'jobid': None, 'task_id': None} - - if site not in NerscCompute: - job_info['error'] = 'not a compute site' - return job_info - - if not self.check_status(name=site): - logging.debug(site) - raise SuperfacilitySiteDown( - f'{site} is down, Reason: {self.system_status(name=site)}') - - sub_url = f'/compute/jobs/{site}' - script.replace("/", "%2F") - is_path = 'true' if isPath else 'false' - data = {'job': script, 'isPath': is_path} - resp = self.__generic_post(sub_url, data=data) - - logging.debug("Submitted new job, wating for responce.") - if resp == None: - return {'error': -1, 'jobid': None, 'task_id': None} - - task_id = resp['task_id'] - job_info['task_id'] = task_id - if run_async: - logging.debug(task_id) - logging.debug(job_info) - return job_info - - # Waits (up to {timeout} seconds) for the job to be submited before returning - for i in range(timeout): - if i > 0: - sleep(sleeptime) - - logging.debug(f"Checking {i} ...") - task = self.tasks(resp['task_id']) - logging.debug(f"task = {task}") - if task is not None and task['status'] == 'completed': - jobinfo = 
json.loads(task['result']) - return { - 'error': jobinfo['error'], - 'jobid': jobinfo['jobid'], - 'task_id': task_id - } - - return job_info - - def sbatch(self, site: str = NERSC_DEFAULT_COMPUTE, - script: str = None, isPath: bool = True) -> int: - """Adds a new job to the queue like sbatch - - Parameters - ---------- - site : str, optional - Site to add job to, by default NERSC_DEFAULT_COMPUTE - script : str, optional - Path or script to call sbatch on, by default None - isPath : bool, optional - Is the script a path on the site or a file, by default True - - Returns - ------- - int - slurm jobid - """ - # We can check if the path is on the nersc system - if isPath: - out = self.ls(script) - if out['status'] == "ERROR": - raise FileNotFoundError(f"{script} Not found on {site}") - # Then see if it's a path on the current system - elif Path(script).exists(): - logging.debug( - f"Looks like the script is a path, opending {script}") - with open(Path(script)) as contents: - script = contents.read() - else: - logging.debug(f"Looks like the script is a string {script}") - - job_output = self.post_job(site=site, script=script, isPath=isPath) - - return job_output['jobid'] - - def delete_job(self, site: str = NERSC_DEFAULT_COMPUTE, jobid: int = None) -> Dict: - """Removes job from queue - - Parameters - ---------- - site : str, optional - Site to remove job from, by default NERSC_DEFAULT_COMPUTE - jobid : int, optional - Jobid to remove, by default None - - Returns - ------- - Dict - """ - if site not in NerscCompute: - return None - - down = NerscSystemState.DOWN | NerscSystemState.MAINTNAINCE | NerscSystemState.UNKNOWN - current_status = self.system_status() - if current_status is down: - logging.debug( - f"System is {current_status}, job cannot check jobs") - return None - - sub_url = f'/compute/jobs/{site}/{jobid}' - logging.debug(f"Calling {sub_url}") - - return self.__generic_delete(sub_url) - - def scancel(self, jobid: int, site: str = NERSC_DEFAULT_COMPUTE) -> bool: - """Removes job from queue - - Parameters - ---------- - jobid : int - Jobid to remove - site : str, optional - Site to remove job from, by default NERSC_DEFAULT_COMPUTE - - - Returns - ------- - bool - """ - del_job = self.delete_job(site=site, jobid=jobid) - return (del_job['status'] == 'OK') - - def custom_cmd(self, - run_async: bool = False, - site: str = NERSC_DEFAULT_COMPUTE, cmd: str = None, - timeout: int = 30, sleeptime: int = 2) -> Dict: - """Run custom command - - Parameters - ---------- - site : str, optional - Site to remove job from, by default NERSC_DEFAULT_COMPUTE - cmd: str, - Command to run - - Returns - ------- - Dict - """ - if site not in NerscCompute: - return None - sub_url = f'/utilities/command/{site}' - - data = {'executable': cmd} - - resp = self.__generic_post(sub_url, data=data) - logging.debug("Submitted new job, wating for responce.") - logging.debug(f"{resp}") - if resp == None: - return {'error': -1, 'task_id': None} - - task_id = resp['task_id'] - - # If we want the call async just return task_id - if run_async: - return {'error': None, 'task_id': task_id} - - # Waits (up to {timeout} seconds) for the job to be submited before returning - for i in range(timeout): - if i > 0: - sleep(sleeptime) - task = self.tasks(resp['task_id']) - if isinstance(task, dict) and task['status'] == 'completed': - return json.loads(task['result']) - sleep(sleeptime) - - try: - # Gives back error if something went wrong - task = self.tasks(resp['task_id']) - ret = json.loads(task['result']) - ret['task_id'] = 
task_id - return ret - except TypeError as e: - logging.warning(f"{type(e).__name__} : {e}") - return {'jobid': f"{type(e).__name__} : {e}"} - - ################## In Progress ####################### - def download(self, - site: str = NERSC_DEFAULT_COMPUTE, remote_path: str = None, - binary: bool = False, local_path: str = '.', save: bool = False) -> Dict: - - if site is None: - raise SuperfacilityCmdFailed("Need site to download from") - if remote_path is None: - raise SuperfacilityCmdFailed("Need a remote path to download") - - if site not in ['perlmutter', 'cori']: - raise SuperfacilityCmdFailed(f"Cannot download from {site}") - - sub_url = '/utilities/download' - file_name = f'{local_path}/{remote_path.split("/")[-1]}' - path = remote_path.replace("/", "%2F") - - sub_url = f'{sub_url}/{site}/{path}' - - if binary: - sub_url = f'{sub_url}?binary=true' - - res = self.__generic_get(sub_url) - if res is not None: - if res['error'] is None: - if save: - with open(file_name, "wb") as f: - byte = bytes(res['file'], 'utf8') - f.write(byte) - return res - else: - return res - else: - return res diff --git a/python/SuperfacilityAPI/SuperfacilityAccessToken.py b/python/SuperfacilityAPI/SuperfacilityAccessToken.py deleted file mode 100644 index ce458bf..0000000 --- a/python/SuperfacilityAPI/SuperfacilityAccessToken.py +++ /dev/null @@ -1,152 +0,0 @@ -from authlib.integrations.requests_client import ( - OAuth2Session, - OAuthError -) -from authlib.oauth2.rfc7523 import PrivateKeyJWT -import sys -from datetime import datetime -from pathlib import Path -import requests -import logging -import os - - -iris_instructions = """ -Go to https://iris.nersc.gov - -Click `Profile` - -Scroll to `Superfacility API Clients` - -`+ New Client` - -Copy the private key. - -Your current IP address is {} -""" - - -class SuperfacilityAccessToken: - client_id = None - private_key = None - key_path = None - session = None - - def __init__(self, name: str = None, - client_id: str = None, - private_key: str = None, - key_path: str = None): - """SuperfacilityAPI - - Parameters - ---------- - client_id : str, optional - Client ID obtained from iris, by default None - private_key : str, optional - Private key obtained from iris, by default None - """ - # TODO: Check a better way to store these, esspecially private key - if client_id is not None and private_key is not None: - self.client_id = client_id - self.private_key = private_key - elif key_path is not None and Path(key_path).exists(): - self.key_path = key_path - elif Path.joinpath(Path.home(), ".superfacility").exists(): - if name is not None: - self.key_path = list(Path.joinpath( - Path.home(), ".superfacility").glob(f"{name}*.pem"))[0] - elif client_id is not None: - self.client_id = client_id - self.key_path = Path.joinpath( - Path.home(), f".superfacility/{client_id}.pem") - else: - self.key_path = list(Path.joinpath( - Path.home(), ".superfacility").glob("*.pem"))[0] - - # Create an access token in the __renew_toekn function - self.access_token = None - self.__token_lifetime = datetime.now() - self.__renew_token() - - @staticmethod - def save_token(tag: str = "sfapi"): - sfdir = Path.joinpath(Path.home(), ".superfacility") - sfdir.mkdir(exist_ok=True) - - ipadder = requests.get("https://ifconfig.me/ip") - - # Sorry for the weird one liner! 
- O_24 = '.'.join(ipadder.text.split('.')[:-1]) + '.0/24' - print(iris_instructions.format(O_24)) - - client_id = input("Enter client id: ") - key_name = sfdir / f"{tag}-{client_id}.pem" - editor = os.getenv("EDITOR", "vim") - os.system(f'{editor} {key_name}') - try: - key_name.chmod(0o600) - except FileNotFoundError: - print("No key info entered") - - @property - def token(self): - logging.debug( - f"Token lifetime {(datetime.now()-self.__token_lifetime).seconds}") - if ((datetime.now()-self.__token_lifetime).seconds) > 500: - logging.debug( - f"Token lifetime {(self.__token_lifetime - datetime.now()).seconds} renewing token") - self.__renew_token() - - if self.session is not None: - self.access_token = self.session.fetch_token()['access_token'] - - return self.access_token - - def __check_file_and_open(self) -> str: - contents = None - if self.key_path.is_file(): - with open(self.key_path.absolute()) as f: - contents = f.read() - return contents - - def __renew_token(self): - # Create access token from client_id/private_key - token_url = "https://oidc.nersc.gov/c2id/token" - logging.debug(f"{self.__token_lifetime - datetime.now()}") - self.__token_lifetime = datetime.now() - - if self.client_id is None: - logging.debug("Getting client_id from file path") - cid = self.key_path.stem.split('-')[-1] - else: - cid = self.client_id - - logging.debug(f"Getting token for {cid}") - - if self.key_path is not None: - logging.debug( - f"Getting private key from file path {self.key_path}") - pkey = self.__check_file_and_open() - elif self.private_key is not None: - pkey = self.private_key - logging.debug( - f"Private key provided as string") - else: - # If no private key don't look for getting a token - return None - - self.session = OAuth2Session( - cid, # client_id - pkey, # client_secret - PrivateKeyJWT(token_url), # authorization_endpoint - grant_type="client_credentials", - token_endpoint=token_url # token_endpoint - ) - - # Get's the access token - try: - self.access_token = self.session.fetch_token()['access_token'] - except OAuthError as e: - logging.debug( - f"Oauth error {e}\nMake sure your api key is still active in iris.nersc.gov") - return None diff --git a/python/SuperfacilityAPI/SuperfacilityErrors.py b/python/SuperfacilityAPI/SuperfacilityErrors.py deleted file mode 100644 index dffc976..0000000 --- a/python/SuperfacilityAPI/SuperfacilityErrors.py +++ /dev/null @@ -1,54 +0,0 @@ -permissions_warning = """ -This may be caused by a permissions error. - -Check in iris that your key is correct and still active. - -iris.nersc.gov > Profile > Superfacility API Clients -""" - -no_client = """ -Make sure you provided your client ID and private key properly. - -sfapi = SuperfacilityAPI(client_id, private_key) -""" - -warning_fourOfour = """ -############################# -404 Error. Webpage not found! 
- -{} - -############################# -""" - - -class SuperfacilityError(Exception): - pass - - -class FourOfourException(SuperfacilityError): - pass - - -class InternalServerError(SuperfacilityError): - pass - - -class ApiTokenError(SuperfacilityError): - pass - - -class NoClientException(SuperfacilityError): - pass - - -class PermissionsException(SuperfacilityError): - pass - - -class SuperfacilityCmdFailed(SuperfacilityError): - pass - - -class SuperfacilitySiteDown(SuperfacilityError): - pass diff --git a/python/SuperfacilityAPI/__init__.py b/python/SuperfacilityAPI/__init__.py deleted file mode 100644 index 9b5dbe3..0000000 --- a/python/SuperfacilityAPI/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ - -from .SuperfacilityAccessToken import SuperfacilityAccessToken -from .SuperfacilityAPI import SuperfacilityAPI diff --git a/python/SuperfacilityAPI/api_version.py b/python/SuperfacilityAPI/api_version.py deleted file mode 100644 index 6645c01..0000000 --- a/python/SuperfacilityAPI/api_version.py +++ /dev/null @@ -1 +0,0 @@ -API_VERSION = 1.2 diff --git a/python/SuperfacilityAPI/bin/sfapi b/python/SuperfacilityAPI/bin/sfapi deleted file mode 100644 index de7d00f..0000000 --- a/python/SuperfacilityAPI/bin/sfapi +++ /dev/null @@ -1,309 +0,0 @@ -#!/usr/bin/env python3 - -import sys -from unicodedata import name -from SuperfacilityAPI import ( - SuperfacilityAPI, - SuperfacilityAccessToken, - SuperfacilityErrors -) -from SuperfacilityAPI.nersc_systems import NERSC_DEFAULT_COMPUTE - -import click -from pathlib import Path -import json -import logging - -runasync = False - - -def click_json(*args, **kwargs): - click.echo(json.dumps(*args)) - - -def check_file_and_open(file_path: str = "") -> str: - contents = None - pth = Path(file_path) - if pth.is_file(): - with open(pth.absolute()) as f: - contents = f.read() - return contents - - -@click.group() -@click.option('--clientid', '-id', default=None, - help='Client ID for your key. Can be used to specify which key to look for in $HOME/.superfacility.') -@click.option('--client', '-c', default=None, - help='Client ID for your key. 
Can be used to specify which key to look for in $HOME/.superfacility.') -@click.option('--debug', '-d', is_flag=True, default=False, - help='Print debug messages from sfapi and SuperfacilityConnector') -@click.option('--sync', is_flag=True, default=False, help='Run async') -@click.pass_context -def cli(ctx, client, clientid, debug, sync): - # Entrypoint for all the cli subcommands - # Basically an __init__ function that sets up the sfapi - ctx.ensure_object(dict) - if debug: - logging.basicConfig(encoding='utf-8', level=logging.DEBUG) - else: - logging.basicConfig(encoding='utf-8', level=logging.ERROR) - - if sync: - runasync = True - - try: - access_token = SuperfacilityAccessToken( - name=client, client_id=clientid) - sfapi = SuperfacilityAPI(token=access_token.token) - except: - sfapi = SuperfacilityAPI() - - ctx.obj['sfapi'] = sfapi - - -@cli.command() -@click.argument('site', default=NERSC_DEFAULT_COMPUTE) -@click.pass_context -def status(ctx, site): - sfapi = ctx.obj['sfapi'] - - if site in ['compute', 'computes']: - site = 'cori,perlmutter' - elif site in ['filesystem', 'filesystems']: - site = 'dna,dtns,global_homes,projectb,global_common,community_filesystem' - elif site in ['login', 'logins']: - site = 'cori,perlmutter,jupyter,dtns' - try: - if site == 'all': - ret = sfapi.status(None) - else: - ret = [sfapi.status(site) for site in site.split(",")] - - click_json(ret) - except Exception as err: - click.echo(f"{type(err).__name__}: {err}") - - -@cli.command() -@click.argument('site', default=NERSC_DEFAULT_COMPUTE) -@click.pass_context -def outages(ctx, site): - sfapi = ctx.obj['sfapi'] - - if site in ['compute', 'computes']: - site = 'cori,perlmutter' - elif site in ['filesystem', 'filesystems']: - site = 'dna,dtns,global_homes,projectb,global_common,community_filesystem' - elif site in ['login', 'logins']: - site = 'cori,perlmutter,jupyter,dtns' - - try: - if site == 'all': - ret = sfapi.status(None, outages=True) - else: - ret = [sfapi.status(site, outages=True) - for site in site.split(",")] - - click_json(ret) - except SuperfacilityErrors.InternalServerError as err: - click.echo(f"{type(err).__name__}: {err}") - - -@cli.command() -@click.argument('site', default=NERSC_DEFAULT_COMPUTE) -@click.pass_context -def system_status(ctx, site): - sfapi = ctx.obj['sfapi'] - status_code = sfapi.system_status(name=site) - - output = f"{site}@NERSC is currently {status_code}" - click.echo(output) - - -@cli.command() -@click.pass_context -def token(ctx): - sfapi = ctx.obj['sfapi'] - click.echo(sfapi.access_token) - - -@cli.command() -@click.pass_context -def systems(ctx): - sfapi = ctx.obj['sfapi'] - click_json(sfapi.system_names()) - - -@cli.command() -@click.pass_context -def roles(ctx): - sfapi = ctx.obj['sfapi'] - ret = sfapi.roles() - try: - ret = [{k: oj[k] for k in ['repo_name', 'id', - 'iris_role', 'description']} for oj in ret] - click_json(ret) - except Exception as err: - click.echo(f"{type(err).__name__}: {err}") - - -@cli.command() -@click.pass_context -def projects(ctx): - sfapi = ctx.obj['sfapi'] - - ret = sfapi.projects() - click_json(ret) - - -@cli.command() -@click.option('--group', '-g', default=None, help='NERSC Group') -@click.pass_context -def group(ctx, group): - sfapi = ctx.obj['sfapi'] - - ret = sfapi.get_groups(groups=group) - - click_json(ret) - - -@cli.command() -@click.argument('site', default=NERSC_DEFAULT_COMPUTE) -@click.option('--path', '-p', default=None, help='Path to slurm submit file at NERSC.') -@click.option('--local', '-l', default=None, help='Path 
to local file to submit.') -@click.pass_context -def sbatch(ctx, site, path, local): - sfapi = ctx.obj['sfapi'] - script = None - isPath = False - - if path is not None: - isPath = True - script = path - elif local is not None: - script = check_file_and_open(local) - - ret = sfapi.post_job(site=site, script=script, - isPath=isPath, run_async=runasync) - if runasync: - click_json(ret) - else: - try: - click.echo(ret['jobid']) - except Exception as err: - click.echo(f"{type(err).__name__}: {err}") - - -@cli.command() -@click.argument('site', default=NERSC_DEFAULT_COMPUTE) -@click.option('--path', '-p', default=None, help='Path to slurm submit file at NERSC.') -@click.pass_context -def ls(ctx, site, path): - sfapi = ctx.obj['sfapi'] - - ret = sfapi.ls(site=site, remote_path=path) - try: - click_json(ret['entries']) - except Exception as err: - click.echo(f"{type(err).__name__}: {err}") - - -@cli.command() -@click.argument('site', default=NERSC_DEFAULT_COMPUTE) -@click.option('--path', '-p', default=None, help='Path to slurm submit file at NERSC.') -@click.pass_context -def cat(ctx, site, path): - sfapi = ctx.obj['sfapi'] - - ret = sfapi.download(site=site, remote_path=path, save=False) - logging.debug(ret) - - try: - click.echo_via_pager(ret['file']) - except Exception as err: - click.echo(f"{type(err).__name__}: {err}") - - -@cli.command() -@click.argument('site', default=NERSC_DEFAULT_COMPUTE) -@click.option('--sacct/--no-sacct', default=False) -@click.option('--user', '-u', default=None, help='User to to get queue info for.') -@click.option('--jobid', '-j', default=None, help='Specific jobid to get queue info for.') -@click.pass_context -def squeue(ctx, site, sacct, user, jobid): - sfapi = ctx.obj['sfapi'] - - ret = sfapi.get_jobs(site=site, sacct=sacct, user=user, jobid=jobid) - - cols = ['jobid', - 'name', - 'account', - 'cpus', - 'features', - 'partition', - 'reason', - 'start_time', - 'state', - 'submit_time', - 'time', - 'time_left', - 'time_limit', - ] - - try: - logging.debug(ret) - if sacct: - outputs = ret['output'] - else: - outputs = [{k: oj[k] for k in cols} for oj in ret['output']] - except Exception as err: - click.echo(f"{type(err).__name__}: {err}") - exit(1) - - for output in outputs: - click_json(output) - - -@cli.command() -@click.argument('jobid') -@click.argument('site', default=NERSC_DEFAULT_COMPUTE) -@click.pass_context -def scancel(ctx, site, jobid): - sfapi = ctx.obj['sfapi'] - logging.info(f"Running delete on {jobid}@{site}") - - ret = sfapi.delete_job(site=site, jobid=jobid) - click.echo(ret) - - -@cli.command() -@click.argument('taskid') -@click.pass_context -def task(ctx, taskid): - sfapi = ctx.obj['sfapi'] - # Waits (up to {timeout} seconds) for the job to be submited before returning - timeout = 40 - sleeptime = 1 - from time import sleep - for i in range(timeout): - if i > 0: - sleep(sleeptime) - - logging.debug(f"Running {i}") - task = sfapi.tasks(task_id=taskid) - logging.debug(f"task = {task}") - if task is not None and task['status'] == 'completed': - jobinfo = json.loads(task['result']) - click.echo( - {'error': jobinfo['error'], 'jobid': jobinfo['jobid'], 'task_id': taskid}) - return - - -@cli.command() -@click.option('--client', '-c', default="sfpai", help='Name the sfapi json file') -def manage_keys(client): - SuperfacilityAccessToken.save_token(tag=client) - - -if __name__ == '__main__': - cli(obj={}) diff --git a/python/SuperfacilityAPI/nersc_systems.py b/python/SuperfacilityAPI/nersc_systems.py deleted file mode 100644 index 63250c9..0000000 --- 
a/python/SuperfacilityAPI/nersc_systems.py +++ /dev/null @@ -1,32 +0,0 @@ -from enum import Enum, EnumMeta - - -nersc_systems = ['perlmutter', 'cori', 'dna', 'dtns', 'global_homes', 'projectb', 'global_common', - 'community_filesystem', 'matlab', 'jupyter', 'nersc_center', 'helpportal', 'website', - 'rstudio', 'sgns', 'network', 'ldap', 'integ_datalanguage', 'mathematica', 'globus', - 'spin', 'jgi_int_webservers', 'jgidb', 'int', 'webservers', 'iris', 'sciencedatabases', - 'myproxy', 'newt', 'ssh-proxy', 'mongodb', 'nomachine', 'regent', 'archive'] - -# https://stackoverflow.com/a/62854511 - - -class MyEnumMeta(EnumMeta): - def __contains__(cls, item): - return item in cls.__members__.values() - - -class NerscCompute(str, Enum, metaclass=MyEnumMeta): - CORI = 'cori' - PERLMUTTER = 'perlmutter' - MULLER = 'muller' - - -class NerscFilesystems(str, Enum, metaclass=MyEnumMeta): - DNA = 'dna' - DTN = 'dtns' - HOME = 'global_homes' - GLOBAL_COMMON = 'global_common' - CFS = 'community_filesystem' - - -NERSC_DEFAULT_COMPUTE = NerscCompute.PERLMUTTER diff --git a/setup.py b/setup.py deleted file mode 100644 index 209f8a4..0000000 --- a/setup.py +++ /dev/null @@ -1,50 +0,0 @@ -#!/usr/bin/env python - -import os -from setuptools import setup -from pathlib import Path -import sys - -_dir = Path(__file__).resolve().parent - - -with open(f"{_dir}/README.md") as f: - long_desc = """ - - ## Docs - - """ - try: - long_desc += f.read() - except UnicodeDecodeError: - long_desc += "" - - -if sys.version_info > (3, 7, 0): - install_requires = ['authlib', 'requests', - 'click', 'tabulate', 'pandas', 'numpy'] -else: - print(sys.version_info) - install_requires = ['authlib', 'requests', 'click'] - print(install_requires) - -setup( - name="SuperfacilityConnector", - description="Connector API NERSC Superfacility", - long_description=long_desc, - long_description_content_type="text/markdown", - url="https://github.com/tylern4/superfacilityConnector", - author="Nick Tyler", - author_email="tylern@lbl.gov", - packages=['SuperfacilityAPI'], - package_dir={'': 'python'}, - version='0.3.1b', - scripts=['python/SuperfacilityAPI/bin/sfapi'], - install_requires=install_requires, - classifiers=[ - "Programming Language :: Python :: 3", - "Operating System :: OS Independent", - "Topic :: Scientific/Engineering", - ], - python_requires=">=3.8", -) diff --git a/sfapi_tests.ipynb b/sfapi_tests.ipynb deleted file mode 100644 index e13053a..0000000 --- a/sfapi_tests.ipynb +++ /dev/null @@ -1,228 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": 6, - "metadata": {}, - "outputs": [], - "source": [ - "# import logging\n", - "# logging.basicConfig(level=\"DEBUG\")\n", - "\n", - "from SuperfacilityAPI import SuperfacilityAPI, SuperfacilityAccessToken\n", - "import pandas as pd\n", - "\n", - "\n", - "api_key = SuperfacilityAccessToken(\"home\")\n", - "sfapi = SuperfacilityAPI(api_key)\n" - ] - }, - { - "cell_type": "code", - "execution_count": 7, - "metadata": {}, - "outputs": [], - "source": [ - "try:\n", - " jobs = sfapi.get_jobs(site=\"perlmutter\", user=\"tylern\", sacct=False)\n", - "except Exception as err:\n", - " print(err)\n", - "\n", - "jobs_df = pd.DataFrame(jobs['output'])\n" - ] - }, - { - "cell_type": "code", - "execution_count": 8, - "metadata": {}, - "outputs": [ - { - "data": { - "text/html": [ - "
\n", - "\n", - "\n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - "
accounttres_per_nodemin_cpusmin_tmp_diskend_timefeaturesgroupover_subscribejobidname...partitionnodelist(reason)start_timestateuidsubmit_timelicensescore_specschednodeswork_dir
1nstaffN/A802023-05-11T18:41:26cron95745YES5211080htcondor_workflow_node...cronlogin052023-02-10T17:41:26RUNNING957452023-02-02T12:53:42(null)N/A(null)/global/homes/t/tylern/htcondor_workflow_scron
2nstaffN/A202023-03-12T18:41:26cron95745YES5211082test_supervisor...cronlogin052023-02-10T17:41:26RUNNING957452023-02-09T06:03:57(null)N/A(null)/global/homes/t/tylern/supervisor_example
3m3792N/A25602023-02-13T22:24:35cpu95745NO5448882clas12_ersap...regular_milan_ss11nid0064202023-02-13T10:24:35RUNNING957452023-02-13T09:18:44cfs:1N/A(null)/global/cfs/cdirs/nstaff/tylern/ldrd_clas12/sc...
\n", - "

3 rows × 49 columns

\n", - "
" - ], - "text/plain": [ - " account tres_per_node min_cpus min_tmp_disk end_time features \\\n", - "1 nstaff N/A 8 0 2023-05-11T18:41:26 cron \n", - "2 nstaff N/A 2 0 2023-03-12T18:41:26 cron \n", - "3 m3792 N/A 256 0 2023-02-13T22:24:35 cpu \n", - "\n", - " group over_subscribe jobid name ... \\\n", - "1 95745 YES 5211080 htcondor_workflow_node ... \n", - "2 95745 YES 5211082 test_supervisor ... \n", - "3 95745 NO 5448882 clas12_ersap ... \n", - "\n", - " partition nodelist(reason) start_time state uid \\\n", - "1 cron login05 2023-02-10T17:41:26 RUNNING 95745 \n", - "2 cron login05 2023-02-10T17:41:26 RUNNING 95745 \n", - "3 regular_milan_ss11 nid006420 2023-02-13T10:24:35 RUNNING 95745 \n", - "\n", - " submit_time licenses core_spec schednodes \\\n", - "1 2023-02-02T12:53:42 (null) N/A (null) \n", - "2 2023-02-09T06:03:57 (null) N/A (null) \n", - "3 2023-02-13T09:18:44 cfs:1 N/A (null) \n", - "\n", - " work_dir \n", - "1 /global/homes/t/tylern/htcondor_workflow_scron \n", - "2 /global/homes/t/tylern/supervisor_example \n", - "3 /global/cfs/cdirs/nstaff/tylern/ldrd_clas12/sc... \n", - "\n", - "[3 rows x 49 columns]" - ] - }, - "execution_count": 8, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "running = jobs_df[jobs_df.state == \"RUNNING\"]\n", - "running\n" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "api", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.10.0" - }, - "orig_nbformat": 4, - "vscode": { - "interpreter": { - "hash": "53d758f36e0ad572596c98314f7fedded3686c18c2797d6651577eb2add0ebd0" - } - } - }, - "nbformat": 4, - "nbformat_minor": 2 -} diff --git a/src/SuperfacilityConnector/__init__.py b/src/SuperfacilityConnector/__init__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/src/SuperfacilityConnector/__init__.py @@ -0,0 +1 @@ + diff --git a/src/SuperfacilityConnector/sfapi.py b/src/SuperfacilityConnector/sfapi.py new file mode 100644 index 0000000..5f77a03 --- /dev/null +++ b/src/SuperfacilityConnector/sfapi.py @@ -0,0 +1,255 @@ +#!/usr/bin/env python3 + +import click +import json +import logging +from pathlib import Path +from sfapi_client import Client +from sfapi_client.jobs import JobCommand +from sfapi_client.compute import Machine +from sfapi_client.exceptions import SfApiError + +NERSC_DEFAULT_COMPUTE = Machine.perlmutter + + +def click_json(elemets): + click.echo(json.dumps(elemets, default=str)) + + +def check_file_and_open(file_path: Path): + ... + + +@click.group() +@click.option('--client', '-c', default=None, + help='Client ID for your key. 
Can be used to specify which key to look for in $HOME/.superfacility.') +@click.option('--debug', '-d', is_flag=True, default=False, + help='Print debug messages from sfapi and SuperfacilityConnector') +@click.pass_context +def cli(ctx, client=None, clientid=None, debug=False): + ctx.ensure_object(dict) + if debug: + logging.basicConfig(encoding='utf-8', level=logging.DEBUG) + else: + logging.basicConfig(encoding='utf-8', level=logging.ERROR) + + ctx.obj['client'] = Client(key=client) + + +@cli.command() +@click.argument('site', default=NERSC_DEFAULT_COMPUTE) +@click.pass_context +def status(ctx, site): + client: Client = ctx.obj['client'] + + if site in ['compute', 'computes']: + site = 'perlmutter' + elif site in ['filesystem', 'filesystems']: + site = 'dtns,global_homes,global_common,community_filesystem' + elif site in ['login', 'logins']: + site = 'perlmutter,jupyter,dtns' + + try: + if site == 'all': + ret = client.resources.status() + else: + ret = [client.resources.status(site) for site in site.split(",")] + + if isinstance(ret, list): + elemets = [arg.model_dump() for arg in ret] + elif isinstance(ret, dict): + elemets = [arg.model_dump() for _, arg in ret.items()] + else: + elemets = ret.model_dump() + + click_json(elemets) + except SfApiError as err: + click.echo(f"SfApiError: {type(err).__name__}: {err}") + except Exception as err: + click.echo(f"{type(err).__name__}: {err}") + + +@cli.command() +@click.argument('site', default=NERSC_DEFAULT_COMPUTE) +@click.pass_context +def outages(ctx, site): + client: Client = ctx.obj['client'] + + if site in ['compute', 'computes']: + site = 'perlmutter' + elif site in ['filesystem', 'filesystems']: + site = 'dtns,global_homes,global_common,community_filesystem' + elif site in ['login', 'logins']: + site = 'perlmutter,jupyter,dtns' + + try: + if site == 'all': + ret = client.resources.planned_outages() + else: + ret = [client.resources.planned_outages(site) for site in site.split(",")] + + if isinstance(ret, list): + elemets = [a.model_dump() for arg in ret for a in arg] + elif isinstance(ret, dict): + elemets = [a.model_dump() for _, arg in ret.items() for a in arg] + else: + elemets = ret.model_dump() + + click_json(elemets) + except SfApiError as err: + click.echo(f"SfApiError: {type(err).__name__}: {err}") + except Exception as err: + click.echo(f"{type(err).__name__}: {err}") + + +@cli.command() +@click.pass_context +def token(ctx): + client: Client = ctx.obj['client'] + click.echo(client.token) + + +@cli.command() +@click.pass_context +def whoami(ctx): + client: Client = ctx.obj['client'] + click_json(client.user().model_dump()) + + +@cli.command() +@click.pass_context +def groups(ctx): + client: Client = ctx.obj['client'] + user = client.user() + ret = user.groups() + elemets = [arg.model_dump() for arg in ret] + for ele in elemets: + del ele["client"] + del ele["users_"] + + click_json(elemets) + +# @cli.command() +# @click.argument('site', default=NERSC_DEFAULT_COMPUTE) +# @click.option('--path', '-p', default=None, help='Path to slurm submit file at NERSC.') +# @click.option('--local', '-l', default=None, help='Path to local file to submit.') +# @click.pass_context +# def sbatch(ctx, site, path, local): +# client: Client = ctx.obj['client'] +# script = None +# isPath = False + +# if path is not None: +# isPath = True +# script = path +# elif local is not None: +# script = check_file_and_open(local) + + +# @cli.command() +# @click.argument('site', default=NERSC_DEFAULT_COMPUTE) +# @click.option('--path', '-p', default=None, 
help='Path to slurm submit file at NERSC.') +# @click.pass_context +# def ls(ctx, site, path): +# client: Client = ctx.obj['client'] + +# ret = sfapi.ls(site=site, remote_path=path) +# try: +# click_json(ret['entries']) +# except Exception as err: +# click.echo(f"{type(err).__name__}: {err}") + + +# @cli.command() +# @click.argument('site', default=NERSC_DEFAULT_COMPUTE) +# @click.option('--path', '-p', default=None, help='Path to slurm submit file at NERSC.') +# @click.pass_context +# def cat(ctx, site, path): +# client: Client = ctx.obj['client'] + +# ret = sfapi.download(site=site, remote_path=path, save=False) +# logging.debug(ret) + +# try: +# click.echo_via_pager(ret['file']) +# except Exception as err: +# click.echo(f"{type(err).__name__}: {err}") + + +@cli.command() +@click.argument('site', default=NERSC_DEFAULT_COMPUTE) +@click.option('--user', '-u', default=None, help='User to to get queue info for.') +@click.option('--jobid', '-j', default=None, help='Specific jobid to get queue info for.') +@click.pass_context +def squeue(ctx, site, user, jobid): + client: Client = ctx.obj['client'] + compute = client.compute(site) + + try: + ret = compute.jobs(user=user, jobids=jobid, command=JobCommand.sacct) + if isinstance(ret, list): + elemets = [arg.model_dump() for arg in ret] + elif isinstance(ret, dict): + elemets = [arg.model_dump() for _, arg in ret.items()] + + for ele in elemets: + del ele['compute'] + + click_json(elemets) + except Exception as err: + click.echo(f"{type(err).__name__}: {err}") + exit(1) + + +# @cli.command() +# @click.argument('site', default=NERSC_DEFAULT_COMPUTE) +# @click.option('--user', '-u', default=None, help='User to to get queue info for.') +# @click.option('--jobid', '-j', default=None, help='Specific jobid to get queue info for.') +# @click.pass_context +# def sqs(ctx, site, user, jobid): +# client: Client = ctx.obj['client'] +# try: +# ret = sfapi.sqs(site=site, user=user, jobid=jobid, dataframe=True) +# click.echo(ret) +# except Exception as err: +# click.echo(f"{type(err).__name__}: {err}") +# exit(1) + + +# @cli.command() +# @click.argument('jobid') +# @click.argument('site', default=NERSC_DEFAULT_COMPUTE) +# @click.pass_context +# def scancel(ctx, site, jobid): +# client: Client = ctx.obj['client'] +# logging.info(f"Running delete on {jobid}@{site}") + +# ret = sfapi.delete_job(site=site, jobid=jobid) +# click.echo(ret) + + +# @cli.command() +# @click.argument('taskid') +# @click.pass_context +# def task(ctx, taskid): +# client: Client = ctx.obj['client'] +# # Waits (up to {timeout} seconds) for the job to be submited before returning +# timeout = 40 +# sleeptime = 1 +# from time import sleep +# for i in range(timeout): +# if i > 0: +# sleep(sleeptime) + +# logging.debug(f"Running {i}") +# task = sfapi.tasks(task_id=taskid) +# logging.debug(f"task = {task}") +# if task is not None and task['status'] == 'completed': +# jobinfo = json.loads(task['result']) +# click.echo( +# {'error': jobinfo['error'], 'jobid': jobinfo['jobid'], 'task_id': taskid}) +# return + + +if __name__ == '__main__': + cli(obj={})
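
For reference, a minimal usage sketch of the entry point this patch installs and the
sfapi_client calls it wraps (the key path and username below are illustrative
placeholders, not part of the patch):

    # Shell: the console script registered under [project.scripts]
    #   sfapi status all
    #   sfapi --client /path/to/key.pem squeue --user elvis perlmutter

    # Python: the same sfapi_client calls the squeue command above relies on
    from sfapi_client import Client
    from sfapi_client.compute import Machine
    from sfapi_client.jobs import JobCommand

    # An authenticated client; Client() with no key still serves status queries
    with Client(key="/path/to/key.pem") as client:
        compute = client.compute(Machine.perlmutter)
        for job in compute.jobs(user="elvis", command=JobCommand.sacct):
            print(job.jobid, job.state)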