Skip to content

Commit

Permalink
ENH Add eLog Task status updates from Executor
Browse files Browse the repository at this point in the history
  • Loading branch information
gadorlhiac committed Apr 12, 2024
1 parent e62183e commit 37f22db
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 10 deletions.
34 changes: 30 additions & 4 deletions lute/execution/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
from ..tasks.dataclasses import *
from ..io.models.base import TaskParameters
from ..io.db import record_analysis_db
from ..io.elog import post_elog_run_status

if __debug__:
warnings.simplefilter("default")
Expand Down Expand Up @@ -308,6 +309,7 @@ def execute_task(self) -> None:
if ret := proc.returncode:
logger.info(f"Task failed with return code: {ret}")
self._analysis_desc.task_result.task_status = TaskStatus.FAILED
self.Hooks.task_failed(self, msg=Message())
elif self._analysis_desc.task_result.task_status == TaskStatus.RUNNING:
# Ret code is 0, no exception was thrown, task forgot to set status
self._analysis_desc.task_result.task_status = TaskStatus.COMPLETED
Expand Down Expand Up @@ -406,22 +408,42 @@ def task_started(self: Executor, msg: Message):
f"Executor: {self._analysis_desc.task_result.task_name} started"
)
self._analysis_desc.task_result.task_status = TaskStatus.RUNNING
elog_data: Dict[str, str] = {
f"{self._analysis_desc.task_result.task_name} status": "RUNNING",
}
post_elog_run_status(elog_data)

self.add_hook("task_started", task_started)

def task_failed(self: Executor, msg: Message): ...
def task_failed(self: Executor, msg: Message):
elog_data: Dict[str, str] = {
f"{self._analysis_desc.task_result.task_name} status": "FAILED",
}
post_elog_run_status(elog_data)

self.add_hook("task_failed", task_failed)

def task_stopped(self: Executor, msg: Message): ...
def task_stopped(self: Executor, msg: Message):
elog_data: Dict[str, str] = {
f"{self._analysis_desc.task_result.task_name} status": "STOPPED",
}
post_elog_run_status(elog_data)

self.add_hook("task_stopped", task_stopped)

def task_done(self: Executor, msg: Message): ...
def task_done(self: Executor, msg: Message):
elog_data: Dict[str, str] = {
f"{self._analysis_desc.task_result.task_name} status": "COMPLETED",
}
post_elog_run_status(elog_data)

self.add_hook("task_done", task_done)

def task_cancelled(self: Executor, msg: Message): ...
def task_cancelled(self: Executor, msg: Message):
elog_data: Dict[str, str] = {
f"{self._analysis_desc.task_result.task_name} status": "CANCELLED",
}
post_elog_run_status(elog_data)

self.add_hook("task_cancelled", task_cancelled)

Expand All @@ -430,6 +452,10 @@ def task_result(self: Executor, msg: Message):
self._analysis_desc.task_result = msg.contents
logger.info(self._analysis_desc.task_result.summary)
logger.info(self._analysis_desc.task_result.task_status)
elog_data: Dict[str, str] = {
f"{self._analysis_desc.task_result.task_name} status": "COMPLETED",
}
post_elog_run_status(elog_data)

self.add_hook("task_result", task_result)

Expand Down
9 changes: 3 additions & 6 deletions lute/io/elog.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ def post_elog_workflow(
def get_elog_active_expmt(hutch: str, *, endstation: int = 0) -> str:
"""Get the current active experiment for a hutch.
This function is the only function to manage the HTTP request independently.
This function is one of two functions to manage the HTTP request independently.
This is because it does not require an authorization object, and its result
is needed for the generic function `elog_http_request` to work properly.
Expand Down Expand Up @@ -329,7 +329,7 @@ def post_elog_run_status(
"""Post a summary to the status/report section of a specific run.
In contrast to most eLog update/post mechanisms, this function searches
for a specific environment variable which contains a temporary URL for
for a specific environment variable which contains a specific URL for
posting. This is updated every job/run as jobs are submitted by the JID.
The URL can optionally be passed to this function if it is known.
Expand All @@ -352,10 +352,7 @@ def post_elog_run_status(
return

params: Dict[str, List[Dict[str, str]]] = {"json": post_list}
if update_url:
status_code, resp_msg, _ = elog_http_request(
url=update_url, request_type="POST", **params
)
requests.post(update_url, **params)


def post_elog_message(
Expand Down

0 comments on commit 37f22db

Please sign in to comment.