From 37f22db0e1b6d58809f250509e6a2e668fbf0b9d Mon Sep 17 00:00:00 2001 From: gadorlhiac Date: Fri, 12 Apr 2024 09:40:33 -0700 Subject: [PATCH] ENH Add eLog Task status updates from Executor --- lute/execution/executor.py | 34 ++++++++++++++++++++++++++++++---- lute/io/elog.py | 9 +++------ 2 files changed, 33 insertions(+), 10 deletions(-) diff --git a/lute/execution/executor.py b/lute/execution/executor.py index 4efdda60..6429b517 100644 --- a/lute/execution/executor.py +++ b/lute/execution/executor.py @@ -44,6 +44,7 @@ from ..tasks.dataclasses import * from ..io.models.base import TaskParameters from ..io.db import record_analysis_db +from ..io.elog import post_elog_run_status if __debug__: warnings.simplefilter("default") @@ -308,6 +309,7 @@ def execute_task(self) -> None: if ret := proc.returncode: logger.info(f"Task failed with return code: {ret}") self._analysis_desc.task_result.task_status = TaskStatus.FAILED + self.Hooks.task_failed(self, msg=Message()) elif self._analysis_desc.task_result.task_status == TaskStatus.RUNNING: # Ret code is 0, no exception was thrown, task forgot to set status self._analysis_desc.task_result.task_status = TaskStatus.COMPLETED @@ -406,22 +408,42 @@ def task_started(self: Executor, msg: Message): f"Executor: {self._analysis_desc.task_result.task_name} started" ) self._analysis_desc.task_result.task_status = TaskStatus.RUNNING + elog_data: Dict[str, str] = { + f"{self._analysis_desc.task_result.task_name} status": "RUNNING", + } + post_elog_run_status(elog_data) self.add_hook("task_started", task_started) - def task_failed(self: Executor, msg: Message): ... + def task_failed(self: Executor, msg: Message): + elog_data: Dict[str, str] = { + f"{self._analysis_desc.task_result.task_name} status": "FAILED", + } + post_elog_run_status(elog_data) self.add_hook("task_failed", task_failed) - def task_stopped(self: Executor, msg: Message): ... + def task_stopped(self: Executor, msg: Message): + elog_data: Dict[str, str] = { + f"{self._analysis_desc.task_result.task_name} status": "STOPPED", + } + post_elog_run_status(elog_data) self.add_hook("task_stopped", task_stopped) - def task_done(self: Executor, msg: Message): ... + def task_done(self: Executor, msg: Message): + elog_data: Dict[str, str] = { + f"{self._analysis_desc.task_result.task_name} status": "COMPLETED", + } + post_elog_run_status(elog_data) self.add_hook("task_done", task_done) - def task_cancelled(self: Executor, msg: Message): ... + def task_cancelled(self: Executor, msg: Message): + elog_data: Dict[str, str] = { + f"{self._analysis_desc.task_result.task_name} status": "CANCELLED", + } + post_elog_run_status(elog_data) self.add_hook("task_cancelled", task_cancelled) @@ -430,6 +452,10 @@ def task_result(self: Executor, msg: Message): self._analysis_desc.task_result = msg.contents logger.info(self._analysis_desc.task_result.summary) logger.info(self._analysis_desc.task_result.task_status) + elog_data: Dict[str, str] = { + f"{self._analysis_desc.task_result.task_name} status": "COMPLETED", + } + post_elog_run_status(elog_data) self.add_hook("task_result", task_result) diff --git a/lute/io/elog.py b/lute/io/elog.py index e260fbb6..7df933e0 100644 --- a/lute/io/elog.py +++ b/lute/io/elog.py @@ -153,7 +153,7 @@ def post_elog_workflow( def get_elog_active_expmt(hutch: str, *, endstation: int = 0) -> str: """Get the current active experiment for a hutch. - This function is the only function to manage the HTTP request independently. + This function is one of two functions to manage the HTTP request independently. This is because it does not require an authorization object, and its result is needed for the generic function `elog_http_request` to work properly. @@ -329,7 +329,7 @@ def post_elog_run_status( """Post a summary to the status/report section of a specific run. In contrast to most eLog update/post mechanisms, this function searches - for a specific environment variable which contains a temporary URL for + for a specific environment variable which contains a specific URL for posting. This is updated every job/run as jobs are submitted by the JID. The URL can optionally be passed to this function if it is known. @@ -352,10 +352,7 @@ def post_elog_run_status( return params: Dict[str, List[Dict[str, str]]] = {"json": post_list} - if update_url: - status_code, resp_msg, _ = elog_http_request( - url=update_url, request_type="POST", **params - ) + requests.post(update_url, **params) def post_elog_message(