From fd5a9b324b7a6f59890a20d30091f67798e0bffb Mon Sep 17 00:00:00 2001 From: tmaeno Date: Fri, 28 Jun 2024 11:23:01 +0200 Subject: [PATCH] execute_pchain in panda_api.py --- pandaclient/PandaToolsPkgInfo.py | 2 +- pandaclient/PchainScript.py | 71 +++++--- pandaclient/panda_api.py | 294 ++++++++++++++++--------------- 3 files changed, 201 insertions(+), 166 deletions(-) diff --git a/pandaclient/PandaToolsPkgInfo.py b/pandaclient/PandaToolsPkgInfo.py index 26576780..4e657012 100644 --- a/pandaclient/PandaToolsPkgInfo.py +++ b/pandaclient/PandaToolsPkgInfo.py @@ -1 +1 @@ -release_version = "1.5.75" +release_version = "1.5.76" diff --git a/pandaclient/PchainScript.py b/pandaclient/PchainScript.py index 34b23259..d513a1c0 100644 --- a/pandaclient/PchainScript.py +++ b/pandaclient/PchainScript.py @@ -1,4 +1,6 @@ import atexit +import copy +import json import os import re import shlex @@ -48,6 +50,13 @@ def main(): group_config.add_argument("--version", action="store_const", const=True, dest="version", default=False, help="Displays version") group_config.add_argument("-v", action="store_const", const=True, dest="verbose", default=False, help="Verbose") + group_config.add_argument( + "--dumpJson", + action="store", + dest="dumpJson", + default=None, + help="Dump all command-line parameters and submission result such as returnCode, returnOut, and requestID to a json file", + ) group_check.add_argument("--check", action="store_const", const=True, dest="checkOnly", default=False, help="Check workflow description locally") group_check.add_argument( "--debug", action="store_const", const=True, dest="debugCheck", default=False, help="verbose mode when checking workflow description locally" @@ -287,29 +296,45 @@ def _onExit(dir, del_command): tmpStat, tmpOut = Client.send_workflow_request(params, **data) # result - exitCode = None + exit_code = 0 + request_id = None + tmp_str = "" if tmpStat != 0: - tmpStr = "workflow {0} failed with {1}".format(action_type, tmpStat) - tmpLog.error(tmpStr) - exitCode = 1 - return exitCode - if tmpOut[0]: - stat_code = tmpOut[1]["status"] - check_log = "messages from the server\n\n" + tmpOut[1]["log"] - if options.checkOnly: - tmpLog.info(check_log) - if stat_code: - tmpLog.info("successfully verified workflow description") + tmp_str = "workflow {0} failed with {1}".format(action_type, tmpStat) + tmpLog.error(tmp_str) + exit_code = 1 + else: + if tmpOut[0]: + stat_code = tmpOut[1]["status"] + check_log = "messages from the server\n\n" + tmpOut[1]["log"] + if options.checkOnly: + tmpLog.info(check_log) + if stat_code: + tmpLog.info("successfully verified workflow description") + else: + tmpLog.error("workflow description is corrupted") else: - tmpLog.error("workflow description is corrupted") + if stat_code: + request_id = tmpOut[1]["request_id"] + tmp_str = "successfully submitted with request_id={0}".format(request_id) + tmpLog.info(tmp_str) + else: + tmpLog.info(check_log) + tmp_str = "workflow submission failed with {0}".format(stat_code) + tmpLog.error(tmp_str) + exit_code = stat_code else: - if stat_code: - tmpLog.info("successfully submitted with request_id={0}".format(tmpOut[1]["request_id"])) - else: - tmpLog.info(check_log) - tmpLog.error("workflow submission failed") - else: - tmpStr = "workflow {0} failed. {1}".format(action_type, tmpOut[1]) - tmpLog.error(tmpStr) - exitCode = 1 - return exitCode + tmp_str = "workflow {0} failed. {1}".format(action_type, tmpOut[1]) + tmpLog.error(tmp_str) + exit_code = 1 + + # dump json + if options.dumpJson: + dump_item = copy.deepcopy(vars(options)) + dump_item["returnCode"] = exit_code + dump_item["returnOut"] = tmp_str + dump_item["requestID"] = request_id + with open(options.dumpJson, "w") as f: + json.dump(dump_item, f) + + return exit_code diff --git a/pandaclient/panda_api.py b/pandaclient/panda_api.py index 3172f37b..aefba52c 100644 --- a/pandaclient/panda_api.py +++ b/pandaclient/panda_api.py @@ -1,17 +1,14 @@ +import copy +import importlib +import json import os import sys -import json -import copy import tempfile -import importlib -from . import PLogger -from . import Client -from . import PBookCore +from . import Client, PBookCore, PLogger class PandaAPI(object): - # constructor def __init__(self): self.command_body = {} @@ -21,97 +18,96 @@ def __init__(self): # kill a task def kill_task(self, task_id, verbose=True): """kill a task - args: - task_id: jediTaskID of the task to be killed - verbose: True to see debug messages - returns: - status code - 0: communication succeeded to the panda server - 255: communication failure - tuple of return code and diagnostic message - 0: request is registered - 1: server error - 2: task not found - 3: permission denied - 4: irrelevant task status - 100: non SSL connection - 101: irrelevant taskID + args: + task_id: jediTaskID of the task to be killed + verbose: True to see debug messages + returns: + status code + 0: communication succeeded to the panda server + 255: communication failure + tuple of return code and diagnostic message + 0: request is registered + 1: server error + 2: task not found + 3: permission denied + 4: irrelevant task status + 100: non SSL connection + 101: irrelevant taskID """ return Client.killTask(task_id) # finish a task def finish_task(self, task_id, wait_running=False, verbose=False): """finish a task - args: - task_id: jediTaskID of the task to finish - wait_running: True to wait until running jobs are done - verbose: True to see debug messages - returns: - status code - 0: communication succeeded to the panda server - 255: communication failure - tuple of return code and diagnostic message - 0: request is registered - 1: server error - 2: task not found - 3: permission denied - 4: irrelevant task status - 100: non SSL connection - 101: irrelevant taskID + args: + task_id: jediTaskID of the task to finish + wait_running: True to wait until running jobs are done + verbose: True to see debug messages + returns: + status code + 0: communication succeeded to the panda server + 255: communication failure + tuple of return code and diagnostic message + 0: request is registered + 1: server error + 2: task not found + 3: permission denied + 4: irrelevant task status + 100: non SSL connection + 101: irrelevant taskID """ return Client.finishTask(task_id, wait_running, verbose) # retry a task def retry_task(self, task_id, new_parameters=None, verbose=False): """retry a task - args: - task_id: jediTaskID of the task to retry - new_parameters: a dictionary of task parameters to overwrite - verbose: True to see debug messages - returns: - status code - 0: communication succeeded to the panda server - 255: communication failure - tuple of return code and diagnostic message - 0: request is registered - 1: server error - 2: task not found - 3: permission denied - 4: irrelevant task status - 100: non SSL connection - 101: irrelevant taskID + args: + task_id: jediTaskID of the task to retry + new_parameters: a dictionary of task parameters to overwrite + verbose: True to see debug messages + returns: + status code + 0: communication succeeded to the panda server + 255: communication failure + tuple of return code and diagnostic message + 0: request is registered + 1: server error + 2: task not found + 3: permission denied + 4: irrelevant task status + 100: non SSL connection + 101: irrelevant taskID """ return Client.retryTask(task_id, verbose, True, new_parameters) # get tasks def get_tasks(self, task_ids=None, limit=1000, days=14, status=None, username=None): """get a list of task dictionaries - args: - task_ids: a list of task IDs, or None to get recent tasks - limit: the max number of tasks to fetch from the server - days: tasks for last N days to fetch - status: filtering with task status - username: user name of the tasks, or None to get own tasks - returns: - a list of task dictionaries + args: + task_ids: a list of task IDs, or None to get recent tasks + limit: the max number of tasks to fetch from the server + days: tasks for last N days to fetch + status: filtering with task status + username: user name of the tasks, or None to get own tasks + returns: + a list of task dictionaries """ if not self.pbook: self.pbook = PBookCore.PBookCore() - return self.pbook.show(task_ids, limit=limit, days=days, format='json', status=status, - username=username) + return self.pbook.show(task_ids, limit=limit, days=days, format="json", status=status, username=username) # show tasks - def show_tasks(self, task_ids=None, limit=1000, days=14, format='standard', status=None, username=None): + def show_tasks(self, task_ids=None, limit=1000, days=14, format="standard", status=None, username=None): """show tasks - args: - task_ids: a list of task IDs, or None to get recent tasks - limit: the max number of tasks to fetch from the server - days: tasks for last N days to fetch - format: standard, long, or plain - status: filtering with task status - username: user name of the tasks, or None to get own tasks - returns: - None + args: + task_ids: a list of task IDs, or None to get recent tasks + limit: the max number of tasks to fetch from the server + days: tasks for last N days to fetch + format: standard, long, or plain + status: filtering with task status + username: user name of the tasks, or None to get own tasks + returns: + None """ if not self.pbook: self.pbook = PBookCore.PBookCore() @@ -120,28 +116,28 @@ def show_tasks(self, task_ids=None, limit=1000, days=14, format='standard', stat # submit a task def submit_task(self, task_params, verbose=False): """submit a task using low-level API - args: - task_params: a dictionary of task parameters - verbose: True to see debug messages - returns: - status code - 0: communication succeeded to the panda server - 255: communication failure - tuple of return code, message from the server, and task ID if successful - 0: request is processed - 1: duplication in DEFT - 2: duplication in JEDI - 3: accepted for incremental execution - 4: server error + args: + task_params: a dictionary of task parameters + verbose: True to see debug messages + returns: + status code + 0: communication succeeded to the panda server + 255: communication failure + tuple of return code, message from the server, and task ID if successful + 0: request is processed + 1: duplication in DEFT + 2: duplication in JEDI + 3: accepted for incremental execution + 4: server error """ return Client.insertTaskParams(task_params, verbose=verbose, properErrorCode=True) # get metadata of all jobs in a task def get_job_metadata(self, task_id, output_json_filename): """get metadata of all jobs in a task - args: - task_id: task ID - output_json_filename: output json filename + args: + task_id: task ID + output_json_filename: output json filename """ if not self.pbook: self.pbook = PBookCore.PBookCore() @@ -158,8 +154,8 @@ def execute_xyz(self, command_name, module_name, args, console_log=True): sys.argv = copy.copy(args) sys.argv.insert(0, command_name) # set dump file - if '--dumpJson' not in sys.argv: - sys.argv.append('--dumpJson') + if "--dumpJson" not in sys.argv: + sys.argv.append("--dumpJson") with tempfile.NamedTemporaryFile(delete=False) as f: sys.argv.append(f.name) dump_file = f.name @@ -168,22 +164,22 @@ def execute_xyz(self, command_name, module_name, args, console_log=True): PLogger.disable_logging() # run if command_name in self.command_body: - if hasattr(self.command_body[command_name], 'main'): - getattr(self.command_body[command_name], 'main')() + if hasattr(self.command_body[command_name], "main"): + getattr(self.command_body[command_name], "main")() else: self.command_body[command_name].reload() else: self.command_body[command_name] = importlib.import_module(module_name) - if hasattr(self.command_body[command_name], 'main'): - getattr(self.command_body[command_name], 'main')() + if hasattr(self.command_body[command_name], "main"): + getattr(self.command_body[command_name], "main")() stat = True except SystemExit as e: if e.code == 0: stat = True else: - err_str = 'failed with code={0}'.format(e.code) + err_str = "failed with code={0}".format(e.code) except Exception as e: - err_str = 'failed with {0}'.format(str(e)) + err_str = "failed with {0}".format(str(e)) finally: # enable logging if not console_log: @@ -192,7 +188,7 @@ def execute_xyz(self, command_name, module_name, args, console_log=True): self.log.error(err_str) # read dump fle try: - with open(sys.argv[sys.argv.index('--dumpJson') + 1]) as f: + with open(sys.argv[sys.argv.index("--dumpJson") + 1]) as f: ret = json.load(f) if len(ret) == 1: ret = ret[0] @@ -207,75 +203,89 @@ def execute_xyz(self, command_name, module_name, args, console_log=True): def execute_prun(self, args, console_log=True): """execute prun command - args: - args: The arguments used to execute prun. This is a list of strings. - console_log: False to disable console logging + args: + args: The arguments used to execute prun. This is a list of strings, such as ["--outDS","user.hoge.001"] + console_log: False to disable console logging - returns: - status: True if succeeded. Otherwise, False - a dictionary: Task submission attributes including jediTaskID + returns: + status: True if succeeded. Otherwise, False + a dictionary: Task submission attributes including jediTaskID """ - return self.execute_xyz('prun', 'pandaclient.PrunScript', args, console_log) + return self.execute_xyz("prun", "pandaclient.PrunScript", args, console_log) # execute pathena def execute_pathena(self, args, console_log=True): """execute pathena command - args: - args: The arguments used to execute prun. This is a list of strings. - console_log: False to disable console logging + args: + args: The arguments used to execute pathena. This is a list of strings, such as ["--outDS","user.hoge.001"] + console_log: False to disable console logging - returns: - status: True if succeeded. Otherwise, False - a dictionary: Task submission attributes including jediTaskID + returns: + status: True if succeeded. Otherwise, False + a dictionary: Task submission attributes including jediTaskID """ - return self.execute_xyz('pathena', 'pandaclient.PathenaScript', args, console_log) + return self.execute_xyz("pathena", "pandaclient.PathenaScript", args, console_log) # execute phpo def execute_phpo(self, args, console_log=True): """execute phpo command - args: - args: The arguments used to execute prun. This is a list of strings. - console_log: False to disable console logging + args: + args: The arguments used to execute phpo. This is a list of strings, such as ["--outDS","user.hoge.001"] + console_log: False to disable console logging + + returns: + status: True if succeeded. Otherwise, False + a dictionary: Task submission attributes including jediTaskID + """ + return self.execute_xyz("phpo", "pandaclient.PhpoScript", args, console_log) + + # execute pchain + def execute_pchain(self, args, console_log=True): + """execute pchain command + + args: + args: The arguments used to execute chain. This is a list of strings, such as ["--outDS","user.hoge.001"] + console_log: False to disable console logging - returns: - status: True if succeeded. Otherwise, False - a dictionary: Task submission attributes including jediTaskID + returns: + status: True if succeeded. Otherwise, False + a dictionary: Task submission attributes including requestID """ - return self.execute_xyz('phpo', 'pandaclient.PhpoScript', args, console_log) + return self.execute_xyz("pchain", "pandaclient.PchainScript", args, console_log) # hello def hello(self, verbose=False): """Health check with the PanDA server - args: - verbose: True to see verbose message - returns: - status code - 0: communication succeeded to the panda server - 255: communication failure - diagnostic message + args: + verbose: True to see verbose message + returns: + status code + 0: communication succeeded to the panda server + 255: communication failure + diagnostic message """ return Client.hello(verbose) # increase attempt numbers to retry failed jobs def increase_attempt_nr(self, task_id, increase=3, verbose=False): """increase attempt numbers to retry failed jobs - args: - task_id: jediTaskID of the task - increase: increase for attempt numbers - verbose: True to see verbose message - returns: - status code - 0: communication succeeded to the panda server - 255: communication failure - return code - 0: succeeded - 1: unknown task - 2: invalid task status - 3: permission denied - 4: wrong parameter - None: database error + args: + task_id: jediTaskID of the task + increase: increase for attempt numbers + verbose: True to see verbose message + returns: + status code + 0: communication succeeded to the panda server + 255: communication failure + return code + 0: succeeded + 1: unknown task + 2: invalid task status + 3: permission denied + 4: wrong parameter + None: database error """ return Client.increase_attempt_nr(task_id, increase, verbose)